{-# LANGUAGE CPP                 #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}

#include "inline.hs"

-- |
-- Module      : Streamly.Internal.Memory.ArrayStream
-- Copyright   : (c) 2019 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Combinators to efficiently manipulate streams of arrays.
--
module Streamly.Internal.Memory.ArrayStream
    (
    -- * Creation
      arraysOf

    -- * Flattening to elements
    , concat
    , concatRev
    , interpose
    , interposeSuffix
    , intercalateSuffix

    -- * Transformation
    , splitOn
    , splitOnSuffix
    , compact -- compact

    -- * Elimination
    , toArray
    )
where

import Control.Monad.IO.Class (MonadIO(..))
-- import Data.Functor.Identity (Identity)
import Data.Word (Word8)
import Foreign.ForeignPtr (withForeignPtr)
import Foreign.Ptr (minusPtr, plusPtr, castPtr)
import Foreign.Storable (Storable(..))
import Prelude hiding (length, null, last, map, (!!), read, concat)

import Streamly.Internal.Memory.Array.Types (Array(..), length)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamK.Type (IsStream)

import qualified Streamly.Internal.Memory.Array as A
import qualified Streamly.Internal.Memory.Array.Types as A
import qualified Streamly.Internal.Prelude as S
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.Prelude as P

-- XXX Efficiently compare two streams of arrays. The two streams can have
-- chunks of different sizes; we can handle that in the stream comparison
-- abstraction. This could be useful, e.g., to quickly check whether two files
-- differ.

-- | Flatten a stream of arrays into a stream of the arrays' elements, array
-- by array, in order.
--
-- Equivalent to unfolding every array with the 'A.read' unfold and
-- concatenating the results, e.g.
--
-- > concat = S.concatUnfold A.read
--
-- but more efficient.
--
-- @since 0.7.0
{-# INLINE concat #-}
concat :: (IsStream t, MonadIO m, Storable a) => t m (Array a) -> t m a
-- Earlier implementations, kept for reference:
--
-- concat m = D.fromStreamD $ A.flattenArrays (D.toStreamD m)
-- concat m = D.fromStreamD $ D.concatMap A.toStreamD (D.toStreamD m)
concat arrays = D.fromStreamD (D.concatMapU A.read (D.toStreamD arrays))

-- XXX should we have a reverseArrays API to reverse the stream of arrays
-- instead?
--
-- | Flatten a stream of arrays into a stream of elements. The contents of
-- each individual array are emitted in reverse order. The arrays themselves
-- are visited in stream order.
--
-- @since 0.7.0
{-# INLINE concatRev #-}
concatRev :: (IsStream t, MonadIO m, Storable a) => t m (Array a) -> t m a
concatRev = D.fromStreamD . A.flattenArraysRev . D.toStreamD

-- | Flatten a stream of arrays into a stream of elements, inserting the given
-- separator element between consecutive arrays. Each array is unfolded with
-- 'A.read'.
--
-- /Internal/
{-# INLINE interpose #-}
interpose :: (MonadIO m, IsStream t, Storable a) => a -> t m (Array a) -> t m a
interpose sep arrays = S.interpose sep A.read arrays

-- | Flatten a stream of arrays into a stream of elements using
-- 'S.intercalateSuffix' with a separator /array/. The separator is unfolded
-- with 'A.read', the same unfold used for the input arrays. Following the
-- @Suffix@ naming it shares with 'interposeSuffix', its elements are emitted
-- after each array.
{-# INLINE intercalateSuffix #-}
intercalateSuffix :: (MonadIO m, IsStream t, Storable a)
    => Array a -> t m (Array a) -> t m a
intercalateSuffix sepArr arrays = S.intercalateSuffix sepArr A.read arrays

-- | Flatten a stream of arrays into a stream of elements, emitting the given
-- element after each array. Each array is unfolded with 'A.read'.
--
-- @since 0.7.0
{-# INLINE interposeSuffix #-}
interposeSuffix :: (MonadIO m, IsStream t, Storable a)
    => a -> t m (Array a) -> t m a
-- Alternative implementation, kept for reference:
-- interposeSuffix x = D.fromStreamD . A.unlines x . D.toStreamD
interposeSuffix sep arrays = S.interposeSuffix sep A.read arrays

-- | Split a stream of byte arrays on the given separator byte. The separator
-- is dropped, and all the data between two separators is coalesced into a
-- single array.
--
-- Each chunk is broken with 'A.breakOn', and the pieces are joined with
-- 'A.spliceTwo'.
--
-- @since 0.7.0
{-# INLINE splitOn #-}
splitOn
    :: (IsStream t, MonadIO m)
    => Word8
    -> t m (Array Word8)
    -> t m (Array Word8)
splitOn byte =
    D.fromStreamD . D.splitInnerBy (A.breakOn byte) A.spliceTwo . D.toStreamD

-- | Like 'splitOn', but built on 'D.splitInnerBySuffix' instead of
-- 'D.splitInnerBy'. The separator byte is treated as a suffix that
-- terminates each segment. The exact handling of a trailing separator is
-- determined by 'D.splitInnerBySuffix' (NOTE(review): confirm whether a
-- final separator yields an extra empty array).
{-# INLINE splitOnSuffix #-}
splitOnSuffix
    :: (IsStream t, MonadIO m)
    => Word8
    -> t m (Array Word8)
    -> t m (Array Word8)
-- Alternative formulation via an array-level split, kept for reference:
-- splitOn byte s = D.fromStreamD $ A.splitOn byte $ D.toStreamD s
splitOnSuffix byte =
      D.fromStreamD
    . D.splitInnerBySuffix (A.breakOn byte) A.spliceTwo
    . D.toStreamD

-- | Coalesce adjacent arrays of the input stream into bigger arrays, each no
-- larger than the given maximum size in bytes.
--
-- @since 0.7.0
{-# INLINE compact #-}
compact :: (MonadIO m, Storable a)
    => Int -> SerialT m (Array a) -> SerialT m (Array a)
compact maxBytes =
    D.fromStreamD . A.packArraysChunksOf maxBytes . D.toStreamD

-- | @arraysOf n stream@ groups the elements of the input stream into arrays
-- of @n@ elements each.
--
-- Behaves like the following, but more efficiently:
--
-- > arraysOf n = S.chunksOf n (A.writeN n)
--
-- @since 0.7.0
{-# INLINE arraysOf #-}
arraysOf :: (IsStream t, MonadIO m, Storable a)
    => Int -> t m a -> t m (Array a)
arraysOf n input = D.fromStreamD (A.fromStreamDArraysOf n (D.toStreamD input))

-- XXX Both of these implementations of splicing seem to perform equally well.
-- We need to perform benchmarks over a range of sizes though.

-- | Splice a stream of arrays into a single array. A destination of @len@
-- elements is allocated, and each source array's bytes are copied into it
-- back to back.
--
-- CAUTION! No bounds checking is done. @len@ must be greater than or equal to
-- the sum of the lengths of all the arrays in the stream; otherwise the copy
-- writes past the end of the destination.
{-# INLINE spliceArraysLenUnsafe #-}
spliceArraysLenUnsafe :: (MonadIO m, Storable a)
    => Int -> SerialT m (Array a) -> m (Array a)
spliceArraysLenUnsafe len buffered = do
    dest <- liftIO $ A.newArray len
    -- The write cursor starts at the destination's end pointer. It is
    -- advanced by the number of bytes copied from each source array.
    cursor <- S.foldlM' copyChunk (aEnd dest) buffered
    pure (dest { aEnd = cursor })

    where

    -- Copy the bytes of one source array (from its start up to its 'aEnd')
    -- to @dst@, and return the pointer just past the copied bytes.
    copyChunk dst Array{aStart = srcStart, aEnd = srcEnd} =
        liftIO $ withForeignPtr srcStart $ \src -> do
            let nbytes = srcEnd `minusPtr` src
            A.memcpy (castPtr dst) (castPtr src) nbytes
            pure (dst `plusPtr` nbytes)

-- | Splice a finite stream of arrays into a single array in three steps:
--
-- 1. Buffer the whole stream.
-- 2. Sum the lengths of the buffered arrays.
-- 3. Copy them into one destination allocated with exactly that total
--    length (via 'spliceArraysLenUnsafe').
--
-- The copy is performed on the /buffered/ stream, not on the input @s@. The
-- effects of @s@ therefore run only once. Re-running @s@ would repeat its
-- effects, and if it produced more elements the second time, the unchecked
-- copy in 'spliceArraysLenUnsafe' would write past the end of the
-- destination.
{-# INLINE _spliceArraysBuffered #-}
_spliceArraysBuffered :: (MonadIO m, Storable a)
    => SerialT m (Array a) -> m (Array a)
_spliceArraysBuffered s = do
    -- Drain @s@ into a stream built only from 'S.cons'/'S.nil' cells, so it
    -- can be traversed twice (once to measure, once to copy).
    buffered <- P.foldr S.cons S.nil s
    len <- S.sum (S.map length buffered)
    -- Must splice 'buffered': 'len' was computed from it.
    spliceArraysLenUnsafe len buffered

-- | Splice a finite stream of arrays into a single array by growing a
-- destination with reallocation:
--
-- * Start from an allocation whose element count is derived from
--   'A.mkChunkSizeKB' @4@.
-- * Append each incoming array with 'A.spliceWithDoubling'.
-- * Finally, release the unused capacity with 'A.shrinkToFit'.
{-# INLINE spliceArraysRealloced #-}
spliceArraysRealloced :: forall m a. (MonadIO m, Storable a)
    => SerialT m (Array a) -> m (Array a)
spliceArraysRealloced s = do
    -- 'undefined' only serves as a type witness for the element size; it is
    -- never forced by the byte-to-element conversion.
    let initialElems =
            A.bytesToElemCount (undefined :: a) (A.mkChunkSizeKB 4)
    seed <- liftIO (A.newArray initialElems)
    grown <- S.foldlM' A.spliceWithDoubling seed s
    liftIO (A.shrinkToFit grown)

-- | Splice all the arrays of a stream together into one single array. The
-- stream must be /finite/.
--
-- @since 0.7.0
{-# INLINE toArray #-}
toArray :: (MonadIO m, Storable a) => SerialT m (Array a) -> m (Array a)
-- Uses the reallocation-based strategy. An alternative is the buffering
-- strategy: toArray = _spliceArraysBuffered
toArray s = spliceArraysRealloced s

-- Exponentially increasing sizes of the chunks, up to the max limit.
-- XXX This will be easier to implement with parsers/terminating folds.
-- With this we should be able to reduce the number of chunks/allocations.
-- The reallocation/copy-based toArray can also be implemented using this.
--
{-
{-# INLINE toArraysInRange #-}
toArraysInRange :: (IsStream t, MonadIO m, Storable a)
    => Int -> Int -> Fold m (Array a) b -> Fold m a b
toArraysInRange low high (Fold step initial extract) =
-}

{-
-- | Fold the input to a pure buffered stream (List) of arrays.
{-# INLINE _toArraysOf #-}
_toArraysOf :: (MonadIO m, Storable a)
    => Int -> Fold m a (SerialT Identity (Array a))
_toArraysOf n = FL.lchunksOf n (A.writeNF n) FL.toStream
-}