module Streamly.Internal.Data.Sink
(
Sink (..)
, toFold
, tee
, distribute
, demux
, unzipM
, unzip
, lmap
, lmapM
, lfilter
, lfilterM
, drain
, drainM
)
where
import Control.Monad ((>=>), when, void)
import Data.Map.Strict (Map)
import Prelude
hiding (filter, drop, dropWhile, take, takeWhile, zipWith, foldr,
foldl, map, mapM, sequence, all, any, sum, product, elem,
notElem, maximum, minimum, head, last, tail, length, null,
reverse, iterate, init, and, or, lookup, foldr1, (!!),
scanl, scanl1, replicate, concatMap, mconcat, foldMap, unzip)
import Streamly.Internal.Data.Fold.Type (Fold(..), Step(..))
import Streamly.Internal.Data.Sink.Type (Sink(..))
import qualified Data.Map.Strict as Map
-- | Convert a 'Sink' into a 'Fold' whose result is @()@.
--
-- The fold's state is the unit value, so it carries no information between
-- steps:
--
-- * the initial action yields @Partial ()@;
-- * each step runs the sink's action on the input and wraps that action's
--   unit result in 'Partial';
-- * the extraction function ignores the state and returns @()@.
--
-- The step function here only ever produces 'Partial'.
toFold :: Monad m => Sink m a -> Fold m a ()
toFold (Sink f) = Fold step begin done

    where

    -- Initial state: nothing to accumulate.
    begin = return $ Partial ()

    -- The incoming state is ignored; only the sink's effect matters.
    step _ a = Partial <$> f a

    -- Nothing to extract from a unit state.
    done _ = return ()
-- | Combine two sinks into a single sink that feeds every input to both.
--
-- For each input the first sink's action runs before the second sink's
-- action; the two are sequenced with '>>'.
tee :: Monad m => Sink m a -> Sink m a -> Sink m a
tee (Sink fL) (Sink fR) = Sink (\a -> fL a >> fR a)
-- | Combine a list of sinks into a single sink that feeds every input to
-- each sink in the list.
--
-- The sinks are run in list order via 'Prelude.mapM_'. With an empty list
-- the resulting sink performs no action for any input.
{-# INLINE distribute #-}
distribute :: Monad m => [Sink m a] -> Sink m a
distribute ss = Sink (\a -> Prelude.mapM_ (\(Sink f) -> f a) ss)
-- | Route each input to the sink registered under its key.
--
-- The input is a pair @(value, key)@. The key is looked up in the supplied
-- map; if a sink is found, the value is passed to it. If the key is absent
-- from the map, the input is dropped and no action is performed.
demux :: (Monad m, Ord k) => Map k (Sink m a) -> Sink m (a, k)
demux kv = Sink step

    where

    step (a, k) =
        case Map.lookup k kv of
            -- No sink for this key: discard the value.
            Nothing -> return ()
            Just (Sink g) -> g a
-- | Split each input into a pair with a monadic function and send the first
-- component to the first sink and the second component to the second sink.
--
-- For every input the splitting action runs first, then the first sink's
-- action, then the second sink's action.
{-# INLINE unzipM #-}
unzipM :: Monad m => (a -> m (b,c)) -> Sink m b -> Sink m c -> Sink m a
unzipM f (Sink stepB) (Sink stepC) =
    Sink (f >=> (\(b, c) -> stepB b >> stepC c))
-- | Pure variant of 'unzipM': split each input into a pair with a pure
-- function and send the first component to the first sink and the second
-- component to the second sink.
--
-- Implemented by lifting the pure function with 'return' and delegating to
-- 'unzipM'.
{-# INLINE unzip #-}
unzip :: Monad m => (a -> (b,c)) -> Sink m b -> Sink m c -> Sink m a
unzip f = unzipM (return . f)
-- | Apply a pure function to every input before it reaches the sink.
--
-- @lmap f sink@ accepts values of the function's argument type and passes
-- @f x@ to @sink@ for each input @x@.
{-# INLINABLE lmap #-}
lmap :: (a -> b) -> Sink m b -> Sink m a
lmap f (Sink step) = Sink (step . f)
-- | Apply a monadic function to every input before it reaches the sink.
--
-- For each input the mapping action runs first and its result is then
-- passed to the sink's action (Kleisli composition via '>=>').
{-# INLINABLE lmapM #-}
lmapM :: Monad m => (a -> m b) -> Sink m b -> Sink m a
lmapM f (Sink step) = Sink (f >=> step)
-- | Only pass inputs that satisfy a pure predicate on to the sink.
--
-- Inputs for which the predicate is 'False' are dropped and no action is
-- performed for them.
{-# INLINABLE lfilter #-}
lfilter :: Monad m => (a -> Bool) -> Sink m a -> Sink m a
lfilter f (Sink step) = Sink (\a -> when (f a) $ step a)
-- | Only pass inputs that satisfy a monadic predicate on to the sink.
--
-- The predicate action is run for every input; the sink's action runs only
-- when the predicate yields 'True'.
{-# INLINABLE lfilterM #-}
lfilterM :: Monad m => (a -> m Bool) -> Sink m a -> Sink m a
lfilterM f (Sink step) = Sink (\a -> f a >>= \use -> when use $ step a)
-- | A sink that ignores every input and performs no action.
drain :: Monad m => Sink m a
drain = Sink (\_ -> return ())
-- | Build a sink that runs the given monadic action on every input and
-- discards the action's result (via 'void').
{-# INLINABLE drainM #-}
drainM :: Monad m => (a -> m b) -> Sink m a
drainM f = Sink (void . f)