{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
module Frames.Streamly.Transform
( transform
, filter
, concurrentMapM
, mapMaybe
, concurrentMapMaybeM
)
where
import qualified Frames.Streamly.InCore as FS
import Prelude hiding (filter)
import qualified Streamly as Streamly
import qualified Streamly.Prelude as Streamly
import Streamly ( IsStream )
import qualified Control.Monad.Primitive as Prim
import Control.Monad.ST (runST)
import qualified Frames as Frames
import qualified Frames.InCore as Frames
transform ::
forall t as bs m.
(IsStream t
, Prim.PrimMonad m
, Frames.RecVec as
, Frames.RecVec bs
)
=> (t m (Frames.Record as) -> Streamly.SerialT m (Frames.Record bs)) -> Frames.FrameRec as -> m (Frames.FrameRec bs)
transform f = FS.inCoreAoS . f . Streamly.fromFoldable
{-# INLINE transform #-}
filter :: (Frames.RecVec as) => (Frames.Record as -> Bool) -> Frames.FrameRec as -> Frames.FrameRec as
filter f frame = runST $ transform (Streamly.serially . Streamly.filter f) frame
{-# INLINE filter #-}
concurrentMapM :: (Prim.PrimMonad m
, Streamly.MonadAsync m
, Frames.RecVec as
, Frames.RecVec bs
) => (Frames.Record as -> m (Frames.Record bs)) -> Frames.FrameRec as -> m (Frames.FrameRec bs)
concurrentMapM f = transform (Streamly.aheadly . Streamly.mapM f)
{-# INLINE concurrentMapM #-}
mapMaybe :: (Frames.RecVec as
, Frames.RecVec bs
) => (Frames.Record as -> Maybe (Frames.Record bs)) -> Frames.FrameRec as -> Frames.FrameRec bs
mapMaybe f frame = runST $ transform (Streamly.aheadly . Streamly.mapMaybe f) frame
{-# INLINE mapMaybe #-}
concurrentMapMaybeM :: (Prim.PrimMonad m
, Streamly.MonadAsync m
, Frames.RecVec as
, Frames.RecVec bs
) => (Frames.Record as -> m (Maybe (Frames.Record bs))) -> Frames.FrameRec as -> m (Frames.FrameRec bs)
concurrentMapMaybeM f = transform (Streamly.aheadly . Streamly.mapMaybeM f)
{-# INLINE concurrentMapMaybeM #-}