{-# OPTIONS_GHC -fno-warn-orphans #-}
module Control.Distributed.Process.Backend.SimpleLocalnet
(
Backend(..)
, initializeBackend
, startSlave
, terminateSlave
, findSlaves
, terminateAllSlaves
, startMaster
) where
import System.IO (fixIO)
import Data.Maybe (catMaybes)
import Data.Binary (Binary(get, put), getWord8, putWord8)
import Data.Accessor (Accessor, accessor, (^:), (^.))
import Data.Set (Set)
import qualified Data.Set as Set (insert, empty, toList)
import Data.Foldable (forM_)
import Data.Typeable (Typeable)
import Control.Exception (throw)
import Control.Monad (forever, replicateM, replicateM_)
import Control.Monad.Catch (bracket, try, finally)
import Control.Monad.IO.Class (liftIO)
import Control.Concurrent (forkIO, threadDelay, ThreadId)
import Control.Concurrent.MVar (MVar, newMVar, readMVar, modifyMVar_)
import Control.Distributed.Process
( RemoteTable
, NodeId
, Process
, ProcessId
, WhereIsReply(..)
, whereis
, whereisRemoteAsync
, getSelfPid
, register
, reregister
, expect
, nsendRemote
, receiveWait
, match
, processNodeId
, monitorNode
, monitor
, unmonitor
, NodeMonitorNotification(..)
, ProcessRegistrationException
, newChan
, receiveChan
, nsend
, SendPort
, send
)
import qualified Control.Distributed.Process.Node as Node
( LocalNode
, newLocalNode
, localNodeId
, runProcess
)
import qualified Network.Transport.TCP as NT
( createTransport
, defaultTCPParameters
, TCPAddr(Addressable)
, TCPAddrInfo(TCPAddrInfo)
)
import qualified Network.Transport as NT (Transport)
import qualified Network.Socket as N (HostName, ServiceName, SockAddr)
import Control.Distributed.Process.Backend.SimpleLocalnet.Internal.Multicast (initMulticast)
data Backend = Backend {
Backend -> IO LocalNode
newLocalNode :: IO Node.LocalNode
, Backend -> Int -> IO [NodeId]
findPeers :: Int -> IO [NodeId]
, Backend -> [ProcessId] -> Process ()
redirectLogsHere :: [ProcessId] -> Process ()
}
data BackendState = BackendState {
BackendState -> [LocalNode]
_localNodes :: [Node.LocalNode]
, BackendState -> Set NodeId
_peers :: Set NodeId
, BackendState -> ThreadId
discoveryDaemon :: ThreadId
}
initializeBackend :: N.HostName -> N.ServiceName -> RemoteTable -> IO Backend
initializeBackend :: String -> String -> RemoteTable -> IO Backend
initializeBackend String
host String
port RemoteTable
rtable = do
Either IOException Transport
mTransport <- TCPAddr -> TCPParameters -> IO (Either IOException Transport)
NT.createTransport (TCPAddrInfo -> TCPAddr
NT.Addressable (TCPAddrInfo -> TCPAddr) -> TCPAddrInfo -> TCPAddr
forall a b. (a -> b) -> a -> b
$ String -> String -> (String -> (String, String)) -> TCPAddrInfo
NT.TCPAddrInfo String
host String
port (\String
sn -> (String
host, String
sn)))
TCPParameters
NT.defaultTCPParameters
(IO (PeerDiscoveryMsg, SockAddr)
recv, PeerDiscoveryMsg -> IO ()
sendp) <- String
-> PortNumber
-> Int
-> IO (IO (PeerDiscoveryMsg, SockAddr), PeerDiscoveryMsg -> IO ())
forall a.
Binary a =>
String -> PortNumber -> Int -> IO (IO (a, SockAddr), a -> IO ())
initMulticast String
"224.0.0.99" PortNumber
9999 Int
1024
(ThreadId
_, MVar BackendState
backendState) <- ((ThreadId, MVar BackendState) -> IO (ThreadId, MVar BackendState))
-> IO (ThreadId, MVar BackendState)
forall a. (a -> IO a) -> IO a
fixIO (((ThreadId, MVar BackendState)
-> IO (ThreadId, MVar BackendState))
-> IO (ThreadId, MVar BackendState))
-> ((ThreadId, MVar BackendState)
-> IO (ThreadId, MVar BackendState))
-> IO (ThreadId, MVar BackendState)
forall a b. (a -> b) -> a -> b
$ \ ~(ThreadId
tid, MVar BackendState
_) -> do
MVar BackendState
backendState <- BackendState -> IO (MVar BackendState)
forall a. a -> IO (MVar a)
newMVar BackendState
{ _localNodes :: [LocalNode]
_localNodes = []
, _peers :: Set NodeId
_peers = Set NodeId
forall a. Set a
Set.empty
, discoveryDaemon :: ThreadId
discoveryDaemon = ThreadId
tid
}
ThreadId
tid' <- IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ MVar BackendState
-> IO (PeerDiscoveryMsg, SockAddr)
-> (PeerDiscoveryMsg -> IO ())
-> IO ()
peerDiscoveryDaemon MVar BackendState
backendState IO (PeerDiscoveryMsg, SockAddr)
recv PeerDiscoveryMsg -> IO ()
sendp
(ThreadId, MVar BackendState) -> IO (ThreadId, MVar BackendState)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (ThreadId
tid', MVar BackendState
backendState)
case Either IOException Transport
mTransport of
Left IOException
err -> IOException -> IO Backend
forall a e. Exception e => e -> a
throw IOException
err
Right Transport
transport ->
let backend :: Backend
backend = Backend {
newLocalNode :: IO LocalNode
newLocalNode = Transport -> RemoteTable -> MVar BackendState -> IO LocalNode
apiNewLocalNode Transport
transport RemoteTable
rtable MVar BackendState
backendState
, findPeers :: Int -> IO [NodeId]
findPeers = (PeerDiscoveryMsg -> IO ())
-> MVar BackendState -> Int -> IO [NodeId]
apiFindPeers PeerDiscoveryMsg -> IO ()
sendp MVar BackendState
backendState
, redirectLogsHere :: [ProcessId] -> Process ()
redirectLogsHere = Backend -> [ProcessId] -> Process ()
apiRedirectLogsHere Backend
backend
}
in Backend -> IO Backend
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Backend
backend
apiNewLocalNode :: NT.Transport
-> RemoteTable
-> MVar BackendState
-> IO Node.LocalNode
apiNewLocalNode :: Transport -> RemoteTable -> MVar BackendState -> IO LocalNode
apiNewLocalNode Transport
transport RemoteTable
rtable MVar BackendState
backendState = do
LocalNode
localNode <- Transport -> RemoteTable -> IO LocalNode
Node.newLocalNode Transport
transport RemoteTable
rtable
MVar BackendState -> (BackendState -> IO BackendState) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar BackendState
backendState ((BackendState -> IO BackendState) -> IO ())
-> (BackendState -> IO BackendState) -> IO ()
forall a b. (a -> b) -> a -> b
$ BackendState -> IO BackendState
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (BackendState -> IO BackendState)
-> (BackendState -> BackendState)
-> BackendState
-> IO BackendState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Accessor BackendState [LocalNode]
localNodes Accessor BackendState [LocalNode]
-> ([LocalNode] -> [LocalNode]) -> BackendState -> BackendState
forall r a. T r a -> (a -> a) -> r -> r
^: (LocalNode
localNode LocalNode -> [LocalNode] -> [LocalNode]
forall a. a -> [a] -> [a]
:))
LocalNode -> IO LocalNode
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return LocalNode
localNode
apiFindPeers :: (PeerDiscoveryMsg -> IO ())
-> MVar BackendState
-> Int
-> IO [NodeId]
apiFindPeers :: (PeerDiscoveryMsg -> IO ())
-> MVar BackendState -> Int -> IO [NodeId]
apiFindPeers PeerDiscoveryMsg -> IO ()
sendfn MVar BackendState
backendState Int
delay = do
PeerDiscoveryMsg -> IO ()
sendfn PeerDiscoveryMsg
PeerDiscoveryRequest
Int -> IO ()
threadDelay Int
delay
Set NodeId -> [NodeId]
forall a. Set a -> [a]
Set.toList (Set NodeId -> [NodeId])
-> (BackendState -> Set NodeId) -> BackendState -> [NodeId]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (BackendState -> T BackendState (Set NodeId) -> Set NodeId
forall r a. r -> T r a -> a
^. T BackendState (Set NodeId)
peers) (BackendState -> [NodeId]) -> IO BackendState -> IO [NodeId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MVar BackendState -> IO BackendState
forall a. MVar a -> IO a
readMVar MVar BackendState
backendState
data PeerDiscoveryMsg =
PeerDiscoveryRequest
| PeerDiscoveryReply NodeId
instance Binary PeerDiscoveryMsg where
put :: PeerDiscoveryMsg -> Put
put PeerDiscoveryMsg
PeerDiscoveryRequest = Word8 -> Put
putWord8 Word8
0
put (PeerDiscoveryReply NodeId
nid) = Word8 -> Put
putWord8 Word8
1 Put -> Put -> Put
forall a b. PutM a -> PutM b -> PutM b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> NodeId -> Put
forall t. Binary t => t -> Put
put NodeId
nid
get :: Get PeerDiscoveryMsg
get = do
Word8
header <- Get Word8
getWord8
case Word8
header of
Word8
0 -> PeerDiscoveryMsg -> Get PeerDiscoveryMsg
forall a. a -> Get a
forall (m :: * -> *) a. Monad m => a -> m a
return PeerDiscoveryMsg
PeerDiscoveryRequest
Word8
1 -> NodeId -> PeerDiscoveryMsg
PeerDiscoveryReply (NodeId -> PeerDiscoveryMsg) -> Get NodeId -> Get PeerDiscoveryMsg
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Get NodeId
forall t. Binary t => Get t
get
Word8
_ -> String -> Get PeerDiscoveryMsg
forall a. String -> Get a
forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
"PeerDiscoveryMsg.get: invalid"
peerDiscoveryDaemon :: MVar BackendState
-> IO (PeerDiscoveryMsg, N.SockAddr)
-> (PeerDiscoveryMsg -> IO ())
-> IO ()
peerDiscoveryDaemon :: MVar BackendState
-> IO (PeerDiscoveryMsg, SockAddr)
-> (PeerDiscoveryMsg -> IO ())
-> IO ()
peerDiscoveryDaemon MVar BackendState
backendState IO (PeerDiscoveryMsg, SockAddr)
recv PeerDiscoveryMsg -> IO ()
sendfn = IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever IO ()
go
where
go :: IO ()
go = do
(PeerDiscoveryMsg
msg, SockAddr
_) <- IO (PeerDiscoveryMsg, SockAddr)
recv
case PeerDiscoveryMsg
msg of
PeerDiscoveryMsg
PeerDiscoveryRequest -> do
[LocalNode]
nodes <- (BackendState -> Accessor BackendState [LocalNode] -> [LocalNode]
forall r a. r -> T r a -> a
^. Accessor BackendState [LocalNode]
localNodes) (BackendState -> [LocalNode]) -> IO BackendState -> IO [LocalNode]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MVar BackendState -> IO BackendState
forall a. MVar a -> IO a
readMVar MVar BackendState
backendState
[LocalNode] -> (LocalNode -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [LocalNode]
nodes ((LocalNode -> IO ()) -> IO ()) -> (LocalNode -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ PeerDiscoveryMsg -> IO ()
sendfn (PeerDiscoveryMsg -> IO ())
-> (LocalNode -> PeerDiscoveryMsg) -> LocalNode -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NodeId -> PeerDiscoveryMsg
PeerDiscoveryReply (NodeId -> PeerDiscoveryMsg)
-> (LocalNode -> NodeId) -> LocalNode -> PeerDiscoveryMsg
forall b c a. (b -> c) -> (a -> b) -> a -> c
. LocalNode -> NodeId
Node.localNodeId
PeerDiscoveryReply NodeId
nid ->
MVar BackendState -> (BackendState -> IO BackendState) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar BackendState
backendState ((BackendState -> IO BackendState) -> IO ())
-> (BackendState -> IO BackendState) -> IO ()
forall a b. (a -> b) -> a -> b
$ BackendState -> IO BackendState
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (BackendState -> IO BackendState)
-> (BackendState -> BackendState)
-> BackendState
-> IO BackendState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (T BackendState (Set NodeId)
peers T BackendState (Set NodeId)
-> (Set NodeId -> Set NodeId) -> BackendState -> BackendState
forall r a. T r a -> (a -> a) -> r -> r
^: NodeId -> Set NodeId -> Set NodeId
forall a. Ord a => a -> Set a -> Set a
Set.insert NodeId
nid)
apiRedirectLogsHere :: Backend -> [ProcessId] -> Process ()
apiRedirectLogsHere :: Backend -> [ProcessId] -> Process ()
apiRedirectLogsHere Backend
_backend [ProcessId]
slavecontrollers = do
Maybe ProcessId
mLogger <- String -> Process (Maybe ProcessId)
whereis String
"logger"
ProcessId
myPid <- Process ProcessId
getSelfPid
Maybe ProcessId -> (ProcessId -> Process ()) -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe ProcessId
mLogger ((ProcessId -> Process ()) -> Process ())
-> (ProcessId -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ \ProcessId
logger -> do
Process [MonitorRef]
-> ([MonitorRef] -> Process [()])
-> ([MonitorRef] -> Process ())
-> Process ()
forall (m :: * -> *) a c b.
(HasCallStack, MonadMask m) =>
m a -> (a -> m c) -> (a -> m b) -> m b
bracket
((ProcessId -> Process MonitorRef)
-> [ProcessId] -> Process [MonitorRef]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM ProcessId -> Process MonitorRef
monitor [ProcessId]
slavecontrollers)
((MonitorRef -> Process ()) -> [MonitorRef] -> Process [()]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM MonitorRef -> Process ()
unmonitor)
(([MonitorRef] -> Process ()) -> Process ())
-> ([MonitorRef] -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ \[MonitorRef]
_ -> do
[ProcessId] -> (ProcessId -> Process ()) -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [ProcessId]
slavecontrollers ((ProcessId -> Process ()) -> Process ())
-> (ProcessId -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ \ProcessId
pid -> ProcessId -> SlaveControllerMsg -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
send ProcessId
pid (ProcessId -> ProcessId -> SlaveControllerMsg
RedirectLogsTo ProcessId
logger ProcessId
myPid)
Int -> Process () -> Process ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ ([ProcessId] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [ProcessId]
slavecontrollers) (Process () -> Process ()) -> Process () -> Process ()
forall a b. (a -> b) -> a -> b
$ do
[Match ()] -> Process ()
forall b. [Match b] -> Process b
receiveWait
[ (RedirectLogsReply -> Process ()) -> Match ()
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(RedirectLogsReply {}) -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
, (NodeMonitorNotification -> Process ()) -> Match ()
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(NodeMonitorNotification {}) -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ())
]
data SlaveControllerMsg
= SlaveTerminate
| RedirectLogsTo ProcessId ProcessId
deriving (Typeable, Int -> SlaveControllerMsg -> ShowS
[SlaveControllerMsg] -> ShowS
SlaveControllerMsg -> String
(Int -> SlaveControllerMsg -> ShowS)
-> (SlaveControllerMsg -> String)
-> ([SlaveControllerMsg] -> ShowS)
-> Show SlaveControllerMsg
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SlaveControllerMsg -> ShowS
showsPrec :: Int -> SlaveControllerMsg -> ShowS
$cshow :: SlaveControllerMsg -> String
show :: SlaveControllerMsg -> String
$cshowList :: [SlaveControllerMsg] -> ShowS
showList :: [SlaveControllerMsg] -> ShowS
Show)
instance Binary SlaveControllerMsg where
put :: SlaveControllerMsg -> Put
put SlaveControllerMsg
SlaveTerminate = Word8 -> Put
putWord8 Word8
0
put (RedirectLogsTo ProcessId
a ProcessId
b) = do Word8 -> Put
putWord8 Word8
1; (ProcessId, ProcessId) -> Put
forall t. Binary t => t -> Put
put (ProcessId
a,ProcessId
b)
get :: Get SlaveControllerMsg
get = do
Word8
header <- Get Word8
getWord8
case Word8
header of
Word8
0 -> SlaveControllerMsg -> Get SlaveControllerMsg
forall a. a -> Get a
forall (m :: * -> *) a. Monad m => a -> m a
return SlaveControllerMsg
SlaveTerminate
Word8
1 -> do (ProcessId
a,ProcessId
b) <- Get (ProcessId, ProcessId)
forall t. Binary t => Get t
get; SlaveControllerMsg -> Get SlaveControllerMsg
forall a. a -> Get a
forall (m :: * -> *) a. Monad m => a -> m a
return (ProcessId -> ProcessId -> SlaveControllerMsg
RedirectLogsTo ProcessId
a ProcessId
b)
Word8
_ -> String -> Get SlaveControllerMsg
forall a. String -> Get a
forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
"SlaveControllerMsg.get: invalid"
data RedirectLogsReply
= RedirectLogsReply ProcessId Bool
deriving (Typeable, Int -> RedirectLogsReply -> ShowS
[RedirectLogsReply] -> ShowS
RedirectLogsReply -> String
(Int -> RedirectLogsReply -> ShowS)
-> (RedirectLogsReply -> String)
-> ([RedirectLogsReply] -> ShowS)
-> Show RedirectLogsReply
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> RedirectLogsReply -> ShowS
showsPrec :: Int -> RedirectLogsReply -> ShowS
$cshow :: RedirectLogsReply -> String
show :: RedirectLogsReply -> String
$cshowList :: [RedirectLogsReply] -> ShowS
showList :: [RedirectLogsReply] -> ShowS
Show)
instance Binary RedirectLogsReply where
put :: RedirectLogsReply -> Put
put (RedirectLogsReply ProcessId
from Bool
ok) = (ProcessId, Bool) -> Put
forall t. Binary t => t -> Put
put (ProcessId
from,Bool
ok)
get :: Get RedirectLogsReply
get = do
(ProcessId
from,Bool
ok) <- Get (ProcessId, Bool)
forall t. Binary t => Get t
get
RedirectLogsReply -> Get RedirectLogsReply
forall a. a -> Get a
forall (m :: * -> *) a. Monad m => a -> m a
return (ProcessId -> Bool -> RedirectLogsReply
RedirectLogsReply ProcessId
from Bool
ok)
startSlave :: Backend -> IO ()
startSlave :: Backend -> IO ()
startSlave Backend
backend = do
LocalNode
node <- Backend -> IO LocalNode
newLocalNode Backend
backend
LocalNode -> Process () -> IO ()
Node.runProcess LocalNode
node Process ()
slaveController
slaveController :: Process ()
slaveController :: Process ()
slaveController = do
ProcessId
pid <- Process ProcessId
getSelfPid
String -> ProcessId -> Process ()
register String
"slaveController" ProcessId
pid
Process ()
go
where
go :: Process ()
go = do
SlaveControllerMsg
msg <- Process SlaveControllerMsg
forall a. Serializable a => Process a
expect
case SlaveControllerMsg
msg of
SlaveControllerMsg
SlaveTerminate -> () -> Process ()
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
RedirectLogsTo ProcessId
loggerPid ProcessId
from -> do
Either ProcessRegistrationException ()
r <- Process () -> Process (Either ProcessRegistrationException ())
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (String -> ProcessId -> Process ()
reregister String
"logger" ProcessId
loggerPid)
Bool
ok <- case (Either ProcessRegistrationException ()
r :: Either ProcessRegistrationException ()) of
Right ()
_ -> Bool -> Process Bool
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Left ProcessRegistrationException
_ -> do
Either ProcessRegistrationException ()
s <- Process () -> Process (Either ProcessRegistrationException ())
forall (m :: * -> *) e a.
(HasCallStack, MonadCatch m, Exception e) =>
m a -> m (Either e a)
try (String -> ProcessId -> Process ()
register String
"logger" ProcessId
loggerPid)
case (Either ProcessRegistrationException ()
s :: Either ProcessRegistrationException ()) of
Right ()
_ -> Bool -> Process Bool
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Left ProcessRegistrationException
_ -> Bool -> Process Bool
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
ProcessId
pid <- Process ProcessId
getSelfPid
ProcessId -> RedirectLogsReply -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
send ProcessId
from (ProcessId -> Bool -> RedirectLogsReply
RedirectLogsReply ProcessId
pid Bool
ok)
Process ()
go
terminateSlave :: NodeId -> Process ()
terminateSlave :: NodeId -> Process ()
terminateSlave NodeId
nid = NodeId -> String -> SlaveControllerMsg -> Process ()
forall a. Serializable a => NodeId -> String -> a -> Process ()
nsendRemote NodeId
nid String
"slaveController" SlaveControllerMsg
SlaveTerminate
findSlaves :: Backend -> Process [ProcessId]
findSlaves :: Backend -> Process [ProcessId]
findSlaves Backend
backend = do
[NodeId]
nodes <- IO [NodeId] -> Process [NodeId]
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [NodeId] -> Process [NodeId])
-> IO [NodeId] -> Process [NodeId]
forall a b. (a -> b) -> a -> b
$ Backend -> Int -> IO [NodeId]
findPeers Backend
backend Int
1000000
Process [MonitorRef]
-> ([MonitorRef] -> Process [()])
-> ([MonitorRef] -> Process [ProcessId])
-> Process [ProcessId]
forall (m :: * -> *) a c b.
(HasCallStack, MonadMask m) =>
m a -> (a -> m c) -> (a -> m b) -> m b
bracket
((NodeId -> Process MonitorRef) -> [NodeId] -> Process [MonitorRef]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM NodeId -> Process MonitorRef
monitorNode [NodeId]
nodes)
((MonitorRef -> Process ()) -> [MonitorRef] -> Process [()]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM MonitorRef -> Process ()
unmonitor)
(([MonitorRef] -> Process [ProcessId]) -> Process [ProcessId])
-> ([MonitorRef] -> Process [ProcessId]) -> Process [ProcessId]
forall a b. (a -> b) -> a -> b
$ \[MonitorRef]
_ -> do
[NodeId] -> (NodeId -> Process ()) -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [NodeId]
nodes ((NodeId -> Process ()) -> Process ())
-> (NodeId -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ \NodeId
nid -> NodeId -> String -> Process ()
whereisRemoteAsync NodeId
nid String
"slaveController"
[Maybe ProcessId] -> [ProcessId]
forall a. [Maybe a] -> [a]
catMaybes ([Maybe ProcessId] -> [ProcessId])
-> Process [Maybe ProcessId] -> Process [ProcessId]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int -> Process (Maybe ProcessId) -> Process [Maybe ProcessId]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM ([NodeId] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [NodeId]
nodes) (
[Match (Maybe ProcessId)] -> Process (Maybe ProcessId)
forall b. [Match b] -> Process b
receiveWait
[ (WhereIsReply -> Process (Maybe ProcessId))
-> Match (Maybe ProcessId)
forall a b. Serializable a => (a -> Process b) -> Match b
match WhereIsReply -> Process (Maybe ProcessId)
handleWhereIsReply
, (NodeMonitorNotification -> Process (Maybe ProcessId))
-> Match (Maybe ProcessId)
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(NodeMonitorNotification {}) -> Maybe ProcessId -> Process (Maybe ProcessId)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ProcessId
forall a. Maybe a
Nothing)
])
where
handleWhereIsReply :: WhereIsReply -> Process (Maybe ProcessId)
handleWhereIsReply :: WhereIsReply -> Process (Maybe ProcessId)
handleWhereIsReply (WhereIsReply String
name Maybe ProcessId
mPid)
| String
name String -> String -> Bool
forall a. Eq a => a -> a -> Bool
== String
"slaveController" = Maybe ProcessId -> Process (Maybe ProcessId)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ProcessId
mPid
| Bool
otherwise = Maybe ProcessId -> Process (Maybe ProcessId)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe ProcessId
forall a. Maybe a
Nothing
terminateAllSlaves :: Backend -> Process ()
terminateAllSlaves :: Backend -> Process ()
terminateAllSlaves Backend
backend = do
[ProcessId]
slaves <- Backend -> Process [ProcessId]
findSlaves Backend
backend
[ProcessId] -> (ProcessId -> Process ()) -> Process ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [ProcessId]
slaves ((ProcessId -> Process ()) -> Process ())
-> (ProcessId -> Process ()) -> Process ()
forall a b. (a -> b) -> a -> b
$ \ProcessId
pid -> ProcessId -> SlaveControllerMsg -> Process ()
forall a. Serializable a => ProcessId -> a -> Process ()
send ProcessId
pid SlaveControllerMsg
SlaveTerminate
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay Int
1000000
startMaster :: Backend -> ([NodeId] -> Process ()) -> IO ()
startMaster :: Backend -> ([NodeId] -> Process ()) -> IO ()
startMaster Backend
backend [NodeId] -> Process ()
proc = do
LocalNode
node <- Backend -> IO LocalNode
newLocalNode Backend
backend
LocalNode -> Process () -> IO ()
Node.runProcess LocalNode
node (Process () -> IO ()) -> Process () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
[ProcessId]
slaves <- Backend -> Process [ProcessId]
findSlaves Backend
backend
Backend -> [ProcessId] -> Process ()
redirectLogsHere Backend
backend [ProcessId]
slaves
[NodeId] -> Process ()
proc ((ProcessId -> NodeId) -> [ProcessId] -> [NodeId]
forall a b. (a -> b) -> [a] -> [b]
map ProcessId -> NodeId
processNodeId [ProcessId]
slaves) Process () -> Process () -> Process ()
forall (m :: * -> *) a b.
(HasCallStack, MonadMask m) =>
m a -> m b -> m a
`finally` Process ()
shutdownLogger
shutdownLogger :: Process ()
shutdownLogger :: Process ()
shutdownLogger = do
(SendPort ()
sport,ReceivePort ()
rport) <- Process (SendPort (), ReceivePort ())
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
String -> SendPort () -> Process ()
forall a. Serializable a => String -> a -> Process ()
nsend String
"logger" (SendPort ()
sport :: SendPort ())
ReceivePort () -> Process ()
forall a. Serializable a => ReceivePort a -> Process a
receiveChan ReceivePort ()
rport
localNodes :: Accessor BackendState [Node.LocalNode]
localNodes :: Accessor BackendState [LocalNode]
localNodes = (BackendState -> [LocalNode])
-> ([LocalNode] -> BackendState -> BackendState)
-> Accessor BackendState [LocalNode]
forall r a. (r -> a) -> (a -> r -> r) -> Accessor r a
accessor BackendState -> [LocalNode]
_localNodes (\[LocalNode]
ns BackendState
st -> BackendState
st { _localNodes = ns })
peers :: Accessor BackendState (Set NodeId)
peers :: T BackendState (Set NodeId)
peers = (BackendState -> Set NodeId)
-> (Set NodeId -> BackendState -> BackendState)
-> T BackendState (Set NodeId)
forall r a. (r -> a) -> (a -> r -> r) -> Accessor r a
accessor BackendState -> Set NodeId
_peers (\Set NodeId
ps BackendState
st -> BackendState
st { _peers = ps })