| Safe Haskell | Safe |
|---|---|
| Language | Haskell2010 |
Pipes.Tutorial
Contents
Description
Conventional Haskell stream programming forces you to choose only two of the following three features:
- Effects
- Streaming
- Composability
If you sacrifice Effects you get Haskell's pure and lazy lists, which you can transform using composable functions in constant space, but without interleaving effects.
If you sacrifice Streaming you get mapM, forM and
"ListT done wrong", which are composable and effectful, but do not return
a single result until the whole list has first been processed and loaded
into memory.
If you sacrifice Composability you write a tightly coupled read,
transform, and write loop in IO, which is streaming and effectful, but is
not modular or separable.
pipes gives you all three features: effectful, streaming, and composable
programming. pipes also provides a wide variety of stream programming
abstractions which are all subsets of a single unified machinery:
- effectful
Producers (like generators), - effectful
Consumers (like iteratees), - effectful
Pipes (like Unix pipes), and: ListTdone right.
All of these are connectable and you can combine them together in clever and unexpected ways because they all share the same underlying type.
pipes requires a basic understanding of monad transformers, which you can
learn about by reading either:
- the paper "Monad Transformers - Step by Step",
- part III "Monads in the Real World" of the tutorial "All About Monads",
- chapter 18 of "Real World Haskell" on monad transformers, or:
- the documentation of the
transformerslibrary.
If you want a Quick Start guide to pipes, read the documentation in
Pipes.Prelude from top to bottom.
This tutorial is more extensive and explains the pipes API in greater
detail and illustrates several idioms.
Introduction
The pipes library decouples stream processing stages from each other so
that you can mix and match diverse stages to produce useful streaming
programs. If you are a library writer, pipes lets you package up
streaming components into a reusable interface. If you are an application
writer, pipes lets you connect pre-made streaming components with minimal
effort to produce a highly-efficient program that streams data in constant
memory.
To enforce loose coupling, components can only communicate using two commands:
pipes has four types of components built around these two commands:
Producers can onlyyieldvalues and they model streaming sourcesConsumers can onlyawaitvalues and they model streaming sinksPipes can bothyieldandawaitvalues and they model stream transformationsEffects can neitheryieldnorawaitand they model non-streaming components
You can connect these components together in four separate ways which parallel the four above types:
forhandlesyields- (
>~) handlesawaits - (
>->) handles bothyields andawaits - (
>>=) handles return values
As you connect components their types will change to reflect inputs and
outputs that you've fused away. You know that you're done connecting things
when you get an Effect, meaning that you have handled all inputs and
outputs. You run this final Effect to begin streaming.
Producers
Producers are effectful streams of input. Specifically, a Producer is a
monad transformer that extends any base monad with a new yield command.
This yield command lets you send output downstream to an anonymous
handler, decoupling how you generate values from how you consume them.
The following stdinLn Producer shows how to incrementally read in
Strings from standard input and yield them downstream, terminating
gracefully when reaching the end of the input:
-- echo.hs
import Control.Monad (unless)
import Pipes
import System.IO (isEOF)
-- +--------+-- A 'Producer' that yields 'String's
-- | |
-- | | +-- Every monad transformer has a base monad.
-- | | | This time the base monad is 'IO'.
-- | | |
-- | | | +-- Every monadic action has a return value.
-- | | | | This action returns '()' when finished
-- v v v v
stdinLn :: Producer String IO ()
stdinLn = do
eof <- lift isEOF -- 'lift' an 'IO' action from the base monad
unless eof $ do
str <- lift getLine
yield str -- 'yield' the 'String'
stdinLn -- Loopyield emits a value, suspending the current Producer until the value is
consumed. If nobody consumes the value (which is possible) then yield
never returns. You can think of yield as having the following type:
yield::Monadm => a ->Producera m ()
The true type of yield is actually more general and powerful. Throughout
the tutorial I will present type signatures like this that are simplified at
first and then later reveal more general versions. So read the above type
signature as simply saying: "You can use yield within a Producer, but
you may be able to use yield in other contexts, too."
Click the link to yield to navigate to its documentation. There you will
see that yield actually uses the Producer' (with an apostrophe) type
synonym which hides a lot of polymorphism behind a simple veneer. The
documentation for yield says that you can also use yield within a
Pipe, too, because of this polymorphism:
yield::Monadm => a ->Pipex a m ()
Use simpler types like these to guide you until you understand the fully general type.
for loops are the simplest way to consume a Producer like stdinLn.
for has the following type:
-- +-- Producer +-- The body of the +-- Result -- | to loop | loop | -- v over v v -- -------------- ------------------ ----------for::Monadm =>Producera m r -> (a ->Effectm ()) ->Effectm r
(for producer body) loops over (producer), substituting each yield in
(producer) with (body).
You can also deduce that behavior purely from the type signature:
- The body of the loop takes exactly one argument of type
(a), which is the same as the output type of theProducer. Therefore, the body of the loop must get its input from thatProducerand nowhere else. - The return value of the input
Producermatches the return value of the result, thereforeformust loop over the entireProducerand not skip anything.
The above type signature is not the true type of for, which is actually
more general. Think of the above type signature as saying: "If the first
argument of for is a Producer and the second argument returns an
Effect, then the final result must be an Effect."
Click the link to for to navigate to its documentation. There you will
see the fully general type and underneath you will see equivalent simpler
types. One of these says that if the body of the loop is a Producer, then
the result is a Producer, too:
for::Monadm =>Producera m r -> (a ->Producerb m ()) ->Producerb m r
The first type signature I showed for for was a special case of this
slightly more general signature because a Producer that never yields is
also an Effect:
dataX-- The uninhabited type typeEffectm r =ProducerXm r
This is why for permits two different type signatures. The first type
signature is just a special case of the second one:
for::Monadm =>Producera m r -> (a ->Producerb m ()) ->Producerb m r -- Specialize 'b' to 'X'for::Monadm =>Producera m r -> (a ->ProducerXm ()) ->ProducerXm r -- Producer X = Effectfor::Monadm =>Producera m r -> (a ->Effectm ()) ->Effectm r
This is the same trick that all pipes functions use to work with various
combinations of Producers, Consumers, Pipes, and Effects. Each
function really has just one general type, which you can then simplify down
to multiple useful alternative types.
Here's an example use of a for loop, where the second argument (the
loop body) is an Effect:
-- echo.hs
loop :: Effect IO ()
loop = for stdinLn $ \str -> do -- Read this like: "for str in stdinLn"
lift $ putStrLn str -- The body of the 'for' loop
-- more concise: loop = for stdinLn (lift . putStrLn)In this example, for loops over stdinLn and replaces every yield in
stdinLn with the body of the loop, printing each line. This is exactly
equivalent to the following code, which I've placed side-by-side with the
original definition of stdinLn for comparison:
loop = do | stdinLn = do
eof <- lift isEOF | eof <- lift isEOF
unless eof $ do | unless eof $ do
str <- lift getLine | str <- lift getLine
(lift . putStrLn) str | yield str
loop | stdinLnYou can think of yield as creating a hole and a for loop is one way to
fill that hole.
Notice how the final loop only lifts actions from the base monad and
does nothing else. This property is true for all Effects, which are just
glorified wrappers around actions in the base monad. This means we can run
these Effects to remove their lifts and lower them back to the
equivalent computation in the base monad:
runEffect::Monadm =>Effectm r -> m r
This is the real type signature of runEffect, which refuses to accept
anything other than an Effect. This ensures that we handle all inputs and
outputs before streaming data:
-- echo.hs main :: IO () main = runEffect loop
... or you could inline the entire loop into the following one-liner:
main = runEffect $ for stdinLn (lift . putStrLn)
Our final program loops over standard input and echoes every line to
standard output until we hit Ctrl-D to end the input stream:
$ ghc -O2 echo.hs $ ./echo Test<Enter> Test ABC<Enter> ABC <Ctrl-D> $
The final behavior is indistinguishable from just removing all the lifts
from loop:
main = do | loop = do
eof <- isEof | eof <- lift isEof
unless eof $ do | unless eof $ do
str <- getLine | str <- lift getLine
putStrLn str | (lift . putStrLn) str
main | loopThis main is what we might have written by hand if we were not using
pipes, but with pipes we can decouple the input and output logic from
each other. When we connect them back together, we still produce streaming
code equivalent to what a sufficiently careful Haskell programmer would
have written.
You can also use for to loop over lists, too. To do so, convert the list
to a Producer using each, which is exported by default from Pipes:
each :: Monad m => [a] -> Producer a m () each as = mapM_ yield as
Combine for and each to iterate over lists using a "foreach" loop:
>>>runEffect $ for (each [1..4]) (lift . print)1 2 3 4
each is actually more general and works for any Foldable:
each:: (Monadm,Foldablef) => f a ->Producera m ()
So you can loop over any Foldable container or even a Maybe:
>>>runEffect $ for (each (Just 1)) (lift . print)1
Composability
You might wonder why the body of a for loop can be a Producer. Let's
test out this feature by defining a new loop body that creates three copies
of every value:
-- nested.hs
import Pipes
import qualified Pipes.Prelude as P -- Pipes.Prelude already has 'stdinLn'
triple :: Monad m => a -> Producer a m ()
triple x = do
yield x
yield x
yield x
loop :: Producer String IO ()
loop = for P.stdinLn triple
-- This is the exact same as:
--
-- loop = for P.stdinLn $ \x -> do
-- yield x
-- yield x
-- yield xThis time our loop is a Producer that outputs Strings, specifically
three copies of each line that we read from standard input. Since loop is
a Producer we cannot run it because there is still unhandled output.
However, we can use yet another for to handle this new repeated stream:
-- nested.hs main = runEffect $ for loop (lift . putStrLn)
This creates a program which echoes every line from standard input to standard output three times:
$ ./nested Test<Enter> Test Test Test ABC<Enter> ABC ABC ABC <Ctrl-D> $
But is this really necessary? Couldn't we have instead written this using a nested for loop?
main = runEffect $
for P.stdinLn $ \str1 ->
for (triple str1) $ \str2 ->
lift $ putStrLn str2Yes, we could have! In fact, this is a special case of the following equality, which always holds no matter what:
-- s :: Monad m =>Producera m () -- i.e. 'P.stdinLn' -- f :: Monad m => a ->Producerb m () -- i.e. 'triple' -- g :: Monad m => b ->Producerc m () -- i.e. '(lift . putStrLn)' for (for s f) g = for s (\x -> for (f x) g)
We can understand the rationale behind this equality if we first define the
following operator that is the point-free counterpart to for:
(~>) :: Monad m
=> (a -> Producer b m ())
-> (b -> Producer c m ())
-> (a -> Producer c m ())
(f ~> g) x = for (f x) g
Using (~>) (pronounced "into"), we can transform our original equality
into the following more symmetric equation:
f :: Monad m => a ->Producerb m () g :: Monad m => b ->Producerc m () h :: Monad m => c ->Producerd m () -- Associativity (f ~> g) ~> h = f ~> (g ~> h)
This looks just like an associativity law. In fact, (~>) has another nice
property, which is that yield is its left and right identity:
-- Left Identity yield ~> f = f
-- Right Identity f ~> yield = f
In other words, yield and (~>) form a Category, specifically the
generator category, where (~>) plays the role of the composition operator
and yield is the identity. If you don't know what a Category is, that's
okay, and category theory is not a prerequisite for using pipes. All you
really need to know is that pipes uses some simple category theory to keep
the API intuitive and easy to use.
Notice that if we translate the left identity law to use for instead of
(~>) we get:
for (yield x) f = f x
This just says that if you iterate over a pure single-element Producer,
then you could instead cut out the middle man and directly apply the body of
the loop to that single element.
If we translate the right identity law to use for instead of (~>) we
get:
for s yield = s
This just says that if the only thing you do is re-yield every element of
a stream, you get back your original stream.
These three "for loop" laws summarize our intuition for how for loops
should behave and because these are Category laws in disguise that means
that Producers are composable in a rigorous sense of the word.
In fact, we get more out of this than just a bunch of equations. We also
get a useful operator: (~>). We can use this operator to condense
our original code into the following more succinct form that composes two
transformations:
main = runEffect $ for P.stdinLn (triple ~> lift . putStrLn)
This means that we can also choose to program in a more functional style and
think of stream processing in terms of composing transformations using
(~>) instead of nesting a bunch of for loops.
The above example is a microcosm of the design philosophy behind the pipes
library:
- Define the API in terms of categories
- Specify expected behavior in terms of category laws
- Think compositionally instead of sequentially
Consumers
Sometimes you don't want to use a for loop because you don't want to consume
every element of a Producer or because you don't want to process every
value of a Producer the exact same way.
The most general solution is to externally iterate over the Producer using
the next command:
next::Monadm =>Producera m r -> m (Eitherr (a,Producera m r))
Think of next as pattern matching on the head of the Producer. This
Either returns a Left if the Producer is done or it returns a Right
containing the next value, a, along with the remainder of the Producer.
However, sometimes we can get away with something a little more simple and
elegant, like a Consumer, which represents an effectful sink of values. A
Consumer is a monad transformer that extends the base monad with a new
await command. This await command lets you receive input from an
anonymous upstream source.
The following stdoutLn Consumer shows how to incrementally await
Strings and print them to standard output, terminating gracefully when
receiving a broken pipe error:
import Control.Monad (unless)
import Control.Exception (try, throwIO)
import qualified GHC.IO.Exception as G
import Pipes
-- +--------+-- A 'Consumer' that awaits 'String's
-- | |
-- v v
stdoutLn :: Consumer String IO ()
stdoutLn = do
str <- await -- 'await' a 'String'
x <- lift $ try $ putStrLn str
case x of
-- Gracefully terminate if we got a broken pipe error
Left e@(G.IOError { G.ioe_type = t}) ->
lift $ unless (t == G.ResourceVanished) $ throwIO e
-- Otherwise loop
Right () -> stdoutLnawait is the dual of yield: we suspend our Consumer until we receive a
new value. If nobody provides a value (which is possible) then await
never returns. You can think of await as having the following type:
await::Monadm =>Consumera m a
One way to feed a Consumer is to repeatedly feed the same input using
(>~) (pronounced "feed"):
-- +- Feed +- Consumer to +- Returns new -- | action | feed | Effect -- v v v -- ---------- -------------- ---------- (>~) ::Monadm =>Effectm b ->Consumerb m c ->Effectm c
(draw >~ consumer) loops over (consumer), substituting each await in
(consumer) with (draw).
So the following code replaces every await in stdoutLn with
(lift getLine) and then removes all the lifts:
>>>runEffect $ lift getLine >~ stdoutLnTest<Enter> Test ABC<Enter> ABC 42<Enter> 42 ...
You might wonder why (>~) uses an Effect instead of a raw action in the
base monad. The reason why is that (>~) actually permits the following
more general type:
(>~) ::Monadm =>Consumera m b ->Consumerb m c ->Consumera m c
(>~) is the dual of (~>), composing Consumers instead of Producers.
This means that you can feed a Consumer with yet another Consumer so
that you can await while you await. For example, we could define the
following intermediate Consumer that requests two Strings and returns
them concatenated:
doubleUp :: Monad m => Consumer String m String
doubleUp = do
str1 <- await
str2 <- await
return (str1 ++ str2)
-- more concise: doubleUp = (++) <$> await <*> awaitWe can now insert this in between (lift getLine) and stdoutLn and see
what happens:
>>>runEffect $ lift getLine >~ doubleUp >~ stdoutLnTest<Enter> ing<Enter> Testing ABC<Enter> DEF<Enter> ABCDEF 42<Enter> 000<Enter> 42000 ...
doubleUp splits every request from stdoutLn into two separate requests
and
returns back the concatenated result.
We didn't need to parenthesize the above chain of (>~) operators, because
(>~) is associative:
-- Associativity (f >~ g) >~ h = f >~ (g >~ h)
... so we can always omit the parentheses since the meaning is unambiguous:
f >~ g >~ h
Also, (>~) has an identity, which is await!
-- Left identity await >~ f = f -- Right Identity f >~ await = f
In other words, (>~) and await form a Category, too, specifically the
iteratee category, and Consumers are also composable.
Pipes
Our previous programs were unsatisfactory because they were biased either
towards the Producer end or the Consumer end. As a result, we had to
choose between gracefully handling end of input (using stdinLn) or
gracefully handling end of output (using stdoutLn), but not both at the
same time.
However, we don't need to restrict ourselves to using Producers
exclusively or Consumers exclusively. We can connect Producers and
Consumers directly together using (>->) (pronounced "pipe"):
(>->) ::Monadm =>Producera m r ->Consumera m r ->Effectm r
This returns an Effect which we can run:
-- echo2.hs import Pipes import qualified Pipes.Prelude as P -- Pipes.Prelude also provides 'stdoutLn' main = runEffect $ P.stdinLn >-> P.stdoutLn
This program is more declarative of our intent: we want to stream values
from stdinLn to stdoutLn. The above "pipeline" not only echoes
standard input to standard output, but also handles both end of input and
broken pipe errors:
$ ./echo2 Test<Enter> Test ABC<Enter> ABC 42<Enter> 42 <Ctrl-D> $
(>->) is "pull-based" meaning that control flow begins at the most
downstream component (i.e. stdoutLn in the above example). Any time a
component awaits a value it blocks and transfers control upstream and
every time a component yields a value it blocks and restores control back
downstream, satisfying the await. So in the above example, (>->)
matches every await from stdoutLn with a yield from stdinLn.
Streaming stops when either stdinLn terminates (i.e. end of input) or
stdoutLn terminates (i.e. broken pipe). This is why (>->) requires
that both the Producer and Consumer share the same type of return value:
whichever one terminates first provides the return value for the entire
Effect.
Let's test this by modifying our Producer and Consumer to each return a
diagnostic String:
-- echo3.hs
import Control.Applicative ((<$)) -- (<$) modifies return values
import Pipes
import qualified Pipes.Prelude as P
import System.IO
main = do
hSetBuffering stdout NoBuffering
str <- runEffect $
("End of input!" <$ P.stdinLn) >-> ("Broken pipe!" <$ P.stdoutLn)
hPutStrLn stderr strThis lets us diagnose whether the Producer or Consumer terminated first:
$ ./echo3 Test<Enter> Test <Ctrl-D> End of input! $ ./echo3 | perl -e 'close STDIN' Test<Enter> Broken pipe! $
You might wonder why (>->) returns an Effect that we have to run instead
of directly returning an action in the base monad. This is because you can
connect things other than Producers and Consumers, like Pipes, which
are effectful stream transformations.
A Pipe is a monad transformer that is a mix between a Producer and
Consumer, because a Pipe can both await and yield. The following
example Pipe is analagous to the Prelude's take, only allowing a fixed
number of values to flow through:
-- take.hs
import Control.Monad (replicateM_)
import Pipes
import Prelude hiding (take)
-- +--------- A 'Pipe' that
-- | +---- 'await's 'a's and
-- | | +-- 'yield's 'a's
-- | | |
-- v v v
take :: Int -> Pipe a a IO ()
take n = do
replicateM_ n $ do -- Repeat this block 'n' times
x <- await -- 'await' a value of type 'a'
yield x -- 'yield' a value of type 'a'
lift $ putStrLn "You shall not pass!" -- Fly, you fools!You can use Pipes to transform Producers, Consumers, or even other
Pipes using the same (>->) operator:
(>->) ::Monadm =>Producera m r ->Pipea b m r ->Producerb m r (>->) ::Monadm =>Pipea b m r ->Consumerb m r ->Consumera m r (>->) ::Monadm =>Pipea b m r ->Pipeb c m r ->Pipea c m r
For example, you can compose take after stdinLn to limit the number
of lines drawn from standard input:
maxInput :: Int -> Producer String IO () maxInput n = P.stdinLn >-> take n
>>>runEffect $ maxInput 3 >-> P.stdoutLnTest<Enter> Test ABC<Enter> ABC 42<Enter> 42 You shall not pass!>>>
... or you can pre-compose take before stdoutLn to limit the number
of lines written to standard output:
maxOutput :: Int -> Consumer String IO () maxOutput n = take n >-> P.stdoutLn
>>>runEffect $ P.stdinLn >-> maxOutput 3<Exact same behavior>
Those both gave the same behavior because (>->) is associative:
(p1 >-> p2) >-> p3 = p1 >-> (p2 >-> p3)
Therefore we can just leave out the parentheses:
>>>runEffect $ P.stdinLn >-> take 3 >-> P.stdoutLn<Exact same behavior>
(>->) is designed to behave like the Unix pipe operator, except with less
quirks. In fact, we can continue the analogy to Unix by defining cat
(named after the Unix cat utility), which reforwards elements endlessly:
cat :: Monad m => Pipe a a m r
cat = forever $ do
x <- await
yield xcat is the identity of (>->), meaning that cat satisfies the
following two laws:
-- Useless use of 'cat' cat >-> p = p -- Forwarding output to 'cat' does nothing p >-> cat = p
Therefore, (>->) and cat form a Category, specifically the category of
Unix pipes, and Pipes are also composable.
A lot of Unix tools have very simple definitions when written using pipes:
-- unix.hs import Control.Monad (forever) import Pipes import qualified Pipes.Prelude as P -- Pipes.Prelude provides 'take', too import Prelude hiding (head) head :: Monad m => Int -> Pipe a a m () head = P.take yes :: Monad m => Producer String m r yes = forever $ yield "y" main = runEffect $ yes >-> head 3 >-> P.stdoutLn
This prints out 3 'y's, just like the equivalent Unix pipeline:
$ ./unix y y y $ yes | head -3 y y y $
This lets us write "Haskell pipes" instead of Unix pipes. These are much easier to build than Unix pipes and we can connect them directly within Haskell for interoperability with the Haskell language and ecosystem.
ListT
pipes also provides a "ListT done right" implementation. This differs
from the implementation in transformers because this ListT:
- obeys the monad laws, and
- streams data immediately instead of collecting all results into memory.
The latter property is actually an elegant consequence of obeying the monad laws.
To bind a list within a ListT computation, combine Select and each:
import Pipes
pair :: ListT IO (Int, Int)
pair = do
x <- Select $ each [1, 2]
lift $ putStrLn $ "x = " ++ show x
y <- Select $ each [3, 4]
lift $ putStrLn $ "y = " ++ show y
return (x, y)You can then loop over a ListT by using every:
every::Monadm =>ListTm a ->Producera m ()
So you can use your ListT within a for loop:
>>>runEffect $ for (every pair) (lift . print)x = 1 y = 3 (1,3) y = 4 (1,4) x = 2 y = 3 (2,3) y = 4 (2,4)
... or a pipeline:
>>>import qualified Pipes.Prelude as P>>>runEffect $ every pair >-> P.print<Exact same behavior>
Note that ListT is lazy and only produces as many elements as we request:
>>>runEffect $ for (every pair >-> P.take 2) (lift . print)x = 1 y = 3 (1,3) y = 4 (1,4)
You can also go the other way, binding Producers directly within a
ListT. In fact, this is actually what Select was already doing:
Select::Producera m () ->ListTm a
This lets you write crazy code like:
import Pipes
import qualified Pipes.Prelude as P
input :: Producer String IO ()
input = P.stdinLn >-> P.takeWhile (/= "quit")
name :: ListT IO String
name = do
firstName <- Select input
lastName <- Select input
return (firstName ++ " " ++ lastName)Here we're binding standard input non-deterministically (twice) as if it were an effectful list:
>>>runEffect $ every name >-> P.stdoutLnDaniel<Enter> Fischer<Enter> Daniel Fischer Wagner<Enter> Daniel Wagner quit<Enter> Donald<Enter> Stewart<Enter> Donald Stewart Duck<Enter> Donald Duck quit<Enter> quit<Enter>>>>
Notice how this streams out values immediately as they are generated, rather than building up a large intermediate result and then printing all the values in one batch at the end.
ListT computations can be combined in more ways than Pipes, so try to
program in ListT as much as possible and defer converting it to a Pipe
as late as possible using loop.
You can combine ListT computations even if their inputs and outputs are
completely different:
data In
= InA A
| InB B
| InC C
data Out
= OutD D
| OutE E
| OutF F
-- Independent computations
example1 :: A -> ListT IO D
example2 :: B -> ListT IO E
example3 :: C -> ListT IO F
-- Combined computation
total :: In -> ListT IO Out
total input = case input of
InA a -> fmap OutD (example1 a)
InB b -> fmap OutE (example2 b)
InC c -> fmap OutF (example3 c)Sometimes you have multiple computations that handle different inputs but the same output, in which case you don't need to unify their outputs:
-- Overlapping outputs
example1 :: A -> ListT IO Out
example2 :: B -> ListT IO Out
example3 :: C -> ListT IO Out
-- Combined computation
total :: In -> ListT IO Out
total input = case input of
InA a -> example1 a
InB b -> example2 b
InC c -> example3 cOther times you have multiple computations that handle the same input but
produce different outputs. You can unify their outputs using the Monoid
and Functor instances for ListT:
-- Overlapping inputs
example1 :: In -> ListT IO D
example2 :: In -> ListT IO E
example3 :: In -> ListT IO F
-- Combined computation
total :: In -> ListT IO Out
total input =
fmap OutD (example1 input)
<> fmap OutE (example2 input)
<> fmap OutF (example3 input)You can also chain ListT computations, feeding the output of the first
computation as the input to the next computation:
-- End-to-end aToB :: A -> ListT IO B bToC :: B -> ListT IO C -- Combined computation aToC :: A -> LIstT IO C aToC = aToB >=> bToC
... or you can just use do notation if you prefer.
However, the Pipe type is more general than ListT and can represent
things like termination. Therefore you should consider mixing Pipes with
ListT when you need to take advantage of these extra features:
-- Mix ListT with Pipes
example :: In -> ListT IO Out
pipe :: Pipe In Out IO ()
pipe = Pipes.takeWhile (not . isC) >-> loop example
where
isC (InC _) = True
isC _ = FalseSo promote your ListT logic to a Pipe when you need to take advantage of
these Pipe-specific features.
Tricks
pipes is more powerful than meets the eye so this section presents some
non-obvious tricks you may find useful.
Many pipe combinators will work on unusual pipe types and the next few
examples will use the cat pipe to demonstrate this.
For example, you can loop over the output of a Pipe using for, which is
how map is defined:
map :: Monad m => (a -> b) -> Pipe a b m r map f = for cat $ \x -> yield (f x) -- Read this as: For all values flowing downstream, apply 'f'
This is equivalent to:
map f = forever $ do
x <- await
yield (f x)You can also feed a Pipe input using (>~). This means we could have
instead defined the yes pipe like this:
yes :: Monad m => Producer String m r yes = return "y" >~ cat -- Read this as: Keep feeding "y" downstream
This is equivalent to:
yes = forever $ yield "y"
You can also sequence two Pipes together. This is how drop is
defined:
drop :: Monad m => Int -> Pipe a a m r
drop n = do
replicateM_ n await
catThis is equivalent to:
drop n = do
replicateM_ n await
forever $ do
x <- await
yield xYou can even compose pipes inside of another pipe:
customerService :: Producer String IO ()
customerService = do
each [ "Hello, how can I help you?" -- Begin with a script
, "Hold for one second."
]
P.stdinLn >-> P.takeWhile (/= "Goodbye!") -- Now continue with a humanAlso, you can often use each in conjunction with (~>) to traverse nested
data structures. For example, you can print all non-Nothing elements
from a doubly-nested list:
>>>runEffect $ (each ~> each ~> each ~> lift . print) [[Just 1, Nothing], [Just 2, Just 3]]1 2 3
Another neat thing to know is that every has a more general type:
every:: (Monadm,Enumerablet) => t m a ->Producera m ()
Enumerable generalizes Foldable and if you have an effectful container
of your own that you want others to traverse using pipes, just have your
container implement the toListT method of the Enumerable class:
class Enumerable t where
toListT :: Monad m => t m a -> ListT m aYou can even use Enumerable to traverse effectful types that are not even
proper containers, like MaybeT:
input :: MaybeT IO String
input = do
str <- lift getLine
guard (str /= "Fail")
return str>>>runEffect $ every input >-> P.stdoutLnTest<Enter> Test>>>runEffect $ every input >-> P.stdoutLnFail<Enter>>>>
Conclusion
This tutorial covers the concepts of connecting, building, and reading
pipes code. However, this library is only the core component in an
ecosystem of streaming components. Derived libraries that build immediately
upon pipes include:
pipes-concurrency: Concurrent reactive programming and message passingpipes-parse: Minimal utilities for stream parsingpipes-safe: Resource management and exception safety forpipespipes-group: Grouping streams in constant space
These libraries provide functionality specialized to common streaming
domains. Additionally, there are several libraries on Hackage that provide
even higher-level functionality, which you can find by searching under the
"Pipes" category or by looking for packages with a pipes- prefix in
their name. Current examples include:
pipes-extras: Miscellaneous utilitiespipes-network/pipes-network-tls: Networkingpipes-zlib: Compression and decompressionpipes-binary: Binary serializationpipes-attoparsec: High-performance parsingpipes-aeson: JSON serialization and deserialization
Even these derived packages still do not explore the full potential of
pipes functionality, which actually permits bidirectional communication.
Advanced pipes users can explore this library in greater detail by
studying the documentation in the Pipes.Core module to learn about the
symmetry of the underlying Proxy type and operators.
To learn more about pipes, ask questions, or follow pipes development,
you can subscribe to the haskell-pipes mailing list at:
https://groups.google.com/forum/#!forum/haskell-pipes
... or you can mail the list directly at:
mailto:haskell-pipes@googlegroups.com
Additionally, for questions regarding types or type errors, you might find the following appendix on types very useful.
Appendix: Types
pipes uses parametric polymorphism (i.e. generics) to overload all
operations. You've probably noticed this overloading already:
yieldworks within bothProducers andPipesawaitworks within bothConsumers andPipes- (
>->) connectsProducers,Consumers, andPipes in varying ways
This overloading is great when it works, but when connections fail they produce type errors that appear intimidating at first. This section explains the underlying types so that you can work through type errors intelligently.
Producers, Consumers, Pipes, and Effects are all special cases of a
single underlying type: a Proxy. This overarching type permits fully
bidirectional communication on both an upstream and downstream interface.
You can think of it as having the following shape:
Proxy a' a b' b m r
Upstream | Downstream
+---------+
| |
a' <== <== b' -- Information flowing upstream
| |
a ==> ==> b -- Information flowing downstream
| | |
+----|----+
v
rThe four core types do not use the upstream flow of information. This means
that the a' and b' in the above diagram go unused unless you use the
more advanced features provided in Pipes.Core.
pipes uses type synonyms to hide unused inputs or outputs and clean up
type signatures. These type synonyms come in two flavors:
- Concrete type synonyms that explicitly close unused inputs and outputs of
the
Proxytype - Polymorphic type synonyms that don't explicitly close unused inputs or outputs
The concrete type synonyms use () to close unused inputs and X (the
uninhabited type) to close unused outputs:
type Effect = Proxy X () () X
Upstream | Downstream
+---------+
| |
X <== <== ()
| |
() ==> ==> X
| | |
+----|----+
v
rtype Producer b = Proxy X () () b
Upstream | Downstream
+---------+
| |
X <== <== ()
| |
() ==> ==> b
| | |
+----|----+
v
rtype Consumer a = Proxy () a () X
Upstream | Downstream
+---------+
| |
() <== <== ()
| |
a ==> ==> X
| | |
+----|----+
v
rtype Pipe a b = Proxy () a () b
Upstream | Downstream
+---------+
| |
() <== <== ()
| |
a ==> ==> b
| | |
+----|----+
v
rWhen you compose Proxys using (>->) all you are doing is placing them
side by side and fusing them laterally. For example, when you compose a
Producer, Pipe, and a Consumer, you can think of information flowing
like this:
Producer Pipe Consumer
+-----------+ +----------+ +------------+
| | | | | |
X <== <== () <== <== () <== <== ()
| stdinLn | | take 3 | | stdoutLn |
() ==> ==> String ==> ==> String ==> ==> X
| | | | | | | | |
+-----|-----+ +----|-----+ +------|-----+
v v v
() () ()Composition fuses away the intermediate interfaces, leaving behind an
Effect:
Effect
+-----------------------------------+
| |
X <== <== ()
| stdinLn >-> take 3 >-> stdoutLn |
() ==> ==> X
| |
+----------------|------------------+
v
()pipes also provides polymorphic type synonyms with apostrophes at the end
of their names. These use universal quantification to leave open any unused
input or output ends (which I mark using *):
Producer': marks the upstream end unused but still open
type Producer' b m r = forall x' x . Proxy x' x () b m r
Upstream | Downstream
+---------+
| |
* <== <== ()
| |
* ==> ==> b
| | |
+----|----+
v
rConsumer': marks the downstream end unused but still open
type Consumer' a m r = forall y' y . Proxy () a y' y m r
Upstream | Downstream
+---------+
| |
() <== <== *
| |
a ==> ==> *
| | |
+----|----+
v
rEffect': marks both ends unused but still open
type Effect' m r = forall x' x y' y . Proxy x' x y' y m r
Upstream | Downstream
+---------+
| |
* <== <== *
| |
* ==> ==> *
| | |
+----|----+
v
rNote that there is no polymorphic generalization of a Pipe.
Like before, if you compose a Producer', a Pipe, and a Consumer':
Producer' Pipe Consumer'
+-----------+ +----------+ +------------+
| | | | | |
* <== <== () <== <== () <== <== *
| stdinLn | | take 3 | | stdoutLn |
* ==> ==> String ==> ==> String ==> ==> *
| | | | | | | | |
+-----|-----+ +-----|----+ +------|-----+
v v v
() () ()... they fuse into an Effect':
Effect'
+-----------------------------------+
| |
* <== <== *
| stdinLn >-> take 3 >-> stdoutLn |
* ==> ==> *
| |
+----------------|------------------+
v
()Polymorphic type synonyms come in handy when you want to keep the type as
general as possible. For example, the type signature for yield uses
Producer' to keep the type signature simple while still leaving the
upstream input end open:
yield::Monadm => a ->Producer'a m ()
This type signature lets us use yield within a Pipe, too, because the
Pipe type synonym is a special case of the polymorphic Producer' type
synonym:
typeProducer'b m r = forall x' x .Proxyx' x () b m r typePipea b m r =Proxy() a () b m r
The same is true for await, which uses the polymorphic Consumer' type
synonym:
await::Monadm =>Consumer'a m a
We can use await within a Pipe because a Pipe is a special case of the
polymorphic Consumer' type synonym:
typeConsumer'a m r = forall y' y .Proxy() a y' y m r typePipea b m r =Proxy() a () b m r
However, polymorphic type synonyms cause problems in many other cases:
- They usually give the wrong behavior when used as the argument of a function (known as the "negative" or "contravariant" position) like this:
f :: Producer' a m r -> ... -- Wrong f :: Producer a m r -> ... -- Right
The former function only accepts polymorphic Producers as arguments.
The latter function accepts both polymorphic and concrete Producers,
which is probably what you want.
- Even when you desire a polymorphic argument, this induces a higher-ranked
type, because it translates to a
forallwhich you cannot factor out to the top-level to simplify the type signature:
f :: (forall x' x y' . Proxy x' x y' m r) -> ...
These kinds of type signatures require the RankNTypes extension.
- Even when you have polymorphic type synonyms as the result of a function
(i.e. the "positive" or "covariant" position), recent versions of
ghcsuch still require theRankNTypesextension. For example, thefromHandlefunction from Pipes.Prelude requiresRankNTypesto compile correctly onghc-7.6.3:
fromHandle :: MonadIO m => Handle -> Producer' String m ()
- You can't use polymorphic type synonyms inside other type constructors
without the
ImpredicativeTypesextension:
io :: IO (Producer' a m r) -- Type error without ImpredicativeTypes
- You can't partially apply polymorphic type synonyms:
stack :: MaybeT (Producer' a m) r -- Type error
In these scenarios you should fall back on the concrete type synonyms, which
are better behaved. If concrete type synonyms are unsatisfactory, then ask
ghc to infer the most general type signature and use that.
For the purposes of debugging type errors you can just remember that:
Input --+ +-- Output
| |
v v
Proxy a' a b' b m r
^ ^
| |
+----+-- Ignore theseFor example, let's say that you try to run the stdinLn Producer. This
produces the following type error:
>>>runEffect P.stdinLn<interactive>:4:5: Couldn't match expected type `X' with actual type `String' Expected type: Effect m0 r0 Actual type: Proxy X () () String IO () In the first argument of `runEffect', namely `P.stdinLn' In the expression: runEffect P.stdinLn
runEffect expects an Effect, which is equivalent to the following type:
Effect IO () = Proxy X () () X IO ()
... but stdinLn type-checks as a Producer, which has the following
type:
Producer String IO () = Proxy X () () String IO ()
The fourth type variable (the output) does not match. For an Effect this
type variable should be closed (i.e. X), but stdinLn has a String
output, thus the type error:
Couldn't match expected type `X' with actual type `String'
Any time you get type errors like these you can work through them by expanding out the type synonyms and seeing which type variables do not match.
You may also consult this table of type synonyms to more easily compare them:
type Effect = Proxy X () () X type Producer b = Proxy X () () b type Consumer a = Proxy () a () X type Pipe a b = Proxy () a () b type Server b' b = Proxy X () b' b type Client a' a = Proxy a' a () X type Effect' m r = forall x' x y' y . Proxy x' x y' y m r type Producer' b m r = forall x' x . Proxy x' x () b m r type Consumer' a m r = forall y' y . Proxy () a y' y m r type Server' b' b m r = forall x' x . Proxy x' x b' b m r type Client' a' a m r = forall y' y . Proxy a' a y' y m r
Appendix: Time Complexity
There are three functions that give quadratic time complexity when used in
within pipes:
For example, the time complexity of this code segment scales quadratically
with n:
import Control.Monad (replicateM) import Pipes quadratic :: Int -> Consumer a m [a] quadratic n = replicateM n await
These three functions are generally bad practice to use, because all three of them correspond to "ListT done wrong", building a list in memory instead of streaming results.
However, sometimes situations arise where one deliberately intends to build a list in memory. The solution is to use the "codensity transformation" to transform the code to run with linear time complexity. This involves:
- wrapping the code in the
Codensitymonad transformer (fromControl.Monad.Codensitymodule of thekan-extensionspackage) usinglift - applying
sequence/replicateM/mapM - unwrapping the code using
lowerCodensity
To illustrate this, we'd transform the above example to:
import Control.Monad.Codensity (lowerCodensity) linear :: Monad m => Int -> Consumer a m [a] linear n = lowerCodensity $ replicateM n $ lift await
This will produce the exact same result, but in linear time.
Copyright
This tutorial is licensed under a Creative Commons Attribution 4.0 International License