module Control.Distributed.Process.Execution.Exchange.Internal
( Exchange(..)
, Message(..)
, ExchangeType(..)
, startExchange
, startSupervised
, startSupervisedRef
, runExchange
, post
, postMessage
, configureExchange
, createMessage
, applyHandlers
) where
import Control.Concurrent.MVar (MVar, takeMVar, putMVar, newEmptyMVar)
import Control.DeepSeq (NFData)
import Control.Distributed.Process
( Process
, ProcessMonitorNotification(..)
, ProcessId
, liftIO
, spawnLocal
, unsafeWrapMessage
)
import qualified Control.Distributed.Process as P (Message, link)
import Control.Distributed.Process.Serializable hiding (SerializableDict)
import Control.Distributed.Process.Extras.Internal.Types
( Resolvable(..)
)
import Control.Distributed.Process.Extras.Internal.Primitives
( Linkable(..)
)
import Control.Distributed.Process.ManagedProcess
( channelControlPort
, handleControlChan
, handleInfo
, handleRaw
, continue
, defaultProcess
, InitHandler
, InitResult(..)
, ProcessAction
, ProcessDefinition(..)
, ControlChannel
, ControlPort
)
import qualified Control.Distributed.Process.ManagedProcess as MP
( chanServe
)
import Control.Distributed.Process.ManagedProcess.UnsafeClient
( sendControlMessage
)
import Control.Distributed.Process.Supervisor (SupervisorPid)
import Control.Distributed.Process.Extras.Time (Delay(Infinity))
import Data.Binary
import Data.Typeable (Typeable)
import GHC.Generics
import Prelude hiding (drop)
data Exchange = Exchange { pid :: !ProcessId
, cchan :: !(ControlPort ControlMessage)
, xType :: !String
} deriving (Typeable, Generic, Eq)
instance Binary Exchange where
instance Show Exchange where
show Exchange{..} = (xType ++ ":" ++ (show pid))
instance Resolvable Exchange where
resolve = return . Just . pid
instance Linkable Exchange where
linkTo = P.link . pid
sendCtrlMsg :: Exchange -> ControlMessage -> Process ()
sendCtrlMsg Exchange{..} = sendControlMessage cchan
data Message =
Message { key :: !String
, headers :: ![(String, String)]
, payload :: !P.Message
} deriving (Typeable, Generic, Show)
instance Binary Message where
instance NFData Message where
data ControlMessage =
Configure !P.Message
| Post !Message
deriving (Typeable, Generic)
instance Binary ControlMessage where
instance NFData ControlMessage where
data ExchangeType s =
ExchangeType { name :: String
, state :: s
, configureEx :: s -> P.Message -> Process s
, routeEx :: s -> Message -> Process s
}
startExchange :: forall s. ExchangeType s -> Process Exchange
startExchange = doStart Nothing
startSupervisedRef :: forall s . ExchangeType s
-> SupervisorPid
-> Process (ProcessId, P.Message)
startSupervisedRef t s = do
ex <- startSupervised t s
return (pid ex, unsafeWrapMessage ex)
startSupervised :: forall s . ExchangeType s
-> SupervisorPid
-> Process Exchange
startSupervised t s = doStart (Just s) t
doStart :: Maybe SupervisorPid -> ExchangeType s -> Process Exchange
doStart mSp t = do
cchan <- liftIO $ newEmptyMVar
spawnLocal (maybeLink mSp >> runExchange t cchan) >>= \pid -> do
cc <- liftIO $ takeMVar cchan
return $ Exchange pid cc (name t)
where
maybeLink Nothing = return ()
maybeLink (Just p') = P.link p'
runExchange :: forall s.
ExchangeType s
-> MVar (ControlPort ControlMessage)
-> Process ()
runExchange t tc = MP.chanServe t exInit (processDefinition t tc)
exInit :: forall s. InitHandler (ExchangeType s) (ExchangeType s)
exInit t = return $ InitOk t Infinity
post :: Serializable a => Exchange -> a -> Process ()
post ex msg = postMessage ex $ Message "" [] (unsafeWrapMessage msg)
postMessage :: Exchange -> Message -> Process ()
postMessage ex msg = msg `seq` sendCtrlMsg ex $ Post msg
configureExchange :: Serializable m => Exchange -> m -> Process ()
configureExchange e m = sendCtrlMsg e $ Configure (unsafeWrapMessage m)
createMessage :: Serializable m => String -> [(String, String)] -> m -> Message
createMessage k h m = Message k h $ unsafeWrapMessage m
applyHandlers :: a
-> P.Message
-> [P.Message -> Process (Maybe a)]
-> Process a
applyHandlers e _ [] = return e
applyHandlers e m (f:fs) = do
r <- f m
case r of
Nothing -> applyHandlers e m fs
Just r' -> return r'
processDefinition :: forall s.
ExchangeType s
-> MVar (ControlPort ControlMessage)
-> ControlChannel ControlMessage
-> Process (ProcessDefinition (ExchangeType s))
processDefinition _ tc cc = do
liftIO $ putMVar tc $ channelControlPort cc
return $
defaultProcess {
apiHandlers = [ handleControlChan cc handleControlMessage ]
, infoHandlers = [ handleInfo handleMonitor
, handleRaw convertToCC
]
} :: Process (ProcessDefinition (ExchangeType s))
handleMonitor :: forall s.
ExchangeType s
-> ProcessMonitorNotification
-> Process (ProcessAction (ExchangeType s))
handleMonitor ex m = do
handleControlMessage ex (Configure (unsafeWrapMessage m))
convertToCC :: forall s.
ExchangeType s
-> P.Message
-> Process (ProcessAction (ExchangeType s))
convertToCC ex msg = do
liftIO $ putStrLn "convert to cc"
handleControlMessage ex (Post $ Message "" [] msg)
handleControlMessage :: forall s.
ExchangeType s
-> ControlMessage
-> Process (ProcessAction (ExchangeType s))
handleControlMessage ex@ExchangeType{..} cm =
let action = case cm of
Configure msg -> configureEx state msg
Post msg -> routeEx state msg
in action >>= \s -> continue $ ex { state = s }