module Streamly.Internal.Data.SVar
(
module Streamly.Internal.Data.SVar.Type
, module Streamly.Internal.Data.SVar.Worker
, module Streamly.Internal.Data.SVar.Dispatch
, module Streamly.Internal.Data.SVar.Pull
, getYieldRateInfo
, newSVarStats
, newParallelVar
, enqueueAhead
, reEnqueueAhead
, queueEmptyAhead
, dequeueAhead
, HeapDequeueResult(..)
, dequeueFromHeap
, dequeueFromHeapSeq
, requeueOnHeapTop
, updateHeapSeq
, withIORef
, heapIsSane
, newAheadVar
)
where
#include "inline.hs"
import Control.Concurrent (myThreadId, takeMVar)
import Control.Concurrent.MVar (newEmptyMVar, tryPutMVar, tryTakeMVar, newMVar)
import Control.Exception (assert)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Heap (Heap, Entry(..))
import Data.IORef (newIORef, readIORef)
import Data.IORef (IORef, atomicModifyIORef)
import Streamly.Internal.Control.Concurrent
(MonadAsync, askRunInIO, RunInIO)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units (NanoSecond64(..))
import qualified Data.Heap as H
import qualified Data.Set as S
import Streamly.Internal.Data.SVar.Dispatch
import Streamly.Internal.Data.SVar.Pull
import Streamly.Internal.Data.SVar.Type
import Streamly.Internal.Data.SVar.Worker
-- | Build the rate-control bookkeeping for a stream from its 'State'.
--
-- Returns 'Nothing' when 'getStreamRate' reports no rate. Otherwise the
-- low/goal/high rates are converted to latencies and a freshly initialised
-- 'YieldRateInfo' is returned.
getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo st =
    case getStreamRate st of
        Nothing -> return Nothing
        Just (Rate low goal high buf) ->
            let target = rateToLatency goal
                -- A higher rate means a smaller latency, so the high rate
                -- gives the minimum latency and the low rate the maximum.
                minLatency = rateToLatency high
                maxLatency = rateToLatency low
             in mkYieldRateInfo target (LatencyRange minLatency maxLatency) buf

    where

    -- Convert a rate into a latency computed as 1e9 / rate (the result is
    -- used as a 'NanoSecond64'). A non-positive rate maps to 'maxBound'.
    rateToLatency r
        | r <= 0 = maxBound
        | otherwise = round $ 1.0e9 / r

    -- Allocate all the mutable cells of a 'YieldRateInfo'. Counters start at
    -- zero, the polling interval starts at 1 and the all-time latency cell
    -- is stamped with the current monotonic time.
    mkYieldRateInfo
        :: NanoSecond64 -> LatencyRange -> Int -> IO (Maybe YieldRateInfo)
    mkYieldRateInfo latency latRange buf = do
        measured <- newIORef 0
        wcur     <- newIORef (0, 0, 0)
        wcol     <- newIORef (0, 0, 0)
        now      <- getTime Monotonic
        wlong    <- newIORef (0, now)
        period   <- newIORef 1
        gainLoss <- newIORef (Count 0)

        return $ Just YieldRateInfo
            { svarLatencyTarget      = latency
            , svarLatencyRange       = latRange
            , svarRateBuffer         = buf
            , svarGainedLostYields   = gainLoss
            , workerBootstrapLatency = getStreamLatency st
            , workerPollingInterval  = period
            , workerMeasuredLatency  = measured
            , workerPendingLatency   = wcur
            , workerCollectedLatency = wcol
            , svarAllTimeLatency     = wlong
            }
-- | Allocate a fresh 'SVarStats' record.
--
-- Every counter and every latency cell starts at zero, and the stop time
-- starts as 'Nothing'.
newSVarStats :: IO SVarStats
newSVarStats = do
    disp    <- newIORef 0
    maxWrk  <- newIORef 0
    maxOq   <- newIORef 0
    maxHs   <- newIORef 0
    maxWq   <- newIORef 0
    avgLat  <- newIORef (0, NanoSecond64 0)
    maxLat  <- newIORef (NanoSecond64 0)
    minLat  <- newIORef (NanoSecond64 0)
    stpTime <- newIORef Nothing

    return SVarStats
        { totalDispatches  = disp
        , maxWorkers       = maxWrk
        , maxOutQSize      = maxOq
        , maxHeapSize      = maxHs
        , maxWorkQSize     = maxWq
        , avgWorkerLatency = avgLat
        , minWorkerLatency = minLat
        , maxWorkerLatency = maxLat
        , svarStopTime     = stpTime
        }
-- | Place a stream on the ahead work queue and ring the SVar's door bell.
--
-- The queue must be empty: the stream (the second component of the pair;
-- the 'RunInIO' component is not used) becomes its only element and the
-- counter stored next to the queue is incremented (presumably this is the
-- sequence number of the queued work -- confirm against the consumers).
-- Calls 'error' if the queue already holds an item.
{-# INLINE enqueueAhead #-}
enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
enqueueAhead sv q m = do
    atomicModifyIORefCAS_ q $ \case
        ([], n) -> ([snd m], n + 1)
        _ -> error "enqueueAhead: queue is not empty"
    ringDoorBell sv
-- | Put a stream back on the ahead work queue and ring the SVar's door bell.
--
-- Unlike 'enqueueAhead' the counter stored next to the queue is left
-- unchanged. The queue must be empty; 'error' is called otherwise.
{-# INLINE reEnqueueAhead #-}
reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead sv q m = do
    atomicModifyIORefCAS_ q $ \case
        ([], n) -> ([m], n)
        _ -> error "reEnqueueAhead: queue is not empty"
    ringDoorBell sv
-- | Check whether the ahead work queue currently holds no stream.
--
-- Only the list component of the cell is inspected; the counter is ignored.
{-# INLINE queueEmptyAhead #-}
queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool
queueEmptyAhead q = liftIO $ null . fst <$> readIORef q
-- | Atomically take the single pending stream off the ahead work queue.
--
-- Returns 'Nothing' when the queue is empty, otherwise the stream together
-- with the counter stored next to the queue; the counter itself is left
-- unchanged. Calls 'error' if the queue holds more than one item.
{-# INLINE dequeueAhead #-}
dequeueAhead :: MonadIO m
    => IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead q = liftIO $
    atomicModifyIORefCAS q $ \case
        ([], n) -> (([], n), Nothing)
        (x : [], n) -> (([], n), Just (x, n))
        _ -> error "more than one item on queue"
-- | Read the current contents of an 'IORef' and pass them to an action.
withIORef :: IORef a -> (a -> IO b) -> IO b
withIORef ref f = readIORef ref >>= f
-- | Atomically apply a pure update to an 'IORef', discarding any result.
--
-- This is a thin wrapper over the (lazy) 'atomicModifyIORef'.
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ ref f =
    atomicModifyIORef ref $ \x -> (f x, ())
-- | Outcome of trying to take the next entry off the ahead output heap
-- (see 'dequeueFromHeap' and 'dequeueFromHeapSeq').
data HeapDequeueResult t m a =
    -- | Returned by 'dequeueFromHeap' when no expected sequence number is
    -- currently recorded next to the heap.
      Clearing
    -- | The entry with the given sequence number is expected next but is
    -- not at the top of the heap.
    | Waiting Int
    -- | The expected entry was at the top of the heap and has been removed.
    | Ready (Entry Int (AheadHeapEntry t m a))
-- | Atomically try to remove the expected entry from the output heap.
--
-- The cell holds the heap together with the sequence number expected next:
--
-- * If no sequence number is recorded ('Nothing') the cell is left untouched
--   and 'Clearing' is returned.
--
-- * If @Just n@ is recorded and the smallest entry in the heap has sequence
--   number @n@, that entry is removed, the recorded number is reset to
--   'Nothing' and the entry is returned as 'Ready'.
--
-- * Otherwise (heap empty, or its smallest entry is not @n@) the cell is
--   left untouched and @Waiting n@ is returned. An assertion checks that the
--   smallest entry is not older than @n@.
{-# INLINE dequeueFromHeap #-}
dequeueFromHeap
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> IO (HeapDequeueResult t m a)
dequeueFromHeap hpVar =
    atomicModifyIORef hpVar $ \pair@(hp, snum) ->
        case snum of
            Nothing -> (pair, Clearing)
            Just n ->
                case H.uncons hp of
                    Just (ent@(Entry seqNo _ev), hp')
                        | seqNo == n -> ((hp', Nothing), Ready ent)
                        | otherwise ->
                            assert (seqNo >= n) (pair, Waiting n)
                    Nothing -> (pair, Waiting n)
-- | Atomically try to remove the entry with sequence number @i@ from the
-- output heap.
--
-- This must only be used while no expected sequence number is recorded next
-- to the heap; if one is recorded ('Just') 'error' is called.
--
-- * If the smallest entry in the heap has sequence number @i@ it is removed
--   and returned as 'Ready'; the recorded number stays 'Nothing'.
--
-- * Otherwise (heap empty, or its smallest entry is not @i@) the heap is
--   left as it is, @Just i@ is recorded as the expected sequence number and
--   @Waiting i@ is returned. An assertion checks that the smallest entry is
--   not older than @i@.
{-# INLINE dequeueFromHeapSeq #-}
dequeueFromHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq hpVar i =
    atomicModifyIORef hpVar $ \(hp, snum) ->
        case snum of
            Nothing ->
                case H.uncons hp of
                    Just (ent@(Entry seqNo _ev), hp')
                        | seqNo == i -> ((hp', Nothing), Ready ent)
                        | otherwise ->
                            assert (seqNo >= i) ((hp, Just i), Waiting i)
                    Nothing -> ((hp, Just i), Waiting i)
            Just _ -> error "dequeueFromHeapSeq: unreachable"
-- | Sanity check relating a recorded expected sequence number to a new one.
--
-- Holds trivially when nothing is recorded; otherwise the new sequence
-- number must not be smaller than the recorded one.
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane snum seqNo = maybe True (seqNo >=) snum
-- | Atomically insert an entry into the output heap and record @seqNo@ as
-- the sequence number expected next.
--
-- An assertion ('heapIsSane') checks that @seqNo@ is not smaller than the
-- sequence number recorded so far, if any.
{-# INLINE requeueOnHeapTop #-}
requeueOnHeapTop
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Entry Int (AheadHeapEntry t m a)
    -> Int
    -> IO ()
requeueOnHeapTop hpVar ent seqNo =
    atomicModifyIORef_ hpVar $ \(hp, snum) ->
        assert (heapIsSane snum seqNo) (H.insert ent hp, Just seqNo)
-- | Atomically record @seqNo@ as the sequence number expected next, leaving
-- the heap contents unchanged.
--
-- An assertion ('heapIsSane') checks that @seqNo@ is not smaller than the
-- sequence number recorded so far, if any.
{-# INLINE updateHeapSeq #-}
updateHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO ()
updateHeapSeq hpVar seqNo =
    atomicModifyIORef_ hpVar $ \(hp, snum) ->
        assert (heapIsSane snum seqNo) (hp, Just seqNo)
-- | Allocate and assemble an 'SVar' of style 'AheadVar'.
--
-- The work queue starts empty with its counter at -1 and the output heap
-- starts empty with 0 recorded as the sequence number expected first. The
-- supplied worker function is partially applied to the work queue, the
-- output heap, the state (with 'streamVar' pointing at the new SVar) and the
-- SVar itself to form 'workLoop'.
--
-- When the state carries no stream rate the bounded read/post-process
-- actions are installed, otherwise the paced ones.
--
-- Fields that this function does not initialise are bottoms that raise an
-- 'error' naming the field if they are ever forced.
getAheadSVar :: MonadAsync m
    => State t m a
    -> (   IORef ([t m a], Int)
        -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
        -> State t m a
        -> SVar t m a
        -> Maybe WorkerInfo
        -> m ())
    -> RunInIO m
    -> IO (SVar t m a)
getAheadSVar st f mrun = do
    outQ     <- newIORef ([], 0)
    outH     <- newIORef (H.empty, Just 0)
    outQMv   <- newEmptyMVar
    active   <- newIORef 0
    wfw      <- newIORef False
    running  <- newIORef S.empty
    q        <- newIORef ([], -1)
    stopMVar <- newMVar ()
    -- A mutable yield budget is created only when a yield limit is set.
    yl       <- traverse newIORef (getYieldLimit st)
    rateInfo <- getYieldRateInfo st

    stats <- newSVarStats
    tid   <- myThreadId

    let getSVar sv readOutput postProc = SVar
            { outputQueue      = outQ
            , outputQueueFromConsumer = unusedField "outputQueueFromConsumer"
            , remainingWork    = yl
            , maxBufferLimit   = getMaxBuffer st
            , pushBufferSpace  = unusedField "pushBufferSpace"
            , pushBufferPolicy = unusedField "pushBufferPolicy"
            , pushBufferMVar   = unusedField "pushBufferMVar"
            , maxWorkerLimit   = min (getMaxThreads st) (getMaxBuffer st)
            , yieldRateInfo    = rateInfo
            , outputDoorBell   = outQMv
            , outputDoorBellFromConsumer =
                unusedField "outputDoorBellFromConsumer"
            , readOutputQ      = readOutput sv
            , postProcess      = postProc sv
            , workerThreads    = running
            , workLoop         = f q outH st{streamVar = Just sv} sv
            , enqueue          = enqueueAhead sv q
            , isWorkDone       = isWorkDoneAhead sv q outH
            , isQueueDone      = isQueueDoneAhead sv q
            , needDoorBell     = wfw
            , svarStyle        = AheadVar
            , svarStopStyle    = StopNone
            , svarStopBy       = unusedField "svarStopBy"
            , svarMrun         = mrun
            , workerCount      = active
            , accountThread    = delThread sv
            , workerStopMVar   = stopMVar
            , svarRef          = Nothing
            , svarInspectMode  = getInspectMode st
            , svarCreator      = tid
            , aheadWorkQueue   = q
            , outputHeap       = outH
            , svarStats        = stats
            }

    -- The SVar refers to itself (several fields close over it), hence the
    -- recursive binding.
    let sv =
            case getStreamRate st of
                Nothing -> getSVar sv readOutputQBounded postProcessBounded
                Just _  -> getSVar sv readOutputQPaced postProcessPaced
     in return sv

    where

    -- Placeholder for a record field that is not initialised here.
    unusedField :: String -> b
    unusedField name =
        error $ "getAheadSVar: field " ++ name
            ++ " is not initialised for an AheadVar SVar"

    -- The queue is considered done when it is empty or when a yield budget
    -- exists and has dropped to zero or below.
    {-# INLINE isQueueDoneAhead #-}
    isQueueDoneAhead sv q = do
        queueDone <- checkEmpty q
        yieldsDone <-
            case remainingWork sv of
                Just yref -> do
                    n <- readIORef yref
                    return (n <= 0)
                Nothing -> return False
        return $ yieldsDone || queueDone

    -- All work is done when the output heap holds no entries and the queue
    -- is done as per 'isQueueDoneAhead'.
    {-# INLINE isWorkDoneAhead #-}
    isWorkDoneAhead sv q ref = do
        heapDone <- do
            (hp, _) <- readIORef ref
            return (H.size hp <= 0)
        queueDone <- isQueueDoneAhead sv q
        return $ heapDone && queueDone

    -- True when the container stored in the first component is empty.
    checkEmpty q = do
        (xs, _) <- readIORef q
        return $ null xs
-- | Create an 'AheadVar'-style 'SVar' for the given state and worker loop,
-- then hand the given stream to 'sendFirstWorker' and return its result.
--
-- The monad's 'RunInIO' is captured with 'askRunInIO' and stored in the
-- SVar by 'getAheadSVar'.
{-# INLINABLE newAheadVar #-}
newAheadVar :: MonadAsync m
    => State t m a
    -> t m a
    -> (   IORef ([t m a], Int)
        -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
        -> State t m a
        -> SVar t m a
        -> Maybe WorkerInfo
        -> m ())
    -> m (SVar t m a)
newAheadVar st m wloop = do
    mrun <- askRunInIO
    sv <- liftIO $ getAheadSVar st wloop mrun
    sendFirstWorker sv m
-- | Allocate and assemble an 'SVar' of style 'ParallelVar' with the given
-- stop style.
--
-- Both output queues start empty, the push buffer policy is
-- 'PushBufferBlock' and the worker limit is 'Unlimited'. The push buffer
-- space is initialised from the state's buffer limit; when that limit is
-- 'Unlimited' the stored value is a bottom that raises an 'error' if forced.
--
-- Fields that this function does not initialise are bottoms that raise an
-- 'error' naming the field if they are ever forced.
getParallelSVar :: MonadIO m
    => SVarStopStyle -> State t m a -> RunInIO m -> IO (SVar t m a)
getParallelSVar ss st mrun = do
    outQ      <- newIORef ([], 0)
    outQRev   <- newIORef ([], 0)
    outQMv    <- newEmptyMVar
    outQMvRev <- newEmptyMVar
    active    <- newIORef 0
    running   <- newIORef S.empty
    -- A mutable yield budget is created only when a yield limit is set.
    yl        <- traverse newIORef (getYieldLimit st)
    rateInfo  <- getYieldRateInfo st
    let bufLim =
            case getMaxBuffer st of
                Unlimited ->
                    error "getParallelSVar: no finite push buffer space \
                          \(maxBuffer is Unlimited)"
                Limited x -> fromIntegral x
    remBuf <- newIORef bufLim
    pbMVar <- newMVar ()

    stats <- newSVarStats
    tid   <- myThreadId

    -- Only the 'StopBy' style gets a real cell; its initial content is a
    -- placeholder until something is written to it.
    stopBy <-
        case ss of
            StopBy ->
                newIORef
                    (error "getParallelSVar: svarStopBy read before any \
                           \thread id was stored")
            _ -> return (unusedField "svarStopBy")

    -- The SVar refers to itself (several fields close over it), hence the
    -- recursive binding.
    let sv =
            SVar { outputQueue      = outQ
                 , outputQueueFromConsumer = outQRev
                 , remainingWork    = yl
                 , maxBufferLimit   = getMaxBuffer st
                 , pushBufferSpace  = remBuf
                 , pushBufferPolicy = PushBufferBlock
                 , pushBufferMVar   = pbMVar
                 , maxWorkerLimit   = Unlimited
                 , yieldRateInfo    = rateInfo
                 , outputDoorBell   = outQMv
                 , outputDoorBellFromConsumer = outQMvRev
                 , readOutputQ      = readOutputQPar sv
                 , postProcess      = allThreadsDone sv
                 , workerThreads    = running
                 , workLoop         = unusedField "workLoop"
                 , enqueue          = unusedField "enqueue"
                 , isWorkDone       = unusedField "isWorkDone"
                 , isQueueDone      = unusedField "isQueueDone"
                 , needDoorBell     = unusedField "needDoorBell"
                 , svarStyle        = ParallelVar
                 , svarStopStyle    = ss
                 , svarStopBy       = stopBy
                 , svarMrun         = mrun
                 , workerCount      = active
                 , accountThread    = modifyThread sv
                 , workerStopMVar   = unusedField "workerStopMVar"
                 , svarRef          = Nothing
                 , svarInspectMode  = getInspectMode st
                 , svarCreator      = tid
                 , aheadWorkQueue   = unusedField "aheadWorkQueue"
                 , outputHeap       = unusedField "outputHeap"
                 , svarStats        = stats
                 }
     in return sv

    where

    -- Placeholder for a record field that is not initialised here.
    unusedField :: String -> b
    unusedField name =
        error $ "getParallelSVar: field " ++ name
            ++ " is not initialised for a ParallelVar SVar"

    -- Block on the output door bell, collect worker latency when rate
    -- control is configured, then read the raw output queue. Afterwards the
    -- push buffer MVar is emptied (if full), the buffer limit is reset, a
    -- write barrier is issued and the MVar is filled again.
    readOutputQPar sv = liftIO $ do
        withDiagMVar sv "readOutputQPar: doorbell"
            $ takeMVar (outputDoorBell sv)
        case yieldRateInfo sv of
            Nothing -> return ()
            Just yinfo -> void $ collectLatency sv yinfo False
        r <- fst `fmap` readOutputQRaw sv
        void $ tryTakeMVar (pushBufferMVar sv)
        resetBufferLimit sv
        writeBarrier
        void $ tryPutMVar (pushBufferMVar sv) ()
        return r
-- | Create a 'ParallelVar'-style 'SVar' with the given stop style.
--
-- The monad's 'RunInIO' is captured with 'askRunInIO' and passed on to
-- 'getParallelSVar', which stores it in the SVar.
{-# INLINABLE newParallelVar #-}
newParallelVar :: MonadAsync m
    => SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar ss st = do
    mrun <- askRunInIO
    liftIO $ getParallelSVar ss st mrun