{-# LANGUAGE AllowAmbiguousTypes   #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE DeriveGeneric         #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE InstanceSigs          #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings     #-}
{-# LANGUAGE PolyKinds             #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TemplateHaskell       #-}
{-# LANGUAGE TypeApplications      #-}
{-# LANGUAGE TypeOperators         #-}
{-# LANGUAGE TypeFamilies          #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
{-|
Module      : Frames.Aggregation
Description : A specialised Map/Reduce for aggregating one set of keys to a smaller one given some operation to merge data.
Copyright   : (c) Adam Conner-Sax 2019
License     : BSD
Maintainer  : adam_conner_sax@yahoo.com
Stability   : experimental

Frames.Aggregation contains types and functions to support a specific map/reduce operation.  Frequently, data is given
with more specificity than required for downstream operations.  Perhaps an age is given in years and we only need to know the
age-band.  Assuming we know how to aggregate data columns, we want to perform that aggregation on all the subsets required to
build the data-set with the simpler key, while perhaps leaving some other columns alone.  @aggregateFold@ does this.
-}
module Frames.Aggregation
  (
    -- * Types
    RecordKeyMap
    -- * Constraint Helpers
  , AggregateAllC
  , AggregateC
  , CombineKeyAggregationsC
    -- * Aggregation Function combinators
  , combineKeyAggregations
  , keyMap
    -- * aggregationFolds
  , aggregateAllFold
  , aggregateFold
  , mergeDataFolds
  )
where

import qualified Control.MapReduce             as MR
import qualified Frames.MapReduce              as FMR

import qualified Control.Foldl                 as FL

import qualified Frames                        as F
import qualified Frames.Melt                   as F
import qualified Frames.InCore                 as FI
import qualified Data.Vinyl                    as V
import qualified Data.Vinyl.TypeLevel          as V

-- | Type alias for key-aggregation functions: a plain function from a record
-- with columns @k@ to a record with columns @k'@.
type RecordKeyMap k k' = F.Record k -> F.Record k'

-- | Constraints needed by 'combineKeyAggregations': both input column sets
-- @a@ and @b@ must be subsets of @a ++ b@, and the output column sets
-- @a'@ and @b'@ must be disjoint.
type CombineKeyAggregationsC a a' b b' = (a F.⊆ (a V.++ b), b F.⊆ (a V.++ b), F.Disjoint a' b' ~ 'True)

-- | Combine two key-aggregation functions, one over columns @a@ and one over
-- columns @b@, into a single key-aggregation function over @a ++ b@.
--
-- The input record is narrowed to the @a@ columns and to the @b@ columns
-- (via 'F.rcast'), each map is applied to its own part, and the two results
-- are appended, giving a record with columns @a' ++ b'@.
combineKeyAggregations
  :: forall a a' b b'
   . CombineKeyAggregationsC a a' b b'
  => RecordKeyMap a a' -- ^ map for the @a@ columns
  -> RecordKeyMap b b' -- ^ map for the @b@ columns
  -> RecordKeyMap (a V.++ b) (a' V.++ b')
combineKeyAggregations aToa' bTob' r =
  aToa' (F.rcast r) `V.rappend` bTob' (F.rcast r)

-- | Promote an ordinary function @a -> b@ to a @RecordKeyMap aCol bCol@ where
-- @aCol@ holds values of type @a@ and @bCol@ holds values of type @b@.
--
-- The value of field @a@ is read from the one-column input record, the
-- function is applied to it, and the result is wrapped as the sole field of a
-- new one-column record.
keyMap
  :: forall a b
   . (V.KnownField a, V.KnownField b)
  => (V.Snd a -> V.Snd b) -- ^ function on the underlying column values
  -> RecordKeyMap '[a] '[b]
keyMap f r = f (F.rgetField @a r) F.&: V.RNil

-- | The constraints needed by 'aggregateAllFold' for source key columns @ak@,
-- aggregated key columns @ak'@ and data columns @d@: the subset relations
-- used to re-key each row and to split it into key and data, 'Ord' on both
-- key records, and 'FI.RecVec' for the resulting frame's columns.
type AggregateAllC ak ak' d = ((ak' V.++ d) F.⊆ ((ak V.++ d) V.++ ak')
                              , ak F.⊆ (ak V.++ d)
                              , ak' F.⊆ (ak' V.++ d)
                              , d F.⊆ (ak' V.++ d)
                              , Ord (F.Record ak')
                              , Ord (F.Record ak)
                              , FI.RecVec (ak' V.++ d)
                              )

-- | Aggregate rows keyed by columns @ak@ into rows keyed by (new) columns
-- @ak'@.
--
-- Given
--
-- * a (hopefully surjective) map from records of @ak@ to records of @ak'@, and
-- * a fold over the data columns @d@ which combines the rows whose @ak@ keys
--   were distinct but whose @ak'@ keys coincide,
--
-- produce a fold that transforms data keyed by @ak@ into data keyed by @ak'@,
-- with the appropriate aggregation done in the @d@ columns.
--
-- E.g., suppose you have voter turnout data keyed by the age of the voter in
-- years.  The data is two columns: total votes cast and turnout as a
-- percentage.  You want to aggregate the ages into two bands, over and under
-- some age.  So @ak@ is the age column and @ak'@ is a new column with a data
-- type indicating over/under.  The data fold has to sum over the total votes
-- and perform a weighted sum over the percentages.
--
-- To leave some additional key columns untouched while aggregating the
-- others, see 'aggregateFold'.
aggregateAllFold
  :: forall ak ak' d
   . AggregateAllC ak ak' d
  => RecordKeyMap ak ak' -- ^ get aggregated key from key
  -> (FL.Fold (F.Record d) (F.Record d)) -- ^ aggregate data
  -> FL.Fold (F.Record (ak V.++ d)) (F.FrameRec (ak' V.++ d))
aggregateAllFold toAggKey aggDataF =
  FMR.concatFold
    $ FMR.mapReduceFold aggUnpack aggAssign (FMR.foldAndAddKey aggDataF)
 where
  -- Re-key each row: append the aggregated key computed from the row's @ak@
  -- columns, then cast down to @ak' ++ d@, which drops the old @ak@ columns.
  -- Each input row yields exactly one re-keyed row.
  aggUnpack = MR.Unpack
    (\r -> [F.rcast @(ak' V.++ d) $ r `V.rappend` (toAggKey (F.rcast r))])
  -- Split each re-keyed row into its grouping key (@ak'@) and its data (@d@).
  aggAssign = FMR.assignKeysAndData @ak' @d

-- | The constraints needed by 'aggregateFold': those of
-- 'combineKeyAggregations' for a @k@-to-@k@ map alongside an @ak@-to-@ak'@
-- map, together with those of 'aggregateAllFold' for aggregating the combined
-- keys @k ++ ak@ into @k ++ ak'@ over data columns @d@.
type AggregateC k ak ak' d = (CombineKeyAggregationsC k k ak ak'
                             ,AggregateAllC (k V.++ ak) (k V.++ ak') d
                             )

-- | Aggregate key columns @ak@ into @ak'@ while leaving key columns @k@ alone.
-- Allows aggregation over only some fields.  Will often require a type
-- application to specify what @k@ is.
--
-- Implemented by extending the given key map with 'id' on the @k@ columns
-- (via 'combineKeyAggregations') and handing the combined map to
-- 'aggregateAllFold'.
aggregateFold
  :: forall k ak ak' d
   . AggregateC k ak ak' d
  => RecordKeyMap ak ak' -- ^ get aggregated key from key
  -> (FL.Fold (F.Record d) (F.Record d)) -- ^ aggregate data
  -> FL.Fold
       (F.Record (k V.++ ak V.++ d))
       (F.FrameRec (k V.++ ak' V.++ d))
aggregateFold f = aggregateAllFold (combineKeyAggregations @k @k id f)


{-
aggregateFold
  :: forall k ak ak' d
   . ( (ak' V.++ d) F.⊆ ((ak V.++ d) V.++ ak')
     , ak F.⊆ (ak V.++ d)
     , ak' F.⊆ (ak' V.++ d)
     , d F.⊆ (ak' V.++ d)
     , Ord (F.Record ak')
     , FI.RecVec (ak' V.++ d)
     , Ord (F.Record ak)
     , (k V.++ (ak' V.++ d)) ~ ((k V.++ ak') V.++ d)
     , Ord (F.Record k)
     , k F.⊆ ((k V.++ ak') V.++ d)
     , k F.⊆ ((k V.++ ak) V.++ d)
     , (ak V.++ d) F.⊆ ((k V.++ ak) V.++ d)
     , FI.RecVec ((k V.++ ak') V.++ d)
     )
  => RecordKeyMap ak ak' -- ^ get aggregated key from key
  -> (FL.Fold (F.Record d) (F.Record d)) -- ^ aggregate data
  -> FL.Fold
       (F.Record (k V.++ ak V.++ d))
       (F.FrameRec (k V.++ ak' V.++ d))
aggregateFold keyAgg aggDataF = FMR.concatFold $ FMR.mapReduceFold
  MR.noUnpack
  (FMR.assignKeysAndData @k @(ak V.++ d))
  ( FMR.makeRecsWithKey id
  $ MR.ReduceFold (const $ aggregateAllFold keyAgg aggDataF)
  )
-}

-- | Run two folds over the same input records, each producing a one-column
-- record, and append their results into a single two-column record.
--
-- The folds are combined through the 'Applicative' instance of 'FL.Fold'.
mergeDataFolds
  :: FL.Fold (F.Record d) (F.Record '[a]) -- ^ fold producing column @a@
  -> FL.Fold (F.Record d) (F.Record '[b]) -- ^ fold producing column @b@
  -> FL.Fold (F.Record d) (F.Record '[a, b])
mergeDataFolds aF bF = V.rappend <$> aF <*> bF