{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
module System.IO.Streams.Concurrent
(
inputToChan
, chanToInput
, chanToOutput
, concurrentMerge
, makeChanPipe
) where
#if !MIN_VERSION_base(4,8,0)
import Control.Applicative ((<$>), (<*>))
#endif
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (modifyMVar, newEmptyMVar, newMVar, putMVar, takeMVar)
import Control.Exception (SomeException, mask, throwIO, try)
import Control.Monad (forM_)
import Prelude hiding (read)
import System.IO.Streams.Internal (InputStream, OutputStream, makeInputStream, makeOutputStream, nullInput, read)
-- | Drains an 'InputStream' into a 'Chan'.
--
-- Every value obtained from 'read' is written to the channel unchanged, so
-- elements arrive as @Just x@ and the end of the stream arrives as a single
-- 'Nothing'. The loop stops after that 'Nothing' has been written; while
-- 'read' keeps yielding @Just@ values this action does not return.
inputToChan :: InputStream a -> Chan (Maybe a) -> IO ()
inputToChan is ch = go
  where
    go :: IO ()
    go = do
        mb <- read is
        -- Forward the value first so the consumer also sees the terminating
        -- 'Nothing', then decide whether to continue.
        writeChan ch mb
        maybe (return $! ()) (const go) mb
-- | Turns a 'Chan' into an 'InputStream'.
--
-- The stream is built with 'makeInputStream' over @'readChan' ch@, so each
-- value the stream produces is the next @Maybe a@ taken from the channel.
-- The writer signals end-of-stream by putting 'Nothing' on the channel.
chanToInput :: Chan (Maybe a) -> IO (InputStream a)
chanToInput ch = makeInputStream $! readChan ch
-- | Turns a 'Chan' into an 'OutputStream'.
--
-- The stream is built with 'makeOutputStream' using @'writeChan' ch@ as its
-- write callback, so every @Maybe a@ handed to that callback is enqueued on
-- the channel as-is.
chanToOutput :: Chan (Maybe a) -> IO (OutputStream a)
chanToOutput = makeOutputStream . writeChan
-- | Merges several 'InputStream's into one, reading them concurrently.
--
-- For an empty list the result is 'nullInput'. Otherwise one thread is
-- forked (with 'forkIO') per input stream. Each thread repeatedly reads its
-- stream and hands the result to the consumer through a single shared
-- 'Control.Concurrent.MVar.MVar', so elements appear in the merged stream in
-- whatever order the producer threads deliver them.
--
-- The merged stream yields 'Nothing' only after every producer has reported
-- end-of-stream; a counter initialised to @length iss@ tracks how many
-- producers are still running.
--
-- An exception raised by a 'read' in a producer thread is caught, passed to
-- the consumer, and re-thrown there with 'throwIO' on the read that receives
-- it. The producer that caught the exception keeps reading its stream
-- afterwards.
concurrentMerge :: [InputStream a] -> IO (InputStream a)
concurrentMerge [] = nullInput
concurrentMerge iss = do
    mv    <- newEmptyMVar
    nleft <- newMVar $! length iss
    -- Fork under 'mask' so each producer starts with asynchronous exceptions
    -- masked; only the 'read' itself runs with the caller's masking state
    -- restored.
    mask $ \restore -> forM_ iss $ \is -> forkIO $ do
        let producer = do
                emb <- try $ restore $ read is
                case emb of
                    -- Report the failure to the consumer, then keep going.
                    Left exc -> do
                        putMVar mv (Left (exc :: SomeException))
                        producer
                    -- This stream is exhausted: report it and stop.
                    Right Nothing -> putMVar mv $! Right Nothing
                    Right x -> putMVar mv (Right x) >> producer
        producer
    makeInputStream $ chunk mv nleft
  where
    -- Consumer side: take the next message from the producers.
    chunk mv nleft = do
        emb <- takeMVar mv
        case emb of
            Left exc -> throwIO exc
            Right Nothing -> do
                -- One producer finished; decrement the live-producer count
                -- strictly and get the new value back.
                x <- modifyMVar nleft $ \n ->
                    let !n' = n - 1
                    in return $! (n', n')
                if x > 0
                    then chunk mv nleft   -- others still running: keep waiting
                    else return Nothing   -- last producer done: end of stream
            Right x -> return x
-- | Creates a connected stream pair backed by a fresh 'Chan'.
--
-- A new channel is allocated with 'newChan'; the returned 'InputStream' is
-- 'chanToInput' of that channel and the returned 'OutputStream' is
-- 'chanToOutput' of the same channel, so values written to the output side
-- become available on the input side.
makeChanPipe :: IO (InputStream a, OutputStream a)
makeChanPipe = do
    chan <- newChan
    (,) <$> chanToInput chan <*> chanToOutput chan