{-# LANGUAGE AllowAmbiguousTypes   #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE DeriveGeneric         #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE GADTs                 #-}
       {-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings     #-}
{-# LANGUAGE PolyKinds             #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TemplateHaskell       #-}
{-# LANGUAGE TypeApplications      #-}
{-# LANGUAGE TypeFamilies          #-}
{-# LANGUAGE TypeOperators         #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE UndecidableSuperClasses #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
{-|
Module      : Frames.Aggregation.General
Description : A specialised Map/Reduce for aggregating one set of keys to a smaller one given some operation to merge data. 
Copyright   : (c) Adam Conner-Sax 2019
License     : BSD
Maintainer  : adam_conner_sax@yahoo.com
Stability   : experimental

Frames.Aggregation.General contains types and functions to support a specific map/reduce operation.  Frequently, data is given
with more specificity than required for downstream operations.  Perhaps an age is given in years and we only need to know the
age-band.  Assuming we know how to aggregate data columns, we want to perform that aggregation on all the subsets required to
build the data-set with the simpler key, while perhaps leaving some other columns alone.  @aggregateFold@ does this.
-}
module Frames.Aggregation.General
  (
  -- * Type-alias for maps from one record key to another
    RecordKeyMap
    -- * Aggregation Function combinators
  , combineKeyAggregations
  , keyMap
    -- * aggregationFolds
  , aggregateAllFold
  , aggregateFold
  , mergeDataFolds
  )
where

import           Frames.MapReduce.General       ( RecGetFieldC(..)
                                                , RCastC(..)
                                                , IsoRec(..)
                                                , isoRecAppend
                                                )

import qualified Control.MapReduce             as MR
import qualified Frames.MapReduce.General      as FMR

import qualified Control.Foldl                 as FL

import qualified Frames                        as F
import qualified Frames.Melt                   as F
import qualified Data.Vinyl                    as V
import qualified Data.Vinyl.TypeLevel          as V

import           Data.Vinyl                     ( ElField )
import qualified Data.Vinyl.Functor            as V
import           Frames                         ( (:.) )

import           GHC.TypeLits                   ( Symbol )
import           Data.Kind                      ( Type )

-- | Type-alias for key aggregation functions: a function from a record with
-- columns @k@ to a record with columns @k'@.  Both records are built with the
-- same @record@ type constructor and the same field interpretation
-- @f :. ElField@, so only the column list changes.
type RecordKeyMap record f k k' = record (f :. ElField) k -> record (f :. ElField) k'

-- | Run two key aggregation functions side by side.
--
-- Given a map on the columns @a@ and a map on the columns @b@, build a map on
-- the columns @a ++ b@.  The input record is narrowed once to @a@ and once to
-- @b@ (via 'rcastF'), each map is applied to its own piece, and the two
-- results are appended with 'isoRecAppend', so the output columns are
-- @a' ++ b'@ in that order.  The output column sets @a'@ and @b'@ must be
-- disjoint.
combineKeyAggregations
  :: forall (a :: [(Symbol, Type)]) b a' b' record f
   . ( a F.⊆ (a V.++ b)
     , b F.⊆ (a V.++ b)
     , F.Disjoint a' b' ~ 'True
     , RCastC a (a V.++ b) record f
     , RCastC b (a V.++ b) record f
     , IsoRec a' record f
     , IsoRec b' record f
     , IsoRec (a' V.++ b') record f
     )
  => RecordKeyMap record f a a'
  -> RecordKeyMap record f b b'
  -> RecordKeyMap record f (a V.++ b) (a' V.++ b')
combineKeyAggregations aToa' bTob' r = isoRecAppend newFromA newFromB
 where
  -- narrow the input to the @a@ columns and map them to @a'@
  newFromA = aToa' (rcastF r)
  -- narrow the input to the @b@ columns and map them to @b'@
  newFromB = bTob' (rcastF r)

-- | Lift a plain function on field values to a 'RecordKeyMap' between
-- single-column records.
--
-- The one field @a@ of the input record is fetched with 'rgetF', the function
-- is applied to its value underneath the functor @f@, the result is re-wrapped
-- as a field @b@, and a one-column record is rebuilt with 'fromRec'.
keyMap
  :: forall a b record f
   . ( V.KnownField a
     , V.KnownField b
     , RecGetFieldC a record f '[a]
     , IsoRec '[b] record f
     , Applicative f
     )
  => (V.Snd a -> V.Snd b)
  -> RecordKeyMap record f '[a] '[b]
keyMap g r = fromRec (mappedField V.:& V.RNil)
 where
  -- peel off the Compose wrapper, map inside @f@, and wrap it back up
  mappedField =
    V.Compose (V.Field . g . V.getField <$> V.getCompose (rgetF @a r))

-- | Re-key a collection of rows from key columns @ak@ to (new) key columns
-- @ak'@, merging the data columns @d@ of rows that end up with the same
-- new key.
--
-- Arguments:
--
-- * a (hopefully surjective) map from records of @ak@ to records of @ak'@;
--
-- * a fold over the data columns @d@, used to merge the rows where @ak@ was
--   distinct but @ak'@ is not.
--
-- The result is a fold from rows keyed by @ak@ to a list of rows keyed by
-- @ak'@.  It is a map-reduce ('MR.mapReduceFold') in three steps:
--
-- 1. unpack: append the new key to each row, then cast to @ak' ++ d@, which
--    drops the old key columns;
--
-- 2. assign: @FMR.assignKeysAndData \@ak' \@d@ (presumably splitting each row
--    into its @ak'@ key and its @d@ data);
--
-- 3. reduce: @FMR.foldAndAddKey@ applied to the data fold (presumably folding
--    each group's data and re-attaching the group's key).
--
-- E.g., suppose you have voter turnout data keyed by age of voter in years.
-- The data is two columns: total votes cast and turnout as a percentage.
-- You want to aggregate the ages into two bands, over and under some age.
-- So @ak@ is the age column and @ak'@ is a new column with a data type to
-- indicate over/under.  The fold has to sum over the total votes and perform
-- a weighted-sum over the percentages.  To keep further columns (e.g. state)
-- fixed while doing this, see 'aggregateFold'.
aggregateAllFold
  :: forall (ak :: [(Symbol, Type)]) ak' d record f
   . ( (ak' V.++ d) F.⊆ ((ak V.++ d) V.++ ak')
     , ak F.⊆ (ak V.++ d)
     , ak' F.⊆ (ak' V.++ d)
     , d F.⊆ (ak' V.++ d)
     , Ord (record (f :. ElField) ak')
     , Ord (record (f :. ElField) ak)
     , RCastC (ak' V.++ d) ((ak V.++ d) V.++ ak') record f
     , RCastC ak (ak V.++ d) record f
     , RCastC ak' (ak' V.++ d) record f
     , RCastC d (ak' V.++ d) record f
     , IsoRec d record f
     , IsoRec (ak V.++ d) record f
     , IsoRec (ak' V.++ d) record f
     , IsoRec ak' record f
     , IsoRec ((ak V.++ d) V.++ ak') record f
     )
  => RecordKeyMap record f ak ak' -- ^ get aggregated key from key
  -> (FL.Fold (record (f :. ElField) d) (record (f :. ElField) d)) -- ^ aggregate data
  -> FL.Fold
       (record (f :. ElField) (ak V.++ d))
       [(record (f :. ElField) (ak' V.++ d))]
aggregateAllFold toAggKey aggDataF = MR.mapReduceFold
  rekey
  byNewKey
  (FMR.foldAndAddKey aggDataF)
 where
  -- add the new key columns to a row, then cast away the old ones
  rekey
    :: MR.Unpack
         (record (f :. ElField) (ak V.++ d))
         (record (f :. ElField) (ak' V.++ d))
  rekey = MR.Unpack
    $ \row -> [rcastF (isoRecAppend row (toAggKey (rcastF row)))]
  -- group the re-keyed rows: @ak'@ columns are the key, @d@ columns the data
  byNewKey = FMR.assignKeysAndData @ak' @d

-- | Aggregate key columns @ak@ into @ak'@ while leaving key columns @k@ alone.
-- Allows aggregation over only some fields.  Will often require a type
-- application to specify what @k@ is.
--
-- The rows are first grouped with @FMR.assignKeysAndData \@k \@(ak ++ d)@
-- (no unpacking step).  Each group is then reduced with 'aggregateAllFold',
-- which is the same fold for every group (the group key is ignored when
-- choosing it).  @FMR.makeRecsWithKey id@ wraps that reduction (presumably
-- re-attaching the @k@ columns to every row it produces), and
-- 'MR.concatFold' flattens the per-group results into a single list.
aggregateFold
  :: forall (k :: [(Symbol, Type)]) ak ak' d record f
   . ( (ak' V.++ d) F.⊆ ((ak V.++ d) V.++ ak')
     , ak F.⊆ (ak V.++ d)
     , ak' F.⊆ (ak' V.++ d)
     , d F.⊆ (ak' V.++ d)
     , Ord (record (f :. ElField) ak')
     , Ord (record (f :. ElField) ak)
     , (k V.++ (ak' V.++ d)) ~ ((k V.++ ak') V.++ d)
     , Ord (record (f :. ElField) k)
     , k F.⊆ ((k V.++ ak') V.++ d)
     , k F.⊆ ((k V.++ ak) V.++ d)
     , (ak V.++ d) F.⊆ ((k V.++ ak) V.++ d)
     , RCastC ak (ak V.++ d) record f
     , RCastC ak' (ak' V.++ d) record f
     , RCastC d (ak' V.++ d) record f
     , RCastC k ((k V.++ ak) V.++ d) record f
     , RCastC (ak V.++ d) ((k V.++ ak) V.++ d) record f
     , RCastC (ak' V.++ d) ((ak V.++ d) V.++ ak') record f
     , IsoRec k record f
     , IsoRec d record f
     , IsoRec ((k V.++ ak') V.++ d) record f
     , IsoRec (ak V.++ d) record f
     , IsoRec (ak' V.++ d) record f
     , IsoRec ak' record f
     , IsoRec ((ak V.++ d) V.++ ak') record f
     )
  => RecordKeyMap record f ak ak' -- ^ get aggregated key from key
  -> (FL.Fold (record (f :. ElField) d) (record (f :. ElField) d)) -- ^ aggregate data
  -> FL.Fold
       (record (f :. ElField) (k V.++ ak V.++ d))
       [record (f :. ElField) (k V.++ ak' V.++ d)]
aggregateFold keyAgg aggDataF =
  MR.concatFold (MR.mapReduceFold MR.noUnpack groupByK reduceGroup)
 where
  -- the untouched columns @k@ are the group key; everything else is data
  groupByK       = FMR.assignKeysAndData @k @(ak V.++ d)
  -- aggregate @ak@ into @ak'@ inside one group; independent of the group key
  withinGroup    = aggregateAllFold keyAgg aggDataF
  reduceGroup    = FMR.makeRecsWithKey id (MR.ReduceFold (const withinGroup))


-- | Combine two folds over the same input rows, one producing a single
-- column @a@ and the other a single column @b@, into one fold producing the
-- two-column record @[a, b]@.  Both folds are run over the same input and
-- their results are joined with 'isoRecAppend'.
mergeDataFolds
  :: forall (a :: (Symbol, Type)) b d record f
   . (IsoRec '[b] record f, IsoRec '[a] record f, IsoRec '[a, b] record f)
  => FL.Fold (record (f :. ElField) d) (record (f :. ElField) '[a])
  -> FL.Fold (record (f :. ElField) d) (record (f :. ElField) '[b])
  -> FL.Fold (record (f :. ElField) d) (record (f :. ElField) '[a, b])
mergeDataFolds aF bF = fmap isoRecAppend aF <*> bF