streamly-0.7.2: Beautiful Streaming, Concurrent and Reactive Composition

Copyright: (c) 2017 Harendra Kumar
License: BSD3
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Streamly.Internal.Data.SVar

Documentation

type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)

A monad that can perform concurrent or parallel IO operations. Streams that can be composed concurrently require the underlying monad to be MonadAsync.

Since: 0.1.0

data SVarStyle

Identify the type of the SVar. Two computations using the same style can be scheduled on the same SVar.

Instances

Eq SVarStyle
Defined in Streamly.Internal.Data.SVar

Show SVarStyle
Defined in Streamly.Internal.Data.SVar

data Limit

Constructors

Unlimited
Limited Word

Instances

Eq Limit
Defined in Streamly.Internal.Data.SVar

Methods

(==) :: Limit -> Limit -> Bool

(/=) :: Limit -> Limit -> Bool

Ord Limit
Defined in Streamly.Internal.Data.SVar

Methods

compare :: Limit -> Limit -> Ordering

(<) :: Limit -> Limit -> Bool

(<=) :: Limit -> Limit -> Bool

(>) :: Limit -> Limit -> Bool

(>=) :: Limit -> Limit -> Bool

max :: Limit -> Limit -> Limit

min :: Limit -> Limit -> Limit

Show Limit
Defined in Streamly.Internal.Data.SVar

Methods

showsPrec :: Int -> Limit -> ShowS

show :: Limit -> String

showList :: [Limit] -> ShowS

data State t m a

adaptState :: State t m a -> State t n b

Adapt the stream state from one type to another.

setMaxThreads :: Int -> State t m a -> State t m a

setMaxBuffer :: Int -> State t m a -> State t m a

setStreamRate :: Maybe Rate -> State t m a -> State t m a

setStreamLatency :: Int -> State t m a -> State t m a

getYieldLimit :: State t m a -> Maybe Count

setYieldLimit :: Maybe Int64 -> State t m a -> State t m a

setInspectMode :: State t m a -> State t m a

recordMaxWorkers :: MonadIO m => SVar t m a -> m ()

cleanupSVar :: SVar t m a -> IO ()

newAheadVar :: MonadAsync m => State t m a -> t m a -> (IORef ([t m a], Int) -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> State t m a -> SVar t m a -> Maybe WorkerInfo -> m ()) -> m (SVar t m a)

newParallelVar :: MonadAsync m => SVarStopStyle -> State t m a -> m (SVar t m a)

newtype RunInIO m

Constructors

RunInIO 

Fields

data WorkerInfo

An SVar, or Stream Var, is a conduit to the output from multiple streams running concurrently and asynchronously. An SVar can be thought of as an asynchronous IO handle. We can write any number of streams to an SVar in a non-blocking manner and then read them back at any time and at any pace. The SVar runs the streams asynchronously and accumulates the results. An SVar may not execute a stream to completion or accumulate all of its results; however, it ensures that the reader can read the results at whatever pace it wants. The SVar monitors and adapts to the consumer's pace.

An SVar is a mini scheduler. It has an associated workLoop that holds the stream tasks to be picked up and run by a pool of worker threads, and an associated output queue where the worker threads place the output stream elements. An outputDoorBell is used by the worker threads to notify the consumer thread that new results are available in the output queue. More workers are added to the SVar by fromStreamVar on demand if the output produced is not keeping pace with the consumer. On bounded SVars, workers block on the output queue, throttling the producer when the consumer is not pulling fast enough. The number of workers may even be reduced, depending on the consumer's pace.

New work is enqueued either when the SVar is created or as a result of executing the parallel combinators, i.e. <| and <|>, when the already enqueued computations are evaluated. See joinStreamVarAsync.
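The output queue and doorbell described above can be modeled in a few lines. The following is only a minimal sketch using base, not the library's implementation: MiniSVar, sendResult, readBatch and collect are hypothetical names, and the real SVar carries far more state (work queue, worker accounting, buffer limits).

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad (forM_, void)
import Data.IORef (IORef, newIORef, atomicModifyIORef')
import Data.List (sort)

-- Hypothetical, simplified stand-in for an SVar: an output queue plus a
-- doorbell. The field names follow the prose; everything else is assumed.
data MiniSVar a = MiniSVar
    { outputQueue    :: IORef [a]  -- results placed by workers, newest first
    , outputDoorBell :: MVar ()    -- rung by workers when results are available
    }

newMiniSVar :: IO (MiniSVar a)
newMiniSVar = MiniSVar <$> newIORef [] <*> newEmptyMVar

-- Worker side: queue one result, then ring the doorbell without blocking.
sendResult :: MiniSVar a -> a -> IO ()
sendResult sv x = do
    atomicModifyIORef' (outputQueue sv) (\xs -> (x : xs, ()))
    void (tryPutMVar (outputDoorBell sv) ())

-- Consumer side: wait for the doorbell, then drain everything queued so far.
readBatch :: MiniSVar a -> IO [a]
readBatch sv = do
    takeMVar (outputDoorBell sv)
    atomicModifyIORef' (outputQueue sv) (\xs -> ([], reverse xs))

-- Consumer loop: keep reading batches until n results have arrived.
collect :: Int -> MiniSVar a -> IO [a]
collect n sv = go n []
  where
    go remaining acc
        | remaining <= 0 = return acc
        | otherwise = do
            batch <- readBatch sv
            go (remaining - length batch) (acc ++ batch)

main :: IO ()
main = do
    sv <- newMiniSVar
    -- Five worker threads each produce one result concurrently.
    forM_ [1 .. 5 :: Int] $ \i -> forkIO (sendResult sv (i * i))
    results <- collect 5 sv
    print (sort results)  -- [1,4,9,16,25]
```

Because a worker always queues its result before ringing the doorbell, and the consumer always drains after taking the doorbell, no queued result is left unnoticed. A batch may occasionally be empty, which the loop tolerates.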

Constructors

WorkerInfo 

Fields

data ChildEvent a

Events that a child thread may send to a parent thread.

data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a

Sorting out-of-turn outputs in a heap for Ahead style streams
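
To illustrate the idea of holding back out-of-turn outputs until their turn comes, here is a small pure sketch. It assumes each result is tagged with a sequence number and uses a sorted list as a stand-in for the heap; Reorder, offer and reorderAll are hypothetical names, not part of this module.

```haskell
import Data.List (insertBy)
import Data.Ord (comparing)

-- Hypothetical reorder buffer: the next sequence number to release and the
-- out-of-turn results seen so far, kept sorted by sequence number.
-- A sorted list stands in for the heap used by the library.
data Reorder a = Reorder
    { nextSeq :: Int
    , pending :: [(Int, a)]
    }

emptyReorder :: Reorder a
emptyReorder = Reorder 0 []

-- Accept one tagged result; return the results that are now in turn,
-- in order, together with the updated buffer.
offer :: (Int, a) -> Reorder a -> ([a], Reorder a)
offer entry (Reorder n ps) = release n (insertBy (comparing fst) entry ps) []
  where
    release k ((s, x) : rest) acc
        | s == k = release (k + 1) rest (x : acc)
    release k rest acc = (reverse acc, Reorder k rest)

-- Feed a list of arrivals and concatenate everything released along the way.
reorderAll :: [(Int, a)] -> [a]
reorderAll = go emptyReorder
  where
    go _ [] = []
    go buf (e : es) =
        let (out, buf') = offer e buf
        in out ++ go buf' es

main :: IO ()
main = print (reorderAll [(2, 'c'), (0, 'a'), (1, 'b'), (3, 'd')])  -- "abcd"
```

Results that arrive early wait in pending; as soon as the missing sequence number shows up, it and every consecutive successor are released together.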

send :: SVar t m a -> ChildEvent a -> IO Int

This function is used by the producer threads to queue output for the consumer thread to consume. The result indicates whether the queue has more space.

sendStop :: SVar t m a -> Maybe WorkerInfo -> IO ()

sendStopToProducer :: MonadIO m => SVar t m a -> m ()

enqueueLIFO :: SVar t m a -> IORef [t m a] -> t m a -> IO ()

enqueueFIFO :: SVar t m a -> LinkedQueue (t m a) -> t m a -> IO ()

enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()

reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()

pushWorkerPar :: MonadAsync m => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()

In contrast to pushWorker, which always happens only from the consumer thread, a pushWorkerPar can happen concurrently from multiple threads on the producer side. So we need to use a thread-safe modification of workerThreads. Alternatively, we can use a CreateThread event to avoid using a CAS-based modification.
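
The need for a thread-safe update can be illustrated with a sketch in which many threads register themselves in a shared set of worker thread IDs. This is an assumption-laden model, not the library's code: WorkerThreads and registerWorker are hypothetical, and atomicModifyIORef' from base is used as one way of performing an atomic read-modify-write.

```haskell
import Control.Concurrent (ThreadId, forkIO, myThreadId)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (replicateM_)
import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef')

-- Hypothetical registry standing in for the workerThreads field.
type WorkerThreads = IORef [ThreadId]

-- Atomic read-modify-write: no registration is lost even when many
-- threads call this at the same time.
registerWorker :: WorkerThreads -> ThreadId -> IO ()
registerWorker ref tid = atomicModifyIORef' ref (\tids -> (tid : tids, ()))

main :: IO ()
main = do
    workers <- newIORef []
    done <- newEmptyMVar
    let n = 100 :: Int
    -- Each forked thread registers itself concurrently with the others.
    replicateM_ n $ forkIO $ do
        myThreadId >>= registerWorker workers
        putMVar done ()
    -- Wait until every thread has finished registering.
    replicateM_ n (takeMVar done)
    tids <- readIORef workers
    print (length tids)  -- 100
```

A plain readIORef followed by writeIORef would allow two threads to read the same list and overwrite each other's entry; the atomic update avoids that.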

queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool

dequeueAhead :: MonadIO m => IORef ([t m a], Int) -> m (Maybe (t m a, Int))

withIORef :: IORef a -> (a -> IO b) -> IO b

data Rate

Specifies the stream yield rate in yields per second (Hertz). We keep accumulating yield credits at rateGoal. At any point in time we allow only as many yields as we have accumulated at rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the gap becomes larger than rateBuffer, we try to recover only as much as rateBuffer.

rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, so rateHigh also puts an upper bound on concurrency.

If rateGoal is 0 or negative, the stream never yields a value. If rateBuffer is 0 or negative, we do not attempt to recover.

Since: 0.5.0
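
The credit accounting described above can be worked through with a small pure sketch. The field names come from the description; the field types, the RateSketch record, and the helper functions creditsAt, recoverableGap and clampRate are assumptions made for illustration and may not match how the library computes these values.

```haskell
-- Hypothetical record mirroring the names used in the description;
-- the field types are assumptions.
data RateSketch = RateSketch
    { rateLow    :: Double  -- lowest instantaneous rate allowed (yields/sec)
    , rateGoal   :: Double  -- target average rate (yields/sec)
    , rateHigh   :: Double  -- highest instantaneous rate allowed (yields/sec)
    , rateBuffer :: Int     -- maximum number of yields to catch up on
    }

-- Yield credits accumulated since the start, at rateGoal per second.
creditsAt :: RateSketch -> Double -> Int
creditsAt r elapsedSec = floor (rateGoal r * elapsedSec)

-- Gap to recover: positive when behind the goal, negative when ahead,
-- limited in magnitude to rateBuffer. No recovery if rateBuffer <= 0.
recoverableGap :: RateSketch -> Double -> Int -> Int
recoverableGap r elapsedSec yielded
    | rateBuffer r <= 0 = 0
    | otherwise = max (negate b) (min b gap)
  where
    b = rateBuffer r
    gap = creditsAt r elapsedSec - yielded

-- Clamp a desired instantaneous rate between rateLow and rateHigh.
clampRate :: RateSketch -> Double -> Double
clampRate r = max (rateLow r) . min (rateHigh r)

main :: IO ()
main = do
    let r = RateSketch { rateLow = 5, rateGoal = 10, rateHigh = 20, rateBuffer = 15 }
    print (creditsAt r 3.0)          -- 30
    print (recoverableGap r 3.0 10)  -- 15: a gap of 20 capped at rateBuffer
    print (recoverableGap r 3.0 25)  -- 5
    print (clampRate r 50)           -- 20.0
```

With a goal of 10 yields per second, 30 credits exist after 3 seconds. A stream that has yielded only 10 values is 20 behind, but only 15 of those are recovered, and the catch-up rate itself never exceeds rateHigh.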

Constructors

Rate 

Fields

sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a)

delThread :: MonadIO m => SVar t m a -> ThreadId -> m ()

modifyThread :: MonadIO m => SVar t m a -> ThreadId -> m ()

doFork :: MonadBaseControl IO m => m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId

forkManaged :: (MonadIO m, MonadBaseControl IO m) => m () -> m ThreadId

Fork a thread that is automatically killed as soon as the reference to the returned ThreadId is garbage collected.

toStreamVar :: MonadAsync m => SVar t m a -> t m a -> m ()

Write a stream to an SVar in a non-blocking manner. The stream can then be read back from the SVar using fromSVar.

printSVar :: SVar t m a -> String -> IO ()

withDiagMVar :: SVar t m a -> String -> IO () -> IO ()