module System.IO.Streams.Concurrent.Unagi
(
inputToChan
, chanToInput
, chanToOutput
, concurrentMerge
, makeChanPipe
) where
import Control.Applicative ((<$>), (<*>))
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan.Unagi (InChan, OutChan, newChan,
readChan, writeChan)
import Control.Concurrent.MVar (modifyMVar, newEmptyMVar,
newMVar, putMVar, takeMVar)
import Control.Exception (SomeException, mask, throwIO,
try)
import Control.Monad (forM_)
import Prelude hiding (read)
import System.IO.Streams.Internal (InputStream, OutputStream,
makeInputStream,
makeOutputStream, read)
-- | Drains an 'InputStream' into the write end of a channel.
--
-- Every value obtained from the stream is written to the channel as-is,
-- including the final 'Nothing' that marks end-of-stream; the loop stops
-- right after that 'Nothing' has been written.
inputToChan :: InputStream a -> InChan (Maybe a) -> IO ()
inputToChan is ch = loop
  where
    loop = do
        item <- read is
        -- Forward the item first so the end-of-stream marker reaches the
        -- channel too, then decide whether to keep going.
        writeChan ch item
        case item of
            Nothing -> return ()
            Just _  -> loop
-- | Builds an 'InputStream' whose producer action is 'readChan' on the
-- given channel read end, so each element pulled through the producer is
-- the next @Maybe a@ taken from the channel.
chanToInput :: OutChan (Maybe a) -> IO (InputStream a)
chanToInput ch =
    -- Evaluate the reader action to WHNF before handing it to
    -- 'makeInputStream'.
    pull `seq` makeInputStream pull
  where
    pull = readChan ch
-- | Builds an 'OutputStream' whose consumer action is 'writeChan' on the
-- given channel write end, so each @Maybe a@ handed to the consumer is
-- written to the channel unchanged.
chanToOutput :: InChan (Maybe a) -> IO (OutputStream a)
chanToOutput ch = makeOutputStream sink
  where
    sink = writeChan ch
-- | Concurrently merges a list of 'InputStream's into a single
-- 'InputStream'.
--
-- One thread is forked per source stream. Each thread repeatedly reads its
-- stream and hands every result to the consumer through a shared 'MVar'.
-- The merged stream reports end-of-stream only once every source has
-- reported end-of-stream. An exception raised while reading a source is
-- captured with 'try' and rethrown, via 'throwIO', in the thread that reads
-- the merged stream.
--
-- Merging an empty list yields a stream that is immediately at
-- end-of-stream (no producer exists, so waiting on the 'MVar' would block
-- forever).
concurrentMerge :: [InputStream a] -> IO (InputStream a)
concurrentMerge iss
    | null iss  = makeInputStream (return Nothing)
    | otherwise = do
        mv    <- newEmptyMVar
        -- Number of sources that have not yet reached end-of-stream.
        nleft <- newMVar $! length iss
        -- Producers are forked inside 'mask'; 'restore' is applied only
        -- around the blocking read of the source stream.
        mask $ \restore -> forM_ iss $ \is -> forkIO $ do
            let producer = do
                  emb <- try $ restore $ read is
                  case emb of
                      -- Forward the exception to the consumer, then keep
                      -- reading from the same source.
                      Left exc      -> do
                          putMVar mv (Left (exc :: SomeException))
                          producer
                      -- This source is exhausted: report it and stop.
                      Right Nothing -> putMVar mv $! Right Nothing
                      Right x       -> putMVar mv (Right x) >> producer
            producer
        makeInputStream $ chunk mv nleft
  where
    -- Consumer side: take the next hand-off from any producer.
    chunk mv nleft = do
        emb <- takeMVar mv
        case emb of
            Left exc      -> throwIO exc
            Right Nothing -> do
                -- One source finished; decrement the live-source counter
                -- strictly so no thunk accumulates inside the MVar.
                remaining <- modifyMVar nleft $ \n ->
                    let n' = n - 1
                    in n' `seq` return (n', n')
                if remaining > 0
                    then chunk mv nleft   -- other sources are still live
                    else return Nothing   -- every source is done
            Right x       -> return x
-- | Creates a fresh channel and returns both of its ends wrapped as
-- streams: an 'InputStream' over the read end and an 'OutputStream' over
-- the write end.
makeChanPipe :: IO (InputStream a, OutputStream a)
makeChanPipe = do
    (inChan, outChan) <- newChan
    -- The input side is built before the output side.
    source <- chanToInput outChan
    sink   <- chanToOutput inChan
    return (source, sink)