-- |
-- Module      : Streamly.Internal.Data.Stream.Concurrent.Channel.Worker
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
-- Collecting results from child workers in a streamed fashion

module Streamly.Internal.Data.Stream.Channel.Worker
      Work (..)
    , estimateWorkers
    , isBeyondMaxRate
    , workerRateControl

    -- * Send Events
    , sendWithDoorBell
    , sendYield
    , sendStop
    , handleChildException -- XXX rename to sendException

import Control.Concurrent (myThreadId)
import Control.Concurrent.MVar (MVar, tryPutMVar)
import Control.Exception (SomeException(..), assert)
import Control.Monad (when, void)
import Data.IORef (IORef, readIORef, writeIORef)
import Streamly.Internal.Data.Atomics
       (atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
       (AbsTime, NanoSecond64(..), diffAbsTime64, fromRelTime64)

import Streamly.Internal.Data.Stream.Channel.Types

-- Yield control

updateYieldCount :: WorkerInfo -> IO Count
updateYieldCount :: WorkerInfo -> IO Count
updateYieldCount WorkerInfo
winfo = do
cnt <- forall a. IORef a -> IO a
readIORef (WorkerInfo -> IORef Count
workerYieldCount WorkerInfo
    let cnt1 :: Count
cnt1 = Count
cnt forall a. Num a => a -> a -> a
+ Count
    forall a. IORef a -> a -> IO ()
writeIORef (WorkerInfo -> IORef Count
workerYieldCount WorkerInfo
winfo) Count
    forall (m :: * -> *) a. Monad m => a -> m a
return Count

isBeyondMaxYield :: Count -> WorkerInfo -> Bool
isBeyondMaxYield :: Count -> WorkerInfo -> Bool
isBeyondMaxYield Count
cnt WorkerInfo
winfo =
    let ymax :: Count
ymax = WorkerInfo -> Count
workerYieldMax WorkerInfo
    in Count
ymax forall a. Eq a => a -> a -> Bool
/= Count
0 Bool -> Bool -> Bool
&& Count
cnt forall a. Ord a => a -> a -> Bool
>= Count

-- Sending results from worker

{-# INLINE sendWithDoorBell #-}
sendWithDoorBell ::
    IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell :: forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell IORef ([ChildEvent a], Int)
q MVar ()
bell ChildEvent a
msg = do
    -- XXX can the access to outputQueue be made faster somehow?
oldlen <- forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([ChildEvent a], Int)
q forall a b. (a -> b) -> a -> b
$ \([ChildEvent a]
es, Int
n) ->
        ((ChildEvent a
msg forall a. a -> [a] -> [a]
: [ChildEvent a]
es, Int
n forall a. Num a => a -> a -> a
+ Int
1), Int
    forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
oldlen forall a. Ord a => a -> a -> Bool
<= Int
0) forall a b. (a -> b) -> a -> b
$ do
        -- The wake up must happen only after the store has finished otherwise
        -- we can have lost wakeup problems.
        IO ()
        -- Since multiple workers can try this at the same time, it is possible
        -- that we may put a spurious MVar after the consumer has already seen
        -- the output. But that's harmless, at worst it may cause the consumer
        -- to read the queue again and find it empty.
        -- The important point is that the consumer is guaranteed to receive a
        -- doorbell if something was added to the queue after it empties it.
        forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> a -> IO Bool
tryPutMVar MVar ()
bell ()
    forall (m :: * -> *) a. Monad m => a -> m a
return Int

-- Collect and update worker latency

workerCollectLatency :: WorkerInfo -> IO (Maybe (Count, NanoSecond64))
workerCollectLatency :: WorkerInfo -> IO (Maybe (Count, NanoSecond64))
workerCollectLatency WorkerInfo
winfo = do
cnt0, AbsTime
t0) <- forall a. IORef a -> IO a
readIORef (WorkerInfo -> IORef (Count, AbsTime)
workerLatencyStart WorkerInfo
cnt1 <- forall a. IORef a -> IO a
readIORef (WorkerInfo -> IORef Count
workerYieldCount WorkerInfo
    let cnt :: Count
cnt = Count
cnt1 forall a. Num a => a -> a -> a
- Count

    if Count
cnt forall a. Ord a => a -> a -> Bool
> Count
    then do
t1 <- Clock -> IO AbsTime
getTime Clock
        let period :: NanoSecond64
period = forall a. TimeUnit64 a => RelTime64 -> a
fromRelTime64 forall a b. (a -> b) -> a -> b
$ AbsTime -> AbsTime -> RelTime64
diffAbsTime64 AbsTime
t1 AbsTime
        forall a. IORef a -> a -> IO ()
writeIORef (WorkerInfo -> IORef (Count, AbsTime)
workerLatencyStart WorkerInfo
winfo) (Count
cnt1, AbsTime
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just (Count
cnt, NanoSecond64
    else forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a

-- XXX There are a number of gotchas in measuring latencies.
-- 1) We measure latencies only when a worker yields a value
-- 2) It is possible that a stream calls the stop continuation, in which case
-- the worker would not yield a value and we would not account that worker in
-- latencies. Even though this case should ideally be accounted we do not
-- account it because we cannot or do not distinguish it from the case
-- described next.
-- 3) It is possible that a worker returns without yielding anything because it
-- never got a chance to pick up work.
-- 4) If the system timer resolution is lower than the latency, the latency
-- computation turns out to be zero.
-- We can fix this if we measure the latencies by counting the work items
-- picked rather than based on the outputs yielded.
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency YieldRateInfo
yinfo WorkerInfo
winfo = do
    Maybe (Count, NanoSecond64)
r <- WorkerInfo -> IO (Maybe (Count, NanoSecond64))
workerCollectLatency WorkerInfo
    case Maybe (Count, NanoSecond64)
r of
        Just (Count
cnt, NanoSecond64
period) -> do
        -- NOTE: On JS platform the timer resolution could be pretty low. When
        -- the timer resolution is low, measurement of latencies could be
        -- tricky. All the worker latencies will turn out to be zero if they
        -- are lower than the resolution. We only take into account those
        -- measurements which are more than the timer resolution.

            let ref :: IORef (Count, Count, NanoSecond64)
ref = YieldRateInfo -> IORef (Count, Count, NanoSecond64)
workerPendingLatency YieldRateInfo
cnt1, NanoSecond64
t1) = if NanoSecond64
period forall a. Ord a => a -> a -> Bool
> NanoSecond64
0 then (Count
cnt, NanoSecond64
period) else (Count
0, NanoSecond64
            forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ IORef (Count, Count, NanoSecond64)
ref forall a b. (a -> b) -> a -> b
fc, Count
n, NanoSecond64
t) -> (Count
fc forall a. Num a => a -> a -> a
+ Count
cnt, Count
n forall a. Num a => a -> a -> a
+ Count
cnt1, NanoSecond64
t forall a. Num a => a -> a -> a
+ NanoSecond64
        Maybe (Count, NanoSecond64)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- Worker rate control

-- We either block, or send one worker with limited yield count or one or more
-- workers with unlimited yield count.
data Work
    = BlockWait NanoSecond64
    | PartialWorker Count
    | ManyWorkers Int Count
    deriving Int -> Work -> ShowS
[Work] -> ShowS
Work -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Work] -> ShowS
$cshowList :: [Work] -> ShowS
show :: Work -> String
$cshow :: Work -> String
showsPrec :: Int -> Work -> ShowS
$cshowsPrec :: Int -> Work -> ShowS

-- | Another magic number! When we have to start more workers to cover up a
-- number of yields that we are lagging by then we cannot start one worker for
-- each yield because that may be a very big number and if the latency of the
-- workers is low these number of yields could be very high. We assume that we
-- run each extra worker for at least this much time.
rateRecoveryTime :: NanoSecond64
rateRecoveryTime :: NanoSecond64
rateRecoveryTime = NanoSecond64

-- | Get the worker latency without resetting workerPendingLatency
-- Returns (total yield count, base time, measured latency)
-- CAUTION! keep it in sync with collectLatency
getWorkerLatency :: YieldRateInfo -> IO (Count, AbsTime, NanoSecond64)
getWorkerLatency :: YieldRateInfo -> IO (Count, AbsTime, NanoSecond64)
getWorkerLatency YieldRateInfo
yinfo  = do
    let cur :: IORef (Count, Count, NanoSecond64)
cur      = YieldRateInfo -> IORef (Count, Count, NanoSecond64)
workerPendingLatency YieldRateInfo
        col :: IORef (Count, Count, NanoSecond64)
col      = YieldRateInfo -> IORef (Count, Count, NanoSecond64)
workerCollectedLatency YieldRateInfo
        longTerm :: IORef (Count, AbsTime)
longTerm = YieldRateInfo -> IORef (Count, AbsTime)
svarAllTimeLatency YieldRateInfo
        measured :: IORef NanoSecond64
measured = YieldRateInfo -> IORef NanoSecond64
workerMeasuredLatency YieldRateInfo

curTotalCount, Count
curCount, NanoSecond64
curTime) <- forall a. IORef a -> IO a
readIORef IORef (Count, Count, NanoSecond64)
colTotalCount, Count
colCount, NanoSecond64
colTime) <- forall a. IORef a -> IO a
readIORef IORef (Count, Count, NanoSecond64)
lcount, AbsTime
ltime)     <- forall a. IORef a -> IO a
readIORef IORef (Count, AbsTime)
prevLat             <- forall a. IORef a -> IO a
readIORef IORef NanoSecond64

    let latCount :: Count
latCount = Count
colCount forall a. Num a => a -> a -> a
+ Count
        latTime :: NanoSecond64
latTime  = NanoSecond64
colTime forall a. Num a => a -> a -> a
+ NanoSecond64
        totalCount :: Count
totalCount = Count
colTotalCount forall a. Num a => a -> a -> a
+ Count
        newLat :: NanoSecond64
newLat =
            if Count
latCount forall a. Ord a => a -> a -> Bool
> Count
0 Bool -> Bool -> Bool
&& NanoSecond64
latTime forall a. Ord a => a -> a -> Bool
> NanoSecond64
            then let lat :: NanoSecond64
lat = NanoSecond64
latTime forall a. Integral a => a -> a -> a
`div` forall a b. (Integral a, Num b) => a -> b
fromIntegral Count
                 -- XXX Give more weight to new?
                 in (NanoSecond64
lat forall a. Num a => a -> a -> a
+ NanoSecond64
prevLat) forall a. Integral a => a -> a -> a
`div` NanoSecond64
            else NanoSecond64
    forall (m :: * -> *) a. Monad m => a -> m a
return (Count
lcount forall a. Num a => a -> a -> a
+ Count
totalCount, AbsTime
ltime, NanoSecond64

-- XXX we can use phantom types to distinguish the duration/latency/expectedLat
    :: Limit
    -> Count
    -> Count
    -> NanoSecond64
    -> NanoSecond64
    -> NanoSecond64
    -> LatencyRange
    -> Work
estimateWorkers :: Limit
-> Count
-> Count
-> NanoSecond64
-> NanoSecond64
-> NanoSecond64
-> LatencyRange
-> Work
estimateWorkers Limit
workerLimit Count
svarYields Count
svarElapsed NanoSecond64
wLatency NanoSecond64
targetLat LatencyRange
range =
    -- XXX we can have a maxEfficiency combinator as well which runs the
    -- producer at the maximal efficiency i.e. the number of workers are chosen
    -- such that the latency is minimum or within a range. Or we can call it
    -- maxWorkerLatency.
        -- How many workers do we need to achieve the required rate?
        -- When the workers are IO bound we can increase the throughput by
        -- increasing the number of workers as long as the IO device has enough
        -- capacity to process all the requests concurrently. If the IO
        -- bandwidth is saturated increasing the workers won't help. Also, if
        -- the CPU utilization in processing all these requests exceeds the CPU
        -- bandwidth, then increasing the number of workers won't help.
        -- When the workers are purely CPU bound, increasing the workers beyond
        -- the number of CPUs won't help.
        -- TODO - measure the CPU and IO requirements of the workers. Have a
        -- way to specify the max bandwidth of the underlying IO mechanism and
        -- use that to determine the max rate of workers, and also take the CPU
        -- bandwidth into account. We can also discover the IO bandwidth if we
        -- know that we are not CPU bound, then how much steady state rate are
        -- we able to achieve. Design tests for CPU bound and IO bound cases.

        -- Calculate how many yields are we ahead or behind to match the exact
        -- required rate. Based on that we increase or decrease the effective
        -- workers.
        -- When the worker latency is lower than required latency we begin with
        -- a yield and then wait rather than first waiting and then yielding.
        targetYields :: NanoSecond64
targetYields = (NanoSecond64
svarElapsed forall a. Num a => a -> a -> a
+ NanoSecond64
wLatency forall a. Num a => a -> a -> a
+ NanoSecond64
targetLat forall a. Num a => a -> a -> a
- NanoSecond64
1) forall a. Integral a => a -> a -> a
`div` NanoSecond64
        effectiveYields :: Count
effectiveYields = Count
svarYields forall a. Num a => a -> a -> a
+ Count
        deltaYields :: Count
deltaYields = forall a b. (Integral a, Num b) => a -> b
fromIntegral NanoSecond64
targetYields forall a. Num a => a -> a -> a
- Count

        -- We recover the deficit by running at a higher/lower rate for a
        -- certain amount of time. To keep the effective rate in reasonable
        -- limits we use rateRecoveryTime, minLatency and maxLatency.
        in  if Count
deltaYields forall a. Ord a => a -> a -> Bool
> Count
                let deltaYieldsFreq :: Double
                    deltaYieldsFreq :: Double
deltaYieldsFreq =
                        forall a b. (Integral a, Num b) => a -> b
fromIntegral Count
deltaYields forall a. Fractional a => a -> a -> a
                            forall a b. (Integral a, Num b) => a -> b
fromIntegral NanoSecond64
                    yieldsFreq :: Double
yieldsFreq = Double
1.0 forall a. Fractional a => a -> a -> a
/ forall a b. (Integral a, Num b) => a -> b
fromIntegral NanoSecond64
                    totalYieldsFreq :: Double
totalYieldsFreq = Double
yieldsFreq forall a. Num a => a -> a -> a
+ Double
                    requiredLat :: NanoSecond64
requiredLat = Int64 -> NanoSecond64
NanoSecond64 forall a b. (a -> b) -> a -> b
$ forall a b. (RealFrac a, Integral b) => a -> b
round forall a b. (a -> b) -> a -> b
$ Double
1.0 forall a. Fractional a => a -> a -> a
/ Double
                    adjustedLat :: NanoSecond64
adjustedLat = forall a. Ord a => a -> a -> a
min (forall a. Ord a => a -> a -> a
max NanoSecond64
requiredLat (LatencyRange -> NanoSecond64
minLatency LatencyRange
                                      (LatencyRange -> NanoSecond64
maxLatency LatencyRange
                in  forall a. (?callStack::CallStack) => Bool -> a -> a
assert (NanoSecond64
adjustedLat forall a. Ord a => a -> a -> Bool
> NanoSecond64
0) forall a b. (a -> b) -> a -> b
                    if NanoSecond64
wLatency forall a. Ord a => a -> a -> Bool
<= NanoSecond64
                    then Count -> Work
PartialWorker Count
                    else let workers :: NanoSecond64
workers = forall {p}. (Ord p, Num p) => p -> p
withLimit forall a b. (a -> b) -> a -> b
$ NanoSecond64
wLatency forall a. Integral a => a -> a -> a
`div` NanoSecond64
                             limited :: NanoSecond64
limited = forall a. Ord a => a -> a -> a
min NanoSecond64
workers (forall a b. (Integral a, Num b) => a -> b
fromIntegral Count
                         in Int -> Count -> Work
ManyWorkers (forall a b. (Integral a, Num b) => a -> b
fromIntegral NanoSecond64
limited) Count
                let expectedDuration :: NanoSecond64
expectedDuration = forall a b. (Integral a, Num b) => a -> b
fromIntegral Count
effectiveYields forall a. Num a => a -> a -> a
* NanoSecond64
                    sleepTime :: NanoSecond64
sleepTime = NanoSecond64
expectedDuration forall a. Num a => a -> a -> a
- NanoSecond64
                    maxSleepTime :: NanoSecond64
maxSleepTime = LatencyRange -> NanoSecond64
maxLatency LatencyRange
range forall a. Num a => a -> a -> a
- NanoSecond64
                    s :: NanoSecond64
s = forall a. Ord a => a -> a -> a
min NanoSecond64
sleepTime NanoSecond64
                in forall a. (?callStack::CallStack) => Bool -> a -> a
assert (NanoSecond64
sleepTime forall a. Ord a => a -> a -> Bool
>= NanoSecond64
0) forall a b. (a -> b) -> a -> b
                    -- if s is less than 0 it means our maxSleepTime is less
                    -- than the worker latency.
                    if NanoSecond64
s forall a. Ord a => a -> a -> Bool
> NanoSecond64
0 then NanoSecond64 -> Work
BlockWait NanoSecond64
s else Int -> Count -> Work
ManyWorkers Int
1 (Int64 -> Count
Count Int64
        withLimit :: p -> p
withLimit p
n =
            case Limit
workerLimit of
Unlimited -> p
                Limited Word
x -> forall a. Ord a => a -> a -> a
min p
n (forall a b. (Integral a, Num b) => a -> b
fromIntegral Word

isBeyondMaxRate :: Limit -> IORef Int -> YieldRateInfo -> IO Bool
isBeyondMaxRate :: Limit -> IORef Int -> YieldRateInfo -> IO Bool
isBeyondMaxRate Limit
workerLimit IORef Int
workerCount YieldRateInfo
rateInfo = do
count, AbsTime
tstamp, NanoSecond64
wLatency) <- YieldRateInfo -> IO (Count, AbsTime, NanoSecond64)
getWorkerLatency YieldRateInfo
now <- Clock -> IO AbsTime
getTime Clock
    let duration :: NanoSecond64
duration = forall a. TimeUnit64 a => RelTime64 -> a
fromRelTime64 forall a b. (a -> b) -> a -> b
$ AbsTime -> AbsTime -> RelTime64
diffAbsTime64 AbsTime
now AbsTime
    let targetLat :: NanoSecond64
targetLat = YieldRateInfo -> NanoSecond64
svarLatencyTarget YieldRateInfo
gainLoss <- forall a. IORef a -> IO a
readIORef (YieldRateInfo -> IORef Count
svarGainedLostYields YieldRateInfo
    let work :: Work
work = Limit
-> Count
-> Count
-> NanoSecond64
-> NanoSecond64
-> NanoSecond64
-> LatencyRange
-> Work
estimateWorkers Limit
workerLimit Count
count Count
gainLoss NanoSecond64
wLatency NanoSecond64
targetLat (YieldRateInfo -> LatencyRange
svarLatencyRange YieldRateInfo
cnt <- forall a. IORef a -> IO a
readIORef IORef Int
    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Work
work of
        -- XXX set the worker's maxYields or polling interval based on yields
        PartialWorker Count
_yields -> Int
cnt forall a. Ord a => a -> a -> Bool
> Int
        ManyWorkers Int
n Count
_ -> Int
cnt forall a. Ord a => a -> a -> Bool
> Int
        BlockWait NanoSecond64
_ -> Bool

-- XXX we should do rate control periodically based on the total yields rather
-- than based on the worker local yields as other workers may have yielded more
-- and we should stop based on the aggregate yields. However, latency update
-- period can be based on individual worker yields.
{-# NOINLINE checkRatePeriodic #-}
checkRatePeriodic ::
    -> IORef Int
    -> YieldRateInfo
    -> WorkerInfo
    -> Count
    -> IO Bool
checkRatePeriodic :: Limit
-> IORef Int -> YieldRateInfo -> WorkerInfo -> Count -> IO Bool
checkRatePeriodic Limit
workerLimit IORef Int
workerCount YieldRateInfo
rateInfo WorkerInfo
workerInfo Count
ycnt = do
i <- forall a. IORef a -> IO a
readIORef (YieldRateInfo -> IORef Count
workerPollingInterval YieldRateInfo
    -- XXX use generation count to check if the interval has been updated
    if Count
i forall a. Eq a => a -> a -> Bool
/= Count
0 Bool -> Bool -> Bool
&& (Count
ycnt forall a. Integral a => a -> a -> a
`mod` Count
i) forall a. Eq a => a -> a -> Bool
== Count
    then do
        YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency YieldRateInfo
rateInfo WorkerInfo
        -- XXX not required for parallel streams
        Limit -> IORef Int -> YieldRateInfo -> IO Bool
isBeyondMaxRate Limit
workerLimit IORef Int
workerCount YieldRateInfo
    else forall (m :: * -> *) a. Monad m => a -> m a
return Bool

-- | CAUTION! this also updates the yield count and therefore should be called
-- only when we are actually yielding an element.
{-# NOINLINE workerRateControl #-}
workerRateControl :: Limit -> IORef Int -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl :: Limit -> IORef Int -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl Limit
workerLimit IORef Int
workerCount YieldRateInfo
rateInfo WorkerInfo
workerInfo = do
cnt <- WorkerInfo -> IO Count
updateYieldCount WorkerInfo
beyondMaxRate <- Limit
-> IORef Int -> YieldRateInfo -> WorkerInfo -> Count -> IO Bool
checkRatePeriodic Limit
workerLimit IORef Int
workerCount YieldRateInfo
rateInfo WorkerInfo
workerInfo Count
    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Bool -> Bool
not (Count -> WorkerInfo -> Bool
isBeyondMaxYield Count
cnt WorkerInfo
workerInfo Bool -> Bool -> Bool
|| Bool

-- Send a yield event

-- XXX we should do rate control here but not latency update in case of ahead
-- streams. latency update must be done when we yield directly to outputQueue
-- or when we yield to heap.

-- | Returns whether the worker should continue (True) or stop (False).
{-# INLINE sendYield #-}
sendYield ::
    -> Limit
    -> IORef Int
    -> Maybe WorkerInfo
    -> Maybe YieldRateInfo
    -> IORef ([ChildEvent a], Int)
    -> MVar ()
    -> ChildEvent a
    -> IO Bool
sendYield :: forall a.
-> Limit
-> IORef Int
-> Maybe WorkerInfo
-> Maybe YieldRateInfo
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> ChildEvent a
-> IO Bool
sendYield Limit
bufferLimit Limit
workerLimit IORef Int
workerCount Maybe WorkerInfo
workerInfo Maybe YieldRateInfo
rateInfo IORef ([ChildEvent a], Int)
q MVar ()
bell ChildEvent a
msg =
oldlen <- forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell IORef ([ChildEvent a], Int)
q MVar ()
bell ChildEvent a
bufferSpaceOk <-
        case Limit
bufferLimit of
Unlimited -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
            Limited Word
lim -> do
active <- forall a. IORef a -> IO a
readIORef IORef Int
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ (Int
oldlen forall a. Num a => a -> a -> a
+ Int
1) forall a. Ord a => a -> a -> Bool
< (forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
lim forall a. Num a => a -> a -> a
- Int
rateLimitOk <-
        case Maybe WorkerInfo
workerInfo of
            Just WorkerInfo
winfo ->
                case Maybe YieldRateInfo
rateInfo of
                    Maybe YieldRateInfo
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
                    Just YieldRateInfo
yinfo ->
                        Limit -> IORef Int -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl Limit
workerLimit IORef Int
workerCount YieldRateInfo
yinfo WorkerInfo
            Maybe WorkerInfo
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Bool
bufferSpaceOk Bool -> Bool -> Bool
&& Bool

-- Send a Stop event

{-# INLINE workerStopUpdate #-}
workerStopUpdate :: WorkerInfo -> YieldRateInfo -> IO ()
workerStopUpdate :: WorkerInfo -> YieldRateInfo -> IO ()
workerStopUpdate WorkerInfo
winfo YieldRateInfo
info = do
i <- forall a. IORef a -> IO a
readIORef (YieldRateInfo -> IORef Count
workerPollingInterval YieldRateInfo
    forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Count
i forall a. Eq a => a -> a -> Bool
/= Count
0) forall a b. (a -> b) -> a -> b
$ YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency YieldRateInfo
info WorkerInfo

{-# INLINABLE sendStop #-}
sendStop ::
       IORef Int
    -> Maybe WorkerInfo
    -> Maybe YieldRateInfo
    -> IORef ([ChildEvent a], Int)
    -> MVar ()
    -> IO ()
sendStop :: forall a.
IORef Int
-> Maybe WorkerInfo
-> Maybe YieldRateInfo
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> IO ()
sendStop IORef Int
workerCount Maybe WorkerInfo
workerInfo Maybe YieldRateInfo
rateInfo IORef ([ChildEvent a], Int)
q MVar ()
bell = do
    forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ IORef Int
workerCount forall a b. (a -> b) -> a -> b
$ \Int
n -> Int
n forall a. Num a => a -> a -> a
- Int
    case (Maybe WorkerInfo
workerInfo, Maybe YieldRateInfo
rateInfo) of
      (Just WorkerInfo
winfo, Just YieldRateInfo
rinfo) ->
          WorkerInfo -> YieldRateInfo -> IO ()
workerStopUpdate WorkerInfo
winfo YieldRateInfo
      (Maybe WorkerInfo, Maybe YieldRateInfo)
_ ->
          forall (m :: * -> *) a. Monad m => a -> m a
return ()
    IO ThreadId
myThreadId forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ThreadId
tid ->
        forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell IORef ([ChildEvent a], Int)
q MVar ()
bell (forall a. ThreadId -> Maybe SomeException -> ChildEvent a
ChildStop ThreadId
tid forall a. Maybe a

{-# NOINLINE handleChildException #-}
handleChildException ::
    IORef ([ChildEvent a], Int) -> MVar () -> SomeException -> IO ()
handleChildException :: forall a.
IORef ([ChildEvent a], Int) -> MVar () -> SomeException -> IO ()
handleChildException IORef ([ChildEvent a], Int)
q MVar ()
bell SomeException
e = do
tid <- IO ThreadId
    forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell IORef ([ChildEvent a], Int)
q MVar ()
bell (forall a. ThreadId -> Maybe SomeException -> ChildEvent a
ChildStop ThreadId
tid (forall a. a -> Maybe a
Just SomeException