{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}
module Box.Queue
( Queue (..),
queue,
queueC,
queueC',
queueE,
queueE',
queueCM,
queueCM',
queueEM,
queueEM',
waitCancel,
ends,
withQ,
withQE,
withQC,
toBox,
concurrentlyLeft,
concurrentlyRight,
)
where
import Prelude
import Box.Box
import Box.Committer
import Box.Emitter
import Control.Applicative
import Control.Concurrent.Classy.Async as C
import Control.Concurrent.Classy.STM as C
import Control.Monad.Catch as C
import Control.Monad.Conc.Class as C
data Queue a
= Unbounded
| Bounded Int
| Single
| Latest a
| Newest Int
| New
ends :: MonadSTM stm => Queue a -> stm (a -> stm (), stm a)
ends qu =
case qu of
Bounded n -> do
q <- newTBQueue (fromIntegral n)
return (writeTBQueue q, readTBQueue q)
Unbounded -> do
q <- newTQueue
return (writeTQueue q, readTQueue q)
Single -> do
m <- newEmptyTMVar
return (putTMVar m, takeTMVar m)
Latest a -> do
t <- newTVar a
return (writeTVar t, readTVar t)
New -> do
m <- newEmptyTMVar
return (\x -> tryTakeTMVar m *> putTMVar m x, takeTMVar m)
Newest n -> do
q <- newTBQueue (fromIntegral n)
let write x = writeTBQueue q x <|> (tryReadTBQueue q *> write x)
return (write, readTBQueue q)
writeCheck :: (MonadSTM stm) => TVar stm Bool -> (a -> stm ()) -> a -> stm Bool
writeCheck sealed i a = do
b <- readTVar sealed
if b
then pure False
else do
i a
pure True
readCheck :: MonadSTM stm => TVar stm Bool -> stm a -> stm (Maybe a)
readCheck sealed o =
(Just <$> o)
<|> ( do
b <- readTVar sealed
C.check b
pure Nothing
)
toBox ::
(MonadSTM stm) =>
Queue a ->
stm (Box stm a a, stm ())
toBox q = do
(i, o) <- ends q
sealed <- newTVarN "sealed" False
let seal = writeTVar sealed True
pure
( Box
(Committer (writeCheck sealed i))
(Emitter (readCheck sealed o)),
seal
)
toBoxM ::
(MonadConc m) =>
Queue a ->
m (Box m a a, m ())
toBoxM q = do
(i, o) <- atomically $ ends q
sealed <- atomically $ newTVarN "sealed" False
let seal = atomically $ writeTVar sealed True
pure
( Box
(Committer (atomically . writeCheck sealed i))
(Emitter (atomically $ readCheck sealed o)),
seal
)
waitCancel :: (MonadConc m) => m b -> m a -> m b
waitCancel a b =
withAsync a $ \a' ->
withAsync b $ \b' -> do
a'' <- wait a'
cancel b'
pure a''
concurrentlyLeft :: MonadConc m => m a -> m b -> m a
concurrentlyLeft left right =
withAsync left $ \a ->
withAsync right $ \_ ->
wait a
concurrentlyRight :: MonadConc m => m a -> m b -> m b
concurrentlyRight left right =
withAsync left $ \_ ->
withAsync right $ \b ->
wait b
withQ ::
(MonadConc m) =>
Queue a ->
(Queue a -> (STM m) (Box (STM m) a a, (STM m) ())) ->
(Committer (STM m) a -> m l) ->
(Emitter (STM m) a -> m r) ->
m (l, r)
withQ q spawner cio eio =
C.bracket
(atomically $ spawner q)
(\(_, seal) -> atomically seal)
( \(box, seal) ->
concurrently
(cio (committer box) `C.finally` atomically seal)
(eio (emitter box) `C.finally` atomically seal)
)
withQC ::
(MonadConc m) =>
Queue a ->
(Queue a -> (STM m) (Box (STM m) a a, (STM m) ())) ->
(Committer (STM m) a -> m l) ->
(Emitter (STM m) a -> m r) ->
m l
withQC q spawner cio eio =
C.bracket
(atomically $ spawner q)
(\(_, seal) -> atomically seal)
( \(box, seal) ->
concurrentlyLeft
(cio (committer box) `C.finally` atomically seal)
(eio (emitter box) `C.finally` atomically seal)
)
withQE ::
(MonadConc m) =>
Queue a ->
(Queue a -> (STM m) (Box (STM m) a a, (STM m) ())) ->
(Committer (STM m) a -> m l) ->
(Emitter (STM m) a -> m r) ->
m r
withQE q spawner cio eio =
C.bracket
(atomically $ spawner q)
(\(_, seal) -> atomically seal)
( \(box, seal) ->
concurrentlyRight
(cio (committer box) `C.finally` atomically seal)
(eio (emitter box) `C.finally` atomically seal)
)
withQM ::
(MonadConc m) =>
Queue a ->
(Queue a -> m (Box m a a, m ())) ->
(Committer m a -> m l) ->
(Emitter m a -> m r) ->
m (l, r)
withQM q spawner cio eio =
C.bracket
(spawner q)
snd
( \(box, seal) ->
concurrently
(cio (committer box) `C.finally` seal)
(eio (emitter box) `C.finally` seal)
)
withQEM ::
(MonadConc m) =>
Queue a ->
(Queue a -> m (Box m a a, m ())) ->
(Committer m a -> m l) ->
(Emitter m a -> m r) ->
m r
withQEM q spawner cio eio =
C.bracket
(spawner q)
snd
( \(box, seal) ->
concurrentlyRight
(cio (committer box) `C.finally` seal)
(eio (emitter box) `C.finally` seal)
)
withQCM ::
(MonadConc m) =>
Queue a ->
(Queue a -> m (Box m a a, m ())) ->
(Committer m a -> m l) ->
(Emitter m a -> m r) ->
m l
withQCM q spawner cio eio =
C.bracket
(spawner q)
snd
( \(box, seal) ->
concurrentlyLeft
(cio (committer box) `C.finally` seal)
(eio (emitter box) `C.finally` seal)
)
queue ::
(MonadConc m) =>
(Committer (STM m) a -> m l) ->
(Emitter (STM m) a -> m r) ->
m (l, r)
queue = withQ Unbounded toBox
queueE ::
(MonadConc m) =>
(Committer (STM m) a -> m l) ->
(Emitter (STM m) a -> m r) ->
m r
queueE cm em = snd <$> withQ Unbounded toBox cm em
queueE' ::
(MonadConc m) =>
(Committer (STM m) a -> m l) ->
(Emitter (STM m) a -> m r) ->
m r
queueE' cm em = withQE Unbounded toBox cm em
queueC ::
(MonadConc m) =>
(Committer (STM m) a -> m l) ->
(Emitter (STM m) a -> m r) ->
m l
queueC cm em = fst <$> withQ Unbounded toBox cm em
queueC' ::
(MonadConc m) =>
(Committer (STM m) a -> m l) ->
(Emitter (STM m) a -> m r) ->
m l
queueC' cm em = withQC Unbounded toBox cm em
queueCM ::
(MonadConc m) =>
(Committer m a -> m l) ->
(Emitter m a -> m r) ->
m l
queueCM cm em = fst <$> withQM Unbounded toBoxM cm em
queueCM' ::
(MonadConc m) =>
(Committer m a -> m l) ->
(Emitter m a -> m r) ->
m l
queueCM' cm em = withQCM Unbounded toBoxM cm em
queueEM ::
(MonadConc m) =>
(Committer m a -> m l) ->
(Emitter m a -> m r) ->
m r
queueEM cm em = snd <$> withQM Unbounded toBoxM cm em
queueEM' ::
(MonadConc m) =>
(Committer m a -> m l) ->
(Emitter m a -> m r) ->
m r
queueEM' cm em = withQEM Unbounded toBoxM cm em