-- |
-- Module     : Simulation.Aivika.Stream
--
-- Streams of data whose every cell is obtained by running a 'Process'
-- computation, together with combinators for merging, concatenating,
-- splitting, zipping, mapping and filtering such streams.
module Simulation.Aivika.Stream
(
Stream(..),
emptyStream,
mergeStreams,
mergeQueuedStreams,
mergePriorityStreams,
concatStreams,
concatQueuedStreams,
concatPriorityStreams,
splitStream,
splitStreamQueuing,
splitStreamPrioritising,
streamUsingId,
prefetchStream,
memoStream,
zipStreamSeq,
zipStreamParallel,
zip3StreamSeq,
zip3StreamParallel,
unzipStream,
streamSeq,
streamParallel,
consumeStream,
sinkStream,
repeatProcess,
mapStream,
mapStreamM,
apStreamDataFirst,
apStreamDataLater,
apStreamParallel,
filterStream,
filterStreamM,
leftStream,
rightStream,
replaceLeftStream,
replaceRightStream,
partitionEitherStream) where
import Data.IORef
import Data.Maybe
import Data.Monoid
import Control.Monad
import Control.Monad.Trans
import Simulation.Aivika.Simulation
import Simulation.Aivika.Cont
import Simulation.Aivika.Process
import Simulation.Aivika.Resource
import Simulation.Aivika.QueueStrategy
-- | A stream of values of type @a@.  The type has a single constructor and
-- no terminating case: a stream is a 'Process' that yields the current item
-- paired with the stream of the remaining items.
newtype Stream a =
  Cons { runStream :: Process (a, Stream a)
         -- ^ The process that yields the head item and the rest of the stream.
       }
-- | Mapping a stream applies the function to the head item and, recursively,
-- to the rest of the stream.  The cell is matched lazily.
instance Functor Stream where
  fmap f (Cons s) =
    Cons $ s >>= \ ~(item, rest) -> return (f item, fmap f rest)
-- | Streams form a monoid: 'emptyStream' is the unit, 'mergeStreams'
-- combines two streams and 'concatStreams' combines a list of them.
--
-- NOTE(review): recent versions of @base@ make @Semigroup@ a superclass of
-- 'Monoid'; building against them would additionally need a @Semigroup@
-- instance -- confirm against the supported compiler range.
instance Monoid (Stream a) where
  mconcat = concatStreams
  mappend = mergeStreams
  mempty  = emptyStream
-- | Wrap the head process of the stream with 'processUsingId' so that it
-- runs under the given process identifier.  Only the first cell is wrapped;
-- the rest of the stream is whatever that cell returns.
streamUsingId :: ProcessId -> Stream a -> Stream a
streamUsingId pid stream = Cons (processUsingId pid (runStream stream))
-- | Memoize the stream: the head cell is passed through 'memoProcess', and
-- the rest of the stream is memoized in the same way once the head cell has
-- been computed.
memoStream :: Stream a -> Simulation (Stream a)
memoStream (Cons s) = liftM Cons (memoProcess cell)
  where
    -- Computes the head item and memoizes the tail before returning it.
    cell =
      do ~(item, rest) <- s
         rest' <- liftSimulation $ memoStream rest
         return (item, rest')
-- | Zip two streams, requesting the item of the first stream and only then
-- the item of the second one.
zipStreamSeq :: Stream a -> Stream b -> Stream (a, b)
zipStreamSeq (Cons sa) (Cons sb) = Cons $ do
  ~(a, restA) <- sa
  ~(b, restB) <- sb
  return ((a, b), zipStreamSeq restA restB)
-- | Zip two streams, obtaining both head cells through a single call to
-- 'zipProcessParallel'.
zipStreamParallel :: Stream a -> Stream b -> Stream (a, b)
zipStreamParallel (Cons sa) (Cons sb) = Cons $
  zipProcessParallel sa sb >>= \ ~((a, restA), (b, restB)) ->
  return ((a, b), zipStreamParallel restA restB)
-- | Zip three streams, requesting their items one after another in the
-- order of the arguments.
zip3StreamSeq :: Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamSeq (Cons sa) (Cons sb) (Cons sc) = Cons $ do
  ~(a, restA) <- sa
  ~(b, restB) <- sb
  ~(c, restC) <- sc
  return ((a, b, c), zip3StreamSeq restA restB restC)
-- | Zip three streams, obtaining all three head cells through a single call
-- to 'zip3ProcessParallel'.
zip3StreamParallel :: Stream a -> Stream b -> Stream c -> Stream (a, b, c)
zip3StreamParallel (Cons sa) (Cons sb) (Cons sc) = Cons $
  zip3ProcessParallel sa sb sc >>= \ ~((a, restA), (b, restB), (c, restC)) ->
  return ((a, b, c), zip3StreamParallel restA restB restC)
-- | Split a stream of pairs into a stream of first components and a stream
-- of second components.  The source is memoized with 'memoStream' first, and
-- both results are mapped over that same memoized stream.
unzipStream :: Stream (a, b) -> Simulation (Stream a, Stream b)
unzipStream s = liftM project (memoStream s)
  where
    project shared = (mapStream fst shared, mapStream snd shared)
-- | Turn a list of streams into a stream of lists, running the head cells
-- of the input streams one after another in list order.
streamSeq :: [Stream a] -> Stream [a]
streamSeq streams = Cons $ do
  cells <- mapM runStream streams
  let (items, rests) = unzip cells
  return (items, streamSeq rests)
-- | Turn a list of streams into a stream of lists, obtaining the head cells
-- of all input streams through a single call to 'processParallel'.
streamParallel :: [Stream a] -> Stream [a]
streamParallel streams = Cons $ do
  cells <- processParallel (map runStream streams)
  let (items, rests) = unzip cells
  return (items, streamParallel rests)
-- | Build a stream in which every item is produced by running the same
-- process again.
repeatProcess :: Process a -> Stream a
repeatProcess p = Cons $ p >>= \item -> return (item, repeatProcess p)
-- | Apply a pure function to every item of the stream; this is 'fmap' for
-- 'Stream'.
mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f stream = fmap f stream
-- | Transform every item of the stream with a function that runs in the
-- 'Process' monad.  The transformation of an item runs right after the item
-- has been received from the source.
mapStreamM :: (a -> Process b) -> Stream a -> Stream b
mapStreamM f (Cons s) = Cons $
  s >>= \(a, rest) ->
  f a >>= \b ->
  return (b, mapStreamM f rest)
-- | Apply a function obtained from a process to every item of the stream.
-- For each cell the item is requested from the stream first, and the
-- function is computed afterwards.
apStreamDataFirst :: Process (a -> b) -> Stream a -> Stream b
apStreamDataFirst f (Cons s) = Cons $
  s >>= \ ~(a, rest) ->
  f >>= \g ->
  return (g a, apStreamDataFirst f rest)
-- | Apply a function obtained from a process to every item of the stream.
-- For each cell the function is computed first, and the item is requested
-- from the stream afterwards.
apStreamDataLater :: Process (a -> b) -> Stream a -> Stream b
apStreamDataLater f (Cons s) = Cons $
  f >>= \g ->
  s >>= \ ~(a, rest) ->
  return (g a, apStreamDataLater f rest)
-- | Apply a function obtained from a process to every item of the stream.
-- For each cell the function and the item are obtained together through
-- 'zipProcessParallel'.
apStreamParallel :: Process (a -> b) -> Stream a -> Stream b
apStreamParallel f (Cons s) = Cons $ do
  ~(g, (a, rest)) <- zipProcessParallel f s
  return (g a, apStreamParallel f rest)
-- | Keep only the items that satisfy the predicate.  A rejected item is
-- dropped and the search continues with the rest of the stream.
filterStream :: (a -> Bool) -> Stream a -> Stream a
filterStream p (Cons s) = Cons $ do
  (a, rest) <- s
  let kept = filterStream p rest
  if p a
    then return (a, kept)
    else runStream kept
-- | Keep only the items for which the predicate, run in the 'Process'
-- monad, returns 'True'.  A rejected item is dropped and the search
-- continues with the rest of the stream.
filterStreamM :: (a -> Process Bool) -> Stream a -> Stream a
filterStreamM p (Cons s) = Cons $ do
  (a, rest) <- s
  accepted <- p a
  let kept = filterStreamM p rest
  if accepted
    then return (a, kept)
    else runStream kept
-- | Extract the 'Left' items of the stream, skipping every 'Right' item.
leftStream :: Stream (Either a b) -> Stream a
leftStream (Cons s) = Cons $ do
  (item, rest) <- s
  let next = leftStream rest
  either (\a -> return (a, next)) (const $ runStream next) item
-- | Extract the 'Right' items of the stream, skipping every 'Left' item.
rightStream :: Stream (Either a b) -> Stream b
rightStream (Cons s) = Cons $ do
  (item, rest) <- s
  let next = rightStream rest
  either (const $ runStream next) (\b -> return (b, next)) item
-- | Replace every 'Left' item of the first stream with the next item taken
-- from the second stream.  'Right' items pass through unchanged and do not
-- consume anything from the second stream.
replaceLeftStream :: Stream (Either a b) -> Stream c -> Stream (Either c b)
-- The as-pattern is written without whitespace around @\@@: newer GHC
-- releases parse a whitespace-surrounded @\@@ as an operator and reject it
-- in a pattern.  The second stream is still matched lazily.
replaceLeftStream (Cons sab) ys0@(~(Cons sc)) = Cons z
  where
    z = do (item, xs) <- sab
           case item of
             Left _ ->
               -- Only now is the replacement stream asked for an item.
               do (b, ys) <- sc
                  return (Left b, replaceLeftStream xs ys)
             Right a ->
               -- The replacement stream is carried over untouched.
               return (Right a, replaceLeftStream xs ys0)
-- | Replace every 'Right' item of the first stream with the next item taken
-- from the second stream.  'Left' items pass through unchanged and do not
-- consume anything from the second stream.
replaceRightStream :: Stream (Either a b) -> Stream c -> Stream (Either a c)
-- The as-pattern is written without whitespace around @\@@: newer GHC
-- releases parse a whitespace-surrounded @\@@ as an operator and reject it
-- in a pattern.  The second stream is still matched lazily.
replaceRightStream (Cons sab) ys0@(~(Cons sc)) = Cons z
  where
    z = do (item, xs) <- sab
           case item of
             Right _ ->
               -- Only now is the replacement stream asked for an item.
               do (b, ys) <- sc
                  return (Right b, replaceRightStream xs ys)
             Left a ->
               -- The replacement stream is carried over untouched.
               return (Left a, replaceRightStream xs ys0)
-- | Partition a stream of 'Either' values into the stream of its 'Left'
-- items and the stream of its 'Right' items.  The source is memoized with
-- 'memoStream' first, and both results read from that same memoized stream.
partitionEitherStream :: Stream (Either a b) -> Simulation (Stream a, Stream b)
partitionEitherStream s = liftM halves (memoStream s)
  where
    halves shared = (leftStream shared, rightStream shared)
-- | Split the input stream into the specified number of output streams;
-- this is 'splitStreamQueuing' with the 'FCFS' strategy.
splitStream :: Int -> Stream a -> Simulation [Stream a]
splitStream n stream = splitStreamQueuing FCFS n stream
-- | Split the input stream into the specified number of output streams.
-- All outputs read from one shared position in the input, which is kept in
-- a mutable reference; each read is wrapped in 'usingResource' on a resource
-- created with the given strategy and a count of 1.
splitStreamQueuing :: EnqueueStrategy s q
                      => s
                      -- ^ the strategy passed to the guarding resource
                      -> Int
                      -- ^ the number of output streams
                      -> Stream a
                      -- ^ the input stream
                      -> Simulation [Stream a]
                      -- ^ the output streams
splitStreamQueuing strategy n source = do
  current <- liftIO (newIORef source)
  lock <- newResource strategy 1
  let takeNext =
        usingResource lock $
        do stream <- liftIO (readIORef current)
           (a, rest) <- runStream stream
           -- Advance the shared position past the item just taken.
           liftIO (writeIORef current rest)
           return a
  -- Every output stream repeats the very same guarded read.
  return (replicate n (repeatProcess takeNext))
-- | Split the input stream into one output stream per given priority
-- stream.  Before each read, an output takes the next priority from its own
-- priority stream and uses it in 'usingResourceWithPriority' on a shared
-- resource created with the given strategy and a count of 1.  All outputs
-- read from one shared position in the input.
splitStreamPrioritising :: PriorityQueueStrategy s q p
                           => s
                           -- ^ the strategy passed to the guarding resource
                           -> [Stream p]
                           -- ^ the streams of priorities, one per output
                           -> Stream a
                           -- ^ the input stream
                           -> Simulation [Stream a]
                           -- ^ the output streams
splitStreamPrioritising strategy priorities source = do
  current <- liftIO (newIORef source)
  lock <- newResource strategy 1
  let output (Cons prioCell) = Cons $
        do (prio, morePrios) <- prioCell
           a <- usingResourceWithPriority lock prio $
                do stream <- liftIO (readIORef current)
                   (item, rest) <- runStream stream
                   -- Advance the shared position past the item just taken.
                   liftIO (writeIORef current rest)
                   return item
           return (a, output morePrios)
  return (map output priorities)
-- | Concatenate the input streams into one output stream; this is
-- 'concatQueuedStreams' with the 'FCFS' strategy.
concatStreams :: [Stream a] -> Stream a
concatStreams streams = concatQueuedStreams FCFS streams
-- | Combine the input streams into a single output stream.
--
-- When the head of the result is first run, three resources and one mutable
-- slot are created and a @writer@ process is spawned for every input stream.
-- Writers and the reader hand items over through the slot:
--
-- * a writer takes an item from its stream, requests @writing@, stores the
--   item in the slot, releases @reading@ and then requests @conting@ before
--   it goes on to the next item;
--
-- * the reader requests @reading@, takes the item out of the slot, clears
--   the slot and releases @writing@;
--
-- * every cell after the first one releases @conting@ before reading.
concatQueuedStreams :: EnqueueStrategy s q
=> s
-- ^ the strategy used for the @writing@ resource requested by the writers
-> [Stream a]
-- ^ the input streams
-> Stream a
-- ^ the combined output stream
concatQueuedStreams s streams = Cons z where
-- NOTE(review): the numeric arguments of 'newResourceWithMaxCount' are
-- presumably the initial and the maximum count -- confirm in the Resource
-- module.
z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
writing <- liftSimulation $ newResourceWithMaxCount s 1 (Just 1)
conting <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
-- The slot through which a single item travels from a writer to the reader.
ref <- liftIO $ newIORef Nothing
let writer p =
do (a, xs) <- runStream p
requestResource writing
liftIO $ writeIORef ref (Just a)
releaseResource reading
-- Wait here until a later cell of the output releases @conting@.
requestResource conting
writer xs
reader =
do requestResource reading
-- The slot is expected to be filled at this point; the pattern is partial
-- and fails in the monad if the slot holds 'Nothing'.
Just a <- liftIO $ readIORef ref
liftIO $ writeIORef ref Nothing
releaseResource writing
return a
-- One writer process per input stream.
forM_ streams $ spawnProcess CancelTogether . writer
a <- reader
-- All further cells first release @conting@ and then read.
let xs = repeatProcess (releaseResource conting >> reader)
return (a, xs)
-- | Combine input streams of prioritised items into a single output stream.
-- Every input item is a pair of a priority and a value; only the values
-- appear in the output.
--
-- When the head of the result is first run, three resources and one mutable
-- slot are created and a @writer@ process is spawned for every input stream.
-- Writers and the reader hand items over through the slot:
--
-- * a writer takes a pair from its stream, requests @writing@ with the
--   priority from that pair, stores the value in the slot, releases
--   @reading@ and then requests @conting@ before it goes on to the next
--   pair;
--
-- * the reader requests @reading@, takes the value out of the slot, clears
--   the slot and releases @writing@;
--
-- * every cell after the first one releases @conting@ before reading.
concatPriorityStreams :: PriorityQueueStrategy s q p
=> s
-- ^ the strategy used for the @writing@ resource requested by the writers
-> [Stream (p, a)]
-- ^ the input streams of (priority, value) pairs
-> Stream a
-- ^ the combined output stream of values
concatPriorityStreams s streams = Cons z where
-- NOTE(review): the numeric arguments of 'newResourceWithMaxCount' are
-- presumably the initial and the maximum count -- confirm in the Resource
-- module.
z = do reading <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
writing <- liftSimulation $ newResourceWithMaxCount s 1 (Just 1)
conting <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
-- The slot through which a single value travels from a writer to the reader.
ref <- liftIO $ newIORef Nothing
let writer p =
do ((priority, a), xs) <- runStream p
requestResourceWithPriority writing priority
liftIO $ writeIORef ref (Just a)
releaseResource reading
-- Wait here until a later cell of the output releases @conting@.
requestResource conting
writer xs
reader =
do requestResource reading
-- The slot is expected to be filled at this point; the pattern is partial
-- and fails in the monad if the slot holds 'Nothing'.
Just a <- liftIO $ readIORef ref
liftIO $ writeIORef ref Nothing
releaseResource writing
return a
-- One writer process per input stream.
forM_ streams $ spawnProcess CancelTogether . writer
a <- reader
-- All further cells first release @conting@ and then read.
let xs = repeatProcess (releaseResource conting >> reader)
return (a, xs)
-- | Merge two streams into one; this is 'mergeQueuedStreams' with the
-- 'FCFS' strategy.
mergeStreams :: Stream a -> Stream a -> Stream a
mergeStreams left right = mergeQueuedStreams FCFS left right
-- | Merge two streams into one by passing both, in argument order, to
-- 'concatQueuedStreams' together with the given strategy.
mergeQueuedStreams :: EnqueueStrategy s q
                      => s
                      -- ^ the strategy forwarded to 'concatQueuedStreams'
                      -> Stream a
                      -- ^ the first input stream
                      -> Stream a
                      -- ^ the second input stream
                      -> Stream a
                      -- ^ the output stream
mergeQueuedStreams strategy left right =
  concatQueuedStreams strategy (left : right : [])
-- | Merge two streams of prioritised items into one by passing both, in
-- argument order, to 'concatPriorityStreams' together with the given
-- strategy.
mergePriorityStreams :: PriorityQueueStrategy s q p
                        => s
                        -- ^ the strategy forwarded to 'concatPriorityStreams'
                        -> Stream (p, a)
                        -- ^ the first input stream
                        -> Stream (p, a)
                        -- ^ the second input stream
                        -> Stream a
                        -- ^ the output stream
mergePriorityStreams strategy left right =
  concatPriorityStreams strategy (left : right : [])
-- | A stream whose head creates a fresh process identifier and runs
-- 'passivateProcess' under it.  The code after the passivation is not
-- expected to be reached and raises an error if it ever is.
emptyStream :: Stream a
emptyStream = Cons never
  where
    never =
      liftSimulation newProcessId >>= \pid ->
      processUsingId pid passivateProcess >>
      error "It should never happen: emptyStream."
-- | Consume the stream, running the given action on every item in order.
-- The definition recurses on the rest of the stream without a stopping case.
consumeStream :: (a -> Process ()) -> Stream a -> Process ()
consumeStream f (Cons cell) =
  cell >>= \(a, rest) ->
  f a >>
  consumeStream f rest
-- | Request the items of the stream one after another and discard them.
-- The definition recurses on the rest of the stream without a stopping case.
sinkStream :: Stream a -> Process ()
sinkStream (Cons cell) = cell >>= \(_, rest) -> sinkStream rest
-- | Prefetch the input stream through a one-item slot.
--
-- When the head of the result is first run, two resources and a mutable slot
-- are created and a producer process is spawned.  The producer takes an item
-- from the input, requests @slotFree@, stores the item in the slot and
-- releases @itemReady@.  The consumer requests @itemReady@, takes the item
-- out of the slot, clears the slot and releases @slotFree@.
prefetchStream :: Stream a -> Stream a
prefetchStream source = Cons $ do
  itemReady <- liftSimulation $ newResourceWithMaxCount FCFS 0 (Just 1)
  slotFree <- liftSimulation $ newResourceWithMaxCount FCFS 1 (Just 1)
  slot <- liftIO $ newIORef Nothing
  let produce stream =
        do (a, rest) <- runStream stream
           -- The item is taken from the input before the slot is claimed.
           requestResource slotFree
           liftIO $ writeIORef slot (Just a)
           releaseResource itemReady
           produce rest
      consume =
        do requestResource itemReady
           -- The slot is expected to be filled at this point.
           Just a <- liftIO $ readIORef slot
           liftIO $ writeIORef slot Nothing
           releaseResource slotFree
           return a
  spawnProcess CancelTogether $ produce source
  runStream $ repeatProcess consume