time-warp-1.1.1.2: Distributed systems execution emulation

Copyright(c) Serokell 2016
LicenseGPL-3 (see the file LICENSE)
MaintainerSerokell <hi@serokell.io>
Stabilityexperimental
PortabilityPOSIX, GHC
Safe HaskellNone
LanguageHaskell2010

Control.TimeWarp.Rpc.Transfer

Contents

Description

This module provides implementation of MonadTransfer.

It operates with so called lively sockets, so that, if error occured while sending or receiving, it would try to restore connection before reporting error.

When some data is sent for first time to given address, connection with single lively-socket is created; it would be reused for further sends until closed.

Then server is getting up at some port, it creates single thread to handle incoming connections, then for each input connection lively-socket is created.

TODO [TW-67]: close all connections upon quiting Transfer monad.

About lively sockets:

Lively socket keeps queue of byte chunks inside. For given lively-socket, send function just pushes chunks to send-queue, whenever receive infinitelly acquires chunks from receive-queue. Those queues are connected to plain socket behind the scene.

Let's say lively socket to be active if it successfully sends and receives required data at the moment. Upon becoming active, lively socket spawns `processing-observer` thread, which itself spawns 3 threads: one pushes chunks from send-queue to socket, another one pulls chunks from socket to receive-queue, and the last tracks whether socket was closed. Processor thread finishes in one of the following cases:

  • One of it's children threads threw an error
  • Socket was closed

If some error occures, lively socket goes to exceptional state (which is not expressed in code, however), where it could be closed or provided with newly created plain socket to continue work with and thus become active again.

UPGRADE-NOTE [TW-59]: Currently, if an error in listener occures (parse error), socket gets closed. Need to make it reconnect, if possible.

Synopsis

Transfer

newtype Transfer s a Source #

Instances

MonadBase IO (Transfer s) Source # 

Methods

liftBase :: IO α -> Transfer s α #

MonadBaseControl IO (Transfer s) Source # 

Associated Types

type StM (Transfer s :: * -> *) a :: * #

Methods

liftBaseWith :: (RunInBase (Transfer s) IO -> IO a) -> Transfer s a #

restoreM :: StM (Transfer s) a -> Transfer s a #

MonadTransfer s (Transfer s) Source # 
Monad (Transfer s) Source # 

Methods

(>>=) :: Transfer s a -> (a -> Transfer s b) -> Transfer s b #

(>>) :: Transfer s a -> Transfer s b -> Transfer s b #

return :: a -> Transfer s a #

fail :: String -> Transfer s a #

Functor (Transfer s) Source # 

Methods

fmap :: (a -> b) -> Transfer s a -> Transfer s b #

(<$) :: a -> Transfer s b -> Transfer s a #

Applicative (Transfer s) Source # 

Methods

pure :: a -> Transfer s a #

(<*>) :: Transfer s (a -> b) -> Transfer s a -> Transfer s b #

(*>) :: Transfer s a -> Transfer s b -> Transfer s b #

(<*) :: Transfer s a -> Transfer s b -> Transfer s a #

MonadIO (Transfer s) Source # 

Methods

liftIO :: IO a -> Transfer s a #

MonadThrow (Transfer s) Source # 

Methods

throwM :: Exception e => e -> Transfer s a #

MonadCatch (Transfer s) Source # 

Methods

catch :: Exception e => Transfer s a -> (e -> Transfer s a) -> Transfer s a #

MonadMask (Transfer s) Source # 

Methods

mask :: ((forall a. Transfer s a -> Transfer s a) -> Transfer s b) -> Transfer s b #

uninterruptibleMask :: ((forall a. Transfer s a -> Transfer s a) -> Transfer s b) -> Transfer s b #

CanLog (Transfer s) Source # 
HasLoggerName (Transfer s) Source # 
MonadTimed (Transfer s) Source # 
type ThreadId (Transfer s) Source # 
type StM (Transfer s) a Source # 

runTransfer :: IO s -> Transfer s a -> LoggerNameBox TimedIO a Source #

Run Transfer, with a way to create initial state for socket.

runTransferS :: Settings -> IO s -> Transfer s a -> LoggerNameBox TimedIO a Source #

Run with specified settings.

Settings

type FailsInRow = Int Source #

Number of consequent fails while trying to establish connection.

data Settings Source #

Constructors

Settings 

Fields

Instances

Default Settings Source #

Default settings, you can use it like transferSettings { queueSize = 1 }

Methods

def :: Settings #