-- | Internal type definitions for managed processes: call references and
-- wire messages, handler type aliases, dispatcher records, process
-- definitions, and the client-side helpers 'initCall', 'unsafeInitCall' and
-- 'waitResponse'.
module Control.Distributed.Process.ManagedProcess.Internal.Types
  (
    -- handler results and conditions
    InitResult(..)
  , Condition(..)
  , ProcessAction(..)
  , ProcessReply(..)
    -- handler type aliases
  , CallHandler
  , CastHandler
  , DeferredCallHandler
  , StatelessCallHandler
  , InfoHandler
  , ChannelHandler
  , StatelessChannelHandler
  , InitHandler
  , ShutdownHandler
  , TimeoutHandler
    -- process definitions and prioritisation
  , UnhandledMessagePolicy(..)
  , ProcessDefinition(..)
  , Priority(..)
  , DispatchPriority(..)
  , PrioritisedProcessDefinition(..)
  , RecvTimeoutPolicy(..)
    -- control channels
  , ControlChannel(..)
  , newControlChan
  , ControlPort(..)
  , channelControlPort
    -- dispatchers and the classes that run them
  , Dispatcher(..)
  , DeferredDispatcher(..)
  , ExitSignalDispatcher(..)
  , MessageMatcher(..)
  , DynMessageHandler(..)
    -- wire messages and call plumbing
  , Message(..)
  , CallResponse(..)
  , CallId
  , CallRef(..)
  , makeRef
  , initCall
  , unsafeInitCall
  , waitResponse
  ) where
import Control.Distributed.Process hiding (Message)
import qualified Control.Distributed.Process as P (Message)
import Control.Distributed.Process.Serializable
import Control.Distributed.Process.Extras
( Recipient(..)
, ExitReason(..)
, Addressable
, Resolvable(..)
, Routable(..)
, NFSerializable
)
import Control.Distributed.Process.Extras.Internal.Types
( resolveOrDie
)
import Control.Distributed.Process.Extras.Time
import Control.DeepSeq (NFData(..))
import Data.Binary hiding (decode)
import Data.Typeable (Typeable)
import Prelude hiding (init)
import GHC.Generics
-- | Identifier that correlates a call with its reply. It is the 'MonitorRef'
-- that 'initCall' and 'unsafeInitCall' obtain by monitoring the target, and
-- the value 'waitResponse' compares replies and monitor notifications against.
type CallId = MonitorRef
-- | Reference to an outstanding call: the 'Recipient' a reply is routed to,
-- paired with the 'CallId' that tags the reply. The type parameter @a@ is
-- phantom (it does not occur in the wrapped pair); elsewhere in this module
-- it is used as the type of the reply.
newtype CallRef a = CallRef { unCaller :: (Recipient, CallId) }
  deriving (Eq, Show, Typeable, Generic)
-- | No methods are overridden, so the class defaults are used.
-- NOTE(review): given the derived 'Generic' instance these are presumably the
-- generic encoder and decoder - confirm.
instance Serializable a => Binary (CallRef a)

-- | Fully evaluates the wrapped (recipient, call id) pair.
instance NFData a => NFData (CallRef a) where
  rnf (CallRef pair) = rnf pair
-- | Build a 'CallRef' from the 'Recipient' that should receive the reply and
-- the 'CallId' used to tag it.
makeRef :: forall a . (Serializable a) => Recipient -> CallId -> CallRef a
makeRef recipient callId = CallRef (recipient, callId)
-- | A 'CallRef' resolves to whatever its recipient component resolves to;
-- the call id plays no part.
instance Resolvable (CallRef a) where
  resolve (CallRef (recipient, _callId)) = resolve recipient
-- | Sending through a 'CallRef' wraps the payload in a 'CallResponse' tagged
-- with the reference's call id and forwards it to the reference's recipient,
-- using the recipient's own 'sendTo' \/ 'unsafeSendTo'.
instance Routable (CallRef a) where
  sendTo (CallRef (recipient, callId)) payload =
    sendTo recipient $ CallResponse payload callId
  unsafeSendTo (CallRef (recipient, callId)) payload =
    unsafeSendTo recipient $ CallResponse payload callId
-- | Envelope for requests sent to a managed process. @a@ is the payload
-- type; @b@ is the type associated with the reply destination, where one is
-- attached.
data Message a b =
    -- | A payload with no reply destination attached.
    CastMessage a
    -- | A payload together with the 'CallRef' identifying the caller.
  | CallMessage a (CallRef b)
    -- | A payload together with a typed 'SendPort' for values of type @b@.
  | ChanMessage a (SendPort b)
  deriving (Typeable, Generic)
-- | No methods are overridden, so the class defaults are used.
instance (Serializable a, Serializable b) => Binary (Message a b)

-- | Fully evaluates every field of whichever constructor is present.
instance (NFSerializable a, NFSerializable b) => NFData (Message a b) where
  rnf message = case message of
    CastMessage payload           -> rnf payload
    CallMessage payload callRef   -> rnf payload `seq` rnf callRef
    ChanMessage payload replyPort -> rnf payload `seq` rnf replyPort

deriving instance (Eq a, Eq b) => Eq (Message a b)

deriving instance (Show a, Show b) => Show (Message a b)
-- | A reply payload tagged with the 'CallId' of the call it answers. This is
-- the value the 'Routable' instance for 'CallRef' sends and the value
-- 'waitResponse' matches on.
data CallResponse a = CallResponse a CallId
  deriving (Typeable, Generic)
-- | No methods are overridden, so the class defaults are used.
instance Serializable a => Binary (CallResponse a)

-- | Fully evaluates the reply payload, then the call id.
instance NFSerializable a => NFData (CallResponse a) where
  rnf (CallResponse reply callId) = rnf reply `seq` rnf callId

deriving instance Eq a => Eq (CallResponse a)

deriving instance Show a => Show (CallResponse a)
-- | The value an 'InitHandler' produces.
-- NOTE(review): how each constructor is acted upon is decided by code outside
-- this module; the per-constructor notes describe only the payload.
data InitResult s =
    -- | Carries the initial state and a 'Delay'.
    -- NOTE(review): the 'Delay' is presumably an initial timeout - confirm.
    InitOk s Delay
    -- | Carries a 'String'; presumably the reason start-up was abandoned -
    -- confirm.
  | InitStop String
    -- | Carries no payload.
  | InitIgnore
  deriving (Typeable)
-- | The instruction a handler returns. Every constructor except
-- 'ProcessStop' carries a state value of type @s@.
-- NOTE(review): how each constructor is acted upon is decided by the process
-- loop, which is not defined in this module; the notes below describe only
-- the payload and should be read alongside that code.
data ProcessAction s =
    -- | Carries only a state value.
    ProcessContinue s
    -- | Carries a 'Delay' together with a state value.
  | ProcessTimeout Delay s
    -- | Carries a 'TimeInterval' together with a state value.
  | ProcessHibernate TimeInterval s
    -- | Carries an 'ExitReason' and no state.
  | ProcessStop ExitReason
    -- | Carries both a state value and an 'ExitReason'.
  | ProcessStopping s ExitReason
-- | The result of a call handler: an optional reply of type @r@ plus the
-- 'ProcessAction' to take next.
data ProcessReply r s =
    -- | A reply value together with the next action.
    ProcessReply r (ProcessAction s)
    -- | Only the next action; no reply value is carried.
  | NoReply (ProcessAction s)
-- | A boolean predicate over a state of type @s@ and\/or an input of type
-- @m@.
data Condition s m =
    -- | Predicate over both the state and the input.
    Condition (s -> m -> Bool)
    -- | Predicate over the state only.
  | State (s -> Bool)
    -- | Predicate over the input only.
  | Input (m -> Bool)
-- | Takes the state and a request of type @a@; yields a 'ProcessReply' whose
-- reply type is @b@.
type CallHandler s a b = s -> a -> Process (ProcessReply b s)

-- | Like 'CallHandler', but additionally given the caller's 'CallRef'
-- (between the state and the request).
-- NOTE(review): presumably so the reply can be sent later through the
-- reference - confirm.
type DeferredCallHandler s a b = s -> CallRef b -> a -> Process (ProcessReply b s)

-- | Call handler that takes no state argument and uses @()@ as its state
-- type. Note the argument order: request first, then 'CallRef' - the reverse
-- of 'DeferredCallHandler'.
type StatelessCallHandler a b = a -> CallRef b -> Process (ProcessReply b ())

-- | Takes the state and a message of type @a@; yields the next
-- 'ProcessAction' (no reply).
type CastHandler s a = s -> a -> Process (ProcessAction s)

-- | Same shape as 'CastHandler'.
type InfoHandler s a = s -> a -> Process (ProcessAction s)

-- | Takes the state, a 'SendPort' for values of type @b@, and a message of
-- type @a@; yields the next 'ProcessAction'.
type ChannelHandler s a b = s -> SendPort b -> a -> Process (ProcessAction s)

-- | 'ChannelHandler' without a state argument, using @()@ as the state type.
type StatelessChannelHandler a b = SendPort b -> a -> Process (ProcessAction ())

-- | Takes an argument of type @a@ and produces an 'InitResult' carrying state
-- of type @s@.
type InitHandler a s = a -> Process (InitResult s)

-- | Takes the state and an 'ExitReason'; returns no value.
type ShutdownHandler s = s -> ExitReason -> Process ()

-- | Takes the state and a 'Delay'; yields the next 'ProcessAction'.
type TimeoutHandler s = s -> Delay -> Process (ProcessAction s)
-- | A 'SendPort' and 'ReceivePort' pair for @'Message' m ()@ values.
-- 'newControlChan' builds one from the pair returned by 'newChan', and
-- 'channelControlPort' extracts the sending side.
newtype ControlChannel m =
  ControlChannel {
    -- | The underlying (send port, receive port) pair.
    unControl :: (SendPort (Message m ()), ReceivePort (Message m ()))
  }
-- | Create a fresh typed channel with 'newChan' and wrap both of its ends in
-- a 'ControlChannel'.
newControlChan :: (Serializable m) => Process (ControlChannel m)
newControlChan = fmap ControlChannel newChan
-- | The sending side of a 'ControlChannel' on its own: a 'SendPort' for
-- @'Message' m ()@ values. 'Show' is derived in place; 'Binary' is derived
-- by the standalone clause that follows the declaration.
newtype ControlPort m =
  ControlPort {
    -- | The wrapped send port.
    unPort :: SendPort (Message m ())
  } deriving (Show)
deriving instance (Serializable m) => Binary (ControlPort m)
-- | Two control ports are equal exactly when their wrapped send ports are.
instance Eq (ControlPort m) where
  ControlPort lhs == ControlPort rhs = lhs == rhs
-- | Extract the sending side of a 'ControlChannel' (the first component of
-- the wrapped pair) as a 'ControlPort'.
channelControlPort :: (Serializable m)
                   => ControlChannel m
                   -> ControlPort m
channelControlPort = ControlPort . fst . unControl
-- | A handler for typed 'Message' values. Each constructor is existentially
-- quantified over the two 'Message' type parameters @a@ and @b@, both of
-- which must be 'Serializable'. The 'MessageMatcher' and 'DynMessageHandler'
-- instances in this module show how each constructor is run.
data Dispatcher s =
    -- | Unconditional handler.
    forall a b . (Serializable a, Serializable b) =>
    Dispatch
    {
      -- | Given the state and a message, yields the next 'ProcessAction'.
      dispatch :: s -> Message a b -> Process (ProcessAction s)
    }
    -- | Handler guarded by a predicate over the state and the message.
  | forall a b . (Serializable a, Serializable b) =>
    DispatchIf
    {
      dispatch :: s -> Message a b -> Process (ProcessAction s)
      -- | The guard; 'matchDispatch' and 'dynHandleMessage' pass it to
      -- 'matchIf' and 'handleMessageIf' respectively.
    , dispatchIf :: s -> Message a b -> Bool
    }
    -- | Handler bound to a specific 'ReceivePort'; 'matchDispatch' runs it
    -- via 'matchChan'.
  | forall a b . (Serializable a, Serializable b) =>
    DispatchCC
    {
      -- | The receive port messages are taken from.
      channel :: ReceivePort (Message a b)
    , dispatch :: s -> Message a b -> Process (ProcessAction s)
    }
-- | A handler that works on raw, untyped messages ('P.Message').
data DeferredDispatcher s =
  DeferredDispatcher
  {
    -- | Given the state and a raw message, yields @Just@ an action or
    -- @Nothing@.
    -- NOTE(review): @Nothing@ presumably means the message was not handled -
    -- confirm against the caller.
    dispatchInfo :: s
                 -> P.Message
                 -> Process (Maybe (ProcessAction s))
  }
-- | A handler taking a 'ProcessId' and a raw message in addition to the
-- state.
-- NOTE(review): going by the name, the 'ProcessId' is presumably the origin
-- of an exit signal and the message its reason - confirm against the caller.
data ExitSignalDispatcher s =
  ExitSignalDispatcher
  {
    -- | Given the state, a process id and a raw message, yields @Just@ an
    -- action or @Nothing@.
    dispatchExit :: s
                 -> ProcessId
                 -> P.Message
                 -> Process (Maybe (ProcessAction s))
  }
-- | Dispatcher types that can be converted into a 'Match' yielding the next
-- 'ProcessAction'.
class MessageMatcher d where
  -- | Build a 'Match' from an unhandled-message policy, the current state and
  -- a dispatcher. (The 'Dispatcher' instance in this module ignores the
  -- policy argument.)
  matchDispatch :: UnhandledMessagePolicy -> s -> d s -> Match (ProcessAction s)
-- | Each constructor maps onto the corresponding matching primitive:
-- 'match', 'matchIf' (with the guard applied to the state) or 'matchChan'
-- (on the dispatcher's receive port). The policy argument is ignored.
instance MessageMatcher Dispatcher where
  matchDispatch _ st dispatcher = case dispatcher of
    Dispatch handler             -> match (handler st)
    DispatchIf handler predicate -> matchIf (predicate st) (handler st)
    DispatchCC port handler      -> matchChan port (handler st)
-- | Dispatcher types that can be applied directly to a raw 'P.Message'.
class DynMessageHandler d where
  -- | Apply a dispatcher to a raw message, given an unhandled-message policy
  -- and the current state. Yields @Just@ an action or @Nothing@.
  -- (Both instances in this module ignore the policy argument.)
  dynHandleMessage :: UnhandledMessagePolicy
                   -> s
                   -> d s
                   -> P.Message
                   -> Process (Maybe (ProcessAction s))
-- | 'Dispatch' and 'DispatchIf' are run through 'handleMessage' and
-- 'handleMessageIf'. 'DispatchCC' calls 'error': a channel-bound dispatcher
-- is not expected to be handed a raw message (in 'matchDispatch' it is run
-- via 'matchChan' instead). The policy argument is ignored.
instance DynMessageHandler Dispatcher where
  dynHandleMessage _ st dispatcher msg = case dispatcher of
    Dispatch handler             -> handleMessage msg (handler st)
    DispatchIf handler predicate -> handleMessageIf msg (predicate st) (handler st)
    DispatchCC _ _               -> error "ThisCanNeverHappen"
-- | Applies the wrapped 'dispatchInfo' function to the state; the result is
-- a function still awaiting the raw message. The policy argument is ignored.
instance DynMessageHandler DeferredDispatcher where
  dynHandleMessage _ st deferred = dispatchInfo deferred st
-- | An 'Int' priority. The type parameter @a@ is phantom: it does not occur
-- in the wrapped value.
newtype Priority a = Priority { getPrio :: Int }
-- | A prioritisation function over raw messages. All three constructors
-- carry the same single field, 'prioritise'.
-- NOTE(review): the constructor presumably records which kind of message
-- (call, cast or info) the function is meant for - confirm against the code
-- that consumes these.
data DispatchPriority s =
    PrioritiseCall
    {
      -- | Given the state and a raw message, optionally yields an 'Int'
      -- paired with a raw message.
      prioritise :: s -> P.Message -> Process (Maybe (Int, P.Message))
    }
  | PrioritiseCast
    {
      prioritise :: s -> P.Message -> Process (Maybe (Int, P.Message))
    }
  | PrioritiseInfo
    {
      prioritise :: s -> P.Message -> Process (Maybe (Int, P.Message))
    }
-- | Either an 'Int' count ('RecvCounter') or a 'TimeInterval' ('RecvTimer').
-- NOTE(review): what is counted or timed is decided by the code that reads
-- 'recvTimeout'; that code is not in this module.
data RecvTimeoutPolicy = RecvCounter Int | RecvTimer TimeInterval
  deriving (Typeable)
-- | A 'ProcessDefinition' extended with prioritisation settings.
data PrioritisedProcessDefinition s =
  PrioritisedProcessDefinition
  {
    -- | The underlying process definition.
    processDef :: ProcessDefinition s
    -- | The prioritisation functions.
  , priorities :: [DispatchPriority s]
    -- | The 'RecvTimeoutPolicy' to use.
  , recvTimeout :: RecvTimeoutPolicy
  }
-- | Policy selector stored in a 'ProcessDefinition' and passed to
-- 'matchDispatch' and 'dynHandleMessage'.
-- NOTE(review): going by the names, the constructors presumably mean: stop
-- the process, forward to the given 'ProcessId', log the message, or discard
-- it silently. The code that interprets the policy is not in this module -
-- confirm there.
data UnhandledMessagePolicy =
    Terminate
    -- | Carries a 'ProcessId'.
  | DeadLetter ProcessId
  | Log
  | Drop
-- | The full set of handlers and settings describing a managed process whose
-- state has type @s@.
data ProcessDefinition s = ProcessDefinition {
    -- | Dispatchers for typed 'Message' values.
    apiHandlers :: [Dispatcher s]
    -- | Dispatchers for raw messages.
  , infoHandlers :: [DeferredDispatcher s]
    -- | Dispatchers that take a 'ProcessId' and a raw message.
  , exitHandlers :: [ExitSignalDispatcher s]
    -- | The 'TimeoutHandler'.
  , timeoutHandler :: TimeoutHandler s
    -- | The 'ShutdownHandler'.
  , shutdownHandler :: ShutdownHandler s
    -- | The 'UnhandledMessagePolicy'.
  , unhandledMessagePolicy :: UnhandledMessagePolicy
  }
-- | Start a call: resolve the target address (via 'resolveOrDie'), monitor
-- the resulting process, then send it a 'CallMessage' carrying the request
-- and a 'CallRef' built from the calling process's id and the new monitor
-- reference. Returns that 'CallRef' without waiting for a reply; pass it to
-- 'waitResponse' to collect the result.
initCall :: forall s a b . (Addressable s, Serializable a, Serializable b)
         => s -> a -> Process (CallRef b)
initCall sid msg = do
  server <- resolveOrDie sid "initCall: unresolveable address "
  callId <- monitor server
  caller <- getSelfPid
  let callRef = makeRef (Pid caller) callId
  sendTo server (CallMessage msg callRef :: Message a b)
  return callRef
-- | Same steps as 'initCall', but the 'CallMessage' is delivered with
-- 'unsafeSendTo' instead of 'sendTo', and both the request and reply types
-- must be 'NFSerializable'.
unsafeInitCall :: forall s a b . (Addressable s,
                                  NFSerializable a, NFSerializable b)
               => s -> a -> Process (CallRef b)
unsafeInitCall sid msg = do
  server <- resolveOrDie sid "unsafeInitCall: unresolveable address "
  callId <- monitor server
  caller <- getSelfPid
  let callRef = makeRef (Pid caller) callId
  unsafeSendTo server (CallMessage msg callRef :: Message a b)
  return callRef
-- | Wait for the outcome of a call identified by the given 'CallRef'.
--
-- Two kinds of message are accepted, both filtered on the reference's call
-- id: a 'CallResponse' (yielding @Right reply@) and a
-- 'ProcessMonitorNotification' (yielding @Left@ an 'ExitOther' built by
-- 'show'ing the notification's reason).
--
-- With @Just interval@ the wait goes through 'receiveTimeout', whose result
-- is returned as-is; with @Nothing@ it goes through 'receiveWait' and the
-- result is wrapped in @Just@. In both cases the monitor is removed
-- afterwards via 'finally'.
waitResponse :: forall b. (Serializable b)
             => Maybe TimeInterval
             -> CallRef b
             -> Process (Maybe (Either ExitReason b))
waitResponse mTimeout cRef =
  case mTimeout of
    Just ti -> receiveTimeout (asTimeout ti) matchers `finally` cleanup
    Nothing -> fmap Just (receiveWait matchers) `finally` cleanup
  where
    -- The call id doubles as the monitor reference created when the call
    -- was initiated.
    callId = snd (unCaller cRef)

    cleanup = unmonitor callId

    matchers =
      [ matchIf (\((CallResponse _ ref) :: CallResponse b) -> ref == callId)
                (\((CallResponse reply _) :: CallResponse b) -> return (Right reply))
      , matchIf (\(ProcessMonitorNotification ref _ _) -> ref == callId)
                (\(ProcessMonitorNotification _ _ reason) -> return (Left (exitOf reason)))
      ]

    exitOf reason = ExitOther (show reason)