#include "inline.hs"
module Streamly.Internal.Network.Inet.TCP
(
acceptOnAddr
, acceptOnAddrWith
, acceptOnPort
, acceptOnPortWith
, acceptOnPortLocal
, connectionsOnAddr
, connectionsOnAddrWith
, connectionsOnPort
, connectionsOnLocalHost
, connect
, withConnectionM
, usingConnection
, read
, withConnection
, toBytes
, write
, writeWithBufferOf
, putBytes
, putBytesWithBufferOf
, writeChunks
, putChunks
, processBytes
)
where
import Control.Exception (onException)
import Control.Monad.Catch (MonadCatch, MonadMask, bracket)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Word (Word8)
import Network.Socket
(Socket, PortNumber, SocketOption(..), Family(..), SockAddr(..),
SocketType(..), defaultProtocol, maxListenQueue, tupleToHostAddress,
socket)
import Prelude hiding (read)
import Streamly.Internal.Control.Concurrent (MonadAsync, fork)
import Streamly.Internal.Data.Array.Foreign.Type (Array(..), writeNUnsafe)
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Stream.IsStream.Type (IsStream)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))
import Streamly.Internal.Data.Unfold.Type (Unfold(..))
import Streamly.Internal.Network.Socket (SockSpec(..), accept, connections)
import Streamly.Internal.System.IO (defaultChunkSize)
import qualified Control.Monad.Catch as MC
import qualified Network.Socket as Net
import qualified Streamly.Internal.Data.Unfold as UF
import qualified Streamly.Internal.Data.Array.Foreign as A
import qualified Streamly.Internal.Data.Array.Stream.Foreign as AS
import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.Stream.IsStream as S
import qualified Streamly.Internal.Network.Socket as ISK
-- | Adapt the generic 'accept' unfold so that it is seeded with an
-- @(IPv4 address, port)@ pair, applying the given socket options.
--
-- The seed is translated by 'UF.lmap' into the triple that 'accept' takes:
--
-- * 'maxListenQueue' (presumably the listen backlog -- confirm against
--   'accept'),
-- * a 'SockSpec' for an @AF_INET@ 'Stream' socket using 'defaultProtocol'
--   and the caller-supplied @opts@,
-- * a 'SockAddrInet' built from the port and the address tuple.
{-# INLINE acceptOnAddrWith #-}
acceptOnAddrWith
    :: MonadIO m
    => [(SocketOption, Int)]
    -> Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Socket
acceptOnAddrWith opts = UF.lmap f accept

    where

    -- Convert the user-facing seed into the argument triple of 'accept'.
    f (addr, port) =
        ( maxListenQueue
        , SockSpec
            { sockFamily = AF_INET
            , sockType = Stream
            , sockProto = defaultProtocol
            , sockOpts = opts
            }
        , SockAddrInet port (tupleToHostAddress addr)
        )
-- | Like 'acceptOnAddrWith' but with an empty list of socket options.
--
-- The seed is an @(IPv4 address, port)@ pair.
{-# INLINE acceptOnAddr #-}
acceptOnAddr
    :: MonadIO m
    => Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Socket
acceptOnAddr = acceptOnAddrWith []
-- | Like 'acceptOnAddrWith' but seeded with a port number only.
--
-- The address component of the seed is fixed to @0.0.0.0@ via
-- 'UF.supplyFirst'; the given socket options are passed through unchanged.
{-# INLINE acceptOnPortWith #-}
acceptOnPortWith :: MonadIO m
    => [(SocketOption, Int)]
    -> Unfold m PortNumber Socket
acceptOnPortWith opts = UF.supplyFirst (0,0,0,0) (acceptOnAddrWith opts)
-- | Like 'acceptOnAddr' but seeded with a port number only.
--
-- The address component of the seed is fixed to @0.0.0.0@ via
-- 'UF.supplyFirst'.
{-# INLINE acceptOnPort #-}
acceptOnPort :: MonadIO m => Unfold m PortNumber Socket
acceptOnPort = UF.supplyFirst (0,0,0,0) acceptOnAddr
-- | Like 'acceptOnPort' but the address component of the seed is fixed to
-- the loopback address @127.0.0.1@ instead of @0.0.0.0@.
{-# INLINE acceptOnPortLocal #-}
acceptOnPortLocal :: MonadIO m => Unfold m PortNumber Socket
acceptOnPortLocal = UF.supplyFirst (127,0,0,1) acceptOnAddr
-- | Stream of 'Socket's produced by 'connections' for the given IPv4 address
-- and port, with the given socket options.
--
-- 'connections' is called with 'maxListenQueue', a 'SockSpec' describing an
-- @AF_INET@ 'Stream' socket using 'defaultProtocol' and @opts@, and a
-- 'SockAddrInet' built from the port and the address tuple.
{-# INLINE connectionsOnAddrWith #-}
connectionsOnAddrWith
    :: MonadAsync m
    => [(SocketOption, Int)]
    -> (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m Socket
connectionsOnAddrWith opts addr port =
    connections maxListenQueue spec sockAddr

    where

    spec =
        SockSpec
            { sockFamily = AF_INET
            , sockType = Stream
            , sockProto = defaultProtocol
            , sockOpts = opts
            }

    sockAddr = SockAddrInet port (tupleToHostAddress addr)
-- | Like 'connectionsOnAddrWith' but with an empty list of socket options.
{-# INLINE connectionsOnAddr #-}
connectionsOnAddr
    :: MonadAsync m
    => (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m Socket
connectionsOnAddr = connectionsOnAddrWith []
-- | Like 'connectionsOnAddr' with the address fixed to @0.0.0.0@; only the
-- port number needs to be supplied.
{-# INLINE connectionsOnPort #-}
connectionsOnPort :: MonadAsync m => PortNumber -> SerialT m Socket
connectionsOnPort = connectionsOnAddr (0,0,0,0)
-- | Like 'connectionsOnAddr' with the address fixed to the loopback address
-- @127.0.0.1@; only the port number needs to be supplied.
{-# INLINE connectionsOnLocalHost #-}
connectionsOnLocalHost :: MonadAsync m => PortNumber -> SerialT m Socket
connectionsOnLocalHost = connectionsOnAddr (127,0,0,1)
-- | Create an @AF_INET@ 'Stream' socket with 'defaultProtocol' and connect
-- it to the given IPv4 address and port.
--
-- Returns the connected socket. If 'Net.connect' throws, the freshly created
-- socket is closed before the exception propagates.
connect :: (Word8, Word8, Word8, Word8) -> PortNumber -> IO Socket
connect addr port = do
    sock <- socket AF_INET Stream defaultProtocol
    -- Do not leak the descriptor when the connection attempt fails.
    Net.connect sock (SockAddrInet port (Net.tupleToHostAddress addr))
        `onException` Net.close sock
    return sock
-- | Run a monadic action with a socket connected to the given IPv4 address
-- and port.
--
-- The socket is obtained with 'connect' and released with 'Net.close' using
-- 'bracket' from "Control.Monad.Catch".
{-# INLINABLE withConnectionM #-}
withConnectionM :: (MonadMask m, MonadIO m)
    => (Word8, Word8, Word8, Word8) -> PortNumber -> (Socket -> m ()) -> m ()
withConnectionM addr port =
    bracket (liftIO $ connect addr port) (liftIO . Net.close)
-- | Transform an 'Unfold' seeded with a 'Socket' into an 'Unfold' seeded
-- with an @(IPv4 address, port)@ pair.
--
-- 'UF.bracket' is given 'connect' as the action that turns the seed into a
-- socket and 'Net.close' as the action to run on that socket afterwards.
{-# INLINABLE usingConnection #-}
usingConnection :: (MonadCatch m, MonadAsync m)
    => Unfold m Socket a
    -> Unfold m ((Word8, Word8, Word8, Word8), PortNumber) a
usingConnection =
    UF.bracket
        (\(addr, port) -> liftIO $ connect addr port)
        (liftIO . Net.close)
-- | Build a stream from a socket connected to the given IPv4 address and
-- port.
--
-- The socket is obtained with 'connect' and handed to the stream-producing
-- function; 'Net.close' is registered as the release action of 'S.bracket'.
{-# INLINABLE withConnection #-}
withConnection :: (IsStream t, MonadCatch m, MonadAsync m)
    => (Word8, Word8, Word8, Word8) -> PortNumber -> (Socket -> t m a) -> t m a
withConnection addr port =
    S.bracket (liftIO $ connect addr port) (liftIO . Net.close)
-- | Unfold an @(IPv4 address, port)@ pair into a stream of bytes.
--
-- 'usingConnection' wraps 'ISK.readChunks' to produce byte arrays from a
-- connection to the seed address; 'UF.many' then flattens each array into
-- its elements with 'A.read'.
{-# INLINE read #-}
read :: (MonadCatch m, MonadAsync m)
    => Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Word8
read = UF.many (usingConnection ISK.readChunks) A.read
-- | Stream of bytes obtained from a connection to the given IPv4 address and
-- port.
--
-- 'withConnection' supplies the socket to 'ISK.toChunks', and the resulting
-- stream of byte arrays is flattened with 'AS.concat'.
{-# INLINE toBytes #-}
toBytes :: (IsStream t, MonadCatch m, MonadAsync m)
    => (Word8, Word8, Word8, Word8) -> PortNumber -> t m Word8
toBytes addr port = AS.concat $ withConnection addr port ISK.toChunks
-- | Send a stream of byte arrays over a connection to the given IPv4 address
-- and port.
--
-- The connection is managed by 'withConnection'; inside it the whole input
-- stream is handed to 'ISK.putChunks' as a single effect, and the resulting
-- one-element stream is drained.
{-# INLINE putChunks #-}
putChunks
    :: (MonadCatch m, MonadAsync m)
    => (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m (Array Word8)
    -> m ()
putChunks addr port xs =
    S.drain $ withConnection addr port sendAll

    where

    sendAll sk = S.fromEffect $ ISK.putChunks sk xs
-- | A 'Fold' that writes byte arrays to a connection to the given IPv4
-- address and port.
--
-- The fold state pairs the socket with an initialized 'ISK.writeChunks' fold
-- on that socket. Whenever initializing or stepping the inner fold throws,
-- the socket is closed before the exception propagates.
{-# INLINE writeChunks #-}
writeChunks
    :: (MonadAsync m, MonadCatch m)
    => (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> Fold m (Array Word8) ()
writeChunks addr port = Fold step initial extract

    where

    -- Connect, then initialize the inner socket fold; close the socket if
    -- the initialization fails.
    initial = do
        skt <- liftIO (connect addr port)
        fld <- FL.initialize (ISK.writeChunks skt)
                    `MC.onException` liftIO (Net.close skt)
        return $ FL.Partial (Tuple' fld skt)

    -- Feed one array to the inner fold; this fold never terminates on its
    -- own, it always continues with 'FL.Partial'.
    step (Tuple' fld skt) x = do
        r <- FL.snoc fld x `MC.onException` liftIO (Net.close skt)
        return $ FL.Partial (Tuple' r skt)

    -- NOTE(review): the socket is closed *before* the inner fold is
    -- finalized. This is only safe if the inner fold's initial/extract
    -- actions perform no socket I/O -- confirm against 'ISK.writeChunks'.
    extract (Tuple' (Fold _ initial1 extract1) skt) = do
        liftIO $ Net.close skt
        res <- initial1
        case res of
            FL.Partial fs -> extract1 fs
            FL.Done fb -> return fb
-- | Send a stream of bytes over a connection to the given IPv4 address and
-- port.
--
-- The byte stream is first grouped into arrays with @'AS.arraysOf' n@ and
-- the resulting arrays are sent with 'putChunks'.
{-# INLINE putBytesWithBufferOf #-}
putBytesWithBufferOf
    :: (MonadCatch m, MonadAsync m)
    => Int
    -> (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m Word8
    -> m ()
putBytesWithBufferOf n addr port stream =
    putChunks addr port $ AS.arraysOf n stream
-- | A 'Fold' that writes bytes to a connection to the given IPv4 address and
-- port.
--
-- 'FL.chunksOf' splits the input into groups of @n@ bytes, collects each
-- group into an array with @'writeNUnsafe' n@, and feeds the arrays to
-- 'writeChunks'.
{-# INLINE writeWithBufferOf #-}
writeWithBufferOf
    :: (MonadAsync m, MonadCatch m)
    => Int
    -> (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> Fold m Word8 ()
writeWithBufferOf n addr port =
    FL.chunksOf n (writeNUnsafe n) (writeChunks addr port)
-- | Like 'putBytesWithBufferOf' with the buffer size set to
-- 'defaultChunkSize'.
{-# INLINE putBytes #-}
putBytes :: (MonadCatch m, MonadAsync m)
    => (Word8, Word8, Word8, Word8) -> PortNumber -> SerialT m Word8 -> m ()
putBytes = putBytesWithBufferOf defaultChunkSize
-- | Like 'writeWithBufferOf' with the buffer size set to 'defaultChunkSize'.
{-# INLINE write #-}
write :: (MonadAsync m, MonadCatch m)
    => (Word8, Word8, Word8, Word8) -> PortNumber -> Fold m Word8 ()
write = writeWithBufferOf defaultChunkSize
-- | Connect to the given IPv4 address and port, send the @input@ byte stream
-- to the socket from a forked thread, and build the result stream from the
-- same socket using the supplied function.
--
-- The socket is closed by the release action of 'S.bracket'.
{-# INLINABLE withInputConnect #-}
withInputConnect
    :: (IsStream t, MonadCatch m, MonadAsync m)
    => (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m Word8
    -> (Socket -> t m a)
    -> t m a
withInputConnect addr port input f = S.bracket pre post handler

    where

    -- Acquire: connect, then start writing the input in a separate thread.
    pre = do
        sk <- liftIO $ connect addr port
        tid <- fork (ISK.putBytes sk input)
        return (sk, tid)

    handler (sk, _) = f sk

    -- NOTE(review): only the socket is released here; the writer thread's id
    -- is ignored, so the thread is not explicitly stopped -- confirm this is
    -- intended.
    post (sk, _) = liftIO $ Net.close sk
-- | Send the @input@ byte stream to the given IPv4 address and port and
-- return the bytes read back from the same connection as a stream.
--
-- This is 'withInputConnect' with 'ISK.toBytes' as the socket reader.
{-# INLINABLE processBytes #-}
processBytes
    :: (IsStream t, MonadAsync m, MonadCatch m)
    => (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m Word8
    -> t m Word8
processBytes addr port input = withInputConnect addr port input ISK.toBytes