module Simulation.Aivika.Stream
(
Stream(..),
emptyStream,
mergeStreams,
mergeQueuedStreams,
mergePriorityStreams,
concatStreams,
concatQueuedStreams,
concatPriorityStreams,
splitStream,
splitStreamQueueing,
splitStreamPrioritising,
splitStreamFiltering,
splitStreamFilteringQueueing,
streamUsingId,
prefetchStream,
delayStream,
arrivalStream,
memoStream,
zipStreamSeq,
zipStreamParallel,
zip3StreamSeq,
zip3StreamParallel,
unzipStream,
streamSeq,
streamParallel,
consumeStream,
sinkStream,
repeatProcess,
mapStream,
mapStreamM,
accumStream,
apStream,
apStreamM,
filterStream,
filterStreamM,
takeStream,
takeStreamWhile,
takeStreamWhileM,
dropStream,
dropStreamWhile,
dropStreamWhileM,
singletonStream,
joinStream,
failoverStream,
signalStream,
streamSignal,
queuedSignalStream,
leftStream,
rightStream,
replaceLeftStream,
replaceRightStream,
partitionEitherStream,
cloneStream,
firstArrivalStream,
lastArrivalStream,
assembleAccumStream,
traceStream) where
import Data.IORef
import Data.Maybe
import Data.Monoid hiding ((<>))
import Data.Semigroup (Semigroup(..))
import Data.List.NonEmpty (NonEmpty((:|)))
import Control.Applicative
import Control.Monad
import Control.Monad.Trans
import Simulation.Aivika.Simulation
import Simulation.Aivika.Dynamics
import Simulation.Aivika.Event
import Simulation.Aivika.Composite
import Simulation.Aivika.Cont
import Simulation.Aivika.Process
import Simulation.Aivika.Signal
import Simulation.Aivika.Resource.Base
import Simulation.Aivika.QueueStrategy
import qualified Simulation.Aivika.Queue.Infinite.Base as IQ
import Simulation.Aivika.Internal.Arrival
newtype Stream a = Cons { forall a. Stream a -> Process (a, Stream a)
runStream :: Process (a, Stream a)
}
instance Functor Stream where
fmap :: forall a b. (a -> b) -> Stream a -> Stream b
fmap = (a -> b) -> Stream a -> Stream b
forall a b. (a -> b) -> Stream a -> Stream b
mapStream
instance Applicative Stream where
pure :: forall a. a -> Stream a
pure a
a = let y :: Stream a
y = Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons ((a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream a
y)) in Stream a
y
<*> :: forall a b. Stream (a -> b) -> Stream a -> Stream b
(<*>) = Stream (a -> b) -> Stream a -> Stream b
forall a b. Stream (a -> b) -> Stream a -> Stream b
apStream
instance Alternative Stream where
empty :: forall a. Stream a
empty = Stream a
forall a. Stream a
emptyStream
<|> :: forall a. Stream a -> Stream a -> Stream a
(<|>) = Stream a -> Stream a -> Stream a
forall a. Stream a -> Stream a -> Stream a
mergeStreams
instance Semigroup (Stream a) where
<> :: Stream a -> Stream a -> Stream a
(<>) = Stream a -> Stream a -> Stream a
forall a. Stream a -> Stream a -> Stream a
mergeStreams
sconcat :: NonEmpty (Stream a) -> Stream a
sconcat (Stream a
h :| [Stream a]
t) = [Stream a] -> Stream a
forall a. [Stream a] -> Stream a
concatStreams (Stream a
h Stream a -> [Stream a] -> [Stream a]
forall a. a -> [a] -> [a]
: [Stream a]
t)
instance Monoid (Stream a) where
mempty :: Stream a
mempty = Stream a
forall a. Stream a
emptyStream
mappend :: Stream a -> Stream a -> Stream a
mappend = Stream a -> Stream a -> Stream a
forall a. Semigroup a => a -> a -> a
(<>)
mconcat :: [Stream a] -> Stream a
mconcat = [Stream a] -> Stream a
forall a. [Stream a] -> Stream a
concatStreams
streamUsingId :: ProcessId -> Stream a -> Stream a
streamUsingId :: forall a. ProcessId -> Stream a -> Stream a
streamUsingId ProcessId
pid (Cons Process (a, Stream a)
s) =
Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons (Process (a, Stream a) -> Stream a)
-> Process (a, Stream a) -> Stream a
forall a b. (a -> b) -> a -> b
$ ProcessId -> Process (a, Stream a) -> Process (a, Stream a)
forall a. ProcessId -> Process a -> Process a
processUsingId ProcessId
pid Process (a, Stream a)
s
memoStream :: Stream a -> Simulation (Stream a)
memoStream :: forall a. Stream a -> Simulation (Stream a)
memoStream (Cons Process (a, Stream a)
s) =
do Process (a, Stream a)
p <- Process (a, Stream a) -> Simulation (Process (a, Stream a))
forall a. Process a -> Simulation (Process a)
memoProcess (Process (a, Stream a) -> Simulation (Process (a, Stream a)))
-> Process (a, Stream a) -> Simulation (Process (a, Stream a))
forall a b. (a -> b) -> a -> b
$
do ~(a
x, Stream a
xs) <- Process (a, Stream a)
s
Stream a
xs' <- Simulation (Stream a) -> Process (Stream a)
forall a. Simulation a -> Process a
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Stream a) -> Process (Stream a))
-> Simulation (Stream a) -> Process (Stream a)
forall a b. (a -> b) -> a -> b
$ Stream a -> Simulation (Stream a)
forall a. Stream a -> Simulation (Stream a)
memoStream Stream a
xs
(a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
x, Stream a
xs')
Stream a -> Simulation (Stream a)
forall a. a -> Simulation a
forall (m :: * -> *) a. Monad m => a -> m a
return (Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
p)
zipStreamSeq :: Stream a -> Stream b -> Stream (a, b)
zipStreamSeq :: forall a b. Stream a -> Stream b -> Stream (a, b)
zipStreamSeq (Cons Process (a, Stream a)
sa) (Cons Process (b, Stream b)
sb) = Process ((a, b), Stream (a, b)) -> Stream (a, b)
forall a. Process (a, Stream a) -> Stream a
Cons Process ((a, b), Stream (a, b))
y where
y :: Process ((a, b), Stream (a, b))
y = do ~(a
x, Stream a
xs) <- Process (a, Stream a)
sa
~(b
y, Stream b
ys) <- Process (b, Stream b)
sb
((a, b), Stream (a, b)) -> Process ((a, b), Stream (a, b))
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ((a
x, b
y), Stream a -> Stream b -> Stream (a, b)
forall a b. Stream a -> Stream b -> Stream (a, b)
zipStreamSeq Stream a
xs Stream b
ys)
zipStreamParallel :: Stream a -> Stream b -> Stream (a, b)
zipStreamParallel :: forall a b. Stream a -> Stream b -> Stream (a, b)
zipStreamParallel (Cons Process (a, Stream a)
sa) (Cons Process (b, Stream b)
sb) = Process ((a, b), Stream (a, b)) -> Stream (a, b)
forall a. Process (a, Stream a) -> Stream a
Cons Process ((a, b), Stream (a, b))
y where
y :: Process ((a, b), Stream (a, b))
y = do ~((a
x, Stream a
xs), (b
y, Stream b
ys)) <- Process (a, Stream a)
-> Process (b, Stream b) -> Process ((a, Stream a), (b, Stream b))
forall a b. Process a -> Process b -> Process (a, b)
zipProcessParallel Process (a, Stream a)
sa Process (b, Stream b)
sb
((a, b), Stream (a, b)) -> Process ((a, b), Stream (a, b))
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ((a
x, b
y), Stream a -> Stream b -> Stream (a, b)
forall a b. Stream a -> Stream b -> Stream (a, b)
zipStreamParallel Stream a
xs Stream b
ys)
zip3StreamSeq :: Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamSeq :: forall a b c. Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamSeq (Cons Process (a, Stream a)
sa) (Cons Process (b, Stream b)
sb) (Cons Process (c, Stream c)
sc) = Process ((a, b, c), Stream (a, b, c)) -> Stream (a, b, c)
forall a. Process (a, Stream a) -> Stream a
Cons Process ((a, b, c), Stream (a, b, c))
y where
y :: Process ((a, b, c), Stream (a, b, c))
y = do ~(a
x, Stream a
xs) <- Process (a, Stream a)
sa
~(b
y, Stream b
ys) <- Process (b, Stream b)
sb
~(c
z, Stream c
zs) <- Process (c, Stream c)
sc
((a, b, c), Stream (a, b, c))
-> Process ((a, b, c), Stream (a, b, c))
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ((a
x, b
y, c
z), Stream a -> Stream b -> Stream c -> Stream (a, b, c)
forall a b c. Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamSeq Stream a
xs Stream b
ys Stream c
zs)
zip3StreamParallel :: Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamParallel :: forall a b c. Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamParallel (Cons Process (a, Stream a)
sa) (Cons Process (b, Stream b)
sb) (Cons Process (c, Stream c)
sc) = Process ((a, b, c), Stream (a, b, c)) -> Stream (a, b, c)
forall a. Process (a, Stream a) -> Stream a
Cons Process ((a, b, c), Stream (a, b, c))
y where
y :: Process ((a, b, c), Stream (a, b, c))
y = do ~((a
x, Stream a
xs), (b
y, Stream b
ys), (c
z, Stream c
zs)) <- Process (a, Stream a)
-> Process (b, Stream b)
-> Process (c, Stream c)
-> Process ((a, Stream a), (b, Stream b), (c, Stream c))
forall a b c.
Process a -> Process b -> Process c -> Process (a, b, c)
zip3ProcessParallel Process (a, Stream a)
sa Process (b, Stream b)
sb Process (c, Stream c)
sc
((a, b, c), Stream (a, b, c))
-> Process ((a, b, c), Stream (a, b, c))
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ((a
x, b
y, c
z), Stream a -> Stream b -> Stream c -> Stream (a, b, c)
forall a b c. Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamParallel Stream a
xs Stream b
ys Stream c
zs)
unzipStream :: Stream (a, b) -> Simulation (Stream a, Stream b)
unzipStream :: forall a b. Stream (a, b) -> Simulation (Stream a, Stream b)
unzipStream Stream (a, b)
s =
do Stream (a, b)
s' <- Stream (a, b) -> Simulation (Stream (a, b))
forall a. Stream a -> Simulation (Stream a)
memoStream Stream (a, b)
s
let sa :: Stream a
sa = ((a, b) -> a) -> Stream (a, b) -> Stream a
forall a b. (a -> b) -> Stream a -> Stream b
mapStream (a, b) -> a
forall a b. (a, b) -> a
fst Stream (a, b)
s'
sb :: Stream b
sb = ((a, b) -> b) -> Stream (a, b) -> Stream b
forall a b. (a -> b) -> Stream a -> Stream b
mapStream (a, b) -> b
forall a b. (a, b) -> b
snd Stream (a, b)
s'
(Stream a, Stream b) -> Simulation (Stream a, Stream b)
forall a. a -> Simulation a
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream a
sa, Stream b
sb)
streamSeq :: [Stream a] -> Stream [a]
streamSeq :: forall a. [Stream a] -> Stream [a]
streamSeq [Stream a]
xs = Process ([a], Stream [a]) -> Stream [a]
forall a. Process (a, Stream a) -> Stream a
Cons Process ([a], Stream [a])
y where
y :: Process ([a], Stream [a])
y = do [(a, Stream a)]
ps <- [Stream a]
-> (Stream a -> Process (a, Stream a)) -> Process [(a, Stream a)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Stream a]
xs Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream
([a], Stream [a]) -> Process ([a], Stream [a])
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (((a, Stream a) -> a) -> [(a, Stream a)] -> [a]
forall a b. (a -> b) -> [a] -> [b]
map (a, Stream a) -> a
forall a b. (a, b) -> a
fst [(a, Stream a)]
ps, [Stream a] -> Stream [a]
forall a. [Stream a] -> Stream [a]
streamSeq ([Stream a] -> Stream [a]) -> [Stream a] -> Stream [a]
forall a b. (a -> b) -> a -> b
$ ((a, Stream a) -> Stream a) -> [(a, Stream a)] -> [Stream a]
forall a b. (a -> b) -> [a] -> [b]
map (a, Stream a) -> Stream a
forall a b. (a, b) -> b
snd [(a, Stream a)]
ps)
streamParallel :: [Stream a] -> Stream [a]
streamParallel :: forall a. [Stream a] -> Stream [a]
streamParallel [Stream a]
xs = Process ([a], Stream [a]) -> Stream [a]
forall a. Process (a, Stream a) -> Stream a
Cons Process ([a], Stream [a])
y where
y :: Process ([a], Stream [a])
y = do [(a, Stream a)]
ps <- [Process (a, Stream a)] -> Process [(a, Stream a)]
forall a. [Process a] -> Process [a]
processParallel ([Process (a, Stream a)] -> Process [(a, Stream a)])
-> [Process (a, Stream a)] -> Process [(a, Stream a)]
forall a b. (a -> b) -> a -> b
$ (Stream a -> Process (a, Stream a))
-> [Stream a] -> [Process (a, Stream a)]
forall a b. (a -> b) -> [a] -> [b]
map Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream [Stream a]
xs
([a], Stream [a]) -> Process ([a], Stream [a])
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (((a, Stream a) -> a) -> [(a, Stream a)] -> [a]
forall a b. (a -> b) -> [a] -> [b]
map (a, Stream a) -> a
forall a b. (a, b) -> a
fst [(a, Stream a)]
ps, [Stream a] -> Stream [a]
forall a. [Stream a] -> Stream [a]
streamParallel ([Stream a] -> Stream [a]) -> [Stream a] -> Stream [a]
forall a b. (a -> b) -> a -> b
$ ((a, Stream a) -> Stream a) -> [(a, Stream a)] -> [Stream a]
forall a b. (a -> b) -> [a] -> [b]
map (a, Stream a) -> Stream a
forall a b. (a, b) -> b
snd [(a, Stream a)]
ps)
repeatProcess :: Process a -> Stream a
repeatProcess :: forall a. Process a -> Stream a
repeatProcess Process a
p = Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
y where
y :: Process (a, Stream a)
y = do a
a <- Process a
p
(a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Process a -> Stream a
forall a. Process a -> Stream a
repeatProcess Process a
p)
mapStream :: (a -> b) -> Stream a -> Stream b
mapStream :: forall a b. (a -> b) -> Stream a -> Stream b
mapStream a -> b
f (Cons Process (a, Stream a)
s) = Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons Process (b, Stream b)
y where
y :: Process (b, Stream b)
y = do (a
a, Stream a
xs) <- Process (a, Stream a)
s
(b, Stream b) -> Process (b, Stream b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> b
f a
a, (a -> b) -> Stream a -> Stream b
forall a b. (a -> b) -> Stream a -> Stream b
mapStream a -> b
f Stream a
xs)
mapStreamM :: (a -> Process b) -> Stream a -> Stream b
mapStreamM :: forall a b. (a -> Process b) -> Stream a -> Stream b
mapStreamM a -> Process b
f (Cons Process (a, Stream a)
s) = Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons Process (b, Stream b)
y where
y :: Process (b, Stream b)
y = do (a
a, Stream a
xs) <- Process (a, Stream a)
s
b
b <- a -> Process b
f a
a
(b, Stream b) -> Process (b, Stream b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (b
b, (a -> Process b) -> Stream a -> Stream b
forall a b. (a -> Process b) -> Stream a -> Stream b
mapStreamM a -> Process b
f Stream a
xs)
accumStream :: (acc -> a -> Process (acc, b)) -> acc -> Stream a -> Stream b
accumStream :: forall acc a b.
(acc -> a -> Process (acc, b)) -> acc -> Stream a -> Stream b
accumStream acc -> a -> Process (acc, b)
f acc
acc Stream a
xs = Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons (Process (b, Stream b) -> Stream b)
-> Process (b, Stream b) -> Stream b
forall a b. (a -> b) -> a -> b
$ Stream a -> acc -> Process (b, Stream b)
loop Stream a
xs acc
acc where
loop :: Stream a -> acc -> Process (b, Stream b)
loop (Cons Process (a, Stream a)
s) acc
acc =
do (a
a, Stream a
xs) <- Process (a, Stream a)
s
(acc
acc', b
b) <- acc -> a -> Process (acc, b)
f acc
acc a
a
(b, Stream b) -> Process (b, Stream b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (b
b, Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons (Process (b, Stream b) -> Stream b)
-> Process (b, Stream b) -> Stream b
forall a b. (a -> b) -> a -> b
$ Stream a -> acc -> Process (b, Stream b)
loop Stream a
xs acc
acc')
apStream :: Stream (a -> b) -> Stream a -> Stream b
apStream :: forall a b. Stream (a -> b) -> Stream a -> Stream b
apStream (Cons Process (a -> b, Stream (a -> b))
sf) (Cons Process (a, Stream a)
sa) = Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons Process (b, Stream b)
y where
y :: Process (b, Stream b)
y = do (a -> b
f, Stream (a -> b)
sf') <- Process (a -> b, Stream (a -> b))
sf
(a
a, Stream a
sa') <- Process (a, Stream a)
sa
(b, Stream b) -> Process (b, Stream b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> b
f a
a, Stream (a -> b) -> Stream a -> Stream b
forall a b. Stream (a -> b) -> Stream a -> Stream b
apStream Stream (a -> b)
sf' Stream a
sa')
apStreamM :: Stream (a -> Process b) -> Stream a -> Stream b
apStreamM :: forall a b. Stream (a -> Process b) -> Stream a -> Stream b
apStreamM (Cons Process (a -> Process b, Stream (a -> Process b))
sf) (Cons Process (a, Stream a)
sa) = Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons Process (b, Stream b)
y where
y :: Process (b, Stream b)
y = do (a -> Process b
f, Stream (a -> Process b)
sf') <- Process (a -> Process b, Stream (a -> Process b))
sf
(a
a, Stream a
sa') <- Process (a, Stream a)
sa
b
x <- a -> Process b
f a
a
(b, Stream b) -> Process (b, Stream b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (b
x, Stream (a -> Process b) -> Stream a -> Stream b
forall a b. Stream (a -> Process b) -> Stream a -> Stream b
apStreamM Stream (a -> Process b)
sf' Stream a
sa')
filterStream :: (a -> Bool) -> Stream a -> Stream a
filterStream :: forall a. (a -> Bool) -> Stream a -> Stream a
filterStream a -> Bool
p (Cons Process (a, Stream a)
s) = Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
y where
y :: Process (a, Stream a)
y = do (a
a, Stream a
xs) <- Process (a, Stream a)
s
if a -> Bool
p a
a
then (a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, (a -> Bool) -> Stream a -> Stream a
forall a. (a -> Bool) -> Stream a -> Stream a
filterStream a -> Bool
p Stream a
xs)
else let Cons Process (a, Stream a)
z = (a -> Bool) -> Stream a -> Stream a
forall a. (a -> Bool) -> Stream a -> Stream a
filterStream a -> Bool
p Stream a
xs in Process (a, Stream a)
z
filterStreamM :: (a -> Process Bool) -> Stream a -> Stream a
filterStreamM :: forall a. (a -> Process Bool) -> Stream a -> Stream a
filterStreamM a -> Process Bool
p (Cons Process (a, Stream a)
s) = Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
y where
y :: Process (a, Stream a)
y = do (a
a, Stream a
xs) <- Process (a, Stream a)
s
Bool
b <- a -> Process Bool
p a
a
if Bool
b
then (a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, (a -> Process Bool) -> Stream a -> Stream a
forall a. (a -> Process Bool) -> Stream a -> Stream a
filterStreamM a -> Process Bool
p Stream a
xs)
else let Cons Process (a, Stream a)
z = (a -> Process Bool) -> Stream a -> Stream a
forall a. (a -> Process Bool) -> Stream a -> Stream a
filterStreamM a -> Process Bool
p Stream a
xs in Process (a, Stream a)
z
leftStream :: Stream (Either a b) -> Stream a
leftStream :: forall a b. Stream (Either a b) -> Stream a
leftStream (Cons Process (Either a b, Stream (Either a b))
s) = Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
y where
y :: Process (a, Stream a)
y = do (Either a b
a, Stream (Either a b)
xs) <- Process (Either a b, Stream (Either a b))
s
case Either a b
a of
Left a
a -> (a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream (Either a b) -> Stream a
forall a b. Stream (Either a b) -> Stream a
leftStream Stream (Either a b)
xs)
Right b
_ -> let Cons Process (a, Stream a)
z = Stream (Either a b) -> Stream a
forall a b. Stream (Either a b) -> Stream a
leftStream Stream (Either a b)
xs in Process (a, Stream a)
z
rightStream :: Stream (Either a b) -> Stream b
rightStream :: forall a b. Stream (Either a b) -> Stream b
rightStream (Cons Process (Either a b, Stream (Either a b))
s) = Process (b, Stream b) -> Stream b
forall a. Process (a, Stream a) -> Stream a
Cons Process (b, Stream b)
y where
y :: Process (b, Stream b)
y = do (Either a b
a, Stream (Either a b)
xs) <- Process (Either a b, Stream (Either a b))
s
case Either a b
a of
Left a
_ -> let Cons Process (b, Stream b)
z = Stream (Either a b) -> Stream b
forall a b. Stream (Either a b) -> Stream b
rightStream Stream (Either a b)
xs in Process (b, Stream b)
z
Right b
a -> (b, Stream b) -> Process (b, Stream b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (b
a, Stream (Either a b) -> Stream b
forall a b. Stream (Either a b) -> Stream b
rightStream Stream (Either a b)
xs)
replaceLeftStream :: Stream (Either a b) -> Stream c -> Stream (Either c b)
replaceLeftStream :: forall a b c.
Stream (Either a b) -> Stream c -> Stream (Either c b)
replaceLeftStream (Cons Process (Either a b, Stream (Either a b))
sab) (ys0 :: Stream c
ys0@(Cons Process (c, Stream c)
sc)) = Process (Either c b, Stream (Either c b)) -> Stream (Either c b)
forall a. Process (a, Stream a) -> Stream a
Cons Process (Either c b, Stream (Either c b))
z where
z :: Process (Either c b, Stream (Either c b))
z = do (Either a b
a, Stream (Either a b)
xs) <- Process (Either a b, Stream (Either a b))
sab
case Either a b
a of
Left a
_ ->
do (c
b, Stream c
ys) <- Process (c, Stream c)
sc
(Either c b, Stream (Either c b))
-> Process (Either c b, Stream (Either c b))
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (c -> Either c b
forall a b. a -> Either a b
Left c
b, Stream (Either a b) -> Stream c -> Stream (Either c b)
forall a b c.
Stream (Either a b) -> Stream c -> Stream (Either c b)
replaceLeftStream Stream (Either a b)
xs Stream c
ys)
Right b
a ->
(Either c b, Stream (Either c b))
-> Process (Either c b, Stream (Either c b))
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Either c b
forall a b. b -> Either a b
Right b
a, Stream (Either a b) -> Stream c -> Stream (Either c b)
forall a b c.
Stream (Either a b) -> Stream c -> Stream (Either c b)
replaceLeftStream Stream (Either a b)
xs Stream c
ys0)
replaceRightStream :: Stream (Either a b) -> Stream c -> Stream (Either a c)
replaceRightStream :: forall a b c.
Stream (Either a b) -> Stream c -> Stream (Either a c)
replaceRightStream (Cons Process (Either a b, Stream (Either a b))
sab) (ys0 :: Stream c
ys0@(Cons Process (c, Stream c)
sc)) = Process (Either a c, Stream (Either a c)) -> Stream (Either a c)
forall a. Process (a, Stream a) -> Stream a
Cons Process (Either a c, Stream (Either a c))
z where
z :: Process (Either a c, Stream (Either a c))
z = do (Either a b
a, Stream (Either a b)
xs) <- Process (Either a b, Stream (Either a b))
sab
case Either a b
a of
Right b
_ ->
do (c
b, Stream c
ys) <- Process (c, Stream c)
sc
(Either a c, Stream (Either a c))
-> Process (Either a c, Stream (Either a c))
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (c -> Either a c
forall a b. b -> Either a b
Right c
b, Stream (Either a b) -> Stream c -> Stream (Either a c)
forall a b c.
Stream (Either a b) -> Stream c -> Stream (Either a c)
replaceRightStream Stream (Either a b)
xs Stream c
ys)
Left a
a ->
(Either a c, Stream (Either a c))
-> Process (Either a c, Stream (Either a c))
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Either a c
forall a b. a -> Either a b
Left a
a, Stream (Either a b) -> Stream c -> Stream (Either a c)
forall a b c.
Stream (Either a b) -> Stream c -> Stream (Either a c)
replaceRightStream Stream (Either a b)
xs Stream c
ys0)
partitionEitherStream :: Stream (Either a b) -> Simulation (Stream a, Stream b)
partitionEitherStream :: forall a b. Stream (Either a b) -> Simulation (Stream a, Stream b)
partitionEitherStream Stream (Either a b)
s =
do Stream (Either a b)
s' <- Stream (Either a b) -> Simulation (Stream (Either a b))
forall a. Stream a -> Simulation (Stream a)
memoStream Stream (Either a b)
s
(Stream a, Stream b) -> Simulation (Stream a, Stream b)
forall a. a -> Simulation a
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream (Either a b) -> Stream a
forall a b. Stream (Either a b) -> Stream a
leftStream Stream (Either a b)
s', Stream (Either a b) -> Stream b
forall a b. Stream (Either a b) -> Stream b
rightStream Stream (Either a b)
s')
splitStream :: Int -> Stream a -> Simulation [Stream a]
splitStream :: forall a. Int -> Stream a -> Simulation [Stream a]
splitStream = FCFS -> Int -> Stream a -> Simulation [Stream a]
forall s a.
EnqueueStrategy s =>
s -> Int -> Stream a -> Simulation [Stream a]
splitStreamQueueing FCFS
FCFS
splitStreamQueueing :: EnqueueStrategy s
=> s
-> Int
-> Stream a
-> Simulation [Stream a]
splitStreamQueueing :: forall s a.
EnqueueStrategy s =>
s -> Int -> Stream a -> Simulation [Stream a]
splitStreamQueueing s
s Int
n Stream a
x =
do IORef (Stream a)
ref <- IO (IORef (Stream a)) -> Simulation (IORef (Stream a))
forall a. IO a -> Simulation a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (Stream a)) -> Simulation (IORef (Stream a)))
-> IO (IORef (Stream a)) -> Simulation (IORef (Stream a))
forall a b. (a -> b) -> a -> b
$ Stream a -> IO (IORef (Stream a))
forall a. a -> IO (IORef a)
newIORef Stream a
x
Resource s
res <- s -> Int -> Simulation (Resource s)
forall s. QueueStrategy s => s -> Int -> Simulation (Resource s)
newResource s
s Int
1
let reader :: Process a
reader =
Resource s -> Process a -> Process a
forall s a.
EnqueueStrategy s =>
Resource s -> Process a -> Process a
usingResource Resource s
res (Process a -> Process a) -> Process a -> Process a
forall a b. (a -> b) -> a -> b
$
do Stream a
p <- IO (Stream a) -> Process (Stream a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Stream a) -> Process (Stream a))
-> IO (Stream a) -> Process (Stream a)
forall a b. (a -> b) -> a -> b
$ IORef (Stream a) -> IO (Stream a)
forall a. IORef a -> IO a
readIORef IORef (Stream a)
ref
(a
a, Stream a
xs) <- Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream Stream a
p
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ IORef (Stream a) -> Stream a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Stream a)
ref Stream a
xs
a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
[Stream a] -> Simulation [Stream a]
forall a. a -> Simulation a
forall (m :: * -> *) a. Monad m => a -> m a
return ([Stream a] -> Simulation [Stream a])
-> [Stream a] -> Simulation [Stream a]
forall a b. (a -> b) -> a -> b
$ (Int -> Stream a) -> [Int] -> [Stream a]
forall a b. (a -> b) -> [a] -> [b]
map (\Int
i -> Process a -> Stream a
forall a. Process a -> Stream a
repeatProcess Process a
reader) [Int
1..Int
n]
splitStreamPrioritising :: PriorityQueueStrategy s p
=> s
-> [Stream p]
-> Stream a
-> Simulation [Stream a]
splitStreamPrioritising :: forall s p a.
PriorityQueueStrategy s p =>
s -> [Stream p] -> Stream a -> Simulation [Stream a]
splitStreamPrioritising s
s [Stream p]
ps Stream a
x =
do IORef (Stream a)
ref <- IO (IORef (Stream a)) -> Simulation (IORef (Stream a))
forall a. IO a -> Simulation a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (Stream a)) -> Simulation (IORef (Stream a)))
-> IO (IORef (Stream a)) -> Simulation (IORef (Stream a))
forall a b. (a -> b) -> a -> b
$ Stream a -> IO (IORef (Stream a))
forall a. a -> IO (IORef a)
newIORef Stream a
x
Resource s
res <- s -> Int -> Simulation (Resource s)
forall s. QueueStrategy s => s -> Int -> Simulation (Resource s)
newResource s
s Int
1
let stream :: Stream p -> Stream a
stream (Cons Process (p, Stream p)
p) = Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
z where
z :: Process (a, Stream a)
z = do (p
p', Stream p
ps) <- Process (p, Stream p)
p
a
a <- Resource s -> p -> Process a -> Process a
forall s p a.
PriorityQueueStrategy s p =>
Resource s -> p -> Process a -> Process a
usingResourceWithPriority Resource s
res p
p' (Process a -> Process a) -> Process a -> Process a
forall a b. (a -> b) -> a -> b
$
do Stream a
p <- IO (Stream a) -> Process (Stream a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Stream a) -> Process (Stream a))
-> IO (Stream a) -> Process (Stream a)
forall a b. (a -> b) -> a -> b
$ IORef (Stream a) -> IO (Stream a)
forall a. IORef a -> IO a
readIORef IORef (Stream a)
ref
(a
a, Stream a
xs) <- Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream Stream a
p
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ IORef (Stream a) -> Stream a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Stream a)
ref Stream a
xs
a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
(a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream p -> Stream a
stream Stream p
ps)
[Stream a] -> Simulation [Stream a]
forall a. a -> Simulation a
forall (m :: * -> *) a. Monad m => a -> m a
return ([Stream a] -> Simulation [Stream a])
-> [Stream a] -> Simulation [Stream a]
forall a b. (a -> b) -> a -> b
$ (Stream p -> Stream a) -> [Stream p] -> [Stream a]
forall a b. (a -> b) -> [a] -> [b]
map Stream p -> Stream a
stream [Stream p]
ps
splitStreamFiltering :: [a -> Event Bool] -> Stream a -> Simulation [Stream a]
splitStreamFiltering :: forall a. [a -> Event Bool] -> Stream a -> Simulation [Stream a]
splitStreamFiltering = FCFS -> [a -> Event Bool] -> Stream a -> Simulation [Stream a]
forall s a.
EnqueueStrategy s =>
s -> [a -> Event Bool] -> Stream a -> Simulation [Stream a]
splitStreamFilteringQueueing FCFS
FCFS
splitStreamFilteringQueueing :: EnqueueStrategy s
=> s
-> [a -> Event Bool]
-> Stream a
-> Simulation [Stream a]
splitStreamFilteringQueueing :: forall s a.
EnqueueStrategy s =>
s -> [a -> Event Bool] -> Stream a -> Simulation [Stream a]
splitStreamFilteringQueueing s
s [a -> Event Bool]
preds Stream a
x =
do IORef (Stream a)
ref <- IO (IORef (Stream a)) -> Simulation (IORef (Stream a))
forall a. IO a -> Simulation a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (Stream a)) -> Simulation (IORef (Stream a)))
-> IO (IORef (Stream a)) -> Simulation (IORef (Stream a))
forall a b. (a -> b) -> a -> b
$ Stream a -> IO (IORef (Stream a))
forall a. a -> IO (IORef a)
newIORef Stream a
x
Resource s
res <- s -> Int -> Simulation (Resource s)
forall s. QueueStrategy s => s -> Int -> Simulation (Resource s)
newResource s
s Int
1
let reader :: (a -> Event Bool) -> Process a
reader a -> Event Bool
pred =
do Maybe a
a <-
Resource s -> Process (Maybe a) -> Process (Maybe a)
forall s a.
EnqueueStrategy s =>
Resource s -> Process a -> Process a
usingResource Resource s
res (Process (Maybe a) -> Process (Maybe a))
-> Process (Maybe a) -> Process (Maybe a)
forall a b. (a -> b) -> a -> b
$
do Stream a
p <- IO (Stream a) -> Process (Stream a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Stream a) -> Process (Stream a))
-> IO (Stream a) -> Process (Stream a)
forall a b. (a -> b) -> a -> b
$ IORef (Stream a) -> IO (Stream a)
forall a. IORef a -> IO a
readIORef IORef (Stream a)
ref
(a
a, Stream a
xs) <- Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream Stream a
p
Event (Maybe a) -> Process (Maybe a)
forall a. Event a -> Process a
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event (Maybe a) -> Process (Maybe a))
-> Event (Maybe a) -> Process (Maybe a)
forall a b. (a -> b) -> a -> b
$
do Bool
f <- a -> Event Bool
pred a
a
if Bool
f
then do IO () -> Event ()
forall a. IO a -> Event a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Event ()) -> IO () -> Event ()
forall a b. (a -> b) -> a -> b
$ IORef (Stream a) -> Stream a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Stream a)
ref Stream a
xs
Maybe a -> Event (Maybe a)
forall a. a -> Event a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe a -> Event (Maybe a)) -> Maybe a -> Event (Maybe a)
forall a b. (a -> b) -> a -> b
$ a -> Maybe a
forall a. a -> Maybe a
Just a
a
else do IO () -> Event ()
forall a. IO a -> Event a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Event ()) -> IO () -> Event ()
forall a b. (a -> b) -> a -> b
$ IORef (Stream a) -> Stream a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Stream a)
ref (Stream a -> IO ()) -> Stream a -> IO ()
forall a b. (a -> b) -> a -> b
$ Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons ((a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream a
xs))
Maybe a -> Event (Maybe a)
forall a. a -> Event a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
case Maybe a
a of
Just a
a -> a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
Maybe a
Nothing -> (a -> Event Bool) -> Process a
reader a -> Event Bool
pred
[Stream a] -> Simulation [Stream a]
forall a. a -> Simulation a
forall (m :: * -> *) a. Monad m => a -> m a
return ([Stream a] -> Simulation [Stream a])
-> [Stream a] -> Simulation [Stream a]
forall a b. (a -> b) -> a -> b
$ ((a -> Event Bool) -> Stream a) -> [a -> Event Bool] -> [Stream a]
forall a b. (a -> b) -> [a] -> [b]
map (Process a -> Stream a
forall a. Process a -> Stream a
repeatProcess (Process a -> Stream a)
-> ((a -> Event Bool) -> Process a)
-> (a -> Event Bool)
-> Stream a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> Event Bool) -> Process a
reader) [a -> Event Bool]
preds
concatStreams :: [Stream a] -> Stream a
concatStreams :: forall a. [Stream a] -> Stream a
concatStreams = FCFS -> [Stream a] -> Stream a
forall s a. EnqueueStrategy s => s -> [Stream a] -> Stream a
concatQueuedStreams FCFS
FCFS
concatQueuedStreams :: EnqueueStrategy s
=> s
-> [Stream a]
-> Stream a
concatQueuedStreams :: forall s a. EnqueueStrategy s => s -> [Stream a] -> Stream a
concatQueuedStreams s
s [Stream a]
streams = Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
z where
z :: Process (a, Stream a)
z = do Resource FCFS
reading <- Simulation (Resource FCFS) -> Process (Resource FCFS)
forall a. Simulation a -> Process a
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Resource FCFS) -> Process (Resource FCFS))
-> Simulation (Resource FCFS) -> Process (Resource FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation (Resource FCFS)
forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
Resource s
writing <- Simulation (Resource s) -> Process (Resource s)
forall a. Simulation a -> Process a
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Resource s) -> Process (Resource s))
-> Simulation (Resource s) -> Process (Resource s)
forall a b. (a -> b) -> a -> b
$ s -> Int -> Maybe Int -> Simulation (Resource s)
forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount s
s Int
1 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
Resource FCFS
conting <- Simulation (Resource FCFS) -> Process (Resource FCFS)
forall a. Simulation a -> Process a
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Resource FCFS) -> Process (Resource FCFS))
-> Simulation (Resource FCFS) -> Process (Resource FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation (Resource FCFS)
forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
IORef (Maybe a)
ref <- IO (IORef (Maybe a)) -> Process (IORef (Maybe a))
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (Maybe a)) -> Process (IORef (Maybe a)))
-> IO (IORef (Maybe a)) -> Process (IORef (Maybe a))
forall a b. (a -> b) -> a -> b
$ Maybe a -> IO (IORef (Maybe a))
forall a. a -> IO (IORef a)
newIORef Maybe a
forall a. Maybe a
Nothing
let writer :: Stream a -> Process b
writer Stream a
p =
do (a
a, Stream a
xs) <- Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream Stream a
p
Resource s -> Process ()
forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource s
writing
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe a) -> Maybe a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
Resource FCFS -> Process ()
forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource FCFS
reading
Resource FCFS -> Process ()
forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource FCFS
conting
Stream a -> Process b
writer Stream a
xs
reader :: Process a
reader =
do Resource FCFS -> Process ()
forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource FCFS
reading
Just a
a <- IO (Maybe a) -> Process (Maybe a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe a) -> Process (Maybe a))
-> IO (Maybe a) -> Process (Maybe a)
forall a b. (a -> b) -> a -> b
$ IORef (Maybe a) -> IO (Maybe a)
forall a. IORef a -> IO a
readIORef IORef (Maybe a)
ref
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe a) -> Maybe a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref Maybe a
forall a. Maybe a
Nothing
Resource s -> Process ()
forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource s
writing
a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
[Stream a] -> (Stream a -> Process ()) -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Stream a]
streams ((Stream a -> Process ()) -> Process ())
-> (Stream a -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ Process () -> Process ()
spawnProcess (Process () -> Process ())
-> (Stream a -> Process ()) -> Stream a -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream a -> Process ()
forall {b}. Stream a -> Process b
writer
a
a <- Process a
reader
let xs :: Stream a
xs = Process a -> Stream a
forall a. Process a -> Stream a
repeatProcess (Resource FCFS -> Process ()
forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource FCFS
conting Process () -> Process a -> Process a
forall a b. Process a -> Process b -> Process b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Process a
reader)
(a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream a
xs)
concatPriorityStreams :: PriorityQueueStrategy s p
=> s
-> [Stream (p, a)]
-> Stream a
concatPriorityStreams :: forall s p a.
PriorityQueueStrategy s p =>
s -> [Stream (p, a)] -> Stream a
concatPriorityStreams s
s [Stream (p, a)]
streams = Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
z where
z :: Process (a, Stream a)
z = do Resource FCFS
reading <- Simulation (Resource FCFS) -> Process (Resource FCFS)
forall a. Simulation a -> Process a
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Resource FCFS) -> Process (Resource FCFS))
-> Simulation (Resource FCFS) -> Process (Resource FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation (Resource FCFS)
forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
Resource s
writing <- Simulation (Resource s) -> Process (Resource s)
forall a. Simulation a -> Process a
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Resource s) -> Process (Resource s))
-> Simulation (Resource s) -> Process (Resource s)
forall a b. (a -> b) -> a -> b
$ s -> Int -> Maybe Int -> Simulation (Resource s)
forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount s
s Int
1 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
Resource FCFS
conting <- Simulation (Resource FCFS) -> Process (Resource FCFS)
forall a. Simulation a -> Process a
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Resource FCFS) -> Process (Resource FCFS))
-> Simulation (Resource FCFS) -> Process (Resource FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation (Resource FCFS)
forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
IORef (Maybe a)
ref <- IO (IORef (Maybe a)) -> Process (IORef (Maybe a))
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (Maybe a)) -> Process (IORef (Maybe a)))
-> IO (IORef (Maybe a)) -> Process (IORef (Maybe a))
forall a b. (a -> b) -> a -> b
$ Maybe a -> IO (IORef (Maybe a))
forall a. a -> IO (IORef a)
newIORef Maybe a
forall a. Maybe a
Nothing
let writer :: Stream (p, a) -> Process b
writer Stream (p, a)
p =
do ((p
priority, a
a), Stream (p, a)
xs) <- Stream (p, a) -> Process ((p, a), Stream (p, a))
forall a. Stream a -> Process (a, Stream a)
runStream Stream (p, a)
p
Resource s -> p -> Process ()
forall s p.
PriorityQueueStrategy s p =>
Resource s -> p -> Process ()
requestResourceWithPriority Resource s
writing p
priority
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe a) -> Maybe a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
Resource FCFS -> Process ()
forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource FCFS
reading
Resource FCFS -> Process ()
forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource FCFS
conting
Stream (p, a) -> Process b
writer Stream (p, a)
xs
reader :: Process a
reader =
do Resource FCFS -> Process ()
forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource FCFS
reading
Just a
a <- IO (Maybe a) -> Process (Maybe a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe a) -> Process (Maybe a))
-> IO (Maybe a) -> Process (Maybe a)
forall a b. (a -> b) -> a -> b
$ IORef (Maybe a) -> IO (Maybe a)
forall a. IORef a -> IO a
readIORef IORef (Maybe a)
ref
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe a) -> Maybe a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref Maybe a
forall a. Maybe a
Nothing
Resource s -> Process ()
forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource s
writing
a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
[Stream (p, a)] -> (Stream (p, a) -> Process ()) -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Stream (p, a)]
streams ((Stream (p, a) -> Process ()) -> Process ())
-> (Stream (p, a) -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ Process () -> Process ()
spawnProcess (Process () -> Process ())
-> (Stream (p, a) -> Process ()) -> Stream (p, a) -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream (p, a) -> Process ()
forall {b}. Stream (p, a) -> Process b
writer
a
a <- Process a
reader
let xs :: Stream a
xs = Process a -> Stream a
forall a. Process a -> Stream a
repeatProcess (Resource FCFS -> Process ()
forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource FCFS
conting Process () -> Process a -> Process a
forall a b. Process a -> Process b -> Process b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Process a
reader)
(a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream a
xs)
mergeStreams :: Stream a -> Stream a -> Stream a
mergeStreams :: forall a. Stream a -> Stream a -> Stream a
mergeStreams = FCFS -> Stream a -> Stream a -> Stream a
forall s a.
EnqueueStrategy s =>
s -> Stream a -> Stream a -> Stream a
mergeQueuedStreams FCFS
FCFS
mergeQueuedStreams :: EnqueueStrategy s
=> s
-> Stream a
-> Stream a
-> Stream a
mergeQueuedStreams :: forall s a.
EnqueueStrategy s =>
s -> Stream a -> Stream a -> Stream a
mergeQueuedStreams s
s Stream a
x Stream a
y = s -> [Stream a] -> Stream a
forall s a. EnqueueStrategy s => s -> [Stream a] -> Stream a
concatQueuedStreams s
s [Stream a
x, Stream a
y]
mergePriorityStreams :: PriorityQueueStrategy s p
=> s
-> Stream (p, a)
-> Stream (p, a)
-> Stream a
mergePriorityStreams :: forall s p a.
PriorityQueueStrategy s p =>
s -> Stream (p, a) -> Stream (p, a) -> Stream a
mergePriorityStreams s
s Stream (p, a)
x Stream (p, a)
y = s -> [Stream (p, a)] -> Stream a
forall s p a.
PriorityQueueStrategy s p =>
s -> [Stream (p, a)] -> Stream a
concatPriorityStreams s
s [Stream (p, a)
x, Stream (p, a)
y]
emptyStream :: Stream a
emptyStream :: forall a. Stream a
emptyStream = Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
forall a. Process a
neverProcess
consumeStream :: (a -> Process ()) -> Stream a -> Process ()
consumeStream :: forall a. (a -> Process ()) -> Stream a -> Process ()
consumeStream a -> Process ()
f = Stream a -> Process ()
forall {b}. Stream a -> Process b
p where
p :: Stream a -> Process b
p (Cons Process (a, Stream a)
s) = do (a
a, Stream a
xs) <- Process (a, Stream a)
s
a -> Process ()
f a
a
Stream a -> Process b
p Stream a
xs
sinkStream :: Stream a -> Process ()
sinkStream :: forall a. Stream a -> Process ()
sinkStream = Stream a -> Process ()
forall {a} {b}. Stream a -> Process b
p where
p :: Stream a -> Process b
p (Cons Process (a, Stream a)
s) = do (a
a, Stream a
xs) <- Process (a, Stream a)
s
Stream a -> Process b
p Stream a
xs
prefetchStream :: Stream a -> Stream a
prefetchStream :: forall a. Stream a -> Stream a
prefetchStream Stream a
s = Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
z where
z :: Process (a, Stream a)
z = do Resource FCFS
reading <- Simulation (Resource FCFS) -> Process (Resource FCFS)
forall a. Simulation a -> Process a
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Resource FCFS) -> Process (Resource FCFS))
-> Simulation (Resource FCFS) -> Process (Resource FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation (Resource FCFS)
forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
Resource FCFS
writing <- Simulation (Resource FCFS) -> Process (Resource FCFS)
forall a. Simulation a -> Process a
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Resource FCFS) -> Process (Resource FCFS))
-> Simulation (Resource FCFS) -> Process (Resource FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation (Resource FCFS)
forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount FCFS
FCFS Int
1 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
IORef (Maybe a)
ref <- IO (IORef (Maybe a)) -> Process (IORef (Maybe a))
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (Maybe a)) -> Process (IORef (Maybe a)))
-> IO (IORef (Maybe a)) -> Process (IORef (Maybe a))
forall a b. (a -> b) -> a -> b
$ Maybe a -> IO (IORef (Maybe a))
forall a. a -> IO (IORef a)
newIORef Maybe a
forall a. Maybe a
Nothing
let writer :: Stream a -> Process b
writer Stream a
p =
do (a
a, Stream a
xs) <- Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream Stream a
p
Resource FCFS -> Process ()
forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource FCFS
writing
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe a) -> Maybe a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
Resource FCFS -> Process ()
forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource FCFS
reading
Stream a -> Process b
writer Stream a
xs
reader :: Process a
reader =
do Resource FCFS -> Process ()
forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource FCFS
reading
Just a
a <- IO (Maybe a) -> Process (Maybe a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe a) -> Process (Maybe a))
-> IO (Maybe a) -> Process (Maybe a)
forall a b. (a -> b) -> a -> b
$ IORef (Maybe a) -> IO (Maybe a)
forall a. IORef a -> IO a
readIORef IORef (Maybe a)
ref
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe a) -> Maybe a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref Maybe a
forall a. Maybe a
Nothing
Resource FCFS -> Process ()
forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource FCFS
writing
a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
Process () -> Process ()
spawnProcess (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$ Stream a -> Process ()
forall {b}. Stream a -> Process b
writer Stream a
s
Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream (Stream a -> Process (a, Stream a))
-> Stream a -> Process (a, Stream a)
forall a b. (a -> b) -> a -> b
$ Process a -> Stream a
forall a. Process a -> Stream a
repeatProcess Process a
reader
queuedSignalStream :: (a -> Event ())
-> Process a
-> Signal a
-> Composite (Stream a)
queuedSignalStream :: forall a.
(a -> Event ()) -> Process a -> Signal a -> Composite (Stream a)
queuedSignalStream a -> Event ()
enqueue Process a
dequeue Signal a
s =
do DisposableEvent
h <- Event DisposableEvent -> Composite DisposableEvent
forall a. Event a -> Composite a
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event DisposableEvent -> Composite DisposableEvent)
-> Event DisposableEvent -> Composite DisposableEvent
forall a b. (a -> b) -> a -> b
$
Signal a -> (a -> Event ()) -> Event DisposableEvent
forall a. Signal a -> (a -> Event ()) -> Event DisposableEvent
handleSignal Signal a
s a -> Event ()
enqueue
DisposableEvent -> Composite ()
disposableComposite DisposableEvent
h
Stream a -> Composite (Stream a)
forall a. a -> Composite a
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream a -> Composite (Stream a))
-> Stream a -> Composite (Stream a)
forall a b. (a -> b) -> a -> b
$ Process a -> Stream a
forall a. Process a -> Stream a
repeatProcess Process a
dequeue
signalStream :: Signal a -> Composite (Stream a)
signalStream :: forall a. Signal a -> Composite (Stream a)
signalStream Signal a
s =
do FCFSQueue a
q <- Simulation (FCFSQueue a) -> Composite (FCFSQueue a)
forall a. Simulation a -> Composite a
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation Simulation (FCFSQueue a)
forall a. Simulation (FCFSQueue a)
IQ.newFCFSQueue
(a -> Event ()) -> Process a -> Signal a -> Composite (Stream a)
forall a.
(a -> Event ()) -> Process a -> Signal a -> Composite (Stream a)
queuedSignalStream (FCFSQueue a -> a -> Event ()
forall sm so a.
(EnqueueStrategy sm, DequeueStrategy so) =>
Queue sm so a -> a -> Event ()
IQ.enqueue FCFSQueue a
q) (FCFSQueue a -> Process a
forall sm so a.
(DequeueStrategy sm, EnqueueStrategy so) =>
Queue sm so a -> Process a
IQ.dequeue FCFSQueue a
q) Signal a
s
streamSignal :: Stream a -> Composite (Signal a)
streamSignal :: forall a. Stream a -> Composite (Signal a)
streamSignal Stream a
z =
do SignalSource a
s <- Simulation (SignalSource a) -> Composite (SignalSource a)
forall a. Simulation a -> Composite a
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation Simulation (SignalSource a)
forall a. Simulation (SignalSource a)
newSignalSource
ProcessId
pid <- Simulation ProcessId -> Composite ProcessId
forall a. Simulation a -> Composite a
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation Simulation ProcessId
newProcessId
Event () -> Composite ()
forall a. Event a -> Composite a
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event () -> Composite ()) -> Event () -> Composite ()
forall a b. (a -> b) -> a -> b
$
ProcessId -> Process () -> Event ()
runProcessUsingId ProcessId
pid (Process () -> Event ()) -> Process () -> Event ()
forall a b. (a -> b) -> a -> b
$
(a -> Process ()) -> Stream a -> Process ()
forall a. (a -> Process ()) -> Stream a -> Process ()
consumeStream (Event () -> Process ()
forall a. Event a -> Process a
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event () -> Process ()) -> (a -> Event ()) -> a -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SignalSource a -> a -> Event ()
forall a. SignalSource a -> a -> Event ()
triggerSignal SignalSource a
s) Stream a
z
DisposableEvent -> Composite ()
disposableComposite (DisposableEvent -> Composite ())
-> DisposableEvent -> Composite ()
forall a b. (a -> b) -> a -> b
$
Event () -> DisposableEvent
DisposableEvent (Event () -> DisposableEvent) -> Event () -> DisposableEvent
forall a b. (a -> b) -> a -> b
$
ProcessId -> Event ()
cancelProcessWithId ProcessId
pid
Signal a -> Composite (Signal a)
forall a. a -> Composite a
forall (m :: * -> *) a. Monad m => a -> m a
return (Signal a -> Composite (Signal a))
-> Signal a -> Composite (Signal a)
forall a b. (a -> b) -> a -> b
$ SignalSource a -> Signal a
forall a. SignalSource a -> Signal a
publishSignal SignalSource a
s
arrivalStream :: Stream a -> Stream (Arrival a)
arrivalStream :: forall a. Stream a -> Stream (Arrival a)
arrivalStream Stream a
s = Process (Arrival a, Stream (Arrival a)) -> Stream (Arrival a)
forall a. Process (a, Stream a) -> Stream a
Cons (Process (Arrival a, Stream (Arrival a)) -> Stream (Arrival a))
-> Process (Arrival a, Stream (Arrival a)) -> Stream (Arrival a)
forall a b. (a -> b) -> a -> b
$ Stream a -> Maybe Double -> Process (Arrival a, Stream (Arrival a))
forall {a}.
Stream a -> Maybe Double -> Process (Arrival a, Stream (Arrival a))
loop Stream a
s Maybe Double
forall a. Maybe a
Nothing where
loop :: Stream a -> Maybe Double -> Process (Arrival a, Stream (Arrival a))
loop Stream a
s Maybe Double
t0 = do (a
a, Stream a
xs) <- Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
Double
t <- Dynamics Double -> Process Double
forall a. Dynamics a -> Process a
forall (m :: * -> *) a. DynamicsLift m => Dynamics a -> m a
liftDynamics Dynamics Double
time
let b :: Arrival a
b = Arrival { arrivalValue :: a
arrivalValue = a
a,
arrivalTime :: Double
arrivalTime = Double
t,
arrivalDelay :: Maybe Double
arrivalDelay =
case Maybe Double
t0 of
Maybe Double
Nothing -> Maybe Double
forall a. Maybe a
Nothing
Just Double
t0 -> Double -> Maybe Double
forall a. a -> Maybe a
Just (Double
t Double -> Double -> Double
forall a. Num a => a -> a -> a
- Double
t0) }
(Arrival a, Stream (Arrival a))
-> Process (Arrival a, Stream (Arrival a))
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Arrival a
b, Process (Arrival a, Stream (Arrival a)) -> Stream (Arrival a)
forall a. Process (a, Stream a) -> Stream a
Cons (Process (Arrival a, Stream (Arrival a)) -> Stream (Arrival a))
-> Process (Arrival a, Stream (Arrival a)) -> Stream (Arrival a)
forall a b. (a -> b) -> a -> b
$ Stream a -> Maybe Double -> Process (Arrival a, Stream (Arrival a))
loop Stream a
xs (Double -> Maybe Double
forall a. a -> Maybe a
Just Double
t))
delayStream :: a -> Stream a -> Stream a
delayStream :: forall a. a -> Stream a -> Stream a
delayStream a
a0 Stream a
s = Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons (Process (a, Stream a) -> Stream a)
-> Process (a, Stream a) -> Stream a
forall a b. (a -> b) -> a -> b
$ (a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a0, Stream a
s)
singletonStream :: a -> Stream a
singletonStream :: forall a. a -> Stream a
singletonStream a
a = Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons (Process (a, Stream a) -> Stream a)
-> Process (a, Stream a) -> Stream a
forall a b. (a -> b) -> a -> b
$ (a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream a
forall a. Stream a
emptyStream)
joinStream :: Process (Stream a) -> Stream a
joinStream :: forall a. Process (Stream a) -> Stream a
joinStream Process (Stream a)
m = Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons (Process (a, Stream a) -> Stream a)
-> Process (a, Stream a) -> Stream a
forall a b. (a -> b) -> a -> b
$ Process (Stream a)
m Process (Stream a)
-> (Stream a -> Process (a, Stream a)) -> Process (a, Stream a)
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream
failoverStream :: [Stream a] -> Stream a
failoverStream :: forall a. [Stream a] -> Stream a
failoverStream [Stream a]
ps = Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons Process (a, Stream a)
z where
z :: Process (a, Stream a)
z = do Resource FCFS
reading <- Simulation (Resource FCFS) -> Process (Resource FCFS)
forall a. Simulation a -> Process a
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Resource FCFS) -> Process (Resource FCFS))
-> Simulation (Resource FCFS) -> Process (Resource FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation (Resource FCFS)
forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
Resource FCFS
writing <- Simulation (Resource FCFS) -> Process (Resource FCFS)
forall a. Simulation a -> Process a
forall (m :: * -> *) a. SimulationLift m => Simulation a -> m a
liftSimulation (Simulation (Resource FCFS) -> Process (Resource FCFS))
-> Simulation (Resource FCFS) -> Process (Resource FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation (Resource FCFS)
forall s.
QueueStrategy s =>
s -> Int -> Maybe Int -> Simulation (Resource s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
IORef (Maybe a)
ref <- IO (IORef (Maybe a)) -> Process (IORef (Maybe a))
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (Maybe a)) -> Process (IORef (Maybe a)))
-> IO (IORef (Maybe a)) -> Process (IORef (Maybe a))
forall a b. (a -> b) -> a -> b
$ Maybe a -> IO (IORef (Maybe a))
forall a. a -> IO (IORef a)
newIORef Maybe a
forall a. Maybe a
Nothing
ProcessId
pid <- Process ProcessId
processId
let writer :: Stream a -> Process b
writer Stream a
p =
do Resource FCFS -> Process ()
forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource FCFS
writing
ProcessId
pid' <- Process ProcessId
processId
(a
a, Stream a
xs) <-
Process (a, Stream a) -> Process () -> Process (a, Stream a)
forall a b. Process a -> Process b -> Process a
finallyProcess (Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream Stream a
p) (Process () -> Process (a, Stream a))
-> Process () -> Process (a, Stream a)
forall a b. (a -> b) -> a -> b
$
Event () -> Process ()
forall a. Event a -> Process a
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event () -> Process ()) -> Event () -> Process ()
forall a b. (a -> b) -> a -> b
$
do Bool
cancelled' <- ProcessId -> Event Bool
processCancelled ProcessId
pid'
Bool -> Event () -> Event ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
cancelled' (Event () -> Event ()) -> Event () -> Event ()
forall a b. (a -> b) -> a -> b
$
Resource FCFS -> Event ()
forall s. DequeueStrategy s => Resource s -> Event ()
releaseResourceWithinEvent Resource FCFS
writing
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe a) -> Maybe a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
Resource FCFS -> Process ()
forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource FCFS
reading
Stream a -> Process b
writer Stream a
xs
reader :: Process a
reader =
do Resource FCFS -> Process ()
forall s. DequeueStrategy s => Resource s -> Process ()
releaseResource Resource FCFS
writing
Resource FCFS -> Process ()
forall s. EnqueueStrategy s => Resource s -> Process ()
requestResource Resource FCFS
reading
Just a
a <- IO (Maybe a) -> Process (Maybe a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe a) -> Process (Maybe a))
-> IO (Maybe a) -> Process (Maybe a)
forall a b. (a -> b) -> a -> b
$ IORef (Maybe a) -> IO (Maybe a)
forall a. IORef a -> IO a
readIORef IORef (Maybe a)
ref
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe a) -> Maybe a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref Maybe a
forall a. Maybe a
Nothing
a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
loop :: [Stream a] -> Process ()
loop [] = () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
loop (Stream a
p: [Stream a]
ps) =
do ProcessId
pid' <- Process ProcessId
processId
DisposableEvent
h' <- Event DisposableEvent -> Process DisposableEvent
forall a. Event a -> Process a
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event DisposableEvent -> Process DisposableEvent)
-> Event DisposableEvent -> Process DisposableEvent
forall a b. (a -> b) -> a -> b
$
Signal () -> (() -> Event ()) -> Event DisposableEvent
forall a. Signal a -> (a -> Event ()) -> Event DisposableEvent
handleSignal (ProcessId -> Signal ()
processCancelling ProcessId
pid) ((() -> Event ()) -> Event DisposableEvent)
-> (() -> Event ()) -> Event DisposableEvent
forall a b. (a -> b) -> a -> b
$ \() ->
ProcessId -> Event ()
cancelProcessWithId ProcessId
pid'
Process () -> Process () -> Process ()
forall a b. Process a -> Process b -> Process a
finallyProcess (Stream a -> Process ()
forall {b}. Stream a -> Process b
writer Stream a
p) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
Event () -> Process ()
forall a. Event a -> Process a
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event () -> Process ()) -> Event () -> Process ()
forall a b. (a -> b) -> a -> b
$
do DisposableEvent -> Event ()
disposeEvent DisposableEvent
h'
Bool
cancelled <- ProcessId -> Event Bool
processCancelled ProcessId
pid
Bool -> Event () -> Event ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
cancelled (Event () -> Event ()) -> Event () -> Event ()
forall a b. (a -> b) -> a -> b
$
do Bool
cancelled' <- ProcessId -> Event Bool
processCancelled ProcessId
pid'
Bool -> Event () -> Event ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
cancelled' (Event () -> Event ()) -> Event () -> Event ()
forall a b. (a -> b) -> a -> b
$
[Char] -> Event ()
forall a. HasCallStack => [Char] -> a
error [Char]
"Expected the sub-process to be cancelled: failoverStream"
Process () -> Event ()
runProcess (Process () -> Event ()) -> Process () -> Event ()
forall a b. (a -> b) -> a -> b
$ [Stream a] -> Process ()
loop [Stream a]
ps
Event () -> Process ()
forall a. Event a -> Process a
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event () -> Process ()) -> Event () -> Process ()
forall a b. (a -> b) -> a -> b
$ Process () -> Event ()
runProcess (Process () -> Event ()) -> Process () -> Event ()
forall a b. (a -> b) -> a -> b
$ [Stream a] -> Process ()
loop [Stream a]
ps
Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream (Stream a -> Process (a, Stream a))
-> Stream a -> Process (a, Stream a)
forall a b. (a -> b) -> a -> b
$ Process a -> Stream a
forall a. Process a -> Stream a
repeatProcess Process a
reader
takeStream :: Int -> Stream a -> Stream a
takeStream :: forall a. Int -> Stream a -> Stream a
takeStream Int
n Stream a
s
| Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 = Stream a
forall a. Stream a
emptyStream
| Bool
otherwise =
Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons (Process (a, Stream a) -> Stream a)
-> Process (a, Stream a) -> Stream a
forall a b. (a -> b) -> a -> b
$
do (a
a, Stream a
xs) <- Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
(a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Int -> Stream a -> Stream a
forall a. Int -> Stream a -> Stream a
takeStream (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1) Stream a
xs)
takeStreamWhile :: (a -> Bool) -> Stream a -> Stream a
takeStreamWhile :: forall a. (a -> Bool) -> Stream a -> Stream a
takeStreamWhile a -> Bool
p Stream a
s =
Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons (Process (a, Stream a) -> Stream a)
-> Process (a, Stream a) -> Stream a
forall a b. (a -> b) -> a -> b
$
do (a
a, Stream a
xs) <- Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
if a -> Bool
p a
a
then (a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, (a -> Bool) -> Stream a -> Stream a
forall a. (a -> Bool) -> Stream a -> Stream a
takeStreamWhile a -> Bool
p Stream a
xs)
else Process (a, Stream a)
forall a. Process a
neverProcess
takeStreamWhileM :: (a -> Process Bool) -> Stream a -> Stream a
takeStreamWhileM :: forall a. (a -> Process Bool) -> Stream a -> Stream a
takeStreamWhileM a -> Process Bool
p Stream a
s =
Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons (Process (a, Stream a) -> Stream a)
-> Process (a, Stream a) -> Stream a
forall a b. (a -> b) -> a -> b
$
do (a
a, Stream a
xs) <- Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
Bool
f <- a -> Process Bool
p a
a
if Bool
f
then (a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, (a -> Process Bool) -> Stream a -> Stream a
forall a. (a -> Process Bool) -> Stream a -> Stream a
takeStreamWhileM a -> Process Bool
p Stream a
xs)
else Process (a, Stream a)
forall a. Process a
neverProcess
dropStream :: Int -> Stream a -> Stream a
dropStream :: forall a. Int -> Stream a -> Stream a
dropStream Int
n Stream a
s
| Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 = Stream a
s
| Bool
otherwise =
Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons (Process (a, Stream a) -> Stream a)
-> Process (a, Stream a) -> Stream a
forall a b. (a -> b) -> a -> b
$
do (a
a, Stream a
xs) <- Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream (Stream a -> Process (a, Stream a))
-> Stream a -> Process (a, Stream a)
forall a b. (a -> b) -> a -> b
$ Int -> Stream a -> Stream a
forall a. Int -> Stream a -> Stream a
dropStream (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1) Stream a
xs
dropStreamWhile :: (a -> Bool) -> Stream a -> Stream a
dropStreamWhile :: forall a. (a -> Bool) -> Stream a -> Stream a
dropStreamWhile a -> Bool
p Stream a
s =
Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons (Process (a, Stream a) -> Stream a)
-> Process (a, Stream a) -> Stream a
forall a b. (a -> b) -> a -> b
$
do (a
a, Stream a
xs) <- Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
if a -> Bool
p a
a
then Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream (Stream a -> Process (a, Stream a))
-> Stream a -> Process (a, Stream a)
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream a -> Stream a
forall a. (a -> Bool) -> Stream a -> Stream a
dropStreamWhile a -> Bool
p Stream a
xs
else (a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream a
xs)
dropStreamWhileM :: (a -> Process Bool) -> Stream a -> Stream a
dropStreamWhileM :: forall a. (a -> Process Bool) -> Stream a -> Stream a
dropStreamWhileM a -> Process Bool
p Stream a
s =
Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons (Process (a, Stream a) -> Stream a)
-> Process (a, Stream a) -> Stream a
forall a b. (a -> b) -> a -> b
$
do (a
a, Stream a
xs) <- Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
Bool
f <- a -> Process Bool
p a
a
if Bool
f
then Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream (Stream a -> Process (a, Stream a))
-> Stream a -> Process (a, Stream a)
forall a b. (a -> b) -> a -> b
$ (a -> Process Bool) -> Stream a -> Stream a
forall a. (a -> Process Bool) -> Stream a -> Stream a
dropStreamWhileM a -> Process Bool
p Stream a
xs
else (a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream a
xs)
cloneStream :: Int -> Stream a -> Simulation [Stream a]
cloneStream :: forall a. Int -> Stream a -> Simulation [Stream a]
cloneStream Int
n Stream a
s =
do [FCFSQueue a]
qs <- [Int]
-> (Int -> Simulation (FCFSQueue a)) -> Simulation [FCFSQueue a]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Int
1..Int
n] ((Int -> Simulation (FCFSQueue a)) -> Simulation [FCFSQueue a])
-> (Int -> Simulation (FCFSQueue a)) -> Simulation [FCFSQueue a]
forall a b. (a -> b) -> a -> b
$ \Int
i -> Simulation (FCFSQueue a)
forall a. Simulation (FCFSQueue a)
IQ.newFCFSQueue
Resource FCFS
rs <- Int -> Simulation (Resource FCFS)
newFCFSResource Int
1
IORef (Stream a)
ref <- IO (IORef (Stream a)) -> Simulation (IORef (Stream a))
forall a. IO a -> Simulation a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef (Stream a)) -> Simulation (IORef (Stream a)))
-> IO (IORef (Stream a)) -> Simulation (IORef (Stream a))
forall a b. (a -> b) -> a -> b
$ Stream a -> IO (IORef (Stream a))
forall a. a -> IO (IORef a)
newIORef Stream a
s
let reader :: a -> Queue sm so a -> Process a
reader a
m Queue sm so a
q =
do Maybe a
a <- Event (Maybe a) -> Process (Maybe a)
forall a. Event a -> Process a
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event (Maybe a) -> Process (Maybe a))
-> Event (Maybe a) -> Process (Maybe a)
forall a b. (a -> b) -> a -> b
$ Queue sm so a -> Event (Maybe a)
forall sm so a.
DequeueStrategy sm =>
Queue sm so a -> Event (Maybe a)
IQ.tryDequeue Queue sm so a
q
case Maybe a
a of
Just a
a -> a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
Maybe a
Nothing ->
Resource FCFS -> Process a -> Process a
forall s a.
EnqueueStrategy s =>
Resource s -> Process a -> Process a
usingResource Resource FCFS
rs (Process a -> Process a) -> Process a -> Process a
forall a b. (a -> b) -> a -> b
$
do Maybe a
a <- Event (Maybe a) -> Process (Maybe a)
forall a. Event a -> Process a
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event (Maybe a) -> Process (Maybe a))
-> Event (Maybe a) -> Process (Maybe a)
forall a b. (a -> b) -> a -> b
$ Queue sm so a -> Event (Maybe a)
forall sm so a.
DequeueStrategy sm =>
Queue sm so a -> Event (Maybe a)
IQ.tryDequeue Queue sm so a
q
case Maybe a
a of
Just a
a -> a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
Maybe a
Nothing ->
do Stream a
s <- IO (Stream a) -> Process (Stream a)
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Stream a) -> Process (Stream a))
-> IO (Stream a) -> Process (Stream a)
forall a b. (a -> b) -> a -> b
$ IORef (Stream a) -> IO (Stream a)
forall a. IORef a -> IO a
readIORef IORef (Stream a)
ref
(a
a, Stream a
xs) <- Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ IORef (Stream a) -> Stream a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Stream a)
ref Stream a
xs
[(a, FCFSQueue a)]
-> ((a, FCFSQueue a) -> Process ()) -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([a] -> [FCFSQueue a] -> [(a, FCFSQueue a)]
forall a b. [a] -> [b] -> [(a, b)]
zip [a
1..] [FCFSQueue a]
qs) (((a, FCFSQueue a) -> Process ()) -> Process ())
-> ((a, FCFSQueue a) -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ \(a
i, FCFSQueue a
q) ->
Bool -> Process () -> Process ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (a
i a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
m) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$
Event () -> Process ()
forall a. Event a -> Process a
forall (m :: * -> *) a. EventLift m => Event a -> m a
liftEvent (Event () -> Process ()) -> Event () -> Process ()
forall a b. (a -> b) -> a -> b
$ FCFSQueue a -> a -> Event ()
forall sm so a.
(EnqueueStrategy sm, DequeueStrategy so) =>
Queue sm so a -> a -> Event ()
IQ.enqueue FCFSQueue a
q a
a
a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
[(Integer, FCFSQueue a)]
-> ((Integer, FCFSQueue a) -> Simulation (Stream a))
-> Simulation [Stream a]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM ([Integer] -> [FCFSQueue a] -> [(Integer, FCFSQueue a)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Integer
1..] [FCFSQueue a]
qs) (((Integer, FCFSQueue a) -> Simulation (Stream a))
-> Simulation [Stream a])
-> ((Integer, FCFSQueue a) -> Simulation (Stream a))
-> Simulation [Stream a]
forall a b. (a -> b) -> a -> b
$ \(Integer
i, FCFSQueue a
q) ->
Stream a -> Simulation (Stream a)
forall a. a -> Simulation a
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream a -> Simulation (Stream a))
-> Stream a -> Simulation (Stream a)
forall a b. (a -> b) -> a -> b
$ Process a -> Stream a
forall a. Process a -> Stream a
repeatProcess (Process a -> Stream a) -> Process a -> Stream a
forall a b. (a -> b) -> a -> b
$ Integer -> FCFSQueue a -> Process a
forall {sm} {a} {so}.
(DequeueStrategy sm, Num a, Enum a, Eq a) =>
a -> Queue sm so a -> Process a
reader Integer
i FCFSQueue a
q
firstArrivalStream :: Int -> Stream a -> Stream a
firstArrivalStream :: forall a. Int -> Stream a -> Stream a
firstArrivalStream Int
n Stream a
s = ((Int, Maybe a) -> a -> Process ((Int, Maybe a), Maybe a))
-> (Int, Maybe a) -> Stream a -> Stream a
forall acc a b.
(acc -> a -> Process (acc, Maybe b)) -> acc -> Stream a -> Stream b
assembleAccumStream (Int, Maybe a) -> a -> Process ((Int, Maybe a), Maybe a)
forall {m :: * -> *} {a}.
Monad m =>
(Int, Maybe a) -> a -> m ((Int, Maybe a), Maybe a)
f (Int
1, Maybe a
forall a. Maybe a
Nothing) Stream a
s
where f :: (Int, Maybe a) -> a -> m ((Int, Maybe a), Maybe a)
f (Int
i, Maybe a
a0) a
a =
let a0' :: Maybe a
a0' = a -> Maybe a
forall a. a -> Maybe a
Just (a -> Maybe a) -> a -> Maybe a
forall a b. (a -> b) -> a -> b
$ a -> Maybe a -> a
forall a. a -> Maybe a -> a
fromMaybe a
a Maybe a
a0
in if Int
i Int -> Int -> Int
forall a. Integral a => a -> a -> a
`mod` Int
n Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
then ((Int, Maybe a), Maybe a) -> m ((Int, Maybe a), Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ((Int
1, Maybe a
forall a. Maybe a
Nothing), Maybe a
a0')
else ((Int, Maybe a), Maybe a) -> m ((Int, Maybe a), Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ((Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1, Maybe a
a0'), Maybe a
forall a. Maybe a
Nothing)
lastArrivalStream :: Int -> Stream a -> Stream a
lastArrivalStream :: forall a. Int -> Stream a -> Stream a
lastArrivalStream Int
n Stream a
s = (Int -> a -> Process (Int, Maybe a)) -> Int -> Stream a -> Stream a
forall acc a b.
(acc -> a -> Process (acc, Maybe b)) -> acc -> Stream a -> Stream b
assembleAccumStream Int -> a -> Process (Int, Maybe a)
forall {m :: * -> *} {a}. Monad m => Int -> a -> m (Int, Maybe a)
f Int
1 Stream a
s
where f :: Int -> a -> m (Int, Maybe a)
f Int
i a
a =
if Int
i Int -> Int -> Int
forall a. Integral a => a -> a -> a
`mod` Int
n Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
then (Int, Maybe a) -> m (Int, Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
1, a -> Maybe a
forall a. a -> Maybe a
Just a
a)
else (Int, Maybe a) -> m (Int, Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1, Maybe a
forall a. Maybe a
Nothing)
assembleAccumStream :: (acc -> a -> Process (acc, Maybe b)) -> acc -> Stream a -> Stream b
assembleAccumStream :: forall acc a b.
(acc -> a -> Process (acc, Maybe b)) -> acc -> Stream a -> Stream b
assembleAccumStream acc -> a -> Process (acc, Maybe b)
f acc
acc Stream a
s =
(Maybe b -> b) -> Stream (Maybe b) -> Stream b
forall a b. (a -> b) -> Stream a -> Stream b
mapStream Maybe b -> b
forall a. HasCallStack => Maybe a -> a
fromJust (Stream (Maybe b) -> Stream b) -> Stream (Maybe b) -> Stream b
forall a b. (a -> b) -> a -> b
$
(Maybe b -> Bool) -> Stream (Maybe b) -> Stream (Maybe b)
forall a. (a -> Bool) -> Stream a -> Stream a
filterStream Maybe b -> Bool
forall a. Maybe a -> Bool
isJust (Stream (Maybe b) -> Stream (Maybe b))
-> Stream (Maybe b) -> Stream (Maybe b)
forall a b. (a -> b) -> a -> b
$
(acc -> a -> Process (acc, Maybe b))
-> acc -> Stream a -> Stream (Maybe b)
forall acc a b.
(acc -> a -> Process (acc, b)) -> acc -> Stream a -> Stream b
accumStream acc -> a -> Process (acc, Maybe b)
f acc
acc Stream a
s
traceStream :: Maybe String
-> Maybe String
-> Stream a
-> Stream a
traceStream :: forall a. Maybe [Char] -> Maybe [Char] -> Stream a -> Stream a
traceStream Maybe [Char]
request Maybe [Char]
response Stream a
s = Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons (Process (a, Stream a) -> Stream a)
-> Process (a, Stream a) -> Stream a
forall a b. (a -> b) -> a -> b
$ Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
loop Stream a
s where
loop :: Stream a -> Process (a, Stream a)
loop Stream a
s = do (a
a, Stream a
xs) <-
case Maybe [Char]
request of
Maybe [Char]
Nothing -> Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
Just [Char]
message ->
[Char] -> Process (a, Stream a) -> Process (a, Stream a)
forall a. [Char] -> Process a -> Process a
traceProcess [Char]
message (Process (a, Stream a) -> Process (a, Stream a))
-> Process (a, Stream a) -> Process (a, Stream a)
forall a b. (a -> b) -> a -> b
$
Stream a -> Process (a, Stream a)
forall a. Stream a -> Process (a, Stream a)
runStream Stream a
s
case Maybe [Char]
response of
Maybe [Char]
Nothing -> (a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons (Process (a, Stream a) -> Stream a)
-> Process (a, Stream a) -> Stream a
forall a b. (a -> b) -> a -> b
$ Stream a -> Process (a, Stream a)
loop Stream a
xs)
Just [Char]
message ->
[Char] -> Process (a, Stream a) -> Process (a, Stream a)
forall a. [Char] -> Process a -> Process a
traceProcess [Char]
message (Process (a, Stream a) -> Process (a, Stream a))
-> Process (a, Stream a) -> Process (a, Stream a)
forall a b. (a -> b) -> a -> b
$
(a, Stream a) -> Process (a, Stream a)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Process (a, Stream a) -> Stream a
forall a. Process (a, Stream a) -> Stream a
Cons (Process (a, Stream a) -> Stream a)
-> Process (a, Stream a) -> Stream a
forall a b. (a -> b) -> a -> b
$ Stream a -> Process (a, Stream a)
loop Stream a
xs)