{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TypeOperators         #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE PolyKinds             #-}

{-# LANGUAGE TypeFamilies          #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE AllowAmbiguousTypes   #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
{-|
Module      : Control.MapReduce.Engines.ParallelList
Description : Parallel list engine for map-reduce-folds
Copyright   : (c) Adam Conner-Sax 2019
License     : BSD-3-Clause
Maintainer  : adam_conner_sax@yahoo.com
Stability   : experimental

Some basic attempts at parallel map-reduce folds with lists as intermediate type.  There are multiple places we can do things in parallel.

(1) We can map (unpack/assign) in parallel.
(2) We can reduce in parallel.
(3) We can fold our intermediate monoid (in the gatherer) in parallel
(4) We can fold our result monoid in parallel

NB: This does not seem to be faster--and is often slower!--than the serial engine.  I leave it here as a starting point for improvement.
-}
module Control.MapReduce.Engines.ParallelList
  (
    -- * Parallel map/reduce fold builder
    parallelListEngine

    -- * re-exports
  , NFData
  )
where

import qualified Control.MapReduce.Core        as MRC
import qualified Control.MapReduce.Engines     as MRE
import qualified Control.MapReduce.Engines.List
                                               as MRL

import qualified Control.Foldl                 as FL
import qualified Data.List                     as L
import qualified Data.List.Split               as L

import qualified Control.Parallel.Strategies   as PS
import           Control.Parallel.Strategies    ( NFData ) -- for re-export
--


{- | Parallel map-reduce-fold list engine.

The whole input is first collected into a list.  That list is split into at
most @numThreads@ contiguous chunks and each chunk is unpacked and assigned in
its own spark (via 'parMapEach').  The per-chunk results are concatenated in
input order, grouped with the supplied grouping function, and the grouped list
is then chunked the same way so that the reductions are also sparked per chunk.

Chunking never discards elements: the last chunk is simply shorter when the
list length is not a multiple of the chunk size.  A @numThreads@ below 1 is
treated as 1.

Grouping could also be parallel but that is under the control of the given
function.
-}
parallelListEngine
  :: forall g y k c x d
   . (NFData k, NFData c, NFData d, Foldable g, Functor g)
  => Int
  -> ([(k, c)] -> [(k, g c)])
  -> MRE.MapReduceFold y k c [] x d
parallelListEngine numThreads groupByKey u (MRC.Assign a) r = reducedF
 where
  -- Split a list into at most @numThreads@ contiguous, non-empty chunks.
  -- 'L.chunksOf' keeps a short trailing chunk, so no element is dropped
  -- (unlike 'L.divvy', which discards an incomplete final sublist).
  chunk :: [z] -> [[z]]
  chunk zs = L.chunksOf chunkSize zs
   where
    nChunks   = max 1 numThreads
    -- Ceiling division; clamped to 1 so an empty input never asks for
    -- zero-length chunks.
    chunkSize = max 1 ((length zs + nChunks - 1) `div` nChunks)

  -- Work done inside one mapping spark: unpack every row, then assign a key.
  mapChunk :: [x] -> [(k, c)]
  mapChunk = fmap a . MRL.unpackList u

  -- Work done inside one reducing spark: reduce every (key, group) pair.
  reduceChunk :: [(k, g c)] -> [d]
  reduceChunk = fmap (uncurry (MRE.reduceFunction r))

  -- Collect the input, then map the chunks in parallel and re-join them.
  mappedF :: FL.Fold x [(k, c)]
  mappedF = fmap (L.concat . parMapEach mapChunk . chunk) FL.list

  -- Grouping is delegated entirely to the caller-supplied function.
  groupedF :: FL.Fold x [(k, g c)]
  groupedF = fmap groupByKey mappedF

  -- Reduce the chunks of grouped data in parallel and re-join the results.
  reducedF :: FL.Fold x [d]
  reducedF = fmap (L.concat . parMapEach reduceChunk . chunk) groupedF

-- | Map a function over a list, sparking the evaluation of every result to
-- normal form.  The output list is in the same order as the input list.
--
-- 'PS.parMap' already sparks each element with the strategy it is given, so
-- 'PS.rdeepseq' is passed directly; wrapping it in a further 'PS.rparWith'
-- would only create a spark whose sole job is to create another spark.
parMapEach :: PS.NFData b => (a -> b) -> [a] -> [b]
parMapEach = PS.parMap PS.rdeepseq
{-# INLINABLE parMapEach #-}