-- |
-- Module     : Simulation.Aivika.Stream
-- Copyright  : Copyright (c) 2009-2017, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 8.0.1
--
-- The infinite stream of data in time.
--
module Simulation.Aivika.Stream
       (-- * Stream Type
        Stream(..),
        -- * Merging and Splitting Stream
        emptyStream,
        mergeStreams,
        mergeQueuedStreams,
        mergePriorityStreams,
        concatStreams,
        concatQueuedStreams,
        concatPriorityStreams,
        splitStream,
        splitStreamQueueing,
        splitStreamPrioritising,
        splitStreamFiltering,
        splitStreamFilteringQueueing,
        -- * Specifying Identifier
        streamUsingId,
        -- * Prefetching and Delaying Stream
        prefetchStream,
        delayStream,
        -- * Stream Arriving
        arrivalStream,
        -- * Memoizing, Zipping and Uzipping Stream
        memoStream,
        zipStreamSeq,
        zipStreamParallel,
        zip3StreamSeq,
        zip3StreamParallel,
        unzipStream,
        streamSeq,
        streamParallel,
        -- * Consuming and Sinking Stream
        consumeStream,
        sinkStream,
        -- * Useful Combinators
        repeatProcess,
        mapStream,
        mapStreamM,
        accumStream,
        apStream,
        apStreamM,
        filterStream,
        filterStreamM,
        takeStream,
        takeStreamWhile,
        takeStreamWhileM,
        dropStream,
        dropStreamWhile,
        dropStreamWhileM,
        singletonStream,
        joinStream,
        -- * Failover
        failoverStream,
        -- * Integrating with Signals
        signalStream,
        streamSignal,
        queuedSignalStream,
        -- * Utilities
        leftStream,
        rightStream,
        replaceLeftStream,
        replaceRightStream,
        partitionEitherStream,
        -- * Assemblying Streams
        cloneStream,
        firstArrivalStream,
        lastArrivalStream,
        assembleAccumStream,
        -- * Debugging
        traceStream) where

import Data.IORef
import Data.Maybe
import Data.Monoid hiding ((<>))
import Data.Semigroup (Semigroup(..))
import Data.List.NonEmpty (NonEmpty((:|)))

import Control.Applicative
import Control.Monad
import Control.Monad.Trans

import Simulation.Aivika.Simulation
import Simulation.Aivika.Dynamics
import Simulation.Aivika.Event
import Simulation.Aivika.Composite
import Simulation.Aivika.Cont
import Simulation.Aivika.Process
import Simulation.Aivika.Signal
import Simulation.Aivika.Resource.Base
import Simulation.Aivika.QueueStrategy
import qualified Simulation.Aivika.Queue.Infinite.Base as IQ
import Simulation.Aivika.Internal.Arrival

-- | Represents an infinite stream of data in time,
-- some kind of the cons cell.
newtype Stream a = Cons { forall a. Stream a -> Process (a, Stream a)
runStream :: Process (a, Stream a)
                          -- ^ Run the stream.
                        }

instance Functor Stream where
  
  fmap :: forall a b. (a -> b) -> Stream a -> Stream b
fmap = forall a b. (a -> b) -> Stream a -> Stream b
mapStream

instance Applicative Stream where

  pure :: forall a. a -> Stream a
pure a
a = let y :: Stream a
y = forall a. Process (a, Stream a) -> Stream a
Cons (forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream a
y)) in Stream a
y
  
  <*> :: forall a b. Stream (a -> b) -> Stream a -> Stream b
(<*>) = forall a b. Stream (a -> b) -> Stream a -> Stream b
apStream

instance Alternative Stream where

  empty :: forall a. Stream a
empty = forall a. Stream a
emptyStream

  <|> :: forall a. Stream a -> Stream a -> Stream a
(<|>) = forall a. Stream a -> Stream a -> Stream a
mergeStreams

instance Semigroup (Stream a) where

  <> :: Stream a -> Stream a -> Stream a
(<>) = forall a. Stream a -> Stream a -> Stream a
mergeStreams

  sconcat :: NonEmpty (Stream a) -> Stream a
sconcat (Stream a
h :| [Stream a]
t) = forall a. [Stream a] -> Stream a
concatStreams (Stream a
h forall a. a -> [a] -> [a]
: [Stream a]
t)

instance Monoid (Stream a) where

  mempty :: Stream a
mempty  = forall a. Stream a
emptyStream

  mappend :: Stream a -> Stream a -> Stream a
mappend = forall a. Semigroup a => a -> a -> a
(<>)

  mconcat :: [Stream a] -> Stream a
mconcat = forall a. [Stream a] -> Stream a
concatStreams

-- | Create a stream that will use the specified process identifier.
-- It can be useful to refer to the underlying 'Process' computation which
-- can be passivated, interrupted, canceled and so on. See also the
-- 'processUsingId' function for more details.
streamUsingId :: ProcessId -> Stream a -> Stream a
streamUsingId :: forall a. ProcessId -> Stream a -> Stream a
streamUsingId ProcessId
pid (Cons Process (a, Stream a)
s) =
  forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$ forall a. ProcessId -> Process a -> Process a
processUsingId ProcessId
pid Process (a, Stream a)
s

-- | Memoize the stream so that it would always return the same data
-- within the simulation run.
memoStream :: Stream a -> Simulation (Stream a)
memoStream :: forall a. Stream a -> Simulation (Stream a)
memoStream (Cons Process (a, Stream a)
s) =
  do Process (a, Stream a)
p <- forall a. Process a -> Simulation (Process a)
memoProcess forall a b. (a -> b) -> a -> b
$
          do ~(a
x, Stream a
xs) <- Process (a, Stream a)
s
             Stream a
xs' <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation forall a b. (a -> b) -> a -> b
$ forall a. Stream a -> Simulation (Stream a)
memoStream Stream a
xs
             forall (m :: * -> *) a. Monad m => a -> m a
return (a
x, Stream a
xs')
     forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
p)

-- | Zip two streams trying to get data sequentially.
zipStreamSeq :: Stream a -> Stream b -> Stream (a, b)
zipStreamSeq :: forall a b. Stream a -> Stream b -> Stream (a, b)
zipStreamSeq (Cons Process (a, Stream a)
sa) (Cons Process (b, Stream b)
sb) = forall a. Process (a, Stream a) -> Stream a
Cons Process ((a, b), Stream (a, b))
y where
  y :: Process ((a, b), Stream (a, b))
y = do ~(a
x, Stream a
xs) <- Process (a, Stream a)
sa
         ~(b
y, Stream b
ys) <- Process (b, Stream b)
sb
         forall (m :: * -> *) a. Monad m => a -> m a
return ((a
x, b
y), forall a b. Stream a -> Stream b -> Stream (a, b)
zipStreamSeq Stream a
xs Stream b
ys)

-- | Zip two streams trying to get data as soon as possible,
-- launching the sub-processes in parallel.
zipStreamParallel :: Stream a -> Stream b -> Stream (a, b)
zipStreamParallel :: forall a b. Stream a -> Stream b -> Stream (a, b)
zipStreamParallel (Cons Process (a, Stream a)
sa) (Cons Process (b, Stream b)
sb) = forall a. Process (a, Stream a) -> Stream a
Cons Process ((a, b), Stream (a, b))
y where
  y :: Process ((a, b), Stream (a, b))
y = do ~((a
x, Stream a
xs), (b
y, Stream b
ys)) <- forall a b. Process a -> Process b -> Process (a, b)
zipProcessParallel Process (a, Stream a)
sa Process (b, Stream b)
sb
         forall (m :: * -> *) a. Monad m => a -> m a
return ((a
x, b
y), forall a b. Stream a -> Stream b -> Stream (a, b)
zipStreamParallel Stream a
xs Stream b
ys)

-- | Zip three streams trying to get data sequentially.
zip3StreamSeq :: Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamSeq :: forall a b c. Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamSeq (Cons Process (a, Stream a)
sa) (Cons Process (b, Stream b)
sb) (Cons Process (c, Stream c)
sc) = forall a. Process (a, Stream a) -> Stream a
Cons Process ((a, b, c), Stream (a, b, c))
y where
  y :: Process ((a, b, c), Stream (a, b, c))
y = do ~(a
x, Stream a
xs) <- Process (a, Stream a)
sa
         ~(b
y, Stream b
ys) <- Process (b, Stream b)
sb
         ~(c
z, Stream c
zs) <- Process (c, Stream c)
sc
         forall (m :: * -> *) a. Monad m => a -> m a
return ((a
x, b
y, c
z), forall a b c. Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamSeq Stream a
xs Stream b
ys Stream c
zs)

-- | Zip three streams trying to get data as soon as possible,
-- launching the sub-processes in parallel.
zip3StreamParallel :: Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamParallel :: forall a b c. Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamParallel (Cons Process (a, Stream a)
sa) (Cons Process (b, Stream b)
sb) (Cons Process (c, Stream c)
sc) = forall a. Process (a, Stream a) -> Stream a
Cons Process ((a, b, c), Stream (a, b, c))
y where
  y :: Process ((a, b, c), Stream (a, b, c))
y = do ~((a
x, Stream a
xs), (b
y, Stream b
ys), (c
z, Stream c
zs)) <- forall a b c.
Process a -> Process b -> Process c -> Process (a, b, c)
zip3ProcessParallel Process (a, Stream a)
sa Process (b, Stream b)
sb Process (c, Stream c)
sc
         forall (m :: * -> *) a. Monad m => a -> m a
return ((a
x, b
y, c
z), forall a b c. Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamParallel Stream a
xs Stream b
ys Stream c
zs)

-- | Unzip the stream.
unzipStream :: Stream (a, b) -> Simulation (Stream a, Stream b)
unzipStream :: forall a b. Stream (a, b) -> Simulation (Stream a, Stream b)
unzipStream Stream (a, b)
s =
  do Stream (a, b)
s' <- forall a. Stream a -> Simulation (Stream a)
memoStream Stream (a, b)
s
     let sa :: Stream a
sa = forall a b. (a -> b) -> Stream a -> Stream b
mapStream forall a b. (a, b) -> a
fst Stream (a, b)
s'
         sb :: Stream b
sb = forall a b. (a -> b) -> Stream a -> Stream b
mapStream forall a b. (a, b) -> b
snd Stream (a, b)
s'
     forall (m :: * -> *) a. Monad m => a -> m a
return (Stream a
sa, Stream b
sb)

-- | To form each new portion of data for the output stream,
-- read data sequentially from the input streams.
--
-- This is a generalization of 'zipStreamSeq'.
streamSeq :: [Stream a] -> Stream [a]
streamSeq :: forall a. [Stream a] -> Stream [a]
streamSeq [Stream a]
xs = forall a. Process (a, Stream a) -> Stream a
Cons Process ([a], Stream [a])
y where
  y :: Process ([a], Stream [a])
y = do [(a, Stream a)]
ps <- forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Stream a]
xs forall a. Stream a -> Process (a, Stream a)
runStream
         forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. (a -> b) -> [a] -> [b]
map forall a b. (a, b) -> a
fst [(a, Stream a)]
ps, forall a. [Stream a] -> Stream [a]
streamSeq forall a b. (a -> b) -> a -> b
$ forall a b. (a -> b) -> [a] -> [b]
map forall a b. (a, b) -> b
snd [(a, Stream a)]
ps)

-- | To form each new portion of data for the output stream,
-- read data from the input streams in parallel.
--
-- This is a generalization of 'zipStreamParallel'.
streamParallel :: [Stream a] -> Stream [a]
streamParallel :: forall a. [Stream a] -> Stream [a]
streamParallel [Stream a]
xs = forall a. Process (a, Stream a) -> Stream a
Cons Process ([a], Stream [a])
y where
  y :: Process ([a], Stream [a])
y = do [(a, Stream a)]
ps <- forall a. [Process a] -> Process [a]
processParallel forall a b. (a -> b) -> a -> b
$ forall a b. (a -> b) -> [a] -> [b]
map forall a. Stream a -> Process (a, Stream a)
runStream [Stream a]
xs
         forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. (a -> b) -> [a] -> [b]
map forall a b. (a, b) -> a
fst [(a, Stream a)]
ps, forall a. [Stream a] -> Stream [a]
streamParallel forall a b. (a -> b) -> a -> b
$ forall a b. (a -> b) -> [a] -> [b]
map forall a b. (a, b) -> b
snd [(a, Stream a)]
ps)

-- | Return a stream of values generated by the specified process.
repeatProcess :: Process a -> Stream a
repeatProcess :: forall a. Process a -> Stream a
repeatProcess Process a
p = forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
y where
  y :: Process (a, Stream a)
y = do a
a <- Process a
p
         forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, forall a. Process a -> Stream a
repeatProcess Process a
p)

-- | Map the stream according the specified function.
mapStream :: (a -> b) -> Stream a -> Stream b
mapStream :: forall a b. (a -> b) -> Stream a -> Stream b
mapStream a -> b
f (Cons Process (a, Stream a)
s) = forall a. Process (a, Stream a) -> Stream a
Cons Process (b, Stream b)
y where
  y :: Process (b, Stream b)
y = do (a
a, Stream a
xs) <- Process (a, Stream a)
s
         forall (m :: * -> *) a. Monad m => a -> m a
return (a -> b
f a
a, forall a b. (a -> b) -> Stream a -> Stream b
mapStream a -> b
f Stream a
xs)

-- | Compose the stream.
mapStreamM :: (a -> Process b) -> Stream a -> Stream b
mapStreamM :: forall a b. (a -> Process b) -> Stream a -> Stream b
mapStreamM a -> Process b
f (Cons Process (a, Stream a)
s) = forall a. Process (a, Stream a) -> Stream a
Cons Process (b, Stream b)
y where
  y :: Process (b, Stream b)
y = do (a
a, Stream a
xs) <- Process (a, Stream a)
s
         b
b <- a -> Process b
f a
a
         forall (m :: * -> *) a. Monad m => a -> m a
return (b
b, forall a b. (a -> Process b) -> Stream a -> Stream b
mapStreamM a -> Process b
f Stream a
xs)

-- | Accumulator that outputs a value determined by the supplied function.
accumStream :: (acc -> a -> Process (acc, b)) -> acc -> Stream a -> Stream b
accumStream :: forall acc a b.
(acc -> a -> Process (acc, b)) -> acc -> Stream a -> Stream b
accumStream acc -> a -> Process (acc, b)
f acc
acc Stream a
xs = forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$ Stream a -> acc -> Process (b, Stream b)
loop Stream a
xs acc
acc where
  loop :: Stream a -> acc -> Process (b, Stream b)
loop (Cons Process (a, Stream a)
s) acc
acc =
    do (a
a, Stream a
xs) <- Process (a, Stream a)
s
       (acc
acc', b
b) <- acc -> a -> Process (acc, b)
f acc
acc a
a
       forall (m :: * -> *) a. Monad m => a -> m a
return (b
b, forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$ Stream a -> acc -> Process (b, Stream b)
loop Stream a
xs acc
acc') 

-- | Sequential application.
apStream :: Stream (a -> b) -> Stream a -> Stream b
apStream :: forall a b. Stream (a -> b) -> Stream a -> Stream b
apStream (Cons Process (a -> b, Stream (a -> b))
sf) (Cons Process (a, Stream a)
sa) = forall a. Process (a, Stream a) -> Stream a
Cons Process (b, Stream b)
y where
  y :: Process (b, Stream b)
y = do (a -> b
f, Stream (a -> b)
sf') <- Process (a -> b, Stream (a -> b))
sf
         (a
a, Stream a
sa') <- Process (a, Stream a)
sa
         forall (m :: * -> *) a. Monad m => a -> m a
return (a -> b
f a
a, forall a b. Stream (a -> b) -> Stream a -> Stream b
apStream Stream (a -> b)
sf' Stream a
sa')

-- | Sequential application.
apStreamM :: Stream (a -> Process b) -> Stream a -> Stream b
apStreamM :: forall a b. Stream (a -> Process b) -> Stream a -> Stream b
apStreamM (Cons Process (a -> Process b, Stream (a -> Process b))
sf) (Cons Process (a, Stream a)
sa) = forall a. Process (a, Stream a) -> Stream a
Cons Process (b, Stream b)
y where
  y :: Process (b, Stream b)
y = do (a -> Process b
f, Stream (a -> Process b)
sf') <- Process (a -> Process b, Stream (a -> Process b))
sf
         (a
a, Stream a
sa') <- Process (a, Stream a)
sa
         b
x <- a -> Process b
f a
a
         forall (m :: * -> *) a. Monad m => a -> m a
return (b
x, forall a b. Stream (a -> Process b) -> Stream a -> Stream b
apStreamM Stream (a -> Process b)
sf' Stream a
sa')

-- | Filter only those data values that satisfy to the specified predicate.
filterStream :: (a -> Bool) -> Stream a -> Stream a
filterStream :: forall a. (a -> Bool) -> Stream a -> Stream a
filterStream a -> Bool
p (Cons Process (a, Stream a)
s) = forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
y where
  y :: Process (a, Stream a)
y = do (a
a, Stream a
xs) <- Process (a, Stream a)
s
         if a -> Bool
p a
a
           then forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, forall a. (a -> Bool) -> Stream a -> Stream a
filterStream a -> Bool
p Stream a
xs)
           else let Cons Process (a, Stream a)
z = forall a. (a -> Bool) -> Stream a -> Stream a
filterStream a -> Bool
p Stream a
xs in Process (a, Stream a)
z

-- | Filter only those data values that satisfy to the specified predicate.
filterStreamM :: (a -> Process Bool) -> Stream a -> Stream a
filterStreamM :: forall a. (a -> Process Bool) -> Stream a -> Stream a
filterStreamM a -> Process Bool
p (Cons Process (a, Stream a)
s) = forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
y where
  y :: Process (a, Stream a)
y = do (a
a, Stream a
xs) <- Process (a, Stream a)
s
         Bool
b <- a -> Process Bool
p a
a
         if Bool
b
           then forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, forall a. (a -> Process Bool) -> Stream a -> Stream a
filterStreamM a -> Process Bool
p Stream a
xs)
           else let Cons Process (a, Stream a)
z = forall a. (a -> Process Bool) -> Stream a -> Stream a
filterStreamM a -> Process Bool
p Stream a
xs in Process (a, Stream a)
z

-- | The stream of 'Left' values.
leftStream :: Stream (Either a b) -> Stream a
leftStream :: forall a b. Stream (Either a b) -> Stream a
leftStream (Cons Process (Either a b, Stream (Either a b))
s) = forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
y where
  y :: Process (a, Stream a)
y = do (Either a b
a, Stream (Either a b)
xs) <- Process (Either a b, Stream (Either a b))
s
         case Either a b
a of
           Left a
a  -> forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, forall a b. Stream (Either a b) -> Stream a
leftStream Stream (Either a b)
xs)
           Right b
_ -> let Cons Process (a, Stream a)
z = forall a b. Stream (Either a b) -> Stream a
leftStream Stream (Either a b)
xs in Process (a, Stream a)
z

-- | The stream of 'Right' values.
rightStream :: Stream (Either a b) -> Stream b
rightStream :: forall a b. Stream (Either a b) -> Stream b
rightStream (Cons Process (Either a b, Stream (Either a b))
s) = forall a. Process (a, Stream a) -> Stream a
Cons Process (b, Stream b)
y where
  y :: Process (b, Stream b)
y = do (Either a b
a, Stream (Either a b)
xs) <- Process (Either a b, Stream (Either a b))
s
         case Either a b
a of
           Left a
_  -> let Cons Process (b, Stream b)
z = forall a b. Stream (Either a b) -> Stream b
rightStream Stream (Either a b)
xs in Process (b, Stream b)
z
           Right b
a -> forall (m :: * -> *) a. Monad m => a -> m a
return (b
a, forall a b. Stream (Either a b) -> Stream b
rightStream Stream (Either a b)
xs)

-- | Replace the 'Left' values.
replaceLeftStream :: Stream (Either a b) -> Stream c -> Stream (Either c b)
replaceLeftStream :: forall a b c.
Stream (Either a b) -> Stream c -> Stream (Either c b)
replaceLeftStream (Cons Process (Either a b, Stream (Either a b))
sab) (ys0 :: Stream c
ys0@(Cons Process (c, Stream c)
sc)) = forall a. Process (a, Stream a) -> Stream a
Cons Process (Either c b, Stream (Either c b))
z where
  z :: Process (Either c b, Stream (Either c b))
z = do (Either a b
a, Stream (Either a b)
xs) <- Process (Either a b, Stream (Either a b))
sab
         case Either a b
a of
           Left a
_ ->
             do (c
b, Stream c
ys) <- Process (c, Stream c)
sc
                forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> Either a b
Left c
b, forall a b c.
Stream (Either a b) -> Stream c -> Stream (Either c b)
replaceLeftStream Stream (Either a b)
xs Stream c
ys)
           Right b
a ->
             forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. b -> Either a b
Right b
a, forall a b c.
Stream (Either a b) -> Stream c -> Stream (Either c b)
replaceLeftStream Stream (Either a b)
xs Stream c
ys0)

-- | Replace the 'Right' values.
replaceRightStream :: Stream (Either a b) -> Stream c -> Stream (Either a c)
replaceRightStream :: forall a b c.
Stream (Either a b) -> Stream c -> Stream (Either a c)
replaceRightStream (Cons Process (Either a b, Stream (Either a b))
sab) (ys0 :: Stream c
ys0@(Cons Process (c, Stream c)
sc)) = forall a. Process (a, Stream a) -> Stream a
Cons Process (Either a c, Stream (Either a c))
z where
  z :: Process (Either a c, Stream (Either a c))
z = do (Either a b
a, Stream (Either a b)
xs) <- Process (Either a b, Stream (Either a b))
sab
         case Either a b
a of
           Right b
_ ->
             do (c
b, Stream c
ys) <- Process (c, Stream c)
sc
                forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. b -> Either a b
Right c
b, forall a b c.
Stream (Either a b) -> Stream c -> Stream (Either a c)
replaceRightStream Stream (Either a b)
xs Stream c
ys)
           Left a
a ->
             forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. a -> Either a b
Left a
a, forall a b c.
Stream (Either a b) -> Stream c -> Stream (Either a c)
replaceRightStream Stream (Either a b)
xs Stream c
ys0)

-- | Partition the stream of 'Either' values into two streams.
partitionEitherStream :: Stream (Either a b) -> Simulation (Stream a, Stream b)
partitionEitherStream :: forall a b. Stream (Either a b) -> Simulation (Stream a, Stream b)
partitionEitherStream Stream (Either a b)
s =
  do Stream (Either a b)
s' <- forall a. Stream a -> Simulation (Stream a)
memoStream Stream (Either a b)
s
     forall (m :: * -> *) a. Monad m => a -> m a
return (forall a b. Stream (Either a b) -> Stream a
leftStream Stream (Either a b)
s', forall a b. Stream (Either a b) -> Stream b
rightStream Stream (Either a b)
s')

-- | Split the input stream into the specified number of output streams
-- after applying the 'FCFS' strategy for enqueuing the output requests.
splitStream :: Int -> Stream a -> Simulation [Stream a]
splitStream :: forall a. Int -> Stream a -> Simulation [Stream a]
splitStream = forall s a.
EnqueueStrategy s =>
s -> Int -> Stream a -> Simulation [Stream a]
splitStreamQueueing FCFS
FCFS

-- | Split the input stream into the specified number of output streams.
--
-- If you don't know what the strategy to apply, then you probably
-- need the 'FCFS' strategy, or function 'splitStream' that
-- does namely this.
splitStreamQueueing :: EnqueueStrategy s
                       => s
                       -- ^ the strategy applied for enqueuing the output requests
                       -> Int
                       -- ^ the number of output streams
                       -> Stream a
                       -- ^ the input stream
                       -> Simulation [Stream a]
                       -- ^ the splitted output streams
splitStreamQueueing :: forall s a.
EnqueueStrategy s =>
s -> Int -> Stream a -> Simulation [Stream a]
splitStreamQueueing s
s Int
n Stream a
x =
  do IORef (Stream a)
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (IORef a)
newIORef Stream a
x
     Resource s
res <- forall s. QueueStrategy s => s -> Int -> Simulation (Resource s)
newResource s
s Int
1
     let reader :: Process a
reader =
           forall s a.
EnqueueStrategy s =>
Resource s -> Process a -> Process a
usingResource Resource s
res forall a b. (a -> b) -> a -> b
$
           do Stream a
p <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef IORef (Stream a)
ref
              (a
a, Stream a
xs) <- forall a. Stream a -> Process (a, Stream a)
runStream Stream a
p
              forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef (Stream a)
ref Stream a
xs
              forall (m :: * -> *) a. Monad m => a -> m a
return a
a
     forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. (a -> b) -> [a] -> [b]
map (\Int
i -> forall a. Process a -> Stream a
repeatProcess Process a
reader) [Int
1..Int
n]

-- | Split the input stream into a list of output streams
-- using the specified priorities.
splitStreamPrioritising :: PriorityQueueStrategy s p
                           => s
                           -- ^ the strategy applied for enqueuing the output requests
                           -> [Stream p]
                           -- ^ the streams of priorities
                           -> Stream a
                           -- ^ the input stream
                           -> Simulation [Stream a]
                           -- ^ the splitted output streams
splitStreamPrioritising :: forall s p a.
PriorityQueueStrategy s p =>
s -> [Stream p] -> Stream a -> Simulation [Stream a]
splitStreamPrioritising s
s [Stream p]
ps Stream a
x =
  do IORef (Stream a)
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (IORef a)
newIORef Stream a
x
     Resource s
res <- forall s. QueueStrategy s => s -> Int -> Simulation (Resource s)
newResource s
s Int
1
     let stream :: Stream a -> Stream a
stream (Cons Process (a, Stream a)
p) = forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
z where
           z :: Process (a, Stream a)
z = do (a
p', Stream a
ps) <- Process (a, Stream a)
p
                  a
a <- forall s p a.
PriorityQueueStrategy s p =>
Resource s -> p -> Process a -> Process a
usingResourceWithPriority Resource s
res a
p' forall a b. (a -> b) -> a -> b
$
                       do Stream a
p <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef IORef (Stream a)
ref
                          (a
a, Stream a
xs) <- forall a. Stream a -> Process (a, Stream a)
runStream Stream a
p
                          forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef (Stream a)
ref Stream a
xs
                          forall (m :: * -> *) a. Monad m => a -> m a
return a
a
                  forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream a -> Stream a
stream Stream a
ps)
     forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. (a -> b) -> [a] -> [b]
map forall {a}. PriorityQueueStrategy s a => Stream a -> Stream a
stream [Stream p]
ps

-- | Split the input stream into the specified number of output streams
-- after filtering and applying the 'FCFS' strategy for enqueuing the output requests.
splitStreamFiltering :: [a -> Event Bool] -> Stream a -> Simulation [Stream a]
splitStreamFiltering :: forall a. [a -> Event Bool] -> Stream a -> Simulation [Stream a]
splitStreamFiltering = forall s a.
EnqueueStrategy s =>
s -> [a -> Event Bool] -> Stream a -> Simulation [Stream a]
splitStreamFilteringQueueing FCFS
FCFS

-- | Split the input stream into the specified number of output streams after filtering.
--
-- If you don't know what the strategy to apply, then you probably
-- need the 'FCFS' strategy, or function 'splitStreamFiltering' that
-- does namely this.
splitStreamFilteringQueueing :: EnqueueStrategy s
                                => s
                                -- ^ the strategy applied for enqueuing the output requests
                                -> [a -> Event Bool]
                                -- ^ the filters for output streams
                                -> Stream a
                                -- ^ the input stream
                                -> Simulation [Stream a]
                                -- ^ the splitted output streams
splitStreamFilteringQueueing :: forall s a.
EnqueueStrategy s =>
s -> [a -> Event Bool] -> Stream a -> Simulation [Stream a]
splitStreamFilteringQueueing s
s [a -> Event Bool]
preds Stream a
x =
  do IORef (Stream a)
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (IORef a)
newIORef Stream a
x
     Resource s
res <- forall s. QueueStrategy s => s -> Int -> Simulation (Resource s)
newResource s
s Int
1
     let reader :: (a -> Event Bool) -> Process a
reader a -> Event Bool
pred =
           do Maybe a
a <-
                forall s a.
EnqueueStrategy s =>
Resource s -> Process a -> Process a
usingResource Resource s
res forall a b. (a -> b) -> a -> b
$
                do Stream a
p <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef IORef (Stream a)
ref
                   (a
a, Stream a
xs) <- forall a. Stream a -> Process (a, Stream a)
runStream Stream a
p
                   forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall a b. (a -> b) -> a -> b
$
                     do Bool
f <- a -> Event Bool
pred a
a
                        if Bool
f
                          then do forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef (Stream a)
ref Stream a
xs
                                  forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just a
a
                          else do forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef (Stream a)
ref forall a b. (a -> b) -> a -> b
$ forall a. Process (a, Stream a) -> Stream a
Cons (forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream a
xs))
                                  forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
              case Maybe a
a of
                Just a
a  -> forall (m :: * -> *) a. Monad m => a -> m a
return a
a
                Maybe a
Nothing -> (a -> Event Bool) -> Process a
reader a -> Event Bool
pred
     forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a b. (a -> b) -> [a] -> [b]
map (forall a. Process a -> Stream a
repeatProcess forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> Event Bool) -> Process a
reader) [a -> Event Bool]
preds

-- | Concatenate the input streams applying the 'FCFS' strategy and
-- producing one output stream.
concatStreams :: [Stream a] -> Stream a
concatStreams :: forall a. [Stream a] -> Stream a
concatStreams = forall s a. EnqueueStrategy s => s -> [Stream a] -> Stream a
concatQueuedStreams FCFS
FCFS

-- | Concatenate the input streams producing one output stream.
--
-- If you don't know what the strategy to apply, then you probably
-- need the 'FCFS' strategy, or function 'concatStreams' that
-- does namely this.
concatQueuedStreams :: EnqueueStrategy s
                       => s
                       -- ^ the strategy applied for enqueuing the input data
                       -> [Stream a]
                       -- ^ the input stream
                       -> Stream a
                       -- ^ the combined output stream
concatQueuedStreams :: forall s a. EnqueueStrategy s => s -> [Stream a] -> Stream a
concatQueuedStreams s
s [Stream a]
streams = forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
z where
  z :: Process (a, Stream a)
z = do Resource FCFS
reading <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation forall a b. (a -> b) -> a -> b
$ forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount FCFS
FCFS Int
0 (forall a. a -> Maybe a
Just Int
1)
         Resource s
writing <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation forall a b. (a -> b) -> a -> b
$ forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount s
s Int
1 (forall a. a -> Maybe a
Just Int
1)
         Resource FCFS
conting <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation forall a b. (a -> b) -> a -> b
$ forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount FCFS
FCFS Int
0 (forall a. a -> Maybe a
Just Int
1)
         IORef (Maybe a)
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (IORef a)
newIORef forall a. Maybe a
Nothing
         let writer :: Stream a -> Process b
writer Stream a
p =
               do (a
a, Stream a
xs) <- forall a. Stream a -> Process (a, Stream a)
runStream Stream a
p
                  forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource s
writing
                  forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref (forall a. a -> Maybe a
Just a
a)
                  forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource FCFS
reading
                  forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource FCFS
conting
                  Stream a -> Process b
writer Stream a
xs
             reader :: Process a
reader =
               do forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource FCFS
reading
                  Just a
a <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef IORef (Maybe a)
ref
                  forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref forall a. Maybe a
Nothing
                  forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource s
writing
                  forall (m :: * -> *) a. Monad m => a -> m a
return a
a
         forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Stream a]
streams forall a b. (a -> b) -> a -> b
$ Process () -> Process ()
spawnProcess forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall {b}. Stream a -> Process b
writer
         a
a <- Process a
reader
         let xs :: Stream a
xs = forall a. Process a -> Stream a
repeatProcess (forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource FCFS
conting forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Process a
reader)
         forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream a
xs)

-- | Concatenate the input priority streams producing one output stream.
concatPriorityStreams :: PriorityQueueStrategy s p
                         => s
                         -- ^ the strategy applied for enqueuing the input data
                         -> [Stream (p, a)]
                         -- ^ the input stream
                         -> Stream a
                         -- ^ the combined output stream
concatPriorityStreams :: forall s p a.
PriorityQueueStrategy s p =>
s -> [Stream (p, a)] -> Stream a
concatPriorityStreams s
s [Stream (p, a)]
streams = forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
z where
  z :: Process (a, Stream a)
z = do Resource FCFS
reading <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation forall a b. (a -> b) -> a -> b
$ forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount FCFS
FCFS Int
0 (forall a. a -> Maybe a
Just Int
1)
         Resource s
writing <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation forall a b. (a -> b) -> a -> b
$ forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount s
s Int
1 (forall a. a -> Maybe a
Just Int
1)
         Resource FCFS
conting <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation forall a b. (a -> b) -> a -> b
$ forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount FCFS
FCFS Int
0 (forall a. a -> Maybe a
Just Int
1)
         IORef (Maybe a)
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (IORef a)
newIORef forall a. Maybe a
Nothing
         let writer :: Stream (p, a) -> Process b
writer Stream (p, a)
p =
               do ((p
priority, a
a), Stream (p, a)
xs) <- forall a. Stream a -> Process (a, Stream a)
runStream Stream (p, a)
p
                  forall s p.
PriorityQueueStrategy s p =>
Resource s -> p -> Process ()
requestResourceWithPriority Resource s
writing p
priority
                  forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref (forall a. a -> Maybe a
Just a
a)
                  forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource FCFS
reading
                  forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource FCFS
conting
                  Stream (p, a) -> Process b
writer Stream (p, a)
xs
             reader :: Process a
reader =
               do forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource FCFS
reading
                  Just a
a <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef IORef (Maybe a)
ref
                  forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref forall a. Maybe a
Nothing
                  forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource s
writing
                  forall (m :: * -> *) a. Monad m => a -> m a
return a
a
         forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Stream (p, a)]
streams forall a b. (a -> b) -> a -> b
$ Process () -> Process ()
spawnProcess forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall {p} {b}.
PriorityQueueStrategy s p =>
Stream (p, a) -> Process b
writer
         a
a <- Process a
reader
         let xs :: Stream a
xs = forall a. Process a -> Stream a
repeatProcess (forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource FCFS
conting forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Process a
reader)
         forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream a
xs)

-- | Merge two streams applying the 'FCFS' strategy for enqueuing the input data.
mergeStreams :: Stream a -> Stream a -> Stream a
mergeStreams :: forall a. Stream a -> Stream a -> Stream a
mergeStreams = forall s a.
EnqueueStrategy s =>
s -> Stream a -> Stream a -> Stream a
mergeQueuedStreams FCFS
FCFS

-- | Merge two streams.
--
-- If you don't know what the strategy to apply, then you probably
-- need the 'FCFS' strategy, or function 'mergeStreams' that
-- does namely this.
mergeQueuedStreams :: EnqueueStrategy s
                      => s
                      -- ^ the strategy applied for enqueuing the input data
                      -> Stream a
                      -- ^ the fist input stream
                      -> Stream a
                      -- ^ the second input stream
                      -> Stream a
                      -- ^ the output combined stream
mergeQueuedStreams :: forall s a.
EnqueueStrategy s =>
s -> Stream a -> Stream a -> Stream a
mergeQueuedStreams s
s Stream a
x Stream a
y = forall s a. EnqueueStrategy s => s -> [Stream a] -> Stream a
concatQueuedStreams s
s [Stream a
x, Stream a
y]

-- | Merge two priority streams.
mergePriorityStreams :: PriorityQueueStrategy s p
                        => s
                        -- ^ the strategy applied for enqueuing the input data
                        -> Stream (p, a)
                        -- ^ the fist input stream
                        -> Stream (p, a)
                        -- ^ the second input stream
                        -> Stream a
                        -- ^ the output combined stream
mergePriorityStreams :: forall s p a.
PriorityQueueStrategy s p =>
s -> Stream (p, a) -> Stream (p, a) -> Stream a
mergePriorityStreams s
s Stream (p, a)
x Stream (p, a)
y = forall s p a.
PriorityQueueStrategy s p =>
s -> [Stream (p, a)] -> Stream a
concatPriorityStreams s
s [Stream (p, a)
x, Stream (p, a)
y]

-- | An empty stream that never returns data.
emptyStream :: Stream a
emptyStream :: forall a. Stream a
emptyStream = forall a. Process (a, Stream a) -> Stream a
Cons forall a. Process a
neverProcess

-- | Consume the stream. It returns a process that infinitely reads data
-- from the stream and then redirects them to the provided function.
-- It is useful for modeling the process of enqueueing data in the queue
-- from the input stream.
consumeStream :: (a -> Process ()) -> Stream a -> Process ()
consumeStream :: forall a. (a -> Process ()) -> Stream a -> Process ()
consumeStream a -> Process ()
f = forall {b}. Stream a -> Process b
p where
  p :: Stream a -> Process b
p (Cons Process (a, Stream a)
s) = do (a
a, Stream a
xs) <- Process (a, Stream a)
s
                  a -> Process ()
f a
a
                  Stream a -> Process b
p Stream a
xs

-- | Sink the stream. It returns a process that infinitely reads data
-- from the stream. The resulting computation can be a moving force
-- to simulate the whole system of the interconnected streams and
-- processors.
sinkStream :: Stream a -> Process ()
sinkStream :: forall a. Stream a -> Process ()
sinkStream = forall {a} {b}. Stream a -> Process b
p where
  p :: Stream a -> Process b
p (Cons Process (a, Stream a)
s) = do (a
a, Stream a
xs) <- Process (a, Stream a)
s
                  Stream a -> Process b
p Stream a
xs
  
-- | Prefetch the input stream requesting for one more data item in advance 
-- while the last received item is not yet fully processed in the chain of 
-- streams, usually by the processors.
--
-- You can think of this as the prefetched stream could place its latest 
-- data item in some temporary space for later use, which is very useful 
-- for modeling a sequence of separate and independent work places.
prefetchStream :: Stream a -> Stream a
prefetchStream :: forall a. Stream a -> Stream a
prefetchStream Stream a
s = forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
z where
  z :: Process (a, Stream a)
z = do Resource FCFS
reading <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation forall a b. (a -> b) -> a -> b
$ forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount FCFS
FCFS Int
0 (forall a. a -> Maybe a
Just Int
1)
         Resource FCFS
writing <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation forall a b. (a -> b) -> a -> b
$ forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount FCFS
FCFS Int
1 (forall a. a -> Maybe a
Just Int
1)
         IORef (Maybe a)
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (IORef a)
newIORef forall a. Maybe a
Nothing
         let writer :: Stream a -> Process b
writer Stream a
p =
               do (a
a, Stream a
xs) <- forall a. Stream a -> Process (a, Stream a)
runStream Stream a
p
                  forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource FCFS
writing
                  forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref (forall a. a -> Maybe a
Just a
a)
                  forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource FCFS
reading
                  Stream a -> Process b
writer Stream a
xs
             reader :: Process a
reader =
               do forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource FCFS
reading
                  Just a
a <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef IORef (Maybe a)
ref
                  forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref forall a. Maybe a
Nothing
                  forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource FCFS
writing
                  forall (m :: * -> *) a. Monad m => a -> m a
return a
a
         Process () -> Process ()
spawnProcess forall a b. (a -> b) -> a -> b
$ forall {b}. Stream a -> Process b
writer Stream a
s
         forall a. Stream a -> Process (a, Stream a)
runStream forall a b. (a -> b) -> a -> b
$ forall a. Process a -> Stream a
repeatProcess Process a
reader

-- | Like 'signalStream' but allows specifying an arbitrary queue instead of the unbounded queue.
queuedSignalStream :: (a -> Event ())
                      -- ^ enqueue
                      -> Process a
                      -- ^ dequeue
                      -> Signal a
                      -- ^ the input signal
                      -> Composite (Stream a)
                      -- ^ the output stream
queuedSignalStream :: forall a.
(a -> Event ()) -> Process a -> Signal a -> Composite (Stream a)
queuedSignalStream a -> Event ()
enqueue Process a
dequeue Signal a
s =
  do DisposableEvent
h <- forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall a b. (a -> b) -> a -> b
$
          forall a. Signal a -> (a -> Event ()) -> Event DisposableEvent
handleSignal Signal a
s a -> Event ()
enqueue
     DisposableEvent -> Composite ()
disposableComposite DisposableEvent
h
     forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. Process a -> Stream a
repeatProcess Process a
dequeue

-- | Return a stream of values triggered by the specified signal.
--
-- Since the time at which the values of the stream are requested for may differ from
-- the time at which the signal is triggered, it can be useful to apply the 'arrivalSignal'
-- function to add the information about the time points at which the signal was 
-- actually received.
--
-- The point is that the 'Stream' is requested outside, while the 'Signal' is triggered
-- inside. They are different by nature. The former is passive, while the latter is active.
--
-- The resulting stream may be a root of space leak as it uses an internal unbounded queue to store
-- the values received from the signal. The oldest value is dequeued each time we request
-- the stream and it is returned within the computation. Consider using 'queuedSignalStream' that
-- allows specifying the bounded queue in case of need.
signalStream :: Signal a -> Composite (Stream a)
signalStream :: forall a. Signal a -> Composite (Stream a)
signalStream Signal a
s =
  do FCFSQueue a
q <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation forall a. Simulation (FCFSQueue a)
IQ.newFCFSQueue
     forall a.
(a -> Event ()) -> Process a -> Signal a -> Composite (Stream a)
queuedSignalStream (forall sm so a.
(EnqueueStrategy sm, DequeueStrategy so) =>
Queue sm so a -> a -> Event ()
IQ.enqueue FCFSQueue a
q) (forall sm so a.
(DequeueStrategy sm, EnqueueStrategy so) =>
Queue sm so a -> Process a
IQ.dequeue FCFSQueue a
q) Signal a
s

-- | Return a computation of the disposable signal that triggers values from the specified stream,
-- each time the next value of the stream is received within the underlying 'Process' 
-- computation.
streamSignal :: Stream a -> Composite (Signal a)
streamSignal :: forall a. Stream a -> Composite (Signal a)
streamSignal Stream a
z =
  do SignalSource a
s <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation forall a. Simulation (SignalSource a)
newSignalSource
     ProcessId
pid <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation Simulation ProcessId
newProcessId
     forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall a b. (a -> b) -> a -> b
$
       ProcessId -> Process () -> Event ()
runProcessUsingId ProcessId
pid forall a b. (a -> b) -> a -> b
$
       forall a. (a -> Process ()) -> Stream a -> Process ()
consumeStream (forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. SignalSource a -> a -> Event ()
triggerSignal SignalSource a
s) Stream a
z
     DisposableEvent -> Composite ()
disposableComposite forall a b. (a -> b) -> a -> b
$
       Event () -> DisposableEvent
DisposableEvent forall a b. (a -> b) -> a -> b
$
       ProcessId -> Event ()
cancelProcessWithId ProcessId
pid
     forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. SignalSource a -> Signal a
publishSignal SignalSource a
s
  
-- | Transform a stream so that the resulting stream returns a sequence of arrivals
-- saving the information about the time points at which the original stream items 
-- were received by demand.
arrivalStream :: Stream a -> Stream (Arrival a)
arrivalStream :: forall a. Stream a -> Stream (Arrival a)
arrivalStream Stream a
s = forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$ forall {a}.
Stream a -> Maybe Double -> Process (Arrival a, Stream (Arrival a))
loop Stream a
s forall a. Maybe a
Nothing where
  loop :: Stream a -> Maybe Double -> Process (Arrival a, Stream (Arrival a))
loop Stream a
s Maybe Double
t0 = do (a
a, Stream a
xs) <- forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
                 Double
t <- forall (m :: * -> *) a. DynamicsLift m => Dynamics a -> m a
liftDynamics Dynamics Double
time
                 let b :: Arrival a
b = Arrival { arrivalValue :: a
arrivalValue = a
a,
                                   arrivalTime :: Double
arrivalTime  = Double
t,
                                   arrivalDelay :: Maybe Double
arrivalDelay =
                                     case Maybe Double
t0 of
                                       Maybe Double
Nothing -> forall a. Maybe a
Nothing
                                       Just Double
t0 -> forall a. a -> Maybe a
Just (Double
t forall a. Num a => a -> a -> a
- Double
t0) }
                 forall (m :: * -> *) a. Monad m => a -> m a
return (Arrival a
b, forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$ Stream a -> Maybe Double -> Process (Arrival a, Stream (Arrival a))
loop Stream a
xs (forall a. a -> Maybe a
Just Double
t))

-- | Delay the stream by one step using the specified initial value.
delayStream :: a -> Stream a -> Stream a
delayStream :: forall a. a -> Stream a -> Stream a
delayStream a
a0 Stream a
s = forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return (a
a0, Stream a
s)

-- | Return a stream consisting of exactly one element and inifinite tail.
singletonStream :: a -> Stream a
singletonStream :: forall a. a -> Stream a
singletonStream a
a = forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, forall a. Stream a
emptyStream)

-- | Removes one level of the computation, projecting its bound stream into the outer level.
joinStream :: Process (Stream a) -> Stream a
joinStream :: forall a. Process (Stream a) -> Stream a
joinStream Process (Stream a)
m = forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$ Process (Stream a)
m forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. Stream a -> Process (a, Stream a)
runStream

-- | Takes the next stream from the list after the current stream fails because of cancelling the underlying process.
failoverStream :: [Stream a] -> Stream a
failoverStream :: forall a. [Stream a] -> Stream a
failoverStream [Stream a]
ps = forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
z where
  z :: Process (a, Stream a)
z = do Resource FCFS
reading <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation forall a b. (a -> b) -> a -> b
$ forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount FCFS
FCFS Int
0 (forall a. a -> Maybe a
Just Int
1)
         Resource FCFS
writing <- forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation forall a b. (a -> b) -> a -> b
$ forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount FCFS
FCFS Int
0 (forall a. a -> Maybe a
Just Int
1)
         IORef (Maybe a)
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (IORef a)
newIORef forall a. Maybe a
Nothing
         ProcessId
pid <- Process ProcessId
processId
         let writer :: Stream a -> Process b
writer Stream a
p =
               do forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource FCFS
writing
                  ProcessId
pid' <- Process ProcessId
processId
                  (a
a, Stream a
xs) <-
                    forall a b. Process a -> Process b -> Process a
finallyProcess (forall a. Stream a -> Process (a, Stream a)
runStream Stream a
p) forall a b. (a -> b) -> a -> b
$
                    forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall a b. (a -> b) -> a -> b
$
                    do Bool
cancelled' <- ProcessId -> Event Bool
processCancelled ProcessId
pid'
                       forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
cancelled' forall a b. (a -> b) -> a -> b
$
                         forall s. DequeueStrategy s => Resource s -> Event ()
releaseResourceWithinEvent Resource FCFS
writing
                  forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref (forall a. a -> Maybe a
Just a
a)
                  forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource FCFS
reading
                  Stream a -> Process b
writer Stream a
xs
             reader :: Process a
reader =
               do forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource FCFS
writing
                  forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource FCFS
reading
                  Just a
a <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef IORef (Maybe a)
ref
                  forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref forall a. Maybe a
Nothing
                  forall (m :: * -> *) a. Monad m => a -> m a
return a
a
             loop :: [Stream a] -> Process ()
loop [] = forall (m :: * -> *) a. Monad m => a -> m a
return ()
             loop (Stream a
p: [Stream a]
ps) =
               do ProcessId
pid' <- Process ProcessId
processId
                  DisposableEvent
h' <- forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall a b. (a -> b) -> a -> b
$
                        forall a. Signal a -> (a -> Event ()) -> Event DisposableEvent
handleSignal (ProcessId -> Signal ()
processCancelling ProcessId
pid) forall a b. (a -> b) -> a -> b
$ \() ->
                        ProcessId -> Event ()
cancelProcessWithId ProcessId
pid'
                  forall a b. Process a -> Process b -> Process a
finallyProcess (forall {b}. Stream a -> Process b
writer Stream a
p) forall a b. (a -> b) -> a -> b
$
                    forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall a b. (a -> b) -> a -> b
$
                    do DisposableEvent -> Event ()
disposeEvent DisposableEvent
h'
                       Bool
cancelled <- ProcessId -> Event Bool
processCancelled ProcessId
pid
                       forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
cancelled forall a b. (a -> b) -> a -> b
$
                         do Bool
cancelled' <- ProcessId -> Event Bool
processCancelled ProcessId
pid'
                            forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
cancelled' forall a b. (a -> b) -> a -> b
$
                              forall a. HasCallStack => [Char] -> a
error [Char]
"Expected the sub-process to be cancelled: failoverStream"
                            Process () -> Event ()
runProcess forall a b. (a -> b) -> a -> b
$ [Stream a] -> Process ()
loop [Stream a]
ps
         forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall a b. (a -> b) -> a -> b
$ Process () -> Event ()
runProcess forall a b. (a -> b) -> a -> b
$ [Stream a] -> Process ()
loop [Stream a]
ps
         forall a. Stream a -> Process (a, Stream a)
runStream forall a b. (a -> b) -> a -> b
$ forall a. Process a -> Stream a
repeatProcess Process a
reader

-- | Return the prefix of the stream of the specified length.
takeStream :: Int -> Stream a -> Stream a
takeStream :: forall a. Int -> Stream a -> Stream a
takeStream Int
n Stream a
s
  | Int
n forall a. Ord a => a -> a -> Bool
<= Int
0    = forall a. Stream a
emptyStream
  | Bool
otherwise =
    forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$
    do (a
a, Stream a
xs) <- forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
       forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, forall a. Int -> Stream a -> Stream a
takeStream (Int
n forall a. Num a => a -> a -> a
- Int
1) Stream a
xs)

-- | Return the longest prefix of the stream of elements that satisfy the predicate.
takeStreamWhile :: (a -> Bool) -> Stream a -> Stream a
takeStreamWhile :: forall a. (a -> Bool) -> Stream a -> Stream a
takeStreamWhile a -> Bool
p Stream a
s =
  forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$
  do (a
a, Stream a
xs) <- forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
     if a -> Bool
p a
a
       then forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, forall a. (a -> Bool) -> Stream a -> Stream a
takeStreamWhile a -> Bool
p Stream a
xs)
       else forall a. Process a
neverProcess

-- | Return the longest prefix of the stream of elements that satisfy the computation.
takeStreamWhileM :: (a -> Process Bool) -> Stream a -> Stream a
takeStreamWhileM :: forall a. (a -> Process Bool) -> Stream a -> Stream a
takeStreamWhileM a -> Process Bool
p Stream a
s =
  forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$
  do (a
a, Stream a
xs) <- forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
     Bool
f <- a -> Process Bool
p a
a
     if Bool
f
       then forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, forall a. (a -> Process Bool) -> Stream a -> Stream a
takeStreamWhileM a -> Process Bool
p Stream a
xs)
       else forall a. Process a
neverProcess

-- | Return the suffix of the stream after the specified first elements.
dropStream :: Int -> Stream a -> Stream a
dropStream :: forall a. Int -> Stream a -> Stream a
dropStream Int
n Stream a
s
  | Int
n forall a. Ord a => a -> a -> Bool
<= Int
0    = Stream a
s
  | Bool
otherwise =
    forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$
    do (a
a, Stream a
xs) <- forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
       forall a. Stream a -> Process (a, Stream a)
runStream forall a b. (a -> b) -> a -> b
$ forall a. Int -> Stream a -> Stream a
dropStream (Int
n forall a. Num a => a -> a -> a
- Int
1) Stream a
xs

-- | Return the suffix of the stream of elements remaining after 'takeStreamWhile'.
dropStreamWhile :: (a -> Bool) -> Stream a -> Stream a
dropStreamWhile :: forall a. (a -> Bool) -> Stream a -> Stream a
dropStreamWhile a -> Bool
p Stream a
s =
  forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$
  do (a
a, Stream a
xs) <- forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
     if a -> Bool
p a
a
       then forall a. Stream a -> Process (a, Stream a)
runStream forall a b. (a -> b) -> a -> b
$ forall a. (a -> Bool) -> Stream a -> Stream a
dropStreamWhile a -> Bool
p Stream a
xs
       else forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream a
xs)

-- | Return the suffix of the stream of elements remaining after 'takeStreamWhileM'.
dropStreamWhileM :: (a -> Process Bool) -> Stream a -> Stream a
dropStreamWhileM :: forall a. (a -> Process Bool) -> Stream a -> Stream a
dropStreamWhileM a -> Process Bool
p Stream a
s =
  forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$
  do (a
a, Stream a
xs) <- forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
     Bool
f <- a -> Process Bool
p a
a
     if Bool
f
       then forall a. Stream a -> Process (a, Stream a)
runStream forall a b. (a -> b) -> a -> b
$ forall a. (a -> Process Bool) -> Stream a -> Stream a
dropStreamWhileM a -> Process Bool
p Stream a
xs
       else forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream a
xs)

-- | Create the specified number of equivalent clones of the input stream.
cloneStream :: Int -> Stream a -> Simulation [Stream a]
cloneStream :: forall a. Int -> Stream a -> Simulation [Stream a]
cloneStream Int
n Stream a
s =
  do [FCFSQueue a]
qs  <- forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Int
1..Int
n] forall a b. (a -> b) -> a -> b
$ \Int
i -> forall a. Simulation (FCFSQueue a)
IQ.newFCFSQueue
     Resource FCFS
rs  <- Int -> Simulation (Resource FCFS)
newFCFSResource Int
1
     IORef (Stream a)
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (IORef a)
newIORef Stream a
s
     let reader :: a -> Queue sm so a -> Process a
reader a
m Queue sm so a
q =
           do Maybe a
a <- forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall a b. (a -> b) -> a -> b
$ forall sm so a.
DequeueStrategy sm =>
Queue sm so a -> Event (Maybe a)
IQ.tryDequeue Queue sm so a
q
              case Maybe a
a of
                Just a
a  -> forall (m :: * -> *) a. Monad m => a -> m a
return a
a
                Maybe a
Nothing ->
                  forall s a.
EnqueueStrategy s =>
Resource s -> Process a -> Process a
usingResource Resource FCFS
rs forall a b. (a -> b) -> a -> b
$
                  do Maybe a
a <- forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall a b. (a -> b) -> a -> b
$ forall sm so a.
DequeueStrategy sm =>
Queue sm so a -> Event (Maybe a)
IQ.tryDequeue Queue sm so a
q
                     case Maybe a
a of
                       Just a
a  -> forall (m :: * -> *) a. Monad m => a -> m a
return a
a
                       Maybe a
Nothing ->
                         do Stream a
s <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef IORef (Stream a)
ref
                            (a
a, Stream a
xs) <- forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
                            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef IORef (Stream a)
ref Stream a
xs
                            forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (forall a b. [a] -> [b] -> [(a, b)]
zip [a
1..] [FCFSQueue a]
qs) forall a b. (a -> b) -> a -> b
$ \(a
i, FCFSQueue a
q) ->
                              forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (a
i forall a. Eq a => a -> a -> Bool
== a
m) forall a b. (a -> b) -> a -> b
$
                              forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent forall a b. (a -> b) -> a -> b
$ forall sm so a.
(EnqueueStrategy sm, DequeueStrategy so) =>
Queue sm so a -> a -> Event ()
IQ.enqueue FCFSQueue a
q a
a
                            forall (m :: * -> *) a. Monad m => a -> m a
return a
a
     forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM (forall a b. [a] -> [b] -> [(a, b)]
zip [Integer
1..] [FCFSQueue a]
qs) forall a b. (a -> b) -> a -> b
$ \(Integer
i, FCFSQueue a
q) ->
       forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. Process a -> Stream a
repeatProcess forall a b. (a -> b) -> a -> b
$ forall {sm} {a} {so}.
(DequeueStrategy sm, Num a, Enum a, Eq a) =>
a -> Queue sm so a -> Process a
reader Integer
i FCFSQueue a
q

-- | Return a stream of first arrivals after assembling the specified number of elements.
firstArrivalStream :: Int -> Stream a -> Stream a
firstArrivalStream :: forall a. Int -> Stream a -> Stream a
firstArrivalStream Int
n Stream a
s = forall acc a b.
(acc -> a -> Process (acc, Maybe b)) -> acc -> Stream a -> Stream b
assembleAccumStream forall {m :: * -> *} {a}.
Monad m =>
(Int, Maybe a) -> a -> m ((Int, Maybe a), Maybe a)
f (Int
1, forall a. Maybe a
Nothing) Stream a
s
  where f :: (Int, Maybe a) -> a -> m ((Int, Maybe a), Maybe a)
f (Int
i, Maybe a
a0) a
a =
          let a0' :: Maybe a
a0' = forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a -> a
fromMaybe a
a Maybe a
a0
          in if Int
i forall a. Integral a => a -> a -> a
`mod` Int
n forall a. Eq a => a -> a -> Bool
== Int
0
             then forall (m :: * -> *) a. Monad m => a -> m a
return ((Int
1, forall a. Maybe a
Nothing), Maybe a
a0')
             else forall (m :: * -> *) a. Monad m => a -> m a
return ((Int
i forall a. Num a => a -> a -> a
+ Int
1, Maybe a
a0'), forall a. Maybe a
Nothing)

-- | Return a stream of last arrivals after assembling the specified number of elements.
lastArrivalStream :: Int -> Stream a -> Stream a
lastArrivalStream :: forall a. Int -> Stream a -> Stream a
lastArrivalStream Int
n Stream a
s = forall acc a b.
(acc -> a -> Process (acc, Maybe b)) -> acc -> Stream a -> Stream b
assembleAccumStream forall {m :: * -> *} {a}. Monad m => Int -> a -> m (Int, Maybe a)
f Int
1 Stream a
s
  where f :: Int -> a -> m (Int, Maybe a)
f Int
i a
a =
          if Int
i forall a. Integral a => a -> a -> a
`mod` Int
n forall a. Eq a => a -> a -> Bool
== Int
0
          then forall (m :: * -> *) a. Monad m => a -> m a
return (Int
1, forall a. a -> Maybe a
Just a
a)
          else forall (m :: * -> *) a. Monad m => a -> m a
return (Int
i forall a. Num a => a -> a -> a
+ Int
1, forall a. Maybe a
Nothing)

-- | Assemble an accumulated stream using the supplied function.
assembleAccumStream :: (acc -> a -> Process (acc, Maybe b)) -> acc -> Stream a -> Stream b
assembleAccumStream :: forall acc a b.
(acc -> a -> Process (acc, Maybe b)) -> acc -> Stream a -> Stream b
assembleAccumStream acc -> a -> Process (acc, Maybe b)
f acc
acc Stream a
s =
  forall a b. (a -> b) -> Stream a -> Stream b
mapStream forall a. HasCallStack => Maybe a -> a
fromJust forall a b. (a -> b) -> a -> b
$
  forall a. (a -> Bool) -> Stream a -> Stream a
filterStream forall a. Maybe a -> Bool
isJust forall a b. (a -> b) -> a -> b
$
  forall acc a b.
(acc -> a -> Process (acc, b)) -> acc -> Stream a -> Stream b
accumStream acc -> a -> Process (acc, Maybe b)
f acc
acc Stream a
s

-- | Show the debug messages with the current simulation time.
traceStream :: Maybe String
               -- ^ the request message
               -> Maybe String
               -- ^ the response message
               -> Stream a
               -- ^ a stream
               -> Stream a
traceStream :: forall a. Maybe [Char] -> Maybe [Char] -> Stream a -> Stream a
traceStream Maybe [Char]
request Maybe [Char]
response Stream a
s = forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$ forall a. Stream a -> Process (a, Stream a)
loop Stream a
s where
  loop :: Stream a -> Process (a, Stream a)
loop Stream a
s = do (a
a, Stream a
xs) <-
                case Maybe [Char]
request of
                  Maybe [Char]
Nothing -> forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
                  Just [Char]
message ->
                    forall a. [Char] -> Process a -> Process a
traceProcess [Char]
message forall a b. (a -> b) -> a -> b
$
                    forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
              case Maybe [Char]
response of
                Maybe [Char]
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$ Stream a -> Process (a, Stream a)
loop Stream a
xs)
                Just [Char]
message ->
                  forall a. [Char] -> Process a -> Process a
traceProcess [Char]
message forall a b. (a -> b) -> a -> b
$
                  forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, forall a. Process (a, Stream a) -> Stream a
Cons forall a b. (a -> b) -> a -> b
$ Stream a -> Process (a, Stream a)
loop Stream a
xs)