streamly-0.8.3: Dataflow programming and declarative concurrency
Copyright: (c) 2018 Composewell Technologies
License: BSD-3-Clause
Maintainer: streamly@composewell.com
Stability: released
Portability: GHC
Safe Haskell: Safe-Inferred
Language: Haskell2010

Streamly.Network.Socket

Description

This module provides Array and stream based socket operations to connect to remote hosts, to receive connections from remote hosts, and to read and write streams and arrays of bytes to and from network sockets.

For basic socket types and operations please consult the Network.Socket module of the network package.

Examples

To write a server, use the accept unfold to start listening for connections from clients. accept supplies a stream of connected sockets. We can map an effectful action on this socket stream to handle the connections. The action would typically use socket reading and writing operations to communicate with the remote host. We can read/write a stream of bytes or a stream of chunks of bytes (Array).

The following is a short example of a concurrent echo server. Note that this example can be written even more succinctly using the higher level operations from the Streamly.Network.Inet.TCP module.

{-# LANGUAGE FlexibleContexts #-}

import Data.Function ((&))
import Network.Socket
import Streamly.Network.Socket (SockSpec(..))

import qualified Streamly.Prelude as Stream
import qualified Streamly.Network.Socket as Socket

main :: IO ()
main = do
    let spec = SockSpec
               { sockFamily = AF_INET
               , sockType   = Stream
               , sockProto  = defaultProtocol
               , sockOpts   = []
               }
        addr = SockAddrInet 8090 (tupleToHostAddress (0,0,0,0))
     in server spec addr

    where

    server spec addr =
          Stream.unfold Socket.accept (maxListenQueue, spec, addr) -- ParallelT IO Socket
        & Stream.mapM (Socket.forSocketM echo)                     -- ParallelT IO ()
        & Stream.fromParallel                                      -- SerialT IO ()
        & Stream.drain                                             -- IO ()

    echo sk =
          Stream.unfold Socket.readChunks sk  -- SerialT IO (Array Word8)
        & Stream.fold (Socket.writeChunks sk) -- IO ()
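
As a rough sketch of the more succinct variant mentioned above, the same server can be written with the acceptOnPort unfold from Streamly.Network.Inet.TCP, which listens on all local IPv4 interfaces on the given port. The name echoServer is an illustrative choice and is not part of the library.

```haskell
import Data.Function ((&))
import Network.Socket (PortNumber, Socket)

import qualified Streamly.Prelude as Stream
import qualified Streamly.Network.Inet.TCP as TCP
import qualified Streamly.Network.Socket as Socket

-- Accept TCP connections on the given port and echo back whatever each
-- client sends. Connections are handled concurrently.
-- (echoServer is a hypothetical name used only for this sketch.)
echoServer :: PortNumber -> IO ()
echoServer port =
      Stream.unfold TCP.acceptOnPort port     -- ParallelT IO Socket
    & Stream.mapM (Socket.forSocketM echo)    -- ParallelT IO ()
    & Stream.fromParallel                     -- SerialT IO ()
    & Stream.drain                            -- IO ()

    where

    -- Copy chunks from the socket back to the same socket until EOF.
    echo :: Socket -> IO ()
    echo sk =
          Stream.unfold Socket.readChunks sk
        & Stream.fold (Socket.writeChunks sk)
```

Calling echoServer 8090 from main should behave like the example above, without the explicit SockSpec and SockAddr setup.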

Programmer Notes

Read IO requests to connected stream sockets are performed in chunks of defaultChunkSize. Unless specified otherwise in the API, writes are collected into chunks of defaultChunkSize before they are written to the socket. APIs are provided to control the chunking behavior.
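
A minimal sketch of controlling the chunking behavior, using readWithBufferOf and writeWithBufferOf with explicit buffer sizes instead of defaultChunkSize. To keep it self-contained it uses a local AF_UNIX socket pair created with socketPair from the network package (not available on Windows). The helper name transfer is hypothetical, and the sketch assumes the payload is small enough to fit in the kernel socket buffer, because all writes finish before reading starts.

```haskell
import Data.Function ((&))
import Data.Word (Word8)
import Network.Socket
    (Family(AF_UNIX), SocketType(Stream), close, defaultProtocol, socketPair)

import qualified Streamly.Prelude as Stream
import qualified Streamly.Network.Socket as Socket

-- Send the bytes over one end of a local socket pair, buffering writes into
-- chunks of at most wsize bytes, then read them back from the other end
-- using read requests of at most rsize bytes.
-- (transfer is a hypothetical helper used only for this sketch.)
transfer :: Int -> Int -> [Word8] -> IO [Word8]
transfer wsize rsize bytes = do
    (tx, rx) <- socketPair AF_UNIX Stream defaultProtocol
    Stream.fromList bytes
        & Stream.fold (Socket.writeWithBufferOf wsize tx)
    close tx -- the reader sees end of input once the writer is closed
    received <-
          Stream.unfold Socket.readWithBufferOf (rsize, rx)
        & Stream.toList
    close rx
    return received
```

The buffer sizes affect only how many IO requests are issued, not the bytes that arrive at the other end.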

Socket Specification

data SockSpec

Specify the socket protocol details.
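
As an illustration, a SockSpec for an IPv4 TCP listener might look like the sketch below. The field names are taken from the example at the top of this page. The shape of sockOpts as a list of (SocketOption, Int) pairs is an assumption based on how the TCP helpers in this package configure their sockets, and reusableTcpSpec is a hypothetical name.

```haskell
import Network.Socket
import Streamly.Network.Socket (SockSpec(..))

-- An IPv4 stream (TCP) socket specification with SO_REUSEADDR enabled, so
-- that a restarted server can bind to the same address again promptly.
-- (reusableTcpSpec is a hypothetical name; the option list is an assumption.)
reusableTcpSpec :: SockSpec
reusableTcpSpec = SockSpec
    { sockFamily = AF_INET
    , sockType   = Stream
    , sockProto  = defaultProtocol
    , sockOpts   = [(ReuseAddr, 1)]
    }
```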

Accept Connections

accept :: MonadIO m => Unfold m (Int, SockSpec, SockAddr) Socket

Unfold a 3-tuple (listenQLen, spec, addr) into a stream of connected protocol sockets corresponding to incoming connections. listenQLen is the maximum number of pending connections in the backlog, spec is the socket protocol and options specification, and addr is the protocol address on which the server listens for incoming connections.

Since: 0.7.0

Read

read :: MonadIO m => Unfold m Socket Word8

Unfolds a Socket into a byte stream. IO requests to the socket are performed in sizes of defaultChunkSize.

Since: 0.7.0

readWithBufferOf :: MonadIO m => Unfold m (Int, Socket) Word8

Unfolds the tuple (bufsize, socket) into a byte stream. Read requests to the socket are performed using buffers of size bufsize.

Since: 0.7.0

readChunks :: MonadIO m => Unfold m Socket (Array Word8)

Unfolds a socket into a stream of Word8 arrays. Read requests to the socket are performed using a buffer of size defaultChunkSize. The size of each array in the resulting stream is therefore less than or equal to defaultChunkSize.

Since: 0.7.0

readChunksWithBufferOf :: MonadIO m => Unfold m (Int, Socket) (Array Word8)

Unfold the tuple (bufsize, socket) into a stream of Word8 arrays. Read requests to the socket are performed using a buffer of size bufsize. The size of an array in the resulting stream is always less than or equal to bufsize.

Since: 0.7.0

readChunk :: Int -> Socket -> IO (Array Word8)

Read a byte array from a socket up to a maximum of the requested size. If no data is available on the socket, it blocks until some data becomes available. If data is available, it returns that data immediately without blocking.

Since: 0.8.0

Write

write :: MonadIO m => Socket -> Fold m Word8 ()

Write a byte stream to a socket. Accumulates the input in chunks of up to defaultChunkSize bytes before writing.

write = writeWithBufferOf defaultChunkSize

Since: 0.7.0

writeWithBufferOf :: MonadIO m => Int -> Socket -> Fold m Word8 ()

Write a byte stream to a socket. Accumulates the input in chunks of up to the specified number of bytes before writing.

Since: 0.7.0

writeChunks :: (MonadIO m, Storable a) => Socket -> Fold m (Array a) ()

Write a stream of arrays to a socket. Each array in the stream is written to the socket as a separate IO request.

Since: 0.7.0

writeChunksWithBufferOf :: (MonadIO m, Storable a) => Int -> Socket -> Fold m (Array a) ()

writeChunksWithBufferOf bufsize socket writes a stream of arrays to socket after coalescing adjacent arrays into chunks of bufsize. Multiple arrays are coalesced as long as the total size remains below the specified size. It never splits an array: if a single array is bigger than the specified size, it is emitted as it is.

Since: 0.8.0
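
The coalescing behavior can be exercised with a sketch along the following lines, which writes several small arrays through writeChunksWithBufferOf and reads the bytes back with read. It assumes a local AF_UNIX socket pair from the network package (not available on Windows) and a payload small enough to fit in the kernel socket buffer. sendCoalesced is a hypothetical helper name.

```haskell
import Data.Function ((&))
import Data.Word (Word8)
import Network.Socket
    (Family(AF_UNIX), SocketType(Stream), close, defaultProtocol, socketPair)

import qualified Streamly.Data.Array.Foreign as Array
import qualified Streamly.Prelude as Stream
import qualified Streamly.Network.Socket as Socket

-- Write each inner list as a separate array, letting the fold coalesce
-- adjacent arrays into IO requests of up to bufsize bytes, then read all
-- bytes back from the other end of the socket pair.
-- (sendCoalesced is a hypothetical helper used only for this sketch.)
sendCoalesced :: Int -> [[Word8]] -> IO [Word8]
sendCoalesced bufsize chunks = do
    (tx, rx) <- socketPair AF_UNIX Stream defaultProtocol
    Stream.fromList (map Array.fromList chunks)
        & Stream.fold (Socket.writeChunksWithBufferOf bufsize tx)
    close tx -- signal end of input to the reader
    received <- Stream.unfold Socket.read rx & Stream.toList
    close rx
    return received
```

Coalescing changes how the arrays are grouped into write requests, so the reader should still observe the concatenation of all input arrays in order.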

writeChunk :: Storable a => Socket -> Array a -> IO ()

Write an Array to a socket.

Since: 0.8.0
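
For one-off transfers outside a stream pipeline, writeChunk and readChunk can be used directly in IO. The sketch below sends a single array across a local AF_UNIX socket pair (an assumption made to keep the example self-contained; socketPair is not available on Windows) and reads it back with one request. roundTrip is a hypothetical helper name, and the input is assumed to be non-empty and at most 1024 bytes.

```haskell
import Data.Word (Word8)
import Network.Socket
    (Family(AF_UNIX), SocketType(Stream), close, defaultProtocol, socketPair)

import qualified Streamly.Data.Array.Foreign as Array
import qualified Streamly.Network.Socket as Socket

-- Write one array to one end of a socket pair and read up to 1024 bytes
-- from the other end with a single read request.
-- (roundTrip is a hypothetical helper used only for this sketch.)
roundTrip :: [Word8] -> IO [Word8]
roundTrip bytes = do
    (tx, rx) <- socketPair AF_UNIX Stream defaultProtocol
    Socket.writeChunk tx (Array.fromList bytes)
    arr <- Socket.readChunk 1024 rx -- blocks until some data is available
    close tx
    close rx
    return (Array.toList arr)
```

On a stream socket a single readChunk is not guaranteed to return everything the peer wrote, so code that needs a complete message should keep reading, for example with readChunks.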

Exceptions

forSocketM :: (MonadMask m, MonadIO m) => (Socket -> m ()) -> Socket -> m ()

forSocketM action socket runs the monadic computation action passing the socket handle to it. The handle will be closed on exit from forSocketM, whether by normal termination or by raising an exception. If closing the handle raises an exception, then this exception will be raised by forSocketM rather than any exception raised by action.

Since: 0.8.0
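
A small sketch of the closing behavior: the writer end of a local socket pair is handed to forSocketM, and no explicit close is issued for it. The reader reaching end of input relies on forSocketM having closed the writer when the action returned. The AF_UNIX socket pair (not available on Windows) and the helper name sendAndClose are assumptions made for this illustration, and the payload is assumed to fit in the kernel socket buffer.

```haskell
import Data.Function ((&))
import Data.Word (Word8)
import Network.Socket
    (Family(AF_UNIX), SocketType(Stream), close, defaultProtocol, socketPair)

import qualified Streamly.Prelude as Stream
import qualified Streamly.Network.Socket as Socket

-- Write the bytes inside forSocketM, which closes the writer socket when
-- the action finishes, then read everything up to EOF from the other end.
-- (sendAndClose is a hypothetical helper used only for this sketch.)
sendAndClose :: [Word8] -> IO [Word8]
sendAndClose bytes = do
    (tx, rx) <- socketPair AF_UNIX Stream defaultProtocol
    Socket.forSocketM
        (\sk -> Stream.fromList bytes & Stream.fold (Socket.write sk))
        tx
    -- tx is already closed here, so this read terminates at EOF.
    received <- Stream.unfold Socket.read rx & Stream.toList
    close rx
    return received
```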