module Simulation.Aivika.Processor.RoundRobbin
(roundRobbinProcessor,
roundRobbinProcessorUsingIds) where
import Control.Monad
import Simulation.Aivika.Simulation
import Simulation.Aivika.Event
import Simulation.Aivika.Process
import Simulation.Aivika.Processor
import Simulation.Aivika.Stream
import Simulation.Aivika.Queue.Infinite.Base
-- | A round-robin processor over @(timeout, task)@ pairs.
--
-- Every input item carries a computation that yields the timeout and the
-- task to perform. This is a thin adapter over
-- 'roundRobbinProcessorUsingIds': the input stream is mapped so that the
-- timeout computation, each time it is run, also allocates a fresh
-- 'ProcessId' with 'newProcessId' and returns it alongside the timeout.
roundRobbinProcessor :: Processor (Process Double, Process a) a
roundRobbinProcessor =
  Processor $
  runProcessor roundRobbinProcessorUsingIds . mapStreamM attachFreshId
  where
    -- The task is passed through untouched; only the timeout computation is
    -- decorated. The identifier is created when 'timeoutWithId' is run, not
    -- when the pair is built, so each run of it yields a new identifier.
    attachFreshId (timeout, task) = return (timeoutWithId, task)
      where
        timeoutWithId =
          do t <- timeout
             pid <- liftSimulation newProcessId
             return (t, pid)
-- | Like 'roundRobbinProcessor', but every input item supplies the
-- 'ProcessId' together with the timeout.
--
-- Incoming @(timeout-and-id computation, task)@ pairs are buffered in an
-- FCFS queue. The output stream is produced by repeatedly taking a pair from
-- that queue, running its first component to obtain the timeout and the
-- identifier, and passing both with the task to 'timeoutProcessUsingId'.
-- A @Just@ result becomes the next output item; on @Nothing@ the same pair
-- is put back on the queue and the next queued pair is tried.
--
-- NOTE(review): @Nothing@ presumably means the task did not finish within
-- the timeout -- confirm against the 'timeoutProcessUsingId' documentation.
roundRobbinProcessorUsingIds :: Processor (Process (Double, ProcessId), Process a) a
roundRobbinProcessorUsingIds =
  Processor $ \xs ->
  Cons $
  do q <- liftSimulation newFCFSQueue
     let process =
           do t@(x, p) <- dequeue q
              -- 'x' is run on every attempt, so the timeout and identifier
              -- are obtained anew each time the pair is dequeued.
              (timeout, pid) <- x
              result <- timeoutProcessUsingId timeout pid p
              case result of
                Just a  -> return a
                Nothing ->
                  -- Put the unfinished pair back on the queue and carry on
                  -- with whatever is dequeued next.
                  do liftEvent $ enqueue q t
                     process
         -- Input side: enqueue every incoming pair.
         -- Output side: the stream obtained by repeating 'process'.
         processor =
           bufferProcessor
           (consumeStream $ liftEvent . enqueue q)
           (repeatProcess process)
     runStream $ runProcessor processor xs