-- |
-- Module     : Simulation.Aivika.Trans.Processor.RoundRobbin
-- Copyright  : Copyright (c) 2009-2017, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 8.0.1
--
-- The module defines the Round-Robbin processor.
--
module Simulation.Aivika.Trans.Processor.RoundRobbin
       (roundRobbinProcessor,
        roundRobbinProcessorUsingIds) where

import Control.Monad

import Simulation.Aivika.Trans.DES
import Simulation.Aivika.Trans.Simulation
import Simulation.Aivika.Trans.Event
import Simulation.Aivika.Trans.Process
import Simulation.Aivika.Trans.Processor
import Simulation.Aivika.Trans.Stream
import Simulation.Aivika.Trans.Queue.Infinite.Base

-- | Represents the Round-Robbin processor that tries to perform the task within
-- the specified timeout. If the task times out, then it is canceled and returned
-- to the processor again; otherwise, the successful result is redirected to output.
roundRobbinProcessor :: MonadDES m => Processor m (Process m Double, Process m a) a
{-# INLINABLE roundRobbinProcessor #-}
roundRobbinProcessor :: Processor m (Process m Double, Process m a) a
roundRobbinProcessor =
  (Stream m (Process m Double, Process m a) -> Stream m a)
-> Processor m (Process m Double, Process m a) a
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m (Process m Double, Process m a) -> Stream m a)
 -> Processor m (Process m Double, Process m a) a)
-> (Stream m (Process m Double, Process m a) -> Stream m a)
-> Processor m (Process m Double, Process m a) a
forall a b. (a -> b) -> a -> b
$
  Processor m (Process m (Double, ProcessId m), Process m a) a
-> Stream m (Process m (Double, ProcessId m), Process m a)
-> Stream m a
forall (m :: * -> *) a b.
Processor m a b -> Stream m a -> Stream m b
runProcessor Processor m (Process m (Double, ProcessId m), Process m a) a
forall (m :: * -> *) a.
MonadDES m =>
Processor m (Process m (Double, ProcessId m), Process m a) a
roundRobbinProcessorUsingIds (Stream m (Process m (Double, ProcessId m), Process m a)
 -> Stream m a)
-> (Stream m (Process m Double, Process m a)
    -> Stream m (Process m (Double, ProcessId m), Process m a))
-> Stream m (Process m Double, Process m a)
-> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((Process m Double, Process m a)
 -> Process m (Process m (Double, ProcessId m), Process m a))
-> Stream m (Process m Double, Process m a)
-> Stream m (Process m (Double, ProcessId m), Process m a)
forall (m :: * -> *) a b.
MonadDES m =>
(a -> Process m b) -> Stream m a -> Stream m b
mapStreamM (Process m Double, Process m a)
-> Process m (Process m (Double, ProcessId m), Process m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(Monad m, SimulationLift t m, MonadDES m, Monad (t m)) =>
(t m a, b) -> m (t m (a, ProcessId m), b)
f where
    f :: (t m a, b) -> m (t m (a, ProcessId m), b)
f (t m a
timeout, b
p) =
      let x :: t m (a, ProcessId m)
x = do a
timeout' <- t m a
timeout
                 ProcessId m
pid <- Simulation m (ProcessId m) -> t m (ProcessId m)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation Simulation m (ProcessId m)
forall (m :: * -> *). MonadDES m => Simulation m (ProcessId m)
newProcessId
                 (a, ProcessId m) -> t m (a, ProcessId m)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
timeout', ProcessId m
pid)
      in (t m (a, ProcessId m), b) -> m (t m (a, ProcessId m), b)
forall (m :: * -> *) a. Monad m => a -> m a
return (t m (a, ProcessId m)
x, b
p)

-- | Like 'roundRobbinProcessor' but allows specifying the process identifiers which
-- must be unique for every new attemp to perform the task even if the task is the same.
roundRobbinProcessorUsingIds :: MonadDES m => Processor m (Process m (Double, ProcessId m), Process m a) a
{-# INLINABLE roundRobbinProcessorUsingIds #-}
roundRobbinProcessorUsingIds :: Processor m (Process m (Double, ProcessId m), Process m a) a
roundRobbinProcessorUsingIds =
  (Stream m (Process m (Double, ProcessId m), Process m a)
 -> Stream m a)
-> Processor m (Process m (Double, ProcessId m), Process m a) a
forall (m :: * -> *) a b.
(Stream m a -> Stream m b) -> Processor m a b
Processor ((Stream m (Process m (Double, ProcessId m), Process m a)
  -> Stream m a)
 -> Processor m (Process m (Double, ProcessId m), Process m a) a)
-> (Stream m (Process m (Double, ProcessId m), Process m a)
    -> Stream m a)
-> Processor m (Process m (Double, ProcessId m), Process m a) a
forall a b. (a -> b) -> a -> b
$ \Stream m (Process m (Double, ProcessId m), Process m a)
xs ->
  Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$
  do FCFSQueue m (Process m (Double, ProcessId m), Process m a)
q <- Simulation
  m (FCFSQueue m (Process m (Double, ProcessId m), Process m a))
-> Process
     m (FCFSQueue m (Process m (Double, ProcessId m), Process m a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation Simulation
  m (FCFSQueue m (Process m (Double, ProcessId m), Process m a))
forall (m :: * -> *) a. MonadDES m => Simulation m (FCFSQueue m a)
newFCFSQueue
     let process :: Process m a
process =
           do t :: (Process m (Double, ProcessId m), Process m a)
t@(x, p) <- FCFSQueue m (Process m (Double, ProcessId m), Process m a)
-> Process m (Process m (Double, ProcessId m), Process m a)
forall (m :: * -> *) sm so a.
(MonadDES m, DequeueStrategy m sm, EnqueueStrategy m so) =>
Queue m sm so a -> Process m a
dequeue FCFSQueue m (Process m (Double, ProcessId m), Process m a)
q
              (Double
timeout, ProcessId m
pid) <- Process m (Double, ProcessId m)
x
              Maybe a
result <- Double -> ProcessId m -> Process m a -> Process m (Maybe a)
forall (m :: * -> *) a.
MonadDES m =>
Double -> ProcessId m -> Process m a -> Process m (Maybe a)
timeoutProcessUsingId Double
timeout ProcessId m
pid Process m a
p
              case Maybe a
result of
                Just a
a  -> a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
                Maybe a
Nothing ->
                  do Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ FCFSQueue m (Process m (Double, ProcessId m), Process m a)
-> (Process m (Double, ProcessId m), Process m a) -> Event m ()
forall (m :: * -> *) sm so a.
(MonadDES m, EnqueueStrategy m sm, DequeueStrategy m so) =>
Queue m sm so a -> a -> Event m ()
enqueue FCFSQueue m (Process m (Double, ProcessId m), Process m a)
q (Process m (Double, ProcessId m), Process m a)
t 
                     Process m a
process
         processor :: Processor m (Process m (Double, ProcessId m), Process m a) a
processor =
           (Stream m (Process m (Double, ProcessId m), Process m a)
 -> Process m ())
-> Stream m a
-> Processor m (Process m (Double, ProcessId m), Process m a) a
forall (m :: * -> *) a b.
MonadDES m =>
(Stream m a -> Process m ()) -> Stream m b -> Processor m a b
bufferProcessor
           (((Process m (Double, ProcessId m), Process m a) -> Process m ())
-> Stream m (Process m (Double, ProcessId m), Process m a)
-> Process m ()
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m ()) -> Stream m a -> Process m ()
consumeStream (((Process m (Double, ProcessId m), Process m a) -> Process m ())
 -> Stream m (Process m (Double, ProcessId m), Process m a)
 -> Process m ())
-> ((Process m (Double, ProcessId m), Process m a) -> Process m ())
-> Stream m (Process m (Double, ProcessId m), Process m a)
-> Process m ()
forall a b. (a -> b) -> a -> b
$ Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ())
-> ((Process m (Double, ProcessId m), Process m a) -> Event m ())
-> (Process m (Double, ProcessId m), Process m a)
-> Process m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FCFSQueue m (Process m (Double, ProcessId m), Process m a)
-> (Process m (Double, ProcessId m), Process m a) -> Event m ()
forall (m :: * -> *) sm so a.
(MonadDES m, EnqueueStrategy m sm, DequeueStrategy m so) =>
Queue m sm so a -> a -> Event m ()
enqueue FCFSQueue m (Process m (Double, ProcessId m), Process m a)
q)
           (Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess Process m a
process)
     Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m a -> Process m (a, Stream m a))
-> Stream m a -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$ Processor m (Process m (Double, ProcessId m), Process m a) a
-> Stream m (Process m (Double, ProcessId m), Process m a)
-> Stream m a
forall (m :: * -> *) a b.
Processor m a b -> Stream m a -> Stream m b
runProcessor Processor m (Process m (Double, ProcessId m), Process m a) a
processor Stream m (Process m (Double, ProcessId m), Process m a)
xs