{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
module Control.MapReduce.Engines.ParallelList
(
parallelListEngine
, NFData
)
where
import qualified Control.MapReduce.Core as MRC
import qualified Control.MapReduce.Engines as MRE
import qualified Control.MapReduce.Engines.List
as MRL
import qualified Control.Foldl as FL
import qualified Data.List as L
import qualified Data.List.Split as L
import qualified Control.Parallel.Strategies as PS
import Control.Parallel.Strategies ( NFData )
-- | A list-based map-reduce engine that parallelises the unpack/assign
-- stage and the reduce stage.
--
-- The input is collected into a list and split into at most @numThreads@
-- contiguous chunks. Each chunk is unpacked and assigned under
-- 'parMapEach', i.e. sparked and fully forced via 'NFData'. The
-- concatenated @(k, c)@ pairs are passed to @groupByKey@. The grouped list
-- is then split the same way, and each chunk of groups is reduced under
-- 'parMapEach'. Chunking preserves order and drops no elements, so the
-- result list has the same order as the sequential pipeline would produce.
parallelListEngine
  :: forall g y k c x d
   . (NFData k, NFData c, NFData d, Foldable g, Functor g)
  => Int
  -- ^ number of chunks (and hence sparks) per parallel stage;
  -- values below 1 are treated as 1
  -> ([(k, c)] -> [(k, g c)])
  -- ^ groups the assigned @(k, c)@ pairs by key
  -> MRE.MapReduceFold y k c [] x d
parallelListEngine numThreads groupByKey u (MRC.Assign a) r = reducedF
 where
  -- Unpack and assign each input chunk in parallel, then flatten in order.
  mappedF :: FL.Fold x [(k, c)]
  mappedF =
    fmap
      ( L.concat
      . parMapEach (fmap a . MRL.unpackList u)
      . chunkForThreads numThreads
      )
      FL.list

  groupedF :: FL.Fold x [(k, g c)]
  groupedF = fmap groupByKey mappedF

  -- Reduce each chunk of groups in parallel, then flatten in order.
  reducedF :: FL.Fold x [d]
  reducedF =
    fmap
      ( L.concat
      . parMapEach (fmap (uncurry (MRE.reduceFunction r)))
      . chunkForThreads numThreads
      )
      groupedF

-- | Split a list into at most @n@ contiguous, order-preserving chunks of
-- near-equal length (@concat . chunkForThreads n == id@).
--
-- Unlike @divvy n n@, which discards trailing elements that do not fill a
-- whole chunk, every element is kept. A non-positive @n@ is treated as 1.
chunkForThreads :: Int -> [a] -> [[a]]
chunkForThreads _ [] = []
chunkForThreads n xs = L.chunksOf chunkSize xs
 where
  len = length xs
  -- Clamp to [1, len]: avoids a zero chunk size (chunksOf 0 never ends on
  -- non-empty input) and overflow in the ceiling division for huge n.
  parts = max 1 (min n len)
  -- Ceiling division, so at most `parts` chunks are produced.
  chunkSize = (len + parts - 1) `div` parts
-- | Map a function over a list, sparking each result for parallel
-- evaluation and forcing it to normal form with 'PS.rdeepseq'.
--
-- This is @PS.parMap (PS.rparWith PS.rdeepseq)@ written out as the plain
-- map followed by the list strategy it applies.
parMapEach :: PS.NFData b => (a -> b) -> [a] -> [b]
parMapEach f xs = map f xs `PS.using` PS.parList perElement
 where
  perElement = PS.rparWith PS.rdeepseq
{-# INLINABLE parMapEach #-}