{-# LANGUAGE RankNTypes #-}

{-| The pull-to-push transformations in this module require functions that are
    polymorphic over a monad transformer.

    Because of this, some of the type signatures look scary, but actually many
    (suitably polymorphic) operations on 'Stream's will unify with them.

    To get "interruptible" operations that can exit early with an error, put an
    'ExceptT' transformer just below the polymorphic monad transformer. See
    'foldE'.

    Inspired by http://pchiusano.blogspot.com.es/2011/12/programmatic-translation-to-iteratees.html
-}
module Streaming.Eversion (
        -- * Evertible Stream folds
        Evertible
    ,   evertible
    ,   evert
    ,   EvertibleM
    ,   evertibleM
    ,   evertM
    ,   EvertibleMIO
    ,   evertibleMIO
    ,   evertMIO
        -- * Transvertible Stream transformations
    ,   Transvertible
    ,   transvertible
    ,   transvert
    ,   TransvertibleM
    ,   transvertibleM
    ,   transvertM
    ,   TransvertibleMIO
    ,   transvertibleMIO
    ,   transvertMIO
        -- * Auxiliary functions
    ,   foldE
    ) where

import Data.Bifunctor
import Data.Profunctor
import Control.Foldl (Fold(..),FoldM(..))
import qualified Control.Foldl as Foldl
import Streaming (Stream,Of(..))
import Streaming.Prelude (yield,next)
import qualified Streaming.Prelude as S
import Control.Monad.IO.Class
import Control.Monad.Trans.Class
import Control.Monad.Free
import qualified Control.Monad.Trans.Free as TF
import Control.Monad.Trans.Except
import Control.Comonad

{- $setup
>>> import Data.Functor.Identity
>>> import Control.Monad.Trans.Except
>>> import Control.Monad.Trans.Identity
>>> import Control.Foldl (Fold(..),FoldM(..))
>>> import qualified Control.Foldl as L
>>> import Streaming (Stream,Of(..))
>>> import Streaming.Prelude (yield,next)
>>> import qualified Streaming.Prelude as S
-}

-----------------------------------------------------------------------------------------

-- One unit of input pushed into an everted computation: either a value, or the
-- signal that no more values will come.
data Feed a = Input a | EOF

-- What type could go here for efficiency?
type Iteratee a = Free ((->) a)

-- A stream living in the pure 'Iteratee' monad: each pull suspends until a
-- 'Feed' is supplied, yields the value on 'Input' and finishes on 'EOF'.
evertedStream :: forall a. Stream (Of a) (Iteratee (Feed a)) ()
evertedStream = do
    r <- lift (liftF id)
    case r of
        Input a -> do
            yield a
            evertedStream
        EOF -> return ()

type IterateeT a m = TF.FreeT ((->) a) m

-- Same as 'evertedStream', but over the 'IterateeT' transformer so that the
-- consumer can run effects in the base monad @m@.
evertedStreamM :: forall a m. Monad m => Stream (Of a) (IterateeT (Feed a) m) ()
evertedStreamM = do
    r <- lift (TF.liftF id)
    case r of
        Input a -> do
            yield a
            evertedStreamM
        EOF -> return ()

-----------------------------------------------------------------------------------------

-- | A stream-consuming function that can be turned into a pure, push-based fold.
newtype Evertible a x =
    Evertible (forall m r. Monad m => Stream (Of a) m r -> m (Of x r))

instance Functor (Evertible a) where
    fmap f (Evertible somefold) = Evertible (fmap (first f) . somefold)

instance Profunctor Evertible where
    lmap f (Evertible somefold) = Evertible (somefold . S.map f)
    rmap = fmap

-- Error raised when the everted computation finishes without waiting for 'EOF'.
stoppedBeforeEOF :: String
stoppedBeforeEOF = "Stopped before receiving EOF."

-- Error raised when the everted computation asks for input after 'EOF'.
continuedAfterEOF :: String
continuedAfterEOF = "Continued after receiving EOF."

evertible :: (forall m r. Monad m => Stream (Of a) m r -> m (Of x r))
          -> Evertible a x
evertible = Evertible

-- | Turn a stream-consuming function into a pure 'Fold'.
--
-- The fold state is the suspended consumer; each step feeds it one 'Input'
-- and the extraction function feeds it 'EOF'.
evert :: Evertible a x -> Fold a x
evert (Evertible consumer) = Fold step begin done
    where
    begin = consumer evertedStream
    step s a = case s of
        Pure _ -> error stoppedBeforeEOF
        Free f -> f (Input a)
    done s = case s of
        Pure _ -> error stoppedBeforeEOF
        Free f -> case f EOF of
            Pure (a :> ()) -> a
            Free _ -> error continuedAfterEOF

{- | Like 'Evertible', but gives the stream-consuming function access to a base monad.

>>> :{
    let f stream = fmap ((:>) ()) (lift (putStrLn "x") >> S.effects stream)
    in  L.foldM (evertM (evertibleM f)) ["a","b","c"]
    :}
x

    Note however that control operations can't be lifted through the transformer.
-}
newtype EvertibleM m a x =
    EvertibleM (forall t r. (MonadTrans t, Monad (t m)) => Stream (Of a) (t m) r -> t m (Of x r))

instance Functor (EvertibleM m a) where
    fmap f (EvertibleM somefold) = EvertibleM (fmap (first f) . somefold)

instance Profunctor (EvertibleM m) where
    lmap f (EvertibleM somefold) = EvertibleM (somefold . S.map f)
    rmap = fmap

evertibleM :: (forall t r. (MonadTrans t, Monad (t m)) => Stream (Of a) (t m) r -> t m (Of x r)) -- ^
           -> EvertibleM m a x
evertibleM = EvertibleM

-- | Turn a stream-consuming function into a monadic 'FoldM'.
evertM :: Monad m => EvertibleM m a x -> FoldM m a x
evertM (EvertibleM consumer) = evertIterateeT (consumer evertedStreamM)

-- Shared implementation of 'evertM' and 'evertMIO': builds a 'FoldM' whose
-- state is the consumer already applied to 'evertedStreamM'.
--
-- Each step runs the suspended computation up to its next request for input
-- and feeds it one 'Input'; the final step feeds 'EOF' and expects a result.
evertIterateeT :: Monad m => IterateeT (Feed a) m (Of x ()) -> FoldM m a x
evertIterateeT consumer = FoldM step begin done
    where
    begin = return consumer
    step (TF.FreeT ms) i = do
        s <- ms
        case s of
            TF.Pure _ -> error stoppedBeforeEOF
            TF.Free f -> return (f (Input i))
    done (TF.FreeT ms) = do
        s <- ms
        case s of
            TF.Pure _ -> error stoppedBeforeEOF
            TF.Free f -> do
                let TF.FreeT ms' = f EOF
                s' <- ms'
                case s' of
                    TF.Pure (a :> ()) -> return a
                    TF.Free _ -> error continuedAfterEOF

{-| Like 'EvertibleM', but gives the stream-consuming function the ability to use 'liftIO'.

>>> L.foldM (evertMIO (evertibleMIO (\stream -> fmap ((:>) ()) (S.print stream)))) ["a","b","c"]
"a"
"b"
"c"

-}
newtype EvertibleMIO m a x =
    EvertibleMIO (forall t r. (MonadTrans t, MonadIO (t m)) => Stream (Of a) (t m) r -> t m (Of x r))

instance Functor (EvertibleMIO m a) where
    fmap f (EvertibleMIO somefold) = EvertibleMIO (fmap (first f) . somefold)

instance Profunctor (EvertibleMIO m) where
    lmap f (EvertibleMIO somefold) = EvertibleMIO (somefold . S.map f)
    rmap = fmap

evertibleMIO :: (forall t r. (MonadTrans t, MonadIO (t m)) => Stream (Of a) (t m) r -> t m (Of x r)) -- ^
             -> EvertibleMIO m a x
evertibleMIO = EvertibleMIO

-- | Turn a stream-consuming function that may use 'liftIO' into a monadic 'FoldM'.
evertMIO :: MonadIO m => EvertibleMIO m a x -> FoldM m a x
evertMIO (EvertibleMIO consumer) = evertIterateeT (consumer evertedStreamM)

-----------------------------------------------------------------------------------------

-- | A stream-transforming function that can be turned into fold-transforming function.
newtype Transvertible a b =
    Transvertible (forall m r. Monad m => Stream (Of a) m r -> Stream (Of b) m r)

instance Functor (Transvertible a) where
    fmap f (Transvertible transducer) = Transvertible (S.map f . transducer)

instance Profunctor Transvertible where
    lmap f (Transvertible somefold) = Transvertible (somefold . S.map f)
    rmap = fmap

-- Pair with strict fields, used as the state of the transverted folds.
data Pair a b = Pair !a !b

-- State of an everted stream transformation:
--
-- * 'Pristine': the transformed stream has not been pulled from yet.
-- * 'Waiting': the transformed stream is suspended, waiting for a 'Feed'.
data StreamState a b =
      Pristine (Stream (Of b) (Iteratee (Feed a)) ())
    | Waiting (Feed a -> Iteratee (Feed a) (Either () (b, Stream (Of b) (Iteratee (Feed a)) ())))

transvertible :: (forall m r. Monad m => Stream (Of a) m r -> Stream (Of b) m r) -- ^
              -> Transvertible a b
transvertible = Transvertible

-- | Turn a stream transformation into a transformation of pure 'Fold's.
--
-- The resulting fold keeps the inner fold together with the suspended
-- transformed stream; every element the stream emits is pushed into the
-- inner fold.
transvert :: Transvertible b a
          -> (forall x. Fold a x -> Fold b x)
transvert (Transvertible transducer) somefold = Fold step begin done
    where
    begin = Pair somefold (Pristine (transducer evertedStream))
    -- A pristine stream is first driven until it asks for input.
    step (Pair innerfold (Pristine pristine)) i =
        step (advance innerfold pristine) i
    step (Pair innerfold (Waiting waiting)) i =
        case waiting (Input i) of
            Pure (Left ()) -> error stoppedBeforeEOF
            Pure (Right (a, stream)) ->
                advance (Foldl.fold (duplicate innerfold) [a]) stream
            Free f -> Pair innerfold (Waiting f)
    -- Push every element the stream emits into the inner fold, until the
    -- stream suspends waiting for more input.
    advance innerfold stream =
        case next stream of
            Pure (Left ()) -> error stoppedBeforeEOF
            Pure (Right (a,future)) ->
                advance (Foldl.fold (duplicate innerfold) [a]) future
            Free f -> Pair innerfold (Waiting f)
    done (Pair innerfold (Pristine pristine)) =
        done (advance innerfold pristine)
    done (Pair innerfold (Waiting waiting)) =
        case waiting EOF of
            Pure (Left ()) -> extract innerfold
            Pure (Right (a, stream)) ->
                extract (advancefinal (Foldl.fold (duplicate innerfold) [a]) stream)
            Free _ -> error continuedAfterEOF
    -- Like 'advance', but after 'EOF' has been fed: the stream must run to
    -- completion without asking for more input.
    advancefinal innerfold stream =
        case next stream of
            Pure (Left ()) -> innerfold
            Pure (Right (a,future)) ->
                advancefinal (Foldl.fold (duplicate innerfold) [a]) future
            Free _ -> error continuedAfterEOF

-- Monadic counterpart of 'StreamState', over 'IterateeT'.
data StreamStateM m a b =
      PristineM (Stream (Of b) (IterateeT (Feed a) m) ())
    | WaitingM (Feed a -> IterateeT (Feed a) m (Either () (b, Stream (Of b) (IterateeT (Feed a) m) ())))

-- | Like 'Transvertible', but gives the stream-transforming function access to a base monad.
--
-- Note however that control operations can't be lifted through the transformer.
--
newtype TransvertibleM m a b =
    TransvertibleM (forall t r. (MonadTrans t, Monad (t m)) => Stream (Of a) (t m) r -> Stream (Of b) (t m) r)

transvertibleM :: (forall t r. (MonadTrans t, Monad (t m)) => Stream (Of a) (t m) r -> Stream (Of b) (t m) r) -- ^
               -> TransvertibleM m a b
transvertibleM = TransvertibleM

instance Functor (TransvertibleM m a) where
    fmap f (TransvertibleM transducer) = TransvertibleM (S.map f . transducer)

instance Profunctor (TransvertibleM m) where
    lmap f (TransvertibleM somefold) = TransvertibleM (somefold . S.map f)
    rmap = fmap

-- | Turn a stream transformation into a transformation of monadic 'FoldM's.
transvertM :: Monad m
           => TransvertibleM m b a
           -> (forall x . FoldM m a x -> FoldM m b x)
transvertM (TransvertibleM transducer) somefold =
    transvertIterateeT (transducer evertedStreamM) somefold

-- Shared implementation of 'transvertM' and 'transvertMIO'. Takes the
-- transformation already applied to 'evertedStreamM', plus the inner fold.
--
-- The state pairs the inner fold with the suspended transformed stream; every
-- element the stream emits is pushed into the inner fold.
transvertIterateeT :: Monad m
                   => Stream (Of a) (IterateeT (Feed b) m) ()
                   -> FoldM m a x
                   -> FoldM m b x
transvertIterateeT everted somefold = FoldM step begin done
    where
    begin = return (Pair somefold (PristineM everted))
    -- A pristine stream is first driven until it asks for input.
    step (Pair innerfold (PristineM pristine)) i = do
        s <- advance innerfold pristine
        step s i
    step (Pair innerfold (WaitingM waiting)) i = do
        s <- TF.runFreeT (waiting (Input i))
        case s of
            TF.Pure (Left ()) -> error stoppedBeforeEOF
            TF.Pure (Right (a, nexx)) -> do
                step1 <- Foldl.foldM (Foldl.duplicateM innerfold) [a]
                advance step1 nexx
            TF.Free f -> return (Pair innerfold (WaitingM f))
    -- Push every element the stream emits into the inner fold, until the
    -- stream suspends waiting for more input.
    advance innerfold stream = do
        r <- TF.runFreeT (next stream)
        case r of
            TF.Pure (Left ()) -> error stoppedBeforeEOF
            TF.Pure (Right (a,future)) -> do
                step1 <- Foldl.foldM (Foldl.duplicateM innerfold) [a]
                advance step1 future
            TF.Free f -> return (Pair innerfold (WaitingM f))
    done (Pair innerfold (PristineM pristine)) = do
        s <- advance innerfold pristine
        done s
    done (Pair innerfold (WaitingM waiting)) = do
        s <- TF.runFreeT (waiting EOF)
        case s of
            -- Folding over the empty list just extracts the final result.
            TF.Pure (Left ()) -> Foldl.foldM innerfold []
            TF.Pure (Right (a,future)) -> do
                step1 <- Foldl.foldM (Foldl.duplicateM innerfold) [a]
                r <- advancefinal step1 future
                Foldl.foldM r []
            TF.Free _ -> error continuedAfterEOF
    -- Like 'advance', but after 'EOF' has been fed: the stream must run to
    -- completion without asking for more input.
    advancefinal innerfold stream = do
        r <- TF.runFreeT (next stream)
        case r of
            TF.Pure (Right (a,future)) -> do
                step1 <- Foldl.foldM (Foldl.duplicateM innerfold) [a]
                advancefinal step1 future
            TF.Pure (Left ()) -> return innerfold
            TF.Free _ -> error continuedAfterEOF

-- | Like 'TransvertibleM', but gives the stream-transforming function the ability to use 'liftIO'.
--
newtype TransvertibleMIO m a b =
    TransvertibleMIO (forall t r. (MonadTrans t, MonadIO (t m)) => Stream (Of a) (t m) r -> Stream (Of b) (t m) r)

instance Functor (TransvertibleMIO m a) where
    fmap f (TransvertibleMIO transducer) = TransvertibleMIO (S.map f . transducer)

instance Profunctor (TransvertibleMIO m) where
    lmap f (TransvertibleMIO somefold) = TransvertibleMIO (somefold . S.map f)
    rmap = fmap

transvertibleMIO :: (forall t r. (MonadTrans t, MonadIO (t m)) => Stream (Of a) (t m) r -> Stream (Of b) (t m) r) -- ^
                 -> TransvertibleMIO m a b
transvertibleMIO = TransvertibleMIO

-- | Turn a stream transformation that may use 'liftIO' into a transformation
-- of monadic 'FoldM's.
transvertMIO :: (MonadIO m)
             => TransvertibleMIO m b a
             -> (forall x . FoldM m a x -> FoldM m b x)
transvertMIO (TransvertibleMIO transducer) somefold =
    transvertIterateeT (transducer evertedStreamM) somefold

{-| If your stream-folding computation can fail early returning a 'Left',
    compose it with this function before passing it to 'evertibleM'.

    The result will be an 'EvertibleM' that works on 'ExceptT'.

>>> runExceptT $ L.foldM (evertM (evertibleM (foldE . (\_ -> return (Left ()))))) [1..10]
Left ()

-}
foldE :: (MonadTrans t, Monad m, Monad (t (ExceptT e m)))
      => t (ExceptT e m) (Either e r) -- ^
      -> t (ExceptT e m) r
foldE action = action >>= lift . ExceptT . return