{-# OPTIONS_GHC -Wall #-}
module Box.Broadcast
( Broadcaster (..),
broadcast,
subscribe,
Funneler (..),
funnel,
widen,
)
where
import Prelude
import Box.Committer
import Box.Cont
import Box.Emitter
import Box.Queue
import Control.Concurrent.Classy.STM as C
import Control.Monad.Conc.Class as C
newtype Broadcaster m a
= Broadcaster
{ unBroadcast :: TVar m (Committer m a)
}
broadcast :: (MonadSTM stm) => stm (Broadcaster stm a, Committer stm a)
broadcast = do
ref <- newTVar mempty
let com = Committer $ \a -> do
c <- readTVar ref
commit c a
return (Broadcaster ref, com)
subscribe :: (MonadConc m) => Broadcaster (STM m) a -> Cont m (Emitter (STM m) a)
subscribe (Broadcaster tvar) = Cont $ \e -> queueE' cio e
where
cio c = atomically $ modifyTVar' tvar (mappend c)
newtype Funneler m a
= Funneler
{ unFunnel :: TVar m (Emitter m a)
}
funnel :: (MonadSTM stm) => stm (Funneler stm a, Emitter stm a)
funnel = do
ref <- newTVar mempty
let em =
Emitter $ do
e <- readTVar ref
emit e
pure (Funneler ref, em)
widen :: (MonadConc m) => Funneler (STM m) a -> Cont m (Committer (STM m) a)
widen (Funneler tvar) =
Cont $ \c -> queueC' c $ \e -> atomically $ modifyTVar' tvar (mappend e)