{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE BangPatterns #-}
{-# OPTIONS_GHC -O2 -fwarn-incomplete-patterns #-}
module Control.MapReduce.Engines.Streamly
(
streamlyEngine
, streamlyEngineM
, concurrentStreamlyEngine
, toStreamlyFold
, toStreamlyFoldM
, resultToList
, concatStream
, concatStreamFold
, concatStreamFoldM
, concatConcurrentStreamFold
, groupByHashableKey
, groupByOrderedKey
#if MIN_VERSION_streamly(0,9,0)
, groupByOrderedKeyIO
#endif
, groupByHashableKeyST
, groupByDiscriminatedKey
#if MIN_VERSION_streamly(0,9,0)
, Stream
#else
, SerialT
, WSerialT
, AheadT
, AsyncT
, WAsyncT
, ParallelT
, MonadAsync
, IsStream
#endif
)
where
import qualified Control.MapReduce.Core as MRC
import qualified Control.MapReduce.Engines as MRE
import Control.Arrow ( second )
import qualified Control.Foldl as FL
import Control.Monad.ST as ST
import qualified Data.Discrimination.Grouping as DG
import qualified Data.Foldable as F
import Data.Functor.Identity ( Identity(runIdentity) )
import Data.Hashable ( Hashable )
import qualified Data.HashMap.Strict as HMS
import qualified Data.HashTable.Class as HT
import qualified Data.HashTable.ST.Cuckoo as HTC
import qualified Data.List.NonEmpty as LNE
import qualified Data.Maybe as Maybe
import qualified Data.Map.Strict as MS
import qualified Data.Sequence as Seq
#if MIN_VERSION_streamly(0,9,0)
import Streamly.Data.Stream (Stream)
import qualified Streamly.Data.Stream as S
import qualified Streamly.Data.Stream.Prelude as SP
import qualified Streamly.Data.StreamK as StreamK
import qualified Streamly.Data.Fold as SF
import qualified Streamly.Data.Unfold as SU
import qualified Streamly.Internal.Data.Fold as SF
import Control.Monad (join)
import Control.Monad.IO.Class (MonadIO)
#elif MIN_VERSION_streamly(0,8,0)
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Fold as SF
import Streamly.Prelude ( SerialT
, WSerialT
, AheadT
, AsyncT
, WAsyncT
, ParallelT
, MonadAsync
, IsStream
)
#else
import qualified Streamly.Prelude as S
import qualified Streamly as S
import qualified Streamly.Internal.Data.Fold as SF
import Streamly ( SerialT
, WSerialT
, AheadT
, AsyncT
, WAsyncT
, ParallelT
, MonadAsync
, IsStream
)
#endif
#if MIN_VERSION_streamly(0,9,0)
-- | Convert a "Control.Foldl" monadic fold into a streamly 'SF.Fold'.
--
-- The converted fold never terminates early: the initial state and every
-- step result are wrapped in 'SF.Partial'. The foldl @done@ function is used
-- unchanged as the extraction step.
--
-- NOTE(review): this builds 'SF.Fold' with its data constructor directly
-- (three fields); confirm that matches the streamly-core version in use.
toStreamlyFoldM :: Functor m => FL.FoldM m a b -> SF.Fold m a b
toStreamlyFoldM (FL.FoldM stepM initM extractM) =
  SF.Fold (\acc x -> fmap SF.Partial (stepM acc x)) (fmap SF.Partial initM) extractM
-- | Convert a pure "Control.Foldl" fold into a streamly 'SF.Fold' in any
-- monad.
--
-- The step, initial state and extraction are lifted with 'pure'. Every state
-- is wrapped in 'SF.Partial', so the fold never terminates early.
--
-- NOTE(review): this builds 'SF.Fold' with its data constructor directly
-- (three fields); confirm that matches the streamly-core version in use.
toStreamlyFold :: Monad m => FL.Fold a b -> SF.Fold m a b
toStreamlyFold (FL.Fold stepP initP extractP) =
  SF.Fold
    (\acc x -> pure (SF.Partial (stepP acc x)))
    (pure (SF.Partial initP))
    (pure . extractP)
#elif MIN_VERSION_streamly(0,8,0)
-- | Lift a single monadic action into a one-element stream.
--
-- Compatibility shim so the grouping functions of the pre-0.9 branch can use
-- one name across streamly versions; this branch delegates to 'S.fromEffect'.
fromEffect :: (Monad m, IsStream t) => m a -> t m a
fromEffect = S.fromEffect
{-# INLINE fromEffect #-}
-- | Convert a "Control.Foldl" monadic fold into a streamly 'SF.Fold'.
--
-- The resulting fold never terminates early: the initial state and every
-- step result are wrapped in 'SF.Partial'. The foldl @done@ function is used
-- unchanged as the extraction step.
toStreamlyFoldM :: Functor m => FL.FoldM m a b -> SF.Fold m a b
toStreamlyFoldM (FL.FoldM step start done) =
  SF.mkFoldM step' (SF.Partial <$> start) done
  where
    step' s a = SF.Partial <$> step s a
-- | Convert a pure "Control.Foldl" fold into a streamly 'SF.Fold' in any
-- monad.
--
-- The initial state and every step result are wrapped in 'SF.Partial', so
-- the fold never terminates early. The foldl @done@ function is used
-- unchanged as the extraction step.
toStreamlyFold :: Monad m => FL.Fold a b -> SF.Fold m a b
toStreamlyFold (FL.Fold step start done) =
  SF.mkFold step' (SF.Partial start) done
  where
    step' s a = SF.Partial $ step s a
#else
-- | Lift a single monadic action into a one-element stream.
--
-- Compatibility shim for older streamly; this branch delegates to
-- 'S.yieldM'.
fromEffect :: (Monad m, IsStream t) => m a -> t m a
fromEffect action = S.yieldM action
{-# INLINE fromEffect #-}
-- | Convert a "Control.Foldl" monadic fold into a streamly 'SF.Fold'.
-- The step, initial action and extraction are passed to 'SF.mkFold' as-is.
toStreamlyFoldM :: FL.FoldM m a b -> SF.Fold m a b
toStreamlyFoldM fld = case fld of
  FL.FoldM stepM initM extractM -> SF.mkFold stepM initM extractM
-- | Convert a pure "Control.Foldl" fold into a streamly 'SF.Fold' in any
-- monad. The step, initial state and extraction go to 'SF.mkPure' as-is.
toStreamlyFold :: Monad m => FL.Fold a b -> SF.Fold m a b
toStreamlyFold fld = case fld of
  FL.Fold stepP initP extractP -> SF.mkPure stepP initP extractP
#endif
#if MIN_VERSION_streamly(0,9,0)
-- | Apply the unpack step of a map-reduce to a pure 'StreamK.StreamK' and
-- return a direct-style 'S.Stream'.
--
-- 'MRC.Filter' keeps the elements satisfying the predicate. 'MRC.Unpack'
-- replaces each element by the contents of the 'Foldable' it maps to.
unpackStreamK :: MRC.Unpack x y -> StreamK.StreamK Identity x -> S.Stream Identity y
unpackStreamK (MRC.Filter keep) input = S.filter keep (StreamK.toStream input)
unpackStreamK (MRC.Unpack expand) input =
  S.concatMap (StreamK.toStream . StreamK.fromFoldable . expand) (StreamK.toStream input)
{-# INLINABLE unpackStreamK #-}
-- | Effectful variant of 'unpackStreamK'.
--
-- 'MRC.FilterM' keeps the elements whose monadic predicate returns 'True'.
-- 'MRC.UnpackM' runs the unpacking action per element and splices the
-- resulting 'Foldable' into the output stream.
unpackStreamKM :: (Monad m) => MRC.UnpackM m x y -> StreamK.StreamK m x -> S.Stream m y
unpackStreamKM (MRC.FilterM keepM) input = S.filterM keepM (StreamK.toStream input)
unpackStreamKM (MRC.UnpackM expandM) input =
  S.concatMapM
    (\x -> StreamK.toStream . StreamK.fromFoldable <$> expandM x)
    (StreamK.toStream input)
{-# INLINABLE unpackStreamKM #-}
-- | Run a stream to completion and collect its elements into a list.
resultToList :: (Monad m) => S.Stream m a -> m [a]
resultToList stream = S.toList stream
{-# INLINEABLE resultToList #-}
-- | Combine all elements of a stream with '(<>)', starting from 'mempty'
-- (a strict left fold).
concatStream :: (Monad m, Monoid a) => S.Stream m a -> m a
concatStream = S.fold combineAll
  where
    combineAll = SF.foldl' (<>) mempty
{-# INLINEABLE concatStream #-}
-- | Post-process a fold that produces a pure stream of monoidal values,
-- collapsing that stream with 'concatStream'.
concatStreamFold :: Monoid b => FL.Fold a (S.Stream Identity b) -> FL.Fold a b
concatStreamFold fld = runIdentity . concatStream <$> fld
{-# INLINEABLE concatStreamFold #-}
-- | Post-process a monadic fold that produces a stream of monoidal values,
-- collapsing that stream with 'concatStream' via 'MRC.postMapM'.
concatStreamFoldM
  :: (Monad m, Monoid b) => FL.FoldM m a (S.Stream m b) -> FL.FoldM m a b
concatStreamFoldM fld = MRC.postMapM concatStream fld
{-# INLINEABLE concatStreamFoldM #-}
-- | Generalize a pure fold that produces an effectful stream into a monadic
-- fold, then collapse the stream with 'concatStreamFoldM'.
concatConcurrentStreamFold
  :: (Monad m, Monoid b) => FL.Fold a (S.Stream m b) -> FL.FoldM m a b
concatConcurrentStreamFold fld = concatStreamFoldM (FL.generalize fld)
{-# INLINEABLE concatConcurrentStreamFold #-}
-- | Map-reduce engine over pure streamly streams.
--
-- The fold step prepends each input onto a 'StreamK.StreamK' (starting from
-- 'StreamK.nil'), so the accumulated stream is in reverse input order. At
-- extraction time the stream is unpacked with @u@ and assigned with @a@. It
-- is then grouped with the supplied @groupByKey@, and each @(key, group)@
-- pair is reduced with @r@.
streamlyEngine
  :: (Foldable g, Functor g)
  => (forall z . S.Stream Identity (k, z) -> S.Stream Identity (k, g z))
  -> MRE.MapReduceFold y k c (S.Stream Identity) x d
streamlyEngine groupByKey u (MRC.Assign a) r = FL.Fold push StreamK.nil extract
  where
    push acc x = StreamK.cons x acc
    extract =
      fmap (\(k, lc) -> MRE.reduceFunction r k lc)
        . groupByKey
        . fmap a
        . unpackStreamK u
{-# INLINABLE streamlyEngine #-}
-- | Apply the unpack step of a map-reduce to an effectful 'StreamK.StreamK'.
--
-- 'MRC.Filter' keeps the elements satisfying the predicate. 'MRC.Unpack'
-- replaces each element by the contents of the 'Foldable' it maps to.
--
-- NOTE(review): despite the name, only serial combinators ('S.filter',
-- 'S.concatMap') are used here; the 'SP.MonadAsync' constraint is not
-- exercised by this body.
unpackConcurrentlyK
  :: (SP.MonadAsync m) => MRC.Unpack x y -> StreamK.StreamK m x -> S.Stream m y
unpackConcurrentlyK (MRC.Filter keep) input = S.filter keep (StreamK.toStream input)
unpackConcurrentlyK (MRC.Unpack expand) input =
  S.concatMap (StreamK.toStream . StreamK.fromFoldable . expand) (StreamK.toStream input)
{-# INLINABLE unpackConcurrentlyK #-}
-- | Map-reduce engine over effectful streamly streams.
--
-- The fold step prepends each input, via 'StreamK.consM', onto a
-- 'StreamK.StreamK' that starts from 'StreamK.nil'. At extraction time the
-- stream is unpacked, assigned (through 'S.mapM') and grouped with
-- @groupByKey@. Each group is then reduced with @r@.
--
-- NOTE(review): in this branch every stage uses serial combinators
-- ('S.mapM', 'S.concatMap'); no concurrent combinator from
-- "Streamly.Data.Stream.Prelude" is applied.
concurrentStreamlyEngine
  :: forall m g y k c x d
   . (SP.MonadAsync m, Foldable g, Functor g)
  => (forall z . S.Stream m (k, z) -> S.Stream m (k, g z))
  -> MRE.MapReduceFold y k c (S.Stream m) x d
concurrentStreamlyEngine groupByKey u (MRC.Assign a) r =
  FL.Fold push StreamK.nil extract
  where
    push acc x = StreamK.consM (pure x) acc
    extract =
      S.mapM (\(k, lc) -> return $ MRE.reduceFunction r k lc)
        . groupByKey
        . S.mapM (return . a)
        . unpackConcurrentlyK u
{-# INLINABLE concurrentStreamlyEngine #-}
-- | Monadic map-reduce engine over streamly streams.
--
-- Inputs are accumulated with a pure fold that prepends onto a
-- 'StreamK.StreamK'; that fold is then lifted with 'FL.generalize'. At
-- extraction time the stream is unpacked with 'unpackStreamKM', assigned
-- with the monadic @a@, grouped, and reduced with 'MRE.reduceFunctionM'.
streamlyEngineM
  :: (Monad m, SP.MonadAsync m, Traversable g)
  => (forall z . S.Stream m (k, z) -> S.Stream m (k, g z))
  -> MRE.MapReduceFoldM m y k c (S.Stream m) x d
streamlyEngineM groupByKey u (MRC.AssignM a) r =
  FL.generalize (FL.Fold push StreamK.nil extract)
  where
    push acc x = StreamK.cons x acc
    extract =
      S.mapM (\(k, lc) -> MRE.reduceFunctionM r k lc)
        . groupByKey
        . S.mapM a
        . unpackStreamKM u
{-# INLINABLE streamlyEngineM #-}
-- | Fold @(key, value)@ pairs into a strict 'HMS.HashMap'. Values for a
-- repeated key are combined with '(<>)', the new value on the left.
toHashMap :: (Monad m, Eq k, Monoid a, Hashable k) => SF.Fold m (k, a) (HMS.HashMap k a)
toHashMap = SF.foldl' insertPair HMS.empty
  where
    insertPair acc (key, val) = HMS.insertWith (<>) key val acc
{-# INLINEABLE toHashMap #-}
-- | Fold the whole input stream into one summary value, then unfold that
-- value back into a stream. The output is available only after the input
-- has been fully consumed.
streamMeta :: Monad m => SF.Fold m a b -> SU.Unfold m b c -> S.Stream m a -> S.Stream m c
streamMeta fld unfld input = S.concatEffect (S.unfold unfld <$> S.fold fld input)
{-# INLINEABLE streamMeta #-}
-- | Group a stream of @(key, value)@ pairs by key using a strict
-- 'HMS.HashMap'. Output order follows 'HMS.toList' (unspecified).
groupByHashableKey
  :: (Monad m, Hashable k, Eq k)
  => S.Stream m (k, c)
  -> S.Stream m (k, Seq.Seq c)
groupByHashableKey input =
  streamMeta toHashMap (SU.lmap HMS.toList SU.fromList) (second Seq.singleton <$> input)
{-# INLINABLE groupByHashableKey #-}
-- | Fold @(key, value)@ pairs into a strict 'MS.Map'. Values for a repeated
-- key are combined with '(<>)', the new value on the left.
toMap :: (Monad m, Monoid a, Ord k) => SF.Fold m (k, a) (MS.Map k a)
toMap = SF.foldl' insertPair MS.empty
  where
    insertPair acc (key, val) = MS.insertWith (<>) key val acc
{-# INLINEABLE toMap #-}
-- | Group a stream of @(key, value)@ pairs by key using a strict 'MS.Map'.
-- Groups are emitted in ascending key order ('MS.toList').
groupByOrderedKey
  :: (Monad m, Ord k) => S.Stream m (k, c) -> S.Stream m (k, Seq.Seq c)
groupByOrderedKey input =
  streamMeta toMap (SU.lmap MS.toList SU.fromList) (second Seq.singleton <$> input)
{-# INLINABLE groupByOrderedKey #-}
-- | Collect a stream into a 'Seq.Seq', preserving input order.
toSeq :: Monad m => SF.Fold m a (Seq.Seq a)
toSeq = SF.foldl' (Seq.|>) Seq.empty
{-# INLINEABLE toSeq #-}
-- | Group a stream of @(key, value)@ pairs by key using streamly's
-- 'SF.toMapIO'. Each key's values are collected with 'toSeq' in arrival
-- order, and groups are emitted in ascending key order.
groupByOrderedKeyIO
  :: (Monad m, MonadIO m, Ord k) => S.Stream m (k, c) -> S.Stream m (k, Seq.Seq c)
groupByOrderedKeyIO input =
  streamMeta (SF.toMapIO fst (SF.lmap snd toSeq)) (SU.lmap MS.toList SU.fromList) input
{-# INLINABLE groupByOrderedKeyIO #-}
-- | Group a stream of @(key, value)@ pairs by key using a mutable cuckoo
-- hash table inside 'ST.runST'.
--
-- The whole input is first collected with 'S.toList'. The table is then
-- built with 'MRE.fromListWithHT' (combining with '(<>)') and read back with
-- 'HT.toList'. Output order is whatever 'HT.toList' yields.
groupByHashableKeyST
  :: forall m k c . (Monad m, Hashable k, Eq k)
  => S.Stream m (k, c)
  -> S.Stream m (k, Seq.Seq c)
groupByHashableKeyST st = S.concatEffect (buildStream <$> S.toList st)
  where
    buildStream :: [(k, c)] -> S.Stream m (k, Seq.Seq c)
    buildStream kvs = S.fromList (ST.runST (tableToList kvs))
    tableToList :: [(k, c)] -> ST.ST s [(k, Seq.Seq c)]
    tableToList kvs = join (HT.toList <$> buildTable kvs)
    buildTable :: [(k, c)] -> ST.ST s (HTC.HashTable s k (Seq.Seq c))
    buildTable kvs = MRE.fromListWithHT @HTC.HashTable (<>) (second Seq.singleton <$> kvs)
{-# INLINABLE groupByHashableKeyST #-}
-- | Group a stream of @(key, value)@ pairs by key using discrimination
-- ('DG.groupWith').
--
-- The whole input is collected with 'S.toList' before grouping. Each
-- non-empty group becomes @(key of its first element, values in group
-- order)@. 'LNE.nonEmpty' guards against empty groups.
groupByDiscriminatedKey
  :: (Monad m, DG.Grouping k)
  => S.Stream m (k, c)
  -> S.Stream m (k, Seq.Seq c)
groupByDiscriminatedKey s =
  S.concatEffect
    $ (S.fromList . Maybe.mapMaybe (fmap g . LNE.nonEmpty) . DG.groupWith fst)
    <$> S.toList s
  where
    g :: LNE.NonEmpty (k, c) -> (k, Seq.Seq c)
    g x = (fst (LNE.head x), F.foldMap (Seq.singleton . snd) x)
{-# INLINABLE groupByDiscriminatedKey #-}
#else
-- | Apply the unpack step of a map-reduce to a pure stream.
--
-- 'MRC.Filter' keeps the elements satisfying the predicate. 'MRC.Unpack'
-- replaces each element by the contents of the 'Foldable' it maps to.
unpackStream :: S.IsStream t => MRC.Unpack x y -> t Identity x -> t Identity y
unpackStream (MRC.Filter t) = S.filter t
unpackStream (MRC.Unpack f) = S.concatMap (S.fromFoldable . f)
{-# INLINABLE unpackStream #-}
-- | Effectful variant of 'unpackStream'.
--
-- 'MRC.FilterM' keeps the elements whose monadic predicate returns 'True'.
-- 'MRC.UnpackM' runs the unpacking action per element and splices the
-- resulting 'Foldable' into the output stream.
unpackStreamM :: (S.IsStream t, Monad m) => MRC.UnpackM m x y -> t m x -> t m y
unpackStreamM (MRC.FilterM t) = S.filterM t
unpackStreamM (MRC.UnpackM f) = S.concatMapM (fmap S.fromFoldable . f)
{-# INLINABLE unpackStreamM #-}
-- | Run a stream of any 'S.IsStream' type to completion and collect its
-- elements into a list. The stream is adapted to serial form first.
resultToList :: (Monad m, S.IsStream t) => t m a -> m [a]
resultToList = S.toList . S.adapt
{-# INLINEABLE resultToList #-}
-- | Combine all elements of a serial stream with '(<>)', starting from
-- 'mempty' (a strict left fold).
concatStream :: (Monad m, Monoid a) => S.SerialT m a -> m a
concatStream = S.foldl' (<>) mempty
{-# INLINEABLE concatStream #-}
-- | Post-process a fold that produces a pure stream of monoidal values,
-- collapsing that stream with 'concatStream'.
concatStreamFold :: Monoid b => FL.Fold a (S.SerialT Identity b) -> FL.Fold a b
concatStreamFold = fmap (runIdentity . concatStream)
{-# INLINEABLE concatStreamFold #-}
-- | Post-process a monadic fold that produces a stream of monoidal values.
-- The stream is adapted to serial form and collapsed with 'concatStream'.
concatStreamFoldM
  :: (Monad m, Monoid b, S.IsStream t) => FL.FoldM m a (t m b) -> FL.FoldM m a b
concatStreamFoldM = MRC.postMapM (concatStream . S.adapt)
{-# INLINEABLE concatStreamFoldM #-}
-- | Generalize a pure fold that produces an effectful stream into a monadic
-- fold, then collapse the stream with 'concatStreamFoldM'.
concatConcurrentStreamFold
  :: (Monad m, Monoid b, S.IsStream t) => FL.Fold a (t m b) -> FL.FoldM m a b
concatConcurrentStreamFold = concatStreamFoldM . FL.generalize
{-# INLINEABLE concatConcurrentStreamFold #-}
-- | Map-reduce engine over pure serial streamly streams.
--
-- The fold step prepends each input onto a stream starting from 'S.nil', so
-- the accumulated stream is in reverse input order. At extraction time the
-- stream is unpacked with @u@ and assigned with @a@. It is then grouped with
-- the supplied @groupByKey@, and each @(key, group)@ pair is reduced with @r@.
streamlyEngine
  :: (Foldable g, Functor g)
  => (forall z . S.SerialT Identity (k, z) -> S.SerialT Identity (k, g z))
  -> MRE.MapReduceFold y k c (SerialT Identity) x d
streamlyEngine groupByKey u (MRC.Assign a) r = FL.Fold
  (flip S.cons)
  S.nil
  ( S.map (\(k, lc) -> MRE.reduceFunction r k lc)
  . groupByKey
  . S.map a
  . unpackStream u
  )
{-# INLINABLE streamlyEngine #-}
-- | Apply the unpack step of a map-reduce to an effectful stream.
--
-- 'MRC.Filter' keeps the elements satisfying the predicate. 'MRC.Unpack'
-- replaces each element by the contents of the 'Foldable' it maps to. The
-- concurrency comes from the caller applying this with '(S.|$)'; the body
-- itself uses serial combinators.
unpackConcurrently
  :: (S.MonadAsync m, S.IsStream t) => MRC.Unpack x y -> t m x -> t m y
unpackConcurrently (MRC.Filter t) = S.filter t
unpackConcurrently (MRC.Unpack f) = S.concatMap (S.fromFoldable . f)
{-# INLINABLE unpackConcurrently #-}
-- | Map-reduce engine over effectful streamly streams with configurable
-- input (@tIn@) and output (@tOut@) stream types.
--
-- Inputs are prepended with 'S.consM' onto a @tIn@ stream. At extraction
-- time the unpack step runs via '(S.|$)' and the assign step via 'S.mapM'.
-- The stream is then adapted to 'SerialT' for @groupByKey@, adapted to
-- @tOut@, and each group is reduced with @r@.
concurrentStreamlyEngine
  :: forall tIn tOut m g y k c x d
   . (S.IsStream tIn, S.IsStream tOut, S.MonadAsync m, Foldable g, Functor g)
  => (forall z . S.SerialT m (k, z) -> S.SerialT m (k, g z))
  -> MRE.MapReduceFold y k c (tOut m) x d
concurrentStreamlyEngine groupByKey u (MRC.Assign a) r = FL.Fold
  (\s a' -> (return a') `S.consM` s)
  S.nil
  ( S.mapM (\(k, lc) -> return $ MRE.reduceFunction r k lc)
  . S.adapt @SerialT @tOut
  . groupByKey
  . S.adapt @tIn @SerialT
  . S.mapM (return . a)
  . (S.|$) (unpackConcurrently u)
  )
{-# INLINABLE concurrentStreamlyEngine #-}
-- | Monadic map-reduce engine over streamly streams.
--
-- Inputs are accumulated with a pure fold that prepends onto a serial
-- stream. The extracted result is adapted to the caller's stream type @t@,
-- and the whole fold is lifted with 'FL.generalize'. At extraction time the
-- stream is unpacked with 'unpackStreamM', assigned with the monadic @a@,
-- grouped, and reduced with 'MRE.reduceFunctionM'.
streamlyEngineM
  :: (S.IsStream t, Monad m, S.MonadAsync m, Traversable g)
  => (forall z . SerialT m (k, z) -> SerialT m (k, g z))
  -> MRE.MapReduceFoldM m y k c (t m) x d
streamlyEngineM groupByKey u (MRC.AssignM a) r =
  FL.generalize
    $ fmap S.adapt
    $ FL.Fold
        (flip S.cons)
        S.nil
        ( S.mapM (\(k, lc) -> MRE.reduceFunctionM r k lc)
        . groupByKey
        . S.mapM a
        . unpackStreamM u
        )
{-# INLINABLE streamlyEngineM #-}
-- | Group a serial stream of @(key, value)@ pairs by key using a strict
-- 'HMS.HashMap'.
--
-- The whole input is collected with 'S.toList' first. Values for a key are
-- combined with '(<>)' as in 'HMS.fromListWith'. Groups are emitted in
-- 'HMS.foldrWithKey' order (unspecified).
groupByHashableKey
  :: (Monad m, Hashable k, Eq k)
  => S.SerialT m (k, c)
  -> S.SerialT m (k, Seq.Seq c)
groupByHashableKey s = do
  lkc <- fromEffect (S.toList s)
  let hm = HMS.fromListWith (<>) $ fmap (second Seq.singleton) lkc
  HMS.foldrWithKey (\k lc s' -> S.cons (k, lc) s') S.nil hm
{-# INLINABLE groupByHashableKey #-}
-- | Group a serial stream of @(key, value)@ pairs by key using a strict
-- 'MS.Map'.
--
-- The whole input is collected with 'S.toList' first. Values for a key are
-- combined with '(<>)' as in 'MS.fromListWith'. Groups are emitted in
-- ascending key order ('MS.foldrWithKey' with 'S.cons').
groupByOrderedKey
  :: (Monad m, Ord k) => S.SerialT m (k, c) -> S.SerialT m (k, Seq.Seq c)
groupByOrderedKey s = do
  lkc <- fromEffect (S.toList s)
  let hm = MS.fromListWith (<>) $ fmap (second Seq.singleton) lkc
  MS.foldrWithKey (\k lc s' -> S.cons (k, lc) s') S.nil hm
{-# INLINABLE groupByOrderedKey #-}
-- | Group a serial stream of @(key, value)@ pairs by key using a mutable
-- cuckoo hash table inside 'ST.runST'.
--
-- The whole input is collected with 'S.toList' first. The table is built
-- with 'MRE.fromListWithHT' (combining with '(<>)') and folded into a stream
-- with 'HT.foldM'. Output order follows the table traversal, reversed by the
-- 'S.cons' accumulation.
groupByHashableKeyST
  :: (Monad m, Hashable k, Eq k)
  => S.SerialT m (k, c)
  -> S.SerialT m (k, Seq.Seq c)
groupByHashableKeyST st = do
  lkc <- fromEffect (S.toList st)
  ST.runST $ do
    hm <- MRE.fromListWithHT @HTC.HashTable (<>) $ fmap (second Seq.singleton) lkc
    HT.foldM (\s' (k, sc) -> return $ S.cons (k, sc) s') S.nil hm
{-# INLINABLE groupByHashableKeyST #-}
-- | Group a serial stream of @(key, value)@ pairs by key using
-- discrimination ('DG.groupWith').
--
-- The whole input is collected with 'S.toList' first. Each non-empty group
-- becomes @(key of its first element, values in group order)@.
-- 'LNE.nonEmpty' guards against empty groups.
groupByDiscriminatedKey
  :: (Monad m, DG.Grouping k)
  => S.SerialT m (k, c)
  -> S.SerialT m (k, Seq.Seq c)
groupByDiscriminatedKey s = do
  lkc <- fromEffect (S.toList s)
  let g :: LNE.NonEmpty (k, c) -> (k, Seq.Seq c)
      g x = (fst (LNE.head x), F.foldMap (Seq.singleton . snd) x)
  S.fromFoldable $ Maybe.mapMaybe (fmap g . LNE.nonEmpty) $ DG.groupWith fst lkc
{-# INLINABLE groupByDiscriminatedKey #-}
#endif