-- | Simple backend based on the TCP transport, offering node discovery via
-- UDP multicast. This is a zero-configuration backend designed to get you
-- going with Cloud Haskell quickly without imposing any structure on your
-- application.
--
-- To simplify getting started we provide special support for /master/ and
-- /slave/ nodes (see 'startSlave' and 'startMaster'). Use of these functions
-- is completely optional; you can use the local backend without making use
-- of the predefined master and slave nodes.
--
-- [Minimal example]
--
-- > import System.Environment (getArgs)
-- > import Control.Distributed.Process
-- > import Control.Distributed.Process.Node (initRemoteTable)
-- > import Control.Distributed.Process.Backend.SimpleLocalnet
-- >
-- > master :: Backend -> [NodeId] -> Process ()
-- > master backend slaves = do
-- >   -- Do something interesting with the slaves
-- >   liftIO . putStrLn $ "Slaves: " ++ show slaves
-- >   -- Terminate the slaves when the master terminates (this is optional)
-- >   terminateAllSlaves backend
-- >
-- > main :: IO ()
-- > main = do
-- >   args <- getArgs
-- >
-- >   case args of
-- >     ["master", host, port] -> do
-- >       backend <- initializeBackend host port initRemoteTable
-- >       startMaster backend (master backend)
-- >     ["slave", host, port] -> do
-- >       backend <- initializeBackend host port initRemoteTable
-- >       startSlave backend
-- >     _ -> putStrLn "usage: example (master | slave) host port"
--
-- [Compiling and Running]
--
-- Save to @example.hs@ and compile using
--
-- > ghc -threaded example.hs
--
-- Fire up some slave nodes (for the example, we run them on a single machine):
--
-- > ./example slave localhost 8080 &
-- > ./example slave localhost 8081 &
-- > ./example slave localhost 8082 &
-- > ./example slave localhost 8083 &
--
-- And start the master node:
--
-- > ./example master localhost 8084
--
-- which should then output:
--
-- > Slaves: [nid://localhost:8083:0,nid://localhost:8082:0,nid://localhost:8081:0,nid://localhost:8080:0]
--
-- at which point the slaves should exit.
--
-- To run the example on multiple machines, you could run
--
-- > ./example slave 198.51.100.1 8080 &
-- > ./example slave 198.51.100.2 8080 &
-- > ./example slave 198.51.100.3 8080 &
-- > ./example slave 198.51.100.4 8080 &
--
-- on four different machines (with IP addresses 198.51.100.1..4), and run the
-- master on a fifth node (or on any of the four machines that run the slave
-- nodes).
--
-- It is important that every node has a unique (hostname, port number) pair,
-- and that the hostname you use to initialize a node can be resolved by its
-- peer nodes. In other words, if you start a node with hostname @localhost@,
-- peer nodes on other machines won't be able to reach it, because for them
-- @localhost@ resolves to their own loopback address.
--
-- [Troubleshooting]
--
-- If you try the above example and the master process cannot find any slaves,
-- then it might be that your firewall settings do not allow for UDP multicast
-- (in particular, the default iptables on some Linux distributions might not
-- allow it).
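--
-- For instance, on a host using @iptables@, a rule along these lines may be
-- needed to accept multicast traffic (a sketch only; exact syntax and policy
-- vary per distribution):
--
-- > # as root
-- > iptables -A INPUT -m pkttype --pkt-type multicast -j ACCEPT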
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Control.Distributed.Process.Backend.SimpleLocalnet
  ( -- * Initialization
    Backend(..)
  , initializeBackend
    -- * Slave nodes
  , startSlave
  , terminateSlave
  , findSlaves
  , terminateAllSlaves
    -- * Master nodes
  , startMaster
  ) where

import System.IO (fixIO)
import Data.Maybe (catMaybes)
import Data.Binary (Binary(get, put), getWord8, putWord8)
import Data.Accessor (Accessor, accessor, (^:), (^.))
import Data.Set (Set)
import qualified Data.Set as Set (insert, empty, toList)
import Data.Foldable (forM_)
import Data.Typeable (Typeable)
import Control.Applicative ((<$>))
import Control.Exception (throw)
import Control.Monad (forever, replicateM, replicateM_)
import Control.Monad.IO.Class (liftIO)
import Control.Concurrent (forkIO, threadDelay, ThreadId)
import Control.Concurrent.MVar (MVar, newMVar, readMVar, modifyMVar_)
import Control.Distributed.Process
  ( RemoteTable
  , NodeId
  , Process
  , ProcessId
  , WhereIsReply(..)
  , whereis
  , whereisRemoteAsync
  , getSelfPid
  , register
  , reregister
  , expect
  , nsendRemote
  , receiveWait
  , match
  , processNodeId
  , monitorNode
  , monitor
  , unmonitor
  , NodeMonitorNotification(..)
  , ProcessRegistrationException
  , finally
  , newChan
  , receiveChan
  , nsend
  , SendPort
  , bracket
  , try
  , send
  )
import qualified Control.Distributed.Process.Node as Node
  ( LocalNode
  , newLocalNode
  , localNodeId
  , runProcess
  )
import qualified Network.Transport.TCP as NT
  ( createTransport
  , defaultTCPParameters
  )
import qualified Network.Transport as NT (Transport)
import qualified Network.Socket as N (HostName, ServiceName, SockAddr)
import Control.Distributed.Process.Backend.SimpleLocalnet.Internal.Multicast (initMulticast)

-- | Local backend
data Backend = Backend {
    -- | Create a new local node
    newLocalNode :: IO Node.LocalNode
    -- | @findPeers t@ broadcasts a /who's there?/ message on the local
    -- network, waits @t@ microseconds, and then collects and returns the
    -- answers. You can use this to dynamically discover peer nodes.
  , findPeers :: Int -> IO [NodeId]
    -- | Make sure that all log messages are printed by the logger on the
    -- current node
  , redirectLogsHere :: [ProcessId] -> Process ()
  }
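
-- A sketch of using the 'Backend' record directly, without the master/slave
-- helpers. The host, port, and one-second wait are arbitrary choices, and
-- 'initRemoteTable' comes from "Control.Distributed.Process.Node":
--
-- > demo :: IO ()
-- > demo = do
-- >   backend <- initializeBackend "localhost" "8080" initRemoteTable
-- >   _node   <- newLocalNode backend
-- >   peers   <- findPeers backend 1000000
-- >   print peers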

data BackendState = BackendState {
   _localNodes      :: [Node.LocalNode]
 , _peers           :: Set NodeId
 ,  discoveryDaemon :: ThreadId
 }

-- | Initialize the backend: create the TCP transport and start the peer
-- discovery daemon
initializeBackend :: N.HostName -> N.ServiceName -> RemoteTable -> IO Backend
initializeBackend host port rtable = do
  mTransport   <- NT.createTransport host port NT.defaultTCPParameters
  (recv, sendp) <- initMulticast "224.0.0.99" 9999 1024 -- group, port, buffer size
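  -- Tie the knot with 'fixIO': the state must record the daemon's ThreadId,
  -- but the daemon can only be forked once the state exists. The lazy
  -- pattern ensures we do not force the ThreadId before it is available.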
  (_, backendState) <- fixIO $ \ ~(tid, _) -> do
    backendState <- newMVar BackendState
                      { _localNodes      = []
                      , _peers           = Set.empty
                      ,  discoveryDaemon = tid
                      }
    tid' <- forkIO $ peerDiscoveryDaemon backendState recv sendp
    return (tid', backendState)
  case mTransport of
    Left err -> throw err
    Right transport ->
      let backend = Backend {
          newLocalNode       = apiNewLocalNode transport rtable backendState
        , findPeers          = apiFindPeers sendp backendState
        , redirectLogsHere   = apiRedirectLogsHere backend
        }
      in return backend

-- | Create a new local node
apiNewLocalNode :: NT.Transport
                -> RemoteTable
                -> MVar BackendState
                -> IO Node.LocalNode
apiNewLocalNode transport rtable backendState = do
  localNode <- Node.newLocalNode transport rtable
  modifyMVar_ backendState $ return . (localNodes ^: (localNode :))
  return localNode

-- | Broadcast a peer discovery request, wait the given number of
-- microseconds, and return all peers recorded so far
apiFindPeers :: (PeerDiscoveryMsg -> IO ())
             -> MVar BackendState
             -> Int
             -> IO [NodeId]
apiFindPeers sendfn backendState delay = do
  sendfn PeerDiscoveryRequest
  threadDelay delay
  Set.toList . (^. peers) <$> readMVar backendState

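-- | Messages exchanged over the multicast group: 'findPeers' broadcasts a
-- 'PeerDiscoveryRequest', and every node answers (also via multicast) with
-- one 'PeerDiscoveryReply' per local node it hosts.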
data PeerDiscoveryMsg =
    PeerDiscoveryRequest
  | PeerDiscoveryReply NodeId

instance Binary PeerDiscoveryMsg where
  put PeerDiscoveryRequest     = putWord8 0
  put (PeerDiscoveryReply nid) = putWord8 1 >> put nid
  get = do
    header <- getWord8
    case header of
      0 -> return PeerDiscoveryRequest
      1 -> PeerDiscoveryReply <$> get
      _ -> fail "PeerDiscoveryMsg.get: invalid"

-- | Respond to peer discovery requests sent by other nodes, and record the
-- replies we receive from them
peerDiscoveryDaemon :: MVar BackendState
                    -> IO (PeerDiscoveryMsg, N.SockAddr)
                    -> (PeerDiscoveryMsg -> IO ())
                    -> IO ()
peerDiscoveryDaemon backendState recv sendfn = forever go
  where
    go = do
      (msg, _) <- recv
      case msg of
        PeerDiscoveryRequest -> do
          nodes <- (^. localNodes) <$> readMVar backendState
          forM_ nodes $ sendfn . PeerDiscoveryReply . Node.localNodeId
        PeerDiscoveryReply nid ->
          modifyMVar_ backendState $ return . (peers ^: Set.insert nid)

--------------------------------------------------------------------------------
-- Back-end specific primitives                                               --
--------------------------------------------------------------------------------

-- | Make sure that all log messages are printed by the logger on this node
apiRedirectLogsHere :: Backend -> [ProcessId] -> Process ()
apiRedirectLogsHere _backend slavecontrollers = do
  mLogger <- whereis "logger"
  myPid   <- getSelfPid

  forM_ mLogger $ \logger ->
    bracket
      (mapM monitor slavecontrollers)
      (mapM unmonitor)
      $ \_ -> do
        -- Fire off the redirect requests
        forM_ slavecontrollers $ \pid -> send pid (RedirectLogsTo logger myPid)

        -- Wait for the replies; a monitor notification stands in for the
        -- reply of a slave controller that has died
        replicateM_ (length slavecontrollers) $
          receiveWait
            [ match (\(RedirectLogsReply {}) -> return ())
            , match (\(NodeMonitorNotification {}) -> return ())
            ]

--------------------------------------------------------------------------------
-- Slaves                                                                     --
--------------------------------------------------------------------------------

-- | Messages to slave nodes
--
-- This datatype is not exposed; instead, we expose primitives for dealing
-- with slaves.
data SlaveControllerMsg
   = SlaveTerminate
   | RedirectLogsTo ProcessId ProcessId
  deriving (Typeable, Show)

instance Binary SlaveControllerMsg where
  put SlaveTerminate = putWord8 0
  put (RedirectLogsTo a b) = do putWord8 1; put (a,b)
  get = do
    header <- getWord8
    case header of
      0 -> return SlaveTerminate
      1 -> do (a,b) <- get; return (RedirectLogsTo a b)
      _ -> fail "SlaveControllerMsg.get: invalid"

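-- | Reply to a 'RedirectLogsTo' request: the slave controller's ProcessId
-- and whether (re)registering the logger succeeded.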
data RedirectLogsReply
  = RedirectLogsReply ProcessId Bool
  deriving (Typeable, Show)

instance Binary RedirectLogsReply where
  put (RedirectLogsReply from ok) = put (from,ok)
  get = do
    (from,ok) <- get
    return (RedirectLogsReply from ok)

-- | Calling 'startSlave' sets up a new local node and then waits. You start
-- processes on the slave by calling 'spawn' from other nodes.
--
-- This function does not return. The only way to exit the slave is to
-- Ctrl-C the process or to call 'terminateSlave' from another node.
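--
-- For instance, the master might start a process on each slave like this
-- (a sketch, not part of this module; it assumes @someProcess@ has been made
-- 'remotable' via "Control.Distributed.Process.Closure" and is included in
-- the remote table passed to 'initializeBackend'):
--
-- > master :: Backend -> [NodeId] -> Process ()
-- > master _backend slaves =
-- >   forM_ slaves $ \nid -> spawn nid $(mkStaticClosure 'someProcess)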
startSlave :: Backend -> IO ()
startSlave backend = do
  node <- newLocalNode backend
  Node.runProcess node slaveController

-- | The slave controller interprets 'SlaveControllerMsg's
slaveController :: Process ()
slaveController = do
    pid <- getSelfPid
    register "slaveController" pid
    go
  where
    go = do
      msg <- expect
      case msg of
        SlaveTerminate -> return ()
        RedirectLogsTo loggerPid from -> do
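          -- "logger" is normally registered already, so try to swap it with
          -- 'reregister' first; fall back to a fresh 'register' if it is not.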
          r <- try (reregister "logger" loggerPid)
          ok <- case (r :: Either ProcessRegistrationException ()) of
                  Right _ -> return True
                  Left _  -> do
                    s <- try (register "logger" loggerPid)
                    case (s :: Either ProcessRegistrationException ()) of
                      Right _ -> return True
                      Left _  -> return False
          pid <- getSelfPid
          send from (RedirectLogsReply pid ok)
          go

-- | Terminate the slave at the given node ID
terminateSlave :: NodeId -> Process ()
terminateSlave nid = nsendRemote nid "slaveController" SlaveTerminate

-- | Find slave nodes
findSlaves :: Backend -> Process [ProcessId]
findSlaves backend = do
  nodes <- liftIO $ findPeers backend 1000000 -- wait one second for replies
  bracket
    (mapM monitorNode nodes)
    (mapM unmonitor)
    $ \_ -> do
      -- Fire off asynchronous requests for the slave controllers
      forM_ nodes $ \nid -> whereisRemoteAsync nid "slaveController"

      -- Wait for the replies; a monitor notification means the node is
      -- unreachable, in which case we drop it from the result
      catMaybes <$> replicateM (length nodes) (
        receiveWait
          [ match (\(WhereIsReply "slaveController" mPid) -> return mPid)
          , match (\(NodeMonitorNotification {}) -> return Nothing)
          ])

-- | Terminate all slaves
terminateAllSlaves :: Backend -> Process ()
terminateAllSlaves backend = do
  slaves <- findSlaves backend
  forM_ slaves $ \pid -> send pid SlaveTerminate
  -- 'send' is asynchronous; wait briefly so that the termination requests
  -- can be delivered before the caller (typically the master) shuts down
  liftIO $ threadDelay 1000000

--------------------------------------------------------------------------------
-- Master nodes
--------------------------------------------------------------------------------

-- | 'startMaster' finds all slaves /currently/ available on the local network,
-- redirects all log messages to itself, and then calls the specified process,
-- passing the list of slave nodes.
--
-- Terminates when the specified process terminates. If you want to terminate
-- the slaves when the master terminates, you should manually call
-- 'terminateAllSlaves'.
--
-- If you start more slave nodes after having started the master node, you can
-- discover them with later calls to 'findSlaves', but be aware that you will
-- need to call 'redirectLogsHere' to redirect their logs to the master node.
--
-- Note that you can use the functionality of this module directly (through
-- 'Backend'), instead of using 'startMaster'/'startSlave', if the master/slave
-- distinction does not suit your application.
startMaster :: Backend -> ([NodeId] -> Process ()) -> IO ()
startMaster backend proc = do
  node <- newLocalNode backend
  Node.runProcess node $ do
    slaves <- findSlaves backend
    redirectLogsHere backend slaves
    proc (map processNodeId slaves) `finally` shutdownLogger

-- | Shut down the logger process. This ensures that any pending
-- messages are flushed before the process exits.
shutdownLogger :: Process ()
shutdownLogger = do
  (sport,rport) <- newChan
  nsend "logger" (sport :: SendPort ())
  receiveChan rport
  -- TODO: we should monitor the logger process so we don't deadlock if
  -- it has already died.
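
-- A deadlock-safe variant might look like this (a sketch, not part of this
-- module: it monitors the logger so the wait ends even if the logger is
-- already dead, and it assumes 'matchChan' and 'ProcessMonitorNotification'
-- are additionally imported from "Control.Distributed.Process"):
--
-- > shutdownLogger' :: Process ()
-- > shutdownLogger' = do
-- >   mLogger <- whereis "logger"
-- >   forM_ mLogger $ \logger ->
-- >     bracket (monitor logger) unmonitor $ \_ -> do
-- >       (sport, rport) <- newChan
-- >       send logger (sport :: SendPort ())
-- >       receiveWait
-- >         [ matchChan rport return
-- >         , match (\(ProcessMonitorNotification {}) -> return ())
-- >         ]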

--------------------------------------------------------------------------------
-- Accessors                                                                  --
--------------------------------------------------------------------------------

localNodes :: Accessor BackendState [Node.LocalNode]
localNodes = accessor _localNodes (\ns st -> st { _localNodes = ns })

peers :: Accessor BackendState (Set NodeId)
peers = accessor _peers (\ps st -> st { _peers = ps })