module Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
(
pushWorker
, dispatchWorker
, dispatchWorkerPaced
, sendWorkerWait
, startChannel
, sendWorkerDelay
, sendWorkerDelayPaced
)
where
import Control.Concurrent (takeMVar, threadDelay)
import Control.Exception (assert)
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Maybe (fromJust, fromMaybe)
import Data.IORef (modifyIORef, newIORef, readIORef, writeIORef)
import Streamly.Internal.Control.Concurrent (MonadRunInIO)
import Streamly.Internal.Control.ForkLifted (doFork)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_, storeLoadBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
(MicroSecond64(..), diffAbsTime64, fromRelTime64, toRelTime64)
import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Channel.Dispatcher
import Streamly.Internal.Data.Channel.Types
import Streamly.Internal.Data.Channel.Worker
{-# NOINLINE pushWorker #-}
-- | Add one worker to the channel.
--
-- The sequence is:
--
-- 1. increment 'workerCount';
-- 2. when 'svarInspectMode' is on, call 'recordMaxWorkers';
-- 3. build the worker's 'WorkerInfo' (only if the channel has a
--    'yieldRateInfo', otherwise the worker gets 'Nothing');
-- 4. start 'workLoop' through 'doFork' and hand the returned thread id to
--    'modifyThread'.
--
-- The first argument becomes the 'workerYieldMax' of the new worker.
pushWorker :: MonadRunInIO m => Count -> Channel m a -> m ()
pushWorker yieldMax sv = do
    liftIO $ atomicModifyIORefCAS_ (workerCount sv) (+ 1)
    when (svarInspectMode sv) $
        recordMaxWorkers (workerCount sv) (svarStats sv)
    -- The WorkerInfo refs are allocated only when rate information exists.
    winfo <-
        maybe
            (return Nothing)
            (const (liftIO (Just <$> newWorkerInfo)))
            (yieldRateInfo sv)
    tid <- doFork (workLoop sv winfo) (svarMrun sv) onChildException
    registerThread tid

    where

    -- Fresh per-worker bookkeeping: a zeroed yield counter and a latency
    -- record stamped with the current monotonic time.
    newWorkerInfo = do
        yieldCounter <- newIORef 0
        startTime <- getTime Monotonic
        latencyStart <- newIORef (0, startTime)
        return WorkerInfo
            { workerYieldMax = yieldMax
            , workerYieldCount = yieldCounter
            , workerLatencyStart = latencyStart
            }

    registerThread = modifyThread (workerThreads sv) (outputDoorBell sv)

    onChildException =
        handleChildException (outputQueue sv) (outputDoorBell sv)
{-# INLINE getEffectiveWorkerLimit #-}
-- | Compute the worker limit that currently applies to the channel.
--
-- * If the channel has no 'remainingWork' counter, or if it has a
--   'yieldRateInfo', the configured 'maxWorkerLimit' is returned unchanged.
-- * Otherwise the limit is capped by the value read from the
--   'remainingWork' counter: 'Unlimited' becomes @Limited remaining@ and
--   @Limited lim@ becomes @Limited (min lim remaining)@.
--
-- The counter is a signed 'Count', whereas 'Limited' carries a 'Word'. A
-- negative reading is clamped to zero before the conversion; without the
-- clamp it would wrap around to a huge 'Word' and @min lim@ would then
-- silently ignore the remaining-work cap.
--
-- The counter is only read on the path that actually uses it.
getEffectiveWorkerLimit :: MonadIO m => Channel m a -> m Limit
getEffectiveWorkerLimit sv =
    case (remainingWork sv, yieldRateInfo sv) of
        (Just ref, Nothing) -> do
            n <- liftIO $ readIORef ref
            let remaining = fromIntegral (max 0 n) :: Word
            return $
                case maxWorkerLimit sv of
                    Unlimited -> Limited remaining
                    Limited lim -> Limited (min lim remaining)
        _ -> return (maxWorkerLimit sv)
{-# INLINE checkMaxThreads #-}
-- | Decide whether the worker limit leaves room for another worker.
--
-- The first argument is the number of workers the caller considers active.
-- Returns 'True' when the effective limit (see 'getEffectiveWorkerLimit') is
-- 'Unlimited', or when it is strictly greater than that number.
checkMaxThreads :: MonadIO m => Int -> Channel m a -> m Bool
checkMaxThreads active sv = fmap hasRoom (getEffectiveWorkerLimit sv)

    where

    hasRoom Unlimited = True
    hasRoom (Limited lim) = active < fromIntegral lim
{-# INLINE checkMaxBuffer #-}
-- | Decide whether the buffer limit leaves room for another worker.
--
-- With an 'Unlimited' 'maxBufferLimit' the answer is always 'True' and the
-- output queue is not inspected. Otherwise the count stored alongside the
-- 'outputQueue' is read, and the answer is 'True' only when that count plus
-- the given number of active workers is strictly below the limit.
checkMaxBuffer :: MonadIO m => Int -> Channel m a -> m Bool
checkMaxBuffer active sv =
    case maxBufferLimit sv of
        Unlimited -> return True
        Limited lim -> do
            (_, queued) <- liftIO $ readIORef (outputQueue sv)
            return (queued + active < fromIntegral lim)
-- | Try to dispatch one worker with the given yield limit.
--
-- Returns 'True' only when a worker was pushed because the work is not done,
-- the queue is not done, and both 'checkMaxThreads' and 'checkMaxBuffer'
-- allow it.
--
-- When the queue is reported done but the work is not, and no worker is
-- counted as active, 'isWorkDone' is checked once more and, if still not
-- done, a worker is pushed with a yield limit of 0. The result is 'False' in
-- that case as well.
--
-- Calls 'error' if 'workerCount' is found to be negative.
dispatchWorker :: MonadRunInIO m =>
    Count -> Channel m a -> m Bool
dispatchWorker yieldCount sv = do
    workDone <- liftIO $ isWorkDone sv
    if workDone
    then return False
    else do
        queueDone <- liftIO $ isQueueDone sv
        active <- liftIO $ readIORef $ workerCount sv
        when (active < 0) $ error "dispatchWorker active negative"
        if queueDone
        then do
            -- Nothing is queued; push a worker only if nobody is running
            -- and the work is still not finished on a second look.
            when (active <= 0) $ do
                finished <- liftIO $ isWorkDone sv
                when (not finished) $ pushWorker 0 sv
            return False
        else do
            threadsOk <- checkMaxThreads active sv
            if not threadsOk
            then return False
            else do
                bufferOk <- checkMaxBuffer active sv
                if bufferOk
                then pushWorker yieldCount sv >> return True
                else return False
-- | Rate-controlled variant of 'dispatchWorker'.
--
-- The channel must have a 'yieldRateInfo'; it is extracted with 'fromJust'.
--
-- The function samples the clock, collects latency data via
-- 'collectLatency', and, if the measured latency is zero, falls back to
-- 'workerBootstrapLatency'. If the latency is still zero it returns 'False'
-- without dispatching. Otherwise it asks 'estimateWorkers' what to do:
--
-- * 'BlockWait': if 'allThreadsDone' reports true, sleep for the given
--   duration and then try to dispatch one worker with yield limit 1.
-- * 'PartialWorker': update the gained/lost yields and, if 'allThreadsDone'
--   reports true, try to dispatch one worker with the given yield limit.
-- * 'ManyWorkers': update the gained/lost yields, possibly lower
--   'workerPollingInterval', and dispatch a batch of workers if fewer than
--   the estimated number are currently counted.
--
-- Only the 'ManyWorkers' path can return 'True'.
dispatchWorkerPaced :: MonadRunInIO m =>
    Channel m a -> m Bool
dispatchWorkerPaced sv = do
    let yinfo = fromJust $ yieldRateInfo sv
    now <- liftIO $ getTime Monotonic
    (svarYields, baseTime, measured) <-
        liftIO $
            collectLatency (svarInspectMode sv) (svarStats sv) yinfo False
    let svarElapsed = fromRelTime64 $ diffAbsTime64 now baseTime
        wLatency
            | measured == 0 =
                fromMaybe measured (workerBootstrapLatency yinfo)
            | otherwise = measured
    if wLatency == 0
    then return False
    else do
        gainLoss <- liftIO $ readIORef (svarGainedLostYields yinfo)
        let targetLat = svarLatencyTarget yinfo
            plan =
                estimateWorkers
                    (maxWorkerLimit sv)
                    svarYields
                    gainLoss
                    svarElapsed
                    wLatency
                    targetLat
                    (svarLatencyRange yinfo)
        case plan of
            BlockWait s -> sleepThenDispatch s
            PartialWorker yields -> dispatchPartial yinfo yields
            ManyWorkers netWorkers yields ->
                dispatchMany yinfo targetLat netWorkers yields

    where

    -- BlockWait: sleep and dispatch only when no worker thread is left.
    sleepThenDispatch s = do
        assert (s >= 0) (return ())
        idle <- allThreadsDone (workerThreads sv)
        when idle $ do
            let us = fromRelTime64 (toRelTime64 s) :: MicroSecond64
            liftIO $ threadDelay (fromIntegral us)
            void $ dispatchWorker 1 sv
        return False

    -- PartialWorker: a single worker limited to the given number of yields.
    dispatchPartial yinfo yields = do
        assert (yields > 0) (return ())
        updateGainedLostYields yinfo yields
        idle <- allThreadsDone (workerThreads sv)
        when idle $ void $ dispatchWorker yields sv
        return False

    -- ManyWorkers: tune the polling interval, then top up the worker pool.
    dispatchMany yinfo targetLat netWorkers yields = do
        assert (netWorkers >= 1) (return ())
        assert (yields >= 0) (return ())
        updateGainedLostYields yinfo yields

        let periodRef = workerPollingInterval yinfo
            perWorker = max 1 $ yields `div` fromIntegral netWorkers
            period = min perWorker (fromIntegral magicMaxBuffer)
        -- The stored polling interval is only ever lowered here.
        previous <- liftIO $ readIORef periodRef
        when (period < previous) $ liftIO $ writeIORef periodRef period

        running <- liftIO $ readIORef $ workerCount sv
        if running < netWorkers
        then do
            let missing = netWorkers - running
                batch =
                    max 1 $ fromIntegral $ minThreadDelay `div` targetLat
            dispatchN (min missing batch)
        else return False

    -- Adjust the gained/lost yield counter by the part of @yields@ that
    -- exceeds the rate buffer in either direction.
    updateGainedLostYields yinfo yields =
        when (yields /= 0 && abs yields > buf) $
            liftIO $ modifyIORef (svarGainedLostYields yinfo) (+ delta)

        where

        buf = fromIntegral $ svarRateBuffer yinfo
        delta
            | yields > 0 = yields - buf
            | otherwise = yields + buf

    -- Dispatch up to @n@ workers, stopping at the first refusal. 'True'
    -- means all @n@ were dispatched.
    dispatchN n
        | n == 0 = return True
        | otherwise = do
            sent <- dispatchWorker 0 sv
            if sent
            then dispatchN (n - 1)
            else return False
{-# NOINLINE sendWorkerWait #-}
-- | Dispatch workers and wait until the output queue has something in it.
--
-- Arguments, in order: an eager-evaluation flag, a delay action run at the
-- start of every iteration, the dispatch action, and the channel.
--
-- Each iteration runs the delay action and reads the count stored with the
-- 'outputQueue'. If that count is positive and the eager flag is off, the
-- function returns. Otherwise it sets 'doorBellOnWorkQ', issues a
-- 'storeLoadBarrier', and calls the dispatch action:
--
-- * if dispatch returns 'True', the loop repeats;
-- * otherwise it blocks on 'outputDoorBell', then re-reads the queue count;
--   an empty queue repeats the loop, a non-empty one clears
--   'doorBellOnWorkQ' and returns.
--
-- The order of the flag update, the barrier and the dispatch call is kept
-- exactly as is; do not reorder these steps.
sendWorkerWait
    :: MonadIO m
    => Bool
    -> (Channel m a -> IO ())
    -> (Channel m a -> m Bool)
    -> Channel m a
    -> m ()
sendWorkerWait eagerEval delay dispatch sv = loop

    where

    setDoorBellFlag flag =
        liftIO $ atomicModifyIORefCAS_ (doorBellOnWorkQ sv) (const flag)

    -- Block until the output doorbell is rung.
    awaitDoorBell =
        liftIO
            $ withDiagMVar
                (svarInspectMode sv)
                (dumpSVar sv)
                "sendWorkerWait: nothing to do"
            $ takeMVar (outputDoorBell sv)

    loop = do
        liftIO $ delay sv
        (_, queued) <- liftIO $ readIORef (outputQueue sv)
        when (queued <= 0 || eagerEval) $ do
            -- Raise the flag before dispatching, with a barrier in between.
            setDoorBellFlag True
            liftIO storeLoadBarrier
            canDoMore <- dispatch sv
            if canDoMore
            then loop
            else do
                awaitDoorBell
                (_, queuedNow) <- liftIO $ readIORef (outputQueue sv)
                if queuedNow <= 0
                then loop
                else setDoorBellFlag False
-- | Start evaluation of the channel by pushing its first worker.
--
-- * Without a 'yieldRateInfo', a worker is pushed with a yield limit of 0.
-- * With a 'yieldRateInfo', a worker is pushed with a yield limit of 1,
--   unless 'svarLatencyTarget' equals 'maxBound'. In that case no worker is
--   started and the calling thread is put to sleep indefinitely.
--
-- NOTE(review): a latency target of 'maxBound' presumably encodes a zero
-- rate, i.e. "never produce" -- confirm against where the target is set.
--
-- The sleep is a loop around @threadDelay maxBound@ rather than a single
-- call: 'threadDelay' takes an 'Int' number of microseconds, so a single
-- call is a finite sleep (roughly 36 minutes where 'Int' is 32 bits wide),
-- after which this function would return without having started a worker.
startChannel :: MonadRunInIO m =>
    Channel m a -> m ()
startChannel chan =
    case yieldRateInfo chan of
        Nothing -> pushWorker 0 chan
        Just yinfo
            | svarLatencyTarget yinfo == maxBound -> liftIO sleepForever
            | otherwise -> pushWorker 1 chan

    where

    sleepForever :: IO ()
    sleepForever = threadDelay maxBound >> sleepForever
-- | Delay hook with the shape expected by the @delay@ argument of
-- 'sendWorkerWait'. It ignores the channel and does nothing.
sendWorkerDelayPaced :: Channel m a -> IO ()
sendWorkerDelayPaced = const (return ())
-- | Delay hook with the shape expected by the @delay@ argument of
-- 'sendWorkerWait'. It ignores the channel and returns immediately without
-- sleeping.
sendWorkerDelay :: Channel m a -> IO ()
sendWorkerDelay = const (return ())