linear-base-0.1.0: Standard library for linear types.
Safe Haskell: Safe-Inferred
Language: Haskell2010

Streaming.Linear

Documentation

The Stream data type is an effectful series of steps with some payload value at the bottom. The steps are represented with functors. The effects are represented with some control monad. (Control monads must be bound to exactly once; see the documentation in linear-base to learn more about control monads, control applicatives and control functors.)

In words, a Stream f m r is either a payload of type r, or a step of type f (Stream f m r) or an effect of type m (Stream f m r) where f is a Control.Functor and m is a Control.Monad.

This module exports combinators that pertain to this general case. Some of these are quite abstract and pervade any use of the library, e.g.

  maps    :: (forall x . f x %1-> g x) -> Stream f m r %1-> Stream g m r
  mapped  :: (forall x. f x %1-> m (g x)) -> Stream f m r %1-> Stream g m r
  concats :: Stream (Stream f m) m r %1-> Stream f m r

(assuming here and throughout that m or n satisfies a Control.Monad constraint, and f or g a Control.Functor constraint).

Others are surprisingly determinate in content:

  chunksOf     :: Int -> Stream f m r %1-> Stream (Stream f m) m r
  splitsAt     :: Int -> Stream f m r %1-> Stream f m (Stream f m r)
  intercalates :: Stream f m () -> Stream (Stream f m) m r %1-> Stream f m r
  unzips       :: Stream (Compose f g) m r %1->  Stream f (Stream g m) r
  separate     :: Stream (Sum f g) m r -> Stream f (Stream g m) r  -- cp. partitionEithers
  unseparate   :: Stream f (Stream g m) r -> Stream (Sum f g) m r
  groups       :: Stream (Sum f g) m r %1-> Stream (Sum (Stream f m) (Stream g m)) m r

One way to see that any streaming library needs some such general type is that it is required to represent the segmentation of a stream, and to express the equivalents of Prelude/Data.List combinators that involve 'lists of lists' and the like. See for example this post on the correct expression of a streaming 'lines' function.

The module Streaming.Prelude exports combinators relating to

  Stream (Of a) m r

where Of a r = !a :> r is a left-strict pair. This expresses the concept of a Producer or Source or Generator and easily inter-operates with types with such names in e.g. conduit, iostreams and pipes.

The Stream and Of types

The Stream data type is equivalent to FreeT and can represent any effectful succession of steps, where the form of the steps or commands is specified by the first (functor) parameter. The effects are performed exactly once since the monad is a Control.Monad from linear-base.

data Stream f m r = Step !(f (Stream f m r)) | Effect (m (Stream f m r)) | Return r

The producer concept uses the simple functor (a,_), or the stricter Of a _. Then the news at each step or layer is just an individual item of type a. Since Stream (Of a) m r is equivalent to Pipes.Producer a m r, much of the pipes Prelude can easily be mirrored in a streaming Prelude. Similarly, a simple Consumer a m r or Parser a m r concept arises when the base functor is (a -> _). Stream ((->) input) m result consumes input until it returns a result.
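The producer and consumer readings can be sketched with a toy model. The block below is only an illustration under stated assumptions: it redefines Stream and Of as ordinary, non-linear types so that it compiles with base alone, and the helpers countdown, sumTwo, toListM and feed are hypothetical names that are not part of this module. It matches on constructors directly only because it owns its toy type.

```haskell
-- Toy, NON-linear stand-ins for the library's types (assumption: the real
-- definitions use linear arrows and the Control.* classes of linear-base).
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

-- Left-strict pair, as described in the text.
data Of a b = !a :> b
infixr 5 :>

-- A producer: three items with one effect interleaved.
countdown :: Stream (Of Int) IO ()
countdown = Step (3 :> Effect tick)
  where
    tick = do
      putStrLn "tick"
      pure (Step (2 :> Step (1 :> Return ())))

-- Hypothetical helper: drain a producer into a list plus its payload.
toListM :: Monad m => Stream (Of a) m r -> m ([a], r)
toListM (Return r) = pure ([], r)
toListM (Effect m) = m >>= toListM
toListM (Step (a :> rest)) = do
  (as, r) <- toListM rest
  pure (a : as, r)

-- A consumer: the base functor is ((->) input).
sumTwo :: Stream ((->) Int) IO Int
sumTwo = Step (\x -> Step (\y -> Return (x + y)))

-- Hypothetical helper: feed a consumer from a list; Nothing means the
-- input ran out before the consumer returned a result.
feed :: Monad m => [i] -> Stream ((->) i) m r -> m (Maybe r)
feed _ (Return r) = pure (Just r)
feed inputs (Effect m) = m >>= feed inputs
feed [] (Step _) = pure Nothing
feed (i : rest) (Step k) = feed rest (k i)
```

Under these assumptions, draining countdown yields the items 3, 2, 1 with the effect run in between, and feeding sumTwo two inputs returns their sum.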

To avoid breaking reasoning principles, the constructors should not be used directly. A pattern-match should go by way of inspect or, in the producer case, next.

data Stream f m r where Source #

Constructors

Step :: !(f (Stream f m r)) -> Stream f m r 
Effect :: m (Stream f m r) -> Stream f m r 
Return :: r -> Stream f m r 

Instances

Functor f => MonadTrans (Stream f) Source # 

Defined in Streaming.Internal.Type

Methods

lift :: Monad m => m a %1 -> Stream f m a Source #

(Functor m, Functor f) => Functor (Stream f m) Source # 

Defined in Streaming.Internal.Type

Methods

fmap :: (a %1 -> b) -> Stream f m a %1 -> Stream f m b Source #

(Functor m, Functor f) => Applicative (Stream f m) Source # 

Defined in Streaming.Internal.Type

Methods

pure :: a -> Stream f m a Source #

(<*>) :: Stream f m (a %1 -> b) %1 -> Stream f m a %1 -> Stream f m b Source #

liftA2 :: (a %1 -> b %1 -> c) -> Stream f m a %1 -> Stream f m b %1 -> Stream f m c Source #

(Functor m, Functor f) => Monad (Stream f m) Source # 

Defined in Streaming.Internal.Type

Methods

(>>=) :: Stream f m a %1 -> (a %1 -> Stream f m b) %1 -> Stream f m b Source #

(>>) :: Stream f m () %1 -> Stream f m a %1 -> Stream f m a Source #

(Functor m, Functor f) => Applicative (Stream f m) Source # 

Defined in Streaming.Internal.Type

Methods

pure :: a %1 -> Stream f m a Source #

(<*>) :: Stream f m (a %1 -> b) %1 -> Stream f m a %1 -> Stream f m b Source #

liftA2 :: (a %1 -> b %1 -> c) %1 -> Stream f m a %1 -> Stream f m b %1 -> Stream f m c Source #

(Functor m, Functor f) => Functor (Stream f m) Source # 

Defined in Streaming.Internal.Type

Methods

fmap :: (a %1 -> b) %1 -> Stream f m a %1 -> Stream f m b Source #

data Of a b where Source #

A left-strict pair; the base functor for streams of individual elements.

Constructors

(:>) :: !a -> b -> Of a b infixr 5 

Instances

Functor (Of a) Source # 

Defined in Streaming.Internal.Type

Methods

fmap :: (a0 %1 -> b) -> Of a a0 %1 -> Of a b Source #

Functor (Of a) Source # 

Defined in Streaming.Internal.Type

Methods

fmap :: (a0 %1 -> b) %1 -> Of a a0 %1 -> Of a b Source #

Constructing a Stream on a given functor

yields :: (Monad m, Functor f) => f r %1 -> Stream f m r Source #

yields is like lift for items in the streamed functor. It makes a singleton or one-layer succession.

lift :: (Control.Monad m, Control.Functor f)    => m r %1-> Stream f m r
yields ::  (Control.Monad m, Control.Functor f) => f r %1-> Stream f m r

Viewed in another light, it is like a functor-general version of yield:

S.yield a = yields (a :> ())

effect :: (Monad m, Functor f) => m (Stream f m r) %1 -> Stream f m r Source #

Wrap an effect that returns a stream.

effect = join . lift

wrap :: (Monad m, Functor f) => f (Stream f m r) %1 -> Stream f m r Source #

Wrap a new layer of a stream. So, e.g.

S.cons :: Control.Monad m => a -> Stream (Of a) m r %1-> Stream (Of a) m r
S.cons a str = wrap (a :> str)

and, recursively:

S.each' :: Control.Monad m =>  [a] -> Stream (Of a) m ()
S.each' = foldr (\a b -> wrap (a :> b)) (return ())

The two operations

wrap :: (Control.Monad m, Control.Functor f) =>
  f (Stream f m r) %1-> Stream f m r
effect :: (Control.Monad m, Control.Functor f) =>
  m (Stream f m r) %1-> Stream f m r

are fundamental. We can define the parallel operations yields and lift in terms of them:

yields :: (Control.Monad m, Control.Functor f) => f r %1-> Stream f m r
yields = wrap . Control.fmap Control.return
lift ::  (Control.Monad m, Control.Functor f)  => m r %1-> Stream f m r
lift = effect . Control.fmap Control.return
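The relationship between wrap, effect, yields and lift can be checked on a toy model. This is a sketch under assumptions, not the library's code: Stream and Of are redefined here as ordinary non-linear types so the block needs only base, lift is renamed liftS to avoid suggesting the real class method, and toListM is a hypothetical helper.

```haskell
-- Toy, NON-linear stand-ins (assumption: the real types use linear arrows).
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

data Of a b = !a :> b
infixr 5 :>

instance Functor (Of a) where
  fmap f (a :> b) = a :> f b

-- The two fundamental operations, here just the constructors.
wrap :: f (Stream f m r) -> Stream f m r
wrap = Step

effect :: m (Stream f m r) -> Stream f m r
effect = Effect

-- The derived operations, following the equations in the text.
yields :: Functor f => f r -> Stream f m r
yields = wrap . fmap Return

liftS :: Functor m => m r -> Stream f m r
liftS = effect . fmap Return

-- Build a producer from a list by wrapping one layer per element.
each' :: [a] -> Stream (Of a) m ()
each' = foldr (\a b -> wrap (a :> b)) (Return ())

-- Hypothetical helper: collect the items and the final payload.
toListM :: Monad m => Stream (Of a) m r -> m ([a], r)
toListM (Return r) = pure ([], r)
toListM (Effect m) = m >>= toListM
toListM (Step (a :> rest)) = do
  (as, r) <- toListM rest
  pure (a : as, r)
```

In this model yields produces exactly one layer followed by the payload, while liftS produces no layers at all, only an effect followed by the payload.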

replicates :: (HasCallStack, Monad m, Functor f) => Int -> f () -> Stream f m () Source #

Repeat a functorial layer, command or instruction a fixed number of times.

replicatesM :: forall f m. (Monad m, Functor f) => Int -> m (f ()) -> Stream f m () Source #

replicatesM n repeats an effect containing a functorial layer, command or instruction n times.

unfold :: (Monad m, Functor f) => (s %1 -> m (Either r (f s))) -> s %1 -> Stream f m r Source #

untilJust :: forall f m r. (Monad m, Applicative f) => m (Maybe r) -> Stream f m r Source #

streamBuild :: (forall b. (r %1 -> b) -> (m b %1 -> b) -> (f b %1 -> b) -> b) -> Stream f m r Source #

Reflect a Church-encoded stream; cp. GHC.Exts.build

streamFold return_ effect_ step_ (streamBuild psi) = psi return_ effect_ step_

delays :: forall f r. Applicative f => Double -> Stream f IO r Source #

Transforming streams

maps :: forall f g m r. (Monad m, Functor f) => (forall x. f x %1 -> g x) -> Stream f m r %1 -> Stream g m r Source #

Map layers of one functor to another with a transformation.

maps id = id
maps f . maps g = maps (f . g)
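The two laws can be spot-checked on a non-linear toy model. The following is a minimal sketch that assumes ordinary (non-linear) redefinitions of Stream and Of so that it builds with base alone; incr, double, each' and toListM are hypothetical helpers, and a test on one sample stream is of course not a proof of the laws.

```haskell
{-# LANGUAGE RankNTypes #-}

-- Toy, NON-linear stand-ins (assumption: the real types use linear arrows).
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

data Of a b = !a :> b
infixr 5 :>

instance Functor (Of a) where
  fmap f (a :> b) = a :> f b

-- Apply a natural transformation to every functor layer.
maps :: (Functor f, Monad m)
     => (forall x. f x -> g x) -> Stream f m r -> Stream g m r
maps phi = go
  where
    go (Return r) = Return r
    go (Effect m) = Effect (fmap go m)
    go (Step f)   = Step (phi (fmap go f))

-- Two sample transformations on Of Int (hypothetical names).
incr, double :: Of Int x -> Of Int x
incr   (a :> x) = (a + 1) :> x
double (a :> x) = (2 * a) :> x

each' :: [a] -> Stream (Of a) m ()
each' = foldr (\a b -> Step (a :> b)) (Return ())

-- Hypothetical helper: collect the items and the final payload.
toListM :: Monad m => Stream (Of a) m r -> m ([a], r)
toListM (Return r) = pure ([], r)
toListM (Effect m) = m >>= toListM
toListM (Step (a :> rest)) = do
  (as, r) <- toListM rest
  pure (a : as, r)
```

On the stream of 1, 2, 3, both maps double . maps incr and maps (double . incr) give 4, 6, 8, and maps id leaves the items unchanged.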

mapsPost :: forall m f g r. (Monad m, Functor g) => (forall x. f x %1 -> g x) -> Stream f m r %1 -> Stream g m r Source #

Map layers of one functor to another with a transformation.

mapsPost id = id
mapsPost f . mapsPost g = mapsPost (f . g)
mapsPost f = maps f

mapsPost is essentially the same as maps, but it imposes a Control.Functor constraint on its target functor rather than its source functor. It should be preferred if Control.fmap is cheaper for the target functor than for the source functor.

mapsM :: forall f g m r. (Monad m, Functor f) => (forall x. f x %1 -> m (g x)) -> Stream f m r %1 -> Stream g m r Source #

Map layers of one functor to another with a transformation involving the base monad. maps is more fundamental than mapsM, which is best understood as a convenience for effecting this frequent composition:

mapsM phi = decompose . maps (Compose . phi)

The streaming prelude exports the same function under the better name mapped, which overlaps with the lens libraries.

mapsMPost :: forall m f g r. (Monad m, Functor g) => (forall x. f x %1 -> m (g x)) -> Stream f m r %1 -> Stream g m r Source #

Map layers of one functor to another with a transformation involving the base monad. mapsMPost is essentially the same as mapsM, but it imposes a Control.Functor constraint on its target functor rather than its source functor. It should be preferred if Control.fmap is cheaper for the target functor than for the source functor.

mapsPost is more fundamental than mapsMPost, which is best understood as a convenience for effecting this frequent composition:

mapsMPost phi = decompose . mapsPost (Compose . phi)

The streaming prelude exports the same function under the better name mappedPost, which overlaps with the lens libraries.

mapped :: forall f g m r. (Monad m, Functor f) => (forall x. f x %1 -> m (g x)) -> Stream f m r %1 -> Stream g m r Source #

Map layers of one functor to another with a transformation involving the base monad. This could be trivial, e.g.

let noteBeginning text x = (fromSystemIO (System.putStrLn text)) Control.>> (Control.return x)

This is completely functor-general.

maps and mapped obey these rules:

maps id              = id
mapped return        = id
maps f . maps g      = maps (f . g)
mapped f . mapped g  = mapped (f <=< g)
maps f . mapped g    = mapped (fmap f . g)
mapped f . maps g    = mapped (f <=< fmap g)

maps is more fundamental than mapped, which is best understood as a convenience for effecting this frequent composition:

mapped phi = decompose . maps (Compose . phi)
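The composition above can be written out on a toy model. This is a sketch under assumptions: Stream and Of are redefined as ordinary non-linear types so the block needs only base, the toy mapped ends up requiring Functor on both f and g (unlike the signature documented here), and noteAndDouble, each' and toListM are hypothetical helpers.

```haskell
{-# LANGUAGE RankNTypes #-}
import Data.Functor.Compose (Compose (..))
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Toy, NON-linear stand-ins (assumption: the real types use linear arrows).
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

data Of a b = !a :> b
infixr 5 :>

instance Functor (Of a) where
  fmap f (a :> b) = a :> f b

maps :: (Functor f, Monad m)
     => (forall x. f x -> g x) -> Stream f m r -> Stream g m r
maps phi = go
  where
    go (Return r) = Return r
    go (Effect m) = Effect (fmap go m)
    go (Step f)   = Step (phi (fmap go f))

-- Turn each (Compose m f) layer into an effect followed by an f layer.
decompose :: (Functor f, Monad m) => Stream (Compose m f) m r -> Stream f m r
decompose (Return r) = Return r
decompose (Effect m) = Effect (fmap decompose m)
decompose (Step (Compose mf)) = Effect (fmap (Step . fmap decompose) mf)

-- mapped, defined exactly as the composition given in the text.
mapped :: (Functor f, Functor g, Monad m)
       => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r
mapped phi = decompose . maps (Compose . phi)

-- Hypothetical effectful transformation: record the item, then double it.
noteAndDouble :: IORef [Int] -> Of Int x -> IO (Of Int x)
noteAndDouble ref (a :> x) = do
  modifyIORef ref (a :)
  pure ((2 * a) :> x)

each' :: [a] -> Stream (Of a) m ()
each' = foldr (\a b -> Step (a :> b)) (Return ())

toListM :: Monad m => Stream (Of a) m r -> m ([a], r)
toListM (Return r) = pure ([], r)
toListM (Effect m) = m >>= toListM
toListM (Step (a :> rest)) = do
  (as, r) <- toListM rest
  pure (a : as, r)
```

Running mapped (noteAndDouble ref) over the stream of 1, 2, 3 in this model yields 2, 4, 6 and records the original items in the order they were visited.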

mappedPost :: forall m f g r. (Monad m, Functor g) => (forall x. f x %1 -> m (g x)) -> Stream f m r %1 -> Stream g m r Source #

A version of mapped that imposes a Control.Functor constraint on the target functor rather than the source functor. This version should be preferred if Control.fmap on the target functor is cheaper.

hoistUnexposed :: forall f m n r. (Monad m, Functor f) => (forall a. m a %1 -> n a) -> Stream f m r %1 -> Stream f n r Source #

A less-efficient version of hoist that works properly even when its argument is not a monad morphism.

groups :: forall f g m r. (Monad m, Functor f, Functor g) => Stream (Sum f g) m r %1 -> Stream (Sum (Stream f m) (Stream g m)) m r Source #

Group layers in an alternating stream into adjoining sub-streams of one type or another.

Inspecting a stream

inspect :: forall f m r. Monad m => Stream f m r %1 -> m (Either r (f (Stream f m r))) Source #

Inspect the first stage of a freely layered sequence. Compare Pipes.next and the replica Streaming.Prelude.next. This is the uncons for the general unfold.

unfold inspect = id
Streaming.Prelude.unfoldr Streaming.Prelude.next = id
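The round trip between inspect and unfold can be illustrated on a toy model. The block below is a sketch under assumptions: Stream and Of are ordinary non-linear redefinitions that need only base, and countFrom, each' and toListM are hypothetical helpers. In this model unfold inspect rebuilds a stream with the same items and payload, although it inserts extra Effect layers, so the equation holds only up to observation.

```haskell
-- Toy, NON-linear stand-ins (assumption: the real types use linear arrows).
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

data Of a b = !a :> b
infixr 5 :>

instance Functor (Of a) where
  fmap f (a :> b) = a :> f b

-- Run effects until the first functor layer or the final payload appears.
inspect :: Monad m => Stream f m r -> m (Either r (f (Stream f m r)))
inspect (Return r) = pure (Left r)
inspect (Effect m) = m >>= inspect
inspect (Step f)   = pure (Right f)

-- Grow a stream from a seed, one effectful step at a time.
unfold :: (Functor f, Monad m) => (s -> m (Either r (f s))) -> s -> Stream f m r
unfold step s = Effect $ do
  e <- step s
  pure $ case e of
    Left r   -> Return r
    Right fs -> Step (fmap (unfold step) fs)

-- Hypothetical example: count down from n to 1.
countFrom :: Int -> Stream (Of Int) IO ()
countFrom = unfold step
  where
    step n
      | n <= 0    = pure (Left ())
      | otherwise = pure (Right (n :> (n - 1)))

each' :: [a] -> Stream (Of a) m ()
each' = foldr (\a b -> Step (a :> b)) (Return ())

toListM :: Monad m => Stream (Of a) m r -> m ([a], r)
toListM (Return r) = pure ([], r)
toListM (Effect m) = m >>= toListM
toListM (Step (a :> rest)) = do
  (as, r) <- toListM rest
  pure (a : as, r)
```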

Splitting and joining Streams

splitsAt :: forall f m r. (HasCallStack, Monad m, Functor f) => Int -> Stream f m r %1 -> Stream f m (Stream f m r) Source #

Split a succession of layers after some number, returning a streaming or effectful pair.

>>> rest <- S.print $ S.splitAt 1 $ each' [1..3]
1
>>> S.print rest
2
3

splitAt 0 = return
(\stream -> splitAt n stream >>= splitAt m) = splitAt (m+n)

Thus, e.g.

>>> rest <- S.print $ (\s -> splitsAt 2 s >>= splitsAt 2) $ each' [1..5]
1
2
3
4
>>> S.print rest
5

chunksOf :: forall f m r. (HasCallStack, Monad m, Functor f) => Int -> Stream f m r %1 -> Stream (Stream f m) m r Source #

Break a stream into substreams each with n functorial layers.

>>> S.print $ mapped S.sum $ chunksOf 2 $ each' [1,1,1,1,1]
2
2
1

concats :: forall f m r. (Monad m, Functor f) => Stream (Stream f m) m r %1 -> Stream f m r Source #

Dissolve the segmentation of a stream of Stream f m layers, flattening it back into a single stream of f layers.
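How splitsAt, chunksOf and concats fit together can be sketched on a toy model. This is an illustration under assumptions rather than the library's implementation: Stream and Of are ordinary non-linear redefinitions that need only base, chunksOf assumes a chunk size of at least 1, and sumChunk, chunkSums, each' and toListM are hypothetical helpers.

```haskell
-- Toy, NON-linear stand-ins (assumption: the real types use linear arrows).
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

data Of a b = !a :> b
infixr 5 :>

instance Functor (Of a) where
  fmap f (a :> b) = a :> f b

instance (Functor f, Functor m) => Functor (Stream f m) where
  fmap g (Return r) = Return (g r)
  fmap g (Effect m) = Effect (fmap (fmap g) m)
  fmap g (Step f)   = Step (fmap (fmap g) f)

-- Keep the first n layers; the remainder becomes the payload.
splitsAt :: (Functor f, Monad m) => Int -> Stream f m r -> Stream f m (Stream f m r)
splitsAt n s
  | n <= 0 = Return s
  | otherwise = case s of
      Return r -> Return (Return r)
      Effect m -> Effect (fmap (splitsAt n) m)
      Step f   -> Step (fmap (splitsAt (n - 1)) f)

-- Segment a stream into sub-streams of n layers each (assumes n >= 1).
chunksOf :: (Functor f, Monad m) => Int -> Stream f m r -> Stream (Stream f m) m r
chunksOf n s = case s of
  Return r -> Return r
  Effect m -> Effect (fmap (chunksOf n) m)
  Step f   -> Step (Step (fmap (fmap (chunksOf n) . splitsAt (n - 1)) f))

-- Flatten the segmentation again.
concats :: (Functor f, Monad m) => Stream (Stream f m) m r -> Stream f m r
concats (Return r) = Return r
concats (Effect m) = Effect (fmap concats m)
concats (Step inner) = go inner
  where
    go (Return rest) = concats rest
    go (Effect m)    = Effect (fmap go m)
    go (Step f)      = Step (fmap go f)

each' :: [a] -> Stream (Of a) m ()
each' = foldr (\a b -> Step (a :> b)) (Return ())

toListM :: Monad m => Stream (Of a) m r -> m ([a], r)
toListM (Return r) = pure ([], r)
toListM (Effect m) = m >>= toListM
toListM (Step (a :> rest)) = do
  (as, r) <- toListM rest
  pure (a : as, r)

-- Hypothetical helpers: sum one chunk, then collect the sum of every chunk.
sumChunk :: Monad m => Stream (Of Int) m r -> m (Int, r)
sumChunk s = do
  (as, r) <- toListM s
  pure (sum as, r)

chunkSums :: Monad m => Stream (Stream (Of Int) m) m r -> m [Int]
chunkSums (Return _) = pure []
chunkSums (Effect m) = m >>= chunkSums
chunkSums (Step chunk) = do
  (t, rest) <- sumChunk chunk
  ts <- chunkSums rest
  pure (t : ts)
```

In this model, chunking five 1s by 2 gives chunk sums 2, 2, 1, matching the example session for chunksOf, and concats undoes chunksOf on the same input.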

intercalates :: forall t m r x. (Monad m, Monad (t m), MonadTrans t, Consumable x) => t m x -> Stream (t m) m r %1 -> t m r Source #

Interpolate a layer at each segment. This specializes to e.g.

intercalates :: Stream f m () -> Stream (Stream f m) m r %1-> Stream f m r

Zipping, unzipping, separating and unseparating streams

unzips :: forall f g m r. (Monad m, Functor f, Functor g) => Stream (Compose f g) m r %1 -> Stream f (Stream g m) r Source #

separate :: forall f g m r. (Monad m, Functor f, Functor g) => Stream (Sum f g) m r -> Stream f (Stream g m) r Source #

Given a stream on a sum of functors, make it a stream on the left functor, with the streaming on the other functor as the governing monad. This is useful for acting on one or the other functor with a fold, leaving the other material for another treatment. It generalizes partitionEithers, but actually streams properly.

>>> let odd_even = S.maps (S.distinguish even) $ S.each' [1..10::Int]
>>> :t separate odd_even
separate odd_even :: Monad m => Stream (Of Int) (Stream (Of Int) m) ()

Now, for example, it is convenient to fold on the left and right values separately:

>>> S.toList $ S.toList $ separate odd_even
[2,4,6,8,10] :> ([1,3,5,7,9] :> ())

Or we can write them to separate files or whatever:

>>> S.writeFile "even.txt" . S.show $ S.writeFile "odd.txt" . S.show $ S.separate odd_even
>>> :! cat even.txt
2
4
6
8
10
>>> :! cat odd.txt
1
3
5
7
9

Of course, in the special case of Stream (Of a) m r, we can achieve the above effects more simply by using copy

>>> S.toList . S.filter even $ S.toList . S.filter odd $ S.copy $ each [1..10::Int]
[2,4,6,8,10] :> ([1,3,5,7,9] :> ())

But separate and unseparate are functor-general.

unseparate :: (Monad m, Functor f, Functor g) => Stream f (Stream g m) r -> Stream (Sum f g) m r Source #

decompose :: forall f m r. (Monad m, Functor f) => Stream (Compose m f) m r %1 -> Stream f m r Source #

Rearrange a succession of layers of the form Compose m (f x).

We could as well define decompose in terms of mapped (equivalently, mapsM):

decompose = mapped getCompose

but mapped is best understood as:

mapped phi = decompose . maps (Compose . phi)

since maps and hoist are the really fundamental operations that preserve the shape of the stream:

maps  :: (Control.Monad m, Control.Functor f) => (forall x. f x %1-> g x) -> Stream f m r %1-> Stream g m r
hoist :: (Control.Monad m, Control.Functor f) => (forall a. m a %1-> n a) -> Stream f m r %1-> Stream f n r

expand :: forall f m r g h. (Monad m, Functor f) => (forall a b. (g a %1 -> b) -> f a %1 -> h b) -> Stream f m r %1 -> Stream g (Stream h m) r Source #

If Of had a Comonad instance, then we'd have

copy = expand extend

See expandPost for a version that requires a Control.Functor g instance instead.

expandPost :: forall f m r g h. (Monad m, Functor g) => (forall a b. (g a %1 -> b) -> f a %1 -> h b) -> Stream f m r %1 -> Stream g (Stream h m) r Source #

If Of had a Comonad instance, then we'd have

copy = expandPost extend

See expand for a version that requires a Control.Functor f instance instead.

Eliminating a Stream

mapsM_ :: (Functor f, Monad m) => (forall x. f x %1 -> m x) -> Stream f m r %1 -> m r Source #

Map each layer to an effect, and run them all.

run :: Monad m => Stream m m r %1 -> m r Source #

Run the effects in a stream that merely layers effects.

streamFold :: (Functor f, Monad m) => (r %1 -> b) -> (m b %1 -> b) -> (f b %1 -> b) -> Stream f m r %1 -> b Source #

streamFold reorders the arguments of destroy to be more akin to foldr. It is more convenient to query in ghci to figure out what kind of 'algebra' you need to write.

>>> :t streamFold Control.return Control.join
(Control.Monad m, Control.Functor f) =>
  (f (m a) %1-> m a) -> Stream f m a %1-> m a  -- iterT

>>> :t streamFold Control.return (Control.join . Control.lift)
(Control.Monad m, Control.Monad (t m), Control.Functor f, Control.MonadTrans t) =>
  (f (t m a) %1-> t m a) -> Stream f m a %1-> t m a  -- iterTM

>>> :t streamFold Control.return effect
(Control.Monad m, Control.Functor f, Control.Functor g) =>
  (f (Stream g m r) %1-> Stream g m r) -> Stream f m r %1-> Stream g m r

>>> :t \f -> streamFold Control.return effect (wrap . f)
(Control.Monad m, Control.Functor f, Control.Functor g) =>
  (f (Stream g m a) %1-> g (Stream g m a)) -> Stream f m a %1-> Stream g m a  -- maps

>>> :t \f -> streamFold Control.return effect (effect . Control.fmap wrap . f)
(Control.Monad m, Control.Functor f, Control.Functor g) =>
  (f (Stream g m a) %1-> m (g (Stream g m a))) -> Stream f m a %1-> Stream g m a  -- mapped

    streamFold done eff construct
       = eff . iterT (Control.return . construct . Control.fmap eff) . Control.fmap done

iterTM :: (Functor f, Monad m, MonadTrans t, Monad (t m)) => (f (t m a) %1 -> t m a) -> Stream f m a %1 -> t m a Source #

Specialized fold following the usage of Control.Monad.Trans.Free

iterTM alg = streamFold Control.return (Control.join . Control.lift) alg
iterTM alg = iterT alg . hoist Control.lift

iterT :: (Functor f, Monad m) => (f (m a) %1 -> m a) -> Stream f m a %1 -> m a Source #

Specialized fold following the usage of Control.Monad.Trans.Free

iterT alg = streamFold Control.return Control.join alg
iterT alg = runIdentityT . iterTM (IdentityT . alg . Control.fmap runIdentityT)

destroy :: forall f m r b. (Functor f, Monad m) => Stream f m r %1 -> (f b %1 -> b) -> (m b %1 -> b) -> (r %1 -> b) -> b Source #

Map a stream to its Church encoding; compare Data.List.foldr. destroyExposed may be more efficient in some cases when applicable, but it is less safe.

   destroy s construct eff done
     = eff .
       iterT (Control.return . construct . Control.fmap eff) .
       Control.fmap done $ s
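The relationship between streamFold, destroy and iterT can be made concrete on a toy model. This is a minimal sketch under assumptions: Stream and Of are ordinary non-linear redefinitions that need only base, streamFold is written by direct recursion rather than via iterT, and sumAlg and eachWith are hypothetical helpers.

```haskell
import Control.Monad (join)

-- Toy, NON-linear stand-ins (assumption: the real types use linear arrows).
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

data Of a b = !a :> b
infixr 5 :>

instance Functor (Of a) where
  fmap f (a :> b) = a :> f b

-- Fold a stream with one handler per constructor.
streamFold :: (Functor f, Monad m)
           => (r -> b) -> (m b -> b) -> (f b -> b) -> Stream f m r -> b
streamFold done eff construct = go
  where
    go (Return r) = done r
    go (Effect m) = eff (fmap go m)
    go (Step f)   = construct (fmap go f)

-- destroy is the same fold with the arguments reordered.
destroy :: (Functor f, Monad m)
        => Stream f m r -> (f b -> b) -> (m b -> b) -> (r -> b) -> b
destroy s construct eff done = streamFold done eff construct s

-- iterT specialises the fold to a result living in the base monad.
iterT :: (Functor f, Monad m) => (f (m a) -> m a) -> Stream f m a -> m a
iterT alg = streamFold pure join alg

-- Hypothetical algebra: add each item to the total computed for the rest.
sumAlg :: Monad m => Of Int (m Int) -> m Int
sumAlg (a :> rest) = fmap (a +) rest

-- Hypothetical helper: a producer over a list with a chosen final payload.
eachWith :: r -> [a] -> Stream (Of a) m r
eachWith r = foldr (\a b -> Step (a :> b)) (Return r)
```

With a payload of 0, both iterT sumAlg and destroy with the same algebra compute the sum of the items in this model.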