{-# LANGUAGE UnboxedTuples #-}
module Streamly.Internal.Data.SVar
(
MonadAsync
, SVarStyle (..)
, SVarStopStyle (..)
, SVar (..)
, Limit (..)
, State (streamVar)
, defState
, adaptState
, getMaxThreads
, setMaxThreads
, getMaxBuffer
, setMaxBuffer
, getStreamRate
, setStreamRate
, setStreamLatency
, getYieldLimit
, setYieldLimit
, getInspectMode
, setInspectMode
, recordMaxWorkers
, cleanupSVar
, cleanupSVarFromWorker
, newAheadVar
, newParallelVar
, captureMonadState
, RunInIO (..)
, WorkerInfo (..)
, YieldRateInfo (..)
, ThreadAbort (..)
, ChildEvent (..)
, AheadHeapEntry (..)
, send
, sendToProducer
, sendYield
, sendStop
, sendStopToProducer
, enqueueLIFO
, enqueueFIFO
, enqueueAhead
, reEnqueueAhead
, pushWorkerPar
, handleChildException
, handleFoldException
, queueEmptyAhead
, dequeueAhead
, HeapDequeueResult(..)
, dequeueFromHeap
, dequeueFromHeapSeq
, requeueOnHeapTop
, updateHeapSeq
, withIORef
, heapIsSane
, Rate (..)
, getYieldRateInfo
, newSVarStats
, collectLatency
, workerUpdateLatency
, isBeyondMaxRate
, workerRateControl
, updateYieldCount
, decrementYieldLimit
, incrementYieldLimit
, decrementBufferLimit
, incrementBufferLimit
, postProcessBounded
, postProcessPaced
, readOutputQBounded
, readOutputQPaced
, readOutputQBasic
, dispatchWorkerPaced
, sendFirstWorker
, delThread
, modifyThread
, doFork
, fork
, forkManaged
, toStreamVar
, SVarStats (..)
, dumpSVar
, printSVar
, withDiagMVar
)
where
import Control.Concurrent (ThreadId, myThreadId, threadDelay, throwTo)
import Control.Concurrent.MVar
(MVar, newEmptyMVar, tryPutMVar, takeMVar, tryTakeMVar, newMVar,
tryReadMVar)
import Control.Exception
(SomeException(..), assert, Exception, catches,
throwIO, Handler(..), BlockedIndefinitelyOnMVar(..),
BlockedIndefinitelyOnSTM(..))
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control (MonadBaseControl, control)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier,
storeLoadBarrier)
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, pushL)
import Data.Functor (void)
import Data.Heap (Heap, Entry(..))
import Data.Int (Int64)
import Data.IORef
(IORef, modifyIORef, newIORef, readIORef, writeIORef, atomicModifyIORef)
import Data.Kind (Type)
import Data.Maybe (fromJust, fromMaybe)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup ((<>))
#endif
import Data.Set (Set)
import System.IO (hPutStrLn, stderr)
import Streamly.Internal.Control.Concurrent
(MonadAsync, RunInIO(..), doFork, fork, forkManaged)
import Streamly.Internal.Data.Time.Clock.Type (Clock(..), getTime)
import Streamly.Internal.Data.Time.Units
(AbsTime, NanoSecond64(..), MicroSecond64(..), diffAbsTime64,
fromRelTime64, toRelTime64, showNanoSecond64, showRelTime64)
import qualified Data.Heap as H
import qualified Data.Set as S
-- | A 64-bit signed counter. In this module it is used for yield counts and
-- yield limits (for example 'workerYieldMax' and '_yieldLimit').
--
-- The numeric instances are borrowed from the underlying 'Int64', so the
-- module must be compiled with @GeneralizedNewtypeDeriving@ enabled.
newtype Count = Count Int64
    deriving ( Eq
             , Read
             , Show
             , Enum
             , Bounded
             , Num
             , Real
             , Integral
             , Ord
             )
-- | Exception delivered with 'throwTo' to worker threads by 'cleanupSVar'
-- and 'cleanupSVarFromWorker'.
data ThreadAbort = ThreadAbort deriving Show

instance Exception ThreadAbort
-- | Events stored in the output queues of an 'SVar'.
data ChildEvent a =
      ChildYield a
      -- ^ carries one value of the stream element type
    | ChildStop ThreadId (Maybe SomeException)
      -- ^ carries a thread id and an optional exception
      -- (presumably the worker that stopped and why — confirm with callers)
-- | Payload of the entries stored in the 'outputHeap' of an 'SVar'.
-- The explicit kind annotation on @t@ requires @KindSignatures@.
data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a =
      AheadEntryNull
      -- ^ entry without any payload
    | AheadEntryPure a
      -- ^ entry holding a single value
    | AheadEntryStream (RunInIO m, t m a)
      -- ^ entry holding a stream together with the 'RunInIO' to run it with
#undef Type
-- | The style of an 'SVar', stored in its 'svarStyle' field.
data SVarStyle =
      AsyncVar
    | WAsyncVar
    | ParallelVar
    | AheadVar
    deriving (Eq, Show)
-- | Per-worker bookkeeping handed to the 'workLoop' of an 'SVar'.
data WorkerInfo = WorkerInfo
    { workerYieldMax     :: Count
      -- ^ NOTE(review): presumably the maximum number of yields allowed for
      -- this worker — confirm the meaning of special values with callers
    , workerYieldCount   :: IORef Count
      -- ^ mutable yield counter of the worker
    , workerLatencyStart :: IORef (Count, AbsTime)
      -- ^ a count paired with a timestamp (presumably the yield count and
      -- time at which the current latency measurement started — confirm)
    }
-- | Rate specification consumed through 'setStreamRate'.
--
-- NOTE(review): the units of the three rates are not visible in this part of
-- the file (presumably yields per second — confirm).
data Rate = Rate
    { rateLow    :: Double -- ^ lower bound of the rate
    , rateGoal   :: Double -- ^ target rate
    , rateHigh   :: Double -- ^ upper bound of the rate
    , rateBuffer :: Int    -- ^ buffer size associated with the rate
    }
-- | A pair of latency bounds, expressed in nanoseconds.
data LatencyRange = LatencyRange
    { minLatency :: NanoSecond64
    , maxLatency :: NanoSecond64
    } deriving Show
-- | Rate-control state of an 'SVar' (see the 'yieldRateInfo' field).
--
-- NOTE(review): the field notes below are inferred from names and types
-- only; confirm against the code that creates and updates this record.
data YieldRateInfo = YieldRateInfo
    { svarLatencyTarget      :: NanoSecond64
      -- ^ target latency, in nanoseconds
    , svarLatencyRange       :: LatencyRange
      -- ^ latency bounds
    , svarRateBuffer         :: Int
    , svarGainedLostYields   :: IORef Count
    , svarAllTimeLatency     :: IORef (Count, AbsTime)
      -- ^ read by 'collectLatency' as its long-term (count, time) pair
    , workerBootstrapLatency :: Maybe NanoSecond64
    , workerPollingInterval  :: IORef Count
      -- ^ written by 'updateWorkerPollingInterval'
    , workerPendingLatency   :: IORef (Count, Count, NanoSecond64)
      -- ^ drained and reset to zeroes by 'collectWorkerPendingLatency'
    , workerCollectedLatency :: IORef (Count, Count, NanoSecond64)
      -- ^ running totals accumulated by 'collectWorkerPendingLatency'
    , workerMeasuredLatency  :: IORef NanoSecond64
      -- ^ read by 'collectLatency' as the previously measured latency
    }
-- | Mutable statistics attached to an 'SVar' through 'svarStats'.
data SVarStats = SVarStats
    { totalDispatches  :: IORef Int
    , maxWorkers       :: IORef Int
    , maxOutQSize      :: IORef Int
    , maxHeapSize      :: IORef Int
    , maxWorkQSize     :: IORef Int
    , avgWorkerLatency :: IORef (Count, NanoSecond64)
      -- ^ running (count, time) totals, updated by 'recordAvgLatency'
    , minWorkerLatency :: IORef NanoSecond64
      -- ^ updated by 'recordMinMaxLatency'; a stored 0 is treated as "unset"
    , maxWorkerLatency :: IORef NanoSecond64
      -- ^ updated by 'recordMinMaxLatency'
    , svarStopTime     :: IORef (Maybe AbsTime)
    }
-- | An optional upper bound: either no bound at all or a bound of the given
-- size.
--
-- 'Eq' is the structural (derived) equality, which is exactly what the
-- previous hand-written instance implemented.
data Limit = Unlimited | Limited Word deriving (Show, Eq)

-- | 'Unlimited' is the greatest element: every limit is @<=@ 'Unlimited',
-- while 'Unlimited' is @<=@ only itself. Two 'Limited' values compare by
-- their bounds. This differs from the derived ordering (which would make the
-- first constructor the smallest), hence the explicit instance.
instance Ord Limit where
    Unlimited <= Unlimited = True
    Unlimited <= Limited _ = False
    Limited _ <= Unlimited = True
    Limited x <= Limited y = x <= y
-- | Stop style of an 'SVar', stored in its 'svarStopStyle' field.
--
-- NOTE(review): the exact stop semantics of each constructor are implemented
-- outside this part of the file; confirm there before relying on the names.
data SVarStopStyle =
      StopNone
    | StopAny
    | StopBy
    deriving (Eq, Show)
-- | Policy stored in the 'pushBufferPolicy' field of an 'SVar'.
--
-- NOTE(review): presumably selects what happens when the push buffer is full
-- (drop the new item, drop the oldest item, or block) — confirm with the code
-- that consumes it.
data PushBufferPolicy =
      PushBufferDropNew
    | PushBufferDropOld
    | PushBufferBlock
-- | The shared state of a concurrent stream evaluation: configuration,
-- output queues, worker bookkeeping, statistics and the style-specific
-- work queues.
--
-- NOTE(review): the field notes below state only what names and types show;
-- anything marked "presumably" must be confirmed against the code that
-- builds and uses the record.
data SVar t m a = SVar
    {
    -- Static configuration
      svarStyle       :: SVarStyle
    , svarMrun        :: RunInIO m
    , svarStopStyle   :: SVarStopStyle
    , svarStopBy      :: IORef ThreadId

    -- Output channel: a list of events paired with an Int (presumably the
    -- number of queued events) and an MVar used as a door bell.
    , outputQueue     :: IORef ([ChildEvent a], Int)
    , outputDoorBell  :: MVar ()
    , readOutputQ     :: m [ChildEvent a]
    , postProcess     :: m Bool

    -- The same kind of channel in the consumer-to-producer direction
    , outputQueueFromConsumer    :: IORef ([ChildEvent a], Int)
    , outputDoorBellFromConsumer :: MVar ()

    -- Limits and push-buffer accounting
    , maxWorkerLimit   :: Limit
    , maxBufferLimit   :: Limit
    , pushBufferSpace  :: IORef Count
    , pushBufferPolicy :: PushBufferPolicy
    , pushBufferMVar   :: MVar ()

    -- Yield limit and rate control; 'Nothing' when not configured
    , remainingWork :: Maybe (IORef Count)
    , yieldRateInfo :: Maybe YieldRateInfo

    -- Work scheduling
    , enqueue        :: (RunInIO m, t m a) -> IO ()
    , isWorkDone     :: IO Bool
    , isQueueDone    :: IO Bool
    , needDoorBell   :: IORef Bool
    , workLoop       :: Maybe WorkerInfo -> m ()

    -- Worker thread bookkeeping; 'workerThreads' is the set that
    -- 'cleanupSVar' aborts.
    , workerThreads  :: IORef (Set ThreadId)
    , workerCount    :: IORef Int
    , accountThread  :: ThreadId -> m ()
    , workerStopMVar :: MVar ()

    -- Diagnostics
    , svarStats       :: SVarStats
    , svarRef         :: Maybe (IORef ())
    , svarInspectMode :: Bool
    , svarCreator     :: ThreadId

    -- Heap of sequence-numbered entries plus an optional Int (presumably the
    -- next expected sequence number), and the pending work queue; judging by
    -- the names, both are used by the "ahead" style.
    , outputHeap     :: IORef ( Heap (Entry Int (AheadHeapEntry t m a))
                              , Maybe Int)
    , aheadWorkQueue :: IORef ([t m a], Int)
    }
-- | Configuration carried along with a stream. The underscore-prefixed
-- fields are read and written through the @get*@ / @set*@ functions of this
-- module; only 'streamVar' is exported as a field.
data State t m a = State
    { streamVar      :: Maybe (SVar t m a)
      -- ^ the 'SVar' attached to the stream, if any
    , _yieldLimit    :: Maybe Count
      -- ^ see 'setYieldLimit'
    , _threadsHigh   :: Limit
      -- ^ see 'setMaxThreads'
    , _bufferHigh    :: Limit
      -- ^ see 'setMaxBuffer'
    , _streamLatency :: Maybe NanoSecond64
      -- ^ see 'setStreamLatency'
    , _maxStreamRate :: Maybe Rate
      -- ^ see 'setStreamRate'
    , _inspectMode   :: Bool
      -- ^ see 'setInspectMode'
    }
-- | Tuning constant shared by several defaults in this module: it is the
-- default thread and buffer limit, the cap applied to the worker polling
-- interval, and the yield-count threshold in 'shouldUseCollectedBatch'.
magicMaxBuffer :: Word
magicMaxBuffer = 1500
-- | Default limits used by 'defState' and selected by 'setMaxThreads' /
-- 'setMaxBuffer' when they are given @0@. Both are 'magicMaxBuffer'.
defaultMaxThreads, defaultMaxBuffer :: Limit
defaultMaxThreads = Limited magicMaxBuffer
defaultMaxBuffer = Limited magicMaxBuffer
-- | The default 'State': no 'SVar' attached, no yield limit, default thread
-- and buffer limits, no rate or latency setting, inspect mode off.
defState :: State t m a
defState = State
    { streamVar = Nothing
    , _yieldLimit = Nothing
    , _threadsHigh = defaultMaxThreads
    , _bufferHigh = defaultMaxBuffer
    , _maxStreamRate = Nothing
    , _streamLatency = Nothing
    , _inspectMode = False
    }
-- | Re-type a 'State' for a stream with a different monad and element type.
-- The attached 'SVar' and the yield limit are dropped (reset to 'Nothing');
-- every other setting is carried over unchanged.
adaptState :: State t m a -> State t n b
adaptState st = st
    { streamVar = Nothing
    , _yieldLimit = Nothing
    }
-- | Set the yield limit. 'Nothing' removes the limit; @Just n@ stores @n@,
-- with zero and negative values clamped to @0@.
setYieldLimit :: Maybe Int64 -> State t m a -> State t m a
setYieldLimit lim st =
    st { _yieldLimit = fmap (fromIntegral . max 0) lim }
-- | Read the yield limit stored by 'setYieldLimit'.
getYieldLimit :: State t m a -> Maybe Count
getYieldLimit = _yieldLimit
-- | Set the thread limit: a negative value means 'Unlimited', @0@ selects
-- 'defaultMaxThreads', and a positive value @n@ becomes @Limited n@.
setMaxThreads :: Int -> State t m a -> State t m a
setMaxThreads n st = st { _threadsHigh = limit }

    where

    limit
        | n < 0 = Unlimited
        | n == 0 = defaultMaxThreads
        -- n is strictly positive here, so the Int -> Word conversion is exact
        | otherwise = Limited (fromIntegral n)
-- | Read the thread limit stored by 'setMaxThreads'.
getMaxThreads :: State t m a -> Limit
getMaxThreads = _threadsHigh
-- | Set the buffer limit: a negative value means 'Unlimited', @0@ selects
-- 'defaultMaxBuffer', and a positive value @n@ becomes @Limited n@.
setMaxBuffer :: Int -> State t m a -> State t m a
setMaxBuffer n st = st { _bufferHigh = limit }

    where

    limit
        | n < 0 = Unlimited
        | n == 0 = defaultMaxBuffer
        -- n is strictly positive here, so the Int -> Word conversion is exact
        | otherwise = Limited (fromIntegral n)
-- | Read the buffer limit stored by 'setMaxBuffer'.
getMaxBuffer :: State t m a -> Limit
getMaxBuffer = _bufferHigh
-- | Store the given rate specification; 'Nothing' clears it.
setStreamRate :: Maybe Rate -> State t m a -> State t m a
setStreamRate r st = st { _maxStreamRate = r }
-- | Read the rate specification stored by 'setStreamRate'.
getStreamRate :: State t m a -> Maybe Rate
getStreamRate = _maxStreamRate
-- | Set the stream latency. The argument is converted to 'NanoSecond64'
-- unchanged, i.e. it is interpreted as nanoseconds. Zero and negative values
-- clear the setting ('Nothing').
setStreamLatency :: Int -> State t m a -> State t m a
setStreamLatency n st = st { _streamLatency = latency }

    where

    latency
        | n <= 0 = Nothing
        | otherwise = Just (fromIntegral n)
-- | Read the latency stored by 'setStreamLatency'.
getStreamLatency :: State t m a -> Maybe NanoSecond64
getStreamLatency = _streamLatency
-- | Turn inspect mode on. There is no corresponding way to turn it off
-- again other than starting from 'defState'.
setInspectMode :: State t m a -> State t m a
setInspectMode st = st { _inspectMode = True }
-- | Tell whether inspect mode has been turned on with 'setInspectMode'.
getInspectMode :: State t m a -> Bool
getInspectMode = _inspectMode
-- | Throw 'ThreadAbort' to every thread currently recorded in the
-- 'workerThreads' set of the 'SVar'.
--
-- The set is read once; threads registered after that read are not
-- signalled by this call.
cleanupSVar :: SVar t m a -> IO ()
cleanupSVar sv = do
    workers <- readIORef (workerThreads sv)
    Prelude.mapM_ (`throwTo` ThreadAbort) workers
-- | Like 'cleanupSVar', but meant to be called from one of the worker
-- threads: 'ThreadAbort' is thrown to every recorded worker except the
-- calling thread itself.
cleanupSVarFromWorker :: SVar t m a -> IO ()
cleanupSVarFromWorker sv = do
    workers <- readIORef (workerThreads sv)
    self <- myThreadId
    -- Removing ourselves from the set keeps the remaining thread ids in the
    -- same ascending order in which the set would be listed.
    Prelude.mapM_ (`throwTo` ThreadAbort) (S.delete self workers)
-- | Recompute 'workerPollingInterval' from a measured latency.
--
-- The interval is @minThreadDelay `div` latency@, kept within the range
-- @[1, magicMaxBuffer]@.
--
-- A latency of zero is handled explicitly: dividing by it would be a
-- division by zero, so the interval is set to the cap instead, which is
-- also the limit the formula approaches as the latency shrinks.
updateWorkerPollingInterval :: YieldRateInfo -> NanoSecond64 -> IO ()
updateWorkerPollingInterval yinfo latency = do
    let cap = fromIntegral magicMaxBuffer
        cnt
            | latency == 0 = cap
            | otherwise = max 1 $ minThreadDelay `div` latency
        period = min cnt cap

    writeIORef (workerPollingInterval yinfo) (fromIntegral period)
-- | Fold a new latency sample into the minimum and maximum worker latency
-- statistics of the 'SVar'.
--
-- A stored minimum of @0@ is treated as "not yet set" and is always
-- replaced by the new sample.
{-# INLINE recordMinMaxLatency #-}
recordMinMaxLatency :: SVar t m a -> NanoSecond64 -> IO ()
recordMinMaxLatency sv new = do
    let ss = svarStats sv

    minLat <- readIORef (minWorkerLatency ss)
    when (new < minLat || minLat == 0) $
        writeIORef (minWorkerLatency ss) new

    maxLat <- readIORef (maxWorkerLatency ss)
    when (new > maxLat) $
        writeIORef (maxWorkerLatency ss) new
-- | Add a @(count, time)@ sample to the running totals kept in
-- 'avgWorkerLatency'.
--
-- The new totals are evaluated before they are stored. With a lazy
-- 'modifyIORef' each call would only stack another unevaluated addition on
-- the value in the ref, which grows until somebody finally reads the
-- statistics. Like 'modifyIORef', this read-then-write update is not atomic.
recordAvgLatency :: SVar t m a -> (Count, NanoSecond64) -> IO ()
recordAvgLatency sv (count, time) = do
    let ref = avgWorkerLatency (svarStats sv)
    (cnt, t) <- readIORef ref
    let cnt1 = cnt + count
        t1 = t + time
    cnt1 `seq` t1 `seq` writeIORef ref (cnt1, t1)
-- | Move the pending latency counters into the collected totals.
--
-- The first reference (pending) is atomically swapped with zeroes and its
-- previous contents are added, component by component, to the second
-- reference (collected), which is updated with a plain read followed by a
-- write.
--
-- Returns the new first component of the collected totals, together with
-- @Just (count, time)@ built from the second and third components when both
-- are positive, or 'Nothing' otherwise.
{-# INLINE collectWorkerPendingLatency #-}
collectWorkerPendingLatency
    :: IORef (Count, Count, NanoSecond64)
    -> IORef (Count, Count, NanoSecond64)
    -> IO (Count, Maybe (Count, NanoSecond64))
collectWorkerPendingLatency cur col = do
    -- Take whatever is pending and leave zeroes behind in one atomic step.
    (fcount, count, time) <- atomicModifyIORefCAS cur $ \v -> ((0, 0, 0), v)

    (fcnt, cnt, t) <- readIORef col
    let totalCount = fcnt + fcount
        latCount = cnt + count
        latTime = t + time
    writeIORef col (totalCount, latCount, latTime)

    -- A non-zero sample count is expected to come with a non-zero time.
    assert (latCount == 0 || latTime /= 0) (return ())
    let latPair =
            if latCount > 0 && latTime > 0
            then Just (latCount, latTime)
            else Nothing
    return (totalCount, latPair)
-- | Decide whether a collected batch of latency samples should be used.
-- The answer is 'True' when any of the following holds:
--
-- * more than 'magicMaxBuffer' yields have been collected,
-- * the collected time exceeds @minThreadDelay@,
-- * a previous latency exists and the new latency is more than twice or
--   less than half of it,
-- * there is no previous latency yet (it is @0@).
{-# INLINE shouldUseCollectedBatch #-}
shouldUseCollectedBatch
    :: Count
    -> NanoSecond64
    -> NanoSecond64
    -> NanoSecond64
    -> Bool
shouldUseCollectedBatch collectedYields collectedTime newLat prevLat =
    -- The ratio is only inspected when prevLat > 0, so laziness keeps the
    -- division by a zero previous latency from ever being evaluated.
    let r = fromIntegral newLat / fromIntegral prevLat :: Double
    in     (collectedYields > fromIntegral magicMaxBuffer)
        || (collectedTime > minThreadDelay)
        || (prevLat > 0 && (r > 2 || r < 0.5))
        || (prevLat == 0)
-- | Collect the pending worker latency samples and, when the collected batch
-- is significant (see 'shouldUseCollectedBatch') or @drain@ is 'True', fold
-- them into the measured worker latency.
--
-- Returns a triple of:
--
-- * the all-time yield count including the newly collected yields,
-- * the time stored alongside the all-time count,
-- * the latency: the freshly computed one when the batch was used, otherwise
--   the previously measured one.
collectLatency :: SVar t m a
               -> YieldRateInfo
               -> Bool
               -> IO (Count, AbsTime, NanoSecond64)
collectLatency sv yinfo drain = do
    (newCount, newLatPair) <-
        collectWorkerPendingLatency (workerPendingLatency yinfo) collectedRef
    (lcount, ltime) <- readIORef allTimeRef
    prevLat <- readIORef measuredRef

    let totalCount = lcount + newCount
        finishWith lat = return (totalCount, ltime, lat)

    case newLatPair of
        Nothing -> finishWith prevLat
        Just (count, time) -> do
            let newLat = time `div` fromIntegral count
                useBatch =
                    shouldUseCollectedBatch newCount time newLat prevLat
                        || drain
            when inspecting $ recordMinMaxLatency sv newLat
            if useBatch
            then do
                updateWorkerPollingInterval yinfo (max newLat prevLat)
                when inspecting $ recordAvgLatency sv (count, time)
                -- The batch has been consumed: reset the collected samples.
                writeIORef collectedRef (0, 0, 0)
                -- The stored latency is the mean of the old and new values.
                writeIORef measuredRef ((prevLat + newLat) `div` 2)
                modifyIORef allTimeRef $ \(_, t) -> (totalCount, t)
                finishWith newLat
            else finishWith prevLat

    where

    collectedRef = workerCollectedLatency yinfo
    allTimeRef = svarAllTimeLatency yinfo
    measuredRef = workerMeasuredLatency yinfo
    inspecting = svarInspectMode sv
-- | Render the statistics held in an 'SVarStats' as a multi-line string.
--
-- When the SVar has rate information, pending latency samples are first
-- drained via 'collectLatency' so that the reported numbers include them.
-- Optional sections (heap size, worker latencies, SVar latency and yield
-- counts) are appended only when their value is non-zero, or, for the heap
-- size, when the style is 'AheadVar'.
dumpSVarStats :: SVar t m a -> SVarStats -> SVarStyle -> IO String
dumpSVarStats sv ss style = do
    case yieldRateInfo sv of
        Just yinfo -> void $ collectLatency sv yinfo True
        Nothing -> return ()

    dispatches <- readIORef (totalDispatches ss)
    maxWrk <- readIORef (maxWorkers ss)
    maxOq <- readIORef (maxOutQSize ss)
    maxHp <- readIORef (maxHeapSize ss)
    minLat <- readIORef (minWorkerLatency ss)
    maxLat <- readIORef (maxWorkerLatency ss)
    (avgCnt, avgTime) <- readIORef (avgWorkerLatency ss)
    (svarCnt, svarGainLossCnt, svarLat) <-
        maybe (return (0, 0, 0)) yieldSummary (yieldRateInfo sv)

    -- All optional sections hang off the third line, each one introducing
    -- its own leading newline.
    let outQSection = concat
            [ "max outQSize = " <> show maxOq
            , onlyIf (style == AheadVar) $
                "\nheap max size = " <> show maxHp
            , onlyIf (minLat > 0) $
                "\nmin worker latency = " <> showNanoSecond64 minLat
            , onlyIf (maxLat > 0) $
                "\nmax worker latency = " <> showNanoSecond64 maxLat
            , onlyIf (avgCnt > 0) $
                "\navg worker latency = "
                    <> showNanoSecond64 (avgTime `div` fromIntegral avgCnt)
            , onlyIf (svarLat > 0) $
                "\nSVar latency = " <> showRelTime64 svarLat
            , onlyIf (svarCnt > 0) $
                "\nSVar yield count = " <> show svarCnt
            , onlyIf (svarGainLossCnt > 0) $
                "\nSVar gain/loss yield count = " <> show svarGainLossCnt
            ]

    return $ unlines
        [ "total dispatches = " <> show dispatches
        , "max workers = " <> show maxWrk
        , outQSection
        ]

    where

    -- The text argument is only evaluated when the condition holds.
    onlyIf :: Bool -> String -> String
    onlyIf cond text = if cond then text else ""

    -- Yield count, gained/lost yield count and per-yield latency of the
    -- SVar. The interval runs from the recorded start time to the recorded
    -- stop time, or to the current time when no stop time is recorded.
    yieldSummary yinfo = do
        (cnt, startTime) <- readIORef (svarAllTimeLatency yinfo)
        if cnt <= 0
        then return (0, 0, 0)
        else do
            stopped <- readIORef (svarStopTime ss)
            gl <- readIORef (svarGainedLostYields yinfo)
            endTime <- maybe (getTime Monotonic) return stopped
            let interval = diffAbsTime64 endTime startTime
            return (cnt, gl, interval `div` fromIntegral cnt)
{-# NOINLINE dumpSVar #-}
-- | Produce a multi-line, human-readable snapshot of an 'SVar' for
-- diagnostics: creator thread, style, output queue and doorbell state, the
-- heap and work queue (only for 'AheadVar' style), worker threads, and the
-- statistics rendered by 'dumpSVarStats'.
--
-- The individual fields are read one after another without any locking, so
-- the snapshot is not guaranteed to be mutually consistent.
dumpSVar :: SVar t m a -> IO String
dumpSVar sv = do
    (oqList, oqLen) <- readIORef $ outputQueue sv
    -- Non-blocking peek: 'Just ()' when the doorbell is currently set.
    db <- tryReadMVar $ outputDoorBell sv
    aheadDump <-
        if style == AheadVar
        then do
            (oheap, oheapSeq) <- readIORef $ outputHeap sv
            (wq, wqSeq) <- readIORef $ aheadWorkQueue sv
            return $ unlines
                [ "heap length = " <> show (H.size oheap)
                , "heap sequence = " <> show oheapSeq
                , "work queue length = " <> show (length wq)
                , "work queue sequence = " <> show wqSeq
                ]
        else return []

    -- needDoorBell is not read for the ParallelVar style.
    waiting <-
        if style /= ParallelVar
        then readIORef $ needDoorBell sv
        else return False
    rthread <- readIORef $ workerThreads sv
    workers <- readIORef $ workerCount sv
    stats <- dumpSVarStats sv (svarStats sv) style

    return $ unlines
        [ "Creator tid = " <> show (svarCreator sv)
        , "style = " <> show style
        , "---------CURRENT STATE-----------"
        -- The computed and the maintained lengths are both reported so
        -- that they can be compared.
        , "outputQueue length computed = " <> show (length oqList)
        , "outputQueue length maintained = " <> show oqLen
        , "outputDoorBell = " <> show db
        ]
        <> aheadDump
        <> unlines
        [ "needDoorBell = " <> show waiting
        , "running threads = " <> show rthread
        , "running thread count = " <> show workers
        ]
        <> "---------STATS-----------\n"
        <> stats

    where

    style = svarStyle sv
-- | Write a dump of the 'SVar' (see 'dumpSVar') to 'stderr', preceded by a
-- blank line and the caller supplied heading @how@.
printSVar :: SVar t m a -> String -> IO ()
printSVar sv how =
    dumpSVar sv >>= \svInfo ->
        hPutStrLn stderr (concat ["\n", how, "\n", svInfo])
{-# NOINLINE mvarExcHandler #-}
-- | Handler for 'BlockedIndefinitelyOnMVar': prints @label@, the exception
-- name and a dump of the 'SVar' to 'stderr', then rethrows the exception.
mvarExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnMVar -> IO ()
mvarExcHandler sv label BlockedIndefinitelyOnMVar = do
    svInfo <- dumpSVar sv
    hPutStrLn stderr $
        concat [label, " ", "BlockedIndefinitelyOnMVar\n", svInfo]
    throwIO BlockedIndefinitelyOnMVar
{-# NOINLINE stmExcHandler #-}
-- | Handler for 'BlockedIndefinitelyOnSTM': prints @label@, the exception
-- name and a dump of the 'SVar' to 'stderr', then rethrows the exception.
stmExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnSTM -> IO ()
stmExcHandler sv label BlockedIndefinitelyOnSTM = do
    svInfo <- dumpSVar sv
    hPutStrLn stderr $
        concat [label, " ", "BlockedIndefinitelyOnSTM\n", svInfo]
    throwIO BlockedIndefinitelyOnSTM
-- | Run @action@; when the 'SVar' is in inspect mode, additionally install
-- handlers that dump the 'SVar' state (tagged with @label@) if the action
-- fails with 'BlockedIndefinitelyOnMVar' or 'BlockedIndefinitelyOnSTM'.
-- Outside inspect mode the action is run unchanged.
withDiagMVar :: SVar t m a -> String -> IO () -> IO ()
withDiagMVar sv label action
    | svarInspectMode sv = action `catches` diagHandlers
    | otherwise = action

    where

    diagHandlers =
        [ Handler (mvarExcHandler sv label)
        , Handler (stmExcHandler sv label)
        ]
-- | Capture the current monadic state as a 'RunInIO' runner, i.e. the
-- @m b -> IO (StM m b)@ function that 'control' hands to its callback.
captureMonadState :: MonadBaseControl IO m => m (RunInIO m)
captureMonadState =
    control (\runInBase -> runInBase (return (RunInIO runInBase)))
{-# INLINE decrementYieldLimit #-}
-- | Consume one unit of the remaining yield budget, if there is one.
--
-- Returns 'True' when the 'SVar' has no yield limit, or when the budget was
-- at least one before the decrement. The counter is decremented
-- unconditionally, so it may drop below zero.
decrementYieldLimit :: SVar t m a -> IO Bool
decrementYieldLimit sv = maybe (return True) consume (remainingWork sv)

    where

    -- Decrement atomically and report whether the previous value was >= 1.
    consume ref =
        fmap (>= 1) (atomicModifyIORefCAS ref (\n -> (n - 1, n)))
{-# INLINE incrementYieldLimit #-}
-- | Give back one unit of the yield budget. Does nothing when the 'SVar'
-- has no yield limit.
incrementYieldLimit :: SVar t m a -> IO ()
incrementYieldLimit sv = maybe (return ()) giveBack (remainingWork sv)

    where

    giveBack ref = atomicModifyIORefCAS_ ref (\n -> n + 1)
{-# INLINE decrementBufferLimit #-}
-- | Reserve one slot of push-buffer space before pushing an item.
--
-- Does nothing when the buffer is 'Unlimited'. Otherwise one slot is taken
-- from 'pushBufferSpace'; when no slot was available the behaviour depends
-- on 'pushBufferPolicy':
--
-- * 'PushBufferBlock': block on 'pushBufferMVar' and retry until a slot
--   could be taken.
-- * 'PushBufferDropNew': remove the head of the output queue; if the queue
--   was empty, fall back to blocking and retrying.
-- * 'PushBufferDropOld': not implemented, fails with an 'error' naming the
--   unsupported policy.
decrementBufferLimit :: SVar t m a -> IO ()
decrementBufferLimit sv =
    case maxBufferLimit sv of
        Unlimited -> return ()
        Limited _ -> do
            old <- takeSlot
            when (old <= 0) $
                case pushBufferPolicy sv of
                    PushBufferBlock -> blockAndRetry
                    PushBufferDropNew -> do
                        -- Drop one queued event to make room; block only if
                        -- there was nothing to drop.
                        block <- atomicModifyIORefCAS (outputQueue sv) $
                            \(es, n) ->
                                case es of
                                    [] -> (([], n), True)
                                    _ : xs -> ((xs, n - 1), False)
                        when block blockAndRetry
                    PushBufferDropOld ->
                        error $ "decrementBufferLimit: "
                            <> "PushBufferDropOld policy is not implemented"

    where

    -- Atomically take one slot if any is available. Returns the space that
    -- was available before the attempt; a value below 1 means nothing was
    -- taken.
    takeSlot =
        atomicModifyIORefCAS (pushBufferSpace sv) $ \x ->
            (if x >= 1 then x - 1 else x, x)

    -- Wait for a signal on 'pushBufferMVar' and try again. On success the
    -- MVar is put back (non-blocking) before returning.
    -- NOTE(review): the put-back presumably lets other blocked pushers
    -- re-check for space - confirm against the consumer side.
    blockAndRetry = do
        takeMVar (pushBufferMVar sv)
        old <- takeSlot
        if old >= 1
        then void $ tryPutMVar (pushBufferMVar sv) ()
        else blockAndRetry
{-# INLINE incrementBufferLimit #-}
-- | Release one slot of push-buffer space and signal 'pushBufferMVar'.
-- Does nothing when the buffer is 'Unlimited'.
incrementBufferLimit :: SVar t m a -> IO ()
incrementBufferLimit sv =
    case maxBufferLimit sv of
        Limited _ -> do
            atomicModifyIORefCAS_ (pushBufferSpace sv) (\n -> n + 1)
            -- The barrier is issued between the space update and the signal.
            writeBarrier
            -- Non-blocking put; the result is deliberately ignored.
            _ <- tryPutMVar (pushBufferMVar sv) ()
            return ()
        Unlimited -> return ()
{-# INLINE resetBufferLimit #-}
-- | Reset the available push-buffer space to the configured maximum. Does
-- nothing when the buffer is 'Unlimited'.
resetBufferLimit :: SVar t m a -> IO ()
resetBufferLimit sv =
    case maxBufferLimit sv of
        Limited limit ->
            atomicModifyIORefCAS_
                (pushBufferSpace sv) (\_ -> fromIntegral limit)
        Unlimited -> return ()
{-# INLINE sendWithDoorBell #-}
-- | Push an event onto a queue and ring the associated doorbell if needed.
--
-- The event is atomically prepended to the list in the queue and the stored
-- length is incremented. When the length observed /before/ the push was
-- @<= 0@, a write barrier is issued and a non-blocking put is attempted on the
-- doorbell 'MVar'. The result of that put is ignored.
--
-- Returns the queue length as it was before this push.
sendWithDoorBell ::
    IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell q bell msg = do
    prevLen <- atomicModifyIORefCAS q push
    -- The barrier sits between the queue update and the doorbell.
    when (prevLen <= 0) $ writeBarrier >> void (tryPutMVar bell ())
    return prevLen
  where
    push (events, len) = ((msg : events, len + 1), len)
-- | Push an event onto the 'outputQueue' of the 'SVar', ringing
-- 'outputDoorBell' as 'sendWithDoorBell' dictates. Returns the queue length
-- observed before the push.
send :: SVar t m a -> ChildEvent a -> IO Int
send sv = sendWithDoorBell queue bell
  where
    queue = outputQueue sv
    bell = outputDoorBell sv
-- | Push an event onto the consumer-to-producer queue of the 'SVar'.
--
-- Before queueing, a non-blocking put is attempted on 'pushBufferMVar' (its
-- result is ignored). NOTE(review): presumably this wakes a producer that is
-- blocked waiting for buffer space so that it can see the event; confirm
-- against the code that takes this MVar.
--
-- The event is then pushed onto 'outputQueueFromConsumer' using
-- 'outputDoorBellFromConsumer' as the doorbell. Returns the queue length
-- observed before the push.
sendToProducer :: SVar t m a -> ChildEvent a -> IO Int
sendToProducer sv msg = do
    _ <- tryPutMVar (pushBufferMVar sv) ()
    sendWithDoorBell consumerQueue consumerBell msg
  where
    consumerQueue = outputQueueFromConsumer sv
    consumerBell = outputDoorBellFromConsumer sv
-- | Send a @ChildStop@ event, tagged with the calling thread's 'ThreadId' and
-- carrying no exception, to the producer side via 'sendToProducer'. The
-- returned queue length is discarded.
sendStopToProducer :: MonadIO m => SVar t m a -> m ()
sendStopToProducer sv =
    liftIO $
        myThreadId >>= \self ->
            void (sendToProducer sv (ChildStop self Nothing))
-- | Measure how many items a worker yielded, and how long that took, since
-- the last measurement.
--
-- Reads the @(count, time)@ pair stored in 'workerLatencyStart' and the
-- current 'workerYieldCount'. If the count has advanced, the current
-- 'Monotonic' time is sampled, 'workerLatencyStart' is overwritten with the
-- current @(count, time)@, and @Just (yielded, elapsed)@ is returned.
-- Otherwise nothing is written and 'Nothing' is returned.
workerCollectLatency :: WorkerInfo -> IO (Maybe (Count, NanoSecond64))
workerCollectLatency winfo = do
    (startCount, startTime) <- readIORef latencyRef
    curCount <- readIORef (workerYieldCount winfo)
    let yielded = curCount - startCount
    if yielded > 0
    then do
        now <- getTime Monotonic
        let elapsed = fromRelTime64 (diffAbsTime64 now startTime)
        -- Start the next measurement window from this point.
        writeIORef latencyRef (curCount, now)
        return (Just (yielded, elapsed))
    else return Nothing
  where
    latencyRef = workerLatencyStart winfo
-- | Fold a worker's latest latency measurement into the shared
-- 'workerPendingLatency' accumulator.
--
-- When 'workerCollectLatency' yields @Just (cnt, period)@, the accumulator
-- triple is atomically updated: the first component always grows by @cnt@,
-- while the second and third grow by @cnt@ and @period@ only if @period > 0@
-- (otherwise they grow by zero). Nothing happens on 'Nothing'.
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency yinfo winfo =
    workerCollectLatency winfo >>= maybe (return ()) record
  where
    record (cnt, period) =
        atomicModifyIORefCAS_ (workerPendingLatency yinfo) $
            \(total, timedCount, timedPeriod) ->
                (total + cnt, timedCount + dCount, timedPeriod + dPeriod)
      where
        -- A measurement with a non-positive period contributes only to the
        -- overall count, not to the timed figures.
        (dCount, dPeriod)
            | period > 0 = (cnt, period)
            | otherwise = (0, 0)
-- | Increment the worker's 'workerYieldCount' by one and return the new
-- value. The update is a plain read followed by a write, not an atomic
-- modification.
updateYieldCount :: WorkerInfo -> IO Count
updateYieldCount winfo = do
    next <- (+ 1) <$> readIORef counter
    writeIORef counter next
    return next
  where
    counter = workerYieldCount winfo
-- | 'True' when the worker's 'workerYieldMax' is non-zero and the given yield
-- count has reached or exceeded it. A 'workerYieldMax' of zero never
-- triggers.
isBeyondMaxYield :: Count -> WorkerInfo -> Bool
isBeyondMaxYield cnt winfo = limit /= 0 && cnt >= limit
  where
    limit = workerYieldMax winfo
{-# NOINLINE checkRatePeriodic #-}
-- | Periodically refresh latency statistics and check the yield rate.
--
-- Reads 'workerPollingInterval'. When the interval is non-zero and the given
-- yield count is an exact multiple of it, the worker's latency is folded in
-- via 'workerUpdateLatency' and the result of 'isBeyondMaxRate' is returned.
-- In every other case the result is 'False' and nothing is updated.
checkRatePeriodic :: SVar t m a
                  -> YieldRateInfo
                  -> WorkerInfo
                  -> Count
                  -> IO Bool
checkRatePeriodic sv yinfo winfo ycnt = do
    interval <- readIORef (workerPollingInterval yinfo)
    -- The zero check comes first so that `mod` never sees a zero divisor.
    let due = interval /= 0 && (ycnt `mod` interval) == 0
    if due
    then workerUpdateLatency yinfo winfo >> isBeyondMaxRate sv yinfo
    else return False
{-# NOINLINE workerRateControl #-}
-- | Account for one more yield by the worker and decide whether it may
-- continue.
--
-- Bumps the worker's yield count, runs the periodic rate check with the new
-- count, and returns 'True' only if the worker is neither beyond its maximum
-- yield count nor beyond the maximum rate.
workerRateControl :: SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl sv yinfo winfo = do
    cnt <- updateYieldCount winfo
    overRate <- checkRatePeriodic sv yinfo winfo cnt
    return (not (isBeyondMaxYield cnt winfo) && not overRate)
{-# INLINE sendYield #-}
-- | Send an event to the output queue and report whether the caller may keep
-- going.
--
-- After the event is queued with 'send', two checks are made in order:
--
-- * Buffer space: always satisfied for an 'Unlimited' buffer. For
--   @Limited lim@ it holds when the pre-push queue length plus one is
--   strictly less than @lim@ minus the current 'workerCount'.
--
-- * Rate limit: consulted through 'workerRateControl' only when both a
--   'WorkerInfo' was supplied and the 'SVar' has a 'yieldRateInfo'; otherwise
--   it is satisfied.
--
-- The result is 'True' only when both checks pass.
sendYield :: SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield sv mwinfo msg = do
    oldlen <- send sv msg
    bufferSpaceOk <- hasBufferSpace oldlen
    rateLimitOk <- withinRateLimit
    return (bufferSpaceOk && rateLimitOk)
  where
    hasBufferSpace oldlen =
        case maxBufferLimit sv of
            Unlimited -> return True
            Limited lim -> do
                active <- readIORef (workerCount sv)
                return $ (oldlen + 1) < (fromIntegral lim - active)

    withinRateLimit =
        case (mwinfo, yieldRateInfo sv) of
            (Just winfo, Just yinfo) -> workerRateControl sv yinfo winfo
            _ -> return True
{-# INLINE workerStopUpdate #-}
-- | Fold the worker's latency measurement into the shared statistics via
-- 'workerUpdateLatency', but only when 'workerPollingInterval' is currently
-- non-zero.
workerStopUpdate :: WorkerInfo -> YieldRateInfo -> IO ()
workerStopUpdate winfo info =
    readIORef (workerPollingInterval info) >>= \interval ->
        when (interval /= 0) (workerUpdateLatency info winfo)
{-# INLINABLE sendStop #-}
-- | Announce that the calling worker thread is stopping.
--
-- In order: atomically decrements 'workerCount'; if a 'WorkerInfo' was
-- supplied and the 'SVar' has a 'yieldRateInfo', performs the final latency
-- update through 'workerStopUpdate'; then sends a @ChildStop@ event, tagged
-- with the current 'ThreadId' and carrying no exception, on the output queue.
sendStop :: SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop sv mwinfo = do
    atomicModifyIORefCAS_ (workerCount sv) (subtract 1)
    case mwinfo of
        Nothing -> return ()
        Just winfo ->
            maybe (return ()) (workerStopUpdate winfo) (yieldRateInfo sv)
    tid <- myThreadId
    void $ send sv (ChildStop tid Nothing)
{-# INLINE ringDoorBell #-}
-- | Ring 'outputDoorBell' if the 'needDoorBell' flag is set.
--
-- A store-load barrier is issued before the flag is read. When the flag is
-- 'True' it is atomically reset to 'False' and then a non-blocking put is
-- attempted on 'outputDoorBell' (result ignored). When the flag is 'False'
-- nothing else happens.
ringDoorBell :: SVar t m a -> IO ()
ringDoorBell sv = do
    storeLoadBarrier
    wanted <- readIORef flag
    when wanted $ do
        -- The flag is cleared before the doorbell is rung.
        -- NOTE(review): this ordering looks deliberate; confirm before
        -- reordering these two steps.
        atomicModifyIORefCAS_ flag (\_ -> False)
        void $ tryPutMVar (outputDoorBell sv) ()
  where
    flag = needDoorBell sv
{-# INLINE enqueueLIFO #-}
-- | Atomically push a work item onto the front of the list-based queue, then
-- call 'ringDoorBell' on the 'SVar'.
enqueueLIFO ::
       SVar t m a -> IORef [(RunInIO m, t m a)] -> (RunInIO m, t m a) -> IO ()
enqueueLIFO sv q m = do
    atomicModifyIORefCAS_ q (m :)
    ringDoorBell sv
{-# INLINE enqueueFIFO #-}
-- | Push a work item onto the 'LinkedQueue' with 'pushL', then call
-- 'ringDoorBell' on the 'SVar'.
enqueueFIFO ::
       SVar t m a
    -> LinkedQueue (RunInIO m, t m a)
    -> (RunInIO m, t m a)
    -> IO ()
enqueueFIFO sv q m = pushL q m >> ringDoorBell sv
{-# INLINE enqueueAhead #-}
-- | Place a work item in the single-slot queue and advance its counter.
--
-- The queue must currently hold an empty list: it is atomically replaced by a
-- singleton containing the stream component of the supplied pair (the
-- 'RunInIO' component is dropped) and the stored 'Int' is incremented.
-- 'ringDoorBell' is called afterwards.
--
-- Calls 'error' if the queue is not empty.
enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
enqueueAhead sv q m = do
    atomicModifyIORefCAS_ q place
    ringDoorBell sv
  where
    place ([], n) = ([snd m], n + 1)
    place _ = error "enqueueAhead: queue is not empty"
{-# INLINE reEnqueueAhead #-}
-- | Put a stream back into the single-slot queue without touching its
-- counter.
--
-- The queue must currently hold an empty list: it is atomically replaced by a
-- singleton containing the given stream while the stored 'Int' is left as is.
-- 'ringDoorBell' is called afterwards.
--
-- Calls 'error' if the queue is not empty.
reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead sv q m = do
    atomicModifyIORefCAS_ q putBack
    ringDoorBell sv
  where
    putBack ([], n) = ([m], n)
    putBack _ = error "reEnqueueAhead: queue is not empty"
{-# INLINE queueEmptyAhead #-}
-- | 'True' when the list held in the queue is currently empty. The stored
-- 'Int' is ignored and the queue is not modified.
queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool
queueEmptyAhead q =
    liftIO $ readIORef q >>= \(pending, _) -> return (null pending)
{-# INLINE dequeueAhead #-}
-- | Atomically take the single item, if any, out of the queue.
--
-- * Empty list: the queue is left unchanged and 'Nothing' is returned.
--
-- * Exactly one item: the list is emptied (the stored 'Int' is kept) and the
--   item is returned paired with that 'Int'.
--
-- Calls 'error' if the list holds more than one item.
dequeueAhead :: MonadIO m
    => IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead q = liftIO $ atomicModifyIORefCAS q takeOne
  where
    takeOne ([], n) = (([], n), Nothing)
    takeOne ([x], n) = (([], n), Just (x, n))
    takeOne _ = error "more than one item on queue"
-- | Read the current value of an 'IORef' and run the given action on it. The
-- reference itself is not written to by this function.
withIORef :: IORef a -> (a -> IO b) -> IO b
withIORef ref f = do
    current <- readIORef ref
    f current
-- | Atomically apply a function to the contents of an 'IORef', returning
-- nothing. This is 'atomicModifyIORef' with a unit result, so the new value
-- is stored without being forced.
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ ref f = atomicModifyIORef ref step
  where
    step old = (f old, ())
-- | Outcome of trying to take the next entry off the ahead-style heap.
data HeapDequeueResult t m a
    = Clearing
    -- ^ No expected sequence number is currently recorded alongside the heap.
    | Waiting Int
    -- ^ The entry with this sequence number is not at the top of the heap
    -- (or the heap is empty).
    | Ready (Entry Int (AheadHeapEntry t m a))
    -- ^ The entry with the expected sequence number, removed from the heap.
{-# INLINE dequeueFromHeap #-}
-- | Atomically try to remove the entry with the expected sequence number
-- from the heap.
--
-- The reference holds the heap together with an optional expected sequence
-- number:
--
-- * No expected number: nothing changes and 'Clearing' is returned.
--
-- * Expected number @n@ and the minimum entry carries @n@: that entry is
--   removed, the expected number is reset to 'Nothing', and 'Ready' is
--   returned.
--
-- * Expected number @n@ but the heap is empty or its minimum entry carries a
--   different number: nothing changes and @Waiting n@ is returned. In the
--   latter case it is asserted that the entry's number is not below @n@.
dequeueFromHeap
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> IO (HeapDequeueResult t m a)
dequeueFromHeap hpVar = atomicModifyIORef hpVar step
  where
    step pair@(_, Nothing) = (pair, Clearing)
    step pair@(hp, Just n) =
        case H.uncons hp of
            Nothing -> (pair, Waiting n)
            Just (ent@(Entry seqNo _), rest)
                | seqNo == n -> ((rest, Nothing), Ready ent)
                | otherwise -> assert (seqNo >= n) (pair, Waiting n)
{-# INLINE dequeueFromHeapSeq #-}
-- | Atomically try to remove the entry with sequence number @i@ from the
-- heap. The reference must not currently record an expected sequence number.
--
-- * The minimum entry carries @i@: it is removed, the expected number stays
--   'Nothing', and 'Ready' is returned.
--
-- * The heap is empty or its minimum entry carries a different number: the
--   heap is left as is, the expected number is set to @Just i@, and
--   @Waiting i@ is returned. In the latter case it is asserted that the
--   entry's number is not below @i@.
--
-- Calls 'error' if an expected sequence number is already recorded.
dequeueFromHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq hpVar i = atomicModifyIORef hpVar step
  where
    step (hp, Nothing) =
        case H.uncons hp of
            Nothing -> keepWaiting
            Just (ent@(Entry seqNo _), rest)
                | seqNo == i -> ((rest, Nothing), Ready ent)
                | otherwise -> assert (seqNo >= i) keepWaiting
      where
        keepWaiting = ((hp, Just i), Waiting i)
    step (_, Just _) = error "dequeueFromHeapSeq: unreachable"
-- | Consistency check between a recorded expected sequence number and a
-- candidate one: holds trivially when nothing is recorded, and otherwise
-- requires the candidate to be at least the recorded number.
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane snum seqNo = maybe True (seqNo >=) snum
{-# INLINE requeueOnHeapTop #-}
-- | Atomically insert an entry back into the heap and record @seqNo@ as the
-- expected sequence number. It is asserted ('heapIsSane') that @seqNo@ is not
-- below any number already recorded.
requeueOnHeapTop
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Entry Int (AheadHeapEntry t m a)
    -> Int
    -> IO ()
requeueOnHeapTop hpVar ent seqNo = atomicModifyIORef_ hpVar requeue
  where
    requeue (hp, snum) =
        assert (heapIsSane snum seqNo) (H.insert ent hp, Just seqNo)
{-# INLINE updateHeapSeq #-}
-- | Atomically record @seqNo@ as the expected sequence number, leaving the
-- heap contents untouched. It is asserted ('heapIsSane') that @seqNo@ is not
-- below any number already recorded.
updateHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO ()
updateHeapSeq hpVar seqNo = atomicModifyIORef_ hpVar setSeq
  where
    setSeq (hp, snum) = assert (heapIsSane snum seqNo) (hp, Just seqNo)
{-# NOINLINE addThread #-}
-- | Add a 'ThreadId' to the 'workerThreads' set of the 'SVar'.
--
-- The new set is evaluated before it is stored, so the reference never holds
-- a growing chain of suspended 'S.insert' applications (which the lazy
-- 'modifyIORef' would leave behind).
--
-- As with 'modifyIORef', the read and the write are two separate steps; this
-- is not an atomic update.
addThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
addThread sv tid =
    liftIO $ do
        let ref = workerThreads sv
        tids <- readIORef ref
        writeIORef ref $! S.insert tid tids
{-# INLINE delThread #-}
-- | Remove a 'ThreadId' from the 'workerThreads' set of the 'SVar'. Unlike
-- 'modifyThread', no doorbell is rung.
--
-- The new set is evaluated before it is stored, so the reference never holds
-- a growing chain of suspended 'S.delete' applications (which the lazy
-- 'modifyIORef' would leave behind).
--
-- As with 'modifyIORef', the read and the write are two separate steps; this
-- is not an atomic update.
delThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
delThread sv tid =
    liftIO $ do
        let ref = workerThreads sv
        tids <- readIORef ref
        writeIORef ref $! S.delete tid tids
{-# INLINE modifyThread #-}
-- | Atomically toggle a 'ThreadId' in the 'workerThreads' set: delete it if
-- present, insert it if absent.
--
-- The update hands back the set /after/ a deletion, or the set /before/ an
-- insertion. When that set is empty, a write barrier is issued and a
-- non-blocking put is attempted on 'outputDoorBell' (result ignored). So the
-- doorbell is rung when the last tracked thread is removed, and also when a
-- thread is inserted into a previously empty set.
modifyThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
modifyThread sv tid = do
    changed <- liftIO $ atomicModifyIORefCAS (workerThreads sv) toggle
    when (null changed) $
        liftIO $ do
            writeBarrier
            void $ tryPutMVar (outputDoorBell sv) ()
  where
    toggle old
        | S.member tid old = let remaining = S.delete tid old
                             in (remaining, remaining)
        | otherwise = (S.insert tid old, old)
-- | Report whether the 'workerThreads' set of the SVar is empty at the
-- moment it is read.
{-# INLINE allThreadsDone #-}
allThreadsDone :: MonadIO m => SVar t m a -> m Bool
allThreadsDone sv = liftIO $ do
    threads <- readIORef (workerThreads sv)
    return (S.null threads)
-- | Report an exception raised in the calling thread to the SVar: a
-- 'ChildStop' event carrying the current thread id and the exception is
-- passed to 'send'. The 'Int' returned by 'send' is discarded.
{-# NOINLINE handleChildException #-}
handleChildException :: SVar t m a -> SomeException -> IO ()
handleChildException sv e =
    myThreadId >>= \tid -> void (send sv (ChildStop tid (Just e)))
-- | Same as 'handleChildException' except that the 'ChildStop' event
-- (current thread id plus the exception) is delivered with 'sendToProducer'
-- instead of 'send'. The 'Int' returned by 'sendToProducer' is discarded.
{-# NOINLINE handleFoldException #-}
handleFoldException :: SVar t m a -> SomeException -> IO ()
handleFoldException sv e =
    myThreadId >>= \tid -> void (sendToProducer sv (ChildStop tid (Just e)))
-- | Update the dispatch statistics kept in 'svarStats':
--
-- * 'maxWorkers' is raised to the current 'workerCount' when the latter is
--   larger than the recorded maximum.
-- * 'totalDispatches' is incremented by one.
--
-- The reads and writes are plain (non-atomic) 'IORef' operations, exactly as
-- before.
{-# NOINLINE recordMaxWorkers #-}
recordMaxWorkers :: MonadIO m => SVar t m a -> m ()
recordMaxWorkers sv = liftIO $ do
    let stats = svarStats sv
    active <- readIORef (workerCount sv)
    maxWrk <- readIORef (maxWorkers stats)
    when (active > maxWrk) $ writeIORef (maxWorkers stats) active
    -- Force the new counter value before storing it. A lazy
    -- @modifyIORef ref (+1)@ leaves an ever growing chain of @(+1)@ thunks in
    -- the reference until somebody finally reads the statistic.
    dispatches <- readIORef (totalDispatches stats)
    writeIORef (totalDispatches stats) $! dispatches + 1
-- | Fork one more worker thread running 'workLoop' on the SVar.
--
-- Steps, in order:
--
-- 1. 'workerCount' is incremented atomically.
-- 2. In inspect mode ('svarInspectMode') the statistics are updated through
--    'recordMaxWorkers'.
-- 3. A 'WorkerInfo' is allocated only when the SVar has a 'yieldRateInfo';
--    its 'workerYieldMax' is the supplied @yieldMax@, its yield counter
--    starts at 0 and its latency start is @(0, current monotonic time)@.
-- 4. The worker is forked with 'doFork', using 'svarMrun' and
--    'handleChildException', and the new thread id is registered with
--    'addThread'.
{-# NOINLINE pushWorker #-}
pushWorker :: MonadAsync m => Count -> SVar t m a -> m ()
pushWorker yieldMax sv = do
    liftIO $ atomicModifyIORefCAS_ (workerCount sv) (+ 1)
    when (svarInspectMode sv) (recordMaxWorkers sv)
    -- The allocation is skipped entirely when no rate control is configured.
    winfo <- case yieldRateInfo sv of
        Just _ -> liftIO (Just <$> newWorkerInfo)
        Nothing -> return Nothing
    tid <- doFork (workLoop sv winfo) (svarMrun sv) (handleChildException sv)
    addThread sv tid

    where

    newWorkerInfo = do
        yieldCountRef <- newIORef 0
        startTime <- getTime Monotonic
        latencyRef <- newIORef (0, startTime)
        return WorkerInfo
            { workerYieldMax = yieldMax
            , workerYieldCount = yieldCountRef
            , workerLatencyStart = latencyRef
            }
-- | Fork a worker thread running the supplied worker loop @wloop@.
--
-- * When 'svarInspectMode' is off the loop is started with 'Nothing' as its
--   'WorkerInfo'; 'workerCount' is not touched on this path.
-- * When it is on, 'workerCount' is incremented atomically, the statistics
--   are updated with 'recordMaxWorkers', and a 'WorkerInfo' (with
--   'workerYieldMax' set to 0) is allocated if the SVar has a
--   'yieldRateInfo'.
--
-- In both cases the thread is forked with 'doFork' (using 'svarMrun' and
-- 'handleChildException') and its id is passed to 'modifyThread'.
{-# INLINE pushWorkerPar #-}
pushWorkerPar
    :: MonadAsync m
    => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar sv wloop
    | svarInspectMode sv = forkWithDiag
    | otherwise = forkWorker Nothing

    where

    -- Fork the loop and account for the resulting thread id.
    forkWorker winfo =
        doFork (wloop winfo) (svarMrun sv) (handleChildException sv)
            >>= modifyThread sv

    newWorkerInfo = do
        yieldCountRef <- newIORef 0
        startTime <- getTime Monotonic
        latencyRef <- newIORef (0, startTime)
        return WorkerInfo
            { workerYieldMax = 0
            , workerYieldCount = yieldCountRef
            , workerLatencyStart = latencyRef
            }

    {-# NOINLINE forkWithDiag #-}
    forkWithDiag = do
        liftIO $ atomicModifyIORefCAS_ (workerCount sv) (+ 1)
        recordMaxWorkers sv
        winfo <- case yieldRateInfo sv of
            Just _ -> liftIO (Just <$> newWorkerInfo)
            Nothing -> return Nothing
        forkWorker winfo
-- | Try to dispatch one more worker with the given per-worker yield budget.
--
-- Returns 'True' only when a worker was pushed with 'pushWorker' on the
-- "queue not done" path; every other path returns 'False':
--
-- * 'isWorkDone' reports the work as done: nothing is dispatched.
-- * 'isQueueDone' reports the queue as done: a worker with a yield budget of
--   0 is pushed if the observed 'workerCount' is not positive, and 'False'
--   is returned regardless.
-- * Otherwise a worker is pushed unless the effective limit has been reached.
--   The effective limit is 'maxWorkerLimit', further capped by the
--   'remainingWork' count when that count exists and no 'yieldRateInfo' is
--   configured.
--
-- The counters are read without synchronisation against the threads that
-- update them, so the limit check is only approximate.
dispatchWorker :: MonadAsync m => Count -> SVar t m a -> m Bool
dispatchWorker yieldCount sv = do
    let workerLimit = maxWorkerLimit sv
    done <- liftIO $ isWorkDone sv
    if done
    then return False
    else do
        qDone <- liftIO $ isQueueDone sv
        active <- liftIO $ readIORef $ workerCount sv
        if qDone
        then do
            when (active <= 0) $ pushWorker 0 sv
            return False
        else do
            limit <- case remainingWork sv of
                Nothing -> return workerLimit
                Just ref -> do
                    n <- liftIO $ readIORef ref
                    return $ case yieldRateInfo sv of
                        Just _ -> workerLimit
                        Nothing ->
                            case workerLimit of
                                Unlimited -> Limited (nonNegWord n)
                                Limited lim -> Limited (min lim (nonNegWord n))
            let dispatch = pushWorker yieldCount sv >> return True
            case limit of
                Unlimited -> dispatch
                Limited lim | lim > nonNegWord active -> dispatch
                _ -> return False

    where

    -- Convert a signed counter to 'Word', treating negative values as zero.
    -- A bare 'fromIntegral' wraps a negative value around to a huge 'Word':
    -- a negative remaining-work count would then lift the limit instead of
    -- closing it, and a negative worker count would block every dispatch.
    nonNegWord :: Integral n => n -> Word
    nonNegWord = fromIntegral . max 0
-- | A fixed duration of 1,000,000 'NanoSecond64' units (one millisecond,
-- going by the type name).
--
-- NOTE(review): the consumers of this constant lie outside this section of
-- the file; confirm its exact role there before changing the value.
minThreadDelay :: NanoSecond64
minThreadDelay = 1000000
-- | The time window, 1,000,000 'NanoSecond64' units, over which
-- 'estimateWorkers' spreads a yield deficit: the deficit is divided by this
-- value to obtain the extra yield frequency needed to catch up.
rateRecoveryTime :: NanoSecond64
rateRecoveryTime = 1000000
-- | The verdict produced by 'estimateWorkers'.
data Work
    = BlockWait NanoSecond64
    -- ^ The stream is ahead of the target rate; the payload is the time to
    -- wait before dispatching again.
    | PartialWorker Count
    -- ^ The stream is behind, but the worker latency does not exceed the
    -- adjusted target latency; the payload is the number of yields the
    -- stream is behind by.
    | ManyWorkers Int Count
    -- ^ A worker count together with the number of yields the stream is
    -- behind by.
    deriving (Show)
-- | Decide what has to be done to keep the stream at its target rate.
--
-- Arguments, in order: the worker limit, the yields produced so far, the
-- gained/lost yield adjustment, the elapsed time, the measured worker
-- latency, the target latency per yield and the allowed latency range.
--
-- The target number of yields is @(elapsed + worker latency)@ divided by the
-- target latency, rounded up. The deficit is that target minus the effective
-- yields (yields so far plus the gain/loss adjustment).
--
-- * Positive deficit: the latency required to make up the deficit within
--   'rateRecoveryTime' is computed and clamped to the latency range. If the
--   worker latency does not exceed it, the result is 'PartialWorker';
--   otherwise 'ManyWorkers' with @worker latency `div` adjusted latency@
--   workers, capped by the worker limit and by the deficit.
-- * Otherwise: the stream is on or ahead of schedule. The result is
--   'BlockWait' for the surplus time, capped at @maxLatency - worker
--   latency@, or @ManyWorkers 1 0@ when that capped time is not positive.
estimateWorkers
    :: Limit
    -> Count
    -> Count
    -> NanoSecond64
    -> NanoSecond64
    -> NanoSecond64
    -> LatencyRange
    -> Work
estimateWorkers workerLimit svarYields gainLossYields
                svarElapsed wLatency targetLat range
    | deltaYields > 0 = assert (adjustedLat > 0) catchUp
    | otherwise = assert (sleepTime >= 0) slowDown

    where

    -- Rounded-up number of yields that should have happened by the time one
    -- more worker latency has passed.
    targetYields = (svarElapsed + wLatency + targetLat - 1) `div` targetLat
    effectiveYields = svarYields + gainLossYields
    deltaYields = fromIntegral targetYields - effectiveYields

    -- Behind schedule -------------------------------------------------------

    deltaYieldsFreq :: Double
    deltaYieldsFreq =
        fromIntegral deltaYields / fromIntegral rateRecoveryTime
    yieldsFreq = 1.0 / fromIntegral targetLat
    totalYieldsFreq = yieldsFreq + deltaYieldsFreq
    requiredLat = NanoSecond64 (round (1.0 / totalYieldsFreq))
    adjustedLat =
        min (max requiredLat (minLatency range)) (maxLatency range)

    catchUp
        | wLatency <= adjustedLat = PartialWorker deltaYields
        | otherwise =
            let workers = withLimit (wLatency `div` adjustedLat)
                limited = min workers (fromIntegral deltaYields)
             in ManyWorkers (fromIntegral limited) deltaYields

    -- On or ahead of schedule -----------------------------------------------

    expectedDuration = fromIntegral effectiveYields * targetLat
    sleepTime = expectedDuration - svarElapsed
    maxSleepTime = maxLatency range - wLatency
    cappedSleep = min sleepTime maxSleepTime

    -- A non-positive capped sleep time means that 'maxSleepTime' was the
    -- smaller, non-positive operand (the assertion above demands a
    -- non-negative 'sleepTime').
    slowDown
        | cappedSleep > 0 = BlockWait cappedSleep
        | otherwise = ManyWorkers 1 (Count 0)

    withLimit n =
        case workerLimit of
            Unlimited -> n
            Limited x -> min n (fromIntegral x)
-- | Read the current latency figures from a 'YieldRateInfo'. This function
-- only reads; it writes to none of the references.
--
-- Returns a triple of:
--
-- * the all-time yield count plus the total counts found in the collected
--   and pending latency records,
-- * the time stamp stored with the all-time record,
-- * the latency estimate: when the collected and pending records together
--   hold a positive sample count and a positive time, the average of their
--   per-sample latency and the previously measured latency; otherwise the
--   previously measured latency unchanged.
getWorkerLatency :: YieldRateInfo -> IO (Count, AbsTime, NanoSecond64)
getWorkerLatency yinfo = do
    (pendingTotal, pendingCount, pendingTime) <-
        readIORef (workerPendingLatency yinfo)
    (collectedTotal, collectedCount, collectedTime) <-
        readIORef (workerCollectedLatency yinfo)
    (allTimeCount, baseTime) <- readIORef (svarAllTimeLatency yinfo)
    previous <- readIORef (workerMeasuredLatency yinfo)

    let samples = collectedCount + pendingCount
        spent = collectedTime + pendingTime
        latency
            | samples > 0 && spent > 0 =
                (spent `div` fromIntegral samples + previous) `div` 2
            | otherwise = previous

    return (allTimeCount + (collectedTotal + pendingTotal), baseTime, latency)
-- | Check whether the SVar currently has more workers than
-- 'estimateWorkers' asks for.
--
-- The estimate is computed from the figures returned by 'getWorkerLatency',
-- the time elapsed since the returned base time stamp, the gained/lost
-- yields and the target latency and latency range of the 'YieldRateInfo'.
-- The result is:
--
-- * 'True' when the verdict is 'BlockWait',
-- * @workerCount > 1@ when the verdict is 'PartialWorker',
-- * @workerCount > n@ when the verdict is @'ManyWorkers' n@.
isBeyondMaxRate :: SVar t m a -> YieldRateInfo -> IO Bool
isBeyondMaxRate sv yinfo = do
    (count, tstamp, wLatency) <- getWorkerLatency yinfo
    now <- getTime Monotonic
    gainLoss <- readIORef (svarGainedLostYields yinfo)
    cnt <- readIORef (workerCount sv)
    let verdict =
            estimateWorkers
                (maxWorkerLimit sv)
                count
                gainLoss
                (fromRelTime64 (diffAbsTime64 now tstamp))
                wLatency
                (svarLatencyTarget yinfo)
                (svarLatencyRange yinfo)
    return $ case verdict of
        BlockWait _ -> True
        PartialWorker _ -> cnt > 1
        ManyWorkers n _ -> cnt > n
dispatchWorkerPaced :: MonadAsync m => SVar t m a -> m Bool
dispatchWorkerPaced :: SVar t m a -> m Bool
dispatchWorkerPaced SVar t m a
sv = do
let yinfo :: YieldRateInfo
yinfo = Maybe YieldRateInfo -> YieldRateInfo
forall a. (?callStack::CallStack) => Maybe a -> a
fromJust (Maybe YieldRateInfo -> YieldRateInfo)
-> Maybe YieldRateInfo -> YieldRateInfo
forall a b. (a -> b) -> a -> b
$ SVar t m a -> Maybe YieldRateInfo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar t m a
sv
(Count
svarYields, NanoSecond64
svarElapsed, NanoSecond64
wLatency) <- do
AbsTime
now <- IO AbsTime -> m AbsTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO AbsTime -> m AbsTime) -> IO AbsTime -> m AbsTime
forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
(Count
yieldCount, AbsTime
baseTime, NanoSecond64
lat) <-
IO (Count, AbsTime, NanoSecond64)
-> m (Count, AbsTime, NanoSecond64)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Count, AbsTime, NanoSecond64)
-> m (Count, AbsTime, NanoSecond64))
-> IO (Count, AbsTime, NanoSecond64)
-> m (Count, AbsTime, NanoSecond64)
forall a b. (a -> b) -> a -> b
$ SVar t m a
-> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64)
collectLatency SVar t m a
sv YieldRateInfo
yinfo Bool
False
let elapsed :: NanoSecond64
elapsed = RelTime64 -> NanoSecond64
forall a. TimeUnit64 a => RelTime64 -> a
fromRelTime64 (RelTime64 -> NanoSecond64) -> RelTime64 -> NanoSecond64
forall a b. (a -> b) -> a -> b
$ AbsTime -> AbsTime -> RelTime64
diffAbsTime64 AbsTime
now AbsTime
baseTime
let latency :: NanoSecond64
latency =
if NanoSecond64
lat NanoSecond64 -> NanoSecond64 -> Bool
forall a. Eq a => a -> a -> Bool
== NanoSecond64
0
then NanoSecond64 -> Maybe NanoSecond64 -> NanoSecond64
forall a. a -> Maybe a -> a
fromMaybe NanoSecond64
lat (YieldRateInfo -> Maybe NanoSecond64
workerBootstrapLatency YieldRateInfo
yinfo)
else NanoSecond64
lat
(Count, NanoSecond64, NanoSecond64)
-> m (Count, NanoSecond64, NanoSecond64)
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
yieldCount, NanoSecond64
elapsed, NanoSecond64
latency)
if NanoSecond64
wLatency NanoSecond64 -> NanoSecond64 -> Bool
forall a. Eq a => a -> a -> Bool
== NanoSecond64
0
then Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
else do
let workerLimit :: Limit
workerLimit = SVar t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Limit
maxWorkerLimit SVar t m a
sv
let targetLat :: NanoSecond64
targetLat = YieldRateInfo -> NanoSecond64
svarLatencyTarget YieldRateInfo
yinfo
let range :: LatencyRange
range = YieldRateInfo -> LatencyRange
svarLatencyRange YieldRateInfo
yinfo
Count
gainLoss <- IO Count -> m Count
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Count -> m Count) -> IO Count -> m Count
forall a b. (a -> b) -> a -> b
$ IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef (YieldRateInfo -> IORef Count
svarGainedLostYields YieldRateInfo
yinfo)
let work :: Work
work = Limit
-> Count
-> Count
-> NanoSecond64
-> NanoSecond64
-> NanoSecond64
-> LatencyRange
-> Work
estimateWorkers Limit
workerLimit Count
svarYields Count
gainLoss NanoSecond64
svarElapsed
NanoSecond64
wLatency NanoSecond64
targetLat LatencyRange
range
case Work
work of
BlockWait NanoSecond64
s -> do
Bool -> m () -> m ()
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (NanoSecond64
s NanoSecond64 -> NanoSecond64 -> Bool
forall a. Ord a => a -> a -> Bool
>= NanoSecond64
0) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
Bool
done <- SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m Bool
allThreadsDone SVar t m a
sv
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
done (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ m Bool -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ do
let us :: MicroSecond64
us = RelTime64 -> MicroSecond64
forall a. TimeUnit64 a => RelTime64 -> a
fromRelTime64 (NanoSecond64 -> RelTime64
forall a. TimeUnit64 a => a -> RelTime64
toRelTime64 NanoSecond64
s) :: MicroSecond64
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (MicroSecond64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral MicroSecond64
us)
Count -> SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
Count -> SVar t m a -> m Bool
dispatchWorker Count
1 SVar t m a
sv
Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
PartialWorker Count
yields -> do
Bool -> m () -> m ()
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Count
yields Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
> Count
0) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
YieldRateInfo -> Count -> m ()
forall (f :: * -> *). MonadIO f => YieldRateInfo -> Count -> f ()
updateGainedLostYields YieldRateInfo
yinfo Count
yields
Bool
done <- SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m Bool
allThreadsDone SVar t m a
sv
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
done (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ m Bool -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ Count -> SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
Count -> SVar t m a -> m Bool
dispatchWorker Count
yields SVar t m a
sv
Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
ManyWorkers Int
netWorkers Count
yields -> do
Bool -> m () -> m ()
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Int
netWorkers Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
1) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
Bool -> m () -> m ()
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Count
yields Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
>= Count
0) (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
YieldRateInfo -> Count -> m ()
forall (f :: * -> *). MonadIO f => YieldRateInfo -> Count -> f ()
updateGainedLostYields YieldRateInfo
yinfo Count
yields
let periodRef :: IORef Count
periodRef = YieldRateInfo -> IORef Count
workerPollingInterval YieldRateInfo
yinfo
ycnt :: Count
ycnt = Count -> Count -> Count
forall a. Ord a => a -> a -> a
max Count
1 (Count -> Count) -> Count -> Count
forall a b. (a -> b) -> a -> b
$ Count
yields Count -> Count -> Count
forall a. Integral a => a -> a -> a
`div` Int -> Count
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
netWorkers
period :: Count
period = Count -> Count -> Count
forall a. Ord a => a -> a -> a
min Count
ycnt (Word -> Count
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
magicMaxBuffer)
Count
old <- IO Count -> m Count
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Count -> m Count) -> IO Count -> m Count
forall a b. (a -> b) -> a -> b
$ IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef IORef Count
periodRef
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Count
period Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
< Count
old) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef Count -> Count -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Count
periodRef Count
period
Int
cnt <- IO Int -> m Int
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Int -> m Int) -> IO Int -> m Int
forall a b. (a -> b) -> a -> b
$ IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (IORef Int -> IO Int) -> IORef Int -> IO Int
forall a b. (a -> b) -> a -> b
$ SVar t m a -> IORef Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount SVar t m a
sv
if Int
cnt Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
netWorkers
then do
let total :: Int
total = Int
netWorkers Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
cnt
batch :: Int
batch = Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
1 (Int -> Int) -> Int -> Int
forall a b. (a -> b) -> a -> b
$ NanoSecond64 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral (NanoSecond64 -> Int) -> NanoSecond64 -> Int
forall a b. (a -> b) -> a -> b
$
NanoSecond64
minThreadDelay NanoSecond64 -> NanoSecond64 -> NanoSecond64
forall a. Integral a => a -> a -> a
`div` NanoSecond64
targetLat
Int -> m Bool
forall t. (Eq t, Num t) => t -> m Bool
dispatchN (Int -> Int -> Int
forall a. Ord a => a -> a -> a
min Int
total Int
batch)
else Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
where
updateGainedLostYields :: YieldRateInfo -> Count -> f ()
updateGainedLostYields YieldRateInfo
yinfo Count
yields = do
let buf :: Count
buf = Int -> Count
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Count) -> Int -> Count
forall a b. (a -> b) -> a -> b
$ YieldRateInfo -> Int
svarRateBuffer YieldRateInfo
yinfo
Bool -> f () -> f ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Count
yields Count -> Count -> Bool
forall a. Eq a => a -> a -> Bool
/= Count
0 Bool -> Bool -> Bool
&& Count -> Count
forall a. Num a => a -> a
abs Count
yields Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
> Count
buf) (f () -> f ()) -> f () -> f ()
forall a b. (a -> b) -> a -> b
$ do
let delta :: Count
delta =
if Count
yields Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
> Count
0
then Count
yields Count -> Count -> Count
forall a. Num a => a -> a -> a
- Count
buf
else Count
yields Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
buf
IO () -> f ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> f ()) -> IO () -> f ()
forall a b. (a -> b) -> a -> b
$ IORef Count -> (Count -> Count) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef (YieldRateInfo -> IORef Count
svarGainedLostYields YieldRateInfo
yinfo) (Count -> Count -> Count
forall a. Num a => a -> a -> a
+ Count
delta)
dispatchN :: t -> m Bool
dispatchN t
n =
if t
n t -> t -> Bool
forall a. Eq a => a -> a -> Bool
== t
0
then Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
else do
Bool
r <- Count -> SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
Count -> SVar t m a -> m Bool
dispatchWorker Count
0 SVar t m a
sv
if Bool
r
then t -> m Bool
dispatchN (t
n t -> t -> t
forall a. Num a => a -> a -> a
- t
1)
else Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
-- | Delay hook handed to 'sendWorkerWait' by the paced read path
-- ('readOutputQPaced'). It ignores its 'SVar' argument and performs no
-- delay at all.
sendWorkerDelayPaced :: SVar t m a -> IO ()
sendWorkerDelayPaced = const (return ())
-- | Delay hook handed to 'sendWorkerWait' by the bounded read path
-- ('readOutputQBounded'). The 'SVar' argument is unused and the action
-- returns immediately without sleeping.
sendWorkerDelay :: SVar t m a -> IO ()
sendWorkerDelay _ = return ()
-- | Block the caller until the output queue of the 'SVar' is non-empty,
-- dispatching workers while waiting.
--
-- Arguments:
--
-- * @delay@: an 'IO' action run once on entry, before the queue is examined.
-- * @dispatch@: tries to send out more workers; a 'True' result makes this
--   function loop again instead of sleeping.
-- * @sv@: the 'SVar' whose 'outputQueue' and 'outputDoorBell' are used.
--
-- The order of the steps below matters: 'needDoorBell' is set and the
-- store-load barrier is issued before @dispatch@ is run and before we sleep
-- on 'outputDoorBell'.
{-# NOINLINE sendWorkerWait #-}
sendWorkerWait
    :: MonadAsync m
    => (SVar t m a -> IO ())
    -> (SVar t m a -> m Bool)
    -> SVar t m a
    -> m ()
sendWorkerWait delay dispatch sv = do
    liftIO (delay sv)
    pending <- outputLen
    when (pending <= 0) $ do
        -- Ask for a doorbell before dispatching/sleeping.
        -- NOTE(review): presumably the producer side rings 'outputDoorBell'
        -- when it sees this flag set; that code is not part of this
        -- function, confirm against the enqueue path.
        liftIO $ do
            atomicModifyIORefCAS_ (needDoorBell sv) (const True)
            storeLoadBarrier
        canDoMore <- dispatch sv
        if canDoMore
        then again
        else do
            -- Nothing more could be dispatched: sleep until the doorbell
            -- MVar is filled, then re-check the queue length.
            liftIO
                $ withDiagMVar sv "sendWorkerWait: nothing to do"
                $ takeMVar (outputDoorBell sv)
            afterWake <- outputLen
            when (afterWake <= 0) again

    where

    -- Restart the whole procedure with the same arguments.
    again = sendWorkerWait delay dispatch sv

    -- Current length component of the output queue (non-destructive read).
    outputLen = liftIO (snd `fmap` readIORef (outputQueue sv))
-- | Drain an output queue: swap its contents for the empty queue @([], 0)@
-- in a single 'atomicModifyIORefCAS' step and return what was stored
-- before the swap (the event list together with its recorded length).
{-# INLINE readOutputQBasic #-}
readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
readOutputQBasic q = atomicModifyIORefCAS q takeAll

    where

    -- New contents on the left, value handed back to the caller on the right.
    takeAll previous = (([], 0), previous)
-- | Drain the 'outputQueue' of the 'SVar' via 'readOutputQBasic' and return
-- the drained events with their count.
--
-- When 'svarInspectMode' is enabled, the 'maxOutQSize' statistic is raised
-- to the drained length whenever that length exceeds the stored value.
{-# INLINE readOutputQRaw #-}
readOutputQRaw :: SVar t m a -> IO ([ChildEvent a], Int)
readOutputQRaw sv = do
    drained@(_, len) <- readOutputQBasic (outputQueue sv)
    when (svarInspectMode sv) (recordPeak len)
    return drained

    where

    -- Keep the largest queue length observed so far.
    recordPeak len = do
        let peakRef = maxOutQSize (svarStats sv)
        peak <- readIORef peakRef
        when (len > peak) (writeIORef peakRef len)
-- | Read the pending output events of an 'SVar' (non-paced variant).
--
-- If the queue held output, make sure a worker is running (see
-- @ensureWorker@) and return the events. If it was empty, wait in
-- 'sendWorkerWait' until output is available and then drain the queue again.
readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQBounded sv = do
    (events, count) <- liftIO (readOutputQRaw sv)
    if count > 0
    then do
        ensureWorker
        return events
    else waitForOutput

    where

    -- Push one worker, but only when no worker is currently counted as
    -- active and 'isWorkDone' reports that work remains.
    ensureWorker = do
        active <- liftIO (readIORef (workerCount sv))
        when (active <= 0) $ do
            finished <- liftIO (isWorkDone sv)
            when (not finished) (pushWorker 0 sv)

    -- Wait (dispatching workers as needed) until the queue is non-empty,
    -- then return its drained contents.
    {-# INLINE waitForOutput #-}
    waitForOutput = do
        sendWorkerWait sendWorkerDelay (dispatchWorker 0) sv
        liftIO (fmap fst (readOutputQRaw sv))
-- | Read the pending output events of an 'SVar' (paced variant).
--
-- If the queue held output, run 'dispatchWorkerPaced' once (its result is
-- discarded) and return the events. If it was empty, wait in
-- 'sendWorkerWait', using 'dispatchWorkerPaced' as the dispatcher, and then
-- drain the queue again.
readOutputQPaced :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQPaced sv = do
    (events, count) <- liftIO (readOutputQRaw sv)
    if count > 0
    then do
        _ <- dispatchWorkerPaced sv
        return events
    else waitForOutput

    where

    -- Wait until the queue is non-empty, then return its drained contents.
    {-# INLINE waitForOutput #-}
    waitForOutput = do
        sendWorkerWait sendWorkerDelayPaced dispatchWorkerPaced sv
        liftIO (fmap fst (readOutputQRaw sv))
-- | Post-processing step for the non-paced case.
--
-- Returns 'False' while any worker thread is still outstanding. Once
-- 'allThreadsDone' holds, the result is whatever 'isWorkDone' reports; if
-- work remains, one worker is pushed unconditionally before returning.
postProcessBounded :: MonadAsync m => SVar t m a -> m Bool
postProcessBounded sv = do
    idle <- allThreadsDone sv
    if not idle
    then return False
    else do
        finished <- liftIO (isWorkDone sv)
        -- No worker is running but work is left: restart with one worker.
        when (not finished) (pushWorker 0 sv)
        return finished
-- | Post-processing step for the paced (rate-controlled) case.
--
-- Returns 'False' while any worker thread is still outstanding. Once
-- 'allThreadsDone' holds, the result is whatever 'isWorkDone' reports. If
-- work remains, 'dispatchWorkerPaced' is tried first; should there still be
-- no worker afterwards, one is pushed unconditionally.
postProcessPaced :: MonadAsync m => SVar t m a -> m Bool
postProcessPaced sv = do
    idle <- allThreadsDone sv
    if not idle
    then return False
    else do
        finished <- liftIO (isWorkDone sv)
        when (not finished) $ do
            -- The paced dispatcher may decline to start a worker, so
            -- re-check and fall back to an explicit push.
            _ <- dispatchWorkerPaced sv
            stillIdle <- allThreadsDone sv
            when stillIdle (pushWorker 0 sv)
        return finished
-- | Create the rate-control record for a stream from the 'Rate' stored in
-- the given 'State'.
--
-- Returns 'Nothing' when no rate is configured. Otherwise each of the three
-- rates is converted to a latency in nanoseconds (@1e9 / rate@, rounded;
-- a rate that is zero or negative maps to 'maxBound') and a fresh
-- 'YieldRateInfo' with newly allocated counters is returned.
getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo st =
    case getStreamRate st of
        Nothing -> return Nothing
        Just (Rate low goal high buf) ->
            -- The high rate yields the lower latency bound and the low rate
            -- yields the upper one.
            let range = LatencyRange (toLatency high) (toLatency low)
             in mkYieldRateInfo (toLatency goal) range buf

    where

    toLatency rate
        | rate <= 0 = maxBound
        | otherwise = round (1.0e9 / rate)

    mkYieldRateInfo latency latRange buf = do
        measuredRef <- newIORef 0
        pendingRef <- newIORef (0, 0, 0)
        collectedRef <- newIORef (0, 0, 0)
        startTime <- getTime Monotonic
        allTimeRef <- newIORef (0, startTime)
        pollRef <- newIORef 1
        gainLossRef <- newIORef (Count 0)

        return $ Just $ YieldRateInfo
            { svarLatencyTarget = latency
            , svarLatencyRange = latRange
            , svarRateBuffer = buf
            , svarGainedLostYields = gainLossRef
            , workerBootstrapLatency = getStreamLatency st
            , workerPollingInterval = pollRef
            , workerMeasuredLatency = measuredRef
            , workerPendingLatency = pendingRef
            , workerCollectedLatency = collectedRef
            , svarAllTimeLatency = allTimeRef
            }
-- | Allocate a fresh 'SVarStats' record: every counter and latency
-- statistic starts at zero and the stop time starts as 'Nothing'.
newSVarStats :: IO SVarStats
newSVarStats = do
    dispatches <- newCounter
    peakWorkers <- newCounter
    peakOutQ <- newCounter
    peakHeap <- newCounter
    peakWorkQ <- newCounter
    avgLatency <- newIORef (0, NanoSecond64 0)
    maxLatency <- newLatency
    minLatency <- newLatency
    stopTime <- newIORef Nothing

    return $ SVarStats
        { totalDispatches = dispatches
        , maxWorkers = peakWorkers
        , maxOutQSize = peakOutQ
        , maxHeapSize = peakHeap
        , maxWorkQSize = peakWorkQ
        , avgWorkerLatency = avgLatency
        , minWorkerLatency = minLatency
        , maxWorkerLatency = maxLatency
        , svarStopTime = stopTime
        }

    where

    -- A new 'Int' statistic initialised to zero.
    newCounter = newIORef (0 :: Int)

    -- A new latency statistic initialised to zero nanoseconds.
    newLatency = newIORef (NanoSecond64 0)
getAheadSVar :: MonadAsync m
=> State t m a
-> ( IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> State t m a
-> SVar t m a
-> Maybe WorkerInfo
-> m ())
-> RunInIO m
-> IO (SVar t m a)
getAheadSVar :: State t m a
-> (IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> State t m a
-> SVar t m a
-> Maybe WorkerInfo
-> m ())
-> RunInIO m
-> IO (SVar t m a)
getAheadSVar State t m a
st IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> State t m a
-> SVar t m a
-> Maybe WorkerInfo
-> m ()
f RunInIO m
mrun = do
IORef ([ChildEvent a], Int)
outQ <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outH <- (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
forall a. a -> IO (IORef a)
newIORef (Heap (Entry Int (AheadHeapEntry t m a))
forall a. Heap a
H.empty, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
0)
MVar ()
outQMv <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
IORef Int
active <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
IORef Bool
wfw <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
IORef (Set ThreadId)
running <- Set ThreadId -> IO (IORef (Set ThreadId))
forall a. a -> IO (IORef a)
newIORef Set ThreadId
forall a. Set a
S.empty
IORef ([t m a], Int)
q <- ([t m a], Int) -> IO (IORef ([t m a], Int))
forall a. a -> IO (IORef a)
newIORef ([], -Int
1)
MVar ()
stopMVar <- () -> IO (MVar ())
forall a. a -> IO (MVar a)
newMVar ()
Maybe (IORef Count)
yl <- case State t m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State t m a
st of
Maybe Count
Nothing -> Maybe (IORef Count) -> IO (Maybe (IORef Count))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IORef Count)
forall a. Maybe a
Nothing
Just Count
x -> IORef Count -> Maybe (IORef Count)
forall a. a -> Maybe a
Just (IORef Count -> Maybe (IORef Count))
-> IO (IORef Count) -> IO (Maybe (IORef Count))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
x
Maybe YieldRateInfo
rateInfo <- State t m a -> IO (Maybe YieldRateInfo)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo State t m a
st
SVarStats
stats <- IO SVarStats
newSVarStats
ThreadId
tid <- IO ThreadId
myThreadId
let getSVar :: SVar t m a
-> (SVar t m a -> m [ChildEvent a])
-> (SVar t m a -> m Bool)
-> SVar t m a
getSVar SVar t m a
sv SVar t m a -> m [ChildEvent a]
readOutput SVar t m a -> m Bool
postProc = SVar :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVarStyle
-> RunInIO m
-> SVarStopStyle
-> IORef ThreadId
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> m [ChildEvent a]
-> m Bool
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> Limit
-> Limit
-> IORef Count
-> PushBufferPolicy
-> MVar ()
-> Maybe (IORef Count)
-> Maybe YieldRateInfo
-> ((RunInIO m, t m a) -> IO ())
-> IO Bool
-> IO Bool
-> IORef Bool
-> (Maybe WorkerInfo -> m ())
-> IORef (Set ThreadId)
-> IORef Int
-> (ThreadId -> m ())
-> MVar ()
-> SVarStats
-> Maybe (IORef ())
-> Bool
-> ThreadId
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IORef ([t m a], Int)
-> SVar t m a
SVar
{ outputQueue :: IORef ([ChildEvent a], Int)
outputQueue = IORef ([ChildEvent a], Int)
outQ
, outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
outputQueueFromConsumer = IORef ([ChildEvent a], Int)
forall a. (?callStack::CallStack) => a
undefined
, remainingWork :: Maybe (IORef Count)
remainingWork = Maybe (IORef Count)
yl
, maxBufferLimit :: Limit
maxBufferLimit = State t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State t m a
st
, pushBufferSpace :: IORef Count
pushBufferSpace = IORef Count
forall a. (?callStack::CallStack) => a
undefined
, pushBufferPolicy :: PushBufferPolicy
pushBufferPolicy = PushBufferPolicy
forall a. (?callStack::CallStack) => a
undefined
, pushBufferMVar :: MVar ()
pushBufferMVar = MVar ()
forall a. (?callStack::CallStack) => a
undefined
, maxWorkerLimit :: Limit
maxWorkerLimit = Limit -> Limit -> Limit
forall a. Ord a => a -> a -> a
min (State t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxThreads State t m a
st) (State t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State t m a
st)
, yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo = Maybe YieldRateInfo
rateInfo
, outputDoorBell :: MVar ()
outputDoorBell = MVar ()
outQMv
, outputDoorBellFromConsumer :: MVar ()
outputDoorBellFromConsumer = MVar ()
forall a. (?callStack::CallStack) => a
undefined
, readOutputQ :: m [ChildEvent a]
readOutputQ = SVar t m a -> m [ChildEvent a]
readOutput SVar t m a
sv
, postProcess :: m Bool
postProcess = SVar t m a -> m Bool
postProc SVar t m a
sv
, workerThreads :: IORef (Set ThreadId)
workerThreads = IORef (Set ThreadId)
running
, workLoop :: Maybe WorkerInfo -> m ()
workLoop = IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> State t m a
-> SVar t m a
-> Maybe WorkerInfo
-> m ()
f IORef ([t m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outH State t m a
st{streamVar :: Maybe (SVar t m a)
streamVar = SVar t m a -> Maybe (SVar t m a)
forall a. a -> Maybe a
Just SVar t m a
sv} SVar t m a
sv
, enqueue :: (RunInIO m, t m a) -> IO ()
enqueue = SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
enqueueAhead SVar t m a
sv IORef ([t m a], Int)
q
, isWorkDone :: IO Bool
isWorkDone = SVar t m a
-> IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO Bool
forall (t :: * -> *) (t :: (* -> *) -> * -> *) (m :: * -> *) a a b
a b.
Foldable t =>
SVar t m a -> IORef (t a, b) -> IORef (Heap a, b) -> IO Bool
isWorkDoneAhead SVar t m a
sv IORef ([t m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outH
, isQueueDone :: IO Bool
isQueueDone = SVar t m a -> IORef ([t m a], Int) -> IO Bool
forall (t :: * -> *) (t :: (* -> *) -> * -> *) (m :: * -> *) a a b.
Foldable t =>
SVar t m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead SVar t m a
sv IORef ([t m a], Int)
q
, needDoorBell :: IORef Bool
needDoorBell = IORef Bool
wfw
, svarStyle :: SVarStyle
svarStyle = SVarStyle
AheadVar
, svarStopStyle :: SVarStopStyle
svarStopStyle = SVarStopStyle
StopNone
, svarStopBy :: IORef ThreadId
svarStopBy = IORef ThreadId
forall a. (?callStack::CallStack) => a
undefined
, svarMrun :: RunInIO m
svarMrun = RunInIO m
mrun
, workerCount :: IORef Int
workerCount = IORef Int
active
, accountThread :: ThreadId -> m ()
accountThread = SVar t m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
delThread SVar t m a
sv
, workerStopMVar :: MVar ()
workerStopMVar = MVar ()
stopMVar
, svarRef :: Maybe (IORef ())
svarRef = Maybe (IORef ())
forall a. Maybe a
Nothing
, svarInspectMode :: Bool
svarInspectMode = State t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool
getInspectMode State t m a
st
, svarCreator :: ThreadId
svarCreator = ThreadId
tid
, aheadWorkQueue :: IORef ([t m a], Int)
aheadWorkQueue = IORef ([t m a], Int)
q
, outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outputHeap = IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outH
, svarStats :: SVarStats
svarStats = SVarStats
stats
}
let sv :: SVar t m a
sv =
case State t m a -> Maybe Rate
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate
getStreamRate State t m a
st of
Maybe Rate
Nothing -> SVar t m a
-> (SVar t m a -> m [ChildEvent a])
-> (SVar t m a -> m Bool)
-> SVar t m a
getSVar SVar t m a
sv SVar t m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
Just Rate
_ -> SVar t m a
-> (SVar t m a -> m [ChildEvent a])
-> (SVar t m a -> m Bool)
-> SVar t m a
getSVar SVar t m a
sv SVar t m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
in SVar t m a -> IO (SVar t m a)
forall (m :: * -> *) a. Monad m => a -> m a
return SVar t m a
sv
where
{-# INLINE isQueueDoneAhead #-}
-- Decide whether the work queue side is finished.
--
-- The answer is True when the queue stored in @q@ is empty, or when
-- 'remainingWork' holds a counter whose current value is @<= 0@. When
-- 'remainingWork' is 'Nothing' only the emptiness of the queue decides.
isQueueDoneAhead sv q = do
    -- The queue is inspected first, then the counter; these are two
    -- independent IORef reads.
    queueEmpty <- checkEmpty q
    limitReached <-
        maybe (return False) (fmap (<= 0) . readIORef) (remainingWork sv)
    return (limitReached || queueEmpty)
{-# INLINE isWorkDoneAhead #-}
-- All work is finished only when the heap stored in @ref@ has no entries
-- (size @<= 0@) and 'isQueueDoneAhead' reports the queue side as done too.
isWorkDoneAhead sv q ref = do
    (heap, _) <- readIORef ref
    let heapEmpty = H.size heap <= 0
    queueFinished <- isQueueDoneAhead sv q
    return (heapEmpty && queueFinished)
-- True when the first component of the pair stored in @q@ is empty; the
-- second component is ignored.
checkEmpty q =
    readIORef q >>= \(pending, _) -> return (null pending)
-- | Build the 'SVar' record used for the 'ParallelVar' style.
--
-- Arguments:
--
-- * @ss@   - stored as 'svarStopStyle'. For 'StopBy' a fresh 'IORef' is
--            allocated for 'svarStopBy' (its initial contents are not set).
-- * @st@   - source of the yield limit, rate info, max buffer and inspect
--            mode.
-- * @mrun@ - stored as 'svarMrun'.
--
-- 'maxWorkerLimit' is always 'Unlimited' and 'pushBufferPolicy' is always
-- 'PushBufferBlock'. The creating thread's id is recorded in 'svarCreator'.
--
-- Fields this constructor does not populate ('workLoop', 'enqueue',
-- 'isWorkDone', 'isQueueDone', 'needDoorBell', 'workerStopMVar',
-- 'aheadWorkQueue', 'outputHeap') are lazy 'error' thunks that name the
-- field, so an accidental use reports what was forced instead of a bare
-- @Prelude.undefined@. Nothing is evaluated unless the field is used.
getParallelSVar :: MonadIO m
    => SVarStopStyle -> State t m a -> RunInIO m -> IO (SVar t m a)
getParallelSVar ss st mrun = do
    outQ      <- newIORef ([], 0)
    outQRev   <- newIORef ([], 0)
    outQMv    <- newEmptyMVar
    outQMvRev <- newEmptyMVar
    active    <- newIORef 0
    running   <- newIORef S.empty
    -- A yield limit, when configured, gets its own mutable counter.
    yl        <- traverse newIORef (getYieldLimit st)
    rateInfo  <- getYieldRateInfo st

    -- The IORef below stores this lazily; with an Unlimited buffer the value
    -- is a placeholder that only fails if something actually reads it.
    let bufLim =
            case getMaxBuffer st of
                Unlimited -> notSet "pushBufferSpace (maxBuffer is Unlimited)"
                Limited x -> fromIntegral x
    remBuf <- newIORef bufLim
    pbMVar <- newMVar ()

    stats <- newSVarStats
    tid   <- myThreadId

    stopBy <-
        case ss of
            -- The reference exists but holds no thread id yet.
            StopBy -> newIORef (notSet "svarStopBy contents")
            _      -> return (notSet "svarStopBy")

    -- The record refers to itself through readOutputQ, postProcess and
    -- accountThread, hence the recursive let.
    let sv =
            SVar
                { outputQueue                = outQ
                , outputQueueFromConsumer    = outQRev
                , remainingWork              = yl
                , maxBufferLimit             = getMaxBuffer st
                , pushBufferSpace            = remBuf
                , pushBufferPolicy           = PushBufferBlock
                , pushBufferMVar             = pbMVar
                , maxWorkerLimit             = Unlimited
                , yieldRateInfo              = rateInfo
                , outputDoorBell             = outQMv
                , outputDoorBellFromConsumer = outQMvRev
                , readOutputQ                = readOutputQPar sv
                , postProcess                = allThreadsDone sv
                , workerThreads              = running
                , workLoop                   = notSet "workLoop"
                , enqueue                    = notSet "enqueue"
                , isWorkDone                 = notSet "isWorkDone"
                , isQueueDone                = notSet "isQueueDone"
                , needDoorBell               = notSet "needDoorBell"
                , svarStyle                  = ParallelVar
                , svarStopStyle              = ss
                , svarStopBy                 = stopBy
                , svarMrun                   = mrun
                , workerCount                = active
                , accountThread              = modifyThread sv
                , workerStopMVar             = notSet "workerStopMVar"
                , svarRef                    = Nothing
                , svarInspectMode            = getInspectMode st
                , svarCreator                = tid
                , aheadWorkQueue             = notSet "aheadWorkQueue"
                , outputHeap                 = notSet "outputHeap"
                , svarStats                  = stats
                }
    return sv

    where

    -- Placeholder for a field that is never given a value here. It is a lazy
    -- bottom, exactly like 'undefined', but carries the field name.
    notSet :: String -> b
    notSet field =
        error $ "getParallelSVar: " ++ field
            ++ " is not initialized for ParallelVar"

    -- Block on the output doorbell, collect latency statistics when rate
    -- info is present, then drain the output queue.
    readOutputQPar sv = liftIO $ do
        withDiagMVar sv "readOutputQPar: doorbell" $
            takeMVar (outputDoorBell sv)
        case yieldRateInfo sv of
            Nothing -> return ()
            Just yinfo -> void $ collectLatency sv yinfo False
        r <- fst <$> readOutputQRaw sv
        -- Empty the push-buffer MVar, reset the buffer limit, and fill the
        -- MVar again after a write barrier.
        -- NOTE(review): presumably this releases producers blocked on the
        -- buffer - confirm against the push side.
        void $ tryTakeMVar (pushBufferMVar sv)
        resetBufferLimit sv
        writeBarrier
        void $ tryPutMVar (pushBufferMVar sv) ()
        return r
-- | Enqueue the stream @m@ on @sv@ and then start the first worker,
-- returning the same @sv@.
--
-- The stream is enqueued, paired with the captured monad state, before any
-- worker is started. Without rate info the worker is pushed with a count of
-- @0@; with rate info it is pushed with a count of @1@, unless the latency
-- target equals 'maxBound', in which case no worker is pushed and the
-- calling thread sleeps for @threadDelay maxBound@ instead.
sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a)
sendFirstWorker sv m = do
    runIn <- captureMonadState
    liftIO $ enqueue sv (runIn, m)
    startWorker (yieldRateInfo sv)
    return sv

    where

    startWorker Nothing = pushWorker 0 sv
    startWorker (Just yinfo)
        | svarLatencyTarget yinfo == maxBound = liftIO $ threadDelay maxBound
        | otherwise = pushWorker 1 sv
{-# INLINABLE newAheadVar #-}
-- | Create an ahead-style 'SVar' and start it on the stream @m@.
--
-- The current monad state is captured, 'getAheadSVar' builds the 'SVar' from
-- @st@, the worker loop @wloop@ and that captured state, and the result is
-- handed to 'sendFirstWorker' together with @m@.
newAheadVar :: MonadAsync m
    => State t m a
    -> t m a
    -> (   IORef ([t m a], Int)
        -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
        -> State t m a
        -> SVar t m a
        -> Maybe WorkerInfo
        -> m ())
    -> m (SVar t m a)
newAheadVar st m wloop =
    captureMonadState
        >>= liftIO . getAheadSVar st wloop
        >>= \sv -> sendFirstWorker sv m
{-# INLINABLE newParallelVar #-}
-- | Create a parallel-style 'SVar'.
--
-- Captures the current monad state and passes it, with the stop style @ss@
-- and the state @st@, to 'getParallelSVar'. No stream is enqueued and no
-- worker is started here.
newParallelVar :: MonadAsync m
    => SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar ss st =
    captureMonadState >>= liftIO . getParallelSVar ss st
-- | Enqueue the stream @m@ on the 'SVar' @sv@.
--
-- The stream is paired with the captured monad state and passed to the
-- 'enqueue' field of @sv@. Afterwards, if 'allThreadsDone' reports True, one
-- worker is pushed: with a count of @0@ when @sv@ has no rate info and @1@
-- when it has.
toStreamVar :: MonadAsync m => SVar t m a -> t m a -> m ()
toStreamVar sv m = do
    runIn <- captureMonadState
    liftIO $ enqueue sv (runIn, m)
    idle <- allThreadsDone sv
    -- The two cases differ only in the count handed to pushWorker.
    when idle $ pushWorker (maybe 0 (const 1) (yieldRateInfo sv)) sv