{-# LANGUAGE RankNTypes #-}


{-| The pull-to-push transformations in this module require functions that are 
    polymorphic over a monad transformer.  
    
    Because of this, some of the type signatures look scary, but actually many
    (suitably polymorphic) operations on 'Stream's will unify with them.
   
    To get "interruptible" operations that can exit early with an error, put an
    'ExceptT' transformer just below the polymorphic monad transformer. See
    'foldE'.
   
    Inspired by http://pchiusano.blogspot.com.es/2011/12/programmatic-translation-to-iteratees.html
-}

module Streaming.Eversion (
        -- * Evertible Stream folds
        Evertible
    ,   evertible
    ,   evert
    ,   EvertibleM
    ,   evertibleM
    ,   evertM
    ,   EvertibleMIO
    ,   evertibleMIO
    ,   evertMIO
        -- * Transvertible Stream transformations
    ,   Transvertible
    ,   transvertible
    ,   transvert
    ,   TransvertibleM
    ,   transvertibleM
    ,   transvertM
    ,   TransvertibleMIO
    ,   transvertibleMIO
    ,   transvertMIO
        -- * Auxiliary functions
    ,   foldE
    ) where

import           Data.Bifunctor
import           Data.Profunctor

import           Control.Foldl (Fold(..),FoldM(..))
import qualified Control.Foldl as Foldl
import           Streaming (Stream,Of(..))
import           Streaming.Prelude (yield,next)
import qualified Streaming.Prelude as S

import           Control.Monad.IO.Class
import           Control.Monad.Trans.Class
import           Control.Monad.Free
import qualified Control.Monad.Trans.Free as TF
import           Control.Monad.Trans.Except
import           Control.Comonad

{- $setup
>>> import           Data.Functor.Identity
>>> import           Control.Monad.Trans.Except
>>> import           Control.Monad.Trans.Identity
>>> import           Control.Foldl (Fold(..),FoldM(..))
>>> import qualified Control.Foldl as L
>>> import           Streaming (Stream,Of(..))
>>> import           Streaming.Prelude (yield,next)
>>> import qualified Streaming.Prelude as S
-}

-----------------------------------------------------------------------------------------

-- | Signal pushed into an everted stream: either one more element ('Input')
--   or the notification that no further elements will arrive ('EOF').
data Feed a = Input a | EOF

-- | A pure computation that either has finished or is suspended awaiting a
--   value of type @a@: the free monad over the function functor @(->) a@.
--
-- What type could go here for efficiency?
type Iteratee a = Free ((->) a)  

-- | A stream that lives in the 'Iteratee' monad: it suspends to await a
--   'Feed', re-yields every 'Input' it is given and terminates on 'EOF'.
evertedStream :: forall a. Stream (Of a) (Iteratee (Feed a)) ()
evertedStream = lift (liftF id) >>= react
    where
    -- Decide what to do with the signal obtained from the suspension.
    react (Input item) = yield item >> evertedStream
    react EOF          = return ()

-- | Monad-transformer counterpart of 'Iteratee': a computation over the base
--   monad @m@ that can suspend awaiting a value of type @a@ ('TF.FreeT' over
--   the function functor @(->) a@).
type IterateeT a m = TF.FreeT ((->) a) m 

-- | Like 'evertedStream', but the suspensions live in 'IterateeT' over a base
--   monad @m@: awaits a 'Feed', re-yields each 'Input' and stops on 'EOF'.
evertedStreamM :: forall a m. Monad m => Stream (Of a) (IterateeT (Feed a) m) ()
evertedStreamM = lift (TF.liftF id) >>= react
    where
    -- Decide what to do with the signal obtained from the suspension.
    react (Input item) = yield item >> evertedStreamM
    react EOF          = return ()

-----------------------------------------------------------------------------------------

-- | A stream-consuming function that can be turned into a pure, push-based fold. 
--
--   The wrapped function must work for every monad @m@ and every stream
--   return type @r@; it produces a summary @x@ paired with the stream's
--   own result.
newtype Evertible a x = 
        Evertible (forall m r. Monad m => Stream (Of a) m r -> m (Of x r))  

-- | Maps over the summary value produced by the consumer.
instance Functor (Evertible a) where
    fmap post (Evertible run) =
        Evertible (\source -> fmap (first post) (run source))

-- | 'lmap' pre-processes every stream element; 'rmap' is 'fmap'.
instance Profunctor Evertible where
    lmap pre (Evertible run) =
        Evertible (\source -> run (S.map pre source))
    rmap post consumer = fmap post consumer

-- | Message passed to 'error' when an everted computation has already
--   finished although 'EOF' has not been fed to it yet.
stoppedBeforeEOF :: String
stoppedBeforeEOF = "Stopped before receiving EOF."

-- | Message passed to 'error' when an everted computation still awaits
--   input after 'EOF' has been fed to it.
continuedAfterEOF :: String
continuedAfterEOF = "Continued after receiving EOF."

-- | Wraps a suitably polymorphic stream-consuming function in an 'Evertible'.
evertible :: (forall m r. Monad m => Stream (Of a) m r -> m (Of x r)) -> Evertible a x
evertible consumer = Evertible consumer

-- | Turns an 'Evertible' into a pure, push-based 'Fold'.
--
--   The fold's state is the consumer suspended on 'evertedStream'. Each
--   input resumes it with 'Input'; extraction resumes it with 'EOF' and
--   returns the consumer's result. Calls 'error' if the consumer has
--   finished before 'EOF' was fed, or still awaits input afterwards.
evert :: Evertible a x -> Fold a x
evert (Evertible consumer) = Fold feed suspended finish
    where
    -- The consumer applied to the everted stream, awaiting its first signal.
    suspended = consumer evertedStream

    -- Resume the suspended consumer with one more element.
    feed (Free resume) item = resume (Input item)
    feed (Pure _)      _    = error stoppedBeforeEOF

    -- Signal end of input and pull out the final result.
    finish (Pure _)      = error stoppedBeforeEOF
    finish (Free resume) = case resume EOF of
        Pure (result :> ()) -> result
        Free _              -> error continuedAfterEOF


{- | Like 'Evertible', but gives the stream-consuming function access to a base monad.

    The wrapped function must be polymorphic over a monad transformer @t@
    applied to the base monad @m@; effects of @m@ are reached through 'lift',
    as in the example below.
   
>>> :{
    let f stream = fmap ((:>) ()) (lift (putStrLn "x") >> S.effects stream)
    in  L.foldM (evertM (evertibleM f)) ["a","b","c"]
    :}
x

    Note however that control operations can't be lifted through the transformer.
-}
newtype EvertibleM m a x = 
        EvertibleM (forall t r. (MonadTrans t, Monad (t m)) => Stream (Of a) (t m) r -> t m (Of x r)) 

-- | Maps over the summary value produced by the consumer.
instance Functor (EvertibleM m a) where
    fmap post (EvertibleM run) =
        EvertibleM (\source -> fmap (first post) (run source))

-- | 'lmap' pre-processes every stream element; 'rmap' is 'fmap'.
instance Profunctor (EvertibleM m) where
    lmap pre (EvertibleM run) =
        EvertibleM (\source -> run (S.map pre source))
    rmap post consumer = fmap post consumer

-- | Wraps a transformer-polymorphic stream-consuming function in an
--   'EvertibleM'.
evertibleM :: (forall t r . (MonadTrans t, Monad (t m)) => Stream (Of a) (t m) r -> t m (Of x r)) -- ^
           -> EvertibleM m a x
evertibleM consumer = EvertibleM consumer

-- | Turns an 'EvertibleM' into a monadic, push-based 'FoldM'.
--
--   The fold's state is the consumer suspended on 'evertedStreamM'. Each
--   step runs the consumer's pending effects in @m@ and resumes it with
--   'Input'; extraction resumes it with 'EOF' and returns its result.
--   Calls 'error' if the consumer has finished before 'EOF' was fed, or
--   still awaits input afterwards.
evertM :: Monad m => EvertibleM m a x -> FoldM m a x
evertM (EvertibleM consumer) = FoldM feed (return suspended) finish
    where
    -- The consumer applied to the everted stream; nothing has run yet.
    suspended = consumer evertedStreamM

    -- Run effects up to the next suspension, then hand over one element.
    feed iteratee item = do
        layer <- TF.runFreeT iteratee
        case layer of
            TF.Free resume -> return (resume (Input item))
            TF.Pure _      -> error stoppedBeforeEOF

    -- Run effects up to the next suspension, signal EOF and run to the end.
    finish iteratee = do
        layer <- TF.runFreeT iteratee
        case layer of
            TF.Pure _      -> error stoppedBeforeEOF
            TF.Free resume -> do
                final <- TF.runFreeT (resume EOF)
                case final of
                    TF.Pure (result :> ()) -> return result
                    TF.Free _              -> error continuedAfterEOF

{-| Like 'EvertibleM', but gives the stream-consuming function the ability to use 'liftIO'.

    The wrapped function is polymorphic over a monad transformer @t@ whose
    application to the base monad @m@ is a 'MonadIO'.
 
>>> L.foldM (evertMIO (evertibleMIO (\stream -> fmap ((:>) ()) (S.print stream)))) ["a","b","c"]
"a"
"b"
"c"

-}
newtype EvertibleMIO m a x = 
        EvertibleMIO (forall t r. (MonadTrans t, MonadIO (t m)) => Stream (Of a) (t m) r -> t m (Of x r)) 

-- | Maps over the summary value produced by the consumer.
instance Functor (EvertibleMIO m a) where
    fmap post (EvertibleMIO run) =
        EvertibleMIO (\source -> fmap (first post) (run source))

-- | 'lmap' pre-processes every stream element; 'rmap' is 'fmap'.
instance Profunctor (EvertibleMIO m) where
    lmap pre (EvertibleMIO run) =
        EvertibleMIO (\source -> run (S.map pre source))
    rmap post consumer = fmap post consumer

-- | Wraps a transformer-polymorphic, 'MonadIO'-capable stream-consuming
--   function in an 'EvertibleMIO'.
evertibleMIO :: (forall t r . (MonadTrans t, MonadIO (t m)) => Stream (Of a) (t m) r -> t m (Of x r)) -- ^
             -> EvertibleMIO m a x
evertibleMIO consumer = EvertibleMIO consumer

-- | Turns an 'EvertibleMIO' into a monadic, push-based 'FoldM'.
--
--   The fold's state is the consumer suspended on 'evertedStreamM'. Each
--   step runs the consumer's pending effects in @m@ and resumes it with
--   'Input'; extraction resumes it with 'EOF' and returns its result.
--   Calls 'error' if the consumer has finished before 'EOF' was fed, or
--   still awaits input afterwards.
evertMIO :: MonadIO m => EvertibleMIO m a x -> FoldM m a x 
evertMIO (EvertibleMIO consumer) = FoldM feed (return suspended) finish
    where
    -- The consumer applied to the everted stream; nothing has run yet.
    suspended = consumer evertedStreamM

    -- Run effects up to the next suspension, then hand over one element.
    feed iteratee item = do
        layer <- TF.runFreeT iteratee
        case layer of
            TF.Free resume -> return (resume (Input item))
            TF.Pure _      -> error stoppedBeforeEOF

    -- Run effects up to the next suspension, signal EOF and run to the end.
    finish iteratee = do
        layer <- TF.runFreeT iteratee
        case layer of
            TF.Pure _      -> error stoppedBeforeEOF
            TF.Free resume -> do
                final <- TF.runFreeT (resume EOF)
                case final of
                    TF.Pure (result :> ()) -> return result
                    TF.Free _              -> error continuedAfterEOF

-- | A stream-transforming function that can be turned into a
--   fold-transforming function.
--
--   The wrapped function must work for every monad @m@ and must preserve
--   the stream's return type @r@.
newtype Transvertible a b = 
        Transvertible (forall m r. Monad m => Stream (Of a) m r -> Stream (Of b) m r)

-- | Maps over every element emitted by the transformation.
instance Functor (Transvertible a) where
    fmap post (Transvertible run) =
        Transvertible (\source -> S.map post (run source))

-- | 'lmap' pre-processes every incoming element; 'rmap' is 'fmap'.
instance Profunctor Transvertible where
    lmap pre (Transvertible run) =
        Transvertible (\source -> run (S.map pre source))
    rmap post transducer = fmap post transducer

-- | A pair that is strict in both components.
data Pair a b = Pair !a !b

-- | State of a transformed stream that is being driven by pushed input.
--
--   * 'Pristine': the transformed stream has not been advanced yet.
--
--   * 'Waiting': the stream is suspended; the continuation takes the next
--     'Feed' and yields either @Left ()@ or @Right@ with an emitted element
--     and the rest of the stream (the shape returned by 'next').
data StreamState a b = Pristine (Stream (Of b) (Iteratee (Feed a)) ())
                     | Waiting  (Feed a -> Iteratee (Feed a) (Either () (b, Stream (Of b) (Iteratee (Feed a)) ())))


-- | Wraps a suitably polymorphic stream-transforming function in a
--   'Transvertible'.
transvertible :: (forall m r. Monad m => Stream (Of a) m r -> Stream (Of b) m r) -- ^
              -> Transvertible a b
transvertible transducer = Transvertible transducer

-- | Turns a 'Transvertible' into a function that transforms pure 'Fold's.
--
--   The resulting fold keeps the downstream fold together with the
--   transformed stream applied to 'evertedStream'. Every incoming element
--   is pushed into the stream; every element the stream emits in response
--   is fed to the downstream fold. On extraction the stream is fed 'EOF',
--   the remaining emitted elements are fed downstream and the downstream
--   result is extracted. Calls 'error' if the stream ends before 'EOF' was
--   fed, or still awaits input afterwards.
transvert :: Transvertible b a 
          -> (forall x. Fold a x -> Fold b x)
transvert (Transvertible transducer) somefold = Fold step begin done
    where
    begin = Pair somefold (Pristine (transducer evertedStream))

    -- Feed a single emitted element to the downstream fold.
    push downstream o = Foldl.fold (duplicate downstream) [o]

    step (Pair downstream (Pristine fresh)) i =
        step (advance downstream fresh) i
    step (Pair downstream (Waiting resume)) i =
        settle downstream (resume (Input i))

    -- Run the stream until it suspends awaiting more input.
    advance downstream stream = settle downstream (next stream)

    -- Interpret one stream step while more input may still arrive.
    settle downstream outcome = case outcome of
        Free resume            -> Pair downstream (Waiting resume)
        Pure (Right (o, rest)) -> advance (push downstream o) rest
        Pure (Left ())         -> error stoppedBeforeEOF

    done (Pair downstream (Pristine fresh)) =
        done (advance downstream fresh)
    done (Pair downstream (Waiting resume)) =
        extract (drain downstream (resume EOF))

    -- Interpret stream steps after EOF: flush whatever is still emitted.
    drain downstream outcome = case outcome of
        Pure (Left ())         -> downstream
        Pure (Right (o, rest)) -> drain (push downstream o) (next rest)
        Free _                 -> error continuedAfterEOF

-- | Monadic counterpart of 'StreamState', with suspensions living in
--   'IterateeT' over the base monad @m@.
--
--   * 'PristineM': the transformed stream has not been advanced yet.
--
--   * 'WaitingM': the stream is suspended; the continuation takes the next
--     'Feed' and yields either @Left ()@ or @Right@ with an emitted element
--     and the rest of the stream (the shape returned by 'next').
data StreamStateM m a b = PristineM (Stream (Of b) (IterateeT (Feed a) m) ())
                        | WaitingM  (Feed a -> IterateeT (Feed a) m (Either () (b, Stream (Of b) (IterateeT (Feed a) m) ())))

-- | Like 'Transvertible', but gives the stream-transforming function access to a base monad.
--
--   The wrapped function must be polymorphic over a monad transformer @t@
--   applied to the base monad @m@, and must preserve the stream's return
--   type @r@.
--   
--   Note however that control operations can't be lifted through the transformer.
--
newtype TransvertibleM m a b = 
        TransvertibleM (forall t r. (MonadTrans t, Monad (t m)) => Stream (Of a) (t m) r -> Stream (Of b) (t m) r)

-- | Wraps a transformer-polymorphic stream-transforming function in a
--   'TransvertibleM'.
transvertibleM :: (forall t r. (MonadTrans t, Monad (t m)) => Stream (Of a) (t m) r -> Stream (Of b) (t m) r) -- ^
               -> TransvertibleM m a b 
transvertibleM transducer = TransvertibleM transducer

-- | Maps over every element emitted by the transformation.
instance Functor (TransvertibleM m a) where
    fmap post (TransvertibleM run) =
        TransvertibleM (\source -> S.map post (run source))

-- | 'lmap' pre-processes every incoming element; 'rmap' is 'fmap'.
instance Profunctor (TransvertibleM m) where
    lmap pre (TransvertibleM run) =
        TransvertibleM (\source -> run (S.map pre source))
    rmap post transducer = fmap post transducer

-- | Turns a 'TransvertibleM' into a function that transforms 'FoldM's.
--
--   The resulting fold keeps the downstream fold together with the
--   transformed stream applied to 'evertedStreamM'. Every incoming element
--   is pushed into the stream; every element the stream emits in response
--   is fed to the downstream fold. On extraction the stream is fed 'EOF',
--   the remaining emitted elements are fed downstream and the downstream
--   fold is finished. Calls 'error' if the stream ends before 'EOF' was
--   fed, or still awaits input afterwards.
transvertM :: Monad m 
           => TransvertibleM m b a 
           -> (forall x . FoldM m a x -> FoldM m b x)
transvertM (TransvertibleM transducer) somefold = FoldM step begin done
    where
    begin = return (Pair somefold (PristineM (transducer evertedStreamM)))

    -- Feed a single emitted element to the downstream fold.
    push downstream o = Foldl.foldM (Foldl.duplicateM downstream) [o]

    step (Pair downstream (PristineM fresh)) i =
        advance downstream fresh >>= \primed -> step primed i
    step (Pair downstream (WaitingM resume)) i =
        settle downstream (resume (Input i))

    -- Run the stream until it suspends awaiting more input.
    advance downstream stream = settle downstream (next stream)

    -- Interpret one stream step while more input may still arrive.
    settle downstream suspended = do
        layer <- TF.runFreeT suspended
        case layer of
            TF.Free resume            -> return (Pair downstream (WaitingM resume))
            TF.Pure (Right (o, rest)) -> push downstream o >>= \fed -> advance fed rest
            TF.Pure (Left ())         -> error stoppedBeforeEOF

    done (Pair downstream (PristineM fresh)) =
        advance downstream fresh >>= done
    done (Pair downstream (WaitingM resume)) = do
        drained <- drain downstream (resume EOF)
        Foldl.foldM drained []

    -- Interpret stream steps after EOF: flush whatever is still emitted.
    drain downstream suspended = do
        layer <- TF.runFreeT suspended
        case layer of
            TF.Pure (Left ())         -> return downstream
            TF.Pure (Right (o, rest)) -> push downstream o >>= \fed -> drain fed (next rest)
            TF.Free _                 -> error continuedAfterEOF


-- | Like 'TransvertibleM', but gives the stream-transforming function the ability to use 'liftIO'.
--
--   The wrapped function is polymorphic over a monad transformer @t@ whose
--   application to the base monad @m@ is a 'MonadIO'.
--   
newtype TransvertibleMIO m a b = 
        TransvertibleMIO (forall t r. (MonadTrans t, MonadIO (t m)) => Stream (Of a) (t m) r -> Stream (Of b) (t m) r)

-- | Maps over every element emitted by the transformation.
instance Functor (TransvertibleMIO m a) where
    fmap post (TransvertibleMIO run) =
        TransvertibleMIO (\source -> S.map post (run source))

-- | 'lmap' pre-processes every incoming element; 'rmap' is 'fmap'.
instance Profunctor (TransvertibleMIO m) where
    lmap pre (TransvertibleMIO run) =
        TransvertibleMIO (\source -> run (S.map pre source))
    rmap post transducer = fmap post transducer

-- | Wraps a transformer-polymorphic, 'MonadIO'-capable stream-transforming
--   function in a 'TransvertibleMIO'.
transvertibleMIO :: (forall t r. (MonadTrans t, MonadIO (t m)) => Stream (Of a) (t m) r -> Stream (Of b) (t m) r) -- ^
                 -> TransvertibleMIO m a b 
transvertibleMIO transducer = TransvertibleMIO transducer

-- | Turns a 'TransvertibleMIO' into a function that transforms 'FoldM's.
--
--   The resulting fold keeps the downstream fold together with the
--   transformed stream applied to 'evertedStreamM'. Every incoming element
--   is pushed into the stream; every element the stream emits in response
--   is fed to the downstream fold. On extraction the stream is fed 'EOF',
--   the remaining emitted elements are fed downstream and the downstream
--   fold is finished. Calls 'error' if the stream ends before 'EOF' was
--   fed, or still awaits input afterwards.
transvertMIO :: (MonadIO m) 
             => TransvertibleMIO m b a 
             -> (forall x . FoldM m a x -> FoldM m b x)
transvertMIO (TransvertibleMIO transducer) somefold = FoldM step begin done
    where
    begin = return (Pair somefold (PristineM (transducer evertedStreamM)))

    -- Feed a single emitted element to the downstream fold.
    push downstream o = Foldl.foldM (Foldl.duplicateM downstream) [o]

    step (Pair downstream (PristineM fresh)) i =
        advance downstream fresh >>= \primed -> step primed i
    step (Pair downstream (WaitingM resume)) i =
        settle downstream (resume (Input i))

    -- Run the stream until it suspends awaiting more input.
    advance downstream stream = settle downstream (next stream)

    -- Interpret one stream step while more input may still arrive.
    settle downstream suspended = do
        layer <- TF.runFreeT suspended
        case layer of
            TF.Free resume            -> return (Pair downstream (WaitingM resume))
            TF.Pure (Right (o, rest)) -> push downstream o >>= \fed -> advance fed rest
            TF.Pure (Left ())         -> error stoppedBeforeEOF

    done (Pair downstream (PristineM fresh)) =
        advance downstream fresh >>= done
    done (Pair downstream (WaitingM resume)) = do
        drained <- drain downstream (resume EOF)
        Foldl.foldM drained []

    -- Interpret stream steps after EOF: flush whatever is still emitted.
    drain downstream suspended = do
        layer <- TF.runFreeT suspended
        case layer of
            TF.Pure (Left ())         -> return downstream
            TF.Pure (Right (o, rest)) -> push downstream o >>= \fed -> drain fed (next rest)
            TF.Free _                 -> error continuedAfterEOF

{-| Re-raises a 'Left' result as an error in the 'ExceptT' layer that sits
    just below the transformer @t@; a 'Right' result is returned as is.

    If your stream-folding computation can fail early returning a 'Left',
    compose it with this function before passing it to 'evertibleM'. 

    The result will be an 'EvertibleM' that works on 'ExceptT'.

>>> runExceptT $ L.foldM (evertM (evertibleM (foldE . (\_ -> return (Left ()))))) [1..10]
Left ()

-} 
foldE :: (MonadTrans t, Monad m, Monad (t (ExceptT e m))) 
        => t (ExceptT e m) (Either e r)  -- ^
        -> t (ExceptT e m) r
foldE action = do
    outcome <- action
    -- Hand the Either to the ExceptT layer: Left aborts, Right continues.
    lift (ExceptT (return outcome))