{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
-- |
-- Map-reduce engines built on the 'Stream' type of the @streaming@ library.
--
-- The module exports a pure engine ('streamingEngine') and a monadic one
-- ('streamingEngineM'), the 'StreamResult' wrapper they produce, helpers for
-- collapsing such a result ('resultToList', 'concatStream',
-- 'concatStreamFold', 'concatStreamFoldM'), and two key-grouping functions
-- that can be handed to the engines ('groupByHashableKey',
-- 'groupByOrderedKey').
module Control.MapReduce.Engines.Streaming
(
StreamResult(..)
, streamingEngine
, streamingEngineM
, resultToList
, concatStream
, concatStreamFold
, concatStreamFoldM
, groupByHashableKey
, groupByOrderedKey
)
where
import qualified Control.MapReduce.Core as MRC
import qualified Control.MapReduce.Engines as MRE
import qualified Control.Foldl as FL
import Data.Functor.Identity ( Identity )
import Data.Hashable ( Hashable )
import qualified Data.HashMap.Strict as HMS
import qualified Data.Map.Strict as MS
import qualified Data.Sequence as Seq
import qualified Streaming.Prelude as S
import qualified Streaming as S
import Streaming ( Stream
, Of
)
import Control.Arrow ( second )
-- | Apply a pure 'MRC.Unpack' step to a stream.
--
-- * 'MRC.Filter' keeps only the elements that satisfy the predicate.
-- * 'MRC.Unpack' maps each element to a 'Foldable' container and splices the
--   contents of every container into the output stream.
--
-- The stream's return value @r@ is passed through untouched.
unpackStream
  :: MRC.Unpack x y -> Stream (Of x) Identity r -> Stream (Of y) Identity r
unpackStream (MRC.Filter keep  ) = S.filter keep
unpackStream (MRC.Unpack expand) = S.concat . S.map expand
{-# INLINABLE unpackStream #-}
-- | Apply a monadic 'MRC.UnpackM' step to a stream.
--
-- * 'MRC.FilterM' keeps only the elements for which the monadic predicate
--   returns 'True'.
-- * 'MRC.UnpackM' runs the monadic function on each element and splices the
--   contents of every resulting 'Foldable' container into the output stream.
--
-- The stream's return value @r@ is passed through untouched.
unpackStreamM
  :: Monad m => MRC.UnpackM m x y -> Stream (Of x) m r -> Stream (Of y) m r
unpackStreamM (MRC.FilterM keepM  ) = S.filterM keepM
unpackStreamM (MRC.UnpackM expandM) = S.concat . S.mapM expandM
{-# INLINABLE unpackStreamM #-}
-- | Group a stream of key/value pairs by key, using a hash map.
--
-- The whole input stream is first drained into a list ('S.toList'), so
-- nothing is emitted until the input is exhausted. The pairs are then
-- collected with 'HMS.fromListWith' and one @(key, values)@ element is
-- emitted per distinct key, followed by the input stream's return value.
--
-- NOTE: 'HMS.fromListWith' is documented to call its combining function
-- with the newer value first, so with @('<>')@ the 'Seq.Seq' of each key
-- holds its values in reverse order of their position in the input stream.
-- The order in which the keys themselves are emitted is whatever
-- 'HMS.foldrWithKey' yields for the hash map.
groupByHashableKey
  :: forall m k c r
   . (Monad m, Hashable k, Eq k)
  => Stream (Of (k, c)) m r
  -> Stream (Of (k, Seq.Seq c)) m r
groupByHashableKey input = S.effect $ do
  (pairs S.:> result) <- S.toList input
  let grouped = HMS.fromListWith (<>) $ fmap (second Seq.singleton) pairs
  return $ HMS.foldrWithKey
    (\key vals rest -> S.cons (key, vals) rest)
    (return result) -- the rebuilt stream ends with the original return value
    grouped
{-# INLINABLE groupByHashableKey #-}
-- | Group a stream of key/value pairs by key, using an ordered map.
--
-- The whole input stream is first drained into a list ('S.toList'), so
-- nothing is emitted until the input is exhausted. The pairs are then
-- collected with 'MS.fromListWith' and one @(key, values)@ element is
-- emitted per distinct key, followed by the input stream's return value.
-- Because the output is rebuilt with 'MS.foldrWithKey', keys are emitted in
-- ascending order.
--
-- NOTE: 'MS.fromListWith' is documented to call its combining function with
-- the newer value first, so with @('<>')@ the 'Seq.Seq' of each key holds
-- its values in reverse order of their position in the input stream.
groupByOrderedKey
  :: forall m k c r
   . (Monad m, Ord k)
  => Stream (Of (k, c)) m r
  -> Stream (Of (k, Seq.Seq c)) m r
groupByOrderedKey input = S.effect $ do
  (pairs S.:> result) <- S.toList input
  let grouped = MS.fromListWith (<>) $ fmap (second Seq.singleton) pairs
  return $ MS.foldrWithKey
    (\key vals rest -> S.cons (key, vals) rest)
    (return result) -- the rebuilt stream ends with the original return value
    grouped
{-# INLINABLE groupByOrderedKey #-}
-- | Result container produced by the streaming engines: a stream of values
-- of type @d@ with effects in @m@ and a unit return value.
newtype StreamResult m d = StreamResult { unRes :: Stream (Of d) m () }
-- | Run the wrapped stream and collect all of its elements into a list.
resultToList :: Monad m => StreamResult m d -> m [d]
resultToList = S.toList_ . unRes
-- | Combine every element of a stream with '<>'.
--
-- The stream's unit return value is first replaced by 'mempty'; 'S.iterT'
-- then collapses the stream from the right, so the elements
-- @a1, a2, ..., an@ yield @a1 <> (a2 <> (... <> (an <> mempty)))@ in @m@.
-- An empty stream therefore yields 'mempty'.
concatStreaming :: (Monad m, Monoid a) => Stream (Of a) m () -> m a
concatStreaming = S.iterT prepend . fmap (const mempty)
 where
  -- Put the current element in front of the result of the rest of the stream.
  prepend (a S.:> rest) = fmap (a <>) rest
-- | Run a 'StreamResult' of monoidal values and combine all of them with
-- '<>', yielding 'mempty' when the stream has no elements.
concatStream :: (Monad m, Monoid a) => StreamResult m a -> m a
concatStream = concatStreaming . unRes
-- | Post-process a pure fold that produces a @'StreamResult' 'Identity'@ of
-- monoidal values so that it returns their combination (see 'concatStream')
-- instead of the stream.
concatStreamFold
  :: Monoid b => FL.Fold a (StreamResult Identity b) -> FL.Fold a b
concatStreamFold = fmap (S.runIdentity . concatStream)
-- | Monadic counterpart of 'concatStreamFold': post-process a monadic fold
-- that produces a @'StreamResult' m@ of monoidal values so that it returns
-- their combination (see 'concatStream'), via 'MRC.postMapM'.
concatStreamFoldM
  :: (Monad m, Monoid b) => FL.FoldM m a (StreamResult m b) -> FL.FoldM m a b
concatStreamFoldM = MRC.postMapM concatStream
-- | Pure map-reduce engine that works on 'Stream's.
--
-- Given a grouping function (for example 'groupByHashableKey' or
-- 'groupByOrderedKey') plus the unpack, assign and reduce steps, this builds
-- an 'FL.Fold' over the inputs @x@ whose result is a 'StreamResult' of
-- reduced values @d@.
--
-- While folding, each input is consed onto the front of an accumulated
-- stream, so that stream holds the inputs in reverse order of arrival. On
-- extraction the accumulated stream is pushed through, in order:
-- unpack ('unpackStream'), assign (@S.map@), the supplied grouping function,
-- and finally the reducer ('MRE.reduceFunction') applied to every
-- @(key, group)@ pair.
streamingEngine
  :: (Foldable g, Functor g)
  => (  forall z r
      . Stream (Of (k, z)) Identity r
     -> Stream (Of (k, g z)) Identity r
     )
  -> MRE.MapReduceFold y k c (StreamResult Identity) x d
streamingEngine groupByKey unpack (MRC.Assign assign) reducer =
  fmap StreamResult $ FL.Fold
    (flip S.cons) -- step: put the new input in front of the accumulator
    (return ())   -- initial accumulator: the empty stream
    ( S.map (\(key, grp) -> MRE.reduceFunction reducer key grp)
    . groupByKey
    . S.map assign
    . unpackStream unpack
    )
{-# INLINABLE streamingEngine #-}
-- | Monadic map-reduce engine that works on 'Stream's.
--
-- Given a grouping function (for example 'groupByHashableKey' or
-- 'groupByOrderedKey') plus the monadic unpack, assign and reduce steps,
-- this builds an 'FL.FoldM' over the inputs @x@ whose result is a
-- 'StreamResult' of reduced values @d@.
--
-- The accumulation itself is a pure 'FL.Fold' lifted with 'FL.generalize':
-- each input is consed onto the front of an accumulated stream, so that
-- stream holds the inputs in reverse order of arrival. On extraction the
-- accumulated stream is pushed through, in order: unpack ('unpackStreamM'),
-- assign (@S.mapM@), the supplied grouping function, and finally the reducer
-- ('MRE.reduceFunctionM') applied to every @(key, group)@ pair. The monadic
-- actions of those steps are embedded in the resulting stream rather than
-- being run by the fold's step function.
streamingEngineM
  :: (Monad m, Traversable g)
  => (forall z r . Stream (Of (k, z)) m r -> Stream (Of (k, g z)) m r)
  -> MRE.MapReduceFoldM m y k c (StreamResult m) x d
streamingEngineM groupByKey unpackM (MRC.AssignM assignM) reducerM =
  fmap StreamResult . FL.generalize $ FL.Fold
    (flip S.cons) -- step: put the new input in front of the accumulator
    (return ())   -- initial accumulator: the empty stream
    ( S.mapM (\(key, grp) -> MRE.reduceFunctionM reducerM key grp)
    . groupByKey
    . S.mapM assignM
    . unpackStreamM unpackM
    )
{-# INLINABLE streamingEngineM #-}