-- | Asynchronous communication between pipes via 'STM'-backed mailboxes.
module Pipes.Concurrent (
-- * Inputs and Outputs
Input(..),
Output(..),
-- * Pipe utilities
fromInput,
toOutput,
-- * Actors
spawn,
spawn',
withSpawn,
withBuffer,
Buffer(..),
unbounded,
bounded,
latest,
newest,
-- * Re-exports

-- | Only the names this module imports unqualified are re-exported:
-- 'forkIO' from "Control.Concurrent"; 'atomically', 'STM', 'mkWeakTVar',
-- 'newTVarIO' and 'readTVar' from "Control.Concurrent.STM"; and 'performGC'
-- from "System.Mem".
module Control.Concurrent,
module Control.Concurrent.STM,
module System.Mem
) where
import Control.Applicative (
Alternative(empty, (<|>)), Applicative(pure, (*>), (<*>)), (<*), (<$>) )
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically, STM, mkWeakTVar, newTVarIO, readTVar)
import Control.Exception (bracket)
import Control.Monad (when,void, MonadPlus(..))
import Data.Functor.Contravariant (Contravariant(contramap))
import Data.Functor.Contravariant.Divisible (
Divisible(divide, conquer), Decidable(lose, choose))
import Data.Monoid (Monoid(mempty, mappend))
import Data.Void (absurd)
import Pipes (MonadIO(liftIO), yield, await, Producer', Consumer')
import System.Mem (performGC)
import qualified Control.Concurrent.Async
import qualified Control.Concurrent.STM as S
import qualified Control.Exception
-- | The receiving end of a mailbox.
--
-- 'recv' is an 'S.STM' transaction that produces @'Just' a@ when a value is
-- available. Consumers in this module (see 'fromInput') treat 'Nothing' as
-- the signal that no further values will arrive.
newtype Input a = Input
    { recv :: S.STM (Maybe a)
    }
-- | Maps a function over every value received; 'Nothing' results pass
-- through untouched.
instance Functor Input where
    fmap f (Input action) = Input (fmap f <$> action)
-- | 'pure' always yields the given value. '<*>' runs both underlying
-- receives in the same transaction (function side first) and combines the
-- two 'Maybe' results, so the outcome is 'Nothing' if either side is.
instance Applicative Input where
    pure = Input . pure . Just

    Input getF <*> Input getX = Input $ do
        mf <- getF
        mx <- getX
        return (mf <*> mx)
-- | Sequencing receives from the first 'Input' and, only when that produced
-- a value, receives from the 'Input' chosen by the continuation. A 'Nothing'
-- from the first input short-circuits to 'Nothing'.
instance Monad Input where
    return = Input . return . Just

    Input action >>= k =
        Input (action >>= maybe (return Nothing) (recv . k))
-- | 'empty' immediately reports 'Nothing'.
--
-- '<|>' tries the left input first and falls over to the right one if the
-- left transaction retries. Whichever side answered, a 'Nothing' from it is
-- not final: the other side is then asked for a value instead.
instance Alternative Input where
    empty = Input (pure Nothing)

    left <|> right = Input $ do
        -- Remember which input was NOT consulted so it can act as fallback.
        (other, result) <- tagged right left <|> tagged left right
        maybe (recv other) (return . Just) result
      where
        tagged fallback source = (,) fallback <$> recv source
-- | Mirrors the 'Alternative' instance: 'mzero' is 'empty' and 'mplus' is
-- '<|>'.
instance MonadPlus Input where
    mzero = empty
    mplus lhs rhs = lhs <|> rhs
-- | Combines inputs with the 'Alternative' instance: 'mempty' is 'empty'
-- and 'mappend' is '<|>'.
instance Monoid (Input a) where
    mempty = empty
    mappend lhs rhs = lhs <|> rhs
-- | The sending end of a mailbox.
--
-- 'send' is an 'S.STM' transaction that attempts to deliver a value and
-- reports the outcome as a 'Bool'. 'toOutput' stops forwarding as soon as a
-- send returns 'False'.
newtype Output a = Output
    { send :: a -> S.STM Bool
    }
-- | 'mempty' accepts nothing (every send reports 'False'). 'mappend' sends
-- the value to both outputs, left first, and reports 'True' if at least one
-- of them did.
instance Monoid (Output a) where
    mempty = Output (const (return False))

    mappend (Output sendL) (Output sendR) = Output $ \a -> do
        okL <- sendL a
        okR <- sendR a
        return (okL || okR)
-- | Pre-processes every value with the given function before sending it.
instance Contravariant Output where
    contramap f out = Output (send out . f)
-- | 'conquer' accepts nothing (every send reports 'False'). 'divide' splits
-- each value in two, sends one half to each output (first output first), and
-- reports 'True' if at least one send did.
instance Divisible Output where
    conquer = Output (const (return False))

    divide split outB outC = Output $ \a ->
        case split a of
            (b, c) -> do
                okB <- send outB b
                okC <- send outC c
                return (okB || okC)
-- | 'lose' builds an output for a type whose values can all be mapped into
-- 'Data.Void.Void', discharging them with 'absurd'. 'choose' routes each
-- value to the first output on 'Left' and to the second on 'Right'.
instance Decidable Output where
    lose f = Output (\a -> absurd (f a))

    choose pick outL outR = Output (either (send outL) (send outR) . pick)
-- | Turn an 'Output' into a consumer.
--
-- Each awaited value is sent in its own atomic transaction. The consumer
-- returns as soon as a send reports 'False'.
toOutput :: (MonadIO m) => Output a -> Consumer' a m ()
toOutput output = go
  where
    go = do
        x <- await
        delivered <- liftIO (S.atomically (send output x))
        if delivered then go else return ()
-- | Turn an 'Input' into a producer.
--
-- Each value is received in its own atomic transaction and yielded
-- downstream. The producer returns when a receive produces 'Nothing'.
fromInput :: (MonadIO m) => Input a -> Producer' a m ()
fromInput input = go
  where
    go = liftIO (S.atomically (recv input)) >>= maybe (return ()) emit
    emit a = yield a >> go
-- | Create a mailbox backed by the given 'Buffer' and return its sending
-- and receiving ends.
--
-- This is 'spawn'' with the manual sealing action discarded.
spawn :: Buffer a -> IO (Output a, Input a)
spawn buffer = (\(out, inp, _seal) -> (out, inp)) <$> spawn' buffer
-- | Like 'spawn', but also returns an 'STM' action that seals the mailbox.
--
-- The third component of the result sets an internal @sealed@ flag. Once it
-- is set:
--
-- * sends stop writing to the buffer and report 'False';
--
-- * receives keep draining whatever the buffer still offers and report
--   'Nothing' only when the buffer itself has nothing to give.
--
-- The storage strategy is picked from the 'Buffer' argument; see the
-- comments on each branch below.
spawn' :: Buffer a -> IO (Output a, Input a, STM ())
spawn' buffer = do
    -- Pick the raw (unsealed) write and read transactions for the buffer.
    (write, readNext) <- case buffer of
        -- Bounded FIFO queue: writers block while the queue is full.
        Bounded n -> do
            q <- S.newTBQueueIO n
            return (S.writeTBQueue q, S.readTBQueue q)
        -- Unbounded FIFO queue: writers never block.
        Unbounded -> do
            q <- S.newTQueueIO
            return (S.writeTQueue q, S.readTQueue q)
        -- One-slot box: writers block while the slot is occupied.
        Single -> do
            m <- S.newEmptyTMVarIO
            return (S.putTMVar m, S.takeTMVar m)
        -- Always-full cell: writes overwrite, reads never consume.
        Latest a -> do
            t <- S.newTVarIO a
            return (S.writeTVar t, S.readTVar t)
        -- One-slot box where a write replaces any unread value.
        New -> do
            m <- S.newEmptyTMVarIO
            return (\x -> S.tryTakeTMVar m *> S.putTMVar m x, S.takeTMVar m)
        -- Bounded queue where a write to a full queue evicts the oldest
        -- element to make room.
        Newest n -> do
            q <- S.newTBQueueIO n
            let push x = S.writeTBQueue q x <|> evictThenPush x
                evictThenPush x = do
                    evicted <- S.tryReadTBQueue q
                    case evicted of
                        -- The write was refused yet there is nothing to
                        -- evict: the queue has no capacity at all. Block
                        -- instead of recursing forever inside the
                        -- transaction; sealing the mailbox wakes the sender.
                        Nothing -> S.retry
                        Just _  -> push x
            return (push, S.readTBQueue q)

    sealed <- S.newTVarIO False
    let seal = S.writeTVar sealed True

    -- Attach a sealing finalizer to one dummy TVar per end. Each dummy is
    -- read by the corresponding transaction below, so it stays reachable for
    -- as long as that end's closure does; when the garbage collector
    -- reclaims it, the finalizer seals the mailbox.
    rSend <- newTVarIO ()
    void $ mkWeakTVar rSend (S.atomically seal)
    rRecv <- newTVarIO ()
    void $ mkWeakTVar rRecv (S.atomically seal)

    let sendOrEnd a = do
            b <- S.readTVar sealed
            if b
                then return False
                else do
                    write a
                    return True
        -- Prefer real data; only when the buffer would block, report the end
        -- of input if (and only if) the mailbox has been sealed.
        readOrEnd = (Just <$> readNext) <|> (do
            b <- S.readTVar sealed
            S.check b
            return Nothing)
        -- Reading the dummy TVars ties their lifetime to these closures.
        _send a = sendOrEnd a <* readTVar rSend
        _recv = readOrEnd <* readTVar rRecv
    return (Output _send, Input _recv, seal)
-- | Run an action with a freshly spawned mailbox and seal the mailbox when
-- the action finishes, whether it returns normally or throws.
withSpawn :: Buffer a -> ((Output a, Input a) -> IO r) -> IO r
withSpawn buffer action = bracket (spawn' buffer) release use
  where
    release (_, _, seal) = atomically seal
    use (output, input, _) = action (output, input)
-- | Spawn a mailbox and run a writer and a reader against it concurrently,
-- returning both results.
--
-- Each side seals the mailbox when it ends (normally or with an exception),
-- and the mailbox is sealed once more when the whole computation is left.
withBuffer
    :: Buffer a
    -- ^ Storage strategy for the mailbox
    -> (Output a -> IO l)
    -- ^ Action given the sending end
    -> (Input a -> IO r)
    -- ^ Action given the receiving end
    -> IO (l, r)
withBuffer buffer fOutput fInput = bracket acquire release run
  where
    acquire = spawn' buffer
    release (_, _, seal) = atomically seal
    run (output, input, seal) =
        Control.Concurrent.Async.concurrently sender receiver
      where
        sealNow = atomically seal
        sender = Control.Exception.finally (fOutput output) sealNow
        receiver = Control.Exception.finally (fInput input) sealNow
-- | Selects how 'spawn' and 'spawn'' store messages between the sending and
-- receiving ends.
data Buffer a
    -- | FIFO queue without a size limit
    = Unbounded
    -- | FIFO queue limited to the given number of messages
    | Bounded Int
    -- | Holds at most one message; a second write waits until it is taken
    | Single
    -- | Holds exactly one message, starting with the given value; writes
    -- overwrite it and reads do not remove it
    | Latest a
    -- | Queue limited to the given number of messages; writing to a full
    -- queue drops the oldest message
    | Newest Int
    -- | Holds at most one message; a write replaces any message not yet taken
    | New
-- | A buffer that stores any number of messages ('Unbounded').
unbounded :: Buffer a
unbounded = Unbounded
-- | A buffer limited to the given number of messages.
--
-- A limit of exactly 1 selects the dedicated 'Single' buffer; every other
-- value is passed to 'Bounded' unchanged.
bounded :: Int -> Buffer a
bounded n
    | n == 1    = Single
    | otherwise = Bounded n
-- | A buffer that always holds the most recently written value, starting
-- with the given initial value ('Latest').
latest :: a -> Buffer a
latest initial = Latest initial
-- | A buffer that keeps only the most recent messages, up to the given
-- count.
--
-- A count of exactly 1 selects the dedicated 'New' buffer; every other value
-- is passed to 'Newest' unchanged.
newest :: Int -> Buffer a
newest n
    | n == 1    = New
    | otherwise = Newest n