-- |
-- Module     : Simulation.Aivika.Processor
-- Copyright  : Copyright (c) 2009-2017, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 8.0.1
--
-- The processor of simulation data.
--
module Simulation.Aivika.Processor
       (-- * Processor Type
        Processor(..),
        -- * Processor Primitives
        emptyProcessor,
        arrProcessor,
        accumProcessor,
        withinProcessor,
        -- * Specifying Identifier
        processorUsingId,
        -- * Prefetch and Delay Processors
        prefetchProcessor,
        delayProcessor,
        -- * Buffer Processor
        bufferProcessor,
        bufferProcessorLoop,
        -- * Processing Queues
        queueProcessor,
        queueProcessorLoopMerging,
        queueProcessorLoopSeq,
        queueProcessorLoopParallel,
        -- * Sequencing Processors
        processorSeq,
        -- * Parallelizing Processors
        processorParallel,
        processorQueuedParallel,
        processorPrioritisingOutputParallel,
        processorPrioritisingInputParallel,
        processorPrioritisingInputOutputParallel,
        -- * Arrival Processor
        arrivalProcessor,
        -- * Utilities
        joinProcessor,
        -- * Failover
        failoverProcessor,
        -- * Integrating with Signals and Channels
        channelProcessor,
        processorChannel,
        queuedChannelProcessor,
        queuedProcessorChannel,
        -- * Debugging
        traceProcessor) where

import qualified Control.Category as C
import Control.Arrow

import Data.Monoid

import Simulation.Aivika.Simulation
import Simulation.Aivika.Dynamics
import Simulation.Aivika.Event
import Simulation.Aivika.Composite
import Simulation.Aivika.Cont
import Simulation.Aivika.Process
import Simulation.Aivika.Stream
import Simulation.Aivika.QueueStrategy
import Simulation.Aivika.Signal
import Simulation.Aivika.Channel
import Simulation.Aivika.Internal.Arrival

-- | Represents a processor of simulation data.
newtype Processor a b =
  Processor { runProcessor :: Stream a -> Stream b
              -- ^ Run the processor.
            }

-- Processors compose exactly like the stream transformers they wrap.
instance C.Category Processor where

  -- The right-hand @id@ is the Prelude one; the class is imported qualified.
  id = Processor id

  Processor x . Processor y = Processor (x . y)

-- The implementation is based on article
-- A New Notation for Arrows by Ross Paterson,
-- although my streams are different and they
-- already depend on the Process monad,
-- while the pure streams were considered in the
-- mentioned article.
instance Arrow Processor where

  arr = Processor . mapStream

  -- Unzip the stream of pairs, transform the first components only and
  -- zip the two streams back together.
  first (Processor f) =
    Processor $ \xys ->
    Cons $
    do (xs, ys) <- liftSimulation $ unzipStream xys
       runStream $ zipStreamParallel (f xs) ys

  -- The mirror image of 'first': only the second components are transformed.
  second (Processor f) =
    Processor $ \xys ->
    Cons $
    do (xs, ys) <- liftSimulation $ unzipStream xys
       runStream $ zipStreamParallel xs (f ys)

  -- Unzip the stream of pairs, transform each component stream by its own
  -- processor and zip the results.
  Processor f *** Processor g =
    Processor $ \xys ->
    Cons $
    do (xs, ys) <- liftSimulation $ unzipStream xys
       runStream $ zipStreamParallel (f xs) (g ys)

  -- NOTE(review): unlike '***', both branches receive the very same input
  -- stream value, without 'unzipStream' or 'splitStream' in between --
  -- confirm that consuming it from two places is intended.
  Processor f &&& Processor g =
    Processor $ \xs -> zipStreamParallel (f xs) (g xs)

instance ArrowChoice Processor where

  -- Memoize the input, run the processor over the 'Left' items and put its
  -- results back in place of them.
  left (Processor f) =
    Processor $ \xs ->
    Cons $
    do ys <- liftSimulation $ memoStream xs
       runStream $ replaceLeftStream ys (f $ leftStream ys)

  -- The mirror image of 'left' for the 'Right' items.
  right (Processor f) =
    Processor $ \xs ->
    Cons $
    do ys <- liftSimulation $ memoStream xs
       runStream $ replaceRightStream ys (f $ rightStream ys)

instance ArrowZero Processor where

  -- Ignore the input altogether and produce 'emptyStream'.
  zeroArrow = Processor $ const emptyStream

instance ArrowPlus Processor where

  -- Split the input in two, feed one part to each processor and merge
  -- their outputs.
  (Processor f) <+> (Processor g) =
    Processor $ \xs ->
    Cons $
    do -- 'splitStream' is asked for exactly two streams, which is what
       -- the list pattern relies on.
       [xs1, xs2] <- liftSimulation $ splitStream 2 xs
       runStream $ mergeStreams (f xs1) (g xs2)

-- | A processor that never finishes its work producing an 'emptyStream'.
emptyProcessor :: Processor a b
emptyProcessor = Processor $ const emptyStream

-- | Create a simple processor by the specified handling function
-- that runs the discontinuous process for each input value to get the output.
arrProcessor :: (a -> Process b) -> Processor a b
arrProcessor = Processor . mapStreamM

-- | Accumulator that outputs a value determined by the supplied function.
accumProcessor :: (acc -> a -> Process (acc, b)) -> acc -> Processor a b
accumProcessor f acc = Processor $ accumStream f acc

-- | Involve the computation with side effect when processing a stream of data.
-- The computation is run for every item, which is then passed on unchanged.
withinProcessor :: Process () -> Processor a a
withinProcessor m =
  Processor $
  mapStreamM $ \a ->
  do { m; return a }

-- | Create a processor that will use the specified process identifier.
-- It can be useful to refer to the underlying 'Process' computation which
-- can be passivated, interrupted, canceled and so on. See also the
-- 'processUsingId' function for more details.
processorUsingId :: ProcessId -> Processor a b -> Processor a b
processorUsingId pid (Processor f) =
  Processor $ Cons . processUsingId pid . runStream . f

-- | Launches the specified processors in parallel consuming the same input
-- stream and producing a combined output stream.
--
-- If you don't know which enqueue strategies to apply, then
-- you will probably need 'FCFS' for both parameters, or the
-- function 'processorParallel' that does exactly this.
processorQueuedParallel :: (EnqueueStrategy si,
                            EnqueueStrategy so)
                           => si
                           -- ^ the strategy applied for enqueuing the input data
                           -> so
                           -- ^ the strategy applied for enqueuing the output data
                           -> [Processor a b]
                           -- ^ the processors to parallelize
                           -> Processor a b
                           -- ^ the parallelized processor
processorQueuedParallel si so ps =
  Processor $ \xs ->
  Cons $
  do let n = length ps
     -- one input stream per processor
     inputs <- liftSimulation $ splitStreamQueueing si n xs
     let results = zipWith (\input p -> runProcessor p input) inputs ps
         output  = concatQueuedStreams so results
     runStream output

-- | Launches the specified processors in parallel using priorities for combining the output.
processorPrioritisingOutputParallel :: (EnqueueStrategy si, PriorityQueueStrategy so po) => si -- ^ the strategy applied for enqueuing the input data -> so -- ^ the strategy applied for enqueuing the output data -> [Processor a (po, b)] -- ^ the processors to parallelize -> Processor a b -- ^ the parallelized processor processorPrioritisingOutputParallel :: si -> so -> [Processor a (po, b)] -> Processor a b processorPrioritisingOutputParallel si si so so [Processor a (po, b)] ps = (Stream a -> Stream b) -> Processor a b forall a b. (Stream a -> Stream b) -> Processor a b Processor ((Stream a -> Stream b) -> Processor a b) -> (Stream a -> Stream b) -> Processor a b forall a b. (a -> b) -> a -> b $ \Stream a xs -> Process (b, Stream b) -> Stream b forall a. Process (a, Stream a) -> Stream a Cons (Process (b, Stream b) -> Stream b) -> Process (b, Stream b) -> Stream b forall a b. (a -> b) -> a -> b $ do let n :: Int n = [Processor a (po, b)] -> Int forall (t :: * -> *) a. Foldable t => t a -> Int length [Processor a (po, b)] ps [Stream a] input <- Simulation [Stream a] -> Process [Stream a] forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a liftSimulation (Simulation [Stream a] -> Process [Stream a]) -> Simulation [Stream a] -> Process [Stream a] forall a b. (a -> b) -> a -> b $ si -> Int -> Stream a -> Simulation [Stream a] forall s a. EnqueueStrategy s => s -> Int -> Stream a -> Simulation [Stream a] splitStreamQueueing si si Int n Stream a xs let results :: [Stream (po, b)] results = (((Stream a, Processor a (po, b)) -> Stream (po, b)) -> [(Stream a, Processor a (po, b))] -> [Stream (po, b)]) -> [(Stream a, Processor a (po, b))] -> ((Stream a, Processor a (po, b)) -> Stream (po, b)) -> [Stream (po, b)] forall a b c. (a -> b -> c) -> b -> a -> c flip ((Stream a, Processor a (po, b)) -> Stream (po, b)) -> [(Stream a, Processor a (po, b))] -> [Stream (po, b)] forall a b. 
(a -> b) -> [a] -> [b] map ([Stream a] -> [Processor a (po, b)] -> [(Stream a, Processor a (po, b))] forall a b. [a] -> [b] -> [(a, b)] zip [Stream a] input [Processor a (po, b)] ps) (((Stream a, Processor a (po, b)) -> Stream (po, b)) -> [Stream (po, b)]) -> ((Stream a, Processor a (po, b)) -> Stream (po, b)) -> [Stream (po, b)] forall a b. (a -> b) -> a -> b $ \(Stream a input, Processor a (po, b) p) -> Processor a (po, b) -> Stream a -> Stream (po, b) forall a b. Processor a b -> Stream a -> Stream b runProcessor Processor a (po, b) p Stream a input output :: Stream b output = so -> [Stream (po, b)] -> Stream b forall s p a. PriorityQueueStrategy s p => s -> [Stream (p, a)] -> Stream a concatPriorityStreams so so [Stream (po, b)] results Stream b -> Process (b, Stream b) forall a. Stream a -> Process (a, Stream a) runStream Stream b output -- | Launches the specified processors in parallel using priorities for consuming the intput. processorPrioritisingInputParallel :: (PriorityQueueStrategy si pi, EnqueueStrategy so) => si -- ^ the strategy applied for enqueuing the input data -> so -- ^ the strategy applied for enqueuing the output data -> [(Stream pi, Processor a b)] -- ^ the streams of input priorities and the processors -- to parallelize -> Processor a b -- ^ the parallelized processor processorPrioritisingInputParallel :: si -> so -> [(Stream pi, Processor a b)] -> Processor a b processorPrioritisingInputParallel si si so so [(Stream pi, Processor a b)] ps = (Stream a -> Stream b) -> Processor a b forall a b. (Stream a -> Stream b) -> Processor a b Processor ((Stream a -> Stream b) -> Processor a b) -> (Stream a -> Stream b) -> Processor a b forall a b. (a -> b) -> a -> b $ \Stream a xs -> Process (b, Stream b) -> Stream b forall a. Process (a, Stream a) -> Stream a Cons (Process (b, Stream b) -> Stream b) -> Process (b, Stream b) -> Stream b forall a b. 
(a -> b) -> a -> b $ do [Stream a] input <- Simulation [Stream a] -> Process [Stream a] forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a liftSimulation (Simulation [Stream a] -> Process [Stream a]) -> Simulation [Stream a] -> Process [Stream a] forall a b. (a -> b) -> a -> b $ si -> [Stream pi] -> Stream a -> Simulation [Stream a] forall s p a. PriorityQueueStrategy s p => s -> [Stream p] -> Stream a -> Simulation [Stream a] splitStreamPrioritising si si (((Stream pi, Processor a b) -> Stream pi) -> [(Stream pi, Processor a b)] -> [Stream pi] forall a b. (a -> b) -> [a] -> [b] map (Stream pi, Processor a b) -> Stream pi forall a b. (a, b) -> a fst [(Stream pi, Processor a b)] ps) Stream a xs let results :: [Stream b] results = (((Stream a, (Stream pi, Processor a b)) -> Stream b) -> [(Stream a, (Stream pi, Processor a b))] -> [Stream b]) -> [(Stream a, (Stream pi, Processor a b))] -> ((Stream a, (Stream pi, Processor a b)) -> Stream b) -> [Stream b] forall a b c. (a -> b -> c) -> b -> a -> c flip ((Stream a, (Stream pi, Processor a b)) -> Stream b) -> [(Stream a, (Stream pi, Processor a b))] -> [Stream b] forall a b. (a -> b) -> [a] -> [b] map ([Stream a] -> [(Stream pi, Processor a b)] -> [(Stream a, (Stream pi, Processor a b))] forall a b. [a] -> [b] -> [(a, b)] zip [Stream a] input [(Stream pi, Processor a b)] ps) (((Stream a, (Stream pi, Processor a b)) -> Stream b) -> [Stream b]) -> ((Stream a, (Stream pi, Processor a b)) -> Stream b) -> [Stream b] forall a b. (a -> b) -> a -> b $ \(Stream a input, (Stream pi _, Processor a b p)) -> Processor a b -> Stream a -> Stream b forall a b. Processor a b -> Stream a -> Stream b runProcessor Processor a b p Stream a input output :: Stream b output = so -> [Stream b] -> Stream b forall s a. EnqueueStrategy s => s -> [Stream a] -> Stream a concatQueuedStreams so so [Stream b] results Stream b -> Process (b, Stream b) forall a. 
Stream a -> Process (a, Stream a) runStream Stream b output -- | Launches the specified processors in parallel using priorities for consuming -- the input and combining the output. processorPrioritisingInputOutputParallel :: (PriorityQueueStrategy si pi, PriorityQueueStrategy so po) => si -- ^ the strategy applied for enqueuing the input data -> so -- ^ the strategy applied for enqueuing the output data -> [(Stream pi, Processor a (po, b))] -- ^ the streams of input priorities and the processors -- to parallelize -> Processor a b -- ^ the parallelized processor processorPrioritisingInputOutputParallel :: si -> so -> [(Stream pi, Processor a (po, b))] -> Processor a b processorPrioritisingInputOutputParallel si si so so [(Stream pi, Processor a (po, b))] ps = (Stream a -> Stream b) -> Processor a b forall a b. (Stream a -> Stream b) -> Processor a b Processor ((Stream a -> Stream b) -> Processor a b) -> (Stream a -> Stream b) -> Processor a b forall a b. (a -> b) -> a -> b $ \Stream a xs -> Process (b, Stream b) -> Stream b forall a. Process (a, Stream a) -> Stream a Cons (Process (b, Stream b) -> Stream b) -> Process (b, Stream b) -> Stream b forall a b. (a -> b) -> a -> b $ do [Stream a] input <- Simulation [Stream a] -> Process [Stream a] forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a liftSimulation (Simulation [Stream a] -> Process [Stream a]) -> Simulation [Stream a] -> Process [Stream a] forall a b. (a -> b) -> a -> b $ si -> [Stream pi] -> Stream a -> Simulation [Stream a] forall s p a. PriorityQueueStrategy s p => s -> [Stream p] -> Stream a -> Simulation [Stream a] splitStreamPrioritising si si (((Stream pi, Processor a (po, b)) -> Stream pi) -> [(Stream pi, Processor a (po, b))] -> [Stream pi] forall a b. (a -> b) -> [a] -> [b] map (Stream pi, Processor a (po, b)) -> Stream pi forall a b. 
(a, b) -> a fst [(Stream pi, Processor a (po, b))] ps) Stream a xs let results :: [Stream (po, b)] results = (((Stream a, (Stream pi, Processor a (po, b))) -> Stream (po, b)) -> [(Stream a, (Stream pi, Processor a (po, b)))] -> [Stream (po, b)]) -> [(Stream a, (Stream pi, Processor a (po, b)))] -> ((Stream a, (Stream pi, Processor a (po, b))) -> Stream (po, b)) -> [Stream (po, b)] forall a b c. (a -> b -> c) -> b -> a -> c flip ((Stream a, (Stream pi, Processor a (po, b))) -> Stream (po, b)) -> [(Stream a, (Stream pi, Processor a (po, b)))] -> [Stream (po, b)] forall a b. (a -> b) -> [a] -> [b] map ([Stream a] -> [(Stream pi, Processor a (po, b))] -> [(Stream a, (Stream pi, Processor a (po, b)))] forall a b. [a] -> [b] -> [(a, b)] zip [Stream a] input [(Stream pi, Processor a (po, b))] ps) (((Stream a, (Stream pi, Processor a (po, b))) -> Stream (po, b)) -> [Stream (po, b)]) -> ((Stream a, (Stream pi, Processor a (po, b))) -> Stream (po, b)) -> [Stream (po, b)] forall a b. (a -> b) -> a -> b $ \(Stream a input, (Stream pi _, Processor a (po, b) p)) -> Processor a (po, b) -> Stream a -> Stream (po, b) forall a b. Processor a b -> Stream a -> Stream b runProcessor Processor a (po, b) p Stream a input output :: Stream b output = so -> [Stream (po, b)] -> Stream b forall s p a. PriorityQueueStrategy s p => s -> [Stream (p, a)] -> Stream a concatPriorityStreams so so [Stream (po, b)] results Stream b -> Process (b, Stream b) forall a. Stream a -> Process (a, Stream a) runStream Stream b output -- | Launches the processors in parallel consuming the same input stream and producing -- a combined output stream. This version applies the 'FCFS' strategy both for input -- and output, which suits the most part of uses cases. processorParallel :: [Processor a b] -> Processor a b processorParallel :: [Processor a b] -> Processor a b processorParallel = FCFS -> FCFS -> [Processor a b] -> Processor a b forall si so a b. 
-- | Launches the processors in parallel consuming the same input stream and producing
-- a combined output stream. This version applies the 'FCFS' strategy both for input
-- and output, which suits most use cases.
processorParallel :: [Processor a b] -> Processor a b
processorParallel = processorQueuedParallel FCFS FCFS

-- | Launches the processors sequentially using the 'prefetchProcessor' between them
-- to model an autonomous work of each of the processors specified.
--
-- An empty list yields 'emptyProcessor'; a singleton list yields its only
-- element unchanged, i.e. without any prefetching.
processorSeq :: [Processor a a] -> Processor a a
processorSeq []       = emptyProcessor
processorSeq [p]      = p
processorSeq (p : ps) = p >>> prefetchProcessor >>> processorSeq ps

-- | Create a buffer processor, where the process from the first argument
-- consumes the input stream but the stream passed in as the second argument
-- and produced usually by some other process is returned as an output.
-- This kind of processor is very useful for modeling the queues.
bufferProcessor :: (Stream a -> Process ())
                   -- ^ a separate process to consume the input
                   -> Stream b
                   -- ^ the resulting stream of data
                   -> Processor a b
bufferProcessor consume output =
  Processor $ \xs ->
  Cons $
  do -- the consumer is spawned as a separate process, after which
     -- the first item of the output stream is requested
     spawnProcess (consume xs)
     runStream output

-- | Like 'bufferProcessor' but allows creating a loop when some items
-- can be processed repeatedly. It is very useful for modeling the processors
-- with queues and loop-backs.
bufferProcessorLoop :: (Stream a -> Stream c -> Process ())
                       -- ^ consume two streams: the input values of type @a@
                       -- and the values of type @c@ returned by the loop
                       -> Stream d
                       -- ^ the stream of data that may become results
                       -> Processor d (Either e b)
                       -- ^ process and then decide what values of type @e@
                       -- should be processed in the loop (this is a condition)
                       -> Processor e c
                       -- ^ process in the loop and then return a value
                       -- of type @c@ to the input again (this is a loop body)
                       -> Processor a b
bufferProcessorLoop consume preoutput cond body =
  Processor $ \xs ->
  Cons $
  do -- split the outcome of the condition: 'Left' values are reverted
     -- to the loop body, 'Right' values form the output
     (reverted, output) <-
       liftSimulation $
       partitionEitherStream $
       runProcessor cond preoutput
     -- the consumer receives both the external input and the loop results
     spawnProcess
       (consume xs $ runProcessor body reverted)
     runStream output

-- | Return a processor with help of which we can model the queue.
--
-- Although the function doesn't refer to the queue directly, its main use case
-- is namely a processing of the queue. The first argument should be the enqueueing
-- operation, while the second argument should be the opposite dequeueing operation.
--
-- The reason is as follows. There are many possible combinations how the queues
-- can be modeled. There is no sense to enumerate all them creating a separate function
-- for each case. We can just use combinators to define exactly what we need.
--
-- So, the queue can lose the input items if the queue is full, or the input process
-- can suspend while the queue is full, or we can use priorities for enqueueing,
-- storing and dequeueing the items in different combinations. There are so many use
-- cases!
--
-- There is a hope that this function along with other similar functions from this
-- module is sufficient to cover the most important cases. Even if it is not sufficient
-- then you can use a more generic function 'bufferProcessor' which this function is
-- based on. In case of need, you can even write your own function from scratch. It is
-- quite easy actually.
queueProcessor :: (a -> Process ())
                  -- ^ enqueue the input item and wait
                  -- while the queue is full if required
                  -- so that there were no hanging items
                  -> Process b
                  -- ^ dequeue an output item
                  -> Processor a b
                  -- ^ the buffering processor
queueProcessor enqueue dequeue =
  bufferProcessor
  (consumeStream enqueue)   -- every input item is passed to the enqueueing operation
  (repeatProcess dequeue)   -- the output stream is built from the dequeueing operation

-- | Like 'queueProcessor' creates a queue processor but with a loop when some items
-- can be processed and then added to the queue again. Also it allows specifying
-- how two input streams of data can be merged.
queueProcessorLoopMerging :: (Stream a -> Stream d -> Stream e)
                             -- ^ merge two streams: the input values of type @a@
                             -- and the values of type @d@ returned by the loop
                             -> (e -> Process ())
                             -- ^ enqueue the input item and wait
                             -- while the queue is full if required
                             -- so that there were no hanging items
                             -> Process c
                             -- ^ dequeue an item for the further processing
                             -> Processor c (Either f b)
                             -- ^ process and then decide what values of type @f@
                             -- should be processed in the loop (this is a condition)
                             -> Processor f d
                             -- ^ process in the loop and then return a value
                             -- of type @d@ to the queue again (this is a loop body)
                             -> Processor a b
                             -- ^ the buffering processor
queueProcessorLoopMerging merge enqueue dequeue =
  -- the condition and loop body processors are passed through
  -- to 'bufferProcessorLoop' by partial application
  bufferProcessorLoop
  (\bs cs ->
    consumeStream enqueue $
    merge bs cs)
  (repeatProcess dequeue)

-- | Like 'queueProcessorLoopMerging' creates a queue processor with a loop when
-- some items can be processed and then added to the queue again. Only it sequentially
-- merges two input streams of data: one stream that comes from the external source and
-- another stream of data returned by the loop. The first stream has a priority over
-- the second one.
queueProcessorLoopSeq :: (a -> Process ())
                         -- ^ enqueue the input item and wait
                         -- while the queue is full if required
                         -- so that there were no hanging items
                         -> Process c
                         -- ^ dequeue an item for the further processing
                         -> Processor c (Either e b)
                         -- ^ process and then decide what values of type @e@
                         -- should be processed in the loop (this is a condition)
                         -> Processor e a
                         -- ^ process in the loop and then return a value
                         -- of type @a@ to the queue again (this is a loop body)
                         -> Processor a b
                         -- ^ the buffering processor
queueProcessorLoopSeq =
  -- the two streams are combined with 'mergeStreams'
  queueProcessorLoopMerging mergeStreams

-- | Like 'queueProcessorLoopMerging' creates a queue processor with a loop when
-- some items can be processed and then added to the queue again. Only it runs two
-- simultaneous processes to enqueue the input streams of data: one stream that comes
-- from the external source and another stream of data returned by the loop.
queueProcessorLoopParallel :: (a -> Process ())
                              -- ^ enqueue the input item and wait
                              -- while the queue is full if required
                              -- so that there were no hanging items
                              -> Process c
                              -- ^ dequeue an item for the further processing
                              -> Processor c (Either e b)
                              -- ^ process and then decide what values of type @e@
                              -- should be processed in the loop (this is a condition)
                              -> Processor e a
                              -- ^ process in the loop and then return a value
                              -- of type @a@ to the queue again (this is a loop body)
                              -> Processor a b
                              -- ^ the buffering processor
queueProcessorLoopParallel enqueue dequeue =
  bufferProcessorLoop
  (\bs cs ->
    do -- one spawned process per stream, both using the same enqueueing operation
       spawnProcess $
         consumeStream enqueue bs
       spawnProcess $
         consumeStream enqueue cs)
  (repeatProcess dequeue)

-- | This is a prefetch processor that requests for one more data item from
-- the input in advance while the latest item is not yet fully processed in
-- the chain of streams, usually by other processors.
--
-- You can think of this as the prefetched processor could place its latest
-- data item in some temporary space for later use, which is very useful
-- for modeling a sequence of separate and independent work places.
prefetchProcessor :: Processor a a
prefetchProcessor = Processor prefetchStream

-- | Convert the specified signal transform, i.e. the channel, to a processor.
--
-- The processor may return data with delay as the values are requested by demand.
-- Consider using the 'arrivalSignal' function to provide with the information
-- about the time points at which the signal was actually triggered.
--
-- The point is that the 'Stream' used in the 'Processor' is requested outside,
-- while the 'Signal' used in the 'Channel' is triggered inside. They are different by nature.
-- The former is passive, while the latter is active.
--
-- The resulting processor may be a root of space leak as it uses an internal queue to store
-- the values received from the input signal. Consider using 'queuedChannelProcessor' that
-- allows specifying the bounded queue in case of need.
channelProcessor :: Channel a b -> Processor a b
channelProcessor f =
  Processor $ \xs ->
  Cons $
  do -- stream -> signal -> channel -> signal -> stream
     let composite =
           do sa <- streamSignal xs
              sb <- runChannel f sa
              signalStream sb
     -- run the composite starting from an empty disposable handler
     (ys, h) <- liftEvent $
                runComposite composite mempty
     -- hand the resulting handler over to 'whenCancellingProcess'
     whenCancellingProcess $
       disposeEvent h
     runStream ys

-- | Convert the specified processor to a signal transform, i.e. the channel.
--
-- The processor may return data with delay as the values are requested by demand.
-- Consider using the 'arrivalSignal' function to provide with the information
-- about the time points at which the signal was actually triggered.
--
-- The point is that the 'Stream' used in the 'Processor' is requested outside,
-- while the 'Signal' used in the 'Channel' is triggered inside. They are different by nature.
-- The former is passive, while the latter is active.
--
-- The resulting channel may be a root of space leak as it uses an internal queue to store
-- the values received from the input stream. Consider using 'queuedProcessorChannel' that
-- allows specifying the bounded queue in case of need.
processorChannel :: Processor a b -> Channel a b
processorChannel (Processor f) =
  Channel $ \sa ->
  do -- signal -> stream -> processor -> stream -> signal
     xs <- signalStream sa
     let ys = f xs
     streamSignal ys

-- | Like 'channelProcessor' but allows specifying an arbitrary queue for storing the signal values,
-- for example, the bounded queue.
queuedChannelProcessor :: (b -> Event ())
                          -- ^ enqueue
                          -> Process b
                          -- ^ dequeue
                          -> Channel a b
                          -- ^ the channel
                          -> Processor a b
                          -- ^ the processor
queuedChannelProcessor enqueue dequeue f =
  Processor $ \xs ->
  Cons $
  do -- same pipeline as in 'channelProcessor', except that the channel output
     -- is turned into a stream through the supplied queue operations
     let composite =
           do sa <- streamSignal xs
              sb <- runChannel f sa
              queuedSignalStream enqueue dequeue sb
     -- run the composite starting from an empty disposable handler
     (ys, h) <- liftEvent $
                runComposite composite mempty
     -- hand the resulting handler over to 'whenCancellingProcess'
     whenCancellingProcess $
       disposeEvent h
     runStream ys

-- | Like 'processorChannel' but allows specifying an arbitrary queue for storing the signal values,
-- for example, the bounded queue.
queuedProcessorChannel :: (a -> Event ())
                          -- ^ enqueue
                          -> Process a
                          -- ^ dequeue
                          -> Processor a b
                          -- ^ the processor
                          -> Channel a b
                          -- ^ the channel
queuedProcessorChannel enqueue dequeue (Processor f) =
  Channel $ \sa ->
  do -- the input signal is turned into a stream through the supplied queue operations
     xs <- queuedSignalStream enqueue dequeue sa
     let ys = f xs
     streamSignal ys

-- | A processor that adds the information about the time points at which
-- the original stream items were received by demand.
arrivalProcessor :: Processor a (Arrival a)
arrivalProcessor = Processor arrivalStream

-- | A processor that delays the input stream by one step using the specified initial value.
delayProcessor :: a -> Processor a a
delayProcessor a0 = Processor $ delayStream a0

-- | Removes one level of the computation, projecting its bound processor into the outer level.
joinProcessor :: Process (Processor a b) -> Processor a b
joinProcessor m =
  Processor $ \xs ->
  Cons $
  do -- the processor is obtained from the computation and then applied to the input
     Processor f <- m
     runStream $ f xs

-- | Takes the next processor from the list after the current processor fails because of
-- cancelling the underlying process.
failoverProcessor :: [Processor a b] -> Processor a b
failoverProcessor ps =
  Processor $ \xs ->
  -- every processor is applied to the same input stream
  failoverStream [runProcessor p xs | p <- ps]

-- | Show the debug messages with the current simulation time.
traceProcessor :: Maybe String
                  -- ^ the request message
                  -> Maybe String
                  -- ^ the response message
                  -> Processor a b
                  -- ^ a processor
                  -> Processor a b
traceProcessor request response (Processor f) =
  -- the tracing is applied to the output stream of the wrapped processor
  Processor $
  traceStream request response . f