Safe Haskell | None |
---|---|
Language | Haskell2010 |
The primary use of concurrent machines is to establish a pipelined architecture that can boost overall throughput by running each stage of the pipeline at the same time. The processing, or production, rate of each stage may not be identical, so facilities are provided to loosen the temporal coupling between pipeline stages using buffers.
This architecture also lends itself to operations where multiple
workers are available for procesisng inputs. If each worker is to
process the same set of inputs, consider fanout
and
fanoutSteps
. If each worker is to process a disjoint set of
inputs, consider scatter
.
Synopsis
- capWye :: forall (m :: Type -> Type) a b c. Monad m => SourceT m a -> SourceT m b -> WyeT m a b c -> SourceT m c
- capY :: forall (m :: Type -> Type) b a c. Monad m => SourceT m b -> WyeT m a b c -> ProcessT m a c
- capX :: forall (m :: Type -> Type) a b c. Monad m => SourceT m a -> WyeT m a b c -> ProcessT m b c
- addY :: forall (m :: Type -> Type) b c a d. Monad m => ProcessT m b c -> WyeT m a c d -> WyeT m a b d
- addX :: forall (m :: Type -> Type) a b c d. Monad m => ProcessT m a b -> WyeT m b c d -> WyeT m a c d
- data Y a b c where
- type Wye a b c = Machine (Y a b) c
- type WyeT (m :: Type -> Type) a b c = MachineT m (Y a b) c
- logMealy :: Semigroup a => Mealy a a
- unfoldMealy :: (s -> a -> (b, s)) -> s -> Mealy a b
- newtype Mealy a b = Mealy {}
- unfoldMoore :: (s -> (b, a -> s)) -> s -> Moore a b
- logMoore :: Monoid m => Moore m m
- data Moore a b = Moore b (a -> Moore a b)
- zipping :: Tee a b (a, b)
- zipWith :: (a -> b -> c) -> Tee a b c
- zipWithT :: forall a b c (m :: Type -> Type). (a -> b -> c) -> PlanT (T a b) c m ()
- capT :: forall (m :: Type -> Type) a b c. Monad m => SourceT m a -> SourceT m b -> TeeT m a b c -> SourceT m c
- capR :: forall (m :: Type -> Type) b a c. Monad m => SourceT m b -> TeeT m a b c -> ProcessT m a c
- capL :: forall (m :: Type -> Type) a b c. Monad m => SourceT m a -> TeeT m a b c -> ProcessT m b c
- addR :: forall (m :: Type -> Type) b c a d. Monad m => ProcessT m b c -> TeeT m a c d -> TeeT m a b d
- addL :: forall (m :: Type -> Type) a b c d. Monad m => ProcessT m a b -> TeeT m b c d -> TeeT m a c d
- teeT :: forall (m :: Type -> Type) a b c (k :: Type -> Type). Monad m => TeeT m a b c -> MachineT m k a -> MachineT m k b -> MachineT m k c
- data T a b c where
- type Tee a b c = Machine (T a b) c
- type TeeT (m :: Type -> Type) a b c = MachineT m (T a b) c
- unfoldT :: Monad m => (r -> m (Maybe (a, r))) -> r -> SourceT m a
- unfold :: (r -> Maybe (a, r)) -> r -> Source a
- enumerateFromTo :: Enum a => a -> a -> Source a
- replicated :: Int -> a -> Source a
- iterated :: (a -> a) -> a -> Source a
- plug :: forall (m :: Type -> Type) (k :: Type -> Type) o. Monad m => MachineT m k o -> SourceT m o
- cap :: Process a b -> Source a -> Source b
- source :: Foldable f => f b -> Source b
- cycled :: Foldable f => f b -> Source b
- repeated :: o -> Source o
- type Source b = forall (k :: Type -> Type). Machine k b
- type SourceT (m :: Type -> Type) b = forall (k :: Type -> Type). MachineT m k b
- strippingPrefix :: forall b (m :: Type -> Type) (k :: Type -> Type -> Type) a. (Eq b, Monad m) => MachineT m (k a) b -> MachineT m (k a) b -> MachineT m (k a) b
- showing :: forall (k :: Type -> Type -> Type) a. (Category k, Show a) => Machine (k a) String
- reading :: forall (k :: Type -> Type -> Type) a. (Category k, Read a) => Machine (k String) a
- traversing :: forall (k :: Type -> Type -> Type) m a b. (Category k, Monad m) => (a -> m b) -> MachineT m (k a) b
- mapping :: forall (k :: Type -> Type -> Type) a b. Category k => (a -> b) -> Machine (k a) b
- sequencing :: forall (k :: Type -> Type -> Type) m a. (Category k, Monad m) => MachineT m (k (m a)) a
- smallest :: forall (k :: Type -> Type -> Type) a. (Category k, Ord a) => Machine (k a) a
- largest :: forall (k :: Type -> Type -> Type) a. (Category k, Ord a) => Machine (k a) a
- intersperse :: forall (k :: Type -> Type -> Type) a. Category k => a -> Machine (k a) a
- finalOr :: forall (k :: Type -> Type -> Type) a. Category k => a -> Machine (k a) a
- final :: forall (k :: Type -> Type -> Type) a. Category k => Machine (k a) a
- autoM :: forall (k :: Type -> Type -> Type) m a b. (Category k, Monad m) => (a -> m b) -> MachineT m (k a) b
- sinkPart_ :: forall (m :: Type -> Type) a b c. Monad m => (a -> (b, c)) -> ProcessT m c Void -> ProcessT m a b
- flattened :: forall (f :: Type -> Type) a. Foldable f => Process (f a) a
- asParts :: forall (f :: Type -> Type) a. Foldable f => Process (f a) a
- fold1 :: forall (k :: Type -> Type -> Type) a. Category k => (a -> a -> a) -> Machine (k a) a
- fold :: forall (k :: Type -> Type -> Type) a b. Category k => (a -> b -> a) -> a -> Machine (k b) a
- scanMap :: forall (k :: Type -> Type -> Type) b a. (Category k, Monoid b) => (a -> b) -> Machine (k a) b
- scan1 :: forall (k :: Type -> Type -> Type) a. Category k => (a -> a -> a) -> Machine (k a) a
- scan :: forall (k :: Type -> Type -> Type) a b. Category k => (a -> b -> a) -> a -> Machine (k b) a
- process :: forall (m :: Type -> Type) k i o. Monad m => (forall a. k a -> i -> a) -> MachineT m k o -> ProcessT m i o
- supply :: forall f (m :: Type -> Type) a b. (Foldable f, Monad m) => f a -> ProcessT m a b -> ProcessT m a b
- (~>) :: forall (m :: Type -> Type) (k :: Type -> Type) b c. Monad m => MachineT m k b -> ProcessT m b c -> MachineT m k c
- (<~) :: forall (m :: Type -> Type) b c (k :: Type -> Type). Monad m => ProcessT m b c -> MachineT m k b -> MachineT m k c
- buffered :: Int -> Process a [a]
- droppingWhile :: (a -> Bool) -> Process a a
- takingJusts :: Process (Maybe a) a
- takingWhile :: (a -> Bool) -> Process a a
- taking :: Int -> Process a a
- dropping :: Int -> Process a a
- filtered :: (a -> Bool) -> Process a a
- prepended :: Foldable f => f a -> Process a a
- echo :: Process a a
- type Process a b = Machine (Is a) b
- type ProcessT (m :: Type -> Type) a b = MachineT m (Is a) b
- class Automaton (k :: Type -> Type -> Type) where
- class AutomatonM (x :: (Type -> Type) -> Type -> Type -> Type) where
- finishWith :: forall (m :: Type -> Type) o r (k :: Type -> Type). Monad m => (o -> Maybe r) -> MachineT m k o -> MachineT m k (Either r o)
- tagDone :: forall (m :: Type -> Type) o (k :: Type -> Type). Monad m => (o -> Bool) -> MachineT m k o -> MachineT m k (Either o o)
- deconstruct :: forall (m :: Type -> Type) (k :: Type -> Type) a o. Monad m => MachineT m k (Either a o) -> PlanT k o m a
- stopped :: forall (k :: Type -> Type) b. Machine k b
- starve :: forall (m :: Type -> Type) (k0 :: Type -> Type) b (k :: Type -> Type). Monad m => MachineT m k0 b -> MachineT m k b -> MachineT m k b
- pass :: k o -> Machine k o
- preplan :: forall (m :: Type -> Type) (k :: Type -> Type) o. Monad m => PlanT k o m (MachineT m k o) -> MachineT m k o
- before :: forall (m :: Type -> Type) (k :: Type -> Type) o a. Monad m => MachineT m k o -> PlanT k o m a -> MachineT m k o
- unfoldPlan :: forall (m :: Type -> Type) s (k :: Type -> Type) o. Monad m => s -> (s -> PlanT k o m s) -> MachineT m k o
- repeatedly :: forall (m :: Type -> Type) (k :: Type -> Type) o a. Monad m => PlanT k o m a -> MachineT m k o
- construct :: forall (m :: Type -> Type) (k :: Type -> Type) o a. Monad m => PlanT k o m a -> MachineT m k o
- fitM :: forall m m' (k :: Type -> Type) o. (Monad m, Monad m') => (forall a. m a -> m' a) -> MachineT m k o -> MachineT m' k o
- fit :: forall (m :: Type -> Type) k k' o. Monad m => (forall a. k a -> k' a) -> MachineT m k o -> MachineT m k' o
- run :: forall (k :: Type -> Type) b. MachineT Identity k b -> [b]
- runT :: forall m (k :: Type -> Type) b. Monad m => MachineT m k b -> m [b]
- runT_ :: forall m (k :: Type -> Type) b. Monad m => MachineT m k b -> m ()
- stepMachine :: forall (m :: Type -> Type) (k :: Type -> Type) o (k' :: Type -> Type) o'. Monad m => MachineT m k o -> (Step k o (MachineT m k o) -> MachineT m k' o') -> MachineT m k' o'
- encased :: forall (m :: Type -> Type) (k :: Type -> Type) o. Monad m => Step k o (MachineT m k o) -> MachineT m k o
- runMachine :: forall (k :: Type -> Type) o. MachineT Identity k o -> Step k o (MachineT Identity k o)
- data Step (k :: Type -> Type) o r
- newtype MachineT (m :: Type -> Type) (k :: Type -> Type) o = MachineT {
- runMachineT :: m (Step k o (MachineT m k o))
- type Machine (k :: Type -> Type) o = forall (m :: Type -> Type). Monad m => MachineT m k o
- class Appliance (k :: Type -> Type) where
- exhaust :: forall m a (k :: Type -> Type). Monad m => m (Maybe a) -> PlanT k a m ()
- stop :: forall (k :: Type -> Type) o a. Plan k o a
- awaits :: k i -> Plan k o i
- await :: forall (k :: Type -> Type -> Type) i o. Category k => Plan (k i) o i
- maybeYield :: forall o (k :: Type -> Type). Maybe o -> Plan k o ()
- yield :: forall o (k :: Type -> Type). o -> Plan k o ()
- runPlan :: PlanT k o Identity a -> (a -> r) -> (o -> r -> r) -> (forall z. (z -> r) -> k z -> r -> r) -> r -> r
- newtype PlanT (k :: Type -> Type) o (m :: Type -> Type) a = PlanT {
- runPlanT :: forall r. (a -> m r) -> (o -> m r -> m r) -> (forall z. (z -> m r) -> k z -> m r -> m r) -> m r -> m r
- type Plan (k :: Type -> Type) o a = forall (m :: Type -> Type). PlanT k o m a
- data Is a b where
- (>~>) :: MonadBaseControl IO m => MachineT m k b -> ProcessT m b c -> MachineT m k c
- (<~<) :: MonadBaseControl IO m => ProcessT m b c -> MachineT m k b -> MachineT m k c
- bufferConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
- rollingConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
- fanout :: (MonadBaseControl IO m, Semigroup r) => [ProcessT m a r] -> ProcessT m a r
- fanoutSteps :: (MonadBaseControl IO m, Monoid r) => [ProcessT m a r] -> ProcessT m a r
- wye :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> WyeT m a' b' c -> WyeT m a b c
- tee :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> TeeT m a' b' c -> TeeT m a b c
- scatter :: MonadBaseControl IO m => [MachineT m k o] -> MachineT m k o
- splitSum :: forall m a b c d. MonadBaseControl IO m => ProcessT m a b -> ProcessT m c d -> ProcessT m (Either a c) (Either b d)
- mergeSum :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (Either a b) r
- splitProd :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (a, b) r
Documentation
capWye :: forall (m :: Type -> Type) a b c. Monad m => SourceT m a -> SourceT m b -> WyeT m a b c -> SourceT m c #
Tie off both inputs of a wye by connecting them to known sources.
capY :: forall (m :: Type -> Type) b a c. Monad m => SourceT m b -> WyeT m a b c -> ProcessT m a c #
Tie off one input of a wye by connecting it to a known source.
capX :: forall (m :: Type -> Type) a b c. Monad m => SourceT m a -> WyeT m a b c -> ProcessT m b c #
Tie off one input of a wye by connecting it to a known source.
addY :: forall (m :: Type -> Type) b c a d. Monad m => ProcessT m b c -> WyeT m a c d -> WyeT m a b d #
Precompose a pipe onto the right input of a wye.
addX :: forall (m :: Type -> Type) a b c d. Monad m => ProcessT m a b -> WyeT m b c d -> WyeT m a c d #
Precompose a pipe onto the left input of a wye.
type Wye a b c = Machine (Y a b) c #
A Machine
that can read from two input stream in a non-deterministic manner.
type WyeT (m :: Type -> Type) a b c = MachineT m (Y a b) c #
A Machine
that can read from two input stream in a non-deterministic manner with monadic side-effects.
unfoldMealy :: (s -> a -> (b, s)) -> s -> Mealy a b #
A Mealy
machine modeled with explicit state.
Mealy
machines
Examples
We can enumerate inputs:
>>>
let countingMealy = unfoldMealy (\i x -> ((i, x), i + 1)) 0
>>>
run (auto countingMealy <~ source "word")
[(0,'w'),(1,'o'),(2,'r'),(3,'d')]
Instances
Arrow Mealy | |
ArrowChoice Mealy | |
ArrowApply Mealy | |
Defined in Data.Machine.Mealy | |
Profunctor Mealy | |
Defined in Data.Machine.Mealy | |
Automaton Mealy | |
Defined in Data.Machine.Mealy | |
Corepresentable Mealy | |
Choice Mealy | |
Closed Mealy | |
Defined in Data.Machine.Mealy | |
Strong Mealy | |
Costrong Mealy | |
Cosieve Mealy NonEmpty | |
Defined in Data.Machine.Mealy | |
Functor (Mealy a) | |
Applicative (Mealy a) | |
Distributive (Mealy a) | |
Representable (Mealy a) | |
Pointed (Mealy a) | |
Defined in Data.Machine.Mealy | |
Extend (Mealy a) | |
Category Mealy | |
Semigroup b => Semigroup (Mealy a b) | |
Monoid b => Monoid (Mealy a b) | |
type Corep Mealy | |
Defined in Data.Machine.Mealy | |
type Rep (Mealy a) | |
Defined in Data.Machine.Mealy |
unfoldMoore :: (s -> (b, a -> s)) -> s -> Moore a b #
Construct a Moore machine from a state valuation and transition function
Moore
machines
Instances
Profunctor Moore | |
Defined in Data.Machine.Moore | |
Automaton Moore | |
Defined in Data.Machine.Moore | |
Corepresentable Moore | |
Closed Moore | |
Defined in Data.Machine.Moore | |
Costrong Moore | |
Cosieve Moore [] | |
Defined in Data.Machine.Moore | |
Monad (Moore a) | slow diagonalization |
Functor (Moore a) | |
MonadFix (Moore a) | |
Defined in Data.Machine.Moore | |
Applicative (Moore a) | |
Distributive (Moore a) | |
Representable (Moore a) | |
MonadZip (Moore a) | |
Comonad (Moore a) | |
ComonadApply (Moore a) | |
Pointed (Moore a) | |
Defined in Data.Machine.Moore | |
Copointed (Moore a) | |
Defined in Data.Machine.Moore | |
MonadReader [a] (Moore a) | |
Semigroup b => Semigroup (Moore a b) | |
Monoid b => Monoid (Moore a b) | |
type Corep Moore | |
Defined in Data.Machine.Moore | |
type Rep (Moore a) | |
Defined in Data.Machine.Moore |
zipWith :: (a -> b -> c) -> Tee a b c #
Zip together two inputs, then apply the given function, halting as soon as either input is exhausted. This implementation reads from the left, then the right
zipWithT :: forall a b c (m :: Type -> Type). (a -> b -> c) -> PlanT (T a b) c m () #
wait for both the left and the right sides of a T and then merge them with f.
capT :: forall (m :: Type -> Type) a b c. Monad m => SourceT m a -> SourceT m b -> TeeT m a b c -> SourceT m c #
Tie off both inputs to a tee by connecting them to known sources. This is recommended over capping each side separately, as it is far more efficient.
capR :: forall (m :: Type -> Type) b a c. Monad m => SourceT m b -> TeeT m a b c -> ProcessT m a c #
Tie off one input of a tee by connecting it to a known source.
capL :: forall (m :: Type -> Type) a b c. Monad m => SourceT m a -> TeeT m a b c -> ProcessT m b c #
Tie off one input of a tee by connecting it to a known source.
addR :: forall (m :: Type -> Type) b c a d. Monad m => ProcessT m b c -> TeeT m a c d -> TeeT m a b d #
Precompose a pipe onto the right input of a tee.
addL :: forall (m :: Type -> Type) a b c d. Monad m => ProcessT m a b -> TeeT m b c d -> TeeT m a c d #
Precompose a pipe onto the left input of a tee.
teeT :: forall (m :: Type -> Type) a b c (k :: Type -> Type). Monad m => TeeT m a b c -> MachineT m k a -> MachineT m k b -> MachineT m k c #
`teeT mt ma mb` Use a Tee
to interleave or combine the outputs of ma
and mb
.
The resulting machine will draw from a single source.
Examples:
>>>
import Data.Machine.Source
>>>
run $ teeT zipping echo echo <~ source [1..5]
[(1,2),(3,4)]
type Tee a b c = Machine (T a b) c #
A Machine
that can read from two input stream in a deterministic manner.
type TeeT (m :: Type -> Type) a b c = MachineT m (T a b) c #
A Machine
that can read from two input stream in a deterministic manner with monadic side-effects.
enumerateFromTo :: Enum a => a -> a -> Source a #
Enumerate from a value to a final value, inclusive, via succ
Examples:
>>>
run $ enumerateFromTo 1 3
[1,2,3]
replicated :: Int -> a -> Source a #
replicated
n x
is a source of x
emitted n
time(s)
iterated :: (a -> a) -> a -> Source a #
iterated
f x
returns an infinite source of repeated applications
of f
to x
plug :: forall (m :: Type -> Type) (k :: Type -> Type) o. Monad m => MachineT m k o -> SourceT m o #
cycled :: Foldable f => f b -> Source b #
Loop through a Foldable
container over and over.
This can be constructed from a plan with
cycled :: Foldable f => f b -> Source b
cycled = repeatedly (traverse_ yield xs)
Examples:
>>>
run $ taking 5 <~ cycled [1,2]
[1,2,1,2,1]
Repeat the same value, over and over.
This can be constructed from a plan with
repeated :: o -> Source o
repeated = repeatedly . yield
Examples:
>>>
run $ taking 5 <~ repeated 1
[1,1,1,1,1]
type SourceT (m :: Type -> Type) b = forall (k :: Type -> Type). MachineT m k b #
A SourceT
never reads from its inputs, but may have monadic side-effects.
strippingPrefix :: forall b (m :: Type -> Type) (k :: Type -> Type -> Type) a. (Eq b, Monad m) => MachineT m (k a) b -> MachineT m (k a) b -> MachineT m (k a) b #
strippingPrefix
mp mb
Drops the given prefix from mp
. It stops if mb
did not start with the prefix given, or continues streaming after the
prefix, if mb
did.
traversing :: forall (k :: Type -> Type -> Type) m a b. (Category k, Monad m) => (a -> m b) -> MachineT m (k a) b #
Apply an effectful to all values coming from the input.
Alias to autoM
.
mapping :: forall (k :: Type -> Type -> Type) a b. Category k => (a -> b) -> Machine (k a) b #
Apply a function to all values coming from the input
This can be constructed from a plan with
mapping :: Category k => (a -> b) -> Machine (k a) b mapping f = repeatedly $ await >>= yield . f
Examples:
>>>
runT $ mapping (*2) <~ source [1..3]
[2,4,6]
sequencing :: forall (k :: Type -> Type -> Type) m a. (Category k, Monad m) => MachineT m (k (m a)) a #
Convert a stream of actions to a stream of values
This can be constructed from a plan with
sequencing :: Monad m => (a -> m b) -> ProcessT m a b sequencing :: (Category k, Monad m) => MachineT m (k (m a)) a sequencing = repeatedly $ do ma <- await a <- lift ma yield a
Examples:
>>>
runT $ sequencing <~ source [Just 3, Nothing]
Nothing
>>>
runT $ sequencing <~ source [Just 3, Just 4]
Just [3,4]
smallest :: forall (k :: Type -> Type -> Type) a. (Category k, Ord a) => Machine (k a) a #
Return the minimum value from the input
largest :: forall (k :: Type -> Type -> Type) a. (Category k, Ord a) => Machine (k a) a #
Return the maximum value from the input
intersperse :: forall (k :: Type -> Type -> Type) a. Category k => a -> Machine (k a) a #
Intersperse an element between the elements of the input
intersperse
:: a ->Process
a a
finalOr :: forall (k :: Type -> Type -> Type) a. Category k => a -> Machine (k a) a #
Skip all but the final element of the input. If the input is empty, the default value is emitted
This can be constructed from a plan with
finalOr
:: a ->Process
a a finalOr :: Category k => a -> Machine (k a) a finalOr = construct . go where go prev = do next await <| yield prev *> stop go next
Examples:
>>>
runT $ finalOr (-1) <~ source [1..10]
[10]>>>
runT $ finalOr (-1) <~ source []
[-1]
final :: forall (k :: Type -> Type -> Type) a. Category k => Machine (k a) a #
Skip all but the final element of the input
This can be constructed from a plan with
final
::Process
a a final :: Category k => Machine (k a) a final = construct $ await >>= go where go prev = do next await <| yield prev *> stop go next
Examples:
>>>
runT $ final <~ source [1..10]
[10]>>>
runT $ final <~ source []
[]
autoM :: forall (k :: Type -> Type -> Type) m a b. (Category k, Monad m) => (a -> m b) -> MachineT m (k a) b #
Apply a monadic function to each element of a ProcessT
.
This can be constructed from a plan with
autoM :: Monad m => (a -> m b) -> ProcessT m a b autoM :: (Category k, Monad m) => (a -> m b) -> MachineT m (k a) b autoM f = repeatedly $ await >>= lift . f >>= yield
Examples:
>>>
runT $ autoM Left <~ source [3, 4]
Left 3
>>>
runT $ autoM Right <~ source [3, 4]
Right [3,4]
sinkPart_ :: forall (m :: Type -> Type) a b c. Monad m => (a -> (b, c)) -> ProcessT m c Void -> ProcessT m a b #
sinkPart_ toParts sink
creates a process that uses the
toParts
function to break input into a tuple of (passAlong,
sinkPart)
for which the second projection is given to the supplied
sink
ProcessT
(that produces no output) while the first
projection is passed down the pipeline.
flattened :: forall (f :: Type -> Type) a. Foldable f => Process (f a) a #
Break each input into pieces that are fed downstream individually.
Alias for asParts
asParts :: forall (f :: Type -> Type) a. Foldable f => Process (f a) a #
Break each input into pieces that are fed downstream individually.
This can be constructed from a plan with
asParts :: Foldable f => Process (f a) a asParts = repeatedly $ await >>= traverse_ yield
Examples:
>>>
run $ asParts <~ source [[1..3],[4..6]]
[1,2,3,4,5,6]
fold1 :: forall (k :: Type -> Type -> Type) a. Category k => (a -> a -> a) -> Machine (k a) a #
fold1
is a variant of fold
that has no starting value argument
This can be constructed from a plan with
fold1 :: Category k => (a -> a -> a) -> Machine (k a) a fold1 func = construct $ await >>= go where go cur = do next await <| yield cur *> stop go $! func cur next
Examples:
>>>
run $ fold1 (+) <~ source [1..5]
[15]
fold :: forall (k :: Type -> Type -> Type) a b. Category k => (a -> b -> a) -> a -> Machine (k b) a #
Construct a Process
from a left-folding operation.
Like scan
, but only yielding the final value.
It may be useful to consider this alternative signature
fold
:: (a -> b -> a) -> a -> Process b a
This can be constructed from a plan with
fold :: Category k => (a -> b -> a) -> a -> Machine (k b) a fold func seed = construct $ go seed where go cur = do next await <| yield cur *> stop go $! func cur next
Examples:
>>>
run $ fold (+) 0 <~ source [1..5]
[15]
>>>
run $ fold (\a _ -> a + 1) 0 <~ source [1..5]
[5]
scanMap :: forall (k :: Type -> Type -> Type) b a. (Category k, Monoid b) => (a -> b) -> Machine (k a) b #
Like scan
only uses supplied function to map and uses Monoid for
associative operation
Examples:
>>>
run $ mapping getSum <~ scanMap Sum <~ source [1..5]
[0,1,3,6,10,15]
scan1 :: forall (k :: Type -> Type -> Type) a. Category k => (a -> a -> a) -> Machine (k a) a #
scan1
is a variant of scan
that has no starting value argument
This can be constructed from a plan with
scan1 :: Category k => (a -> a -> a) -> Machine (k a) a scan1 func = construct $ await >>= go where go cur = do yield cur next <- await go $! func cur next
Examples:
>>>
run $ scan1 (+) <~ source [1..5]
[1,3,6,10,15]
scan :: forall (k :: Type -> Type -> Type) a b. Category k => (a -> b -> a) -> a -> Machine (k b) a #
Construct a Process
from a left-scanning operation.
Like fold
, but yielding intermediate values.
It may be useful to consider this alternative signature
scan
:: (a -> b -> a) -> a -> Process b a
For stateful scan
use auto
with Data.Machine.Mealy machine.
This can be constructed from a plan with
scan :: Category k => (a -> b -> a) -> a -> Machine (k b) a scan func seed = construct $ go seed where go cur = do yield cur next <- await go $! func cur next
Examples:
>>>
run $ scan (+) 0 <~ source [1..5]
[0,1,3,6,10,15]
>>>
run $ scan (\a _ -> a + 1) 0 <~ source [1..5]
[0,1,2,3,4,5]
process :: forall (m :: Type -> Type) k i o. Monad m => (forall a. k a -> i -> a) -> MachineT m k o -> ProcessT m i o #
supply :: forall f (m :: Type -> Type) a b. (Foldable f, Monad m) => f a -> ProcessT m a b -> ProcessT m a b #
(~>) :: forall (m :: Type -> Type) (k :: Type -> Type) b c. Monad m => MachineT m k b -> ProcessT m b c -> MachineT m k c infixl 9 #
Flipped (<~
).
(<~) :: forall (m :: Type -> Type) b c (k :: Type -> Type). Monad m => ProcessT m b c -> MachineT m k b -> MachineT m k c infixr 9 #
buffered :: Int -> Process a [a] #
Chunk up the input into n
element lists.
Avoids returning empty lists and deals with the truncation of the final group.
An approximation of this can be constructed from a plan with
buffered :: Int -> Process a [a] buffered = repeatedly . go [] where go acc 0 = yield (reverse acc) go acc n = do i await <| yield (reverse acc) *> stop go (i:acc) $! n-1
Examples:
>>>
run $ buffered 3 <~ source [1..6]
[[1,2,3],[4,5,6]]
>>>
run $ buffered 3 <~ source [1..5]
[[1,2,3],[4,5]]
>>>
run $ buffered 3 <~ source []
[]
droppingWhile :: (a -> Bool) -> Process a a #
A Process
that drops elements while a predicate holds
This can be constructed from a plan with
droppingWhile :: (a -> Bool) -> Process a a droppingWhile p = before echo loop where loop = await >>= v -> if p v then loop else yield v
Examples:
>>>
run $ droppingWhile (< 3) <~ source [1..5]
[3,4,5]
takingJusts :: Process (Maybe a) a #
A Process
that passes through elements unwrapped from Just
until a
Nothing
is found, then stops.
This can be constructed from a plan with
takingJusts :: Process (Maybe a) a takingJusts = repeatedly $ await >>= maybe stop yield
Examples:
>>>
run $ takingJusts <~ source [Just 1, Just 2, Nothing, Just 3, Just 4]
[1,2]
takingWhile :: (a -> Bool) -> Process a a #
A Process
that passes through elements until a predicate ceases to hold, then stops
This can be constructed from a plan with
takingWhile :: (a -> Bool) -> Process a a takingWhile p = repeatedly $ await >>= v -> if p v then yield v else stop
Examples:
>>>
run $ takingWhile (< 3) <~ source [1..5]
[1,2]
taking :: Int -> Process a a #
A Process
that passes through the first n
elements from its input then stops
This can be constructed from a plan with
taking n = construct . replicateM_ n $ await >>= yield
Examples:
>>>
run $ taking 3 <~ source [1..5]
[1,2,3]
dropping :: Int -> Process a a #
A Process
that drops the first n
, then repeats the rest.
This can be constructed from a plan with
dropping n = before echo $ replicateM_ n await
Examples:
>>>
run $ dropping 3 <~ source [1..5]
[4,5]
filtered :: (a -> Bool) -> Process a a #
A Process
that only passes through inputs that match a predicate.
This can be constructed from a plan with
filtered :: (a -> Bool) -> Process a a filtered p = repeatedly $ do i <- await when (p i) $ yield i
Examples:
>>>
run $ filtered even <~ source [1..5]
[2,4]
The trivial Process
that simply repeats each input it receives.
This can be constructed from a plan with
echo :: Process a a echo = repeatedly $ do i <- await yield i
Examples:
>>>
run $ echo <~ source [1..5]
[1,2,3,4,5]
type Process a b = Machine (Is a) b #
A
is a stream transducer that can consume values of type Process
a ba
from its input, and produce values of type b
for its output.
class Automaton (k :: Type -> Type -> Type) where #
Instances
Automaton Mealy | |
Defined in Data.Machine.Mealy | |
Automaton Moore | |
Defined in Data.Machine.Moore | |
Automaton Is | |
Defined in Data.Machine.Process | |
Automaton ((->) :: Type -> Type -> Type) | |
Defined in Data.Machine.Process |
class AutomatonM (x :: (Type -> Type) -> Type -> Type -> Type) where #
Instances
finishWith :: forall (m :: Type -> Type) o r (k :: Type -> Type). Monad m => (o -> Maybe r) -> MachineT m k o -> MachineT m k (Either r o) #
Use a function to produce and mark a yielded value as the
terminal value of a Machine
. All yielded values for which the
given function returns Nothing
are yielded down the pipeline, but
the first value for which the function returns a Just
value will
be returned by a Plan
created via deconstruct
.
tagDone :: forall (m :: Type -> Type) o (k :: Type -> Type). Monad m => (o -> Bool) -> MachineT m k o -> MachineT m k (Either o o) #
Use a predicate to mark a yielded value as the terminal value of
this Machine
. This is useful in combination with deconstruct
to
combine Plan
s.
deconstruct :: forall (m :: Type -> Type) (k :: Type -> Type) a o. Monad m => MachineT m k (Either a o) -> PlanT k o m a #
starve :: forall (m :: Type -> Type) (k0 :: Type -> Type) b (k :: Type -> Type). Monad m => MachineT m k0 b -> MachineT m k b -> MachineT m k b #
Run a machine with no input until it stops, then behave as another machine.
preplan :: forall (m :: Type -> Type) (k :: Type -> Type) o. Monad m => PlanT k o m (MachineT m k o) -> MachineT m k o #
Incorporate a Plan
into the resulting machine.
before :: forall (m :: Type -> Type) (k :: Type -> Type) o a. Monad m => MachineT m k o -> PlanT k o m a -> MachineT m k o #
Evaluate a machine until it stops, and then yield answers according to the supplied model.
unfoldPlan :: forall (m :: Type -> Type) s (k :: Type -> Type) o. Monad m => s -> (s -> PlanT k o m s) -> MachineT m k o #
Unfold a stateful PlanT into a MachineT.
repeatedly :: forall (m :: Type -> Type) (k :: Type -> Type) o a. Monad m => PlanT k o m a -> MachineT m k o #
Generates a model that runs a machine until it stops, then start it up again.
repeatedly
m =construct
(forever
m)
construct :: forall (m :: Type -> Type) (k :: Type -> Type) o a. Monad m => PlanT k o m a -> MachineT m k o #
Compile a machine to a model.
fitM :: forall m m' (k :: Type -> Type) o. (Monad m, Monad m') => (forall a. m a -> m' a) -> MachineT m k o -> MachineT m' k o #
fit :: forall (m :: Type -> Type) k k' o. Monad m => (forall a. k a -> k' a) -> MachineT m k o -> MachineT m k' o #
run :: forall (k :: Type -> Type) b. MachineT Identity k b -> [b] #
Run a pure machine and extract an answer.
runT :: forall m (k :: Type -> Type) b. Monad m => MachineT m k b -> m [b] #
Stop feeding input into model and extract an answer
runT_ :: forall m (k :: Type -> Type) b. Monad m => MachineT m k b -> m () #
Stop feeding input into model, taking only the effects.
stepMachine :: forall (m :: Type -> Type) (k :: Type -> Type) o (k' :: Type -> Type) o'. Monad m => MachineT m k o -> (Step k o (MachineT m k o) -> MachineT m k' o') -> MachineT m k' o' #
Transform a Machine
by looking at a single step of that machine.
encased :: forall (m :: Type -> Type) (k :: Type -> Type) o. Monad m => Step k o (MachineT m k o) -> MachineT m k o #
runMachine :: forall (k :: Type -> Type) o. MachineT Identity k o -> Step k o (MachineT Identity k o) #
newtype MachineT (m :: Type -> Type) (k :: Type -> Type) o #
A MachineT
reads from a number of inputs and may yield results before stopping
with monadic side-effects.
MachineT | |
|
Instances
Monad m => Functor (MachineT m k) | |
(Monad m, Appliance k) => Applicative (MachineT m k) | |
Defined in Data.Machine.Type | |
m ~ Identity => Foldable (MachineT m k) | This permits toList to be used on a Machine. |
Defined in Data.Machine.Type fold :: Monoid m0 => MachineT m k m0 -> m0 # foldMap :: Monoid m0 => (a -> m0) -> MachineT m k a -> m0 # foldMap' :: Monoid m0 => (a -> m0) -> MachineT m k a -> m0 # foldr :: (a -> b -> b) -> b -> MachineT m k a -> b # foldr' :: (a -> b -> b) -> b -> MachineT m k a -> b # foldl :: (b -> a -> b) -> b -> MachineT m k a -> b # foldl' :: (b -> a -> b) -> b -> MachineT m k a -> b # foldr1 :: (a -> a -> a) -> MachineT m k a -> a # foldl1 :: (a -> a -> a) -> MachineT m k a -> a # toList :: MachineT m k a -> [a] # null :: MachineT m k a -> Bool # length :: MachineT m k a -> Int # elem :: Eq a => a -> MachineT m k a -> Bool # maximum :: Ord a => MachineT m k a -> a # minimum :: Ord a => MachineT m k a -> a # | |
Monad m => Pointed (MachineT m k) | |
Defined in Data.Machine.Type | |
Monad m => Semigroup (MachineT m k o) | |
Monad m => Monoid (MachineT m k o) | |
class Appliance (k :: Type -> Type) where #
An input type that supports merging requests from multiple machines.
exhaust :: forall m a (k :: Type -> Type). Monad m => m (Maybe a) -> PlanT k a m () #
Run a monadic action repeatedly yielding its results, until it returns Nothing.
maybeYield :: forall o (k :: Type -> Type). Maybe o -> Plan k o () #
Like yield, except stops if there is no value to yield.
runPlan :: PlanT k o Identity a -> (a -> r) -> (o -> r -> r) -> (forall z. (z -> r) -> k z -> r -> r) -> r -> r #
newtype PlanT (k :: Type -> Type) o (m :: Type -> Type) a #
PlanT | |
|
Instances
MonadReader e m => MonadReader e (PlanT k o m) | |
MonadState s m => MonadState s (PlanT k o m) | |
MonadWriter w m => MonadWriter w (PlanT k o m) | |
MonadError e m => MonadError e (PlanT k o m) | |
Defined in Data.Machine.Plan throwError :: e -> PlanT k o m a # catchError :: PlanT k o m a -> (e -> PlanT k o m a) -> PlanT k o m a # | |
MonadTrans (PlanT k o) | |
Defined in Data.Machine.Plan | |
Monad (PlanT k o m) | |
Functor (PlanT k o m) | |
MonadFail (PlanT k o m) | |
Defined in Data.Machine.Plan | |
Applicative (PlanT k o m) | |
Defined in Data.Machine.Plan | |
MonadIO m => MonadIO (PlanT k o m) | |
Defined in Data.Machine.Plan | |
Alternative (PlanT k o m) | |
MonadPlus (PlanT k o m) | |
type Plan (k :: Type -> Type) o a = forall (m :: Type -> Type). PlanT k o m a #
A
is a specification for a pure Plan
k o aMachine
, that reads inputs selected by k
with types based on i
, writes values of type o
, and has intermediate results of type a
.
A
can be used as a Plan
k o a
for any PlanT
k o m a
.Monad
m
It is perhaps easier to think of Plan
in its un-cps'ed form, which would
look like:
data Plan
k o a
= Done a
| Yield o (Plan k o a)
| forall z. Await (z -> Plan k o a) (k z) (Plan k o a)
| Fail
Witnessed type equality
Concurrent connection
(>~>) :: MonadBaseControl IO m => MachineT m k b -> ProcessT m b c -> MachineT m k c infixl 7 Source #
Flipped (<~<
).
(<~<) :: MonadBaseControl IO m => ProcessT m b c -> MachineT m k b -> MachineT m k c Source #
Build a new Machine
by adding a Process
to the output of an
old Machine
. The upstream machine is run concurrently with
downstream with the aim that upstream will have a yielded value
ready as soon as downstream awaits. This effectively creates a
buffer between upstream and downstream, or source and sink, that
can contain up to one value.
(<~<
) ::Process
b c ->Process
a b ->Process
a c (<~<
) ::Process
c d ->Tee
a b c ->Tee
a b d (<~<
) ::Process
b c ->Machine
k b ->Machine
k c
Buffered machines
bufferConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c Source #
rollingConnect :: MonadBaseControl IO m => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c Source #
Concurrent processing of shared inputs
fanout :: (MonadBaseControl IO m, Semigroup r) => [ProcessT m a r] -> ProcessT m a r Source #
Share inputs with each of a list of processes in lockstep. Any values yielded by the processes for a given input are combined into a single yield from the composite process.
fanoutSteps :: (MonadBaseControl IO m, Monoid r) => [ProcessT m a r] -> ProcessT m a r Source #
Share inputs with each of a list of processes in lockstep. If
none of the processes yields a value, the composite process will
itself yield mempty
. The idea is to provide a handle on steps
only executed for their side effects. For instance, if you want to
run a collection of ProcessT
s that await but don't yield some
number of times, you can use 'fanOutSteps . map (fmap (const ()))'
followed by a taking
process.
Concurrent multiple-input machines
wye :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> WyeT m a' b' c -> WyeT m a b c Source #
tee :: forall m a a' b b' c. MonadBaseControl IO m => ProcessT m a a' -> ProcessT m b b' -> TeeT m a' b' c -> TeeT m a b c Source #
Compose a pair of pipes onto the front of a Tee.
scatter :: MonadBaseControl IO m => [MachineT m k o] -> MachineT m k o Source #
Produces values from whichever source MachineT
yields
first. This operation may also be viewed as a gather operation in
that all values produced by the given machines are interleaved when
fed downstream. Note that inputs are not shared. The composite
machine will await an input when any constituent machine awaits an
input. That input will be supplied to the awaiting constituent and
no other.
Some examples of more specific useful types scatter
may be used
at,
scatter :: [ProcessT m a b] -> ProcessT m a b scatter :: [SourceT m a] -> SourceT m a
The former may be used to stream data through a collection of
worker Process
es, the latter may be used to intersperse values
from a collection of sources.
splitSum :: forall m a b c d. MonadBaseControl IO m => ProcessT m a b -> ProcessT m c d -> ProcessT m (Either a c) (Either b d) Source #
Similar to +++
: split the input between two
processes, retagging and merging their outputs.
The two processes are run concurrently whenever possible.
mergeSum :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (Either a b) r Source #
Similar to |||
: split the input between two
processes and merge their outputs.
Connect two processes to the downstream tails of a Machine
that
produces Either
s. The two downstream consumers are run
concurrently when possible. When one downstream consumer stops, the
other is allowed to run until it stops or the upstream source
yields a value the remaining consumer can not handle.
mergeSum sinkL sinkR
produces a topology like this,
sinkL / \ a \ / \ source -- Either a b --> -- r --> \ / b / \ / sinkR
splitProd :: forall m a b r. MonadBaseControl IO m => ProcessT m a r -> ProcessT m b r -> ProcessT m (a, b) r Source #
Connect two processes to the downstream tails of a Machine
that
produces tuples. The two downstream consumers are run
concurrently. When one downstream consumer stops, the entire
pipeline is stopped.
splitProd sink1 sink2
produces a topology like this,
sink1 / \ a \ / \ source -- (a,b) --> -- r --> \ / b / \ / sink2