Copyright | (c) 2018 Composewell Technologies |
---|---|
License | BSD-3-Clause |
Maintainer | streamly@composewell.com |
Stability | released |
Portability | GHC |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
This module provides socket based streaming APIs to to receive connections from remote hosts, and to read and write from and to network sockets.
For basic socket types and non-streaming operations please consult the Network.Socket
module of the network package.
Examples
To write a server, use the accept
stream to start listening for
connections from clients. accept
generates a stream of connected
sockets. We can map an effectful action on this socket stream to handle the
connections. The action would typically use socket reading and writing
operations to communicate with the remote host. We can read/write a stream
of bytes or a stream of chunks of bytes (Array
).
Following is a short example of a concurrent echo server. Please note that this example can be written even more succinctly by using higher level operations from Streamly.Network.Inet.TCP module.
>>>
:set -XFlexibleContexts
>>>
>>>
import Data.Function ((&))
>>>
import Network.Socket
>>>
import Streamly.Network.Socket (SockSpec(..))
>>>
>>>
import qualified Streamly.Data.Fold as Fold
>>>
import qualified Streamly.Data.Stream.Prelude as Stream
>>>
import qualified Streamly.Network.Socket as Socket
>>>
>>>
:{
main :: IO () main = do let spec = SockSpec { sockFamily = AF_INET , sockType = Stream , sockProto = defaultProtocol , sockOpts = [] } addr = SockAddrInet 8090 (tupleToHostAddress (0,0,0,0)) in server spec addr where server spec addr = Socket.accept maxListenQueue spec addr & Stream.parMapM (Stream.eager True) (Socket.forSocketM echo) & Stream.fold Fold.drain echo sk = Socket.readChunks sk -- Stream IO (Array Word8) & Stream.fold (Socket.writeChunks sk) -- IO () :}
Programmer Notes
Read IO requests to connected stream sockets are performed in chunks of
defaultChunkSize
. Unless
specified otherwise in the API, writes are collected into chunks of
defaultChunkSize
before they are written to
the socket.
>>>
import qualified Streamly.Network.Socket as Socket
See Also
Synopsis
- data SockSpec = SockSpec {
- sockFamily :: !Family
- sockType :: !SocketType
- sockProto :: !ProtocolNumber
- sockOpts :: ![(SocketOption, Int)]
- accept :: MonadIO m => Int -> SockSpec -> SockAddr -> Stream m Socket
- acceptor :: MonadIO m => Unfold m (Int, SockSpec, SockAddr) Socket
- getChunk :: Int -> Socket -> IO (Array Word8)
- read :: MonadIO m => Socket -> Stream m Word8
- readWith :: MonadIO m => Int -> Socket -> Stream m Word8
- readChunks :: MonadIO m => Socket -> Stream m (Array Word8)
- readChunksWith :: MonadIO m => Int -> Socket -> Stream m (Array Word8)
- reader :: MonadIO m => Unfold m Socket Word8
- readerWith :: MonadIO m => Unfold m (Int, Socket) Word8
- chunkReader :: MonadIO m => Unfold m Socket (Array Word8)
- chunkReaderWith :: MonadIO m => Unfold m (Int, Socket) (Array Word8)
- putChunk :: Unbox a => Socket -> Array a -> IO ()
- write :: MonadIO m => Socket -> Fold m Word8 ()
- writeWith :: MonadIO m => Int -> Socket -> Fold m Word8 ()
- writeChunks :: (MonadIO m, Unbox a) => Socket -> Fold m (Array a) ()
- writeChunksWith :: (MonadIO m, Unbox a) => Int -> Socket -> Fold m (Array a) ()
- forSocketM :: (MonadMask m, MonadIO m) => (Socket -> m ()) -> Socket -> m ()
- readChunk :: Int -> Socket -> IO (Array Word8)
- writeChunk :: Unbox a => Socket -> Array a -> IO ()
- readWithBufferOf :: MonadIO m => Unfold m (Int, Socket) Word8
- readChunksWithBufferOf :: MonadIO m => Unfold m (Int, Socket) (Array Word8)
- writeWithBufferOf :: MonadIO m => Int -> Socket -> Fold m Word8 ()
- writeChunksWithBufferOf :: (MonadIO m, Unbox a) => Int -> Socket -> Fold m (Array a) ()
Socket Specification
Specify the socket protocol details.
SockSpec | |
|
Accept Connections
accept :: MonadIO m => Int -> SockSpec -> SockAddr -> Stream m Socket Source #
Start a TCP stream server that listens for connections on the supplied server address specification (address family, local interface IP address and port). The server generates a stream of connected sockets. The first argument is the maximum number of pending connections in the backlog.
Pre-release
acceptor :: MonadIO m => Unfold m (Int, SockSpec, SockAddr) Socket Source #
Unfold a three tuple (listenQLen, spec, addr)
into a stream of connected
protocol sockets corresponding to incoming connections. listenQLen
is the
maximum number of pending connections in the backlog. spec
is the socket
protocol and options specification and addr
is the protocol address where
the server listens for incoming connections.
Reads
Singleton
getChunk :: Int -> Socket -> IO (Array Word8) Source #
Read a byte array from a file handle up to a maximum of the requested size. If no data is available on the handle it blocks until some data becomes available. If data is available then it immediately returns that data without blocking.
Streams
read :: MonadIO m => Socket -> Stream m Word8 Source #
Generate a byte stream from a socket.
>>>
read = Socket.readWith defaultChunkSize
Pre-release
readWith :: MonadIO m => Int -> Socket -> Stream m Word8 Source #
Generate a byte stream from a socket using a buffer of the given size.
Pre-release
readChunks :: MonadIO m => Socket -> Stream m (Array Word8) Source #
Read a stream of byte arrays from a socket. The maximum size of a single
array is limited to defaultChunkSize
.
>>>
readChunks = Socket.readChunksWith defaultChunkSize
Pre-release
readChunksWith :: MonadIO m => Int -> Socket -> Stream m (Array Word8) Source #
readChunksWith bufsize socket
reads a stream of arrays from socket
.
The maximum size of a single array is limited to bufsize
.
Pre-release
Unfolds
reader :: MonadIO m => Unfold m Socket Word8 Source #
Unfolds a Socket
into a byte stream. IO requests to the socket are
performed in sizes of
defaultChunkSize
.
readerWith :: MonadIO m => Unfold m (Int, Socket) Word8 Source #
Unfolds the tuple (bufsize, socket)
into a byte stream, read requests
to the socket are performed using buffers of bufsize
.
chunkReader :: MonadIO m => Unfold m Socket (Array Word8) Source #
Unfolds a socket into a stream of Word8
arrays. Requests to the socket
are performed using a buffer of size
defaultChunkSize
. The
size of arrays in the resulting stream are therefore less than or equal to
defaultChunkSize
.
chunkReaderWith :: MonadIO m => Unfold m (Int, Socket) (Array Word8) Source #
Unfold the tuple (bufsize, socket)
into a stream of Word8
arrays.
Read requests to the socket are performed using a buffer of size bufsize
.
The size of an array in the resulting stream is always less than or equal to
bufsize
.
Writes
Singleton
Folds
write :: MonadIO m => Socket -> Fold m Word8 () Source #
Write a byte stream to a socket. Accumulates the input in chunks of
up to defaultChunkSize
bytes before writing.
>>>
write = Socket.writeWith defaultChunkSize
writeWith :: MonadIO m => Int -> Socket -> Fold m Word8 () Source #
Write a byte stream to a socket. Accumulates the input in chunks of specified number of bytes before writing.
writeChunks :: (MonadIO m, Unbox a) => Socket -> Fold m (Array a) () Source #
Write a stream of arrays to a socket. Each array in the stream is written to the socket as a separate IO request.
writeChunksWith :: (MonadIO m, Unbox a) => Int -> Socket -> Fold m (Array a) () Source #
writeChunksWith bufsize socket
writes a stream of arrays to
socket
after coalescing the adjacent arrays in chunks of bufsize
.
Multiple arrays are coalesed as long as the total size remains below the
specified size. It never splits an array, if a single array is bigger than
the specified size it emitted as it is.
Exceptions
forSocketM :: (MonadMask m, MonadIO m) => (Socket -> m ()) -> Socket -> m () Source #
runs the monadic computation forSocketM
action socketaction
passing
the socket handle to it. The handle will be closed on exit from
forSocketM
, whether by normal termination or by raising an exception. If
closing the handle raises an exception, then this exception will be raised
by forSocketM
rather than any exception raised by action
.
Deprecated
readWithBufferOf :: MonadIO m => Unfold m (Int, Socket) Word8 Source #
Deprecated: Please use readerWith
instead
Same as readWith
readChunksWithBufferOf :: MonadIO m => Unfold m (Int, Socket) (Array Word8) Source #
Deprecated: Please use chunkReaderWith
instead
Same as chunkReaderWith
writeChunksWithBufferOf :: (MonadIO m, Unbox a) => Int -> Socket -> Fold m (Array a) () Source #
Deprecated: Please use writeChunksWith
instead
Same as writeChunksWith