{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
module Frames.Streamly.Transform
( transform
, filter
, concurrentMapM
, mapMaybe
, concurrentMapMaybeM
)
where
import qualified Frames.Streamly.InCore as FS
import Prelude hiding (filter)
import qualified Streamly as Streamly
import qualified Streamly.Prelude as Streamly
import Streamly ( IsStream )
import qualified Control.Monad.Primitive as Prim
import Control.Monad.ST (runST)
import qualified Frames as Frames
import qualified Frames.InCore as Frames
transform ::
forall t as bs m.
(IsStream t
, Prim.PrimMonad m
, Frames.RecVec as
, Frames.RecVec bs
)
=> (t m (Frames.Record as) -> Streamly.SerialT m (Frames.Record bs)) -> Frames.FrameRec as -> m (Frames.FrameRec bs)
transform :: (t m (Record as) -> SerialT m (Record bs))
-> FrameRec as -> m (FrameRec bs)
transform t m (Record as) -> SerialT m (Record bs)
f = SerialT m (Record bs) -> m (FrameRec bs)
forall (m :: * -> *) (rs :: [(Symbol, *)]).
(PrimMonad m, RecVec rs) =>
SerialT m (Record rs) -> m (FrameRec rs)
FS.inCoreAoS (SerialT m (Record bs) -> m (FrameRec bs))
-> (FrameRec as -> SerialT m (Record bs))
-> FrameRec as
-> m (FrameRec bs)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m (Record as) -> SerialT m (Record bs)
f (t m (Record as) -> SerialT m (Record bs))
-> (FrameRec as -> t m (Record as))
-> FrameRec as
-> SerialT m (Record bs)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FrameRec as -> t m (Record as)
forall (t :: (* -> *) -> * -> *) (f :: * -> *) a (m :: * -> *).
(IsStream t, Foldable f) =>
f a -> t m a
Streamly.fromFoldable
{-# INLINE transform #-}
filter :: (Frames.RecVec as) => (Frames.Record as -> Bool) -> Frames.FrameRec as -> Frames.FrameRec as
filter :: (Record as -> Bool) -> FrameRec as -> FrameRec as
filter Record as -> Bool
f FrameRec as
frame = (forall s. ST s (FrameRec as)) -> FrameRec as
forall a. (forall s. ST s a) -> a
runST ((forall s. ST s (FrameRec as)) -> FrameRec as)
-> (forall s. ST s (FrameRec as)) -> FrameRec as
forall a b. (a -> b) -> a -> b
$ (SerialT (ST s) (Record as) -> SerialT (ST s) (Record as))
-> FrameRec as -> ST s (FrameRec as)
forall (t :: (* -> *) -> * -> *) (as :: [(Symbol, *)])
(bs :: [(Symbol, *)]) (m :: * -> *).
(IsStream t, PrimMonad m, RecVec as, RecVec bs) =>
(t m (Record as) -> SerialT m (Record bs))
-> FrameRec as -> m (FrameRec bs)
transform (SerialT (ST s) (Record as) -> SerialT (ST s) (Record as)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
SerialT m a -> t m a
Streamly.serially (SerialT (ST s) (Record as) -> SerialT (ST s) (Record as))
-> (SerialT (ST s) (Record as) -> SerialT (ST s) (Record as))
-> SerialT (ST s) (Record as)
-> SerialT (ST s) (Record as)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Record as -> Bool)
-> SerialT (ST s) (Record as) -> SerialT (ST s) (Record as)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
Streamly.filter Record as -> Bool
f) FrameRec as
frame
{-# INLINE filter #-}
concurrentMapM :: (Prim.PrimMonad m
, Streamly.MonadAsync m
, Frames.RecVec as
, Frames.RecVec bs
) => (Frames.Record as -> m (Frames.Record bs)) -> Frames.FrameRec as -> m (Frames.FrameRec bs)
concurrentMapM :: (Record as -> m (Record bs)) -> FrameRec as -> m (FrameRec bs)
concurrentMapM Record as -> m (Record bs)
f = (AheadT m (Record as) -> SerialT m (Record bs))
-> FrameRec as -> m (FrameRec bs)
forall (t :: (* -> *) -> * -> *) (as :: [(Symbol, *)])
(bs :: [(Symbol, *)]) (m :: * -> *).
(IsStream t, PrimMonad m, RecVec as, RecVec bs) =>
(t m (Record as) -> SerialT m (Record bs))
-> FrameRec as -> m (FrameRec bs)
transform (AheadT m (Record bs) -> SerialT m (Record bs)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
AheadT m a -> t m a
Streamly.aheadly (AheadT m (Record bs) -> SerialT m (Record bs))
-> (AheadT m (Record as) -> AheadT m (Record bs))
-> AheadT m (Record as)
-> SerialT m (Record bs)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Record as -> m (Record bs))
-> AheadT m (Record as) -> AheadT m (Record bs)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
Streamly.mapM Record as -> m (Record bs)
f)
{-# INLINE concurrentMapM #-}
mapMaybe :: (Frames.RecVec as
, Frames.RecVec bs
) => (Frames.Record as -> Maybe (Frames.Record bs)) -> Frames.FrameRec as -> Frames.FrameRec bs
mapMaybe :: (Record as -> Maybe (Record bs)) -> FrameRec as -> FrameRec bs
mapMaybe Record as -> Maybe (Record bs)
f FrameRec as
frame = (forall s. ST s (FrameRec bs)) -> FrameRec bs
forall a. (forall s. ST s a) -> a
runST ((forall s. ST s (FrameRec bs)) -> FrameRec bs)
-> (forall s. ST s (FrameRec bs)) -> FrameRec bs
forall a b. (a -> b) -> a -> b
$ (AheadT (ST s) (Record as) -> SerialT (ST s) (Record bs))
-> FrameRec as -> ST s (FrameRec bs)
forall (t :: (* -> *) -> * -> *) (as :: [(Symbol, *)])
(bs :: [(Symbol, *)]) (m :: * -> *).
(IsStream t, PrimMonad m, RecVec as, RecVec bs) =>
(t m (Record as) -> SerialT m (Record bs))
-> FrameRec as -> m (FrameRec bs)
transform (AheadT (ST s) (Record bs) -> SerialT (ST s) (Record bs)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
AheadT m a -> t m a
Streamly.aheadly (AheadT (ST s) (Record bs) -> SerialT (ST s) (Record bs))
-> (AheadT (ST s) (Record as) -> AheadT (ST s) (Record bs))
-> AheadT (ST s) (Record as)
-> SerialT (ST s) (Record bs)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Record as -> Maybe (Record bs))
-> AheadT (ST s) (Record as) -> AheadT (ST s) (Record bs)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> Maybe b) -> t m a -> t m b
Streamly.mapMaybe Record as -> Maybe (Record bs)
f) FrameRec as
frame
{-# INLINE mapMaybe #-}
concurrentMapMaybeM :: (Prim.PrimMonad m
, Streamly.MonadAsync m
, Frames.RecVec as
, Frames.RecVec bs
) => (Frames.Record as -> m (Maybe (Frames.Record bs))) -> Frames.FrameRec as -> m (Frames.FrameRec bs)
concurrentMapMaybeM :: (Record as -> m (Maybe (Record bs)))
-> FrameRec as -> m (FrameRec bs)
concurrentMapMaybeM Record as -> m (Maybe (Record bs))
f = (AheadT m (Record as) -> SerialT m (Record bs))
-> FrameRec as -> m (FrameRec bs)
forall (t :: (* -> *) -> * -> *) (as :: [(Symbol, *)])
(bs :: [(Symbol, *)]) (m :: * -> *).
(IsStream t, PrimMonad m, RecVec as, RecVec bs) =>
(t m (Record as) -> SerialT m (Record bs))
-> FrameRec as -> m (FrameRec bs)
transform (AheadT m (Record bs) -> SerialT m (Record bs)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
AheadT m a -> t m a
Streamly.aheadly (AheadT m (Record bs) -> SerialT m (Record bs))
-> (AheadT m (Record as) -> AheadT m (Record bs))
-> AheadT m (Record as)
-> SerialT m (Record bs)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Record as -> m (Maybe (Record bs)))
-> AheadT m (Record as) -> AheadT m (Record bs)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m, Functor (t m)) =>
(a -> m (Maybe b)) -> t m a -> t m b
Streamly.mapMaybeM Record as -> m (Maybe (Record bs))
f)
{-# INLINE concurrentMapMaybeM #-}