{-# LANGUAGE CPP #-}
{-# LANGUAGE ExistentialQuantification #-}
#include "inline.hs"
module Streamly.Internal.Data.Pipe.Types
( Step (..)
, Pipe (..)
, PipeState (..)
, zipWith
, tee
, map
, compose
)
where
import Control.Arrow (Arrow(..))
import Control.Category (Category(..))
import Data.Maybe (isJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Prelude hiding (zipWith, map, id, unzip, null)
import Streamly.Internal.Data.Strict (Tuple'(..), Tuple3'(..))
import qualified Prelude
data Step s a =
Yield a s
| Continue s
data PipeState s1 s2 = Consume s1 | Produce s2
isProduce :: PipeState s1 s2 -> Bool
isProduce s =
case s of
Produce _ -> True
Consume _ -> False
data Pipe m a b =
forall s1 s2. Pipe (s1 -> a -> m (Step (PipeState s1 s2) b))
(s2 -> m (Step (PipeState s1 s2) b)) s1
instance Monad m => Functor (Pipe m a) where
{-# INLINE_NORMAL fmap #-}
fmap f (Pipe consume produce initial) = Pipe consume' produce' initial
where
{-# INLINE_LATE consume' #-}
consume' st a = do
r <- consume st a
return $ case r of
Yield x s -> Yield (f x) s
Continue s -> Continue s
{-# INLINE_LATE produce' #-}
produce' st = do
r <- produce st
return $ case r of
Yield x s -> Yield (f x) s
Continue s -> Continue s
data Deque a = Deque [a] [a]
{-# INLINE null #-}
null :: Deque a -> Bool
null (Deque [] []) = True
null _ = False
{-# INLINE snoc #-}
snoc :: a -> Deque a -> Deque a
snoc a (Deque snocList consList) = Deque (a : snocList) consList
{-# INLINE uncons #-}
uncons :: Deque a -> Maybe (a, Deque a)
uncons (Deque snocList consList) =
case consList of
h : t -> Just (h, Deque snocList t)
_ ->
case Prelude.reverse snocList of
h : t -> Just (h, Deque [] t)
_ -> Nothing
{-# INLINE_NORMAL zipWith #-}
zipWith :: Monad m => (a -> b -> c) -> Pipe m i a -> Pipe m i b -> Pipe m i c
zipWith f (Pipe consumeL produceL stateL) (Pipe consumeR produceR stateR) =
Pipe consume produce state
where
state = Tuple' (Consume stateL, Nothing, Nothing)
(Consume stateR, Nothing, Nothing)
{-# INLINE_LATE consume #-}
consume (Tuple' (sL, resL, lq) (sR, resR, rq)) a = do
s1 <- drive sL resL lq consumeL produceL a
s2 <- drive sR resR rq consumeR produceR a
yieldOutput s1 s2
where
{-# INLINE drive #-}
drive st res queue fConsume fProduce val = do
case res of
Nothing -> goConsume st queue val fConsume fProduce
Just x -> return $
case queue of
Nothing -> (st, Just x, Just $ (Deque [val] []))
Just q -> (st, Just x, Just $ snoc val q)
{-# INLINE goConsume #-}
goConsume stt queue val fConsume stp2 = do
case stt of
Consume st -> do
case queue of
Nothing -> do
r <- fConsume st val
return $ case r of
Yield x s -> (s, Just x, Nothing)
Continue s -> (s, Nothing, Nothing)
Just queue' ->
case uncons queue' of
Just (v, q) -> do
r <- fConsume st v
let q' = snoc val q
return $ case r of
Yield x s -> (s, Just x, Just q')
Continue s -> (s, Nothing, Just q')
Nothing -> undefined
Produce st -> do
r <- stp2 st
return $ case r of
Yield x s -> (s, Just x, queue)
Continue s -> (s, Nothing, queue)
{-# INLINE_LATE produce #-}
produce (Tuple' (sL, resL, lq) (sR, resR, rq)) = do
s1 <- drive sL resL lq consumeL produceL
s2 <- drive sR resR rq consumeR produceR
yieldOutput s1 s2
where
{-# INLINE drive #-}
drive stt res q fConsume fProduce = do
case res of
Nothing -> goProduce stt q fConsume fProduce
Just x -> return (stt, Just x, q)
{-# INLINE goProduce #-}
goProduce stt queue fConsume fProduce = do
case stt of
Consume st -> do
case queue of
Nothing -> undefined
Just queue' ->
case uncons queue' of
Just (v, q) -> do
r <- fConsume st v
let q' = if null q
then Nothing
else Just q
return $ case r of
Yield x s -> (s, Just x, q')
Continue s -> (s, Nothing, q')
Nothing -> return (stt, Nothing, Nothing)
Produce st -> do
r <- fProduce st
return $ case r of
Yield x s -> (s, Just x, queue)
Continue s -> (s, Nothing, queue)
{-# INLINE yieldOutput #-}
yieldOutput s1@(sL', resL', lq') s2@(sR', resR', rq') = return $
if (isProduce sL' || isJust lq') && (isProduce sR' || isJust rq')
then
case (resL', resR') of
(Just xL, Just xR) ->
Yield (f xL xR) (Produce (Tuple' (clear s1) (clear s2)))
_ -> Continue (Produce (Tuple' s1 s2))
else
case (resL', resR') of
(Just xL, Just xR) ->
Yield (f xL xR) (Consume (Tuple' (clear s1) (clear s2)))
_ -> Continue (Consume (Tuple' s1 s2))
where clear (s, _, q) = (s, Nothing, q)
instance Monad m => Applicative (Pipe m a) where
{-# INLINE pure #-}
pure b = Pipe (\_ _ -> pure $ Yield b (Consume ())) undefined ()
(<*>) = zipWith id
{-# INLINE_NORMAL tee #-}
tee :: Monad m => Pipe m a b -> Pipe m a b -> Pipe m a b
tee (Pipe consumeL produceL stateL) (Pipe consumeR produceR stateR) =
Pipe consume produce state
where
state = Tuple' (Consume stateL) (Consume stateR)
consume (Tuple' sL sR) a = do
case sL of
Consume st -> do
r <- consumeL st a
return $ case r of
Yield x s -> Yield x (Produce (Tuple3' (Just a) s sR))
Continue s -> Continue (Produce (Tuple3' (Just a) s sR))
Produce _st -> undefined
produce (Tuple3' (Just a) sL sR) = do
case sL of
Consume _ -> do
case sR of
Consume st -> do
r <- consumeR st a
let nextL s = Consume (Tuple' sL s)
let nextR s = Produce (Tuple3' Nothing sL s)
return $ case r of
Yield x s@(Consume _) -> Yield x (nextL s)
Yield x s@(Produce _) -> Yield x (nextR s)
Continue s@(Consume _) -> Continue (nextL s)
Continue s@(Produce _) -> Continue (nextR s)
Produce _ -> undefined
Produce st -> do
r <- produceL st
let next s = Produce (Tuple3' (Just a) s sR)
return $ case r of
Yield x s -> Yield x (next s)
Continue s -> Continue (next s)
produce (Tuple3' Nothing sL sR) = do
case sR of
Consume _ -> undefined
Produce st -> do
r <- produceR st
return $ case r of
Yield x s@(Consume _) ->
Yield x (Consume (Tuple' sL s))
Yield x s@(Produce _) ->
Yield x (Produce (Tuple3' Nothing sL s))
Continue s@(Consume _) ->
Continue (Consume (Tuple' sL s))
Continue s@(Produce _) ->
Continue (Produce (Tuple3' Nothing sL s))
instance Monad m => Semigroup (Pipe m a b) where
{-# INLINE (<>) #-}
(<>) = tee
{-# INLINE map #-}
map :: Monad m => (a -> b) -> Pipe m a b
map f = Pipe consume undefined ()
where
consume _ a = return $ Yield (f a) (Consume ())
{-# INLINE_NORMAL compose #-}
compose :: Monad m => Pipe m b c -> Pipe m a b -> Pipe m a c
compose (Pipe consumeL produceL stateL) (Pipe consumeR produceR stateR) =
Pipe consume produce state
where
state = Tuple' (Consume stateL) (Consume stateR)
consume (Tuple' sL sR) a = do
case sL of
Consume stt ->
case sR of
Consume st -> do
rres <- consumeR st a
case rres of
Yield x sR' -> do
let next s =
if isProduce sR'
then Produce s
else Consume s
lres <- consumeL stt x
return $ case lres of
Yield y s1@(Consume _) ->
Yield y (next $ Tuple' s1 sR')
Yield y s1@(Produce _) ->
Yield y (Produce $ Tuple' s1 sR')
Continue s1@(Consume _) ->
Continue (next $ Tuple' s1 sR')
Continue s1@(Produce _) ->
Continue (Produce $ Tuple' s1 sR')
Continue s1@(Consume _) ->
return $ Continue (Consume $ Tuple' sL s1)
Continue s1@(Produce _) ->
return $ Continue (Produce $ Tuple' sL s1)
Produce _ -> undefined
Produce _ -> undefined
produce (Tuple' sL sR) = do
case sL of
Produce st -> do
r <- produceL st
let next s = if isProduce sR then Produce s else Consume s
return $ case r of
Yield x s@(Consume _) -> Yield x (next $ Tuple' s sR)
Yield x s@(Produce _) -> Yield x (Produce $ Tuple' s sR)
Continue s@(Consume _) -> Continue (next $ Tuple' s sR)
Continue s@(Produce _) -> Continue (Produce $ Tuple' s sR)
Consume stt ->
case sR of
Produce st -> do
rR <- produceR st
case rR of
Yield x sR' -> do
let next s =
if isProduce sR'
then Produce s
else Consume s
rL <- consumeL stt x
return $ case rL of
Yield y s1@(Consume _) ->
Yield y (next $ Tuple' s1 sR')
Yield y s1@(Produce _) ->
Yield y (Produce $ Tuple' s1 sR')
Continue s1@(Consume _) ->
Continue (next $ Tuple' s1 sR')
Continue s1@(Produce _) ->
Continue (Produce $ Tuple' s1 sR')
Continue s1@(Consume _) ->
return $ Continue (Consume $ Tuple' sL s1)
Continue s1@(Produce _) ->
return $ Continue (Produce $ Tuple' sL s1)
Consume _ -> return $ Continue (Consume $ Tuple' sL sR)
instance Monad m => Category (Pipe m) where
{-# INLINE id #-}
id = map Prelude.id
{-# INLINE (.) #-}
(.) = compose
unzip :: Pipe m a x -> Pipe m b y -> Pipe m (a, b) (x, y)
unzip = undefined
instance Monad m => Arrow (Pipe m) where
{-# INLINE arr #-}
arr = map
{-# INLINE (***) #-}
(***) = unzip
{-# INLINE (&&&) #-}
(&&&) = zipWith (,)