module Streamly.Internal.Data.SVar.Worker
(
decrementYieldLimit
, incrementYieldLimit
, decrementBufferLimit
, incrementBufferLimit
, resetBufferLimit
, Work (..)
, isBeyondMaxRate
, estimateWorkers
, updateYieldCount
, minThreadDelay
, workerRateControl
, workerUpdateLatency
, send
, ringDoorBell
, sendYield
, sendToProducer
, sendStop
, sendStopToProducer
, handleChildException
, handleFoldException
)
where
#include "inline.hs"
import Control.Concurrent (myThreadId, takeMVar)
import Control.Concurrent.MVar (MVar, tryPutMVar)
import Control.Exception (SomeException(..), assert)
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (IORef, readIORef, writeIORef)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier,
storeLoadBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
(AbsTime, NanoSecond64(..), diffAbsTime64, fromRelTime64)
import Streamly.Internal.Data.SVar.Type
-- | Atomically take one unit from the SVar's remaining yield budget.
--
-- If 'remainingWork' is 'Nothing' there is no budget to track and the result
-- is always 'True'. Otherwise the counter is decremented unconditionally and
-- the result is 'True' only when the value /before/ the decrement was at
-- least 1. Because the decrement is unconditional, the counter can drop below
-- zero when this returns 'False'.
{-# INLINE decrementYieldLimit #-}
decrementYieldLimit :: SVar t m a -> IO Bool
decrementYieldLimit sv =
    case remainingWork sv of
        Nothing -> return True
        Just ref -> do
            -- The modify function returns the old value as the result.
            r <- atomicModifyIORefCAS ref $ \x -> (x - 1, x)
            return $ r >= 1
-- | Atomically add one unit back to the SVar's remaining yield budget.
--
-- Does nothing when 'remainingWork' is 'Nothing'.
{-# INLINE incrementYieldLimit #-}
incrementYieldLimit :: SVar t m a -> IO ()
incrementYieldLimit sv =
    case remainingWork sv of
        Nothing -> return ()
        Just ref -> atomicModifyIORefCAS_ ref (+ 1)
-- | Reserve one slot of push-buffer space before sending an item.
--
-- Does nothing when 'maxBufferLimit' is 'Unlimited'. Otherwise the
-- 'pushBufferSpace' counter is decremented if it is at least 1. If no space
-- was available (the value seen was @<= 0@) the action taken depends on
-- 'pushBufferPolicy':
--
-- * 'PushBufferBlock': block on 'pushBufferMVar' and retry until a slot can
--   be taken.
-- * 'PushBufferDropNew': remove the head of 'outputQueue' (events are consed
--   onto the head when sent, so this is the most recently queued event) and
--   proceed; if the queue is empty, block and retry instead.
-- * 'PushBufferDropOld': not implemented; raises an error.
{-# INLINE decrementBufferLimit #-}
decrementBufferLimit :: SVar t m a -> IO ()
decrementBufferLimit sv =
    case maxBufferLimit sv of
        Unlimited -> return ()
        Limited _ -> do
            old <- atomicModifyIORefCAS (pushBufferSpace sv) takeSlot
            when (old <= 0) $
                case pushBufferPolicy sv of
                    PushBufferBlock -> blockAndRetry
                    PushBufferDropNew -> do
                        -- Drop one queued event to make room. The result
                        -- tells us whether there was nothing to drop, in
                        -- which case we fall back to blocking.
                        block <- atomicModifyIORefCAS (outputQueue sv) $
                            \(es, n) ->
                                case es of
                                    [] -> (([], n), True)
                                    _ : xs -> ((xs, n - 1), False)
                        when block blockAndRetry
                    PushBufferDropOld ->
                        error
                            "decrementBufferLimit: PushBufferDropOld is not implemented"

    where

    -- Decrement the counter only if a slot is available; always report the
    -- value that was seen before the (possible) decrement.
    takeSlot x = (if x >= 1 then x - 1 else x, x)

    -- Wait on 'pushBufferMVar', then try to take a slot again.
    blockAndRetry = do
        takeMVar (pushBufferMVar sv)
        old <- atomicModifyIORefCAS (pushBufferSpace sv) takeSlot
        if old >= 1
        -- We got a slot: put the MVar back (without blocking) so that any
        -- other thread waiting in 'takeMVar' gets a chance to retry too.
        then void $ tryPutMVar (pushBufferMVar sv) ()
        -- No slot: leave the MVar empty and wait for it to be filled again.
        else blockAndRetry
-- | Release one slot of push-buffer space and signal 'pushBufferMVar'.
--
-- Does nothing when 'maxBufferLimit' is 'Unlimited'. Otherwise the
-- 'pushBufferSpace' counter is incremented atomically, 'writeBarrier' is
-- issued, and the MVar is filled with a non-blocking 'tryPutMVar' (a no-op if
-- it is already full).
{-# INLINE incrementBufferLimit #-}
incrementBufferLimit :: SVar t m a -> IO ()
incrementBufferLimit sv =
    case maxBufferLimit sv of
        Unlimited -> return ()
        Limited _ -> do
            atomicModifyIORefCAS_ (pushBufferSpace sv) (+ 1)
            -- Barrier between updating the counter and ringing the MVar.
            writeBarrier
            void $ tryPutMVar (pushBufferMVar sv) ()
-- | Reset the available push-buffer space to the configured maximum.
--
-- Does nothing when 'maxBufferLimit' is 'Unlimited'; for @Limited n@ the
-- 'pushBufferSpace' counter is atomically overwritten with @n@.
{-# INLINE resetBufferLimit #-}
resetBufferLimit :: SVar t m a -> IO ()
resetBufferLimit sv =
    case maxBufferLimit sv of
        Unlimited -> return ()
        Limited n ->
            atomicModifyIORefCAS_ (pushBufferSpace sv) (const (fromIntegral n))
-- | Increment the worker's yield counter and return the new value.
--
-- The counter is updated with a plain read followed by a write, not an
-- atomic modify.
-- NOTE(review): presumably each 'WorkerInfo' is only updated by the worker
-- thread that owns it - confirm against callers.
updateYieldCount :: WorkerInfo -> IO Count
updateYieldCount winfo = do
    cnt <- readIORef (workerYieldCount winfo)
    let cnt1 = cnt + 1
    writeIORef (workerYieldCount winfo) cnt1
    return cnt1
-- | Check whether a yield count has reached the worker's maximum.
--
-- A 'workerYieldMax' of 0 means "no limit": the result is then always
-- 'False'. Otherwise the result is 'True' when @cnt >= workerYieldMax@.
isBeyondMaxYield :: Count -> WorkerInfo -> Bool
isBeyondMaxYield cnt winfo =
    let ymax = workerYieldMax winfo
    in ymax /= 0 && cnt >= ymax
-- | Ring the SVar's output door bell if one has been requested.
--
-- After issuing 'storeLoadBarrier', the 'needDoorBell' flag is read. If it is
-- set, the flag is atomically cleared and 'outputDoorBell' is filled with a
-- non-blocking 'tryPutMVar' (a no-op if the MVar is already full).
{-# INLINE ringDoorBell #-}
ringDoorBell :: SVar t m a -> IO ()
ringDoorBell sv = do
    -- The barrier is placed before the flag is read.
    -- NOTE(review): presumably this orders the caller's earlier writes
    -- against the flag read - confirm in Streamly.Internal.Data.Atomics.
    storeLoadBarrier
    w <- readIORef $ needDoorBell sv
    when w $ do
        -- Clear the request before ringing.
        atomicModifyIORefCAS_ (needDoorBell sv) (const False)
        void $ tryPutMVar (outputDoorBell sv) ()
-- | Push an event onto a queue and ring the bell if the queue was empty.
--
-- The queue is a pair of the event list and its length. The event is consed
-- onto the head of the list and the length is incremented in one atomic
-- modify. If the length seen before the push was @<= 0@, 'writeBarrier' is
-- issued and the bell MVar is filled with a non-blocking 'tryPutMVar'.
--
-- Returns the queue length as it was /before/ this event was added.
{-# INLINE sendWithDoorBell #-}
sendWithDoorBell ::
    IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell q bell msg = do
    oldlen <- atomicModifyIORefCAS q $ \(es, n) ->
        ((msg : es, n + 1), n)
    -- Only the push that makes the queue non-empty rings the bell.
    when (oldlen <= 0) $ do
        writeBarrier
        void $ tryPutMVar bell ()
    return oldlen
-- | Queue an event on the SVar's 'outputQueue', ringing 'outputDoorBell' if
-- the queue was empty.
--
-- Returns the length of the output queue before the event was added.
send :: SVar t m a -> ChildEvent a -> IO Int
send sv = sendWithDoorBell (outputQueue sv) (outputDoorBell sv)
-- | Queue an event on 'outputQueueFromConsumer', ringing
-- 'outputDoorBellFromConsumer' if that queue was empty.
--
-- Before queueing, 'pushBufferMVar' is filled with a non-blocking
-- 'tryPutMVar'; 'decrementBufferLimit' blocks on that same MVar when it is
-- waiting for buffer space.
--
-- Returns the length of the queue before the event was added.
sendToProducer :: SVar t m a -> ChildEvent a -> IO Int
sendToProducer sv msg = do
    void $ tryPutMVar (pushBufferMVar sv) ()
    sendWithDoorBell
        (outputQueueFromConsumer sv)
        (outputDoorBellFromConsumer sv)
        msg
-- | Measure how many items a worker yielded, and over what period, since the
-- last measurement.
--
-- 'workerLatencyStart' holds the yield count and timestamp of the previous
-- measurement. If the worker's yield count has advanced since then, the
-- monotonic clock is read, 'workerLatencyStart' is reset to the current count
-- and time, and @Just (yields, elapsed)@ is returned. If nothing was yielded
-- the result is 'Nothing' and the stored start point is left unchanged.
workerCollectLatency :: WorkerInfo -> IO (Maybe (Count, NanoSecond64))
workerCollectLatency winfo = do
    (cnt0, t0) <- readIORef (workerLatencyStart winfo)
    cnt1 <- readIORef (workerYieldCount winfo)
    let cnt = cnt1 - cnt0

    if cnt > 0
    then do
        t1 <- getTime Monotonic
        let period = fromRelTime64 $ diffAbsTime64 t1 t0
        -- Start the next measurement window from here.
        writeIORef (workerLatencyStart winfo) (cnt1, t1)
        return $ Just (cnt, period)
    else return Nothing
-- | Fold a worker's latest latency measurement into the SVar-wide pending
-- latency accumulator.
--
-- 'workerPendingLatency' holds a triple: total yields, yields that count
-- towards latency, and the time spent on those yields. The yields measured by
-- 'workerCollectLatency' are always added to the total. They are added to the
-- latency count and time only when the measured period is positive. Nothing
-- is updated when there is no new measurement.
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency yinfo winfo = do
    r <- workerCollectLatency winfo
    case r of
        Just (cnt, period) -> do
            let ref = workerPendingLatency yinfo
                -- A non-positive period contributes nothing to the latency
                -- figures, but its yields still count in the total.
                (cnt1, t1) = if period > 0 then (cnt, period) else (0, 0)
            atomicModifyIORefCAS_ ref $
                \(fc, n, t) -> (fc + cnt, n + cnt1, t + t1)
        Nothing -> return ()
-- | The outcome of 'estimateWorkers': what should be done to keep the yield
-- rate on target.
data Work
    -- | Ahead of the target rate: the argument is the time to wait.
    = BlockWait NanoSecond64
    -- | Behind the target rate, but the worker latency does not exceed the
    -- required latency: the argument is the number of yields we are behind by.
    | PartialWorker Count
    -- | A number of workers, together with the number of yields we are behind
    -- by (0 when not behind).
    | ManyWorkers Int Count
    deriving Show
-- | A fixed delay of 1000000 nanoseconds (1 millisecond).
-- NOTE(review): presumably the smallest delay worth sleeping for - it is not
-- used within this module, confirm at the use sites.
minThreadDelay :: NanoSecond64
minThreadDelay = 1000000
-- | The period (1000000 nanoseconds, i.e. 1 millisecond) over which
-- 'estimateWorkers' spreads a yield deficit when computing the extra yield
-- frequency needed to catch up.
rateRecoveryTime :: NanoSecond64
rateRecoveryTime = 1000000
-- | Compute a fresh worker latency estimate without modifying any state.
--
-- Reads the pending and collected latency accumulators, the all-time
-- yield count and timestamp, and the previously measured latency. The new
-- latency is the average of the previous latency and the per-yield time
-- derived from the combined pending and collected figures. If those figures
-- contain no yields or no time, the previous latency is returned unchanged.
--
-- Returns @(total yields, all-time start timestamp, latency)@, where the
-- total is the all-time count plus the pending and collected totals.
getWorkerLatency :: YieldRateInfo -> IO (Count, AbsTime, NanoSecond64)
getWorkerLatency yinfo = do
    let cur      = workerPendingLatency yinfo
        col      = workerCollectedLatency yinfo
        longTerm = svarAllTimeLatency yinfo
        measured = workerMeasuredLatency yinfo

    (curTotalCount, curCount, curTime) <- readIORef cur
    (colTotalCount, colCount, colTime) <- readIORef col
    (lcount, ltime)                    <- readIORef longTerm
    prevLat                            <- readIORef measured

    let latCount   = colCount + curCount
        latTime    = colTime + curTime
        totalCount = colTotalCount + curTotalCount
        newLat =
            if latCount > 0 && latTime > 0
            then let lat = latTime `div` fromIntegral latCount
                 -- Smooth the estimate by averaging with the previous one.
                 in (lat + prevLat) `div` 2
            else prevLat
    return (lcount + totalCount, ltime, newLat)
-- | Estimate what is needed to bring the yield rate back on target.
--
-- The number of yields expected so far is the elapsed time plus one worker
-- latency, divided by the target latency and rounded up. This is compared
-- with the yields made so far, adjusted by the gain/loss correction.
--
-- When behind the target: the required per-yield latency is derived from the
-- target frequency plus the extra frequency needed to recover the deficit
-- over 'rateRecoveryTime', then clamped to the given latency range. If the
-- worker latency does not exceed it, the result is 'PartialWorker'. Otherwise
-- the result is 'ManyWorkers', with the count capped by the worker limit and
-- by the deficit.
--
-- When on or ahead of the target: the result is 'BlockWait' with the time by
-- which we are ahead, capped at @maxLatency range - wLatency@. If that time
-- is not positive, the result is @ManyWorkers 1 (Count 0)@.
--
-- NOTE(review): @targetLat@ is used as a divisor and so must be non-zero -
-- confirm that callers guarantee this.
estimateWorkers
    :: Limit          -- ^ maximum number of workers
    -> Count          -- ^ yields made so far
    -> Count          -- ^ gained/lost yields correction
    -> NanoSecond64   -- ^ time elapsed so far
    -> NanoSecond64   -- ^ measured worker latency
    -> NanoSecond64   -- ^ target latency per yield
    -> LatencyRange   -- ^ allowed range for the adjusted latency
    -> Work
estimateWorkers workerLimit svarYields gainLossYields
                svarElapsed wLatency targetLat range =
    let
        -- Yields expected by the time one more worker latency has passed,
        -- rounded up.
        targetYields = (svarElapsed + wLatency + targetLat - 1) `div` targetLat
        effectiveYields = svarYields + gainLossYields
        deltaYields = fromIntegral targetYields - effectiveYields
    in  if deltaYields > 0
        then
            let deltaYieldsFreq :: Double
                deltaYieldsFreq =
                    fromIntegral deltaYields /
                        fromIntegral rateRecoveryTime
                yieldsFreq = 1.0 / fromIntegral targetLat
                totalYieldsFreq = yieldsFreq + deltaYieldsFreq
                requiredLat = NanoSecond64 $ round $ 1.0 / totalYieldsFreq
                adjustedLat = min (max requiredLat (minLatency range))
                                  (maxLatency range)
            in  assert (adjustedLat > 0) $
                if wLatency <= adjustedLat
                then PartialWorker deltaYields
                else let workers = withLimit $ wLatency `div` adjustedLat
                         -- Never ask for more workers than yields needed.
                         limited = min workers (fromIntegral deltaYields)
                     in ManyWorkers (fromIntegral limited) deltaYields
        else
            let expectedDuration = fromIntegral effectiveYields * targetLat
                sleepTime = expectedDuration - svarElapsed
                maxSleepTime = maxLatency range - wLatency
                s = min sleepTime maxSleepTime
            in assert (sleepTime >= 0) $
                if s > 0 then BlockWait s else ManyWorkers 1 (Count 0)
    where
        -- Cap a worker count by the configured worker limit, if any.
        withLimit n =
            case workerLimit of
                Unlimited -> n
                Limited x -> min n (fromIntegral x)
-- | Decide whether the SVar currently has more workers than the target rate
-- calls for.
--
-- The current latency estimate and elapsed time are passed to
-- 'estimateWorkers', and the result is compared with 'workerCount':
--
-- * 'PartialWorker': beyond the rate if more than one worker is active.
-- * 'ManyWorkers' @n@: beyond the rate if more than @n@ workers are active.
-- * 'BlockWait': always beyond the rate.
isBeyondMaxRate :: SVar t m a -> YieldRateInfo -> IO Bool
isBeyondMaxRate sv yinfo = do
    (count, tstamp, wLatency) <- getWorkerLatency yinfo
    now <- getTime Monotonic
    let duration = fromRelTime64 $ diffAbsTime64 now tstamp
    let targetLat = svarLatencyTarget yinfo
    gainLoss <- readIORef (svarGainedLostYields yinfo)
    let work = estimateWorkers (maxWorkerLimit sv) count gainLoss duration
                               wLatency targetLat (svarLatencyRange yinfo)
    cnt <- readIORef $ workerCount sv
    return $ case work of
        PartialWorker _yields -> cnt > 1
        ManyWorkers n _ -> cnt > n
        BlockWait _ -> True
-- | Run the rate check once every 'workerPollingInterval' yields.
--
-- When the polling interval is non-zero and the given yield count is a
-- multiple of it, the worker's latency is folded into the shared statistics
-- and the result of 'isBeyondMaxRate' is returned. Otherwise no check is made
-- and the result is 'False'.
{-# NOINLINE checkRatePeriodic #-}
checkRatePeriodic :: SVar t m a
                  -> YieldRateInfo
                  -> WorkerInfo
                  -> Count
                  -> IO Bool
checkRatePeriodic sv yinfo winfo ycnt = do
    i <- readIORef (workerPollingInterval yinfo)
    -- The zero check comes first so that `mod` never sees a zero divisor.
    if i /= 0 && (ycnt `mod` i) == 0
    then do
        workerUpdateLatency yinfo winfo
        isBeyondMaxRate sv yinfo
    else return False
-- | Account for one yield by a worker and decide whether it may continue.
--
-- Increments the worker's yield count, then runs the periodic rate check.
-- Returns 'False' if the worker has reached its maximum yield count or the
-- rate check reports that we are beyond the maximum rate. Returns 'True'
-- otherwise.
{-# NOINLINE workerRateControl #-}
workerRateControl :: SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl sv yinfo winfo = do
    cnt <- updateYieldCount winfo
    beyondMaxRate <- checkRatePeriodic sv yinfo winfo cnt
    return $ not (isBeyondMaxYield cnt winfo || beyondMaxRate)
-- | Send an event to the SVar's output queue and report whether the worker
-- may keep producing.
--
-- The result is 'True' only if both of the following hold:
--
-- * Buffer space: with an 'Unlimited' buffer this always holds. With
--   @Limited lim@, the queue length after this send must be less than @lim@
--   minus the number of currently active workers.
-- * Rate limit: when both worker info and the SVar's 'yieldRateInfo' are
--   present, 'workerRateControl' decides; otherwise this always holds.
{-# INLINE sendYield #-}
sendYield :: SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield sv mwinfo msg = do
    oldlen <- send sv msg
    bufferSpaceOk <-
        case maxBufferLimit sv of
            Unlimited -> return True
            Limited lim -> do
                active <- readIORef (workerCount sv)
                -- Each active worker is assumed to need one more slot.
                return $ (oldlen + 1) < (fromIntegral lim - active)
    rateLimitOk <-
        case (mwinfo, yieldRateInfo sv) of
            (Just winfo, Just yinfo) -> workerRateControl sv yinfo winfo
            _ -> return True
    return $ bufferSpaceOk && rateLimitOk
-- | Flush a stopping worker's latency figures into the shared statistics.
--
-- Does nothing when 'workerPollingInterval' is 0.
{-# INLINE workerStopUpdate #-}
workerStopUpdate :: WorkerInfo -> YieldRateInfo -> IO ()
workerStopUpdate winfo info = do
    i <- readIORef (workerPollingInterval info)
    when (i /= 0) $ workerUpdateLatency info winfo
-- | Announce that the calling worker thread is stopping normally.
--
-- In order: the SVar's 'workerCount' is atomically decremented; if both
-- worker info and rate info are available, the worker's latency figures are
-- flushed; finally a 'ChildStop' event carrying the current thread id and no
-- exception is sent on the output queue.
{-# INLINABLE sendStop #-}
sendStop :: SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop sv mwinfo = do
    atomicModifyIORefCAS_ (workerCount sv) $ \n -> n - 1
    case (mwinfo, yieldRateInfo sv) of
        (Just winfo, Just info) -> workerStopUpdate winfo info
        _ -> return ()
    myThreadId >>= \tid -> void $ send sv (ChildStop tid Nothing)
-- | Send a 'ChildStop' event, carrying the current thread id and no
-- exception, to the producer side of the SVar via 'sendToProducer'.
sendStopToProducer :: MonadIO m => SVar t m a -> m ()
sendStopToProducer sv = liftIO $ do
    tid <- myThreadId
    void $ sendToProducer sv (ChildStop tid Nothing)
-- | Report an exception to the producer side of the SVar.
--
-- Sends a 'ChildStop' event carrying the current thread id and the given
-- exception via 'sendToProducer'.
{-# NOINLINE handleFoldException #-}
handleFoldException :: SVar t m a -> SomeException -> IO ()
handleFoldException sv e = do
    tid <- myThreadId
    void $ sendToProducer sv (ChildStop tid (Just e))
-- | Report an exception raised in a worker thread on the SVar's output queue.
--
-- Sends a 'ChildStop' event carrying the current thread id and the given
-- exception via 'send'.
{-# NOINLINE handleChildException #-}
handleChildException :: SVar t m a -> SomeException -> IO ()
handleChildException sv e = do
    tid <- myThreadId
    void $ send sv (ChildStop tid (Just e))