-- |
-- Module      : Streamly.Internal.Data.Array.Stream.Mut.Foreign
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD3-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Combinators to efficiently manipulate streams of mutable arrays.
--
module Streamly.Internal.Data.Array.Stream.Mut.Foreign
    (
    -- * Generation
      arraysOf

    -- * Compaction
    , packArraysChunksOf
    , SpliceState (..)
    , lpackArraysChunksOf
#if !defined(mingw32_HOST_OS)
    , groupIOVecsOf
#endif
    , compact
    , compactLE
    , compactEQ
    , compactGE
    )
where

#include "inline.hs"

import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad (when)
import Data.Bifunctor (first)
import Foreign.Storable (Storable(..))
#if !defined(mingw32_HOST_OS)
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (castPtr)
import Streamly.Internal.FileSystem.FDIO (IOVec(..))
import Streamly.Internal.Data.Array.Foreign.Mut.Type (length)
import Streamly.Internal.Data.SVar (adaptState)
#endif
import Streamly.Internal.Data.Array.Foreign.Mut.Type (Array(..))
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamK.Type (IsStream)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))

import qualified Streamly.Internal.Data.Array.Foreign.Mut.Type as MArray
import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.Stream.StreamD as D

import Prelude hiding (length)

-- | @arraysOf n stream@ groups the elements in the input stream into arrays of
-- @n@ elements each.
--
-- Same as the following but may be more efficient:
--
-- > arraysOf n = Stream.foldMany (MArray.writeN n)
--
-- /Pre-release/
{-# INLINE arraysOf #-}
arraysOf :: (IsStream t, MonadIO m, Storable a)
    => Int -> t m a -> t m (Array a)
arraysOf :: Int -> t m a -> t m (Array a)
arraysOf Int
n = Stream m (Array a) -> t m (Array a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m (Array a) -> t m (Array a))
-> (t m a -> Stream m (Array a)) -> t m a -> t m (Array a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Stream m a -> Stream m (Array a)
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Stream m a -> Stream m (Array a)
MArray.arraysOf Int
n (Stream m a -> Stream m (Array a))
-> (t m a -> Stream m a) -> t m a -> Stream m (Array a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD

-------------------------------------------------------------------------------
-- Compact
-------------------------------------------------------------------------------

data SpliceState s arr
    = SpliceInitial s
    | SpliceBuffering s arr
    | SpliceYielding arr (SpliceState s arr)
    | SpliceFinish

-- | This mutates the first array (if it has space) to append values from the
-- second one. This would work for immutable arrays as well because an
-- immutable array never has space so a new array is allocated instead of
-- mutating it.
--
-- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a
-- maximum specified size. Note that if a single array is bigger than the
-- specified size we do not split it to fit. When we coalesce multiple arrays
-- if the size would exceed the specified size we do not coalesce therefore the
-- actual array size may be less than the specified chunk size.
--
-- @since 0.7.0
{-# INLINE_NORMAL packArraysChunksOf #-}
packArraysChunksOf :: (MonadIO m, Storable a)
    => Int -> D.Stream m (Array a) -> D.Stream m (Array a)
packArraysChunksOf :: Int -> Stream m (Array a) -> Stream m (Array a)
packArraysChunksOf Int
n (D.Stream State Stream m (Array a) -> s -> m (Step s (Array a))
step s
state) =
    (State Stream m (Array a)
 -> SpliceState s (Array a)
 -> m (Step (SpliceState s (Array a)) (Array a)))
-> SpliceState s (Array a) -> Stream m (Array a)
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m (Array a)
-> SpliceState s (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
step' (s -> SpliceState s (Array a)
forall s arr. s -> SpliceState s arr
SpliceInitial s
state)

    where

    {-# INLINE_LATE step' #-}
    step' :: State Stream m (Array a)
-> SpliceState s (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
step' State Stream m (Array a)
gst (SpliceInitial s
st) = do
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
            -- XXX we can pass the module string from the higher level API
            [Char] -> m ()
forall a. HasCallStack => [Char] -> a
error ([Char] -> m ()) -> [Char] -> m ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Streamly.Internal.Data.Array.Foreign.Mut.Type.packArraysChunksOf: the size of "
                 [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
"arrays [" [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
n [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
"] must be a natural number"
        Step s (Array a)
r <- State Stream m (Array a) -> s -> m (Step s (Array a))
step State Stream m (Array a)
gst s
st
        case Step s (Array a)
r of
            D.Yield Array a
arr s
s -> Step (SpliceState s (Array a)) (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (SpliceState s (Array a)) (Array a)
 -> m (Step (SpliceState s (Array a)) (Array a)))
-> Step (SpliceState s (Array a)) (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
forall a b. (a -> b) -> a -> b
$
                let len :: Int
len = Array a -> Int
forall a. Array a -> Int
MArray.byteLength Array a
arr
                 in if Int
len Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n
                    then SpliceState s (Array a) -> Step (SpliceState s (Array a)) (Array a)
forall s a. s -> Step s a
D.Skip (Array a -> SpliceState s (Array a) -> SpliceState s (Array a)
forall s arr. arr -> SpliceState s arr -> SpliceState s arr
SpliceYielding Array a
arr (s -> SpliceState s (Array a)
forall s arr. s -> SpliceState s arr
SpliceInitial s
s))
                    else SpliceState s (Array a) -> Step (SpliceState s (Array a)) (Array a)
forall s a. s -> Step s a
D.Skip (s -> Array a -> SpliceState s (Array a)
forall s arr. s -> arr -> SpliceState s arr
SpliceBuffering s
s Array a
arr)
            D.Skip s
s -> Step (SpliceState s (Array a)) (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (SpliceState s (Array a)) (Array a)
 -> m (Step (SpliceState s (Array a)) (Array a)))
-> Step (SpliceState s (Array a)) (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
forall a b. (a -> b) -> a -> b
$ SpliceState s (Array a) -> Step (SpliceState s (Array a)) (Array a)
forall s a. s -> Step s a
D.Skip (s -> SpliceState s (Array a)
forall s arr. s -> SpliceState s arr
SpliceInitial s
s)
            Step s (Array a)
D.Stop -> Step (SpliceState s (Array a)) (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
forall (m :: * -> *) a. Monad m => a -> m a
return Step (SpliceState s (Array a)) (Array a)
forall s a. Step s a
D.Stop

    step' State Stream m (Array a)
gst (SpliceBuffering s
st Array a
buf) = do
        Step s (Array a)
r <- State Stream m (Array a) -> s -> m (Step s (Array a))
step State Stream m (Array a)
gst s
st
        case Step s (Array a)
r of
            D.Yield Array a
arr s
s -> do
                let len :: Int
len = Array a -> Int
forall a. Array a -> Int
MArray.byteLength Array a
buf Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Array a -> Int
forall a. Array a -> Int
MArray.byteLength Array a
arr
                if Int
len Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
n
                then Step (SpliceState s (Array a)) (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (SpliceState s (Array a)) (Array a)
 -> m (Step (SpliceState s (Array a)) (Array a)))
-> Step (SpliceState s (Array a)) (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
forall a b. (a -> b) -> a -> b
$
                    SpliceState s (Array a) -> Step (SpliceState s (Array a)) (Array a)
forall s a. s -> Step s a
D.Skip (Array a -> SpliceState s (Array a) -> SpliceState s (Array a)
forall s arr. arr -> SpliceState s arr -> SpliceState s arr
SpliceYielding Array a
buf (s -> Array a -> SpliceState s (Array a)
forall s arr. s -> arr -> SpliceState s arr
SpliceBuffering s
s Array a
arr))
                else do
                    Array a
buf' <- if Array a -> Int
forall a. Array a -> Int
MArray.byteCapacity Array a
buf Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
n
                            then IO (Array a) -> m (Array a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Array a) -> m (Array a)) -> IO (Array a) -> m (Array a)
forall a b. (a -> b) -> a -> b
$ Int -> Array a -> IO (Array a)
forall a. Storable a => Int -> Array a -> IO (Array a)
MArray.realloc Int
n Array a
buf
                            else Array a -> m (Array a)
forall (m :: * -> *) a. Monad m => a -> m a
return Array a
buf
                    Array a
buf'' <- Array a -> Array a -> m (Array a)
forall (m :: * -> *) a.
MonadIO m =>
Array a -> Array a -> m (Array a)
MArray.spliceWith Array a
buf' Array a
arr
                    Step (SpliceState s (Array a)) (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (SpliceState s (Array a)) (Array a)
 -> m (Step (SpliceState s (Array a)) (Array a)))
-> Step (SpliceState s (Array a)) (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
forall a b. (a -> b) -> a -> b
$ SpliceState s (Array a) -> Step (SpliceState s (Array a)) (Array a)
forall s a. s -> Step s a
D.Skip (s -> Array a -> SpliceState s (Array a)
forall s arr. s -> arr -> SpliceState s arr
SpliceBuffering s
s Array a
buf'')
            D.Skip s
s -> Step (SpliceState s (Array a)) (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (SpliceState s (Array a)) (Array a)
 -> m (Step (SpliceState s (Array a)) (Array a)))
-> Step (SpliceState s (Array a)) (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
forall a b. (a -> b) -> a -> b
$ SpliceState s (Array a) -> Step (SpliceState s (Array a)) (Array a)
forall s a. s -> Step s a
D.Skip (s -> Array a -> SpliceState s (Array a)
forall s arr. s -> arr -> SpliceState s arr
SpliceBuffering s
s Array a
buf)
            Step s (Array a)
D.Stop -> Step (SpliceState s (Array a)) (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (SpliceState s (Array a)) (Array a)
 -> m (Step (SpliceState s (Array a)) (Array a)))
-> Step (SpliceState s (Array a)) (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
forall a b. (a -> b) -> a -> b
$ SpliceState s (Array a) -> Step (SpliceState s (Array a)) (Array a)
forall s a. s -> Step s a
D.Skip (Array a -> SpliceState s (Array a) -> SpliceState s (Array a)
forall s arr. arr -> SpliceState s arr -> SpliceState s arr
SpliceYielding Array a
buf SpliceState s (Array a)
forall s arr. SpliceState s arr
SpliceFinish)

    step' State Stream m (Array a)
_ SpliceState s (Array a)
SpliceFinish = Step (SpliceState s (Array a)) (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
forall (m :: * -> *) a. Monad m => a -> m a
return Step (SpliceState s (Array a)) (Array a)
forall s a. Step s a
D.Stop

    step' State Stream m (Array a)
_ (SpliceYielding Array a
arr SpliceState s (Array a)
next) = Step (SpliceState s (Array a)) (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (SpliceState s (Array a)) (Array a)
 -> m (Step (SpliceState s (Array a)) (Array a)))
-> Step (SpliceState s (Array a)) (Array a)
-> m (Step (SpliceState s (Array a)) (Array a))
forall a b. (a -> b) -> a -> b
$ Array a
-> SpliceState s (Array a)
-> Step (SpliceState s (Array a)) (Array a)
forall s a. a -> s -> Step s a
D.Yield Array a
arr SpliceState s (Array a)
next

{-# INLINE_NORMAL lpackArraysChunksOf #-}
lpackArraysChunksOf :: (MonadIO m, Storable a)
    => Int -> Fold m (Array a) () -> Fold m (Array a) ()
lpackArraysChunksOf :: Int -> Fold m (Array a) () -> Fold m (Array a) ()
lpackArraysChunksOf Int
n (Fold s -> Array a -> m (Step s ())
step1 m (Step s ())
initial1 s -> m ()
extract1) =
    (Tuple' (Maybe (Array a)) s
 -> Array a -> m (Step (Tuple' (Maybe (Array a)) s) ()))
-> m (Step (Tuple' (Maybe (Array a)) s) ())
-> (Tuple' (Maybe (Array a)) s -> m ())
-> Fold m (Array a) ()
forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
Fold Tuple' (Maybe (Array a)) s
-> Array a -> m (Step (Tuple' (Maybe (Array a)) s) ())
step m (Step (Tuple' (Maybe (Array a)) s) ())
forall a. m (Step (Tuple' (Maybe a) s) ())
initial Tuple' (Maybe (Array a)) s -> m ()
extract

    where

    initial :: m (Step (Tuple' (Maybe a) s) ())
initial = do
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
            -- XXX we can pass the module string from the higher level API
            [Char] -> m ()
forall a. HasCallStack => [Char] -> a
error ([Char] -> m ()) -> [Char] -> m ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Streamly.Internal.Data.Array.Foreign.Mut.Type.packArraysChunksOf: the size of "
                 [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
"arrays [" [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
n [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
"] must be a natural number"

        Step s ()
r <- m (Step s ())
initial1
        Step (Tuple' (Maybe a) s) () -> m (Step (Tuple' (Maybe a) s) ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Tuple' (Maybe a) s) () -> m (Step (Tuple' (Maybe a) s) ()))
-> Step (Tuple' (Maybe a) s) () -> m (Step (Tuple' (Maybe a) s) ())
forall a b. (a -> b) -> a -> b
$ (s -> Tuple' (Maybe a) s)
-> Step s () -> Step (Tuple' (Maybe a) s) ()
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first (Maybe a -> s -> Tuple' (Maybe a) s
forall a b. a -> b -> Tuple' a b
Tuple' Maybe a
forall a. Maybe a
Nothing) Step s ()
r

    extract :: Tuple' (Maybe (Array a)) s -> m ()
extract (Tuple' Maybe (Array a)
Nothing s
r1) = s -> m ()
extract1 s
r1
    extract (Tuple' (Just Array a
buf) s
r1) = do
        Step s ()
r <- s -> Array a -> m (Step s ())
step1 s
r1 Array a
buf
        case Step s ()
r of
            FL.Partial s
rr -> s -> m ()
extract1 s
rr
            FL.Done ()
_ -> () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

    step :: Tuple' (Maybe (Array a)) s
-> Array a -> m (Step (Tuple' (Maybe (Array a)) s) ())
step (Tuple' Maybe (Array a)
Nothing s
r1) Array a
arr =
            let len :: Int
len = Array a -> Int
forall a. Array a -> Int
MArray.byteLength Array a
arr
             in if Int
len Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n
                then do
                    Step s ()
r <- s -> Array a -> m (Step s ())
step1 s
r1 Array a
arr
                    case Step s ()
r of
                        FL.Done ()
_ -> Step (Tuple' (Maybe (Array a)) s) ()
-> m (Step (Tuple' (Maybe (Array a)) s) ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Tuple' (Maybe (Array a)) s) ()
 -> m (Step (Tuple' (Maybe (Array a)) s) ()))
-> Step (Tuple' (Maybe (Array a)) s) ()
-> m (Step (Tuple' (Maybe (Array a)) s) ())
forall a b. (a -> b) -> a -> b
$ () -> Step (Tuple' (Maybe (Array a)) s) ()
forall s b. b -> Step s b
FL.Done ()
                        FL.Partial s
s -> do
                            s -> m ()
extract1 s
s
                            Step s ()
res <- m (Step s ())
initial1
                            Step (Tuple' (Maybe (Array a)) s) ()
-> m (Step (Tuple' (Maybe (Array a)) s) ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Tuple' (Maybe (Array a)) s) ()
 -> m (Step (Tuple' (Maybe (Array a)) s) ()))
-> Step (Tuple' (Maybe (Array a)) s) ()
-> m (Step (Tuple' (Maybe (Array a)) s) ())
forall a b. (a -> b) -> a -> b
$ (s -> Tuple' (Maybe (Array a)) s)
-> Step s () -> Step (Tuple' (Maybe (Array a)) s) ()
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first (Maybe (Array a) -> s -> Tuple' (Maybe (Array a)) s
forall a b. a -> b -> Tuple' a b
Tuple' Maybe (Array a)
forall a. Maybe a
Nothing) Step s ()
res
                else Step (Tuple' (Maybe (Array a)) s) ()
-> m (Step (Tuple' (Maybe (Array a)) s) ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Tuple' (Maybe (Array a)) s) ()
 -> m (Step (Tuple' (Maybe (Array a)) s) ()))
-> Step (Tuple' (Maybe (Array a)) s) ()
-> m (Step (Tuple' (Maybe (Array a)) s) ())
forall a b. (a -> b) -> a -> b
$ Tuple' (Maybe (Array a)) s -> Step (Tuple' (Maybe (Array a)) s) ()
forall s b. s -> Step s b
FL.Partial (Tuple' (Maybe (Array a)) s
 -> Step (Tuple' (Maybe (Array a)) s) ())
-> Tuple' (Maybe (Array a)) s
-> Step (Tuple' (Maybe (Array a)) s) ()
forall a b. (a -> b) -> a -> b
$ Maybe (Array a) -> s -> Tuple' (Maybe (Array a)) s
forall a b. a -> b -> Tuple' a b
Tuple' (Array a -> Maybe (Array a)
forall a. a -> Maybe a
Just Array a
arr) s
r1

    step (Tuple' (Just Array a
buf) s
r1) Array a
arr = do
            let len :: Int
len = Array a -> Int
forall a. Array a -> Int
MArray.byteLength Array a
buf Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Array a -> Int
forall a. Array a -> Int
MArray.byteLength Array a
arr
            Array a
buf' <- if Array a -> Int
forall a. Array a -> Int
MArray.byteCapacity Array a
buf Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
len
                    then IO (Array a) -> m (Array a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Array a) -> m (Array a)) -> IO (Array a) -> m (Array a)
forall a b. (a -> b) -> a -> b
$ Int -> Array a -> IO (Array a)
forall a. Storable a => Int -> Array a -> IO (Array a)
MArray.realloc (Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
n Int
len) Array a
buf
                    else Array a -> m (Array a)
forall (m :: * -> *) a. Monad m => a -> m a
return Array a
buf
            Array a
buf'' <- Array a -> Array a -> m (Array a)
forall (m :: * -> *) a.
MonadIO m =>
Array a -> Array a -> m (Array a)
MArray.spliceWith Array a
buf' Array a
arr

            -- XXX this is common in both the equations of step
            if Int
len Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n
            then do
                Step s ()
r <- s -> Array a -> m (Step s ())
step1 s
r1 Array a
buf''
                case Step s ()
r of
                    FL.Done ()
_ -> Step (Tuple' (Maybe (Array a)) s) ()
-> m (Step (Tuple' (Maybe (Array a)) s) ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Tuple' (Maybe (Array a)) s) ()
 -> m (Step (Tuple' (Maybe (Array a)) s) ()))
-> Step (Tuple' (Maybe (Array a)) s) ()
-> m (Step (Tuple' (Maybe (Array a)) s) ())
forall a b. (a -> b) -> a -> b
$ () -> Step (Tuple' (Maybe (Array a)) s) ()
forall s b. b -> Step s b
FL.Done ()
                    FL.Partial s
s -> do
                        s -> m ()
extract1 s
s
                        Step s ()
res <- m (Step s ())
initial1
                        Step (Tuple' (Maybe (Array a)) s) ()
-> m (Step (Tuple' (Maybe (Array a)) s) ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Tuple' (Maybe (Array a)) s) ()
 -> m (Step (Tuple' (Maybe (Array a)) s) ()))
-> Step (Tuple' (Maybe (Array a)) s) ()
-> m (Step (Tuple' (Maybe (Array a)) s) ())
forall a b. (a -> b) -> a -> b
$ (s -> Tuple' (Maybe (Array a)) s)
-> Step s () -> Step (Tuple' (Maybe (Array a)) s) ()
forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first (Maybe (Array a) -> s -> Tuple' (Maybe (Array a)) s
forall a b. a -> b -> Tuple' a b
Tuple' Maybe (Array a)
forall a. Maybe a
Nothing) Step s ()
res
            else Step (Tuple' (Maybe (Array a)) s) ()
-> m (Step (Tuple' (Maybe (Array a)) s) ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Tuple' (Maybe (Array a)) s) ()
 -> m (Step (Tuple' (Maybe (Array a)) s) ()))
-> Step (Tuple' (Maybe (Array a)) s) ()
-> m (Step (Tuple' (Maybe (Array a)) s) ())
forall a b. (a -> b) -> a -> b
$ Tuple' (Maybe (Array a)) s -> Step (Tuple' (Maybe (Array a)) s) ()
forall s b. s -> Step s b
FL.Partial (Tuple' (Maybe (Array a)) s
 -> Step (Tuple' (Maybe (Array a)) s) ())
-> Tuple' (Maybe (Array a)) s
-> Step (Tuple' (Maybe (Array a)) s) ()
forall a b. (a -> b) -> a -> b
$ Maybe (Array a) -> s -> Tuple' (Maybe (Array a)) s
forall a b. a -> b -> Tuple' a b
Tuple' (Array a -> Maybe (Array a)
forall a. a -> Maybe a
Just Array a
buf'') s
r1

-- XXX replace by compactLE
--
-- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a
-- maximum specified size in bytes.
--
-- @since 0.7.0
{-# INLINE compact #-}
compact :: (MonadIO m, Storable a)
    => Int -> SerialT m (Array a) -> SerialT m (Array a)
compact :: Int -> SerialT m (Array a) -> SerialT m (Array a)
compact Int
n SerialT m (Array a)
xs = Stream m (Array a) -> SerialT m (Array a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m (Array a) -> SerialT m (Array a))
-> Stream m (Array a) -> SerialT m (Array a)
forall a b. (a -> b) -> a -> b
$ Int -> Stream m (Array a) -> Stream m (Array a)
forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Stream m (Array a) -> Stream m (Array a)
packArraysChunksOf Int
n (SerialT m (Array a) -> Stream m (Array a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD SerialT m (Array a)
xs)

-- XXX Replace the above functions by a compactLEFold
-- | Coalesce adjacent arrays in incoming stream to form bigger arrays of a
-- maximum specified size. Note that if a single array is bigger than the
-- specified size we do not split it to fit. When we coalesce multiple arrays
-- if the size would exceed the specified size we do not coalesce therefore the
-- actual array size may be less than the specified chunk size.
--
-- /Unimplemented/
{-# INLINE_NORMAL compactLEFold #-}
compactLEFold :: -- (MonadIO m, Storable a) =>
    Int -> Fold m (Array a) (Array a)
compactLEFold :: Int -> Fold m (Array a) (Array a)
compactLEFold = Int -> Fold m (Array a) (Array a)
forall a. HasCallStack => a
undefined

compactLE :: (MonadIO m {-, Storable a-}) =>
    Int -> SerialT m (Array a) -> SerialT m (Array a)
compactLE :: Int -> SerialT m (Array a) -> SerialT m (Array a)
compactLE Int
n SerialT m (Array a)
xs = Stream m (Array a) -> SerialT m (Array a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m (Array a) -> SerialT m (Array a))
-> Stream m (Array a) -> SerialT m (Array a)
forall a b. (a -> b) -> a -> b
$ Fold m (Array a) (Array a)
-> Stream m (Array a) -> Stream m (Array a)
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
D.foldMany (Int -> Fold m (Array a) (Array a)
forall (m :: * -> *) a. Int -> Fold m (Array a) (Array a)
compactLEFold Int
n) (SerialT m (Array a) -> Stream m (Array a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD SerialT m (Array a)
xs)

-- | Like 'compact' but generates arrays of exactly equal to the size specified
-- except for the last array in the stream which could be shorter.
--
-- /Unimplemented/
{-# INLINE compactEQ #-}
compactEQ :: -- (MonadIO m, Storable a) =>
    Int -> SerialT m (Array a) -> SerialT m (Array a)
compactEQ :: Int -> SerialT m (Array a) -> SerialT m (Array a)
compactEQ Int
_n SerialT m (Array a)
_xs = SerialT m (Array a)
forall a. HasCallStack => a
undefined
    -- D.fromStreamD $ D.foldMany (compactEQFold n) (D.toStreamD xs)

-- | Like 'compact' but generates arrays of size greater than or equal to the
-- specified except for the last array in the stream which could be shorter.
--
-- /Unimplemented/
{-# INLINE compactGE #-}
compactGE :: -- (MonadIO m, Storable a) =>
    Int -> SerialT m (Array a) -> SerialT m (Array a)
compactGE :: Int -> SerialT m (Array a) -> SerialT m (Array a)
compactGE Int
_n SerialT m (Array a)
_xs = SerialT m (Array a)
forall a. HasCallStack => a
undefined
    -- D.fromStreamD $ D.foldMany (compactGEFold n) (D.toStreamD xs)

-------------------------------------------------------------------------------
-- IOVec
-------------------------------------------------------------------------------

#if !defined(mingw32_HOST_OS)
data GatherState s arr
    = GatherInitial s
    | GatherBuffering s arr Int
    | GatherYielding arr (GatherState s arr)
    | GatherFinish

-- | @groupIOVecsOf maxBytes maxEntries@ groups arrays in the incoming stream
-- to create a stream of 'IOVec' arrays with a maximum of @maxBytes@ bytes in
-- each array and a maximum of @maxEntries@ entries in each array.
--
-- @since 0.7.0
{-# INLINE_NORMAL groupIOVecsOf #-}
groupIOVecsOf :: MonadIO m
    => Int -> Int -> D.Stream m (Array a) -> D.Stream m (Array IOVec)
groupIOVecsOf :: Int -> Int -> Stream m (Array a) -> Stream m (Array IOVec)
groupIOVecsOf Int
n Int
maxIOVLen (D.Stream State Stream m (Array a) -> s -> m (Step s (Array a))
step s
state) =
    (State Stream m (Array IOVec)
 -> GatherState s (Array IOVec)
 -> m (Step (GatherState s (Array IOVec)) (Array IOVec)))
-> GatherState s (Array IOVec) -> Stream m (Array IOVec)
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m (Array IOVec)
-> GatherState s (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall (m :: * -> *) a.
State Stream m a
-> GatherState s (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
step' (s -> GatherState s (Array IOVec)
forall s arr. s -> GatherState s arr
GatherInitial s
state)

    where

    {-# INLINE_LATE step' #-}
    step' :: State Stream m a
-> GatherState s (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
step' State Stream m a
gst (GatherInitial s
st) = do
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
            -- XXX we can pass the module string from the higher level API
            [Char] -> m ()
forall a. HasCallStack => [Char] -> a
error ([Char] -> m ()) -> [Char] -> m ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Streamly.Internal.Data.Array.Foreign.Mut.Type.groupIOVecsOf: the size of "
                 [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
"groups [" [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
n [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
"] must be a natural number"
        Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
maxIOVLen Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
            -- XXX we can pass the module string from the higher level API
            [Char] -> m ()
forall a. HasCallStack => [Char] -> a
error ([Char] -> m ()) -> [Char] -> m ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Streamly.Internal.Data.Array.Foreign.Mut.Type.groupIOVecsOf: the number of "
                 [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
"IOVec entries [" [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
n [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
"] must be a natural number"
        Step s (Array a)
r <- State Stream m (Array a) -> s -> m (Step s (Array a))
step (State Stream m a -> State Stream m (Array a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
gst) s
st
        case Step s (Array a)
r of
            D.Yield Array a
arr s
s -> do
                let p :: Ptr a
p = ForeignPtr a -> Ptr a
forall a. ForeignPtr a -> Ptr a
unsafeForeignPtrToPtr (Array a -> ForeignPtr a
forall a. Array a -> ForeignPtr a
aStart Array a
arr)
                    len :: Int
len = Array a -> Int
forall a. Array a -> Int
MArray.byteLength Array a
arr
                Array IOVec
iov <- IO (Array IOVec) -> m (Array IOVec)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Array IOVec) -> m (Array IOVec))
-> IO (Array IOVec) -> m (Array IOVec)
forall a b. (a -> b) -> a -> b
$ Int -> IO (Array IOVec)
forall a. Storable a => Int -> IO (Array a)
MArray.newArray Int
maxIOVLen
                Array IOVec
iov' <- IO (Array IOVec) -> m (Array IOVec)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Array IOVec) -> m (Array IOVec))
-> IO (Array IOVec) -> m (Array IOVec)
forall a b. (a -> b) -> a -> b
$ Array IOVec -> IOVec -> IO (Array IOVec)
forall a. Storable a => Array a -> a -> IO (Array a)
MArray.unsafeSnoc Array IOVec
iov (Ptr Word8 -> Word64 -> IOVec
IOVec (Ptr a -> Ptr Word8
forall a b. Ptr a -> Ptr b
castPtr Ptr a
p)
                                                (Int -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
len))
                if Int
len Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n
                then Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GatherState s (Array IOVec)) (Array IOVec)
 -> m (Step (GatherState s (Array IOVec)) (Array IOVec)))
-> Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall a b. (a -> b) -> a -> b
$ GatherState s (Array IOVec)
-> Step (GatherState s (Array IOVec)) (Array IOVec)
forall s a. s -> Step s a
D.Skip (Array IOVec
-> GatherState s (Array IOVec) -> GatherState s (Array IOVec)
forall s arr. arr -> GatherState s arr -> GatherState s arr
GatherYielding Array IOVec
iov' (s -> GatherState s (Array IOVec)
forall s arr. s -> GatherState s arr
GatherInitial s
s))
                else Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GatherState s (Array IOVec)) (Array IOVec)
 -> m (Step (GatherState s (Array IOVec)) (Array IOVec)))
-> Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall a b. (a -> b) -> a -> b
$ GatherState s (Array IOVec)
-> Step (GatherState s (Array IOVec)) (Array IOVec)
forall s a. s -> Step s a
D.Skip (s -> Array IOVec -> Int -> GatherState s (Array IOVec)
forall s arr. s -> arr -> Int -> GatherState s arr
GatherBuffering s
s Array IOVec
iov' Int
len)
            D.Skip s
s -> Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GatherState s (Array IOVec)) (Array IOVec)
 -> m (Step (GatherState s (Array IOVec)) (Array IOVec)))
-> Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall a b. (a -> b) -> a -> b
$ GatherState s (Array IOVec)
-> Step (GatherState s (Array IOVec)) (Array IOVec)
forall s a. s -> Step s a
D.Skip (s -> GatherState s (Array IOVec)
forall s arr. s -> GatherState s arr
GatherInitial s
s)
            Step s (Array a)
D.Stop -> Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall (m :: * -> *) a. Monad m => a -> m a
return Step (GatherState s (Array IOVec)) (Array IOVec)
forall s a. Step s a
D.Stop

    step' State Stream m a
gst (GatherBuffering s
st Array IOVec
iov Int
len) = do
        Step s (Array a)
r <- State Stream m (Array a) -> s -> m (Step s (Array a))
step (State Stream m a -> State Stream m (Array a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
gst) s
st
        case Step s (Array a)
r of
            D.Yield Array a
arr s
s -> do
                let p :: Ptr a
p = ForeignPtr a -> Ptr a
forall a. ForeignPtr a -> Ptr a
unsafeForeignPtrToPtr (Array a -> ForeignPtr a
forall a. Array a -> ForeignPtr a
aStart Array a
arr)
                    alen :: Int
alen = Array a -> Int
forall a. Array a -> Int
MArray.byteLength Array a
arr
                    len' :: Int
len' = Int
len Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
alen
                if Int
len' Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
n Bool -> Bool -> Bool
|| Array IOVec -> Int
forall a. Storable a => Array a -> Int
length Array IOVec
iov Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
maxIOVLen
                then do
                    Array IOVec
iov' <- IO (Array IOVec) -> m (Array IOVec)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Array IOVec) -> m (Array IOVec))
-> IO (Array IOVec) -> m (Array IOVec)
forall a b. (a -> b) -> a -> b
$ Int -> IO (Array IOVec)
forall a. Storable a => Int -> IO (Array a)
MArray.newArray Int
maxIOVLen
                    Array IOVec
iov'' <- IO (Array IOVec) -> m (Array IOVec)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Array IOVec) -> m (Array IOVec))
-> IO (Array IOVec) -> m (Array IOVec)
forall a b. (a -> b) -> a -> b
$ Array IOVec -> IOVec -> IO (Array IOVec)
forall a. Storable a => Array a -> a -> IO (Array a)
MArray.unsafeSnoc Array IOVec
iov' (Ptr Word8 -> Word64 -> IOVec
IOVec (Ptr a -> Ptr Word8
forall a b. Ptr a -> Ptr b
castPtr Ptr a
p)
                                                      (Int -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
alen))
                    Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GatherState s (Array IOVec)) (Array IOVec)
 -> m (Step (GatherState s (Array IOVec)) (Array IOVec)))
-> Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall a b. (a -> b) -> a -> b
$ GatherState s (Array IOVec)
-> Step (GatherState s (Array IOVec)) (Array IOVec)
forall s a. s -> Step s a
D.Skip (Array IOVec
-> GatherState s (Array IOVec) -> GatherState s (Array IOVec)
forall s arr. arr -> GatherState s arr -> GatherState s arr
GatherYielding Array IOVec
iov
                                        (s -> Array IOVec -> Int -> GatherState s (Array IOVec)
forall s arr. s -> arr -> Int -> GatherState s arr
GatherBuffering s
s Array IOVec
iov'' Int
alen))
                else do
                    Array IOVec
iov' <- IO (Array IOVec) -> m (Array IOVec)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Array IOVec) -> m (Array IOVec))
-> IO (Array IOVec) -> m (Array IOVec)
forall a b. (a -> b) -> a -> b
$ Array IOVec -> IOVec -> IO (Array IOVec)
forall a. Storable a => Array a -> a -> IO (Array a)
MArray.unsafeSnoc Array IOVec
iov (Ptr Word8 -> Word64 -> IOVec
IOVec (Ptr a -> Ptr Word8
forall a b. Ptr a -> Ptr b
castPtr Ptr a
p)
                                                    (Int -> Word64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
alen))
                    Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GatherState s (Array IOVec)) (Array IOVec)
 -> m (Step (GatherState s (Array IOVec)) (Array IOVec)))
-> Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall a b. (a -> b) -> a -> b
$ GatherState s (Array IOVec)
-> Step (GatherState s (Array IOVec)) (Array IOVec)
forall s a. s -> Step s a
D.Skip (s -> Array IOVec -> Int -> GatherState s (Array IOVec)
forall s arr. s -> arr -> Int -> GatherState s arr
GatherBuffering s
s Array IOVec
iov' Int
len')
            D.Skip s
s -> Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GatherState s (Array IOVec)) (Array IOVec)
 -> m (Step (GatherState s (Array IOVec)) (Array IOVec)))
-> Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall a b. (a -> b) -> a -> b
$ GatherState s (Array IOVec)
-> Step (GatherState s (Array IOVec)) (Array IOVec)
forall s a. s -> Step s a
D.Skip (s -> Array IOVec -> Int -> GatherState s (Array IOVec)
forall s arr. s -> arr -> Int -> GatherState s arr
GatherBuffering s
s Array IOVec
iov Int
len)
            Step s (Array a)
D.Stop -> Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GatherState s (Array IOVec)) (Array IOVec)
 -> m (Step (GatherState s (Array IOVec)) (Array IOVec)))
-> Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall a b. (a -> b) -> a -> b
$ GatherState s (Array IOVec)
-> Step (GatherState s (Array IOVec)) (Array IOVec)
forall s a. s -> Step s a
D.Skip (Array IOVec
-> GatherState s (Array IOVec) -> GatherState s (Array IOVec)
forall s arr. arr -> GatherState s arr -> GatherState s arr
GatherYielding Array IOVec
iov GatherState s (Array IOVec)
forall s arr. GatherState s arr
GatherFinish)

    step' State Stream m a
_ GatherState s (Array IOVec)
GatherFinish = Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall (m :: * -> *) a. Monad m => a -> m a
return Step (GatherState s (Array IOVec)) (Array IOVec)
forall s a. Step s a
D.Stop

    step' State Stream m a
_ (GatherYielding Array IOVec
iov GatherState s (Array IOVec)
next) = Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GatherState s (Array IOVec)) (Array IOVec)
 -> m (Step (GatherState s (Array IOVec)) (Array IOVec)))
-> Step (GatherState s (Array IOVec)) (Array IOVec)
-> m (Step (GatherState s (Array IOVec)) (Array IOVec))
forall a b. (a -> b) -> a -> b
$ Array IOVec
-> GatherState s (Array IOVec)
-> Step (GatherState s (Array IOVec)) (Array IOVec)
forall s a. a -> s -> Step s a
D.Yield Array IOVec
iov GatherState s (Array IOVec)
next
#endif