streamly-0.7.2: Beautiful Streaming, Concurrent and Reactive Composition

Copyright   : (c) 2017 Harendra Kumar
License     : BSD3
Maintainer  : streamly@composewell.com
Stability   : experimental
Portability : GHC
Safe Haskell: None
Language    : Haskell2010

Streamly.Internal.Data.SVar


# Documentation

A monad that can perform concurrent or parallel IO operations. Streams that can be composed concurrently require the underlying monad to be MonadAsync.

Since: 0.1.0
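As an illustrative sketch (using streamly-0.7's public API from the Streamly and Streamly.Prelude modules, not this internal module), composing streams with asyncly requires MonadAsync of the underlying monad, here IO:

```haskell
-- Sketch: asyncly demands a MonadAsync underlying monad (IO here).
import Streamly (asyncly)
import qualified Streamly.Prelude as S

main :: IO ()
main = S.drain $ asyncly $
    S.fromListM [pure 1, pure 2] <> S.fromListM [pure (3 :: Int)]
```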

data SVarStyle Source #

Identify the type of the SVar. Two computations using the same style can be scheduled on the same SVar.

Constructors

 AsyncVar
 WAsyncVar
 ParallelVar
 AheadVar
Instances
 Eq SVarStyle -- Defined in Streamly.Internal.Data.SVar
 Show SVarStyle -- Defined in Streamly.Internal.Data.SVar

data SVarStopStyle Source #

Constructors

 StopNone
 StopAny
 StopBy

Instances

 Eq SVarStopStyle -- Defined in Streamly.Internal.Data.SVar
 Show SVarStopStyle -- Defined in Streamly.Internal.Data.SVar

data SVar t m a Source #

Constructors

 SVar

 Fields

  svarStyle :: SVarStyle
  svarMrun :: RunInIO m
  svarStopStyle :: SVarStopStyle
  svarStopBy :: IORef ThreadId
  outputQueue :: IORef ([ChildEvent a], Int)
  outputDoorBell :: MVar ()
  readOutputQ :: m [ChildEvent a]
  postProcess :: m Bool
  outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
  outputDoorBellFromConsumer :: MVar ()
  maxWorkerLimit :: Limit
  maxBufferLimit :: Limit
  pushBufferSpace :: IORef Count
  pushBufferPolicy :: PushBufferPolicy
  pushBufferMVar :: MVar ()
  remainingWork :: Maybe (IORef Count)
  yieldRateInfo :: Maybe YieldRateInfo
  enqueue :: t m a -> IO ()
  isWorkDone :: IO Bool
  isQueueDone :: IO Bool
  needDoorBell :: IORef Bool
  workLoop :: Maybe WorkerInfo -> m ()
  workerThreads :: IORef (Set ThreadId)
  workerCount :: IORef Int
  accountThread :: ThreadId -> m ()
  workerStopMVar :: MVar ()
  svarStats :: SVarStats
  svarRef :: Maybe (IORef ())
  svarInspectMode :: Bool
  svarCreator :: ThreadId
  outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
  aheadWorkQueue :: IORef ([t m a], Int)

data Limit Source #

Constructors

 Unlimited
 Limited Word
Instances
 Eq Limit -- Defined in Streamly.Internal.Data.SVar
 Ord Limit -- Defined in Streamly.Internal.Data.SVar
 Show Limit -- Defined in Streamly.Internal.Data.SVar

data State t m a Source #

adaptState :: State t m a -> State t n b Source #

Adapt the stream state from one type to another.
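For instance (an illustrative sketch, not library code), a combinator that changes the stream's element type can pass the shared scheduling configuration along unchanged, since the State carries no values of type a:

```haskell
-- Illustrative only: the config State is independent of the element
-- (and monad) type, so adapting it is effectively a cast.
retype :: State t m a -> State t n b
retype = adaptState
```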

setMaxThreads :: Int -> State t m a -> State t m a Source #

setMaxBuffer :: Int -> State t m a -> State t m a Source #

setStreamRate :: Maybe Rate -> State t m a -> State t m a Source #

setStreamLatency :: Int -> State t m a -> State t m a Source #

getYieldLimit :: State t m a -> Maybe Count Source #

setYieldLimit :: Maybe Int64 -> State t m a -> State t m a Source #

setInspectMode :: State t m a -> State t m a Source #

recordMaxWorkers :: MonadIO m => SVar t m a -> m () Source #

cleanupSVar :: SVar t m a -> IO () Source #

newAheadVar :: MonadAsync m => State t m a -> t m a -> (IORef ([t m a], Int) -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> State t m a -> SVar t m a -> Maybe WorkerInfo -> m ()) -> m (SVar t m a) Source #

newParallelVar :: MonadAsync m => SVarStopStyle -> State t m a -> m (SVar t m a) Source #

newtype RunInIO m Source #

Constructors

 RunInIO

 Fields

  runInIO :: forall b. m b -> IO (StM m b)

An SVar or a Stream Var is a conduit for the output of multiple streams running concurrently and asynchronously. An SVar can be thought of as an asynchronous IO handle. We can write any number of streams to an SVar in a non-blocking manner and then read them back at any time, at any pace. The SVar runs the streams asynchronously and accumulates the results. An SVar may not execute a stream completely and accumulate all the results at once; rather, it ensures that the reader can consume the results at whatever pace it wants. The SVar monitors and adapts to the consumer's pace.

An SVar is a mini scheduler: it has an associated workLoop that holds the stream tasks to be picked up and run by a pool of worker threads. It has an associated output queue where the worker threads place the output stream elements. An outputDoorBell is used by the worker threads to notify the consumer thread that new results are available in the output queue. More workers are added to the SVar by fromStreamVar on demand if the output produced is not keeping pace with the consumer. On bounded SVars, workers block on the output queue to throttle the producer when the consumer is not pulling fast enough. The number of workers may even be reduced depending on the consumer's pace.

New work is enqueued either at the time of creation of the SVar or as a result of executing the parallel combinators (i.e. <| and <|>) when the already enqueued computations get evaluated. See joinStreamVarAsync.
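A consumer-side loop over an SVar can be sketched as follows; this is hypothetical code against the fields documented above, and the assumed semantics of postProcess (returning True when all work is done) is not guaranteed by this listing:

```haskell
-- Hypothetical consumer loop: drain ChildYield events from the output
-- queue, hand them to a callback, and loop until postProcess reports
-- that the work is done.
drainSVar :: Monad m => SVar t m a -> ([a] -> m ()) -> m ()
drainSVar sv consume = loop
  where
    loop = do
        events <- readOutputQ sv               -- wait for the doorbell
        consume [a | ChildYield a <- events]   -- keep only yielded values
        done <- postProcess sv                 -- assumed: True when done
        if done then return () else loop
```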

data WorkerInfo Source #

Constructors

 WorkerInfo

 Fields

  workerYieldMax :: Count
  workerYieldCount :: IORef Count
  workerLatencyStart :: IORef (Count, AbsTime)

data YieldRateInfo Source #

Constructors

 YieldRateInfo

 Fields

  svarLatencyTarget :: NanoSecond64
  svarLatencyRange :: LatencyRange
  svarRateBuffer :: Int
  svarGainedLostYields :: IORef Count
  svarAllTimeLatency :: IORef (Count, AbsTime)
  workerBootstrapLatency :: Maybe NanoSecond64
  workerPollingInterval :: IORef Count
  workerPendingLatency :: IORef (Count, Count, NanoSecond64)
  workerCollectedLatency :: IORef (Count, Count, NanoSecond64)
  workerMeasuredLatency :: IORef NanoSecond64

data ThreadAbort Source #

Constructors

 ThreadAbort

Instances

 Show ThreadAbort -- Defined in Streamly.Internal.Data.SVar
 Exception ThreadAbort -- Defined in Streamly.Internal.Data.SVar

data ChildEvent a Source #

Events that a child thread may send to a parent thread.

Constructors

 ChildYield a
 ChildStop ThreadId (Maybe SomeException)
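A consumer interprets these events roughly as follows (illustrative helper, not part of the library):

```haskell
-- Sketch: a yielded value becomes a Right; a stopping child becomes a
-- Left carrying its ThreadId and the exception, if it died with one.
handleEvent :: ChildEvent a -> Either (ThreadId, Maybe SomeException) a
handleEvent (ChildYield a)     = Right a
handleEvent (ChildStop tid me) = Left (tid, me)
```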

data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a Source #

Sorting out-of-turn outputs in a heap for Ahead style streams

Constructors

send :: SVar t m a -> ChildEvent a -> IO Int Source #

This function is used by the producer threads to queue output for the consumer thread to consume. Returns whether the queue has more space.
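A worker body built on send and sendStop might look like the following sketch; the interpretation of the returned Int (positive meaning the queue still has space) is an assumption here, not a guarantee of this listing:

```haskell
-- Hypothetical worker: push values until the queue reports no more
-- space, then announce termination with sendStop.
workerBody :: SVar t m a -> Maybe WorkerInfo -> [a] -> IO ()
workerBody sv winfo = go
  where
    go []     = sendStop sv winfo
    go (x:xs) = do
        space <- send sv (ChildYield x)  -- assumed: > 0 means keep going
        if space > 0 then go xs else sendStop sv winfo
```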

sendStop :: SVar t m a -> Maybe WorkerInfo -> IO () Source #

sendStopToProducer :: MonadIO m => SVar t m a -> m () Source #

enqueueLIFO :: SVar t m a -> IORef [t m a] -> t m a -> IO () Source #

enqueueFIFO :: SVar t m a -> LinkedQueue (t m a) -> t m a -> IO () Source #

enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO () Source #

reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO () Source #

pushWorkerPar :: MonadAsync m => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m () Source #

In contrast to pushWorker, which always runs from the consumer thread, pushWorkerPar may be called concurrently from multiple threads on the producer side. So we need a thread-safe modification of workerThreads. Alternatively, we could use a CreateThread event to avoid a CAS-based modification.
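The thread-safe modification mentioned above can be sketched (an assumed approach, not the library's actual code) as an atomic update on the workerThreads set:

```haskell
-- Sketch: a lock-free, CAS-based insert into a shared set of threads.
import Control.Concurrent (ThreadId)
import Data.IORef (IORef, atomicModifyIORef)
import Data.Set (Set)
import qualified Data.Set as Set

addThread :: IORef (Set ThreadId) -> ThreadId -> IO ()
addThread ref tid = atomicModifyIORef ref $ \s -> (Set.insert tid s, ())
```

atomicModifyIORef performs a compare-and-swap loop internally, so concurrent callers cannot lose each other's inserts.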

queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool Source #

dequeueAhead :: MonadIO m => IORef ([t m a], Int) -> m (Maybe (t m a, Int)) Source #

data HeapDequeueResult t m a Source #

Constructors

 Clearing
 Waiting Int
 Ready (Entry Int (AheadHeapEntry t m a))

withIORef :: IORef a -> (a -> IO b) -> IO b Source #
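Its behavior amounts to reading the ref and passing the value to the continuation; a plausible definition:

```haskell
import Data.IORef (IORef, readIORef)

-- Read the IORef once and feed the value to the given action.
withIORef :: IORef a -> (a -> IO b) -> IO b
withIORef ref f = readIORef ref >>= f
```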

data Rate Source #

Specifies the stream yield rate in yields per second (Hertz). We keep accumulating yield credits at rateGoal. At any point in time we allow only as many yields as we have accumulated per rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the gap becomes larger than rateBuffer we recover only as much as rateBuffer.

rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.

If the rateGoal is 0 or negative the stream never yields a value. If the rateBuffer is 0 or negative we do not attempt to recover.

Since: 0.5.0

Constructors

 Rate

 Fields

  rateLow :: Double     -- The lower rate limit
  rateGoal :: Double    -- The target rate we want to achieve
  rateHigh :: Double    -- The upper rate limit
  rateBuffer :: Int     -- Maximum slack from the goal
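For example, a Rate value targeting 1000 yields per second could be constructed like this (the concrete numbers are illustrative only):

```haskell
-- Target 1000 yields/sec; allow the instantaneous rate to dip to 500
-- or burst to 2000 while recovering, with up to 100 yields of slack.
myRate :: Rate
myRate = Rate { rateLow = 500, rateGoal = 1000
              , rateHigh = 2000, rateBuffer = 100 }
```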

sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a) Source #

delThread :: MonadIO m => SVar t m a -> ThreadId -> m () Source #

modifyThread :: MonadIO m => SVar t m a -> ThreadId -> m () Source #

doFork :: MonadBaseControl IO m => m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId Source #

forkManaged :: (MonadIO m, MonadBaseControl IO m) => m () -> m ThreadId Source #

Fork a thread that is automatically killed as soon as the reference to the returned threadId is garbage collected.
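A hypothetical use, running in plain IO: a background ticker that is reclaimed automatically once the returned ThreadId becomes unreachable, per forkManaged's contract:

```haskell
import Control.Concurrent (ThreadId, threadDelay)
import Control.Monad (forever)

-- Start a ticker; it is killed when the ThreadId is garbage collected.
startTicker :: IO ThreadId
startTicker = forkManaged $ forever $ do
    putStrLn "tick"
    threadDelay 1000000  -- one second
```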

toStreamVar :: MonadAsync m => SVar t m a -> t m a -> m () Source #

Write a stream to an SVar in a non-blocking manner. The stream can then be read back from the SVar using fromSVar.

data SVarStats Source #

Constructors

 SVarStats

 Fields

  totalDispatches :: IORef Int
  maxWorkers :: IORef Int
  maxOutQSize :: IORef Int
  maxHeapSize :: IORef Int
  maxWorkQSize :: IORef Int
  avgWorkerLatency :: IORef (Count, NanoSecond64)
  minWorkerLatency :: IORef NanoSecond64
  maxWorkerLatency :: IORef NanoSecond64
  svarStopTime :: IORef (Maybe AbsTime)

printSVar :: SVar t m a -> String -> IO () Source #

withDiagMVar :: SVar t m a -> String -> IO () -> IO () Source #