Copyright | (c) Serokell 2016 |
---|---|
License | GPL-3 (see the file LICENSE) |
Maintainer | Serokell <hi@serokell.io> |
Stability | experimental |
Portability | POSIX, GHC |
Safe Haskell | None |
Language | Haskell2010 |
This module provides implementation of MonadTransfer
.
It operates with so called lively sockets, so that, if error occured while sending or receiving, it would try to restore connection before reporting error.
When some data is sent for first time to given address, connection with single lively-socket is created; it would be reused for further sends until closed.
Then server is getting up at some port, it creates single thread to handle incoming connections, then for each input connection lively-socket is created.
TODO [TW-67]: close all connections upon quiting Transfer
monad.
About lively sockets:
Lively socket keeps queue of byte chunks inside.
For given lively-socket, send
function just pushes chunks to send-queue, whenever
receive
infinitelly acquires chunks from receive-queue.
Those queues are connected to plain socket behind the scene.
Let's say lively socket to be active if it successfully sends and receives required data at the moment. Upon becoming active, lively socket spawns `processing-observer` thread, which itself spawns 3 threads: one pushes chunks from send-queue to socket, another one pulls chunks from socket to receive-queue, and the last tracks whether socket was closed. Processor thread finishes in one of the following cases:
- One of it's children threads threw an error
- Socket was closed
If some error occures, lively socket goes to exceptional state (which is not expressed in code, however), where it could be closed or provided with newly created plain socket to continue work with and thus become active again.
UPGRADE-NOTE [TW-59]: Currently, if an error in listener occures (parse error), socket gets closed. Need to make it reconnect, if possible.
- newtype Transfer s a = Transfer {
- getTransfer :: ReaderT Settings (ReaderT (TVar (ConnectionPool s)) (ReaderT (IO s) (LoggerNameBox TimedIO))) a
- data TransferException = AlreadyListeningOutbound Text
- data ConnectionPool s
- runTransfer :: IO s -> Transfer s a -> LoggerNameBox TimedIO a
- runTransferS :: Settings -> IO s -> Transfer s a -> LoggerNameBox TimedIO a
- runTransferRaw :: Settings -> TVar (ConnectionPool s) -> IO s -> Transfer s a -> LoggerNameBox TimedIO a
- getConnPool :: Transfer s (TVar (ConnectionPool s))
- type FailsInRow = Int
- data Settings = Settings {
- queueSize :: Int
- reconnectPolicy :: forall m. (HasLoggerName m, MonadIO m) => FailsInRow -> m (Maybe Microsecond)
Transfer
Transfer | |
|
MonadBase IO (Transfer s) Source # | |
MonadBaseControl IO (Transfer s) Source # | |
MonadTransfer s (Transfer s) Source # | |
Monad (Transfer s) Source # | |
Functor (Transfer s) Source # | |
Applicative (Transfer s) Source # | |
MonadIO (Transfer s) Source # | |
MonadThrow (Transfer s) Source # | |
MonadCatch (Transfer s) Source # | |
MonadMask (Transfer s) Source # | |
CanLog (Transfer s) Source # | |
HasLoggerName (Transfer s) Source # | |
MonadTimed (Transfer s) Source # | |
type ThreadId (Transfer s) Source # | |
type StM (Transfer s) a Source # | |
data TransferException Source #
Error thrown if attempt to listen at already being listened connection is performed.
data ConnectionPool s Source #
runTransfer :: IO s -> Transfer s a -> LoggerNameBox TimedIO a Source #
Run Transfer
, with a way to create initial state for socket.
runTransferS :: Settings -> IO s -> Transfer s a -> LoggerNameBox TimedIO a Source #
Run with specified settings.
runTransferRaw :: Settings -> TVar (ConnectionPool s) -> IO s -> Transfer s a -> LoggerNameBox TimedIO a Source #
getConnPool :: Transfer s (TVar (ConnectionPool s)) Source #
Settings
type FailsInRow = Int Source #
Number of consequent fails while trying to establish connection.
Settings | |
|