Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell2010 |
Synopsis
- data StreamT m a = forall s.StreamT {}
- unfold :: Applicative m => s -> (s -> Result s a) -> StreamT m a
- unfold_ :: Applicative m => s -> (s -> s) -> StreamT m s
- constM :: Functor m => m a -> StreamT m a
- hoist' :: (forall x. m1 x -> m2 x) -> StreamT m1 a -> StreamT m2 a
- stepStream :: Functor m => StreamT m a -> m (Result (StreamT m a) a)
- reactimate :: Monad m => StreamT m () -> m void
- streamToList :: Monad m => StreamT m a -> m [a]
- withStreamT :: (Functor m, Functor n) => (forall s. m (Result s a) -> n (Result s b)) -> StreamT m a -> StreamT n b
- concatS :: Monad m => StreamT m [a] -> StreamT m a
- applyExcept :: Monad m => StreamT (ExceptT (e1 -> e2) m) a -> StreamT (ExceptT e1 m) a -> StreamT (ExceptT e2 m) a
- exceptS :: Applicative m => StreamT (ExceptT e m) b -> StreamT m (Either e b)
- selectExcept :: Monad m => StreamT (ExceptT (Either e1 e2) m) a -> StreamT (ExceptT (e1 -> e2) m) a -> StreamT (ExceptT e2 m) a
- fixStream :: Functor m => (forall s. s -> t s) -> (forall s. (s -> m (Result s a)) -> t s -> m (Result (t s) a)) -> StreamT m a
- fixStream' :: Functor m => (forall s. s -> t s) -> (forall s. s -> (s -> m (Result s a)) -> t s -> m (Result (t s) a)) -> StreamT m a
- fixA :: Applicative m => StreamT m (a -> a) -> StreamT m a
Creating streams
Effectful streams in initial encoding.
A stream consists of an internal state s
, and a step function.
This step can make use of an effect in m
(which is often a monad),
alter the state, and return a result value.
Its semantics is continuously outputting values of type b
,
while performing side effects in m
.
An initial encoding was chosen instead of the final encoding known from e.g. list-transformer
, dunai
, machines
, streaming
, ...,
because the initial encoding is much more amenable to compiler optimizations
than the final encoding, which is:
data StreamFinalT m b = StreamFinalT (m (b, StreamFinalT m b))
When two streams are composed, GHC can often optimize the combined step function, resulting in a faster streams than what the final encoding can ever achieve, because the final encoding has to step through every continuation. Put differently, the compiler can perform static analysis on the state types of initially encoded state machines, while the final encoding knows its state only at runtime.
This performance gain comes at a peculiar cost:
Recursive definitions of streams are not possible, e.g. an equation like:
fixA stream = stream * fixA stream
This is impossible since the stream under definition itself appears in the definition body,
and thus the internal state type would be recursively defined, which GHC doesn't allow:
Type level recursion is not supported in existential types.
An stream defined thusly will typically hang and/or leak memory, trying to build up an infinite type at runtime.
It is nevertheless possible to define streams recursively, but one needs to first identify the recursive definition of its state type.
Then for the greatest generality, fixStream
and fixStream'
can be used, and some special cases are covered by functions
such as fixA
, parallely
, many
and some
.
forall s. StreamT | |
Instances
unfold :: Applicative m => s -> (s -> Result s a) -> StreamT m a Source #
Initialise with an internal state, update the state and produce output without side effects.
unfold_ :: Applicative m => s -> (s -> s) -> StreamT m s Source #
Like unfold
, but output the current state.
constM :: Functor m => m a -> StreamT m a Source #
Constantly perform the same effect, without remembering a state.
Running streams
stepStream :: Functor m => StreamT m a -> m (Result (StreamT m a) a) Source #
Perform one step of a stream, resulting in an updated stream and an output value.
reactimate :: Monad m => StreamT m () -> m void Source #
Run a stream with trivial output.
If the output of a stream does not contain information,
all of its meaning is in its effects.
This function runs the stream indefinitely.
Since it will never return with a value, this function also has no output (its output is void).
The only way it can return is if m
includes some effect of termination,
e.g. Maybe
or Either
could terminate with a Nothing
or Left
value,
or IO
can raise an exception.
streamToList :: Monad m => StreamT m a -> m [a] Source #
Run a stream, collecting the outputs in a lazy, infinite list.
Modifying streams
withStreamT :: (Functor m, Functor n) => (forall s. m (Result s a) -> n (Result s b)) -> StreamT m a -> StreamT n b Source #
Change the output type and effect of a stream without changing its state type.
concatS :: Monad m => StreamT m [a] -> StreamT m a Source #
Buffer the output of a stream, returning one value at a time.
This function lets a stream control the speed at which it produces data, since it can decide to produce any amount of output at every step.
Exception handling
applyExcept :: Monad m => StreamT (ExceptT (e1 -> e2) m) a -> StreamT (ExceptT e1 m) a -> StreamT (ExceptT e2 m) a Source #
Streams with exceptions are Applicative
in the exception type.
Run the first stream until it throws a function as an exception, then run the second one. If the second one ever throws an exception, apply the function thrown by the first one to it.
exceptS :: Applicative m => StreamT (ExceptT e m) b -> StreamT m (Either e b) Source #
Whenever an exception occurs, output it and retry on the next step.
selectExcept :: Monad m => StreamT (ExceptT (Either e1 e2) m) a -> StreamT (ExceptT (e1 -> e2) m) a -> StreamT (ExceptT e2 m) a Source #
Fix points, or recursive definitions
:: Functor m | |
=> (forall s. s -> t s) | The recursive definition of the state of the stream. |
-> (forall s. (s -> m (Result s a)) -> t s -> m (Result (t s) a)) | The recursive definition of the step function of the stream. |
-> StreamT m a |
Recursively define a stream from a recursive definition of the state, and of the step function.
If you want to define a stream recursively, this is not possible directly.
For example, consider this definition:
loops :: Monad m => StreamT m [Int]
loops = (:) $ unfold_ 0 (+ 1) * loops
The defined value loops
contains itself in its definition.
This means that the internal state type of loops
must itself be recursively defined.
But GHC cannot do this automatically, because type level and value level are separate.
Instead, we need to spell out the type level recursion explicitly with a type constructor,
over which we will take the fixpoint.
In this example, we can figure out from the definitions that:
1.
has unfold_
0 (+ 1)0 :: Int
as state
2. (:)
does not change the state
3. <*>
takes the product of both states
So the internal state s
of loops
must satisfy the equation s = (Int, s)
.
If the recursion is written as above, it tries to compute the infinite tuple (Int, (Int, (Int, ...)))
, which hangs.
Instead, we need to define a type operator over which we take the fixpoint:
-- You need to write this: data Loops x = Loops Int x -- The library supplies: data Fix f = Fix f (Fix f) type LoopsState = Fix Loops
We can then use fixStream
to define the recursive definition of loops
.
For this, we have to to tediously inline the definitions of unfold_
, (:)
, and <*>
,
until we arrive at an explicit recursive definition of the state and the step function of loops
, separately.
These are the two arguments of fixStream
.
loops :: Monad m => StreamT m [Int] loops = fixStream (Loops 0) $ fixStep (Loops n fixState) -> do Result s' a <- fixStep fixState return $ Result (Loops (n + 1) s') a
:: Functor m | |
=> (forall s. s -> t s) | |
-> (forall s. s -> (s -> m (Result s a)) -> t s -> m (Result (t s) a)) | The recursive definition of the state of the stream. |
-> StreamT m a | The recursive definition of the step function of the stream. |
A generalisation of fixStream
where the step definition is allowed to depend on the state.