#include "inline.hs"
module Streamly.Internal.Network.Socket
(
SockSpec (..)
, forSocketM
, withSocket
, accept
, acceptor
, connect
, connectFrom
, getChunk
, read
, readWith
, readChunks
, readChunksWith
, reader
, readerWith
, chunkReader
, chunkReaderWith
, putChunk
, write
, writeWith
, writeChunks
, writeChunksWith
, writeMaybesWith
, putChunks
, putBytesWith
, putBytes
, readWithBufferOf
, readChunksWithBufferOf
, writeWithBufferOf
, writeChunksWithBufferOf
)
where
import Control.Concurrent (threadWaitWrite, rtsSupportsBoundThreads)
import Control.Exception (onException)
import Control.Monad (forM_, when)
import Control.Monad.Catch (MonadCatch, finally, MonadMask)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Maybe (isNothing, fromJust)
import Data.Word (Word8)
import Foreign.Ptr (plusPtr, Ptr, castPtr)
import Streamly.Data.MutByteArray (Unbox)
import Network.Socket
(Socket, SocketOption(..), Family(..), SockAddr(..),
ProtocolNumber, withSocketsDo, SocketType(..), socket, bind,
setSocketOption, sendBuf, recvBuf)
#if MIN_VERSION_network(3,1,0)
import Network.Socket (withFdSocket)
#else
import Network.Socket (fdSocket)
#endif
import Prelude hiding (read)
import qualified Network.Socket as Net
import Streamly.Internal.Data.Array (Array(..))
import Streamly.Data.Fold (Fold)
import Streamly.Data.Stream (Stream)
import Streamly.Internal.Data.Unfold (Unfold(..))
import Streamly.Internal.System.IO (defaultChunkSize)
import qualified Streamly.Data.Array as A
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Data.Stream as S
import qualified Streamly.Data.Unfold as UF
import qualified Streamly.Internal.Data.Array as A
( unsafeFreeze, unsafePinnedAsPtr, byteLength, pinnedChunksOf,
pinnedCreateOf, unsafePinnedCreateOf, lCompactGE )
import qualified Streamly.Internal.Data.MutArray as MArray
(MutArray(..), unsafePinnedAsPtr, pinnedEmptyOf)
import qualified Streamly.Internal.Data.Stream as S (fromStreamK, Stream(..), Step(..))
import qualified Streamly.Internal.Data.StreamK as K (mkStream)
{-# INLINE forSocketM #-}
forSocketM :: (MonadMask m, MonadIO m) => (Socket -> m ()) -> Socket -> m ()
forSocketM :: forall (m :: * -> *).
(MonadMask m, MonadIO m) =>
(Socket -> m ()) -> Socket -> m ()
forSocketM Socket -> m ()
f Socket
sk = m () -> m () -> m ()
forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
finally (Socket -> m ()
f Socket
sk) (IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Socket -> IO ()
Net.close Socket
sk))
{-# INLINE withSocket #-}
withSocket :: (MonadIO m, MonadCatch m) =>
Socket -> (Socket -> Stream m a) -> Stream m a
withSocket :: forall (m :: * -> *) a.
(MonadIO m, MonadCatch m) =>
Socket -> (Socket -> Stream m a) -> Stream m a
withSocket Socket
sk Socket -> Stream m a
f = IO () -> Stream m a -> Stream m a
forall (m :: * -> *) b a.
(MonadIO m, MonadCatch m) =>
IO b -> Stream m a -> Stream m a
S.finallyIO (Socket -> IO ()
Net.close Socket
sk) (Socket -> Stream m a
f Socket
sk)
data SockSpec = SockSpec
{
SockSpec -> Family
sockFamily :: !Family
, SockSpec -> SocketType
sockType :: !SocketType
, SockSpec -> CInt
sockProto :: !ProtocolNumber
, SockSpec -> [(SocketOption, Int)]
sockOpts :: ![(SocketOption, Int)]
}
initListener :: Int -> SockSpec -> SockAddr -> IO Socket
initListener :: Int -> SockSpec -> SockAddr -> IO Socket
initListener Int
listenQLen SockSpec{[(SocketOption, Int)]
CInt
SocketType
Family
sockOpts :: [(SocketOption, Int)]
sockProto :: CInt
sockType :: SocketType
sockFamily :: Family
sockOpts :: SockSpec -> [(SocketOption, Int)]
sockProto :: SockSpec -> CInt
sockType :: SockSpec -> SocketType
sockFamily :: SockSpec -> Family
..} SockAddr
addr =
IO Socket -> IO Socket
forall a. IO a -> IO a
withSocketsDo (IO Socket -> IO Socket) -> IO Socket -> IO Socket
forall a b. (a -> b) -> a -> b
$ do
Socket
sock <- Family -> SocketType -> CInt -> IO Socket
socket Family
sockFamily SocketType
sockType CInt
sockProto
Socket -> IO ()
use Socket
sock IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`onException` Socket -> IO ()
Net.close Socket
sock
Socket -> IO Socket
forall (m :: * -> *) a. Monad m => a -> m a
return Socket
sock
where
use :: Socket -> IO ()
use Socket
sock = do
((SocketOption, Int) -> IO ()) -> [(SocketOption, Int)] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\(SocketOption
opt, Int
val) -> Socket -> SocketOption -> Int -> IO ()
setSocketOption Socket
sock SocketOption
opt Int
val) [(SocketOption, Int)]
sockOpts
Socket -> SockAddr -> IO ()
bind Socket
sock SockAddr
addr
Socket -> Int -> IO ()
Net.listen Socket
sock Int
listenQLen
{-# INLINE listenTuples #-}
listenTuples :: MonadIO m
=> Unfold m (Int, SockSpec, SockAddr) (Socket, SockAddr)
listenTuples :: forall (m :: * -> *).
MonadIO m =>
Unfold m (Int, SockSpec, SockAddr) (Socket, SockAddr)
listenTuples = (Socket -> m (Step Socket (Socket, SockAddr)))
-> ((Int, SockSpec, SockAddr) -> m Socket)
-> Unfold m (Int, SockSpec, SockAddr) (Socket, SockAddr)
forall (m :: * -> *) a b s.
(s -> m (Step s b)) -> (a -> m s) -> Unfold m a b
Unfold Socket -> m (Step Socket (Socket, SockAddr))
forall {m :: * -> *}.
MonadIO m =>
Socket -> m (Step Socket (Socket, SockAddr))
step (Int, SockSpec, SockAddr) -> m Socket
forall {m :: * -> *}.
MonadIO m =>
(Int, SockSpec, SockAddr) -> m Socket
inject
where
inject :: (Int, SockSpec, SockAddr) -> m Socket
inject (Int
listenQLen, SockSpec
spec, SockAddr
addr) =
IO Socket -> m Socket
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Socket -> m Socket) -> IO Socket -> m Socket
forall a b. (a -> b) -> a -> b
$ Int -> SockSpec -> SockAddr -> IO Socket
initListener Int
listenQLen SockSpec
spec SockAddr
addr
step :: Socket -> m (Step Socket (Socket, SockAddr))
step Socket
listener = do
(Socket, SockAddr)
r <- IO (Socket, SockAddr) -> m (Socket, SockAddr)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Socket -> IO (Socket, SockAddr)
Net.accept Socket
listener IO (Socket, SockAddr) -> IO () -> IO (Socket, SockAddr)
forall a b. IO a -> IO b -> IO a
`onException` Socket -> IO ()
Net.close Socket
listener)
Step Socket (Socket, SockAddr)
-> m (Step Socket (Socket, SockAddr))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step Socket (Socket, SockAddr)
-> m (Step Socket (Socket, SockAddr)))
-> Step Socket (Socket, SockAddr)
-> m (Step Socket (Socket, SockAddr))
forall a b. (a -> b) -> a -> b
$ (Socket, SockAddr) -> Socket -> Step Socket (Socket, SockAddr)
forall s a. a -> s -> Step s a
S.Yield (Socket, SockAddr)
r Socket
listener
{-# INLINE acceptor #-}
acceptor :: MonadIO m => Unfold m (Int, SockSpec, SockAddr) Socket
acceptor :: forall (m :: * -> *).
MonadIO m =>
Unfold m (Int, SockSpec, SockAddr) Socket
acceptor = ((Socket, SockAddr) -> Socket)
-> Unfold m (Int, SockSpec, SockAddr) (Socket, SockAddr)
-> Unfold m (Int, SockSpec, SockAddr) Socket
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Socket, SockAddr) -> Socket
forall a b. (a, b) -> a
fst Unfold m (Int, SockSpec, SockAddr) (Socket, SockAddr)
forall (m :: * -> *).
MonadIO m =>
Unfold m (Int, SockSpec, SockAddr) (Socket, SockAddr)
listenTuples
{-# INLINE connectCommon #-}
connectCommon :: SockSpec -> Maybe SockAddr -> SockAddr -> IO Socket
connectCommon :: SockSpec -> Maybe SockAddr -> SockAddr -> IO Socket
connectCommon SockSpec{[(SocketOption, Int)]
CInt
SocketType
Family
sockOpts :: [(SocketOption, Int)]
sockProto :: CInt
sockType :: SocketType
sockFamily :: Family
sockOpts :: SockSpec -> [(SocketOption, Int)]
sockProto :: SockSpec -> CInt
sockType :: SockSpec -> SocketType
sockFamily :: SockSpec -> Family
..} Maybe SockAddr
local SockAddr
remote = IO Socket -> IO Socket
forall a. IO a -> IO a
withSocketsDo (IO Socket -> IO Socket) -> IO Socket -> IO Socket
forall a b. (a -> b) -> a -> b
$ do
Socket
sock <- Family -> SocketType -> CInt -> IO Socket
socket Family
sockFamily SocketType
sockType CInt
sockProto
Socket -> IO ()
use Socket
sock IO () -> IO () -> IO ()
forall a b. IO a -> IO b -> IO a
`onException` Socket -> IO ()
Net.close Socket
sock
Socket -> IO Socket
forall (m :: * -> *) a. Monad m => a -> m a
return Socket
sock
where
use :: Socket -> IO ()
use Socket
sock = do
((SocketOption, Int) -> IO ()) -> [(SocketOption, Int)] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\(SocketOption
opt, Int
val) -> Socket -> SocketOption -> Int -> IO ()
setSocketOption Socket
sock SocketOption
opt Int
val) [(SocketOption, Int)]
sockOpts
Maybe SockAddr -> (SockAddr -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe SockAddr
local (Socket -> SockAddr -> IO ()
bind Socket
sock)
Socket -> SockAddr -> IO ()
Net.connect Socket
sock SockAddr
remote
{-# INLINE connect #-}
connect :: SockSpec -> SockAddr -> IO Socket
connect :: SockSpec -> SockAddr -> IO Socket
connect SockSpec
spec = SockSpec -> Maybe SockAddr -> SockAddr -> IO Socket
connectCommon SockSpec
spec Maybe SockAddr
forall a. Maybe a
Nothing
{-# INLINE connectFrom #-}
connectFrom :: SockSpec -> SockAddr -> SockAddr -> IO Socket
connectFrom :: SockSpec -> SockAddr -> SockAddr -> IO Socket
connectFrom SockSpec
spec SockAddr
local = SockSpec -> Maybe SockAddr -> SockAddr -> IO Socket
connectCommon SockSpec
spec (SockAddr -> Maybe SockAddr
forall a. a -> Maybe a
Just SockAddr
local)
{-# INLINE recvConnectionTuplesWith #-}
recvConnectionTuplesWith :: MonadIO m
=> Int -> SockSpec -> SockAddr -> Stream m (Socket, SockAddr)
recvConnectionTuplesWith :: forall (m :: * -> *).
MonadIO m =>
Int -> SockSpec -> SockAddr -> Stream m (Socket, SockAddr)
recvConnectionTuplesWith Int
tcpListenQ SockSpec
spec SockAddr
addr = (Maybe Socket -> m (Maybe ((Socket, SockAddr), Maybe Socket)))
-> Maybe Socket -> Stream m (Socket, SockAddr)
forall (m :: * -> *) s a.
Monad m =>
(s -> m (Maybe (a, s))) -> s -> Stream m a
S.unfoldrM Maybe Socket -> m (Maybe ((Socket, SockAddr), Maybe Socket))
forall {m :: * -> *}.
MonadIO m =>
Maybe Socket -> m (Maybe ((Socket, SockAddr), Maybe Socket))
step Maybe Socket
forall a. Maybe a
Nothing
where
step :: Maybe Socket -> m (Maybe ((Socket, SockAddr), Maybe Socket))
step Maybe Socket
Nothing = do
Socket
listener <- IO Socket -> m Socket
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Socket -> m Socket) -> IO Socket -> m Socket
forall a b. (a -> b) -> a -> b
$ Int -> SockSpec -> SockAddr -> IO Socket
initListener Int
tcpListenQ SockSpec
spec SockAddr
addr
(Socket, SockAddr)
r <- IO (Socket, SockAddr) -> m (Socket, SockAddr)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Socket -> IO (Socket, SockAddr)
Net.accept Socket
listener IO (Socket, SockAddr) -> IO () -> IO (Socket, SockAddr)
forall a b. IO a -> IO b -> IO a
`onException` Socket -> IO ()
Net.close Socket
listener)
Maybe ((Socket, SockAddr), Maybe Socket)
-> m (Maybe ((Socket, SockAddr), Maybe Socket))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ((Socket, SockAddr), Maybe Socket)
-> m (Maybe ((Socket, SockAddr), Maybe Socket)))
-> Maybe ((Socket, SockAddr), Maybe Socket)
-> m (Maybe ((Socket, SockAddr), Maybe Socket))
forall a b. (a -> b) -> a -> b
$ ((Socket, SockAddr), Maybe Socket)
-> Maybe ((Socket, SockAddr), Maybe Socket)
forall a. a -> Maybe a
Just ((Socket, SockAddr)
r, Socket -> Maybe Socket
forall a. a -> Maybe a
Just Socket
listener)
step (Just Socket
listener) = do
(Socket, SockAddr)
r <- IO (Socket, SockAddr) -> m (Socket, SockAddr)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Socket -> IO (Socket, SockAddr)
Net.accept Socket
listener IO (Socket, SockAddr) -> IO () -> IO (Socket, SockAddr)
forall a b. IO a -> IO b -> IO a
`onException` Socket -> IO ()
Net.close Socket
listener)
Maybe ((Socket, SockAddr), Maybe Socket)
-> m (Maybe ((Socket, SockAddr), Maybe Socket))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ((Socket, SockAddr), Maybe Socket)
-> m (Maybe ((Socket, SockAddr), Maybe Socket)))
-> Maybe ((Socket, SockAddr), Maybe Socket)
-> m (Maybe ((Socket, SockAddr), Maybe Socket))
forall a b. (a -> b) -> a -> b
$ ((Socket, SockAddr), Maybe Socket)
-> Maybe ((Socket, SockAddr), Maybe Socket)
forall a. a -> Maybe a
Just ((Socket, SockAddr)
r, Socket -> Maybe Socket
forall a. a -> Maybe a
Just Socket
listener)
{-# INLINE accept #-}
accept :: MonadIO m => Int -> SockSpec -> SockAddr -> Stream m Socket
accept :: forall (m :: * -> *).
MonadIO m =>
Int -> SockSpec -> SockAddr -> Stream m Socket
accept Int
tcpListenQ SockSpec
spec SockAddr
addr =
(Socket, SockAddr) -> Socket
forall a b. (a, b) -> a
fst ((Socket, SockAddr) -> Socket)
-> Stream m (Socket, SockAddr) -> Stream m Socket
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int -> SockSpec -> SockAddr -> Stream m (Socket, SockAddr)
forall (m :: * -> *).
MonadIO m =>
Int -> SockSpec -> SockAddr -> Stream m (Socket, SockAddr)
recvConnectionTuplesWith Int
tcpListenQ SockSpec
spec SockAddr
addr
{-# INLINABLE readArrayUptoWith #-}
readArrayUptoWith
:: (h -> Ptr Word8 -> Int -> IO Int)
-> Int
-> h
-> IO (Array Word8)
readArrayUptoWith :: forall h.
(h -> Ptr Word8 -> Int -> IO Int) -> Int -> h -> IO (Array Word8)
readArrayUptoWith h -> Ptr Word8 -> Int -> IO Int
f Int
size h
h = do
MutArray Word8
arr <- Int -> IO (MutArray Word8)
forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Int -> m (MutArray a)
MArray.pinnedEmptyOf Int
size
MutArray Word8
-> (Ptr Word8 -> IO (Array Word8)) -> IO (Array Word8)
forall (m :: * -> *) a b.
MonadIO m =>
MutArray a -> (Ptr a -> m b) -> m b
MArray.unsafePinnedAsPtr MutArray Word8
arr ((Ptr Word8 -> IO (Array Word8)) -> IO (Array Word8))
-> (Ptr Word8 -> IO (Array Word8)) -> IO (Array Word8)
forall a b. (a -> b) -> a -> b
$ \Ptr Word8
p -> do
Int
n <- h -> Ptr Word8 -> Int -> IO Int
f h
h Ptr Word8
p Int
size
let v :: Array a
v = MutArray a -> Array a
forall a. MutArray a -> Array a
A.unsafeFreeze
(MutArray a -> Array a) -> MutArray a -> Array a
forall a b. (a -> b) -> a -> b
$ MutArray Word8
arr { arrEnd :: Int
MArray.arrEnd = Int
n, arrBound :: Int
MArray.arrBound = Int
size }
Array Word8 -> IO (Array Word8)
forall (m :: * -> *) a. Monad m => a -> m a
return Array Word8
forall {a}. Array a
v
{-# INLINABLE getChunk #-}
getChunk :: Int -> Socket -> IO (Array Word8)
getChunk :: Int -> Socket -> IO (Array Word8)
getChunk = (Socket -> Ptr Word8 -> Int -> IO Int)
-> Int -> Socket -> IO (Array Word8)
forall h.
(h -> Ptr Word8 -> Int -> IO Int) -> Int -> h -> IO (Array Word8)
readArrayUptoWith Socket -> Ptr Word8 -> Int -> IO Int
recvBuf
waitWhen0 :: Int -> Socket -> IO ()
waitWhen0 :: Int -> Socket -> IO ()
waitWhen0 Int
0 Socket
s = Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
rtsSupportsBoundThreads (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
#if MIN_VERSION_network(3,1,0)
Socket -> (CInt -> IO ()) -> IO ()
forall r. Socket -> (CInt -> IO r) -> IO r
withFdSocket Socket
s ((CInt -> IO ()) -> IO ()) -> (CInt -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \CInt
fd -> Fd -> IO ()
threadWaitWrite (Fd -> IO ()) -> Fd -> IO ()
forall a b. (a -> b) -> a -> b
$ CInt -> Fd
forall a b. (Integral a, Num b) => a -> b
fromIntegral CInt
fd
#elif MIN_VERSION_network(3,0,0)
fdSocket s >>= threadWaitWrite . fromIntegral
#else
let fd = fdSocket s in threadWaitWrite $ fromIntegral fd
#endif
waitWhen0 Int
_ Socket
_ = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
sendAll :: Socket -> Ptr Word8 -> Int -> IO ()
sendAll :: Socket -> Ptr Word8 -> Int -> IO ()
sendAll Socket
_ Ptr Word8
_ Int
len | Int
len Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
sendAll Socket
s Ptr Word8
p Int
len = do
Int
sent <- Socket -> Ptr Word8 -> Int -> IO Int
sendBuf Socket
s Ptr Word8
p Int
len
Int -> Socket -> IO ()
waitWhen0 Int
sent Socket
s
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
sent Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Socket -> Ptr Word8 -> Int -> IO ()
sendAll Socket
s (Ptr Word8
p Ptr Word8 -> Int -> Ptr Word8
forall a b. Ptr a -> Int -> Ptr b
`plusPtr` Int
sent) (Int
len Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
sent)
{-# INLINABLE writeArrayWith #-}
writeArrayWith :: Unbox a
=> (h -> Ptr Word8 -> Int -> IO ())
-> h
-> Array a
-> IO ()
writeArrayWith :: forall a h.
Unbox a =>
(h -> Ptr Word8 -> Int -> IO ()) -> h -> Array a -> IO ()
writeArrayWith h -> Ptr Word8 -> Int -> IO ()
_ h
_ Array a
arr | Array a -> Int
forall a. Unbox a => Array a -> Int
A.length Array a
arr Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
writeArrayWith h -> Ptr Word8 -> Int -> IO ()
f h
h Array a
arr = Array a -> (Ptr a -> IO ()) -> IO ()
forall (m :: * -> *) a b.
MonadIO m =>
Array a -> (Ptr a -> m b) -> m b
A.unsafePinnedAsPtr Array a
arr ((Ptr a -> IO ()) -> IO ()) -> (Ptr a -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Ptr a
ptr -> h -> Ptr Word8 -> Int -> IO ()
f h
h (Ptr a -> Ptr Word8
forall a b. Ptr a -> Ptr b
castPtr Ptr a
ptr) Int
aLen
where
aLen :: Int
aLen = Array a -> Int
forall a. Array a -> Int
A.byteLength Array a
arr
{-# INLINABLE putChunk #-}
putChunk :: Unbox a => Socket -> Array a -> IO ()
putChunk :: forall a. Unbox a => Socket -> Array a -> IO ()
putChunk = (Socket -> Ptr Word8 -> Int -> IO ()) -> Socket -> Array a -> IO ()
forall a h.
Unbox a =>
(h -> Ptr Word8 -> Int -> IO ()) -> h -> Array a -> IO ()
writeArrayWith Socket -> Ptr Word8 -> Int -> IO ()
sendAll
{-# INLINE _readChunksUptoWith #-}
_readChunksUptoWith :: (MonadIO m)
=> (Int -> h -> IO (Array Word8))
-> Int -> h -> Stream m (Array Word8)
_readChunksUptoWith :: forall (m :: * -> *) h.
MonadIO m =>
(Int -> h -> IO (Array Word8))
-> Int -> h -> Stream m (Array Word8)
_readChunksUptoWith Int -> h -> IO (Array Word8)
f Int
size h
h = StreamK m (Array Word8) -> Stream m (Array Word8)
forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
S.fromStreamK StreamK m (Array Word8)
go
where
go :: StreamK m (Array Word8)
go = (forall r.
State StreamK m (Array Word8)
-> (Array Word8 -> StreamK m (Array Word8) -> m r)
-> (Array Word8 -> m r)
-> m r
-> m r)
-> StreamK m (Array Word8)
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream ((forall r.
State StreamK m (Array Word8)
-> (Array Word8 -> StreamK m (Array Word8) -> m r)
-> (Array Word8 -> m r)
-> m r
-> m r)
-> StreamK m (Array Word8))
-> (forall r.
State StreamK m (Array Word8)
-> (Array Word8 -> StreamK m (Array Word8) -> m r)
-> (Array Word8 -> m r)
-> m r
-> m r)
-> StreamK m (Array Word8)
forall a b. (a -> b) -> a -> b
$ \State StreamK m (Array Word8)
_ Array Word8 -> StreamK m (Array Word8) -> m r
yld Array Word8 -> m r
_ m r
stp -> do
Array Word8
arr <- IO (Array Word8) -> m (Array Word8)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Array Word8) -> m (Array Word8))
-> IO (Array Word8) -> m (Array Word8)
forall a b. (a -> b) -> a -> b
$ Int -> h -> IO (Array Word8)
f Int
size h
h
if Array Word8 -> Int
forall a. Unbox a => Array a -> Int
A.length Array Word8
arr Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
then m r
stp
else Array Word8 -> StreamK m (Array Word8) -> m r
yld Array Word8
arr StreamK m (Array Word8)
go
{-# INLINE_NORMAL readChunksWith #-}
readChunksWith :: MonadIO m => Int -> Socket -> Stream m (Array Word8)
readChunksWith :: forall (m :: * -> *).
MonadIO m =>
Int -> Socket -> Stream m (Array Word8)
readChunksWith Int
size Socket
h = (State StreamK m (Array Word8) -> () -> m (Step () (Array Word8)))
-> () -> Stream m (Array Word8)
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
S.Stream State StreamK m (Array Word8) -> () -> m (Step () (Array Word8))
forall {m :: * -> *} {p} {p}.
MonadIO m =>
p -> p -> m (Step () (Array Word8))
step ()
where
{-# INLINE_LATE step #-}
step :: p -> p -> m (Step () (Array Word8))
step p
_ p
_ = do
Array Word8
arr <- IO (Array Word8) -> m (Array Word8)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Array Word8) -> m (Array Word8))
-> IO (Array Word8) -> m (Array Word8)
forall a b. (a -> b) -> a -> b
$ Int -> Socket -> IO (Array Word8)
getChunk Int
size Socket
h
Step () (Array Word8) -> m (Step () (Array Word8))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step () (Array Word8) -> m (Step () (Array Word8)))
-> Step () (Array Word8) -> m (Step () (Array Word8))
forall a b. (a -> b) -> a -> b
$
case Array Word8 -> Int
forall a. Unbox a => Array a -> Int
A.length Array Word8
arr of
Int
0 -> Step () (Array Word8)
forall s a. Step s a
S.Stop
Int
_ -> Array Word8 -> () -> Step () (Array Word8)
forall s a. a -> s -> Step s a
S.Yield Array Word8
arr ()
{-# INLINE readChunks #-}
readChunks :: MonadIO m => Socket -> Stream m (Array Word8)
readChunks :: forall (m :: * -> *). MonadIO m => Socket -> Stream m (Array Word8)
readChunks = Int -> Socket -> Stream m (Array Word8)
forall (m :: * -> *).
MonadIO m =>
Int -> Socket -> Stream m (Array Word8)
readChunksWith Int
defaultChunkSize
{-# INLINE_NORMAL chunkReaderWith #-}
chunkReaderWith :: MonadIO m => Unfold m (Int, Socket) (Array Word8)
chunkReaderWith :: forall (m :: * -> *).
MonadIO m =>
Unfold m (Int, Socket) (Array Word8)
chunkReaderWith = ((Int, Socket) -> m (Step (Int, Socket) (Array Word8)))
-> ((Int, Socket) -> m (Int, Socket))
-> Unfold m (Int, Socket) (Array Word8)
forall (m :: * -> *) a b s.
(s -> m (Step s b)) -> (a -> m s) -> Unfold m a b
Unfold (Int, Socket) -> m (Step (Int, Socket) (Array Word8))
forall {m :: * -> *}.
MonadIO m =>
(Int, Socket) -> m (Step (Int, Socket) (Array Word8))
step (Int, Socket) -> m (Int, Socket)
forall (m :: * -> *) a. Monad m => a -> m a
return
where
{-# INLINE_LATE step #-}
step :: (Int, Socket) -> m (Step (Int, Socket) (Array Word8))
step (Int
size, Socket
h) = do
Array Word8
arr <- IO (Array Word8) -> m (Array Word8)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Array Word8) -> m (Array Word8))
-> IO (Array Word8) -> m (Array Word8)
forall a b. (a -> b) -> a -> b
$ Int -> Socket -> IO (Array Word8)
getChunk Int
size Socket
h
Step (Int, Socket) (Array Word8)
-> m (Step (Int, Socket) (Array Word8))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Int, Socket) (Array Word8)
-> m (Step (Int, Socket) (Array Word8)))
-> Step (Int, Socket) (Array Word8)
-> m (Step (Int, Socket) (Array Word8))
forall a b. (a -> b) -> a -> b
$
case Array Word8 -> Int
forall a. Unbox a => Array a -> Int
A.length Array Word8
arr of
Int
0 -> Step (Int, Socket) (Array Word8)
forall s a. Step s a
S.Stop
Int
_ -> Array Word8 -> (Int, Socket) -> Step (Int, Socket) (Array Word8)
forall s a. a -> s -> Step s a
S.Yield Array Word8
arr (Int
size, Socket
h)
{-# DEPRECATED readChunksWithBufferOf "Please use 'chunkReaderWith' instead" #-}
{-# INLINE_NORMAL readChunksWithBufferOf #-}
readChunksWithBufferOf :: MonadIO m => Unfold m (Int, Socket) (Array Word8)
readChunksWithBufferOf :: forall (m :: * -> *).
MonadIO m =>
Unfold m (Int, Socket) (Array Word8)
readChunksWithBufferOf = Unfold m (Int, Socket) (Array Word8)
forall (m :: * -> *).
MonadIO m =>
Unfold m (Int, Socket) (Array Word8)
chunkReaderWith
{-# INLINE chunkReader #-}
chunkReader :: MonadIO m => Unfold m Socket (Array Word8)
chunkReader :: forall (m :: * -> *). MonadIO m => Unfold m Socket (Array Word8)
chunkReader = Int
-> Unfold m (Int, Socket) (Array Word8)
-> Unfold m Socket (Array Word8)
forall a (m :: * -> *) b c. a -> Unfold m (a, b) c -> Unfold m b c
UF.first Int
defaultChunkSize Unfold m (Int, Socket) (Array Word8)
forall (m :: * -> *).
MonadIO m =>
Unfold m (Int, Socket) (Array Word8)
chunkReaderWith
{-# INLINE concatChunks #-}
concatChunks :: (Monad m, Unbox a) => Stream m (Array a) -> Stream m a
concatChunks :: forall (m :: * -> *) a.
(Monad m, Unbox a) =>
Stream m (Array a) -> Stream m a
concatChunks = Unfold m (Array a) a -> Stream m (Array a) -> Stream m a
forall (m :: * -> *) a b.
Monad m =>
Unfold m a b -> Stream m a -> Stream m b
S.unfoldMany Unfold m (Array a) a
forall (m :: * -> *) a. (Monad m, Unbox a) => Unfold m (Array a) a
A.reader
{-# INLINE readWith #-}
readWith :: MonadIO m => Int -> Socket -> Stream m Word8
readWith :: forall (m :: * -> *). MonadIO m => Int -> Socket -> Stream m Word8
readWith Int
size = Stream m (Array Word8) -> Stream m Word8
forall (m :: * -> *) a.
(Monad m, Unbox a) =>
Stream m (Array a) -> Stream m a
concatChunks (Stream m (Array Word8) -> Stream m Word8)
-> (Socket -> Stream m (Array Word8)) -> Socket -> Stream m Word8
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Socket -> Stream m (Array Word8)
forall (m :: * -> *).
MonadIO m =>
Int -> Socket -> Stream m (Array Word8)
readChunksWith Int
size
{-# INLINE read #-}
read :: MonadIO m => Socket -> Stream m Word8
read :: forall (m :: * -> *). MonadIO m => Socket -> Stream m Word8
read = Int -> Socket -> Stream m Word8
forall (m :: * -> *). MonadIO m => Int -> Socket -> Stream m Word8
readWith Int
defaultChunkSize
{-# INLINE readerWith #-}
readerWith :: MonadIO m => Unfold m (Int, Socket) Word8
readerWith :: forall (m :: * -> *). MonadIO m => Unfold m (Int, Socket) Word8
readerWith = Unfold m (Array Word8) Word8
-> Unfold m (Int, Socket) (Array Word8)
-> Unfold m (Int, Socket) Word8
forall (m :: * -> *) b c a.
Monad m =>
Unfold m b c -> Unfold m a b -> Unfold m a c
UF.many Unfold m (Array Word8) Word8
forall (m :: * -> *) a. (Monad m, Unbox a) => Unfold m (Array a) a
A.reader Unfold m (Int, Socket) (Array Word8)
forall (m :: * -> *).
MonadIO m =>
Unfold m (Int, Socket) (Array Word8)
chunkReaderWith
{-# DEPRECATED readWithBufferOf "Please use 'readerWith' instead" #-}
{-# INLINE readWithBufferOf #-}
readWithBufferOf :: MonadIO m => Unfold m (Int, Socket) Word8
readWithBufferOf :: forall (m :: * -> *). MonadIO m => Unfold m (Int, Socket) Word8
readWithBufferOf = Unfold m (Int, Socket) Word8
forall (m :: * -> *). MonadIO m => Unfold m (Int, Socket) Word8
readerWith
{-# INLINE reader #-}
reader :: MonadIO m => Unfold m Socket Word8
reader :: forall (m :: * -> *). MonadIO m => Unfold m Socket Word8
reader = Int -> Unfold m (Int, Socket) Word8 -> Unfold m Socket Word8
forall a (m :: * -> *) b c. a -> Unfold m (a, b) c -> Unfold m b c
UF.first Int
defaultChunkSize Unfold m (Int, Socket) Word8
forall (m :: * -> *). MonadIO m => Unfold m (Int, Socket) Word8
readerWith
{-# INLINE putChunks #-}
putChunks :: (MonadIO m, Unbox a)
=> Socket -> Stream m (Array a) -> m ()
putChunks :: forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Socket -> Stream m (Array a) -> m ()
putChunks Socket
h = Fold m (Array a) () -> Stream m (Array a) -> m ()
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
S.fold ((Array a -> m ()) -> Fold m (Array a) ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> Fold m a ()
FL.drainMapM (IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (Array a -> IO ()) -> Array a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Socket -> Array a -> IO ()
forall a. Unbox a => Socket -> Array a -> IO ()
putChunk Socket
h))
{-# INLINE writeChunks #-}
writeChunks :: (MonadIO m, Unbox a) => Socket -> Fold m (Array a) ()
writeChunks :: forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Socket -> Fold m (Array a) ()
writeChunks Socket
h = (Array a -> m ()) -> Fold m (Array a) ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> Fold m a ()
FL.drainMapM (IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> (Array a -> IO ()) -> Array a -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Socket -> Array a -> IO ()
forall a. Unbox a => Socket -> Array a -> IO ()
putChunk Socket
h)
{-# INLINE writeChunksWith #-}
writeChunksWith :: (MonadIO m, Unbox a)
=> Int -> Socket -> Fold m (Array a) ()
writeChunksWith :: forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Int -> Socket -> Fold m (Array a) ()
writeChunksWith Int
n Socket
h = Int -> Fold m (Array a) () -> Fold m (Array a) ()
forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Int -> Fold m (Array a) () -> Fold m (Array a) ()
A.lCompactGE Int
n (Socket -> Fold m (Array a) ()
forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Socket -> Fold m (Array a) ()
writeChunks Socket
h)
{-# DEPRECATED writeChunksWithBufferOf "Please use 'writeChunksWith' instead" #-}
{-# INLINE writeChunksWithBufferOf #-}
writeChunksWithBufferOf :: (MonadIO m, Unbox a)
=> Int -> Socket -> Fold m (Array a) ()
writeChunksWithBufferOf :: forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Int -> Socket -> Fold m (Array a) ()
writeChunksWithBufferOf = Int -> Socket -> Fold m (Array a) ()
forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Int -> Socket -> Fold m (Array a) ()
writeChunksWith
{-# INLINE putBytesWith #-}
putBytesWith :: MonadIO m => Int -> Socket -> Stream m Word8 -> m ()
putBytesWith :: forall (m :: * -> *).
MonadIO m =>
Int -> Socket -> Stream m Word8 -> m ()
putBytesWith Int
n Socket
h Stream m Word8
m = Socket -> Stream m (Array Word8) -> m ()
forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Socket -> Stream m (Array a) -> m ()
putChunks Socket
h (Stream m (Array Word8) -> m ()) -> Stream m (Array Word8) -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> Stream m Word8 -> Stream m (Array Word8)
forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Int -> Stream m a -> Stream m (Array a)
A.pinnedChunksOf Int
n Stream m Word8
m
{-# INLINE writeWith #-}
writeWith :: MonadIO m => Int -> Socket -> Fold m Word8 ()
writeWith :: forall (m :: * -> *). MonadIO m => Int -> Socket -> Fold m Word8 ()
writeWith Int
n Socket
h = Int
-> Fold m Word8 (Array Word8)
-> Fold m (Array Word8) ()
-> Fold m Word8 ()
forall (m :: * -> *) a b c.
Monad m =>
Int -> Fold m a b -> Fold m b c -> Fold m a c
FL.groupsOf Int
n (Int -> Fold m Word8 (Array Word8)
forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Int -> Fold m a (Array a)
A.unsafePinnedCreateOf Int
n) (Socket -> Fold m (Array Word8) ()
forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Socket -> Fold m (Array a) ()
writeChunks Socket
h)
{-# DEPRECATED writeWithBufferOf "Please use 'writeWith' instead" #-}
{-# INLINE writeWithBufferOf #-}
writeWithBufferOf :: MonadIO m => Int -> Socket -> Fold m Word8 ()
writeWithBufferOf :: forall (m :: * -> *). MonadIO m => Int -> Socket -> Fold m Word8 ()
writeWithBufferOf = Int -> Socket -> Fold m Word8 ()
forall (m :: * -> *). MonadIO m => Int -> Socket -> Fold m Word8 ()
writeWith
{-# INLINE writeMaybesWith #-}
writeMaybesWith :: (MonadIO m )
=> Int -> Socket -> Fold m (Maybe Word8) ()
writeMaybesWith :: forall (m :: * -> *).
MonadIO m =>
Int -> Socket -> Fold m (Maybe Word8) ()
writeMaybesWith Int
n Socket
h =
let writeNJusts :: Fold m (Maybe Word8) (Array Word8)
writeNJusts = (Maybe Word8 -> Word8)
-> Fold m Word8 (Array Word8) -> Fold m (Maybe Word8) (Array Word8)
forall a b (m :: * -> *) r. (a -> b) -> Fold m b r -> Fold m a r
FL.lmap Maybe Word8 -> Word8
forall a. HasCallStack => Maybe a -> a
fromJust (Fold m Word8 (Array Word8) -> Fold m (Maybe Word8) (Array Word8))
-> Fold m Word8 (Array Word8) -> Fold m (Maybe Word8) (Array Word8)
forall a b. (a -> b) -> a -> b
$ Int -> Fold m Word8 (Array Word8)
forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Int -> Fold m a (Array a)
A.pinnedCreateOf Int
n
writeOnNothing :: Fold m (Maybe Word8) (Array Word8)
writeOnNothing = (Maybe Word8 -> Bool)
-> Fold m (Maybe Word8) (Array Word8)
-> Fold m (Maybe Word8) (Array Word8)
forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Fold m a b
FL.takeEndBy_ Maybe Word8 -> Bool
forall a. Maybe a -> Bool
isNothing Fold m (Maybe Word8) (Array Word8)
writeNJusts
in Fold m (Maybe Word8) (Array Word8)
-> Fold m (Array Word8) () -> Fold m (Maybe Word8) ()
forall (m :: * -> *) a b c.
Monad m =>
Fold m a b -> Fold m b c -> Fold m a c
FL.many Fold m (Maybe Word8) (Array Word8)
writeOnNothing (Socket -> Fold m (Array Word8) ()
forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Socket -> Fold m (Array a) ()
writeChunks Socket
h)
{-# INLINE putBytes #-}
putBytes :: MonadIO m => Socket -> Stream m Word8 -> m ()
putBytes :: forall (m :: * -> *). MonadIO m => Socket -> Stream m Word8 -> m ()
putBytes = Int -> Socket -> Stream m Word8 -> m ()
forall (m :: * -> *).
MonadIO m =>
Int -> Socket -> Stream m Word8 -> m ()
putBytesWith Int
defaultChunkSize
{-# INLINE write #-}
write :: MonadIO m => Socket -> Fold m Word8 ()
write :: forall (m :: * -> *). MonadIO m => Socket -> Fold m Word8 ()
write = Int -> Socket -> Fold m Word8 ()
forall (m :: * -> *). MonadIO m => Int -> Socket -> Fold m Word8 ()
writeWith Int
defaultChunkSize