module Control.Distributed.Process.Internal.CQueue
( CQueue
, BlockSpec(..)
, newCQueue
, enqueue
, dequeue
) where
import Control.Concurrent.MVar (MVar, newMVar, takeMVar, putMVar)
import Control.Concurrent.STM
( atomically
, TChan
, newTChan
, writeTChan
, readTChan
, tryReadTChan
)
import Control.Applicative ((<$>), (<*>))
import Control.Exception (mask, onException)
import System.Timeout (timeout)
data CQueue a = CQueue (MVar [a])
(TChan a)
newCQueue :: IO (CQueue a)
newCQueue = CQueue <$> newMVar [] <*> atomically newTChan
enqueue :: CQueue a -> a -> IO ()
enqueue (CQueue _arrived incoming) a = atomically $ writeTChan incoming a
data BlockSpec =
NonBlocking
| Blocking
| Timeout Int
dequeue :: forall a b.
CQueue a
-> BlockSpec
-> [a -> Maybe b]
-> IO (Maybe b)
dequeue (CQueue arrived incoming) blockSpec matches = go
where
go :: IO (Maybe b)
go = mask $ \restore -> do
arr <- takeMVar arrived
(arr', mb) <- onException (restore (checkArrived [] arr))
(putMVar arrived arr)
case (mb, blockSpec) of
(Just b, _) -> do
putMVar arrived arr'
return (Just b)
(Nothing, NonBlocking) ->
checkNonBlocking arr'
(Nothing, Blocking) ->
Just <$> checkBlocking arr'
(Nothing, Timeout n) ->
timeout n $ checkBlocking arr'
checkArrived :: [a] -> [a] -> IO ([a], Maybe b)
checkArrived acc [] = return (acc, Nothing)
checkArrived acc (x:xs) =
case check x of
Just y -> return (reverse acc ++ xs, Just y)
Nothing -> checkArrived (x:acc) xs
checkBlocking :: [a] -> IO b
checkBlocking acc = do
x <- onException (atomically $ readTChan incoming)
(putMVar arrived $ reverse acc)
case check x of
Nothing -> checkBlocking (x:acc)
Just y -> putMVar arrived (reverse acc) >> return y
checkNonBlocking :: [a] -> IO (Maybe b)
checkNonBlocking acc = do
mx <- atomically $ tryReadTChan incoming
case mx of
Nothing -> putMVar arrived (reverse acc) >> return Nothing
Just x -> case check x of
Nothing -> checkNonBlocking (x:acc)
Just y -> putMVar arrived (reverse acc) >> return (Just y)
check :: a -> Maybe b
check = checkMatches matches
checkMatches :: [a -> Maybe b] -> a -> Maybe b
checkMatches [] _ = Nothing
checkMatches (m:ms) a = case m a of Nothing -> checkMatches ms a
Just b -> Just b