-- |
-- Module      : Streamly.Internal.System.IOVec
-- Copyright   : (c) 2019 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Low level IO routines interfacing the operating system.
--

module Streamly.Internal.System.IOVec
    ( IOVec(..)
    , c_writev
    , c_safe_writev
#if !defined(mingw32_HOST_OS)
    , groupIOVecsOf
    , groupIOVecsOfMut
#endif
    )
where

#include "inline.hs"

#if !defined(mingw32_HOST_OS)
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Foreign.Ptr (castPtr)
import Streamly.Internal.Data.Array.Foreign.Mut.Type (length)
import Streamly.Internal.Data.SVar (adaptState)
import Streamly.Internal.Data.Array.Foreign.Mut.Type (Array(..))

import qualified Streamly.Internal.Data.Array.Foreign.Type as Array
import qualified Streamly.Internal.Data.Array.Foreign.Mut.Type as MArray
import qualified Streamly.Internal.Data.Stream.StreamD as D
#endif

import Streamly.Internal.System.IOVec.Type

import Prelude hiding (length)

#if !defined(mingw32_HOST_OS)
-- State machine used by groupIOVecsOfMut while it gathers incoming arrays
-- into groups. The type parameter s is the state of the upstream stream and
-- arr is the type of the group being built (an array of IOVec entries).
data GatherState s arr
    -- No group is in progress; the next upstream array starts a new one.
    = GatherInitial s
    -- A group is being filled: upstream state, the group collected so far,
    -- and the total number of bytes referenced by the entries in that group.
    | GatherBuffering s arr Int
    -- Emit the given completed group, then continue from the wrapped state.
    | GatherYielding arr (GatherState s arr)
    -- The upstream is exhausted and the last group has been emitted; stop.
    | GatherFinish

-- | @groupIOVecsOfMut maxBytes maxEntries@ groups arrays in the incoming
-- stream to create a stream of 'IOVec' arrays with a maximum of @maxBytes@
-- bytes in each array and a maximum of @maxEntries@ entries in each array.
--
-- Every incoming array contributes exactly one 'IOVec' entry (start pointer
-- and length in bytes) to a group; the array contents are not copied. A group
-- is emitted as soon as adding the next array would push its total size above
-- @maxBytes@ or when it already holds @maxEntries@ entries. Arrays are never
-- split, therefore a single incoming array that is larger than @maxBytes@
-- still results in a (single entry) group that exceeds @maxBytes@.
--
-- Calls 'error' when the stream is evaluated if @maxBytes@ or @maxEntries@ is
-- not a positive number.
--
-- NOTE(review): the entries hold raw pointers into the source arrays, so the
-- consumer presumably has to keep the source arrays alive while a group is in
-- use -- confirm against the callers.
--
-- @since 0.7.0
{-# INLINE_NORMAL groupIOVecsOfMut #-}
groupIOVecsOfMut :: MonadIO m
    => Int -> Int -> D.Stream m (Array a) -> D.Stream m (Array IOVec)
groupIOVecsOfMut n maxIOVLen (D.Stream step state) =
    D.Stream step' (GatherInitial state)

    where

    {-# INLINE_LATE step' #-}
    -- No group in progress: validate the limits, then start a new group with
    -- the next array produced by the upstream.
    step' gst (GatherInitial st) = do
        when (n <= 0) $
            -- XXX we can pass the module string from the higher level API
            error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.groupIOVecsOf: the size of "
                 ++ "groups [" ++ show n ++ "] must be a natural number"
        when (maxIOVLen <= 0) $
            -- XXX we can pass the module string from the higher level API
            -- Report the offending value, which is maxIOVLen and not n.
            error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.groupIOVecsOf: the number of "
                 ++ "IOVec entries [" ++ show maxIOVLen ++ "] must be a natural number"
        r <- step (adaptState gst) st
        case r of
            D.Yield arr s -> do
                let p = arrStart arr
                    len = MArray.byteLength arr
                -- Allocate room for the maximum number of entries up front
                -- so that entries can be appended with snocUnsafe later.
                iov <- liftIO $ MArray.newArray maxIOVLen
                iov' <- liftIO $ MArray.snocUnsafe iov (IOVec (castPtr p)
                                                (fromIntegral len))
                if len >= n
                -- This array alone reaches the byte limit, emit it at once.
                then return $ D.Skip (GatherYielding iov' (GatherInitial s))
                else return $ D.Skip (GatherBuffering s iov' len)
            D.Skip s -> return $ D.Skip (GatherInitial s)
            D.Stop -> return D.Stop

    -- A group is in progress: either append the next array to it or emit the
    -- group and start a new one with that array.
    step' gst (GatherBuffering st iov len) = do
        r <- step (adaptState gst) st
        case r of
            D.Yield arr s -> do
                let p = arrStart arr
                    alen = MArray.byteLength arr
                    len' = len + alen
                if len' > n || length iov >= maxIOVLen
                then do
                    -- The new array does not fit in the current group. Emit
                    -- the current group and make the new array the first
                    -- entry of a fresh group.
                    iov' <- liftIO $ MArray.newArray maxIOVLen
                    iov'' <- liftIO $ MArray.snocUnsafe iov' (IOVec (castPtr p)
                                                      (fromIntegral alen))
                    return $ D.Skip (GatherYielding iov
                                        (GatherBuffering s iov'' alen))
                else do
                    iov' <- liftIO $ MArray.snocUnsafe iov (IOVec (castPtr p)
                                                    (fromIntegral alen))
                    return $ D.Skip (GatherBuffering s iov' len')
            D.Skip s -> return $ D.Skip (GatherBuffering s iov len)
            -- Upstream is done, flush the partially filled group and finish.
            D.Stop -> return $ D.Skip (GatherYielding iov GatherFinish)

    step' _ GatherFinish = return D.Stop

    -- Emit a completed group and resume from the saved continuation state.
    step' _ (GatherYielding iov next) = return $ D.Yield iov next

-- | @groupIOVecsOf maxBytes maxEntries@ groups arrays in the incoming stream
-- to create a stream of 'IOVec' arrays with a maximum of @maxBytes@ bytes in
-- each array and a maximum of @maxEntries@ entries in each array.
--
-- This is the immutable array counterpart of 'groupIOVecsOfMut': each
-- incoming array is converted using @unsafeThaw@, the grouping is delegated
-- to 'groupIOVecsOfMut' and each resulting group is converted back using
-- @unsafeFreeze@. See 'groupIOVecsOfMut' for the grouping rules and the
-- conditions under which 'error' is called.
--
-- @since 0.7.0
{-# INLINE_NORMAL groupIOVecsOf #-}
groupIOVecsOf :: MonadIO m
    => Int
    -> Int
    -> D.Stream m (Array.Array a) -> D.Stream m (Array.Array IOVec)
groupIOVecsOf n maxIOVLen str =
    D.map Array.unsafeFreeze
        $ groupIOVecsOfMut n maxIOVLen
        $ D.map Array.unsafeThaw str
#endif