module Streamly.Internal.System.IOVec
( IOVec(..)
, c_writev
, c_safe_writev
#if !defined(mingw32_HOST_OS)
, groupIOVecsOf
, groupIOVecsOfMut
#endif
)
where
#include "inline.hs"
#if !defined(mingw32_HOST_OS)
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Foreign.Ptr (castPtr)
import Streamly.Internal.Data.Array.Foreign.Mut.Type (length)
import Streamly.Internal.Data.SVar.Type (adaptState)
import Streamly.Internal.Data.Array.Foreign.Mut.Type (Array(..))
import qualified Streamly.Internal.Data.Array.Foreign.Type as Array
import qualified Streamly.Internal.Data.Array.Foreign.Mut.Type as MArray
import qualified Streamly.Internal.Data.Stream.StreamD as D
#endif
import Streamly.Internal.System.IOVec.Type
import Prelude hiding (length)
#if !defined(mingw32_HOST_OS)
-- | State machine driving 'groupIOVecsOfMut'. @s@ is the state of the
-- upstream stream and @arr@ is the type of the group being assembled.
data GatherState s arr
    -- | No group is open; the next upstream array starts a fresh one.
    = GatherInitial s
    -- | A group is open: upstream state, the group collected so far, and
    -- the number of bytes accumulated in that group.
    | GatherBuffering s arr Int
    -- | Emit the given group, then continue from the given state.
    | GatherYielding arr (GatherState s arr)
    -- | Nothing left to emit; the output stream stops.
    | GatherFinish
-- | @groupIOVecsOfMut n maxIOVLen stream@ groups a stream of mutable arrays
-- into a stream of 'IOVec' arrays. Each input array contributes exactly one
-- 'IOVec' entry made of the array's start pointer and its length in bytes.
--
-- An open group is emitted as soon as adding the next array would push its
-- accumulated byte size above @n@, or once it already holds @maxIOVLen@
-- entries. An array of @n@ bytes or more that arrives when no group is open
-- is emitted immediately as a single-entry group.
--
-- Calls 'error' when the stream is first stepped if @n <= 0@ or
-- @maxIOVLen <= 0@.
--
-- NOTE(review): an 'IOVec' entry holds only a raw pointer into the source
-- array; nothing visible in this function keeps the source array alive, so
-- confirm that callers guarantee it outlives the emitted group.
{-# INLINE_NORMAL groupIOVecsOfMut #-}
groupIOVecsOfMut :: MonadIO m
    => Int -> Int -> D.Stream m (Array a) -> D.Stream m (Array IOVec)
groupIOVecsOfMut n maxIOVLen (D.Stream step state) =
    D.Stream step' (GatherInitial state)

    where

    {-# INLINE_LATE step' #-}
    -- No group is open: validate the arguments, then pull the next array.
    step' gst (GatherInitial st) = do
        when (n <= 0) $
            -- XXX the module string could be passed in by the higher level API
            error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.groupIOVecsOf: the size of "
                 ++ "groups [" ++ show n ++ "] must be a natural number"
        when (maxIOVLen <= 0) $
            -- Report the offending value, i.e. maxIOVLen (not n).
            error $ "Streamly.Internal.Data.Array.Foreign.Mut.Type.groupIOVecsOf: the number of "
                 ++ "IOVec entries [" ++ show maxIOVLen ++ "] must be a natural number"
        r <- step (adaptState gst) st
        case r of
            D.Yield arr s -> do
                let p = arrStart arr
                    len = MArray.byteLength arr
                -- Start a new group with room for maxIOVLen entries.
                iov <- liftIO $ MArray.newArray maxIOVLen
                iov' <- liftIO $ MArray.snocUnsafe iov (IOVec (castPtr p)
                                                (fromIntegral len))
                if len >= n
                -- A single array already fills the group: emit it right away.
                then return $ D.Skip (GatherYielding iov' (GatherInitial s))
                else return $ D.Skip (GatherBuffering s iov' len)
            D.Skip s -> return $ D.Skip (GatherInitial s)
            D.Stop -> return D.Stop

    -- A group is open: either append the next array to it or flush it.
    step' gst (GatherBuffering st iov len) = do
        r <- step (adaptState gst) st
        case r of
            D.Yield arr s -> do
                let p = arrStart arr
                    alen = MArray.byteLength arr
                    len' = len + alen
                if len' > n || length iov >= maxIOVLen
                then do
                    -- The open group cannot take this array: emit the group
                    -- and open a new one that starts with this array.
                    iov' <- liftIO $ MArray.newArray maxIOVLen
                    iov'' <- liftIO $ MArray.snocUnsafe iov' (IOVec (castPtr p)
                                                      (fromIntegral alen))
                    return $ D.Skip (GatherYielding iov
                                        (GatherBuffering s iov'' alen))
                else do
                    -- The entry-count check above guarantees that fewer than
                    -- maxIOVLen entries are present before this append.
                    iov' <- liftIO $ MArray.snocUnsafe iov (IOVec (castPtr p)
                                                     (fromIntegral alen))
                    return $ D.Skip (GatherBuffering s iov' len')
            D.Skip s -> return $ D.Skip (GatherBuffering s iov len)
            -- Upstream is exhausted: flush the partially filled group.
            D.Stop -> return $ D.Skip (GatherYielding iov GatherFinish)

    step' _ GatherFinish = return D.Stop

    step' _ (GatherYielding iov next) = return $ D.Yield iov next
-- | Variant of 'groupIOVecsOfMut' for immutable arrays. Every input array
-- is converted with 'Array.unsafeThaw', the result is grouped by
-- 'groupIOVecsOfMut' using the same @n@ and @maxIOVLen@, and every emitted
-- group is converted back with 'Array.unsafeFreeze'.
{-# INLINE_NORMAL groupIOVecsOf #-}
groupIOVecsOf :: MonadIO m
    => Int
    -> Int
    -> D.Stream m (Array.Array a) -> D.Stream m (Array.Array IOVec)
groupIOVecsOf n maxIOVLen input = D.map Array.unsafeFreeze grouped

    where

    -- Mutable view of the input arrays, as expected by groupIOVecsOfMut.
    thawed = D.map Array.unsafeThaw input

    -- Stream of mutable IOVec groups.
    grouped = groupIOVecsOfMut n maxIOVLen thawed
#endif