{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
{-|
Module      : Frames.Streamly.Transform
Description : Support for transformations on Frames using streamly streams as an intermediate structure.
Copyright   : (c) Adam Conner-Sax 2020
License     : BSD-3-Clause
Maintainer  : adam_conner_sax@yahoo.com
Stability   : experimental

This module adds some functions for using streamly to make transformations on Frames that are either:

1. Like filtering or some one-to-many mapping of rows,
whic makes the efficient memory layout useless in-flight.  Frames already implements 'filterFrame'
this way, using Pipes and the ST monad.

2. Expensive enough per row that the concurrency features of streams become worthwhile.
-}
module Frames.Streamly.Transform
    ( transform
    , filter
    , concurrentMapM
    , mapMaybe
    , concurrentMapMaybeM
    )
where

import qualified Frames.Streamly.InCore as FS
import Prelude hiding (filter)

import qualified Streamly                               as Streamly
import qualified Streamly.Prelude                       as Streamly
import           Streamly                                ( IsStream )
import qualified Control.Monad.Primitive                as Prim
import Control.Monad.ST (runST)
import qualified Frames                                 as Frames
import qualified Frames.InCore                          as Frames




-- | Use streamly to transform a frame.
transform ::
  forall t as bs m.
  (IsStream t
  , Prim.PrimMonad m
  , Frames.RecVec as
  , Frames.RecVec bs
  )
  => (t m (Frames.Record as) -> Streamly.SerialT m (Frames.Record bs)) -> Frames.FrameRec as -> m (Frames.FrameRec bs)
transform f = FS.inCoreAoS . f . Streamly.fromFoldable
{-# INLINE transform #-}

-- | Filter using streamly 
filter :: (Frames.RecVec as) => (Frames.Record as -> Bool) -> Frames.FrameRec as -> Frames.FrameRec as
filter f frame = runST $ transform (Streamly.serially . Streamly.filter f) frame
{-# INLINE filter #-}

-- | map using speculative streams (concurrency that preserves ordering of results).
concurrentMapM :: (Prim.PrimMonad m
                  , Streamly.MonadAsync m
                  , Frames.RecVec as
                  , Frames.RecVec bs
                  ) => (Frames.Record as -> m (Frames.Record bs)) -> Frames.FrameRec as -> m (Frames.FrameRec bs)
concurrentMapM f = transform (Streamly.aheadly . Streamly.mapM f)
{-# INLINE concurrentMapM #-}

-- | mapMaybe using streamly
mapMaybe :: (Frames.RecVec as
                      , Frames.RecVec bs
                      ) => (Frames.Record as -> Maybe (Frames.Record bs)) -> Frames.FrameRec as -> Frames.FrameRec bs
mapMaybe f frame = runST $ transform (Streamly.aheadly . Streamly.mapMaybe f) frame
{-# INLINE mapMaybe #-}

-- | mapMaybeM using speculative streams (concurrency that preserves ordering of results).
concurrentMapMaybeM :: (Prim.PrimMonad m
                       , Streamly.MonadAsync m
                       , Frames.RecVec as
                       , Frames.RecVec bs
                       ) => (Frames.Record as -> m (Maybe (Frames.Record bs))) -> Frames.FrameRec as -> m (Frames.FrameRec bs)
concurrentMapMaybeM f = transform (Streamly.aheadly . Streamly.mapMaybeM f)
{-# INLINE concurrentMapMaybeM #-}