-- |
-- Module      : Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer
    (
    -- * Read Output
      readOutputQPaced
    , readOutputQBounded

    -- * Postprocess Hook After Reading
    , postProcessPaced
    , postProcessBounded
    )
where

import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (readIORef)
import Streamly.Internal.Control.Concurrent (MonadRunInIO)

import Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Stream.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Channel.Types

-------------------------------------------------------------------------------
-- Reading from the workers' output queue/buffer
-------------------------------------------------------------------------------

{-# INLINE readOutputQChan #-}
readOutputQChan :: Channel m a -> IO ([ChildEvent a], Int)
readOutputQChan :: forall (m :: * -> *) a. Channel m a -> IO ([ChildEvent a], Int)
readOutputQChan Channel m a
sv = do
    let ss :: Maybe SVarStats
ss = if forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv then forall a. a -> Maybe a
Just (forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv) else forall a. Maybe a
Nothing
     in forall a.
IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
readOutputQRaw (forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv) Maybe SVarStats
ss

readOutputQBounded :: MonadRunInIO m => Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded :: forall (m :: * -> *) a.
MonadRunInIO m =>
Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded Bool
eagerEval Channel m a
sv = do
    ([ChildEvent a]
list, Int
len) <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> IO ([ChildEvent a], Int)
readOutputQChan Channel m a
sv
    -- When there is no output seen we dispatch more workers to help
    -- out if there is work pending in the work queue.
    if Int
len forall a. Ord a => a -> a -> Bool
<= Int
0
    then m [ChildEvent a]
blockingRead
    else do
        -- send a worker proactively, if needed, even before we start
        -- processing the output.  This may degrade single processor
        -- perf but improves multi-processor, because of more
        -- parallelism
        m ()
sendOneWorker
        forall (m :: * -> *) a. Monad m => a -> m a
return [ChildEvent a]
list

    where

    sendOneWorker :: m ()
sendOneWorker = do
        Int
cnt <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
cnt forall a. Ord a => a -> a -> Bool
<= Int
0) forall a b. (a -> b) -> a -> b
$ do
            Bool
done <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> IO Bool
isWorkDone Channel m a
sv
            forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
done) (forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m ()
pushWorker Count
0 Channel m a
sv)

    {-# INLINE blockingRead #-}
    blockingRead :: m [ChildEvent a]
blockingRead = do
        forall (m :: * -> *) a.
MonadIO m =>
Bool
-> (Channel m a -> IO ())
-> (Channel m a -> m Bool)
-> Channel m a
-> m ()
sendWorkerWait Bool
eagerEval forall (m :: * -> *) a. Channel m a -> IO ()
sendWorkerDelay (forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m Bool
dispatchWorker Count
0) Channel m a
sv
        forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a b. (a, b) -> a
fst forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` forall (m :: * -> *) a. Channel m a -> IO ([ChildEvent a], Int)
readOutputQChan Channel m a
sv)

readOutputQPaced :: MonadRunInIO m => Channel m a -> m [ChildEvent a]
readOutputQPaced :: forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> m [ChildEvent a]
readOutputQPaced Channel m a
sv = do
    ([ChildEvent a]
list, Int
len) <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> IO ([ChildEvent a], Int)
readOutputQChan Channel m a
sv
    if Int
len forall a. Ord a => a -> a -> Bool
<= Int
0
    then m [ChildEvent a]
blockingRead
    else do
        -- XXX send a worker proactively, if needed, even before we start
        -- processing the output.
        forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
dispatchWorkerPaced Channel m a
sv
        forall (m :: * -> *) a. Monad m => a -> m a
return [ChildEvent a]
list

    where

    {-# INLINE blockingRead #-}
    blockingRead :: m [ChildEvent a]
blockingRead = do
        forall (m :: * -> *) a.
MonadIO m =>
Bool
-> (Channel m a -> IO ())
-> (Channel m a -> m Bool)
-> Channel m a
-> m ()
sendWorkerWait Bool
False forall (m :: * -> *) a. Channel m a -> IO ()
sendWorkerDelayPaced forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
dispatchWorkerPaced Channel m a
sv
        forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a b. (a, b) -> a
fst forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
`fmap` forall (m :: * -> *) a. Channel m a -> IO ([ChildEvent a], Int)
readOutputQChan Channel m a
sv)

postProcessPaced :: MonadRunInIO m => Channel m a -> m Bool
postProcessPaced :: forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessPaced Channel m a
sv = do
    Bool
workersDone <- forall (m :: * -> *). MonadIO m => IORef (Set ThreadId) -> m Bool
allThreadsDone (forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
sv)
    -- XXX If during consumption we figure out we are getting delayed then we
    -- should trigger dispatch there as well.  We should try to check on the
    -- workers after consuming every n item from the buffer?
    if Bool
workersDone
    then do
        Bool
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> IO Bool
isWorkDone Channel m a
sv
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
r) forall a b. (a -> b) -> a -> b
$ do
            forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
dispatchWorkerPaced Channel m a
sv
            -- Note that we need to guarantee a worker since the work is not
            -- finished, therefore we cannot just rely on dispatchWorkerPaced
            -- which may or may not send a worker.
            Bool
noWorker <- forall (m :: * -> *). MonadIO m => IORef (Set ThreadId) -> m Bool
allThreadsDone (forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads  Channel m a
sv)
            forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
noWorker forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m ()
pushWorker Count
0 Channel m a
sv
        forall (m :: * -> *) a. Monad m => a -> m a
return Bool
r
    else forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False

postProcessBounded :: MonadRunInIO m => Channel m a -> m Bool
postProcessBounded :: forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessBounded Channel m a
sv = do
    Bool
workersDone <- forall (m :: * -> *). MonadIO m => IORef (Set ThreadId) -> m Bool
allThreadsDone (forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
sv)
    -- There may still be work pending even if there are no workers pending
    -- because all the workers may return if the outputQueue becomes full. In
    -- that case send off a worker to kickstart the work again.
    --
    -- Note that isWorkDone can only be safely checked if all workers are done.
    -- When some workers are in progress they may have decremented the yield
    -- Limit and later ending up incrementing it again. If we look at the yield
    -- limit in that window we may falsely say that it is 0 and therefore we
    -- are done.
    if Bool
workersDone
    then do
        Bool
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Channel m a -> IO Bool
isWorkDone Channel m a
sv
        -- Note that we need to guarantee a worker, therefore we cannot just
        -- use dispatchWorker which may or may not send a worker.
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
r) (forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m ()
pushWorker Count
0 Channel m a
sv)
        -- XXX do we need to dispatch many here?
        -- void $ dispatchWorker sv
        forall (m :: * -> *) a. Monad m => a -> m a
return Bool
r
    else forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False