{-# LANGUAGE CPP #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UnboxedTuples #-}
module Streamly.Internal.Data.SVar
(
MonadAsync
, SVarStyle (..)
, SVarStopStyle (..)
, SVar (..)
, Limit (..)
, State (streamVar)
, defState
, adaptState
, getMaxThreads
, setMaxThreads
, getMaxBuffer
, setMaxBuffer
, getStreamRate
, setStreamRate
, setStreamLatency
, getYieldLimit
, setYieldLimit
, getInspectMode
, setInspectMode
, recordMaxWorkers
, cleanupSVar
, cleanupSVarFromWorker
, newAheadVar
, newParallelVar
, captureMonadState
, RunInIO (..)
, WorkerInfo (..)
, YieldRateInfo (..)
, ThreadAbort (..)
, ChildEvent (..)
, AheadHeapEntry (..)
, send
, sendToProducer
, sendYield
, sendStop
, sendStopToProducer
, enqueueLIFO
, enqueueFIFO
, enqueueAhead
, reEnqueueAhead
, pushWorkerPar
, handleChildException
, handleFoldException
, queueEmptyAhead
, dequeueAhead
, HeapDequeueResult(..)
, dequeueFromHeap
, dequeueFromHeapSeq
, requeueOnHeapTop
, updateHeapSeq
, withIORef
, heapIsSane
, Rate (..)
, getYieldRateInfo
, newSVarStats
, collectLatency
, workerUpdateLatency
, isBeyondMaxRate
, workerRateControl
, updateYieldCount
, decrementYieldLimit
, incrementYieldLimit
, decrementBufferLimit
, incrementBufferLimit
, postProcessBounded
, postProcessPaced
, readOutputQBounded
, readOutputQPaced
, readOutputQBasic
, dispatchWorkerPaced
, sendFirstWorker
, delThread
, modifyThread
, doFork
, fork
, forkManaged
, toStreamVar
, SVarStats (..)
, dumpSVar
, printSVar
, withDiagMVar
)
where
import Control.Concurrent
(ThreadId, myThreadId, threadDelay, throwTo, forkIO, killThread)
import Control.Concurrent.MVar
(MVar, newEmptyMVar, tryPutMVar, takeMVar, tryTakeMVar, newMVar,
tryReadMVar)
import Control.Exception
(SomeException(..), catch, mask, assert, Exception, catches,
throwIO, Handler(..), BlockedIndefinitelyOnMVar(..),
BlockedIndefinitelyOnSTM(..))
import Control.Monad (when)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control
(MonadBaseControl, control, StM, liftBaseDiscard)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier,
storeLoadBarrier)
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, pushL)
import Data.Functor (void)
import Data.Heap (Heap, Entry(..))
import Data.Int (Int64)
#if __GLASGOW_HASKELL__ >= 800
import Data.Kind (Type)
#endif
import Data.IORef
(IORef, modifyIORef, newIORef, readIORef, writeIORef, atomicModifyIORef)
import Data.Maybe (fromJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup ((<>))
#endif
import Data.Set (Set)
import GHC.Conc (ThreadId(..))
import GHC.Exts
import GHC.IO (IO(..))
import System.IO (hPutStrLn, stderr)
import System.Mem.Weak (addFinalizer)
import Streamly.Internal.Data.Time.Clock (Clock(..), getTime)
import Streamly.Internal.Data.Time.Units
(AbsTime, NanoSecond64(..), MicroSecond64(..), diffAbsTime64,
fromRelTime64, toRelTime64, showNanoSecond64, showRelTime64)
import qualified Data.Heap as H
import qualified Data.Set as S
-- | A counter backed by an 'Int64'.
--
-- The numeric classes are derived for the newtype (the module enables
-- @GeneralizedNewtypeDeriving@), so a 'Count' can be used in arithmetic and
-- converted with 'fromIntegral' like its underlying 'Int64'.
newtype Count = Count Int64
    deriving ( Eq
             , Read
             , Show
             , Enum
             , Bounded
             , Num
             , Real
             , Integral
             , Ord
             )
-- | Exception value thrown to worker threads by 'cleanupSVar' and
-- 'cleanupSVarFromWorker' to make them stop.
data ThreadAbort = ThreadAbort deriving Show

instance Exception ThreadAbort
-- | An event queued on an SVar's output queue (see 'outputQueue').
data ChildEvent a =
      ChildYield a
      -- ^ carries a value of the stream element type
    | ChildStop ThreadId (Maybe SomeException)
      -- ^ carries a thread id and, optionally, an exception
-- On GHC older than 8.0 there is no 'Data.Kind.Type', so spell the kind as @*@
-- for the duration of this declaration only.
#if __GLASGOW_HASKELL__ < 800
#define Type *
#endif
-- | Payload of an entry in the 'outputHeap' of an SVar: nothing, a pure
-- value, or a stream of type @t m a@.
data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a =
      AheadEntryNull
    | AheadEntryPure a
    | AheadEntryStream (t m a)
#undef Type
-- | The style tag stored in the 'svarStyle' field of an SVar.
data SVarStyle =
      AsyncVar
    | WAsyncVar
    | ParallelVar
    | AheadVar
    deriving (Eq, Show)
-- | Per-worker bookkeeping record: a yield maximum, a mutable yield counter
-- and a mutable (count, timestamp) pair marking the start of a latency
-- measurement.
data WorkerInfo = WorkerInfo
    { workerYieldMax     :: Count
    , workerYieldCount   :: IORef Count
    , workerLatencyStart :: IORef (Count, AbsTime)
    }
-- | Rate specification stored in a 'State' via 'setStreamRate': a low, goal
-- and high rate plus a buffer size.
data Rate = Rate
    { rateLow    :: Double
    , rateGoal   :: Double
    , rateHigh   :: Double
    , rateBuffer :: Int
    }
-- | A pair of latency bounds (minimum and maximum).
data LatencyRange = LatencyRange
    { minLatency :: NanoSecond64
    , maxLatency :: NanoSecond64
    } deriving Show
-- | Rate-control state attached to an SVar (see 'yieldRateInfo').
--
-- The latency fields are the ones used by 'collectLatency':
-- 'workerPendingLatency' is drained into 'workerCollectedLatency', the result
-- is averaged into 'workerMeasuredLatency', and the yield total is kept in
-- 'svarAllTimeLatency'. 'workerPollingInterval' is written by
-- 'updateWorkerPollingInterval'.
data YieldRateInfo = YieldRateInfo
    { svarLatencyTarget      :: NanoSecond64
    , svarLatencyRange       :: LatencyRange
    , svarRateBuffer         :: Int
    , svarGainedLostYields   :: IORef Count
    , svarAllTimeLatency     :: IORef (Count, AbsTime)
    , workerBootstrapLatency :: Maybe NanoSecond64
    , workerPollingInterval  :: IORef Count
    , workerPendingLatency   :: IORef (Count, Count, NanoSecond64)
    , workerCollectedLatency :: IORef (Count, Count, NanoSecond64)
    , workerMeasuredLatency  :: IORef NanoSecond64
    }
-- | Mutable statistics counters of an SVar (see 'svarStats').
--
-- 'minWorkerLatency' and 'maxWorkerLatency' are maintained by
-- 'recordMinMaxLatency'; 'avgWorkerLatency' accumulates a (yield count, total
-- time) pair in 'recordAvgLatency'.
data SVarStats = SVarStats
    { totalDispatches  :: IORef Int
    , maxWorkers       :: IORef Int
    , maxOutQSize      :: IORef Int
    , maxHeapSize      :: IORef Int
    , maxWorkQSize     :: IORef Int
    , avgWorkerLatency :: IORef (Count, NanoSecond64)
    , minWorkerLatency :: IORef NanoSecond64
    , maxWorkerLatency :: IORef NanoSecond64
    , svarStopTime     :: IORef (Maybe AbsTime)
    }
-- | A resource limit: either no limit at all or a finite bound.
data Limit = Unlimited | Limited Word deriving Show

-- | Two limits are equal when both are 'Unlimited' or both are 'Limited' with
-- the same bound.
instance Eq Limit where
    Unlimited == Unlimited = True
    Unlimited == Limited _ = False
    Limited _ == Unlimited = False
    Limited x == Limited y = x == y

-- | 'Unlimited' is the greatest element; finite limits are ordered by their
-- bound. Only '<=' is defined, the remaining methods use the class defaults.
instance Ord Limit where
    Unlimited <= Unlimited = True
    Unlimited <= Limited _ = False
    Limited _ <= Unlimited = True
    Limited x <= Limited y = x <= y
-- | The stop-style tag stored in the 'svarStopStyle' field of an SVar.
data SVarStopStyle =
      StopNone
    | StopAny
    | StopBy
    deriving (Eq, Show)
-- | The policy tag stored in the 'pushBufferPolicy' field of an SVar.
-- NOTE(review): judging by the names these select what happens when the push
-- buffer is full (drop the new item, drop the old one, or block) - confirm
-- against the code that consumes the policy.
data PushBufferPolicy =
      PushBufferDropNew
    | PushBufferDropOld
    | PushBufferBlock
-- | The shared state of a concurrent stream evaluation, parameterised by the
-- stream type @t@, the monad @m@ and the element type @a@.
--
-- Fields are deliberately left lazy. Only the uses visible in this part of
-- the module are described below; the remaining fields are documented by
-- their types alone.
data SVar t m a = SVar
    {
    -- Configuration tags
      svarStyle       :: SVarStyle
    , svarMrun        :: RunInIO m
    , svarStopStyle   :: SVarStopStyle
    , svarStopBy      :: IORef ThreadId

    -- Output queue (events plus a count), its doorbell, and the actions used
    -- to read and post-process it
    , outputQueue     :: IORef ([ChildEvent a], Int)
    , outputDoorBell  :: MVar ()
    , readOutputQ     :: m [ChildEvent a]
    , postProcess     :: m Bool

    -- A second queue/doorbell pair, named for the consumer-to-producer
    -- direction
    , outputQueueFromConsumer    :: IORef ([ChildEvent a], Int)
    , outputDoorBellFromConsumer :: MVar ()

    -- Limits and buffer accounting
    , maxWorkerLimit   :: Limit
    , maxBufferLimit   :: Limit
    , pushBufferSpace  :: IORef Count
    , pushBufferPolicy :: PushBufferPolicy
    , pushBufferMVar   :: MVar ()

    -- Yield limit and rate control (rate state is consumed by
    -- 'collectLatency')
    , remainingWork    :: Maybe (IORef Count)
    , yieldRateInfo    :: Maybe YieldRateInfo

    -- Work scheduling
    , enqueue          :: t m a -> IO ()
    , isWorkDone       :: IO Bool
    , isQueueDone      :: IO Bool
    , needDoorBell     :: IORef Bool
    , workLoop         :: Maybe WorkerInfo -> m ()

    -- Worker threads; 'workerThreads' is the set that 'cleanupSVar' and
    -- 'cleanupSVarFromWorker' send 'ThreadAbort' to
    , workerThreads    :: IORef (Set ThreadId)
    , workerCount      :: IORef Int
    , accountThread    :: ThreadId -> m ()
    , workerStopMVar   :: MVar ()

    -- Statistics; 'svarInspectMode' gates the latency statistics recorded in
    -- 'collectLatency'
    , svarStats        :: SVarStats
    , svarRef          :: Maybe (IORef ())
    , svarInspectMode  :: Bool
    , svarCreator      :: ThreadId

    -- Heap of entries keyed by an 'Int', paired with an optional 'Int', and a
    -- work queue with a count (names suggest use by the "ahead" style)
    , outputHeap       :: IORef ( Heap (Entry Int (AheadHeapEntry t m a))
                                , Maybe Int)
    , aheadWorkQueue   :: IORef ([t m a], Int)
    }
-- | Configuration passed along with a stream. Only 'streamVar' is exported as
-- a field; the underscore-prefixed fields are accessed through the @set*@ and
-- @get*@ functions of this module.
data State t m a = State
    { -- | The SVar in use, if any. Reset to 'Nothing' by 'adaptState'.
      streamVar      :: Maybe (SVar t m a)
      -- | Optional yield limit. Reset to 'Nothing' by 'adaptState'.
    , _yieldLimit    :: Maybe Count
    , _threadsHigh   :: Limit
    , _bufferHigh    :: Limit
    , _streamLatency :: Maybe NanoSecond64
    , _maxStreamRate :: Maybe Rate
    , _inspectMode   :: Bool
    }
-- | The constant behind both 'defaultMaxThreads' and 'defaultMaxBuffer'; it
-- is also used as a cap in 'updateWorkerPollingInterval' and as a threshold
-- in 'shouldUseCollectedBatch'.
magicMaxBuffer :: Word
magicMaxBuffer = 1500
-- | Default thread and buffer limits used by 'defState'; both are
-- 'magicMaxBuffer'.
defaultMaxThreads, defaultMaxBuffer :: Limit
defaultMaxThreads = Limited magicMaxBuffer
defaultMaxBuffer = Limited magicMaxBuffer
-- | The default 'State': no SVar, no yield limit, default thread and buffer
-- limits, no rate or latency setting, inspect mode off.
defState :: State t m a
defState = State
    { streamVar = Nothing
    , _yieldLimit = Nothing
    , _threadsHigh = defaultMaxThreads
    , _bufferHigh = defaultMaxBuffer
    , _maxStreamRate = Nothing
    , _streamLatency = Nothing
    , _inspectMode = False
    }
-- | Re-type a 'State' for a different monad and element type.
--
-- 'streamVar' and '_yieldLimit' are reset to 'Nothing'; every other setting
-- is carried over unchanged. Resetting 'streamVar' is what allows the type
-- change, since it is the only field whose type mentions @m@ and @a@.
adaptState :: State t m a -> State t n b
adaptState st = st
    { streamVar = Nothing
    , _yieldLimit = Nothing
    }
-- | Set the yield limit. 'Nothing' removes the limit; a value that is zero or
-- negative is stored as a limit of 0.
setYieldLimit :: Maybe Int64 -> State t m a -> State t m a
setYieldLimit lim st =
    -- Clamp at zero before converting so that a negative request becomes 0.
    st { _yieldLimit = fmap (fromIntegral . max 0) lim }
-- | Read the yield limit stored by 'setYieldLimit'.
getYieldLimit :: State t m a -> Maybe Count
getYieldLimit = _yieldLimit
-- | Set the thread limit. A negative value means 'Unlimited', zero selects
-- 'defaultMaxThreads', and a positive value becomes a 'Limited' bound.
setMaxThreads :: Int -> State t m a -> State t m a
setMaxThreads n st = st { _threadsHigh = limit }

    where

    limit
        | n < 0 = Unlimited
        | n == 0 = defaultMaxThreads
        | otherwise = Limited (fromIntegral n)
-- | Read the thread limit stored by 'setMaxThreads'.
getMaxThreads :: State t m a -> Limit
getMaxThreads = _threadsHigh
-- | Set the buffer limit. A negative value means 'Unlimited', zero selects
-- 'defaultMaxBuffer', and a positive value becomes a 'Limited' bound.
setMaxBuffer :: Int -> State t m a -> State t m a
setMaxBuffer n st = st { _bufferHigh = limit }

    where

    limit
        | n < 0 = Unlimited
        | n == 0 = defaultMaxBuffer
        | otherwise = Limited (fromIntegral n)
-- | Read the buffer limit stored by 'setMaxBuffer'.
getMaxBuffer :: State t m a -> Limit
getMaxBuffer = _bufferHigh
-- | Store the given rate setting as is; 'Nothing' clears it.
setStreamRate :: Maybe Rate -> State t m a -> State t m a
setStreamRate r st = st { _maxStreamRate = r }
-- | Read the rate setting stored by 'setStreamRate'.
getStreamRate :: State t m a -> Maybe Rate
getStreamRate = _maxStreamRate
-- | Set the stream latency. A value that is zero or negative clears the
-- setting; a positive value is converted to 'NanoSecond64' with
-- 'fromIntegral' (presumably a nanosecond count - confirm against the 'Num'
-- instance of 'NanoSecond64').
setStreamLatency :: Int -> State t m a -> State t m a
setStreamLatency n st = st { _streamLatency = latency }

    where

    latency
        | n <= 0 = Nothing
        | otherwise = Just (fromIntegral n)
-- | Read the latency stored by 'setStreamLatency'.
getStreamLatency :: State t m a -> Maybe NanoSecond64
getStreamLatency = _streamLatency
-- | Turn inspect mode on. There is no corresponding function here to turn it
-- off again.
setInspectMode :: State t m a -> State t m a
setInspectMode st = st { _inspectMode = True }
-- | Whether inspect mode has been turned on with 'setInspectMode'.
getInspectMode :: State t m a -> Bool
getInspectMode = _inspectMode
-- | Throw 'ThreadAbort' to every thread currently recorded in the
-- 'workerThreads' set of the SVar.
--
-- The set is read once; threads registered after the read are not signalled.
cleanupSVar :: SVar t m a -> IO ()
cleanupSVar sv = do
    workers <- readIORef (workerThreads sv)
    Prelude.mapM_ (`throwTo` ThreadAbort) workers
-- | Like 'cleanupSVar', but skips the calling thread, so that a worker can
-- abort all of its siblings without aborting itself.
cleanupSVarFromWorker :: SVar t m a -> IO ()
cleanupSVarFromWorker sv = do
    workers <- readIORef (workerThreads sv)
    self <- myThreadId
    let others = Prelude.filter (/= self) $ S.toList workers
    Prelude.mapM_ (`throwTo` ThreadAbort) others
-- | Recompute 'workerPollingInterval' from a latency value.
--
-- The interval is @minThreadDelay `div` latency@, clamped to the range
-- @[1, magicMaxBuffer]@.
--
-- A latency of zero is possible: 'collectLatency' passes the maximum of a
-- truncated integer average and the previous measurement, and both can be 0.
-- Dividing by it would raise a divide-by-zero exception, so a zero latency
-- selects the largest interval instead, which is the value the formula
-- approaches as the latency shrinks.
updateWorkerPollingInterval :: YieldRateInfo -> NanoSecond64 -> IO ()
updateWorkerPollingInterval yinfo latency = do
    let periodRef = workerPollingInterval yinfo
        upperBound = fromIntegral magicMaxBuffer
        cnt | latency == 0 = upperBound
            | otherwise = max 1 $ minThreadDelay `div` latency
        period = min cnt upperBound

    writeIORef periodRef (fromIntegral period)
-- | Fold a new latency sample into the 'minWorkerLatency' and
-- 'maxWorkerLatency' statistics of the SVar.
--
-- A stored minimum of 0 is treated as "not set yet" and is always replaced.
{-# INLINE recordMinMaxLatency #-}
recordMinMaxLatency :: SVar t m a -> NanoSecond64 -> IO ()
recordMinMaxLatency sv new = do
    let ss = svarStats sv
        minRef = minWorkerLatency ss
        maxRef = maxWorkerLatency ss

    minLat <- readIORef minRef
    when (new < minLat || minLat == 0) $ writeIORef minRef new

    maxLat <- readIORef maxRef
    when (new > maxLat) $ writeIORef maxRef new
-- | Add a (yield count, total time) sample to the running totals kept in
-- 'avgWorkerLatency'.
--
-- The new totals are forced before being stored. A lazy 'modifyIORef' would
-- stack one unevaluated addition per call until something finally reads the
-- statistic. Like 'modifyIORef', this read-then-write is not atomic.
recordAvgLatency :: SVar t m a -> (Count, NanoSecond64) -> IO ()
recordAvgLatency sv (count, time) = do
    let ref = avgWorkerLatency (svarStats sv)
    (cnt, t) <- readIORef ref
    let cnt1 = cnt + count
        t1 = t + time
    cnt1 `seq` t1 `seq` writeIORef ref (cnt1, t1)
-- | Move the pending latency record into the collected one.
--
-- @cur@ is atomically swapped with @(0, 0, 0)@ and its previous contents are
-- added component-wise to @col@. Each record is a triple of (total yield
-- count, yield count that has a time measurement, accumulated time).
--
-- Returns the collected total yield count and, when both the measured count
-- and the accumulated time are positive, 'Just' that (count, time) pair.
{-# INLINE collectWorkerPendingLatency #-}
collectWorkerPendingLatency
    :: IORef (Count, Count, NanoSecond64)
    -> IORef (Count, Count, NanoSecond64)
    -> IO (Count, Maybe (Count, NanoSecond64))
collectWorkerPendingLatency cur col = do
    -- Take the pending values and reset them in a single atomic step so that
    -- concurrent updates to @cur@ are not lost.
    (fcount, count, time) <- atomicModifyIORefCAS cur $ \v -> ((0,0,0), v)

    (fcnt, cnt, t) <- readIORef col
    let totalCount = fcnt + fcount
        latCount = cnt + count
        latTime = t + time
    writeIORef col (totalCount, latCount, latTime)

    -- A non-zero measured count is expected to come with a non-zero time.
    assert (latCount == 0 || latTime /= 0) (return ())
    let latPair =
            if latCount > 0 && latTime > 0
            then Just (latCount, latTime)
            else Nothing
    return (totalCount, latPair)
-- | Decide whether the collected batch should replace the current latency
-- measurement. True when any of the following holds:
--
-- * more than 'magicMaxBuffer' yields have been collected,
-- * the collected time exceeds @minThreadDelay@,
-- * the new latency is more than twice or less than half the previous one,
-- * there is no previous latency yet (it is 0).
{-# INLINE shouldUseCollectedBatch #-}
shouldUseCollectedBatch
    :: Count
    -> NanoSecond64
    -> NanoSecond64
    -> NanoSecond64
    -> Bool
shouldUseCollectedBatch collectedYields collectedTime newLat prevLat =
    -- The ratio is only demanded once @prevLat > 0@ has been established, so
    -- the division by a zero @prevLat@ is never evaluated.
    let r = fromIntegral newLat / fromIntegral prevLat :: Double
    in     (collectedYields > fromIntegral magicMaxBuffer)
        || (collectedTime > minThreadDelay)
        || (prevLat > 0 && (r > 2 || r < 0.5))
        || (prevLat == 0)
-- | Collect the pending worker latency data and, when warranted, update the
-- measured latency.
--
-- Returns @(total yield count, long-term timestamp, latency)@. The count is
-- the stored long-term count plus the count collected so far, and the
-- timestamp is the one stored in 'svarAllTimeLatency'. The latency is the
-- freshly computed one when the collected batch is used, and the previously
-- measured one otherwise.
--
-- The batch is used when 'shouldUseCollectedBatch' says so or when @drain@ is
-- 'True'. In that case the polling interval is recomputed, the collected
-- record is reset, the measured latency becomes the mean of the previous and
-- the new latency, and the long-term count is updated.
collectLatency :: SVar t m a
               -> YieldRateInfo
               -> Bool
               -> IO (Count, AbsTime, NanoSecond64)
collectLatency sv yinfo drain = do
    let cur = workerPendingLatency yinfo
        col = workerCollectedLatency yinfo
        longTerm = svarAllTimeLatency yinfo
        measured = workerMeasuredLatency yinfo

    (newCount, newLatPair) <- collectWorkerPendingLatency cur col
    (lcount, ltime) <- readIORef longTerm
    prevLat <- readIORef measured

    let newLcount = lcount + newCount
        retWith lat = return (newLcount, ltime, lat)

    case newLatPair of
        -- No timed yields collected yet: keep reporting the old latency.
        Nothing -> retWith prevLat
        Just (count, time) -> do
            -- Integer average; count is positive here because the pair is
            -- only produced for a positive count.
            let newLat = time `div` (fromIntegral count)
            when (svarInspectMode sv) $ recordMinMaxLatency sv newLat
            if shouldUseCollectedBatch newCount time newLat prevLat || drain
            then do
                updateWorkerPollingInterval yinfo (max newLat prevLat)
                when (svarInspectMode sv) $ recordAvgLatency sv (count, time)
                writeIORef col (0, 0, 0)
                writeIORef measured ((prevLat + newLat) `div` 2)
                -- Only the count is replaced; the stored timestamp is kept.
                modifyIORef longTerm $ \(_, t) -> (newLcount, t)
                retWith newLat
            else retWith prevLat
-- | Render the statistics recorded in an 'SVarStats' as a multi-line string.
--
-- When the SVar carries rate information, a draining 'collectLatency' is run
-- first (its result is discarded) before any counter is read. Dispatch,
-- worker and output-queue maxima are always printed; the remaining sections
-- are appended only when their value is positive, and the heap size only for
-- the 'AheadVar' style.
dumpSVarStats :: SVar t m a -> SVarStats -> SVarStyle -> IO String
dumpSVarStats sv ss style = do
    mapM_ (\yinfo -> void (collectLatency sv yinfo True)) rateInfo

    dispatches <- readIORef (totalDispatches ss)
    maxWrk <- readIORef (maxWorkers ss)
    maxOq <- readIORef (maxOutQSize ss)
    maxHp <- readIORef (maxHeapSize ss)
    minLat <- readIORef (minWorkerLatency ss)
    maxLat <- readIORef (maxWorkerLatency ss)
    (avgCnt, avgTime) <- readIORef (avgWorkerLatency ss)
    (svarCnt, svarGainLossCnt, svarLat) <-
        maybe (return (0, 0, 0)) svarTotals rateInfo

    return $ unlines
        [ "total dispatches = " <> show dispatches
        , "max workers = " <> show maxWrk
        , concat
            [ "max outQSize = "
            , show maxOq
            , showIf (style == AheadVar)
                ("\nheap max size = " <> show maxHp)
            , showIf (minLat > 0)
                ("\nmin worker latency = " <> showNanoSecond64 minLat)
            , showIf (maxLat > 0)
                ("\nmax worker latency = " <> showNanoSecond64 maxLat)
            , showIf (avgCnt > 0)
                ("\navg worker latency = "
                    <> showNanoSecond64 (avgTime `div` fromIntegral avgCnt))
            , showIf (svarLat > 0)
                ("\nSVar latency = " <> showRelTime64 svarLat)
            , showIf (svarCnt > 0)
                ("\nSVar yield count = " <> show svarCnt)
            , showIf (svarGainLossCnt > 0)
                ("\nSVar gain/loss yield count = " <> show svarGainLossCnt)
            ]
        ]

    where

    rateInfo = yieldRateInfo sv

    -- An optional section: the text is only demanded when the flag holds.
    showIf cond text = if cond then text else ""

    -- Yield count, gained/lost yields and per-yield latency of the SVar.
    -- The interval ends at the recorded stop time if there is one, else now.
    svarTotals yinfo = do
        (cnt, startTime) <- readIORef (svarAllTimeLatency yinfo)
        if cnt > 0
        then do
            stopped <- readIORef (svarStopTime ss)
            gl <- readIORef (svarGainedLostYields yinfo)
            endTime <- maybe (getTime Monotonic) return stopped
            let interval = diffAbsTime64 endTime startTime
            return (cnt, gl, interval `div` fromIntegral cnt)
        else return (0, 0, 0)
{-# NOINLINE dumpSVar #-}
-- | Produce a human-readable snapshot of an 'SVar' for diagnostics.
--
-- The report contains the creator thread and style, the output queue (both
-- the computed list length and the separately maintained length), the state
-- of the output door bell, the heap and work queue for 'AheadVar' style
-- SVars, the door-bell flag (not read for 'ParallelVar'; reported as
-- 'False'), the worker thread set and count, and finally the statistics
-- rendered by 'dumpSVarStats'.
--
-- The individual fields are read one after another, not atomically, so the
-- snapshot is not guaranteed to be mutually consistent.
dumpSVar :: SVar t m a -> IO String
dumpSVar sv = do
    (oqList, oqLen) <- readIORef $ outputQueue sv
    db <- tryReadMVar $ outputDoorBell sv
    aheadDump <-
        if style == AheadVar
        then do
            (oheap, oheapSeq) <- readIORef $ outputHeap sv
            (wq, wqSeq) <- readIORef $ aheadWorkQueue sv
            return $ unlines
                [ "heap length = " <> show (H.size oheap)
                , "heap sequence = " <> show oheapSeq
                , "work queue length = " <> show (length wq)
                , "work queue sequence = " <> show wqSeq
                ]
        else return []

    waiting <-
        if style /= ParallelVar
        then readIORef $ needDoorBell sv
        else return False
    rthread <- readIORef $ workerThreads sv
    workers <- readIORef $ workerCount sv
    stats <- dumpSVarStats sv (svarStats sv) style

    return $ unlines
        [ "Creator tid = " <> show (svarCreator sv)
        , "style = " <> show style
        , "---------CURRENT STATE-----------"
        , "outputQueue length computed = " <> show (length oqList)
        , "outputQueue length maintained = " <> show oqLen
        , "outputDoorBell = " <> show db
        ]
        <> aheadDump
        <> unlines
        [ "needDoorBell = " <> show waiting
        , "running threads = " <> show rthread
        , "running thread count = " <> show workers
        ]
        <> "---------STATS-----------\n"
        <> stats

    where

    style = svarStyle sv
-- | Write a dump of the 'SVar' (see 'dumpSVar') to 'stderr'. The supplied
-- string is printed on its own line above the dump, preceded by a blank
-- line.
printSVar :: SVar t m a -> String -> IO ()
printSVar sv how =
    dumpSVar sv >>= \report ->
        hPutStrLn stderr (concat ["\n", how, "\n", report])
{-# NOINLINE mvarExcHandler #-}
-- | Handler for 'BlockedIndefinitelyOnMVar': prints the label, the exception
-- name and a dump of the 'SVar' to 'stderr', then rethrows the exception.
mvarExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnMVar -> IO ()
mvarExcHandler sv label BlockedIndefinitelyOnMVar = do
    report <- dumpSVar sv
    hPutStrLn stderr (concat [label, " ", "BlockedIndefinitelyOnMVar\n", report])
    throwIO BlockedIndefinitelyOnMVar
{-# NOINLINE stmExcHandler #-}
-- | Handler for 'BlockedIndefinitelyOnSTM': prints the label, the exception
-- name and a dump of the 'SVar' to 'stderr', then rethrows the exception.
stmExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnSTM -> IO ()
stmExcHandler sv label BlockedIndefinitelyOnSTM = do
    report <- dumpSVar sv
    hPutStrLn stderr (concat [label, " ", "BlockedIndefinitelyOnSTM\n", report])
    throwIO BlockedIndefinitelyOnSTM
-- | Run an action; when the SVar is in inspect mode, additionally install
-- handlers that dump the SVar state (tagged with the given label) if the
-- action dies with 'BlockedIndefinitelyOnMVar' or 'BlockedIndefinitelyOnSTM'.
-- Outside inspect mode the action runs unwrapped.
withDiagMVar :: SVar t m a -> String -> IO () -> IO ()
withDiagMVar sv label action
    | svarInspectMode sv = action `catches` diagnosticHandlers
    | otherwise = action

    where

    diagnosticHandlers =
        [ Handler (mvarExcHandler sv label)
        , Handler (stmExcHandler sv label)
        ]
-- | Constraint synonym bundling the capabilities required of a monad @m@:
-- 'MonadIO', 'MonadBaseControl' over 'IO', and 'MonadThrow'.
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
-- | A wrapper around a polymorphic function that runs an @m@ computation in
-- 'IO', yielding the monadic state ('StM') of @m@ together with the result.
-- Values of this type are obtained with 'captureMonadState'.
newtype RunInIO m = RunInIO
    { runInIO :: forall b. m b -> IO (StM m b)
    }
-- | Capture the run-in-'IO' function of the current monad, as supplied by
-- 'control', and return it wrapped in a 'RunInIO'.
captureMonadState :: MonadBaseControl IO m => m (RunInIO m)
captureMonadState =
    control $ \runInBase ->
        let captured = RunInIO runInBase
         in runInBase (return captured)
{-# INLINE rawForkIO #-}
-- | Fork a thread by calling the @fork#@ primitive directly and wrapping the
-- resulting primitive thread id in a 'ThreadId'. Unlike 'forkIO', nothing is
-- wrapped around the action here (no exception handler is installed).
--
-- From base-4.17 on, @fork#@ is given the unwrapped state-passing function,
-- hence the conditional pattern match on the 'IO' constructor.
rawForkIO :: IO () -> IO ThreadId
#if MIN_VERSION_base(4,17,0)
rawForkIO (IO action) =
#else
rawForkIO action =
#endif
    IO $ \st0 ->
        case fork# action st0 of
            (# st1, rawTid #) -> (# st1, ThreadId rawTid #)
{-# INLINE doFork #-}
-- | Fork a thread that runs the given action using a previously captured
-- 'RunInIO' runner.
--
-- The fork happens with asynchronous exceptions masked. Inside the child the
-- action itself runs with the parent's masking state restored, and any
-- exception escaping it is passed to the supplied handler. The monadic state
-- produced by the child is discarded.
doFork :: MonadBaseControl IO m
    => m ()
    -> RunInIO m
    -> (SomeException -> IO ())
    -> m ThreadId
doFork action (RunInIO runChild) onException =
    control $ \runInBase ->
        mask $ \unmask -> do
            let child = unmask (void (runChild action)) `catch` onException
            tid <- rawForkIO child
            runInBase (return tid)
{-# INLINABLE fork #-}
-- | 'forkIO' lifted to any monad with a 'MonadBaseControl' 'IO' instance by
-- means of 'liftBaseDiscard'; the monadic state of the forked computation is
-- discarded.
fork :: MonadBaseControl IO m => m () -> m ThreadId
fork action = liftBaseDiscard forkIO action
{-# INLINABLE forkManaged #-}
-- | Fork a thread with 'fork' and register a finalizer on the returned
-- 'ThreadId' that kills the thread, so the thread is killed once its
-- 'ThreadId' is garbage collected.
--
-- NOTE(review): the finalizer is attached with 'addFinalizer' to the
-- 'ThreadId' value itself; confirm against the 'addFinalizer' documentation
-- that it cannot fire while the id is still in use.
forkManaged :: (MonadIO m, MonadBaseControl IO m) => m () -> m ThreadId
forkManaged action =
    fork action >>= \child ->
        child <$ liftIO (addFinalizer child (killThread child))
{-# INLINE decrementYieldLimit #-}
-- | Atomically decrement the remaining-work counter of the SVar, if it has
-- one.
--
-- Returns 'True' when there is no counter, or when the counter was at least
-- 1 before the decrement. The counter is decremented unconditionally, so it
-- can go below zero.
decrementYieldLimit :: SVar t m a -> IO Bool
decrementYieldLimit sv = maybe (return True) consume (remainingWork sv)

    where

    consume ref =
        (>= 1) <$> atomicModifyIORefCAS ref (\before -> (before - 1, before))
{-# INLINE incrementYieldLimit #-}
-- | Atomically add one to the remaining-work counter of the SVar. Does
-- nothing when the SVar has no such counter.
incrementYieldLimit :: SVar t m a -> IO ()
incrementYieldLimit sv = maybe (return ()) giveBack (remainingWork sv)

    where

    giveBack ref = atomicModifyIORefCAS_ ref (+ 1)
{-# INLINE decrementBufferLimit #-}
-- | Claim one slot of push-buffer space before an item is pushed.
--
-- Nothing is done for an 'Unlimited' buffer. For a 'Limited' buffer the
-- space counter is atomically decremented if it is at least 1. If no space
-- was available, the push-buffer policy decides what happens:
--
-- * 'PushBufferBlock': block on the push-buffer 'MVar' until a slot can be
--   claimed.
-- * 'PushBufferDropNew': drop the head of the output queue list (the most
--   recently queued event, since 'sendWithDoorBell' conses onto the front)
--   and proceed; if the queue is empty, block as above.
-- * 'PushBufferDropOld': not implemented; calls 'error'.
decrementBufferLimit :: SVar t m a -> IO ()
decrementBufferLimit sv =
    case maxBufferLimit sv of
        Unlimited -> return ()
        Limited _ -> do
            old <- claimSlot
            when (old <= 0) $
                case pushBufferPolicy sv of
                    PushBufferBlock -> blockAndRetry
                    PushBufferDropNew -> do
                        block <- atomicModifyIORefCAS (outputQueue sv) $
                            \(es, n) ->
                                case es of
                                    [] -> (([], n), True)
                                    _ : xs -> ((xs, n - 1), False)
                        when block blockAndRetry
                    PushBufferDropOld ->
                        error $ "decrementBufferLimit: "
                            <> "PushBufferDropOld policy is not implemented"

    where

    -- Decrement the space counter unless it is already below 1; returns the
    -- value seen before the update.
    claimSlot :: IO Count
    claimSlot =
        atomicModifyIORefCAS (pushBufferSpace sv) $ \x ->
            (if x >= 1 then x - 1 else x, x)

    blockAndRetry :: IO ()
    blockAndRetry = do
        takeMVar (pushBufferMVar sv)
        old <- claimSlot
        if old >= 1
        -- A slot was claimed: refill the MVar so that any other thread
        -- blocked in takeMVar gets a chance to retry as well.
        then void $ tryPutMVar (pushBufferMVar sv) ()
        -- Still no space: leave the MVar empty and wait for the next put.
        else blockAndRetry
{-# INLINE incrementBufferLimit #-}
-- | Return one slot of push-buffer space. Nothing is done for an 'Unlimited'
-- buffer. For a 'Limited' one the space counter is atomically incremented,
-- a write barrier is issued, and then the push-buffer 'MVar' is filled
-- (if it is empty) to wake up a blocked pusher.
incrementBufferLimit :: SVar t m a -> IO ()
incrementBufferLimit sv =
    case maxBufferLimit sv of
        Limited _ -> releaseSlot
        Unlimited -> return ()

    where

    releaseSlot = do
        atomicModifyIORefCAS_ (pushBufferSpace sv) (+ 1)
        writeBarrier
        _ <- tryPutMVar (pushBufferMVar sv) ()
        return ()
{-# INLINE resetBufferLimit #-}
-- | Reset the push-buffer space counter to the configured maximum. Nothing
-- is done when the buffer is 'Unlimited'.
resetBufferLimit :: SVar t m a -> IO ()
resetBufferLimit sv =
    case maxBufferLimit sv of
        Limited n ->
            atomicModifyIORefCAS_ (pushBufferSpace sv) (\_ -> fromIntegral n)
        Unlimited -> return ()
{-# INLINE sendWithDoorBell #-}
-- | Push an event onto the front of a queue and bump its length counter in
-- one atomic step. If the queue length was zero (or less) before the push,
-- issue a write barrier and ring the door bell by filling the 'MVar' when it
-- is empty.
--
-- Returns the queue length as it was before the push.
sendWithDoorBell ::
    IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell q bell msg = do
    before <- atomicModifyIORefCAS q push
    when (before <= 0) $ do
        writeBarrier
        void (tryPutMVar bell ())
    return before

    where

    push (events, len) = ((msg : events, len + 1), len)
-- | Queue an event on the SVar's output queue, ringing the output door bell
-- as described for 'sendWithDoorBell'. Returns the output queue length
-- before the event was added.
send :: SVar t m a -> ChildEvent a -> IO Int
send sv = sendWithDoorBell (outputQueue sv) (outputDoorBell sv)
-- | Queue an event on the consumer-to-producer queue of the SVar.
--
-- The push-buffer 'MVar' is filled first (if empty), which wakes up anything
-- blocked on it; then the event is queued with 'sendWithDoorBell' using the
-- from-consumer queue and door bell. Returns that queue's previous length.
sendToProducer :: SVar t m a -> ChildEvent a -> IO Int
sendToProducer sv msg =
    tryPutMVar (pushBufferMVar sv) ()
        >> sendWithDoorBell
            (outputQueueFromConsumer sv)
            (outputDoorBellFromConsumer sv)
            msg
-- | Send a 'ChildStop' event, carrying the calling thread's id and no
-- exception, to the producer side via 'sendToProducer'.
sendStopToProducer :: MonadIO m => SVar t m a -> m ()
sendStopToProducer sv =
    liftIO $
        myThreadId >>= \self ->
            void (sendToProducer sv (ChildStop self Nothing))
-- | Measure what a worker has done since its last latency checkpoint.
--
-- If the worker's yield count has advanced since the checkpoint, the
-- checkpoint is moved to the current count and the current monotonic time,
-- and the number of yields together with the elapsed time is returned.
-- Otherwise the checkpoint is left untouched and 'Nothing' is returned.
workerCollectLatency :: WorkerInfo -> IO (Maybe (Count, NanoSecond64))
workerCollectLatency winfo = do
    (startCount, startTime) <- readIORef checkpoint
    yieldCount <- readIORef (workerYieldCount winfo)
    measure startTime yieldCount (yieldCount - startCount)

    where

    checkpoint = workerLatencyStart winfo

    measure :: AbsTime -> Count -> Count -> IO (Maybe (Count, NanoSecond64))
    measure startTime yieldCount delta
        | delta > 0 = do
            now <- getTime Monotonic
            let elapsed = fromRelTime64 (diffAbsTime64 now startTime)
            writeIORef checkpoint (yieldCount, now)
            return (Just (delta, elapsed))
        | otherwise = return Nothing
-- | Post a worker's latest latency measurement to the shared pending-latency
-- accumulator of the 'YieldRateInfo'.
--
-- The accumulator is a triple. The first component always grows by the
-- number of yields measured. The second and third components (yield count
-- and time) grow only when the measured period is greater than zero, so
-- zero-length periods do not contribute to the timed sample. Nothing is
-- posted when 'workerCollectLatency' has nothing to report.
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency yinfo winfo =
    workerCollectLatency winfo >>= maybe (return ()) post

    where

    post (yields, period) =
        let (timedYields, timedPeriod)
                | period > 0 = (yields, period)
                | otherwise = (0, 0)
         in atomicModifyIORefCAS_ (workerPendingLatency yinfo) $
                \(allYields, n, t) ->
                    (allYields + yields, n + timedYields, t + timedPeriod)
-- | Add one to the worker's yield count and return the new count. The
-- update is a plain read followed by a write, not an atomic modification.
updateYieldCount :: WorkerInfo -> IO Count
updateYieldCount winfo = do
    bumped <- (+ 1) <$> readIORef counter
    writeIORef counter bumped
    return bumped

    where

    counter = workerYieldCount winfo
-- | Whether the given yield count has reached the worker's yield maximum.
-- A maximum of zero means there is no limit, so the answer is then always
-- 'False'.
isBeyondMaxYield :: Count -> WorkerInfo -> Bool
isBeyondMaxYield cnt winfo
    | limit == 0 = False
    | otherwise = cnt >= limit

    where

    limit = workerYieldMax winfo
{-# NOINLINE checkRatePeriodic #-}
-- | Periodic rate check performed by a worker.
--
-- When the worker polling interval is non-zero and the given yield count is
-- a multiple of it, the worker posts its latency ('workerUpdateLatency') and
-- the result of 'isBeyondMaxRate' is returned. In every other case nothing
-- is done and the result is 'False'.
checkRatePeriodic :: SVar t m a
                  -> YieldRateInfo
                  -> WorkerInfo
                  -> Count
                  -> IO Bool
checkRatePeriodic sv yinfo winfo ycnt = do
    interval <- readIORef (workerPollingInterval yinfo)
    let pollNow = interval /= 0 && ycnt `mod` interval == 0
    if pollNow
    then workerUpdateLatency yinfo winfo >> isBeyondMaxRate sv yinfo
    else return False
{-# NOINLINE workerRateControl #-}
-- | Account for one yield of a worker and check its limits.
--
-- The worker's yield count is incremented and the periodic rate check is
-- run. The result is 'True' only if the worker has neither reached its
-- yield maximum ('isBeyondMaxYield') nor been found beyond the maximum rate.
workerRateControl :: SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl sv yinfo winfo = do
    yields <- updateYieldCount winfo
    tooFast <- checkRatePeriodic sv yinfo winfo yields
    return (not (isBeyondMaxYield yields winfo) && not tooFast)
{-# INLINE sendYield #-}
-- | Queue an event on the SVar's output queue and report whether both the
-- buffer check and the rate check passed (presumably telling the worker
-- whether it may keep yielding -- confirm against the callers).
--
-- * Buffer check: always passes for an 'Unlimited' buffer. For a 'Limited'
--   one, the queue length after this send must be strictly less than the
--   limit minus the current worker count.
-- * Rate check: runs 'workerRateControl' when both a 'WorkerInfo' is given
--   and the SVar has rate information; otherwise it passes.
--
-- Both checks are always performed, in that order, even if the first fails.
sendYield :: SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield sv mwinfo msg = do
    oldlen <- send sv msg
    bufferSpaceOk <- hasBufferSpace oldlen
    rateLimitOk <- withinRate
    return (bufferSpaceOk && rateLimitOk)

    where

    hasBufferSpace oldlen =
        case maxBufferLimit sv of
            Unlimited -> return True
            Limited lim -> do
                active <- readIORef (workerCount sv)
                return (oldlen + 1 < fromIntegral lim - active)

    withinRate =
        case (mwinfo, yieldRateInfo sv) of
            (Just winfo, Just yinfo) -> workerRateControl sv yinfo winfo
            _ -> return True
{-# INLINE workerStopUpdate #-}
-- | Called for a stopping worker: unless the worker polling interval is
-- currently zero, fold the worker's data into the rate information via
-- 'workerUpdateLatency'.
workerStopUpdate :: WorkerInfo -> YieldRateInfo -> IO ()
workerStopUpdate winfo info = do
    interval <- readIORef (workerPollingInterval info)
    if interval == 0
    then return ()
    else workerUpdateLatency info winfo
{-# INLINABLE sendStop #-}
-- | Announce that the calling worker thread is stopping.
--
-- In order: decrements 'workerCount', performs the final
-- 'workerStopUpdate' when both a 'WorkerInfo' and a 'yieldRateInfo' are
-- available, and then sends a 'ChildStop' event (with no exception) carrying
-- the current thread id.
sendStop :: SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop sv mwinfo = do
    atomicModifyIORefCAS_ (workerCount sv) (subtract 1)
    case mwinfo of
        Just winfo | Just info <- yieldRateInfo sv ->
            workerStopUpdate winfo info
        _ -> return ()
    tid <- myThreadId
    void $ send sv (ChildStop tid Nothing)
{-# INLINE ringDoorBell #-}
-- | Signal 'outputDoorBell' if a door bell was requested.
--
-- After a store-load barrier the 'needDoorBell' flag is read; when it is
-- set, the flag is reset to 'False' and a non-blocking put is made on
-- 'outputDoorBell'.
ringDoorBell :: SVar t m a -> IO ()
ringDoorBell sv = do
    storeLoadBarrier
    requested <- readIORef (needDoorBell sv)
    when requested ring

    where

    ring = do
        atomicModifyIORefCAS_ (needDoorBell sv) (const False)
        -- The result is ignored: the bell may already be rung.
        void $ tryPutMVar (outputDoorBell sv) ()
{-# INLINE enqueueLIFO #-}
-- | Push a stream onto the front of the list-based work queue and then ring
-- the door bell.
enqueueLIFO :: SVar t m a -> IORef [t m a] -> t m a -> IO ()
enqueueLIFO sv q m =
    atomicModifyIORefCAS_ q (m :) >> ringDoorBell sv
{-# INLINE enqueueFIFO #-}
-- | Add a stream to the 'LinkedQueue' based work queue with 'pushL' and then
-- ring the door bell.
enqueueFIFO :: SVar t m a -> LinkedQueue (t m a) -> t m a -> IO ()
enqueueFIFO sv q m =
    pushL q m >> ringDoorBell sv
{-# INLINE enqueueAhead #-}
-- | Place a stream on the (single slot) ahead queue, incrementing the counter
-- stored alongside it, and then ring the door bell.
--
-- The queue must be empty; otherwise this is an 'error'.
enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
enqueueAhead sv q m = do
    atomicModifyIORefCAS_ q put
    ringDoorBell sv

    where

    put ([], n) = ([m], n + 1)
    put _ = error "enqueueAhead: queue is not empty"
{-# INLINE reEnqueueAhead #-}
-- | Like 'enqueueAhead' but leaves the counter stored with the queue
-- unchanged. Puts the stream on the ahead queue and then rings the door bell.
--
-- The queue must be empty; otherwise this is an 'error'.
reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead sv q m = do
    atomicModifyIORefCAS_ q put
    ringDoorBell sv

    where

    put ([], n) = ([m], n)
    put _ = error "reEnqueueAhead: queue is not empty"
{-# INLINE queueEmptyAhead #-}
-- | 'True' when the ahead queue currently holds no stream. The counter
-- stored with the queue is ignored.
queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool
queueEmptyAhead q = liftIO ((null . fst) <$> readIORef q)
{-# INLINE dequeueAhead #-}
-- | Atomically take the stream off the ahead queue.
--
-- Returns 'Nothing' when the queue is empty, otherwise the stream paired with
-- the counter stored with the queue; the queue is left empty and the counter
-- unchanged. Finding more than one queued stream is an 'error'.
dequeueAhead :: MonadIO m
    => IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead q = liftIO $ atomicModifyIORefCAS q pop

    where

    pop st@([], _) = (st, Nothing)
    pop ([x], n) = (([], n), Just (x, n))
    pop _ = error "more than one item on queue"
-- | Read the current contents of an 'IORef' and run the given action on
-- them. The reference itself is not modified.
withIORef :: IORef a -> (a -> IO b) -> IO b
withIORef ref f = do
    contents <- readIORef ref
    f contents
-- | 'atomicModifyIORef' specialised to updates that produce no result.
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ ref f = atomicModifyIORef ref update

    where

    update old = (f old, ())
-- | Outcome of an attempt to take the next entry off the ahead heap.
data HeapDequeueResult t m a
    = Clearing
    -- ^ No sequence number was stored with the heap; nothing was dequeued.
    | Waiting Int
    -- ^ The entry with this sequence number is not at the top of the heap.
    | Ready (Entry Int (AheadHeapEntry t m a))
    -- ^ The entry with the wanted sequence number; it has been removed from
    -- the heap.
{-# INLINE dequeueFromHeap #-}
-- | Atomically try to dequeue the heap entry whose sequence number equals the
-- one stored next to the heap.
--
-- * No stored sequence number: state unchanged, result 'Clearing'.
-- * Top entry matches the stored number: the entry is removed, the stored
--   number is reset to 'Nothing', result 'Ready'.
-- * Heap empty or top entry has a different number: state unchanged, result
--   'Waiting' with the stored number. The top entry is asserted to be not
--   smaller than the stored number.
dequeueFromHeap
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> IO (HeapDequeueResult t m a)
dequeueFromHeap hpVar = atomicModifyIORef hpVar step

    where

    step st@(_, Nothing) = (st, Clearing)
    step st@(hp, Just expected) =
        case H.uncons hp of
            Nothing -> (st, Waiting expected)
            Just (ent@(Entry seqNo _), rest)
                | seqNo == expected -> ((rest, Nothing), Ready ent)
                | otherwise ->
                    assert (seqNo >= expected) (st, Waiting expected)
{-# INLINE dequeueFromHeapSeq #-}
-- | Atomically try to dequeue the heap entry with the given sequence number.
-- This expects that no sequence number is currently stored with the heap;
-- finding one is an 'error'.
--
-- * Top entry has the given number: the entry is removed, the stored number
--   stays 'Nothing', result 'Ready'.
-- * Heap empty or top entry has a different number: the heap is unchanged,
--   the given number is stored with it, result 'Waiting'. The top entry is
--   asserted to be not smaller than the given number.
dequeueFromHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq hpVar i = atomicModifyIORef hpVar step

    where

    step (_, Just _) = error "dequeueFromHeapSeq: unreachable"
    step (hp, Nothing) =
        case H.uncons hp of
            Nothing -> notYet
            Just (ent@(Entry seqNo _), rest)
                | seqNo == i -> ((rest, Nothing), Ready ent)
                | otherwise -> assert (seqNo >= i) notYet

        where

        notYet = ((hp, Just i), Waiting i)
-- | Sanity condition relating the sequence number stored with the heap (first
-- argument) to a candidate sequence number (second argument): with nothing
-- stored any candidate is fine, otherwise the candidate must not be smaller
-- than the stored number.
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane snum seqNo = maybe True (seqNo >=) snum
{-# INLINE requeueOnHeapTop #-}
-- | Atomically insert an entry into the heap and store the given sequence
-- number with the heap. The previous state is asserted to satisfy
-- 'heapIsSane' for that sequence number.
requeueOnHeapTop
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Entry Int (AheadHeapEntry t m a)
    -> Int
    -> IO ()
requeueOnHeapTop hpVar ent seqNo = atomicModifyIORef_ hpVar requeue

    where

    requeue (hp, snum) =
        assert (heapIsSane snum seqNo) (H.insert ent hp, Just seqNo)
{-# INLINE updateHeapSeq #-}
-- | Atomically store the given sequence number with the heap, leaving the
-- heap contents untouched. The previous state is asserted to satisfy
-- 'heapIsSane' for that sequence number.
updateHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO ()
updateHeapSeq hpVar seqNo = atomicModifyIORef_ hpVar setSeq

    where

    setSeq (hp, snum) = assert (heapIsSane snum seqNo) (hp, Just seqNo)
{-# NOINLINE addThread #-}
-- | Record a thread id in the 'workerThreads' set of the 'SVar'. The update
-- is a plain (non-atomic) 'modifyIORef'.
addThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
addThread sv tid = liftIO (modifyIORef threads (S.insert tid))

    where

    threads = workerThreads sv
{-# INLINE delThread #-}
-- | Remove a thread id from the 'workerThreads' set of the 'SVar'. The update
-- is a plain (non-atomic) 'modifyIORef'.
delThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
delThread sv tid = liftIO (modifyIORef threads (S.delete tid))

    where

    threads = workerThreads sv
{-# INLINE modifyThread #-}
-- | Atomically toggle the membership of a thread id in 'workerThreads'.
--
-- If the id is present it is deleted and the resulting set is inspected; if
-- it is absent it is inserted and the set as it was /before/ the insertion is
-- inspected. When the inspected set is empty, a write barrier is issued and
-- 'outputDoorBell' is signalled.
modifyThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
modifyThread sv tid = do
    inspected <- liftIO $ atomicModifyIORefCAS (workerThreads sv) toggle
    when (null inspected) $ liftIO $ do
        writeBarrier
        void $ tryPutMVar (outputDoorBell sv) ()

    where

    toggle old
        | S.member tid old = let new = S.delete tid old in (new, new)
        | otherwise = (S.insert tid old, old)
{-# INLINE allThreadsDone #-}
-- | 'True' when the 'workerThreads' set of the 'SVar' is empty.
allThreadsDone :: MonadIO m => SVar t m a -> m Bool
allThreadsDone sv = liftIO $ do
    threads <- readIORef (workerThreads sv)
    return (S.null threads)
{-# NOINLINE handleChildException #-}
-- | Report an exception raised in the calling thread by sending a
-- 'ChildStop' event, carrying the current thread id and the exception, via
-- 'send'.
handleChildException :: SVar t m a -> SomeException -> IO ()
handleChildException sv e =
    myThreadId >>= \tid -> void (send sv (ChildStop tid (Just e)))
{-# NOINLINE handleFoldException #-}
-- | Report an exception raised in the calling thread by sending a
-- 'ChildStop' event, carrying the current thread id and the exception, via
-- 'sendToProducer'.
handleFoldException :: SVar t m a -> SomeException -> IO ()
handleFoldException sv e =
    myThreadId >>= \tid -> void (sendToProducer sv (ChildStop tid (Just e)))
{-# NOINLINE recordMaxWorkers #-}
-- | Update the worker statistics kept in 'svarStats'.
--
-- Raises 'maxWorkers' to the current 'workerCount' when the latter is larger,
-- and increments 'totalDispatches' by one. The updates are plain reads and
-- writes, not atomic operations.
recordMaxWorkers :: MonadIO m => SVar t m a -> m ()
recordMaxWorkers sv = liftIO $ do
    active <- readIORef (workerCount sv)
    maxWrk <- readIORef (maxWorkers stats)
    when (active > maxWrk) $ writeIORef (maxWorkers stats) active
    -- Store the evaluated counter. A lazy 'modifyIORef' would leave a chain
    -- of unevaluated @(+1)@ applications in the reference, growing with every
    -- dispatch until somebody finally reads the statistics.
    dispatches <- readIORef (totalDispatches stats)
    writeIORef (totalDispatches stats) $! dispatches + 1

    where

    stats = svarStats sv
{-# NOINLINE pushWorker #-}
-- | Start one more worker thread running 'workLoop' for the 'SVar'.
--
-- Steps, in order:
--
-- 1. increment 'workerCount';
-- 2. in inspect mode, update the statistics with 'recordMaxWorkers';
-- 3. when the 'SVar' has a 'yieldRateInfo', create a fresh 'WorkerInfo' whose
--    'workerYieldMax' is the first argument (otherwise the worker gets
--    'Nothing');
-- 4. fork the worker with 'doFork', passing 'handleChildException' as the
--    exception handler, and register its thread id with 'addThread'.
pushWorker :: MonadAsync m => Count -> SVar t m a -> m ()
pushWorker yieldMax sv = do
    liftIO $ atomicModifyIORefCAS_ (workerCount sv) (+ 1)
    when (svarInspectMode sv) $ recordMaxWorkers sv
    winfo <-
        case yieldRateInfo sv of
            Nothing -> return Nothing
            Just _ -> liftIO newWorkerInfo
    tid <- doFork (workLoop sv winfo) (svarMrun sv) (handleChildException sv)
    addThread sv tid

    where

    -- Yield count starts at 0; the latency record starts at count 0 and the
    -- current monotonic time.
    newWorkerInfo = do
        cntRef <- newIORef 0
        now <- getTime Monotonic
        lat <- newIORef (0, now)
        return $ Just WorkerInfo
            { workerYieldMax = yieldMax
            , workerYieldCount = cntRef
            , workerLatencyStart = lat
            }
{-# INLINE pushWorkerPar #-}
-- | Fork a worker running the supplied loop and toggle its thread id in the
-- thread set with 'modifyThread'.
--
-- Outside inspect mode the loop is started with 'Nothing' and no accounting
-- is done here. In inspect mode 'workerCount' is incremented,
-- 'recordMaxWorkers' is called and, when the 'SVar' has a 'yieldRateInfo',
-- the loop receives a fresh 'WorkerInfo' with a 'workerYieldMax' of 0.
pushWorkerPar
    :: MonadAsync m
    => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar sv wloop
    | svarInspectMode sv = forkWithDiag
    | otherwise = do
        tid <- doFork (wloop Nothing) (svarMrun sv) (handleChildException sv)
        modifyThread sv tid

    where

    {-# NOINLINE forkWithDiag #-}
    forkWithDiag = do
        liftIO $ atomicModifyIORefCAS_ (workerCount sv) (+ 1)
        recordMaxWorkers sv
        winfo <-
            case yieldRateInfo sv of
                Nothing -> return Nothing
                Just _ -> liftIO $ do
                    cntRef <- newIORef 0
                    now <- getTime Monotonic
                    lat <- newIORef (0, now)
                    return $ Just WorkerInfo
                        { workerYieldMax = 0
                        , workerYieldCount = cntRef
                        , workerLatencyStart = lat
                        }
        tid <- doFork (wloop winfo) (svarMrun sv) (handleChildException sv)
        modifyThread sv tid
-- | Decide whether another worker should be started and start it if so.
--
-- Returns 'True' only when a worker was dispatched because there is queued
-- work and the worker limit allows it. Returns 'False' when
--
-- * 'isWorkDone' reports that the work is done, or
-- * the queue is done (in that case a single worker with a yield limit of 0
--   is still pushed if no worker is currently counted as active), or
-- * the effective worker limit has been reached.
--
-- The effective limit is 'maxWorkerLimit', further capped by the value of
-- 'remainingWork' when that is present and the 'SVar' has no
-- 'yieldRateInfo'.
dispatchWorker :: MonadAsync m => Count -> SVar t m a -> m Bool
dispatchWorker yieldCount sv = do
    done <- liftIO $ isWorkDone sv
    if done
    then return False
    else do
        qDone <- liftIO $ isQueueDone sv
        active <- liftIO $ readIORef $ workerCount sv
        if qDone
        then do
            when (active <= 0) $ pushWorker 0 sv
            return False
        else do
            limit <- effectiveLimit
            case limit of
                Unlimited -> dispatch
                Limited lim | lim > fromIntegral active -> dispatch
                _ -> return False

    where

    workerLimit = maxWorkerLimit sv

    dispatch = pushWorker yieldCount sv >> return True

    effectiveLimit =
        case remainingWork sv of
            Nothing -> return workerLimit
            Just ref -> do
                n <- liftIO $ readIORef ref
                return $
                    case yieldRateInfo sv of
                        Just _ -> workerLimit
                        Nothing ->
                            case workerLimit of
                                Unlimited -> Limited (fromIntegral n)
                                Limited lim ->
                                    Limited $ min lim (fromIntegral n)
-- | Minimum thread delay, expressed in 'NanoSecond64' units (1000000 of
-- them, i.e. one millisecond if the unit is the nanosecond the type name
-- indicates).
minThreadDelay :: NanoSecond64
minThreadDelay = 1000000
-- | Time span over which a yield deficit is spread when computing the
-- catch-up rate, expressed in 'NanoSecond64' units (1000000 of them, i.e. one
-- millisecond if the unit is the nanosecond the type name indicates).
rateRecoveryTime :: NanoSecond64
rateRecoveryTime = 1000000
-- | What the rate controller should do next, as computed by
-- 'estimateWorkers'.
data Work
    = BlockWait NanoSecond64
    -- ^ Ahead of the target: wait for the given time.
    | PartialWorker Count
    -- ^ Behind by the given number of yields, and the worker latency is
    -- within the adjusted latency.
    | ManyWorkers Int Count
    -- ^ A number of workers, together with the number of yields behind.
    deriving Show
-- | Work out what is needed to bring the stream back to its target rate.
--
-- Arguments, in order: the worker limit, the yields made so far, a gain/loss
-- correction added to those yields, the time elapsed, the worker latency, the
-- target latency per yield, and the allowed latency range.
--
-- The number of yields that should have happened by now is the elapsed time
-- plus one worker latency, divided by the target latency and rounded up.
--
-- * Behind target (positive deficit): the deficit is spread over
--   'rateRecoveryTime' to get a required latency, which is clamped to the
--   latency range. If the worker latency is within it the result is
--   'PartialWorker'; otherwise it is 'ManyWorkers' with the latency ratio as
--   the worker count, capped by the worker limit and by the deficit.
-- * Otherwise: the result is 'BlockWait' for the time by which the stream is
--   ahead, capped at the maximum latency minus the worker latency; when that
--   is not positive the result is @ManyWorkers 1 (Count 0)@.
estimateWorkers
    :: Limit
    -> Count
    -> Count
    -> NanoSecond64
    -> NanoSecond64
    -> NanoSecond64
    -> LatencyRange
    -> Work
estimateWorkers workerLimit svarYields gainLossYields
                svarElapsed wLatency targetLat range
    | deltaYields > 0 = speedUp
    | otherwise = backOff

    where

    targetYields = (svarElapsed + wLatency + targetLat - 1) `div` targetLat
    effectiveYields = svarYields + gainLossYields
    deltaYields = fromIntegral targetYields - effectiveYields

    speedUp =
        assert (adjustedLat > 0) $
            if wLatency <= adjustedLat
            then PartialWorker deltaYields
            else ManyWorkers (fromIntegral limited) deltaYields

        where

        deltaYieldsFreq :: Double
        deltaYieldsFreq =
            fromIntegral deltaYields / fromIntegral rateRecoveryTime
        yieldsFreq = 1.0 / fromIntegral targetLat
        totalYieldsFreq = yieldsFreq + deltaYieldsFreq
        requiredLat = NanoSecond64 $ round $ 1.0 / totalYieldsFreq
        adjustedLat =
            min (max requiredLat (minLatency range)) (maxLatency range)
        workers = withLimit $ wLatency `div` adjustedLat
        limited = min workers (fromIntegral deltaYields)

    backOff =
        assert (sleepTime >= 0) $
            if s > 0 then BlockWait s else ManyWorkers 1 (Count 0)

        where

        expectedDuration = fromIntegral effectiveYields * targetLat
        sleepTime = expectedDuration - svarElapsed
        maxSleepTime = maxLatency range - wLatency
        s = min sleepTime maxSleepTime

    withLimit n =
        case workerLimit of
            Unlimited -> n
            Limited x -> min n (fromIntegral x)
-- | Compute a fresh worker latency estimate from the counters stored in the
-- given 'YieldRateInfo'. The IORefs are only read; nothing is written back.
--
-- The result is @(yields, baseTime, latency)@ where:
--
-- * @yields@ is the all-time yield count plus the total counts of both the
--   collected and the pending latency records,
-- * @baseTime@ is the timestamp stored alongside the all-time count,
-- * @latency@ is the mean of the previously measured latency and the average
--   latency over the collected and pending samples. When there is no sample
--   count or no sample time, the previously measured latency is returned
--   unchanged.
getWorkerLatency :: YieldRateInfo -> IO (Count, AbsTime, NanoSecond64)
getWorkerLatency yinfo = do
    (pendTotal, pendCount, pendTime) <- readIORef (workerPendingLatency yinfo)
    (collTotal, collCount, collTime) <-
        readIORef (workerCollectedLatency yinfo)
    (allTimeCount, baseTime) <- readIORef (svarAllTimeLatency yinfo)
    lastMeasured <- readIORef (workerMeasuredLatency yinfo)

    let sampleCount = collCount + pendCount
        sampleTime = collTime + pendTime
        estimate
            | sampleCount > 0 && sampleTime > 0 =
                let perYield = sampleTime `div` fromIntegral sampleCount
                 in (perYield + lastMeasured) `div` 2
            | otherwise = lastMeasured
    return (allTimeCount + (collTotal + pendTotal), baseTime, estimate)
-- | Tell whether the SVar currently has more workers than 'estimateWorkers'
-- recommends for the yield count and latency reported by 'getWorkerLatency'.
--
-- * 'BlockWait': always 'True'.
-- * 'PartialWorker': 'True' when more than one worker is counted.
-- * 'ManyWorkers' @n@: 'True' when more than @n@ workers are counted.
isBeyondMaxRate :: SVar t m a -> YieldRateInfo -> IO Bool
isBeyondMaxRate sv yinfo = do
    (yielded, since, workerLat) <- getWorkerLatency yinfo
    now <- getTime Monotonic
    adjustment <- readIORef (svarGainedLostYields yinfo)
    active <- readIORef (workerCount sv)

    let elapsed = fromRelTime64 (diffAbsTime64 now since)
        plan =
            estimateWorkers
                (maxWorkerLimit sv)
                yielded
                adjustment
                elapsed
                workerLat
                (svarLatencyTarget yinfo)
                (svarLatencyRange yinfo)

    return $ case plan of
        BlockWait _ -> True
        PartialWorker _ -> active > 1
        ManyWorkers allowed _ -> active > allowed
-- | Dispatch workers following the plan that 'estimateWorkers' computes from
-- the yield rate information of the SVar.
--
-- The SVar must carry rate information: 'yieldRateInfo' is unwrapped with
-- 'fromJust'.
--
-- Returns 'True' only when a batch of workers was requested and every
-- 'dispatchWorker' call in that batch returned 'True'. Returns 'False' when
-- no latency figure is available yet, after handling a 'BlockWait' or
-- 'PartialWorker' plan, when the worker count already meets the estimate, or
-- when a 'dispatchWorker' call in the batch returned 'False'.
dispatchWorkerPaced :: MonadAsync m => SVar t m a -> m Bool
dispatchWorkerPaced sv = do
    now <- liftIO $ getTime Monotonic
    (svarYields, baseTime, measured) <-
        liftIO $ collectLatency sv rateInfo False

    let svarElapsed = fromRelTime64 (diffAbsTime64 now baseTime)
        -- While nothing has been measured, fall back to the bootstrap
        -- latency if one is configured.
        wLatency =
            case (measured, workerBootstrapLatency rateInfo) of
                (0, Just bootstrap) -> bootstrap
                _ -> measured

    if wLatency == 0
    then return False
    else do
        gainLoss <- liftIO $ readIORef (svarGainedLostYields rateInfo)
        let plan =
                estimateWorkers
                    (maxWorkerLimit sv)
                    svarYields
                    gainLoss
                    svarElapsed
                    wLatency
                    targetLat
                    (svarLatencyRange rateInfo)

        case plan of
            BlockWait pause -> do
                assert (pause >= 0) (return ())
                -- Sleep and then dispatch a single worker, but only when
                -- 'allThreadsDone' reports that all threads are done.
                idle <- allThreadsDone sv
                when idle $ do
                    liftIO $ threadDelay (fromIntegral (toMicroSeconds pause))
                    void $ dispatchWorker 1 sv
                return False

            PartialWorker yields -> do
                assert (yields > 0) (return ())
                recordGainLoss yields
                idle <- allThreadsDone sv
                when idle $ void $ dispatchWorker yields sv
                return False

            ManyWorkers netWorkers yields -> do
                assert (netWorkers >= 1) (return ())
                assert (yields >= 0) (return ())
                recordGainLoss yields

                -- The polling interval is only ever lowered here.
                let intervalRef = workerPollingInterval rateInfo
                    perWorker = max 1 (yields `div` fromIntegral netWorkers)
                    interval = min perWorker (fromIntegral magicMaxBuffer)
                previous <- liftIO $ readIORef intervalRef
                when (interval < previous) $
                    liftIO $ writeIORef intervalRef interval

                active <- liftIO $ readIORef (workerCount sv)
                if active < netWorkers
                then do
                    let missing = netWorkers - active
                        batch =
                            max 1 $ fromIntegral $
                                minThreadDelay `div` targetLat
                    dispatchN (min missing batch)
                else return False

    where

    rateInfo = fromJust $ yieldRateInfo sv

    targetLat = svarLatencyTarget rateInfo

    toMicroSeconds :: NanoSecond64 -> MicroSecond64
    toMicroSeconds = fromRelTime64 . toRelTime64

    -- Add the part of @yields@ that exceeds the rate buffer, in either
    -- direction, to the gained/lost yields counter.
    recordGainLoss yields = do
        let slack = fromIntegral (svarRateBuffer rateInfo)
        when (yields /= 0 && abs yields > slack) $ do
            let excess
                    | yields > 0 = yields - slack
                    | otherwise = yields + slack
            liftIO $ modifyIORef (svarGainedLostYields rateInfo) (+ excess)

    -- Dispatch up to @n@ workers one at a time, stopping at the first
    -- 'dispatchWorker' call that returns 'False'.
    dispatchN n
        | n == 0 = return True
        | otherwise = do
            sent <- dispatchWorker 0 sv
            if sent
            then dispatchN (n - 1)
            else return False
-- | Delay hook handed to 'sendWorkerWait' by 'readOutputQPaced'. It ignores
-- the SVar and does nothing.
sendWorkerDelayPaced :: SVar t m a -> IO ()
sendWorkerDelayPaced = const (return ())
-- | Delay hook handed to 'sendWorkerWait' by 'readOutputQBounded'. It ignores
-- the SVar and does nothing.
sendWorkerDelay :: SVar t m a -> IO ()
sendWorkerDelay = const (return ())
-- | Keep dispatching workers, and block on the output doorbell when no more
-- can be dispatched, until the output queue is seen to be non-empty.
--
-- * @delay@ is run once at the start of every attempt.
-- * @dispatch@ tries to send out work; a 'True' result leads to another
--   attempt right away, a 'False' result makes us wait on 'outputDoorBell'.
--
-- The order of the steps matters: 'needDoorBell' is set and a store-load
-- barrier is issued before @dispatch@ runs and before we block.
{-# NOINLINE sendWorkerWait #-}
sendWorkerWait
    :: MonadAsync m
    => (SVar t m a -> IO ())
    -> (SVar t m a -> m Bool)
    -> SVar t m a
    -> m ()
sendWorkerWait delay dispatch sv = do
    liftIO $ delay sv
    emptyAtStart <- liftIO queueIsEmpty
    when emptyAtStart $ do
        -- Ask for a doorbell before dispatching and possibly sleeping.
        liftIO $ atomicModifyIORefCAS_ (needDoorBell sv) (const True)
        liftIO storeLoadBarrier
        canDoMore <- dispatch sv
        if canDoMore
        then retry
        else do
            liftIO
                $ withDiagMVar sv "sendWorkerWait: nothing to do"
                $ takeMVar (outputDoorBell sv)
            -- The doorbell rang; go around again if there is still no output.
            stillEmpty <- liftIO queueIsEmpty
            when stillEmpty retry

    where

    retry = sendWorkerWait delay dispatch sv

    -- True when the length recorded in the output queue is not positive.
    queueIsEmpty :: IO Bool
    queueIsEmpty = do
        (_, queued) <- readIORef (outputQueue sv)
        return (queued <= 0)
-- | Atomically swap the contents of the output queue with an empty queue
-- (@([], 0)@) and return what was there before.
{-# INLINE readOutputQBasic #-}
readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
readOutputQBasic q = atomicModifyIORefCAS q drain

    where

    drain contents = (([], 0), contents)
-- | Drain the output queue of the SVar with 'readOutputQBasic'. When the SVar
-- is in inspect mode, also raise the 'maxOutQSize' statistic if the drained
-- length exceeds the recorded value.
{-# INLINE readOutputQRaw #-}
readOutputQRaw :: SVar t m a -> IO ([ChildEvent a], Int)
readOutputQRaw sv = do
    drained@(_, len) <- readOutputQBasic (outputQueue sv)
    when (svarInspectMode sv) $ recordHighWater len
    return drained

    where

    recordHighWater len = do
        let ref = maxOutQSize (svarStats sv)
        recorded <- readIORef ref
        when (len > recorded) $ writeIORef ref len
-- | Read the pending output events of the SVar.
--
-- If the queue held something, return it; before returning, push a worker
-- when the worker count is not positive and 'isWorkDone' says the work is not
-- finished. If the queue was empty, go through 'sendWorkerWait' (using
-- 'sendWorkerDelay' and @'dispatchWorker' 0@) and then return whatever a
-- second read of the queue yields.
readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQBounded sv = do
    (events, queued) <- liftIO $ readOutputQRaw sv
    if queued > 0
    then do
        ensureWorker
        return events
    else do
        sendWorkerWait sendWorkerDelay (dispatchWorker 0) sv
        liftIO $ fst <$> readOutputQRaw sv

    where

    ensureWorker = do
        active <- liftIO $ readIORef (workerCount sv)
        when (active <= 0) $ do
            finished <- liftIO $ isWorkDone sv
            if finished
            then return ()
            else pushWorker 0 sv
-- | Rate-controlled counterpart of 'readOutputQBounded'.
--
-- If the queue held something, run 'dispatchWorkerPaced' (ignoring its
-- result) and return the events. If the queue was empty, go through
-- 'sendWorkerWait' (using 'sendWorkerDelayPaced' and 'dispatchWorkerPaced')
-- and then return whatever a second read of the queue yields.
readOutputQPaced :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQPaced sv = do
    (events, queued) <- liftIO $ readOutputQRaw sv
    if queued > 0
    then do
        _ <- dispatchWorkerPaced sv
        return events
    else do
        sendWorkerWait sendWorkerDelayPaced dispatchWorkerPaced sv
        liftIO $ fst <$> readOutputQRaw sv
-- | Decide whether the stream is finished once output has been processed.
--
-- Returns 'False' while 'allThreadsDone' is 'False'. Otherwise returns the
-- result of 'isWorkDone'; if that is 'False' a worker is pushed first so
-- that the remaining work gets picked up.
postProcessBounded :: MonadAsync m => SVar t m a -> m Bool
postProcessBounded sv = do
    idle <- allThreadsDone sv
    if not idle
    then return False
    else do
        finished <- liftIO $ isWorkDone sv
        when (not finished) $ pushWorker 0 sv
        return finished
-- | Rate-controlled counterpart of 'postProcessBounded'.
--
-- Returns 'False' while 'allThreadsDone' is 'False'. Otherwise returns the
-- result of 'isWorkDone'. If work remains, 'dispatchWorkerPaced' is tried
-- first, and a worker is pushed unconditionally when 'allThreadsDone' still
-- holds afterwards.
postProcessPaced :: MonadAsync m => SVar t m a -> m Bool
postProcessPaced sv = do
    idle <- allThreadsDone sv
    if not idle
    then return False
    else do
        finished <- liftIO $ isWorkDone sv
        when (not finished) $ do
            _ <- dispatchWorkerPaced sv
            stillIdle <- allThreadsDone sv
            when stillIdle $ pushWorker 0 sv
        return finished
-- | Build the rate control bookkeeping for a stream from the 'Rate' set in
-- the given 'State'.
--
-- Returns 'Nothing' when no rate is configured. Otherwise the goal rate
-- becomes the target latency, the high rate becomes the minimum latency and
-- the low rate becomes the maximum latency of the 'LatencyRange'. All
-- counters start at zero, the polling interval starts at 1 and the all-time
-- record is stamped with the current monotonic time.
getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo st =
    case getStreamRate st of
        Nothing -> return Nothing
        Just (Rate low goal high buf) ->
            let range = LatencyRange (rateToLatency high) (rateToLatency low)
             in mkYieldRateInfo (rateToLatency goal) range buf

    where

    -- Largest representable latency, as a Double.
    latencyCeiling :: Double
    latencyCeiling = fromIntegral (maxBound :: NanoSecond64)

    -- Convert a rate in Hertz to a latency in nanoseconds.
    --
    -- The result is kept within @[1, maxBound]@:
    --
    -- * A rate of 2e9 Hz or more would otherwise round to a latency of 0,
    --   and the target latency is used as a divisor by the pacing code
    --   (@minThreadDelay `div` targetLat@), which would raise a
    --   divide-by-zero exception.
    -- * A tiny positive rate would otherwise produce a value that does not
    --   fit in 64 bits, for which the result of 'round' is unspecified.
    rateToLatency :: Double -> NanoSecond64
    rateToLatency r
        | r <= 0 = maxBound
        | nanos >= latencyCeiling = maxBound
        | otherwise = max 1 (round nanos)

        where

        nanos = 1.0e9 / r

    mkYieldRateInfo latency latRange buf = do
        measured <- newIORef 0
        pending <- newIORef (0, 0, 0)
        collected <- newIORef (0, 0, 0)
        now <- getTime Monotonic
        allTime <- newIORef (0, now)
        period <- newIORef 1
        gainLoss <- newIORef (Count 0)

        return $ Just YieldRateInfo
            { svarLatencyTarget = latency
            , svarLatencyRange = latRange
            , svarRateBuffer = buf
            , svarGainedLostYields = gainLoss
            , workerBootstrapLatency = getStreamLatency st
            , workerPollingInterval = period
            , workerMeasuredLatency = measured
            , workerPendingLatency = pending
            , workerCollectedLatency = collected
            , svarAllTimeLatency = allTime
            }
-- | Allocate a fresh 'SVarStats' record: every counter and every latency
-- figure starts at zero and no stop time is recorded.
newSVarStats :: IO SVarStats
newSVarStats = do
    dispatches <- zeroCounter
    workersHigh <- zeroCounter
    outQHigh <- zeroCounter
    heapHigh <- zeroCounter
    workQHigh <- zeroCounter
    latencyAvg <- newIORef (0, NanoSecond64 0)
    latencyMax <- zeroLatency
    latencyMin <- zeroLatency
    stoppedAt <- newIORef Nothing

    return SVarStats
        { totalDispatches = dispatches
        , maxWorkers = workersHigh
        , maxOutQSize = outQHigh
        , maxHeapSize = heapHigh
        , maxWorkQSize = workQHigh
        , avgWorkerLatency = latencyAvg
        , minWorkerLatency = latencyMin
        , maxWorkerLatency = latencyMax
        , svarStopTime = stoppedAt
        }

    where

    zeroCounter :: IO (IORef Int)
    zeroCounter = newIORef 0

    zeroLatency :: IO (IORef NanoSecond64)
    zeroLatency = newIORef (NanoSecond64 0)
-- | Allocate the mutable state of an ahead-style 'SVar' and assemble the
-- record.
--
-- The record is self-referential: several fields ('readOutputQ',
-- 'postProcess', 'workLoop', 'enqueue', ...) close over the 'SVar' being
-- built, so it is constructed with a lazy recursive binding.
--
-- Arguments:
--
-- * @st@   - stream configuration; limits, rate and inspect mode are read
--            from it.
-- * @f@    - the work loop; it is partially applied to the work queue, the
--            output heap, @st@ (with 'streamVar' pointing at the new 'SVar')
--            and the 'SVar' itself.
-- * @mrun@ - stored as-is in 'svarMrun'.
--
-- The output reader and post-processor are the bounded variants when no
-- stream rate is configured and the paced variants otherwise.
getAheadSVar :: MonadAsync m
    => State t m a
    -> (   IORef ([t m a], Int)
        -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
        -> State t m a
        -> SVar t m a
        -> Maybe WorkerInfo
        -> m ())
    -> RunInIO m
    -> IO (SVar t m a)
getAheadSVar st f mrun = do
    outputQ  <- newIORef ([], 0)
    -- The heap starts empty, paired with @Just 0@.
    -- NOTE(review): presumably the next sequence number expected before the
    -- heap may be drained - confirm against the heap dequeue code.
    heapRef  <- newIORef (H.empty, Just 0)
    doorBell <- newEmptyMVar
    nWorkers <- newIORef 0
    needBell <- newIORef False
    threads  <- newIORef S.empty
    -- The work queue counter starts at -1.
    -- NOTE(review): presumably so that the first enqueued item gets sequence
    -- number 0 - confirm in enqueueAhead.
    workQ    <- newIORef ([], -1)
    stopLock <- newMVar ()
    -- A yield counter exists only when a yield limit is configured.
    yieldRef <- traverse newIORef (getYieldLimit st)
    yinfo    <- getYieldRateInfo st
    stats    <- newSVarStats
    creator  <- myThreadId

    let -- Build the record given the final SVar itself (knot-tying) and the
        -- style-specific output reader and post-processor.
        mkSVar self readOutput postProc = SVar
            { outputQueue                = outputQ
            -- Fields set to 'undefined' are not populated for this style.
            , outputQueueFromConsumer    = undefined
            , remainingWork              = yieldRef
            , maxBufferLimit             = getMaxBuffer st
            , pushBufferSpace            = undefined
            , pushBufferPolicy           = undefined
            , pushBufferMVar             = undefined
            -- Workers are capped by the smaller of the thread and buffer
            -- limits.
            , maxWorkerLimit             =
                min (getMaxThreads st) (getMaxBuffer st)
            , yieldRateInfo              = yinfo
            , outputDoorBell             = doorBell
            , outputDoorBellFromConsumer = undefined
            , readOutputQ                = readOutput self
            , postProcess                = postProc self
            , workerThreads              = threads
            , workLoop                   =
                f workQ heapRef st{streamVar = Just self} self
            , enqueue                    = enqueueAhead self workQ
            , isWorkDone                 = isWorkDoneAhead self workQ heapRef
            , isQueueDone                = isQueueDoneAhead self workQ
            , needDoorBell               = needBell
            , svarStyle                  = AheadVar
            , svarStopStyle              = StopNone
            , svarStopBy                 = undefined
            , svarMrun                   = mrun
            , workerCount                = nWorkers
            , accountThread              = delThread self
            , workerStopMVar             = stopLock
            , svarRef                    = Nothing
            , svarInspectMode            = getInspectMode st
            , svarCreator                = creator
            , aheadWorkQueue             = workQ
            , outputHeap                 = heapRef
            , svarStats                  = stats
            }

        -- No configured rate selects the bounded reader/post-processor, any
        -- configured rate selects the paced ones.
        sv = maybe
                (mkSVar sv readOutputQBounded postProcessBounded)
                (const (mkSVar sv readOutputQPaced postProcessPaced))
                (getStreamRate st)

    return sv

    where

    -- The queue is considered done when it is empty or when the remaining
    -- yield count (if any) has dropped to zero or below.
    {-# INLINE isQueueDoneAhead #-}
    isQueueDoneAhead sv q = do
        queueDone <- checkEmpty q
        yieldsDone <-
            maybe (return False) (fmap (<= 0) . readIORef) (remainingWork sv)
        return $ yieldsDone || queueDone

    -- All work is done when the output heap holds no entries and the queue
    -- is done as well.
    {-# INLINE isWorkDoneAhead #-}
    isWorkDoneAhead sv q ref = do
        (hp, _) <- readIORef ref
        queueDone <- isQueueDoneAhead sv q
        return $ H.size hp <= 0 && queueDone

    -- True when the container in the first component of the ref is empty.
    checkEmpty q = readIORef q >>= \(xs, _) -> return (null xs)
-- | Allocate the mutable state of a parallel-style 'SVar' and assemble the
-- record.
--
-- Arguments:
--
-- * @ss@   - stop style, stored in 'svarStopStyle'; it also decides whether
--            a 'svarStopBy' reference is allocated.
-- * @st@   - stream configuration; buffer limit, yield limit, rate and
--            inspect mode are read from it.
-- * @mrun@ - stored as-is in 'svarMrun'.
--
-- Fields left as 'undefined' ('workLoop', 'enqueue', 'isWorkDone',
-- 'isQueueDone', 'needDoorBell', 'workerStopMVar', 'aheadWorkQueue',
-- 'outputHeap') are not populated for this style.
getParallelSVar :: MonadIO m
    => SVarStopStyle -> State t m a -> RunInIO m -> IO (SVar t m a)
getParallelSVar ss st mrun = do
    -- Producer -> consumer queue and doorbell, plus the reverse pair.
    fwdQ     <- newIORef ([], 0)
    revQ     <- newIORef ([], 0)
    fwdBell  <- newEmptyMVar
    revBell  <- newEmptyMVar
    nWorkers <- newIORef 0
    threads  <- newIORef S.empty
    -- A yield counter exists only when a yield limit is configured.
    yieldRef <- traverse newIORef (getYieldLimit st)
    yinfo    <- getYieldRateInfo st

    -- The initial push-buffer space equals the buffer limit. With an
    -- unlimited buffer the stored value is a lazy 'undefined' that only
    -- blows up if something actually reads it.
    bufSpace <- newIORef $
        case getMaxBuffer st of
            Unlimited -> undefined
            Limited n -> fromIntegral n
    bufLock  <- newMVar ()
    stats    <- newSVarStats
    creator  <- myThreadId

    -- Only the 'StopBy' style gets a real reference (whose initial content
    -- is 'undefined'); for any other style the reference itself is
    -- 'undefined'.
    stopRef <-
        case ss of
            StopBy -> newIORef undefined
            _      -> return undefined

    -- The record refers to itself through 'readOutputQ', 'postProcess' and
    -- 'accountThread', hence the recursive binding.
    let sv = SVar
            { outputQueue                = fwdQ
            , outputQueueFromConsumer    = revQ
            , remainingWork              = yieldRef
            , maxBufferLimit             = getMaxBuffer st
            , pushBufferSpace            = bufSpace
            , pushBufferPolicy           = PushBufferBlock
            , pushBufferMVar             = bufLock
            , maxWorkerLimit             = Unlimited
            , yieldRateInfo              = yinfo
            , outputDoorBell             = fwdBell
            , outputDoorBellFromConsumer = revBell
            , readOutputQ                = readOutputQPar sv
            , postProcess                = allThreadsDone sv
            , workerThreads              = threads
            , workLoop                   = undefined
            , enqueue                    = undefined
            , isWorkDone                 = undefined
            , isQueueDone                = undefined
            , needDoorBell               = undefined
            , svarStyle                  = ParallelVar
            , svarStopStyle              = ss
            , svarStopBy                 = stopRef
            , svarMrun                   = mrun
            , workerCount                = nWorkers
            , accountThread              = modifyThread sv
            , workerStopMVar             = undefined
            , svarRef                    = Nothing
            , svarInspectMode            = getInspectMode st
            , svarCreator                = creator
            , aheadWorkQueue             = undefined
            , outputHeap                 = undefined
            , svarStats                  = stats
            }
    return sv

    where

    -- Wait on the output doorbell, collect latency statistics when a rate
    -- is configured, take the queued events and then reset the push-buffer
    -- accounting.
    readOutputQPar sv = liftIO $ do
        withDiagMVar sv "readOutputQPar: doorbell"
            $ takeMVar (outputDoorBell sv)
        maybe
            (return ())
            (\yinfo -> void $ collectLatency sv yinfo False)
            (yieldRateInfo sv)
        events <- fst <$> readOutputQRaw sv
        -- Clear the push-buffer MVar, reset the limit, then fill the MVar
        -- again; the write barrier sits between the reset and the put.
        -- NOTE(review): presumably this wakes producers blocked on buffer
        -- space - confirm against the producer side.
        void $ tryTakeMVar (pushBufferMVar sv)
        resetBufferLimit sv
        writeBarrier
        void $ tryPutMVar (pushBufferMVar sv) ()
        return events
-- | Enqueue the stream @m@ on the 'SVar', start the first worker and return
-- the same 'SVar'.
--
-- The work is enqueued before any worker is pushed. Without rate control a
-- worker is pushed with a count of 0, with rate control a count of 1 is
-- used. The one exception is a latency target of 'maxBound': no worker is
-- started and the calling thread sleeps for @threadDelay maxBound@ instead.
sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a)
sendFirstWorker sv m = do
    liftIO $ enqueue sv m
    startWorker (yieldRateInfo sv)
    return sv

    where

    startWorker Nothing = pushWorker 0 sv
    startWorker (Just yinfo)
        | svarLatencyTarget yinfo == maxBound = liftIO $ threadDelay maxBound
        | otherwise = pushWorker 1 sv
-- | Create an ahead-style 'SVar' for the stream @m@ and start its first
-- worker.
--
-- The current monadic state is captured with 'captureMonadState', the 'SVar'
-- is built by 'getAheadSVar' using the supplied work loop @wloop@, and the
-- stream is then handed to 'sendFirstWorker'.
{-# INLINABLE newAheadVar #-}
newAheadVar :: MonadAsync m
    => State t m a
    -> t m a
    -> (   IORef ([t m a], Int)
        -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
        -> State t m a
        -> SVar t m a
        -> Maybe WorkerInfo
        -> m ())
    -> m (SVar t m a)
newAheadVar st m wloop =
    captureMonadState
        >>= liftIO . getAheadSVar st wloop
        >>= \sv -> sendFirstWorker sv m
-- | Create a parallel-style 'SVar' with the given stop style.
--
-- The current monadic state is captured with 'captureMonadState' and passed,
-- together with @ss@ and @st@, to 'getParallelSVar'. No worker is started
-- here.
{-# INLINABLE newParallelVar #-}
newParallelVar :: MonadAsync m
    => SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar ss st = captureMonadState >>= liftIO . getParallelSVar ss st
-- | Enqueue the stream @m@ on an existing 'SVar' and make sure a worker
-- exists to run it.
--
-- After enqueueing, a worker is pushed only if 'allThreadsDone' reports
-- True; the push count is 0 without rate control and 1 with it.
toStreamVar :: MonadAsync m => SVar t m a -> t m a -> m ()
toStreamVar sv m = do
    liftIO $ enqueue sv m
    idle <- allThreadsDone sv
    when idle startWorker

    where

    startWorker =
        maybe (pushWorker 0 sv) (const (pushWorker 1 sv)) (yieldRateInfo sv)