Safe Haskell | Safe-Inferred |
---|---|
Language | Haskell2010 |
TCP implementation of the transport layer.
The TCP implementation guarantees that only a single TCP connection (socket) will be used between endpoints, provided that the addresses specified are canonical. If A connects to B and reports its address as 192.168.0.1:8080 and B subsequently tries to connect to A as client1.local:http-alt, then the transport layer will not realize that the TCP connection can be reused.
Applications that use the TCP transport should use withSocketsDo in their main function for Windows compatibility (see Network.Socket).
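As an illustration of the advice above, the following is a minimal sketch of an entry point that wraps transport setup in withSocketsDo. The host "127.0.0.1", the port "8080" and the name runNode are assumptions made for this example only; newEndPoint, address and closeTransport come from the Network.Transport module of the network-transport package.

```haskell
import Network.Socket (withSocketsDo)
import Network.Transport (address, closeTransport, newEndPoint)
import Network.Transport.TCP
  (createTransport, defaultTCPAddr, defaultTCPParameters)

-- Hypothetical entry point; intended to be used as: main = runNode
runNode :: IO ()
runNode = withSocketsDo $ do
  -- Host and port are illustrative; use a canonical address in practice.
  result <- createTransport (defaultTCPAddr "127.0.0.1" "8080") defaultTCPParameters
  case result of
    Left err -> putStrLn ("could not create transport: " ++ show err)
    Right transport -> do
      endpoint <- newEndPoint transport
      case endpoint of
        Left err -> putStrLn ("could not create endpoint: " ++ show err)
        Right ep -> putStrLn ("endpoint address: " ++ show (address ep))
      closeTransport transport
```

Both failure cases are reported rather than thrown, mirroring the Either results in the signatures listed in the synopsis.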
Synopsis
- createTransport :: TCPAddr -> TCPParameters -> IO (Either IOException Transport)
- data TCPAddr
- defaultTCPAddr :: HostName -> ServiceName -> TCPAddr
- data TCPAddrInfo = TCPAddrInfo {}
- data TCPParameters = TCPParameters {
- tcpBacklog :: Int
- tcpReuseServerAddr :: Bool
- tcpReuseClientAddr :: Bool
- tcpNoDelay :: Bool
- tcpKeepAlive :: Bool
- tcpUserTimeout :: Maybe Int
- transportConnectTimeout :: Maybe Int
- tcpNewQDisc :: forall t. IO (QDisc t)
- tcpMaxAddressLength :: Word32
- tcpMaxReceiveLength :: Word32
- tcpCheckPeerHost :: Bool
- tcpServerExceptionHandler :: SomeException -> IO ()
- defaultTCPParameters :: TCPParameters
- createTransportExposeInternals :: TCPAddr -> TCPParameters -> IO (Either IOException (Transport, TransportInternals))
- data TransportInternals = TransportInternals {
- transportThread :: Maybe ThreadId
- newEndPointInternal :: (forall t. Maybe (QDisc t)) -> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
- socketBetween :: EndPointAddress -> EndPointAddress -> IO Socket
- type EndPointId = Word32
- data ControlHeader
- data ConnectionRequestResponse
- firstNonReservedLightweightConnectionId :: LightweightConnectionId
- firstNonReservedHeavyweightConnectionId :: HeavyweightConnectionId
- socketToEndPoint :: Maybe EndPointAddress -> EndPointAddress -> Bool -> Bool -> Bool -> Maybe Int -> Maybe Int -> IO (Either (TransportError ConnectErrorCode) (MVar (), Socket, ConnectionRequestResponse))
- type LightweightConnectionId = Word32
- data QDisc t = QDisc {
- qdiscDequeue :: IO t
- qdiscEnqueue :: EndPointAddress -> Event -> t -> IO ()
- simpleUnboundedQDisc :: forall t. IO (QDisc t)
- simpleOnePlaceQDisc :: forall t. IO (QDisc t)
Main API
createTransport :: TCPAddr -> TCPParameters -> IO (Either IOException Transport) Source #
Create a TCP transport
data TCPAddr Source #
Addressability of a transport. If your transport cannot be connected to, for instance because it runs behind NAT, use Unaddressable.
defaultTCPAddr :: HostName -> ServiceName -> TCPAddr Source #
The bind and external host/port are the same.
data TCPAddrInfo Source #
A transport which is addressable from the network must give a host/port on which to bind/listen, and determine its external address (host/port) from the actual port (which may not be known, in case 0 is used for the bind port).
data TCPParameters Source #
Parameters for setting up the TCP transport
TCPParameters | |
|
defaultTCPParameters :: TCPParameters Source #
Default TCP parameters
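Individual settings are typically changed with record update syntax on defaultTCPParameters. The sketch below is only an illustration: the name tunedParameters and the chosen values are hypothetical, and the timeout is assumed here to be expressed in microseconds.

```haskell
import Network.Transport.TCP (TCPParameters (..), defaultTCPParameters)

-- Hypothetical parameter set; every field not mentioned keeps its default.
tunedParameters :: TCPParameters
tunedParameters = defaultTCPParameters
  { tcpNoDelay = True                        -- disable Nagle's algorithm
  , tcpKeepAlive = True                      -- enable keep-alive probes
  , transportConnectTimeout = Just 5000000   -- assumed unit: microseconds
  }
```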
Internals (exposed for unit tests)
createTransportExposeInternals :: TCPAddr -> TCPParameters -> IO (Either IOException (Transport, TransportInternals)) Source #
You should probably not use this function (used for unit testing only)
data TransportInternals Source #
Internal functionality we expose for unit testing
TransportInternals | |
|
type EndPointId = Word32 Source #
Local identifier for an endpoint within this transport
data ControlHeader Source #
Control headers
CreatedNewConnection | Tell the remote endpoint that we created a new connection |
CloseConnection | Tell the remote endpoint we will no longer be using a connection |
CloseSocket | Request to close the connection (see module description) |
CloseEndPoint | Sent by an endpoint when it is closed. |
ProbeSocket | Message sent to probe a socket |
ProbeSocketAck | Acknowledgement of the ProbeSocket message |
Instances
Show ControlHeader Source # | |
Defined in Network.Transport.TCP.Internal showsPrec :: Int -> ControlHeader -> ShowS # show :: ControlHeader -> String # showList :: [ControlHeader] -> ShowS # |
data ConnectionRequestResponse Source #
Response sent by B to A when A tries to connect
ConnectionRequestUnsupportedVersion | B does not support the protocol version requested by A. |
ConnectionRequestAccepted | B accepts the connection |
ConnectionRequestInvalid | A requested an invalid endpoint |
ConnectionRequestCrossed | A's request crossed with a request from B (see protocols) |
ConnectionRequestHostMismatch | A gave an incorrect host (did not match the host that B observed). |
Instances
Show ConnectionRequestResponse Source # | |
Defined in Network.Transport.TCP.Internal showsPrec :: Int -> ConnectionRequestResponse -> ShowS # show :: ConnectionRequestResponse -> String # showList :: [ConnectionRequestResponse] -> ShowS # |
firstNonReservedLightweightConnectionId :: LightweightConnectionId Source #
We reserve a bunch of connection IDs for control messages
firstNonReservedHeavyweightConnectionId :: HeavyweightConnectionId Source #
We reserve some connection IDs for special heavyweight connections
socketToEndPoint Source #
:: Maybe EndPointAddress | Our address |
-> EndPointAddress | Their address |
-> Bool | Use SO_REUSEADDR? |
-> Bool | Use TCP_NODELAY |
-> Bool | Use TCP_KEEPALIVE |
-> Maybe Int | Maybe TCP_USER_TIMEOUT |
-> Maybe Int | Timeout for connect |
-> IO (Either (TransportError ConnectErrorCode) (MVar (), Socket, ConnectionRequestResponse)) |
Establish a connection to a remote endpoint
May throw a TransportError.
If a socket is created and returned (Right is given) then the caller is responsible for eventually closing the socket and filling the MVar (which is empty). The MVar must be filled immediately after, and never before, the socket is closed.
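The ordering requirement on the MVar can be sketched as follows. This is a model under stated assumptions, not the library's code: closeThenSignal and demoCloseOrder are hypothetical names, and the socket close is passed in as a plain IO action (with the network package it would be something like close applied to the socket).

```haskell
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, tryReadMVar)
import Control.Exception (finally)

-- Run the close action first, then fill the MVar. 'finally' fills the MVar
-- even if the close action throws, so waiters are never left blocked.
closeThenSignal :: IO () -> MVar () -> IO ()
closeThenSignal closeSocket closed = closeSocket `finally` putMVar closed ()

-- True when the MVar was empty before the close and full after it.
demoCloseOrder :: IO Bool
demoCloseOrder = do
  closed <- newEmptyMVar
  before <- tryReadMVar closed
  closeThenSignal (pure ()) closed   -- stand-in for closing a real socket
  after <- tryReadMVar closed
  pure (before == Nothing && after == Just ())
```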
type LightweightConnectionId = Word32 Source #
Lightweight connection ID (sender allocated)
A ConnectionId is the concatenation of a HeavyweightConnectionId and a LightweightConnectionId.
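A minimal sketch of that concatenation, assuming a 64-bit ConnectionId with the heavyweight ID in the upper 32 bits and the lightweight ID in the lower 32 bits. The type synonyms are redefined locally so the block stands alone, and splitConnectionId is a hypothetical helper added for illustration.

```haskell
import Data.Bits (shiftL, shiftR, (.|.))
import Data.Word (Word32, Word64)

type HeavyweightConnectionId = Word32
type LightweightConnectionId = Word32
type ConnectionId = Word64   -- assumption: 64-bit combined identifier

-- Heavyweight ID in the high half, lightweight ID in the low half.
createConnectionId :: HeavyweightConnectionId -> LightweightConnectionId -> ConnectionId
createConnectionId hcid lcid = (fromIntegral hcid `shiftL` 32) .|. fromIntegral lcid

-- Inverse of createConnectionId (hypothetical helper).
splitConnectionId :: ConnectionId -> (HeavyweightConnectionId, LightweightConnectionId)
splitConnectionId cid = (fromIntegral (cid `shiftR` 32), fromIntegral cid)
```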
data QDisc t Source #
Abstraction of a queue for an EndPoint.
A value of type QDisc t is a queue of events of an abstract type t.
This specifies which Events will come from 'receive :: EndPoint -> IO Event' and when. It is highly general so that the simple yet potentially very fast implementation backed by a single unbounded channel can be used, without excluding more nuanced policies like class-based queueing with bounded buffers for each peer, which may be faster in certain conditions but probably has lower maximal throughput.
A QDisc must satisfy some properties in order for the semantics of network-transport to hold true. In general, an event fed with qdiscEnqueue must not be dropped, i.e., provided that no other event in the QDisc has higher priority, the event should eventually be returned by qdiscDequeue. Receive events of unreliable connections are an exception.
Every call to receive is just qdiscDequeue on that EndPoint's QDisc. Whenever an event arises from a socket, qdiscEnqueue is called with the relevant metadata in the same thread that reads from the socket. You can be clever about when to block here, so as to control network ingress. This applies also to loopback connections (an EndPoint connects to itself), in which case blocking on the enqueue would only block some thread in your program rather than some chatty network peer. The Event which is to be enqueued is given to qdiscEnqueue so that the QDisc can know about open connections, their identifiers and peer addresses, etc.
QDisc | |
|
simpleUnboundedQDisc :: forall t. IO (QDisc t) Source #
A very simple QDisc backed by an unbounded channel.
simpleOnePlaceQDisc :: forall t. IO (QDisc t) Source #
A very simple QDisc backed by a 1-place queue (MVar). With this QDisc, all threads reading from sockets will try to put their events into the same MVar. That MVar will be cleared by calls to receive. Thus the rate at which data is read from the wire is directly related to the rate at which data is pulled from the EndPoint by receive.
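The difference between the two policies can be modelled with base alone. The sketch below uses a simplified stand-in type, QueueModel, whose enqueue takes only the queued value; the real qdiscEnqueue additionally receives the EndPointAddress and the Event. All names in this block are hypothetical.

```haskell
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Simplified stand-in for QDisc (assumption: metadata arguments omitted).
data QueueModel t = QueueModel
  { dequeueModel :: IO t
  , enqueueModel :: t -> IO ()
  }

-- Unbounded policy: enqueue never blocks, so socket readers are never slowed.
unboundedModel :: IO (QueueModel t)
unboundedModel = do
  chan <- newChan
  pure (QueueModel (readChan chan) (writeChan chan))

-- One-place policy: enqueue blocks while the slot is full, so readers
-- proceed only as fast as the consumer dequeues.
onePlaceModel :: IO (QueueModel t)
onePlaceModel = do
  slot <- newEmptyMVar
  pure (QueueModel (takeMVar slot) (putMVar slot))
```

With the one-place model, a second enqueue before any dequeue blocks the calling thread, which is the back-pressure effect described above.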
Design notes
- Goals
The TCP transport maps multiple logical connections between A and B (in either direction) to a single TCP connection:
+-------+                          +-------+
| A     |==========================| B     |
|       |>~~~~~~~~~~~~~~~~~~~~~~~~~|~~~\   |
|   Q   |>~~~~~~~~~~~~~~~~~~~~~~~~~|~~~Q   |
|   \~~~|~~~~~~~~~~~~~~~~~~~~~~~~~<|       |
|       |==========================|       |
+-------+                          +-------+
Ignoring the complications detailed below, the TCP connection is set up when the first lightweight connection is created (in either direction), and torn down when the last lightweight connection (in either direction) is closed.
- Connecting
Let A, B be two endpoints without any connections. When A wants to connect to B, it locally records that it is trying to connect to B and sends a request to B. As part of the request A sends its own endpoint address to B (so that B can reuse the connection in the other direction).
When B receives the connection request, it first checks whether it has already initiated a connection request to A. If not, it will acknowledge the connection request by sending ConnectionRequestAccepted to A and record that it has a TCP connection to A.
The tricky case arises when A sends a connection request to B and B finds that it had already sent a connection request to A. In this case B will accept the connection request from A if A's endpoint address is smaller (lexicographically) than B's, and reject it otherwise. If it rejects it, it sends a ConnectionRequestCrossed message to A. The lexicographical ordering is an arbitrary but convenient way to break the tie. If a connection exists between A and B when B rejects the request, B will probe the connection to make sure it is healthy. If A does not answer the probe in a timely manner, B will discard the connection.
When it receives a ConnectionRequestCrossed message, the A thread that initiated the request just needs to wait until the A thread that is dealing with B's connection request completes, unless there is a network failure. If there is a network failure, the initiator thread would time out and return an error.
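The decision B makes on an incoming request can be sketched as a pure function. This is a simplified model: addresses are plain Strings here, and the names Decision and decideRequest are hypothetical.

```haskell
-- B's answer to a connection request from A.
data Decision
  = Accept    -- reply with ConnectionRequestAccepted
  | Crossed   -- reply with ConnectionRequestCrossed
  deriving (Eq, Show)

-- decideRequest bAlreadyRequested addrA addrB
--   bAlreadyRequested: B has an outstanding connection request to A.
decideRequest :: Bool -> String -> String -> Decision
decideRequest bAlreadyRequested addrA addrB
  | not bAlreadyRequested = Accept    -- no crossing: accept straight away
  | addrA < addrB         = Accept    -- crossed, A's address is smaller: A wins
  | otherwise             = Crossed   -- crossed, B's own request takes priority
```

Because both sides apply the same ordering to the same pair of addresses, exactly one of two crossed requests is accepted.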
- Disconnecting
The TCP connection is created as soon as the first logical connection from A to B (or B to A) is established. At this point a thread (#) is spawned that listens for incoming connections from B:
+-------+                          +-------+
| A     |==========================| B     |
|       |>~~~~~~~~~~~~~~~~~~~~~~~~~|~~~\   |
|       |                          |   Q   |
|      #|                          |       |
|       |==========================|       |
+-------+                          +-------+
The question is when the TCP connection can be closed again. Conceptually, we want to do reference counting: when there are no logical connections left between A and B we want to close the socket (possibly after some timeout).
However, A and B need to agree that the refcount has reached zero. It might happen that B sends a connection request over the existing socket at the same time that A closes its logical connection to B and closes the socket. This will cause a failure in B (which will have to retry) which is not caused by a network failure, which is unfortunate. (Note that the connection request from B might succeed even if A closes the socket.)
Instead, when A is ready to close the socket it sends a CloseSocket request to B and records that its connection to B is closing. If A receives a new connection request from B after having sent the CloseSocket request it simply forgets that it sent a CloseSocket request and increments the reference count of the connection again.
When B receives a CloseSocket message and it too is ready to close the connection, it will respond with a reciprocal CloseSocket request to A and then actually close the socket. A meanwhile will not send any more requests to B after having sent a CloseSocket request, and will actually close its end of the socket only when receiving the CloseSocket message from B. (Since A recorded that its connection to B is in closing state after sending a CloseSocket request to B, it knows not to reciprocate B's reciprocal CloseSocket message.)
If there is a concurrent thread in A waiting to connect to B after A has sent a CloseSocket request then this thread will block until A knows whether to reuse the old socket (if B sends a new connection request instead of acknowledging the CloseSocket) or to set up a new socket.
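The handshake can be summarised as a small state machine for one side of the connection. The sketch below is a deliberately simplified model, not the implementation: it tracks a single reference count for both directions, and the names LinkState, Input, step and run are hypothetical.

```haskell
data LinkState
  = Open Int   -- socket in use by this many logical connections
  | Closing    -- we sent CloseSocket and await the peer's reaction
  | Closed     -- our end of the socket is closed
  deriving (Eq, Show)

data Input
  = LocalClose           -- we close one logical connection
  | PeerConnectRequest   -- the peer opens a new logical connection
  | PeerCloseSocket      -- the peer sent CloseSocket
  deriving (Eq, Show)

step :: LinkState -> Input -> LinkState
step (Open n) LocalClose
  | n > 1     = Open (n - 1)
  | otherwise = Closing                     -- count reached zero: send CloseSocket
step (Open n) PeerConnectRequest = Open (n + 1)
step (Open 0) PeerCloseSocket    = Closed   -- ready too: reciprocate, then close
step (Open n) PeerCloseSocket    = Open n   -- still in use: keep the socket
step Closing  PeerConnectRequest = Open 1   -- forget our CloseSocket, reuse the socket
step Closing  PeerCloseSocket    = Closed   -- peer agreed: close without reciprocating
step Closing  LocalClose         = Closing
step Closed   _                  = Closed

-- Feed a sequence of inputs through the state machine.
run :: LinkState -> [Input] -> LinkState
run = foldl step
```

For instance, run (Open 1) [LocalClose, PeerConnectRequest] ends in Open 1, which corresponds to A forgetting its CloseSocket request when B asks for a new connection.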