streamly-0.10.1: Streaming, dataflow programming and declarative concurrency
Copyright: (c) 2017 Composewell Technologies
License: BSD-3-Clause
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: Safe-Inferred
Language: Haskell2010

Streamly.Internal.Data.SVar

Description

Deprecated: SVar is replaced by Channel.

Documentation

Adjusting Limits

Rate Control

data Work Source #

Instances

Show Work Source #

Defined in Streamly.Internal.Data.SVar.Worker

minThreadDelay :: NanoSecond64 Source #

This is a magic number; it is overloaded and used in several places to achieve batching:

  1. If we have to sleep to slow down, this is the minimum period that we accumulate before we sleep. Also, workers do not stop until this much sleep time is accumulated (see the sketch after this list).
  2. Collected latencies are computed and transferred to the measured latency only after a minimum of this period.
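
The following is a minimal, self-contained sketch of the batching idea from point (1): a worker accumulates the pending sleep time and only performs a real sleep once the accumulated amount crosses a threshold. The threshold value, the accumulator, and the accumulateSleep helper are hypothetical illustrations, not the library's code (the real threshold is the NanoSecond64 value minThreadDelay).

import Data.IORef (IORef, newIORef, atomicModifyIORef')
import Control.Concurrent (threadDelay)

-- Hypothetical threshold in microseconds; the real minThreadDelay is a
-- NanoSecond64 chosen by the library.
minSleepUs :: Int
minSleepUs = 1000

-- Add a requested pause to the accumulator; sleep only when the batch is
-- large enough to be worth a real threadDelay.
accumulateSleep :: IORef Int -> Int -> IO ()
accumulateSleep acc pauseUs = do
  total <- atomicModifyIORef' acc (\t -> (t + pauseUs, t + pauseUs))
  if total >= minSleepUs
    then do
      threadDelay total                      -- one batched sleep
      atomicModifyIORef' acc (const (0, ())) -- reset the accumulator
    else pure ()                             -- too small, keep working

main :: IO ()
main = do
  acc <- newIORef 0
  mapM_ (accumulateSleep acc) [200, 300, 600] -- sleeps once, for ~1100 us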

Send Events

send :: SVar t m a -> ChildEvent a -> IO Int Source #

This function is used by the producer threads to queue output for the consumer thread to consume. Returns whether the queue has more space.
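
As an illustration of the producer-side contract described above, here is a minimal sketch in which workers push events onto a shared queue and learn whether the buffer still has room. The Event type, pushEvent function, and maxQueueLen limit are hypothetical; the real send works on the SVar's ChildEvent queue and reports the space information as an Int rather than a Bool.

import Data.IORef (IORef, newIORef, atomicModifyIORef', readIORef)

data Event a = Yield a | Stop deriving Show

-- Illustrative buffer limit.
maxQueueLen :: Int
maxQueueLen = 8

-- Append an event and report whether the queue still has space, mirroring
-- the "returns whether the queue has more space" contract of send.
pushEvent :: IORef [Event a] -> Event a -> IO Bool
pushEvent q ev = atomicModifyIORef' q $ \evs ->
  let evs' = ev : evs
  in (evs', length evs' < maxQueueLen)

main :: IO ()
main = do
  q <- newIORef []
  hasRoom <- pushEvent q (Yield (42 :: Int))
  print hasRoom         -- True: the buffer is far from full
  readIORef q >>= print -- [Yield 42]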

ringDoorBell :: SVar t m a -> IO () Source #

Yield

Stop

sendStop :: SVar t m a -> Maybe WorkerInfo -> IO () Source #

sendStopToProducer :: MonadIO m => SVar t m a -> m () Source #

Exception

Latency collection

Diagnostics

withDiagMVar :: SVar t m a -> String -> IO () -> IO () Source #

printSVar :: SVar t m a -> String -> IO () Source #

Thread accounting

delThread :: MonadIO m => SVar t m a -> ThreadId -> m () Source #

modifyThread :: MonadIO m => SVar t m a -> ThreadId -> m () Source #

allThreadsDone :: MonadIO m => SVar t m a -> m Bool Source #

This is safe even if more threads are being added concurrently, because if a child thread is adding another thread then workerThreads cannot be empty anyway.
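
A small sketch of the reasoning above, assuming the SVar tracks its workers in a mutable set of ThreadIds (the Workers type and the helper names here are illustrative): a child thread that is in the middle of spawning another worker is itself still registered, so the set it is modifying can never be observed as empty at that moment.

import Control.Concurrent (ThreadId, myThreadId)
import Data.IORef (IORef, newIORef, atomicModifyIORef', readIORef)
import qualified Data.Set as Set

type Workers = IORef (Set.Set ThreadId)

addThread, removeThread :: Workers -> ThreadId -> IO ()
addThread ws tid    = atomicModifyIORef' ws (\s -> (Set.insert tid s, ()))
removeThread ws tid = atomicModifyIORef' ws (\s -> (Set.delete tid s, ()))

-- The analogue of allThreadsDone: True only when no worker is registered.
allDone :: Workers -> IO Bool
allDone ws = Set.null <$> readIORef ws

main :: IO ()
main = do
  ws  <- newIORef Set.empty
  tid <- myThreadId
  addThread ws tid
  allDone ws >>= print -- False while a worker is registered
  removeThread ws tid
  allDone ws >>= print -- True once the last worker is gone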

Dispatching

recordMaxWorkers :: MonadIO m => SVar t m a -> m () Source #

pushWorker :: MonadAsync m => Count -> SVar t m a -> m () Source #

pushWorkerPar :: MonadAsync m => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m () Source #

In contrast to pushWorker, which always happens only from the consumer thread, pushWorkerPar can happen concurrently from multiple threads on the producer side. So we need to use a thread-safe modification of workerThreads. Alternatively, we could use a CreateThread event to avoid using a CAS-based modification.
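
Below is a minimal sketch of the thread-safe modification mentioned above, using atomicModifyIORef' as the CAS-style update: several producer-side threads register themselves in a shared set concurrently without losing updates. This models the idea only; it is not the library's pushWorkerPar.

import Control.Concurrent (forkIO, myThreadId)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (replicateM_)
import Data.IORef (newIORef, atomicModifyIORef', readIORef)
import qualified Data.Set as Set

main :: IO ()
main = do
  workers <- newIORef Set.empty
  done    <- newEmptyMVar
  -- Several producer-side threads register themselves concurrently; the
  -- CAS-style update keeps the set consistent without a lock.
  replicateM_ 4 $ forkIO $ do
    tid <- myThreadId
    atomicModifyIORef' workers (\s -> (Set.insert tid s, ()))
    putMVar done ()
  replicateM_ 4 (takeMVar done)          -- wait for all registrations
  readIORef workers >>= print . Set.size -- 4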

sendWorkerWait :: MonadAsync m => (SVar t m a -> IO ()) -> (SVar t m a -> m Bool) -> SVar t m a -> m () Source #

sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a) Source #

sendWorkerDelay :: SVar t m a -> IO () Source #

Read Output

Postprocess Hook After Reading

Release Resources

cleanupSVar :: SVar t m a -> IO () Source #

New SVar

Parallel

newParallelVar :: MonadAsync m => SVarStopStyle -> State t m a -> m (SVar t m a) Source #

Ahead

enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO () Source #

reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO () Source #

queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool Source #

dequeueAhead :: MonadIO m => IORef ([t m a], Int) -> m (Maybe (t m a, Int)) Source #

withIORef :: IORef a -> (a -> IO b) -> IO b Source #

newAheadVar :: MonadAsync m => State t m a -> t m a -> (IORef ([t m a], Int) -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int) -> State t m a -> SVar t m a -> Maybe WorkerInfo -> m ()) -> m (SVar t m a) Source #