{-# LANGUAGE DataKinds             #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TypeOperators         #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE PolyKinds             #-}
{-# LANGUAGE TypeFamilies          #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE AllowAmbiguousTypes   #-}
{-# OPTIONS_GHC -fwarn-incomplete-patterns #-}
module Control.MapReduce.Core
  (
    
    
    Unpack(..)
  , Assign(..)
  , Reduce(..)
  
  , UnpackM(..)
  , AssignM(..)
  , ReduceM(..)
  
  , generalizeUnpack
  , generalizeAssign
  , generalizeReduce
  
  , functionToFold
  , functionToFoldM
  , postMapM
  
  , Fold
  , FoldM
  , fold
  , foldM
  )
where
import qualified Control.Foldl                 as FL
import           Control.Foldl                  ( Fold
                                                , FoldM
                                                , fold
                                                , foldM
                                                ) 
import qualified Data.Profunctor               as P
import           Data.Profunctor                ( Profunctor )
import qualified Data.Sequence                 as S
import           Control.Arrow                  ( second )
-- | A step that turns one input of type @x@ into results of type @y@,
-- either by filtering or by expanding into a container.
data Unpack x y where
  -- | Keep or discard an input according to a predicate; the result type
  -- is the same as the input type.
  Filter :: (x -> Bool) -> Unpack x x 
  -- | Map an input to a 'Traversable' container of results.
  Unpack :: Traversable g => (x -> g y) -> Unpack x y 
-- | Wrap a value in 'Just' when the flag is 'True'; give 'Nothing' otherwise.
-- The value itself is never forced.
boolToMaybe :: Bool -> a -> Maybe a
boolToMaybe True  v = Just v
boolToMaybe False _ = Nothing
-- | Test a value with a predicate: 'Just' the value when the predicate
-- holds, 'Nothing' when it does not.
ifToMaybe :: (x -> Bool) -> x -> Maybe x
ifToMaybe keep item
  | keep item = Just item
  | otherwise = Nothing
-- | Map a function over every result an 'Unpack' produces.
-- A 'Filter' is re-expressed as an 'Unpack' into 'Maybe', because mapping
-- may change the result type away from the input type.
instance Functor (Unpack x) where
  fmap h unpacker = case unpacker of
    Filter keep   -> Unpack (\x -> h <$> ifToMaybe keep x)
    Unpack expand -> Unpack (\x -> h <$> expand x)
  {-# INLINABLE fmap #-}
-- | Pre-process the input with one function and post-process every result
-- with another. The outcome is always built with the 'Unpack' constructor;
-- a 'Filter' becomes an 'Unpack' into 'Maybe'.
instance P.Profunctor Unpack where
  dimap pre post unpacker = case unpacker of
    Filter keep   -> Unpack (\a -> post <$> ifToMaybe keep (pre a))
    Unpack expand -> Unpack (\a -> post <$> expand (pre a))
  {-# INLINABLE dimap #-}
-- | Monadic counterpart of 'Unpack': the predicate or the expansion runs
-- in the monad @m@.
data UnpackM m x y where
  -- | Keep or discard an input according to a monadic predicate; the result
  -- type is the same as the input type.
  FilterM :: Monad m => (x -> m Bool) -> UnpackM m x x
  -- | Map an input, within @m@, to a 'Traversable' container of results.
  UnpackM :: (Monad m, Traversable g) => (x -> m (g y)) -> UnpackM m x y
-- | Monadic version of 'ifToMaybe': run the predicate on the value and wrap
-- the value in 'Just' when the predicate's result is 'True', 'Nothing' otherwise.
ifToMaybeM :: Monad m => (x -> m Bool) -> x -> m (Maybe x)
ifToMaybeM keepM item = (\ok -> boolToMaybe ok item) <$> keepM item
-- | Map a function over every result an 'UnpackM' produces.
-- A 'FilterM' is re-expressed as an 'UnpackM' into 'Maybe'.
instance Functor (UnpackM m x) where
  fmap h unpacker = case unpacker of
    FilterM keepM   -> UnpackM (\x -> fmap h <$> ifToMaybeM keepM x)
    UnpackM expandM -> UnpackM (\x -> fmap h <$> expandM x)
  {-# INLINABLE fmap #-}
-- | Pre-process the input with one function and post-process every result
-- with another. The outcome is always built with the 'UnpackM' constructor;
-- a 'FilterM' becomes an 'UnpackM' into 'Maybe'.
instance Profunctor (UnpackM m) where
  dimap pre post unpacker = case unpacker of
    FilterM keepM   -> UnpackM (\a -> fmap post <$> ifToMaybeM keepM (pre a))
    UnpackM expandM -> UnpackM (\a -> fmap post <$> expandM (pre a))
  {-# INLINABLE dimap #-}
-- | Lift a pure 'Unpack' into any monad by wrapping the result of its
-- function with 'return'. 'Filter' becomes 'FilterM' and 'Unpack' becomes 'UnpackM'.
generalizeUnpack :: Monad m => Unpack x y -> UnpackM m x y
generalizeUnpack unpacker = case unpacker of
  Filter keep   -> FilterM (\x -> return (keep x))
  Unpack expand -> UnpackM (\x -> return (expand x))
{-# INLINABLE generalizeUnpack #-}
-- | Wraps a function that maps a @y@ to a pair of a key @k@ and a value @c@.
data Assign k y c where
  -- | The wrapped function from an item to its @(key, value)@ pair.
  Assign :: (y -> (k, c)) -> Assign k y c
-- | Map a function over the value half of the assigned pair; the key is
-- left as it is.
instance Functor (Assign k y) where
  fmap f (Assign toPair) = Assign (\y -> second f (toPair y))
  {-# INLINABLE fmap #-}
-- | Pre-process the item before assignment and post-process the value half
-- of the resulting pair; the key is left as it is.
instance Profunctor (Assign k) where
  dimap pre post (Assign toPair) = Assign (\a -> second post (toPair (pre a)))
  {-# INLINABLE dimap #-}
-- | Monadic counterpart of 'Assign': the key/value pair is produced in
-- the monad @m@.
data AssignM m k y c where
  -- | The wrapped monadic function from an item to its @(key, value)@ pair.
  AssignM :: Monad m => (y -> m (k, c)) -> AssignM m k y c
-- | Map a function over the value half of the monadically produced pair;
-- the key is left as it is.
instance Functor (AssignM m k y) where
  fmap f (AssignM toPairM) = AssignM (\y -> second f <$> toPairM y)
  {-# INLINABLE fmap #-}
-- | Pre-process the item before assignment and post-process the value half
-- of the monadically produced pair; the key is left as it is.
instance Profunctor (AssignM m k) where
  dimap pre post (AssignM toPairM) =
    AssignM (\a -> second post <$> toPairM (pre a))
  {-# INLINABLE dimap #-}
-- | Lift a pure 'Assign' into any monad by wrapping the produced pair
-- with 'return'.
generalizeAssign :: Monad m => Assign k y c -> AssignM m k y c
generalizeAssign (Assign toPair) = AssignM (\y -> return (toPair y))
{-# INLINABLE generalizeAssign #-}
-- | A key-dependent reduction of a collection of @x@ to a result @d@.
data Reduce k x d where
  -- | For each key, a function that works on any container which is both
  -- 'Foldable' and a 'Functor'.
  Reduce :: (k -> (forall h. (Foldable h, Functor h) => (h x -> d))) -> Reduce k x d
  -- | For each key, a 'FL.Fold' over the items.
  ReduceFold :: (k -> FL.Fold x d) -> Reduce k x d
-- | Monadic counterpart of 'Reduce': the result is produced in the monad @m@.
data ReduceM m k x d where
  -- | For each key, a monadic function that works on any container which is
  -- both 'Foldable' and a 'Functor'.
  ReduceM :: Monad m => (k -> (forall h. (Foldable h, Functor h) => (h x -> m d))) -> ReduceM m k x d
  -- | For each key, a monadic 'FL.FoldM' over the items.
  ReduceFoldM :: Monad m => (k -> FL.FoldM m x d) -> ReduceM m k x d
-- | Post-process the result of a reduction. The constructor is preserved:
-- a 'Reduce' stays a 'Reduce' and a 'ReduceFold' stays a 'ReduceFold'.
instance Functor (Reduce k x) where
  fmap f reducer = case reducer of
    Reduce byKey     -> Reduce (\key xs -> f (byKey key xs))
    ReduceFold byKey -> ReduceFold (\key -> f <$> byKey key)
  {-# INLINABLE fmap #-}
-- | Post-process the monadic result of a reduction. The constructor is
-- preserved: 'ReduceM' stays 'ReduceM' and 'ReduceFoldM' stays 'ReduceFoldM'.
instance Functor (ReduceM m k x) where
  fmap f reducer = case reducer of
    ReduceM byKey     -> ReduceM (\key xs -> f <$> byKey key xs)
    ReduceFoldM byKey -> ReduceFoldM (\key -> f <$> byKey key)
  {-# INLINABLE fmap #-}
-- | Pre-process each item and post-process the result.
-- For 'Reduce' the item function is mapped over the container before the
-- reduction runs; for 'ReduceFold' the work is delegated to the fold's own 'P.dimap'.
instance Profunctor (Reduce k) where
  dimap pre post reducer = case reducer of
    Reduce byKey     -> Reduce (\key xs -> post (byKey key (fmap pre xs)))
    ReduceFold byKey -> ReduceFold (\key -> P.dimap pre post (byKey key))
  {-# INLINABLE dimap #-}
-- | Pre-process each item and post-process the monadic result.
-- For 'ReduceM' the item function is mapped over the container before the
-- reduction runs; for 'ReduceFoldM' the work is delegated to the fold's own 'P.dimap'.
instance Profunctor (ReduceM m k) where
  dimap pre post reducer = case reducer of
    ReduceM byKey     -> ReduceM (\key xs -> post <$> byKey key (pre <$> xs))
    ReduceFoldM byKey -> ReduceFoldM (\key -> P.dimap pre post (byKey key))
  {-# INLINABLE dimap #-}
-- | Combine reductions key-by-key over the same items.
-- Two 'ReduceFold's combine into a 'ReduceFold'. Every other combination
-- yields a 'Reduce', with any fold involved being run over the container
-- via 'FL.fold'.
instance Applicative (Reduce k x) where
  -- A constant fold that ignores both the key and the items.
  pure x = ReduceFold (\_ -> pure x)
  {-# INLINABLE pure #-}
  rf <*> rx = case rf of
    Reduce r1 -> case rx of
      Reduce r2     -> Reduce (\key xs -> r1 key xs (r2 key xs))
      ReduceFold f2 -> Reduce (\key xs -> r1 key xs (FL.fold (f2 key) xs))
    ReduceFold f1 -> case rx of
      ReduceFold f2 -> ReduceFold (\key -> f1 key <*> f2 key)
      Reduce r2     -> Reduce (\key xs -> FL.fold (f1 key) xs (r2 key xs))
  {-# INLINABLE (<*>) #-}
-- | Combine monadic reductions key-by-key over the same items.
-- Two 'ReduceFoldM's combine into a 'ReduceFoldM'. Every other combination
-- yields a 'ReduceM', with any fold involved being run over the container
-- via 'FL.foldM'.
instance Monad m => Applicative (ReduceM m k x) where
  -- Built as a constant fold, mirroring 'pure' for 'Reduce'. This keeps
  -- @pure f \<*\> ReduceFoldM g@ in fold form (as @fmap f (ReduceFoldM g)@
  -- already is) instead of collapsing it to the 'ReduceM' constructor.
  pure x = ReduceFoldM $ const (pure x)
  {-# INLINABLE pure #-}
  -- In the 'ReduceM' cases the outer '<*>' is the function applicative
  -- (sharing the container argument) and the lifted '<*>' is the one for @m@.
  ReduceM r1 <*> ReduceM r2 = ReduceM $ \k -> (<*>) <$> r1 k <*> r2 k
  ReduceFoldM f1 <*> ReduceFoldM f2 = ReduceFoldM $ \k -> f1 k <*> f2 k
  ReduceM r1 <*> ReduceFoldM f2 = ReduceM $ \k -> (<*>) <$> r1 k <*> FL.foldM (f2 k)
  ReduceFoldM f1 <*> ReduceM r2 = ReduceM $ \k -> (<*>) <$> FL.foldM (f1 k) <*> r2 k
  {-# INLINABLE (<*>) #-}
-- | Lift a pure 'Reduce' into any monad. A 'Reduce' becomes a 'ReduceM'
-- whose result is wrapped with 'return'; a 'ReduceFold' becomes a
-- 'ReduceFoldM' via 'FL.generalize'.
generalizeReduce :: Monad m => Reduce k x d -> ReduceM m k x d
generalizeReduce reducer = case reducer of
  Reduce byKey     -> ReduceM (\key xs -> return (byKey key xs))
  ReduceFold byKey -> ReduceFoldM (\key -> FL.generalize (byKey key))
{-# INLINABLE generalizeReduce #-}
-- | Feed the final result of a monadic fold through a further monadic
-- function. The step and initial-state actions are reused unchanged; only
-- the extraction action is extended.
postMapM :: Monad m => (a -> m b) -> FL.FoldM m x a -> FL.FoldM m x b
postMapM f (FL.FoldM step begin done) =
  FL.FoldM step begin (\acc -> done acc >>= f)
{-# INLINABLE postMapM #-}
-- | A fold that collects every item into a 'S.Seq', appending each new item
-- on the right so the input order is kept.
seqF :: FL.Fold a (S.Seq a)
seqF = FL.Fold (\collected item -> collected S.|> item) S.empty id
-- | Turn a function on containers of @x@ into a 'FL.Fold'.
-- The items are first collected with 'seqF' and the function is then
-- applied to the collected sequence.
--
-- The argument may rely on both 'Foldable' and 'Functor', which is the same
-- constraint set the 'Reduce' constructor uses, so a function taken out of a
-- 'Reduce' can be passed here directly. A function that needs only
-- 'Foldable' is still accepted.
functionToFold
  :: (forall h . (Foldable h, Functor h) => h x -> a) -> FL.Fold x a
functionToFold f = fmap f seqF
-- | Turn a monadic function on containers of @x@ into a 'FL.FoldM'.
-- The items are first collected with 'seqF' (generalized to @m@) and the
-- function is then run on the collected sequence via 'postMapM'.
--
-- The argument may rely on both 'Foldable' and 'Functor', which is the same
-- constraint set the 'ReduceM' constructor uses, so a function taken out of a
-- 'ReduceM' can be passed here directly. A function that needs only
-- 'Foldable' is still accepted.
functionToFoldM
  :: Monad m
  => (forall h . (Foldable h, Functor h) => h x -> m a)
  -> FL.FoldM m x a
functionToFoldM f = postMapM f $ FL.generalize seqF
{-# INLINABLE functionToFoldM #-}