-- |
-- Module     : Simulation.Aivika.Processor
-- Copyright  : Copyright (c) 2009-2017, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 8.0.1
--
-- The processor of simulation data.
--
module Simulation.Aivika.Processor
       (-- * Processor Type
        Processor(..),
        -- * Processor Primitives
        emptyProcessor,
        arrProcessor,
        accumProcessor,
        withinProcessor,
        -- * Specifying Identifier
        processorUsingId,
        -- * Prefetch and Delay Processors
        prefetchProcessor,
        delayProcessor,
        -- * Buffer Processor
        bufferProcessor,
        bufferProcessorLoop,
        -- * Processing Queues
        queueProcessor,
        queueProcessorLoopMerging,
        queueProcessorLoopSeq,
        queueProcessorLoopParallel,
        -- * Sequencing Processors
        processorSeq,
        -- * Parallelizing Processors
        processorParallel,
        processorQueuedParallel,
        processorPrioritisingOutputParallel,
        processorPrioritisingInputParallel,
        processorPrioritisingInputOutputParallel,
        -- * Arrival Processor
        arrivalProcessor,
        -- * Utilities
        joinProcessor,
        -- * Failover
        failoverProcessor,
        -- * Integrating with Signals and Channels
        channelProcessor,
        processorChannel,
        queuedChannelProcessor,
        queuedProcessorChannel,
        -- * Debugging
        traceProcessor) where

import qualified Control.Category as C
import Control.Arrow

import Data.Monoid

import Simulation.Aivika.Simulation
import Simulation.Aivika.Dynamics
import Simulation.Aivika.Event
import Simulation.Aivika.Composite
import Simulation.Aivika.Cont
import Simulation.Aivika.Process
import Simulation.Aivika.Stream
import Simulation.Aivika.QueueStrategy
import Simulation.Aivika.Signal
import Simulation.Aivika.Channel
import Simulation.Aivika.Internal.Arrival

-- | Represents a processor of simulation data.
newtype Processor a b =
  Processor { runProcessor :: Stream a -> Stream b
              -- ^ Run the processor.
            }

instance C.Category Processor where

  id  = Processor id

  Processor x . Processor y = Processor (x . y)

-- The implementation is based on the article "A New Notation for Arrows"
-- by Ross Paterson, although these streams are different: they already
-- depend on the Process monad, whereas the article considered pure streams.
  
instance Arrow Processor where

  arr = Processor . mapStream

  first (Processor f) =
    Processor $ \xys ->
    Cons $
    do (xs, ys) <- liftSimulation $ unzipStream xys
       runStream $ zipStreamParallel (f xs) ys

  second (Processor f) =
    Processor $ \xys ->
    Cons $
    do (xs, ys) <- liftSimulation $ unzipStream xys
       runStream $ zipStreamParallel xs (f ys)

  Processor f *** Processor g =
    Processor $ \xys ->
    Cons $
    do (xs, ys) <- liftSimulation $ unzipStream xys
       runStream $ zipStreamParallel (f xs) (g ys)

  Processor f &&& Processor g =
    Processor $ \xs -> zipStreamParallel (f xs) (g xs)

instance ArrowChoice Processor where

  left (Processor f) =
    Processor $ \xs ->
    Cons $
    do ys <- liftSimulation $ memoStream xs
       runStream $ replaceLeftStream ys (f $ leftStream ys)

  right (Processor f) =
    Processor $ \xs ->
    Cons $
    do ys <- liftSimulation $ memoStream xs
       runStream $ replaceRightStream ys (f $ rightStream ys)

instance ArrowZero Processor where

  zeroArrow = Processor $ const emptyStream

instance ArrowPlus Processor where

  (Processor f) <+> (Processor g) =
    Processor $ \xs ->
    Cons $
    do [xs1, xs2] <- liftSimulation $ splitStream 2 xs
       runStream $ mergeStreams (f xs1) (g xs2)
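
-- A minimal sketch of composing processors through the 'Arrow' interface.
-- The small processors below are hypothetical examples made up only for
-- illustration; they are not part of this module.
--
-- > incProcessor, dblProcessor :: Processor Int Int
-- > incProcessor = arr (+ 1)
-- > dblProcessor = arr (* 2)
-- >
-- > -- Feed every input item to both processors and pair the results.
-- > pairProcessor :: Processor Int (Int, Int)
-- > pairProcessor = incProcessor &&& dblProcessor
-- >
-- > -- Chain them with the Category composition.
-- > chainProcessor :: Processor Int Int
-- > chainProcessor = incProcessor >>> dblProcessor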

-- | A processor that never finishes its work producing an 'emptyStream'.
emptyProcessor :: Processor a b
emptyProcessor = Processor $ const emptyStream

-- | Create a simple processor by the specified handling function
-- that runs the discontinuous process for each input value to get the output.
arrProcessor :: (a -> Process b) -> Processor a b
arrProcessor = Processor . mapStreamM
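
-- A minimal sketch of a service station built with 'arrProcessor': each item
-- is held for a fixed time before being passed further. The service time 5.0
-- is an arbitrary assumption, and 'holdProcess' comes from
-- Simulation.Aivika.Process.
--
-- > serverProcessor :: Processor a a
-- > serverProcessor =
-- >   arrProcessor $ \a ->
-- >   do holdProcess 5.0
-- >      return a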

-- | Accumulator that outputs a value determined by the supplied function.
accumProcessor :: (acc -> a -> Process (acc, b)) -> acc -> Processor a b
accumProcessor f acc = Processor $ accumStream f acc
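
-- A minimal sketch, assuming we want the running sum of the input items:
--
-- > runningSumProcessor :: Processor Double Double
-- > runningSumProcessor =
-- >   accumProcessor (\acc a -> let acc' = acc + a in return (acc', acc')) 0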

-- | Involve the computation with side effect when processing a stream of data.
withinProcessor :: Process () -> Processor a a
withinProcessor m =
  Processor $
  mapStreamM $ \a ->
  do { m; return a }

-- | Create a processor that will use the specified process identifier.
-- It can be useful to refer to the underlying 'Process' computation which
-- can be passivated, interrupted, canceled and so on. See also the
-- 'processUsingId' function for more details.
processorUsingId :: ProcessId -> Processor a b -> Processor a b
processorUsingId pid (Processor f) =
  Processor $ Cons . processUsingId pid . runStream . f
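
-- A minimal sketch, assuming 'newProcessId' and 'holdProcess' from
-- Simulation.Aivika.Process, so that the wrapped processor can later be
-- passivated or cancelled by its identifier from elsewhere in the model:
--
-- > cancellableServer :: Simulation (ProcessId, Processor a a)
-- > cancellableServer =
-- >   do pid <- newProcessId
-- >      let p = processorUsingId pid $
-- >              arrProcessor $ \a -> holdProcess 5.0 >> return a
-- >      return (pid, p)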

-- | Launches the specified processors in parallel consuming the same input
-- stream and producing a combined output stream.
--
-- If you don't know which enqueueing strategies to apply, then
-- you will probably need 'FCFS' for both parameters, or the
-- 'processorParallel' function that does exactly this.
processorQueuedParallel :: (EnqueueStrategy si,
                            EnqueueStrategy so)
                           => si
                           -- ^ the strategy applied for enqueuing the input data
                           -> so
                           -- ^ the strategy applied for enqueuing the output data
                           -> [Processor a b]
                           -- ^ the processors to parallelize
                           -> Processor a b
                           -- ^ the parallelized processor
processorQueuedParallel si so ps =
  Processor $ \xs ->
  Cons $
  do let n = length ps
     input <- liftSimulation $ splitStreamQueueing si n xs
     let results = flip map (zip input ps) $ \(input, p) ->
           runProcessor p input
         output  = concatQueuedStreams so results
     runStream output

-- | Launches the specified processors in parallel using priorities for combining the output.
processorPrioritisingOutputParallel :: (EnqueueStrategy si,
                                        PriorityQueueStrategy so po)
                                       => si
                                       -- ^ the strategy applied for enqueuing the input data
                                       -> so
                                       -- ^ the strategy applied for enqueuing the output data
                                       -> [Processor a (po, b)]
                                       -- ^ the processors to parallelize
                                       -> Processor a b
                                       -- ^ the parallelized processor
processorPrioritisingOutputParallel si so ps =
  Processor $ \xs ->
  Cons $
  do let n = length ps
     input <- liftSimulation $ splitStreamQueueing si n xs
     let results = flip map (zip input ps) $ \(input, p) ->
           runProcessor p input
         output  = concatPriorityStreams so results
     runStream output

-- | Launches the specified processors in parallel using priorities for consuming the input.
processorPrioritisingInputParallel :: (PriorityQueueStrategy si pi,
                                       EnqueueStrategy so)
                                      => si
                                      -- ^ the strategy applied for enqueuing the input data
                                      -> so
                                      -- ^ the strategy applied for enqueuing the output data
                                      -> [(Stream pi, Processor a b)]
                                      -- ^ the streams of input priorities and the processors
                                      -- to parallelize
                                      -> Processor a b
                                      -- ^ the parallelized processor
processorPrioritisingInputParallel si so ps =
  Processor $ \xs ->
  Cons $
  do input <- liftSimulation $ splitStreamPrioritising si (map fst ps) xs
     let results = flip map (zip input ps) $ \(input, (_, p)) ->
           runProcessor p input
         output  = concatQueuedStreams so results
     runStream output

-- | Launches the specified processors in parallel using priorities for consuming
-- the input and combining the output.
processorPrioritisingInputOutputParallel :: (PriorityQueueStrategy si pi,
                                             PriorityQueueStrategy so po)
                                            => si
                                            -- ^ the strategy applied for enqueuing the input data
                                            -> so
                                            -- ^ the strategy applied for enqueuing the output data
                                            -> [(Stream pi, Processor a (po, b))]
                                            -- ^ the streams of input priorities and the processors
                                            -- to parallelize
                                            -> Processor a b
                                            -- ^ the parallelized processor
processorPrioritisingInputOutputParallel si so ps =
  Processor $ \xs ->
  Cons $
  do input <- liftSimulation $ splitStreamPrioritising si (map fst ps) xs
     let results = flip map (zip input ps) $ \(input, (_, p)) ->
           runProcessor p input
         output  = concatPriorityStreams so results
     runStream output

-- | Launches the processors in parallel consuming the same input stream and producing
-- a combined output stream. This version applies the 'FCFS' strategy both for input
-- and output, which suits most use cases.
processorParallel :: [Processor a b] -> Processor a b
processorParallel = processorQueuedParallel FCFS FCFS
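
-- A minimal sketch of several identical servers working in parallel over one
-- input stream; the server count 3 and the service time 5.0 are arbitrary
-- assumptions, and 'holdProcess' comes from Simulation.Aivika.Process.
--
-- > multiServerProcessor :: Processor a a
-- > multiServerProcessor =
-- >   processorParallel $
-- >   replicate 3 $
-- >   arrProcessor $ \a -> holdProcess 5.0 >> return a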

-- | Launches the processors sequentially using the 'prefetchProcessor' between them
-- to model the autonomous work of each of the specified processors.
processorSeq :: [Processor a a] -> Processor a a
processorSeq []  = emptyProcessor
processorSeq [p] = p
processorSeq (p : ps) = p >>> prefetchProcessor >>> processorSeq ps
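
-- A minimal sketch of a production line with three autonomous stages, where
-- the stage times are arbitrary assumptions and 'holdProcess' comes from
-- Simulation.Aivika.Process:
--
-- > lineProcessor :: Processor a a
-- > lineProcessor =
-- >   processorSeq
-- >   [ arrProcessor $ \a -> holdProcess t >> return a
-- >   | t <- [1.0, 2.0, 3.0] ]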

-- | Create a buffer processor, where the process passed in as the first argument
-- consumes the input stream, while the stream passed in as the second argument,
-- usually produced by some other process, is returned as the output.
-- This kind of processor is very useful for modeling queues.
bufferProcessor :: (Stream a -> Process ())
                   -- ^ a separate process to consume the input 
                   -> Stream b
                   -- ^ the resulting stream of data
                   -> Processor a b
bufferProcessor consume output =
  Processor $ \xs ->
  Cons $
  do spawnProcess (consume xs)
     runStream output

-- | Like 'bufferProcessor' but allows creating a loop, where some items
-- can be processed repeatedly. It is very useful for modeling processors
-- with queues and loop-backs.
bufferProcessorLoop :: (Stream a -> Stream c -> Process ())
                       -- ^ consume two streams: the input values of type @a@
                       -- and the values of type @c@ returned by the loop
                       -> Stream d
                       -- ^ the stream of data that may become results
                       -> Processor d (Either e b)
                       -- ^ process and then decide what values of type @e@
                       -- should be processed in the loop (this is a condition)
                       -> Processor e c
                       -- ^ process in the loop and then return a value
                       -- of type @c@ to the input again (this is a loop body)
                       -> Processor a b
bufferProcessorLoop consume preoutput cond body =
  Processor $ \xs ->
  Cons $
  do (reverted, output) <-
       liftSimulation $
       partitionEitherStream $
       runProcessor cond preoutput
     spawnProcess
       (consume xs $ runProcessor body reverted)
     runStream output

-- | Return a processor with the help of which we can model a queue.
--
-- Although the function doesn't refer to the queue directly, its main use case
-- is processing a queue. The first argument should be the enqueueing operation,
-- while the second argument should be the opposite dequeueing operation.
--
-- The reason is as follows. There are many possible combinations of how queues
-- can be modeled. It makes no sense to enumerate them all and create a separate
-- function for each case. We can just use combinators to define exactly what we need.
--
-- So, the queue can lose the input items if the queue is full, or the input process
-- can suspend while the queue is full, or we can use priorities for enqueueing,
-- storing and dequeueing the items in different combinations. There are so many use
-- cases!
--
-- Hopefully, this function along with other similar functions from this module
-- is sufficient to cover the most important cases. Even if it is not sufficient,
-- you can use the more generic function 'bufferProcessor' on which this function is
-- based. In case of need, you can even write your own function from scratch. It is
-- actually quite easy.
queueProcessor :: (a -> Process ())
                  -- ^ enqueue the input item and wait
                  -- while the queue is full if required
                  -- so that there were no hanging items
                  -> Process b
                  -- ^ dequeue an output item
                  -> Processor a b
                  -- ^ the buffering processor
queueProcessor enqueue dequeue =
  bufferProcessor
  (consumeStream enqueue)
  (repeatProcess dequeue)
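
-- A minimal sketch that buffers items in a bounded FCFS queue, assuming the
-- queue API of Simulation.Aivika.Queue ('newFCFSQueue', 'enqueue' and
-- 'dequeue') imported qualified as @Q@; the capacity 10 is an arbitrary
-- assumption:
--
-- > boundedQueueProcessor :: Processor a a
-- > boundedQueueProcessor =
-- >   joinProcessor $
-- >   do q <- liftEvent $ Q.newFCFSQueue 10
-- >      return $ queueProcessor (Q.enqueue q) (Q.dequeue q)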

-- | Like 'queueProcessor', it creates a queue processor but with a loop, where some items
-- can be processed and then added to the queue again. Also it allows specifying
-- how the two input streams of data are merged.
queueProcessorLoopMerging :: (Stream a -> Stream d -> Stream e)
                             -- ^ merge two streams: the input values of type @a@
                             -- and the values of type @d@ returned by the loop
                             -> (e -> Process ())
                             -- ^ enqueue the input item and wait
                             -- while the queue is full if required
                             -- so that there were no hanging items
                             -> Process c
                             -- ^ dequeue an item for the further processing
                             -> Processor c (Either f b)
                             -- ^ process and then decide what values of type @f@
                             -- should be processed in the loop (this is a condition)
                             -> Processor f d
                             -- ^ process in the loop and then return a value
                             -- of type @d@ to the queue again (this is a loop body)
                             -> Processor a b
                             -- ^ the buffering processor
queueProcessorLoopMerging merge enqueue dequeue =
  bufferProcessorLoop
  (\bs cs ->
    consumeStream enqueue $
    merge bs cs)
  (repeatProcess dequeue)

-- | Like 'queueProcessorLoopMerging', it creates a queue processor with a loop, where
-- some items can be processed and then added to the queue again. Only it sequentially
-- merges the two input streams of data: the stream that comes from the external source
-- and the stream of data returned by the loop. The first stream has priority over
-- the second one.
queueProcessorLoopSeq :: (a -> Process ())
                         -- ^ enqueue the input item and wait
                         -- while the queue is full if required
                         -- so that there were no hanging items
                         -> Process c
                         -- ^ dequeue an item for the further processing
                         -> Processor c (Either e b)
                         -- ^ process and then decide what values of type @e@
                         -- should be processed in the loop (this is a condition)
                         -> Processor e a
                         -- ^ process in the loop and then return a value
                         -- of type @a@ to the queue again (this is a loop body)
                         -> Processor a b
                         -- ^ the buffering processor
queueProcessorLoopSeq =
  queueProcessorLoopMerging mergeStreams

-- | Like 'queueProcessorLoopMerging', it creates a queue processor with a loop, where
-- some items can be processed and then added to the queue again. Only it runs two
-- simultaneous processes to enqueue the input streams of data: the stream that comes
-- from the external source and the stream of data returned by the loop.
queueProcessorLoopParallel :: (a -> Process ())
                              -- ^ enqueue the input item and wait
                              -- while the queue is full if required
                              -- so that there were no hanging items
                              -> Process c
                              -- ^ dequeue an item for the further processing
                              -> Processor c (Either e b)
                              -- ^ process and then decide what values of type @e@
                              -- should be processed in the loop (this is a condition)
                              -> Processor e a
                              -- ^ process in the loop and then return a value
                              -- of type @a@ to the queue again (this is a loop body)
                              -> Processor a b
                              -- ^ the buffering processor
queueProcessorLoopParallel enqueue dequeue =
  bufferProcessorLoop
  (\bs cs ->
    do spawnProcess $
         consumeStream enqueue bs
       spawnProcess $
         consumeStream enqueue cs)
  (repeatProcess dequeue)

-- | This is a prefetch processor that requests one more data item from
-- the input in advance, while the latest item is not yet fully processed in
-- the chain of streams, usually by other processors.
--
-- You can think of it as if the prefetch processor placed its latest
-- data item in some temporary space for later use, which is very useful
-- for modeling a sequence of separate and independent work places.
prefetchProcessor :: Processor a a
prefetchProcessor = Processor prefetchStream

-- | Convert the specified signal transform, i.e. the channel, to a processor.
--
-- The processor may return data with a delay as the values are requested on demand.
-- Consider using the 'arrivalSignal' function to provide the information
-- about the time points at which the signal was actually triggered.
--
-- The point is that the 'Stream' used in the 'Processor' is requested from outside,
-- while the 'Signal' used in the 'Channel' is triggered from inside. They are different by nature.
-- The former is passive, while the latter is active.
--
-- The resulting processor may be the root of a space leak, as it uses an internal queue to store
-- the values received from the input signal. Consider using 'queuedChannelProcessor', which
-- allows specifying a bounded queue in case of need.
channelProcessor :: Channel a b -> Processor a b
channelProcessor f =
  Processor $ \xs ->
  Cons $
  do let composite =
           do sa <- streamSignal xs
              sb <- runChannel f sa
              signalStream sb
     (ys, h) <- liftEvent $
                runComposite composite mempty
     whenCancellingProcess $
       disposeEvent h
     runStream ys
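
-- A minimal sketch of lifting a signal transform into a processor, assuming
-- 'mapSignal' from Simulation.Aivika.Signal and the 'Channel' constructor
-- from Simulation.Aivika.Channel:
--
-- > doublingProcessor :: Processor Double Double
-- > doublingProcessor =
-- >   channelProcessor $
-- >   Channel $ \sa -> return (mapSignal (* 2) sa)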

-- | Convert the specified processor to a signal transform, i.e. the channel.
--
-- The channel may return data with a delay as the values are requested on demand.
-- Consider using the 'arrivalSignal' function to provide the information
-- about the time points at which the signal was actually triggered.
--
-- The point is that the 'Stream' used in the 'Processor' is requested from outside,
-- while the 'Signal' used in the 'Channel' is triggered from inside. They are different by nature.
-- The former is passive, while the latter is active.
--
-- The resulting channel may be the root of a space leak, as it uses an internal queue to store
-- the values received from the input stream. Consider using 'queuedProcessorChannel', which
-- allows specifying a bounded queue in case of need.
processorChannel :: Processor a b -> Channel a b
processorChannel (Processor f) =
  Channel $ \sa ->
  do xs <- signalStream sa
     let ys = f xs
     streamSignal ys

-- | Like 'channelProcessor' but allows specifying an arbitrary queue for storing the signal values,
-- for example, the bounded queue.
queuedChannelProcessor :: (b -> Event ())
                          -- ^ enqueue
                          -> Process b
                          -- ^ dequeue
                          -> Channel a b
                          -- ^ the channel
                          -> Processor a b
                          -- ^ the processor
queuedChannelProcessor enqueue dequeue f =
  Processor $ \xs ->
  Cons $
  do let composite =
           do sa <- streamSignal xs
              sb <- runChannel f sa
              queuedSignalStream enqueue dequeue sb
     (ys, h) <- liftEvent $
                runComposite composite mempty
     whenCancellingProcess $
       disposeEvent h
     runStream ys

-- | Like 'processorChannel' but allows specifying an arbitrary queue for storing the signal values,
-- for example, the bounded queue.
queuedProcessorChannel :: (a -> Event ())
                          -- ^ enqueue
                          -> (Process a)
                          -- ^ dequeue
                          -> Processor a b
                          -- ^ the processor
                          -> Channel a b
                          -- ^ the channel
queuedProcessorChannel enqueue dequeue (Processor f) =
  Channel $ \sa ->
  do xs <- queuedSignalStream enqueue dequeue sa
     let ys = f xs
     streamSignal ys

-- | A processor that adds the information about the time points at which
-- the original stream items were received on demand.
arrivalProcessor :: Processor a (Arrival a)
arrivalProcessor = Processor arrivalStream

-- | A processor that delays the input stream by one step using the specified initial value.
delayProcessor :: a -> Processor a a
delayProcessor a0 = Processor $ delayStream a0

-- | Removes one level of the computation, projecting its bound processor into the outer level.
joinProcessor :: Process (Processor a b) -> Processor a b
joinProcessor m =
  Processor $ \xs ->
  Cons $
  do Processor f <- m
     runStream $ f xs

-- | Takes the next processor from the list after the current processor fails because of cancelling the underlying process.
failoverProcessor :: [Processor a b] -> Processor a b
failoverProcessor ps =
  Processor $ \xs -> failoverStream [runProcessor p xs | p <- ps]

-- | Show the debug messages with the current simulation time.
traceProcessor :: Maybe String
                  -- ^ the request message
                  -> Maybe String
                  -- ^ the response message
                  -> Processor a b
                  -- ^ a processor
                  -> Processor a b
traceProcessor request response (Processor f) =
  Processor $ traceStream request response . f
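
-- A minimal sketch of tracing a processor; the message texts and the service
-- time are arbitrary assumptions, and 'holdProcess' comes from
-- Simulation.Aivika.Process:
--
-- > tracedServer :: Processor a a
-- > tracedServer =
-- >   traceProcessor (Just "Requesting...") (Just "Responding...") $
-- >   arrProcessor $ \a -> holdProcess 5.0 >> return a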