{-# LANGUAGE FlexibleContexts #-}
module Simulation.Aivika.Trans.Processor
(
Processor(..),
emptyProcessor,
arrProcessor,
accumProcessor,
withinProcessor,
processorUsingId,
prefetchProcessor,
delayProcessor,
bufferProcessor,
bufferProcessorLoop,
queueProcessor,
queueProcessorLoopMerging,
queueProcessorLoopSeq,
queueProcessorLoopParallel,
processorSeq,
processorParallel,
processorQueuedParallel,
processorPrioritisingOutputParallel,
processorPrioritisingInputParallel,
processorPrioritisingInputOutputParallel,
arrivalProcessor,
joinProcessor,
failoverProcessor,
channelProcessor,
processorChannel,
queuedChannelProcessor,
queuedProcessorChannel,
traceProcessor) where
import qualified Control.Category as C
import Control.Arrow
import Data.Monoid
import Simulation.Aivika.Trans.DES
import Simulation.Aivika.Trans.Simulation
import Simulation.Aivika.Trans.Dynamics
import Simulation.Aivika.Trans.Event
import Simulation.Aivika.Trans.Composite
import Simulation.Aivika.Trans.Cont
import Simulation.Aivika.Trans.Process
import Simulation.Aivika.Trans.Stream
import Simulation.Aivika.Trans.QueueStrategy
import Simulation.Aivika.Trans.Signal
import Simulation.Aivika.Trans.Channel
import Simulation.Aivika.Arrival (Arrival(..))
-- | A processor wraps a function that turns an input 'Stream' of values of
-- type @a@ into an output 'Stream' of values of type @b@, both running in
-- the monad @m@.
newtype Processor m a b =
  Processor { runProcessor :: Stream m a -> Stream m b
              -- ^ Apply the processor to an input stream.
            }
-- | Processors form a category: the identity processor passes the stream
-- through unchanged and composition is composition of the wrapped functions.
instance C.Category (Processor m) where

  {-# INLINE id #-}
  -- The right-hand side uses the ordinary function identity (the category
  -- module is imported qualified, so the unqualified name is Prelude's).
  id = Processor id

  {-# INLINE (.) #-}
  -- Likewise the right-hand side is plain function composition.
  Processor x . Processor y = Processor (x . y)
-- | Arrow structure for processors.
--
-- The methods that work on pairs split the input with 'unzipStream' and
-- recombine the two branches with 'zipStreamParallel'. The splitting is a
-- 'Simulation' action, so it is lifted into the 'Process' that the 'Cons'
-- constructor wraps.
instance MonadDES m => Arrow (Processor m) where

  {-# INLINABLE arr #-}
  -- Lift a pure function by mapping it over the stream.
  arr = Processor . mapStream

  {-# INLINABLE first #-}
  -- Apply the processor to the first components only.
  first (Processor f) =
    Processor $ \xys ->
    Cons $
    do (xs, ys) <- liftSimulation $ unzipStream xys
       runStream $ zipStreamParallel (f xs) ys

  {-# INLINABLE second #-}
  -- Apply the processor to the second components only.
  second (Processor f) =
    Processor $ \xys ->
    Cons $
    do (xs, ys) <- liftSimulation $ unzipStream xys
       runStream $ zipStreamParallel xs (f ys)

  {-# INLINABLE (***) #-}
  -- Apply one processor to each component of the pair.
  Processor f *** Processor g =
    Processor $ \xys ->
    Cons $
    do (xs, ys) <- liftSimulation $ unzipStream xys
       runStream $ zipStreamParallel (f xs) (g ys)

  {-# INLINABLE (&&&) #-}
  -- Feed the same input to both processors and pair their outputs.
  -- NOTE(review): unlike 'first', 'second' and '***', the input stream is
  -- handed to both processors directly, without 'unzipStream' or any
  -- memoisation -- confirm that sharing the stream this way is intended.
  Processor f &&& Processor g =
    Processor $ \xs -> zipStreamParallel (f xs) (g xs)
-- | Choice for processors: only one side of an 'Either' stream is routed
-- through the given processor.
--
-- The input is first passed through 'memoStream' because it is used twice:
-- once to extract the selected side ('leftStream' / 'rightStream') and once
-- as the first argument of 'replaceLeftStream' / 'replaceRightStream'.
instance MonadDES m => ArrowChoice (Processor m) where

  {-# INLINABLE left #-}
  left (Processor f) =
    Processor $ \xs ->
    Cons $
    do ys <- liftSimulation $ memoStream xs
       runStream $ replaceLeftStream ys (f $ leftStream ys)

  {-# INLINABLE right #-}
  right (Processor f) =
    Processor $ \xs ->
    Cons $
    do ys <- liftSimulation $ memoStream xs
       runStream $ replaceRightStream ys (f $ rightStream ys)
-- | The zero arrow is 'emptyProcessor'.
instance MonadDES m => ArrowZero (Processor m) where

  {-# INLINE zeroArrow #-}
  zeroArrow = emptyProcessor
-- | Combine two processors: the input is split in two with 'splitStream',
-- each part is handled by one of the processors, and the two outputs are
-- merged with 'mergeStreams'.
instance MonadDES m => ArrowPlus (Processor m) where

  {-# INLINABLE (<+>) #-}
  (Processor f) <+> (Processor g) =
    Processor $ \xs ->
    Cons $
    -- The list pattern relies on 'splitStream' returning exactly as many
    -- streams as requested (two); any other length is a pattern-match
    -- failure in the 'Process' monad.
    do [xs1, xs2] <- liftSimulation $ splitStream 2 xs
       runStream $ mergeStreams (f xs1) (g xs2)
-- | A processor that ignores its input and always yields 'emptyStream'.
emptyProcessor :: MonadDES m => Processor m a b
{-# INLINABLE emptyProcessor #-}
emptyProcessor = Processor $ const emptyStream
-- | Build a processor from a 'Process'-valued handler by applying it to
-- every item of the input stream with 'mapStreamM'.
arrProcessor :: MonadDES m => (a -> Process m b) -> Processor m a b
{-# INLINABLE arrProcessor #-}
arrProcessor = Processor . mapStreamM
-- | A stateful processor: the handler receives the current accumulator and
-- the next input item, and returns the updated accumulator together with
-- the output item. The second argument is the initial accumulator.
accumProcessor :: MonadDES m => (acc -> a -> Process m (acc, b)) -> acc -> Processor m a b
{-# INLINABLE accumProcessor #-}
accumProcessor f acc0 =
  Processor $ \xs0 -> Cons $ go xs0 acc0
  where
    -- Take one input item, run the handler, emit its output and continue
    -- with the rest of the stream and the new accumulator.
    go xs acc =
      do (a, xs') <- runStream xs
         (acc', b) <- f acc a
         return (b, Cons $ go xs' acc')
-- | A processor that runs the given computation for every item of the
-- stream and then passes the item on unchanged.
withinProcessor :: MonadDES m => Process m () -> Processor m a a
{-# INLINABLE withinProcessor #-}
withinProcessor m =
  Processor $
  mapStreamM $ \a -> m >> return a
-- | Wrap a processor so that the 'Process' producing the first item of its
-- output stream is run through 'processUsingId' with the given identifier.
processorUsingId :: MonadDES m => ProcessId m -> Processor m a b -> Processor m a b
{-# INLINABLE processorUsingId #-}
processorUsingId pid (Processor f) =
  Processor $ Cons . processUsingId pid . runStream . f
-- | Run several processors side by side.
--
-- The input is split into as many streams as there are processors using
-- 'splitStreamQueueing' with the first strategy; each processor is applied
-- to its own part, and the outputs are combined by 'concatQueuedStreams'
-- with the second strategy.
processorQueuedParallel :: (MonadDES m,
                            EnqueueStrategy m si,
                            EnqueueStrategy m so)
                           => si
                           -- ^ the strategy passed to 'splitStreamQueueing'
                           -> so
                           -- ^ the strategy passed to 'concatQueuedStreams'
                           -> [Processor m a b]
                           -- ^ the processors to run in parallel
                           -> Processor m a b
{-# INLINABLE processorQueuedParallel #-}
processorQueuedParallel si so ps =
  Processor $ \xs ->
  Cons $
  do let n = length ps
     inputs <- liftSimulation $ splitStreamQueueing si n xs
     -- Pair every input part with its processor.
     let results = zipWith (\input p -> runProcessor p input) inputs ps
         output  = concatQueuedStreams so results
     runStream output
-- | Like 'processorQueuedParallel', but every processor emits its results
-- paired with a priority and the outputs are combined by
-- 'concatPriorityStreams', which yields the values without the priorities.
processorPrioritisingOutputParallel :: (MonadDES m,
                                        EnqueueStrategy m si,
                                        PriorityQueueStrategy m so po)
                                       => si
                                       -- ^ the strategy passed to 'splitStreamQueueing'
                                       -> so
                                       -- ^ the strategy passed to 'concatPriorityStreams'
                                       -> [Processor m a (po, b)]
                                       -- ^ the processors to run in parallel
                                       -> Processor m a b
{-# INLINABLE processorPrioritisingOutputParallel #-}
processorPrioritisingOutputParallel si so ps =
  Processor $ \xs ->
  Cons $
  do let n = length ps
     inputs <- liftSimulation $ splitStreamQueueing si n xs
     -- Pair every input part with its processor.
     let results = zipWith (\input p -> runProcessor p input) inputs ps
         output  = concatPriorityStreams so results
     runStream output
-- | Like 'processorQueuedParallel', but the input is split with
-- 'splitStreamPrioritising': every processor comes with a stream of
-- priorities, and those streams are handed to the splitter.
processorPrioritisingInputParallel :: (MonadDES m,
                                       PriorityQueueStrategy m si pi,
                                       EnqueueStrategy m so)
                                      => si
                                      -- ^ the strategy passed to 'splitStreamPrioritising'
                                      -> so
                                      -- ^ the strategy passed to 'concatQueuedStreams'
                                      -> [(Stream m pi, Processor m a b)]
                                      -- ^ priority streams paired with their processors
                                      -> Processor m a b
{-# INLINABLE processorPrioritisingInputParallel #-}
processorPrioritisingInputParallel si so ps =
  Processor $ \xs ->
  Cons $
  do inputs <- liftSimulation $ splitStreamPrioritising si (map fst ps) xs
     -- Pair every input part with its processor; the priority stream has
     -- already been consumed by the splitter above.
     let results = zipWith (\input (_, p) -> runProcessor p input) inputs ps
         output  = concatQueuedStreams so results
     runStream output
-- | Combines 'processorPrioritisingInputParallel' and
-- 'processorPrioritisingOutputParallel': the input is split with
-- 'splitStreamPrioritising' and the prioritised outputs are combined with
-- 'concatPriorityStreams'.
processorPrioritisingInputOutputParallel :: (MonadDES m,
                                             PriorityQueueStrategy m si pi,
                                             PriorityQueueStrategy m so po)
                                            => si
                                            -- ^ the strategy passed to 'splitStreamPrioritising'
                                            -> so
                                            -- ^ the strategy passed to 'concatPriorityStreams'
                                            -> [(Stream m pi, Processor m a (po, b))]
                                            -- ^ priority streams paired with their processors
                                            -> Processor m a b
{-# INLINABLE processorPrioritisingInputOutputParallel #-}
processorPrioritisingInputOutputParallel si so ps =
  Processor $ \xs ->
  Cons $
  do inputs <- liftSimulation $ splitStreamPrioritising si (map fst ps) xs
     -- Pair every input part with its processor; the priority stream has
     -- already been consumed by the splitter above.
     let results = zipWith (\input (_, p) -> runProcessor p input) inputs ps
         output  = concatPriorityStreams so results
     runStream output
-- | Run several processors in parallel; this is 'processorQueuedParallel'
-- with the 'FCFS' strategy for both splitting the input and combining the
-- outputs.
processorParallel :: MonadDES m => [Processor m a b] -> Processor m a b
{-# INLINABLE processorParallel #-}
processorParallel = processorQueuedParallel FCFS FCFS
-- | Chain processors one after another, inserting 'prefetchProcessor'
-- between each consecutive pair.
--
-- An empty list gives 'emptyProcessor'; a single processor is returned
-- as is.
processorSeq :: MonadDES m => [Processor m a a] -> Processor m a a
{-# INLINABLE processorSeq #-}
processorSeq []       = emptyProcessor
processorSeq [p]      = p
processorSeq (p : ps) = p >>> prefetchProcessor >>> processorSeq ps
-- | Build a buffering processor from a consumer and a producer.
--
-- When the first output item is requested, the consumer is started on the
-- input stream with 'spawnProcess'; the output is then read from the
-- supplied stream.
bufferProcessor :: MonadDES m
                   => (Stream m a -> Process m ())
                   -- ^ a process that consumes the input stream
                   -> Stream m b
                   -- ^ the stream the output is read from
                   -> Processor m a b
{-# INLINABLE bufferProcessor #-}
bufferProcessor consume output =
  Processor $ \xs ->
  Cons $
  do spawnProcess (consume xs)
     runStream output
-- | A buffering processor with a feedback loop.
--
-- The pre-output stream is run through the condition processor and the
-- result is split with 'partitionEitherStream': 'Left' values are fed to
-- the loop body, 'Right' values become the output. The consumer is started
-- with 'spawnProcess' and receives both the input stream and the output of
-- the loop body.
bufferProcessorLoop :: MonadDES m
                       => (Stream m a -> Stream m c -> Process m ())
                       -- ^ consumes the input stream and the loop body output
                       -> Stream m d
                       -- ^ the stream fed to the condition processor
                       -> Processor m d (Either e b)
                       -- ^ decides whether an item is looped back ('Left')
                       -- or emitted ('Right')
                       -> Processor m e c
                       -- ^ the body of the loop
                       -> Processor m a b
{-# INLINABLE bufferProcessorLoop #-}
bufferProcessorLoop consume preoutput cond body =
  Processor $ \xs ->
  Cons $
  do (reverted, output) <-
       liftSimulation $
       partitionEitherStream $
       runProcessor cond preoutput
     spawnProcess
       (consume xs $ runProcessor body reverted)
     runStream output
-- | A processor built from enqueue and dequeue actions: the input items
-- are passed one by one to the first action (via 'consumeStream'), and the
-- output is produced by repeating the second action (via 'repeatProcess').
-- The two halves are tied together by 'bufferProcessor'.
queueProcessor :: MonadDES m =>
                  (a -> Process m ())
                  -- ^ called for every input item
                  -> Process m b
                  -- ^ repeated to produce the output items
                  -> Processor m a b
{-# INLINABLE queueProcessor #-}
queueProcessor enqueue dequeue =
  bufferProcessor
  (consumeStream enqueue)
  (repeatProcess dequeue)
-- | A queue-based processor with a feedback loop, built on
-- 'bufferProcessorLoop'.
--
-- The input stream and the loop body output are combined by the supplied
-- merging function and the merged items are passed to the enqueue action;
-- the pre-output stream is produced by repeating the dequeue action. The
-- condition and body processors are the remaining two arguments.
queueProcessorLoopMerging :: MonadDES m
                             => (Stream m a -> Stream m d -> Stream m e)
                             -- ^ merges the input with the loop body output
                             -> (e -> Process m ())
                             -- ^ called for every merged item
                             -> Process m c
                             -- ^ repeated to produce the pre-output items
                             -> Processor m c (Either f b)
                             -- ^ the loop condition
                             -> Processor m f d
                             -- ^ the loop body
                             -> Processor m a b
{-# INLINABLE queueProcessorLoopMerging #-}
queueProcessorLoopMerging merge enqueue dequeue =
  bufferProcessorLoop
  (\bs cs ->
    consumeStream enqueue $
    merge bs cs)
  (repeatProcess dequeue)
-- | 'queueProcessorLoopMerging' specialised to 'mergeStreams' as the way of
-- combining the input stream with the loop body output.
queueProcessorLoopSeq :: MonadDES m
                         => (a -> Process m ())
                         -- ^ called for every merged item
                         -> Process m c
                         -- ^ repeated to produce the pre-output items
                         -> Processor m c (Either e b)
                         -- ^ the loop condition
                         -> Processor m e a
                         -- ^ the loop body
                         -> Processor m a b
{-# INLINABLE queueProcessorLoopSeq #-}
queueProcessorLoopSeq =
  queueProcessorLoopMerging mergeStreams
-- | A queue-based processor with a feedback loop in which the input stream
-- and the loop body output are not merged: each is consumed by its own
-- process started with 'spawnProcess', both passing items to the same
-- enqueue action.
queueProcessorLoopParallel :: MonadDES m
                              => (a -> Process m ())
                              -- ^ called for every item of either stream
                              -> Process m c
                              -- ^ repeated to produce the pre-output items
                              -> Processor m c (Either e b)
                              -- ^ the loop condition
                              -> Processor m e a
                              -- ^ the loop body
                              -> Processor m a b
{-# INLINABLE queueProcessorLoopParallel #-}
queueProcessorLoopParallel enqueue dequeue =
  bufferProcessorLoop
  (\bs cs ->
    do spawnProcess $
         consumeStream enqueue bs
       spawnProcess $
         consumeStream enqueue cs)
  (repeatProcess dequeue)
-- | The processor form of 'prefetchStream'.
prefetchProcessor :: MonadDES m => Processor m a a
{-# INLINABLE prefetchProcessor #-}
prefetchProcessor = Processor prefetchStream
-- | Turn a 'Channel' into a 'Processor'.
--
-- The input stream is converted to a signal with 'streamSignal', the
-- channel is run on that signal, and the resulting signal is converted back
-- to a stream with 'signalStream'. The composite is run with an empty
-- disposable, and the disposable it returns is registered through
-- 'whenCancellingProcess' so that 'disposeEvent' is applied to it.
channelProcessor :: MonadDES m => Channel m a b -> Processor m a b
{-# INLINABLE channelProcessor #-}
channelProcessor f =
  Processor $ \xs ->
  Cons $
  do let composite =
           do sa <- streamSignal xs
              sb <- runChannel f sa
              signalStream sb
     (ys, h) <- liftEvent $
                runComposite composite mempty
     whenCancellingProcess $
       disposeEvent h
     runStream ys
-- | Turn a 'Processor' into a 'Channel': the incoming signal is converted
-- to a stream with 'signalStream', the processor is applied, and the result
-- is converted back to a signal with 'streamSignal'.
processorChannel :: MonadDES m => Processor m a b -> Channel m a b
{-# INLINABLE processorChannel #-}
processorChannel (Processor f) =
  Channel $ \sa ->
  do xs <- signalStream sa
     let ys = f xs
     streamSignal ys
-- | Like 'channelProcessor', but the channel's output signal is converted
-- to a stream with 'queuedSignalStream', which is given the supplied
-- enqueue and dequeue actions.
queuedChannelProcessor :: MonadDES m
                          => (b -> Event m ())
                          -- ^ the enqueue action passed to 'queuedSignalStream'
                          -> Process m b
                          -- ^ the dequeue action passed to 'queuedSignalStream'
                          -> Channel m a b
                          -- ^ the channel to convert
                          -> Processor m a b
{-# INLINABLE queuedChannelProcessor #-}
queuedChannelProcessor enqueue dequeue f =
  Processor $ \xs ->
  Cons $
  do let composite =
           do sa <- streamSignal xs
              sb <- runChannel f sa
              queuedSignalStream enqueue dequeue sb
     (ys, h) <- liftEvent $
                runComposite composite mempty
     -- Apply 'disposeEvent' to the returned disposable when the process
     -- is cancelled.
     whenCancellingProcess $
       disposeEvent h
     runStream ys
-- | Like 'processorChannel', but the incoming signal is converted to a
-- stream with 'queuedSignalStream', which is given the supplied enqueue and
-- dequeue actions.
queuedProcessorChannel :: MonadDES m =>
                          (a -> Event m ())
                          -- ^ the enqueue action passed to 'queuedSignalStream'
                          -> Process m a
                          -- ^ the dequeue action passed to 'queuedSignalStream'
                          -> Processor m a b
                          -- ^ the processor to convert
                          -> Channel m a b
{-# INLINABLE queuedProcessorChannel #-}
queuedProcessorChannel enqueue dequeue (Processor f) =
  Channel $ \sa ->
  do xs <- queuedSignalStream enqueue dequeue sa
     let ys = f xs
     streamSignal ys
-- | The processor form of 'arrivalStream': every input item is wrapped in
-- an 'Arrival'.
arrivalProcessor :: MonadDES m => Processor m a (Arrival a)
{-# INLINABLE arrivalProcessor #-}
arrivalProcessor = Processor arrivalStream
-- | The processor form of 'delayStream', applied to the given initial
-- value.
delayProcessor :: MonadDES m => a -> Processor m a a
{-# INLINABLE delayProcessor #-}
delayProcessor a0 = Processor $ delayStream a0
-- | Flatten a processor that is itself the result of a 'Process'
-- computation: when the first output item is requested, the computation is
-- run and the processor it returns is applied to the input stream.
joinProcessor :: MonadDES m => Process m (Processor m a b) -> Processor m a b
{-# INLINABLE joinProcessor #-}
joinProcessor m =
  Processor $ \xs ->
  Cons $
  do Processor f <- m
     runStream $ f xs
-- | Apply every processor in the list to the same input stream and combine
-- the resulting streams with 'failoverStream'.
failoverProcessor :: MonadDES m => [Processor m a b] -> Processor m a b
{-# INLINABLE failoverProcessor #-}
failoverProcessor ps =
  Processor $ \xs -> failoverStream [runProcessor p xs | p <- ps]
-- | Wrap a processor so that its output stream is passed through
-- 'traceStream' with the two optional messages.
traceProcessor :: MonadDES m
                  => Maybe String
                  -- ^ the first message passed to 'traceStream' (named @request@)
                  -> Maybe String
                  -- ^ the second message passed to 'traceStream' (named @response@)
                  -> Processor m a b
                  -- ^ the processor whose output is traced
                  -> Processor m a b
{-# INLINABLE traceProcessor #-}
traceProcessor request response (Processor f) =
  Processor $ traceStream request response . f