tubes-0.2.2.0: Effectful, iteratee-inspired stream processing based on a free monad.

Copyright: (c) 2014, 2015 Gatlin Johnson <gatlin@niltag.net>
License: GPL-3
Maintainer: gatlin@niltag.net
Stability: experimental
Safe Haskell: Safe
Language: Haskell2010

Tubes


Description

Write effect-ful stream processing functions and compose them into a series of tubes.

This exists primarily for my own education. It is updated often as I try things and is probably, at this moment, wrong.

My goals were to

  • learn more about iteratees and stream processing; and
  • explore the relationships between functions, pairs, sum types, and products.


Documentation

A Tube is a computation that can yield multiple intermediate values or await intermediate inputs before computing a final result. Any monadic function may be turned into a Tube.

Tubes may be composed in different ways. For instance, in ghci:

   >>> run $ for (each [1..4] >< map show) $ lift . putStrLn
   1
   2
   3
   4
   

Here, each converts a Foldable into a Source of values, and for performs a computation with each value. Another example, using two built-in Tubes for convenience:

   >>> run $ prompt >< filter (/= "Die Antwoord") >< map (++ " is bad") >< display
   > dubstep
   dubstep is bad
   > the sun
   the sun is bad
   > Die Antwoord
   > this example
   this example is bad
   

A few stream processing combinators are provided for mapping, filtering, taking, and other basic operations.

For those times when you want to reduce a stream, you can do so like this:

   >>> reduce (+) 0 id (each [1..10])
   55
   

>< is useful for combining Tubes which all share the same return type, most often (), since that is what Sources such as each and prompt return.

There is more in the library not covered here, and you are encouraged to take a look around.

type Tube a b = FreeT (TubeF a b)

A Tube is a computation which can

  • yield an intermediate value downstream and suspend execution; and
  • await a value from upstream, deferring execution until it is received.

Moreover, individual Tubes may be freely composed into larger ones, so long as their types match. Thus, one may write small, reusable building blocks and construct efficient stream processing pipelines.
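To make the yield/await description concrete, here is a toy model in plain Haskell (base only). It is a sketch under simplifying assumptions, not the library's code: the real Tube is FreeT (TubeF a b) m r and runs over a base monad m, while this model drops the monad so it can be evaluated purely. The runner feed and the tubes doubler and render are hypothetical names. The composition operator assumes a downstream-driven order: the right-hand tube runs until it awaits, and only then is the left-hand tube resumed.

```haskell
-- Toy, effect-free model of a Tube (illustration only; the library's
-- Tube is FreeT (TubeF a b) m r and runs over a base monad m).
data Tube a b r
  = Pure r                   -- finished with a final result
  | Await (a -> Tube a b r)  -- suspended until an upstream value arrives
  | Yield b (Tube a b r)     -- emitted a value downstream, then suspended

instance Functor (Tube a b) where
  fmap f (Pure r)    = Pure (f r)
  fmap f (Await k)   = Await (fmap f . k)
  fmap f (Yield b t) = Yield b (fmap f t)

instance Applicative (Tube a b) where
  pure = Pure
  tf <*> tx = tf >>= \f -> fmap f tx

instance Monad (Tube a b) where
  Pure r    >>= f = f r
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b t >>= f = Yield b (t >>= f)

await :: Tube a b a
await = Await Pure

yield :: b -> Tube a b ()
yield b = Yield b (Pure ())

-- Downstream-driven composition: the right-hand tube runs first, and the
-- left-hand tube is resumed only when the right-hand one awaits.
infixl 3 ><
(><) :: Tube a b r -> Tube b c r -> Tube a c r
_  >< Pure r    = Pure r
up >< Yield c k = Yield c (up >< k)
up >< Await f   = case up of
  Pure r    -> Pure r
  Yield b k -> k >< f b
  Await g   -> Await (\a -> g a >< Await f)

-- Hypothetical runner: feed a list of inputs and collect every output.
-- Stops when the tube finishes or awaits after the input has run out.
feed :: [a] -> Tube a b r -> [b]
feed _      (Pure _)    = []
feed xs     (Yield b t) = b : feed xs t
feed []     (Await _)   = []
feed (x:xs) (Await k)   = feed xs (k x)

-- Two small hypothetical tubes to compose.
doubler :: Tube Int Int ()
doubler = do
  n <- await
  yield (n * 2)
  doubler

render :: Tube Int String ()
render = do
  n <- await
  yield ("got " ++ show n)
  render
```

In GHCi, feed [1,2,3] (doubler >< render) evaluates to ["got 2","got 4","got 6"]. Each input is awaited by doubler and yielded downstream, where render's await picks it up.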

Since a much better-engineered, more popular, and decidedly more mature library already uses the term "pipes", I have opted instead to think of my work as a series of tubes.

newtype TubeF a b k

TubeF is the union of unary functions and binary products into a single type, here defined with a Boehm-Berarducci encoding.

This type is equivalent to the following:

       data TubeF a b k
           = Await (a -> k) -- :: (a -> k) -> TubeF a b k
           | Yield (b  , k) -- :: (b  , k) -> TubeF a b k
   

The type signatures for the two value constructors should bear a strong resemblance to the actual type signature of runT. Instead of encoding tubes as structures which build up when composed, a TubeF is a control flow mechanism which picks one of two provided continuations.

People using this library should never have to contend with these details, but they are worth mentioning.
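You can check the equivalence between the newtype and the two-constructor data type directly. The sketch below reproduces the encoding described above and adds conversions in both directions. The names awaitF, yieldF, TubeD, toData, fromData and describe are hypothetical, chosen for this illustration; they are not necessarily what the library exports.

```haskell
{-# LANGUAGE RankNTypes #-}

-- Boehm-Berarducci encoding: a TubeF value is a function that, given one
-- continuation per case, picks exactly one of them.
newtype TubeF a b k = TubeF
  { runT :: forall r. ((a -> k) -> r) -> ((b, k) -> r) -> r }

instance Functor (TubeF a b) where
  fmap f t = TubeF (\onAwait onYield ->
    runT t (\g -> onAwait (f . g))
           (\(b, k) -> onYield (b, f k)))

-- Hypothetical smart constructors (names assumed for this sketch).
awaitF :: (a -> k) -> TubeF a b k
awaitF g = TubeF (\onAwait _ -> onAwait g)

yieldF :: b -> k -> TubeF a b k
yieldF b k = TubeF (\_ onYield -> onYield (b, k))

-- The ordinary sum type that the encoding is equivalent to.
data TubeD a b k = Await (a -> k) | Yield (b, k)

-- Passing the data constructors as the two continuations converts back.
toData :: TubeF a b k -> TubeD a b k
toData t = runT t Await Yield

fromData :: TubeD a b k -> TubeF a b k
fromData (Await g)      = awaitF g
fromData (Yield (b, k)) = yieldF b k

-- Consuming a TubeF means supplying both continuations; no constructor
-- pattern matching is needed.
describe :: Show b => TubeF Int b String -> String
describe t =
  runT t (\g -> "await; given 1 it continues with " ++ g 1)
         (\(b, k) -> "yield " ++ show b ++ ", then " ++ k)
```

For example, describe (yieldF 'x' "done") evaluates to "yield 'x', then done". Consuming a TubeF always means supplying both continuations, which is the control-flow view described above.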

Constructors

TubeF, with the single field

   runT :: forall r. ((a -> k) -> r) -> ((b, k) -> r) -> r


type Source b m r = forall x. Tube x b m r

A computation which only yields and never awaits.

type Sink a m r = forall x. Tube a x m r

A computation which only awaits and never yields.

Core infrastructure

run :: FreeT f m a -> m (FreeF f a (FreeT f m a))

run is just runFreeT with a shorter name and, who knows, maybe it'll change some day.

await :: Monad m => Tube a b m a

Command to wait for a new value upstream.

yield :: Monad m => b -> Tube a b m ()

Command to send a value downstream.

each :: (Monad m, Foldable t) => t b -> Tube a b m ()

Convert any Foldable (a list, for instance) into a Source.

for :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r

Enumerate yielded values into a continuation, creating a new Source.

(~>) :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r

Infix version of for.
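The same toy, effect-free Tube model can imitate each and for. It is redefined here so the block stands alone. The names eachT, forT and yields are hypothetical, and the model omits the base monad m. The goal is to show what "enumerating yielded values into a continuation" amounts to, not how the library implements it.

```haskell
-- Toy, effect-free Tube model (an illustration, not the library's type).
data Tube a b r
  = Pure r
  | Await (a -> Tube a b r)
  | Yield b (Tube a b r)

instance Functor (Tube a b) where
  fmap f (Pure r)    = Pure (f r)
  fmap f (Await k)   = Await (fmap f . k)
  fmap f (Yield b t) = Yield b (fmap f t)

instance Applicative (Tube a b) where
  pure = Pure
  tf <*> tx = tf >>= \f -> fmap f tx

instance Monad (Tube a b) where
  Pure r    >>= f = f r
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b t >>= f = Yield b (t >>= f)

yield :: b -> Tube a b ()
yield b = Yield b (Pure ())

-- Analogue of each: yield every element of any Foldable, in order.
eachT :: Foldable t => t b -> Tube a b ()
eachT = mapM_ yield

-- Analogue of for: run the body on each yielded value, splicing whatever
-- the body yields into the output; the body's own result is discarded.
forT :: Tube a b r -> (b -> Tube a c s) -> Tube a c r
forT (Pure r)    _    = Pure r
forT (Await k)   body = Await (\a -> forT (k a) body)
forT (Yield b t) body = body b >> forT t body

-- Collect everything a source yields; any await is answered with ().
yields :: Tube () b r -> [b]
yields (Pure _)    = []
yields (Yield b t) = b : yields t
yields (Await k)   = yields (k ())
```

Here yields (forT (eachT [1,2,3]) (\n -> yield n >> yield (n * 10))) evaluates to [1,10,2,20,3,30]. Every value the source yields is replaced by everything the body yields for it.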

(>-) :: Monad m => Tube a b m r -> (b -> Tube b c m r) -> Tube a c m r

Connect a task to a continuation yielding another task; see ><.

(><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r
infixl 3 ><

Compose two tubes into a new tube.

(|>) :: Monad m => Source b m () -> Sink (Maybe b) m s -> Sink (Maybe b) m s

Connects a Source to a Sink, finishing when either the Source is exhausted or the Sink terminates.

(-<) :: Monad m => a -> Sink a m b -> Sink a m b

Insert a value into a Sink.

liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a))

This performs a neat trick: a Tube with a return type a will be turned into a new Tube containing the underlying TubeF value.

In this way the >< and >- functions can replace the () return value with a continuation and recursively traverse the computation until a final result is reached.

Utilities

cat :: Monad m => Tube a a m r

Continuously relays any values it receives. Iteratee identity.

map :: Monad m => (a -> b) -> Tube a b m r

Transforms all incoming values according to some function.

drop :: Monad m => Int -> Tube a a m r

Refuses to yield the first n values it receives.

take :: Monad m => Int -> Tube a a m ()

Relay only the first n elements of a stream.

takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()

Terminates the stream upon receiving a value violating the predicate.

filter :: Monad m => (a -> Bool) -> Tube a a m r

Yields only values satisfying some predicate.
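Each of these utilities can be written with nothing but await, yield and recursion. The sketch below does so over a toy, effect-free Tube model, repeated so the block stands alone. It illustrates the behaviour described above rather than reproducing the library's source. The T suffixes (which avoid clashes with the Prelude) and the feed runner are hypothetical.

```haskell
-- Toy, effect-free Tube model (an illustration, not the library's
-- FreeT-based definition).
data Tube a b r
  = Pure r
  | Await (a -> Tube a b r)
  | Yield b (Tube a b r)

instance Functor (Tube a b) where
  fmap f (Pure r)    = Pure (f r)
  fmap f (Await k)   = Await (fmap f . k)
  fmap f (Yield b t) = Yield b (fmap f t)

instance Applicative (Tube a b) where
  pure = Pure
  tf <*> tx = tf >>= \f -> fmap f tx

instance Monad (Tube a b) where
  Pure r    >>= f = f r
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b t >>= f = Yield b (t >>= f)

await :: Tube a b a
await = Await Pure

yield :: b -> Tube a b ()
yield b = Yield b (Pure ())

-- Hypothetical runner: feed a list of inputs and collect the outputs.
feed :: [a] -> Tube a b r -> [b]
feed _      (Pure _)    = []
feed xs     (Yield b t) = b : feed xs t
feed []     (Await _)   = []
feed (x:xs) (Await k)   = feed xs (k x)

-- Relay every value unchanged (the identity tube).
catT :: Tube a a r
catT = do
  x <- await
  yield x
  catT

mapT :: (a -> b) -> Tube a b r
mapT f = do
  x <- await
  yield (f x)
  mapT f

-- Swallow the first n values, then behave like catT.
dropT :: Int -> Tube a a r
dropT n
  | n <= 0    = catT
  | otherwise = await >> dropT (n - 1)

-- Relay n values, then finish.
takeT :: Int -> Tube a a ()
takeT n
  | n <= 0    = return ()
  | otherwise = do
      x <- await
      yield x
      takeT (n - 1)

-- Finish as soon as a value fails the predicate.
takeWhileT :: (a -> Bool) -> Tube a a ()
takeWhileT p = do
  x <- await
  if p x then yield x >> takeWhileT p else return ()

-- Relay only the values that satisfy the predicate, forever.
filterT :: (a -> Bool) -> Tube a a r
filterT p = do
  x <- await
  if p x then yield x else return ()
  filterT p
```

For instance, feed [1..10] (dropT 7) gives [8,9,10], and feed [1,2,3,10,4] (takeWhileT (< 5)) gives [1,2,3].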

reduce
   :: Monad m
   => (x -> a -> x)   -- step function
   -> x               -- initial value
   -> (x -> b)        -- final transformation
   -> Source a m ()   -- stream source
   -> m b

Strict left-fold of a stream. Note that the actual return type of the source is not relevant, only the intermediate yield type.
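Keeping the step function, initial value and final transformation separate lets the accumulator have a different type from the result. The sketch below models the values yielded by the Source as a plain list, an assumption made so the example needs only base. It uses Data.List.foldl' for the strict left fold. The names reduceList and mean are hypothetical.

```haskell
import Data.List (foldl')

-- Sketch of reduce's semantics, modelling the values a Source yields as a
-- plain list (an assumption; the real reduce consumes a Source a m ()
-- and returns its result in m).
reduceList :: (x -> a -> x) -> x -> (x -> b) -> [a] -> b
reduceList step initial done = done . foldl' step initial

-- The final transformation lets the accumulator differ from the result:
-- accumulate a (sum, count) pair, then divide once at the end.
mean :: [Double] -> Double
mean = reduceList step (0, 0 :: Int) finish
  where
    -- foldl' only forces the pair itself, so force both fields too
    step (s, n) v =
      let s' = s + v
          n' = n + 1
      in s' `seq` n' `seq` (s', n')
    finish (s, n)
      | n == 0    = 0
      | otherwise = s / fromIntegral n
```

With this model, reduceList (+) 0 id [1..10] gives 55, matching the reduce example earlier on this page, and mean [1,2,3,4] gives 2.5. Because foldl' only evaluates the accumulator to weak head normal form, step forces both halves of the pair explicitly.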

every :: (Foldable t, Monad m) => t b -> Tube a (Maybe b) m ()

Similar to each, except that every value is wrapped in Just and the end of the stream is marked explicitly by a final Nothing.

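unyield :: Monad m => FreeT (TubeF x b) m () -> m (Maybe (b, FreeT (TubeF x b) m ()))

Taps the next value from a Source, if there is one: the result is Nothing once the Source has finished, or Just the value paired with the rest of the Source.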

prompt :: Source String IO ()

Source of Strings from stdin. This is mostly for debugging / ghci example purposes.

display :: Sink String IO ()

Sink for Strings to stdout. This is mostly for debugging / ghci example purposes.

Pump

type Pump a b = CofreeT (PumpF a b)

A Pump is the dual to a Tube: where a Tube is a computation manipulating a stream of values, a Pump can be situated on either end of a tube to both insert values when requested and handle any yielded results.

One interesting use of a Pump is to feed data to a Tube, collecting the result as well as unused input:

   import Tubes
   import Control.Comonad (extract)
   import Data.Functor.Identity

   -- a Pump that hands out list elements on request and ignores yielded values
   p :: [a] -> Pump (Maybe a) x Identity [a]
   p inp = pump (return inp)
           (\wa -> case (extract wa) of
               [] -> (Nothing, wa)
               x:xs -> (Just x, return xs))
           const

   -- a Sink that stops after 5 loops, or when input is exhausted
   add5 :: Sink (Maybe Int) IO Int
   add5 = loop 0 5 where
       loop acc ct = if 0 == ct
           then return acc
           else do
               mn <- await
               maybe (return acc)
                     (\n -> loop (acc+n) (ct - 1))
                     mn

   result :: IO ([Int], Int)
   result = runPump (curry id) (p [1..10]) add5
   -- ([6,7,8,9,10],15)

Pumps are still being investigated by the author so if you come up with something interesting, please share!

data PumpF a b k

Constructors

PumpF, with the fields

   recvF :: (a, k)
   sendF :: b -> k


pump :: Comonad w => w a -> (w a -> (b, w a)) -> (w a -> c -> w a) -> Pump b c w a

Creates a Pump for a Tube using a comonadic seed value, a function to give it more data upon request, and a function to handle any yielded results.

Values received from the Tube may be altered and sent back into the Tube, so the mechanism does act something like a pump.

send :: Comonad w => Pump a b w r -> b -> Pump a b w r

Send a value into a Pump, effectively re-seeding the stream.

recv :: Comonad w => Pump a b w r -> (a, Pump a b w r)

Pull a value from a Pump, along with the rest of the Pump.

runPump :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w x -> Tube a b m y -> m r

Given a suitably matching Tube and Pump, you can use the latter to execute the former.
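The pairing between the two types is easiest to see in a toy, effect-free model. Every Await in the Tube is answered by the Pump's recv, and every Yield is absorbed by its send. The sketch below fixes the comonad w to Identity, so a Pump reduces to a seed value plus two transitions. It then ports the list-feeding pump and add5 sink from the Pump example above. Tube, Pump, pump, runPump, listPump and add5 here are hypothetical stand-ins, not the library's FreeT and CofreeT definitions.

```haskell
-- Toy, effect-free Tube and Pump (illustrations, not the library's
-- FreeT/CofreeT types). The Pump fixes the comonad w to Identity.
data Tube a b r
  = Pure r
  | Await (a -> Tube a b r)
  | Yield b (Tube a b r)

instance Functor (Tube a b) where
  fmap f (Pure r)    = Pure (f r)
  fmap f (Await k)   = Await (fmap f . k)
  fmap f (Yield b t) = Yield b (fmap f t)

instance Applicative (Tube a b) where
  pure = Pure
  tf <*> tx = tf >>= \f -> fmap f tx

instance Monad (Tube a b) where
  Pure r    >>= f = f r
  Await k   >>= f = Await (\a -> k a >>= f)
  Yield b t >>= f = Yield b (t >>= f)

await :: Tube a b a
await = Await Pure

data Pump a b x = Pump
  { current :: x                 -- the seed, like extract on w x
  , recv    :: (a, Pump a b x)   -- hand an input to a waiting Tube
  , send    :: b -> Pump a b x   -- absorb a value the Tube yields
  }

-- Analogue of the documented pump constructor, with w = Identity.
pump :: x -> (x -> (a, x)) -> (x -> b -> x) -> Pump a b x
pump seed next handle = Pump
  { current = seed
  , recv    = let (a, seed') = next seed in (a, pump seed' next handle)
  , send    = \b -> pump (handle seed b) next handle
  }

-- Every Await is answered by recv and every Yield is absorbed by send;
-- when the Tube finishes, its result is combined with the Pump's seed.
runPump :: (x -> y -> r) -> Pump a b x -> Tube a b y -> r
runPump combine p (Pure y)    = combine (current p) y
runPump combine p (Await k)   = let (a, p') = recv p in runPump combine p' (k a)
runPump combine p (Yield b t) = runPump combine (send p b) t

-- The list-feeding pump and add5 sink, ported to the toy model.
listPump :: [a] -> Pump (Maybe a) b [a]
listPump inp = pump inp next const
  where
    next []       = (Nothing, [])
    next (x : xs) = (Just x, xs)

add5 :: Tube (Maybe Int) b Int
add5 = loop 0 (5 :: Int)
  where
    loop acc ct
      | ct == 0   = return acc
      | otherwise = do
          mn <- await
          maybe (return acc) (\n -> loop (acc + n) (ct - 1)) mn

-- (,) is the same function as curry id.
result :: ([Int], Int)
result = runPump (,) (listPump [1 .. 10]) add5
```

Here result evaluates to ([6,7,8,9,10],15), the same answer the library-based example reports. add5 pulls 1 through 5, and the Pump's remaining seed is the unused input.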

Re-exports

lift :: MonadTrans t => forall m a. Monad m => m a -> t m a

Lift a computation from the argument monad to the constructed monad.

runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))