{-# LANGUAGE CPP                   #-}
{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TypeOperators         #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE PolyKinds             #-}
{-# LANGUAGE TypeFamilies          #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE AllowAmbiguousTypes   #-}
{-# LANGUAGE TypeApplications      #-}
{-# LANGUAGE BangPatterns          #-}
{-# OPTIONS_GHC -O2 -fwarn-incomplete-patterns #-}
{-|
Module      : Control.MapReduce.Engines.Streams
Description : map-reduce-folds builders
Copyright   : (c) Adam Conner-Sax 2019
License     : BSD-3-Clause
Maintainer  : adam_conner_sax@yahoo.com
Stability   : experimental

map-reduce engine (fold builder) using @Streamly@ streams as its intermediate and return type.

Notes:
1. These are polymorphic in the return stream type.  Thought the streams do have to be @serial@ when @groupBy@ is called
So you have to specify the stream type in the call or it has to be inferrable from the use of the result.

2. There is a concurrent engine here, one that uses Streamly's concurrency features to map over the stream.  I've not
been able to verify that this is faster on an appropriate task with appropriate runtime settings.
-}
module Control.MapReduce.Engines.Streamly
  (
    -- * Engines
    streamlyEngine
  , streamlyEngineM
  , concurrentStreamlyEngine

    -- * Streamly Combinators
  , toStreamlyFold
  , toStreamlyFoldM

    -- * Result Extraction
  , resultToList
  , concatStream
  , concatStreamFold
  , concatStreamFoldM
  , concatConcurrentStreamFold

    -- * @groupBy@ Functions
  , groupByHashableKey
  , groupByOrderedKey
#if MIN_VERSION_streamly(0,9,0)
  , groupByOrderedKeyIO
#endif
  , groupByHashableKeyST
  , groupByDiscriminatedKey

    -- * Re-Exports
#if MIN_VERSION_streamly(0,9,0)
  , Stream
#else
  , SerialT
  , WSerialT
  , AheadT
  , AsyncT
  , WAsyncT
  , ParallelT
  , MonadAsync
  , IsStream
#endif
  )
where

import qualified Control.MapReduce.Core        as MRC
import qualified Control.MapReduce.Engines     as MRE

import           Control.Arrow                  ( second )
import qualified Control.Foldl                 as FL
import           Control.Monad.ST              as ST
import qualified Data.Discrimination.Grouping  as DG
import qualified Data.Foldable                 as F
import           Data.Functor.Identity          ( Identity(runIdentity) )
import           Data.Hashable                  ( Hashable )
import qualified Data.HashMap.Strict           as HMS
import qualified Data.HashTable.Class          as HT
import qualified Data.HashTable.ST.Cuckoo      as HTC
import qualified Data.List.NonEmpty            as LNE
import qualified Data.Maybe                    as Maybe
import qualified Data.Map.Strict               as MS
import qualified Data.Sequence                 as Seq
#if MIN_VERSION_streamly(0,9,0)
import Streamly.Data.Stream (Stream)
import qualified Streamly.Data.Stream as S
import qualified Streamly.Data.Stream.Prelude as SP
import qualified Streamly.Data.StreamK as StreamK
import qualified Streamly.Data.Fold   as SF
import qualified Streamly.Data.Unfold   as SU
import qualified Streamly.Internal.Data.Fold   as SF
import           Control.Monad (join)
import           Control.Monad.IO.Class (MonadIO)
#elif MIN_VERSION_streamly(0,8,0)
import qualified Streamly.Prelude              as S
import qualified Streamly.Internal.Data.Fold   as SF
import           Streamly.Prelude              ( SerialT
                                                , WSerialT
                                                , AheadT
                                                , AsyncT
                                                , WAsyncT
                                                , ParallelT
                                                , MonadAsync
                                                , IsStream
                                                )
#else
import qualified Streamly.Prelude              as S
import qualified Streamly                      as S
import qualified Streamly.Internal.Data.Fold   as SF
import           Streamly                       ( SerialT
                                                , WSerialT
                                                , AheadT
                                                , AsyncT
                                                , WAsyncT
                                                , ParallelT
                                                , MonadAsync
                                                , IsStream
                                                )
--import qualified Streamly.Internal.Data.Parser.ParserK.Type as Streamly
#endif
#if MIN_VERSION_streamly(0,9,0)
-- | convert a Control.Foldl FoldM into a Streamly.Data.Fold fold
toStreamlyFoldM :: Functor m => FL.FoldM m a b -> SF.Fold m a b
toStreamlyFoldM (FL.FoldM step start done) = SF.Fold step' (SF.Partial <$> start) done where
  step' s a = SF.Partial <$> step s a

-- | convert a Control.Foldl Fold into a Streamly.Data.Fold fold
toStreamlyFold :: Monad m => FL.Fold a b -> SF.Fold m a b
toStreamlyFold (FL.Fold step start done) = SF.Fold step' (pure $ SF.Partial $ start) (pure . done) where
  step' s a = pure $ SF.Partial $ step s a

#elif MIN_VERSION_streamly(0,8,0)
fromEffect :: (Monad m, IsStream t) => m a -> t m a
fromEffect :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
fromEffect = forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
S.fromEffect
{-# INLINE fromEffect #-}

-- | convert a Control.Foldl FoldM into a Streamly.Data.Fold fold
toStreamlyFoldM :: Functor m => FL.FoldM m a b -> SF.Fold m a b
toStreamlyFoldM :: forall (m :: * -> *) a b. Functor m => FoldM m a b -> Fold m a b
toStreamlyFoldM (FL.FoldM x -> a -> m x
step m x
start x -> m b
done) = forall s a (m :: * -> *) b.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
SF.mkFoldM x -> a -> m (Step x b)
step' (forall s b. s -> Step s b
SF.Partial forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> m x
start) x -> m b
done where
  step' :: x -> a -> m (Step x b)
step' x
s a
a = forall s b. s -> Step s b
SF.Partial forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> x -> a -> m x
step x
s a
a

-- | convert a Control.Foldl Fold into a Streamly.Data.Fold fold
toStreamlyFold :: Monad m => FL.Fold a b -> SF.Fold m a b
toStreamlyFold :: forall (m :: * -> *) a b. Monad m => Fold a b -> Fold m a b
toStreamlyFold (FL.Fold x -> a -> x
step x
start x -> b
done) = forall (m :: * -> *) s a b.
Monad m =>
(s -> a -> Step s b) -> Step s b -> (s -> b) -> Fold m a b
SF.mkFold x -> a -> Step x b
step' (forall s b. s -> Step s b
SF.Partial x
start) x -> b
done where
  step' :: x -> a -> Step x b
step' x
s a
a = forall s b. s -> Step s b
SF.Partial forall a b. (a -> b) -> a -> b
$ x -> a -> x
step x
s a
a
#else
fromEffect :: (Monad m, IsStream t) => m a -> t m a
fromEffect = S.yieldM
{-# INLINE fromEffect #-}

-- | convert a Control.Foldl FoldM into a Streamly.Data.Fold fold
toStreamlyFoldM :: FL.FoldM m a b -> SF.Fold m a b
toStreamlyFoldM (FL.FoldM step start done) = SF.mkFold step start done

-- | convert a Control.Foldl Fold into a Streamly.Data.Fold fold
toStreamlyFold :: Monad m => FL.Fold a b -> SF.Fold m a b
toStreamlyFold (FL.Fold step start done) = SF.mkPure step start done
#endif

-- | unpack for streamly based map/reduce
#if MIN_VERSION_streamly(0,9,0)
unpackStreamK :: MRC.Unpack x y -> StreamK.StreamK Identity x -> S.Stream Identity y
unpackStreamK (MRC.Filter t) = S.filter t . StreamK.toStream
unpackStreamK (MRC.Unpack f) = S.concatMap (StreamK.toStream . StreamK.fromFoldable . f) . StreamK.toStream
--  S.concatMap (StreamK.toStream . StreamK.fromFoldable . f)
{-# INLINABLE unpackStreamK #-}

-- | effectful (monadic) unpack for streamly based map/reduce
unpackStreamKM :: (Monad m) => MRC.UnpackM m x y -> StreamK.StreamK m x -> S.Stream m y
unpackStreamKM (MRC.FilterM t) = S.filterM t . StreamK.toStream
unpackStreamKM (MRC.UnpackM f) = S.concatMapM (fmap (StreamK.toStream . StreamK.fromFoldable) . f) . StreamK.toStream
{-# INLINABLE unpackStreamKM #-}

-- | make a stream into an (effectful) @[]@
resultToList :: (Monad m) => S.Stream m a -> m [a]
resultToList = S.toList
{-# INLINEABLE resultToList #-}

-- | mappend all in a monoidal stream
concatStream :: (Monad m, Monoid a) => S.Stream m a -> m a
concatStream = S.fold (SF.foldl' (<>) mempty)
{-# INLINEABLE concatStream #-}

-- | mappend everything in a pure Streamly fold
concatStreamFold :: Monoid b => FL.Fold a (S.Stream Identity b) -> FL.Fold a b
concatStreamFold = fmap (runIdentity . concatStream)
{-# INLINEABLE concatStreamFold #-}

-- | mappend everything in an effectful Streamly fold.
concatStreamFoldM
  :: (Monad m, Monoid b) => FL.FoldM m a (S.Stream m b) -> FL.FoldM m a b
concatStreamFoldM = MRC.postMapM concatStream
{-# INLINEABLE concatStreamFoldM #-}

-- | mappend everything in a concurrent Streamly fold.
concatConcurrentStreamFold
  :: (Monad m, Monoid b) => FL.Fold a (S.Stream m b) -> FL.FoldM m a b
concatConcurrentStreamFold = concatStreamFoldM . FL.generalize
{-# INLINEABLE concatConcurrentStreamFold #-}

-- | map-reduce-fold builder returning a @SerialT Identity d@ result
streamlyEngine
  :: (Foldable g, Functor g)
  => (forall z . S.Stream Identity (k, z) -> S.Stream Identity (k, g z))
  -> MRE.MapReduceFold y k c (S.Stream Identity) x d
streamlyEngine groupByKey u (MRC.Assign a) r = FL.Fold
  (flip StreamK.cons)
  StreamK.nil
  ( fmap (\(k, lc) -> MRE.reduceFunction r k lc)
    . groupByKey
    . fmap a
    . unpackStreamK u
  )
{-# INLINABLE streamlyEngine #-}

-- | unpack for concurrent streamly based map/reduce
unpackConcurrentlyK
  :: (SP.MonadAsync m) => MRC.Unpack x y -> StreamK.StreamK m x -> S.Stream m y
unpackConcurrentlyK (MRC.Filter t) = S.filter t . StreamK.toStream
unpackConcurrentlyK (MRC.Unpack f) = S.concatMap ((StreamK.toStream . StreamK.fromFoldable) . f) . StreamK.toStream
{-# INLINABLE unpackConcurrentlyK #-}

-- | possibly (depending on chosen stream types) concurrent map-reduce-fold builder returning an @(Istream t, MonadAsync m) => t m d@ result
concurrentStreamlyEngine
  :: forall m g y k c x d
   . (SP.MonadAsync m, Foldable g, Functor g)
  => (forall z . S.Stream m (k, z) -> S.Stream m (k, g z))
  -> MRE.MapReduceFold y k c (S.Stream m) x d
concurrentStreamlyEngine groupByKey u (MRC.Assign a) r = FL.Fold
  (\s a' -> (pure a') `StreamK.consM` s)
  StreamK.nil
  ( S.mapM (\(k, lc) -> return $ MRE.reduceFunction r k lc)
  . groupByKey
  . S.mapM (return . a)
  . unpackConcurrentlyK u
  )
{-# INLINABLE concurrentStreamlyEngine #-}


-- | effectful map-reduce-fold engine returning a (Istream t => t m d) result
-- The "MonadAsync" constraint here more or less requires us to run in IO, or something IO like.
streamlyEngineM
  :: (Monad m, SP.MonadAsync m, Traversable g)
  => (forall z . S.Stream m (k, z) -> S.Stream m (k, g z))
  -> MRE.MapReduceFoldM m y k c (S.Stream m) x d
streamlyEngineM groupByKey u (MRC.AssignM a) r =
  FL.generalize
    $ FL.Fold
        (flip StreamK.cons)
        StreamK.nil
        ( S.mapM (\(k, lc) -> MRE.reduceFunctionM r k lc)
        . groupByKey -- this requires a serial stream.
        . S.mapM a
        . unpackStreamKM u
        )
{-# INLINABLE streamlyEngineM #-}

toHashMap :: (Monad m, Eq k, Monoid a, Hashable k) => SF.Fold m (k, a) (HMS.HashMap k a)
toHashMap = SF.foldl' (\hm (k, a) -> HMS.insertWith (<>) k a hm) HMS.empty
{-# INLINEABLE toHashMap #-}

streamMeta :: Monad m => SF.Fold m a b -> SU.Unfold m b c -> S.Stream m a  -> S.Stream m c
streamMeta fld unfld = S.concatEffect . fmap (S.unfold unfld) . S.fold fld
{-# INLINEABLE streamMeta #-}

-- | Group streamly stream of @(k,c)@ by @hashable@ key.
groupByHashableKey
  :: (Monad m, Hashable k, Eq k)
  => S.Stream m (k, c)
  -> S.Stream m (k, Seq.Seq c)
groupByHashableKey = streamMeta toHashMap (SU.lmap HMS.toList $ SU.fromList) . fmap (second Seq.singleton)
{-# INLINABLE groupByHashableKey #-}

toMap :: (Monad m, Monoid a, Ord k) => SF.Fold m (k, a) (MS.Map k a)
toMap = SF.foldl' (\hm (k, a) -> MS.insertWith (<>) k a hm) MS.empty
{-# INLINEABLE toMap #-}

-- | Group streamly stream of @(k,c)@ by ordered key.
groupByOrderedKey
  :: (Monad m, Ord k) => S.Stream m (k, c) -> S.Stream m (k, Seq.Seq c)
groupByOrderedKey = --streamMeta (SF.toMap fst (SF.lmap snd toSeq)) (SU.lmap MS.toList $ SU.fromList)
  streamMeta toMap (SU.lmap MS.toList $ SU.fromList) . fmap (second Seq.singleton)
{-# INLINABLE groupByOrderedKey #-}

toSeq :: Monad m => SF.Fold m a (Seq.Seq a)
toSeq = SF.foldl' (\s a -> s <> Seq.singleton a) mempty
{-# INLINEABLE toSeq #-}

-- | Group streamly stream of @(k,c)@ by ordered key. Using toMapIO for mutable cells in the fold.
groupByOrderedKeyIO
  :: (Monad m, MonadIO m, Ord k) => S.Stream m (k, c) -> S.Stream m (k, Seq.Seq c)
groupByOrderedKeyIO = streamMeta (SF.toMapIO fst (SF.lmap snd toSeq)) (SU.lmap MS.toList $ SU.fromList)
{-# INLINABLE groupByOrderedKeyIO #-}

{-
toHashMapSeqST :: (Monad m, Eq k, Hashable k) => SF.Fold m (k, a) (ST.ST s (HTC.HashTable s k (Seq.Seq a)))
toHashMapSeqST = SF.foldl' (\hm (k, a) -> traverse (mutate k (f a)) hm) HTC.new where
  mutate k f hm = HTC.mutate hm k f
  f :: forall s . a -> Maybe (Seq.Seq a) -> Maybe (Seq.Seq a, ((HTC.HashTable s k (Seq.Seq a))))
  f a ht saM = case saM of
    Nothing -> (Seq.singleton a, ())
    Just sa -> (sa Seq.|> a, ())
{-# INLINEABLE toHashMapSeqST #-}
-}

-- | Group streamly stream of @(k,c)@ by @hashable@ key. Uses mutable hashtables running in the ST monad.
-- NB: this function uses the fact that @SerialT m@ is a monad
groupByHashableKeyST
  :: forall m k c . (Monad m, Hashable k, Eq k)
  => S.Stream m (k, c)
  -> S.Stream m (k, Seq.Seq c)
groupByHashableKeyST st = S.concatEffect $ fmap listToStream $ S.toList st
--  lkc <- fromEffect (S.toList st)
  where
    listToStream :: [(k, c)] -> S.Stream m (k, Seq.Seq c)
    listToStream l = S.fromList $ ST.runST $ listToList l
    listToList :: [(k, c)] -> ST.ST s [(k, Seq.Seq c)]
    listToList = join . fmap HT.toList . toHT
    toHT :: [(k, c)] -> ST.ST s (HTC.HashTable s k (Seq.Seq c))
    toHT l = (MRE.fromListWithHT @HTC.HashTable) (<>)
        $ fmap (second Seq.singleton) l
{-# INLINABLE groupByHashableKeyST #-}


-- | Group streamly stream of @(k,c)@ by key with instance of Grouping from <http://hackage.haskell.org/package/discrimination>.
-- NB: this function uses the fact that @SerialT m@ is a monad
groupByDiscriminatedKey
  :: (Monad m, DG.Grouping k)
  => S.Stream m (k, c)
  -> S.Stream m (k, Seq.Seq c)
groupByDiscriminatedKey s =
  S.concatEffect
  $ (StreamK.toStream . StreamK.fromFoldable . Maybe.catMaybes . fmap (fmap g . LNE.nonEmpty) . DG.groupWith fst)
  <$> S.toList s
  where
    g :: LNE.NonEmpty (k, c) -> (k, Seq.Seq c)
    g x = let k = fst (LNE.head x) in (k, F.fold $ fmap (Seq.singleton . snd) x)
{-# INLINABLE groupByDiscriminatedKey #-}
#else

unpackStream :: S.IsStream t => MRC.Unpack x y -> t Identity x -> t Identity y
unpackStream :: forall (t :: (* -> *) -> * -> *) x y.
IsStream t =>
Unpack x y -> t Identity x -> t Identity y
unpackStream (MRC.Filter x -> Bool
t) = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
S.filter x -> Bool
t
unpackStream (MRC.Unpack x -> g y
f) = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> t m b) -> t m a -> t m b
S.concatMap (forall (t :: (* -> *) -> * -> *) (f :: * -> *) a (m :: * -> *).
(IsStream t, Foldable f) =>
f a -> t m a
S.fromFoldable forall b c a. (b -> c) -> (a -> b) -> a -> c
. x -> g y
f)
{-# INLINABLE unpackStream #-}

-- | effectful (monadic) unpack for streamly based map/reduce
unpackStreamM :: (S.IsStream t, Monad m) => MRC.UnpackM m x y -> t m x -> t m y
unpackStreamM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) x y.
(IsStream t, Monad m) =>
UnpackM m x y -> t m x -> t m y
unpackStreamM (MRC.FilterM x -> m Bool
t) = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> m Bool) -> t m a -> t m a
S.filterM x -> m Bool
t
unpackStreamM (MRC.UnpackM x -> m (g y)
f) = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> m (t m b)) -> t m a -> t m b
S.concatMapM (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall (t :: (* -> *) -> * -> *) (f :: * -> *) a (m :: * -> *).
(IsStream t, Foldable f) =>
f a -> t m a
S.fromFoldable forall b c a. (b -> c) -> (a -> b) -> a -> c
. x -> m (g y)
f)
{-# INLINABLE unpackStreamM #-}

-- | make a stream into an (effectful) @[]@
resultToList :: (Monad m, S.IsStream t) => t m a -> m [a]
resultToList :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
t m a -> m [a]
resultToList = forall (m :: * -> *) a. Monad m => SerialT m a -> m [a]
S.toList forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
S.adapt

-- | mappend all in a monoidal stream
concatStream :: (Monad m, Monoid a) => S.SerialT m a -> m a
concatStream :: forall (m :: * -> *) a. (Monad m, Monoid a) => SerialT m a -> m a
concatStream = forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> SerialT m a -> m b
S.foldl' forall a. Semigroup a => a -> a -> a
(<>) forall a. Monoid a => a
mempty

-- | mappend everything in a pure Streamly fold
concatStreamFold :: Monoid b => FL.Fold a (S.SerialT Identity b) -> FL.Fold a b
concatStreamFold :: forall b a. Monoid b => Fold a (SerialT Identity b) -> Fold a b
concatStreamFold = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a. Identity a -> a
runIdentity forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. (Monad m, Monoid a) => SerialT m a -> m a
concatStream)

-- | mappend everything in an effectful Streamly fold.
concatStreamFoldM
  :: (Monad m, Monoid b, S.IsStream t) => FL.FoldM m a (t m b) -> FL.FoldM m a b
concatStreamFoldM :: forall (m :: * -> *) b (t :: (* -> *) -> * -> *) a.
(Monad m, Monoid b, IsStream t) =>
FoldM m a (t m b) -> FoldM m a b
concatStreamFoldM = forall (m :: * -> *) a b x.
Monad m =>
(a -> m b) -> FoldM m x a -> FoldM m x b
MRC.postMapM (forall (m :: * -> *) a. (Monad m, Monoid a) => SerialT m a -> m a
concatStream forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
S.adapt)

-- | mappend everything in a concurrent Streamly fold.
concatConcurrentStreamFold
  :: (Monad m, Monoid b, S.IsStream t) => FL.Fold a (t m b) -> FL.FoldM m a b
concatConcurrentStreamFold :: forall (m :: * -> *) b (t :: (* -> *) -> * -> *) a.
(Monad m, Monoid b, IsStream t) =>
Fold a (t m b) -> FoldM m a b
concatConcurrentStreamFold = forall (m :: * -> *) b (t :: (* -> *) -> * -> *) a.
(Monad m, Monoid b, IsStream t) =>
FoldM m a (t m b) -> FoldM m a b
concatStreamFoldM forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a b. Monad m => Fold a b -> FoldM m a b
FL.generalize

-- | map-reduce-fold builder returning a @SerialT Identity d@ result
streamlyEngine
  :: (Foldable g, Functor g)
  => (forall z . S.SerialT Identity (k, z) -> S.SerialT Identity (k, g z))
  -> MRE.MapReduceFold y k c (SerialT Identity) x d
streamlyEngine :: forall (g :: * -> *) k y c x d.
(Foldable g, Functor g) =>
(forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z))
-> MapReduceFold y k c (SerialT Identity) x d
streamlyEngine forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)
groupByKey Unpack x y
u (MRC.Assign y -> (k, c)
a) Reduce k c d
r = forall a b x. (x -> a -> x) -> x -> (x -> b) -> Fold a b
FL.Fold
  (forall a b c. (a -> b -> c) -> b -> a -> c
flip forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
S.cons)
  forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
S.nil
  ( forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
S.map (\(k
k, g c
lc) -> forall (h :: * -> *) k x d.
(Foldable h, Functor h) =>
Reduce k x d -> k -> h x -> d
MRE.reduceFunction Reduce k c d
r k
k g c
lc)
  forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall z. SerialT Identity (k, z) -> SerialT Identity (k, g z)
groupByKey
  forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
S.map y -> (k, c)
a
  forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) x y.
IsStream t =>
Unpack x y -> t Identity x -> t Identity y
unpackStream Unpack x y
u
  )
{-# INLINABLE streamlyEngine #-}

-- | unpack for concurrent streamly based map/reduce
unpackConcurrently
  :: (S.MonadAsync m, S.IsStream t) => MRC.Unpack x y -> t m x -> t m y
unpackConcurrently :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) x y.
(MonadAsync m, IsStream t) =>
Unpack x y -> t m x -> t m y
unpackConcurrently (MRC.Filter x -> Bool
t) = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
S.filter x -> Bool
t
unpackConcurrently (MRC.Unpack x -> g y
f) = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> t m b) -> t m a -> t m b
S.concatMap (forall (t :: (* -> *) -> * -> *) (f :: * -> *) a (m :: * -> *).
(IsStream t, Foldable f) =>
f a -> t m a
S.fromFoldable forall b c a. (b -> c) -> (a -> b) -> a -> c
. x -> g y
f)
{-# INLINABLE unpackConcurrently #-}

-- | possibly (depending on chosen stream types) concurrent map-reduce-fold builder returning an @(Istream t, MonadAsync m) => t m d@ result
concurrentStreamlyEngine
  :: forall tIn tOut m g y k c x d
   . (S.IsStream tIn, S.IsStream tOut, S.MonadAsync m, Foldable g, Functor g)
  => (forall z . S.SerialT m (k, z) -> S.SerialT m (k, g z))
  -> MRE.MapReduceFold y k c (tOut m) x d
concurrentStreamlyEngine :: forall (tIn :: (* -> *) -> * -> *) (tOut :: (* -> *) -> * -> *)
       (m :: * -> *) (g :: * -> *) y k c x d.
(IsStream tIn, IsStream tOut, MonadAsync m, Foldable g,
 Functor g) =>
(forall z. SerialT m (k, z) -> SerialT m (k, g z))
-> MapReduceFold y k c (tOut m) x d
concurrentStreamlyEngine forall z. SerialT m (k, z) -> SerialT m (k, g z)
groupByKey Unpack x y
u (MRC.Assign y -> (k, c)
a) Reduce k c d
r = forall a b x. (x -> a -> x) -> x -> (x -> b) -> Fold a b
FL.Fold
  (\tIn m x
s x
a' -> (forall (m :: * -> *) a. Monad m => a -> m a
return x
a') forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a -> t m a
`S.consM` tIn m x
s)
  forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
S.nil
  ( forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
S.mapM (\(k
k, g c
lc) -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (h :: * -> *) k x d.
(Foldable h, Functor h) =>
Reduce k x d -> k -> h x -> d
MRE.reduceFunction Reduce k c d
r k
k g c
lc)
  forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
S.adapt @SerialT @tOut -- make it concurrent for reducing
  forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall z. SerialT m (k, z) -> SerialT m (k, g z)
groupByKey
  forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
S.adapt @tIn @SerialT-- make it serial for grouping
  forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
S.mapM (forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. y -> (k, c)
a)
  forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> t m b) -> t m a -> t m b
(S.|$) (forall (m :: * -> *) (t :: (* -> *) -> * -> *) x y.
(MonadAsync m, IsStream t) =>
Unpack x y -> t m x -> t m y
unpackConcurrently Unpack x y
u)
  )
{-# INLINABLE concurrentStreamlyEngine #-}


-- | effectful map-reduce-fold engine returning a (Istream t => t m d) result
-- The "MonadAsync" constraint here more or less requires us to run in IO, or something IO like.
streamlyEngineM
  :: (S.IsStream t, Monad m, S.MonadAsync m, Traversable g)
  => (forall z . SerialT m (k, z) -> SerialT m (k, g z))
  -> MRE.MapReduceFoldM m y k c (t m) x d
streamlyEngineM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (g :: * -> *) k y c
       x d.
(IsStream t, Monad m, MonadAsync m, Traversable g) =>
(forall z. SerialT m (k, z) -> SerialT m (k, g z))
-> MapReduceFoldM m y k c (t m) x d
streamlyEngineM forall z. SerialT m (k, z) -> SerialT m (k, g z)
groupByKey UnpackM m x y
u (MRC.AssignM y -> m (k, c)
a) ReduceM m k c d
r =
  forall (m :: * -> *) a b. Monad m => Fold a b -> FoldM m a b
FL.generalize
    forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
S.adapt
    forall a b. (a -> b) -> a -> b
$ forall a b x. (x -> a -> x) -> x -> (x -> b) -> Fold a b
FL.Fold
        (forall a b c. (a -> b -> c) -> b -> a -> c
flip forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
S.cons)
        forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
S.nil
        ( forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
S.mapM (\(k
k, g c
lc) -> forall (h :: * -> *) (m :: * -> *) k x d.
(Traversable h, Monad m) =>
ReduceM m k x d -> k -> h x -> m d
MRE.reduceFunctionM ReduceM m k c d
r k
k g c
lc)
        forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall z. SerialT m (k, z) -> SerialT m (k, g z)
groupByKey -- this requires a serial stream.
        forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
S.mapM y -> m (k, c)
a
        forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) x y.
(IsStream t, Monad m) =>
UnpackM m x y -> t m x -> t m y
unpackStreamM UnpackM m x y
u
        )
{-# INLINABLE streamlyEngineM #-}

-- TODO: Try using Streamly folds and Map.insertWith instead of toList and fromListWith.  Prolly the same.
-- | Group streamly stream of @(k,c)@ by @hashable@ key.
-- NB: this function uses the fact that @SerialT m@ is a monad
groupByHashableKey
  :: (Monad m, Hashable k, Eq k)
  => S.SerialT m (k, c)
  -> S.SerialT m (k, Seq.Seq c)
groupByHashableKey :: forall (m :: * -> *) k c.
(Monad m, Hashable k, Eq k) =>
SerialT m (k, c) -> SerialT m (k, Seq c)
groupByHashableKey SerialT m (k, c)
s = do
  [(k, c)]
lkc <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
fromEffect (forall (m :: * -> *) a. Monad m => SerialT m a -> m [a]
S.toList SerialT m (k, c)
s)
  let hm :: HashMap k (Seq c)
hm = forall k v.
(Eq k, Hashable k) =>
(v -> v -> v) -> [(k, v)] -> HashMap k v
HMS.fromListWith forall a. Semigroup a => a -> a -> a
(<>) forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (d, b) (d, c)
second forall a b. (a -> b) -> a -> b
$ forall a. a -> Seq a
Seq.singleton) [(k, c)]
lkc
  forall k v a. (k -> v -> a -> a) -> a -> HashMap k v -> a
HMS.foldrWithKey (\k
k Seq c
lc SerialT m (k, Seq c)
s' -> forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
S.cons (k
k, Seq c
lc) SerialT m (k, Seq c)
s') forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
S.nil HashMap k (Seq c)
hm
{-# INLINABLE groupByHashableKey #-}

-- TODO: Try using Streamly folds and Map.insertWith instead of toList and fromListWith.  Prolly the same.
-- | Group streamly stream of @(k,c)@ by ordered key.
-- NB: this function uses the fact that @SerialT m@ is a monad
groupByOrderedKey
  :: (Monad m, Ord k) => S.SerialT m (k, c) -> S.SerialT m (k, Seq.Seq c)
groupByOrderedKey :: forall (m :: * -> *) k c.
(Monad m, Ord k) =>
SerialT m (k, c) -> SerialT m (k, Seq c)
groupByOrderedKey SerialT m (k, c)
s = do
  [(k, c)]
lkc <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
fromEffect (forall (m :: * -> *) a. Monad m => SerialT m a -> m [a]
S.toList SerialT m (k, c)
s)
  let hm :: Map k (Seq c)
hm = forall k a. Ord k => (a -> a -> a) -> [(k, a)] -> Map k a
MS.fromListWith forall a. Semigroup a => a -> a -> a
(<>) forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (d, b) (d, c)
second forall a b. (a -> b) -> a -> b
$ forall a. a -> Seq a
Seq.singleton) [(k, c)]
lkc
  forall k a b. (k -> a -> b -> b) -> b -> Map k a -> b
MS.foldrWithKey (\k
k Seq c
lc SerialT m (k, Seq c)
s' -> forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
S.cons (k
k, Seq c
lc) SerialT m (k, Seq c)
s') forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
S.nil Map k (Seq c)
hm
{-# INLINABLE groupByOrderedKey #-}

-- | Group streamly stream of @(k,c)@ by @hashable@ key. Uses mutable hashtables running in the ST monad.
-- NB: this function uses the fact that @SerialT m@ is a monad
groupByHashableKeyST
  :: (Monad m, Hashable k, Eq k)
  => S.SerialT m (k, c)
  -> S.SerialT m (k, Seq.Seq c)
groupByHashableKeyST :: forall (m :: * -> *) k c.
(Monad m, Hashable k, Eq k) =>
SerialT m (k, c) -> SerialT m (k, Seq c)
groupByHashableKeyST SerialT m (k, c)
st = do
  [(k, c)]
lkc <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
fromEffect (forall (m :: * -> *) a. Monad m => SerialT m a -> m [a]
S.toList SerialT m (k, c)
st)
  forall a. (forall s. ST s a) -> a
ST.runST forall a b. (a -> b) -> a -> b
$ do
    HashTable s k (Seq c)
hm <- (forall (h :: * -> * -> * -> *) k v s.
(HashTable h, Eq k, Hashable k) =>
(v -> v -> v) -> [(k, v)] -> ST s (h s k v)
MRE.fromListWithHT @HTC.HashTable) forall a. Semigroup a => a -> a -> a
(<>)
      forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (a :: * -> * -> *) b c d.
Arrow a =>
a b c -> a (d, b) (d, c)
second forall a. a -> Seq a
Seq.singleton) [(k, c)]
lkc
    forall (h :: * -> * -> * -> *) a k v s.
HashTable h =>
(a -> (k, v) -> ST s a) -> a -> h s k v -> ST s a
HT.foldM (\SerialT m (k, Seq c)
s' (k
k, Seq c
sc) -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
S.cons (k
k, Seq c
sc) SerialT m (k, Seq c)
s') forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
S.nil HashTable s k (Seq c)
hm
{-# INLINABLE groupByHashableKeyST #-}


-- | Group streamly stream of @(k,c)@ by key with instance of Grouping from <http://hackage.haskell.org/package/discrimination>.
-- NB: this function uses the fact that @SerialT m@ is a monad
groupByDiscriminatedKey
  :: (Monad m, DG.Grouping k)
  => S.SerialT m (k, c)
  -> S.SerialT m (k, Seq.Seq c)
groupByDiscriminatedKey :: forall (m :: * -> *) k c.
(Monad m, Grouping k) =>
SerialT m (k, c) -> SerialT m (k, Seq c)
groupByDiscriminatedKey SerialT m (k, c)
s = do
  [(k, c)]
lkc <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
fromEffect (forall (m :: * -> *) a. Monad m => SerialT m a -> m [a]
S.toList SerialT m (k, c)
s)
  let g :: LNE.NonEmpty (k, c) -> (k, Seq.Seq c)
      g :: forall k c. NonEmpty (k, c) -> (k, Seq c)
g NonEmpty (k, c)
x = let k :: k
k = forall a b. (a, b) -> a
fst (forall a. NonEmpty a -> a
LNE.head NonEmpty (k, c)
x) in (k
k, forall (t :: * -> *) m. (Foldable t, Monoid m) => t m -> m
F.fold forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a. a -> Seq a
Seq.singleton forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. (a, b) -> b
snd) NonEmpty (k, c)
x)
  forall (t :: (* -> *) -> * -> *) (f :: * -> *) a (m :: * -> *).
(IsStream t, Foldable f) =>
f a -> t m a
S.fromFoldable forall a b. (a -> b) -> a -> b
$ forall a. [Maybe a] -> [a]
Maybe.catMaybes forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall k c. NonEmpty (k, c) -> (k, Seq c)
g forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. [a] -> Maybe (NonEmpty a)
LNE.nonEmpty) forall a b. (a -> b) -> a -> b
$ forall b a. Grouping b => (a -> b) -> [a] -> [[a]]
DG.groupWith forall a b. (a, b) -> a
fst [(k, c)]
lkc
{-# INLINABLE groupByDiscriminatedKey #-}
#endif