{-# LANGUAGE FlexibleContexts #-}
module Simulation.Aivika.Trans.Stream
(
Stream(..),
emptyStream,
mergeStreams,
mergeQueuedStreams,
mergePriorityStreams,
concatStreams,
concatQueuedStreams,
concatPriorityStreams,
splitStream,
splitStreamQueueing,
splitStreamPrioritising,
splitStreamFiltering,
splitStreamFilteringQueueing,
streamUsingId,
prefetchStream,
delayStream,
arrivalStream,
memoStream,
zipStreamSeq,
zipStreamParallel,
zip3StreamSeq,
zip3StreamParallel,
unzipStream,
streamSeq,
streamParallel,
consumeStream,
sinkStream,
repeatProcess,
mapStream,
mapStreamM,
accumStream,
apStream,
apStreamM,
filterStream,
filterStreamM,
takeStream,
takeStreamWhile,
takeStreamWhileM,
dropStream,
dropStreamWhile,
dropStreamWhileM,
singletonStream,
joinStream,
failoverStream,
signalStream,
streamSignal,
queuedSignalStream,
leftStream,
rightStream,
replaceLeftStream,
replaceRightStream,
partitionEitherStream,
cloneStream,
firstArrivalStream,
lastArrivalStream,
assembleAccumStream,
traceStream) where
import Data.Maybe
import Data.Monoid hiding ((<>))
import Data.Semigroup (Semigroup(..))
import Data.List.NonEmpty (NonEmpty((:|)))
import Control.Applicative
import Control.Monad
import Control.Monad.Trans
import Simulation.Aivika.Trans.Ref.Base
import Simulation.Aivika.Trans.DES
import Simulation.Aivika.Trans.Parameter
import Simulation.Aivika.Trans.Simulation
import Simulation.Aivika.Trans.Dynamics
import Simulation.Aivika.Trans.Event
import Simulation.Aivika.Trans.Composite
import Simulation.Aivika.Trans.Cont
import Simulation.Aivika.Trans.Process
import Simulation.Aivika.Trans.Signal
import Simulation.Aivika.Trans.Resource.Base
import Simulation.Aivika.Trans.QueueStrategy
import qualified Simulation.Aivika.Trans.Queue.Infinite.Base as IQ
import Simulation.Aivika.Arrival (Arrival(..))
-- | A stream of values of type @a@ produced inside the 'Process' computation.
--
-- Running a stream yields the next value together with the rest of the
-- stream, i.e. a stream is a cons cell whose tail is again a stream.
newtype Stream m a = Cons
  { runStream :: Process m (a, Stream m a)
    -- ^ Run the stream: produce the next value and the remaining stream.
  }
-- | Mapping over a stream transforms every produced value; it simply
-- delegates to 'mapStream'.
instance MonadDES m => Functor (Stream m) where

  {-# INLINE fmap #-}
  fmap = mapStream
-- | 'pure' builds a stream that yields the same value at every step, and
-- '<*>' combines a stream of functions with a stream of arguments via
-- 'apStream'.
instance MonadDES m => Applicative (Stream m) where

  {-# INLINE pure #-}
  -- The stream is tied back onto itself: its tail is the stream itself,
  -- so the value is returned again on every subsequent step.
  pure a = let y = Cons (return (a, y)) in y

  {-# INLINE (<*>) #-}
  (<*>) = apStream
-- | 'empty' is 'emptyStream' and '<|>' is 'mergeStreams' (both are defined
-- elsewhere in this module).
instance MonadDES m => Alternative (Stream m) where

  {-# INLINE empty #-}
  empty = emptyStream

  {-# INLINE (<|>) #-}
  (<|>) = mergeStreams
-- | Two streams are combined with 'mergeStreams'; a non-empty collection of
-- streams is combined in one go with 'concatStreams'.
instance MonadDES m => Semigroup (Stream m a) where

  {-# INLINE (<>) #-}
  (<>) = mergeStreams

  {-# INLINE sconcat #-}
  -- Flatten the non-empty list back into an ordinary list for 'concatStreams'.
  sconcat (h :| t) = concatStreams (h : t)
-- | The identity is 'emptyStream'; 'mappend' reuses the 'Semigroup'
-- operation and 'mconcat' is 'concatStreams'.
instance MonadDES m => Monoid (Stream m a) where

  {-# INLINE mempty #-}
  mempty = emptyStream

  {-# INLINE mappend #-}
  mappend = (<>)

  {-# INLINE mconcat #-}
  mconcat = concatStreams
-- | Create a stream whose first step is run through 'processUsingId' with
-- the given process identifier.
--
-- Only the computation wrapped by the outer 'Cons' is passed to
-- 'processUsingId'; the tail stream it returns is handed back unchanged.
streamUsingId :: MonadDES m => ProcessId m -> Stream m a -> Stream m a
{-# INLINABLE streamUsingId #-}
streamUsingId pid (Cons s) =
  Cons $ processUsingId pid s
-- | Memoize the stream step by step.
--
-- The first step is wrapped with 'memoProcess'; when that step runs, the
-- tail it produces is memoized recursively in the same way, so every cell
-- of the resulting stream is backed by its own memoized process.
-- (Presumably this lets several consumers share one sequence of values;
-- the exact caching guarantees are those of 'memoProcess'.)
memoStream :: MonadDES m => Stream m a -> Simulation m (Stream m a)
{-# INLINABLE memoStream #-}
memoStream (Cons s) =
  do p <- memoProcess $
          -- The lazy pattern defers inspecting the pair until it is used.
          do ~(x, xs) <- s
             xs' <- liftSimulation $ memoStream xs
             return (x, xs')
     return (Cons p)
-- | Zip two streams, requesting the next value from the first stream and
-- only then from the second one.
zipStreamSeq :: MonadDES m => Stream m a -> Stream m b -> Stream m (a, b)
{-# INLINABLE zipStreamSeq #-}
zipStreamSeq (Cons sa) (Cons sb) = Cons step where
  step = do ~(x, xs) <- sa
            ~(y, ys) <- sb
            return ((x, y), zipStreamSeq xs ys)
-- | Zip two streams, obtaining the next value of each stream through
-- 'zipProcessParallel' rather than one after the other.
zipStreamParallel :: MonadDES m => Stream m a -> Stream m b -> Stream m (a, b)
{-# INLINABLE zipStreamParallel #-}
zipStreamParallel (Cons sa) (Cons sb) = Cons step where
  step = do ~((x, xs), (y, ys)) <- zipProcessParallel sa sb
            return ((x, y), zipStreamParallel xs ys)
-- | Zip three streams, requesting the next value from the first, then the
-- second and finally the third stream.
zip3StreamSeq :: MonadDES m => Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
{-# INLINABLE zip3StreamSeq #-}
zip3StreamSeq (Cons sa) (Cons sb) (Cons sc) = Cons step where
  step = do ~(x, xs) <- sa
            ~(y, ys) <- sb
            ~(z, zs) <- sc
            return ((x, y, z), zip3StreamSeq xs ys zs)
-- | Zip three streams, obtaining the next value of each stream through
-- 'zip3ProcessParallel' rather than one after the other.
zip3StreamParallel :: MonadDES m => Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
{-# INLINABLE zip3StreamParallel #-}
zip3StreamParallel (Cons sa) (Cons sb) (Cons sc) = Cons step where
  step = do ~((x, xs), (y, ys), (z, zs)) <- zip3ProcessParallel sa sb sc
            return ((x, y, z), zip3StreamParallel xs ys zs)
-- | Split a stream of pairs into a stream of first components and a stream
-- of second components.
--
-- The source stream is memoized first (see 'memoStream') and both result
-- streams are projections of that single memoized stream.
unzipStream :: MonadDES m => Stream m (a, b) -> Simulation m (Stream m a, Stream m b)
{-# INLINABLE unzipStream #-}
unzipStream s =
  do s' <- memoStream s
     let sa = mapStream fst s'
         sb = mapStream snd s'
     return (sa, sb)
-- | Turn a list of streams into a stream of lists.
--
-- On every step the streams are run one after another in list order
-- ('forM'); the heads form the emitted list and the tails form the list of
-- streams for the next step.
streamSeq :: MonadDES m => [Stream m a] -> Stream m [a]
{-# INLINABLE streamSeq #-}
streamSeq xs = Cons y where
  y = do ps <- forM xs runStream
         return (map fst ps, streamSeq $ map snd ps)
-- | Turn a list of streams into a stream of lists, like 'streamSeq', but
-- the next values of all streams are obtained through 'processParallel'.
streamParallel :: MonadDES m => [Stream m a] -> Stream m [a]
{-# INLINABLE streamParallel #-}
streamParallel xs = Cons y where
  y = do ps <- processParallel $ map runStream xs
         return (map fst ps, streamParallel $ map snd ps)
-- | Build a stream by running the same process again on every step; each
-- run's result is the next value of the stream.
repeatProcess :: MonadDES m => Process m a -> Stream m a
{-# INLINABLE repeatProcess #-}
repeatProcess p = Cons y where
  y = do a <- p
         return (a, repeatProcess p)
-- | Apply a pure function to every value of the stream.
mapStream :: MonadDES m => (a -> b) -> Stream m a -> Stream m b
{-# INLINABLE mapStream #-}
mapStream f (Cons s) = Cons y where
  y = do (a, xs) <- s
         return (f a, mapStream f xs)
-- | Apply a function running in the 'Process' computation to every value of
-- the stream; the function is run after the value has been received.
mapStreamM :: MonadDES m => (a -> Process m b) -> Stream m a -> Stream m b
{-# INLINABLE mapStreamM #-}
mapStreamM f (Cons s) = Cons y where
  y = do (a, xs) <- s
         b <- f a
         return (b, mapStreamM f xs)
-- | Transform the stream while threading an accumulator through it.
--
-- For every input value the function receives the current accumulator and
-- the value, and returns the new accumulator together with the output
-- value; the new accumulator is used for the next input value.
accumStream :: MonadDES m => (acc -> a -> Process m (acc, b)) -> acc -> Stream m a -> Stream m b
{-# INLINABLE accumStream #-}
accumStream f acc xs = Cons $ loop xs acc where
  loop (Cons s) state =
    do (a, rest) <- s
       (state', b) <- f state a
       return (b, Cons $ loop rest state')
-- | Sequential application: on every step take the next function from the
-- first stream, then the next argument from the second stream, and emit
-- the function applied to the argument.
apStream :: MonadDES m => Stream m (a -> b) -> Stream m a -> Stream m b
{-# INLINABLE apStream #-}
apStream (Cons sf) (Cons sa) = Cons y where
  y = do (f, sf') <- sf
         (a, sa') <- sa
         return (f a, apStream sf' sa')
-- | Like 'apStream', but the functions run in the 'Process' computation:
-- after the function and the argument have been received, the function is
-- run on the argument and its result is emitted.
apStreamM :: MonadDES m => Stream m (a -> Process m b) -> Stream m a -> Stream m b
{-# INLINABLE apStreamM #-}
apStreamM (Cons sf) (Cons sa) = Cons y where
  y = do (f, sf') <- sf
         (a, sa') <- sa
         x <- f a
         return (x, apStreamM sf' sa')
-- | Keep only the values that satisfy the predicate.
--
-- A rejected value is dropped and the step continues with the filtered
-- remainder of the stream until an accepted value is found.
filterStream :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
{-# INLINABLE filterStream #-}
filterStream p (Cons s) = Cons y where
  y = do (a, xs) <- s
         if p a
           then return (a, filterStream p xs)
           -- Skip the value: run the first step of the filtered tail.
           else runStream (filterStream p xs)
-- | Keep only the values for which the predicate, run in the 'Process'
-- computation, returns 'True'.
--
-- A rejected value is dropped and the step continues with the filtered
-- remainder of the stream until an accepted value is found.
filterStreamM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
{-# INLINABLE filterStreamM #-}
filterStreamM p (Cons s) = Cons y where
  y = do (a, xs) <- s
         b <- p a
         if b
           then return (a, filterStreamM p xs)
           -- Skip the value: run the first step of the filtered tail.
           else runStream (filterStreamM p xs)
-- | Extract the 'Left' values from the stream, skipping every 'Right'.
leftStream :: MonadDES m => Stream m (Either a b) -> Stream m a
{-# INLINABLE leftStream #-}
leftStream (Cons s) = Cons y where
  y = do (e, xs) <- s
         case e of
           Left a  -> return (a, leftStream xs)
           -- Not a 'Left': continue with the first step of the remainder.
           Right _ -> runStream (leftStream xs)
-- | Extract the 'Right' values from the stream, skipping every 'Left'.
rightStream :: MonadDES m => Stream m (Either a b) -> Stream m b
{-# INLINABLE rightStream #-}
rightStream (Cons s) = Cons y where
  y = do (e, xs) <- s
         case e of
           -- Not a 'Right': continue with the first step of the remainder.
           Left _  -> runStream (rightStream xs)
           Right b -> return (b, rightStream xs)
-- | Replace every 'Left' value of the first stream with the next value of
-- the second stream.
--
-- The second stream is advanced only when a 'Left' is encountered; a
-- 'Right' value is passed through and the second stream is left untouched.
replaceLeftStream :: MonadDES m => Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
{-# INLINABLE replaceLeftStream #-}
replaceLeftStream (Cons sab) ys0@(Cons sc) = Cons z where
  z = do (e, xs) <- sab
         case e of
           Left _ ->
             do (c, ys) <- sc
                return (Left c, replaceLeftStream xs ys)
           Right b ->
             -- Nothing was consumed from the replacement stream.
             return (Right b, replaceLeftStream xs ys0)
-- | Replace every 'Right' value of the first stream with the next value of
-- the second stream.
--
-- The second stream is advanced only when a 'Right' is encountered; a
-- 'Left' value is passed through and the second stream is left untouched.
replaceRightStream :: MonadDES m => Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
{-# INLINABLE replaceRightStream #-}
replaceRightStream (Cons sab) ys0@(Cons sc) = Cons z where
  z = do (e, xs) <- sab
         case e of
           Right _ ->
             do (c, ys) <- sc
                return (Right c, replaceRightStream xs ys)
           Left a ->
             -- Nothing was consumed from the replacement stream.
             return (Left a, replaceRightStream xs ys0)
-- | Partition a stream of 'Either' values into the stream of its 'Left'
-- values and the stream of its 'Right' values.
--
-- The source stream is memoized first (see 'memoStream'); both results are
-- views ('leftStream' and 'rightStream') of that single memoized stream.
partitionEitherStream :: MonadDES m => Stream m (Either a b) -> Simulation m (Stream m a, Stream m b)
{-# INLINABLE partitionEitherStream #-}
partitionEitherStream s =
  do s' <- memoStream s
     return (leftStream s', rightStream s')
-- | Split the input stream into the specified number of output streams,
-- using the 'FCFS' strategy for the competing readers
-- (see 'splitStreamQueueing').
splitStream :: MonadDES m => Int -> Stream m a -> Simulation m [Stream m a]
{-# INLINABLE splitStream #-}
splitStream = splitStreamQueueing FCFS
-- | Split the input stream into the specified number of output streams.
--
-- The current position in the input stream is kept in a reference, and a
-- resource created with @'newResource' s 1@ guards it: a reader acquires
-- the resource, takes the next value from the input, stores the remaining
-- stream back and releases the resource. Every output stream repeats that
-- same reader, so each input value is delivered to exactly one output.
--
-- A non-positive count yields an empty list.
splitStreamQueueing :: (MonadDES m, EnqueueStrategy m s)
                       => s
                       -- ^ the strategy applied for enqueuing the output requests
                       -> Int
                       -- ^ the number of output streams
                       -> Stream m a
                       -- ^ the input stream
                       -> Simulation m [Stream m a]
                       -- ^ the split output streams
{-# INLINABLE splitStreamQueueing #-}
splitStreamQueueing s n x =
  do ref <- newRef x
     res <- newResource s 1
     let reader =
           usingResource res $
           do p <- liftEvent $ readRef ref
              (a, xs) <- runStream p
              liftEvent $ writeRef ref xs
              return a
     -- All outputs share the same reader, hence the same reference and lock.
     return $ replicate n (repeatProcess reader)
-- | Split the input stream into output streams, one per priority stream.
--
-- Each output stream is driven by its own stream of priorities: on every
-- step it first takes the next priority, then requests the shared resource
-- with that priority ('usingResourceWithPriority'), takes the next value
-- from the input stream while holding the resource, and stores the
-- remaining input back into the shared reference.
splitStreamPrioritising :: (MonadDES m, PriorityQueueStrategy m s p)
                           => s
                           -- ^ the strategy applied for enqueuing the output requests
                           -> [Stream m p]
                           -- ^ the streams of priorities
                           -> Stream m a
                           -- ^ the input stream
                           -> Simulation m [Stream m a]
                           -- ^ the split output streams
{-# INLINABLE splitStreamPrioritising #-}
splitStreamPrioritising s ps x =
  do ref <- newRef x
     res <- newResource s 1
     let stream (Cons nextPriority) = Cons z where
           z = do (priority, priorities) <- nextPriority
                  a <- usingResourceWithPriority res priority $
                       do input <- liftEvent $ readRef ref
                          (v, rest) <- runStream input
                          liftEvent $ writeRef ref rest
                          return v
                  return (a, stream priorities)
     return $ map stream ps
-- | Split the input stream into one output stream per predicate, using the
-- 'FCFS' strategy for the competing readers
-- (see 'splitStreamFilteringQueueing').
splitStreamFiltering :: MonadDES m => [a -> Event m Bool] -> Stream m a -> Simulation m [Stream m a]
{-# INLINABLE splitStreamFiltering #-}
splitStreamFiltering = splitStreamFilteringQueueing FCFS
-- | Split the input stream into one output stream per predicate.
--
-- All outputs share the input position (a reference) guarded by a resource
-- created with @'newResource' s 1@. A reader acquires the resource, takes
-- the next input value and tests it with its own predicate:
--
-- * if the predicate accepts the value, the remaining input is stored and
--   the value is emitted;
--
-- * otherwise the value is put back (a stream whose first step returns the
--   value followed by the remaining input is stored), the resource is
--   released and the reader tries again.
--
-- NOTE(review): a rejected reader retries immediately; whether it can keep
-- re-reading the same rejected value when no other reader is waiting
-- depends on the resource's queueing behaviour — confirm.
splitStreamFilteringQueueing :: (MonadDES m, EnqueueStrategy m s)
                                => s
                                -- ^ the strategy applied for enqueuing the output requests
                                -> [a -> Event m Bool]
                                -- ^ the filters for the output streams
                                -> Stream m a
                                -- ^ the input stream
                                -> Simulation m [Stream m a]
                                -- ^ the split output streams
{-# INLINABLE splitStreamFilteringQueueing #-}
splitStreamFilteringQueueing s preds x =
  do ref <- newRef x
     res <- newResource s 1
     let reader accept =
           do r <-
                usingResource res $
                do p <- liftEvent $ readRef ref
                   (a, xs) <- runStream p
                   liftEvent $
                     do ok <- accept a
                        if ok
                          then do writeRef ref xs
                                  return $ Just a
                          -- Push the rejected value back in front of the input.
                          else do writeRef ref $ Cons (return (a, xs))
                                  return Nothing
              case r of
                Just a  -> return a
                Nothing -> reader accept
     return $ map (repeatProcess . reader) preds
-- | Concatenate the input streams into one output stream, using the 'FCFS'
-- strategy (see 'concatQueuedStreams').
concatStreams :: MonadDES m => [Stream m a] -> Stream m a
{-# INLINABLE concatStreams #-}
concatStreams = concatQueuedStreams FCFS
-- | Combine several input streams into a single output stream.
--
-- One writer process is spawned per input stream.  A writer pulls the next
-- item from its stream, acquires the @slot@ resource (created with the
-- strategy @s@), stores the item in a shared one-element cell, releases
-- @ready@ for the reader and then waits on @resume@.  The @resume@ resource
-- is released each time a further element of the output stream is requested.
concatQueuedStreams :: (MonadDES m, EnqueueStrategy m s)
                       => s
                       -- ^ the strategy used for the resource guarding the shared cell
                       -> [Stream m a]
                       -- ^ the input streams
                       -> Stream m a
                       -- ^ the combined output stream
{-# INLINABLE concatQueuedStreams #-}
concatQueuedStreams s streams = Cons start where
  start =
    do ready  <- liftSimulation (newResourceWithMaxCount FCFS 0 (Just 1))
       slot   <- liftSimulation (newResourceWithMaxCount s 1 (Just 1))
       resume <- liftSimulation (newResourceWithMaxCount FCFS 0 (Just 1))
       cell   <- liftSimulation (newRef Nothing)
       let -- pulls items from one input stream and hands them over one by one
           produce input =
             do (item, rest) <- runStream input
                requestResource slot
                liftEvent (writeRef cell (Just item))
                releaseResource ready
                requestResource resume
                produce rest
           -- waits for a writer to fill the cell, empties it and frees the slot
           consume =
             do requestResource ready
                Just item <- liftEvent (readRef cell)
                liftEvent (writeRef cell Nothing)
                releaseResource slot
                return item
       mapM_ (spawnProcess . produce) streams
       firstItem <- consume
       return (firstItem, repeatProcess (releaseResource resume >> consume))
-- | Combine several input streams of prioritised items into a single
-- output stream.
--
-- Works like the queued variant, except that every item arrives paired with
-- a priority and the writer requests the @slot@ resource with that priority
-- before storing the item in the shared cell.  Only the payload, not the
-- priority, is passed on to the output stream.
concatPriorityStreams :: (MonadDES m, PriorityQueueStrategy m s p)
                         => s
                         -- ^ the strategy used for the resource guarding the shared cell
                         -> [Stream m (p, a)]
                         -- ^ the input streams of (priority, item) pairs
                         -> Stream m a
                         -- ^ the combined output stream
{-# INLINABLE concatPriorityStreams #-}
concatPriorityStreams s streams = Cons start where
  start =
    do ready  <- liftSimulation (newResourceWithMaxCount FCFS 0 (Just 1))
       slot   <- liftSimulation (newResourceWithMaxCount s 1 (Just 1))
       resume <- liftSimulation (newResourceWithMaxCount FCFS 0 (Just 1))
       cell   <- liftSimulation (newRef Nothing)
       let -- pulls prioritised items from one input stream and hands them over
           produce input =
             do ((prio, item), rest) <- runStream input
                requestResourceWithPriority slot prio
                liftEvent (writeRef cell (Just item))
                releaseResource ready
                requestResource resume
                produce rest
           -- waits for a writer to fill the cell, empties it and frees the slot
           consume =
             do requestResource ready
                Just item <- liftEvent (readRef cell)
                liftEvent (writeRef cell Nothing)
                releaseResource slot
                return item
       mapM_ (spawnProcess . produce) streams
       firstItem <- consume
       return (firstItem, repeatProcess (releaseResource resume >> consume))
-- | Merge two streams into one, using the 'FCFS' strategy for the
-- underlying queued merge.
mergeStreams :: MonadDES m => Stream m a -> Stream m a -> Stream m a
{-# INLINABLE mergeStreams #-}
mergeStreams x y = mergeQueuedStreams FCFS x y
-- | Merge two streams into one; a two-stream special case of
-- 'concatQueuedStreams'.
mergeQueuedStreams :: (MonadDES m, EnqueueStrategy m s)
                      => s
                      -- ^ the strategy passed on to 'concatQueuedStreams'
                      -> Stream m a
                      -- ^ the first input stream
                      -> Stream m a
                      -- ^ the second input stream
                      -> Stream m a
                      -- ^ the merged output stream
{-# INLINABLE mergeQueuedStreams #-}
mergeQueuedStreams s x y = concatQueuedStreams s inputs
  where
    inputs = [x, y]
-- | Merge two streams of prioritised items into one; a two-stream special
-- case of 'concatPriorityStreams'.
mergePriorityStreams :: (MonadDES m, PriorityQueueStrategy m s p)
                        => s
                        -- ^ the strategy passed on to 'concatPriorityStreams'
                        -> Stream m (p, a)
                        -- ^ the first input stream of (priority, item) pairs
                        -> Stream m (p, a)
                        -- ^ the second input stream of (priority, item) pairs
                        -> Stream m a
                        -- ^ the merged output stream
{-# INLINABLE mergePriorityStreams #-}
mergePriorityStreams s x y = concatPriorityStreams s inputs
  where
    inputs = [x, y]
-- | A stream whose only step is 'neverProcess'.
emptyStream :: MonadDES m => Stream m a
{-# INLINABLE emptyStream #-}
emptyStream = Cons idle
  where
    idle = neverProcess
-- | Feed the items of the stream, in order, to the given handler.
--
-- The resulting process pulls one item, runs the handler on it and then
-- repeats with the rest of the stream.
consumeStream :: MonadDES m => (a -> Process m ()) -> Stream m a -> Process m ()
{-# INLINABLE consumeStream #-}
consumeStream f stream =
  runStream stream >>= \(item, rest) ->
  f item >> consumeStream f rest
-- | Pull items from the stream one after another and discard each of them.
sinkStream :: MonadDES m => Stream m a -> Process m ()
{-# INLINABLE sinkStream #-}
sinkStream stream =
  runStream stream >>= \(_, rest) ->
  sinkStream rest
-- | Read the source stream in a separately spawned process, handing items
-- to the consumer through a shared one-element cell.
--
-- The spawned writer pulls the next item from the source before it requests
-- the @slot@ resource, so an item can already be fetched while the previous
-- one still waits in the cell.
prefetchStream :: MonadDES m => Stream m a -> Stream m a
{-# INLINABLE prefetchStream #-}
prefetchStream s = Cons start where
  start =
    do ready <- liftSimulation (newResourceWithMaxCount FCFS 0 (Just 1))
       slot  <- liftSimulation (newResourceWithMaxCount FCFS 1 (Just 1))
       cell  <- liftSimulation (newRef Nothing)
       let -- pulls items from the source and parks each one in the cell
           produce input =
             do (item, rest) <- runStream input
                requestResource slot
                liftEvent (writeRef cell (Just item))
                releaseResource ready
                produce rest
           -- waits for the cell to be filled, empties it and frees the slot
           consume =
             do requestResource ready
                Just item <- liftEvent (readRef cell)
                liftEvent (writeRef cell Nothing)
                releaseResource slot
                return item
       spawnProcess (produce s)
       runStream (repeatProcess consume)
-- | Turn a signal into a stream using caller-supplied queue operations.
--
-- The @enqueue@ action is subscribed as a handler of the signal and the
-- subscription is registered with the enclosing 'Composite' through
-- 'disposableComposite'.  The returned stream repeatedly runs @dequeue@.
queuedSignalStream :: MonadDES m
                      => (a -> Event m ())
                      -- ^ called with every value received from the signal
                      -> Process m a
                      -- ^ run repeatedly to produce the items of the stream
                      -> Signal m a
                      -- ^ the input signal
                      -> Composite m (Stream m a)
                      -- ^ the output stream
{-# INLINABLE queuedSignalStream #-}
queuedSignalStream enqueue dequeue s =
  do subscription <- liftEvent (handleSignal s enqueue)
     disposableComposite subscription
     return (repeatProcess dequeue)
-- | Turn a signal into a stream by buffering the signal values in a newly
-- created FCFS queue ('IQ.newFCFSQueue'): values are enqueued as they are
-- received and the stream dequeues them.
signalStream :: MonadDES m => Signal m a -> Composite m (Stream m a)
{-# INLINABLE signalStream #-}
signalStream s =
  liftSimulation IQ.newFCFSQueue >>= \buffer ->
  queuedSignalStream (IQ.enqueue buffer) (IQ.dequeue buffer) s
-- | Turn a stream into a signal.
--
-- A process with a freshly created identifier consumes the stream and
-- triggers a new signal source with every item.  A disposable action that
-- cancels this process is registered with the enclosing 'Composite'.
-- The published side of the signal source is returned.
streamSignal :: MonadDES m => Stream m a -> Composite m (Signal m a)
{-# INLINABLE streamSignal #-}
streamSignal z =
  do source <- liftSimulation newSignalSource
     pid    <- liftSimulation newProcessId
     let pump   = consumeStream (liftEvent . triggerSignal source) z
         cancel = DisposableEvent (cancelProcessWithId pid)
     liftEvent (runProcessUsingId pid pump)
     disposableComposite cancel
     return (publishSignal source)
-- | Wrap every item of the stream in an 'Arrival' record.
--
-- The record holds the item, the simulation time at which it was pulled
-- from the source stream and the difference between that time and the time
-- of the previous item ('Nothing' for the first item).
arrivalStream :: MonadDES m => Stream m a -> Stream m (Arrival a)
{-# INLINABLE arrivalStream #-}
arrivalStream s = Cons (track Nothing s)
  where
    -- @previous@ is the time of the preceding item, if there was one
    track previous input =
      do (item, rest) <- runStream input
         now <- liftDynamics time
         let arrival = Arrival { arrivalValue = item
                               , arrivalTime  = now
                               , arrivalDelay = fmap (now -) previous }
         return (arrival, Cons (track (Just now) rest))
-- | Put the given value in front of the stream: the result yields @a0@
-- first and then continues as the original stream.
delayStream :: MonadDES m => a -> Stream m a -> Stream m a
{-# INLINABLE delayStream #-}
delayStream a0 s = Cons (return (a0, s))
-- | A stream that yields the given value and then continues as
-- 'emptyStream'.
singletonStream :: MonadDES m => a -> Stream m a
{-# INLINABLE singletonStream #-}
singletonStream a = Cons (return (a, emptyStream))
-- | Flatten a process that computes a stream into a stream: the first step
-- runs the process and then takes the first step of the stream it returned.
joinStream :: MonadDES m => Process m (Stream m a) -> Stream m a
{-# INLINABLE joinStream #-}
joinStream m = Cons (runStream =<< m)
-- | Build one output stream from a list of input streams that are read one
-- at a time.
--
-- Items travel from the current writer to the reader through a shared
-- one-element ref guarded by the @reading@ and @writing@ resources.  The
-- writer for the next stream in the list is started from the finaliser of
-- the process that runs the current writer (see @loop@ below).
--
-- NOTE(review): this listing carries inferred-type annotations and has lost
-- its layout, so the nesting of the nested @do@ blocks cannot be read off
-- the text; the comments below describe statement order only.
failoverStream :: MonadDES m => [Stream m a] -> Stream m a
{-# INLINABLE failoverStream #-}
failoverStream :: forall (m :: * -> *) a. MonadDES m => [Stream m a] -> Stream m a
failoverStream [Stream m a]
ps = forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
z where
z :: Process m (a, Stream m a)
z = do Resource m FCFS
reading <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
0 (forall a. a -> Maybe a
Just Int
1)
Resource m FCFS
writing <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
0 (forall a. a -> Maybe a
Just Int
1)
Ref m (Maybe a)
ref <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef forall a. Maybe a
Nothing
-- pid: identifier of the process that runs this first step of the stream
ProcessId m
pid <- forall (m :: * -> *). MonadDES m => Process m (ProcessId m)
processId
-- writer: waits on `writing`, pulls one item from the given stream, stores
-- it in `ref`, releases `reading` and recurses on the rest of the stream.
-- The finaliser attached to the pull releases `writing` again (within an
-- event) when the writer's own process (pid') is reported as cancelled.
let writer :: Stream m a -> Process m b
writer Stream m a
p =
do forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
writing
ProcessId m
pid' <- forall (m :: * -> *). MonadDES m => Process m (ProcessId m)
processId
(a
a, Stream m a
xs) <-
forall (m :: * -> *) a b.
MonadDES m =>
Process m a -> Process m b -> Process m a
finallyProcess (forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
p) forall a b. (a -> b) -> a -> b
$
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent forall a b. (a -> b) -> a -> b
$
do Bool
cancelled' <- forall (m :: * -> *). MonadDES m => ProcessId m -> Event m Bool
processCancelled ProcessId m
pid'
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
cancelled' forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Event m ()
releaseResourceWithinEvent Resource m FCFS
writing
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref (forall a. a -> Maybe a
Just a
a)
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
reading
Stream m a -> Process m b
writer Stream m a
xs
-- reader: releases `writing` so that a writer may proceed, waits on
-- `reading`, then takes the item out of `ref` and resets the ref to Nothing.
reader :: Process m a
reader =
do forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
writing
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
reading
Just a
a <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Maybe a)
ref
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref forall a. Maybe a
Nothing
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
-- loop: runs the writer for the head of the list.  While it runs, a handler
-- on the cancelling signal of `pid` cancels the current process (pid').
-- The finaliser disposes that handler and then inspects the cancellation
-- state of `pid` and `pid'`.
loop :: [Stream m a] -> Process m ()
loop [] = forall (m :: * -> *) a. Monad m => a -> m a
return ()
loop (Stream m a
p: [Stream m a]
ps) =
do ProcessId m
pid' <- forall (m :: * -> *). MonadDES m => Process m (ProcessId m)
processId
DisposableEvent m
h' <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a.
Signal m a -> (a -> Event m ()) -> Event m (DisposableEvent m)
handleSignal (forall (m :: * -> *). MonadDES m => ProcessId m -> Signal m ()
processCancelling ProcessId m
pid) forall a b. (a -> b) -> a -> b
$ \() ->
forall (m :: * -> *). MonadDES m => ProcessId m -> Event m ()
cancelProcessWithId ProcessId m
pid'
forall (m :: * -> *) a b.
MonadDES m =>
Process m a -> Process m b -> Process m a
finallyProcess (forall {b}. Stream m a -> Process m b
writer Stream m a
p) forall a b. (a -> b) -> a -> b
$
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent forall a b. (a -> b) -> a -> b
$
do forall (m :: * -> *). DisposableEvent m -> Event m ()
disposeEvent DisposableEvent m
h'
Bool
cancelled <- forall (m :: * -> *). MonadDES m => ProcessId m -> Event m Bool
processCancelled ProcessId m
pid
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
cancelled forall a b. (a -> b) -> a -> b
$
do Bool
cancelled' <- forall (m :: * -> *). MonadDES m => ProcessId m -> Event m Bool
processCancelled ProcessId m
pid'
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
cancelled' forall a b. (a -> b) -> a -> b
$
forall a. HasCallStack => [Char] -> a
error [Char]
"Expected the sub-process to be cancelled: failoverStream"
-- NOTE(review): starts the writer for the remaining streams; presumably
-- this statement belongs to the `unless cancelled` branch above, i.e. the
-- next stream is only started while `pid` is not cancelled -- confirm
-- against the original layout.
forall (m :: * -> *). MonadDES m => Process m () -> Event m ()
runProcess forall a b. (a -> b) -> a -> b
$ [Stream m a] -> Process m ()
loop [Stream m a]
ps
-- start the chain of writers with the full input list, then serve the
-- first item through the repeated reader
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *). MonadDES m => Process m () -> Event m ()
runProcess forall a b. (a -> b) -> a -> b
$ [Stream m a] -> Process m ()
loop [Stream m a]
ps
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess Process m a
reader
-- | Keep the first @n@ items of the stream and continue as 'emptyStream'
-- afterwards.  For @n <= 0@ the result is 'emptyStream' and the source
-- stream is not touched.
takeStream :: MonadDES m => Int -> Stream m a -> Stream m a
{-# INLINABLE takeStream #-}
takeStream n s =
  if n <= 0
  then emptyStream
  else Cons $
       runStream s >>= \(item, rest) ->
       return (item, takeStream (n - 1) rest)
-- | Pass items through for as long as they satisfy the predicate.  On the
-- first item that fails the predicate the stream step becomes
-- 'neverProcess'.
takeStreamWhile :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
{-# INLINABLE takeStreamWhile #-}
takeStreamWhile p s = Cons $ runStream s >>= step
  where
    step (item, rest)
      | p item    = return (item, takeStreamWhile p rest)
      | otherwise = neverProcess
-- | Pass items through for as long as the predicate, computed within the
-- 'Process' computation, holds for them.  On the first item for which it
-- yields 'False' the stream step becomes 'neverProcess'.
takeStreamWhileM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
{-# INLINABLE takeStreamWhileM #-}
takeStreamWhileM p s = Cons $
  do (item, rest) <- runStream s
     keep <- p item
     if keep
       then return (item, takeStreamWhileM p rest)
       else neverProcess
-- | Skip the first @n@ items of the stream.  For @n <= 0@ the stream is
-- returned unchanged.
dropStream :: MonadDES m => Int -> Stream m a -> Stream m a
{-# INLINABLE dropStream #-}
dropStream n s =
  if n <= 0
  then s
  else Cons $
       runStream s >>= \(_, rest) ->
       runStream (dropStream (n - 1) rest)
-- | Skip the leading items that satisfy the predicate.  The first item that
-- fails it is yielded together with the untouched rest of the stream.
dropStreamWhile :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
{-# INLINABLE dropStreamWhile #-}
dropStreamWhile p s = Cons $ runStream s >>= step
  where
    step (item, rest)
      | p item    = runStream (dropStreamWhile p rest)
      | otherwise = return (item, rest)
-- | Skip the leading items for which the predicate, computed within the
-- 'Process' computation, holds.  The first item for which it yields 'False'
-- is passed on together with the untouched rest of the stream.
dropStreamWhileM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
{-# INLINABLE dropStreamWhileM #-}
dropStreamWhileM p s = Cons $
  do (item, rest) <- runStream s
     skip <- p item
     if skip
       then runStream (dropStreamWhileM p rest)
       else return (item, rest)
-- | Create the specified number of streams that all replay the elements
-- of the given source stream.
--
-- Each clone owns an FCFS queue. When a clone is asked for an element it
-- first looks into its own queue. If the queue is empty, it acquires a
-- shared single-unit resource, re-checks its queue (another clone may
-- have filled it in the meantime) and only then pulls the next element
-- from the source, enqueueing that element for every other clone.
cloneStream :: MonadDES m => Int -> Stream m a -> Simulation m [Stream m a]
{-# INLINABLE cloneStream #-}
cloneStream n s =
  do queues <- replicateM n IQ.newFCFSQueue
     lock   <- newFCFSResource 1
     source <- newRef s
     let -- Non-blocking attempt to take a buffered element.
         pull q = liftEvent (IQ.tryDequeue q)
         -- Read one element from the shared source and hand a copy to
         -- every clone except the one identified by @me@.
         advance me =
           do current <- liftEvent (readRef source)
              (x, rest) <- runStream current
              liftEvent (writeRef source rest)
              forM_ (zip [1 ..] queues) $ \(k, other) ->
                unless (k == me) $
                liftEvent (IQ.enqueue other x)
              return x
         -- Slow path: under the lock, re-check the queue before
         -- touching the source.
         fetch me q =
           usingResource lock $
           pull q >>= maybe (advance me) return
         -- Fast path: serve from the clone's own queue when possible.
         reader me q =
           pull q >>= maybe (fetch me q) return
     return (zipWith (\k q -> repeatProcess (reader k q)) [1 ..] queues)
-- | Group the input into consecutive runs of @n@ elements and emit the
-- first element of each run.
--
-- The element is emitted only once the last element of the run has been
-- received. The accumulator holds a 1-based position within the current
-- run and the first element seen in that run, if any.
--
-- NOTE(review): @n@ is presumably expected to be positive; the position
-- test uses 'mod', so @n == 0@ raises a divide-by-zero error.
firstArrivalStream :: MonadDES m => Int -> Stream m a -> Stream m a
{-# INLINABLE firstArrivalStream #-}
firstArrivalStream n s = assembleAccumStream step (1, Nothing) s
  where
    step (position, firstSeen) x
      | position `mod` n == 0 = return ((1, Nothing), remembered)
      | otherwise             = return ((position + 1, remembered), Nothing)
      where
        -- Keep the earlier element when there is one, else start with x.
        remembered = Just (fromMaybe x firstSeen)
-- | Group the input into consecutive runs of @n@ elements and emit the
-- last element of each run.
--
-- The accumulator is a 1-based position within the current run; it is
-- reset to 1 each time an element is emitted.
--
-- NOTE(review): @n@ is presumably expected to be positive; the position
-- test uses 'mod', so @n == 0@ raises a divide-by-zero error.
lastArrivalStream :: MonadDES m => Int -> Stream m a -> Stream m a
{-# INLINABLE lastArrivalStream #-}
lastArrivalStream n s = assembleAccumStream step 1 s
  where
    step position x
      | position `mod` n == 0 = return (1, Just x)
      | otherwise             = return (position + 1, Nothing)
-- | Thread an accumulator through the stream and emit only the values
-- that the step function chooses to produce.
--
-- For every input element the step function receives the current
-- accumulator and the element, and returns the new accumulator together
-- with an optional output. 'Nothing' outputs are skipped; 'Just' outputs
-- become elements of the resulting stream.
--
-- The 'Maybe' results are unwrapped by pattern matching in a single
-- pass, so no partial function is needed to extract the payload.
assembleAccumStream :: MonadDES m => (acc -> a -> Process m (acc, Maybe b)) -> acc -> Stream m a -> Stream m b
{-# INLINABLE assembleAccumStream #-}
assembleAccumStream f acc s = justs (accumStream f acc s)
  where
    -- Keep the payload of every 'Just', skipping over each 'Nothing'.
    justs xs =
      Cons $
      do (candidate, rest) <- runStream xs
         case candidate of
           Just b  -> return (b, justs rest)
           Nothing -> runStream (justs rest)
-- | Attach optional trace messages to the stream.
--
-- When the request message is given, every demand for the next element
-- is run under 'traceProcess' with that message. When the response
-- message is given, returning the obtained element is run under
-- 'traceProcess' with that message. A 'Nothing' leaves the corresponding
-- step untouched.
traceStream :: MonadDES m
               => Maybe String
               -- ^ the request message
               -> Maybe String
               -- ^ the response message
               -> Stream m a
               -- ^ the stream to trace
               -> Stream m a
{-# INLINABLE traceStream #-}
traceStream request response s = Cons (loop s)
  where
    -- Run the action under 'traceProcess' only when a message is supplied.
    traced label action = maybe action (\message -> traceProcess message action) label

    loop stream =
      do (x, rest) <- traced request (runStream stream)
         traced response (return (x, Cons (loop rest)))