-- |
-- Module      : Streamly.Internal.System.IOVec
-- Copyright   : (c) 2019 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Low level IO routines interfacing the operating system.
--

module Streamly.Internal.System.IOVec
    ( IOVec(..)
    , c_writev
    , c_safe_writev
#if !defined(mingw32_HOST_OS)
    , groupIOVecsOf
    , groupIOVecsOfMut
#endif
    )
where

#include "inline.hs"

#if !defined(mingw32_HOST_OS)
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Foreign.Ptr (castPtr)
import Streamly.Internal.Data.Array.Foreign.Mut.Type (length)
import Streamly.Internal.Data.SVar.Type (adaptState)
import Streamly.Internal.Data.Array.Foreign.Mut.Type (Array(..))

import qualified Streamly.Internal.Data.Array.Foreign.Type as Array
import qualified Streamly.Internal.Data.Array.Foreign.Mut.Type as MArray
import qualified Streamly.Internal.Data.Stream.StreamD as D
#endif

import Streamly.Internal.System.IOVec.Type

import Prelude hiding (length)

#if !defined(mingw32_HOST_OS)
-- | Internal state of the stream stepper that groups arrays into 'IOVec'
-- arrays. @s@ is the state of the upstream stream and @arr@ is the type of
-- the group being assembled.
data GatherState s arr
    = GatherInitial s
    -- ^ No group is in progress; holds the upstream state to pull from.
    | GatherBuffering s arr Int
    -- ^ A group is being filled: the upstream state, the group assembled so
    -- far, and the total number of bytes referenced by that group.
    | GatherYielding arr (GatherState s arr)
    -- ^ Emit the given group downstream, then continue in the given state.
    | GatherFinish
    -- ^ Nothing more to emit; the stream stops.

-- | @groupIOVecsOfMut maxBytes maxEntries@ groups arrays in the incoming
-- stream to create a stream of 'IOVec' arrays. Each input array contributes
-- one 'IOVec' entry made of its start pointer and its length in bytes.
--
-- A group is closed and emitted when adding the next array would take it
-- beyond @maxBytes@ bytes, or when it already holds @maxEntries@ entries. An
-- input array is never split, so an array of @maxBytes@ bytes or more ends up
-- in a group on its own and that group can exceed @maxBytes@.
--
-- Both @maxBytes@ and @maxEntries@ must be positive, otherwise 'error' is
-- called when the stream is evaluated.
--
-- NOTE(review): the entries store raw pointers obtained from the input
-- arrays; nothing in this function retains the input arrays themselves.
-- Confirm that callers keep them alive until the 'IOVec' arrays are consumed.
--
-- @since 0.7.0
{-# INLINE_NORMAL groupIOVecsOfMut #-}
groupIOVecsOfMut :: MonadIO m
    => Int -> Int -> D.Stream m (Array a) -> D.Stream m (Array IOVec)
groupIOVecsOfMut n maxIOVLen (D.Stream step state) =
    D.Stream step' (GatherInitial state)

    where

    {-# INLINE_LATE step' #-}
    -- No group in progress: validate the limits, then start a new group with
    -- the next array from upstream.
    step' gst (GatherInitial st) = do
        when (n <= 0) $
            -- XXX we can pass the module string from the higher level API
            error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.groupIOVecsOf: the size of "
                 ++ "groups [" ++ show n ++ "] must be a natural number"
        when (maxIOVLen <= 0) $
            -- XXX we can pass the module string from the higher level API
            -- Report the offending entry limit itself, not the byte limit.
            error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.groupIOVecsOf: the number of "
                 ++ "IOVec entries [" ++ show maxIOVLen ++ "] must be a natural number"
        r <- step (adaptState gst) st
        case r of
            D.Yield arr s -> do
                let p = arrStart arr
                    len = MArray.byteLength arr
                -- Room for maxIOVLen entries is reserved up front, and the
                -- entry count is checked before every later snocUnsafe.
                iov <- liftIO $ MArray.newArray maxIOVLen
                iov' <- liftIO $ MArray.snocUnsafe iov (IOVec (castPtr p)
                                                (fromIntegral len))
                if len >= n
                -- This array alone reaches the byte limit: emit it by itself.
                then return $ D.Skip (GatherYielding iov' (GatherInitial s))
                else return $ D.Skip (GatherBuffering s iov' len)
            D.Skip s -> return $ D.Skip (GatherInitial s)
            D.Stop -> return D.Stop

    -- A group is in progress: either append the next array to it, or emit it
    -- and start a new group with that array.
    step' gst (GatherBuffering st iov len) = do
        r <- step (adaptState gst) st
        case r of
            D.Yield arr s -> do
                let p = arrStart arr
                    alen = MArray.byteLength arr
                    len' = len + alen
                if len' > n || length iov >= maxIOVLen
                then do
                    -- The array does not fit: it becomes the first entry of
                    -- a fresh group and the current group is emitted.
                    iov' <- liftIO $ MArray.newArray maxIOVLen
                    iov'' <- liftIO $ MArray.snocUnsafe iov' (IOVec (castPtr p)
                                                      (fromIntegral alen))
                    return $ D.Skip (GatherYielding iov
                                        (GatherBuffering s iov'' alen))
                else do
                    iov' <- liftIO $ MArray.snocUnsafe iov (IOVec (castPtr p)
                                                    (fromIntegral alen))
                    return $ D.Skip (GatherBuffering s iov' len')
            D.Skip s -> return $ D.Skip (GatherBuffering s iov len)
            -- Upstream ended: flush the partially filled group, then stop.
            D.Stop -> return $ D.Skip (GatherYielding iov GatherFinish)

    step' _ GatherFinish = return D.Stop

    step' _ (GatherYielding iov next) = return $ D.Yield iov next

-- | @groupIOVecsOf maxBytes maxEntries@ groups arrays in the incoming stream
-- to create a stream of 'IOVec' arrays with a maximum of @maxBytes@ bytes in
-- each array and a maximum of @maxEntries@ entries in each array.
--
-- This is 'groupIOVecsOfMut' for immutable arrays: every input array goes
-- through 'Array.unsafeThaw' before grouping and every resulting 'IOVec'
-- array goes through 'Array.unsafeFreeze' afterwards.
--
-- @since 0.7.0
{-# INLINE_NORMAL groupIOVecsOf #-}
groupIOVecsOf :: MonadIO m
    => Int
    -> Int
    -> D.Stream m (Array.Array a) -> D.Stream m (Array.Array IOVec)
groupIOVecsOf n maxIOVLen str = D.map Array.unsafeFreeze grouped

    where

    -- Input arrays viewed as mutable arrays, as the grouping stepper expects.
    thawed = D.map Array.unsafeThaw str

    -- Mutable IOVec arrays produced by the grouping stepper.
    grouped = groupIOVecsOfMut n maxIOVLen thawed
#endif