{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
{-|
Module      : Frames.Streamly.Transform
Description : Support for transformations on Frames using streamly streams as an intermediate structure.
Copyright   : (c) Adam Conner-Sax 2020
License     : BSD-3-Clause
Maintainer  : adam_conner_sax@yahoo.com
Stability   : experimental

This module adds some functions for using streamly to make transformations on Frames that are either:

1. Operations, like filtering or some one-to-many mapping of rows,
which make the efficient memory layout useless in-flight.  Frames already implements 'filterFrame'
this way, using Pipes and the ST monad.

2. Expensive enough per row that the concurrency features of streams become worthwhile.
-}
module Frames.Streamly.Transform
    ( transform
    , filter
    , concurrentMapM
    , mapMaybe
    , concurrentMapMaybeM
    )
where

import qualified Frames.Streamly.InCore as FS
import Prelude hiding (filter)

import qualified Streamly                               as Streamly
import qualified Streamly.Prelude                       as Streamly
import           Streamly                                ( IsStream )
import qualified Control.Monad.Primitive                as Prim
import Control.Monad.ST (runST)
import qualified Frames                                 as Frames
import qualified Frames.InCore                          as Frames




-- | Use streamly to transform a frame.
--
-- The rows of the input frame are fed into a stream with
-- @Streamly.fromFoldable@, the supplied stream transformation is applied, and
-- the resulting serial stream of rows is collected back into a frame with
-- @FS.inCoreAoS@.
--
-- The stream type @t@ of the input is chosen by the caller (via the argument
-- function), but the transformation must hand back a @SerialT@ stream, since
-- that is what the frame builder consumes.
transform ::
  forall t as bs m.
  (IsStream t
  , Prim.PrimMonad m
  , Frames.RecVec as
  , Frames.RecVec bs
  )
  => (t m (Frames.Record as) -> Streamly.SerialT m (Frames.Record bs)) -> Frames.FrameRec as -> m (Frames.FrameRec bs)
transform f = FS.inCoreAoS . f . Streamly.fromFoldable
{-# INLINE transform #-}

-- | Filter using streamly.
--
-- Keeps only the rows of the frame for which the predicate returns 'True'.
-- The work is done by 'transform' over a serial stream in the 'ST' monad and
-- extracted with 'runST', so the function is pure from the caller's side.
filter :: (Frames.RecVec as) => (Frames.Record as -> Bool) -> Frames.FrameRec as -> Frames.FrameRec as
filter f frame = runST $ transform (Streamly.serially . Streamly.filter f) frame
{-# INLINE filter #-}

-- | map using speculative streams (concurrency that preserves ordering of results).
--
-- Applies the monadic function to every row of the frame. The rows are
-- streamed as an @AheadT@ stream, mapped with @Streamly.mapM@, converted to a
-- serial stream with @Streamly.aheadly@ and collected into the result frame
-- by 'transform'.
concurrentMapM :: (Prim.PrimMonad m
                  , Streamly.MonadAsync m
                  , Frames.RecVec as
                  , Frames.RecVec bs
                  ) => (Frames.Record as -> m (Frames.Record bs)) -> Frames.FrameRec as -> m (Frames.FrameRec bs)
concurrentMapM f = transform (Streamly.aheadly . Streamly.mapM f)
{-# INLINE concurrentMapM #-}

-- | mapMaybe using streamly.
--
-- Applies the function to every row of the frame, dropping rows for which it
-- returns 'Nothing' and keeping the contents of each 'Just'. The work is done
-- by 'transform' in the 'ST' monad and extracted with 'runST', so the
-- function is pure from the caller's side.
mapMaybe :: (Frames.RecVec as
                      , Frames.RecVec bs
                      ) => (Frames.Record as -> Maybe (Frames.Record bs)) -> Frames.FrameRec as -> Frames.FrameRec bs
-- NOTE(review): the stream is typed as @AheadT@ here although the function is
-- pure and runs in 'ST'; 'filter' uses @Streamly.serially@ for the analogous
-- job. Presumably @serially@ would be equivalent -- confirm before changing.
mapMaybe f frame = runST $ transform (Streamly.aheadly . Streamly.mapMaybe f) frame
{-# INLINE mapMaybe #-}

-- | mapMaybeM using speculative streams (concurrency that preserves ordering of results).
--
-- Applies the monadic function to every row of the frame, dropping rows for
-- which it yields 'Nothing' and keeping the contents of each 'Just'. The rows
-- are streamed as an @AheadT@ stream, processed with @Streamly.mapMaybeM@,
-- converted to a serial stream with @Streamly.aheadly@ and collected into the
-- result frame by 'transform'.
concurrentMapMaybeM :: (Prim.PrimMonad m
                       , Streamly.MonadAsync m
                       , Frames.RecVec as
                       , Frames.RecVec bs
                       ) => (Frames.Record as -> m (Maybe (Frames.Record bs))) -> Frames.FrameRec as -> m (Frames.FrameRec bs)
concurrentMapMaybeM f = transform (Streamly.aheadly . Streamly.mapMaybeM f)
{-# INLINE concurrentMapMaybeM #-}