{-# LANGUAGE FlexibleContexts #-}

-- |
-- Module     : Simulation.Aivika.Trans.Processor
-- Copyright  : Copyright (c) 2009-2017, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 8.0.1
--
-- The processor of simulation data.
--
module Simulation.Aivika.Trans.Processor
       (-- * Processor Type
        Processor(..),
        -- * Processor Primitives
        emptyProcessor,
        arrProcessor,
        accumProcessor,
        withinProcessor,
        -- * Specifying Identifier
        processorUsingId,
        -- * Prefetch and Delay Processors
        prefetchProcessor,
        delayProcessor,
        -- * Buffer Processor
        bufferProcessor,
        bufferProcessorLoop,
        -- * Processing Queues
        queueProcessor,
        queueProcessorLoopMerging,
        queueProcessorLoopSeq,
        queueProcessorLoopParallel,
        -- * Sequencing Processors
        processorSeq,
        -- * Parallelizing Processors
        processorParallel,
        processorQueuedParallel,
        processorPrioritisingOutputParallel,
        processorPrioritisingInputParallel,
        processorPrioritisingInputOutputParallel,
        -- * Arrival Processor
        arrivalProcessor,
        -- * Utilities
        joinProcessor,
        -- * Failover
        failoverProcessor,
        -- * Integrating with Signals and Channels
        channelProcessor,
        processorChannel,
        queuedChannelProcessor,
        queuedProcessorChannel,
        -- * Debugging
        traceProcessor) where

import qualified Control.Category as C
import Control.Arrow

import Data.Monoid

import Simulation.Aivika.Trans.DES
import Simulation.Aivika.Trans.Simulation
import Simulation.Aivika.Trans.Dynamics
import Simulation.Aivika.Trans.Event
import Simulation.Aivika.Trans.Composite
import Simulation.Aivika.Trans.Cont
import Simulation.Aivika.Trans.Process
import Simulation.Aivika.Trans.Stream
import Simulation.Aivika.Trans.QueueStrategy
import Simulation.Aivika.Trans.Signal
import Simulation.Aivika.Trans.Channel
import Simulation.Aivika.Arrival (Arrival(..))

-- | Represents a processor of simulation data.
newtype Processor m a b =
  Processor { runProcessor :: Stream m a -> Stream m b
              -- ^ Run the processor.
            }

instance C.Category (Processor m) where

  {-# INLINE id #-}
  id  = Processor id

  {-# INLINE (.) #-}
  Processor x . Processor y = Processor (x . y)

-- The implementation is based on the article
-- "A New Notation for Arrows" by Ross Paterson,
-- although my streams are different: they
-- already depend on the Process monad, whereas
-- the mentioned article considered pure streams.
  
instance MonadDES m => Arrow (Processor m) where

  {-# INLINABLE arr #-}
  arr = Processor . mapStream

  {-# INLINABLE first #-}
  first (Processor f) =
    Processor $ \xys ->
    Cons $
    do (xs, ys) <- liftSimulation $ unzipStream xys
       runStream $ zipStreamParallel (f xs) ys

  {-# INLINABLE second #-}
  second (Processor f) =
    Processor $ \xys ->
    Cons $
    do (xs, ys) <- liftSimulation $ unzipStream xys
       runStream $ zipStreamParallel xs (f ys)

  {-# INLINABLE (***) #-}
  Processor f *** Processor g =
    Processor $ \xys ->
    Cons $
    do (xs, ys) <- liftSimulation $ unzipStream xys
       runStream $ zipStreamParallel (f xs) (g ys)

  {-# INLINABLE (&&&) #-}
  Processor f &&& Processor g =
    Processor $ \xs -> zipStreamParallel (f xs) (g xs)
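
-- For illustration, the 'Arrow' combinators allow building processors from
-- pure functions and composing them. A minimal sketch (the names below are
-- purely illustrative):
--
--   doubler :: MonadDES m => Processor m Double Double
--   doubler = arr (* 2)
--
--   pipeline :: MonadDES m => Processor m Double (Double, Double)
--   pipeline = doubler >>> (returnA &&& arr negate)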

instance MonadDES m => ArrowChoice (Processor m) where

  {-# INLINABLE left #-}
  left (Processor f) =
    Processor $ \xs ->
    Cons $
    do ys <- liftSimulation $ memoStream xs
       runStream $ replaceLeftStream ys (f $ leftStream ys)

  {-# INLINABLE right #-}
  right (Processor f) =
    Processor $ \xs ->
    Cons $
    do ys <- liftSimulation $ memoStream xs
       runStream $ replaceRightStream ys (f $ rightStream ys)

instance MonadDES m => ArrowZero (Processor m) where

  {-# INLINE zeroArrow #-}
  zeroArrow = emptyProcessor

instance MonadDES m => ArrowPlus (Processor m) where

  {-# INLINABLE (<+>) #-}
  (Processor f) <+> (Processor g) =
    Processor $ \xs ->
    Cons $
    do [xs1, xs2] <- liftSimulation $ splitStream 2 xs
       runStream $ mergeStreams (f xs1) (g xs2)

-- | A processor that never finishes its work producing an 'emptyStream'.
emptyProcessor :: MonadDES m => Processor m a b
{-# INLINABLE emptyProcessor #-}
emptyProcessor = Processor $ const emptyStream

-- | Create a simple processor from the specified handling function
-- that runs a discontinuous process for each input value to produce the output.
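--
-- For example, a stage that holds each item for a fixed time before passing
-- it on could be sketched as follows ('holdProcess' suspends the
-- discontinuous process for the specified time interval):
--
-- @
--   arrProcessor $ \\a -> do { holdProcess 2.0; return a }
-- @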
arrProcessor :: MonadDES m => (a -> Process m b) -> Processor m a b
{-# INLINABLE arrProcessor #-}
arrProcessor = Processor . mapStreamM

-- | Accumulator that outputs a value determined by the supplied function.
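--
-- For example, a processor computing a running sum of its inputs could be
-- sketched as follows:
--
-- @
--   runningSum :: MonadDES m => Processor m Double Double
--   runningSum = accumProcessor (\\acc a -> return (acc + a, acc + a)) 0
-- @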
accumProcessor :: MonadDES m => (acc -> a -> Process m (acc, b)) -> acc -> Processor m a b
{-# INLINABLE accumProcessor #-}
accumProcessor f acc =
  Processor $ \xs -> Cons $ loop xs acc where
    loop xs acc =
      do (a, xs') <- runStream xs
         (acc', b) <- f acc a
         return (b, Cons $ loop xs' acc') 

-- | Involve a computation with side effects when processing the stream of data.
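--
-- For example, to suspend the process for some time when passing each item
-- through (a sketch):
--
-- @
--   withinProcessor (holdProcess 0.5)
-- @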
withinProcessor :: MonadDES m => Process m () -> Processor m a a
{-# INLINABLE withinProcessor #-}
withinProcessor m =
  Processor $
  mapStreamM $ \a ->
  do { m; return a }

-- | Create a processor that will use the specified process identifier.
-- It can be useful to refer to the underlying 'Process' computation which
-- can be passivated, interrupted, canceled and so on. See also the
-- 'processUsingId' function for more details.
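--
-- A usage sketch in the 'Simulation' computation, where @someProcessor@ is
-- assumed to be a processor defined elsewhere and 'newProcessId' creates the
-- identifier:
--
-- @
--   do pid <- newProcessId
--      return $ processorUsingId pid someProcessor
-- @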
processorUsingId :: MonadDES m => ProcessId m -> Processor m a b -> Processor m a b
{-# INLINABLE processorUsingId #-}
processorUsingId pid (Processor f) =
  Processor $ Cons . processUsingId pid . runStream . f

-- | Launches the specified processors in parallel consuming the same input
-- stream and producing a combined output stream.
--
-- If you are not sure which enqueueing strategies to apply, then you will
-- probably need 'FCFS' for both parameters, or the 'processorParallel'
-- function that does exactly this.
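--
-- A sketch, where @worker1@ and @worker2@ are assumed to be processors
-- defined elsewhere, the input is consumed in the FCFS order and the output
-- is combined in the LCFS order:
--
-- @
--   processorQueuedParallel FCFS LCFS [worker1, worker2]
-- @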
processorQueuedParallel :: (MonadDES m,
                            EnqueueStrategy m si,
                            EnqueueStrategy m so)
                           => si
                           -- ^ the strategy applied for enqueuing the input data
                           -> so
                           -- ^ the strategy applied for enqueuing the output data
                           -> [Processor m a b]
                           -- ^ the processors to parallelize
                           -> Processor m a b
                           -- ^ the parallelized processor
{-# INLINABLE processorQueuedParallel #-}
processorQueuedParallel si so ps =
  Processor $ \xs ->
  Cons $
  do let n = length ps
     input <- liftSimulation $ splitStreamQueueing si n xs
     let results = flip map (zip input ps) $ \(input, p) ->
           runProcessor p input
         output  = concatQueuedStreams so results
     runStream output

-- | Launches the specified processors in parallel using priorities for combining the output.
processorPrioritisingOutputParallel :: (MonadDES m,
                                        EnqueueStrategy m si,
                                        PriorityQueueStrategy m so po)
                                       => si
                                       -- ^ the strategy applied for enqueuing the input data
                                       -> so
                                       -- ^ the strategy applied for enqueuing the output data
                                       -> [Processor m a (po, b)]
                                       -- ^ the processors to parallelize
                                       -> Processor m a b
                                       -- ^ the parallelized processor
{-# INLINABLE processorPrioritisingOutputParallel #-}
processorPrioritisingOutputParallel si so ps =
  Processor $ \xs ->
  Cons $
  do let n = length ps
     input <- liftSimulation $ splitStreamQueueing si n xs
     let results = flip map (zip input ps) $ \(input, p) ->
           runProcessor p input
         output  = concatPriorityStreams so results
     runStream output

-- | Launches the specified processors in parallel using priorities for consuming the input.
processorPrioritisingInputParallel :: (MonadDES m,
                                       PriorityQueueStrategy m si pi,
                                       EnqueueStrategy m so)
                                      => si
                                      -- ^ the strategy applied for enqueuing the input data
                                      -> so
                                      -- ^ the strategy applied for enqueuing the output data
                                      -> [(Stream m pi, Processor m a b)]
                                      -- ^ the streams of input priorities and the processors
                                      -- to parallelize
                                      -> Processor m a b
                                      -- ^ the parallelized processor
{-# INLINABLE processorPrioritisingInputParallel #-}
processorPrioritisingInputParallel si so ps =
  Processor $ \xs ->
  Cons $
  do input <- liftSimulation $ splitStreamPrioritising si (map fst ps) xs
     let results = flip map (zip input ps) $ \(input, (_, p)) ->
           runProcessor p input
         output  = concatQueuedStreams so results
     runStream output

-- | Launches the specified processors in parallel using priorities for consuming
-- the input and combining the output.
processorPrioritisingInputOutputParallel :: (MonadDES m,
                                             PriorityQueueStrategy m si pi,
                                             PriorityQueueStrategy m so po)
                                            => si
                                            -- ^ the strategy applied for enqueuing the input data
                                            -> so
                                            -- ^ the strategy applied for enqueuing the output data
                                            -> [(Stream m pi, Processor m a (po, b))]
                                            -- ^ the streams of input priorities and the processors
                                            -- to parallelize
                                            -> Processor m a b
                                            -- ^ the parallelized processor
{-# INLINABLE processorPrioritisingInputOutputParallel #-}
processorPrioritisingInputOutputParallel si so ps =
  Processor $ \xs ->
  Cons $
  do input <- liftSimulation $ splitStreamPrioritising si (map fst ps) xs
     let results = flip map (zip input ps) $ \(input, (_, p)) ->
           runProcessor p input
         output  = concatPriorityStreams so results
     runStream output

-- | Launches the processors in parallel consuming the same input stream and producing
-- a combined output stream. This version applies the 'FCFS' strategy both for input
-- and output, which suits most use cases.
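--
-- For example, three parallel service stations could be combined as follows
-- (a sketch; the stations are assumed to be processors defined elsewhere):
--
-- @
--   processorParallel [station1, station2, station3]
-- @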
processorParallel :: MonadDES m => [Processor m a b] -> Processor m a b
{-# INLINABLE processorParallel #-}
processorParallel = processorQueuedParallel FCFS FCFS

-- | Launches the processors sequentially, inserting the 'prefetchProcessor' between them
-- to model the autonomous work of each of the specified processors.
processorSeq :: MonadDES m => [Processor m a a] -> Processor m a a
{-# INLINABLE processorSeq #-}
processorSeq []  = emptyProcessor
processorSeq [p] = p
processorSeq (p : ps) = p >>> prefetchProcessor >>> processorSeq ps

-- | Create a buffer processor, where the process specified as the first argument
-- consumes the input stream, while the stream passed in as the second argument,
-- usually produced by some other process, is returned as the output.
-- This kind of processor is very useful for modeling queues.
bufferProcessor :: MonadDES m
                   => (Stream m a -> Process m ())
                   -- ^ a separate process to consume the input 
                   -> Stream m b
                   -- ^ the resulting stream of data
                   -> Processor m a b
{-# INLINABLE bufferProcessor #-}
bufferProcessor consume output =
  Processor $ \xs ->
  Cons $
  do spawnProcess (consume xs)
     runStream output

-- | Like 'bufferProcessor' but allows creating a loop so that some items
-- can be processed repeatedly. It is very useful for modeling processors
-- with queues and loop-backs.
bufferProcessorLoop :: MonadDES m
                       => (Stream m a -> Stream m c -> Process m ())
                       -- ^ consume two streams: the input values of type @a@
                       -- and the values of type @c@ returned by the loop
                       -> Stream m d
                       -- ^ the stream of data that may become results
                       -> Processor m d (Either e b)
                       -- ^ process and then decide what values of type @e@
                       -- should be processed in the loop (this is a condition)
                       -> Processor m e c
                       -- ^ process in the loop and then return a value
                       -- of type @c@ to the input again (this is a loop body)
                       -> Processor m a b
{-# INLINABLE bufferProcessorLoop #-}
bufferProcessorLoop consume preoutput cond body =
  Processor $ \xs ->
  Cons $
  do (reverted, output) <-
       liftSimulation $
       partitionEitherStream $
       runProcessor cond preoutput
     spawnProcess
       (consume xs $ runProcessor body reverted)
     runStream output

-- | Return a processor with whose help we can model a queue.
--
-- Although the function does not refer to a queue directly, its main use case
-- is precisely the processing of a queue. The first argument should be the enqueueing
-- operation, while the second argument should be the opposite dequeueing operation.
--
-- The reason is as follows. There are many possible combinations of how queues
-- can be modeled. It makes no sense to enumerate them all, creating a separate function
-- for each case. We can just use combinators to define exactly what we need.
--
-- So, the queue can lose the input items if the queue is full, or the input process
-- can suspend while the queue is full, or we can use priorities for enqueueing,
-- storing and dequeueing the items in different combinations. There are so many use
-- cases!
--
-- Hopefully, this function along with the other similar functions from this
-- module is sufficient to cover the most important cases. Even if it is not,
-- you can use the more generic 'bufferProcessor' function, which this one is
-- based on. If needed, you can even write your own function from scratch;
-- it is actually quite easy.
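--
-- For instance, this function can be combined with a bounded queue provided by
-- the library. A sketch, where @q@ is such a queue and @Q.enqueue@ and
-- @Q.dequeue@ denote its operations from a module like
-- "Simulation.Aivika.Trans.Queue" imported qualified as @Q@:
--
-- @
--   queueProcessor (Q.enqueue q) (Q.dequeue q)
-- @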
queueProcessor :: MonadDES m =>
                  (a -> Process m ())
                  -- ^ enqueue the input item and wait
                  -- while the queue is full if required
                  -- so that there were no hanging items
                  -> Process m b
                  -- ^ dequeue an output item
                  -> Processor m a b
                  -- ^ the buffering processor
{-# INLINABLE queueProcessor #-}
queueProcessor enqueue dequeue =
  bufferProcessor
  (consumeStream enqueue)
  (repeatProcess dequeue)

-- | Like 'queueProcessor', this creates a queue processor, but with a loop so that some items
-- can be processed and then added to the queue again. It also allows specifying
-- how the two input streams of data are merged.
queueProcessorLoopMerging :: MonadDES m
                             => (Stream m a -> Stream m d -> Stream m e)
                             -- ^ merge two streams: the input values of type @a@
                             -- and the values of type @d@ returned by the loop
                             -> (e -> Process m ())
                             -- ^ enqueue the input item and wait
                             -- while the queue is full if required
                             -- so that there were no hanging items
                             -> Process m c
                             -- ^ dequeue an item for the further processing
                             -> Processor m c (Either f b)
                             -- ^ process and then decide what values of type @f@
                             -- should be processed in the loop (this is a condition)
                             -> Processor m f d
                             -- ^ process in the loop and then return a value
                             -- of type @d@ to the queue again (this is a loop body)
                             -> Processor m a b
                             -- ^ the buffering processor
{-# INLINABLE queueProcessorLoopMerging #-}
queueProcessorLoopMerging merge enqueue dequeue =
  bufferProcessorLoop
  (\bs cs ->
    consumeStream enqueue $
    merge bs cs)
  (repeatProcess dequeue)

-- | Like 'queueProcessorLoopMerging', this creates a queue processor with a loop so that
-- some items can be processed and then added to the queue again. The difference is that it
-- sequentially merges the two input streams of data: the stream that comes from the external
-- source and the stream of data returned by the loop. The first stream has priority over
-- the second one.
queueProcessorLoopSeq :: MonadDES m
                         => (a -> Process m ())
                         -- ^ enqueue the input item and wait
                         -- while the queue is full if required
                         -- so that there were no hanging items
                         -> Process m c
                         -- ^ dequeue an item for the further processing
                         -> Processor m c (Either e b)
                         -- ^ process and then decide what values of type @e@
                         -- should be processed in the loop (this is a condition)
                         -> Processor m e a
                         -- ^ process in the loop and then return a value
                         -- of type @a@ to the queue again (this is a loop body)
                         -> Processor m a b
                         -- ^ the buffering processor
{-# INLINABLE queueProcessorLoopSeq #-}
queueProcessorLoopSeq =
  queueProcessorLoopMerging mergeStreams

-- | Like 'queueProcessorLoopMerging', this creates a queue processor with a loop so that
-- some items can be processed and then added to the queue again. The difference is that it
-- runs two simultaneous processes to enqueue the input streams of data: the stream that
-- comes from the external source and the stream of data returned by the loop.
queueProcessorLoopParallel :: MonadDES m
                              => (a -> Process m ())
                              -- ^ enqueue the input item and wait
                              -- while the queue is full if required
                              -- so that there were no hanging items
                              -> Process m c
                              -- ^ dequeue an item for the further processing
                              -> Processor m c (Either e b)
                              -- ^ process and then decide what values of type @e@
                              -- should be processed in the loop (this is a condition)
                              -> Processor m e a
                              -- ^ process in the loop and then return a value
                              -- of type @a@ to the queue again (this is a loop body)
                              -> Processor m a b
                              -- ^ the buffering processor
{-# INLINABLE queueProcessorLoopParallel #-}
queueProcessorLoopParallel enqueue dequeue =
  bufferProcessorLoop
  (\bs cs ->
    do spawnProcess $
         consumeStream enqueue bs
       spawnProcess $
         consumeStream enqueue cs)
  (repeatProcess dequeue)

-- | This is a prefetch processor that requests one more data item from
-- the input in advance, while the latest item has not yet been fully processed
-- in the chain of streams, usually by other processors.
--
-- You can think of it as if the prefetch processor placed its latest
-- data item in some temporary space for later use, which is very useful
-- for modeling a sequence of separate and independent work places.
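--
-- A sketch of chaining two work places through the prefetch buffer, where
-- @stage1@ and @stage2@ are assumed to be processors defined elsewhere:
--
-- @
--   stage1 >>> prefetchProcessor >>> stage2
-- @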
prefetchProcessor :: MonadDES m => Processor m a a
{-# INLINABLE prefetchProcessor #-}
prefetchProcessor = Processor prefetchStream

-- | Convert the specified signal transform, i.e. the channel, to a processor.
--
-- The processor may return data with a delay as the values are requested on demand.
-- Consider using the 'arrivalSignal' function to provide information
-- about the time points at which the signal was actually triggered.
--
-- The point is that the 'Stream' used in the 'Processor' is requested from outside,
-- while the 'Signal' used in the 'Channel' is triggered from inside. They are different by nature.
-- The former is passive, while the latter is active.
--
-- The resulting processor may be a source of a space leak as it uses an internal queue to store
-- the values received from the input signal. Consider using 'queuedChannelProcessor', which
-- allows specifying a bounded queue in case of need.
channelProcessor :: MonadDES m => Channel m a b -> Processor m a b
{-# INLINABLE channelProcessor #-}
channelProcessor f =
  Processor $ \xs ->
  Cons $
  do let composite =
           do sa <- streamSignal xs
              sb <- runChannel f sa
              signalStream sb
     (ys, h) <- liftEvent $
                runComposite composite mempty
     whenCancellingProcess $
       disposeEvent h
     runStream ys

-- | Convert the specified processor to a signal transform, i.e. the channel. 
--
-- The processor may return data with a delay as the values are requested on demand.
-- Consider using the 'arrivalSignal' function to provide information
-- about the time points at which the signal was actually triggered.
--
-- The point is that the 'Stream' used in the 'Processor' is requested from outside,
-- while the 'Signal' used in the 'Channel' is triggered from inside. They are different by nature.
-- The former is passive, while the latter is active.
--
-- The resulting channel may be a source of a space leak as it uses an internal queue to store
-- the values received from the input stream. Consider using 'queuedProcessorChannel', which
-- allows specifying a bounded queue in case of need.
processorChannel :: MonadDES m => Processor m a b -> Channel m a b
{-# INLINABLE processorChannel #-}
processorChannel (Processor f) =
  Channel $ \sa ->
  do xs <- signalStream sa
     let ys = f xs
     streamSignal ys

-- | Like 'channelProcessor' but allows specifying an arbitrary queue for storing the signal values,
-- for example, a bounded queue.
queuedChannelProcessor :: MonadDES m
                          => (b -> Event m ())
                          -- ^ enqueue
                          -> Process m b
                          -- ^ dequeue
                          -> Channel m a b
                          -- ^ the channel
                          -> Processor m a b
                          -- ^ the processor
{-# INLINABLE queuedChannelProcessor #-}
queuedChannelProcessor enqueue dequeue f =
  Processor $ \xs ->
  Cons $
  do let composite =
           do sa <- streamSignal xs
              sb <- runChannel f sa
              queuedSignalStream enqueue dequeue sb
     (ys, h) <- liftEvent $
                runComposite composite mempty
     whenCancellingProcess $
       disposeEvent h
     runStream ys

-- | Like 'processorChannel' but allows specifying an arbitrary queue for storing the signal values,
-- for example, a bounded queue.
queuedProcessorChannel :: MonadDES m =>
                          (a -> Event m ())
                          -- ^ enqueue
                          -> Process m a
                          -- ^ dequeue
                          -> Processor m a b
                          -- ^ the processor
                          -> Channel m a b
                          -- ^ the channel
{-# INLINABLE queuedProcessorChannel #-}
queuedProcessorChannel enqueue dequeue (Processor f) =
  Channel $ \sa ->
  do xs <- queuedSignalStream enqueue dequeue sa
     let ys = f xs
     streamSignal ys

-- | A processor that adds the information about the time points at which
-- the original stream items were received on demand.
arrivalProcessor :: MonadDES m => Processor m a (Arrival a)
{-# INLINABLE arrivalProcessor #-}
arrivalProcessor = Processor arrivalStream

-- | A processor that delays the input stream by one step using the specified initial value.
delayProcessor :: MonadDES m => a -> Processor m a a
{-# INLINABLE delayProcessor #-}
delayProcessor a0 = Processor $ delayStream a0

-- | Removes one level of the computation, projecting its bound processor into the outer level.
joinProcessor :: MonadDES m => Process m (Processor m a b) -> Processor m a b
{-# INLINABLE joinProcessor #-}
joinProcessor m =
  Processor $ \xs ->
  Cons $
  do Processor f <- m
     runStream $ f xs

-- | Takes the next processor from the list after the current processor fails
-- because its underlying process was cancelled.
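--
-- A sketch, where @mainProcessor@ and @reserveProcessor@ are assumed to be
-- defined elsewhere:
--
-- @
--   failoverProcessor [mainProcessor, reserveProcessor]
-- @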
failoverProcessor :: MonadDES m => [Processor m a b] -> Processor m a b
{-# INLINABLE failoverProcessor #-}
failoverProcessor ps =
  Processor $ \xs -> failoverStream [runProcessor p xs | p <- ps]

-- | Show the debug messages with the current simulation time.
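--
-- For example, where @p@ is an existing processor:
--
-- @
--   traceProcessor (Just "dequeue") (Just "dequeued") p
-- @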
traceProcessor :: MonadDES m
                  => Maybe String
                  -- ^ the request message
                  -> Maybe String
                  -- ^ the response message
                  -> Processor m a b
                  -- ^ a processor
                  -> Processor m a b
{-# INLINABLE traceProcessor #-}
traceProcessor request response (Processor f) =
  Processor $ traceStream request response . f