-- |
-- Module     : Simulation.Aivika.Processor.RoundRobbin
-- Copyright  : Copyright (c) 2009-2017, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 8.0.1
--
-- The module defines the Round-Robbin processor.
--
module Simulation.Aivika.Processor.RoundRobbin
       (roundRobbinProcessor,
        roundRobbinProcessorUsingIds) where

import Control.Monad

import Simulation.Aivika.Simulation
import Simulation.Aivika.Event
import Simulation.Aivika.Process
import Simulation.Aivika.Processor
import Simulation.Aivika.Stream
import Simulation.Aivika.Queue.Infinite.Base

-- | Represents the Round-Robbin processor that tries to perform the task within
-- the specified timeout. If the task times out, then it is canceled and returned
-- to the processor again; otherwise, the successful result is redirected to output.
roundRobbinProcessor :: Processor (Process Double, Process a) a
roundRobbinProcessor :: forall a. Processor (Process Double, Process a) a
roundRobbinProcessor =
  (Stream (Process Double, Process a) -> Stream a)
-> Processor (Process Double, Process a) a
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream (Process Double, Process a) -> Stream a)
 -> Processor (Process Double, Process a) a)
-> (Stream (Process Double, Process a) -> Stream a)
-> Processor (Process Double, Process a) a
forall a b. (a -> b) -> a -> b
$
  Processor (Process (Double, ProcessId), Process a) a
-> Stream (Process (Double, ProcessId), Process a) -> Stream a
forall a b. Processor a b -> Stream a -> Stream b
runProcessor Processor (Process (Double, ProcessId), Process a) a
forall a. Processor (Process (Double, ProcessId), Process a) a
roundRobbinProcessorUsingIds (Stream (Process (Double, ProcessId), Process a) -> Stream a)
-> (Stream (Process Double, Process a)
    -> Stream (Process (Double, ProcessId), Process a))
-> Stream (Process Double, Process a)
-> Stream a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((Process Double, Process a)
 -> Process (Process (Double, ProcessId), Process a))
-> Stream (Process Double, Process a)
-> Stream (Process (Double, ProcessId), Process a)
forall a b. (a -> Process b) -> Stream a -> Stream b
mapStreamM (Process Double, Process a)
-> Process (Process (Double, ProcessId), Process a)
forall {m :: * -> *} {m :: * -> *} {a} {b}.
(Monad m, Monad m, SimulationLift m) =>
(m a, b) -> m (m (a, ProcessId), b)
f where
    f :: (m a, b) -> m (m (a, ProcessId), b)
f (m a
timeout, b
p) =
      let x :: m (a, ProcessId)
x = do a
timeout' <- m a
timeout
                 ProcessId
pid <- Simulation ProcessId -> m ProcessId
forall a. Simulation a -> m a
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation Simulation ProcessId
newProcessId
                 (a, ProcessId) -> m (a, ProcessId)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
timeout', ProcessId
pid)
      in (m (a, ProcessId), b) -> m (m (a, ProcessId), b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (m (a, ProcessId)
x, b
p)

-- | Like 'roundRobbinProcessor' but allows specifying the process identifiers which
-- must be unique for every new attemp to perform the task even if the task is the same.
roundRobbinProcessorUsingIds :: Processor (Process (Double, ProcessId), Process a) a
roundRobbinProcessorUsingIds :: forall a. Processor (Process (Double, ProcessId), Process a) a
roundRobbinProcessorUsingIds =
  (Stream (Process (Double, ProcessId), Process a) -> Stream a)
-> Processor (Process (Double, ProcessId), Process a) a
forall a b. (Stream a -> Stream b) -> Processor a b
Processor ((Stream (Process (Double, ProcessId), Process a) -> Stream a)
 -> Processor (Process (Double, ProcessId), Process a) a)
-> (Stream (Process (Double, ProcessId), Process a) -> Stream a)
-> Processor (Process (Double, ProcessId), Process a) a
forall a b. (a -> b) -> a -> b
$ \Stream (Process (Double, ProcessId), Process a)
xs ->
  Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons (Process (a, Stream a) -> Stream a)
-> Process (a, Stream a) -> Stream a
forall a b. (a -> b) -> a -> b
$
  do FCFSQueue (Process (Double, ProcessId), Process a)
q <- Simulation (FCFSQueue (Process (Double, ProcessId), Process a))
-> Process (FCFSQueue (Process (Double, ProcessId), Process a))
forall a. Simulation a -> Process a
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation Simulation (FCFSQueue (Process (Double, ProcessId), Process a))
forall a. Simulation (FCFSQueue a)
newFCFSQueue
     let process :: Process a
process =
           do t :: (Process (Double, ProcessId), Process a)
t@(Process (Double, ProcessId)
x, Process a
p) <- FCFSQueue (Process (Double, ProcessId), Process a)
-> Process (Process (Double, ProcessId), Process a)
forall sm so a.
(DequeueStrategy sm, EnqueueStrategy so) =>
Queue sm so a -> Process a
dequeue FCFSQueue (Process (Double, ProcessId), Process a)
q
              (Double
timeout, ProcessId
pid) <- Process (Double, ProcessId)
x
              Maybe a
result <- Double -> ProcessId -> Process a -> Process (Maybe a)
forall a. Double -> ProcessId -> Process a -> Process (Maybe a)
timeoutProcessUsingId Double
timeout ProcessId
pid Process a
p
              case Maybe a
result of
                Just a
a  -> a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
                Maybe a
Nothing ->
                  do Event () -> Process ()
forall a. Event a -> Process a
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event () -> Process ()) -> Event () -> Process ()
forall a b. (a -> b) -> a -> b
$ FCFSQueue (Process (Double, ProcessId), Process a)
-> (Process (Double, ProcessId), Process a) -> Event ()
forall sm so a.
(EnqueueStrategy sm, DequeueStrategy so) =>
Queue sm so a -> a -> Event ()
enqueue FCFSQueue (Process (Double, ProcessId), Process a)
q (Process (Double, ProcessId), Process a)
t 
                     Process a
process
         processor :: Processor (Process (Double, ProcessId), Process a) a
processor =
           (Stream (Process (Double, ProcessId), Process a) -> Process ())
-> Stream a -> Processor (Process (Double, ProcessId), Process a) a
forall a b. (Stream a -> Process ()) -> Stream b -> Processor a b
bufferProcessor
           (((Process (Double, ProcessId), Process a) -> Process ())
-> Stream (Process (Double, ProcessId), Process a) -> Process ()
forall a. (a -> Process ()) -> Stream a -> Process ()
consumeStream (((Process (Double, ProcessId), Process a) -> Process ())
 -> Stream (Process (Double, ProcessId), Process a) -> Process ())
-> ((Process (Double, ProcessId), Process a) -> Process ())
-> Stream (Process (Double, ProcessId), Process a)
-> Process ()
forall a b. (a -> b) -> a -> b
$ Event () -> Process ()
forall a. Event a -> Process a
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event () -> Process ())
-> ((Process (Double, ProcessId), Process a) -> Event ())
-> (Process (Double, ProcessId), Process a)
-> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FCFSQueue (Process (Double, ProcessId), Process a)
-> (Process (Double, ProcessId), Process a) -> Event ()
forall sm so a.
(EnqueueStrategy sm, DequeueStrategy so) =>
Queue sm so a -> a -> Event ()
enqueue FCFSQueue (Process (Double, ProcessId), Process a)
q)
           (Process a -> Stream a
forall a. Process a -> Stream a
repeatProcess Process a
process)
     Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream (Stream a -> Process (a, Stream a))
-> Stream a -> Process (a, Stream a)
forall a b. (a -> b) -> a -> b
$ Processor (Process (Double, ProcessId), Process a) a
-> Stream (Process (Double, ProcessId), Process a) -> Stream a
forall a b. Processor a b -> Stream a -> Stream b
runProcessor Processor (Process (Double, ProcessId), Process a) a
processor Stream (Process (Double, ProcessId), Process a)
xs