Copyright | (c) 2017 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Synopsis
- data ThreadAbort = ThreadAbort
- data ChildEvent a
- newtype RunInIO m = RunInIO {}
- data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a
- = AheadEntryNull
- | AheadEntryPure a
- | AheadEntryStream (RunInIO m, t m a)
- newtype Count = Count Int64
- data Limit
- data SVarStyle
- data SVarStopStyle
- data SVarStats = SVarStats {}
- data WorkerInfo = WorkerInfo {}
- data PushBufferPolicy
- data LatencyRange = LatencyRange {}
- data YieldRateInfo = YieldRateInfo {
- svarLatencyTarget :: NanoSecond64
- svarLatencyRange :: LatencyRange
- svarRateBuffer :: Int
- svarGainedLostYields :: IORef Count
- svarAllTimeLatency :: IORef (Count, AbsTime)
- workerBootstrapLatency :: Maybe NanoSecond64
- workerPollingInterval :: IORef Count
- workerPendingLatency :: IORef (Count, Count, NanoSecond64)
- workerCollectedLatency :: IORef (Count, Count, NanoSecond64)
- workerMeasuredLatency :: IORef NanoSecond64
- data SVar t m a = SVar {
- svarStyle :: SVarStyle
- svarMrun :: RunInIO m
- svarStopStyle :: SVarStopStyle
- svarStopBy :: IORef ThreadId
- outputQueue :: IORef ([ChildEvent a], Int)
- outputDoorBell :: MVar ()
- readOutputQ :: m [ChildEvent a]
- postProcess :: m Bool
- outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
- outputDoorBellFromConsumer :: MVar ()
- maxWorkerLimit :: Limit
- maxBufferLimit :: Limit
- pushBufferSpace :: IORef Count
- pushBufferPolicy :: PushBufferPolicy
- pushBufferMVar :: MVar ()
- remainingWork :: Maybe (IORef Count)
- yieldRateInfo :: Maybe YieldRateInfo
- enqueue :: (RunInIO m, t m a) -> IO ()
- isWorkDone :: IO Bool
- isQueueDone :: IO Bool
- needDoorBell :: IORef Bool
- workLoop :: Maybe WorkerInfo -> m ()
- workerThreads :: IORef (Set ThreadId)
- workerCount :: IORef Int
- accountThread :: ThreadId -> m ()
- workerStopMVar :: MVar ()
- svarStats :: SVarStats
- svarRef :: Maybe (IORef ())
- svarInspectMode :: Bool
- svarCreator :: ThreadId
- outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
- aheadWorkQueue :: IORef ([t m a], Int)
- data Rate = Rate {}
- data State t m a
- magicMaxBuffer :: Word
- defState :: State t m a
- adaptState :: State t m a -> State t n b
- getMaxThreads :: State t m a -> Limit
- setMaxThreads :: Int -> State t m a -> State t m a
- getMaxBuffer :: State t m a -> Limit
- setMaxBuffer :: Int -> State t m a -> State t m a
- getStreamRate :: State t m a -> Maybe Rate
- setStreamRate :: Maybe Rate -> State t m a -> State t m a
- getStreamLatency :: State t m a -> Maybe NanoSecond64
- setStreamLatency :: Int -> State t m a -> State t m a
- getYieldLimit :: State t m a -> Maybe Count
- setYieldLimit :: Maybe Int64 -> State t m a -> State t m a
- getInspectMode :: State t m a -> Bool
- setInspectMode :: State t m a -> State t m a
Parent child communication
data ThreadAbort Source #
Instances
Show ThreadAbort Source # | |
Defined in Streamly.Internal.Data.SVar.Type showsPrec :: Int -> ThreadAbort -> ShowS # show :: ThreadAbort -> String # showList :: [ThreadAbort] -> ShowS # | |
Exception ThreadAbort Source # | |
Defined in Streamly.Internal.Data.SVar.Type |
data ChildEvent a Source #
Events that a child thread may send to a parent thread.
data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a Source #
Sorting out-of-turn outputs in a heap for Ahead style streams
AheadEntryNull | |
AheadEntryPure a | |
AheadEntryStream (RunInIO m, t m a) |
SVar
Instances
Bounded Count Source # | |
Enum Count Source # | |
Defined in Streamly.Internal.Data.SVar.Type | |
Eq Count Source # | |
Integral Count Source # | |
Num Count Source # | |
Ord Count Source # | |
Read Count Source # | |
Real Count Source # | |
Defined in Streamly.Internal.Data.SVar.Type toRational :: Count -> Rational # | |
Show Count Source # | |
Identify the type of the SVar. Two computations using the same style can be scheduled on the same SVar.
data SVarStopStyle Source #
Instances
Eq SVarStopStyle Source # | |
Defined in Streamly.Internal.Data.SVar.Type (==) :: SVarStopStyle -> SVarStopStyle -> Bool # (/=) :: SVarStopStyle -> SVarStopStyle -> Bool # | |
Show SVarStopStyle Source # | |
Defined in Streamly.Internal.Data.SVar.Type showsPrec :: Int -> SVarStopStyle -> ShowS # show :: SVarStopStyle -> String # showList :: [SVarStopStyle] -> ShowS # |
SVarStats | |
|
data WorkerInfo Source #
An SVar or a Stream Var is a conduit to the output from multiple streams running concurrently and asynchronously. An SVar can be thought of as an asynchronous IO handle. We can write any number of streams to an SVar in a non-blocking manner and then read them back at any time at any pace. The SVar would run the streams asynchronously and accumulate results. An SVar may not really execute the stream completely and accumulate all the results. However, it ensures that the reader can read the results at whatever paces it wants to read. The SVar monitors and adapts to the consumer's pace.
An SVar is a mini scheduler, it has an associated workLoop that holds the
stream tasks to be picked and run by a pool of worker threads. It has an
associated output queue where the output stream elements are placed by the
worker threads. A outputDoorBell is used by the worker threads to intimate the
consumer thread about availability of new results in the output queue. More
workers are added to the SVar by fromStreamVar
on demand if the output
produced is not keeping pace with the consumer. On bounded SVars, workers
block on the output queue to provide throttling of the producer when the
consumer is not pulling fast enough. The number of workers may even get
reduced depending on the consuming pace.
New work is enqueued either at the time of creation of the SVar or as a
result of executing the parallel combinators i.e. <|
and <|>
when the
already enqueued computations get evaluated. See joinStreamVarAsync
.
data PushBufferPolicy Source #
Buffering policy for persistent push workers (in ParallelT). In a pull style SVar (in AsyncT, AheadT etc.), the consumer side dispatches workers on demand, workers terminate if the buffer is full or if the consumer is not cosuming fast enough. In a push style SVar, a worker is dispatched only once, workers are persistent and keep pushing work to the consumer via a bounded buffer. If the buffer becomes full the worker either blocks, or it can drop an item from the buffer to make space.
Pull style SVars are useful in lazy stream evaluation whereas push style SVars are useful in strict left Folds.
XXX Maybe we can separate the implementation in two different types instead of using a common SVar type.
data LatencyRange Source #
Instances
Show LatencyRange Source # | |
Defined in Streamly.Internal.Data.SVar.Type showsPrec :: Int -> LatencyRange -> ShowS # show :: LatencyRange -> String # showList :: [LatencyRange] -> ShowS # |
data YieldRateInfo Source #
State threaded around the stream
Specifies the stream yield rate in yields per second (Hertz
).
We keep accumulating yield credits at rateGoal
. At any point of time we
allow only as many yields as we have accumulated as per rateGoal
since the
start of time. If the consumer or the producer is slower or faster, the
actual rate may fall behind or exceed rateGoal
. We try to recover the gap
between the two by increasing or decreasing the pull rate from the producer.
However, if the gap becomes more than rateBuffer
we try to recover only as
much as rateBuffer
.
rateLow
puts a bound on how low the instantaneous rate can go when
recovering the rate gap. In other words, it determines the maximum yield
latency. Similarly, rateHigh
puts a bound on how high the instantaneous
rate can go when recovering the rate gap. In other words, it determines the
minimum yield latency. We reduce the latency by increasing concurrency,
therefore we can say that it puts an upper bound on concurrency.
If the rateGoal
is 0 or negative the stream never yields a value.
If the rateBuffer
is 0 or negative we do not attempt to recover.
Since: 0.5.0 (Streamly)
Since: 0.8.0
Default State
Type cast
adaptState :: State t m a -> State t n b Source #
Adapt the stream state from one type to another.
State accessors
getMaxThreads :: State t m a -> Limit Source #
getMaxBuffer :: State t m a -> Limit Source #
getStreamLatency :: State t m a -> Maybe NanoSecond64 Source #
getInspectMode :: State t m a -> Bool Source #
setInspectMode :: State t m a -> State t m a Source #