module Streamly.Internal.Data.Channel.Worker
(
Work (..)
, estimateWorkers
, isBeyondMaxRate
, workerRateControl
, sendWithDoorBell
, sendYield
, sendStop
, handleChildException
)
where
import Control.Concurrent (myThreadId)
import Control.Concurrent.MVar (MVar, tryPutMVar)
import Control.Exception (SomeException(..), assert)
import Control.Monad (when, void)
import Data.IORef (IORef, readIORef, writeIORef)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
(AbsTime, NanoSecond64(..), diffAbsTime64, fromRelTime64)
import Streamly.Internal.Data.Channel.Types
-- | Increment the yield counter kept in the worker's 'workerYieldCount'
-- reference and return the incremented value.
--
-- The counter is read and written back in two separate, non-atomic steps.
-- NOTE(review): presumably each 'WorkerInfo' is only ever updated by the
-- worker that owns it - confirm against the callers.
updateYieldCount :: WorkerInfo -> IO Count
updateYieldCount winfo = do
    let ref = workerYieldCount winfo
    cnt <- readIORef ref
    let cnt1 = cnt + 1
    -- Store the evaluated sum rather than a suspended @cnt + 1@. The
    -- consumers of the returned count ('isBeyondMaxYield' and
    -- 'checkRatePeriodic') skip it when the yield limit or the polling
    -- interval is 0, so a lazy write could otherwise chain one thunk per
    -- yield inside the reference.
    writeIORef ref $! cnt1
    return cnt1
-- | Tell whether the yield count @cnt@ has reached the worker's
-- 'workerYieldMax'. A 'workerYieldMax' of 0 switches the check off: the
-- result is then always 'False' and @cnt@ is not inspected.
isBeyondMaxYield :: Count -> WorkerInfo -> Bool
isBeyondMaxYield cnt winfo = limit /= 0 && cnt >= limit

    where

    limit = workerYieldMax winfo
-- | Push @msg@ onto the front of the event list held in @q@, adding one to
-- the length stored next to it, and return the length the queue had
-- /before/ the push.
--
-- When that previous length is zero or less, the doorbell is rung with a
-- non-blocking 'tryPutMVar' of @()@ into @bell@; the result of the put is
-- ignored, so an already full bell is not an error.
{-# INLINE sendWithDoorBell #-}
sendWithDoorBell ::
    IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell q bell msg = do
    prevLen <- atomicModifyIORefCAS q enqueue
    when (prevLen <= 0) ringBell
    return prevLen

    where

    enqueue (events, len) = ((msg : events, len + 1), len)

    -- The barrier is issued before the put and that order must be kept.
    -- NOTE(review): presumably this makes the queue update visible before
    -- the consumer can be woken up - confirm against 'writeBarrier'.
    ringBell = writeBarrier >> void (tryPutMVar bell ())
-- | Measure what the worker has done since the sample recorded in
-- 'workerLatencyStart'.
--
-- If the yield count has grown since that sample, the result is
-- @Just (yields, elapsed)@ where @yields@ is the growth and @elapsed@ is the
-- monotonic time passed since the sample; 'workerLatencyStart' is then
-- reset to the current count and time. Otherwise the result is 'Nothing',
-- the clock is not read and the stored sample is left as it is.
workerCollectLatency :: WorkerInfo -> IO (Maybe (Count, NanoSecond64))
workerCollectLatency winfo = do
    (startCount, startTime) <- readIORef startRef
    curCount <- readIORef (workerYieldCount winfo)
    let yields = curCount - startCount
    if yields <= 0
        then return Nothing
        else do
            now <- getTime Monotonic
            let elapsed = fromRelTime64 (diffAbsTime64 now startTime)
            writeIORef startRef (curCount, now)
            return (Just (yields, elapsed))

    where

    startRef = workerLatencyStart winfo
-- | Take a latency sample from the worker with 'workerCollectLatency' and,
-- if there is one, add it to the 'workerPendingLatency' accumulator of the
-- rate info.
--
-- The accumulator is a triple. Its first component receives every sampled
-- yield; the second and third components receive the yields and the period
-- only when the measured period is positive, and receive 0 otherwise.
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency yinfo winfo =
    workerCollectLatency winfo >>= maybe (return ()) record

    where

    record (yields, period) =
        atomicModifyIORefCAS_ (workerPendingLatency yinfo) $
            \(total, n, t) -> (total + yields, n + timed, t + elapsed)

        where

        -- A sample without a positive period adds nothing to the timed part.
        (timed, elapsed)
            | period > 0 = (yields, period)
            | otherwise = (0, 0)
-- | Verdict produced by 'estimateWorkers'.
data Work
    = BlockWait NanoSecond64
    -- ^ Not behind the target; carries the computed, positive sleep time.
    | PartialWorker Count
    -- ^ Behind the target while the worker latency does not exceed the
    -- adjusted latency; carries the number of yields still owed.
    | ManyWorkers Int Count
    -- ^ A worker count together with the number of yields still owed.
    deriving Show
-- | Window over which 'estimateWorkers' spreads the yields it is behind by
-- when it computes the extra yield frequency. The value is 1000000, which
-- as a 'NanoSecond64' presumably means one millisecond.
rateRecoveryTime :: NanoSecond64
rateRecoveryTime = 1000000
-- | Read the latency bookkeeping of the rate info without modifying it.
--
-- The result is a triple of:
--
-- * the all-time yield count from 'svarAllTimeLatency' plus the total
--   counts of the collected and the pending accumulators,
-- * the timestamp stored in 'svarAllTimeLatency',
-- * a latency estimate: when the collected and pending samples together
--   have a positive count and a positive time, the mean of their per-yield
--   latency and the value in 'workerMeasuredLatency' (integer division);
--   otherwise the value in 'workerMeasuredLatency' unchanged.
getWorkerLatency :: YieldRateInfo -> IO (Count, AbsTime, NanoSecond64)
getWorkerLatency yinfo = do
    (pendTotal, pendCount, pendTime) <-
        readIORef (workerPendingLatency yinfo)
    (collTotal, collCount, collTime) <-
        readIORef (workerCollectedLatency yinfo)
    (allTimeCount, allTimeStamp) <- readIORef (svarAllTimeLatency yinfo)
    lastLatency <- readIORef (workerMeasuredLatency yinfo)
    let sampleCount = collCount + pendCount
        sampleTime = collTime + pendTime
        latency
            | sampleCount > 0 && sampleTime > 0 =
                (sampleTime `div` fromIntegral sampleCount + lastLatency)
                    `div` 2
            | otherwise = lastLatency
    return (allTimeCount + (collTotal + pendTotal), allTimeStamp, latency)
-- | Decide how much work is needed to keep up with the target latency.
--
-- Arguments, in order: the worker limit, the yields counted so far, the
-- gained/lost yield adjustment added to that count, the elapsed time, the
-- measured worker latency, the target latency per yield and the allowed
-- latency range.
--
-- The number of yields due by now is @elapsed + worker latency@ divided by
-- the target latency, rounded up. The target latency is used as a divisor
-- and therefore must not be zero.
--
-- * If fewer yields than that have happened, the latency needed to make up
--   the deficit within 'rateRecoveryTime' is computed and clamped to the
--   latency range. A worker latency within that value gives
--   'PartialWorker'; otherwise 'ManyWorkers' is returned with the quotient
--   of the two latencies, capped by the worker limit and by the deficit.
-- * Otherwise the time by which the yields are ahead of schedule, capped
--   at @maxLatency - worker latency@, is returned as 'BlockWait' when it is
--   positive, and @ManyWorkers 1 0@ is returned when it is not.
estimateWorkers
    :: Limit
    -> Count
    -> Count
    -> NanoSecond64
    -> NanoSecond64
    -> NanoSecond64
    -> LatencyRange
    -> Work
estimateWorkers workerLimit svarYields gainLossYields
                svarElapsed wLatency targetLat range
    | deltaYields > 0 = assert (adjustedLat > 0) speedUp
    | otherwise = assert (sleepTime >= 0) holdBack

    where

    -- Yields due so far (rounded up) against yields actually credited.
    targetYields = (svarElapsed + wLatency + targetLat - 1) `div` targetLat
    effectiveYields = svarYields + gainLossYields
    deltaYields = fromIntegral targetYields - effectiveYields

    -- Behind schedule: frequency needed on top of the regular one to recover
    -- the deficit, turned back into a latency and clamped to the range.
    deficitFreq :: Double
    deficitFreq = fromIntegral deltaYields / fromIntegral rateRecoveryTime
    baseFreq = 1.0 / fromIntegral targetLat
    requiredLat = NanoSecond64 (round (1.0 / (baseFreq + deficitFreq)))
    adjustedLat = min (max requiredLat (minLatency range)) (maxLatency range)

    speedUp
        | wLatency <= adjustedLat = PartialWorker deltaYields
        | otherwise = ManyWorkers (fromIntegral cappedWorkers) deltaYields

    -- adjustedLat is a divisor here; the assertion above covers it.
    wantedWorkers = wLatency `div` adjustedLat
    limitedWorkers =
        case workerLimit of
            Unlimited -> wantedWorkers
            Limited x -> min wantedWorkers (fromIntegral x)
    cappedWorkers = min limitedWorkers (fromIntegral deltaYields)

    -- On or ahead of schedule: how long the credited yields are ahead of
    -- the elapsed time.
    sleepTime = fromIntegral effectiveYields * targetLat - svarElapsed

    holdBack
        | s > 0 = BlockWait s
        | otherwise = ManyWorkers 1 (Count 0)

        where

        s = min sleepTime (maxLatency range - wLatency)
-- | Check the current number of workers against what 'estimateWorkers'
-- asks for.
--
-- The estimate is fed with the figures from 'getWorkerLatency', the
-- monotonic time passed since the timestamp it returns, the value of
-- 'svarGainedLostYields', and the latency target and range of the rate
-- info. The result is 'True' when the worker count read from @workerCount@
-- is more than 1 for 'PartialWorker', more than @n@ for @ManyWorkers n@,
-- and always for 'BlockWait'.
isBeyondMaxRate :: Limit -> IORef Int -> YieldRateInfo -> IO Bool
isBeyondMaxRate workerLimit workerCount rateInfo = do
    (yields, since, wLatency) <- getWorkerLatency rateInfo
    now <- getTime Monotonic
    gainLoss <- readIORef (svarGainedLostYields rateInfo)
    active <- readIORef workerCount
    let elapsed = fromRelTime64 (diffAbsTime64 now since)
        verdict =
            estimateWorkers
                workerLimit yields gainLoss elapsed wLatency
                (svarLatencyTarget rateInfo) (svarLatencyRange rateInfo)
    return (exceeds active verdict)

    where

    exceeds active (PartialWorker _) = active > 1
    exceeds active (ManyWorkers n _) = active > n
    exceeds _ (BlockWait _) = True
-- | Run the rate check on every @i@-th yield, where @i@ is the current
-- value of 'workerPollingInterval'.
--
-- When @i@ is non-zero and divides the yield count @ycnt@, the worker's
-- latency sample is recorded with 'workerUpdateLatency' and the result of
-- 'isBeyondMaxRate' is returned. In every other case nothing is updated
-- and the result is 'False'.
{-# NOINLINE checkRatePeriodic #-}
checkRatePeriodic ::
       Limit
    -> IORef Int
    -> YieldRateInfo
    -> WorkerInfo
    -> Count
    -> IO Bool
checkRatePeriodic workerLimit workerCount rateInfo workerInfo ycnt = do
    interval <- readIORef (workerPollingInterval rateInfo)
    -- The zero test comes first so that `mod` never sees a zero divisor.
    let due = interval /= 0 && ycnt `mod` interval == 0
    if not due
        then return False
        else do
            workerUpdateLatency rateInfo workerInfo
            isBeyondMaxRate workerLimit workerCount rateInfo
-- | Account for one yield of the worker and report whether it is still
-- within its limits.
--
-- The worker's yield count is incremented and the periodic rate check is
-- run with the new count. The result is 'True' only when the count has not
-- reached the worker's yield limit ('isBeyondMaxYield') and the rate check
-- did not report the rate as exceeded.
{-# NOINLINE workerRateControl #-}
workerRateControl :: Limit -> IORef Int -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl workerLimit workerCount rateInfo workerInfo = do
    yields <- updateYieldCount workerInfo
    overRate <-
        checkRatePeriodic workerLimit workerCount rateInfo workerInfo yields
    let underYieldLimit = not (isBeyondMaxYield yields workerInfo)
    return (underYieldLimit && not overRate)
-- | Queue the event @msg@ with 'sendWithDoorBell' and report whether both
-- the buffer check and the rate check pass.
--
-- * Buffer check: always passes for an 'Unlimited' buffer. For @Limited
--   lim@ it passes when the queue length after the push is strictly less
--   than @lim@ minus the value read from @workerCount@.
-- * Rate check: when both a 'WorkerInfo' and a 'YieldRateInfo' are supplied
--   it is the result of 'workerRateControl'; otherwise it passes.
--
-- The rate check is performed even when the buffer check has failed.
-- NOTE(review): presumably 'False' tells the worker to stop yielding -
-- confirm against the callers.
{-# INLINE sendYield #-}
sendYield ::
       Limit
    -> Limit
    -> IORef Int
    -> Maybe WorkerInfo
    -> Maybe YieldRateInfo
    -> IORef ([ChildEvent a], Int)
    -> MVar ()
    -> ChildEvent a
    -> IO Bool
sendYield bufferLimit workerLimit workerCount workerInfo rateInfo q bell msg =
    do
    prevLen <- sendWithDoorBell q bell msg
    roomLeft <- hasRoom prevLen
    rateOk <- withinRate
    return (roomLeft && rateOk)

    where

    hasRoom prevLen =
        case bufferLimit of
            Unlimited -> return True
            Limited lim -> do
                active <- readIORef workerCount
                return (prevLen + 1 < fromIntegral lim - active)

    withinRate =
        case (workerInfo, rateInfo) of
            (Just winfo, Just yinfo) ->
                workerRateControl workerLimit workerCount yinfo winfo
            _ -> return True
-- | Record the worker's outstanding latency sample with
-- 'workerUpdateLatency', unless the value of 'workerPollingInterval' is 0,
-- in which case nothing is done.
{-# INLINE workerStopUpdate #-}
workerStopUpdate :: WorkerInfo -> YieldRateInfo -> IO ()
workerStopUpdate winfo info =
    readIORef (workerPollingInterval info) >>= \interval ->
        when (interval /= 0) (workerUpdateLatency info winfo)
-- | Announce that the calling worker thread is stopping.
--
-- In order: the value in @workerCount@ is decremented by one through
-- 'atomicModifyIORefCAS_'; when both a 'WorkerInfo' and a 'YieldRateInfo'
-- are supplied the final latency sample is recorded with
-- 'workerStopUpdate'; and a 'ChildStop' event carrying the current thread
-- id and no exception is queued with 'sendWithDoorBell'.
{-# INLINABLE sendStop #-}
sendStop ::
       IORef Int
    -> Maybe WorkerInfo
    -> Maybe YieldRateInfo
    -> IORef ([ChildEvent a], Int)
    -> MVar ()
    -> IO ()
sendStop workerCount workerInfo rateInfo q bell = do
    atomicModifyIORefCAS_ workerCount (subtract 1)
    recordFinalLatency
    tid <- myThreadId
    void (sendWithDoorBell q bell (ChildStop tid Nothing))

    where

    recordFinalLatency =
        case (workerInfo, rateInfo) of
            (Just winfo, Just rinfo) -> workerStopUpdate winfo rinfo
            _ -> return ()
-- | Report the exception @e@ raised in the calling worker thread: a
-- 'ChildStop' event carrying the current thread id and @Just e@ is queued
-- with 'sendWithDoorBell'. The queue length it returns is discarded.
{-# NOINLINE handleChildException #-}
handleChildException ::
    IORef ([ChildEvent a], Int) -> MVar () -> SomeException -> IO ()
handleChildException q bell e =
    myThreadId >>= \tid ->
        void (sendWithDoorBell q bell (ChildStop tid (Just e)))