{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}
module Box.Queue
( Queue (..),
queueC,
queueE,
waitCancel,
ends,
withQE,
withQC,
toBox,
toBoxM,
liftB,
concurrentlyLeft,
concurrentlyRight,
fromAction,
fuseActions,
)
where
import Box.Box
import Box.Committer
import Box.Cont
import Box.Emitter
import Control.Concurrent.Classy.Async as C
import Control.Concurrent.Classy.STM as C
import Control.Monad.Catch as C
import Control.Monad.Conc.Class as C
import NumHask.Prelude hiding (STM, atomically)
-- | The kinds of queue that 'ends' knows how to build.
--
-- Each constructor selects a different STM structure and write policy.
data Queue a
  = Unbounded -- ^ A 'TQueue'; no capacity is specified when it is created.
  | Bounded Int -- ^ A 'TBQueue' created with the given size.
  | Single -- ^ An initially empty 'TMVar': one value held at a time.
  | Latest a -- ^ A 'TVar' seeded with the given value; writes overwrite it and reads return the current contents without removing them.
  | Newest Int -- ^ A 'TBQueue' of the given size where a write that finds the queue full discards the element at the front and tries again.
  | New -- ^ An initially empty 'TMVar' where a write first removes any value already present.
-- | Build the two ends of a queue inside STM.
--
-- The result is a pair of a write action (taking the value to enqueue) and a
-- read action, both operating on a freshly created structure chosen by the
-- 'Queue' constructor:
--
-- * 'Bounded' @n@: a 'TBQueue' of size @n@.
-- * 'Unbounded': a 'TQueue'.
-- * 'Single': an empty 'TMVar' written with 'putTMVar' and read with 'takeTMVar'.
-- * 'Latest' @a@: a 'TVar' initialised to @a@; reads use 'readTVar', so the
--   value is not consumed.
-- * 'New': an empty 'TMVar' where the writer empties the variable before
--   putting the new value.
-- * 'Newest' @n@: a 'TBQueue' where a write that cannot proceed drops the
--   front element and tries again. The capacity is clamped to at least 1.
ends :: MonadSTM stm => Queue a -> stm (a -> stm (), stm a)
ends qu =
  case qu of
    Bounded n -> do
      q <- newTBQueue (fromIntegral n)
      return (writeTBQueue q, readTBQueue q)
    Unbounded -> do
      q <- newTQueue
      return (writeTQueue q, readTQueue q)
    Single -> do
      m <- newEmptyTMVar
      return (putTMVar m, takeTMVar m)
    Latest a -> do
      t <- newTVar a
      return (writeTVar t, readTVar t)
    New -> do
      m <- newEmptyTMVar
      -- Discard whatever is currently held so the put below does not block
      -- on a full TMVar.
      return (\x -> tryTakeTMVar m *> putTMVar m x, takeTMVar m)
    Newest n -> do
      -- With a capacity of zero (or less) the write below can never succeed
      -- and there is never an element to drop, so the recursion in 'write'
      -- would spin forever inside a single transaction. Clamp to 1.
      q <- newTBQueue (fromIntegral (max 1 n))
      -- If the queue is full, drop the oldest element and try the write again.
      let write x = writeTBQueue q x <|> (tryReadTBQueue q *> write x)
      return (write, readTBQueue q)
-- | Guarded write: consult the @sealed@ flag before running the write action.
--
-- Returns 'False' without writing when the flag is set; otherwise performs
-- the write and returns 'True'.
writeCheck :: (MonadSTM stm) => TVar stm Bool -> (a -> stm ()) -> a -> stm Bool
writeCheck sealed i a =
  readTVar sealed >>= \isSealed ->
    if isSealed
      then pure False
      else i a *> pure True
-- | Guarded read: try the read action first, falling back to the @sealed@ flag.
--
-- When the read succeeds its value is wrapped in 'Just'. When the read cannot
-- proceed, the fallback yields 'Nothing' if the flag is set; if the flag is
-- not set, 'C.check' fails and the whole transaction is retried.
readCheck :: MonadSTM stm => TVar stm Bool -> stm a -> stm (Maybe a)
readCheck sealed o = fmap Just o <|> whenSealed
  where
    -- Only completes once the flag is True.
    whenSealed = (readTVar sealed >>= C.check) *> pure Nothing
-- | Turn a 'Queue' description into a 'Box' living in STM, together with an
-- action that seals it.
--
-- The committer goes through 'writeCheck' and the emitter through
-- 'readCheck', both sharing one @sealed@ flag that starts as 'False'. The
-- returned action sets that flag to 'True'.
toBox ::
  (MonadSTM stm) =>
  Queue a ->
  stm (Box stm a a, stm ())
toBox q = do
  (push, pull) <- ends q
  sealed <- newTVarN "sealed" False
  let c = Committer (writeCheck sealed push)
      e = Emitter (readCheck sealed pull)
  pure (Box c e, writeTVar sealed True)
-- | 'toBox' lifted out of STM: the queue is created in one transaction, and
-- both the resulting box (via 'liftB') and the seal action run their STM
-- parts with 'atomically'.
toBoxM ::
  (MonadConc m) =>
  Queue a ->
  m (Box m a a, m ())
toBoxM q =
  atomically (toBox q) >>= \(box, seal) ->
    pure (liftB box, atomically seal)
-- | Run two actions concurrently, wait for the first one, then explicitly
-- cancel the second and return the first one's result.
waitCancel :: (MonadConc m) => m b -> m a -> m b
waitCancel a b =
  C.withAsync a $ \primary ->
    C.withAsync b $ \secondary ->
      -- Sequenced left to right: the cancel only runs after the wait returns.
      C.wait primary <* C.cancel secondary
-- | Start both actions with 'C.withAsync' and return the result of the left
-- one. The right action's result is never waited for; it is only kept
-- running for as long as the enclosing 'C.withAsync' scope is open.
concurrentlyLeft :: MonadConc m => m a -> m b -> m a
concurrentlyLeft left right =
  C.withAsync left $ \l ->
    C.withAsync right (const (C.wait l))
-- | Start both actions with 'C.withAsync' and return the result of the right
-- one. The left action's result is never waited for; it is only kept
-- running for as long as the enclosing 'C.withAsync' scope is open.
concurrentlyRight :: MonadConc m => m a -> m b -> m b
concurrentlyRight left right =
  C.withAsync left (const (C.withAsync right C.wait))
-- | Connect a committer-side action and an emitter-side action through a
-- queue, returning the committer side's result.
--
-- The @spawner@ creates the box and its seal action. Each side seals the
-- queue when it finishes (normally or not), and the seal is also used as the
-- release step of the surrounding 'C.bracket'.
withQC ::
  (MonadConc m) =>
  Queue a ->
  (Queue a -> m (Box m a a, m ())) ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m l
withQC q spawner cio eio =
  C.bracket (spawner q) snd $ \(box, seal) ->
    let commitSide = cio (committer box) `C.finally` seal
        emitSide = eio (emitter box) `C.finally` seal
     in concurrentlyLeft commitSide emitSide
-- | Connect a committer-side action and an emitter-side action through a
-- queue, returning the emitter side's result.
--
-- The @spawner@ creates the box and its seal action. Each side seals the
-- queue when it finishes (normally or not), and the seal is also used as the
-- release step of the surrounding 'C.bracket'.
withQE ::
  (MonadConc m) =>
  Queue a ->
  (Queue a -> m (Box m a a, m ())) ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m r
withQE q spawner cio eio = C.bracket (spawner q) snd runBoth
  where
    runBoth (box, seal) =
      concurrentlyRight
        (cio (committer box) `C.finally` seal)
        (eio (emitter box) `C.finally` seal)
-- | 'withQC' specialised to an 'Unbounded' queue created with 'toBoxM':
-- runs both sides and returns the committer side's result.
queueC ::
  (MonadConc m) =>
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m l
queueC = withQC Unbounded toBoxM
-- | 'withQE' specialised to an 'Unbounded' queue created with 'toBoxM':
-- runs both sides and returns the emitter side's result.
queueE ::
  (MonadConc m) =>
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m r
queueE = withQE Unbounded toBoxM
-- | Move a box from the STM monad of @m@ into @m@ itself by hoisting both of
-- its halves with 'atomically'.
liftB :: (MonadConc m) => Box (STM m) a b -> Box m a b
liftB box =
  case box of
    Box c e -> Box (atomically `hoist` c) (atomically `hoist` e)
-- | Wrap a box-consuming action as a 'Cont' that supplies the opposite-facing
-- box, by partially applying 'fuseActions' to it.
fromAction :: (MonadConc m) => (Box m a b -> m r) -> Cont m (Box m b a)
fromAction boxAction = Cont (fuseActions boxAction)
-- | Run two box-consuming actions against each other through a pair of
-- 'Unbounded' queues, returning the second action's result.
--
-- Two queues are created; each action gets the committer of one and the
-- emitter of the other, so what one action commits is what the other emits.
-- The seal actions returned by 'toBoxM' are discarded, so neither queue is
-- ever sealed here.
fuseActions :: (MonadConc m) => (Box m a b -> m r) -> (Box m b a -> m r') -> m r'
fuseActions abm bam = do
  (Box commitA emitA, _) <- toBoxM Unbounded
  (Box commitB emitB, _) <- toBoxM Unbounded
  let forward = Box commitA emitB
      backward = Box commitB emitA
  concurrentlyRight (abm forward) (bam backward)