Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
Deprecated: SVar is replaced by Channel.
Synopsis
- module Streamly.Internal.Data.SVar.Type
- decrementYieldLimit :: SVar t m a -> IO Bool
- incrementYieldLimit :: SVar t m a -> IO ()
- decrementBufferLimit :: SVar t m a -> IO ()
- incrementBufferLimit :: SVar t m a -> IO ()
- resetBufferLimit :: SVar t m a -> IO ()
- data Work
- isBeyondMaxRate :: SVar t m a -> YieldRateInfo -> IO Bool
- estimateWorkers :: Limit -> Count -> Count -> NanoSecond64 -> NanoSecond64 -> NanoSecond64 -> LatencyRange -> Work
- updateYieldCount :: WorkerInfo -> IO Count
- minThreadDelay :: NanoSecond64
- workerRateControl :: SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
- workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
- send :: SVar t m a -> ChildEvent a -> IO Int
- ringDoorBell :: SVar t m a -> IO ()
- sendYield :: SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
- sendToProducer :: SVar t m a -> ChildEvent a -> IO Int
- sendStop :: SVar t m a -> Maybe WorkerInfo -> IO ()
- sendStopToProducer :: MonadIO m => SVar t m a -> m ()
- handleChildException :: SVar t m a -> SomeException -> IO ()
- handleFoldException :: SVar t m a -> SomeException -> IO ()
- collectLatency :: SVar t m a -> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64)
- withDiagMVar :: SVar t m a -> String -> IO () -> IO ()
- dumpSVar :: SVar t m a -> IO String
- printSVar :: SVar t m a -> String -> IO ()
- delThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
- modifyThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
- allThreadsDone :: MonadIO m => SVar t m a -> m Bool
- recordMaxWorkers :: MonadIO m => SVar t m a -> m ()
- pushWorker :: MonadAsync m => Count -> SVar t m a -> m ()
- pushWorkerPar :: MonadAsync m => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
- dispatchWorker :: MonadAsync m => Count -> SVar t m a -> m Bool
- dispatchWorkerPaced :: MonadAsync m => SVar t m a -> m Bool
- sendWorkerWait :: MonadAsync m => (SVar t m a -> IO ()) -> (SVar t m a -> m Bool) -> SVar t m a -> m ()
- sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a)
- sendWorkerDelay :: SVar t m a -> IO ()
- sendWorkerDelayPaced :: SVar t m a -> IO ()
- readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
- readOutputQRaw :: SVar t m a -> IO ([ChildEvent a], Int)
- readOutputQPaced :: MonadAsync m => SVar t m a -> m [ChildEvent a]
- readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a]
- postProcessPaced :: MonadAsync m => SVar t m a -> m Bool
- postProcessBounded :: MonadAsync m => SVar t m a -> m Bool
- cleanupSVar :: SVar t m a -> IO ()
- cleanupSVarFromWorker :: SVar t m a -> IO ()
- getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo)
- newSVarStats :: IO SVarStats
- newParallelVar :: MonadAsync m => SVarStopStyle -> State t m a -> m (SVar t m a)
- enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
- reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
- queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool
- dequeueAhead :: MonadIO m => IORef ([t m a], Int) -> m (Maybe (t m a, Int))
- data HeapDequeueResult t m a
- dequeueFromHeap :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> IO (HeapDequeueResult t m a)
- dequeueFromHeapSeq :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO (HeapDequeueResult t m a)
- requeueOnHeapTop :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
- updateHeapSeq :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO ()
- withIORef :: IORef a -> (a -> IO b) -> IO b
- heapIsSane :: Maybe Int -> Int -> Bool
- newAheadVar :: MonadAsync m => State t m a -> t m a -> (IORef ([t m a], Int) -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> State t m a -> SVar t m a -> Maybe WorkerInfo -> m ()) -> m (SVar t m a)
Documentation
Adjusting Limits
incrementYieldLimit :: SVar t m a -> IO () Source #
decrementBufferLimit :: SVar t m a -> IO () Source #
incrementBufferLimit :: SVar t m a -> IO () Source #
resetBufferLimit :: SVar t m a -> IO () Source #
Rate Control
isBeyondMaxRate :: SVar t m a -> YieldRateInfo -> IO Bool Source #
estimateWorkers :: Limit -> Count -> Count -> NanoSecond64 -> NanoSecond64 -> NanoSecond64 -> LatencyRange -> Work Source #
updateYieldCount :: WorkerInfo -> IO Count Source #
minThreadDelay :: NanoSecond64 Source #
This is a magic number and it is overloaded, and used at several places to achieve batching:
- If we have to sleep to slowdown this is the minimum period that we accumulate before we sleep. Also, workers do not stop until this much sleep time is accumulated.
- Collected latencies are computed and transferred to measured latency after a minimum of this period.
workerRateControl :: SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool Source #
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO () Source #
Send Events
send :: SVar t m a -> ChildEvent a -> IO Int Source #
This function is used by the producer threads to queue output for the consumer thread to consume. Returns whether the queue has more space.
ringDoorBell :: SVar t m a -> IO () Source #
Yield
sendYield :: SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool Source #
sendToProducer :: SVar t m a -> ChildEvent a -> IO Int Source #
Stop
sendStopToProducer :: MonadIO m => SVar t m a -> m () Source #
Exception
handleChildException :: SVar t m a -> SomeException -> IO () Source #
handleFoldException :: SVar t m a -> SomeException -> IO () Source #
Latency collection
collectLatency :: SVar t m a -> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64) Source #
Diagnostics
Thread accounting
allThreadsDone :: MonadIO m => SVar t m a -> m Bool Source #
This is safe even if we are adding more threads concurrently because if
a child thread is adding another thread then anyway workerThreads
will
not be empty.
Dispatching
recordMaxWorkers :: MonadIO m => SVar t m a -> m () Source #
pushWorker :: MonadAsync m => Count -> SVar t m a -> m () Source #
pushWorkerPar :: MonadAsync m => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m () Source #
In contrast to pushWorker which always happens only from the consumer thread, a pushWorkerPar can happen concurrently from multiple threads on the producer side. So we need to use a thread safe modification of workerThreads. Alternatively, we can use a CreateThread event to avoid using a CAS based modification.
dispatchWorker :: MonadAsync m => Count -> SVar t m a -> m Bool Source #
dispatchWorkerPaced :: MonadAsync m => SVar t m a -> m Bool Source #
sendWorkerWait :: MonadAsync m => (SVar t m a -> IO ()) -> (SVar t m a -> m Bool) -> SVar t m a -> m () Source #
sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a) Source #
sendWorkerDelay :: SVar t m a -> IO () Source #
sendWorkerDelayPaced :: SVar t m a -> IO () Source #
Read Output
readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int) Source #
readOutputQRaw :: SVar t m a -> IO ([ChildEvent a], Int) Source #
readOutputQPaced :: MonadAsync m => SVar t m a -> m [ChildEvent a] Source #
readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a] Source #
Postprocess Hook After Reading
postProcessPaced :: MonadAsync m => SVar t m a -> m Bool Source #
postProcessBounded :: MonadAsync m => SVar t m a -> m Bool Source #
Release Resources
cleanupSVar :: SVar t m a -> IO () Source #
cleanupSVarFromWorker :: SVar t m a -> IO () Source #
New SVar
getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo) Source #
Parallel
newParallelVar :: MonadAsync m => SVarStopStyle -> State t m a -> m (SVar t m a) Source #
Ahead
data HeapDequeueResult t m a Source #
dequeueFromHeap :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> IO (HeapDequeueResult t m a) Source #
dequeueFromHeapSeq :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO (HeapDequeueResult t m a) Source #
requeueOnHeapTop :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Entry Int (AheadHeapEntry t m a) -> Int -> IO () Source #
updateHeapSeq :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO () Source #
newAheadVar :: MonadAsync m => State t m a -> t m a -> (IORef ([t m a], Int) -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> State t m a -> SVar t m a -> Maybe WorkerInfo -> m ()) -> m (SVar t m a) Source #