{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TypeOperators         #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE PolyKinds             #-}
{-# LANGUAGE TypeFamilies          #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE AllowAmbiguousTypes   #-}
{-# LANGUAGE TypeApplications      #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
{-|
Module      : Control.MapReduce.Engines.Streaming
Description : map-reduce-folds builders
Copyright   : (c) Adam Conner-Sax 2019
License     : BSD-3-Clause
Maintainer  : adam_conner_sax@yahoo.com
Stability   : experimental

map-reduce engine (fold builder) using @Streaming.Stream@ as its intermediate type.  Because the type parameters of @Streaming.Stream@ do not end with the
type of the data in the @Stream@, we wrap this type in @StreamResult@ for the purposes of the output type of the fold.
-}
module Control.MapReduce.Engines.Streaming
  (
    -- * Helper Types
    StreamResult(..)

    -- * Engines
  , streamingEngine
  , streamingEngineM

  -- * Result Extractors
  , resultToList
  , concatStream
  , concatStreamFold
  , concatStreamFoldM

  -- * @groupBy@ functions
  , groupByHashableKey
  , groupByOrderedKey
  )
where

import qualified Control.MapReduce.Core        as MRC
import qualified Control.MapReduce.Engines     as MRE

import qualified Control.Foldl                 as FL
import           Data.Functor.Identity          ( Identity )
import           Data.Hashable                  ( Hashable )
import qualified Data.HashMap.Strict           as HMS
import qualified Data.Map.Strict               as MS
import qualified Data.Sequence                 as Seq
import qualified Streaming.Prelude             as S
import qualified Streaming                     as S
import           Streaming                      ( Stream
                                                , Of
                                                )
import           Control.Arrow                  ( second )


-- | Apply a pure 'MRC.Unpack' step to a stream.
--
-- * @Filter t@ keeps only the elements satisfying @t@ (via 'S.filter').
-- * @Unpack f@ maps every element to a container with @f@ and then flattens
--   those containers into the output stream (via 'S.concat' after 'S.map').
--
-- The stream's return value @r@ is passed through unchanged.
unpackStream
  :: MRC.Unpack x y -> S.Stream (Of x) Identity r -> Stream (Of y) Identity r
unpackStream (MRC.Filter t) = S.filter t
unpackStream (MRC.Unpack f) = S.concat . S.map f
{-# INLINABLE unpackStream #-}

-- | Apply an effectful 'MRC.UnpackM' step to a stream over the monad @m@.
--
-- * @FilterM t@ keeps only the elements for which the monadic predicate @t@
--   yields 'True' (via 'S.filterM').
-- * @UnpackM f@ runs @f@ on every element (via 'S.mapM') and flattens the
--   resulting containers into the output stream (via 'S.concat').
--
-- The stream's return value @r@ is passed through unchanged.
unpackStreamM
  :: Monad m => MRC.UnpackM m x y -> Stream (Of x) m r -> Stream (Of y) m r
unpackStreamM (MRC.FilterM t) = S.filterM t
unpackStreamM (MRC.UnpackM f) = S.concat . S.mapM f
{-# INLINABLE unpackStreamM #-}

-- | Group the mapped and assigned values by key using a @Data.HashMap.Strict@.
--
-- The whole input stream is drained into a list with 'S.toList' before any
-- output is produced.  Each value is wrapped in a singleton 'Seq.Seq' and the
-- sequences belonging to the same key are combined with @(<>)@ by
-- 'HMS.fromListWith'.  The resulting stream yields one @(key, values)@ pair per
-- distinct key (in the traversal order of 'HMS.foldrWithKey') and finishes with
-- the return value @r@ of the input stream.
--
-- NOTE(review): the order of the values inside each 'Seq.Seq' depends on the
-- argument order in which @fromListWith@ calls @(<>)@ -- confirm against the
-- @unordered-containers@ documentation before relying on it.
groupByHashableKey
  :: forall m k c r
   . (Monad m, Hashable k, Eq k)
  => Stream (Of (k, c)) m r
  -> Stream (Of (k, Seq.Seq c)) m r
groupByHashableKey s = S.effect $ do
  -- Running the input to completion also gives us its return value @r@.
  (lkc S.:> r) <- S.toList s
  let grouped = HMS.fromListWith (<>) $ fmap (second Seq.singleton) lkc
  -- Rebuild a stream from the map, terminating with the original return value.
  return $ HMS.foldrWithKey (\k lc s' -> S.cons (k, lc) s') (return r) grouped
{-# INLINABLE groupByHashableKey #-}

-- | Group the mapped and assigned values by key using a @Data.Map.Strict@.
--
-- The whole input stream is drained into a list with 'S.toList' before any
-- output is produced.  Each value is wrapped in a singleton 'Seq.Seq' and the
-- sequences belonging to the same key are combined with @(<>)@ by
-- 'MS.fromListWith'.  The resulting stream yields one @(key, values)@ pair per
-- distinct key (in the traversal order of 'MS.foldrWithKey') and finishes with
-- the return value @r@ of the input stream.
--
-- NOTE(review): the order of the values inside each 'Seq.Seq' depends on the
-- argument order in which @fromListWith@ calls @(<>)@ -- confirm against the
-- @containers@ documentation before relying on it.
groupByOrderedKey
  :: forall m k c r
   . (Monad m, Ord k)
  => Stream (Of (k, c)) m r
  -> Stream (Of (k, Seq.Seq c)) m r
groupByOrderedKey s = S.effect $ do
  -- Running the input to completion also gives us its return value @r@.
  (lkc S.:> r) <- S.toList s
  let grouped = MS.fromListWith (<>) $ fmap (second Seq.singleton) lkc
  -- Rebuild a stream from the map, terminating with the original return value.
  return $ MS.foldrWithKey (\k lc s' -> S.cons (k, lc) s') (return r) grouped
{-# INLINABLE groupByOrderedKey #-}

-- | Wrap @Stream (Of d) m ()@ in a type which has @d@ as its last parameter.
-- 'unRes' unwraps it back to the underlying stream.
newtype StreamResult m d = StreamResult { unRes :: Stream (Of d) m () }

-- | Run the wrapped stream and collect all of its elements into a list
-- (via 'S.toList_', which discards the stream's @()@ return value).
resultToList :: Monad m => StreamResult m d -> m [d]
resultToList = S.toList_ . unRes

-- | Combine all elements of a stream of monoidal values with @(<>)@.
--
-- The stream's @()@ return value is first replaced by 'mempty', which serves
-- as the base value; 'S.iterT' then folds the stream from the right, so the
-- result has the shape @a1 <> (a2 <> (... <> mempty))@.
concatStreaming :: (Monad m, Monoid a) => Stream (Of a) m () -> m a
concatStreaming = S.iterT g . fmap (const mempty)
 where
  -- Prepend the current element to the (effectful) result of the remainder.
  g (a S.:> ma) = fmap (a <>) ma

-- | @mappend@ all elements of a @StreamResult@ of monoids.
-- Unwraps the result with 'unRes' and delegates to 'concatStreaming'.
concatStream :: (Monad m, Monoid a) => StreamResult m a -> m a
concatStream = concatStreaming . unRes

-- | Apply monoidal stream-concatenation to a fold returning a stream to
-- produce a fold returning the monoid.  The fold's 'StreamResult' is collapsed
-- with 'concatStream' and the 'Identity' wrapper is removed.
concatStreamFold
  :: Monoid b => FL.Fold a (StreamResult Identity b) -> FL.Fold a b
concatStreamFold = fmap (S.runIdentity . concatStream)

-- | Apply monoidal stream-concatenation to an effectful fold returning a
-- stream to produce an effectful fold returning the monoid.  'concatStream' is
-- attached as a monadic post-processing step with 'MRC.postMapM'.
concatStreamFoldM
  :: (Monad m, Monoid b) => FL.FoldM m a (StreamResult m b) -> FL.FoldM m a b
concatStreamFoldM = MRC.postMapM concatStream

-- | Map-reduce-fold builder returning a @StreamResult@.
--
-- Arguments, in order: a key-grouping function (for example
-- 'groupByHashableKey' or 'groupByOrderedKey'), the unpack step, the assign
-- step and the reduce step.
--
-- The fold accumulates its inputs by consing each one onto the front of a
-- stream (@flip S.cons@), so the accumulated stream holds the inputs in the
-- reverse of the order in which they were fed to the fold.  When the fold is
-- extracted, that stream is unpacked, each item is assigned a @(key, value)@
-- pair, the pairs are grouped by the supplied function, and every group is
-- reduced with 'MRE.reduceFunction'.
streamingEngine
  :: (Foldable g, Functor g)
  => (  forall z r
      . Stream (Of (k, z)) Identity r
     -> Stream (Of (k, g z)) Identity r
     )
  -> MRE.MapReduceFold y k c (StreamResult Identity) x d
streamingEngine groupByKey u (MRC.Assign a) r = fmap StreamResult $ FL.Fold
  (flip S.cons)   -- step: push the new input onto the front of the stream
  (return ())     -- initial accumulator: the empty stream
  ( S.map (\(k, lc) -> MRE.reduceFunction r k lc)
  . groupByKey
  . S.map a
  . unpackStream u
  )
{-# INLINABLE streamingEngine #-}

-- | Effectful map-reduce-fold builder returning a @StreamResult@.
--
-- Arguments, in order: a key-grouping function (for example
-- 'groupByHashableKey' or 'groupByOrderedKey'), the effectful unpack step, the
-- effectful assign step and the effectful reduce step.
--
-- The underlying fold is a pure 'FL.Fold' lifted with 'FL.generalize'.  It
-- accumulates its inputs by consing each one onto the front of a stream
-- (@flip S.cons@), so the accumulated stream holds the inputs in the reverse
-- of the order in which they were fed to the fold.  When the fold is
-- extracted, it builds a stream that unpacks, assigns, groups and reduces
-- (via 'MRE.reduceFunctionM') using the monadic stream combinators.
streamingEngineM
  :: (Monad m, Traversable g)
  => (forall z r . Stream (Of (k, z)) m r -> Stream (Of (k, g z)) m r)
  -> MRE.MapReduceFoldM m y k c (StreamResult m) x d
streamingEngineM groupByKey u (MRC.AssignM a) r =
  fmap StreamResult . FL.generalize $ FL.Fold
    (flip S.cons)   -- step: push the new input onto the front of the stream
    (return ())     -- initial accumulator: the empty stream
    ( S.mapM (\(k, lc) -> MRE.reduceFunctionM r k lc)
    . groupByKey
    . S.mapM a
    . unpackStreamM u
    )
{-# INLINABLE streamingEngineM #-}