module Simulation.Aivika.Stream
(
Stream(..),
emptyStream,
mergeStreams,
mergeQueuedStreams,
mergePriorityStreams,
concatStreams,
concatQueuedStreams,
concatPriorityStreams,
splitStream,
splitStreamQueueing,
splitStreamPrioritising,
splitStreamFiltering,
splitStreamFilteringQueueing,
streamUsingId,
prefetchStream,
delayStream,
arrivalStream,
memoStream,
zipStreamSeq,
zipStreamParallel,
zip3StreamSeq,
zip3StreamParallel,
unzipStream,
streamSeq,
streamParallel,
consumeStream,
sinkStream,
repeatProcess,
mapStream,
mapStreamM,
accumStream,
apStream,
apStreamM,
filterStream,
filterStreamM,
takeStream,
takeStreamWhile,
takeStreamWhileM,
dropStream,
dropStreamWhile,
dropStreamWhileM,
singletonStream,
joinStream,
failoverStream,
signalStream,
streamSignal,
queuedSignalStream,
leftStream,
rightStream,
replaceLeftStream,
replaceRightStream,
partitionEitherStream,
cloneStream,
firstArrivalStream,
lastArrivalStream,
assembleAccumStream,
traceStream) where
import Data.IORef
import Data.Maybe
import Data.Monoid hiding ((<>))
import Data.Semigroup (Semigroup(..))
import Data.List.NonEmpty (NonEmpty((:|)))
import Control.Applicative
import Control.Monad
import Control.Monad.Trans
import Simulation.Aivika.Simulation
import Simulation.Aivika.Dynamics
import Simulation.Aivika.Event
import Simulation.Aivika.Composite
import Simulation.Aivika.Cont
import Simulation.Aivika.Process
import Simulation.Aivika.Signal
import Simulation.Aivika.Resource.Base
import Simulation.Aivika.QueueStrategy
import qualified Simulation.Aivika.Queue.Infinite.Base as IQ
import Simulation.Aivika.Internal.Arrival
-- | A stream of values: a single cons cell whose head and tail are
-- produced together by running a 'Process' computation.
newtype Stream a = Cons { runStream :: Process (a, Stream a)
                          -- ^ Run the stream, yielding the next value
                          -- together with the rest of the stream.
                        }
-- | Mapping over a stream is delegated to 'mapStream'.
instance Functor Stream where

  fmap = mapStream
-- | 'pure' builds a stream that yields the same value at every step;
-- application is delegated to 'apStream'.
instance Applicative Stream where

  -- The stream is tied back onto itself: its tail is the stream being defined.
  pure a = let y = Cons (return (a, y)) in y

  (<*>) = apStream
-- | 'empty' is 'emptyStream' and '<|>' is 'mergeStreams'.
instance Alternative Stream where

  empty = emptyStream

  (<|>) = mergeStreams
-- | Streams combine by merging: '<>' is 'mergeStreams', and 'sconcat'
-- hands the whole non-empty list to 'concatStreams' in one go.
instance Semigroup (Stream a) where

  (<>) = mergeStreams

  sconcat (h :| t) = concatStreams (h : t)
-- | The monoid on streams: 'mempty' is 'emptyStream', 'mappend' is the
-- semigroup operation and 'mconcat' is 'concatStreams'.
instance Monoid (Stream a) where

  mempty  = emptyStream

  mappend = (<>)

  mconcat = concatStreams
-- | Create a stream whose first step is run through 'processUsingId' with
-- the specified process identifier. See 'processUsingId' for what running
-- under an identifier entails.
streamUsingId :: ProcessId -> Stream a -> Stream a
streamUsingId pid (Cons s) =
  Cons $ processUsingId pid s
-- | Memoize the stream: the step that produces the head is wrapped in
-- 'memoProcess', and the tail it returns is memoized recursively in the
-- same way.
memoStream :: Stream a -> Simulation (Stream a)
memoStream (Cons s) =
  do p <- memoProcess $
          -- the lazy pattern keeps the pair from being forced at bind time
          do ~(x, xs) <- s
             xs' <- liftSimulation $ memoStream xs
             return (x, xs')
     return (Cons p)
-- | Zip two streams, requesting the next item from the first stream and
-- only then from the second one.
zipStreamSeq :: Stream a -> Stream b -> Stream (a, b)
zipStreamSeq (Cons sa) (Cons sb) = Cons step where
  step = do ~(x, xs) <- sa
            ~(y, ys) <- sb
            return ((x, y), zipStreamSeq xs ys)
-- | Zip two streams, obtaining the next item of both through
-- 'zipProcessParallel'.
zipStreamParallel :: Stream a -> Stream b -> Stream (a, b)
zipStreamParallel (Cons sa) (Cons sb) = Cons step where
  step = do ~((x, xs), (y, ys)) <- zipProcessParallel sa sb
            return ((x, y), zipStreamParallel xs ys)
-- | Zip three streams, requesting the next item from each of them in
-- turn: first, second, then third.
zip3StreamSeq :: Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamSeq (Cons sa) (Cons sb) (Cons sc) = Cons step where
  step = do ~(x, xs) <- sa
            ~(y, ys) <- sb
            ~(z, zs) <- sc
            return ((x, y, z), zip3StreamSeq xs ys zs)
-- | Zip three streams, obtaining the next item of all of them through
-- 'zip3ProcessParallel'.
zip3StreamParallel :: Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamParallel (Cons sa) (Cons sb) (Cons sc) = Cons step where
  step = do ~((x, xs), (y, ys), (z, zs)) <- zip3ProcessParallel sa sb sc
            return ((x, y, z), zip3StreamParallel xs ys zs)
-- | Unzip a stream of pairs into a stream of first components and a
-- stream of second components.
--
-- The input is passed through 'memoStream' first, and both results are
-- projections of that single memoized stream.
unzipStream :: Stream (a, b) -> Simulation (Stream a, Stream b)
unzipStream s =
  do s' <- memoStream s
     let sa = mapStream fst s'
         sb = mapStream snd s'
     return (sa, sb)
-- | Turn a list of streams into a stream of lists, running the input
-- streams one after another, in list order, to build each output item.
streamSeq :: [Stream a] -> Stream [a]
streamSeq xs = Cons step where
  step = do ps <- forM xs runStream
            return (map fst ps, streamSeq $ map snd ps)
-- | Turn a list of streams into a stream of lists, obtaining the next
-- item of every input stream through 'processParallel'.
streamParallel :: [Stream a] -> Stream [a]
streamParallel xs = Cons step where
  step = do ps <- processParallel $ map runStream xs
            return (map fst ps, streamParallel $ map snd ps)
-- | Build a stream in which every item is produced by running the
-- specified process once more.
repeatProcess :: Process a -> Stream a
repeatProcess p = Cons step where
  step = do a <- p
            return (a, repeatProcess p)
-- | Apply a pure function to every item of the stream.
mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f (Cons s) = Cons step where
  step = do (a, xs) <- s
            return (f a, mapStream f xs)
-- | Transform every item of the stream with a 'Process' computation,
-- which is run after the item has been received from the source.
mapStreamM :: (a -> Process b) -> Stream a -> Stream b
mapStreamM f (Cons s) = Cons step where
  step = do (a, xs) <- s
            b <- f a
            return (b, mapStreamM f xs)
-- | Transform the stream while threading an accumulator through it.
--
-- For each input item the function receives the current accumulator and
-- the item, and returns the next accumulator together with the output item.
accumStream :: (acc -> a -> Process (acc, b)) -> acc -> Stream a -> Stream b
accumStream f acc xs = Cons $ loop xs acc where
  loop (Cons s) st =
    do (a, rest) <- s
       (st', b) <- f st a
       return (b, Cons $ loop rest st')
-- | Sequential application: take the next function from the first stream,
-- then the next argument from the second stream, and yield the result of
-- applying one to the other.
apStream :: Stream (a -> b) -> Stream a -> Stream b
apStream (Cons sf) (Cons sa) = Cons step where
  step = do (f, sf') <- sf
            (a, sa') <- sa
            return (f a, apStream sf' sa')
-- | Like 'apStream', but the functions return 'Process' computations,
-- which are run to obtain each output item.
apStreamM :: Stream (a -> Process b) -> Stream a -> Stream b
apStreamM (Cons sf) (Cons sa) = Cons step where
  step = do (f, sf') <- sf
            (a, sa') <- sa
            x <- f a
            return (x, apStreamM sf' sa')
-- | Keep only the items that satisfy the predicate.
filterStream :: (a -> Bool) -> Stream a -> Stream a
filterStream p (Cons s) = Cons step where
  step = do (a, xs) <- s
            if p a
              then return (a, filterStream p xs)
              -- rejected: continue straight into the filtered tail
              else runStream (filterStream p xs)
-- | Keep only the items for which the 'Process' predicate returns 'True'.
filterStreamM :: (a -> Process Bool) -> Stream a -> Stream a
filterStreamM p (Cons s) = Cons step where
  step = do (a, xs) <- s
            keep <- p a
            if keep
              then return (a, filterStreamM p xs)
              -- rejected: continue straight into the filtered tail
              else runStream (filterStreamM p xs)
-- | Yield the payloads of the 'Left' items and skip the 'Right' ones.
leftStream :: Stream (Either a b) -> Stream a
leftStream (Cons s) = Cons step where
  step = do (e, xs) <- s
            case e of
              Left a  -> return (a, leftStream xs)
              Right _ -> runStream (leftStream xs)
-- | Yield the payloads of the 'Right' items and skip the 'Left' ones.
rightStream :: Stream (Either a b) -> Stream b
rightStream (Cons s) = Cons step where
  step = do (e, xs) <- s
            case e of
              Left _  -> runStream (rightStream xs)
              Right b -> return (b, rightStream xs)
-- | Replace every 'Left' item of the first stream with the next item
-- taken from the second stream; 'Right' items pass through unchanged.
--
-- The second stream is advanced only when a 'Left' item is encountered.
replaceLeftStream :: Stream (Either a b) -> Stream c -> Stream (Either c b)
replaceLeftStream (Cons sab) (ys0@(Cons sc)) = Cons step where
  step = do (e, xs) <- sab
            case e of
              Left _ ->
                do (c, ys) <- sc
                   return (Left c, replaceLeftStream xs ys)
              Right b ->
                -- nothing consumed: keep the replacement stream as it was
                return (Right b, replaceLeftStream xs ys0)
-- | Replace every 'Right' item of the first stream with the next item
-- taken from the second stream; 'Left' items pass through unchanged.
--
-- The second stream is advanced only when a 'Right' item is encountered.
replaceRightStream :: Stream (Either a b) -> Stream c -> Stream (Either a c)
replaceRightStream (Cons sab) (ys0@(Cons sc)) = Cons step where
  step = do (e, xs) <- sab
            case e of
              Right _ ->
                do (c, ys) <- sc
                   return (Right c, replaceRightStream xs ys)
              Left a ->
                -- nothing consumed: keep the replacement stream as it was
                return (Left a, replaceRightStream xs ys0)
-- | Partition a stream of 'Either' values into the stream of 'Left'
-- payloads and the stream of 'Right' payloads.
--
-- The input is passed through 'memoStream' first, and both results are
-- derived from that single memoized stream.
partitionEitherStream :: Stream (Either a b) -> Simulation (Stream a, Stream b)
partitionEitherStream s =
  do s' <- memoStream s
     return (leftStream s', rightStream s')
-- | Split the input stream into the specified number of output streams,
-- using the 'FCFS' strategy for 'splitStreamQueueing'.
splitStream :: Int -> Stream a -> Simulation [Stream a]
splitStream = splitStreamQueueing FCFS
-- | Split the input stream into the specified number of output streams.
--
-- All output streams share one reader. It takes the next item of the input
-- stream while holding a resource created with the given strategy and
-- count 1, and stores the remaining input in a shared reference.
--
-- A non-positive count yields an empty list.
splitStreamQueueing :: EnqueueStrategy s
                       => s
                       -- ^ the strategy applied for enqueuing the output requests
                       -> Int
                       -- ^ the number of output streams
                       -> Stream a
                       -- ^ the input stream
                       -> Simulation [Stream a]
                       -- ^ the split output streams
splitStreamQueueing s n x =
  do ref <- liftIO $ newIORef x
     res <- newResource s 1
     let reader =
           usingResource res $
           do p <- liftIO $ readIORef ref
              (a, xs) <- runStream p
              -- remember where the input stream stands for the next reader
              liftIO $ writeIORef ref xs
              return a
     return $ replicate n (repeatProcess reader)
-- | Split the input stream into one output stream per priority stream.
--
-- Each output stream first takes the next priority from its own priority
-- stream. It then reads the next item of the shared input through
-- 'usingResourceWithPriority' with that priority, on a resource created
-- with the given strategy and count 1.
splitStreamPrioritising :: PriorityQueueStrategy s p
                           => s
                           -- ^ the strategy applied for enqueuing the output requests
                           -> [Stream p]
                           -- ^ the streams of priorities, one per output stream
                           -> Stream a
                           -- ^ the input stream
                           -> Simulation [Stream a]
                           -- ^ the split output streams
splitStreamPrioritising s ps x =
  do ref <- liftIO $ newIORef x
     res <- newResource s 1
     let stream (Cons priorities) = Cons step where
           step = do (priority, rest) <- priorities
                     a <- usingResourceWithPriority res priority $
                          do input <- liftIO $ readIORef ref
                             (item, xs) <- runStream input
                             -- remember where the input stream stands
                             liftIO $ writeIORef ref xs
                             return item
                     return (a, stream rest)
     return $ map stream ps
-- | Split the input stream by the specified predicates, using the 'FCFS'
-- strategy for 'splitStreamFilteringQueueing'.
splitStreamFiltering :: [a -> Event Bool] -> Stream a -> Simulation [Stream a]
splitStreamFiltering = splitStreamFilteringQueueing FCFS
-- | Split the input stream into one output stream per predicate.
--
-- A reader takes the next input item while holding a resource created with
-- the given strategy and count 1, then evaluates its own predicate on it.
-- If the item is accepted, the input advances and the item is yielded.
-- Otherwise the item is pushed back onto the front of the shared input and
-- the reader tries again.
--
-- NOTE(review): a rejecting reader retries immediately; whether other
-- readers get a turn in between depends on the resource's queueing
-- behaviour — confirm against the resource implementation.
splitStreamFilteringQueueing :: EnqueueStrategy s
                                => s
                                -- ^ the strategy applied for enqueuing the output requests
                                -> [a -> Event Bool]
                                -- ^ the filters, one per output stream
                                -> Stream a
                                -- ^ the input stream
                                -> Simulation [Stream a]
                                -- ^ the split output streams
splitStreamFilteringQueueing s preds x =
  do ref <- liftIO $ newIORef x
     res <- newResource s 1
     let reader accept =
           do taken <-
                usingResource res $
                do p <- liftIO $ readIORef ref
                   (a, xs) <- runStream p
                   liftEvent $
                     do ok <- accept a
                        if ok
                          then do liftIO $ writeIORef ref xs
                                  return $ Just a
                          -- rejected: put the item back in front of the rest
                          else do liftIO $ writeIORef ref $ Cons (return (a, xs))
                                  return Nothing
              maybe (reader accept) return taken
     return $ map (repeatProcess . reader) preds
-- | Concatenate the input streams into one output stream, using the 'FCFS'
-- strategy for 'concatQueuedStreams'.
concatStreams :: [Stream a] -> Stream a
concatStreams = concatQueuedStreams FCFS
-- | Concatenate the input streams into one output stream.
--
-- When the output stream is first run, one writer process is spawned per
-- input stream. Writers hand items to the reader one at a time through a
-- shared single-slot reference, coordinated by three resources (assuming
-- the usual acquire/release semantics of 'requestResource' and
-- 'releaseResource'):
--
-- * @writing@ (given strategy, created with @1@ and @Just 1@) is requested
--   by a writer before it fills the slot, and released by the reader once
--   the slot has been emptied;
--
-- * @reading@ ('FCFS', created with @0@ and @Just 1@) is released by a
--   writer after it fills the slot, and requested by the reader before it
--   reads the slot;
--
-- * @conting@ ('FCFS', created with @0@ and @Just 1@) is requested by a
--   writer after handing its item over, and released by the output stream
--   each time an item after the first is demanded.
concatQueuedStreams :: EnqueueStrategy s
                       => s
                       -- ^ the strategy applied for enqueuing the input data
                       -> [Stream a]
                       -- ^ the input streams
                       -> Stream a
                       -- ^ the combined output stream
concatQueuedStreams s streams = Cons z where
  z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writing <- liftSimulation $ newResourceWithMaxCount s 1 (Just 1)
         conting <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         ref <- liftIO $ newIORef Nothing
         let writer p =
               do (a, xs) <- runStream p
                  requestResource writing
                  liftIO $ writeIORef ref (Just a)
                  releaseResource reading
                  requestResource conting
                  writer xs
             reader =
               do requestResource reading
                  -- the slot is expected to be filled by the writer that
                  -- released @reading@
                  Just a <- liftIO $ readIORef ref
                  liftIO $ writeIORef ref Nothing
                  releaseResource writing
                  return a
         forM_ streams $ spawnProcess . writer
         a <- reader
         let xs = repeatProcess (releaseResource conting >> reader)
         return (a, xs)
-- | Concatenate input streams of prioritised items into one output stream.
--
-- This has the same structure as 'concatQueuedStreams', except that each
-- writer requests the @writing@ resource with
-- 'requestResourceWithPriority', passing the priority that accompanies
-- the item it is about to hand over.
--
-- When the output stream is first run, one writer process is spawned per
-- input stream. Items pass to the reader one at a time through a shared
-- single-slot reference. @reading@ and @conting@ are 'FCFS' resources
-- created with @0@ and @Just 1@; @writing@ uses the given strategy and is
-- created with @1@ and @Just 1@.
concatPriorityStreams :: PriorityQueueStrategy s p
                         => s
                         -- ^ the strategy applied for enqueuing the input data
                         -> [Stream (p, a)]
                         -- ^ the input streams of (priority, item) pairs
                         -> Stream a
                         -- ^ the combined output stream
concatPriorityStreams s streams = Cons z where
  z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writing <- liftSimulation $ newResourceWithMaxCount s 1 (Just 1)
         conting <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         ref <- liftIO $ newIORef Nothing
         let writer p =
               do ((priority, a), xs) <- runStream p
                  requestResourceWithPriority writing priority
                  liftIO $ writeIORef ref (Just a)
                  releaseResource reading
                  requestResource conting
                  writer xs
             reader =
               do requestResource reading
                  -- the slot is expected to be filled by the writer that
                  -- released @reading@
                  Just a <- liftIO $ readIORef ref
                  liftIO $ writeIORef ref Nothing
                  releaseResource writing
                  return a
         forM_ streams $ spawnProcess . writer
         a <- reader
         let xs = repeatProcess (releaseResource conting >> reader)
         return (a, xs)
-- | Merge two streams, using the 'FCFS' strategy for 'mergeQueuedStreams'.
mergeStreams :: Stream a -> Stream a -> Stream a
mergeStreams = mergeQueuedStreams FCFS
-- | Merge two streams: 'concatQueuedStreams' applied to the two-element
-- list of the inputs.
mergeQueuedStreams :: EnqueueStrategy s
                      => s
                      -- ^ the strategy applied for enqueuing the input data
                      -> Stream a
                      -- ^ the first input stream
                      -> Stream a
                      -- ^ the second input stream
                      -> Stream a
                      -- ^ the output combined stream
mergeQueuedStreams s x y = concatQueuedStreams s [x, y]
-- | Merge two streams of prioritised items: 'concatPriorityStreams'
-- applied to the two-element list of the inputs.
mergePriorityStreams :: PriorityQueueStrategy s p
                        => s
                        -- ^ the strategy applied for enqueuing the input data
                        -> Stream (p, a)
                        -- ^ the first input stream
                        -> Stream (p, a)
                        -- ^ the second input stream
                        -> Stream a
                        -- ^ the output combined stream
mergePriorityStreams s x y = concatPriorityStreams s [x, y]
-- | The stream whose step is 'neverProcess'.
emptyStream :: Stream a
emptyStream = Cons neverProcess
-- | Consume the stream, running the handler on every item before the next
-- item is requested. The loop has no terminating branch of its own.
consumeStream :: (a -> Process ()) -> Stream a -> Process ()
consumeStream f = go where
  go (Cons s) = do (a, xs) <- s
                   f a
                   go xs
-- | Request items from the stream one after another and discard them.
-- The loop has no terminating branch of its own.
sinkStream :: Stream a -> Process ()
sinkStream = go where
  go (Cons s) = do (_, xs) <- s
                   go xs
-- | Prefetch the input stream: a spawned writer process requests the next
-- item from the input while the previously received item is still waiting
-- to be read.
--
-- The writer and the reader exchange items through a single-slot
-- reference, coordinated by two 'FCFS' resources (assuming the usual
-- acquire/release semantics of 'requestResource' and 'releaseResource'):
--
-- * @writing@ (created with @1@ and @Just 1@) is requested by the writer
--   before it fills the slot, and released by the reader once the slot has
--   been emptied;
--
-- * @reading@ (created with @0@ and @Just 1@) is released by the writer
--   after it fills the slot, and requested by the reader before it reads.
--
-- The writer fetches the next input item before it requests @writing@,
-- which is what lets it run one item ahead of the reader.
prefetchStream :: Stream a -> Stream a
prefetchStream s = Cons z where
  z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
         writing <- liftSimulation $ newResourceWithMaxCount FCFS 1 (Just 1)
         ref <- liftIO $ newIORef Nothing
         let writer p =
               do (a, xs) <- runStream p
                  requestResource writing
                  liftIO $ writeIORef ref (Just a)
                  releaseResource reading
                  writer xs
             reader =
               do requestResource reading
                  -- the slot is expected to be filled by the writer that
                  -- released @reading@
                  Just a <- liftIO $ readIORef ref
                  liftIO $ writeIORef ref Nothing
                  releaseResource writing
                  return a
         spawnProcess $ writer s
         runStream $ repeatProcess reader
-- | Turn a signal into a stream using caller-supplied queue operations.
--
-- The first argument is installed as the signal handler, the second is the
-- process that the resulting stream repeats to obtain each item. The handler
-- subscription is registered with the 'Composite' via 'disposableComposite'.
queuedSignalStream :: (a -> Event ())
                      -- ^ called for every value emitted by the signal
                      -> Process a
                      -- ^ repeated to produce the items of the stream
                      -> Signal a
                      -- ^ the signal to listen to
                      -> Composite (Stream a)
queuedSignalStream enqueue dequeue s = do
  subscription <- liftEvent (handleSignal s enqueue)
  disposableComposite subscription
  return (repeatProcess dequeue)
-- | Turn a signal into a stream, buffering the signal values in a freshly
-- created FCFS queue ('IQ.newFCFSQueue') via 'queuedSignalStream'.
signalStream :: Signal a -> Composite (Stream a)
signalStream s = do
  queue <- liftSimulation IQ.newFCFSQueue
  queuedSignalStream (IQ.enqueue queue) (IQ.dequeue queue) s
-- | Turn a stream into a signal.
--
-- A process with a freshly created identifier is started that consumes the
-- stream and triggers a new signal source with every item. Cancelling that
-- process is registered with the 'Composite' as the disposing action.
streamSignal :: Stream a -> Composite (Signal a)
streamSignal z = do
  source <- liftSimulation newSignalSource
  pid <- liftSimulation newProcessId
  let forward item = liftEvent (triggerSignal source item)
  liftEvent (runProcessUsingId pid (consumeStream forward z))
  disposableComposite (DisposableEvent (cancelProcessWithId pid))
  return (publishSignal source)
-- | Wrap every item of the stream in an 'Arrival' record.
--
-- 'arrivalTime' is the value of 'time' read right after the item was pulled;
-- 'arrivalDelay' is the difference to the previous item's arrival time, or
-- 'Nothing' for the first item.
arrivalStream :: Stream a -> Stream (Arrival a)
arrivalStream s = Cons (go Nothing s)
  where
    -- 'previous' carries the arrival time of the preceding item, if any.
    go previous stream = do
      (item, rest) <- runStream stream
      now <- liftDynamics time
      let arrival =
            Arrival
              { arrivalValue = item
              , arrivalTime = now
              , arrivalDelay = fmap (\before -> now - before) previous
              }
      return (arrival, Cons (go (Just now) rest))
-- | Prepend the given value to the stream: the result immediately yields
-- the value and then continues with the original stream.
delayStream :: a -> Stream a -> Stream a
delayStream a0 s = Cons (return (a0, s))
-- | A stream that immediately yields the given value and then continues
-- as 'emptyStream'.
singletonStream :: a -> Stream a
singletonStream a = delayStream a emptyStream
-- | Flatten a process that computes a stream into a stream: the process is
-- run when the first item is demanded and its result is then pulled from.
joinStream :: Process (Stream a) -> Stream a
joinStream m = Cons (runStream =<< m)
-- | Combine a list of streams so that the next stream in the list takes
-- over when the process pulling from the current one is cancelled.
--
-- Items are handed from the producing sub-process to the consumer through
-- an 'IORef' guarded by two resources, both created with count 0 and a
-- maximum of 1. The consumer releases @writing@ to request an item and
-- waits on @reading@ for it.
--
-- Each input stream is served by its own process started with 'runProcess'.
-- That process is cancelled when the process that first demanded the
-- resulting stream is being cancelled. When a sub-process finishes while the
-- demanding process is still alive, the sub-process must have been cancelled
-- (otherwise 'error' is called) and the remaining streams are tried.
failoverStream :: [Stream a] -> Stream a
failoverStream ps = Cons $ do
  reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
  writing <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
  slot <- liftIO $ newIORef Nothing
  ownerId <- processId
  let producer stream = do
        requestResource writing
        selfId <- processId
        (item, rest) <-
          finallyProcess (runStream stream) $
            liftEvent $ do
              -- Give the write permission back if we were cancelled while
              -- pulling, so that the next stream in the list can use it.
              wasCancelled <- processCancelled selfId
              when wasCancelled $
                releaseResourceWithinEvent writing
        liftIO $ writeIORef slot (Just item)
        releaseResource reading
        producer rest
      consumer = do
        releaseResource writing
        requestResource reading
        -- The producer stored a value before releasing 'reading'.
        Just item <- liftIO $ readIORef slot
        liftIO $ writeIORef slot Nothing
        return item
      failover [] = return ()
      failover (current : remaining) = do
        workerId <- processId
        -- Propagate cancellation of the demanding process to this worker.
        handler <-
          liftEvent $
            handleSignal (processCancelling ownerId) $ \() ->
              cancelProcessWithId workerId
        finallyProcess (producer current) $
          liftEvent $ do
            disposeEvent handler
            ownerCancelled <- processCancelled ownerId
            unless ownerCancelled $ do
              workerCancelled <- processCancelled workerId
              unless workerCancelled $
                error "Expected the sub-process to be cancelled: failoverStream"
              runProcess $ failover remaining
  liftEvent $ runProcess $ failover ps
  runStream $ repeatProcess consumer
-- | Keep only the first @n@ items of the stream; afterwards the result
-- behaves like 'emptyStream'. A non-positive @n@ gives 'emptyStream'
-- without touching the input.
takeStream :: Int -> Stream a -> Stream a
takeStream n s
  | n <= 0 = emptyStream
  | otherwise =
      Cons $
        runStream s >>= \(item, rest) ->
          return (item, takeStream (n - 1) rest)
-- | Pass items through while the predicate holds. On the first item that
-- fails the predicate the stream continues as 'neverProcess'; that item is
-- pulled from the input but not emitted.
takeStreamWhile :: (a -> Bool) -> Stream a -> Stream a
takeStreamWhile p s = Cons (runStream s >>= step)
  where
    step (item, rest)
      | p item = return (item, takeStreamWhile p rest)
      | otherwise = neverProcess
-- | Like 'takeStreamWhile', but the predicate is itself a 'Process'
-- computation that is run for every pulled item.
takeStreamWhileM :: (a -> Process Bool) -> Stream a -> Stream a
takeStreamWhileM p s =
  Cons $
    runStream s >>= \(item, rest) ->
      p item >>= decide item rest
  where
    decide item rest accepted
      | accepted = return (item, takeStreamWhileM p rest)
      | otherwise = neverProcess
-- | Skip the first @n@ items of the stream. A non-positive @n@ returns the
-- input stream unchanged.
dropStream :: Int -> Stream a -> Stream a
dropStream n s
  | n > 0 =
      Cons $
        runStream s >>= \(_, rest) ->
          runStream (dropStream (n - 1) rest)
  | otherwise = s
-- | Skip leading items while the predicate holds; the first item that
-- fails the predicate is emitted, followed by the untouched rest.
dropStreamWhile :: (a -> Bool) -> Stream a -> Stream a
dropStreamWhile p s = Cons (runStream s >>= step)
  where
    step (item, rest)
      | p item = runStream (dropStreamWhile p rest)
      | otherwise = return (item, rest)
-- | Like 'dropStreamWhile', but the predicate is itself a 'Process'
-- computation that is run for every pulled item until it first yields
-- 'False'.
dropStreamWhileM :: (a -> Process Bool) -> Stream a -> Stream a
dropStreamWhileM p s =
  Cons $
    runStream s >>= \(item, rest) ->
      p item >>= decide item rest
  where
    decide item rest skip
      | skip = runStream (dropStreamWhileM p rest)
      | otherwise = return (item, rest)
-- | Create @n@ streams that each deliver every item of the input stream.
--
-- Every clone owns an FCFS queue. A clone first tries its own queue; if it
-- is empty, the clone takes a shared single-unit resource, re-checks its
-- queue, and only then pulls the next item from the shared source (kept in
-- an 'IORef'), enqueueing that item for all /other/ clones before returning
-- it.
cloneStream :: Int -> Stream a -> Simulation [Stream a]
cloneStream n s = do
  queues <- replicateM n IQ.newFCFSQueue
  lock <- newFCFSResource 1
  sourceRef <- liftIO $ newIORef s
  let indexed = zip [1 ..] queues
      pull self own = do
        cached <- liftEvent $ IQ.tryDequeue own
        case cached of
          Just item -> return item
          Nothing ->
            usingResource lock $ do
              -- Another clone may have filled our queue while we were
              -- waiting for the lock, so look again before pulling.
              recheck <- liftEvent $ IQ.tryDequeue own
              case recheck of
                Just item -> return item
                Nothing -> do
                  current <- liftIO $ readIORef sourceRef
                  (item, rest) <- runStream current
                  liftIO $ writeIORef sourceRef rest
                  forM_ indexed $ \(other, q) ->
                    unless (other == self) $
                      liftEvent $ IQ.enqueue q item
                  return item
  return [repeatProcess (pull i q) | (i, q) <- indexed]
-- | Group the input into consecutive runs of @n@ items and emit, once a run
-- is complete, the /first/ item of that run.
--
-- The accumulator is a 1-based counter together with the first item seen in
-- the current run (if any).
firstArrivalStream :: Int -> Stream a -> Stream a
firstArrivalStream n s = assembleAccumStream step (1, Nothing) s
  where
    step (count, held) item
      | count `mod` n == 0 = return ((1, Nothing), first)
      | otherwise = return ((count + 1, first), Nothing)
      where
        -- Keep the already held item; otherwise the current one starts the run.
        first = Just (fromMaybe item held)
-- | Group the input into consecutive runs of @n@ items and emit the /last/
-- item of every run, i.e. every @n@-th item. The accumulator is a 1-based
-- counter that is reset after each emitted item.
lastArrivalStream :: Int -> Stream a -> Stream a
lastArrivalStream n s = assembleAccumStream step 1 s
  where
    step count item
      | count `mod` n == 0 = return (1, Just item)
      | otherwise = return (count + 1, Nothing)
-- | Run an accumulating step function over the stream and emit only the
-- results for which the step produced a 'Just'.
--
-- 'fromJust' is applied only after 'filterStream' 'isJust' has been applied
-- to the output of 'accumStream'.
assembleAccumStream :: (acc -> a -> Process (acc, Maybe b)) -> acc -> Stream a -> Stream b
assembleAccumStream f acc s =
  (mapStream fromJust . filterStream isJust) (accumStream f acc s)
-- | Attach optional trace messages to a stream.
--
-- For every item, the computation that pulls the item from the input is
-- wrapped in 'traceProcess' with the request message (if given), and the
-- computation that returns the item is wrapped in 'traceProcess' with the
-- response message (if given).
traceStream :: Maybe String
               -- ^ the request message
               -> Maybe String
               -- ^ the response message
               -> Stream a
               -- ^ a stream
               -> Stream a
traceStream request response s = Cons (loop s)
  where
    loop stream = do
      (item, rest) <- tracing request (runStream stream)
      tracing response (return (item, Cons (loop rest)))
    -- Wrap the computation only when a message is present.
    tracing message action = maybe action (\text -> traceProcess text action) message