-- | Stream utilities for working with concurrent channels.

{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP          #-}

module System.IO.Streams.Concurrent
 ( -- * Channel conversions
   inputToChan
 , chanToInput
 , chanToOutput
 , concurrentMerge
 , makeChanPipe
 ) where

------------------------------------------------------------------------------
#if !MIN_VERSION_base(4,8,0)
import           Control.Applicative        ((<$>), (<*>))
#endif
import           Control.Concurrent         (forkIO)
import           Control.Concurrent.Chan    (Chan, newChan, readChan, writeChan)
import           Control.Concurrent.MVar    (modifyMVar, newEmptyMVar, newMVar, putMVar, takeMVar)
import           Control.Exception          (SomeException, mask, throwIO, try)
import           Control.Monad              (forM_)
import           Prelude                    hiding (read)
------------------------------------------------------------------------------
import           System.IO.Streams.Internal (InputStream, OutputStream, makeInputStream, makeOutputStream, nullInput, read)


------------------------------------------------------------------------------
-- | Writes the contents of an input stream to a channel until the input stream
-- yields end-of-stream.
inputToChan :: InputStream a -> Chan (Maybe a) -> IO ()
inputToChan :: InputStream a -> Chan (Maybe a) -> IO ()
inputToChan InputStream a
is Chan (Maybe a)
ch = IO ()
go
  where
    go :: IO ()
go = do
        Maybe a
mb <- InputStream a -> IO (Maybe a)
forall a. InputStream a -> IO (Maybe a)
read InputStream a
is
        Chan (Maybe a) -> Maybe a -> IO ()
forall a. Chan a -> a -> IO ()
writeChan Chan (Maybe a)
ch Maybe a
mb
        IO () -> (a -> IO ()) -> Maybe a -> IO ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return (() -> IO ()) -> () -> IO ()
forall a b. (a -> b) -> a -> b
$! ()) (IO () -> a -> IO ()
forall a b. a -> b -> a
const IO ()
go) Maybe a
mb


------------------------------------------------------------------------------
-- | Turns a 'Chan' into an input stream.
--
chanToInput :: Chan (Maybe a) -> IO (InputStream a)
chanToInput :: Chan (Maybe a) -> IO (InputStream a)
chanToInput Chan (Maybe a)
ch = IO (Maybe a) -> IO (InputStream a)
forall a. IO (Maybe a) -> IO (InputStream a)
makeInputStream (IO (Maybe a) -> IO (InputStream a))
-> IO (Maybe a) -> IO (InputStream a)
forall a b. (a -> b) -> a -> b
$! Chan (Maybe a) -> IO (Maybe a)
forall a. Chan a -> IO a
readChan Chan (Maybe a)
ch


------------------------------------------------------------------------------
-- | Turns a 'Chan' into an output stream.
--
chanToOutput :: Chan (Maybe a) -> IO (OutputStream a)
chanToOutput :: Chan (Maybe a) -> IO (OutputStream a)
chanToOutput = (Maybe a -> IO ()) -> IO (OutputStream a)
forall a. (Maybe a -> IO ()) -> IO (OutputStream a)
makeOutputStream ((Maybe a -> IO ()) -> IO (OutputStream a))
-> (Chan (Maybe a) -> Maybe a -> IO ())
-> Chan (Maybe a)
-> IO (OutputStream a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Chan (Maybe a) -> Maybe a -> IO ()
forall a. Chan a -> a -> IO ()
writeChan


------------------------------------------------------------------------------
-- | Concurrently merges a list of 'InputStream's, combining values in the
-- order they become available.
--
-- Note: does /not/ forward individual end-of-stream notifications, the
-- produced stream does not yield end-of-stream until all of the input streams
-- have finished.
--
-- Any exceptions raised in one of the worker threads will be trapped and
-- re-raised in the current thread.
--
-- If the supplied list is empty, `concurrentMerge` will return an empty
-- stream. (/Since: 1.5.0.1/)
--
concurrentMerge :: [InputStream a] -> IO (InputStream a)
concurrentMerge :: [InputStream a] -> IO (InputStream a)
concurrentMerge [] = IO (InputStream a)
forall a. IO (InputStream a)
nullInput
concurrentMerge [InputStream a]
iss = do
    MVar (Either SomeException (Maybe a))
mv    <- IO (MVar (Either SomeException (Maybe a)))
forall a. IO (MVar a)
newEmptyMVar
    MVar Int
nleft <- Int -> IO (MVar Int)
forall a. a -> IO (MVar a)
newMVar (Int -> IO (MVar Int)) -> Int -> IO (MVar Int)
forall a b. (a -> b) -> a -> b
$! [InputStream a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [InputStream a]
iss
    ((forall a. IO a -> IO a) -> IO ()) -> IO ()
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask (((forall a. IO a -> IO a) -> IO ()) -> IO ())
-> ((forall a. IO a -> IO a) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
restore -> [InputStream a] -> (InputStream a -> IO ThreadId) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [InputStream a]
iss ((InputStream a -> IO ThreadId) -> IO ())
-> (InputStream a -> IO ThreadId) -> IO ()
forall a b. (a -> b) -> a -> b
$ \InputStream a
is -> IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
        let producer :: IO ()
producer = do
              Either SomeException (Maybe a)
emb <- IO (Maybe a) -> IO (Either SomeException (Maybe a))
forall e a. Exception e => IO a -> IO (Either e a)
try (IO (Maybe a) -> IO (Either SomeException (Maybe a)))
-> IO (Maybe a) -> IO (Either SomeException (Maybe a))
forall a b. (a -> b) -> a -> b
$ IO (Maybe a) -> IO (Maybe a)
forall a. IO a -> IO a
restore (IO (Maybe a) -> IO (Maybe a)) -> IO (Maybe a) -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ InputStream a -> IO (Maybe a)
forall a. InputStream a -> IO (Maybe a)
read InputStream a
is
              case Either SomeException (Maybe a)
emb of
                  Left SomeException
exc      -> do MVar (Either SomeException (Maybe a))
-> Either SomeException (Maybe a) -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Either SomeException (Maybe a))
mv (SomeException -> Either SomeException (Maybe a)
forall a b. a -> Either a b
Left (SomeException
exc :: SomeException))
                                      IO ()
producer
                  Right Maybe a
Nothing -> MVar (Either SomeException (Maybe a))
-> Either SomeException (Maybe a) -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Either SomeException (Maybe a))
mv (Either SomeException (Maybe a) -> IO ())
-> Either SomeException (Maybe a) -> IO ()
forall a b. (a -> b) -> a -> b
$! Maybe a -> Either SomeException (Maybe a)
forall a b. b -> Either a b
Right Maybe a
forall a. Maybe a
Nothing
                  Right Maybe a
x       -> MVar (Either SomeException (Maybe a))
-> Either SomeException (Maybe a) -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Either SomeException (Maybe a))
mv (Maybe a -> Either SomeException (Maybe a)
forall a b. b -> Either a b
Right Maybe a
x) IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
producer
        IO ()
producer
    IO (Maybe a) -> IO (InputStream a)
forall a. IO (Maybe a) -> IO (InputStream a)
makeInputStream (IO (Maybe a) -> IO (InputStream a))
-> IO (Maybe a) -> IO (InputStream a)
forall a b. (a -> b) -> a -> b
$ MVar (Either SomeException (Maybe a)) -> MVar Int -> IO (Maybe a)
forall e a a.
(Exception e, Ord a, Num a) =>
MVar (Either e (Maybe a)) -> MVar a -> IO (Maybe a)
chunk MVar (Either SomeException (Maybe a))
mv MVar Int
nleft

  where
    chunk :: MVar (Either e (Maybe a)) -> MVar a -> IO (Maybe a)
chunk MVar (Either e (Maybe a))
mv MVar a
nleft = do
        Either e (Maybe a)
emb <- MVar (Either e (Maybe a)) -> IO (Either e (Maybe a))
forall a. MVar a -> IO a
takeMVar MVar (Either e (Maybe a))
mv
        case Either e (Maybe a)
emb of
            Left e
exc      -> e -> IO (Maybe a)
forall e a. Exception e => e -> IO a
throwIO e
exc
            Right Maybe a
Nothing -> do a
x <- MVar a -> (a -> IO (a, a)) -> IO a
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar a
nleft ((a -> IO (a, a)) -> IO a) -> (a -> IO (a, a)) -> IO a
forall a b. (a -> b) -> a -> b
$ \a
n ->
                                     let !n' :: a
n' = a
n a -> a -> a
forall a. Num a => a -> a -> a
- a
1
                                     in (a, a) -> IO (a, a)
forall (m :: * -> *) a. Monad m => a -> m a
return ((a, a) -> IO (a, a)) -> (a, a) -> IO (a, a)
forall a b. (a -> b) -> a -> b
$! (a
n', a
n')
                                if a
x a -> a -> Bool
forall a. Ord a => a -> a -> Bool
> a
0
                                  then MVar (Either e (Maybe a)) -> MVar a -> IO (Maybe a)
chunk MVar (Either e (Maybe a))
mv MVar a
nleft
                                  else Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
            Right Maybe a
x       -> Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
x


--------------------------------------------------------------------------------
-- | Create a new pair of streams using an underlying 'Chan'. Everything written
-- to the 'OutputStream' will appear as-is on the 'InputStream'.
--
-- Since reading from the 'InputStream' and writing to the 'OutputStream' are
-- blocking calls, be sure to do so in different threads.
makeChanPipe :: IO (InputStream a, OutputStream a)
makeChanPipe :: IO (InputStream a, OutputStream a)
makeChanPipe = do
    Chan (Maybe a)
chan <- IO (Chan (Maybe a))
forall a. IO (Chan a)
newChan
    (,) (InputStream a
 -> OutputStream a -> (InputStream a, OutputStream a))
-> IO (InputStream a)
-> IO (OutputStream a -> (InputStream a, OutputStream a))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Chan (Maybe a) -> IO (InputStream a)
forall a. Chan (Maybe a) -> IO (InputStream a)
chanToInput Chan (Maybe a)
chan IO (OutputStream a -> (InputStream a, OutputStream a))
-> IO (OutputStream a) -> IO (InputStream a, OutputStream a)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Chan (Maybe a) -> IO (OutputStream a)
forall a. Chan (Maybe a) -> IO (OutputStream a)
chanToOutput Chan (Maybe a)
chan