{-# LANGUAGE FlexibleContexts #-}
module Simulation.Aivika.Trans.Stream
(
Stream(..),
emptyStream,
mergeStreams,
mergeQueuedStreams,
mergePriorityStreams,
concatStreams,
concatQueuedStreams,
concatPriorityStreams,
splitStream,
splitStreamQueueing,
splitStreamPrioritising,
splitStreamFiltering,
splitStreamFilteringQueueing,
streamUsingId,
prefetchStream,
delayStream,
arrivalStream,
memoStream,
zipStreamSeq,
zipStreamParallel,
zip3StreamSeq,
zip3StreamParallel,
unzipStream,
streamSeq,
streamParallel,
consumeStream,
sinkStream,
repeatProcess,
mapStream,
mapStreamM,
accumStream,
apStream,
apStreamM,
filterStream,
filterStreamM,
takeStream,
takeStreamWhile,
takeStreamWhileM,
dropStream,
dropStreamWhile,
dropStreamWhileM,
singletonStream,
joinStream,
failoverStream,
signalStream,
streamSignal,
queuedSignalStream,
leftStream,
rightStream,
replaceLeftStream,
replaceRightStream,
partitionEitherStream,
cloneStream,
firstArrivalStream,
lastArrivalStream,
assembleAccumStream,
traceStream) where
import Data.Maybe
import Data.Monoid hiding ((<>))
import Data.Semigroup (Semigroup(..))
import Data.List.NonEmpty (NonEmpty((:|)))
import Control.Applicative
import Control.Monad
import Control.Monad.Trans
import Simulation.Aivika.Trans.Ref.Base
import Simulation.Aivika.Trans.DES
import Simulation.Aivika.Trans.Parameter
import Simulation.Aivika.Trans.Simulation
import Simulation.Aivika.Trans.Dynamics
import Simulation.Aivika.Trans.Event
import Simulation.Aivika.Trans.Composite
import Simulation.Aivika.Trans.Cont
import Simulation.Aivika.Trans.Process
import Simulation.Aivika.Trans.Signal
import Simulation.Aivika.Trans.Resource.Base
import Simulation.Aivika.Trans.QueueStrategy
import qualified Simulation.Aivika.Trans.Queue.Infinite.Base as IQ
import Simulation.Aivika.Arrival (Arrival(..))
-- | A stream of values of type @a@ in the monad @m@.
--
-- A stream is a 'Process' that, when run, yields the next value together
-- with the rest of the stream. The type has no end-of-stream constructor.
newtype Stream m a = Cons { runStream :: Process m (a, Stream m a)
                            -- ^ The process producing the head value and the remaining stream.
                          }
-- | Mapping over a stream applies the function to every element ('mapStream').
instance MonadDES m => Functor (Stream m) where
  {-# INLINE fmap #-}
  fmap = mapStream
-- | 'pure' builds a self-referential stream whose every step immediately
-- returns the same value; '<*>' applies functions to arguments element-wise
-- through 'apStream'.
instance MonadDES m => Applicative (Stream m) where
  {-# INLINE pure #-}
  -- The stream @y@ refers to itself as its own tail.
  pure a = let y = Cons (return (a, y)) in y
  {-# INLINE (<*>) #-}
  (<*>) = apStream
-- | 'empty' is 'emptyStream' and '<|>' merges two streams with 'mergeStreams'.
instance MonadDES m => Alternative (Stream m) where
  {-# INLINE empty #-}
  empty = emptyStream
  {-# INLINE (<|>) #-}
  (<|>) = mergeStreams
-- | Streams combine by merging: '<>' is 'mergeStreams', and 'sconcat' merges
-- the whole non-empty list in one go with 'concatStreams'.
instance MonadDES m => Semigroup (Stream m a) where
  {-# INLINE (<>) #-}
  (<>) = mergeStreams
  {-# INLINE sconcat #-}
  sconcat (h :| t) = concatStreams (h : t)
-- | The identity is 'emptyStream'; 'mappend' delegates to the 'Semigroup'
-- operation and 'mconcat' merges a whole list with 'concatStreams'.
instance MonadDES m => Monoid (Stream m a) where
  {-# INLINE mempty #-}
  mempty = emptyStream
  {-# INLINE mappend #-}
  mappend = (<>)
  {-# INLINE mconcat #-}
  mconcat = concatStreams
-- | Wrap the head computation of the stream in 'processUsingId' with the
-- given process identifier. The tail of the stream is left as it is.
streamUsingId :: MonadDES m => ProcessId m -> Stream m a -> Stream m a
{-# INLINABLE streamUsingId #-}
streamUsingId pid stream =
  Cons (processUsingId pid (runStream stream))
-- | Memoize the stream: the head computation is passed through
-- 'memoProcess', and the tail obtained from it is memoized recursively in
-- the same way.
memoStream :: MonadDES m => Stream m a -> Simulation m (Stream m a)
{-# INLINABLE memoStream #-}
memoStream stream =
  do memoized <- memoProcess step
     return (Cons memoized)
  where
    -- One step of the source with its tail replaced by a memoized tail.
    -- The lazy pattern is kept exactly as in the original definition.
    step =
      do ~(x, rest) <- runStream stream
         rest' <- liftSimulation (memoStream rest)
         return (x, rest')
-- | Zip two streams sequentially: each step first takes an element from the
-- first stream, then one from the second, and returns them as a pair.
zipStreamSeq :: MonadDES m => Stream m a -> Stream m b -> Stream m (a, b)
{-# INLINABLE zipStreamSeq #-}
zipStreamSeq left right =
  Cons $
  do ~(a, left') <- runStream left
     ~(b, right') <- runStream right
     return ((a, b), zipStreamSeq left' right')
-- | Zip two streams, obtaining both head elements together through
-- 'zipProcessParallel' on every step.
zipStreamParallel :: MonadDES m => Stream m a -> Stream m b -> Stream m (a, b)
{-# INLINABLE zipStreamParallel #-}
zipStreamParallel left right =
  Cons $
  do ~((a, left'), (b, right')) <- zipProcessParallel (runStream left) (runStream right)
     return ((a, b), zipStreamParallel left' right')
-- | Zip three streams sequentially: each step takes an element from the
-- first, then the second, then the third stream and returns the triple.
zip3StreamSeq :: MonadDES m => Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
{-# INLINABLE zip3StreamSeq #-}
zip3StreamSeq first second third =
  Cons $
  do ~(a, first') <- runStream first
     ~(b, second') <- runStream second
     ~(c, third') <- runStream third
     return ((a, b, c), zip3StreamSeq first' second' third')
-- | Zip three streams, obtaining all three head elements together through
-- 'zip3ProcessParallel' on every step.
zip3StreamParallel :: MonadDES m => Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
{-# INLINABLE zip3StreamParallel #-}
zip3StreamParallel first second third =
  Cons $
  do ~((a, first'), (b, second'), (c, third')) <-
       zip3ProcessParallel (runStream first) (runStream second) (runStream third)
     return ((a, b, c), zip3StreamParallel first' second' third')
-- | Split a stream of pairs into a stream of first components and a stream
-- of second components. Both results are projections of one memoized copy
-- of the source (see 'memoStream').
unzipStream :: MonadDES m => Stream m (a, b) -> Simulation m (Stream m a, Stream m b)
{-# INLINABLE unzipStream #-}
unzipStream pairs =
  do shared <- memoStream pairs
     return (mapStream fst shared, mapStream snd shared)
-- | Combine a list of streams into a stream of lists. On each step the head
-- of every input stream is taken one after another, in list order.
streamSeq :: MonadDES m => [Stream m a] -> Stream m [a]
{-# INLINABLE streamSeq #-}
streamSeq streams =
  Cons $
  do steps <- mapM runStream streams
     let (heads, tails) = unzip steps
     return (heads, streamSeq tails)
-- | Combine a list of streams into a stream of lists. On each step the heads
-- of all input streams are obtained through 'processParallel'.
streamParallel :: MonadDES m => [Stream m a] -> Stream m [a]
{-# INLINABLE streamParallel #-}
streamParallel streams =
  Cons $
  do steps <- processParallel (map runStream streams)
     let (heads, tails) = unzip steps
     return (heads, streamParallel tails)
-- | Build a stream by running the given process again for every element
-- that is requested.
repeatProcess :: MonadDES m => Process m a -> Stream m a
{-# INLINABLE repeatProcess #-}
repeatProcess p =
  Cons $ p >>= \a -> return (a, repeatProcess p)
-- | Apply a pure function to every element of the stream.
mapStream :: MonadDES m => (a -> b) -> Stream m a -> Stream m b
{-# INLINABLE mapStream #-}
mapStream f stream =
  Cons $
  runStream stream >>= \(a, rest) ->
    return (f a, mapStream f rest)
-- | Transform every element of the stream with a computation in the
-- 'Process' monad.
mapStreamM :: MonadDES m => (a -> Process m b) -> Stream m a -> Stream m b
{-# INLINABLE mapStreamM #-}
mapStreamM f stream =
  Cons $
  do (a, rest) <- runStream stream
     b <- f a
     return (b, mapStreamM f rest)
-- | Transform the stream while threading an accumulator: the step function
-- receives the current accumulator and the next input element, and returns
-- the new accumulator together with the output element.
accumStream :: MonadDES m => (acc -> a -> Process m (acc, b)) -> acc -> Stream m a -> Stream m b
{-# INLINABLE accumStream #-}
accumStream f acc0 stream = Cons (go acc0 stream)
  where
    go acc current =
      do (a, rest) <- runStream current
         (acc', b) <- f acc a
         return (b, Cons (go acc' rest))
-- | Element-wise application: each step takes a function from the first
-- stream, then an argument from the second, and yields the result.
apStream :: MonadDES m => Stream m (a -> b) -> Stream m a -> Stream m b
{-# INLINABLE apStream #-}
apStream funs args =
  Cons $
  do (f, funs') <- runStream funs
     (a, args') <- runStream args
     return (f a, apStream funs' args')
-- | Like 'apStream', but the functions return computations in the 'Process'
-- monad, which are run to produce the output elements.
apStreamM :: MonadDES m => Stream m (a -> Process m b) -> Stream m a -> Stream m b
{-# INLINABLE apStreamM #-}
apStreamM funs args =
  Cons $
  do (f, funs') <- runStream funs
     (a, args') <- runStream args
     b <- f a
     return (b, apStreamM funs' args')
-- | Keep only the elements satisfying the predicate. Rejected elements are
-- skipped within the same step, which continues with the rest of the source.
filterStream :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
{-# INLINABLE filterStream #-}
filterStream p stream =
  Cons $
  do (a, rest) <- runStream stream
     let next = filterStream p rest
     if p a
       then return (a, next)
       else runStream next
-- | Keep only the elements for which the monadic predicate returns 'True'.
-- Rejected elements are skipped within the same step.
filterStreamM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
{-# INLINABLE filterStreamM #-}
filterStreamM p stream =
  Cons $
  do (a, rest) <- runStream stream
     keep <- p a
     let next = filterStreamM p rest
     if keep
       then return (a, next)
       else runStream next
-- | Extract the 'Left' values from the stream, skipping every 'Right'.
leftStream :: MonadDES m => Stream m (Either a b) -> Stream m a
{-# INLINABLE leftStream #-}
leftStream stream =
  Cons $
  do (e, rest) <- runStream stream
     let next = leftStream rest
     either (\a -> return (a, next)) (const (runStream next)) e
-- | Extract the 'Right' values from the stream, skipping every 'Left'.
rightStream :: MonadDES m => Stream m (Either a b) -> Stream m b
{-# INLINABLE rightStream #-}
rightStream stream =
  Cons $
  do (e, rest) <- runStream stream
     let next = rightStream rest
     either (const (runStream next)) (\b -> return (b, next)) e
-- | Replace every 'Left' value of the first stream with the next value
-- taken from the second stream; 'Right' values pass through unchanged.
--
-- The replacement stream is only run when a 'Left' is encountered, and it
-- only advances on those steps.
--
-- The former as-pattern @ys0 \@ ~(Cons sc)@ had whitespace around @\@@,
-- which GHC 9.0 and later reject; the stream is now bound by name and
-- unwrapped with 'runStream' where needed. Since 'Cons' is a newtype
-- constructor, this keeps the same laziness.
replaceLeftStream :: MonadDES m => Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
{-# INLINABLE replaceLeftStream #-}
replaceLeftStream eithers replacements =
  Cons $
  do (e, rest) <- runStream eithers
     case e of
       Left _ ->
         do (c, replacements') <- runStream replacements
            return (Left c, replaceLeftStream rest replacements')
       Right b ->
         -- No replacement consumed: carry the same replacement stream on.
         return (Right b, replaceLeftStream rest replacements)
-- | Replace every 'Right' value of the first stream with the next value
-- taken from the second stream; 'Left' values pass through unchanged.
--
-- The replacement stream is only run when a 'Right' is encountered, and it
-- only advances on those steps.
--
-- The former as-pattern @ys0 \@ ~(Cons sc)@ had whitespace around @\@@,
-- which GHC 9.0 and later reject; the stream is now bound by name and
-- unwrapped with 'runStream' where needed. Since 'Cons' is a newtype
-- constructor, this keeps the same laziness.
replaceRightStream :: MonadDES m => Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
{-# INLINABLE replaceRightStream #-}
replaceRightStream eithers replacements =
  Cons $
  do (e, rest) <- runStream eithers
     case e of
       Right _ ->
         do (c, replacements') <- runStream replacements
            return (Right c, replaceRightStream rest replacements')
       Left a ->
         -- No replacement consumed: carry the same replacement stream on.
         return (Left a, replaceRightStream rest replacements)
-- | Partition a stream of 'Either' values into the stream of 'Left' values
-- and the stream of 'Right' values. Both results read from one memoized
-- copy of the source (see 'memoStream').
partitionEitherStream :: MonadDES m => Stream m (Either a b) -> Simulation m (Stream m a, Stream m b)
{-# INLINABLE partitionEitherStream #-}
partitionEitherStream source =
  memoStream source >>= \shared ->
    return (leftStream shared, rightStream shared)
-- | Split the input stream into the specified number of output streams,
-- using the 'FCFS' strategy for competing readers
-- (see 'splitStreamQueueing').
splitStream :: MonadDES m => Int -> Stream m a -> Simulation m [Stream m a]
{-# INLINABLE splitStream #-}
splitStream n stream = splitStreamQueueing FCFS n stream
-- | Split the input stream into @n@ output streams that share one source.
--
-- The not-yet-consumed part of the source is kept in a reference. Each
-- demand on any output stream acquires a resource of capacity 1 (created
-- with the given strategy), takes one element from the source and stores
-- the remainder back, so every source element goes to exactly one output.
splitStreamQueueing :: (MonadDES m, EnqueueStrategy m s)
                       => s
                       -- ^ the strategy applied for enqueuing the output requests
                       -> Int
                       -- ^ the number of output streams
                       -> Stream m a
                       -- ^ the input stream
                       -> Simulation m [Stream m a]
{-# INLINABLE splitStreamQueueing #-}
splitStreamQueueing strategy n stream =
  do ref <- newRef stream
     res <- newResource strategy 1
     let pull =
           do current <- liftEvent (readRef ref)
              (a, rest) <- runStream current
              liftEvent (writeRef ref rest)
              return a
         output = repeatProcess (usingResource res pull)
     -- All outputs are the same stream value; they differ only in who asks.
     return (replicate n output)
-- | Split the input stream into one output stream per priority stream.
--
-- For every element demanded from an output, the next priority is first
-- taken from the corresponding priority stream; the shared source is then
-- read under a resource of capacity 1 requested with that priority.
splitStreamPrioritising :: (MonadDES m, PriorityQueueStrategy m s p)
                           => s
                           -- ^ the strategy applied for enqueuing the output requests
                           -> [Stream m p]
                           -- ^ the streams of priorities, one per output
                           -> Stream m a
                           -- ^ the input stream
                           -> Simulation m [Stream m a]
{-# INLINABLE splitStreamPrioritising #-}
splitStreamPrioritising strategy priorities stream =
  do ref <- newRef stream
     res <- newResource strategy 1
     let pull =
           do current <- liftEvent (readRef ref)
              (a, rest) <- runStream current
              liftEvent (writeRef ref rest)
              return a
         output prioStream =
           Cons $
           do (priority, prioRest) <- runStream prioStream
              a <- usingResourceWithPriority res priority pull
              return (a, output prioRest)
     return (map output priorities)
-- | Split the input stream into one output stream per predicate, using the
-- 'FCFS' strategy for competing readers
-- (see 'splitStreamFilteringQueueing').
splitStreamFiltering :: MonadDES m => [a -> Event m Bool] -> Stream m a -> Simulation m [Stream m a]
{-# INLINABLE splitStreamFiltering #-}
splitStreamFiltering preds stream = splitStreamFilteringQueueing FCFS preds stream
-- | Split the input stream into one output stream per predicate.
--
-- A reader takes the next source element under a resource of capacity 1.
-- If its predicate accepts the element, the element is consumed and
-- returned. Otherwise the element is put back at the front of the shared
-- source and the reader tries again after releasing the resource.
splitStreamFilteringQueueing :: (MonadDES m, EnqueueStrategy m s)
                                => s
                                -- ^ the strategy applied for enqueuing the output requests
                                -> [a -> Event m Bool]
                                -- ^ the filters, one per output stream
                                -> Stream m a
                                -- ^ the input stream
                                -> Simulation m [Stream m a]
{-# INLINABLE splitStreamFilteringQueueing #-}
splitStreamFilteringQueueing strategy preds stream =
  do ref <- liftSimulation $ newRef stream
     res <- newResource strategy 1
     let -- One guarded attempt: 'Just' the element if accepted, else 'Nothing'.
         attempt accept =
           usingResource res $
           do current <- liftEvent $ readRef ref
              (a, rest) <- runStream current
              liftEvent $
                do ok <- accept a
                   if ok
                     then writeRef ref rest >> return (Just a)
                     else writeRef ref (Cons (return (a, rest))) >> return Nothing
         -- Retry until the predicate accepts an element.
         reader accept =
           attempt accept >>= maybe (reader accept) return
     return $ map (repeatProcess . reader) preds
-- | Merge a list of streams into one, using the 'FCFS' strategy for the
-- competing input streams (see 'concatQueuedStreams').
concatStreams :: MonadDES m => [Stream m a] -> Stream m a
{-# INLINABLE concatStreams #-}
concatStreams streams = concatQueuedStreams FCFS streams
-- | Merge a list of streams into one output stream.
--
-- When the first element is demanded, one writer process is spawned per
-- input stream. A writer takes an element from its stream, waits for the
-- write permit (queued with the given strategy), stores the element in a
-- one-place slot and signals the reader. It then waits for the continue
-- permit before fetching its next element; that permit is released by the
-- consumer each time a further element is demanded.
concatQueuedStreams :: (MonadDES m, EnqueueStrategy m s)
                       => s
                       -- ^ the strategy applied for enqueuing the input data
                       -> [Stream m a]
                       -- ^ the input streams
                       -> Stream m a
{-# INLINABLE concatQueuedStreams #-}
concatQueuedStreams strategy streams =
  Cons $
  do canRead <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
     canWrite <- liftSimulation $ newResourceWithMaxCount strategy 1 (Just 1)
     canContinue <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
     slot <- liftSimulation $ newRef Nothing
     let writer source =
           do (a, rest) <- runStream source
              requestResource canWrite
              liftEvent $ writeRef slot (Just a)
              releaseResource canRead
              requestResource canContinue
              writer rest
         reader =
           do requestResource canRead
              -- The slot is filled by a writer before 'canRead' is released.
              Just a <- liftEvent $ readRef slot
              liftEvent $ writeRef slot Nothing
              releaseResource canWrite
              return a
     mapM_ (spawnProcess . writer) streams
     firstItem <- reader
     return (firstItem, repeatProcess (releaseResource canContinue >> reader))
-- | Merge a list of streams of prioritised items into one output stream.
--
-- Works like 'concatQueuedStreams', except that every input element carries
-- a priority, and the writer requests the write permit with that priority.
-- Only the payload is passed on to the output.
concatPriorityStreams :: (MonadDES m, PriorityQueueStrategy m s p)
                         => s
                         -- ^ the strategy applied for enqueuing the input data
                         -> [Stream m (p, a)]
                         -- ^ the input streams of (priority, value) pairs
                         -> Stream m a
{-# INLINABLE concatPriorityStreams #-}
concatPriorityStreams strategy streams =
  Cons $
  do canRead <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
     canWrite <- liftSimulation $ newResourceWithMaxCount strategy 1 (Just 1)
     canContinue <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
     slot <- liftSimulation $ newRef Nothing
     let writer source =
           do ((priority, a), rest) <- runStream source
              requestResourceWithPriority canWrite priority
              liftEvent $ writeRef slot (Just a)
              releaseResource canRead
              requestResource canContinue
              writer rest
         reader =
           do requestResource canRead
              -- The slot is filled by a writer before 'canRead' is released.
              Just a <- liftEvent $ readRef slot
              liftEvent $ writeRef slot Nothing
              releaseResource canWrite
              return a
     mapM_ (spawnProcess . writer) streams
     firstItem <- reader
     return (firstItem, repeatProcess (releaseResource canContinue >> reader))
-- | Merge two streams into one, using the 'FCFS' strategy for the competing
-- inputs (see 'mergeQueuedStreams').
mergeStreams :: MonadDES m => Stream m a -> Stream m a -> Stream m a
{-# INLINABLE mergeStreams #-}
mergeStreams first second = mergeQueuedStreams FCFS first second
-- | Merge two streams into one; the two-stream case of
-- 'concatQueuedStreams'.
mergeQueuedStreams :: (MonadDES m, EnqueueStrategy m s)
                      => s
                      -- ^ the strategy applied for enqueuing the input data
                      -> Stream m a
                      -- ^ the first input stream
                      -> Stream m a
                      -- ^ the second input stream
                      -> Stream m a
{-# INLINABLE mergeQueuedStreams #-}
mergeQueuedStreams strategy first second =
  concatQueuedStreams strategy [first, second]
-- | Merge two streams of prioritised items into one; the two-stream case of
-- 'concatPriorityStreams'.
mergePriorityStreams :: (MonadDES m, PriorityQueueStrategy m s p)
                        => s
                        -- ^ the strategy applied for enqueuing the input data
                        -> Stream m (p, a)
                        -- ^ the first input stream
                        -> Stream m (p, a)
                        -- ^ the second input stream
                        -> Stream m a
{-# INLINABLE mergePriorityStreams #-}
mergePriorityStreams strategy first second =
  concatPriorityStreams strategy [first, second]
-- | A stream whose head computation is 'neverProcess', so a consumer
-- demanding an element from it never receives one.
emptyStream :: MonadDES m => Stream m a
{-# INLINABLE emptyStream #-}
emptyStream = Cons neverProcess
-- | Consume the stream forever, running the given action on each element
-- before demanding the next one.
consumeStream :: MonadDES m => (a -> Process m ()) -> Stream m a -> Process m ()
{-# INLINABLE consumeStream #-}
consumeStream f stream =
  runStream stream >>= \(a, rest) ->
    f a >> consumeStream f rest
-- | Consume the stream forever, discarding every element.
sinkStream :: MonadDES m => Stream m a -> Process m ()
{-# INLINABLE sinkStream #-}
sinkStream stream =
  runStream stream >>= \(_, rest) -> sinkStream rest
-- | Prefetch the input stream by one element.
--
-- On first demand a writer process is spawned. The writer takes an element
-- from the source /before/ waiting for the write permit, then stores it in
-- a one-place slot and signals the reader. The reader takes the element
-- out and returns the write permit. So while one element waits in the slot,
-- the writer may already be requesting the next one from the source.
prefetchStream :: MonadDES m => Stream m a -> Stream m a
{-# INLINABLE prefetchStream #-}
prefetchStream source =
  Cons $
  do canRead <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
     canWrite <- liftSimulation $ newResourceWithMaxCount FCFS 1 (Just 1)
     slot <- liftSimulation $ newRef Nothing
     let writer current =
           do (a, rest) <- runStream current
              requestResource canWrite
              liftEvent $ writeRef slot (Just a)
              releaseResource canRead
              writer rest
         reader =
           do requestResource canRead
              -- The slot is filled by the writer before 'canRead' is released.
              Just a <- liftEvent $ readRef slot
              liftEvent $ writeRef slot Nothing
              releaseResource canWrite
              return a
     spawnProcess (writer source)
     runStream (repeatProcess reader)
-- | Turn a signal into a stream through a caller-supplied queue.
--
-- The enqueue action is subscribed to the signal, and the subscription is
-- registered with the 'Composite' for disposal. The resulting stream
-- repeatedly runs the dequeue process.
queuedSignalStream :: MonadDES m
                      => (a -> Event m ())
                      -- ^ enqueue a value received from the signal
                      -> Process m a
                      -- ^ dequeue the next value
                      -> Signal m a
                      -- ^ the input signal
                      -> Composite m (Stream m a)
{-# INLINABLE queuedSignalStream #-}
queuedSignalStream enqueue dequeue signal =
  do subscription <- liftEvent (handleSignal signal enqueue)
     disposableComposite subscription
     return (repeatProcess dequeue)
-- | Turn a signal into a stream, buffering the signal values in a new
-- FCFS queue from the infinite-queue module (see 'queuedSignalStream').
signalStream :: MonadDES m => Signal m a -> Composite m (Stream m a)
{-# INLINABLE signalStream #-}
signalStream signal =
  liftSimulation IQ.newFCFSQueue >>= \queue ->
    queuedSignalStream (IQ.enqueue queue) (IQ.dequeue queue) signal
-- | Turn a stream into a signal.
--
-- A process with a fresh identifier is started that consumes the stream and
-- triggers the signal source with every element. Cancelling that process is
-- registered with the 'Composite' as the disposal action.
streamSignal :: MonadDES m => Stream m a -> Composite m (Signal m a)
{-# INLINABLE streamSignal #-}
streamSignal stream =
  do source <- liftSimulation newSignalSource
     pid <- liftSimulation newProcessId
     liftEvent $
       runProcessUsingId pid $
       consumeStream (\a -> liftEvent (triggerSignal source a)) stream
     disposableComposite (DisposableEvent (cancelProcessWithId pid))
     return (publishSignal source)
-- | Wrap every element in an 'Arrival' that records the element, the time
-- at which it was received from the source, and the delay since the
-- previous element was received ('Nothing' for the first element).
arrivalStream :: MonadDES m => Stream m a -> Stream m (Arrival a)
{-# INLINABLE arrivalStream #-}
arrivalStream stream = Cons (go Nothing stream)
  where
    -- @prevTime@ is the arrival time of the previous element, if any.
    go prevTime current =
      do (a, rest) <- runStream current
         t <- liftDynamics time
         let arrival =
               Arrival { arrivalValue = a,
                         arrivalTime = t,
                         arrivalDelay = fmap (\t0 -> t - t0) prevTime }
         return (arrival, Cons (go (Just t) rest))
-- | Prepend the given value to the stream: the first step returns it
-- immediately and the original stream follows.
delayStream :: MonadDES m => a -> Stream m a -> Stream m a
{-# INLINABLE delayStream #-}
delayStream a0 s = Cons (return (a0, s))
-- | A stream that returns the given value immediately and then continues
-- as 'emptyStream'.
singletonStream :: MonadDES m => a -> Stream m a
{-# INLINABLE singletonStream #-}
singletonStream a = Cons (return (a, emptyStream))
-- | Flatten a process that produces a stream into a stream: the process is
-- run when the first element is demanded, then the produced stream is run.
joinStream :: MonadDES m => Process m (Stream m a) -> Stream m a
{-# INLINABLE joinStream #-}
joinStream m = Cons (runStream =<< m)
-- | Take elements from the first stream in the list and, when the process
-- reading it stops without the consumer itself being cancelled, start
-- reading from the next stream in the list.
--
-- Elements are handed from the current writer to the reader through a
-- one-place reference guarded by two resources, both created with count 0
-- and maximum 1. The reader releases @writing@ to ask for an element and
-- then waits on @reading@ until the writer has stored one.
--
-- NOTE(review): this relies on 'finallyProcess' running its finalizer when
-- the guarded process is cancelled -- confirm against its documentation.
failoverStream :: MonadDES m => [Stream m a] -> Stream m a
{-# INLINABLE failoverStream #-}
failoverStream ps = Cons z where
  z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writing <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         ref <- liftSimulation $ newRef Nothing
         -- Identifier of the consuming process, i.e. the one running this stream head.
         pid <- processId
         let writer p =
               do -- Wait until the reader asks for an element.
                  requestResource writing
                  pid' <- processId
                  (a, xs) <-
                    finallyProcess (runStream p) $
                    liftEvent $
                    -- If this writer was cancelled while fetching, hand the
                    -- pending request back by releasing @writing@ again.
                    do cancelled' <- processCancelled pid'
                       when cancelled' $
                         releaseResourceWithinEvent writing
                  liftEvent $ writeRef ref (Just a)
                  releaseResource reading
                  writer xs
             reader =
               do releaseResource writing
                  requestResource reading
                  -- The writer stores the value before releasing @reading@.
                  Just a <- liftEvent $ readRef ref
                  liftEvent $ writeRef ref Nothing
                  return a
             loop [] = return ()
             -- NB: this @ps@ shadows the argument of 'failoverStream'; it is
             -- the list of the remaining fallback streams.
             loop (p: ps) =
               do pid' <- processId
                  -- Cancel this sub-process as soon as the consumer is being cancelled.
                  h' <- liftEvent $
                        handleSignal (processCancelling pid) $ \() ->
                        cancelProcessWithId pid'
                  finallyProcess (writer p) $
                    liftEvent $
                    do disposeEvent h'
                       cancelled <- processCancelled pid
                       -- Fail over only when the consumer itself is still alive.
                       unless cancelled $
                         do cancelled' <- processCancelled pid'
                            unless cancelled' $
                              error "Expected the sub-process to be cancelled: failoverStream"
                            -- Continue with the next stream in a new process.
                            runProcess $ loop ps
         -- Here @ps@ is the full list passed to 'failoverStream'.
         liftEvent $ runProcess $ loop ps
         runStream $ repeatProcess reader
-- | Pass through the first @n@ elements of the stream and then continue as
-- 'emptyStream'. For @n <= 0@ the result is 'emptyStream' right away.
takeStream :: MonadDES m => Int -> Stream m a -> Stream m a
{-# INLINABLE takeStream #-}
takeStream n stream
  | n > 0 =
      Cons $
      runStream stream >>= \(a, rest) ->
        return (a, takeStream (n - 1) rest)
  | otherwise = emptyStream
-- | Pass through elements while the predicate holds. The first element that
-- fails the predicate is consumed and dropped, and the step then continues
-- as 'neverProcess'.
takeStreamWhile :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
{-# INLINABLE takeStreamWhile #-}
takeStreamWhile p stream =
  Cons $
  runStream stream >>= \(a, rest) ->
    if p a
      then return (a, takeStreamWhile p rest)
      else neverProcess
-- | Pass through elements while the monadic predicate returns 'True'. The
-- first element that fails the predicate is consumed and dropped, and the
-- step then continues as 'neverProcess'.
takeStreamWhileM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
{-# INLINABLE takeStreamWhileM #-}
takeStreamWhileM p stream =
  Cons $
  do (a, rest) <- runStream stream
     keep <- p a
     if keep
       then return (a, takeStreamWhileM p rest)
       else neverProcess
-- | Discard the first @n@ elements of the stream and return the rest. For
-- @n <= 0@ the stream is returned unchanged.
dropStream :: MonadDES m => Int -> Stream m a -> Stream m a
{-# INLINABLE dropStream #-}
dropStream n stream
  | n > 0 =
      Cons $
      runStream stream >>= \(_, rest) ->
        runStream (dropStream (n - 1) rest)
  | otherwise = stream
-- | Discard leading elements while the predicate holds. The first element
-- that fails the predicate is returned, followed by the untouched rest of
-- the stream.
dropStreamWhile :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
{-# INLINABLE dropStreamWhile #-}
dropStreamWhile p stream =
  Cons $
  runStream stream >>= \(a, rest) ->
    if p a
      then runStream (dropStreamWhile p rest)
      else return (a, rest)
-- | Discard leading elements while the monadic predicate returns 'True'.
-- The first element that fails the predicate is returned, followed by the
-- untouched rest of the stream.
dropStreamWhileM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
{-# INLINABLE dropStreamWhileM #-}
dropStreamWhileM p stream =
  Cons $
  do (a, rest) <- runStream stream
     skip <- p a
     if skip
       then runStream (dropStreamWhileM p rest)
       else return (a, rest)
-- | Create @n@ copies of the stream, each delivering every source element.
--
-- Every copy owns a queue. A reader first tries its own queue. If it is
-- empty, the reader acquires a shared resource of capacity 1 and tries its
-- queue again, since another reader may have filled it in the meantime.
-- Only then does it take the next element from the source, enqueue that
-- element for every /other/ copy and return it.
cloneStream :: MonadDES m => Int -> Stream m a -> Simulation m [Stream m a]
{-# INLINABLE cloneStream #-}
cloneStream n stream =
  do queues <- forM [1..n] $ \_ -> IQ.newFCFSQueue
     lock <- newFCFSResource 1
     ref <- newRef stream
     let indexed = zip [1..] queues
         -- Take one element from the source and hand it to all other copies.
         fetchAndBroadcast self =
           do current <- liftEvent $ readRef ref
              (a, rest) <- runStream current
              liftEvent $ writeRef ref rest
              forM_ indexed $ \(j, other) ->
                unless (j == self) $
                liftEvent $ IQ.enqueue other a
              return a
         reader self queue =
           do cached <- liftEvent $ IQ.tryDequeue queue
              case cached of
                Just a -> return a
                Nothing ->
                  usingResource lock $
                  do recheck <- liftEvent $ IQ.tryDequeue queue
                     maybe (fetchAndBroadcast self) return recheck
     return [repeatProcess (reader i q) | (i, q) <- indexed]
-- | For every consecutive group of @n@ input elements, emit the first
-- element of the group. It is emitted when the @n@-th element of the group
-- has been received.
firstArrivalStream :: MonadDES m => Int -> Stream m a -> Stream m a
{-# INLINABLE firstArrivalStream #-}
firstArrivalStream n s = assembleAccumStream step (1, Nothing) s
  where
    -- The accumulator is the 1-based position within the current group and
    -- the first element seen in that group, if any.
    step (i, firstSeen) a
      | i `mod` n == 0 = return ((1, Nothing), first)
      | otherwise = return ((i + 1, first), Nothing)
      where
        first = Just (fromMaybe a firstSeen)
-- | For every consecutive group of @n@ input elements, emit the last
-- element of the group and drop the others.
lastArrivalStream :: MonadDES m => Int -> Stream m a -> Stream m a
{-# INLINABLE lastArrivalStream #-}
lastArrivalStream n s = assembleAccumStream step 1 s
  where
    -- The accumulator is the 1-based position within the current group.
    step i a
      | i `mod` n == 0 = return (1, Just a)
      | otherwise = return (i + 1, Nothing)
-- | Like 'accumStream', but the step function may decline to produce an
-- output by returning 'Nothing'; only the 'Just' results appear in the
-- output stream.
assembleAccumStream :: MonadDES m => (acc -> a -> Process m (acc, Maybe b)) -> acc -> Stream m a -> Stream m b
{-# INLINABLE assembleAccumStream #-}
assembleAccumStream f acc s =
  -- 'fromJust' is safe here: the filter has already removed every 'Nothing'.
  mapStream fromJust (filterStream isJust (accumStream f acc s))
-- | Add tracing to the stream. If a request message is given, demanding an
-- element is wrapped in 'traceProcess' with that message. If a response
-- message is given, returning the element is wrapped in 'traceProcess' with
-- that message.
traceStream :: MonadDES m
               => Maybe String
               -- ^ the request message
               -> Maybe String
               -- ^ the response message
               -> Stream m a
               -- ^ a stream
               -> Stream m a
{-# INLINABLE traceStream #-}
traceStream request response stream = Cons (go stream)
  where
    go current =
      do (a, rest) <- maybe id traceProcess request (runStream current)
         maybe id traceProcess response (return (a, Cons (go rest)))