Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Synopsis
- type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
- data SVarStyle
- data SVarStopStyle
- data SVar t m a = SVar {
- svarStyle :: SVarStyle
- svarMrun :: RunInIO m
- svarStopStyle :: SVarStopStyle
- svarStopBy :: IORef ThreadId
- outputQueue :: IORef ([ChildEvent a], Int)
- outputDoorBell :: MVar ()
- readOutputQ :: m [ChildEvent a]
- postProcess :: m Bool
- outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
- outputDoorBellFromConsumer :: MVar ()
- maxWorkerLimit :: Limit
- maxBufferLimit :: Limit
- pushBufferSpace :: IORef Count
- pushBufferPolicy :: PushBufferPolicy
- pushBufferMVar :: MVar ()
- remainingWork :: Maybe (IORef Count)
- yieldRateInfo :: Maybe YieldRateInfo
- enqueue :: (RunInIO m, t m a) -> IO ()
- isWorkDone :: IO Bool
- isQueueDone :: IO Bool
- needDoorBell :: IORef Bool
- workLoop :: Maybe WorkerInfo -> m ()
- workerThreads :: IORef (Set ThreadId)
- workerCount :: IORef Int
- accountThread :: ThreadId -> m ()
- workerStopMVar :: MVar ()
- svarStats :: SVarStats
- svarRef :: Maybe (IORef ())
- svarInspectMode :: Bool
- svarCreator :: ThreadId
- outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
- aheadWorkQueue :: IORef ([t m a], Int)
- data Limit
- data State t m a
- defState :: State t m a
- adaptState :: State t m a -> State t n b
- getMaxThreads :: State t m a -> Limit
- setMaxThreads :: Int -> State t m a -> State t m a
- getMaxBuffer :: State t m a -> Limit
- setMaxBuffer :: Int -> State t m a -> State t m a
- getStreamRate :: State t m a -> Maybe Rate
- setStreamRate :: Maybe Rate -> State t m a -> State t m a
- setStreamLatency :: Int -> State t m a -> State t m a
- getYieldLimit :: State t m a -> Maybe Count
- setYieldLimit :: Maybe Int64 -> State t m a -> State t m a
- getInspectMode :: State t m a -> Bool
- setInspectMode :: State t m a -> State t m a
- recordMaxWorkers :: MonadIO m => SVar t m a -> m ()
- cleanupSVar :: SVar t m a -> IO ()
- cleanupSVarFromWorker :: SVar t m a -> IO ()
- newAheadVar :: MonadAsync m => State t m a -> t m a -> (IORef ([t m a], Int) -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> State t m a -> SVar t m a -> Maybe WorkerInfo -> m ()) -> m (SVar t m a)
- newParallelVar :: MonadAsync m => SVarStopStyle -> State t m a -> m (SVar t m a)
- captureMonadState :: MonadBaseControl IO m => m (RunInIO m)
- newtype RunInIO m = RunInIO {}
- data WorkerInfo = WorkerInfo {
- workerYieldMax :: Count
- workerYieldCount :: IORef Count
- workerLatencyStart :: IORef (Count, AbsTime)
- data YieldRateInfo = YieldRateInfo {
- svarLatencyTarget :: NanoSecond64
- svarLatencyRange :: LatencyRange
- svarRateBuffer :: Int
- svarGainedLostYields :: IORef Count
- svarAllTimeLatency :: IORef (Count, AbsTime)
- workerBootstrapLatency :: Maybe NanoSecond64
- workerPollingInterval :: IORef Count
- workerPendingLatency :: IORef (Count, Count, NanoSecond64)
- workerCollectedLatency :: IORef (Count, Count, NanoSecond64)
- workerMeasuredLatency :: IORef NanoSecond64
- data ThreadAbort = ThreadAbort
- data ChildEvent a
- data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a
- = AheadEntryNull
- | AheadEntryPure a
- | AheadEntryStream (RunInIO m, t m a)
- send :: SVar t m a -> ChildEvent a -> IO Int
- sendToProducer :: SVar t m a -> ChildEvent a -> IO Int
- sendYield :: SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
- sendStop :: SVar t m a -> Maybe WorkerInfo -> IO ()
- sendStopToProducer :: MonadIO m => SVar t m a -> m ()
- enqueueLIFO :: SVar t m a -> IORef [(RunInIO m, t m a)] -> (RunInIO m, t m a) -> IO ()
- enqueueFIFO :: SVar t m a -> LinkedQueue (RunInIO m, t m a) -> (RunInIO m, t m a) -> IO ()
- enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
- reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
- pushWorkerPar :: MonadAsync m => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
- handleChildException :: SVar t m a -> SomeException -> IO ()
- handleFoldException :: SVar t m a -> SomeException -> IO ()
- queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool
- dequeueAhead :: MonadIO m => IORef ([t m a], Int) -> m (Maybe (t m a, Int))
- data HeapDequeueResult t m a
- dequeueFromHeap :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> IO (HeapDequeueResult t m a)
- dequeueFromHeapSeq :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO (HeapDequeueResult t m a)
- requeueOnHeapTop :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
- updateHeapSeq :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO ()
- withIORef :: IORef a -> (a -> IO b) -> IO b
- heapIsSane :: Maybe Int -> Int -> Bool
- data Rate = Rate {}
- getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo)
- newSVarStats :: IO SVarStats
- collectLatency :: SVar t m a -> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64)
- workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
- isBeyondMaxRate :: SVar t m a -> YieldRateInfo -> IO Bool
- workerRateControl :: SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
- updateYieldCount :: WorkerInfo -> IO Count
- decrementYieldLimit :: SVar t m a -> IO Bool
- incrementYieldLimit :: SVar t m a -> IO ()
- decrementBufferLimit :: SVar t m a -> IO ()
- incrementBufferLimit :: SVar t m a -> IO ()
- postProcessBounded :: MonadAsync m => SVar t m a -> m Bool
- postProcessPaced :: MonadAsync m => SVar t m a -> m Bool
- readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a]
- readOutputQPaced :: MonadAsync m => SVar t m a -> m [ChildEvent a]
- readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
- dispatchWorkerPaced :: MonadAsync m => SVar t m a -> m Bool
- sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a)
- delThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
- modifyThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
- doFork :: MonadBaseControl IO m => m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
- fork :: MonadBaseControl IO m => m () -> m ThreadId
- forkManaged :: (MonadIO m, MonadBaseControl IO m) => m () -> m ThreadId
- toStreamVar :: MonadAsync m => SVar t m a -> t m a -> m ()
- data SVarStats = SVarStats {
- totalDispatches :: IORef Int
- maxWorkers :: IORef Int
- maxOutQSize :: IORef Int
- maxHeapSize :: IORef Int
- maxWorkQSize :: IORef Int
- avgWorkerLatency :: IORef (Count, NanoSecond64)
- minWorkerLatency :: IORef NanoSecond64
- maxWorkerLatency :: IORef NanoSecond64
- svarStopTime :: IORef (Maybe AbsTime)
- dumpSVar :: SVar t m a -> IO String
- printSVar :: SVar t m a -> String -> IO ()
- withDiagMVar :: SVar t m a -> String -> IO () -> IO ()
Documentation
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) Source #
A monad that can perform concurrent or parallel IO operations. Streams
that can be composed concurrently require the underlying monad to be
MonadAsync
.
Since: 0.1.0 (Streamly)
Since: 0.8.0
Identify the type of the SVar. Two computations using the same style can be scheduled on the same SVar.
data SVarStopStyle Source #
Instances
Eq SVarStopStyle Source # | |
Defined in Streamly.Internal.Data.SVar (==) :: SVarStopStyle -> SVarStopStyle -> Bool # (/=) :: SVarStopStyle -> SVarStopStyle -> Bool # | |
Show SVarStopStyle Source # | |
Defined in Streamly.Internal.Data.SVar showsPrec :: Int -> SVarStopStyle -> ShowS # show :: SVarStopStyle -> String # showList :: [SVarStopStyle] -> ShowS # |
adaptState :: State t m a -> State t n b Source #
Adapt the stream state from one type to another.
getMaxThreads :: State t m a -> Limit Source #
getMaxBuffer :: State t m a -> Limit Source #
getYieldLimit :: State t m a -> Maybe Count Source #
getInspectMode :: State t m a -> Bool Source #
setInspectMode :: State t m a -> State t m a Source #
recordMaxWorkers :: MonadIO m => SVar t m a -> m () Source #
cleanupSVar :: SVar t m a -> IO () Source #
cleanupSVarFromWorker :: SVar t m a -> IO () Source #
newAheadVar :: MonadAsync m => State t m a -> t m a -> (IORef ([t m a], Int) -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> State t m a -> SVar t m a -> Maybe WorkerInfo -> m ()) -> m (SVar t m a) Source #
newParallelVar :: MonadAsync m => SVarStopStyle -> State t m a -> m (SVar t m a) Source #
captureMonadState :: MonadBaseControl IO m => m (RunInIO m) Source #
When we run computations concurrently, we completely isolate the state of the concurrent computations from the parent computation. The invariant is that we should never be running two concurrent computations in the same thread without using the runInIO function. Also, we should never be running a concurrent computation in the parent thread, otherwise it may affect the state of the parent which is against the defined semantics of concurrent execution.
data WorkerInfo Source #
An SVar or a Stream Var is a conduit to the output from multiple streams running concurrently and asynchronously. An SVar can be thought of as an asynchronous IO handle. We can write any number of streams to an SVar in a non-blocking manner and then read them back at any time at any pace. The SVar would run the streams asynchronously and accumulate results. An SVar may not really execute the stream completely and accumulate all the results. However, it ensures that the reader can read the results at whatever paces it wants to read. The SVar monitors and adapts to the consumer's pace.
An SVar is a mini scheduler, it has an associated workLoop that holds the
stream tasks to be picked and run by a pool of worker threads. It has an
associated output queue where the output stream elements are placed by the
worker threads. A outputDoorBell is used by the worker threads to intimate the
consumer thread about availability of new results in the output queue. More
workers are added to the SVar by fromStreamVar
on demand if the output
produced is not keeping pace with the consumer. On bounded SVars, workers
block on the output queue to provide throttling of the producer when the
consumer is not pulling fast enough. The number of workers may even get
reduced depending on the consuming pace.
New work is enqueued either at the time of creation of the SVar or as a
result of executing the parallel combinators i.e. <|
and <|>
when the
already enqueued computations get evaluated. See joinStreamVarAsync
.
WorkerInfo | |
|
data YieldRateInfo Source #
YieldRateInfo | |
|
data ThreadAbort Source #
Instances
Show ThreadAbort Source # | |
Defined in Streamly.Internal.Data.SVar showsPrec :: Int -> ThreadAbort -> ShowS # show :: ThreadAbort -> String # showList :: [ThreadAbort] -> ShowS # | |
Exception ThreadAbort Source # | |
Defined in Streamly.Internal.Data.SVar |
data ChildEvent a Source #
Events that a child thread may send to a parent thread.
data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a Source #
Sorting out-of-turn outputs in a heap for Ahead style streams
AheadEntryNull | |
AheadEntryPure a | |
AheadEntryStream (RunInIO m, t m a) |
send :: SVar t m a -> ChildEvent a -> IO Int Source #
This function is used by the producer threads to queue output for the consumer thread to consume. Returns whether the queue has more space.
sendToProducer :: SVar t m a -> ChildEvent a -> IO Int Source #
sendYield :: SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool Source #
sendStopToProducer :: MonadIO m => SVar t m a -> m () Source #
enqueueFIFO :: SVar t m a -> LinkedQueue (RunInIO m, t m a) -> (RunInIO m, t m a) -> IO () Source #
pushWorkerPar :: MonadAsync m => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m () Source #
In contrast to pushWorker which always happens only from the consumer thread, a pushWorkerPar can happen concurrently from multiple threads on the producer side. So we need to use a thread safe modification of workerThreads. Alternatively, we can use a CreateThread event to avoid using a CAS based modification.
handleChildException :: SVar t m a -> SomeException -> IO () Source #
handleFoldException :: SVar t m a -> SomeException -> IO () Source #
data HeapDequeueResult t m a Source #
dequeueFromHeap :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> IO (HeapDequeueResult t m a) Source #
dequeueFromHeapSeq :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO (HeapDequeueResult t m a) Source #
requeueOnHeapTop :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Entry Int (AheadHeapEntry t m a) -> Int -> IO () Source #
updateHeapSeq :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> Int -> IO () Source #
Specifies the stream yield rate in yields per second (Hertz
).
We keep accumulating yield credits at rateGoal
. At any point of time we
allow only as many yields as we have accumulated as per rateGoal
since the
start of time. If the consumer or the producer is slower or faster, the
actual rate may fall behind or exceed rateGoal
. We try to recover the gap
between the two by increasing or decreasing the pull rate from the producer.
However, if the gap becomes more than rateBuffer
we try to recover only as
much as rateBuffer
.
rateLow
puts a bound on how low the instantaneous rate can go when
recovering the rate gap. In other words, it determines the maximum yield
latency. Similarly, rateHigh
puts a bound on how high the instantaneous
rate can go when recovering the rate gap. In other words, it determines the
minimum yield latency. We reduce the latency by increasing concurrency,
therefore we can say that it puts an upper bound on concurrency.
If the rateGoal
is 0 or negative the stream never yields a value.
If the rateBuffer
is 0 or negative we do not attempt to recover.
Since: 0.5.0 (Streamly)
Since: 0.8.0
getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo) Source #
collectLatency :: SVar t m a -> YieldRateInfo -> Bool -> IO (Count, AbsTime, NanoSecond64) Source #
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO () Source #
isBeyondMaxRate :: SVar t m a -> YieldRateInfo -> IO Bool Source #
workerRateControl :: SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool Source #
updateYieldCount :: WorkerInfo -> IO Count Source #
incrementYieldLimit :: SVar t m a -> IO () Source #
decrementBufferLimit :: SVar t m a -> IO () Source #
incrementBufferLimit :: SVar t m a -> IO () Source #
postProcessBounded :: MonadAsync m => SVar t m a -> m Bool Source #
postProcessPaced :: MonadAsync m => SVar t m a -> m Bool Source #
readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a] Source #
readOutputQPaced :: MonadAsync m => SVar t m a -> m [ChildEvent a] Source #
readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int) Source #
dispatchWorkerPaced :: MonadAsync m => SVar t m a -> m Bool Source #
sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a) Source #
doFork :: MonadBaseControl IO m => m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId Source #
Fork a thread to run the given computation, installing the provided exception handler. Lifted to any monad with 'MonadBaseControl IO m' capability.
TODO: the RunInIO argument can be removed, we can directly pass the action as "mrun action" instead.
fork :: MonadBaseControl IO m => m () -> m ThreadId Source #
fork
lifted to any monad with 'MonadBaseControl IO m' capability.
forkManaged :: (MonadIO m, MonadBaseControl IO m) => m () -> m ThreadId Source #
Fork a thread that is automatically killed as soon as the reference to the returned threadId is garbage collected.
toStreamVar :: MonadAsync m => SVar t m a -> t m a -> m () Source #
Write a stream to an SVar
in a non-blocking manner. The stream can then
be read back from the SVar using fromSVar
.
SVarStats | |
|