{-# LANGUAGE UnboxedTuples #-}

-- |
-- Module      : Streamly.Internal.Data.SVar
-- Copyright   : (c) 2017 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.SVar
    (
      MonadAsync
    , SVarStyle (..)
    , SVarStopStyle (..)
    , SVar (..)

    -- State threaded around the stream
    , Limit (..)
    , State (streamVar)
    , defState
    , adaptState
    , getMaxThreads
    , setMaxThreads
    , getMaxBuffer
    , setMaxBuffer
    , getStreamRate
    , setStreamRate
    , setStreamLatency
    , getYieldLimit
    , setYieldLimit
    , getInspectMode
    , setInspectMode
    , recordMaxWorkers

    , cleanupSVar
    , cleanupSVarFromWorker

    -- SVar related
    , newAheadVar
    , newParallelVar
    , captureMonadState
    , RunInIO (..)

    , WorkerInfo (..)
    , YieldRateInfo (..)
    , ThreadAbort (..)
    , ChildEvent (..)
    , AheadHeapEntry (..)
    , send
    , sendToProducer
    , sendYield
    , sendStop
    , sendStopToProducer
    , enqueueLIFO
    , enqueueFIFO
    , enqueueAhead
    , reEnqueueAhead
    , pushWorkerPar
    , handleChildException
    , handleFoldException

    , queueEmptyAhead
    , dequeueAhead

    , HeapDequeueResult(..)
    , dequeueFromHeap
    , dequeueFromHeapSeq
    , requeueOnHeapTop
    , updateHeapSeq
    , withIORef
    , heapIsSane

    , Rate (..)
    , getYieldRateInfo
    , newSVarStats
    , collectLatency
    , workerUpdateLatency
    , isBeyondMaxRate
    , workerRateControl
    , updateYieldCount
    , decrementYieldLimit
    , incrementYieldLimit
    , decrementBufferLimit
    , incrementBufferLimit
    , postProcessBounded
    , postProcessPaced
    , readOutputQBounded
    , readOutputQPaced
    , readOutputQBasic
    , dispatchWorkerPaced
    , sendFirstWorker
    , delThread
    , modifyThread
    , doFork
    , fork
    , forkManaged

    , toStreamVar
    , SVarStats (..)
    , dumpSVar
    , printSVar
    , withDiagMVar
    )
where

import Control.Concurrent (ThreadId, myThreadId, threadDelay, throwTo)
import Control.Concurrent.MVar
       (MVar, newEmptyMVar, tryPutMVar, takeMVar, tryTakeMVar, newMVar,
        tryReadMVar)
import Control.Exception
       (SomeException(..), assert, Exception, catches,
        throwIO, Handler(..), BlockedIndefinitelyOnMVar(..),
        BlockedIndefinitelyOnSTM(..))
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control (MonadBaseControl, control)
import Streamly.Internal.Data.Atomics
       (atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier,
        storeLoadBarrier)
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, pushL)
import Data.Functor (void)
import Data.Heap (Heap, Entry(..))
import Data.Int (Int64)
import Data.IORef
       (IORef, modifyIORef, newIORef, readIORef, writeIORef, atomicModifyIORef)
import Data.Kind (Type)
import Data.Maybe (fromJust, fromMaybe)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup ((<>))
#endif
import Data.Set (Set)
import System.IO (hPutStrLn, stderr)

import Streamly.Internal.Control.Concurrent
    (MonadAsync, RunInIO(..), doFork, fork, forkManaged)
import Streamly.Internal.Data.Time.Clock.Type (Clock(..), getTime)
import Streamly.Internal.Data.Time.Units
       (AbsTime, NanoSecond64(..), MicroSecond64(..), diffAbsTime64,
        fromRelTime64, toRelTime64, showNanoSecond64, showRelTime64)

import qualified Data.Heap as H
import qualified Data.Set                    as S

-- A signed 64-bit counter. In this module it is used for yield counts and
-- limits (e.g. 'workerYieldCount', '_yieldLimit') and for buffer space
-- ('pushBufferSpace'). Wrapping 'Int64' in a newtype keeps these counters
-- from being mixed up with plain integers; the numeric classes are derived
-- so that a 'Count' can still be used with literals and ordinary arithmetic.
newtype Count = Count Int64
    deriving ( Eq
             , Read
             , Show
             , Enum
             , Bounded
             , Num
             , Real
             , Integral
             , Ord
             )

------------------------------------------------------------------------------
-- Parent child thread communication type
------------------------------------------------------------------------------

-- An exception type with a single value, 'ThreadAbort'.
--
-- NOTE(review): presumably thrown to child threads to make them stop; the
-- throwing sites are not in this part of the file - confirm there.
data ThreadAbort = ThreadAbort deriving Show

-- Uses the default 'Exception' methods, which rely on the 'Show' instance
-- derived above.
instance Exception ThreadAbort

-- | Events that a child thread may send to a parent thread.
data ChildEvent a =
      ChildYield a
      -- ^ Carries one value of the element type @a@.
    | ChildStop ThreadId (Maybe SomeException)
      -- ^ Carries a 'ThreadId' and an optional exception; presumably the id
      -- of the child that stopped and the exception that ended it, if any
      -- (TODO confirm against the code that sends this event).

-- | Sorting out-of-turn outputs in a heap for Ahead style streams
data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a =
      AheadEntryNull
      -- ^ An entry that carries no payload.
    | AheadEntryPure a
      -- ^ An entry that carries a single element.
    | AheadEntryStream (RunInIO m, t m a)
      -- ^ An entry that carries a stream together with a 'RunInIO' for @m@.

-- NOTE(review): no matching @#define Type@ is visible in this part of the
-- file ('Type' is imported from "Data.Kind"); this #undef looks like a
-- leftover - confirm before removing.
#undef Type

------------------------------------------------------------------------------
-- State threaded around the monad for thread management
------------------------------------------------------------------------------

-- | Identify the type of the SVar. Two computations using the same style can
-- be scheduled on the same SVar.
data SVarStyle =
      AsyncVar             -- depth first concurrent
    | WAsyncVar            -- breadth first concurrent
    | ParallelVar          -- all parallel
    | AheadVar             -- Concurrent look ahead
    deriving (Eq, Show)

-- | An SVar or a Stream Var is a conduit to the output from multiple streams
-- running concurrently and asynchronously. An SVar can be thought of as an
-- asynchronous IO handle. We can write any number of streams to an SVar in a
-- non-blocking manner and then read them back at any time at any pace.  The
-- SVar would run the streams asynchronously and accumulate results. An SVar
-- may not really execute the stream completely and accumulate all the results.
-- However, it ensures that the reader can read the results at whatever pace
-- it wants to read. The SVar monitors and adapts to the consumer's pace.
--
-- An SVar is a mini scheduler, it has an associated workLoop that holds the
-- stream tasks to be picked and run by a pool of worker threads. It has an
-- associated output queue where the output stream elements are placed by the
-- worker threads. An outputDoorBell is used by the worker threads to notify the
-- consumer thread about availability of new results in the output queue. More
-- workers are added to the SVar by 'fromStreamVar' on demand if the output
-- produced is not keeping pace with the consumer. On bounded SVars, workers
-- block on the output queue to provide throttling of the producer  when the
-- consumer is not pulling fast enough.  The number of workers may even get
-- reduced depending on the consuming pace.
--
-- New work is enqueued either at the time of creation of the SVar or as a
-- result of executing the parallel combinators i.e. '<|' and '<|>' when the
-- already enqueued computations get evaluated. See 'joinStreamVarAsync'.

-- We measure the individual worker latencies to estimate the number of workers
-- needed or the amount of time we have to sleep between dispatches to achieve
-- a particular rate when controlled pace mode is used.
--
-- Per-worker bookkeeping: a fixed yield cap plus two mutable cells that the
-- latency measurement reads and updates.
data WorkerInfo = WorkerInfo
    { workerYieldMax   :: Count -- 0 means unlimited
    -- total number of yields by the worker till now
    , workerYieldCount    :: IORef Count
    -- yieldCount at start, timestamp
    , workerLatencyStart  :: IORef (Count, AbsTime)
    }

-- | Specifies the stream yield rate in yields per second (@Hertz@).
-- We keep accumulating yield credits at 'rateGoal'. At any point of time we
-- allow only as many yields as we have accumulated as per 'rateGoal' since the
-- start of time. If the consumer or the producer is slower or faster, the
-- actual rate may fall behind or exceed 'rateGoal'.  We try to recover the gap
-- between the two by increasing or decreasing the pull rate from the producer.
-- However, if the gap becomes more than 'rateBuffer' we try to recover only as
-- much as 'rateBuffer'.
--
-- 'rateLow' puts a bound on how low the instantaneous rate can go when
-- recovering the rate gap.  In other words, it determines the maximum yield
-- latency.  Similarly, 'rateHigh' puts a bound on how high the instantaneous
-- rate can go when recovering the rate gap.  In other words, it determines the
-- minimum yield latency. We reduce the latency by increasing concurrency,
-- therefore we can say that it puts an upper bound on concurrency.
--
-- If the 'rateGoal' is 0 or negative the stream never yields a value.
-- If the 'rateBuffer' is 0 or negative we do not attempt to recover.
--
-- /Since: 0.5.0 ("Streamly")/
--
-- @since 0.8.0
data Rate = Rate
    { rateLow    :: Double -- ^ The lower rate limit
    , rateGoal   :: Double -- ^ The target rate we want to achieve
    , rateHigh   :: Double -- ^ The upper rate limit
    , rateBuffer :: Int    -- ^ Maximum slack from the goal
    }

-- A pair of latency bounds, each expressed as a 'NanoSecond64'.
data LatencyRange = LatencyRange
    { minLatency :: NanoSecond64
    , maxLatency :: NanoSecond64
    } deriving Show

-- Rate control.
--
-- The first three fields are immutable settings; the remaining fields are
-- mutable cells shared between the consumer and the workers. The [LOCKING]
-- notes on each field say who writes it and whether access is synchronized.
data YieldRateInfo = YieldRateInfo
    { svarLatencyTarget    :: NanoSecond64
    , svarLatencyRange     :: LatencyRange
    , svarRateBuffer       :: Int

    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , svarGainedLostYields :: IORef Count

    -- Actual latency/throughput as seen from the consumer side, we count the
    -- yields and the time it took to generate those yields. This is used to
    -- increase or decrease the number of workers needed to achieve the desired
    -- rate. The idle time of workers is adjusted in this, so that we only
    -- account for the rate when the consumer actually demands data.
    -- XXX interval latency is enough, we can move this under diagnostics build
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , svarAllTimeLatency :: IORef (Count, AbsTime)

    -- XXX Worker latency specified by the user to be used before the first
    -- actual measurement arrives. Not yet implemented
    , workerBootstrapLatency :: Maybe NanoSecond64

    -- After how many yields the worker should update the latency information.
    -- If the latency is high, this count is kept lower and vice-versa.  XXX If
    -- the latency suddenly becomes too high this count may remain too high for
    -- long time, in such cases the consumer can change it.
    -- 0 means no latency computation
    -- XXX this is derivable from workerMeasuredLatency, can be removed.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , workerPollingInterval :: IORef Count

    -- This is in progress latency stats maintained by the workers which we
    -- empty into workerCollectedLatency stats at certain intervals - whenever
    -- we process the stream elements yielded in this period. The first count
    -- is all yields, the second count is only those yields for which the
    -- latency was measured to be non-zero (note that if the timer resolution
    -- is low the measured latency may be zero e.g. on JS platform).
    -- [LOCKING] Locked access. Modified by the consumer thread as well as
    -- worker threads. Workers modify it periodically based on
    -- workerPollingInterval and not on every yield to reduce the locking
    -- overhead.
    -- (allYieldCount, yieldCount, timeTaken)
    , workerPendingLatency   :: IORef (Count, Count, NanoSecond64)

    -- This is the second level stat which is an accumulation from
    -- workerPendingLatency stats. We keep accumulating latencies in this
    -- bucket until we have stats for a sufficient period and then we reset it
    -- to start collecting for the next period and retain the computed average
    -- latency for the last period in workerMeasuredLatency.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    -- (allYieldCount, yieldCount, timeTaken)
    , workerCollectedLatency :: IORef (Count, Count, NanoSecond64)

    -- Latency as measured by workers, aggregated for the last period.
    -- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
    -- read by the worker threads
    , workerMeasuredLatency :: IORef NanoSecond64
    }

-- Mutable statistics cells attached to an SVar (see the 'svarStats' field of
-- 'SVar'). Every field is an 'IORef', so the record itself never needs to be
-- replaced when a statistic changes.
data SVarStats = SVarStats {
      totalDispatches  :: IORef Int
    , maxWorkers       :: IORef Int
    , maxOutQSize      :: IORef Int
    , maxHeapSize      :: IORef Int
    , maxWorkQSize     :: IORef Int
    , avgWorkerLatency :: IORef (Count, NanoSecond64)
    , minWorkerLatency :: IORef NanoSecond64
    , maxWorkerLatency :: IORef NanoSecond64
    , svarStopTime     :: IORef (Maybe AbsTime)
}

-- This is essentially a 'Maybe Word' type
data Limit = Unlimited | Limited Word deriving Show

-- Two limits are equal when both are 'Unlimited' or both are 'Limited' with
-- the same bound.
instance Eq Limit where
    Unlimited == Unlimited = True
    Unlimited == Limited _ = False
    Limited _ == Unlimited = False
    Limited x == Limited y = x == y

-- 'Unlimited' is the greatest element: every 'Limited' value compares below
-- it, and 'Limited' values compare by their bound. Only '<=' is defined; the
-- remaining 'Ord' methods come from the class defaults.
instance Ord Limit where
    Unlimited <= Unlimited = True
    Unlimited <= Limited _ = False
    Limited _ <= Unlimited = True
    Limited x <= Limited y = x <= y

-- When to stop the composed stream.
data SVarStopStyle =
      StopNone -- stops only when all streams are finished
    | StopAny  -- stop when any stream finishes
    | StopBy   -- stop when a specific stream finishes
    deriving (Eq, Show)

-- | Buffering policy for persistent push workers (in ParallelT).  In a pull
-- style SVar (in AsyncT, AheadT etc.), the consumer side dispatches workers on
-- demand, workers terminate if the buffer is full or if the consumer is not
-- consuming fast enough.  In a push style SVar, a worker is dispatched only
-- once, workers are persistent and keep pushing work to the consumer via a
-- bounded buffer. If the buffer becomes full the worker either blocks, or it
-- can drop an item from the buffer to make space.
--
-- Pull style SVars are useful in lazy stream evaluation whereas push style
-- SVars are useful in strict left Folds.
--
-- XXX Maybe we can separate the implementation in two different types instead
-- of using a common SVar type.
--
data PushBufferPolicy =
      PushBufferDropNew  -- drop the latest element and continue
    | PushBufferDropOld  -- drop the oldest element and continue
    | PushBufferBlock    -- block the thread until space
                         -- becomes available

-- IMPORTANT NOTE: we cannot update the SVar after generating it as we have
-- references to the original SVar stored in several functions which will keep
-- pointing to the original data and the new updates won't reflect there.
-- Any updateable parts must be kept in mutable references (IORef).
--
-- The record groups, in order: read-only configuration, the output queue and
-- its doorbell, the consumer-to-worker channel, worker/buffer limits, the
-- work queue hooks, worker thread tracking, statistics, and diagnostics.
data SVar t m a = SVar
    {
    -- Read only state
      svarStyle       :: SVarStyle
    , svarMrun        :: RunInIO m
    , svarStopStyle   :: SVarStopStyle
    , svarStopBy      :: IORef ThreadId

    -- Shared output queue (events, length)
    -- XXX For better efficiency we can try a preallocated array type (perhaps
    -- something like a vector) that allows an O(1) append. That way we will
    -- avoid constructing and reversing the list. Possibly we can also avoid
    -- the GC copying overhead. When the size increases we should be able to
    -- allocate the array in chunks.
    --
    -- [LOCKING] Frequent locked access. This is updated by workers on each
    -- yield and once in a while read by the consumer thread. This could have
    -- big locking overhead if the number of workers is high.
    --
    -- XXX We can use a per-CPU data structure to reduce the locking overhead.
    -- However, a per-cpu structure cannot guarantee the exact sequence in
    -- which the elements were added, though that may not be important.
    , outputQueue    :: IORef ([ChildEvent a], Int)

    -- [LOCKING] Infrequent MVar. Used when the outputQ transitions from empty
    -- to non-empty, or a work item is queued by a worker to the work queue and
    -- needDoorBell is set by the consumer.
    , outputDoorBell :: MVar ()  -- signal the consumer about output
    , readOutputQ    :: m [ChildEvent a]
    , postProcess    :: m Bool

    -- channel to send events from the consumer to the worker. Used to send
    -- exceptions from a fold driver to the fold computation running as a
    -- consumer thread in the concurrent fold cases. Currently only one event
    -- is sent by the fold so we do not really need a queue for it.
    , outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
    , outputDoorBellFromConsumer :: MVar ()

    -- Combined/aggregate parameters
    -- This is truncated to maxBufferLimit if set to more than that. Otherwise
    -- potentially each worker may yield one value to the buffer in the worst
    -- case exceeding the requested buffer size.
    , maxWorkerLimit :: Limit
    , maxBufferLimit :: Limit
    -- These two are valid and used only when maxBufferLimit is Limited.
    , pushBufferSpace  :: IORef Count
    , pushBufferPolicy :: PushBufferPolicy
    -- [LOCKING] The consumer puts this MVar after emptying the buffer, workers
    -- block on it when the buffer becomes full. No overhead unless the buffer
    -- becomes full.
    , pushBufferMVar :: MVar ()

    -- [LOCKING] Read only access by consumer when dispatching a worker.
    -- Decremented by workers when picking work and undo decrement if the
    -- worker does not yield a value.
    , remainingWork  :: Maybe (IORef Count)
    , yieldRateInfo  :: Maybe YieldRateInfo

    -- Used only by bounded SVar types
    , enqueue        :: (RunInIO m, t m a) -> IO ()
    , isWorkDone     :: IO Bool
    , isQueueDone    :: IO Bool
    , needDoorBell   :: IORef Bool
    , workLoop       :: Maybe WorkerInfo -> m ()

    -- Shared, thread tracking
    -- [LOCKING] Updated unlocked only by consumer thread in case of
    -- Async/Ahead style SVars. Updated locked by worker threads in case of
    -- Parallel style.
    , workerThreads  :: IORef (Set ThreadId)
    -- [LOCKING] Updated locked by consumer thread when dispatching a worker
    -- and by the worker threads when the thread stops. This is read unsafely
    -- at several places where we want to rely on an approximate value.
    , workerCount    :: IORef Int
    , accountThread  :: ThreadId -> m ()
    , workerStopMVar :: MVar ()

    , svarStats      :: SVarStats
    -- to track garbage collection of SVar
    , svarRef        :: Maybe (IORef ())

    -- Only for diagnostics
    , svarInspectMode :: Bool
    , svarCreator    :: ThreadId
    , outputHeap     :: IORef ( Heap (Entry Int (AheadHeapEntry t m a))
                              , Maybe Int)
    -- Shared work queue (stream, seqNo)
    , aheadWorkQueue :: IORef ([t m a], Int)
    }

-------------------------------------------------------------------------------
-- State for concurrency control
-------------------------------------------------------------------------------

-- XXX we can put the resettable fields in a oneShotConfig field and others in
-- a persistentConfig field. That way reset would be fast and scalable
-- irrespective of the number of fields.
--
-- XXX make all these Limited types and use phantom types to distinguish them
-- Configuration threaded around the stream. Fields whose names start with an
-- underscore are meant to be read and written through the get/set routines
-- below rather than directly.
data State t m a = State
    { -- one shot configuration, automatically reset for each API call
      streamVar   :: Maybe (SVar t m a)
    , _yieldLimit  :: Maybe Count

    -- persistent configuration, state that remains valid until changed by
    -- an explicit setting via a combinator.
    , _threadsHigh    :: Limit
    , _bufferHigh     :: Limit
    -- XXX these two can be collapsed into a single type
    , _streamLatency  :: Maybe NanoSecond64 -- bootstrap latency
    , _maxStreamRate  :: Maybe Rate
    , _inspectMode    :: Bool
    }

-------------------------------------------------------------------------------
-- State defaults and reset
-------------------------------------------------------------------------------

-- A magical value for the buffer size arrived at by running the smallest
-- possible task and measuring the optimal value of the buffer for that.  This
-- is obviously dependent on hardware, this figure is based on a 2.2GHz intel
-- core-i7 processor.
magicMaxBuffer :: Word
magicMaxBuffer = 1500

-- Default worker-thread and buffer limits. Both are set to the same bound,
-- 'magicMaxBuffer'.
defaultMaxThreads, defaultMaxBuffer :: Limit
defaultMaxThreads = Limited magicMaxBuffer
defaultMaxBuffer = Limited magicMaxBuffer

-- The fields prefixed by an _ are not to be accessed or updated directly but
-- via smart accessor APIs.
--
-- The default state: no SVar, no yield limit, the default thread and buffer
-- limits, no rate or latency setting, and inspect mode off.
defState :: State t m a
defState = State
    { streamVar = Nothing
    , _yieldLimit = Nothing
    , _threadsHigh = defaultMaxThreads
    , _bufferHigh = defaultMaxBuffer
    , _maxStreamRate = Nothing
    , _streamLatency = Nothing
    , _inspectMode = False
    }

-- XXX if perf gets affected we can have all the Nothing params in a single
-- structure so that the reset is fast. We can also use rewrite rules such
-- that reset occurs only in concurrent streams to reduce the impact on serial
-- streams.
-- We can optimize this so that we clear it only if it is a Just value, it
-- results in slightly better perf for zip/zipM but the performance of scan
-- worsens a lot, it does not fuse.
--
-- XXX This has a side effect of clearing the SVar and yieldLimit, therefore it
-- should not be used to convert from the same type to the same type, unless
-- you want to clear the SVar. For clearing the SVar you should be using the
-- appropriate unStream functions instead.
--
-- | Adapt the stream state from one type to another.
adaptState :: State t m a -> State t n b
adaptState st = st
    { streamVar = Nothing   -- the SVar is dropped, which lets the type change
    , _yieldLimit = Nothing -- the yield limit is cleared as well
    }

-------------------------------------------------------------------------------
-- Smart get/set routines for State
-------------------------------------------------------------------------------

-- Use get/set routines instead of directly accessing the State fields
-- | Set the yield limit. 'Nothing' removes the limit; a non-positive value is
-- clamped to a limit of 0; any other value is used as is.
setYieldLimit :: Maybe Int64 -> State t m a -> State t m a
setYieldLimit lim st =
    st { _yieldLimit = fmap (fromIntegral . max 0) lim }

-- | Read the yield limit stored in the state, if any.
getYieldLimit :: State t m a -> Maybe Count
getYieldLimit = _yieldLimit

-- | Set the upper limit on threads. A negative value means 'Unlimited', zero
-- selects 'defaultMaxThreads', and a positive value is used as the limit.
setMaxThreads :: Int -> State t m a -> State t m a
setMaxThreads n st = st { _threadsHigh = limit }

    where

    limit
        | n < 0 = Unlimited
        | n == 0 = defaultMaxThreads
        | otherwise = Limited (fromIntegral n)

-- | Read the thread limit stored in the state.
getMaxThreads :: State t m a -> Limit
getMaxThreads = _threadsHigh

-- | Set the upper limit on the buffer. A negative value means 'Unlimited',
-- zero selects 'defaultMaxBuffer', and a positive value is used as the limit.
setMaxBuffer :: Int -> State t m a -> State t m a
setMaxBuffer n st = st { _bufferHigh = limit }

    where

    limit
        | n < 0 = Unlimited
        | n == 0 = defaultMaxBuffer
        | otherwise = Limited (fromIntegral n)

-- | Read the buffer limit stored in the state.
getMaxBuffer :: State t m a -> Limit
getMaxBuffer = _bufferHigh

-- | Store the given stream rate in the state; 'Nothing' clears it.
setStreamRate :: Maybe Rate -> State t m a -> State t m a
setStreamRate r st = st { _maxStreamRate = r }

-- | Read the stream rate stored in the state, if any.
getStreamRate :: State t m a -> Maybe Rate
getStreamRate = _maxStreamRate

-- | Set the stream latency. A non-positive value clears the setting; a
-- positive value is converted to 'NanoSecond64' and stored.
setStreamLatency :: Int -> State t m a -> State t m a
setStreamLatency n st = st { _streamLatency = latency }

    where

    latency
        | n <= 0 = Nothing
        | otherwise = Just (fromIntegral n)

-- | Read the stream latency stored in the state, if any.
getStreamLatency :: State t m a -> Maybe NanoSecond64
getStreamLatency = _streamLatency

-- | Switch inspect mode on in the state.
setInspectMode :: State t m a -> State t m a
setInspectMode st = st { _inspectMode = True }

-- | Tell whether inspect mode is on in the state.
getInspectMode :: State t m a -> Bool
getInspectMode = _inspectMode

-------------------------------------------------------------------------------
-- Cleanup
-------------------------------------------------------------------------------

-- | Throw 'ThreadAbort' to every thread currently recorded in the
-- 'workerThreads' set of the SVar.
cleanupSVar :: SVar t m a -> IO ()
cleanupSVar sv = do
    workers <- readIORef (workerThreads sv)
    Prelude.mapM_ (`throwTo` ThreadAbort) workers

-- | Like 'cleanupSVar', but the calling thread is excluded: 'ThreadAbort' is
-- thrown to every recorded worker thread except the one running this action.
cleanupSVarFromWorker :: SVar t m a -> IO ()
cleanupSVarFromWorker sv = do
    workers <- readIORef (workerThreads sv)
    self <- myThreadId
    Prelude.mapM_ (`throwTo` ThreadAbort)
          (Prelude.filter (/= self) $ S.toList workers)

-------------------------------------------------------------------------------
-- Worker latency data collection
-------------------------------------------------------------------------------

-- Every once in a while workers update the latencies and check the yield rate.
-- They return if we are above the expected yield rate. If we check too often
-- it may impact performance, if we check less often we may have a stale
-- picture. We update every minThreadDelay but we translate that into a yield
-- count based on latency so that the checking overhead is little.
--
-- XXX use a generation count to indicate that the value is updated. If the
-- value is updated an existing worker must check it again on the next yield.
-- Otherwise it is possible that we may keep updating it and because of the mod
-- worker keeps skipping it.
updateWorkerPollingInterval :: YieldRateInfo -> NanoSecond64 -> IO ()
updateWorkerPollingInterval yinfo latency = do
    let periodRef = workerPollingInterval yinfo
        -- The latency may legitimately be 0 (e.g. the average of a batch
        -- rounds down to zero and no previous latency has been measured).
        -- Clamp the divisor to at least 1 so that the division cannot raise
        -- a divide-by-zero exception; since the value written below is lazy,
        -- that exception would otherwise surface in whichever thread reads
        -- the polling interval.
        divisor = max 1 latency
        -- Number of yields that fit in one minThreadDelay, at least 1.
        cnt = max 1 $ minThreadDelay `div` divisor
        -- Never poll less often than once every magicMaxBuffer yields.
        period = min cnt (fromIntegral magicMaxBuffer)

    writeIORef periodRef (fromIntegral period)

{-# INLINE recordMinMaxLatency #-}
-- Update the minimum and maximum worker latency stats of the SVar with a new
-- latency sample. A stored minimum of 0 is always overwritten (presumably 0
-- is the "not yet recorded" initial value -- confirm against newSVarStats).
recordMinMaxLatency :: SVar t m a -> NanoSecond64 -> IO ()
recordMinMaxLatency sv new = do
    let ss = svarStats sv
    minLat <- readIORef (minWorkerLatency ss)
    when (new < minLat || minLat == 0) $
        writeIORef (minWorkerLatency ss) new

    maxLat <- readIORef (maxWorkerLatency ss)
    when (new > maxLat) $ writeIORef (maxWorkerLatency ss) new

-- Add a (yield count, elapsed time) sample to the running totals kept in the
-- 'avgWorkerLatency' stat of the SVar.
recordAvgLatency :: SVar t m a -> (Count, NanoSecond64) -> IO ()
recordAvgLatency sv (count, time) = do
    let ss = svarStats sv
    modifyIORef (avgWorkerLatency ss) $
        \(cnt, t) -> (cnt + count, t + time)

-- Pour the pending latency stats into a collection bucket
{-# INLINE collectWorkerPendingLatency #-}
-- The first argument is the pending bucket, which is atomically reset to
-- zeroes; the second is the collection bucket, which receives the sums.
--
-- Returns the total of the first counters of both buckets, paired with
-- @Just (count, time)@ built from the summed second and third components when
-- both are positive, and 'Nothing' otherwise.
collectWorkerPendingLatency
    :: IORef (Count, Count, NanoSecond64)
    -> IORef (Count, Count, NanoSecond64)
    -> IO (Count, Maybe (Count, NanoSecond64))
collectWorkerPendingLatency cur col = do
    -- Take whatever is pending and leave zeroes behind, in one atomic step.
    (fcount, count, time) <- atomicModifyIORefCAS cur $ \v -> ((0,0,0), v)

    (fcnt, cnt, t) <- readIORef col
    let totalCount = fcnt + fcount
        latCount   = cnt + count
        latTime    = t + time
    writeIORef col (totalCount, latCount, latTime)

    -- A non-zero latency count is expected to come with a non-zero time.
    assert (latCount == 0 || latTime /= 0) (return ())
    let latPair =
            if latCount > 0 && latTime > 0
            then Just (latCount, latTime)
            else Nothing
    return (totalCount, latPair)

{-# INLINE shouldUseCollectedBatch #-}
-- Decide whether the collected batch should be used to compute a new
-- latency. It is used when any one of the following holds:
--
-- * more than magicMaxBuffer yields have been collected
-- * the collected time exceeds minThreadDelay
-- * a previous latency exists and the new latency is more than twice or less
--   than half of it
-- * there is no previous latency (it is 0)
shouldUseCollectedBatch
    :: Count
    -> NanoSecond64
    -> NanoSecond64
    -> NanoSecond64
    -> Bool
shouldUseCollectedBatch collectedYields collectedTime newLat prevLat =
    -- The ratio is only inspected under the @prevLat > 0@ guard, so the
    -- floating point division by zero is never observed.
    let r = fromIntegral newLat / fromIntegral prevLat :: Double
    in     (collectedYields > fromIntegral magicMaxBuffer)
        || (collectedTime > minThreadDelay)
        || (prevLat > 0 && (r > 2 || r < 0.5))
        || (prevLat == 0)

-- Returns a triple, (1) yield count since last collection, (2) the base time
-- when we started counting, (3) average latency in the last measurement
-- period. The former two are used for accurate measurement of the going rate
-- whereas the average is used for future estimates e.g. how many workers
-- should be maintained to maintain the rate.
-- CAUTION! keep it in sync with getWorkerLatency
collectLatency :: SVar t m a
               -> YieldRateInfo
               -> Bool
               -> IO (Count, AbsTime, NanoSecond64)
collectLatency sv yinfo drain = do
    let cur      = workerPendingLatency yinfo
        col      = workerCollectedLatency yinfo
        longTerm = svarAllTimeLatency yinfo
        measured = workerMeasuredLatency yinfo

    -- Move the pending worker stats into the collection bucket.
    (newCount, newLatPair) <- collectWorkerPendingLatency cur col
    (lcount, ltime) <- readIORef longTerm
    prevLat <- readIORef measured

    let newLcount = lcount + newCount
        retWith lat = return (newLcount, ltime, lat)

    case newLatPair of
        -- No usable latency sample; report the previously measured latency.
        Nothing -> retWith prevLat
        Just (count, time) -> do
            let newLat = time `div` fromIntegral count
            when (svarInspectMode sv) $ recordMinMaxLatency sv newLat
            -- When we have collected a significant sized batch we compute the
            -- new latency using that batch and return the new latency,
            -- otherwise we return the previous latency derived from the
            -- previous batch. The batch is always used when draining.
            if shouldUseCollectedBatch newCount time newLat prevLat || drain
            then do
                -- XXX make this NOINLINE?
                updateWorkerPollingInterval yinfo (max newLat prevLat)
                when (svarInspectMode sv) $ recordAvgLatency sv (count, time)
                -- The batch is consumed: reset the collection bucket, store
                -- the mean of the old and new latency as the measured value,
                -- and fold the yields into the all-time count.
                writeIORef col (0, 0, 0)
                writeIORef measured ((prevLat + newLat) `div` 2)
                modifyIORef longTerm $ \(_, t) -> (newLcount, t)
                retWith newLat
            else retWith prevLat

-------------------------------------------------------------------------------
-- Dumping the SVar for debug/diag
-------------------------------------------------------------------------------

-- Render the statistics of an SVar as a multi-line string. When rate
-- information is present the pending latency stats are drained first so that
-- the reported numbers include them. Optional entries (heap size, latencies,
-- SVar yield counts) are printed only when they apply or are positive.
dumpSVarStats :: SVar t m a -> SVarStats -> SVarStyle -> IO String
dumpSVarStats sv ss style = do
    case yieldRateInfo sv of
        Nothing -> return ()
        Just yinfo -> do
            -- Only the side effect of draining matters; the result is unused.
            _ <- collectLatency sv yinfo True
            return ()

    dispatches <- readIORef $ totalDispatches ss
    maxWrk <- readIORef $ maxWorkers ss
    maxOq <- readIORef $ maxOutQSize ss
    maxHp <- readIORef $ maxHeapSize ss
    minLat <- readIORef $ minWorkerLatency ss
    maxLat <- readIORef $ maxWorkerLatency ss
    (avgCnt, avgTime) <- readIORef $ avgWorkerLatency ss
    (svarCnt, svarGainLossCnt, svarLat) <- case yieldRateInfo sv of
        Nothing -> return (0, 0, 0)
        Just yinfo -> do
            (cnt, startTime) <- readIORef $ svarAllTimeLatency yinfo
            if cnt > 0
            then do
                stopped <- readIORef (svarStopTime ss)
                gl <- readIORef (svarGainedLostYields yinfo)
                -- Measure up to the recorded stop time if there is one,
                -- otherwise up to the current time.
                endTime <- case stopped of
                    Nothing -> getTime Monotonic
                    Just stopTime -> return stopTime
                let interval = diffAbsTime64 endTime startTime
                return (cnt, gl, interval `div` fromIntegral cnt)
            else return (0, 0, 0)

    return $ unlines
        [ "total dispatches = " <> show dispatches
        , "max workers = " <> show maxWrk
        , "max outQSize = " <> show maxOq
            <> (if style == AheadVar
               then "\nheap max size = " <> show maxHp
               else "")
            <> (if minLat > 0
               then "\nmin worker latency = " <> showNanoSecond64 minLat
               else "")
            <> (if maxLat > 0
               then "\nmax worker latency = " <> showNanoSecond64 maxLat
               else "")
            <> (if avgCnt > 0
                then let lat = avgTime `div` fromIntegral avgCnt
                     in "\navg worker latency = " <> showNanoSecond64 lat
                else "")
            <> (if svarLat > 0
               then "\nSVar latency = " <> showRelTime64 svarLat
               else "")
            <> (if svarCnt > 0
               then "\nSVar yield count = " <> show svarCnt
               else "")
            -- NOTE(review): only a positive value is printed; if the
            -- gain/loss count can be negative, that case is not shown --
            -- confirm whether this is intended.
            <> (if svarGainLossCnt > 0
               then "\nSVar gain/loss yield count = " <> show svarGainLossCnt
               else "")
        ]

{-# NOINLINE dumpSVar #-}
-- | Render a multi-line, human readable report of the current state of an
-- 'SVar' for diagnostics.
--
-- The report contains the creator thread id, the style, the output queue
-- length (both recomputed from the list and as maintained in the queue
-- tuple), the state of the output doorbell, the worker thread set and worker
-- count, followed by the text produced by 'dumpSVarStats'. For an 'AheadVar'
-- the heap size, heap sequence, work queue length and work queue sequence are
-- included as well.
--
-- Each field is read with a separate 'readIORef' \/ 'tryReadMVar', so the
-- report is not an atomic snapshot. The doorbell is only peeked with
-- 'tryReadMVar', so this function does not block on it.
dumpSVar :: SVar t m a -> IO String
dumpSVar sv = do
    let style = svarStyle sv
    (oqList, oqLen) <- readIORef $ outputQueue sv
    db <- tryReadMVar $ outputDoorBell sv
    aheadDump <-
        if style == AheadVar
        then do
            (oheap, oheapSeq) <- readIORef $ outputHeap sv
            (wq, wqSeq) <- readIORef $ aheadWorkQueue sv
            return $ unlines
                [ "heap length = " <> show (H.size oheap)
                -- The label spelling below is reproduced as is from the
                -- original output format.
                , "heap seqeunce = " <> show oheapSeq
                , "work queue length = " <> show (length wq)
                , "work queue sequence = " <> show wqSeq
                ]
        else return ""

    -- needDoorBell is only consulted for styles other than ParallelVar; for
    -- ParallelVar we report False.
    waiting <-
        if style /= ParallelVar
        then readIORef $ needDoorBell sv
        else return False
    rthread <- readIORef $ workerThreads sv
    workers <- readIORef $ workerCount sv
    stats <- dumpSVarStats sv (svarStats sv) style

    return $ unlines
        [ "Creator tid = " <> show (svarCreator sv)
        , "style = " <> show style
        , "---------CURRENT STATE-----------"
        , "outputQueue length computed  = " <> show (length oqList)
        , "outputQueue length maintained = " <> show oqLen
        -- XXX print the types of events in the outputQueue, first 5
        , "outputDoorBell = " <> show db
        ]
        <> aheadDump
        <> unlines
        [ "needDoorBell = " <> show waiting
        , "running threads = " <> show rthread
        -- XXX print the status of first 5 threads
        , "running thread count = " <> show workers
        ]
        <> "---------STATS-----------\n"
        <> stats

-- | Write the 'dumpSVar' report of the given 'SVar' to 'stderr'.
--
-- The output is a blank line, the caller supplied text @how@ on its own line
-- and then the report.
printSVar :: SVar t m a -> String -> IO ()
printSVar sv how = do
    svInfo <- dumpSVar sv
    hPutStrLn stderr $ "\n" <> how <> "\n" <> svInfo

-- MVar diagnostics have some overhead - around 5% on the AsyncT null
-- benchmark. We can keep them on in production to debug problems quickly if
-- and when they happen, but they may result in unexpected output when threads
-- are left hanging until they are GCed because the consumer went away.

{-# NOINLINE mvarExcHandler #-}
-- | Handler for 'BlockedIndefinitelyOnMVar': prints @label@, the exception
-- name and the 'dumpSVar' report of the 'SVar' to 'stderr', then rethrows the
-- same exception with 'throwIO'. It never returns normally.
mvarExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnMVar -> IO ()
mvarExcHandler sv label e@BlockedIndefinitelyOnMVar = do
    svInfo <- dumpSVar sv
    hPutStrLn stderr $ label <> " " <> "BlockedIndefinitelyOnMVar\n" <> svInfo
    throwIO e

{-# NOINLINE stmExcHandler #-}
-- | Handler for 'BlockedIndefinitelyOnSTM': prints @label@, the exception
-- name and the 'dumpSVar' report of the 'SVar' to 'stderr', then rethrows the
-- same exception with 'throwIO'. It never returns normally.
stmExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnSTM -> IO ()
stmExcHandler sv label e@BlockedIndefinitelyOnSTM = do
    svInfo <- dumpSVar sv
    hPutStrLn stderr $ label <> " " <> "BlockedIndefinitelyOnSTM\n" <> svInfo
    throwIO e

-- | Run @action@, and when the 'SVar' has inspect mode enabled
-- ('svarInspectMode'), install handlers that dump the 'SVar' state to
-- 'stderr' if the action fails with 'BlockedIndefinitelyOnMVar' or
-- 'BlockedIndefinitelyOnSTM'. Both handlers rethrow the exception after
-- printing. With inspect mode off the action is run unchanged.
--
-- @label@ is prefixed to the diagnostic output to identify the call site.
withDiagMVar :: SVar t m a -> String -> IO () -> IO ()
withDiagMVar sv label action =
    if svarInspectMode sv
    then
        action `catches` [ Handler (mvarExcHandler sv label)
                         , Handler (stmExcHandler sv label)
                         ]
    else action

------------------------------------------------------------------------------
-- Spawning threads
------------------------------------------------------------------------------

-- | Capture the run-in-base function of the current monad, as handed out by
-- 'control', and return it wrapped in a 'RunInIO'.
--
-- When we run computations concurrently, we completely isolate the state of
-- the concurrent computations from the parent computation.  The invariant is
-- that we should never be running two concurrent computations in the same
-- thread without using the runInIO function.  Also, we should never be running
-- a concurrent computation in the parent thread, otherwise it may affect the
-- state of the parent which is against the defined semantics of concurrent
-- execution.
captureMonadState :: MonadBaseControl IO m => m (RunInIO m)
captureMonadState = control $ \run -> run (return $ RunInIO run)

------------------------------------------------------------------------------
-- Collecting results from child workers in a streamed fashion
------------------------------------------------------------------------------

-- XXX Can we make access to remainingWork and yieldRateInfo fields in sv
-- faster, along with the fields in sv required by send?
-- XXX make it noinline
--
-- XXX we may want to employ an increment and decrement in batches when the
-- throughput is high or when the cost of synchronization is high. For example
-- if the application is distributed then inc/dec of a shared variable may be
-- very costly.
--
-- A worker decrements the yield limit before it executes an action. However,
-- the action may not result in an element being yielded, in that case we have
-- to increment the yield limit.
--
-- Note that we need it to be an Int type so that we have the ability to undo a
-- decrement that takes it below zero.
{-# INLINE decrementYieldLimit #-}
-- | Atomically decrement the 'remainingWork' counter of the 'SVar', if it has
-- one.
--
-- Returns 'True' when there is no counter ('remainingWork' is 'Nothing'), or
-- when the counter was at least 1 /before/ the decrement. The decrement is
-- applied unconditionally, so the stored value can go below zero; in that
-- case 'False' is returned.
decrementYieldLimit :: SVar t m a -> IO Bool
decrementYieldLimit sv =
    case remainingWork sv of
        Nothing -> return True
        Just ref -> do
            -- The second tuple component is the value handed back, i.e. the
            -- counter as it was before this decrement.
            r <- atomicModifyIORefCAS ref $ \x -> (x - 1, x)
            return $ r >= 1

{-# INLINE incrementYieldLimit #-}
-- | Atomically add one to the 'remainingWork' counter of the 'SVar'. Does
-- nothing when the 'SVar' has no counter ('remainingWork' is 'Nothing').
incrementYieldLimit :: SVar t m a -> IO ()
incrementYieldLimit sv =
    case remainingWork sv of
        Nothing -> return ()
        Just ref -> atomicModifyIORefCAS_ ref (+ 1)

-- XXX exception safety of all atomic/MVar operations

-- TBD Each worker can have their own queue and the consumer can empty one
-- queue at a time, that way contention can be reduced.

-- XXX Only yields should be counted in the buffer limit and not the Stop
-- events.

{-# INLINE decrementBufferLimit #-}
-- | Take one unit of space from the push buffer of the 'SVar'.
--
-- Does nothing when 'maxBufferLimit' is 'Unlimited'. Otherwise the
-- 'pushBufferSpace' counter is atomically decremented if it is at least 1.
-- If the counter was already zero (or less) the 'pushBufferPolicy' decides
-- what happens:
--
-- * 'PushBufferBlock': wait on 'pushBufferMVar' and retry until a unit of
--   space could be taken.
-- * 'PushBufferDropNew': remove the event at the head of 'outputQueue' and
--   return; if the queue is empty, block and retry as above.
-- * 'PushBufferDropOld': not implemented, calls 'error'.
decrementBufferLimit :: SVar t m a -> IO ()
decrementBufferLimit sv =
    case maxBufferLimit sv of
        Unlimited -> return ()
        Limited _ -> do
            old <- takeSlot
            when (old <= 0) $
                case pushBufferPolicy sv of
                    PushBufferBlock -> blockAndRetry
                    PushBufferDropNew -> do
                        -- We just drop one item and proceed. It is possible
                        -- that by the time we drop the item the consumer
                        -- thread might have run and created space in the
                        -- buffer, but we do not care about that condition.
                        -- This is not pedantically correct but it should be
                        -- fine in practice.
                        -- XXX we may want to drop only if n == maxBuf
                        -- otherwise we must have space in the buffer and a
                        -- decrement should be possible.
                        block <- atomicModifyIORefCAS (outputQueue sv) $
                            \(es, n) ->
                                case es of
                                    [] -> (([], n), True)
                                    _ : xs -> ((xs, n - 1), False)
                        when block blockAndRetry
                    -- XXX need a dequeue or ring buffer for this
                    PushBufferDropOld ->
                        error $ "decrementBufferLimit: "
                            <> "PushBufferDropOld is not implemented"

    where

    -- Atomically decrement the free space counter when it is at least 1 and
    -- return the value it had before the attempt.
    takeSlot :: IO Count
    takeSlot =
        atomicModifyIORefCAS (pushBufferSpace sv) $ \x ->
            (if x >= 1 then x - 1 else x, x)

    -- Sleep on the push buffer MVar, then try to take a unit of space again,
    -- looping until one is obtained.
    blockAndRetry :: IO ()
    blockAndRetry = do
        takeMVar (pushBufferMVar sv)
        old <- takeSlot
        -- When multiple threads sleep on takeMVar, the first thread would
        -- wakeup due to a putMVar by the consumer, but the rest of the threads
        -- would have to put back the MVar after taking it and decrementing the
        -- buffer count, otherwise all other threads will remain asleep.
        if old >= 1
        then void $ tryPutMVar (pushBufferMVar sv) ()
        -- We do not put the MVar back in this case, instead we
        -- wait for the consumer to put it.
        else blockAndRetry

{-# INLINE incrementBufferLimit #-}
-- | Return one unit of space to the push buffer of the 'SVar' and signal
-- 'pushBufferMVar' so that a thread waiting on it can proceed.
--
-- Does nothing when 'maxBufferLimit' is 'Unlimited'.
incrementBufferLimit :: SVar t m a -> IO ()
incrementBufferLimit sv =
    case maxBufferLimit sv of
        Unlimited -> return ()
        Limited _ -> do
            atomicModifyIORefCAS_ (pushBufferSpace sv) (+ 1)
            -- Issue the barrier between the counter update and the wake up
            -- below.
            writeBarrier
            -- tryPutMVar never blocks; its result is ignored, so nothing
            -- happens if the MVar is already full.
            void $ tryPutMVar (pushBufferMVar sv) ()

{-# INLINE resetBufferLimit #-}
-- | Atomically set the 'pushBufferSpace' counter of the 'SVar' to the limit
-- @n@ carried by @'Limited' n@, regardless of its current value.
--
-- Does nothing when 'maxBufferLimit' is 'Unlimited'.
resetBufferLimit :: SVar t m a -> IO ()
resetBufferLimit sv =
    case maxBufferLimit sv of
        Unlimited -> return ()
        Limited n ->
            atomicModifyIORefCAS_ (pushBufferSpace sv) (const (fromIntegral n))

{-# INLINE sendWithDoorBell #-}
-- | Atomically push an event on the front of the queue @q@, incrementing the
-- length stored alongside it, and ring the doorbell @bell@ if the queue was
-- empty before the push.
--
-- Returns the length of the queue as it was /before/ the event was added.
sendWithDoorBell ::
    IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell q bell msg = do
    -- XXX can the access to outputQueue be made faster somehow?
    oldlen <- atomicModifyIORefCAS q $ \(es, n) ->
        ((msg : es, n + 1), n)
    when (oldlen <= 0) $ do
        -- The wake up must happen only after the store has finished otherwise
        -- we can have lost wakeup problems.
        writeBarrier
        -- Since multiple workers can try this at the same time, it is possible
        -- that we may put a spurious MVar after the consumer has already seen
        -- the output. But that's harmless, at worst it may cause the consumer
        -- to read the queue again and find it empty.
        -- The important point is that the consumer is guaranteed to receive a
        -- doorbell if something was added to the queue after it empties it.
        void $ tryPutMVar bell ()
    return oldlen

-- | This function is used by the producer threads to queue output for the
-- consumer thread to consume. The event is pushed on 'outputQueue' and
-- 'outputDoorBell' is rung if the queue was empty.
--
-- Returns the length of the output queue as it was before this event was
-- added (see 'sendWithDoorBell').
send :: SVar t m a -> ChildEvent a -> IO Int
send sv = sendWithDoorBell (outputQueue sv) (outputDoorBell sv)

-- | Queue an event on 'outputQueueFromConsumer' and ring
-- 'outputDoorBellFromConsumer' if that queue was empty. Returns the length of
-- the queue as it was before this event was added.
--
-- There is no bound implemented on the buffer, this is assumed to be low
-- traffic.
sendToProducer :: SVar t m a -> ChildEvent a -> IO Int
sendToProducer sv msg = do
    -- In case the producer stream is blocked on pushing to the fold buffer
    -- then wake it up so that it can check for the stop event or exception
    -- being sent to it otherwise we will be deadlocked.
    void $ tryPutMVar (pushBufferMVar sv) ()
    sendWithDoorBell
        (outputQueueFromConsumer sv) (outputDoorBellFromConsumer sv) msg

-- {-# NOINLINE sendStopToProducer #-}
-- | Send a 'ChildStop' event, tagged with the calling thread's id and
-- carrying no exception, to the producer via 'sendToProducer'. The queue
-- length returned by 'sendToProducer' is discarded.
sendStopToProducer :: MonadIO m => SVar t m a -> m ()
sendStopToProducer sv = liftIO $ do
    tid <- myThreadId
    void $ sendToProducer sv (ChildStop tid Nothing)

-- | Measure how many items a worker yielded, and how long that took, since
-- the previous measurement.
--
-- Reads the @(count, time)@ pair stored in 'workerLatencyStart' and the
-- current 'workerYieldCount'. When the worker has yielded at least one new
-- item, the function:
--
-- * samples the monotonic clock,
-- * resets 'workerLatencyStart' to the current count and time, and
-- * returns @Just (newYields, elapsed)@.
--
-- When nothing new was yielded it returns 'Nothing' and leaves the stored
-- starting point untouched.
workerCollectLatency :: WorkerInfo -> IO (Maybe (Count, NanoSecond64))
workerCollectLatency winfo = do
    (cnt0, t0) <- readIORef (workerLatencyStart winfo)
    cnt1 <- readIORef (workerYieldCount winfo)
    let cnt = cnt1 - cnt0

    if cnt > 0
    then do
        t1 <- getTime Monotonic
        let period = fromRelTime64 $ diffAbsTime64 t1 t0
        -- Start the next measurement window from here.
        writeIORef (workerLatencyStart winfo) (cnt1, t1)
        return $ Just (cnt, period)
    else return Nothing

-- XXX There are a number of gotchas in measuring latencies.
-- 1) We measure latencies only when a worker yields a value
-- 2) It is possible that a stream calls the stop continuation, in which case
-- the worker would not yield a value and we would not account that worker in
-- latencies. Even though this case should ideally be accounted we do not
-- account it because we cannot or do not distinguish it from the case
-- described next.
-- 3) It is possible that a worker returns without yielding anything because it
-- never got a chance to pick up work.
-- 4) If the system timer resolution is lower than the latency, the latency
-- computation turns out to be zero.
--
-- We can fix this if we measure the latencies by counting the work items
-- picked rather than based on the outputs yielded.
-- | Fold a worker's latest latency measurement into the shared
-- 'workerPendingLatency' accumulator of the 'YieldRateInfo'.
--
-- The accumulator is a triple. Its first component always grows by the number
-- of items yielded. The second (count) and third (time) components grow only
-- when the measured period is non-zero. Nothing is done when
-- 'workerCollectLatency' reports no new yields.
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency yinfo winfo = do
    r <- workerCollectLatency winfo
    case r of
        Just (cnt, period) -> do
            -- NOTE: On JS platform the timer resolution could be pretty low.
            -- When the timer resolution is low, measurement of latencies
            -- could be tricky. All the worker latencies will turn out to be
            -- zero if they are lower than the resolution. We only take into
            -- account those measurements which are more than the timer
            -- resolution.
            let ref = workerPendingLatency yinfo
                (cnt1, t1) = if period > 0 then (cnt, period) else (0, 0)
            atomicModifyIORefCAS_ ref $
                \(fc, n, t) -> (fc + cnt, n + cnt1, t + t1)
        Nothing -> return ()

-- | Increment the worker's 'workerYieldCount' by one and return the new
-- value.
--
-- The update is a plain read followed by a write, not an atomic
-- modification.
updateYieldCount :: WorkerInfo -> IO Count
updateYieldCount winfo = do
    cnt <- readIORef (workerYieldCount winfo)
    let cnt1 = cnt + 1
    writeIORef (workerYieldCount winfo) cnt1
    return cnt1

-- | Check whether a worker has reached its yield quota.
--
-- A 'workerYieldMax' of @0@ means there is no quota, so the result is always
-- 'False' in that case. Otherwise the result is 'True' once the given count
-- is greater than or equal to the quota.
isBeyondMaxYield :: Count -> WorkerInfo -> Bool
isBeyondMaxYield cnt winfo =
    let ymax = workerYieldMax winfo
    in ymax /= 0 && cnt >= ymax

-- | Every 'workerPollingInterval' yields, publish the worker's latency and
-- check whether the stream is running beyond its maximum rate.
--
-- Returns the result of 'isBeyondMaxRate' when the given yield count is a
-- multiple of a non-zero polling interval. Returns 'False' otherwise, without
-- updating latencies.
--
-- XXX we should do rate control periodically based on the total yields rather
-- than based on the worker local yields as other workers may have yielded more
-- and we should stop based on the aggregate yields. However, latency update
-- period can be based on individual worker yields.
{-# NOINLINE checkRatePeriodic #-}
checkRatePeriodic :: SVar t m a
                  -> YieldRateInfo
                  -> WorkerInfo
                  -> Count
                  -> IO Bool
checkRatePeriodic sv yinfo winfo ycnt = do
    i <- readIORef (workerPollingInterval yinfo)
    -- XXX use generation count to check if the interval has been updated
    if i /= 0 && (ycnt `mod` i) == 0
    then do
        workerUpdateLatency yinfo winfo
        -- XXX not required for parallel streams
        isBeyondMaxRate sv yinfo
    else return False

-- | Account for one more yield by the worker and decide whether it may keep
-- running.
--
-- Returns 'True' when the worker is neither beyond its yield quota
-- ('isBeyondMaxYield') nor beyond the maximum rate ('checkRatePeriodic').
-- Returns 'False' when it should stop.
--
-- CAUTION! this also updates the yield count and therefore should be called
-- only when we are actually yielding an element.
{-# NOINLINE workerRateControl #-}
workerRateControl :: SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl sv yinfo winfo = do
    cnt <- updateYieldCount winfo
    beyondMaxRate <- checkRatePeriodic sv yinfo winfo cnt
    return $ not (isBeyondMaxYield cnt winfo || beyondMaxRate)

-- | Send an event to the consumer with 'send' and report whether the worker
-- should continue (@True@) or stop (@False@).
--
-- The worker may continue only if both checks pass:
--
-- * Buffer space: with an 'Unlimited' buffer this always passes. With
--   @Limited lim@ it passes when the value returned by 'send' plus one is
--   strictly less than @lim@ minus the current 'workerCount'.
-- * Rate limit: consulted through 'workerRateControl' only when both a
--   'WorkerInfo' is supplied and the 'SVar' has a 'yieldRateInfo'. Otherwise
--   it passes.
--
-- XXX we should do rate control here but not latency update in case of ahead
-- streams. latency update must be done when we yield directly to outputQueue
-- or when we yield to heap.
{-# INLINE sendYield #-}
sendYield :: SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield sv mwinfo msg = do
    oldlen <- send sv msg
    let limit = maxBufferLimit sv
    bufferSpaceOk <- case limit of
            Unlimited -> return True
            Limited lim -> do
                active <- readIORef (workerCount sv)
                return $ (oldlen + 1) < (fromIntegral lim - active)
    rateLimitOk <-
        case mwinfo of
            Just winfo ->
                case yieldRateInfo sv of
                    Nothing -> return True
                    Just yinfo -> workerRateControl sv yinfo winfo
            Nothing -> return True
    return $ bufferSpaceOk && rateLimitOk

-- | Publish a stopping worker's final latency measurement.
--
-- 'workerUpdateLatency' is run only when 'workerPollingInterval' is non-zero.
{-# INLINE workerStopUpdate #-}
workerStopUpdate :: WorkerInfo -> YieldRateInfo -> IO ()
workerStopUpdate winfo info = do
    i <- readIORef (workerPollingInterval info)
    when (i /= 0) $ workerUpdateLatency info winfo

-- | Announce to the consumer that the calling worker thread is stopping.
--
-- In order, this:
--
-- 1. decrements the 'workerCount' of the 'SVar',
-- 2. runs 'workerStopUpdate' when both a 'WorkerInfo' and a 'yieldRateInfo'
--    are available, and
-- 3. sends a 'ChildStop' event carrying the current thread's id and no
--    exception.
{-# INLINABLE sendStop #-}
sendStop :: SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop sv mwinfo = do
    atomicModifyIORefCAS_ (workerCount sv) $ \n -> n - 1
    case (mwinfo, yieldRateInfo sv) of
      (Just winfo, Just info) ->
          workerStopUpdate winfo info
      _ ->
          return ()
    myThreadId >>= \tid -> void $ send sv (ChildStop tid Nothing)

-------------------------------------------------------------------------------
-- Doorbell
-------------------------------------------------------------------------------

-- | Wake up the consumer if it has asked to be woken.
--
-- After a store-load barrier the 'needDoorBell' flag is read. When it is set,
-- the flag is cleared and 'outputDoorBell' is filled with a non-blocking
-- 'tryPutMVar'.
{-# INLINE ringDoorBell #-}
ringDoorBell :: SVar t m a -> IO ()
ringDoorBell sv = do
    storeLoadBarrier
    w <- readIORef $ needDoorBell sv
    when w $ do
        -- Note: the sequence of operations is important for correctness here.
        -- We need to set the flag to False strictly before sending the
        -- outputDoorBell, otherwise the outputDoorBell may get processed too
        -- early and we may then set the flag to False afterwards, making the
        -- consumer lose the flag without ever receiving an outputDoorBell.
        atomicModifyIORefCAS_ (needDoorBell sv) (const False)
        void $ tryPutMVar (outputDoorBell sv) ()

-------------------------------------------------------------------------------
-- Async
-------------------------------------------------------------------------------

-- | Push a work item on the front of the LIFO work queue and ring the
-- doorbell of the 'SVar'.
--
-- Note: For purely right associated expressions this queue should have at most
-- one element. It grows to more than one when we have left associated
-- expressions. Large left associated compositions can grow this to a
-- large size.
{-# INLINE enqueueLIFO #-}
enqueueLIFO ::
       SVar t m a -> IORef [(RunInIO m, t m a)] -> (RunInIO m, t m a) -> IO ()
enqueueLIFO sv q m = do
    atomicModifyIORefCAS_ q $ \ms -> m : ms
    ringDoorBell sv

-------------------------------------------------------------------------------
-- WAsync
-------------------------------------------------------------------------------

-- XXX we can use the Ahead style sequence/heap mechanism to make the best
-- effort to always try to finish the streams on the left side of an expression
-- first as long as possible.

-- | Add a work item to the 'LinkedQueue' work queue with 'pushL' and ring the
-- doorbell of the 'SVar'.
{-# INLINE enqueueFIFO #-}
enqueueFIFO ::
       SVar t m a
    -> LinkedQueue (RunInIO m, t m a)
    -> (RunInIO m, t m a)
    -> IO ()
enqueueFIFO sv q m = do
    pushL q m
    ringDoorBell sv

-------------------------------------------------------------------------------
-- Ahead
-------------------------------------------------------------------------------

-- Lookahead streams can execute multiple tasks concurrently, ahead of time,
-- but always serve them in the same order as they appear in the stream. To
-- implement lookahead streams efficiently we assign a sequence number to each
-- task when the task is picked up for execution. When the task finishes, the
-- output is tagged with the same sequence number and we rearrange the outputs
-- in sequence based on that number.
--
-- To explain the mechanism imagine that the current task at the head of the
-- stream has a "token" to yield to the outputQueue. The ownership of the token
-- is determined by the current sequence number, which is maintained in the
-- outputHeap.
-- Sequence number is assigned when a task is queued. When a thread dequeues a
-- task it picks up the sequence number as well and when the output is ready it
-- uses the sequence number to queue the output to the outputQueue.
--
-- The thread with current sequence number sends the output directly to the
-- outputQueue. Other threads push the output to the outputHeap. When the task
-- being queued on the heap is a stream of many elements we evaluate only the
-- first element and keep the rest of the unevaluated computation in the heap.
-- When such a task gets the "token" for outputQueue it evaluates and directly
-- yields all the elements to the outputQueue without checking for the
-- "token".
--
-- Note that no two outputs in the heap can have the same sequence numbers and
-- therefore we do not need a stable heap. We have also separated the buffer
-- for the current task (outputQueue) and the pending tasks (outputHeap) so
-- that the pending tasks cannot interfere with the current task. Note that for
-- a single task just the outputQueue is enough and for the case of many
-- threads just a heap is good enough. However we balance between these two
-- cases, so that both are efficient.
--
-- For bigger streams it may make sense to have separate buffers for each
-- stream. However, for singleton streams this may become inefficient. However,
-- if we do not have separate buffers, then the streams that come later in
-- sequence may hog the buffer, hindering the streams that are ahead. For this
-- reason we have a single element buffer limitation for the streams being
-- executed in advance.
--
-- This scheme works pretty efficiently with less than 40% extra overhead
-- compared to the Async streams where we do not have any kind of sequencing of
-- the outputs. It is especially devised so that we are most efficient when we
-- have short tasks and need just a single thread. Also when a thread yields
-- many items it can hold lockfree access to the outputQueue and do it
-- efficiently.
--
-- XXX Maybe we can start the ahead threads at a lower cpu and IO priority so
-- that they do not hog the resources and hinder the progress of the threads in
-- front of them.

-- XXX Left associated ahead expressions are expensive. We start a new SVar for
-- each left associative expression. The queue is used only for right
-- associated expression, we queue the right expression and execute the left.
-- Therefore the queue never has more than one item in it.
--
-- XXX we can fix this. When we queue more than one item on the queue we can
-- mark the previously queued item as not-runnable. The not-runnable item is
-- not dequeued until the already running one has finished and at that time we
-- would also know the exact sequence number of the already queued item.
--
-- we can even run the already queued items but they will have to be sorted in
-- layers in the heap. We can use a list of heaps for that.
-- | Place a stream on the (single slot) ahead work queue, bump the sequence
-- number stored next to it, and ring the doorbell of the 'SVar'.
--
-- Only the stream component of the pair is stored; the 'RunInIO' component is
-- dropped. The queue must be empty: queuing on a non-empty queue is a
-- programming error and calls 'error'.
{-# INLINE enqueueAhead #-}
enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
enqueueAhead sv q m = do
    atomicModifyIORefCAS_ q $ \case
        ([], n) -> ([snd m], n + 1)  -- increment sequence
        _ -> error "enqueueAhead: queue is not empty"
    ringDoorBell sv

-- | Put a stream back on the (single slot) ahead work queue without
-- incrementing the sequence number, then ring the doorbell of the 'SVar'.
--
-- The queue must be empty: queuing on a non-empty queue is a programming
-- error and calls 'error'.
{-# INLINE reEnqueueAhead #-}
reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead sv q m = do
    atomicModifyIORefCAS_ q $ \case
        ([], n) -> ([m], n)  -- DO NOT increment sequence
        _ -> error "reEnqueueAhead: queue is not empty"
    ringDoorBell sv

-- Normally the thread that has the token should never go away. The token gets
-- handed over to another thread, but someone or the other has the token at any
-- point of time. But if the task that has the token finds that the outputQueue
-- is full, in that case it can go away without even handing over the token to
-- another thread. In that case it sets the nextSequence number in the heap to
-- its own sequence number before going away. To handle this case, any task that
-- does not have the token tries to dequeue from the heap first before
-- dequeuing from the work queue. If it finds that the task at the top of the
-- heap is the one that owns the current sequence number then it grabs the
-- token and starts with that.
--
-- XXX instead of queueing just the head element and the remaining computation
-- on the heap, evaluate as many as we can and place them on the heap. But we
-- need to give higher priority to the lower sequence numbers so that lower
-- priority tasks do not fill up the heap making higher priority tasks block
-- due to full heap. Maybe we can have a weighted space for them in the heap.
-- The weight is inversely proportional to the sequence number.
--
-- XXX review for livelock
--
-- | Report whether the ahead work queue currently holds no stream.
--
-- Only the list component of the reference is inspected; the sequence number
-- is ignored. This is a plain read, so the answer can be stale by the time
-- the caller acts on it.
{-# INLINE queueEmptyAhead #-}
queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool
queueEmptyAhead q = liftIO $ do
    (xs, _) <- readIORef q
    return $ null xs

-- | Atomically take the stream, if any, off the (single slot) ahead work
-- queue.
--
-- Returns 'Nothing' when the queue is empty. Otherwise it returns the stream
-- paired with the sequence number stored in the queue, and leaves the queue
-- empty with the sequence number unchanged. Finding more than one item on the
-- queue is a programming error and calls 'error'.
{-# INLINE dequeueAhead #-}
dequeueAhead :: MonadIO m
    => IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead q = liftIO $
    atomicModifyIORefCAS q $ \case
        ([], n) -> (([], n), Nothing)
        (x : [], n) -> (([], n), Just (x, n))
        _ -> error "more than one item on queue"

-------------------------------------------------------------------------------
-- Heap manipulation
-------------------------------------------------------------------------------

-- | Read the current contents of an 'IORef' and pass them to an action,
-- returning the action's result. The reference itself is not modified.
withIORef :: IORef a -> (a -> IO b) -> IO b
withIORef ref f = readIORef ref >>= f

-- | Atomically apply a function to the contents of an 'IORef', discarding any
-- result.
--
-- This is built on the lazy 'atomicModifyIORef', so the new value is stored
-- unevaluated.
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ ref f =
    atomicModifyIORef ref $ \x -> (f x, ())

-- | Outcome of trying to take the next in-sequence entry off the ahead
-- output heap (see 'dequeueFromHeap').
data HeapDequeueResult t m a =
      -- No expected sequence number is recorded alongside the heap
      -- ('Nothing'); 'dequeueFromHeap' leaves the heap untouched in this case.
      Clearing
      -- The heap is empty, or its top entry is not the expected sequence
      -- number; carries the sequence number being waited for.
    | Waiting Int
      -- The top entry had the expected sequence number and has been removed
      -- from the heap.
    | Ready (Entry Int (AheadHeapEntry t m a))

-- | Atomically try to remove the entry the consumer is waiting for from the
-- ahead output heap.
--
-- The reference holds the heap together with the expected sequence number.
--
-- * When no sequence number is recorded ('Nothing') the state is left as is
--   and 'Clearing' is returned.
-- * When the top of the heap carries exactly the expected sequence number it
--   is removed, the recorded sequence number is reset to 'Nothing', and the
--   entry is returned in 'Ready'.
-- * Otherwise (heap empty, or top entry has a different sequence number) the
--   state is left as is and 'Waiting' is returned with the expected number.
--   In the latter case the top entry's number is asserted to be no smaller
--   than the expected one.
{-# INLINE dequeueFromHeap #-}
dequeueFromHeap
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> IO (HeapDequeueResult t m a)
dequeueFromHeap hpVar =
    atomicModifyIORef hpVar $ \pair@(hp, snum) ->
        case snum of
            Nothing -> (pair, Clearing)
            Just n ->
                case H.uncons hp of
                    Just (ent@(Entry seqNo _ev), hp') ->
                        if seqNo == n
                        then ((hp', Nothing), Ready ent)
                        else assert (seqNo >= n) (pair, Waiting n)
                    Nothing -> (pair, Waiting n)

-- | Try to take the entry with sequence number @i@ off the top of the heap,
-- using a single 'atomicModifyIORef' on the heap reference.
--
-- The reference holds the heap paired with an optional sequence number:
--
-- * When the stored number is 'Nothing' and the minimum entry of the heap has
--   sequence number @i@, that entry is removed, the stored number stays
--   'Nothing' and the entry is returned as 'Ready'.
-- * When the stored number is 'Nothing' and the heap is empty or its minimum
--   entry has a different sequence number, the heap is left untouched, the
--   stored number becomes @Just i@ and @Waiting i@ is returned. In the
--   non-empty case an assertion checks that the minimum is not below @i@.
-- * When a sequence number is already stored the call is considered a
--   programming error and the result is an 'error' value.
{-# INLINE dequeueFromHeapSeq #-}
dequeueFromHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq hpVar i = atomicModifyIORef hpVar step

    where

    -- Keep the heap as it is and remember that we are waiting for @i@.
    waiting heap = ((heap, Just i), Waiting i)

    step (_, Just _) = error "dequeueFromHeapSeq: unreachable"
    step (heap, Nothing) =
        case H.uncons heap of
            Nothing -> waiting heap
            Just (top@(Entry topSeq _), rest)
                | topSeq == i -> ((rest, Nothing), Ready top)
                | otherwise -> assert (topSeq >= i) (waiting heap)

-- | Consistency predicate used in assertions on the heap sequence number.
--
-- Returns 'True' when no sequence number is stored ('Nothing'); otherwise
-- returns whether @seqNo@ is greater than or equal to the stored number.
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane snum seqNo = maybe True (seqNo >=) snum

-- | Put @ent@ back into the heap and set the stored sequence number to
-- @Just seqNo@, in one 'atomicModifyIORef_' call on the heap reference.
--
-- An assertion checks 'heapIsSane' for the previously stored sequence number
-- against @seqNo@.
{-# INLINE requeueOnHeapTop #-}
requeueOnHeapTop
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Entry Int (AheadHeapEntry t m a)
    -> Int
    -> IO ()
requeueOnHeapTop hpVar ent seqNo = atomicModifyIORef_ hpVar requeue

    where

    requeue (heap, expected) =
        assert (heapIsSane expected seqNo) (H.insert ent heap, Just seqNo)

-- | Set the sequence number stored alongside the heap to @Just seqNo@,
-- leaving the heap contents unchanged, in one 'atomicModifyIORef_' call.
--
-- An assertion checks 'heapIsSane' for the previously stored sequence number
-- against @seqNo@.
{-# INLINE updateHeapSeq #-}
updateHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO ()
updateHeapSeq hpVar seqNo = atomicModifyIORef_ hpVar setSeq

    where

    setSeq (heap, expected) =
        assert (heapIsSane expected seqNo) (heap, Just seqNo)

-------------------------------------------------------------------------------
-- WAhead
-------------------------------------------------------------------------------

-- XXX To be implemented. Use a linked queue like WAsync and put back the
-- remaining computation at the back of the queue instead of the heap, and
-- increment the sequence number.

-------------------------------------------------------------------------------
-- Dispatching workers and tracking them
-------------------------------------------------------------------------------

-- Thread tracking is needed for two reasons:
--
-- 1) Killing threads on exceptions. Threads may not be left to go away by
-- themselves because they may run for significant times before going away or
-- worse they may be stuck in IO and never go away.
--
-- 2) To know when all threads are done and the stream has ended.

-- Insert the given thread id into the SVar's 'workerThreads' set.
--
-- The update is done with a plain 'modifyIORef', i.e. it is not an atomic
-- read-modify-write.
{-# NOINLINE addThread #-}
addThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
addThread sv tid = liftIO (modifyIORef threads (S.insert tid))

    where

    threads = workerThreads sv

-- Remove the given thread id from the SVar's 'workerThreads' set.
--
-- This is cheaper than 'modifyThread' because we do not have to ring the
-- 'outputDoorBell'. This can make a difference when more workers are being
-- dispatched.
{-# INLINE delThread #-}
delThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
delThread sv tid = liftIO (modifyIORef threads (S.delete tid))

    where

    threads = workerThreads sv

-- Toggle the membership of a thread id in 'workerThreads': if it is present
-- delete it, otherwise add it. This takes care of out of order add and delete,
-- i.e. a delete arriving before we even added the thread. That happens when
-- the forked thread is done even before the bookkeeping that runs right after
-- the fork gets a chance to run.
--
-- The CAS update hands back a set to inspect: the new set after a delete, or
-- the old set (before the insert) after an add. When that set is empty a
-- write barrier is issued and the 'outputDoorBell' is rung with a
-- non-blocking 'tryPutMVar'.
{-# INLINE modifyThread #-}
modifyThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
modifyThread sv tid = do
    observed <- liftIO $ atomicModifyIORefCAS (workerThreads sv) toggle
    when (null observed) . liftIO $
        writeBarrier >> void (tryPutMVar (outputDoorBell sv) ())

    where

    toggle threads
        | tid `S.member` threads =
            let remaining = S.delete tid threads in (remaining, remaining)
        | otherwise = (S.insert tid threads, threads)

-- | Check whether the 'workerThreads' set of the SVar is currently empty.
--
-- This is safe even if we are adding more threads concurrently because if
-- a child thread is adding another thread then anyway 'workerThreads' will
-- not be empty.
{-# INLINE allThreadsDone #-}
allThreadsDone :: MonadIO m => SVar t m a -> m Bool
allThreadsDone sv = liftIO $ fmap S.null (readIORef (workerThreads sv))

-- Report an exception from the calling thread on the SVar: a
-- @ChildStop tid (Just e)@ event carrying the current thread's id is passed
-- to 'send', and the 'Int' that 'send' returns is discarded.
{-# NOINLINE handleChildException #-}
handleChildException :: SVar t m a -> SomeException -> IO ()
handleChildException sv e =
    myThreadId >>= \tid -> void (send sv (ChildStop tid (Just e)))

-- Same as 'handleChildException' except that the @ChildStop tid (Just e)@
-- event is passed to 'sendToProducer' instead of 'send'. The 'Int' returned
-- by 'sendToProducer' is discarded.
{-# NOINLINE handleFoldException #-}
handleFoldException :: SVar t m a -> SomeException -> IO ()
handleFoldException sv e =
    myThreadId >>= \tid -> void (sendToProducer sv (ChildStop tid (Just e)))

-- Update the dispatch statistics kept in 'svarStats':
--
-- * 'maxWorkers' is raised to the current 'workerCount' when the latter is
--   larger.
-- * 'totalDispatches' is incremented by one.
--
-- The counter is written back strictly. A lazy @modifyIORef ref (+1)@ would
-- store an ever growing chain of unevaluated additions in the reference until
-- something finally reads the counter. Like 'modifyIORef', the updates here
-- are plain read-then-write sequences and not atomic.
{-# NOINLINE recordMaxWorkers #-}
recordMaxWorkers :: MonadIO m => SVar t m a -> m ()
recordMaxWorkers sv = liftIO $ do
    active <- readIORef (workerCount sv)
    maxWrk <- readIORef maxRef
    when (active > maxWrk) $ writeIORef maxRef active
    dispatches <- readIORef dispatchRef
    -- Force the new count before storing it so that no thunk is retained.
    writeIORef dispatchRef $! dispatches + 1

    where

    stats = svarStats sv
    maxRef = maxWorkers stats
    dispatchRef = totalDispatches stats

-- Fork one worker thread running 'workLoop' for the SVar.
--
-- Steps, in order: increment 'workerCount' with a CAS update, record dispatch
-- statistics when 'svarInspectMode' is on, allocate a 'WorkerInfo' (only when
-- the SVar has a 'yieldRateInfo') with @yieldMax@ as its 'workerYieldMax',
-- fork the worker with 'handleChildException' as its exception handler and
-- finally register the new thread id with 'addThread'.
{-# NOINLINE pushWorker #-}
pushWorker :: MonadAsync m => Count -> SVar t m a -> m ()
pushWorker yieldMax sv = do
    liftIO $ atomicModifyIORefCAS_ (workerCount sv) (+ 1)
    when (svarInspectMode sv) $ recordMaxWorkers sv
    -- This allocation matters when significant number of workers are being
    -- sent. We allocate it only when needed.
    winfo <-
        case yieldRateInfo sv of
            Nothing -> return Nothing
            Just _ -> liftIO newWorkerInfo
    tid <- doFork (workLoop sv winfo) (svarMrun sv) (handleChildException sv)
    addThread sv tid

    where

    -- Fresh per-worker counters: a zero yield count and a latency record
    -- holding a zero count with the current monotonic time.
    newWorkerInfo = do
        countRef <- newIORef 0
        now <- getTime Monotonic
        latencyRef <- newIORef (0, now)
        return $ Just WorkerInfo
            { workerYieldMax = yieldMax
            , workerYieldCount = countRef
            , workerLatencyStart = latencyRef
            }

-- XXX we can push the workerCount modification in accountThread and use the
-- same pushWorker for Parallel case as well.
--
-- | Fork a worker running the supplied loop @wloop@.
--
-- In contrast to pushWorker which always happens only from the consumer
-- thread, a pushWorkerPar can happen concurrently from multiple threads on the
-- producer side. So we need to use a thread safe modification of
-- workerThreads; the forked thread id is therefore registered with
-- 'modifyThread'. Alternatively, we can use a CreateThread event to avoid
-- using a CAS based modification.
--
-- When 'svarInspectMode' is off the loop is started with no 'WorkerInfo'.
-- When it is on, 'workerCount' and the dispatch statistics are updated first
-- and a 'WorkerInfo' is allocated if the SVar has a 'yieldRateInfo'.
{-# INLINE pushWorkerPar #-}
pushWorkerPar
    :: MonadAsync m
    => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar sv wloop
    | svarInspectMode sv = forkWithDiag
    | otherwise =
        doFork (wloop Nothing) (svarMrun sv) (handleChildException sv)
            >>= modifyThread sv

    where

    {-# NOINLINE forkWithDiag #-}
    forkWithDiag = do
        -- We do not use workerCount in case of ParallelVar but still there is
        -- no harm in maintaining it correctly.
        liftIO $ atomicModifyIORefCAS_ (workerCount sv) (+ 1)
        recordMaxWorkers sv
        -- This allocation matters when significant number of workers are being
        -- sent. We allocate it only when needed. The overhead increases by 4x.
        winfo <-
            case yieldRateInfo sv of
                Nothing -> return Nothing
                Just _ -> liftIO $ do
                    countRef <- newIORef 0
                    now <- getTime Monotonic
                    latencyRef <- newIORef (0, now)
                    return $ Just WorkerInfo
                        { workerYieldMax = 0
                        , workerYieldCount = countRef
                        , workerLatencyStart = latencyRef
                        }

        doFork (wloop winfo) (svarMrun sv) (handleChildException sv)
            >>= modifyThread sv

-- Decide whether to dispatch one more worker and do so if appropriate.
--
-- Returns:
-- True: can dispatch more
-- False: cannot dispatch any more
--
-- A worker is pushed with @yieldCount@ as its yield limit when the work is
-- not done, the queue is not done and the effective worker limit is either
-- unlimited or greater than the current 'workerCount'. When the queue is done
-- but the work is not, a single worker with a yield limit of 0 is pushed only
-- if no worker is active, and False is returned.
dispatchWorker :: MonadAsync m => Count -> SVar t m a -> m Bool
dispatchWorker yieldCount sv = do
    let workerLimit = maxWorkerLimit sv
    -- XXX in case of Ahead streams we should not send more than one worker
    -- when the work queue is done but heap is not done.
    done <- liftIO $ isWorkDone sv
    -- Note, "done" may not mean that the work is actually finished if there
    -- are workers active, because there may be a worker which has not yet
    -- queued the leftover work.
    if done
    then return False
    else do
        qDone <- liftIO $ isQueueDone sv
        -- This count may not be accurate as it is decremented by the workers
        -- and we have no synchronization with that decrement.
        active <- liftIO $ readIORef (workerCount sv)
        if qDone
        then do
            when (active <= 0) $ pushWorker 0 sv
            return False
        else do
            -- Note that we may deadlock if the previous workers (tasks in the
            -- stream) wait/depend on the future workers (tasks in the stream)
            -- executing. In that case we should either configure the maxWorker
            -- count to higher or use parallel style instead of ahead or async
            -- style.
            --
            -- Without rate control the configured worker limit is further
            -- capped by the remaining work count, when there is one.
            limit <-
                case remainingWork sv of
                    Nothing -> return workerLimit
                    Just ref -> do
                        n <- liftIO $ readIORef ref
                        return $
                            case (yieldRateInfo sv, workerLimit) of
                                (Just _, _) -> workerLimit
                                (Nothing, Unlimited) ->
                                    Limited (fromIntegral n)
                                (Nothing, Limited lim) ->
                                    Limited (min lim (fromIntegral n))

            -- XXX for ahead streams shall we take the heap yields into account
            -- for controlling the dispatch? We should not dispatch if the heap
            -- has already got the limit covered.
            let dispatch = pushWorker yieldCount sv >> return True
            case limit of
                Unlimited -> dispatch
                -- Note that the use of remainingWork and workerCount is not
                -- atomic and the counts may even have changed between reading
                -- and using them here, so this is just approximate logic and
                -- we cannot rely on it for correctness. We may actually
                -- dispatch more workers than required.
                Limited lim
                    | lim > fromIntegral active -> dispatch
                    | otherwise -> return False

-------------------------------------------------------------------------------
-- Dispatch workers with rate control
-------------------------------------------------------------------------------

-- | This is a magic number and it is overloaded, and used at several places to
-- achieve batching:
--
-- 1. If we have to sleep to slow down, this is the minimum period that we
--    accumulate before we sleep. Also, workers do not stop until this much
--    sleep time is accumulated.
-- 2. Collected latencies are computed and transferred to measured latency
--    after a minimum of this period.
--
-- The value is 1000000 in 'NanoSecond64' units.
minThreadDelay :: NanoSecond64
minThreadDelay = 1000000

-- | Another magic number! When we have to start more workers to cover up a
-- number of yields that we are lagging by, we cannot start one worker for
-- each yield because that may be a very big number, and if the latency of the
-- workers is low this number of yields could be very high. We assume that we
-- run each extra worker for at least this much time.
--
-- The value is 1000000 in 'NanoSecond64' units.
rateRecoveryTime :: NanoSecond64
rateRecoveryTime = 1000000

-- The possible outcomes of a rate control decision: we either block, or send
-- one worker with a limited yield count, or send one or more workers with an
-- unlimited yield count.
data Work
    = BlockWait NanoSecond64
    | PartialWorker Count
    | ManyWorkers Int Count
    deriving Show

-- | Decide what kind of 'Work' is needed to keep the stream at its target
-- rate: sleep for a while ('BlockWait'), run one worker for a bounded number
-- of yields ('PartialWorker') or keep a given number of workers running
-- ('ManyWorkers').
--
-- This is a pure function of its arguments; it performs no IO.
--
-- XXX we can use phantom types to distinguish the duration/latency/expectedLat
estimateWorkers
    :: Limit          -- ^ upper bound on the number of workers
    -> Count          -- ^ yields produced so far
    -> Count          -- ^ gained/lost yields adjustment, added to the above
    -> NanoSecond64   -- ^ time elapsed over which the yields were produced
    -> NanoSecond64   -- ^ measured latency of a worker
    -> NanoSecond64   -- ^ target latency, i.e. the desired time per yield
    -> LatencyRange   -- ^ min/max bounds used to clamp the adjusted latency
    -> Work
estimateWorkers workerLimit svarYields gainLossYields
                svarElapsed wLatency targetLat range =
    -- XXX we can have a maxEfficiency combinator as well which runs the
    -- producer at the maximal efficiency i.e. the number of workers are chosen
    -- such that the latency is minimum or within a range. Or we can call it
    -- maxWorkerLatency.
    --
    let
        -- How many workers do we need to achieve the required rate?
        --
        -- When the workers are IO bound we can increase the throughput by
        -- increasing the number of workers as long as the IO device has enough
        -- capacity to process all the requests concurrently. If the IO
        -- bandwidth is saturated increasing the workers won't help. Also, if
        -- the CPU utilization in processing all these requests exceeds the CPU
        -- bandwidth, then increasing the number of workers won't help.
        --
        -- When the workers are purely CPU bound, increasing the workers beyond
        -- the number of CPUs won't help.
        --
        -- TODO - measure the CPU and IO requirements of the workers. Have a
        -- way to specify the max bandwidth of the underlying IO mechanism and
        -- use that to determine the max rate of workers, and also take the CPU
        -- bandwidth into account. We can also discover the IO bandwidth if we
        -- know that we are not CPU bound, then how much steady state rate are
        -- we able to achieve. Design tests for CPU bound and IO bound cases.

        -- Calculate how many yields are we ahead or behind to match the exact
        -- required rate. Based on that we increase or decrease the effective
        -- workers.
        --
        -- When the worker latency is lower than required latency we begin with
        -- a yield and then wait rather than first waiting and then yielding.
        --
        -- This is a ceiling division of (svarElapsed + wLatency) by targetLat.
        targetYields = (svarElapsed + wLatency + targetLat - 1) `div` targetLat
        effectiveYields = svarYields + gainLossYields
        -- Positive when we are behind the target, otherwise we are at or
        -- ahead of it.
        deltaYields = fromIntegral targetYields - effectiveYields

        -- We recover the deficit by running at a higher/lower rate for a
        -- certain amount of time. To keep the effective rate in reasonable
        -- limits we use rateRecoveryTime, minLatency and maxLatency.
    in  if deltaYields > 0
        then
            let deltaYieldsFreq :: Double
                deltaYieldsFreq =
                    fromIntegral deltaYields /
                        fromIntegral rateRecoveryTime
                yieldsFreq = 1.0 / fromIntegral targetLat
                totalYieldsFreq = yieldsFreq + deltaYieldsFreq
                requiredLat = NanoSecond64 $ round $ 1.0 / totalYieldsFreq
                -- Clamp the required latency to the configured range.
                adjustedLat = min (max requiredLat (minLatency range))
                                  (maxLatency range)
            in  assert (adjustedLat > 0) $
                if wLatency <= adjustedLat
                then PartialWorker deltaYields
                else let workers = withLimit $ wLatency `div` adjustedLat
                         -- Never ask for more workers than pending yields.
                         limited = min workers (fromIntegral deltaYields)
                     in ManyWorkers (fromIntegral limited) deltaYields
        else
            let expectedDuration = fromIntegral effectiveYields * targetLat
                sleepTime = expectedDuration - svarElapsed
                maxSleepTime = maxLatency range - wLatency
                s = min sleepTime maxSleepTime
            in assert (sleepTime >= 0) $
                -- if s is less than 0 it means our maxSleepTime is less
                -- than the worker latency.
                if s > 0 then BlockWait s else ManyWorkers 1 (Count 0)
    where
        -- Cap a worker count by the configured worker limit, if any.
        withLimit n =
            case workerLimit of
                Unlimited -> n
                Limited x -> min n (fromIntegral x)

-- | Get the worker latency without resetting workerPendingLatency
-- Returns (total yield count, base time, measured latency)
-- CAUTION! keep it in sync with collectLatency
--
-- All the IORefs involved are only read here, none of them is written.
getWorkerLatency :: YieldRateInfo -> IO (Count, AbsTime, NanoSecond64)
getWorkerLatency yinfo = do
    let cur      = workerPendingLatency yinfo
        col      = workerCollectedLatency yinfo
        longTerm = svarAllTimeLatency yinfo
        measured = workerMeasuredLatency yinfo

    (curTotalCount, curCount, curTime) <- readIORef cur
    (colTotalCount, colCount, colTime) <- readIORef col
    (lcount, ltime)                    <- readIORef longTerm
    prevLat                            <- readIORef measured

    let latCount   = colCount + curCount
        latTime    = colTime + curTime
        totalCount = colTotalCount + curTotalCount
        -- Average the freshly observed per-yield latency with the previously
        -- measured one; fall back to the previous value when there are no new
        -- samples.
        newLat =
            if latCount > 0 && latTime > 0
            then let lat = latTime `div` fromIntegral latCount
                 -- XXX Give more weight to new?
                 in (lat + prevLat) `div` 2
            else prevLat
    return (lcount + totalCount, ltime, newLat)

-- | Check whether the number of currently running workers exceeds what
-- 'estimateWorkers' says is needed for the target rate.
--
-- Returns 'True' when the current 'workerCount' is greater than the estimated
-- requirement, or when the estimate is to block and wait.
isBeyondMaxRate :: SVar t m a -> YieldRateInfo -> IO Bool
isBeyondMaxRate sv yinfo = do
    (count, tstamp, wLatency) <- getWorkerLatency yinfo
    now <- getTime Monotonic
    let duration = fromRelTime64 $ diffAbsTime64 now tstamp
    let targetLat = svarLatencyTarget yinfo
    gainLoss <- readIORef (svarGainedLostYields yinfo)
    let work = estimateWorkers (maxWorkerLimit sv) count gainLoss duration
                               wLatency targetLat (svarLatencyRange yinfo)
    cnt <- readIORef $ workerCount sv
    return $ case work of
        -- XXX set the worker's maxYields or polling interval based on yields
        PartialWorker _yields -> cnt > 1
        ManyWorkers n _ -> cnt > n
        BlockWait _ -> True

-- XXX in case of ahead style stream we need to take the heap size into account
-- because we return the workers on the basis of that which causes a condition
-- where we keep dispatching and they keep returning. So we must have exactly
-- the same logic for not dispatching and for returning.
--
-- Dispatch workers for a rate controlled SVar, based on the 'Work' computed by
-- 'estimateWorkers' from the latest collected latency.
--
-- Returns:
-- True: can dispatch more
-- False: full, no more dispatches
dispatchWorkerPaced :: MonadAsync m => SVar t m a -> m Bool
dispatchWorkerPaced sv = do
    -- NOTE: fromJust is partial; this relies on the SVar having its
    -- yieldRateInfo set.
    let yinfo = fromJust $ yieldRateInfo sv
    (svarYields, svarElapsed, wLatency) <- do
        now <- liftIO $ getTime Monotonic
        (yieldCount, baseTime, lat) <-
            liftIO $ collectLatency sv yinfo False
        let elapsed = fromRelTime64 $ diffAbsTime64 now baseTime
        -- When nothing has been measured yet, use the bootstrap latency if
        -- one is available.
        let latency =
                if lat == 0
                then fromMaybe lat (workerBootstrapLatency yinfo)
                else lat

        return (yieldCount, elapsed, latency)

    if wLatency == 0
    -- Need to measure the latency with a single worker before we can perform
    -- any computation.
    then return False
    else do
        let workerLimit = maxWorkerLimit sv
        let targetLat = svarLatencyTarget yinfo
        let range = svarLatencyRange yinfo
        gainLoss <- liftIO $ readIORef (svarGainedLostYields yinfo)
        let work = estimateWorkers workerLimit svarYields gainLoss svarElapsed
                                   wLatency targetLat range

        -- XXX we need to take yieldLimit into account here. If we are at the
        -- end of the limit as well as the time, we should not be sleeping.
        -- If we are not actually planning to dispatch any more workers we need
        -- to take that in account.
        case work of
            BlockWait s -> do
                assert (s >= 0) (return ())
                -- XXX note that when we return from here we will block waiting
                -- for the result from the existing worker. If that takes too
                -- long we won't be able to send another worker until the
                -- result arrives.
                --
                -- Sleep only if there are no active workers, otherwise we will
                -- defer the output of those. Note we cannot use workerCount
                -- here as it is not a reliable way to ensure there are
                -- definitely no active workers. When workerCount is 0 we may
                -- still have a Stop event waiting in the outputQueue.
                done <- allThreadsDone sv
                when done $ void $ do
                    let us = fromRelTime64 (toRelTime64 s) :: MicroSecond64
                    liftIO $ threadDelay (fromIntegral us)
                    dispatchWorker 1 sv
                return False
            PartialWorker yields -> do
                assert (yields > 0) (return ())
                updateGainedLostYields yinfo yields

                done <- allThreadsDone sv
                when done $ void $ dispatchWorker yields sv
                return False
            ManyWorkers netWorkers yields -> do
                assert (netWorkers >= 1) (return ())
                assert (yields >= 0) (return ())
                updateGainedLostYields yinfo yields

                -- Only ever lower the polling interval here, never raise it.
                let periodRef = workerPollingInterval yinfo
                    ycnt = max 1 $ yields `div` fromIntegral netWorkers
                    period = min ycnt (fromIntegral magicMaxBuffer)

                old <- liftIO $ readIORef periodRef
                when (period < old) $
                    liftIO $ writeIORef periodRef period

                cnt <- liftIO $ readIORef $ workerCount sv
                if cnt < netWorkers
                then do
                    let total = netWorkers - cnt
                        batch = max 1 $ fromIntegral $
                                    minThreadDelay `div` targetLat
                    -- XXX stagger the workers over a period?
                    -- XXX cannot sleep, as that would mean we cannot process
                    -- the outputs. need to try a different mechanism to
                    -- stagger.
                    -- when (total > batch) $
                       -- liftIO $ threadDelay $ nanoToMicroSecs minThreadDelay
                    dispatchN (min total batch)
                else return False

    where

    -- Add the part of the deficit/surplus that exceeds the rate buffer to the
    -- gained/lost yields counter; amounts within the buffer are ignored.
    updateGainedLostYields yinfo yields = do
        let buf = fromIntegral $ svarRateBuffer yinfo
        when (yields /= 0 && abs yields > buf) $ do
            let delta =
                   if yields > 0
                   then yields - buf
                   else yields + buf
            liftIO $ modifyIORef (svarGainedLostYields yinfo) (+ delta)

    -- Dispatch up to n workers, stopping at the first dispatch that reports
    -- False. Returns True only if all n dispatches reported True.
    dispatchN n =
        if n == 0
        then return True
        else do
            r <- dispatchWorker 0 sv
            if r
            then dispatchN (n - 1)
            else return False

-------------------------------------------------------------------------------
-- Worker dispatch and wait loop
-------------------------------------------------------------------------------

-- | Delay action used before dispatching in the paced case. Currently a
-- no-op: the SVar argument is ignored.
sendWorkerDelayPaced :: SVar t m a -> IO ()
sendWorkerDelayPaced _ = return ()

-- | Delay action used before dispatching a worker. Currently a no-op: the
-- SVar argument is ignored and the hardcoded delays below are disabled.
sendWorkerDelay :: SVar t m a -> IO ()
sendWorkerDelay _sv =
    -- XXX we need a better way to handle this than hardcoded delays. The
    -- delays may be different for different systems.
    -- If there is a usecase where this is required we can create a combinator
    -- to set it as a config in the state.
    {-
  do
    ncpu <- getNumCapabilities
    if ncpu <= 1
    then
        if (svarStyle sv == AheadVar)
        then threadDelay 100
        else threadDelay 25
    else
        if (svarStyle sv == AheadVar)
        then threadDelay 100
        else threadDelay 10
    -}
    return ()

{-# NOINLINE sendWorkerWait #-}
sendWorkerWait
    :: MonadAsync m
    => (SVar t m a -> IO ())
    -> (SVar t m a -> m Bool)
    -> SVar t m a
    -> m ()
sendWorkerWait :: (SVar t m a -> IO ())
-> (SVar t m a -> m Bool) -> SVar t m a -> m ()
sendWorkerWait SVar t m a -> IO ()
delay SVar t m a -> m Bool
dispatch SVar t m a
sv = do
    -- Note that we are guaranteed to have at least one outstanding worker when
    -- we enter this function. So if we sleep we are guaranteed to be woken up
    -- by an outputDoorBell, when the worker exits.

    IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> IO ()
delay SVar t m a
sv
    ([ChildEvent a]
_, Int
n) <- IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int))
-> IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef a -> IO a
readIORef (SVar t m a -> IORef ([ChildEvent a], Int)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueue SVar t m a
sv)
    Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
        -- The queue may be empty temporarily if the worker has dequeued the
        -- work item but has not enqueued the remaining part yet. For the same
        -- reason, a worker may come back if it tries to dequeue and finds the
        -- queue empty, even though the whole work has not finished yet.

        -- We may find the queue empty only because it happened to be
        -- temporarily empty at the moment we checked it. If that's the case
        -- we might sleep indefinitely unless the active workers produce some
        -- output. We may deadlock, especially if the output from the active
        -- workers depends on future workers that we may never send.
        -- So in case the queue was temporarily empty, set a flag to inform
        -- the enqueue to send us a doorbell.

        -- Note that this is just a best effort mechanism to avoid a
        -- deadlock. Deadlocks may still happen if for some weird reason
        -- the consuming computation shares an MVar or some other resource
        -- with the producing computation and gets blocked on that resource
        -- and therefore cannot do any pushworker to add more threads to
        -- the producer. In such cases the programmer should use a parallel
        -- style so that all the producers are scheduled immediately and
        -- unconditionally. We can also use a separate monitor thread to
        -- push workers instead of pushing them from the consumer, but then
        -- we are no longer using pull based concurrency rate adaptation.
        --
        -- XXX update this in the tutorial.
        --
        -- Having pending active workers does not mean that we are guaranteed
        -- to be woken up if we sleep. In case of Ahead streams, there may be
        -- queued items in the heap even though the outputQueue is empty, and
        -- we may have active workers which are deadlocked on those items to be
        -- processed by the consumer. We should either guarantee that any
        -- worker, before returning, clears the heap or we send a worker to
        -- clear it. Normally we always send a worker if no output is seen, but
        -- if the thread limit is reached or we are using pacing then we may
        -- not send a worker. See the concurrentApplication test in the tests,
        -- that test case requires at least one yield from the producer to not
        -- deadlock, if the last workers output is stuck in the heap then this
        -- test fails.  This problem can be extended to n threads when the
        -- consumer may depend on the evaluation of next n items in the
        -- producer stream.

        -- register for the outputDoorBell before we check the queue so that if
        -- we sleep because the queue was empty we are guaranteed to get a
        -- doorbell on the next enqueue.

        IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef Bool -> (Bool -> Bool) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
atomicModifyIORefCAS_ (SVar t m a -> IORef Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Bool
needDoorBell SVar t m a
sv) ((Bool -> Bool) -> IO ()) -> (Bool -> Bool) -> IO ()
forall a b. (a -> b) -> a -> b
$ Bool -> Bool -> Bool
forall a b. a -> b -> a
const Bool
True
        IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ()
storeLoadBarrier
        Bool
canDoMore <- SVar t m a -> m Bool
dispatch SVar t m a
sv

        -- XXX test for the case when we miss sending a worker when the worker
        -- count is more than 1500.
        --
        -- XXX Assert here that if the heap is not empty then there is at
        -- least one outstanding worker. Otherwise we could be sleeping
        -- forever.

        if Bool
canDoMore
        then (SVar t m a -> IO ())
-> (SVar t m a -> m Bool) -> SVar t m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
(SVar t m a -> IO ())
-> (SVar t m a -> m Bool) -> SVar t m a -> m ()
sendWorkerWait SVar t m a -> IO ()
delay SVar t m a -> m Bool
dispatch SVar t m a
sv
        else do
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> String -> IO () -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO () -> IO ()
withDiagMVar SVar t m a
sv String
"sendWorkerWait: nothing to do"
                             (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (SVar t m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBell SVar t m a
sv)
            ([ChildEvent a]
_, Int
len) <- IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int))
-> IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef a -> IO a
readIORef (SVar t m a -> IORef ([ChildEvent a], Int)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueue SVar t m a
sv)
            Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
len Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ (SVar t m a -> IO ())
-> (SVar t m a -> m Bool) -> SVar t m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
(SVar t m a -> IO ())
-> (SVar t m a -> m Bool) -> SVar t m a -> m ()
sendWorkerWait SVar t m a -> IO ()
delay SVar t m a -> m Bool
dispatch SVar t m a
sv

-------------------------------------------------------------------------------
-- Reading from the workers' output queue/buffer
-------------------------------------------------------------------------------

{-# INLINE readOutputQBasic #-}
-- | Drain an output queue in a single 'atomicModifyIORefCAS' step.
--
-- The reference is reset to @([], 0)@ and the value it held before the reset
-- (the queued events paired with the 'Int' stored alongside them, which the
-- callers in this module treat as the queue length) is returned.
readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
readOutputQBasic q = atomicModifyIORefCAS q $ \x -> (([],0), x)

{-# INLINE readOutputQRaw #-}
-- | Drain the 'outputQueue' of an 'SVar' using 'readOutputQBasic' and return
-- the drained events together with their count.
--
-- When 'svarInspectMode' is enabled the 'maxOutQSize' statistic is updated:
-- if the drained length exceeds the value currently recorded, the recorded
-- value is overwritten with the new length.
readOutputQRaw :: SVar t m a -> IO ([ChildEvent a], Int)
readOutputQRaw sv = do
    (list, len) <- readOutputQBasic (outputQueue sv)
    when (svarInspectMode sv) $ do
        let ref = maxOutQSize $ svarStats sv
        oqLen <- readIORef ref
        -- keep the high-water mark only
        when (len > oqLen) $ writeIORef ref len
    return (list, len)

-- | Read the pending output of an 'SVar' (non-paced variant).
--
-- The output queue is drained with 'readOutputQRaw'. If nothing was queued
-- the call goes through 'sendWorkerWait' (see @blockingRead@) and then drains
-- the queue again; otherwise a worker may be pushed (see @sendOneWorker@)
-- before the drained events are returned.
readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQBounded sv = do
    (list, len) <- liftIO $ readOutputQRaw sv
    -- When there is no output seen we dispatch more workers to help
    -- out if there is work pending in the work queue.
    if len <= 0
    then blockingRead
    else do
        -- send a worker proactively, if needed, even before we start
        -- processing the output.  This may degrade single processor
        -- perf but improves multi-processor, because of more
        -- parallelism
        sendOneWorker
        return list

    where

    -- Push a worker only when the worker count is not positive and
    -- 'isWorkDone' reports that work is still pending.
    sendOneWorker = do
        cnt <- liftIO $ readIORef $ workerCount sv
        when (cnt <= 0) $ do
            done <- liftIO $ isWorkDone sv
            when (not done) (pushWorker 0 sv)

    -- Run 'sendWorkerWait' with the non-paced delay and dispatcher, then
    -- drain the queue again and return just the events.
    {-# INLINE blockingRead #-}
    blockingRead = do
        sendWorkerWait sendWorkerDelay (dispatchWorker 0) sv
        liftIO (fst <$> readOutputQRaw sv)

-- | Read the pending output of an 'SVar' (paced variant).
--
-- The output queue is drained with 'readOutputQRaw'. If nothing was queued
-- the call goes through 'sendWorkerWait' using the paced delay and dispatcher
-- and then drains the queue again; otherwise 'dispatchWorkerPaced' is invoked
-- (its result is ignored) and the drained events are returned.
readOutputQPaced :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQPaced sv = do
    (list, len) <- liftIO $ readOutputQRaw sv
    if len <= 0
    then blockingRead
    else do
        -- XXX send a worker proactively, if needed, even before we start
        -- processing the output.
        void $ dispatchWorkerPaced sv
        return list

    where

    -- Run 'sendWorkerWait' with the paced delay and dispatcher, then drain
    -- the queue again and return just the events.
    {-# INLINE blockingRead #-}
    blockingRead = do
        sendWorkerWait sendWorkerDelayPaced dispatchWorkerPaced sv
        liftIO (fst <$> readOutputQRaw sv)

-- | Post-processing step for the non-paced case.
--
-- Returns the result of 'isWorkDone' when 'allThreadsDone' reports that no
-- workers remain, and 'False' otherwise. If the workers are done but the work
-- is not, a worker is pushed with 'pushWorker' before returning.
postProcessBounded :: MonadAsync m => SVar t m a -> m Bool
postProcessBounded sv = do
    workersDone <- allThreadsDone sv
    -- There may still be work pending even if there are no workers pending
    -- because all the workers may return if the outputQueue becomes full. In
    -- that case send off a worker to kickstart the work again.
    --
    -- Note that isWorkDone can only be safely checked if all workers are done.
    -- When some workers are in progress they may have decremented the yield
    -- Limit and later ending up incrementing it again. If we look at the yield
    -- limit in that window we may falsely say that it is 0 and therefore we
    -- are done.
    if workersDone
    then do
        r <- liftIO $ isWorkDone sv
        -- Note that we need to guarantee a worker, therefore we cannot just
        -- use dispatchWorker which may or may not send a worker.
        when (not r) (pushWorker 0 sv)
        -- XXX do we need to dispatch many here?
        -- void $ dispatchWorker sv
        return r
    else return False

-- | Post-processing step for the paced (rate controlled) case.
--
-- Returns the result of 'isWorkDone' when 'allThreadsDone' reports that no
-- workers remain, and 'False' otherwise. If the workers are done but the work
-- is not, 'dispatchWorkerPaced' is tried first; if there is still no worker
-- afterwards, one is pushed with 'pushWorker'.
postProcessPaced :: MonadAsync m => SVar t m a -> m Bool
postProcessPaced sv = do
    workersDone <- allThreadsDone sv
    -- XXX If during consumption we figure out we are getting delayed then we
    -- should trigger dispatch there as well.  We should try to check on the
    -- workers after consuming every n item from the buffer?
    if workersDone
    then do
        r <- liftIO $ isWorkDone sv
        when (not r) $ do
            void $ dispatchWorkerPaced sv
            -- Note that we need to guarantee a worker since the work is not
            -- finished, therefore we cannot just rely on dispatchWorkerPaced
            -- which may or may not send a worker.
            noWorker <- allThreadsDone sv
            when noWorker $ pushWorker 0 sv
        return r
    else return False

-------------------------------------------------------------------------------
-- Creating an SVar
-------------------------------------------------------------------------------

-- | Build the rate-control bookkeeping for a stream from its 'State'.
--
-- Returns 'Nothing' when 'getStreamRate' yields no 'Rate'. Otherwise the
-- goal, high and low rates are converted to latencies and a fresh
-- 'YieldRateInfo' is allocated with all of its mutable counters initialised.
getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo st = do
    -- convert rate in Hertz to latency in Nanoseconds; a non-positive rate
    -- maps to the largest representable latency
    let rateToLatency r = if r <= 0 then maxBound else round $ 1.0e9 / r
    case getStreamRate st of
        Just (Rate low goal high buf) ->
            let l    = rateToLatency goal
                -- the highest rate gives the smallest latency and vice versa
                minl = rateToLatency high
                maxl = rateToLatency low
            in mkYieldRateInfo l (LatencyRange minl maxl) buf
        Nothing -> return Nothing

    where

    -- Allocate the IORefs backing a 'YieldRateInfo' and assemble the record.
    mkYieldRateInfo latency latRange buf = do
        measured <- newIORef 0
        wcur     <- newIORef (0,0,0)
        wcol     <- newIORef (0,0,0)
        now      <- getTime Monotonic
        wlong    <- newIORef (0,now)
        period   <- newIORef 1
        gainLoss <- newIORef (Count 0)

        return $ Just YieldRateInfo
            { svarLatencyTarget      = latency
            , svarLatencyRange       = latRange
            , svarRateBuffer         = buf
            , svarGainedLostYields   = gainLoss
            , workerBootstrapLatency = getStreamLatency st
            , workerPollingInterval  = period
            , workerMeasuredLatency  = measured
            , workerPendingLatency   = wcur
            , workerCollectedLatency = wcol
            , svarAllTimeLatency     = wlong
            }

-- | Allocate a fresh 'SVarStats' record.
--
-- Every counter and latency reference starts at zero and 'svarStopTime'
-- starts as 'Nothing'.
newSVarStats :: IO SVarStats
newSVarStats = do
    disp   <- newIORef 0
    maxWrk <- newIORef 0
    maxOq  <- newIORef 0
    maxHs  <- newIORef 0
    maxWq  <- newIORef 0
    avgLat <- newIORef (0, NanoSecond64 0)
    maxLat <- newIORef (NanoSecond64 0)
    minLat <- newIORef (NanoSecond64 0)
    stpTime <- newIORef Nothing

    return SVarStats
        { totalDispatches  = disp
        , maxWorkers       = maxWrk
        , maxOutQSize      = maxOq
        , maxHeapSize      = maxHs
        , maxWorkQSize     = maxWq
        , avgWorkerLatency = avgLat
        , minWorkerLatency = minLat
        , maxWorkerLatency = maxLat
        , svarStopTime     = stpTime
        }

-- XXX remove polymorphism in t, inline f
getAheadSVar :: MonadAsync m
    => State t m a
    -> (   IORef ([t m a], Int)
        -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
        -> State t m a
        -> SVar t m a
        -> Maybe WorkerInfo
        -> m ())
    -> RunInIO m
    -> IO (SVar t m a)
getAheadSVar :: State t m a
-> (IORef ([t m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> State t m a
    -> SVar t m a
    -> Maybe WorkerInfo
    -> m ())
-> RunInIO m
-> IO (SVar t m a)
getAheadSVar State t m a
st IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> State t m a
-> SVar t m a
-> Maybe WorkerInfo
-> m ()
f RunInIO m
mrun = do
    IORef ([ChildEvent a], Int)
outQ    <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
    -- the second component of the tuple is "Nothing" when heap is being
    -- cleared, "Just n" when we are expecting sequence number n to arrive
    -- before we can start clearing the heap.
    IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outH    <- (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
forall a. a -> IO (IORef a)
newIORef (Heap (Entry Int (AheadHeapEntry t m a))
forall a. Heap a
H.empty, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
0)
    MVar ()
outQMv  <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    IORef Int
active  <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
    IORef Bool
wfw     <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
    IORef (Set ThreadId)
running <- Set ThreadId -> IO (IORef (Set ThreadId))
forall a. a -> IO (IORef a)
newIORef Set ThreadId
forall a. Set a
S.empty
    -- Sequence number is incremented whenever something is queued, therefore,
    -- first sequence number would be 0
    IORef ([t m a], Int)
q <- ([t m a], Int) -> IO (IORef ([t m a], Int))
forall a. a -> IO (IORef a)
newIORef ([], -Int
1)
    MVar ()
stopMVar <- () -> IO (MVar ())
forall a. a -> IO (MVar a)
newMVar ()
    Maybe (IORef Count)
yl <- case State t m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State t m a
st of
            Maybe Count
Nothing -> Maybe (IORef Count) -> IO (Maybe (IORef Count))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IORef Count)
forall a. Maybe a
Nothing
            Just Count
x -> IORef Count -> Maybe (IORef Count)
forall a. a -> Maybe a
Just (IORef Count -> Maybe (IORef Count))
-> IO (IORef Count) -> IO (Maybe (IORef Count))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
x
    Maybe YieldRateInfo
rateInfo <- State t m a -> IO (Maybe YieldRateInfo)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo State t m a
st

    SVarStats
stats <- IO SVarStats
newSVarStats
    ThreadId
tid <- IO ThreadId
myThreadId

    let getSVar :: SVar t m a
-> (SVar t m a -> m [ChildEvent a])
-> (SVar t m a -> m Bool)
-> SVar t m a
getSVar SVar t m a
sv SVar t m a -> m [ChildEvent a]
readOutput SVar t m a -> m Bool
postProc = SVar :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVarStyle
-> RunInIO m
-> SVarStopStyle
-> IORef ThreadId
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> m [ChildEvent a]
-> m Bool
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> Limit
-> Limit
-> IORef Count
-> PushBufferPolicy
-> MVar ()
-> Maybe (IORef Count)
-> Maybe YieldRateInfo
-> ((RunInIO m, t m a) -> IO ())
-> IO Bool
-> IO Bool
-> IORef Bool
-> (Maybe WorkerInfo -> m ())
-> IORef (Set ThreadId)
-> IORef Int
-> (ThreadId -> m ())
-> MVar ()
-> SVarStats
-> Maybe (IORef ())
-> Bool
-> ThreadId
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IORef ([t m a], Int)
-> SVar t m a
SVar
            { outputQueue :: IORef ([ChildEvent a], Int)
outputQueue      = IORef ([ChildEvent a], Int)
outQ
            , outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
outputQueueFromConsumer = IORef ([ChildEvent a], Int)
forall a. (?callStack::CallStack) => a
undefined
            , remainingWork :: Maybe (IORef Count)
remainingWork  = Maybe (IORef Count)
yl
            , maxBufferLimit :: Limit
maxBufferLimit   = State t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State t m a
st
            , pushBufferSpace :: IORef Count
pushBufferSpace = IORef Count
forall a. (?callStack::CallStack) => a
undefined
            , pushBufferPolicy :: PushBufferPolicy
pushBufferPolicy = PushBufferPolicy
forall a. (?callStack::CallStack) => a
undefined
            , pushBufferMVar :: MVar ()
pushBufferMVar   = MVar ()
forall a. (?callStack::CallStack) => a
undefined
            , maxWorkerLimit :: Limit
maxWorkerLimit   = Limit -> Limit -> Limit
forall a. Ord a => a -> a -> a
min (State t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxThreads State t m a
st) (State t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State t m a
st)
            , yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo    = Maybe YieldRateInfo
rateInfo
            , outputDoorBell :: MVar ()
outputDoorBell   = MVar ()
outQMv
            , outputDoorBellFromConsumer :: MVar ()
outputDoorBellFromConsumer = MVar ()
forall a. (?callStack::CallStack) => a
undefined
            , readOutputQ :: m [ChildEvent a]
readOutputQ      = SVar t m a -> m [ChildEvent a]
readOutput SVar t m a
sv
            , postProcess :: m Bool
postProcess      = SVar t m a -> m Bool
postProc SVar t m a
sv
            , workerThreads :: IORef (Set ThreadId)
workerThreads    = IORef (Set ThreadId)
running
            , workLoop :: Maybe WorkerInfo -> m ()
workLoop         = IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> State t m a
-> SVar t m a
-> Maybe WorkerInfo
-> m ()
f IORef ([t m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outH State t m a
st{streamVar :: Maybe (SVar t m a)
streamVar = SVar t m a -> Maybe (SVar t m a)
forall a. a -> Maybe a
Just SVar t m a
sv} SVar t m a
sv
            , enqueue :: (RunInIO m, t m a) -> IO ()
enqueue          = SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
enqueueAhead SVar t m a
sv IORef ([t m a], Int)
q
            , isWorkDone :: IO Bool
isWorkDone       = SVar t m a
-> IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO Bool
forall (t :: * -> *) (t :: (* -> *) -> * -> *) (m :: * -> *) a a b
       a b.
Foldable t =>
SVar t m a -> IORef (t a, b) -> IORef (Heap a, b) -> IO Bool
isWorkDoneAhead SVar t m a
sv IORef ([t m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outH
            , isQueueDone :: IO Bool
isQueueDone      = SVar t m a -> IORef ([t m a], Int) -> IO Bool
forall (t :: * -> *) (t :: (* -> *) -> * -> *) (m :: * -> *) a a b.
Foldable t =>
SVar t m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead SVar t m a
sv IORef ([t m a], Int)
q
            , needDoorBell :: IORef Bool
needDoorBell     = IORef Bool
wfw
            , svarStyle :: SVarStyle
svarStyle        = SVarStyle
AheadVar
            , svarStopStyle :: SVarStopStyle
svarStopStyle    = SVarStopStyle
StopNone
            , svarStopBy :: IORef ThreadId
svarStopBy       = IORef ThreadId
forall a. (?callStack::CallStack) => a
undefined
            , svarMrun :: RunInIO m
svarMrun         = RunInIO m
mrun
            , workerCount :: IORef Int
workerCount      = IORef Int
active
            , accountThread :: ThreadId -> m ()
accountThread    = SVar t m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
delThread SVar t m a
sv
            , workerStopMVar :: MVar ()
workerStopMVar   = MVar ()
stopMVar
            , svarRef :: Maybe (IORef ())
svarRef          = Maybe (IORef ())
forall a. Maybe a
Nothing
            , svarInspectMode :: Bool
svarInspectMode  = State t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool
getInspectMode State t m a
st
            , svarCreator :: ThreadId
svarCreator      = ThreadId
tid
            , aheadWorkQueue :: IORef ([t m a], Int)
aheadWorkQueue   = IORef ([t m a], Int)
q
            , outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outputHeap       = IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outH
            , svarStats :: SVarStats
svarStats        = SVarStats
stats
            }

    let sv :: SVar t m a
sv =
            case State t m a -> Maybe Rate
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate
getStreamRate State t m a
st of
                Maybe Rate
Nothing -> SVar t m a
-> (SVar t m a -> m [ChildEvent a])
-> (SVar t m a -> m Bool)
-> SVar t m a
getSVar SVar t m a
sv SVar t m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
                Just Rate
_  -> SVar t m a
-> (SVar t m a -> m [ChildEvent a])
-> (SVar t m a -> m Bool)
-> SVar t m a
getSVar SVar t m a
sv SVar t m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
     in SVar t m a -> IO (SVar t m a)
forall (m :: * -> *) a. Monad m => a -> m a
return SVar t m a
sv

    where

    -- Decide whether the queue side of the work is finished: either the work
    -- queue @q@ is empty, or @sv@ carries a 'remainingWork' counter that has
    -- dropped to zero or below. Without such a counter only the emptiness of
    -- the queue counts.
    {-# INLINE isQueueDoneAhead #-}
    isQueueDoneAhead sv q = do
        queueDone <- checkEmpty q
        -- XXX The counter based answer is authoritative only when no workers
        -- are running. Active workers can later increment the yield count and
        -- therefore change the result.
        yieldsDone <-
            maybe (return False) (fmap (<= 0) . readIORef) (remainingWork sv)
        return (yieldsDone || queueDone)

    -- All work is done when the heap stored in @ref@ holds no entries and
    -- 'isQueueDoneAhead' reports the queue side as finished. The heap is read
    -- first, then the queue is checked.
    {-# INLINE isWorkDoneAhead #-}
    isWorkDoneAhead sv q ref = do
        (hp, _) <- readIORef ref
        queueDone <- isQueueDoneAhead sv q
        return (H.size hp <= 0 && queueDone)

    -- Read the (container, extra) pair stored in @q@ and report whether the
    -- container component is empty; the second component is ignored.
    checkEmpty q = readIORef q >>= \(xs, _) -> return (null xs)

-- | Allocate a fresh 'SVar' of style 'ParallelVar'.
--
-- The stop style @ss@, the settings read from the 'State' @st@ (yield limit,
-- rate information, maximum buffer, inspect mode) and the captured monad
-- runner @mrun@ are stored in the new 'SVar'. The creating thread's id is
-- recorded in 'svarCreator'.
--
-- Several record fields are deliberately left without a value by this
-- constructor. Forcing one of them raises an error that names the field
-- rather than the uninformative @Prelude.undefined@.
getParallelSVar :: MonadIO m
    => SVarStopStyle -> State t m a -> RunInIO m -> IO (SVar t m a)
getParallelSVar ss st mrun = do
    outQ      <- newIORef ([], 0)
    outQRev   <- newIORef ([], 0)
    outQMv    <- newEmptyMVar
    outQMvRev <- newEmptyMVar
    active    <- newIORef 0
    running   <- newIORef S.empty
    yl <- case getYieldLimit st of
            Nothing -> return Nothing
            Just x -> Just <$> newIORef x
    rateInfo <- getYieldRateInfo st
    -- The IORef is filled lazily, so the error below fires only if the
    -- counter is actually forced while the buffer is unlimited.
    let bufLim =
            case getMaxBuffer st of
                Unlimited ->
                    error $ "getParallelSVar: pushBufferSpace has no value "
                         ++ "when the maximum buffer is Unlimited"
                Limited x -> fromIntegral x
    remBuf <- newIORef bufLim
    pbMVar <- newMVar ()

    stats <- newSVarStats
    tid <- myThreadId

    stopBy <-
        case ss of
            StopBy ->
                newIORef $ error
                    "getParallelSVar: svarStopBy has not been written yet"
            _ ->
                return $ error $ "getParallelSVar: svarStopBy is only "
                              ++ "initialised for the StopBy stop style"

    -- The record refers to itself: the reader, post-processing and thread
    -- accounting actions all close over the finished 'SVar'.
    let sv =
            SVar { outputQueue      = outQ
                 , outputQueueFromConsumer = outQRev
                 , remainingWork    = yl
                 , maxBufferLimit   = getMaxBuffer st
                 , pushBufferSpace  = remBuf
                 , pushBufferPolicy = PushBufferBlock
                 , pushBufferMVar   = pbMVar
                 , maxWorkerLimit   = Unlimited
                 -- Used only for diagnostics
                 , yieldRateInfo    = rateInfo
                 , outputDoorBell   = outQMv
                 , outputDoorBellFromConsumer = outQMvRev
                 , readOutputQ      = readOutputQPar sv
                 , postProcess      = allThreadsDone sv
                 , workerThreads    = running
                 , workLoop         = unsetField "workLoop"
                 , enqueue          = unsetField "enqueue"
                 , isWorkDone       = unsetField "isWorkDone"
                 , isQueueDone      = unsetField "isQueueDone"
                 , needDoorBell     = unsetField "needDoorBell"
                 , svarStyle        = ParallelVar
                 , svarStopStyle    = ss
                 , svarStopBy       = stopBy
                 , svarMrun         = mrun
                 , workerCount      = active
                 , accountThread    = modifyThread sv
                 , workerStopMVar   = unsetField "workerStopMVar"
                 , svarRef          = Nothing
                 , svarInspectMode  = getInspectMode st
                 , svarCreator      = tid
                 , aheadWorkQueue   = unsetField "aheadWorkQueue"
                 , outputHeap       = unsetField "outputHeap"
                 , svarStats        = stats
                 }
    return sv

    where

    -- Placeholder for a record field this constructor leaves unset; forcing
    -- it fails with a message that names the field.
    unsetField :: String -> b
    unsetField name =
        error $ "getParallelSVar: field '" ++ name
             ++ "' is not initialised for a ParallelVar SVar"

    -- Reader installed as 'readOutputQ'. It blocks on the output doorbell,
    -- collects latency when rate information is present, drains the output
    -- queue, and then resets the push buffer limit.
    readOutputQPar sv = liftIO $ do
        withDiagMVar sv "readOutputQPar: doorbell"
            $ takeMVar (outputDoorBell sv)
        case yieldRateInfo sv of
            Nothing -> return ()
            Just yinfo -> void $ collectLatency sv yinfo False
        r <- fst `fmap` readOutputQRaw sv
        -- Empty the push-buffer MVar, reset the limit, and fill the MVar
        -- again after a write barrier.
        -- NOTE(review): presumably this wakes producers blocked on buffer
        -- space - confirm against the producer side.
        void $ tryTakeMVar (pushBufferMVar sv)
        resetBufferLimit sv
        writeBarrier
        void $ tryPutMVar (pushBufferMVar sv) ()
        return r

-- | Enqueue the stream @m@ on @sv@ and then start the first worker, returning
-- the same 'SVar'.
--
-- Without rate information the worker is pushed with a count of 0. With rate
-- information it is pushed with a count of 1, unless the latency target is
-- 'maxBound', in which case no worker is pushed and the calling thread delays
-- for @maxBound@ microseconds instead.
sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a)
sendFirstWorker sv m = do
    -- Note: all the work must be on the queue before the pushworker is sent,
    -- otherwise the pushworker may exit before we even get a chance to push.
    mrun <- captureMonadState
    liftIO $ enqueue sv (mrun, m)
    case yieldRateInfo sv of
        Nothing -> pushWorker 0 sv
        Just yinfo
            | svarLatencyTarget yinfo == maxBound ->
                liftIO $ threadDelay maxBound
            | otherwise -> pushWorker 1 sv
    return sv

-- | Create an ahead-style 'SVar' from the state @st@ and the worker loop
-- @wloop@ (via 'getAheadSVar', using the captured monad state), then enqueue
-- the stream @m@ on it and start the first worker with 'sendFirstWorker'.
{-# INLINABLE newAheadVar #-}
newAheadVar :: MonadAsync m
    => State t m a
    -> t m a
    -> (   IORef ([t m a], Int)
        -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
        -> State t m a
        -> SVar t m a
        -> Maybe WorkerInfo
        -> m ())
    -> m (SVar t m a)
newAheadVar st m wloop =
    captureMonadState
        >>= liftIO . getAheadSVar st wloop
        >>= \sv -> sendFirstWorker sv m

-- | Capture the current monad state and use it, together with the stop style
-- @ss@ and the state @st@, to build a parallel 'SVar' via 'getParallelSVar'.
{-# INLINABLE newParallelVar #-}
newParallelVar :: MonadAsync m
    => SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar ss st = captureMonadState >>= liftIO . getParallelSVar ss st

-------------------------------------------------------------------------------
-- Write a stream to an SVar
-------------------------------------------------------------------------------

-- XXX this errors out for Parallel/Ahead SVars
-- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then
-- be read back from the SVar using 'fromSVar'.
--
-- The stream is enqueued on the 'SVar'; a worker is pushed afterwards only
-- when 'allThreadsDone' reports that all threads are done.
toStreamVar :: MonadAsync m => SVar t m a -> t m a -> m ()
toStreamVar sv m = do
    mrun <- captureMonadState
    liftIO $ enqueue sv (mrun, m)
    idle <- allThreadsDone sv
    -- XXX This is safe only when called from the consumer thread or when no
    -- consumer is present. There may be a race if we are not running in the
    -- consumer thread.
    -- XXX Do this only if the work queue is not empty. The work may have
    -- already been carried out by existing workers.
    --
    -- The worker is pushed with a count of 1 when rate information is
    -- present and 0 otherwise.
    when idle $ pushWorker (maybe 0 (const 1) (yieldRateInfo sv)) sv