lawless-concurrent-machines: Concurrent networked stream transducers

A simple use case for this library is to run the stages of a pipelined streaming computation concurrently. If data is streaming through multiple processing stages, you might build a machine like this:

step1 >~> step2 >~> step3

The >~> operator connects the machines on either side with a one-element buffer. This means that data is pulled from upstream sources eagerly (perhaps pulling one more value than will be consumed by downstream), but it also means that each stage can be working simultaneously, increasing throughput of the entire pipeline.
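The one-element buffer idea can be illustrated without the library at all. The following is a minimal sketch, assuming only `base`: it is not how `>~>` is implemented, and `viaOneSlot` is a hypothetical helper invented for this illustration. A producer thread hands values to a consumer through an `MVar`, which holds at most one value at a time, so the producer can run one value ahead of the consumer but no further.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM_, replicateM)

-- Hypothetical helper, not part of the library: the producer runs in its own
-- thread and passes each input to the consumer through a single MVar slot.
viaOneSlot :: [a] -> (a -> IO b) -> IO [b]
viaOneSlot inputs consume = do
  slot <- newEmptyMVar
  -- putMVar blocks while the slot is full, so the producer stays at most
  -- one value ahead of the consumer.
  _ <- forkIO $ forM_ inputs (putMVar slot)
  -- Take exactly as many values as were supplied, consuming each in order.
  replicateM (length inputs) (takeMVar slot >>= consume)

main :: IO ()
main = do
  rs <- viaOneSlot [1, 2, 3 :: Int] (\x -> return (x * 2))
  print rs
```

Because the slot is filled eagerly, the producer may compute a value that the consumer never asks for, which is the same trade-off described above for `>~>`.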

A few small examples are available in the examples directory of the source repository.

NOTE: This is a temporary fork until concurrent-machines 0.3.1 is released.


Properties

Versions: 0.3.1
Change log: CHANGELOG.md
Dependencies: async (>=2.0.1 && <2.2), base (>=4.8 && <5), containers (>=0.5 && <0.6), lifted-async (>=0.1 && <0.10), machines (>=0.5 && <0.7), monad-control (>=1.0 && <1.1), semigroups (>=0.8 && <0.19), time (>=1.4 && <1.9), transformers (>=0.4 && <0.6), transformers-base (>=0.4 && <0.6)
License: BSD-3-Clause
Copyright: Copyright (C) 2014 Anthony Cowley
Author: Anthony Cowley
Maintainer: acowley@gmail.com
Category: Concurrency, Control
Source repo: head: git clone git@gitlab.com:theunixman/haskell/lawless-concurrent-machines.git
Uploaded: by misandrist at 2017-10-02T04:15:04Z

Flags

Manual Flags

Name   Description                          Default
splot  Build test with splot visual output  Disabled

Use -f <flag> to enable a flag, or -f -<flag> to disable that flag.


Readme for lawless-concurrent-machines-0.3.1

A simple example of a pipelined computation whose throughput is improved by concurrently running distinct processing stages is given in examples/Pipeline.hs.

import Data.Time.Clock (getCurrentTime, diffUTCTime)
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Machine.Concurrent

Suppose we have a worker that performs a computation on its input before producing output. This operation may take some time due to, say, network IO or just CPU load. Here we simulate an operation that takes some time with threadDelay.

worker :: String -> Double -> ProcessT IO () ()
worker name dt = repeatedly $ do _ <- await
                                 liftIO $ do
                                   putStrLn $ name ++ " working on its input"
                                   threadDelay dt'
                                 yield ()
  where dt' = floor $ dt * 1000000

We will use a little helper to time two variations of our test program.

timed :: MonadIO m => m a -> m (a, Double)
timed m = do t1 <- liftIO getCurrentTime
             r <- m
             t2 <- liftIO getCurrentTime
             return (r, realToFrac $ t2 `diffUTCTime` t1)

Now we will run a three-stage pipeline where each stage takes one second to process its input before yielding some output. A sequential execution strategy will thus take three seconds to pass an input through this pipeline. At the top level, we will request three outputs, each of which will take three seconds to produce, resulting in a total execution time of approximately nine seconds.

main :: IO ()
main = do (r,dt) <- timed . runT . supply (repeat ()) $
            worker "A" 1 ~> worker "B" 1 ~> worker "C" 1 ~> taking 3
          putStrLn $ "Sequentially produced "++show r
          putStrLn $ "Sequential processing took "++show dt++"s"

If we instead run the same arrangement as a pipelined computation, we allow the independent stages to run concurrently, much like the stages in a pipelined CPU.

          (r',dt') <- timed . runT . supply (repeat ()) $
            worker "A" 1 >~> worker "B" 1 >~> worker "C" 1 >~> taking 3
          putStrLn $ "Pipeline produced "++show r'
          putStrLn $ "Pipeline processing took "++show dt'++"s"

With this arrangement, the first output we request takes three seconds to produce, as we must wait for an input to pass through the entire length of the pipeline. However, successive outputs follow that first output through the pipeline, so we produce further outputs at one-second intervals. Therefore it takes 3 + 2 = 5 seconds to produce three outputs: three seconds for the first output, and one second for each of the following two.
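The arithmetic behind the nine-second and five-second figures can be written down as a small model. This is a sketch under idealized assumptions (no scheduling or buffering overhead, and a pipeline whose steady-state rate is set by its slowest stage); `sequentialTime` and `pipelinedTime` are hypothetical helpers for illustration, not part of the library.

```haskell
-- Hypothetical helpers for illustration; not part of the library.
-- Each list element is one stage's processing time in seconds.

-- Sequential execution: every output pays for every stage in turn.
sequentialTime :: Int -> [Double] -> Double
sequentialTime n stages = fromIntegral (max 0 n) * sum stages

-- Pipelined execution (idealized): the first output traverses every stage,
-- and each later output arrives one slowest-stage interval after the last.
pipelinedTime :: Int -> [Double] -> Double
pipelinedTime n stages
  | n <= 0 || null stages = 0
  | otherwise = sum stages + fromIntegral (n - 1) * maximum stages

main :: IO ()
main = do
  print (sequentialTime 3 [1, 1, 1])  -- 9.0
  print (pipelinedTime 3 [1, 1, 1])   -- 5.0
```

For three one-second stages and three outputs, the model reproduces the totals discussed above. Measured times will be somewhat higher because of thread scheduling and the output printed by each worker.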
