{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}

-- | STM Queues, based originally on [pipes-concurrency](https://hackage.haskell.org/package/pipes-concurrency)
module Box.Queue
  ( Queue (..),
    queueL,
    queueR,
    queue,
    fromAction,
    emitQ,
    commitQ,
  )
where

import Box.Box
import Box.Codensity
import Box.Committer
import Box.Emitter
import Box.Functor
import Control.Applicative
import Control.Concurrent.Classy.Async as C
import Control.Concurrent.Classy.STM as C
import Control.Monad.Catch as C
import Control.Monad.Conc.Class as C
import Prelude

-- $setup
-- >>> :set -XOverloadedStrings
-- >>> import Box
-- >>> import Prelude

-- | 'Queue' specifies how messages are queued
--
-- The constructor chosen decides which STM structure 'ends' allocates and
-- how writes behave when a value is already waiting.
data Queue a
  = Unbounded -- ^ a @TQueue@: writes go through @writeTQueue@, reads through @readTQueue@
  | Bounded Int -- ^ a @TBQueue@ created with the given size
  | Single -- ^ an initially empty @TMVar@: written with @putTMVar@, read with @takeTMVar@
  | Latest a -- ^ a @TVar@ initialised with the supplied value: writes overwrite it, reads return the current value
  | Newest Int -- ^ a @TBQueue@ of the given size; when a write cannot proceed, one element is read off and the write is tried again
  | New -- ^ an initially empty @TMVar@; a write first takes out any waiting value, then puts the new one

-- | Create a queue and supply its two ends as a pair @(write, read)@.
--
-- The pair contains only the write action and the read action; sealing is
-- not handled here (see 'toBoxSTM', which layers a seal on top).
--
-- The 'Int' carried by 'Bounded' and 'Newest' is converted with
-- 'fromIntegral' and passed to @newTBQueue@ as the queue size.
-- NOTE(review): callers are presumably expected to pass a positive size;
-- behaviour for zero or negative sizes has not been checked here.
ends :: MonadSTM stm => Queue a -> stm (a -> stm (), stm a)
ends qu =
  case qu of
    Bounded n -> do
      q <- newTBQueue (fromIntegral n)
      pure (writeTBQueue q, readTBQueue q)
    Unbounded -> do
      q <- newTQueue
      pure (writeTQueue q, readTQueue q)
    Single -> do
      m <- newEmptyTMVar
      pure (putTMVar m, takeTMVar m)
    Latest a -> do
      -- the TVar always holds a value, starting with the supplied one
      t <- newTVar a
      pure (writeTVar t, readTVar t)
    New -> do
      m <- newEmptyTMVar
      -- empty the TMVar (if it is full) before putting the new value
      pure (\x -> tryTakeTMVar m *> putTMVar m x, takeTMVar m)
    Newest n -> do
      q <- newTBQueue (fromIntegral n)
      -- if the plain write cannot proceed, read one element off the queue
      -- and attempt the write again
      let write x = writeTBQueue q x <|> (tryReadTBQueue q *> write x)
      pure (write, readTBQueue q)

-- | Write to a queue, checking the seal first.
--
-- Reads the @sealed@ flag; when it is set nothing is written and 'False' is
-- returned. Otherwise the supplied write action is run on the value and
-- 'True' is returned.
writeCheck :: (MonadSTM stm) => TVar stm Bool -> (a -> stm ()) -> a -> stm Bool
writeCheck sealed i a = do
  b <- readTVar sealed
  if b
    then pure False
    else True <$ i a

-- | Read from a queue, and retry if not sealed.
--
-- The read action is tried first and its result wrapped in 'Just'. If that
-- alternative does not produce a value, the @sealed@ flag is consulted:
-- 'C.check' lets the transaction continue only when the flag is 'True', in
-- which case 'Nothing' is returned.
readCheck :: MonadSTM stm => TVar stm Bool -> stm a -> stm (Maybe a)
readCheck sealed o =
  (Just <$> o)
    <|> ( do
            b <- readTVar sealed
            C.check b
            pure Nothing
        )

-- | Turn a queue into a box (and a seal).
--
-- Allocates the queue ends with 'ends' and a @TVar@ named @"sealed"@ that
-- starts as 'False'. The returned 'Box' commits through 'writeCheck' and
-- emits through 'readCheck', both guarded by that flag; the second
-- component of the result is the action that sets the flag to 'True'.
toBoxSTM ::
  (MonadSTM stm) =>
  Queue a ->
  stm (Box stm a a, stm ())
toBoxSTM q = do
  (i, o) <- ends q
  sealed <- newTVarN "sealed" False
  let seal = writeTVar sealed True
  pure
    ( Box
        (Committer (writeCheck sealed i))
        (Emitter (readCheck sealed o)),
      seal
    )

-- | Turn a queue into a box (and a seal), and lift from stm to the
-- underlying monad.
--
-- The queue is built in a single 'atomically' block via 'toBoxSTM'; the box
-- is then lifted with 'liftB' and the seal action is wrapped in
-- 'atomically' so that both can be used directly in @m@.
toBoxM ::
  (MonadConc m) =>
  Queue a ->
  m (Box m a a, m ())
toBoxM q = do
  (b, s) <- atomically $ toBoxSTM q
  pure (liftB b, atomically s)

-- | Run two actions concurrently, but wait and return on the left result.
--
-- Both actions are started with 'C.withAsync'; only the left one is waited
-- on, and its result is returned.
-- NOTE(review): the right action is presumably cancelled by 'C.withAsync'
-- when its scope exits — confirm against the @concurrency@ package docs.
concurrentlyLeft :: MonadConc m => m a -> m b -> m a
concurrentlyLeft left right =
  C.withAsync left $ \a ->
    C.withAsync right $ \_ ->
      C.wait a

-- | Run two actions concurrently, but wait and return on the right result.
--
-- Both actions are started with 'C.withAsync'; only the right one is waited
-- on, and its result is returned.
-- NOTE(review): the left action is presumably cancelled by 'C.withAsync'
-- when its scope exits — confirm against the @concurrency@ package docs.
concurrentlyRight :: MonadConc m => m a -> m b -> m b
concurrentlyRight left right =
  C.withAsync left $ \_ ->
    C.withAsync right $ \b ->
      C.wait b

-- | Connect a committer and emitter action via spawning a queue, and wait
-- for the Committer action to complete.
--
-- The @spawner@ is run on the queue specification inside 'C.bracket' to
-- obtain a box and its seal; the seal is also the bracket's release action.
-- Each of the two actions is additionally wrapped in @`C.finally` seal@, so
-- the queue is sealed as soon as either one finishes. The result is that of
-- the committer action (see 'concurrentlyLeft').
withQL ::
  (MonadConc m) =>
  Queue a ->
  (Queue a -> m (Box m a a, m ())) ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m l
withQL q spawner cio eio =
  C.bracket
    (spawner q)
    snd
    ( \(box, seal) ->
        concurrentlyLeft
          (cio (committer box) `C.finally` seal)
          (eio (emitter box) `C.finally` seal)
    )

-- | Connect a committer and emitter action via spawning a queue, and wait
-- for the Emitter action to complete.
--
-- The @spawner@ is run on the queue specification inside 'C.bracket' to
-- obtain a box and its seal; the seal is also the bracket's release action.
-- Each of the two actions is additionally wrapped in @`C.finally` seal@, so
-- the queue is sealed as soon as either one finishes. The result is that of
-- the emitter action (see 'concurrentlyRight').
withQR ::
  (MonadConc m) =>
  Queue a ->
  (Queue a -> m (Box m a a, m ())) ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m r
withQR q spawner cio eio =
  C.bracket
    (spawner q)
    snd
    ( \(box, seal) ->
        concurrentlyRight
          (cio (committer box) `C.finally` seal)
          (eio (emitter box) `C.finally` seal)
    )

-- | Connect a committer and emitter action via spawning a queue, and wait
-- for both to complete.
--
-- The @spawner@ is run on the queue specification inside 'C.bracket' to
-- obtain a box and its seal; the seal is also the bracket's release action.
-- Each of the two actions is additionally wrapped in @`C.finally` seal@, so
-- the queue is sealed as soon as either one finishes. Both results are
-- returned as a pair via 'concurrently'.
withQ ::
  (MonadConc m) =>
  Queue a ->
  (Queue a -> m (Box m a a, m ())) ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m (l, r)
withQ q spawner cio eio =
  C.bracket
    (spawner q)
    snd
    ( \(box, seal) ->
        concurrently
          (cio (committer box) `C.finally` seal)
          (eio (emitter box) `C.finally` seal)
    )

-- | Create a queue of the given kind, connect the two actions to it, and
-- return the result from the Committer action.
--
-- This is 'withQL' with 'toBoxM' as the spawner.
--
-- >>> queueL New (\c -> glue c <$|> qList [1..3]) toListM
queueL ::
  (MonadConc m) =>
  Queue a ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m l
queueL q cm em = withQL q toBoxM cm em

-- | Create a queue of the given kind, connect the two actions to it, and
-- return the result from the Emitter action.
--
-- This is 'withQR' with 'toBoxM' as the spawner.
--
-- >>> queueR New (\c -> glue c <$|> qList [1..3]) toListM
-- [3]
queueR ::
  (MonadConc m) =>
  Queue a ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m r
queueR q cm em = withQR q toBoxM cm em

-- | Create a queue of the given kind, connect the two actions to it, and
-- return both results.
--
-- This is 'withQ' with 'toBoxM' as the spawner.
--
-- >>> queue Unbounded (\c -> glue c <$|> qList [1..3]) toListM
-- ((),[1,2,3])
queue ::
  (MonadConc m) =>
  Queue a ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m (l, r)
queue q cm em = withQ q toBoxM cm em

-- | Lift a box from STM.
--
-- Both the committer and the emitter are transformed with 'foist', using
-- 'atomically' as the natural transformation from @STM m@ to @m@.
liftB :: (MonadConc m) => Box (STM m) a b -> Box m a b
liftB (Box c e) = Box (foist atomically c) (foist atomically e)

-- | Turn a box action into a box continuation.
--
-- The action is partially applied to 'fuseActions' and wrapped in
-- 'Codensity', so that whatever continuation is later supplied runs
-- concurrently with it, connected through the queues 'fuseActions' creates.
fromAction :: (MonadConc m) => (Box m a b -> m r) -> CoBox m b a
fromAction baction = Codensity $ fuseActions baction

-- | Connect up two box actions via two queues.
--
-- Two 'Unbounded' queues are created with 'toBoxM', one carrying @a@ values
-- and one carrying @b@ values. The first action commits @a@s and receives
-- @b@s; the second commits @b@s and receives @a@s. The actions are run with
-- 'concurrentlyRight', so the result of the second action is returned.
--
-- The seal actions returned by 'toBoxM' are discarded, so this function
-- never seals either queue.
fuseActions :: (MonadConc m) => (Box m a b -> m r) -> (Box m b a -> m r') -> m r'
fuseActions abm bam = do
  (Box ca ea, _) <- toBoxM Unbounded
  (Box cb eb, _) <- toBoxM Unbounded
  -- cross the ends over: each action writes to one queue and reads the other
  concurrentlyRight (abm (Box ca eb)) (bam (Box cb ea))

-- | Hook a committer action to a queue, creating an emitter continuation.
--
-- The continuation supplied to the resulting 'Codensity' receives the
-- queue's emitter; it is run alongside the committer action via 'queueR',
-- whose result is that of the emitter side.
emitQ :: (MonadConc m) => Queue a -> (Committer m a -> m r) -> CoEmitter m a
emitQ q cio = Codensity $ \eio -> queueR q cio eio

-- | Hook an emitter action to a queue, creating a committer continuation.
--
-- The continuation supplied to the resulting 'Codensity' receives the
-- queue's committer; it is run alongside the emitter action via 'queueL',
-- whose result is that of the committer side.
commitQ :: (MonadConc m) => Queue a -> (Emitter m a -> m r) -> CoCommitter m a
commitQ q eio = Codensity $ \cio -> queueL q cio eio