module Control.Eff.Concurrent.Api.Observer.Queue
( ObservationQueue()
, ObservationQueueReader
, readObservationQueue
, tryReadObservationQueue
, flushObservationQueue
, withObservationQueue
, spawnLinkObservationQueueWriter
)
where
import Control.Concurrent.STM
import Control.DeepSeq (NFData)
import Control.Eff
import Control.Eff.Concurrent.Api
import Control.Eff.Concurrent.Api.Observer
import Control.Eff.Concurrent.Api.Server
import Control.Eff.Concurrent.Process
import Control.Eff.ExceptionExtra ( )
import Control.Eff.Log
import Control.Eff.Reader.Strict
import Control.Exception.Safe as Safe
import Control.Monad.IO.Class
import Control.Monad ( unless )
import Data.Typeable
import qualified Data.Text as T
import GHC.Stack
newtype ObservationQueue a = ObservationQueue (TBQueue a)
type ObservationQueueReader a = Reader (ObservationQueue a)
logPrefix :: forall o proxy . (HasCallStack, Typeable o) => proxy o -> T.Text
logPrefix px = "observation queue: " <> T.pack (show (typeRep px))
readObservationQueue
:: forall o r
. ( Member (ObservationQueueReader o) r
, HasCallStack
, MonadIO (Eff r)
, Typeable o
, Member Logs r
)
=> Eff r o
readObservationQueue = do
ObservationQueue q <- ask @(ObservationQueue o)
liftIO (atomically (readTBQueue q))
tryReadObservationQueue
:: forall o r
. ( Member (ObservationQueueReader o) r
, HasCallStack
, MonadIO (Eff r)
, Typeable o
, Member Logs r
)
=> Eff r (Maybe o)
tryReadObservationQueue = do
ObservationQueue q <- ask @(ObservationQueue o)
liftIO (atomically (tryReadTBQueue q))
flushObservationQueue
:: forall o r
. ( Member (ObservationQueueReader o) r
, HasCallStack
, MonadIO (Eff r)
, Typeable o
, Member Logs r
)
=> Eff r [o]
flushObservationQueue = do
ObservationQueue q <- ask @(ObservationQueue o)
liftIO (atomically (flushTBQueue q))
withObservationQueue
:: forall o b e len
. ( HasCallStack
, Typeable o
, Show o
, Member Logs e
, Lifted IO e
, Integral len
, Member Interrupts e
)
=> len
-> Eff (ObservationQueueReader o ': e) b
-> Eff e b
withObservationQueue queueLimit e = do
q <- lift (newTBQueueIO (fromIntegral queueLimit))
res <- handleInterrupts (return . Left)
(Right <$> runReader (ObservationQueue q) e)
rest <- lift (atomically (flushTBQueue q))
unless
(null rest)
(logNotice (logPrefix (Proxy @o) <> " unread observations: " <> T.pack (show rest)))
either (\em -> logError (T.pack (show em)) >> lift (throwIO em)) return res
spawnLinkObservationQueueWriter
:: forall o q
. ( Typeable o
, Show o
, NFData o
, NFData (Api (Observer o) 'Asynchronous)
, Member Logs q
, Lifted IO q
, HasCallStack)
=> ObservationQueue o
-> Eff (InterruptableProcess q) (Observer o)
spawnLinkObservationQueueWriter (ObservationQueue q) = do
cbo <- spawnLinkApiServer
(handleObservations
(\case
o -> do
lift (atomically (writeTBQueue q o))
pure AwaitNext
)
)
stopServerOnInterrupt
pure (toObserver cbo)