-- |
-- Module      : Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
    (
    -- * Dispatching
      pushWorker
    , dispatchWorker
    , dispatchWorkerPaced
    , sendWorkerWait
    , startChannel
    , sendWorkerDelay
    , sendWorkerDelayPaced
    )
where

import Control.Concurrent (takeMVar, threadDelay)
import Control.Exception (assert)
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Maybe (fromJust, fromMaybe)
import Data.IORef (modifyIORef, newIORef, readIORef, writeIORef)
import Streamly.Internal.Control.Concurrent (MonadRunInIO)
import Streamly.Internal.Control.ForkLifted (doFork)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_, storeLoadBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
       (MicroSecond64(..), diffAbsTime64, fromRelTime64, toRelTime64)

import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Channel.Dispatcher
import Streamly.Internal.Data.Channel.Types
import Streamly.Internal.Data.Channel.Worker

-------------------------------------------------------------------------------
-- Dispatching workers
-------------------------------------------------------------------------------
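--
-- A rough sketch of how these pieces are meant to fit together (illustrative
-- only; the real driver is the consumer-side code elsewhere in the channel
-- implementation):
--
-- @
-- startChannel chan   -- kick off the first worker
--
-- -- while waiting for output, keep dispatching as the limits permit:
-- sendWorkerWait False sendWorkerDelay (dispatchWorker 0) chan
--
-- -- or, when rate control is configured:
-- sendWorkerWait False sendWorkerDelayPaced dispatchWorkerPaced chan
-- @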

{-# NOINLINE pushWorker #-}
pushWorker :: MonadRunInIO m => Count -> Channel m a -> m ()
pushWorker yieldMax sv = do
    liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1
    when (svarInspectMode sv)
        $ recordMaxWorkers (workerCount sv) (svarStats sv)
    -- This allocation matters when a significant number of workers are being
    -- sent. We allocate it only when needed.
    winfo <-
        case yieldRateInfo sv of
            Nothing -> return Nothing
            Just _ -> liftIO $ do
                cntRef <- newIORef 0
                t <- getTime Monotonic
                lat <- newIORef (0, t)
                return $ Just WorkerInfo
                    { workerYieldMax = yieldMax
                    , workerYieldCount = cntRef
                    , workerLatencyStart = lat
                    }
    -- In case of lazy dispatch we dispatch workers only from the consumer
    -- thread. In that case it is ok to use addThread here as it is guaranteed
    -- that the thread will be added to the workerSet before the thread STOP
    -- event is processed, because we do both of these actions in the same
    -- consumer thread. However, in case of eager dispatch we may dispatch
    -- workers from workers, in which case the thread Stop event may get
    -- processed before the addThread occurs, so in that case we have to use
    -- modifyThread which performs a toggle rather than adding or deleting.
    --
    -- XXX We can use addThread or modThread based on eager flag.
    doFork (workLoop sv winfo) (svarMrun sv) exception >>= modThread

    where

    modThread = modifyThread (workerThreads sv) (outputDoorBell sv)
    exception = handleChildException (outputQueue sv) (outputDoorBell sv)

-- | Determine the maximum number of workers required based on 'maxWorkerLimit'
-- and 'remainingWork'.
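--
-- For example (illustrative): with no rate control, @remainingWork@ holding 3
-- and a 'maxWorkerLimit' of @Limited 8@, the effective limit is @Limited 3@;
-- with 'Unlimited' it is also capped at @Limited 3@, since dispatching more
-- workers than there are remaining work items is pointless.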
{-# INLINE getEffectiveWorkerLimit #-}
getEffectiveWorkerLimit :: MonadIO m => Channel m a -> m Limit
getEffectiveWorkerLimit sv = do
    let workerLimit = maxWorkerLimit sv
    case remainingWork sv of
        Nothing -> return workerLimit
        Just ref -> do
            n <- liftIO $ readIORef ref
            case yieldRateInfo sv of
                Just _ -> return workerLimit
                Nothing ->
                    return $
                        case workerLimit of
                            Unlimited -> Limited (fromIntegral n)
                            Limited lim -> Limited $ min lim (fromIntegral n)

-- | Determine whether the active threads are more than the max threads we are
-- allowed to dispatch.
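--
-- For example (illustrative): with an effective limit of @Limited 4@ and 4
-- workers already active this returns 'False'; with 3 active workers it
-- returns 'True'.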
{-# INLINE checkMaxThreads #-}
checkMaxThreads :: MonadIO m => Int -> Channel m a -> m Bool
checkMaxThreads active sv = do
    -- Note that we may deadlock if the previous workers (tasks in the
    -- stream) wait/depend on the future workers (tasks in the stream)
    -- executing. In that case we should either configure the maxWorker
    -- count to higher or use parallel style instead of ahead or async
    -- style.
    limit <- getEffectiveWorkerLimit sv
    return
        $ case limit of
            Unlimited -> True
            -- Note that the use of remainingWork and workerCount is not
            -- atomic and the counts may even have changed between reading
            -- and using them here, so this is just approximate logic and
            -- we cannot rely on it for correctness. We may actually
            -- dispatch more workers than required.
            Limited lim -> fromIntegral lim > active

-- | Determine whether we would exceed max buffer if we dispatch more workers
-- based on the current outputQueue size and active workers.
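--
-- For example (illustrative): with a 'maxBufferLimit' of @Limited 16@, 10
-- items already in the output queue and 4 active workers, @16 > 10 + 4@
-- holds, so one more worker may be dispatched; with 6 active workers it may
-- not.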
{-# INLINE checkMaxBuffer #-}
checkMaxBuffer :: MonadIO m => Int -> Channel m a -> m Bool
checkMaxBuffer active sv = do
    let limit = maxBufferLimit sv
    case limit of
        Unlimited -> return True
        Limited lim -> do
            (_, n) <- liftIO $ readIORef (outputQueue sv)
            return $ fromIntegral lim > n + active

dispatchWorker :: MonadRunInIO m =>
    Count -> Channel m a -> m Bool
dispatchWorker yieldCount sv = do
    -- XXX in case of Ahead streams we should not send more than one worker
    -- when the work queue is done but heap is not done.
    -- XXX Should we have a single abstraction for checking q and
    -- work instead of checking the two separately?
    done <- liftIO $ isWorkDone sv
    -- Note, "done" may not mean that the work is actually finished if there
    -- are workers active, because there may be a worker which has not yet
    -- queued the leftover work.
    if not done
    then do
        qDone <- liftIO $ isQueueDone sv
        -- This count may be higher than the actual number of active workers
        -- until the sendStop events are processed.
        active <- liftIO $ readIORef $ workerCount sv
        when (active < 0) $ error "dispatchWorker active negative"
        if not qDone
        then do
            -- XXX for ahead streams shall we take the heap yields into account
            -- for controlling the dispatch? We should not dispatch if the heap
            -- has already got the limit covered.
            r <- checkMaxThreads active sv
            if r
            then do
                r1 <- checkMaxBuffer active sv
                if r1
                then pushWorker yieldCount sv >> return True
                else return False
            else return False
        else do
            when (active <= 0) $ do
                r <- liftIO $ isWorkDone sv
                when (not r) $ pushWorker 0 sv
            return False
    else return False

-- XXX in case of ahead style stream we need to take the heap size into account
-- because we return the workers on the basis of that which causes a condition
-- where we keep dispatching and they keep returning. So we must have exactly
-- the same logic for not dispatching and for returning.
--
-- Returns:
-- True: can dispatch more
-- False: full, no more dispatches
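--
-- Roughly, the rate controller compares the measured per-yield worker latency
-- against the target latency: if a single worker is too slow to meet the
-- target (e.g. a 4 ms worker latency against a 1 ms target) 'estimateWorkers'
-- asks for several concurrent workers ('ManyWorkers'); if a single worker is
-- more than fast enough it asks for a partial worker or a sleep
-- ('PartialWorker' or 'BlockWait').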
dispatchWorkerPaced :: MonadRunInIO m =>
    Channel m a -> m Bool
dispatchWorkerPaced sv = do
    let yinfo = fromJust $ yieldRateInfo sv
    (svarYields, svarElapsed, wLatency) <- do
        now <- liftIO $ getTime Monotonic
        (yieldCount, baseTime, lat) <-
            liftIO
                $ collectLatency
                    (svarInspectMode sv) (svarStats sv) yinfo False
        let elapsed = fromRelTime64 $ diffAbsTime64 now baseTime
        let latency =
                if lat == 0
                then fromMaybe lat (workerBootstrapLatency yinfo)
                else lat

        return (yieldCount, elapsed, latency)

    if wLatency == 0
    -- Need to measure the latency with a single worker before we can perform
    -- any computation.
    then return False
    else do
        let workerLimit = maxWorkerLimit sv
        let targetLat = svarLatencyTarget yinfo
        let range = svarLatencyRange yinfo
        gainLoss <- liftIO $ readIORef (svarGainedLostYields yinfo)
        let work = estimateWorkers workerLimit svarYields gainLoss svarElapsed
                                   wLatency targetLat range

        -- XXX we need to take yieldLimit into account here. If we are at the
        -- end of the limit as well as the time, we should not be sleeping.
        -- If we are not actually planning to dispatch any more workers we need
        -- to take that in account.
        case work of
            BlockWait s -> do
                assert (s >= 0) (return ())
                -- XXX note that when we return from here we will block waiting
                -- for the result from the existing worker. If that takes too
                -- long we won't be able to send another worker until the
                -- result arrives.
                --
                -- Sleep only if there are no active workers, otherwise we will
                -- defer the output of those. Note we cannot use workerCount
                -- here as it is not a reliable way to ensure there are
                -- definitely no active workers. When workerCount is 0 we may
                -- still have a Stop event waiting in the outputQueue.
                done <- allThreadsDone (workerThreads sv)
                when done $ void $ do
                    let us = fromRelTime64 (toRelTime64 s) :: MicroSecond64
                    liftIO $ threadDelay (fromIntegral us)
                    dispatchWorker 1 sv
                return False
            PartialWorker yields -> do
                assert (yields > 0) (return ())
                updateGainedLostYields yinfo yields

                done <- allThreadsDone (workerThreads sv)
                when done $ void $ dispatchWorker yields sv
                return False
            ManyWorkers netWorkers yields -> do
                assert (netWorkers >= 1) (return ())
                assert (yields >= 0) (return ())
                updateGainedLostYields yinfo yields

                let periodRef = workerPollingInterval yinfo
                    ycnt = max 1 $ yields `div` fromIntegral netWorkers
                    period = min ycnt (fromIntegral magicMaxBuffer)

                old <- liftIO $ readIORef periodRef
                when (period < old) $
                    liftIO $ writeIORef periodRef period

                cnt <- liftIO $ readIORef $ workerCount sv
                if cnt < netWorkers
                then do
                    let total = netWorkers - cnt
                        batch = max 1 $ fromIntegral $
                                    minThreadDelay `div` targetLat
                    -- XXX stagger the workers over a period?
                    -- XXX cannot sleep, as that would mean we cannot process
                    -- the outputs. need to try a different mechanism to
                    -- stagger.
                    -- when (total > batch) $
                       -- liftIO $ threadDelay $ nanoToMicroSecs minThreadDelay
                    dispatchN (min total batch)
                else return False

    where

    updateGainedLostYields yinfo yields = do
        let buf = fromIntegral $ svarRateBuffer yinfo
        when (yields /= 0 && abs yields > buf) $ do
            let delta =
                   if yields > 0
                   then yields - buf
                   else yields + buf
            liftIO $ modifyIORef (svarGainedLostYields yinfo) (+ delta)

    dispatchN n =
        if n == 0
        then return True
        else do
            r <- dispatchWorker 0 sv
            if r
            then dispatchN (n - 1)
            else return False

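-- | Wait for output on the channel, dispatching more workers while waiting.
-- @sendWorkerWait eager delay dispatch chan@ runs @delay chan@, and if the
-- output queue is empty (or unconditionally when @eager@ is set) it keeps
-- calling @dispatch chan@ until no more workers can be dispatched, finally
-- blocking on the channel's doorbell until some output arrives.
--
-- A minimal sketch of a call site (illustrative; the real call sites are in
-- the consumer-side code):
--
-- @
-- sendWorkerWait False sendWorkerDelay (dispatchWorker 0) chan
-- @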
{-# NOINLINE sendWorkerWait #-}
sendWorkerWait
    :: MonadIO m
    => Bool
    -> (Channel m a -> IO ())
    -> (Channel m a -> m Bool)
    -> Channel m a
    -> m ()
sendWorkerWait eagerEval delay dispatch sv = go

    where

    go = do

        -- Note that we are guaranteed to have at least one outstanding worker
        -- when we enter this function. So if we sleep we are guaranteed to be
        -- woken up by an outputDoorBell, when the worker exits.

        liftIO $ delay sv
        (_, n) <- liftIO $ readIORef (outputQueue sv)
        when (n <= 0 || eagerEval) $ do
            -- The queue may be empty temporarily if the worker has dequeued
            -- the work item but has not enqueued the remaining part yet. For
            -- the same reason, a worker may come back if it tries to dequeue
            -- and finds the queue empty, even though the whole work has not
            -- finished yet.

            -- The queue may have been only temporarily empty when we checked
            -- it. If that's the case we might sleep indefinitely unless the
            -- active workers produce some output. We may deadlock especially
            -- if the output from the active workers depends on the future
            -- workers that we may never send.
            -- So in case the queue was temporarily empty set a flag to inform
            -- the enqueue to send us a doorbell.

            -- Note that this is just a best effort mechanism to avoid a
            -- deadlock. Deadlocks may still happen if for some weird reason
            -- the consuming computation shares an MVar or some other resource
            -- with the producing computation and gets blocked on that resource
            -- and therefore cannot do any pushworker to add more threads to
            -- the producer. In such cases the programmer should use a parallel
            -- style so that all the producers are scheduled immediately and
            -- unconditionally. We can also use a separate monitor thread to
            -- push workers instead of pushing them from the consumer, but then
            -- we are no longer using pull based concurrency rate adaptation.
            --
            -- XXX update this in the tutorial.
            --
            -- Having pending active workers does not mean that we are
            -- guaranteed to be woken up if we sleep. In case of Ahead streams,
            -- there may be queued items in the heap even though the
            -- outputQueue is empty, and we may have active workers which are
            -- deadlocked on those items to be processed by the consumer. We
            -- should either guarantee that any worker, before returning,
            -- clears the heap or we send a worker to clear it. Normally we
            -- always send a worker if no output is seen, but if the thread
            -- limit is reached or we are using pacing then we may not send a
            -- worker. See the concurrentApplication test in the test suite;
            -- that test case requires at least one yield from the producer to
            -- avoid deadlock, and it fails if the last worker's output is
            -- stuck in the heap. This problem can be extended to n threads when
            -- the consumer may depend on the evaluation of next n items in the
            -- producer stream.

            -- register for the outputDoorBell before we check the queue so
            -- that if we sleep because the queue was empty we are guaranteed
            -- to get a doorbell on the next enqueue.

            liftIO $ atomicModifyIORefCAS_ (doorBellOnWorkQ sv) $ const True
            liftIO storeLoadBarrier
            canDoMore <- dispatch sv

            -- XXX test for the case when we miss sending a worker when the
            -- worker count is more than 1500.
            --
            -- XXX Assert here that if the heap is not empty then there is at
            -- least one outstanding worker. Otherwise we could be sleeping
            -- forever.

            if canDoMore
            then go
            else do
                liftIO
                    $ withDiagMVar
                        (svarInspectMode sv)
                        (dumpSVar sv)
                        "sendWorkerWait: nothing to do"
                    $ takeMVar (outputDoorBell sv)
                (_, len) <- liftIO $ readIORef (outputQueue sv)
                if len <= 0
                then go
                else
                    liftIO
                        $ atomicModifyIORefCAS_ (doorBellOnWorkQ sv)
                        $ const False

-- | Start the evaluation of the channel's work queue by kicking off a worker.
-- Note: Work queue must not be empty otherwise the worker will exit without
-- doing anything.
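--
-- When the latency target is 'maxBound' (i.e. the requested rate is
-- effectively zero) this just sleeps instead of dispatching a worker.
--
-- A minimal usage sketch (illustrative; the channel construction code is the
-- real caller):
--
-- @
-- chan <- ...          -- create the channel and enqueue the initial work
-- startChannel chan    -- dispatch the first worker
-- @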
startChannel :: MonadRunInIO m =>
    Channel m a -> m ()
startChannel chan = do
    case yieldRateInfo chan of
        Nothing -> pushWorker 0 chan
        Just yinfo ->
            if svarLatencyTarget yinfo == maxBound
            then liftIO $ threadDelay maxBound
            else pushWorker 1 chan

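-- | Delay to run between dispatch attempts when rate control is in effect.
-- Currently a no-op; the pacing decisions (including sleeping) are made by
-- 'dispatchWorkerPaced' itself.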
sendWorkerDelayPaced :: Channel m a -> IO ()
sendWorkerDelayPaced _ = return ()

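-- | Delay to run between dispatch attempts when no rate control is in effect.
-- Currently a no-op; see the commented-out code below for the hardcoded
-- delays used earlier.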
sendWorkerDelay :: Channel m a -> IO ()
sendWorkerDelay _sv =
    -- XXX we need a better way to handle this than hardcoded delays. The
    -- delays may be different for different systems.
    -- If there is a usecase where this is required we can create a combinator
    -- to set it as a config in the state.
    {-
  do
    ncpu <- getNumCapabilities
    if ncpu <= 1
    then
        if (svarStyle sv == AheadVar)
        then threadDelay 100
        else threadDelay 25
    else
        if (svarStyle sv == AheadVar)
        then threadDelay 100
        else threadDelay 10
    -}
    return ()