{-# LANGUAGE DataKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
module Control.MapReduce.Engines.ParallelList
(
parallelListEngine
, NFData
)
where
import qualified Control.MapReduce.Core as MRC
import qualified Control.MapReduce.Engines as MRE
import qualified Control.MapReduce.Engines.List
as MRL
import qualified Control.Foldl as FL
import qualified Data.List as L
import qualified Data.List.Split as L
import qualified Control.Parallel.Strategies as PS
import Control.Parallel.Strategies ( NFData )
-- | Build a map-reduce 'FL.Fold' whose unpack/assign step and reduce step
-- are evaluated in parallel chunks.
--
-- The fold first collects the whole input into a list ('FL.list').
--
-- 1. The list is split into at most @numThreads@ contiguous chunks of
--    (nearly) equal length.
-- 2. Each chunk is unpacked with 'MRL.unpackList' and every unpacked item
--    is assigned a @(k, c)@ pair. Chunks are processed via 'parMapEach'.
-- 3. All pairs are grouped with @groupByKey@; this step is not
--    parallelised here.
-- 4. The groups are split into at most @numThreads@ chunks again, and each
--    chunk is reduced via 'parMapEach'.
--
-- Chunk results are concatenated in their original order. The output
-- therefore does not depend on how the work was chunked.
--
-- Chunking deliberately avoids 'L.divvy'. That function discards a trailing
-- sublist shorter than the requested size, which would silently drop input
-- rows and groups.
parallelListEngine
  :: forall g y k c x d
   . (NFData k, NFData c, NFData d, Foldable g, Functor g)
  => Int                       -- ^ maximum number of parallel chunks per stage (values < 1 act as 1)
  -> ([(k, c)] -> [(k, g c)])  -- ^ groups the assigned pairs by key
  -> MRE.MapReduceFold y k c [] x d
parallelListEngine numThreads groupByKey u (MRC.Assign a) r =
  let chunkedF :: FL.Fold x [[x]]
      chunkedF = fmap (chunksForThreads numThreads) FL.list
      mappedF :: FL.Fold x [(k, c)]
      mappedF =
        fmap (L.concat . parMapEach (fmap a . MRL.unpackList u)) chunkedF
      groupedF :: FL.Fold x [(k, g c)]
      groupedF = fmap groupByKey mappedF
      reducedF :: FL.Fold x [d]
      reducedF = fmap
        ( L.concat
        . parMapEach (fmap (uncurry $ MRE.reduceFunction r))
        . chunksForThreads numThreads
        )
        groupedF
  in reducedF

-- | Split a list into at most @n@ contiguous chunks of (nearly) equal
-- length, preserving order, so that @concat (chunksForThreads n xs) == xs@.
-- A non-positive @n@ is treated as 1, and the empty list yields no chunks.
chunksForThreads :: Int -> [a] -> [[a]]
chunksForThreads n xs = L.chunksOf chunkSize xs
  where
    parts = max 1 n
    -- Ceiling of (length xs / parts), but at least 1. 'L.chunksOf' with a
    -- non-positive size returns an infinite list of empty chunks.
    chunkSize = max 1 ((length xs + parts - 1) `div` parts)
-- | Map a function over a list, evaluating every result to normal form in
-- parallel, with one spark per element.
--
-- 'PS.parMap' already sparks each element through 'PS.parList'. The element
-- strategy is therefore plain 'PS.rdeepseq'. Wrapping it in 'PS.rparWith'
-- would only add a second, redundant spark per element.
parMapEach :: PS.NFData b => (a -> b) -> [a] -> [b]
parMapEach = PS.parMap PS.rdeepseq
{-# INLINABLE parMapEach #-}