{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE DeriveGeneric         #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE OverloadedStrings     #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TemplateHaskell       #-}
{-# LANGUAGE TypeApplications      #-}
{-# LANGUAGE TypeOperators         #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE PolyKinds             #-}
{-# LANGUAGE TypeFamilies          #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE AllowAmbiguousTypes   #-}
{-# LANGUAGE InstanceSigs          #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-|
Module      : Frames.MapReduce
Description : Helpers for using the map-reduce-folds package with Frames.  Monomorphic in record and interpretation functor.
Copyright   : (c) Adam Conner-Sax 2019
License     : BSD-3-Clause
Maintainer  : adam_conner_sax@yahoo.com
Stability   : experimental

Frames-map-reduce provides helper functions for using <https://hackage.haskell.org/package/map-reduce-folds-0.1.0.0 map-reduce-folds>
with <http://hackage.haskell.org/package/Frames Frames>.  Please see those packages for more details.
-}
module Frames.MapReduce
  (
    -- * Unpackers
    unpackFilterRow
  , unpackFilterOnField
  , unpackGoodRows

    -- * Assigners
  , assignKeysAndData
  , assignKeys
  , splitOnKeys
  , splitOnData

  -- * Reduce and Re-Attach Key Cols
  , reduceAndAddKey
  , foldAndAddKey

  -- * Re-Attach Key Cols
  , makeRecsWithKey
  , makeRecsWithKeyM

  -- * Re-Exports
  , module Control.MapReduce
  )
where

import qualified Control.MapReduce             as MR
import           Control.MapReduce                 -- for re-export

import qualified Control.Foldl                 as FL
import qualified Data.Hashable                 as Hash

import qualified Frames                        as F
import qualified Frames.Melt                   as F
import qualified Frames.InCore                 as FI
import qualified Data.Vinyl                    as V
import qualified Data.Vinyl.TypeLevel          as V

-- | Orphan 'Hash.Hashable' instance for the empty record.
--
-- This is only here so we can use hash maps for the grouping step.
-- This should properly be in Frames itself.
instance Hash.Hashable (F.Record '[]) where
  -- Every empty record hashes to the same constant.
  hash = const 0
  {-# INLINABLE hash #-}
  -- Returns the salt unchanged: there are no fields to mix in.
  hashWithSalt s = const s -- TODO: this seems BAD! Or not?
  {-# INLINABLE hashWithSalt #-}

-- | Orphan 'Hash.Hashable' instance for a non-empty record.
--
-- The value of the head field @t@ is mixed into the salt first; the result is
-- then used as the salt for hashing the remaining fields @rs@ (obtained with
-- 'F.rcast'), which recurses down to the empty-record instance.
instance (V.KnownField t, Hash.Hashable (V.Snd t), Hash.Hashable (F.Record rs), rs F.⊆ (t ': rs)) => Hash.Hashable (F.Record (t ': rs)) where
  hashWithSalt s r =
    s `Hash.hashWithSalt` (F.rgetField @t r) `Hash.hashWithSalt` (F.rcast @rs r)
  {-# INLINABLE hashWithSalt #-}

-- | Filter records using a function on the entire record.
-- The predicate is wrapped in the 'MR.Filter' unpack constructor.
unpackFilterRow
  :: (F.Record rs -> Bool) -- ^ predicate on a whole row
  -> MR.Unpack (F.Record rs) (F.Record rs)
unpackFilterRow test = MR.Filter test

-- | Filter records based on a condition on only one field in the row.
-- Will usually require a Type Application to indicate which field, e.g.
-- @unpackFilterOnField \@MyField (> 0)@.
unpackFilterOnField
  :: forall t rs
   . (V.KnownField t, F.ElemOf rs t)
  => (V.Snd t -> Bool) -- ^ predicate on the value of field @t@
  -> MR.Unpack (F.Record rs) (F.Record rs)
unpackFilterOnField test = unpackFilterRow (test . F.rgetField @t)

-- | An unpack step which specifies a subset of columns, @cs@, (via a type-application) and then filters a
-- @Rec (Maybe :. ElField) rs@ to only rows which have all good data in that subset.
--
-- Each row is first narrowed to @cs@ with 'F.rcast' and then passed through
-- 'F.recMaybe'; the resulting 'Maybe' is what 'MR.Unpack' traverses.
unpackGoodRows
  :: forall cs rs
   . (cs F.⊆ rs)
  => MR.Unpack (F.Rec (Maybe F.:. F.ElField) rs) (F.Record cs)
unpackGoodRows = MR.Unpack $ F.recMaybe . F.rcast

-- | Assign both keys and data cols.  Uses type applications to specify them if they cannot be inferred.
-- Keys usually can't. Data sometimes can.
--
-- Both the key and the data are projections of the input row via 'F.rcast'.
assignKeysAndData
  :: forall ks cs rs
   . (ks F.⊆ rs, cs F.⊆ rs)
  => MR.Assign (F.Record ks) (F.Record rs) (F.Record cs)
assignKeysAndData = MR.assign (F.rcast @ks) (F.rcast @cs)
{-# INLINABLE assignKeysAndData #-}

-- | Assign keys and leave all columns, including the keys, in the data passed to reduce.
--
-- The key is the 'F.rcast' projection onto @ks@; the data is the row itself ('id').
assignKeys
  :: forall ks rs
   . (ks F.⊆ rs)
  => MR.Assign (F.Record ks) (F.Record rs) (F.Record rs)
assignKeys = MR.assign (F.rcast @ks) id
{-# INLINABLE assignKeys #-}

-- | Assign keys and leave the rest of the columns, excluding the keys, in the data passed to reduce.
--
-- The data columns @cs@ are fixed by the constraint @cs ~ RDeleteAll ks rs@,
-- so only @ks@ normally needs a type application.
splitOnKeys
  :: forall ks rs cs
   . (ks F.⊆ rs, cs ~ F.RDeleteAll ks rs, cs F.⊆ rs)
  => MR.Assign (F.Record ks) (F.Record rs) (F.Record cs)
splitOnKeys = assignKeysAndData @ks @cs
{-# INLINABLE splitOnKeys #-}

-- | Assign data and leave the rest of the columns, excluding the data, as the key.
--
-- The key columns @ks@ are fixed by the constraint @ks ~ RDeleteAll cs rs@,
-- so only @cs@ normally needs a type application.
splitOnData
  :: forall cs rs ks
   . (cs F.⊆ rs, ks ~ F.RDeleteAll cs rs, ks F.⊆ rs)
  => MR.Assign (F.Record ks) (F.Record rs) (F.Record cs)
splitOnData = assignKeysAndData @ks @cs
{-# INLINABLE splitOnData #-}

-- | Reduce the data to a single row and then re-attach the key.
--
-- The reduction result is appended to the key with 'V.rappend' (key columns
-- first), and the resulting single record is wrapped in a one-element list
-- and converted to a frame with 'F.toFrame'.
reduceAndAddKey
  :: forall ks cs x
   . FI.RecVec ((ks V.++ cs))
  => (forall h . Foldable h => h x -> F.Record cs) -- ^ reduction step
  -> MR.Reduce (F.Record ks) x (F.FrameRec (ks V.++ cs))
reduceAndAddKey process =
  fmap (F.toFrame . pure @[]) $ MR.processAndLabel process V.rappend
{-# INLINABLE reduceAndAddKey #-}

-- | Reduce by folding the data to a single row and then re-attaching the key.
--
-- The fold result is appended to the key with 'V.rappend' (key columns first),
-- and the resulting single record is wrapped in a one-element list and
-- converted to a frame with 'F.toFrame'.
foldAndAddKey
  :: (FI.RecVec ((ks V.++ cs)))
  => FL.Fold x (F.Record cs) -- ^ reduction fold
  -> MR.Reduce (F.Record ks) x (F.FrameRec (ks V.++ cs))
foldAndAddKey fld =
  fmap (F.toFrame . pure @[]) $ MR.foldAndLabel fld V.rappend  -- is Frame a reasonably fast thing for many appends?
{-# INLINABLE foldAndAddKey #-}

-- | Transform a reduce which produces a container of results, with a function from each result to a record,
-- into a reduce which produces a FrameRec of the result records with the key re-attached.
makeRecsWithKey
  :: (Functor g, Foldable g, (FI.RecVec (ks V.++ as)))
  => (y -> F.Record as) -- ^ map a result to a record
  -> MR.Reduce (F.Record ks) x (g y) -- ^ original reduce
  -> MR.Reduce (F.Record ks) x (F.FrameRec (ks V.++ as))
makeRecsWithKey makeRec reduceToY =
  fmap F.toFrame $ MR.reduceMapWithKey addKey reduceToY
  where
    -- Convert every result to a record and prepend the key columns to it.
    addKey k = fmap (V.rappend k . makeRec)
{-# INLINABLE makeRecsWithKey #-}

-- | Transform an effectful reduce which produces a container of results, with a function from each result to a record,
-- into a reduce which produces a FrameRec of the result records with the key re-attached.
makeRecsWithKeyM
  :: (Monad m, Functor g, Foldable g, (FI.RecVec (ks V.++ as)))
  => (y -> F.Record as) -- ^ map a result to a record
  -> MR.ReduceM m (F.Record ks) x (g y) -- ^ original reduce
  -> MR.ReduceM m (F.Record ks) x (F.FrameRec (ks V.++ as))
makeRecsWithKeyM makeRec reduceToY =
  fmap F.toFrame $ MR.reduceMMapWithKey addKey reduceToY
  where
    -- Convert every result to a record and prepend the key columns to it.
    addKey k = fmap (V.rappend k . makeRec)
{-# INLINABLE makeRecsWithKeyM #-}