stm-conduit-4.0.0: Introduces conduits to channels, and promotes using conduits concurrently.

Safe Haskell: None
Language: Haskell98

Data.Conduit.TMChan

Description

  • Introduction

Contains a simple source and sink for linking together conduits in different threads. Usage is so easy, it's best explained with an example:

We first create a channel for communication...

do chan <- atomically $ newTBMChan 16

Then we fork a new thread loading a wackton of pictures into memory. The data (pictures, in this case) will be streamed down the channel to whatever is on the other side.

   _ <- forkIO . runResourceT $ do
         _ <- register $ atomically $ closeTBMChan chan
         loadTextures lotsOfPictures $$ sinkTBMChan chan

We register the closing function explicitly because, starting with version 1.3.0, the conduit library no longer manages resources, so this is the only way to close the channel safely if an exception is thrown.

Finally, we connect something to the other end of the channel. In this case, we connect a sink which uploads the textures one by one to the graphics card.

   runResourceT $ sourceTBMChan chan $$ Conduit.mapM_ (liftIO . uploadToGraphicsCard)

By running the two tasks in parallel, we no longer have to wait for one texture to upload to the graphics card before reading the next one from disk. This avoids the common switching of bottlenecks (such as between the disk and graphics memory) that most loading processes seem to love.
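The fragments above can be assembled into one compilable definition. The following is a minimal sketch rather than the library's own example: integers stand in for textures, CL.sourceList for loadTextures, and CL.consume for the upload step (all three are substitutions made for illustration, as is the name pipeline). It also uses runConduit and .| in place of $$, which conduit 1.3 marks as deprecated.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Monad.Trans.Resource (register, runResourceT)
import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan
  (closeTBMChan, newTBMChan, sinkTBMChan, sourceTBMChan)

-- | Hypothetical stand-in for the texture pipeline: a producer thread
-- streams integers into a bounded channel, and the calling thread
-- collects everything that comes out of the other end.
pipeline :: IO [Int]
pipeline = do
  -- Bounded channel: the producer blocks once 16 items are waiting.
  chan <- atomically $ newTBMChan 16
  _ <- forkIO . runResourceT $ do
    -- Close the channel when this ResourceT block exits, whether it
    -- finishes normally or an exception is thrown.
    _ <- register $ atomically $ closeTBMChan chan
    runConduit $ CL.sourceList [1 .. 100] .| sinkTBMChan chan
  -- The source ends once the channel is closed and fully drained.
  runConduit $ sourceTBMChan chan .| CL.consume
```

With a single producer and a FIFO channel, pipeline should return the integers 1 to 100 in their original order.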

Control.Concurrent.STM.TMChan and Control.Concurrent.STM.TBMChan are re-exported for convenience.

  • Caveats

It is recommended to use TBMChan as much as possible, and generally avoid TMChan usage. TMChans are unbounded, and if used, the conduit pipeline will no longer use a bounded amount of space. They will essentially leak memory if the writer is faster than the reader.

Therefore, use bounded channels as much as possible, preferably with a high bound so it will be hit infrequently.
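What "bounded" means in practice can be seen without any conduits at all. The sketch below assumes tryWriteTBMChan from Control.Concurrent.STM.TBMChan (the stm-chans module re-exported here), which reports whether a write succeeded instead of blocking; the name fillPastBound is made up for this illustration.

```haskell
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (newTBMChan, tryWriteTBMChan)

-- | Attempt three non-blocking writes into a channel with room for two.
-- 'Just True' means the value was written, 'Just False' means the
-- channel was full, and 'Nothing' would mean it was closed.
fillPastBound :: IO [Maybe Bool]
fillPastBound = atomically $ do
  chan <- newTBMChan 2
  mapM (tryWriteTBMChan chan) "abc"
  -- → [Just True, Just True, Just False]
```

A blocking write (as used by sinkTBMChan) would wait at the third element until a reader frees a slot, which is what keeps memory use bounded. An unbounded TMChan would accept all three writes and any number after them.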

Synopsis

Bounded Channel Connectors

sourceTBMChan :: MonadIO m => TBMChan a -> ConduitT () a m ()

A simple wrapper around a TBMChan. As data is pushed into the channel, the source will read it and pass it down the conduit pipeline. When the channel is closed, the source will close also.

If the channel fills up, the pipeline will stall until values are read.

sinkTBMChan :: MonadIO m => TBMChan a -> ConduitT a z m ()

A simple wrapper around a TBMChan. As data is pushed into the sink, it will magically begin to appear in the channel. If the channel is full, the sink will block until space frees up.
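As a small illustration of how the source reacts to a closed channel, the sketch below fills a channel directly through STM, closes it, and only then attaches sourceTBMChan. It relies on the stm-chans behaviour that values buffered before the close can still be read afterwards; drainClosed is a hypothetical name.

```haskell
import Control.Concurrent.STM (atomically)
import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan
  (closeTBMChan, newTBMChan, sourceTBMChan, writeTBMChan)

-- | Write three values, close the channel, then read it as a source.
drainClosed :: IO String
drainClosed = do
  chan <- atomically $ newTBMChan 4
  atomically $ do
    mapM_ (writeTBMChan chan) "abc"  -- fits within the bound of 4
    closeTBMChan chan
  -- The source yields the buffered values and then terminates.
  runConduit $ sourceTBMChan chan .| CL.consume
  -- → "abc"
```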

Unbounded Channel Connectors

sourceTMChan :: MonadIO m => TMChan a -> ConduitT () a m ()

A simple wrapper around a TMChan. As data is pushed into the channel, the source will read it and pass it down the conduit pipeline. When the channel is closed, the source will close also.

sinkTMChan :: MonadIO m => TMChan a -> ConduitT a z m ()

A simple wrapper around a TMChan. As data is pushed into this sink, it will magically begin to appear in the channel.
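Because a TMChan never blocks its writer, an entire stream can be pushed into it before anything reads from it. The sketch below (bufferEverything is a hypothetical name) does exactly that in a single thread, which also shows why an unbounded channel can hold an arbitrary amount of data in memory.

```haskell
import Control.Concurrent.STM (atomically)
import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan
  (closeTMChan, newTMChan, sinkTMChan, sourceTMChan)

-- | Push 10000 values into an unbounded channel with no reader
-- attached, then close it and count what comes back out.
bufferEverything :: IO Int
bufferEverything = do
  chan <- atomically newTMChan
  -- Never blocks: all 10000 values are held in the channel at once.
  runConduit $ CL.sourceList [1 .. 10000 :: Int] .| sinkTMChan chan
  atomically $ closeTMChan chan
  xs <- runConduit $ sourceTMChan chan .| CL.consume
  pure (length xs)
  -- → 10000
```

The same single-threaded program written against a TBMChan with a bound below 10000 would block forever in the sink, since no reader is running to free up space.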

Parallel Combinators

(>=<) :: (MonadResource mi, MonadIO mo, MonadUnliftIO mi) => ConduitT () a mi () -> ConduitT () a mi () -> mo (ConduitT () a mi ()) infixl 5

Combines two sources with an unbounded channel, creating a new source which pulls data from a mix of the two sources: whichever produces first.

The order of the new source's data is undefined, but it will be some combination of the two given sources.
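A minimal usage sketch, assuming both sources run in ResourceT IO (mergeTwo is a hypothetical name). Since the interleaving is undefined, the result is sorted before it is returned so that the value is deterministic.

```haskell
import Control.Monad.Trans.Resource (runResourceT)
import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan ((>=<))
import Data.List (sort)

-- | Merge two finite sources and collect everything they produce.
mergeTwo :: IO [Int]
mergeTwo = runResourceT $ do
  merged <- CL.sourceList [1, 3, 5] >=< CL.sourceList [2, 4, 6]
  xs <- runConduit $ merged .| CL.consume
  -- The raw order of xs depends on thread scheduling.
  pure (sort xs)
```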

mergeSources

Arguments

:: (MonadResource mi, MonadIO mo, MonadUnliftIO mi)
=> [ConduitT () a mi ()]       The sources to merge.
-> Int                         The bound of the intermediate channel.
-> mo (ConduitT () a mi ())

Merges a list of sources, putting them all into a bounded channel, and returns a source which yields values from all the given sources on a first-come, first-served basis.

The order of the new source's data is undefined, but it will be some combination of the given sources. The monad in which the merged source is created (mo) is independent of the monad of the input sources (mi); as the signature shows, the merged source itself runs in mi.

All spawned threads are terminated when the source is closed or when execution leaves the ResourceT region. This means that the result can only be used within a runResourceT scope.

Before 3.0: spawned threads were not guaranteed to be terminated. This could happen if the source was closed before all of its inputs were closed.

Since: 3.0
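The scoping rule can be sketched as follows: the merged source is both created and consumed inside the same runResourceT block, and only the collected list leaves it. The name mergeThree and the bound of 8 are arbitrary choices for this illustration.

```haskell
import Control.Monad.Trans.Resource (runResourceT)
import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.List as CL
import Data.Conduit.TMChan (mergeSources)
import Data.List (sort)

-- | Merge three finite sources through a channel bounded at 8 items.
mergeThree :: IO [Int]
mergeThree = runResourceT $ do
  merged <- mergeSources
    [ CL.sourceList [1 .. 10]
    , CL.sourceList [11 .. 20]
    , CL.sourceList [21 .. 30]
    ] 8
  -- Consume the source before leaving runResourceT; the worker
  -- threads are cleaned up when this block exits.
  xs <- runConduit $ merged .| CL.consume
  pure (sort xs)  -- sorted because the interleaving is undefined
```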

(<=>) :: (MonadThrow mi, MonadIO mo, MonadUnliftIO mi) => ConduitT i i (ResourceT mi) () -> ConduitT i i (ResourceT mi) () -> ResourceT mi (ConduitT i i mo ())

Combines two conduits with unbounded channels, creating a new conduit which pulls data from a mix of the two: whichever produces first.

The order of the new conduit's output is undefined, but it will be some combination of the two given conduits.

mergeConduits

Arguments

:: (MonadIO mo, MonadUnliftIO mi)
=> [ConduitT i o (ResourceT mi) ()]     The conduits to merge.
-> Int                                  The bound for the channels.
-> ResourceT mi (ConduitT i o mo ())

Deprecated: This method will disappear in the next version.

Provides an input across several conduits, putting their outputs into a bounded channel. Returns a conduit which yields values from all the given conduits on a first-come, first-served basis.

The order of the new conduit's outputs is undefined, but it will be some combination of the given conduits. The monad of the resultant conduit (mo) is independent of the monads of the input conduits (mi).

Terminates all worker threads when the resulting conduit is closed or when execution leaves the ResourceT context. This means that the conduit is only valid inside a runResourceT scope.

Before 3.0: spawned threads were not guaranteed to be terminated. This could happen if the conduit was closed before all threads had finished execution.

Since: 3.0