{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
#include "inline.hs"
module Streamly.Internal.Network.Inet.TCP
(
acceptOnAddr
, acceptOnAddrWith
, acceptOnPort
, acceptOnPortWith
, acceptOnPortLocal
, connectionsOnAddr
, connectionsOnAddrWith
, connectionsOnPort
, connectionsOnLocalHost
, connect
, withConnectionM
, usingConnection
, read
, withConnection
, toBytes
, write
, writeWithBufferOf
, fromBytes
, fromBytesWithBufferOf
, writeChunks
, fromChunks
, transformBytesWith
)
where
import Control.Concurrent (killThread)
import Control.Monad.Catch (MonadCatch, MonadMask, bracket)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Word (Word8)
import Network.Socket
       (Socket, PortNumber, SocketOption(..), Family(..), SockAddr(..),
        SocketType(..), defaultProtocol, maxListenQueue, tupleToHostAddress,
        socket)
import Prelude hiding (read)
import Streamly (MonadAsync)
import Streamly.Internal.Data.Fold.Types (Fold(..))
import Streamly.Internal.Data.SVar (fork)
import Streamly.Internal.Data.Unfold.Types (Unfold(..))
import Streamly.Internal.Network.Socket (SockSpec(..), accept, connections)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Memory.Array.Types (Array(..), defaultChunkSize, writeNUnsafe)
import Streamly.Internal.Data.Stream.StreamK.Type (IsStream)
import qualified Control.Monad.Catch as MC
import qualified Network.Socket as Net
import qualified Streamly.Internal.Data.Unfold as UF
import qualified Streamly.Internal.Memory.Array as A
import qualified Streamly.Internal.Memory.ArrayStream as AS
import qualified Streamly.Internal.Data.Fold.Types as FL
import qualified Streamly.Prelude as S
import qualified Streamly.Network.Socket as SK
import qualified Streamly.Internal.Network.Socket as ISK
-- | Unfold an IPv4 address tuple paired with a port number into the sockets
-- produced by 'accept'.
--
-- The input is mapped to the argument triple expected by 'accept':
-- 'maxListenQueue', a 'SockSpec' describing an 'AF_INET' 'Stream' socket
-- using 'defaultProtocol' and the supplied socket options, and the
-- 'SockAddrInet' built from the port and address.
{-# INLINE acceptOnAddrWith #-}
acceptOnAddrWith
    :: MonadIO m
    => [(SocketOption, Int)]
    -> Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Socket
acceptOnAddrWith opts = UF.lmap toAcceptArgs accept

    where

    -- The socket specification does not depend on the unfold input.
    spec = SockSpec
        { sockFamily = AF_INET
        , sockType = Stream
        , sockProto = defaultProtocol
        , sockOpts = opts
        }

    toAcceptArgs (ipv4, portNum) =
        (maxListenQueue, spec, SockAddrInet portNum (tupleToHostAddress ipv4))
-- | 'acceptOnAddrWith' specialised to an empty list of socket options.
{-# INLINE acceptOnAddr #-}
acceptOnAddr
    :: MonadIO m
    => Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Socket
acceptOnAddr = acceptOnAddrWith noOpts

    where

    noOpts = []
-- | Like 'acceptOnAddrWith' but takes only a port number as the unfold input;
-- the address is fixed to @0.0.0.0@.
{-# INLINE acceptOnPortWith #-}
acceptOnPortWith :: MonadIO m
    => [(SocketOption, Int)]
    -> Unfold m PortNumber Socket
acceptOnPortWith opts = UF.supplyFirst onAddr anyAddr

    where

    onAddr = acceptOnAddrWith opts
    anyAddr = (0, 0, 0, 0)
-- | Like 'acceptOnAddr' but takes only a port number as the unfold input; the
-- address is fixed to @0.0.0.0@. Equivalent to 'acceptOnPortWith' with an
-- empty list of socket options.
{-# INLINE acceptOnPort #-}
acceptOnPort :: MonadIO m => Unfold m PortNumber Socket
acceptOnPort = acceptOnPortWith []
-- | Like 'acceptOnAddr' but takes only a port number as the unfold input; the
-- address is fixed to @127.0.0.1@.
{-# INLINE acceptOnPortLocal #-}
acceptOnPortLocal :: MonadIO m => Unfold m PortNumber Socket
acceptOnPortLocal = UF.supplyFirst acceptOnAddr loopback

    where

    loopback = (127, 0, 0, 1)
-- | Stream of sockets obtained from 'connections' for the given IPv4 address
-- and port.
--
-- 'connections' is called with 'maxListenQueue', a 'SockSpec' describing an
-- 'AF_INET' 'Stream' socket using 'defaultProtocol' and the supplied socket
-- options, and the 'SockAddrInet' built from the port and address.
{-# INLINE connectionsOnAddrWith #-}
connectionsOnAddrWith
    :: MonadAsync m
    => [(SocketOption, Int)]
    -> (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m Socket
connectionsOnAddrWith opts addr port =
    connections maxListenQueue spec sockAddr

    where

    spec = SockSpec
        { sockFamily = AF_INET
        , sockType = Stream
        , sockProto = defaultProtocol
        , sockOpts = opts
        }

    sockAddr = SockAddrInet port (tupleToHostAddress addr)
-- | 'connectionsOnAddrWith' specialised to an empty list of socket options.
{-# INLINE connectionsOnAddr #-}
connectionsOnAddr
    :: MonadAsync m
    => (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m Socket
connectionsOnAddr = connectionsOnAddrWith noOpts

    where

    noOpts = []
-- | Like 'connectionsOnAddr' with the address fixed to @0.0.0.0@.
{-# INLINE connectionsOnPort #-}
connectionsOnPort :: MonadAsync m => PortNumber -> SerialT m Socket
connectionsOnPort = connectionsOnAddr anyAddr

    where

    anyAddr = (0, 0, 0, 0)
-- | Like 'connectionsOnAddr' with the address fixed to @127.0.0.1@.
{-# INLINE connectionsOnLocalHost #-}
connectionsOnLocalHost :: MonadAsync m => PortNumber -> SerialT m Socket
connectionsOnLocalHost = connectionsOnAddr loopback

    where

    loopback = (127, 0, 0, 1)
-- | Create an 'AF_INET' 'Stream' socket, connect it to the given IPv4 address
-- and port, and return the connected socket. Closing the returned socket is
-- the caller's responsibility.
--
-- If 'Net.connect' throws (for example because the peer refuses the
-- connection), the freshly created socket is closed before the exception
-- propagates, so a failed connection attempt does not leak the socket.
connect :: (Word8, Word8, Word8, Word8) -> PortNumber -> IO Socket
connect addr port =
    -- 'MC.bracketOnError' runs the release action only when the body fails;
    -- on success the open socket is handed to the caller.
    MC.bracketOnError
        (socket AF_INET Stream defaultProtocol)
        Net.close
        (\sock -> do
            Net.connect sock $ SockAddrInet port (Net.tupleToHostAddress addr)
            return sock)
-- | Run a monadic action with a socket connected to the given IPv4 address
-- and port. The socket is opened with 'connect' and released with
-- 'Net.close' by 'bracket'.
{-# INLINABLE withConnectionM #-}
withConnectionM :: (MonadMask m, MonadIO m)
    => (Word8, Word8, Word8, Word8) -> PortNumber -> (Socket -> m ()) -> m ()
withConnectionM addr port = bracket acquire release

    where

    acquire = liftIO (connect addr port)
    release sock = liftIO (Net.close sock)
-- | Transform an unfold that consumes a 'Socket' into one that consumes an
-- IPv4 address and port pair. The socket is opened with 'connect' and
-- released with 'Net.close' through 'UF.bracket'.
{-# INLINABLE usingConnection #-}
usingConnection :: (MonadCatch m, MonadIO m)
    => Unfold m Socket a
    -> Unfold m ((Word8, Word8, Word8, Word8), PortNumber) a
usingConnection = UF.bracket open release

    where

    open (addr, port) = liftIO (connect addr port)
    release sock = liftIO (Net.close sock)
-- | Build a stream from a socket connected to the given IPv4 address and
-- port. The socket is opened with 'connect' and released with 'Net.close'
-- through 'S.bracket'.
{-# INLINABLE withConnection #-}
withConnection :: (IsStream t, MonadCatch m, MonadIO m)
    => (Word8, Word8, Word8, Word8) -> PortNumber -> (Socket -> t m a) -> t m a
withConnection addr port = S.bracket acquire release

    where

    acquire = liftIO (connect addr port)
    release sock = liftIO (Net.close sock)
-- | Unfold an IPv4 address and port pair into a stream of bytes: chunks are
-- produced by 'ISK.readChunks' over a connection managed by
-- 'usingConnection', and each chunk is flattened with 'A.read'.
{-# INLINE read #-}
read :: (MonadCatch m, MonadIO m)
    => Unfold m ((Word8, Word8, Word8, Word8), PortNumber) Word8
read = UF.concat chunks A.read

    where

    chunks = usingConnection ISK.readChunks
-- | Stream of bytes from a connection to the given IPv4 address and port:
-- the chunk stream from 'ISK.toChunks', run under 'withConnection', is
-- flattened with 'AS.concat'.
{-# INLINE toBytes #-}
toBytes :: (IsStream t, MonadCatch m, MonadIO m)
    => (Word8, Word8, Word8, Word8) -> PortNumber -> t m Word8
toBytes addr port = AS.concat chunkStream

    where

    chunkStream = withConnection addr port ISK.toChunks
-- | Send a stream of byte arrays over a connection to the given IPv4 address
-- and port. The arrays are handed to 'ISK.fromChunks' inside a
-- 'withConnection' scope, and the resulting single-element stream is
-- drained.
{-# INLINE fromChunks #-}
fromChunks
    :: (MonadCatch m, MonadAsync m)
    => (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m (Array Word8)
    -> m ()
fromChunks addr port xs = S.drain (withConnection addr port sendAll)

    where

    sendAll sock = S.yieldM (ISK.fromChunks sock xs)
-- | Fold that sends byte arrays over a connection to the given IPv4 address
-- and port.
--
-- The fold state pairs the inner 'SK.writeChunks' fold with the socket it
-- writes to. The connection is opened by the initial action. If initialising
-- or stepping the inner fold throws, the socket is closed before the
-- exception propagates. On extraction the socket is closed and then the
-- inner fold is finalised.
{-# INLINE writeChunks #-}
writeChunks
    :: (MonadAsync m, MonadCatch m)
    => (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> Fold m (Array Word8) ()
writeChunks addr port = Fold step initial extract

    where

    closeSock sock = liftIO (Net.close sock)

    initial = do
        sock <- liftIO (connect addr port)
        inner <- FL.initialize (SK.writeChunks sock)
                    `MC.onException` closeSock sock
        return (inner, sock)

    step (inner, sock) chunk =
        (\next -> (next, sock))
            <$> (FL.runStep inner chunk `MC.onException` closeSock sock)

    -- The socket is closed first; the inner fold is finalised afterwards.
    extract (Fold _ begin done, sock) =
        closeSock sock >> begin >>= done
-- | Send a stream of bytes over a connection to the given IPv4 address and
-- port. The bytes are first grouped with @'AS.arraysOf' n@ and the resulting
-- arrays are sent with 'fromChunks'.
{-# INLINE fromBytesWithBufferOf #-}
fromBytesWithBufferOf
    :: (MonadCatch m, MonadAsync m)
    => Int
    -> (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m Word8
    -> m ()
fromBytesWithBufferOf n addr port m = fromChunks addr port chunked

    where

    chunked = AS.arraysOf n m
-- | Fold that sends bytes over a connection to the given IPv4 address and
-- port. Input is grouped by @'FL.lchunksOf' n@ using @'writeNUnsafe' n@ to
-- build each array, and the arrays are fed to 'writeChunks'.
{-# INLINE writeWithBufferOf #-}
writeWithBufferOf
    :: (MonadAsync m, MonadCatch m)
    => Int
    -> (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> Fold m Word8 ()
writeWithBufferOf n addr port = FL.lchunksOf n packer sink

    where

    packer = writeNUnsafe n
    sink = writeChunks addr port
-- | 'fromBytesWithBufferOf' with the buffer size set to 'defaultChunkSize'.
{-# INLINE fromBytes #-}
fromBytes :: (MonadCatch m, MonadAsync m)
    => (Word8, Word8, Word8, Word8) -> PortNumber -> SerialT m Word8 -> m ()
fromBytes = fromBytesWithBufferOf bufferSize

    where

    bufferSize = defaultChunkSize
-- | 'writeWithBufferOf' with the buffer size set to 'defaultChunkSize'.
{-# INLINE write #-}
write :: (MonadAsync m, MonadCatch m)
    => (Word8, Word8, Word8, Word8) -> PortNumber -> Fold m Word8 ()
write = writeWithBufferOf bufferSize

    where

    bufferSize = defaultChunkSize
-- | Connect to the given IPv4 address and port, send the @input@ byte stream
-- to the socket from a forked thread using 'ISK.fromBytes', and build the
-- result stream by applying @f@ to the connected socket.
--
-- The connection and the writer thread are managed by 'S.bracket'. On
-- release the writer thread is killed (a no-op if it has already finished)
-- and the socket is closed, so the writer is not left running against a
-- closed socket. If the writer thread cannot be forked, the socket is closed
-- before the exception propagates.
{-# INLINABLE withInputConnect #-}
withInputConnect
    :: (IsStream t, MonadCatch m, MonadAsync m)
    => (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m Word8
    -> (Socket -> t m a)
    -> t m a
withInputConnect addr port input f = S.bracket pre post handler

    where

    pre = do
        sk <- liftIO $ connect addr port
        -- Until 'pre' returns, the bracket does not know about the socket,
        -- so it has to be closed here if forking the writer fails.
        tid <- fork (ISK.fromBytes sk input)
                    `MC.onException` liftIO (Net.close sk)
        return (sk, tid)

    handler (sk, _) = f sk

    -- Stop the writer first; 'MC.finally' guarantees the socket is closed
    -- even if delivering the kill is itself interrupted.
    post (sk, tid) = liftIO $ killThread tid `MC.finally` Net.close sk
-- | Send the @input@ byte stream to the given IPv4 address and port, and
-- return the bytes read from the same connection with 'ISK.toBytes'. The
-- connection and the sending of input are handled by 'withInputConnect'.
{-# INLINABLE transformBytesWith #-}
transformBytesWith
    :: (IsStream t, MonadAsync m, MonadCatch m)
    => (Word8, Word8, Word8, Word8)
    -> PortNumber
    -> SerialT m Word8
    -> t m Word8
transformBytesWith addr port input =
    withInputConnect addr port input readReply

    where

    readReply sock = ISK.toBytes sock