{-# LANGUAGE LambdaCase #-} -- | Streaming input and output. module Hpp.Streamer (Streamer(..), StreamStep(..), Source, encase, done, yield, yields, awaits, source, liftS, nextOutput, run, before, (~>), processPrefix, mapping, filtering, mapStream, mappingMaybe, onDone, mapTil, flattenTil, Chunky(..), metamorph) where import Control.Applicative (Alternative(..)) import Control.Monad ((<=<)) import Data.Foldable (toList) import Data.Void import Hpp.Types (HasError(..), HasHppState(..), HasEnv(..)) -- * Streams of Steps -- | Basic pipe. data StreamStep r i o f = Await (i -> f) f | Yield !o f | Done (Maybe r) instance Functor (StreamStep r i o) where fmap f (Await g d) = Await (f . g) (f d) fmap f (Yield o n) = Yield o (f n) fmap _ (Done r) = Done r {-# INLINE fmap #-} -- | A stream of steps in a computational context. newtype Streamer m i o r = Streamer { runStream :: m (StreamStep r i o (Streamer m i o r)) } -- | A stream of steps that never awaits anything from upstream. type Source m o r = Streamer m Void o r -- | Package a step into a 'Streamer' encase :: Monad m => StreamStep r i o (Streamer m i o r) -> Streamer m i o r encase = Streamer . return {-# INLINE encase #-} instance Monad m => Functor (Streamer m i o) where fmap f (Streamer ma) = Streamer . flip fmap ma $ \case Await g d -> Await (fmap f . g) (fmap f d) Yield o n -> Yield o (fmap f n) Done r -> Done (fmap f r) {-# INLINE fmap #-} instance Monad m => Applicative (Streamer m i o) where pure = Streamer . return . Done . Just {-# INLINE pure #-} Streamer ma <*> g = Streamer $ ma >>= \case Await f d -> return $ Await ((<*> g) . f) (d <*> g) Yield o n -> return $ Yield o (n <*> g) Done r -> maybe (runStream empty) (runStream . flip fmap g) r {-# INLINE (<*>) #-} instance Monad m => Alternative (Streamer m r i) where empty = Streamer . return $ Done Nothing {-# INLINE empty #-} Streamer ma <|> b = Streamer . flip fmap ma $ \case Await g d -> Await ((<|> b) . g) (b <|> d) Yield o n -> Yield o (n <|> b) Done r -> Done r {-# INLINE (<|>) #-} {- -- This instance is quite often not really wanted due to associativity -- issues. instance Monad m => Monad (Streamer m r i) where return = pure {-# INLINE return #-} Streamer ma >>= fb = Streamer $ ma >>= \case Await f d -> return $ Await (\i -> f i >>= fb) (d >>= fb) Yield o n -> return $ Yield o (n >>= fb) Done r -> maybe (runStream empty) (runStream . fb) r {-# INLINE (>>=) #-} instance Monad m => MonadPlus (Streamer m r i) where mzero = empty mplus = (<|>) -} instance (Monad m, HasError m) => HasError (Streamer m i o) where throwError = liftS . throwError {-# INLINE throwError #-} instance (Monad m, HasHppState m) => HasHppState (Streamer m i o) where getState = liftS getState {-# INLINE getState #-} setState = liftS . setState {-# INLINE setState #-} instance (Monad m, HasEnv m) => HasEnv (Streamer m i o) where getEnv = liftS getEnv {-# INLINE getEnv #-} setEnv = liftS . setEnv {-# INLINE setEnv #-} -- * Builders -- | Yield a value downstream, then finish. yield :: Monad m => o -> Streamer m i o () yield o = encase $ Yield o (done ()) {-# INLINE yield #-} -- | Yield a value then continue with another 'Streamer'. yields :: Monad m => o -> Streamer m i o r -> Streamer m i o r yields = (encase .) . Yield {-# INLINE yields #-} -- | Package a function that returns a 'Streamer' into a 'Streamer'. awaits :: Monad m => (i -> Streamer m i o r) -> Streamer m i o r awaits f = encase $ Await f empty {-# INLINE awaits #-} -- | The end of a stream. done :: Monad m => r -> Streamer m i o r done = pure {-# INLINE done #-} -- | Feed values downstream. source :: (Monad m, Foldable f) => f a -> Streamer m i a () source = go . toList where go [] = done () go (x:xs) = encase $ Yield x (go xs) {-# INLINE source #-} -- | Lift a monadic value into a 'Streamer' liftS :: Functor m => m a -> Streamer m i o a liftS = Streamer . fmap (Done . Just) {-# INLINE liftS #-} -- * Runners -- | A source whose outputs have all been sunk may be run for its -- effects and return value. run :: Monad m => Source m Void r -> m (Maybe r) run (Streamer s) = s >>= go where go (Done r) = return r go (Await _ _) = error "Source is awaiting in exhaustStreamer" go (Yield _ _) = error "A capped sink is yielding in exhaustStreamer" -- | Compute the next step of a 'Streamer'. nextOutput :: Monad m => Streamer m i o r -> m (Either (Maybe r) (o, Streamer m i o r)) nextOutput s = runStream s >>= go where go (Await _ n) = runStream n >>= go go (Yield o n) = return (Right (o, n)) go (Done r) = return (Left r) -- * Combinators -- | Map a function over the values yielded by a stream. mapStream :: Monad m => (a -> b) -> Streamer m i a r -> Streamer m i b r mapStream f = go where go (Streamer s) = Streamer $ s >>= \case Done r -> pure $ Done r Await g e -> pure $ Await (go . g) (go e) Yield o n -> pure $ Yield (f o) (go n) {-# INLINE[1] mapStream #-} {-# RULES "hpp: mapStream/mapStream" forall f g s. mapStream f (mapStream g s) = mapStream (f . g) s #-} -- | @upstream ~> downstream@ composes two streams such that values -- flow from upstream to downstream. (~>) :: Monad m => Streamer m a b r -> Streamer m b c r' -> Streamer m a c r' src0 ~> Streamer mb = Streamer $ mb >>= goSnk src0 where goSnk _ (Done r) = return $ Done r goSnk src (Yield o n) = return $ Yield o (Streamer $ runStream n >>= goSnk src) goSnk src (Await f e) = runStream src >>= goSrc f e goSrc _ e (Done _) = runStream e >>= goSnk empty goSrc k _ (Yield i n) = runStream (k i) >>= goSnk n goSrc k e (Await f' e') = return $ Await (\i -> Streamer $ runStream (f' i) >>= goSrc k e) (e' ~> e) {-# INLINE[1] (~>) #-} infixl 9 ~> _feedStreamer :: Monad m => Streamer m i o r -> [i] -> m ([i], [o], Maybe r) _feedStreamer s xs0 = runStream s >>= aux xs0 id where aux [] acc (Await _ d) = runStream d >>= aux [] acc aux (x:xs) acc (Await f _) = runStream (f x) >>= aux xs acc aux xs acc (Yield o n) = runStream n >>= aux xs (acc . (o:)) aux xs acc (Done r) = return (xs, acc [], r) -- | @x `before` y@ runs @x@ to completion, discards its 'Done' value, -- then becomes @y@. before :: Monad m => Streamer m i o q -> Streamer m i o r -> Streamer m i o r before (Streamer ma) mb = Streamer $ ma >>= go where go (Await f d) = return $ Await (\i -> Streamer $ runStream (f i) >>= go) (Streamer $ runStream d >>= go) go (Yield o n) = return $ Yield o (Streamer $ runStream n >>= go) go (Done _) = runStream mb -- | Apply a function to the ending value of a stream. onDone :: Monad m => (Maybe r -> Maybe r') -> Streamer m i o r -> Streamer m i o r' onDone f (Streamer m) = Streamer $ m >>= go where go (Done r) = return $ Done (f r) go (Yield o n) = return $ Yield o (Streamer $ runStream n >>= go) go (Await f' e) = return $ Await (\i -> Streamer $ runStream (f' i) >>= go) (Streamer $ runStream e >>= go) {-# INLINE[1] onDone #-} {-# RULES "hpp: onDone/onDone" forall g f s. onDone g (onDone f s) = onDone (g . f) s #-} -- | Apply a function to each value in a stream. mapping :: Monad m => (a -> b) -> Streamer m a b r mapping f = go where go = awaits (\i -> yields (f i) go) {-# INLINABLE go #-} {-# INLINE[1] mapping #-} {-# RULES "hpp: mapping/mapping" forall f g. mapping f ~> mapping g = mapping (g . f) #-} -- | Discard all values that do not satisfy a predicate. filtering :: Monad m => (a -> Bool) -> Streamer m a a r filtering p = go where go = encase $ Await aux empty aux x = if p x then encase $ Yield x go else go {-# INLINE[1] filtering #-} -- | A combined filter and map. mappingMaybe :: Monad m => (a -> Maybe b) -> Streamer m a b r mappingMaybe f = go where go = awaits (\i -> maybe go (flip yields go) $ f i) {-# INLINE[1] mappingMaybe #-} predicateMap :: (a -> Bool) -> (a -> b) -> a -> Maybe b predicateMap p f = \x -> if p x then Just (f x) else Nothing {-# INLINE[1] predicateMap #-} maybeNot :: (a -> Bool) -> Maybe a -> Maybe a maybeNot p = \x -> case x of Nothing -> Nothing Just x' -> if p x' then x else Nothing {-# INLINE[1] maybeNot #-} {-# RULES "hpp: mapping ~> filtering" forall f p. mapping f ~> filtering p = mappingMaybe (maybeNot p . Just . f) ; "hpp: filtering ~> mapping" forall p f. filtering p ~> mapping f = mappingMaybe (predicateMap p f) ; "hpp: mappingMaybe ~> mappingMaybe" forall f g. mappingMaybe f ~> mappingMaybe g = mappingMaybe (f <=< g) #-} -- | @processPrefix src snk@ is like '~>' except that when @snk@ -- finishes, the composite 'Streamer' becomes the remaining @src@. processPrefix :: Monad m => Source m o r -> Streamer m o o r' -> Source m o r processPrefix src0 snk = Streamer $ runStream snk >>= goSnk src0 where goSnk src (Done _) = runStream src goSnk src (Yield o n) = return $ Yield o (Streamer $ runStream n >>= goSnk src) goSnk src (Await f _) = runStream src >>= goSrc f goSrc _ d@(Done _) = return d goSrc k (Yield i n) = runStream (k i) >>= goSnk n goSrc k (Await f e) = return $ Await (\i -> Streamer $ runStream (f i) >>= goSrc k) (Streamer $ runStream e >>= goSrc k) -- * Zoom support -- | This is a left fold over a 'Streamer' with a final step to deal -- with leftovers represented by whatever state the fold function -- maintains. common :: Monad m => (s -> Maybe (Streamer m i o ())) -> ((s -> Streamer m i o r -> Streamer m i' o' (Streamer m i o r)) -> s -> Streamer m i o r -> Streamer m i' o' (Streamer m i o r)) -> s -> Streamer m i o r -> Streamer m i' o' (Streamer m i o r) common fin nxt = go where go acc src = encase $ Await (const (fin' acc src)) (nxt go acc src) fin' acc src = done . maybe src (`before` src) $ fin acc {-# INLINABLE common #-} -- | Flatten out chunks of inputs into individual values. The returned -- 'Source' smuggles the remaining original 'Source' in an 'Await' -- constructor, while the flattened source continues on with the -- \"empty\" part of the 'Await' step. The upshot is that the value -- may be used a regular 'Source', but it can also be swapped back -- into the original 'Source'. flattenTil :: Monad m => Source m [i] r -> Source m i (Source m [i] r) flattenTil = common fin go [] where fin acc = if null acc then Nothing else Just (yield acc) go k [] src = Streamer $ nextOutput src >>= \case Left r -> return $ Done (Just (encase (Done r))) -- Right (o,n) -> runStream $ go k o n Right (o,n) -> runStream $ go k o n go k (x:xs) src = encase $ Yield x (k xs src) {-# INLINABLE flattenTil #-} -- | See 'flattenTil' for an explanation. mapTil :: Monad m => (a -> b) -> Streamer m Void a r -> Streamer m Void b (Streamer m Void a r) mapTil f = common (const Nothing) go () where go k () src = Streamer $ nextOutput src >>= \case Left r -> return $ Done (Just (encase (Done r))) Right (o, n) -> return $ Yield (f o) (k () n) {-# INLINABLE mapTil #-} -- * Chunky metamorphisms -- | A function that produces an output stream that finishes with -- another such function. Think of the input to this function as -- coming from upstream, while the closure of the streamed output may -- be used to thread state. newtype Chunky m a b = Chunky (a -> Source m b (Chunky m a b)) -- | Apply a function to a 'Chunky''s output. chunkMap :: Monad m => (b -> c) -> Chunky m a b -> Chunky m a c chunkMap g (Chunky f) = Chunky (onDone (fmap (chunkMap g)) . mapStream g . f) {-# INLINE[1] chunkMap #-} {-# RULES "hpp: chunkMap/chunkMap" forall f g c. chunkMap f (chunkMap g c) = chunkMap (f . g) c #-} -- | Apply a function to a 'Chunky's input. chunkConmap :: Monad m => (a -> b) -> Chunky m b c -> Chunky m a c chunkConmap g (Chunky f) = Chunky (onDone (fmap (chunkConmap g)) . f . g) {-# INLINE[1] chunkConmap #-} {-# RULES "hpp: chunkConmap/chunkConmap" forall f g c. chunkConmap f (chunkConmap g c) = chunkConmap (f . g) c #-} -- | This is something like a composition of an unfold with a fold. We -- fold the upstream values into some state carried by a 'Chunky', -- then unfold that state in the 'Chunky''s output stream. metamorph :: Monad m => Chunky m a b -> Streamer m a b () metamorph (Chunky f) = go where go = awaits (Streamer . aux . f) aux s = runStream s >>= \case Done (Just k) -> runStream $ metamorph k Done Nothing -> return $ Done Nothing Await _ _ -> error "Sources shouldn't await" Yield o n -> return $ Yield o (Streamer $ aux n) {- INLINABLE metamorph #-} -- inlining metamorph trips a bug in GHC-7.10.2 {-# RULES "hpp: metamorph/mapping" forall c f. metamorph c ~> mapping f = metamorph (chunkMap f c) ; "hpp: mapping/metamorph" forall c f. mapping f ~> metamorph c = metamorph (chunkConmap f c) #-}