Copyright | (c) 2014, 2015 Gatlin Johnson <gatlin@niltag.net> |
---|---|
License | GPL-3 |
Maintainer | gatlin@niltag.net |
Stability | experimental |
Safe Haskell | Safe |
Language | Haskell2010 |
Write effectful stream processing functions and compose them into a series of tubes.
This exists primarily for my own education. It is updated often as I try things and is probably, at this moment, wrong.
My goals were to
- learn more about iteratees and stream processing; and
- explore the relationships between functions, pairs, sum types, and products.
Synopsis
- type Tube a b = FreeT (TubeF a b)
- newtype TubeF a b k = TubeF { runT :: forall r. ((a -> k) -> r) -> ((b, k) -> r) -> r }
- type Source b m r = forall x. Tube x b m r
- type Sink a m r = forall x. Tube a x m r
- run :: FreeT f m a -> m (FreeF f a (FreeT f m a))
- await :: Monad m => Tube a b m a
- yield :: Monad m => b -> Tube a b m ()
- each :: (Monad m, Foldable t) => t b -> Tube a b m ()
- for :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- (~>) :: Monad m => Tube a b m r -> (b -> Tube a c m s) -> Tube a c m r
- (>-) :: Monad m => Tube a b m r -> (b -> Tube b c m r) -> Tube a c m r
- (><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r
- (|>) :: Monad m => Source b m () -> Sink (Maybe b) m s -> Sink (Maybe b) m s
- (-<) :: Monad m => a -> Sink a m b -> Sink a m b
- liftT :: (MonadTrans t, Monad m) => FreeT f m a -> t m (FreeF f a (FreeT f m a))
- cat :: Monad m => Tube a a m r
- map :: Monad m => (a -> b) -> Tube a b m r
- drop :: Monad m => Int -> Tube a a m r
- take :: Monad m => Int -> Tube a a m ()
- takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
- filter :: Monad m => (a -> Bool) -> Tube a a m r
- reduce :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Source a m () -> m b
- every :: (Foldable t, Monad m) => t b -> Tube a (Maybe b) m ()
- unyield :: Monad m => FreeT (TubeF x b) m () -> m (Maybe (b, FreeT (TubeF x b) m ()))
- prompt :: Source String IO ()
- display :: Sink String IO ()
- type Pump a b = CofreeT (PumpF a b)
- data PumpF a b k = PumpF {}
- pump :: Comonad w => w a -> (w a -> (b, w a)) -> (w a -> c -> w a) -> Pump b c w a
- send :: Comonad w => Pump a b w r -> b -> Pump a b w r
- recv :: Comonad w => Pump a b w r -> (a, Pump a b w r)
- runPump :: (Comonad w, Monad m) => (x -> y -> r) -> Pump a b w x -> Tube a b m y -> m r
- lift :: MonadTrans t => forall m a. Monad m => m a -> t m a
- runFreeT :: FreeT f m a -> m (FreeF f a (FreeT f m a))
Documentation
A Tube is a computation that can yield multiple intermediate values or await intermediate inputs before computing a final result. Any monadic function may be turned into a Tube.
Tubes may be composed in different ways. For instance, in ghci:
>>> run $ for (each [1..4] >< map show) $ lift . putStrLn
1
2
3
4
Here, each converts a Foldable into a Source of values; for performs a computation with each value. Another example, using two built-in Tubes for convenience:
>>> run $ prompt >< filter (/= "Die Antwoord") >< map (++ " is bad") >< display
> dubstep
dubstep is bad
> the sun
the sun is bad
> Die Antwoord
> this example
this example is bad
A few stream processing combinators are provided for mapping, filtering, taking, and other basic operations.
To reduce a stream to a single value, use reduce:
>>> reduce (+) 0 id (each [1..10])
55
>< is useful for combining Tubes which all have the same return value, most often () simply because every Source will have that value.
There is more in the library not covered here, and you are encouraged to take a look around.
type Tube a b = FreeT (TubeF a b)
A Tube is a computation which can
- yield an intermediate value downstream and suspend execution; and
- await a value from upstream, deferring execution until it is received.
Moreover, individual Tubes may be freely composed into larger ones, so long as their types match. Thus, one may write small, reusable building blocks and construct efficient stream processing pipelines.
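To make the yield/await behaviour concrete, here is a minimal sketch that uses only base. It is a simplified, pure stand-in and not the library's FreeT-based implementation: the constructors Done, Await and Yield, the helper outputs, the name mapT (chosen to avoid clashing with Prelude's map), and the pull-based definition of >< are all assumptions made for illustration.

```haskell
import Control.Monad (forever)

-- Hypothetical, pure stand-in for the library's Tube.
-- The real type is FreeT (TubeF a b) over an arbitrary base monad.
data Tube a b r
    = Done r                   -- finished with a final result
    | Await (a -> Tube a b r)  -- suspended until upstream supplies a value
    | Yield b (Tube a b r)     -- emitted a value; the rest runs on demand

instance Functor (Tube a b) where
    fmap f t = t >>= (Done . f)

instance Applicative (Tube a b) where
    pure = Done
    tf <*> tx = tf >>= \f -> fmap f tx

instance Monad (Tube a b) where
    Done r    >>= f = f r
    Await k   >>= f = Await (\a -> k a >>= f)
    Yield b t >>= f = Yield b (t >>= f)

await :: Tube a b a
await = Await Done

yield :: b -> Tube a b ()
yield b = Yield b (Done ())

-- Yield every element of a Foldable, then finish.
each :: Foldable t => t b -> Tube a b ()
each = mapM_ yield

-- Transform every incoming value (named mapT only to avoid Prelude's map).
mapT :: (a -> b) -> Tube a b r
mapT f = forever (await >>= yield . f)

-- Assumed pull-based composition: the downstream tube drives, and the
-- upstream tube only runs when downstream awaits.
infixl 3 ><
(><) :: Tube a b r -> Tube b c r -> Tube a c r
_  >< Done r    = Done r
up >< Yield c t = Yield c (up >< t)
up >< Await k   = pull up
  where
    pull (Done r)    = Done r          -- upstream finished: so does the pipeline
    pull (Yield b t) = t >< k b        -- hand the value to the waiting consumer
    pull (Await g)   = Await (pull . g)

-- Collect everything a tube yields, answering any await with ().
outputs :: Tube () b r -> [b]
outputs (Done _)    = []
outputs (Yield b t) = b : outputs t
outputs (Await k)   = outputs (k ())
```

In this model, outputs (each [1..4] >< mapT show) evaluates to ["1","2","3","4"], mirroring the ghci example with the IO effects removed.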
Since a much better engineered, more popular, and decidedly more mature library already uses the term "pipes" I have opted instead to think of my work as a series of tubes.
TubeF is the union of unary functions and binary products into a single type, here defined with a Boehm-Berarducci encoding.
This type is equivalent to the following:
data TubeF a b k
    = Await (a -> k)   -- :: (a -> k) -> TubeF a b k
    | Yield (b , k)    -- :: (b , k) -> TubeF a b k
The type signatures for the two value constructors should bear a strong resemblance to the actual type signature of runT. Instead of encoding tubes as structures which build up when composed, a TubeF is a control flow mechanism which picks one of two provided continuations.
People using this library should never have to contend with these details but it is worth mentioning.
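The claimed equivalence can be checked by writing the two conversions out. The sketch below needs only base and the RankNTypes extension; the names TubeData, toData and fromData are hypothetical helpers introduced here, and the Functor instance is one plausible definition rather than a copy of the library's.

```haskell
{-# LANGUAGE RankNTypes #-}

-- The Boehm-Berarducci encoding, as given in the synopsis.
newtype TubeF a b k = TubeF
    { runT :: forall r. ((a -> k) -> r) -> ((b, k) -> r) -> r }

-- The ordinary sum type it is said to be equivalent to
-- (renamed TubeData here so both can live in one file).
data TubeData a b k
    = Await (a -> k)
    | Yield (b, k)

-- Passing the data constructors as the two continuations
-- recovers the ordinary representation.
toData :: TubeF a b k -> TubeData a b k
toData t = runT t Await Yield

-- Each constructor becomes a function that selects one continuation.
fromData :: TubeData a b k -> TubeF a b k
fromData (Await f) = TubeF (\onAwait _ -> onAwait f)
fromData (Yield p) = TubeF (\_ onYield -> onYield p)

-- Mapping over the continuation slot k, written directly on the encoding.
instance Functor (TubeF a b) where
    fmap f t = TubeF (\onAwait onYield ->
        runT t (\g -> onAwait (f . g))
               (\(b, k) -> onYield (b, f k)))
```

Because toData . fromData and fromData . toData both behave as the identity, nothing is lost by using the encoded form; pattern matching is simply replaced by supplying two handlers to runT.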
Core infrastructure
(>-) :: Monad m => Tube a b m r -> (b -> Tube b c m r) -> Tube a c m r
Connect a task to a continuation yielding another task; see ><.
(><) :: Monad m => Tube a b m r -> Tube b c m r -> Tube a c m r infixl 3
Compose two tubes into a new tube.
Utilities
map :: Monad m => (a -> b) -> Tube a b m r
Transforms all incoming values according to some function.
takeWhile :: Monad m => (a -> Bool) -> Tube a a m ()
Terminates the stream upon receiving a value violating the predicate.
filter :: Monad m => (a -> Bool) -> Tube a a m r
Yields only values satisfying some predicate.
reduce | |
---|---|
:: Monad m | |
=> (x -> a -> x) | step function |
-> x | initial value |
-> (x -> b) | final transformation |
-> Source a m () | stream source |
-> m b | |
Strict left-fold of a stream. Note that the actual return type of the source is not relevant, only the intermediate yield type.
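The roles of the three non-stream arguments are easiest to see on a plain list. The sketch below is an analogy only: reduceList and mean are hypothetical names, and a list stands in for the Source, so it shows the shape of the fold rather than the library's implementation.

```haskell
import Data.List (foldl')

-- List analogue of reduce: fold strictly from the left with the step
-- function, then apply the final transformation to the accumulator.
reduceList :: (x -> a -> x) -> x -> (x -> b) -> [a] -> b
reduceList step begin done = done . foldl' step begin

-- The final transformation lets the accumulator differ from the result:
-- here the accumulator is a (sum, count) pair and the result is their ratio.
mean :: [Double] -> Double
mean = reduceList (\(s, n) a -> (s + a, n + 1)) (0, 0) (\(s, n) -> s / n)
```

With id as the final transformation this is an ordinary left fold, so reduceList (+) 0 id [1..10] gives 55, matching the ghci example for reduce.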
every :: (Foldable t, Monad m) => t b -> Tube a (Maybe b) m ()
Similar to each except it explicitly marks the stream as exhausted.
unyield :: Monad m => FreeT (TubeF x b) m () -> m (Maybe (b, FreeT (TubeF x b) m ()))
Taps the next value from a source, maybe.
prompt :: Source String IO ()
Source of Strings from stdin. This is mostly for debugging / ghci example purposes.
display :: Sink String IO ()
Sink for Strings to stdout. This is mostly for debugging / ghci example purposes.
Pump
type Pump a b = CofreeT (PumpF a b)
A Pump is the dual to a Tube: where a Tube is a computation manipulating a stream of values, a Pump can be situated on either end of a tube to both insert values when requested and handle any yielded results.
One interesting use of a Pump is to feed data to a Tube, collecting the result as well as unused input:
import Data.Functor.Identity
import Control.Comonad (extract)  -- from the comonad package; needed for extract

p :: [a] -> Pump (Maybe a) x Identity [a]
p inp = pump (return inp)
        (\wa -> case (extract wa) of
            [] -> (Nothing, wa)
            x:xs -> (Just x, return xs))
        const

-- a Sink that stops after 5 loops, or when input is exhausted
add5 :: Sink (Maybe Int) IO Int
add5 = loop 0 5 where
    loop acc ct = if 0 == ct
        then return acc
        else do
            mn <- await
            maybe (return acc)
                  (\n -> loop (acc+n) (ct - 1))
                  mn

result :: IO ([Int], Int)
result = runPump (curry id) (p [1..10]) add5
-- ([6,7,8,9,10],15)
Pumps are still being investigated by the author, so if you come up with something interesting, please share!
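For readers who want to trace the feeding example without the comonadic machinery, here is a base-only sketch of the same idea. Everything in it is a hypothetical simplification: Sink and Pump are small stand-in types (a plain seed replaces the comonad, and the send direction is omitted), and feed, listPump, seed and draw are names invented for this illustration, not library functions.

```haskell
-- Hypothetical stand-in for a Sink: it either finishes with a result
-- or awaits one more input.
data Sink a r
    = Done r
    | Await (a -> Sink a r)

-- Hypothetical stand-in for a Pump: a seed plus a way to draw the next
-- value from it, returning the updated seed.
data Pump a s = Pump
    { seed :: s
    , draw :: s -> (a, s)
    }

-- Drive the sink with values drawn from the pump. When the sink finishes,
-- combine whatever is left in the pump with the sink's result.
feed :: (s -> r -> z) -> Pump a s -> Sink a r -> z
feed combine (Pump s _)    (Done r)  = combine s r
feed combine (Pump s next) (Await k) =
    let (a, s') = next s
    in feed combine (Pump s' next) (k a)

-- Serve list elements one at a time; Nothing signals exhaustion.
listPump :: [a] -> Pump (Maybe a) [a]
listPump xs = Pump xs step
  where
    step []       = (Nothing, [])
    step (y : ys) = (Just y, ys)

-- Sum at most five inputs, stopping early if the input runs out.
add5 :: Sink (Maybe Int) Int
add5 = loop 0 (5 :: Int)
  where
    loop acc 0  = Done acc
    loop acc ct = Await (maybe (Done acc) (\n -> loop (acc + n) (ct - 1)))

result :: ([Int], Int)
result = feed (,) (listPump [1 .. 10]) add5
```

Here result evaluates to ([6,7,8,9,10],15): the sink consumes five values and the pump keeps the rest, which is the same outcome the runPump example reports.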
send :: Comonad w => Pump a b w r -> b -> Pump a b w r
Send a value into a Pump, effectively re-seeding the stream.
Re-exports
lift :: MonadTrans t => forall m a. Monad m => m a -> t m a
Lift a computation from the argument monad to the constructed monad.