{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes        #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
{-|
Module      : Frames.Streamly.InCore
Description : Support for folding streamly streams into Frames Structure-of-Arrays (SoA) and Array-of-Structures (AoS) forms.
Copyright   : (c) Adam Conner-Sax 2020
License     : BSD-3-Clause
Maintainer  : adam_conner_sax@yahoo.com
Stability   : experimental

This module can be used in place of Frames.InCore.  The pipes functions are all replaced by equivalent streamly functions.
Relevant classes and type-families are re-exported for convenience.
-}
module Frames.Streamly.InCore
    (
      inCoreSoA
    , inCoreAoS
    , inCoreAoS'
    , inCoreSoA_F
    , inCoreAoS_F
    , inCoreAoS'_F
    -- * Re-exports from Frames
    , toAoS
    , VectorFor
    , VectorMFor
    , VectorMs
    , Vectors
    , RecVec(..)    
    
    )
where

import qualified Streamly                               as Streamly
import qualified Streamly.Prelude                       as Streamly
import qualified Streamly.Data.Fold                     as Streamly.Fold
import qualified Streamly.Internal.Data.Fold            as Streamly.Fold

import qualified Control.Monad.Primitive                as Prim

import qualified Data.Vinyl                             as Vinyl

import qualified Frames                                 as Frames
import qualified Frames.InCore                          as Frames
import           Frames.InCore                           (VectorFor, VectorMFor, VectorMs, Vectors, RecVec(..), toAoS)

import Data.Proxy (Proxy(..))


-- | Fold a stream of 'Vinyl' records into SoA (Structure-of-Arrays) form.
--
-- The result pairs the number of rows consumed with a record of per-column
-- indexing functions (one @Int -> field@ function per column).
--
-- Provided as a 'streamly' fold so that it may be combined with other folds
-- or applied to only part of a stream.
inCoreSoA_F :: forall m rs. (Prim.PrimMonad m, Frames.RecVec rs)
          => Streamly.Fold.Fold m (Frames.Record rs) (Int, Vinyl.Rec (((->) Int) Frames.:. Frames.ElField) rs)
inCoreSoA_F = Streamly.Fold.mkFold feed initial fin
  where
    -- Fold state: (rows written so far, current capacity, mutable column vectors).
    feed (!i, !sz, !mvs') row
      | i == sz = do
          -- No free slot left: grow the column vectors, then retry the same row.
          -- NOTE(review): doubling @sz@ assumes 'Frames.growRec' doubles the
          -- capacity of every column -- confirm against Frames.InCore.
          grown <- Frames.growRec (Proxy :: Proxy rs) mvs'
          feed (i, sz * 2, grown) row
      | otherwise = do
          Frames.writeRec (Proxy :: Proxy rs) i mvs' row
          return (i + 1, sz, mvs')

    -- Start with no rows written and vectors allocated at the default capacity.
    initial = do
      mvs <- Frames.allocRec (Proxy :: Proxy rs) Frames.initialCapacity
      return (0, Frames.initialCapacity, mvs)

    -- Freeze the first @n@ rows and expose them as indexing functions.
    fin (n, _, mvs') = do
      vs <- Frames.freezeRec (Proxy :: Proxy rs) n mvs'
      return (n, Frames.produceRec (Proxy :: Proxy rs) vs)
{-# INLINE inCoreSoA_F #-}

-- | Perform the 'inCoreSoA_F' fold on a stream of records.
--
-- Returns the number of rows consumed together with the SoA
-- (Structure-of-Arrays) record of per-column indexing functions.
inCoreSoA :: forall m rs. (Prim.PrimMonad m, Frames.RecVec rs)
          => Streamly.SerialT m (Frames.Record rs)
          -> m (Int, Vinyl.Rec (((->) Int) Frames.:. Frames.ElField) rs)
inCoreSoA = Streamly.fold inCoreSoA_F
{-# INLINE inCoreSoA #-}

-- | Fold a stream of 'Vinyl' records into AoS (Array-of-Structures) form.
--
-- Runs 'inCoreSoA_F' and converts its (row count, SoA record) result into a
-- 'Frames.FrameRec' with 'Frames.toAoS'.
inCoreAoS_F :: forall m rs. (Prim.PrimMonad m, Frames.RecVec rs)
          => Streamly.Fold.Fold m (Frames.Record rs) (Frames.FrameRec rs)
inCoreAoS_F = fmap (uncurry Frames.toAoS) inCoreSoA_F
{-# INLINE inCoreAoS_F #-}

-- | Perform the 'inCoreAoS_F' fold on a stream of records, producing a
-- 'Frames.FrameRec' in AoS (Array-of-Structures) form.
inCoreAoS :: forall m rs. (Prim.PrimMonad m, Frames.RecVec rs)
          => Streamly.SerialT m (Frames.Record rs)
          -> m (Frames.FrameRec rs)
inCoreAoS = Streamly.fold inCoreAoS_F
{-# INLINE inCoreAoS #-}


-- | More general AoS fold: the supplied function is applied to the records
-- while they are still in SoA form, and may change the set of columns
-- (from @rs@ to @ss@) before the result is converted to AoS form.
--
-- The row count produced by 'inCoreSoA_F' is passed through unchanged.
inCoreAoS'_F ::  forall ss rs m. (Prim.PrimMonad m, Frames.RecVec rs)
           => (Frames.Rec ((->) Int Frames.:. Frames.ElField) rs -> Frames.Rec ((->) Int Frames.:. Frames.ElField) ss)
           -> Streamly.Fold.Fold m (Frames.Record rs) (Frames.FrameRec ss)
inCoreAoS'_F f = fmap (uncurry Frames.toAoS . aux) inCoreSoA_F
  where
    -- Transform only the SoA columns; keep the row count as is.
    aux (n, cols) = (n, f cols)
{-# INLINE inCoreAoS'_F #-}

-- | Perform the more general AoS fold ('inCoreAoS'_F') on a stream of records,
-- applying the given transformation to the SoA form before conversion to AoS.
inCoreAoS' ::  forall ss rs m. (Prim.PrimMonad m, Frames.RecVec rs)
           => (Frames.Rec ((->) Int Frames.:. Frames.ElField) rs -> Frames.Rec ((->) Int Frames.:. Frames.ElField) ss)
           -> Streamly.SerialT m (Frames.Record rs)
           -> m (Frames.FrameRec ss)
inCoreAoS' f = Streamly.fold (inCoreAoS'_F f)
{-# INLINE inCoreAoS' #-}