Safe Haskell | None
---|---
Language | Haskell2010
- data Node = Node {
- nodeHost :: HostName
- nodePort :: Int
- connection :: Maybe (MVar Pool)
- nodeServices :: Service
- }
- newtype Cloud a = Cloud {
- runCloud' :: TransIO a
- }
- runCloud :: Cloud a -> TransIO a
- tlsHooks :: IORef (SData -> ByteString -> IO (), SData -> IO ByteString, Socket -> ByteString -> TransIO (), String -> Socket -> ByteString -> TransIO ())
- sendTLSData :: SData -> ByteString -> IO ()
- local :: Loggable a => TransIO a -> Cloud a
- runCloudIO :: Typeable a => Cloud a -> IO (Maybe a)
- runCloudIO' :: Typeable a => Cloud a -> IO (Maybe a)
- onAll :: TransIO a -> Cloud a
- lazy :: TransIO a -> Cloud a
- fixRemote :: TransIO b -> Cloud b
- fixClosure :: Cloud ()
- loggedc :: Loggable a => Cloud a -> Cloud a
- loggedc' :: Loggable a => Cloud a -> Cloud a
- lliftIO :: Loggable a => IO a -> Cloud a
- localIO :: Loggable a => IO a -> Cloud a
- fullStop :: TransIO stop
- beamTo :: Node -> Cloud ()
- forkTo :: Node -> Cloud ()
- callTo :: Loggable a => Node -> Cloud a -> Cloud a
- callTo' :: (Show a, Read a, Typeable a) => Node -> Cloud a -> Cloud a
- atRemote :: Loggable a => Cloud a -> Cloud a
- runAt :: Loggable a => Node -> Cloud a -> Cloud a
- single :: TransIO a -> TransIO a
- unique :: a -> TransIO ()
- wormhole :: Loggable a => Node -> Cloud a -> Cloud a
- type JSString = String
- pack :: a -> a
- data CloudException = CloudException Node IdClosure String
- teleport :: Cloud ()
- reportBack :: TransIO ()
- copyData :: (Typeable * b, Read b, Show b) => b -> Cloud b
- putMailbox :: Typeable val => val -> TransIO ()
- putMailbox' :: (Typeable key, Ord key, Typeable val) => key -> val -> TransIO ()
- newMailbox :: MailboxId -> TransIO ()
- errorMailBox :: a
- getMailbox :: Typeable val => TransIO val
- getMailbox' :: (Typeable key, Ord key, Typeable val) => key -> TransIO val
- cleanMailbox :: Typeable a => a -> TransIO ()
- cleanMailbox' :: Typeable a => Int -> a -> TransIO ()
- clustered :: Loggable a => Cloud a -> Cloud a
- mclustered :: (Monoid a, Loggable a) => Cloud a -> Cloud a
- callNodes :: (Show a2, Show a1, Read a2, Read a1, Typeable * a2, Typeable * a1) => (Cloud a2 -> Cloud a1 -> Cloud a1) -> Cloud a1 -> Cloud a2 -> Cloud a1
- callNodes' :: (Typeable * a2, Typeable * a1, Read a2, Read a1, Show a2, Show a1) => [Node] -> (Cloud a2 -> Cloud a1 -> Cloud a1) -> Cloud a1 -> Cloud a2 -> Cloud a1
- sendRaw :: MonadIO m => Connection -> ByteString -> m ()
- type LengthFulLog = Int
- data NodeMSG
- msend :: Connection -> StreamData NodeMSG -> TransIO ()
- mread :: Loggable a => Connection -> TransIO (StreamData a)
- parallelReadHandler :: Loggable a => TransIO (StreamData a)
- getWebServerNode :: TransIO Node
- mclose :: Connection -> IO ()
- mconnect :: Node -> TransIO Connection
- data ConnectionError = ConnectionError String Node
- connectTo' :: Int -> HostName -> PortID -> IO Socket
- type Blocked = MVar ()
- type BuffSize = Int
- data ConnectionData
- = Node2Node { }
- | TLSNode2Node { tlscontext :: SData }
- | Node2Web { }
- | Self
- | Relay Connection Node
- data MailboxId = (Typeable a, Ord a) => MailboxId a TypeRep
- data Connection = Connection {}
- defConnection :: (MonadIO m, MonadState EventF m) => m Connection
- setBuffSize :: Int -> TransIO ()
- getBuffSize :: TransIO BuffSize
- listen :: Node -> Cloud ()
- relay :: StreamData NodeMSG -> TransIO ()
- nat :: Node -> Node -> Node -> [Node] -> (Node, Node)
- listenNew :: PortNumber -> Connection -> TransIO (StreamData NodeMSG)
- rsetHost :: IORef Bool
- listenResponses :: Loggable a => TransIO (StreamData a)
- type IdClosure = Int
- newtype Closure = Closure IdClosure
- type RemoteClosure = (Node, IdClosure)
- newtype JobGroup = JobGroup (Map String RemoteClosure)
- stopRemoteJob :: String -> Cloud ()
- resetRemote :: Cloud ()
- manageClosures :: Cloud ()
- execLog :: StreamData NodeMSG -> TransIO ()
- type Pool = [Connection]
- type Package = String
- type Program = String
- type Service = [(Package, Program)]
- readFrom :: Connection -> IO ByteString
- toStrict :: ByteString -> ByteString
- makeWSStreamFromConn :: Connection -> IO Stream
- makeWebsocketConnection :: MonadIO m => Connection -> ByteString -> Headers -> m Connection
- servePages :: (a, ByteString, [(CI ByteString, b)]) -> TransIO ()
- api :: TransIO ByteString -> Cloud ()
- isWebSocketsReq :: [(CI ByteString, b)] -> Bool
- data HTTPMethod
- getFirstLine :: TransIO (ByteString, ByteString, ByteString)
- getRawHeaders :: TransIO ByteString
- type PostParams = [(ByteString, String)]
- parsePostUrlEncoded :: TransIO PostParams
- getHeaders :: TransIO [(CI ByteString, ByteString)]
- isBrowserInstance :: Bool
- emptyPool :: MonadIO m => m (MVar Pool)
- createNodeServ :: HostName -> Int -> Service -> IO Node
- createNode :: HostName -> Int -> IO Node
- createWebNode :: IO Node
- nodeList :: TVar [Node]
- errorMyNode :: [Char] -> a
- getMyNode :: TransIO Node
- getNodes :: MonadIO m => m [Node]
- getEqualNodes :: TransIO [Node]
- matchNodes :: MonadIO m => ((Package, Program) -> Bool) -> m [[(Package, Program)]]
- addNodes :: [Node] -> TransIO ()
- fixNode :: MonadIO m => Node -> m Node
- setNodes :: MonadIO m => [Node] -> m ()
- shuffleNodes :: MonadIO m => m [Node]
- connect :: Node -> Node -> Cloud ()
- connect' :: Node -> Cloud ()
Documentation
runCloud :: Cloud a -> TransIO a Source #
Execute a distributed computation inside a TransIO computation. All the computations in the TransIO monad that enclose the cloud computation must be logged.
tlsHooks :: IORef (SData -> ByteString -> IO (), SData -> IO ByteString, Socket -> ByteString -> TransIO (), String -> Socket -> ByteString -> TransIO ()) Source #
sendTLSData :: SData -> ByteString -> IO () Source #
runCloudIO :: Typeable a => Cloud a -> IO (Maybe a) Source #
Run a distributed computation inside the IO monad. Enables asynchronous console input (see keep).
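For example, a minimal sketch of a main program that starts a node and runs a distributed computation. Host name, port and the import names are illustrative assumptions and may differ in your setup:

    import Control.Applicative ((<|>))
    import Transient.Base
    import Transient.Move

    main :: IO ()
    main = do
      node <- createNode "localhost" 2000        -- createNode :: HostName -> Int -> IO Node
      r    <- runCloudIO $ do
                listen node <|> return ()        -- start listening, or continue as a client
                localIO $ putStrLn "cloud computation running"
      print r                                    -- Just () when the computation finishes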
runCloudIO' :: Typeable a => Cloud a -> IO (Maybe a) Source #
Run a distributed computation inside the IO monad with no console input.
onAll :: TransIO a -> Cloud a Source #
An alternative to local. If the computation is translated to another node, this one will be executed again there, unless it has already been executed inside a local computation.

    onAll foo
    local foo'
    local $ do
        bar
        runCloud $ do
            onAll baz
            runAt node ....
    callTo node' ....

Here foo will be executed in node', but foo', bar and baz will not. However, foo, bar and baz will be executed in node.
lazy :: TransIO a -> Cloud a Source #
Only executes if the result is demanded. It is useful when the computation result is only used in the remote node, but it is not serializable.
fixRemote :: TransIO b -> Cloud b Source #
Executes a non-serializable action in the remote node, whose result can be used by subsequent remote invocations.
fixClosure :: Cloud () Source #
Experimental: subsequent remote invocations will send logs to this closure, so logs will be shorter.
Also, non-serializable statements before it will not be re-executed.
fullStop :: TransIO stop Source #
Stop the current computation and do not execute any alternative computation.
forkTo :: Node -> Cloud () Source #
Execute a process with the same execution state in the remote node.
callTo :: Loggable a => Node -> Cloud a -> Cloud a Source #
Open a wormhole to another node and execute an action on it. Currently, by default, it keeps the connection open to receive additional requests and responses (streaming).
callTo' :: (Show a, Read a, Typeable a) => Node -> Cloud a -> Cloud a Source #
A connectionless version of callTo for long-running remote calls.
atRemote :: Loggable a => Cloud a -> Cloud a Source #
Within a connection to a node opened by wormhole, it runs the computation in the remote node and returns the result back to the original node.
If atRemote is executed in the remote node, then the computation is executed in the original node.

    wormhole node2 $ do
        t <- atRemote $ do
            r <- foo             -- executed in node2
            s <- atRemote bar r  -- executed in the original node
            baz s                -- in node2
        bat t                    -- in the original node
runAt :: Loggable a => Node -> Cloud a -> Cloud a Source #
Synonym for callTo.

A related combinator, atCallingNode, executes a computation in the node that initiated the connection:

    atCallingNode :: Loggable a => Cloud a -> Cloud a
    atCallingNode proc = connectCaller $ atRemote proc

If the sequence of connections is n1 -> n2 -> n3, then `atCallingNode $ atCallingNode foo` in n3 would execute foo in n1, while `atRemote $ atRemote foo` would execute it in n3.
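A small usage sketch of runAt (node2 is assumed to be a Node already known to the program, e.g. obtained from getNodes):

    -- fetch the remote node's own Node record and print it in the calling node
    example :: Node -> Cloud ()
    example node2 = do
      remote <- runAt node2 (local getMyNode)   -- executed in node2
      localIO $ print remote                    -- executed back in the calling node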
single :: TransIO a -> TransIO a Source #
Run a single thread with that action for each connection created. When the same action is re-executed within that connection, all the threads generated by the previous execution are killed.

    box <- foo
    r <- runAt node . local . single $ getMailbox box
    localIO $ print r

If foo returns different mailbox identifiers, the above code prints only the messages of the last one. Without single, it would print the messages of all of them.
unique :: a -> TransIO () Source #
Run a unique continuation for each connection. The first thread that executes unique is run for that connection; the rest are ignored.
wormhole :: Loggable a => Node -> Cloud a -> Cloud a Source #
A wormhole opens a connection with another node anywhere in a computation. teleport uses this connection to translate the computation back and forth between the two connected nodes.
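A small sketch of the wormhole/teleport pattern described above (node2 is assumed to be known already):

    pingPong :: Node -> Cloud ()
    pingPong node2 = wormhole node2 $ do
      localIO $ putStrLn "in the local node"
      teleport                                  -- move the computation to node2
      localIO $ putStrLn "now in node2"
      teleport                                  -- move back through the same connection
      localIO $ putStrLn "back in the local node"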
type JSString = String Source #
Connect to the caller node:

    connectCaller :: Loggable a => Cloud a -> Cloud a
data CloudException Source #
reportBack :: TransIO () Source #
forward exceptions to the calling node
copyData :: (Typeable * b, Read b, Show b) => b -> Cloud b Source #
Copy a session data variable from the local to the remote node. If none is set in the local node, the parameter is the default value. In this case, the default value is also set in the local node.
putMailbox :: Typeable val => val -> TransIO () Source #
Write to the mailbox.
Mailboxes are node-wide, for all processes that share the same connection data, that is, processes under the same listen or connect, while EVars are only visible to the process that initialized them and its children.
Internally, the mailbox is a well-known EVar stored by listen in the Connection state.
putMailbox' :: (Typeable key, Ord key, Typeable val) => key -> val -> TransIO () Source #
Write to a mailbox identified by a key in addition to the type.
newMailbox :: MailboxId -> TransIO () Source #
errorMailBox :: a Source #
getMailbox :: Typeable val => TransIO val Source #
Get messages from the mailbox that match the expected type. The order of reading is defined by readTChan.
This is reactive: each new message triggers the execution of the continuation, and each message wakes up all the getMailbox computations waiting for it.
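A minimal sketch of the publish/subscribe pattern these primitives provide (the message text is illustrative; the keyed variants putMailbox' and getMailbox' work the same way with an extra key):

    import Control.Applicative (empty, (<|>))
    import Control.Monad.IO.Class (liftIO)

    mailboxDemo :: TransIO ()
    mailboxDemo = reader <|> writer
      where
        reader = do
          msg <- getMailbox                      -- reacts to every String put in the mailbox
          liftIO $ putStrLn ("received: " ++ msg)
        writer = do
          putMailbox ("hello" :: String)
          empty                                  -- the writer branch produces no result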
getMailbox' :: (Typeable key, Ord key, Typeable val) => key -> TransIO val Source #
Read from a mailbox identified by a key in addition to the type.
cleanMailbox :: Typeable a => a -> TransIO () Source #
delete all subscriptions for that mailbox expecting this kind of data
cleanMailbox' :: Typeable a => Int -> a -> TransIO () Source #
clean a mailbox identified by an Int and the type
clustered :: Loggable a => Cloud a -> Cloud a Source #
execute a Transient action in each of the nodes connected.
The response of each node is received by the invoking node and processed by the rest of the procedure. By default, each response is processed in a new thread. To restrict the number of threads use the thread control primitives.
This snippet receives a message from each of the simulated nodes:

    main = keep $ do
        let nodes = map createLocalNode [2000..2005]
        addNodes nodes
        (foldl (<|>) empty $ map listen nodes) <|> return ()
        r <- clustered $ do
                Connection (Just (PortNumber port, _, _, _)) _ <- getSData
                return $ "hi from " ++ show port ++ "\n"
        liftIO $ putStrLn r
      where
        createLocalNode n = createNode "localhost" (PortNumber n)
callNodes :: (Show a2, Show a1, Read a2, Read a1, Typeable * a2, Typeable * a1) => (Cloud a2 -> Cloud a1 -> Cloud a1) -> Cloud a1 -> Cloud a2 -> Cloud a1 Source #
callNodes' :: (Typeable * a2, Typeable * a1, Read a2, Read a1, Show a2, Show a1) => [Node] -> (Cloud a2 -> Cloud a1 -> Cloud a1) -> Cloud a1 -> Cloud a2 -> Cloud a1 Source #
sendRaw :: MonadIO m => Connection -> ByteString -> m () Source #
type LengthFulLog = Int Source #
msend :: Connection -> StreamData NodeMSG -> TransIO () Source #
mread :: Loggable a => Connection -> TransIO (StreamData a) Source #
parallelReadHandler :: Loggable a => TransIO (StreamData a) Source #
getWebServerNode :: TransIO Node Source #
mclose :: Connection -> IO () Source #
mconnect :: Node -> TransIO Connection Source #
data ConnectionError Source #
data ConnectionData Source #
= Node2Node { }
| TLSNode2Node { tlscontext :: SData }
| Node2Web { }
| Self
| Relay Connection Node
data Connection Source #
defConnection :: (MonadIO m, MonadState EventF m) => m Connection Source #
setBuffSize :: Int -> TransIO () Source #
getBuffSize :: TransIO BuffSize Source #
listenNew :: PortNumber -> Connection -> TransIO (StreamData NodeMSG) Source #
listenResponses :: Loggable a => TransIO (StreamData a) Source #
type RemoteClosure = (Node, IdClosure) Source #
stopRemoteJob :: String -> Cloud () Source #
If there is a remote job identified by the string identifier, it stops that job and sets the
current remote operation (if any) as the current remote job for this identifier.
The purpose is to have a single remote job per identifier.
To identify the remote job, it should be used after the wormhole and before the remote call:

    r <- wormhole node $ do
            stopRemoteJob "streamlog"
            atRemote myRemotejob

So:

    runAtUnique ident node job = wormhole node $ do stopRemoteJob ident; atRemote job
resetRemote :: Cloud () Source #
manageClosures :: Cloud () Source #
Delete closures in a remote node when requested by resetRemote or stopRemoteJob.
This is necessary because a remote closure can be reactive or may take a long time.
It should be composed as an alternative computation to the program:

    main = initNode $ inputNodes <|> manageClosures <|> myCloudCode
type Pool = [Connection] Source #
readFrom :: Connection -> IO ByteString Source #
toStrict :: ByteString -> ByteString Source #
makeWebsocketConnection :: MonadIO m => Connection -> ByteString -> Headers -> m Connection Source #
servePages :: (a, ByteString, [(CI ByteString, b)]) -> TransIO () Source #
api :: TransIO ByteString -> Cloud () Source #
isWebSocketsReq :: [(CI ByteString, b)] -> Bool Source #
getFirstLine :: TransIO (ByteString, ByteString, ByteString) Source #
getRawHeaders :: TransIO ByteString Source #
type PostParams = [(ByteString, String)] Source #
parsePostUrlEncoded :: TransIO PostParams Source #
getHeaders :: TransIO [(CI ByteString, ByteString)] Source #
isBrowserInstance :: Bool Source #
Returns True if we are running in the browser.
createNodeServ :: HostName -> Int -> Service -> IO Node Source #
Create a node from a hostname (or IP address), port number and a list of services.
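For example (host, port and service entries are illustrative; Package and Program are plain String aliases, see the synopsis):

    makeServiceNode :: IO Node
    makeServiceNode = createNodeServ "192.168.1.10" 8000 [("my-package", "my-program")]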
createWebNode :: IO Node Source #
errorMyNode :: [Char] -> a Source #
getMyNode :: TransIO Node Source #
Return the local node i.e. the node where this computation is running.
getEqualNodes :: TransIO [Node] Source #
shuffleNodes :: MonadIO m => m [Node] Source #
Shuffle the list of cluster nodes and return the shuffled list.
connect :: Node -> Node -> Cloud () Source #
Add a node (first parameter) to the cluster using a node that is already part of the cluster (second parameter). The added node starts listening for incoming connections and the rest of the computation is executed on this newly added node.
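A sketch of joining a cluster through a seed node (hosts and ports are illustrative):

    import Control.Monad (void)

    joinCluster :: IO ()
    joinCluster = do
      myNode   <- createNode "192.168.1.5"  2001   -- the node being added
      seedNode <- createNode "192.168.1.10" 2000   -- a node already in the cluster
      void $ runCloudIO $ connect myNode seedNode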
connect' :: Node -> Cloud () Source #
Reconcile the list of nodes in the cluster using a remote node that is already part of the cluster. Reconciliation ends up with every node in the cluster having the same list of nodes.