Safe Haskell | Safe |
---|---|

Language | Haskell2010 |

- data Stream f m r
- unfold :: (Monad m, Functor f) => (s -> m (Either r (f s))) -> s -> Stream f m r
- replicates :: (Monad m, Functor f) => Int -> f () -> Stream f m ()
- repeats :: (Monad m, Functor f) => f () -> Stream f m r
- repeatsM :: (Monad m, Functor f) => m (f ()) -> Stream f m r
- effect :: (Monad m, Functor f) => m (Stream f m r) -> Stream f m r
- wrap :: (Monad m, Functor f) => f (Stream f m r) -> Stream f m r
- yields :: (Monad m, Functor f) => f r -> Stream f m r
- streamBuild :: (forall b. (r -> b) -> (m b -> b) -> (f b -> b) -> b) -> Stream f m r
- cycles :: (Monad m, Functor f) => Stream f m () -> Stream f m r
- delays :: (MonadIO m, Applicative f) => Double -> Stream f m r
- never :: (Monad m, Applicative f) => Stream f m r
- untilJust :: (Monad m, Applicative f) => m (Maybe r) -> Stream f m r
- intercalates :: (Monad m, Monad (t m), MonadTrans t) => t m x -> Stream (t m) m r -> t m r
- concats :: (Monad m, Functor f) => Stream (Stream f m) m r -> Stream f m r
- iterT :: (Functor f, Monad m) => (f (m a) -> m a) -> Stream f m a -> m a
- iterTM :: (Functor f, Monad m, MonadTrans t, Monad (t m)) => (f (t m a) -> t m a) -> Stream f m a -> t m a
- destroy :: (Functor f, Monad m) => Stream f m r -> (f b -> b) -> (m b -> b) -> (r -> b) -> b
- streamFold :: (Functor f, Monad m) => (r -> b) -> (m b -> b) -> (f b -> b) -> Stream f m r -> b
- inspect :: Monad m => Stream f m r -> m (Either r (f (Stream f m r)))
- maps :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r
- mapsM :: (Monad m, Functor f) => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r
- mapsPost :: forall m f g r. (Monad m, Functor g) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r
- mapsMPost :: forall m f g r. (Monad m, Functor g) => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r
- hoistUnexposed :: (Monad m, Functor f) => (forall a. m a -> n a) -> Stream f m r -> Stream f n r
- decompose :: (Monad m, Functor f) => Stream (Compose m f) m r -> Stream f m r
- mapsM_ :: (Functor f, Monad m) => (forall x. f x -> m x) -> Stream f m r -> m r
- run :: Monad m => Stream m m r -> m r
- distribute :: (Monad m, Functor f, MonadTrans t, MFunctor t, Monad (t (Stream f m))) => Stream f (t m) r -> t (Stream f m) r
- groups :: (Monad m, Functor f, Functor g) => Stream (Sum f g) m r -> Stream (Sum (Stream f m) (Stream g m)) m r
- chunksOf :: (Monad m, Functor f) => Int -> Stream f m r -> Stream (Stream f m) m r
- splitsAt :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Stream f m r)
- takes :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m ()
- cutoff :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Maybe r)
- zipsWith :: forall f g h m r. (Monad m, Functor h) => (forall x y. f x -> g y -> h (x, y)) -> Stream f m r -> Stream g m r -> Stream h m r
- zipsWith' :: forall f g h m r. Monad m => (forall x y p. (x -> y -> p) -> f x -> g y -> h p) -> Stream f m r -> Stream g m r -> Stream h m r
- zips :: (Monad m, Functor f, Functor g) => Stream f m r -> Stream g m r -> Stream (Compose f g) m r
- unzips :: (Monad m, Functor f, Functor g) => Stream (Compose f g) m r -> Stream f (Stream g m) r
- interleaves :: (Monad m, Applicative h) => Stream h m r -> Stream h m r -> Stream h m r
- separate :: (Monad m, Functor f, Functor g) => Stream (Sum f g) m r -> Stream f (Stream g m) r
- unseparate :: (Monad m, Functor f, Functor g) => Stream f (Stream g m) r -> Stream (Sum f g) m r
- expand :: (Monad m, Functor f) => (forall a b. (g a -> b) -> f a -> h b) -> Stream f m r -> Stream g (Stream h m) r
- expandPost :: (Monad m, Functor g) => (forall a b. (g a -> b) -> f a -> h b) -> Stream f m r -> Stream g (Stream h m) r
- switch :: Sum f g r -> Sum g f r
- unexposed :: (Functor f, Monad m) => Stream f m r -> Stream f m r
- hoistExposed :: (Functor m, Functor f) => (forall b. m b -> n b) -> Stream f m a -> Stream f n a
- hoistExposedPost :: (Functor n, Functor f) => (forall b. m b -> n b) -> Stream f m a -> Stream f n a
- mapsExposed :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r
- mapsMExposed :: (Monad m, Functor f) => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r
- destroyExposed :: (Functor f, Monad m) => Stream f m r -> (f b -> b) -> (m b -> b) -> (r -> b) -> b

# The free monad transformer

The `Stream` data type is equivalent to `FreeT` and can represent any effectful
succession of steps, where the form of the steps or `commands` is
specified by the first (functor) parameter.

data Stream f m r = Step !(f (Stream f m r)) | Effect (m (Stream f m r)) | Return r

The *producer* concept uses the simple functor `(a,_)` (or the stricter
`Of a _`). Then the news at each step or layer is just: an individual item of type `a`.
Since `Stream (Of a) m r` is equivalent to `Pipes.Producer a m r`, much of
the `pipes` `Prelude` can easily be mirrored in a `streaming` `Prelude`. Similarly,
a simple `Consumer a m r` or `Parser a m r` concept arises when the base functor is
`(a -> _)`. `Stream ((->) input) m result` consumes `input` until it returns a
`result`.

To avoid breaking reasoning principles, the constructors
should not be used directly. A pattern-match should go by way of `inspect`
(or, in the producer case, `next`). The constructors are exported by the `Internal`
module.

# Introducing a stream

unfold :: (Monad m, Functor f) => (s -> m (Either r (f s))) -> s -> Stream f m r Source #

Build a `Stream` by unfolding steps starting from a seed. See also
the specialized `unfoldr` in the prelude.

```haskell
unfold inspect = id -- modulo the quotient we work with
unfold Pipes.next :: Monad m => Producer a m r -> Stream ((,) a) m r
unfold (curry (:>) . Pipes.next) :: Monad m => Producer a m r -> Stream (Of a) m r
```

replicates :: (Monad m, Functor f) => Int -> f () -> Stream f m () Source #

Repeat a functorial layer, command or instruction a fixed number of times.

replicates n = takes n . repeats

repeats :: (Monad m, Functor f) => f () -> Stream f m r Source #

Repeat a functorial layer (a "command" or "instruction") forever.

repeatsM :: (Monad m, Functor f) => m (f ()) -> Stream f m r Source #

Repeat an effect containing a functorial layer, command or instruction forever.

effect :: (Monad m, Functor f) => m (Stream f m r) -> Stream f m r Source #

Wrap an effect that returns a stream

effect = join . lift

wrap :: (Monad m, Functor f) => f (Stream f m r) -> Stream f m r Source #

Wrap a new layer of a stream. So, e.g.

```haskell
S.cons :: Monad m => a -> Stream (Of a) m r -> Stream (Of a) m r
S.cons a str = wrap (a :> str)
```

and, recursively:

```haskell
S.each :: (Monad m, Foldable t) => t a -> Stream (Of a) m ()
S.each = foldr (\a b -> wrap (a :> b)) (return ())
```

The two operations

```haskell
wrap   :: (Monad m, Functor f) => f (Stream f m r) -> Stream f m r
effect :: (Monad m, Functor f) => m (Stream f m r) -> Stream f m r
```

are fundamental. We can define the parallel operations `yields` and `lift` in
terms of them:

```haskell
yields :: (Monad m, Functor f) => f r -> Stream f m r
yields = wrap . fmap return
lift :: (Monad m, Functor f) => m r -> Stream f m r
lift = effect . fmap return
```

yields :: (Monad m, Functor f) => f r -> Stream f m r Source #

`yields` is like `lift` for items in the streamed functor.
It makes a singleton or one-layer succession.

```haskell
lift   :: (Monad m, Functor f) => m r -> Stream f m r
yields :: (Monad m, Functor f) => f r -> Stream f m r
```

Viewed in another light, it is like a functor-general version of `yield`:

```haskell
S.yield a = yields (a :> ())
```

streamBuild :: (forall b. (r -> b) -> (m b -> b) -> (f b -> b) -> b) -> Stream f m r Source #

Reflect a Church-encoded stream; cf. `GHC.Exts.build`.

streamFold return_ effect_ step_ (streamBuild psi) = psi return_ effect_ step_

cycles :: (Monad m, Functor f) => Stream f m () -> Stream f m r Source #

Construct an infinite stream by cycling a finite one

cycles = forever

`>>>`

never :: (Monad m, Applicative f) => Stream f m r Source #

`never` interleaves the pure applicative action with the return of the monad forever.
It is the `empty` of the `Alternative` instance, thus

```haskell
never <|> a = a
a <|> never = a
```

and so on. If `w` is a monoid then `never :: Stream (Of w) m r` is
the infinite sequence of `mempty`, and
`str1 <|> str2` appends the elements monoidally until one of the streams ends.
Thus we have, e.g.

```haskell
>>> S.stdoutLn $ S.take 2 $ S.stdinLn <|> S.repeat " " <|> S.stdinLn <|> S.repeat " " <|> S.stdinLn
1<Enter>
2<Enter>
3<Enter>
1 2 3
4<Enter>
5<Enter>
6<Enter>
4 5 6
```

This is equivalent to

```haskell
>>> S.stdoutLn $ S.take 2 $ foldr (<|>) never [S.stdinLn, S.repeat " ", S.stdinLn, S.repeat " ", S.stdinLn ]
```

Where `f` is a monad, `(<|>)` sequences the conjoined streams stepwise. See the
definition of `paste` here,
where the separate steps are bytestreams corresponding to the lines of a file.

Given, say,

data Branch r = Branch r r deriving Functor -- add obvious applicative instance

then `never :: Stream Branch Identity r` is the pure infinite binary tree with
(inaccessible) `r`s in its leaves. Given two binary trees, `tree1 <|> tree2`
intersects them, preserving the leaves that came first,
so `tree1 <|> never = tree1`.

`Stream Identity m r` is an action in `m` that is indefinitely delayed. Such an
action can be constructed with e.g. `untilJust`.

untilJust :: (Monad m, Applicative f) => m (Maybe r) -> Stream f m r

Given two such items, the `<|>` of the `Alternative` instance races them.
It is thus the iterative monad transformer specially defined in
Control.Monad.Trans.Iter.

So, for example, we might write

```haskell
>>> let justFour str = if length str == 4 then Just str else Nothing
>>> let four = untilJust (fmap justFour getLine)
>>> run four
one<Enter>
two<Enter>
three<Enter>
four<Enter>
"four"
```

The `Alternative`

instance in
Control.Monad.Trans.Free
is avowedly wrong, though no explanation is given for this.

# Eliminating a stream

intercalates :: (Monad m, Monad (t m), MonadTrans t) => t m x -> Stream (t m) m r -> t m r Source #

Interpolate a layer at each segment. This specializes to e.g.

intercalates :: (Monad m, Functor f) => Stream f m () -> Stream (Stream f m) m r -> Stream f m r

concats :: (Monad m, Functor f) => Stream (Stream f m) m r -> Stream f m r Source #

Dissolves the segmentation of a stream whose layers are themselves `Stream f m`
streams, flattening it into a single `Stream f m` stream.

iterT :: (Functor f, Monad m) => (f (m a) -> m a) -> Stream f m a -> m a Source #

Specialized fold following the usage of `Control.Monad.Trans.Free`

```haskell
iterT alg = streamFold return join alg
iterT alg = runIdentityT . iterTM (IdentityT . alg . fmap runIdentityT)
```

iterTM :: (Functor f, Monad m, MonadTrans t, Monad (t m)) => (f (t m a) -> t m a) -> Stream f m a -> t m a Source #

Specialized fold following the usage of `Control.Monad.Trans.Free`

```haskell
iterTM alg = streamFold return (join . lift) alg
iterTM alg = iterT alg . hoist lift
```

destroy :: (Functor f, Monad m) => Stream f m r -> (f b -> b) -> (m b -> b) -> (r -> b) -> b Source #

Map a stream to its Church encoding; compare `Data.List.foldr`.
`destroyExposed` may be more efficient in some cases when
applicable, but it is less safe.

destroy s construct eff done = eff . iterT (return . construct . fmap eff) . fmap done $ s

streamFold :: (Functor f, Monad m) => (r -> b) -> (m b -> b) -> (f b -> b) -> Stream f m r -> b Source #

`streamFold` reorders the arguments of `destroy` to be more akin
to `foldr`. It is more convenient to query in ghci to figure out
what kind of 'algebra' you need to write.

```haskell
>>> :t streamFold return join
(Monad m, Functor f) => (f (m a) -> m a) -> Stream f m a -> m a -- iterT

>>> :t streamFold return (join . lift)
(Monad m, Monad (t m), Functor f, MonadTrans t) => (f (t m a) -> t m a) -> Stream f m a -> t m a -- iterTM

>>> :t streamFold return effect
(Monad m, Functor f, Functor g) => (f (Stream g m r) -> Stream g m r) -> Stream f m r -> Stream g m r

>>> :t \f -> streamFold return effect (wrap . f)
(Monad m, Functor f, Functor g) => (f (Stream g m a) -> g (Stream g m a)) -> Stream f m a -> Stream g m a -- maps

>>> :t \f -> streamFold return effect (effect . fmap wrap . f)
(Monad m, Functor f, Functor g) => (f (Stream g m a) -> m (g (Stream g m a))) -> Stream f m a -> Stream g m a -- mapped
```

streamFold done eff construct = eff . iterT (return . construct . fmap eff) . fmap done

# Inspecting a stream wrap by wrap

inspect :: Monad m => Stream f m r -> m (Either r (f (Stream f m r))) Source #

Inspect the first stage of a freely layered sequence.
Compare `Pipes.next` and the replica `Streaming.Prelude.next`.
This is the `uncons` for the general `unfold`.

```haskell
unfold inspect = id
Streaming.Prelude.unfoldr Streaming.Prelude.next = id
```

# Transforming streams

maps :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r Source #

Map layers of one functor to another with a transformation. Compare
hoist, which has a similar effect on the `monadic` parameter.

```haskell
maps id = id
maps f . maps g = maps (f . g)
```

mapsM :: (Monad m, Functor f) => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r Source #

Map layers of one functor to another with a transformation involving the base monad.
`maps` is more fundamental than `mapsM`, which is best understood as a convenience
for effecting this frequent composition:

```haskell
mapsM phi = decompose . maps (Compose . phi)
```

The streaming prelude exports the same function under the better name `mapped`,
which overlaps with the lens libraries.

mapsPost :: forall m f g r. (Monad m, Functor g) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r Source #

Map layers of one functor to another with a transformation. Compare
hoist, which has a similar effect on the `monadic` parameter.

```haskell
mapsPost id = id
mapsPost f . mapsPost g = mapsPost (f . g)
mapsPost f = maps f
```

`mapsPost` is essentially the same as `maps`, but it imposes a `Functor` constraint on
its target functor rather than its source functor. It should be preferred if `fmap`
is cheaper for the target functor than for the source functor.

mapsMPost :: forall m f g r. (Monad m, Functor g) => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r Source #

Map layers of one functor to another with a transformation involving the base monad.
`mapsMPost` is essentially the same as `mapsM`, but it imposes a `Functor` constraint on
its target functor rather than its source functor. It should be preferred if `fmap`
is cheaper for the target functor than for the source functor.

`mapsPost` is more fundamental than `mapsMPost`, which is best understood as a convenience
for effecting this frequent composition:

```haskell
mapsMPost phi = decompose . mapsPost (Compose . phi)
```

The streaming prelude exports the same function under the better name `mappedPost`,
which overlaps with the lens libraries.

hoistUnexposed :: (Monad m, Functor f) => (forall a. m a -> n a) -> Stream f m r -> Stream f n r Source #

A less-efficient version of `hoist` that works properly even when its
argument is not a monad morphism.

hoistUnexposed = hoist . unexposed

decompose :: (Monad m, Functor f) => Stream (Compose m f) m r -> Stream f m r Source #

Rearrange a succession of layers of the form `Compose m f x`.

We could as well define `decompose` by `mapsM` (exported by the streaming prelude as `mapped`):

```haskell
decompose = mapped getCompose
```

but `mapped` is best understood as:

```haskell
mapped phi = decompose . maps (Compose . phi)
```

since `maps` and `hoist` are the really fundamental operations that preserve the
shape of the stream:

```haskell
maps  :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r
hoist :: (Monad m, Functor f) => (forall a. m a -> n a) -> Stream f m r -> Stream f n r
```

mapsM_ :: (Functor f, Monad m) => (forall x. f x -> m x) -> Stream f m r -> m r Source #

Map each layer to an effect, and run them all.

run :: Monad m => Stream m m r -> m r Source #

Run the effects in a stream that merely layers effects.

distribute :: (Monad m, Functor f, MonadTrans t, MFunctor t, Monad (t (Stream f m))) => Stream f (t m) r -> t (Stream f m) r Source #

Make it possible to 'run' the underlying transformed monad.

groups :: (Monad m, Functor f, Functor g) => Stream (Sum f g) m r -> Stream (Sum (Stream f m) (Stream g m)) m r Source #

Group layers in an alternating stream into adjoining sub-streams of one type or another.

# Splitting streams

chunksOf :: (Monad m, Functor f) => Int -> Stream f m r -> Stream (Stream f m) m r Source #

Break a stream into substreams each with n functorial layers.

```haskell
>>> S.print $ mapped S.sum $ chunksOf 2 $ each [1,1,1,1,1]
2
2
1
```

splitsAt :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Stream f m r) Source #

Split a succession of layers after some number, returning a streaming or effectful pair.

```haskell
>>> rest <- S.print $ S.splitAt 1 $ each [1..3]
1
>>> S.print rest
2
3
```

```haskell
splitAt 0 = return
splitAt n >=> splitAt m = splitAt (m+n)
```

Thus, e.g.

```haskell
>>> rest <- S.print $ splitsAt 2 >=> splitsAt 2 $ each [1..5]
1
2
3
4
>>> S.print rest
5
```

# Zipping and unzipping streams

zipsWith :: forall f g h m r. (Monad m, Functor h) => (forall x y. f x -> g y -> h (x, y)) -> Stream f m r -> Stream g m r -> Stream h m r Source #

Zip two streams together. The `zipsWith'` function should generally
be preferred for efficiency.

zipsWith' :: forall f g h m r. Monad m => (forall x y p. (x -> y -> p) -> f x -> g y -> h p) -> Stream f m r -> Stream g m r -> Stream h m r Source #

Zip two streams together.

zips :: (Monad m, Functor f, Functor g) => Stream f m r -> Stream g m r -> Stream (Compose f g) m r Source #

unzips :: (Monad m, Functor f, Functor g) => Stream (Compose f g) m r -> Stream f (Stream g m) r Source #

interleaves :: (Monad m, Applicative h) => Stream h m r -> Stream h m r -> Stream h m r Source #

Interleave functor layers, with the effects of the first preceding the effects of the second. When the first stream runs out, any remaining effects in the second are ignored.

interleaves = zipsWith (liftA2 (,))

```haskell
>>> let paste = \a b -> interleaves (Q.lines a) (maps (Q.cons' '\t') (Q.lines b))
>>> Q.stdout $ Q.unlines $ paste "hello\nworld\n" "goodbye\nworld\n"
hello   goodbye
world   world
```

separate :: (Monad m, Functor f, Functor g) => Stream (Sum f g) m r -> Stream f (Stream g m) r Source #

Given a stream on a sum of functors, make it a stream on the left functor,
with the streaming on the other functor as the governing monad. This is
useful for acting on one or the other functor with a fold, leaving the
other material for another treatment. It generalizes
`partitionEithers`, but actually streams properly.

```haskell
>>> let odd_even = S.maps (S.distinguish even) $ S.each [1..10::Int]
>>> :t separate odd_even
separate odd_even :: Monad m => Stream (Of Int) (Stream (Of Int) m) ()
```

Now, for example, it is convenient to fold on the left and right values separately:

```haskell
>>> S.toList $ S.toList $ separate odd_even
[2,4,6,8,10] :> ([1,3,5,7,9] :> ())
```

Or we can write them to separate files or whatever:

```haskell
>>> runResourceT $ S.writeFile "even.txt" . S.show $ S.writeFile "odd.txt" . S.show $ S.separate odd_even
>>> :! cat even.txt
2
4
6
8
10
>>> :! cat odd.txt
1
3
5
7
9
```

Of course, in the special case of `Stream (Of a) m r`, we can achieve the above
effects more simply by using `copy`:

```haskell
>>> S.toList . S.filter even $ S.toList . S.filter odd $ S.copy $ each [1..10::Int]
[2,4,6,8,10] :> ([1,3,5,7,9] :> ())
```

But `separate` and `unseparate` are functor-general.

unseparate :: (Monad m, Functor f, Functor g) => Stream f (Stream g m) r -> Stream (Sum f g) m r Source #

expand :: (Monad m, Functor f) => (forall a b. (g a -> b) -> f a -> h b) -> Stream f m r -> Stream g (Stream h m) r Source #

If `Of` had a `Comonad` instance, then we'd have

```haskell
copy = expand extend
```

See `expandPost` for a version that requires a `Functor g` instance instead.

expandPost :: (Monad m, Functor g) => (forall a b. (g a -> b) -> f a -> h b) -> Stream f m r -> Stream g (Stream h m) r Source #

If `Of` had a `Comonad` instance, then we'd have

```haskell
copy = expandPost extend
```

See `expand` for a version that requires a `Functor f` instance
instead.

# Assorted Data.Functor.x help

switch :: Sum f g r -> Sum g f r Source #

Swap the order of functors in a sum of functors.

```haskell
>>> S.toList $ S.print $ separate $ maps S.switch $ maps (S.distinguish (=='a')) $ S.each "banana"
'a'
'a'
'a'
"bnn" :> ()
>>> S.toList $ S.print $ separate $ maps (S.distinguish (=='a')) $ S.each "banana"
'b'
'n'
'n'
"aaa" :> ()
```

# For use in implementation

unexposed :: (Functor f, Monad m) => Stream f m r -> Stream f m r Source #

This is akin to the `observe` of `Pipes.Internal`. It re-effects the layering
in instances of `Stream f m r` so that it replicates that of
`FreeT`.

hoistExposed :: (Functor m, Functor f) => (forall b. m b -> n b) -> Stream f m a -> Stream f n a Source #

The same as `hoist`, but explicitly named to indicate that it
is not entirely safe. In particular, its argument must be a monad
morphism.

hoistExposedPost :: (Functor n, Functor f) => (forall b. m b -> n b) -> Stream f m a -> Stream f n a Source #

The same as `hoistExposed`, but with a `Functor` constraint on
the target rather than the source. This must be used only with
a monad morphism.

mapsExposed :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r Source #

Deprecated: Use maps instead.

mapsMExposed :: (Monad m, Functor f) => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r Source #

Deprecated: Use mapsM instead.

destroyExposed :: (Functor f, Monad m) => Stream f m r -> (f b -> b) -> (m b -> b) -> (r -> b) -> b Source #

Map a stream directly to its Church encoding; compare `Data.List.foldr`.
It permits distinctions that should be hidden, as can be seen from
e.g.

```haskell
isPure stream = destroyExposed stream (const True) (const False) (const True)
```

and similar nonsense. The crucial
constraint is that the `m x -> x` argument is an *Eilenberg-Moore algebra*.
See Atkey, "Reasoning about Stream Processing with Effects".

When in doubt, use `destroy` instead.