{-# options_haddock prune #-}
module Polysemy.Conc.Interpreter.Queue.TBM where
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMQueue (
TBMQueue,
closeTBMQueue,
isClosedTBMQueue,
newTBMQueueIO,
peekTBMQueue,
readTBMQueue,
tryPeekTBMQueue,
tryReadTBMQueue,
tryWriteTBMQueue,
writeTBMQueue,
)
import qualified Polysemy.Conc.Effect.Queue as Queue
import Polysemy.Conc.Effect.Queue (Queue)
import Polysemy.Conc.Effect.Race (Race)
import Polysemy.Conc.Queue.Result (closedBoolResult, closedNaResult, closedResult)
import Polysemy.Conc.Queue.Timeout (withTimeout)
interpretQueueTBMWith ::
∀ d r .
Members [Race, Embed IO] r =>
TBMQueue d ->
InterpreterFor (Queue d) r
interpretQueueTBMWith :: forall d (r :: EffectRow).
Members '[Race, Embed IO] r =>
TBMQueue d -> InterpreterFor (Queue d) r
interpretQueueTBMWith TBMQueue d
queue =
(forall (rInitial :: EffectRow) x.
Queue d (Sem rInitial) x -> Sem r x)
-> Sem (Queue d : r) a -> Sem r a
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
FirstOrder e "interpret" =>
(forall (rInitial :: EffectRow) x. e (Sem rInitial) x -> Sem r x)
-> Sem (e : r) a -> Sem r a
interpret \case
Queue d (Sem rInitial) x
Queue.Read ->
IO (QueueResult d) -> Sem r (QueueResult d)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (STM (QueueResult d) -> IO (QueueResult d)
forall a. STM a -> IO a
atomically (Maybe d -> QueueResult d
forall d. Maybe d -> QueueResult d
closedResult (Maybe d -> QueueResult d) -> STM (Maybe d) -> STM (QueueResult d)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBMQueue d -> STM (Maybe d)
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue d
queue))
Queue d (Sem rInitial) x
Queue.TryRead ->
IO (QueueResult d) -> Sem r (QueueResult d)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (STM (QueueResult d) -> IO (QueueResult d)
forall a. STM a -> IO a
atomically (Maybe (Maybe d) -> QueueResult d
forall d. Maybe (Maybe d) -> QueueResult d
closedNaResult (Maybe (Maybe d) -> QueueResult d)
-> STM (Maybe (Maybe d)) -> STM (QueueResult d)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBMQueue d -> STM (Maybe (Maybe d))
forall a. TBMQueue a -> STM (Maybe (Maybe a))
tryReadTBMQueue TBMQueue d
queue))
Queue.ReadTimeout t
timeout ->
t -> STM (Maybe d) -> Sem r (QueueResult d)
forall t (r :: EffectRow) d.
(TimeUnit t, Members '[Race, Embed IO] r) =>
t -> STM (Maybe d) -> Sem r (QueueResult d)
withTimeout t
timeout (TBMQueue d -> STM (Maybe d)
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue d
queue)
Queue d (Sem rInitial) x
Queue.Peek ->
IO (QueueResult d) -> Sem r (QueueResult d)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (STM (QueueResult d) -> IO (QueueResult d)
forall a. STM a -> IO a
atomically (Maybe d -> QueueResult d
forall d. Maybe d -> QueueResult d
closedResult (Maybe d -> QueueResult d) -> STM (Maybe d) -> STM (QueueResult d)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBMQueue d -> STM (Maybe d)
forall a. TBMQueue a -> STM (Maybe a)
peekTBMQueue TBMQueue d
queue))
Queue d (Sem rInitial) x
Queue.TryPeek ->
IO (QueueResult d) -> Sem r (QueueResult d)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (STM (QueueResult d) -> IO (QueueResult d)
forall a. STM a -> IO a
atomically (Maybe (Maybe d) -> QueueResult d
forall d. Maybe (Maybe d) -> QueueResult d
closedNaResult (Maybe (Maybe d) -> QueueResult d)
-> STM (Maybe (Maybe d)) -> STM (QueueResult d)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBMQueue d -> STM (Maybe (Maybe d))
forall a. TBMQueue a -> STM (Maybe (Maybe a))
tryPeekTBMQueue TBMQueue d
queue))
Queue.Write d
d ->
IO () -> Sem r ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (STM () -> IO ()
forall a. STM a -> IO a
atomically (TBMQueue d -> d -> STM ()
forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue d
queue d
d))
Queue.TryWrite d
d ->
IO (QueueResult ()) -> Sem r (QueueResult ())
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (STM (QueueResult ()) -> IO (QueueResult ())
forall a. STM a -> IO a
atomically (Maybe Bool -> QueueResult ()
closedBoolResult (Maybe Bool -> QueueResult ())
-> STM (Maybe Bool) -> STM (QueueResult ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBMQueue d -> d -> STM (Maybe Bool)
forall a. TBMQueue a -> a -> STM (Maybe Bool)
tryWriteTBMQueue TBMQueue d
queue d
d))
Queue.WriteTimeout t
timeout d
d ->
t -> STM (Maybe ()) -> Sem r (QueueResult ())
forall t (r :: EffectRow) d.
(TimeUnit t, Members '[Race, Embed IO] r) =>
t -> STM (Maybe d) -> Sem r (QueueResult d)
withTimeout t
timeout do
STM Bool -> STM (Maybe ()) -> STM (Maybe ()) -> STM (Maybe ())
forall (m :: * -> *) a. Monad m => m Bool -> m a -> m a -> m a
ifM (TBMQueue d -> STM Bool
forall a. TBMQueue a -> STM Bool
isClosedTBMQueue TBMQueue d
queue) (Maybe () -> STM (Maybe ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ()
forall a. Maybe a
Nothing) (() -> Maybe ()
forall a. a -> Maybe a
Just (() -> Maybe ()) -> STM () -> STM (Maybe ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBMQueue d -> d -> STM ()
forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue d
queue d
d)
Queue d (Sem rInitial) x
Queue.Closed ->
IO Bool -> Sem r Bool
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (TBMQueue d -> STM Bool
forall a. TBMQueue a -> STM Bool
isClosedTBMQueue TBMQueue d
queue))
Queue d (Sem rInitial) x
Queue.Close ->
IO () -> Sem r ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (STM () -> IO ()
forall a. STM a -> IO a
atomically (TBMQueue d -> STM ()
forall a. TBMQueue a -> STM ()
closeTBMQueue TBMQueue d
queue))
{-# inline interpretQueueTBMWith #-}
withTBMQueue ::
∀ d r a .
Members [Resource, Embed IO] r =>
Int ->
(TBMQueue d -> Sem r a) ->
Sem r a
withTBMQueue :: forall d (r :: EffectRow) a.
Members '[Resource, Embed IO] r =>
Int -> (TBMQueue d -> Sem r a) -> Sem r a
withTBMQueue Int
maxQueued =
Sem r (TBMQueue d)
-> (TBMQueue d -> Sem r ()) -> (TBMQueue d -> Sem r a) -> Sem r a
forall (r :: EffectRow) a c b.
Member Resource r =>
Sem r a -> (a -> Sem r c) -> (a -> Sem r b) -> Sem r b
bracket (IO (TBMQueue d) -> Sem r (TBMQueue d)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (Int -> IO (TBMQueue d)
forall a. Int -> IO (TBMQueue a)
newTBMQueueIO Int
maxQueued)) (IO () -> Sem r ()
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (IO () -> Sem r ())
-> (TBMQueue d -> IO ()) -> TBMQueue d -> Sem r ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> (TBMQueue d -> STM ()) -> TBMQueue d -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TBMQueue d -> STM ()
forall a. TBMQueue a -> STM ()
closeTBMQueue)
interpretQueueTBM ::
∀ d r .
Members [Resource, Race, Embed IO] r =>
Int ->
InterpreterFor (Queue d) r
interpretQueueTBM :: forall d (r :: EffectRow).
Members '[Resource, Race, Embed IO] r =>
Int -> InterpreterFor (Queue d) r
interpretQueueTBM Int
maxQueued Sem (Queue d : r) a
sem = do
Int -> (TBMQueue d -> Sem r a) -> Sem r a
forall d (r :: EffectRow) a.
Members '[Resource, Embed IO] r =>
Int -> (TBMQueue d -> Sem r a) -> Sem r a
withTBMQueue Int
maxQueued \ TBMQueue d
queue ->
TBMQueue d -> InterpreterFor (Queue d) r
forall d (r :: EffectRow).
Members '[Race, Embed IO] r =>
TBMQueue d -> InterpreterFor (Queue d) r
interpretQueueTBMWith TBMQueue d
queue Sem (Queue d : r) a
sem
{-# inline interpretQueueTBM #-}