{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}
module Box.Queue
( Queue (..),
queueL,
queueR,
queue,
fromAction,
emitQ,
commitQ,
)
where
import Box.Box
import Box.Codensity
import Box.Committer
import Box.Emitter
import Box.Functor
import Control.Applicative
import Control.Concurrent.Classy.Async as C
import Control.Concurrent.Classy.STM as C
import Control.Monad.Catch as C
import Control.Monad.Conc.Class as C
import Prelude
-- | 'Queue' specifies how messages are buffered between the writing end and
-- the reading end of a queue.
data Queue a
  = -- | Backed by a 'TQueue'; no capacity limit is imposed here.
    Unbounded
  | -- | Backed by a 'TBQueue' created with the given size.
    Bounded Int
  | -- | Backed by an initially empty 'TMVar': one value in flight at a time.
    Single
  | -- | Backed by a 'TVar' initialised with the supplied value. Writes
    -- overwrite the value and reads return it without removing it.
    Latest a
  | -- | Backed by a 'TBQueue' created with the given size; when a write cannot
    -- be performed an element is read off the queue and the write is retried.
    Newest Int
  | -- | Backed by an initially empty 'TMVar'; a write first removes any value
    -- already present, so only the most recent write is kept.
    New
-- | Create the underlying storage for a 'Queue' and return its two ends:
-- a write action and a read action.
--
-- The concrete structure depends on the constructor:
--
-- * 'Bounded' uses a 'TBQueue' of the given size.
-- * 'Unbounded' uses a 'TQueue'.
-- * 'Single' uses an empty 'TMVar' ('putTMVar' / 'takeTMVar').
-- * 'Latest' uses a 'TVar' seeded with the supplied value; reading does not
--   remove the value.
-- * 'New' uses an empty 'TMVar'; writing discards any value already stored.
-- * 'Newest' uses a 'TBQueue'; when the plain write does not succeed, one
--   element is read off the queue and the write is attempted again.
ends :: MonadSTM stm => Queue a -> stm (a -> stm (), stm a)
ends qu =
  case qu of
    Bounded n -> do
      -- NOTE(review): a negative size is expected to fail in the
      -- Int -> Natural conversion; callers should pass a non-negative size.
      q <- newTBQueue (fromIntegral n)
      pure (writeTBQueue q, readTBQueue q)
    Unbounded -> do
      q <- newTQueue
      pure (writeTQueue q, readTQueue q)
    Single -> do
      m <- newEmptyTMVar
      pure (putTMVar m, takeTMVar m)
    Latest a -> do
      t <- newTVar a
      pure (writeTVar t, readTVar t)
    New -> do
      m <- newEmptyTMVar
      -- Empty the slot first so that the put can always go ahead.
      pure (\x -> tryTakeTMVar m *> putTMVar m x, takeTMVar m)
    Newest n -> do
      -- The capacity is clamped to at least 1. With a capacity of zero the
      -- plain write can never succeed and there is never an element to drop,
      -- so 'write' would call itself forever inside the transaction; a
      -- negative size would not survive the conversion to Natural.
      q <- newTBQueue (fromIntegral (max 1 n))
      let write x = writeTBQueue q x <|> (tryReadTBQueue q *> write x)
      pure (write, readTBQueue q)
-- | Write to a queue unless it has been sealed.
--
-- Reads the @sealed@ flag first. If it is set, nothing is written and the
-- result is 'False'; otherwise the write action @i@ is run on the value and
-- the result is 'True'.
writeCheck :: (MonadSTM stm) => TVar stm Bool -> (a -> stm ()) -> a -> stm Bool
writeCheck sealed i a = do
  isSealed <- readTVar sealed
  if isSealed
    then pure False
    else True <$ i a
-- | Read from a queue, taking the seal into account.
--
-- The read action @o@ is tried first and its result wrapped in 'Just'. If
-- that alternative does not produce a value, the @sealed@ flag is consulted:
-- 'C.check' only lets the transaction continue when the flag is 'True', in
-- which case 'Nothing' is returned.
readCheck :: MonadSTM stm => TVar stm Bool -> stm a -> stm (Maybe a)
readCheck sealed o = (Just <$> o) <|> closed
  where
    -- Only reached when the read itself did not yield a value.
    closed = do
      isSealed <- readTVar sealed
      C.check isSealed
      pure Nothing
-- | Turn a 'Queue' specification into a 'Box' living in the STM monad,
-- together with an action that seals it.
--
-- The committer side writes through 'writeCheck' and the emitter side reads
-- through 'readCheck'; both share a single @sealed@ flag, initially 'False'.
-- The returned action sets that flag to 'True'.
toBoxSTM ::
  (MonadSTM stm) =>
  Queue a ->
  stm (Box stm a a, stm ())
toBoxSTM q = do
  (i, o) <- ends q
  sealed <- newTVarN "sealed" False
  pure
    ( Box
        (Committer (writeCheck sealed i))
        (Emitter (readCheck sealed o)),
      writeTVar sealed True
    )
-- | Turn a 'Queue' specification into a 'Box' in the underlying monad,
-- together with a sealing action.
--
-- The box is built atomically with 'toBoxSTM'; the box is then lifted with
-- 'liftB' and the seal is wrapped in 'atomically'.
toBoxM ::
  (MonadConc m) =>
  Queue a ->
  m (Box m a a, m ())
toBoxM q = do
  (stmBox, stmSeal) <- atomically (toBoxSTM q)
  pure (liftB stmBox, atomically stmSeal)
-- | Run two actions concurrently and return the result of the left one.
--
-- Both actions are started with 'C.withAsync'; only the left one is waited
-- on. What happens to the right action once the left has finished is
-- whatever 'C.withAsync' does on leaving its scope (presumably cancellation,
-- as in the @async@ package -- confirm against the library).
concurrentlyLeft :: MonadConc m => m a -> m b -> m a
concurrentlyLeft left right =
  C.withAsync left $ \leftAsync ->
    C.withAsync right $ \_ ->
      C.wait leftAsync
-- | Run two actions concurrently and return the result of the right one.
--
-- Both actions are started with 'C.withAsync'; only the right one is waited
-- on. What happens to the left action once the right has finished is
-- whatever 'C.withAsync' does on leaving its scope (presumably cancellation,
-- as in the @async@ package -- confirm against the library).
concurrentlyRight :: MonadConc m => m a -> m b -> m b
concurrentlyRight left right =
  C.withAsync left $ \_ ->
    C.withAsync right $ \rightAsync ->
      C.wait rightAsync
-- | Connect a committer action and an emitter action through a queue,
-- returning the committer (left) result.
--
-- The queue is obtained from @spawner@ inside a 'C.bracket' whose release
-- step runs the seal. Each side additionally seals the queue when it ends
-- (normally or not) via 'C.finally'. The two sides run under
-- 'concurrentlyLeft'.
withQL ::
  (MonadConc m) =>
  Queue a ->
  (Queue a -> m (Box m a a, m ())) ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m l
withQL q spawner cio eio =
  C.bracket
    (spawner q)
    snd
    ( \(box, seal) ->
        concurrentlyLeft
          (cio (committer box) `C.finally` seal)
          (eio (emitter box) `C.finally` seal)
    )
-- | Connect a committer action and an emitter action through a queue,
-- returning the emitter (right) result.
--
-- The queue is obtained from @spawner@ inside a 'C.bracket' whose release
-- step runs the seal. Each side additionally seals the queue when it ends
-- (normally or not) via 'C.finally'. The two sides run under
-- 'concurrentlyRight'.
withQR ::
  (MonadConc m) =>
  Queue a ->
  (Queue a -> m (Box m a a, m ())) ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m r
withQR q spawner cio eio =
  C.bracket
    (spawner q)
    snd
    ( \(box, seal) ->
        concurrentlyRight
          (cio (committer box) `C.finally` seal)
          (eio (emitter box) `C.finally` seal)
    )
-- | Connect a committer action and an emitter action through a queue,
-- returning both results.
--
-- The queue is obtained from @spawner@ inside a 'C.bracket' whose release
-- step runs the seal. Each side additionally seals the queue when it ends
-- (normally or not) via 'C.finally'. The two sides run under 'concurrently',
-- so both are waited for.
withQ ::
  (MonadConc m) =>
  Queue a ->
  (Queue a -> m (Box m a a, m ())) ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m (l, r)
withQ q spawner cio eio =
  C.bracket
    (spawner q)
    snd
    ( \(box, seal) ->
        concurrently
          (cio (committer box) `C.finally` seal)
          (eio (emitter box) `C.finally` seal)
    )
-- | Create a queue with 'toBoxM' and run a committer action and an emitter
-- action against it, returning the committer (left) result.
--
-- This is 'withQL' specialised to 'toBoxM' as the queue constructor.
queueL ::
  (MonadConc m) =>
  Queue a ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m l
queueL q cm em = withQL q toBoxM cm em
-- | Create a queue with 'toBoxM' and run a committer action and an emitter
-- action against it, returning the emitter (right) result.
--
-- This is 'withQR' specialised to 'toBoxM' as the queue constructor.
queueR ::
  (MonadConc m) =>
  Queue a ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m r
queueR q cm em = withQR q toBoxM cm em
-- | Create a queue with 'toBoxM' and run a committer action and an emitter
-- action against it, returning both results.
--
-- This is 'withQ' specialised to 'toBoxM' as the queue constructor.
queue ::
  (MonadConc m) =>
  Queue a ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m (l, r)
queue q cm em = withQ q toBoxM cm em
-- | Lift a 'Box' whose ends run in @'STM' m@ into the underlying monad @m@
-- by running each end through 'atomically' (applied with 'foist').
liftB :: (MonadConc m) => Box (STM m) a b -> Box m a b
liftB (Box c e) =
  Box
    (foist atomically c)
    (foist atomically e)
-- | Turn an action over a 'Box' into a 'CoBox' with the type parameters
-- swapped, by wrapping 'fuseActions' in 'Codensity'.
--
-- The continuation eventually supplied to the resulting 'CoBox' is run
-- concurrently with @baction@; see 'fuseActions'.
fromAction :: (MonadConc m) => (Box m a b -> m r) -> CoBox m b a
fromAction baction = Codensity (fuseActions baction)
-- | Connect two box actions back to back and run them concurrently,
-- returning the result of the second.
--
-- Two 'Unbounded' queues are created, one carrying @a@ values and one
-- carrying @b@ values. The first action commits to the @a@ queue and emits
-- from the @b@ queue; the second action does the opposite. The sealing
-- actions returned by 'toBoxM' are not used here.
fuseActions :: (MonadConc m) => (Box m a b -> m r) -> (Box m b a -> m r') -> m r'
fuseActions abm bam = do
  (Box ca ea, _) <- toBoxM Unbounded
  (Box cb eb, _) <- toBoxM Unbounded
  concurrentlyRight
    (abm (Box ca eb))
    (bam (Box cb ea))
-- | Hook a committer action up to a queue, producing a 'CoEmitter'.
--
-- When the resulting continuation is given an emitter action, both actions
-- are run over a fresh queue with 'queueR', so the emitter action's result
-- is the one returned.
emitQ :: (MonadConc m) => Queue a -> (Committer m a -> m r) -> CoEmitter m a
emitQ q cio = Codensity (\eio -> queueR q cio eio)
-- | Hook an emitter action up to a queue, producing a 'CoCommitter'.
--
-- When the resulting continuation is given a committer action, both actions
-- are run over a fresh queue with 'queueL', so the committer action's result
-- is the one returned.
commitQ :: (MonadConc m) => Queue a -> (Emitter m a -> m r) -> CoCommitter m a
commitQ q eio = Codensity (\cio -> queueL q cio eio)