module Hpp.Streamer (Streamer(..), StreamStep(..), Source, encase,
done, yield, yields, awaits, source, liftS,
nextOutput, run,
before, (~>), processPrefix, mapping, filtering, mapStream,
mappingMaybe, onDone, mapTil, flattenTil,
Chunky(..), metamorph) where
import Control.Applicative (Alternative(..))
import Control.Monad ((<=<))
import Data.Foldable (toList)
import Data.Void
import Hpp.Types (HasError(..), HasHppState(..), HasEnv(..))
data StreamStep r i o f = Await (i -> f) f
| Yield !o f
| Done (Maybe r)
instance Functor (StreamStep r i o) where
fmap f (Await g d) = Await (f . g) (f d)
fmap f (Yield o n) = Yield o (f n)
fmap _ (Done r) = Done r
newtype Streamer m i o r =
Streamer { runStream :: m (StreamStep r i o (Streamer m i o r)) }
type Source m o r = Streamer m Void o r
encase :: Monad m => StreamStep r i o (Streamer m i o r) -> Streamer m i o r
encase = Streamer . return
instance Monad m => Functor (Streamer m i o) where
fmap f (Streamer ma) = Streamer . flip fmap ma $ \case
Await g d -> Await (fmap f . g) (fmap f d)
Yield o n -> Yield o (fmap f n)
Done r -> Done (fmap f r)
instance Monad m => Applicative (Streamer m i o) where
pure = Streamer . return . Done . Just
Streamer ma <*> g = Streamer $ ma >>= \case
Await f d -> return $ Await ((<*> g) . f) (d <*> g)
Yield o n -> return $ Yield o (n <*> g)
Done r -> maybe (runStream empty) (runStream . flip fmap g) r
instance Monad m => Alternative (Streamer m r i) where
empty = Streamer . return $ Done Nothing
Streamer ma <|> b = Streamer . flip fmap ma $ \case
Await g d -> Await ((<|> b) . g) (b <|> d)
Yield o n -> Yield o (n <|> b)
Done r -> Done r
instance (Monad m, HasError m) => HasError (Streamer m i o) where
throwError = liftS . throwError
instance (Monad m, HasHppState m) => HasHppState (Streamer m i o) where
getState = liftS getState
setState = liftS . setState
instance (Monad m, HasEnv m) => HasEnv (Streamer m i o) where
getEnv = liftS getEnv
setEnv = liftS . setEnv
yield :: Monad m => o -> Streamer m i o ()
yield o = encase $ Yield o (done ())
yields :: Monad m => o -> Streamer m i o r -> Streamer m i o r
yields = (encase .) . Yield
awaits :: Monad m => (i -> Streamer m i o r) -> Streamer m i o r
awaits f = encase $ Await f empty
done :: Monad m => r -> Streamer m i o r
done = pure
source :: (Monad m, Foldable f) => f a -> Streamer m i a ()
source = go . toList
where go [] = done ()
go (x:xs) = encase $ Yield x (go xs)
liftS :: Functor m => m a -> Streamer m i o a
liftS = Streamer . fmap (Done . Just)
run :: Monad m => Source m Void r -> m (Maybe r)
run (Streamer s) = s >>= go
where go (Done r) = return r
go (Await _ _) = error "Source is awaiting in exhaustStreamer"
go (Yield _ _) = error "A capped sink is yielding in exhaustStreamer"
nextOutput :: Monad m
=> Streamer m i o r -> m (Either (Maybe r) (o, Streamer m i o r))
nextOutput s = runStream s >>= go
where go (Await _ n) = runStream n >>= go
go (Yield o n) = return (Right (o, n))
go (Done r) = return (Left r)
mapStream :: Monad m => (a -> b) -> Streamer m i a r -> Streamer m i b r
mapStream f = go
where go (Streamer s) = Streamer $ s >>= \case
Done r -> pure $ Done r
Await g e -> pure $ Await (go . g) (go e)
Yield o n -> pure $ Yield (f o) (go n)
(~>) :: Monad m => Streamer m a b r -> Streamer m b c r' -> Streamer m a c r'
src0 ~> Streamer mb = Streamer $ mb >>= goSnk src0
where goSnk _ (Done r) = return $ Done r
goSnk src (Yield o n) = return
$ Yield o (Streamer $ runStream n >>= goSnk src)
goSnk src (Await f e) = runStream src >>= goSrc f e
goSrc _ e (Done _) = runStream e >>= goSnk empty
goSrc k _ (Yield i n) = runStream (k i) >>= goSnk n
goSrc k e (Await f' e') =
return $ Await (\i -> Streamer $ runStream (f' i) >>= goSrc k e)
(e' ~> e)
infixl 9 ~>
_feedStreamer :: Monad m => Streamer m i o r -> [i] -> m ([i], [o], Maybe r)
_feedStreamer s xs0 = runStream s >>= aux xs0 id
where aux [] acc (Await _ d) = runStream d >>= aux [] acc
aux (x:xs) acc (Await f _) = runStream (f x) >>= aux xs acc
aux xs acc (Yield o n) = runStream n >>= aux xs (acc . (o:))
aux xs acc (Done r) = return (xs, acc [], r)
before :: Monad m => Streamer m i o q -> Streamer m i o r -> Streamer m i o r
before (Streamer ma) mb = Streamer $ ma >>= go
where go (Await f d) = return $ Await (\i -> Streamer $ runStream (f i) >>= go)
(Streamer $ runStream d >>= go)
go (Yield o n) = return $ Yield o (Streamer $ runStream n >>= go)
go (Done _) = runStream mb
onDone :: Monad m
=> (Maybe r -> Maybe r')
-> Streamer m i o r
-> Streamer m i o r'
onDone f (Streamer m) = Streamer $ m >>= go
where go (Done r) = return $ Done (f r)
go (Yield o n) = return $ Yield o (Streamer $ runStream n >>= go)
go (Await f' e) = return $ Await (\i -> Streamer $ runStream (f' i) >>= go)
(Streamer $ runStream e >>= go)
mapping :: Monad m => (a -> b) -> Streamer m a b r
mapping f = go
where go = awaits (\i -> yields (f i) go)
filtering :: Monad m => (a -> Bool) -> Streamer m a a r
filtering p = go
where go = encase $ Await aux empty
aux x = if p x then encase $ Yield x go else go
mappingMaybe :: Monad m => (a -> Maybe b) -> Streamer m a b r
mappingMaybe f = go
where go = awaits (\i -> maybe go (flip yields go) $ f i)
predicateMap :: (a -> Bool) -> (a -> b) -> a -> Maybe b
predicateMap p f = \x -> if p x then Just (f x) else Nothing
maybeNot :: (a -> Bool) -> Maybe a -> Maybe a
maybeNot p = \x -> case x of
Nothing -> Nothing
Just x' -> if p x' then x else Nothing
processPrefix :: Monad m => Source m o r -> Streamer m o o r' -> Source m o r
processPrefix src0 snk = Streamer $ runStream snk >>= goSnk src0
where goSnk src (Done _) = runStream src
goSnk src (Yield o n) =
return $ Yield o (Streamer $ runStream n >>= goSnk src)
goSnk src (Await f _) = runStream src >>= goSrc f
goSrc _ d@(Done _) = return d
goSrc k (Yield i n) = runStream (k i) >>= goSnk n
goSrc k (Await f e) =
return $ Await (\i -> Streamer $ runStream (f i) >>= goSrc k)
(Streamer $ runStream e >>= goSrc k)
common :: Monad m
=> (s -> Maybe (Streamer m i o ()))
-> ((s -> Streamer m i o r -> Streamer m i' o' (Streamer m i o r))
-> s -> Streamer m i o r -> Streamer m i' o' (Streamer m i o r))
-> s
-> Streamer m i o r -> Streamer m i' o' (Streamer m i o r)
common fin nxt = go
where go acc src = encase $ Await (const (fin' acc src)) (nxt go acc src)
fin' acc src = done . maybe src (`before` src) $ fin acc
flattenTil :: Monad m
=> Source m [i] r -> Source m i (Source m [i] r)
flattenTil = common fin go []
where fin acc = if null acc then Nothing else Just (yield acc)
go k [] src = Streamer $ nextOutput src >>= \case
Left r -> return $ Done (Just (encase (Done r)))
Right (o,n) -> runStream $ go k o n
go k (x:xs) src = encase $ Yield x (k xs src)
mapTil :: Monad m
=> (a -> b)
-> Streamer m Void a r
-> Streamer m Void b (Streamer m Void a r)
mapTil f = common (const Nothing) go ()
where go k () src = Streamer $ nextOutput src >>= \case
Left r -> return $ Done (Just (encase (Done r)))
Right (o, n) -> return $ Yield (f o) (k () n)
newtype Chunky m a b = Chunky (a -> Source m b (Chunky m a b))
chunkMap :: Monad m => (b -> c) -> Chunky m a b -> Chunky m a c
chunkMap g (Chunky f) = Chunky (onDone (fmap (chunkMap g)) . mapStream g . f)
chunkConmap :: Monad m => (a -> b) -> Chunky m b c -> Chunky m a c
chunkConmap g (Chunky f) = Chunky (onDone (fmap (chunkConmap g)) . f . g)
metamorph :: Monad m
=> Chunky m a b
-> Streamer m a b ()
metamorph (Chunky f) = go
where go = awaits (Streamer . aux . f)
aux s = runStream s >>= \case
Done (Just k) -> runStream $ metamorph k
Done Nothing -> return $ Done Nothing
Await _ _ -> error "Sources shouldn't await"
Yield o n -> return $ Yield o (Streamer $ aux n)