{-# LANGUAGE AllowAmbiguousTypes   #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE DeriveGeneric         #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE GADTs                 #-}
       {-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings     #-}
{-# LANGUAGE PolyKinds             #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TemplateHaskell       #-}
{-# LANGUAGE TypeApplications      #-}
{-# LANGUAGE TypeFamilies          #-}
{-# LANGUAGE TypeOperators         #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE UndecidableSuperClasses #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
{-|
Module      : Frames.Aggregation.Maybe
Description : A specialised Map/Reduce for aggregating one set of keys to a smaller one given some operation to merge data. 
Copyright   : (c) Adam Conner-Sax 2019
License     : BSD
Maintainer  : adam_conner_sax@yahoo.com
Stability   : experimental

Frames.Aggregation.Maybe contains types and functions to support a specific but common map/reduce operation.
Frequently, data is given with more specificity than required for downstream operations.
Perhaps an age is given in years and we only need to know the
age-band.  Assuming we know how to aggregagte data columns, we want to perform that aggregation on all the subsets required to
build the data-set with the simpler key, while perhaps leaving some other columns alone.  @aggregateFold@ does this.

This module specializes the general versions to the (Maybe :. ElField) intepretation functor since that is a frequent use case.
-}
module Frames.Aggregation.Maybe
  (
  -- * Type-alias for maps from one record key to another
    RecordKeyMap
    -- * Aggregation Function combinators
  , combineKeyAggregations
  , keyMap
    -- * aggregationFolds
  , aggregateAllFold
  , aggregateFold
  , mergeDataFolds
  )
where

import           Frames.MapReduce.General       ( RecGetFieldC(..)
                                                , RCastC(..)
                                                , IsoRec(..)
                                                )

import qualified Frames.Aggregation.General    as FA
import           Frames.Aggregation.General     ( RecordKeyMap )
import qualified Frames                        as F
import           Frames                         ( (:.) )
import qualified Frames.Melt                   as F
import qualified Data.Vinyl                    as V
import           Data.Vinyl                     ( ElField )
import qualified Data.Vinyl.TypeLevel          as V
import qualified Control.Foldl                 as FL

import           GHC.TypeLits                   ( Symbol )
import           Data.Kind                      ( Type )


-- | Combine 2 key aggregation functions over disjoint columns.
combineKeyAggregations
  :: forall (a :: [(Symbol, Type)]) b a' b' record
   . ( a F.⊆ (a V.++ b)
     , b F.⊆ (a V.++ b)
     , F.Disjoint a' b' ~ 'True
     , RCastC a (a V.++ b) record Maybe
     , RCastC b (a V.++ b) record Maybe
     , IsoRec a' record Maybe
     , IsoRec b' record Maybe
     , IsoRec (a' V.++ b') record Maybe
     )
  => RecordKeyMap record Maybe a a'
  -> RecordKeyMap record Maybe b b'
  -> RecordKeyMap record Maybe (a V.++ b) (a' V.++ b')
combineKeyAggregations :: RecordKeyMap record Maybe a a'
-> RecordKeyMap record Maybe b b'
-> RecordKeyMap record Maybe (a ++ b) (a' ++ b')
combineKeyAggregations = RecordKeyMap record Maybe a a'
-> RecordKeyMap record Maybe b b'
-> RecordKeyMap record Maybe (a ++ b) (a' ++ b')
forall (a :: [(Symbol, *)]) (b :: [(Symbol, *)])
       (a' :: [(Symbol, *)]) (b' :: [(Symbol, *)])
       (record :: ((Symbol, *) -> *) -> [(Symbol, *)] -> *) (f :: * -> *).
(a ⊆ (a ++ b), b ⊆ (a ++ b), Disjoint a' b' ~ 'True,
 RCastC a (a ++ b) record f, RCastC b (a ++ b) record f,
 IsoRec a' record f, IsoRec b' record f,
 IsoRec (a' ++ b') record f) =>
RecordKeyMap record f a a'
-> RecordKeyMap record f b b'
-> RecordKeyMap record f (a ++ b) (a' ++ b')
FA.combineKeyAggregations

-- | Promote an ordinary function @a -> b@ to a @RecordKeyMap aCol bCol@ where
-- @aCol@ holds values of type @a@ and @bCol@ holds values of type @b@.
keyMap
  :: forall a b record
   . ( V.KnownField a
     , V.KnownField b
     , RecGetFieldC a record Maybe '[a]
     , IsoRec '[b] record Maybe
     , Applicative Maybe
     )
  => (V.Snd a -> V.Snd b)
  -> RecordKeyMap record Maybe '[a] '[b]
keyMap :: (Snd a -> Snd b) -> RecordKeyMap record Maybe '[a] '[b]
keyMap = (Snd a -> Snd b) -> RecordKeyMap record Maybe '[a] '[b]
forall (a :: (Symbol, *)) (b :: (Symbol, *))
       (record :: ((Symbol, *) -> *) -> [(Symbol, *)] -> *) (f :: * -> *).
(KnownField a, KnownField b, RecGetFieldC a record f '[a],
 IsoRec '[b] record f, Applicative f) =>
(Snd a -> Snd b) -> RecordKeyMap record f '[a] '[b]
FA.keyMap

-- | Given some group keys in columns k,
-- some keys to aggregate over in columns ak,
-- some keys to aggregate into in (new) columns ak',
-- a (hopefully surjective) map from records of ak to records of ak',
-- and a fold over the data, in columns d, aggregating over the rows
-- where ak was distinct but ak' is not,
-- produce a fold to transform data keyed by k and ak to data keyed
-- by k and ak' with appropriate aggregations done in the d.
-- E.g., suppose you have voter turnout data for all 50 states in the US,
-- keyed by state and age of voter in years.  The data is two columns:
-- total votes cast and turnout as a percentage.
-- You want to aggregate the ages into two bands, over and under some age.
-- So your k is the state column, ak is the age column, ak' is a new column with
-- data type to indicate over/under.  The Fold has to sum over the total votes and
-- perform a weighted-sum over the percentages.
aggregateAllFold
  :: forall (ak :: [(Symbol, Type)]) ak' d record
   . ( (ak' V.++ d) F.⊆ ((ak V.++ d) V.++ ak')
     , ak F.⊆ (ak V.++ d)
     , ak' F.⊆ (ak' V.++ d)
     , d F.⊆ (ak' V.++ d)
     , Ord (record (Maybe :. ElField) ak')
     , Ord (record (Maybe :. ElField) ak)
     , RCastC (ak' V.++ d) ((ak V.++ d) V.++ ak') record Maybe
     , RCastC ak (ak V.++ d) record Maybe
     , RCastC ak' (ak' V.++ d) record Maybe
     , RCastC d (ak' V.++ d) record Maybe
     , IsoRec d record Maybe
     , IsoRec (ak V.++ d) record Maybe
     , IsoRec (ak' V.++ d) record Maybe
     , IsoRec ak' record Maybe
     , IsoRec ((ak V.++ d) V.++ ak') record Maybe
     )
  => RecordKeyMap record Maybe ak ak' -- ^ get aggregated key from key
  -> (FL.Fold (record (Maybe :. ElField) d) (record (Maybe :. ElField) d)) -- ^ aggregate data
  -> FL.Fold
       (record (Maybe :. ElField) (ak V.++ d))
       [(record (Maybe :. ElField) (ak' V.++ d))]
aggregateAllFold :: RecordKeyMap record Maybe ak ak'
-> Fold (record (Maybe :. ElField) d) (record (Maybe :. ElField) d)
-> Fold
     (record (Maybe :. ElField) (ak ++ d))
     [record (Maybe :. ElField) (ak' ++ d)]
aggregateAllFold = RecordKeyMap record Maybe ak ak'
-> Fold (record (Maybe :. ElField) d) (record (Maybe :. ElField) d)
-> Fold
     (record (Maybe :. ElField) (ak ++ d))
     [record (Maybe :. ElField) (ak' ++ d)]
forall (ak :: [(Symbol, *)]) (ak' :: [(Symbol, *)])
       (d :: [(Symbol, *)])
       (record :: ((Symbol, *) -> *) -> [(Symbol, *)] -> *) (f :: * -> *).
((ak' ++ d) ⊆ ((ak ++ d) ++ ak'), ak ⊆ (ak ++ d), ak' ⊆ (ak' ++ d),
 d ⊆ (ak' ++ d), Ord (record (f :. ElField) ak'),
 Ord (record (f :. ElField) ak),
 RCastC (ak' ++ d) ((ak ++ d) ++ ak') record f,
 RCastC ak (ak ++ d) record f, RCastC ak' (ak' ++ d) record f,
 RCastC d (ak' ++ d) record f, IsoRec d record f,
 IsoRec (ak ++ d) record f, IsoRec (ak' ++ d) record f,
 IsoRec ak' record f, IsoRec ((ak ++ d) ++ ak') record f) =>
RecordKeyMap record f ak ak'
-> Fold (record (f :. ElField) d) (record (f :. ElField) d)
-> Fold
     (record (f :. ElField) (ak ++ d))
     [record (f :. ElField) (ak' ++ d)]
FA.aggregateAllFold

-- | Aggregate key columns @ak@ into @ak'@ while leaving key columns @k@ along.
-- Allows aggregation over only some fields.  Will often require a typeapplication
-- to specify what @k@ is.
aggregateFold
  :: forall (k :: [(Symbol, Type)]) ak ak' d record
   . ( (ak' V.++ d) F.⊆ ((ak V.++ d) V.++ ak')
     , ak F.⊆ (ak V.++ d)
     , ak' F.⊆ (ak' V.++ d)
     , d F.⊆ (ak' V.++ d)
     , Ord (record (Maybe :. ElField) ak')
     , Ord (record (Maybe :. ElField) ak)
     , (k V.++ (ak' V.++ d)) ~ ((k V.++ ak') V.++ d)
     , Ord (record (Maybe :. ElField) k)
     , k F.⊆ ((k V.++ ak') V.++ d)
     , k F.⊆ ((k V.++ ak) V.++ d)
     , (ak V.++ d) F.⊆ ((k V.++ ak) V.++ d)
     , RCastC ak (ak V.++ d) record Maybe
     , RCastC ak' (ak' V.++ d) record Maybe
     , RCastC d (ak' V.++ d) record Maybe
     , RCastC k ((k V.++ ak) V.++ d) record Maybe
     , RCastC (ak V.++ d) ((k V.++ ak) V.++ d) record Maybe
     , RCastC (ak' V.++ d) ((ak V.++ d) V.++ ak') record Maybe
     , IsoRec k record Maybe
     , IsoRec d record Maybe
     , IsoRec ((k V.++ ak') V.++ d) record Maybe
     , IsoRec (ak V.++ d) record Maybe
     , IsoRec (ak' V.++ d) record Maybe
     , IsoRec ak' record Maybe
     , IsoRec ((ak V.++ d) V.++ ak') record Maybe
     )
  => RecordKeyMap record Maybe ak ak' -- ^ get aggregated key from key
  -> (FL.Fold (record (Maybe :. ElField) d) (record (Maybe :. ElField) d)) -- ^ aggregate data
  -> FL.Fold
       (record (Maybe :. ElField) (k V.++ ak V.++ d))
       [record (Maybe :. ElField) (k V.++ ak' V.++ d)]
aggregateFold :: RecordKeyMap record Maybe ak ak'
-> Fold (record (Maybe :. ElField) d) (record (Maybe :. ElField) d)
-> Fold
     (record (Maybe :. ElField) ((k ++ ak) ++ d))
     [record (Maybe :. ElField) ((k ++ ak') ++ d)]
aggregateFold = forall (k :: [(Symbol, *)]) (ak :: [(Symbol, *)])
       (ak' :: [(Symbol, *)]) (d :: [(Symbol, *)])
       (record :: ((Symbol, *) -> *) -> [(Symbol, *)] -> *) (f :: * -> *).
((ak' ++ d) ⊆ ((ak ++ d) ++ ak'), ak ⊆ (ak ++ d), ak' ⊆ (ak' ++ d),
 d ⊆ (ak' ++ d), Ord (record (f :. ElField) ak'),
 Ord (record (f :. ElField) ak),
 (k ++ (ak' ++ d)) ~ ((k ++ ak') ++ d),
 Ord (record (f :. ElField) k), k ⊆ ((k ++ ak') ++ d),
 k ⊆ ((k ++ ak) ++ d), (ak ++ d) ⊆ ((k ++ ak) ++ d),
 RCastC ak (ak ++ d) record f, RCastC ak' (ak' ++ d) record f,
 RCastC d (ak' ++ d) record f, RCastC k ((k ++ ak) ++ d) record f,
 RCastC (ak ++ d) ((k ++ ak) ++ d) record f,
 RCastC (ak' ++ d) ((ak ++ d) ++ ak') record f, IsoRec k record f,
 IsoRec d record f, IsoRec ((k ++ ak') ++ d) record f,
 IsoRec (ak ++ d) record f, IsoRec (ak' ++ d) record f,
 IsoRec ak' record f, IsoRec ((ak ++ d) ++ ak') record f) =>
RecordKeyMap record f ak ak'
-> Fold (record (f :. ElField) d) (record (f :. ElField) d)
-> Fold
     (record (f :. ElField) ((k ++ ak) ++ d))
     [record (f :. ElField) ((k ++ ak') ++ d)]
forall (ak :: [(Symbol, *)]) (ak' :: [(Symbol, *)])
       (d :: [(Symbol, *)])
       (record :: ((Symbol, *) -> *) -> [(Symbol, *)] -> *) (f :: * -> *).
((ak' ++ d) ⊆ ((ak ++ d) ++ ak'), ak ⊆ (ak ++ d), ak' ⊆ (ak' ++ d),
 d ⊆ (ak' ++ d), Ord (record (f :. ElField) ak'),
 Ord (record (f :. ElField) ak),
 (k ++ (ak' ++ d)) ~ ((k ++ ak') ++ d),
 Ord (record (f :. ElField) k), k ⊆ ((k ++ ak') ++ d),
 k ⊆ ((k ++ ak) ++ d), (ak ++ d) ⊆ ((k ++ ak) ++ d),
 RCastC ak (ak ++ d) record f, RCastC ak' (ak' ++ d) record f,
 RCastC d (ak' ++ d) record f, RCastC k ((k ++ ak) ++ d) record f,
 RCastC (ak ++ d) ((k ++ ak) ++ d) record f,
 RCastC (ak' ++ d) ((ak ++ d) ++ ak') record f, IsoRec k record f,
 IsoRec d record f, IsoRec ((k ++ ak') ++ d) record f,
 IsoRec (ak ++ d) record f, IsoRec (ak' ++ d) record f,
 IsoRec ak' record f, IsoRec ((ak ++ d) ++ ak') record f) =>
RecordKeyMap record f ak ak'
-> Fold (record (f :. ElField) d) (record (f :. ElField) d)
-> Fold
     (record (f :. ElField) ((k ++ ak) ++ d))
     [record (f :. ElField) ((k ++ ak') ++ d)]
FA.aggregateFold @k

mergeDataFolds
  :: forall (a :: (Symbol, Type)) b d record
   . ( IsoRec '[b] record Maybe
     , IsoRec '[a] record Maybe
     , IsoRec '[a, b] record Maybe
     )
  => FL.Fold (record (Maybe :. ElField) d) (record (Maybe :. ElField) '[a])
  -> FL.Fold
       (record (Maybe :. ElField) d)
       (record (Maybe :. ElField) '[b])
  -> FL.Fold
       (record (Maybe :. ElField) d)
       (record (Maybe :. ElField) '[a, b])
mergeDataFolds :: Fold (record (Maybe :. ElField) d) (record (Maybe :. ElField) '[a])
-> Fold
     (record (Maybe :. ElField) d) (record (Maybe :. ElField) '[b])
-> Fold
     (record (Maybe :. ElField) d) (record (Maybe :. ElField) '[a, b])
mergeDataFolds = Fold (record (Maybe :. ElField) d) (record (Maybe :. ElField) '[a])
-> Fold
     (record (Maybe :. ElField) d) (record (Maybe :. ElField) '[b])
-> Fold
     (record (Maybe :. ElField) d) (record (Maybe :. ElField) '[a, b])
forall (a :: (Symbol, *)) (b :: (Symbol, *)) (d :: [(Symbol, *)])
       (record :: ((Symbol, *) -> *) -> [(Symbol, *)] -> *) (f :: * -> *).
(IsoRec '[b] record f, IsoRec '[a] record f,
 IsoRec '[a, b] record f) =>
Fold (record (f :. ElField) d) (record (f :. ElField) '[a])
-> Fold (record (f :. ElField) d) (record (f :. ElField) '[b])
-> Fold (record (f :. ElField) d) (record (f :. ElField) '[a, b])
FA.mergeDataFolds