{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LiberalTypeSynonyms #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FunctionalDependencies #-}
module Control.Distributed.Process.ManagedProcess.Internal.Types
(
InitResult(..)
, GenProcess()
, runProcess
, lift
, liftIO
, ProcessState(..)
, State
, Queue
, Limit
, Condition(..)
, ProcessAction(..)
, ProcessReply(..)
, Action
, Reply
, ActionHandler
, CallHandler
, CastHandler
, StatelessHandler
, DeferredCallHandler
, StatelessCallHandler
, InfoHandler
, ChannelHandler
, StatelessChannelHandler
, InitHandler
, ShutdownHandler
, ExitState(..)
, isCleanShutdown
, exitState
, TimeoutHandler
, UnhandledMessagePolicy(..)
, ProcessDefinition(..)
, Priority(..)
, DispatchPriority(..)
, DispatchFilter(..)
, Filter(..)
, PrioritisedProcessDefinition(..)
, RecvTimeoutPolicy(..)
, ControlChannel(..)
, newControlChan
, ControlPort(..)
, channelControlPort
, Dispatcher(..)
, ExternDispatcher(..)
, DeferredDispatcher(..)
, ExitSignalDispatcher(..)
, MessageMatcher(..)
, ExternMatcher(..)
, Message(..)
, CallResponse(..)
, CallId
, CallRef(..)
, CallRejected(..)
, makeRef
, caller
, rejectToCaller
, recipient
, tag
, initCall
, unsafeInitCall
, waitResponse
) where
import Control.Concurrent.STM (STM)
import Control.Distributed.Process hiding (Message, mask, finally, liftIO)
import qualified Control.Distributed.Process as P (Message, liftIO)
import Control.Distributed.Process.Serializable
import Control.Distributed.Process.Extras
( Recipient(..)
, ExitReason(..)
, Addressable
, Resolvable(..)
, Routable(..)
, NFSerializable
)
import Control.Distributed.Process.ManagedProcess.Internal.PriorityQueue
( PriorityQ
)
import Control.Distributed.Process.Extras.Internal.Types
( resolveOrDie
)
import Control.Distributed.Process.Extras.Time
import Control.Distributed.Process.ManagedProcess.Timer (Timer, TimerKey)
import Control.DeepSeq (NFData(..))
import Control.Monad.Fix (MonadFix)
import Control.Monad.Catch
( catch
, throwM
, uninterruptibleMask
, mask
, finally
, MonadThrow
, MonadCatch
, MonadMask(..)
)
import qualified Control.Monad.Catch as Catch
( catch
, throwM
)
import Control.Monad.IO.Class (MonadIO)
import qualified Control.Monad.State.Strict as ST
( MonadState
, StateT
, get
, lift
, runStateT
)
import Data.Binary hiding (decode)
import Data.Map.Strict (Map)
import Data.Typeable (Typeable)
import Data.IORef (IORef)
import Prelude hiding (init)
import GHC.Generics
-- | Identifier attached to a call and echoed in its reply; an alias for
-- 'MonitorRef'.
type CallId = MonitorRef

-- | Reply address for a call: a 'Recipient' paired with the 'CallId' of the
-- call. The type parameter @a@ is phantom - it does not occur in the wrapped
-- value.
newtype CallRef a = CallRef
  { unCaller :: (Recipient, CallId)
  }
  deriving (Eq, Show, Typeable, Generic)
-- | The 'Recipient' half of a 'CallRef'.
recipient :: CallRef a -> Recipient
recipient (CallRef (target, _)) = target

-- | The 'CallId' half of a 'CallRef'.
tag :: CallRef a -> CallId
tag (CallRef (_, callId)) = callId
-- | No methods are defined here; the instance relies entirely on the class's
-- default method implementations.
instance Binary (CallRef a)

-- | Fully evaluates the wrapped @(Recipient, CallId)@ pair.
instance NFData (CallRef a) where
  rnf = rnf . unCaller
-- | Build a 'CallRef' from a 'Recipient' and a 'CallId'.
makeRef :: Recipient -> CallId -> CallRef a
makeRef = curry CallRef
-- | Envelope for a payload of type @a@; @b@ is the type carried by the
-- accompanying 'CallRef' or 'SendPort', where there is one.
data Message a b
  = CastMessage a
    -- ^ payload only
  | CallMessage a (CallRef b)
    -- ^ payload plus the 'CallRef' identifying the caller
  | ChanMessage a (SendPort b)
    -- ^ payload plus a 'SendPort' of @b@
  deriving (Typeable, Generic)
-- | The 'Recipient' recorded in a 'CallMessage'; 'Nothing' for every other
-- kind of 'Message'.
caller :: forall a b . Message a b -> Maybe Recipient
caller msg = case msg of
  CallMessage _ ref -> Just (recipient ref)
  _                 -> Nothing
-- | For a 'CallMessage', send a 'CallRejected' carrying the given text and the
-- call's tag to the message's 'CallRef'. For any other message this is a
-- no-op.
rejectToCaller :: forall a b .
                  Message a b -> String -> Process ()
rejectToCaller msg reason = case msg of
  CallMessage _ ref -> sendTo ref (CallRejected reason (tag ref))
  _                 -> return ()
-- | No methods are defined here; the instance relies on the class's default
-- method implementations.
instance (Serializable a, Serializable b) => Binary (Message a b)

-- | Forces the payload and, where present, the attached 'CallRef' or
-- 'SendPort'.
instance (NFSerializable a, NFSerializable b) => NFData (Message a b) where
  rnf msg = case msg of
    CastMessage payload      -> rnf payload
    CallMessage payload ref  -> rnf payload `seq` rnf ref
    ChanMessage payload port -> rnf payload `seq` rnf port

deriving instance (Eq a, Eq b) => Eq (Message a b)

deriving instance (Show a, Show b) => Show (Message a b)
-- | A reply value of type @a@ together with the 'CallId' it belongs to.
data CallResponse a
  = CallResponse a CallId
  deriving (Typeable, Generic)

-- | No methods are defined here; the instance relies on the class's default
-- method implementations.
instance Serializable a => Binary (CallResponse a)

-- | Forces both the reply value and the call id.
instance NFSerializable a => NFData (CallResponse a) where
  rnf (CallResponse reply callId) = rnf reply `seq` rnf callId

deriving instance Eq a => Eq (CallResponse a)

deriving instance Show a => Show (CallResponse a)
-- | Rejection notice for a call: a textual reason plus the 'CallId' of the
-- call being rejected.
data CallRejected
  = CallRejected String CallId
  deriving (Typeable, Generic, Show, Eq)

-- | No methods are defined here; the instance relies on the class's default
-- method implementations.
instance Binary CallRejected

-- | Forces both the reason text and the call id.
--
-- 'rnf' is written out explicitly, matching the other 'NFData' instances in
-- this module, so that full evaluation does not depend on which default
-- implementation the installed @deepseq@ provides.
instance NFData CallRejected where
  rnf (CallRejected reason callId) = rnf reason `seq` rnf callId
-- | Resolving a 'CallRef' resolves its 'Recipient'; the call id is ignored.
instance Resolvable (CallRef a) where
  resolve ref = case ref of
    CallRef (target, _) -> resolve target

-- | Sending to a 'CallRef' sends to its 'Recipient'; the call id is ignored.
instance Routable (CallRef a) where
  sendTo (CallRef (target, _)) msg       = sendTo target msg
  unsafeSendTo (CallRef (target, _)) msg = unsafeSendTo target msg
-- | Outcome produced by an 'InitHandler'.
data InitResult s
  = InitOk s Delay
    -- ^ carries a state value and a 'Delay'
    -- (presumably the initial state and timeout - confirm against the caller)
  | InitStop String
    -- ^ carries a textual reason
  | InitIgnore
    -- ^ carries no data
  deriving (Typeable)
-- | An optional integer bound.
type Limit = Maybe Int
-- | Priority queue of raw messages keyed by an 'Int'.
type Queue = PriorityQ Int P.Message
-- | Maps each 'TimerKey' to a 'Timer' and the raw message stored alongside it.
type TimerMap = Map TimerKey (Timer, P.Message)
-- | Record bundling a process definition, its priorities and filters, timeout
-- and timer bookkeeping, an internal queue, and the user state @s@.
data ProcessState s = ProcessState
  { timeoutSpec :: RecvTimeoutPolicy
  , procDef     :: ProcessDefinition s
  , procPrio    :: [DispatchPriority s]
  , procFilters :: [DispatchFilter s]
  , usrTimeout  :: Delay
  , sysTimeout  :: Timer
  , usrTimers   :: TimerMap
  , internalQ   :: Queue
  , procState   :: s
  }

-- | A mutable reference to a 'ProcessState'.
type State s = IORef (ProcessState s)
-- | A 'ST.StateT' layer over 'Process' whose state component is the
-- 'State' reference (an 'IORef' holding the 'ProcessState').
newtype GenProcess s a = GenProcess
  { unManaged :: ST.StateT (State s) Process a
  }
  deriving
    ( Functor
    , Monad
    , ST.MonadState (State s)
    , MonadIO
    , MonadFix
    , Typeable
    , Applicative
    )
-- | Exceptions are thrown in the underlying 'Process' monad and lifted.
instance forall s . MonadThrow (GenProcess s) where
  throwM err = lift (Catch.throwM err)
-- | Runs both the guarded action and the handler in 'Process', each started
-- from the state reference that was current when 'catch' was entered. The
-- state value returned by 'runProcess' is discarded.
instance forall s . MonadCatch (GenProcess s) where
  catch body handler = do
    ref <- ST.get
    lift $ fst <$> Catch.catch (runProcess ref body) (runProcess ref . handler)
-- | Masking is delegated to the underlying 'Process' monad. The masked body
-- is run via 'runProcess' from the current state reference, and the restore
-- function handed to the body is the 'Process'-level restore, re-lifted so
-- that it accepts 'GenProcess' actions. State values returned by 'runProcess'
-- are discarded.
instance forall s . MonadMask (GenProcess s) where
  mask body = do
      ref <- ST.get
      lift $ mask $ \unmask ->
        fst <$> runProcess ref (body (relift unmask))
    where
      -- Wrap a Process-level restore so it can be applied to a GenProcess
      -- action, using whatever state reference is current at that point.
      relift unmask inner = do
        ref' <- ST.get
        fst <$> lift (unmask (runProcess ref' inner))

  uninterruptibleMask body = do
      ref <- ST.get
      lift $ uninterruptibleMask $ \unmask ->
        fst <$> runProcess ref (body (relift unmask))
    where
      -- Same adapter as in 'mask'.
      relift unmask inner = do
        ref' <- ST.get
        fst <$> lift (unmask (runProcess ref' inner))
#if MIN_VERSION_exceptions(0,10,0)

  -- Only compiled for exceptions >= 0.10.0; delegates to the instance of the
  -- wrapped 'ST.StateT'.
  generalBracket acquire release use = GenProcess $
    generalBracket
      (unManaged acquire)
      (\resource exitCase -> unManaged (release resource exitCase))
      (unManaged . use)
#endif
-- | Run a 'GenProcess' action in 'Process' starting from the given state
-- reference, yielding the result together with the final state reference.
runProcess :: State s -> GenProcess s a -> Process (a, State s)
runProcess ref action = unManaged action `ST.runStateT` ref

-- | Lift a 'Process' action into 'GenProcess'.
lift :: Process a -> GenProcess s a
lift = GenProcess . ST.lift

-- | Lift an 'IO' action into 'GenProcess' (via 'Process').
liftIO :: IO a -> GenProcess s a
liftIO io = lift (P.liftIO io)
-- | The instruction a handler hands back. The descriptions below state only
-- what each constructor carries; how they are interpreted is decided
-- elsewhere.
data ProcessAction s
  = ProcessSkip
    -- ^ no payload
  | ProcessActivity (GenProcess s ())
    -- ^ a 'GenProcess' action with no result
  | ProcessExpression (GenProcess s (ProcessAction s))
    -- ^ a 'GenProcess' action that yields a further 'ProcessAction'
  | ProcessContinue s
    -- ^ a state value
  | ProcessTimeout Delay s
    -- ^ a 'Delay' and a state value
  | ProcessHibernate TimeInterval s
    -- ^ a 'TimeInterval' and a state value
  | ProcessStop ExitReason
    -- ^ an 'ExitReason' only
  | ProcessStopping s ExitReason
    -- ^ a state value and an 'ExitReason'
  | ProcessBecome (ProcessDefinition s) s
    -- ^ a 'ProcessDefinition' and a state value
-- | Result of a call handler: what (if anything) is sent back, paired with the
-- 'ProcessAction' to take.
data ProcessReply r s
  = ProcessReply r (ProcessAction s)
    -- ^ a reply value of type @r@
  | ProcessReject String (ProcessAction s)
    -- ^ a rejection message
  | NoReply (ProcessAction s)
    -- ^ no reply value
-- | A predicate over a state @s@, an input @m@, or both.
data Condition s m
  = Condition (s -> m -> Bool)
    -- ^ predicate over both state and input
  | State (s -> Bool)
    -- ^ predicate over the state only
  | Input (m -> Bool)
    -- ^ predicate over the input only
-- | A state value tagged with how it was obtained at exit.
data ExitState s
  = CleanShutdown s
  | LastKnown s

-- | 'True' only for 'CleanShutdown'.
isCleanShutdown :: ExitState s -> Bool
isCleanShutdown st = case st of
  CleanShutdown _ -> True
  LastKnown _     -> False

-- | Extract the state value, whichever constructor wraps it.
exitState :: ExitState s -> s
exitState st = case st of
  CleanShutdown s -> s
  LastKnown s     -> s
-- | A 'Process' computation yielding a 'ProcessAction'.
type Action s = Process (ProcessAction s)
-- | A 'Process' computation yielding a 'ProcessReply'.
type Reply b s = Process (ProcessReply b s)
-- | Handler taking the state and an input, yielding an 'Action'.
type ActionHandler s a = s -> a -> Action s
-- | Handler taking the state and an input, yielding a 'Reply'.
type CallHandler s a b = s -> a -> Reply b s
-- | Handler taking the input first; the state is supplied afterwards.
type StatelessHandler s a = a -> (s -> Action s)
-- | A 'CallHandler' that is additionally given the 'CallRef' of the call.
type DeferredCallHandler s a b = CallRef b -> CallHandler s a b
-- | Handler taking a 'CallRef' and an input (no state), yielding a 'Reply'.
type StatelessCallHandler s a b = CallRef b -> a -> Reply b s
-- | Same shape as 'ActionHandler'.
type CastHandler s a = ActionHandler s a
-- | Same shape as 'ActionHandler'.
type InfoHandler s a = ActionHandler s a
-- | An 'ActionHandler' that is additionally given a 'SendPort'.
type ChannelHandler s a b = SendPort b -> ActionHandler s a
-- | A 'StatelessHandler' that is additionally given a 'SendPort'.
type StatelessChannelHandler s a b = SendPort b -> StatelessHandler s a
-- | Takes an argument of type @a@ and yields an 'InitResult' in 'Process'.
type InitHandler a s = a -> Process (InitResult s)
-- | Takes the 'ExitState' and the 'ExitReason'; runs in 'Process' for effect.
type ShutdownHandler s = ExitState s -> ExitReason -> Process ()
-- | An 'ActionHandler' whose input is a 'Delay'.
type TimeoutHandler s = ActionHandler s Delay
-- | Both ends of a typed channel carrying @'Message' m ()@ values.
newtype ControlChannel m = ControlChannel
  { unControl :: (SendPort (Message m ()), ReceivePort (Message m ()))
  }

-- | Create a fresh 'ControlChannel' using 'newChan'.
newControlChan :: (Serializable m) => Process (ControlChannel m)
newControlChan = ControlChannel <$> newChan
-- | The sending end of a control channel.
newtype ControlPort m = ControlPort
  { unPort :: SendPort (Message m ())
  }
  deriving (Show)

deriving instance (Serializable m) => Binary (ControlPort m)

-- | Two control ports are equal when their underlying send ports are equal.
instance Eq (ControlPort m) where
  ControlPort lhs == ControlPort rhs = lhs == rhs
-- | Wrap the send port of a 'ControlChannel' as a 'ControlPort'.
channelControlPort :: ControlChannel m
                   -> ControlPort m
channelControlPort = ControlPort . fst . unControl
-- | Verdict produced by a 'DispatchFilter'. Every constructor carries a state
-- value; the descriptions below list any additional payload.
data Filter s
  = FilterOk s
    -- ^ state only
  | FilterSafe s
    -- ^ state only
  | forall m . (Show m) => FilterReject m s
    -- ^ additionally carries a showable value of an arbitrary type
  | FilterSkip s
    -- ^ state only
  | FilterStop s ExitReason
    -- ^ additionally carries an 'ExitReason'
-- | A filter function, in one of four shapes that differ in what the function
-- is given besides the state.
data DispatchFilter s
  = -- | Given a typed 'Message'.
    forall a b . (Serializable a, Serializable b) =>
    FilterApi
      { apiFilter :: s -> Message a b -> Process (Filter s)
      }
  | -- | Given any serializable value.
    forall a . (Serializable a) =>
    FilterAny
      { anyFilter :: s -> a -> Process (Filter s)
      }
  | -- | Given the raw message; the verdict is optional.
    FilterRaw
      { rawFilter :: s -> P.Message -> Process (Maybe (Filter s))
      }
  | -- | Given the state only; the verdict is optional.
    FilterState
      { stateFilter :: s -> Process (Maybe (Filter s))
      }
-- | A handler for typed 'Message's, optionally guarded by a predicate.
data Dispatcher s
  = -- | Unconditional handler.
    forall a b . (Serializable a, Serializable b) =>
    Dispatch
      { dispatch :: s -> Message a b -> Process (ProcessAction s)
      }
  | -- | Handler paired with a predicate over the state and the message.
    forall a b . (Serializable a, Serializable b) =>
    DispatchIf
      { dispatch   :: s -> Message a b -> Process (ProcessAction s)
      , dispatchIf :: s -> Message a b -> Bool
      }
-- | A handler whose input comes from a typed channel or from an 'STM' action.
data ExternDispatcher s
  = -- | Reads 'Message's from a 'ReceivePort'.
    forall a b . (Serializable a, Serializable b) =>
    DispatchCC
      { channel      :: ReceivePort (Message a b)
      , dispatchChan :: s -> Message a b -> Process (ProcessAction s)
      }
  | -- | Reads values from an 'STM' action; also carries two pre-built matches,
    -- one yielding the raw message and one mapping it through a function.
    forall a . (Serializable a) =>
    DispatchSTM
      { stmAction   :: STM a
      , dispatchStm :: s -> a -> Process (ProcessAction s)
      , matchStm    :: Match P.Message
      , matchAnyStm :: forall m . (P.Message -> m) -> Match m
      }
-- | Handler over raw messages; it may decline by yielding 'Nothing'.
data DeferredDispatcher s = DeferredDispatcher
  { dispatchInfo :: s
                 -> P.Message
                 -> Process (Maybe (ProcessAction s))
  }

-- | Like 'DeferredDispatcher', but the handler is additionally given a
-- 'ProcessId'.
data ExitSignalDispatcher s = ExitSignalDispatcher
  { dispatchExit :: s
                 -> ProcessId
                 -> P.Message
                 -> Process (Maybe (ProcessAction s))
  }
-- | Dispatchers that can be turned into a 'Match' yielding a 'ProcessAction'.
class MessageMatcher d where
  matchDispatch :: UnhandledMessagePolicy -> s -> d s -> Match (ProcessAction s)

-- | The policy argument is ignored. 'Dispatch' becomes 'match', 'DispatchIf'
-- becomes 'matchIf' on its predicate.
instance MessageMatcher Dispatcher where
  matchDispatch _ st disp = case disp of
    Dispatch handler        -> match (handler st)
    DispatchIf handler cond -> matchIf (cond st) (handler st)

-- | The policy argument is ignored. 'DispatchCC' becomes 'matchChan',
-- 'DispatchSTM' becomes 'matchSTM'.
instance MessageMatcher ExternDispatcher where
  matchDispatch _ st disp = case disp of
    DispatchCC port handler       -> matchChan port (handler st)
    DispatchSTM action handler _ _ -> matchSTM action (handler st)
-- | Dispatchers that can produce a 'Match' yielding the raw message, or the
-- raw message mapped through a function.
class ExternMatcher d where
  matchExtern :: UnhandledMessagePolicy -> s -> d s -> Match P.Message

  matchMapExtern :: forall m s . UnhandledMessagePolicy
                 -> s -> (P.Message -> m) -> d s -> Match m

-- | The policy and state arguments are ignored. For 'DispatchCC' the value
-- read from the channel is wrapped with 'unsafeWrapMessage'; for 'DispatchSTM'
-- the matches stored in the dispatcher are used.
instance ExternMatcher ExternDispatcher where
  matchExtern _ _ disp = case disp of
    DispatchCC port _         -> matchChan port (return . unsafeWrapMessage)
    DispatchSTM _ _ rawMatch _ -> rawMatch

  matchMapExtern _ _ f disp = case disp of
    DispatchCC port _        -> matchChan port (return . f . unsafeWrapMessage)
    DispatchSTM _ _ _ mkMatch -> mkMatch f
-- | An 'Int' priority tagged with a phantom type @a@.
newtype Priority a = Priority
  { getPrio :: Int
  }
-- | A prioritisation function over raw messages. All three constructors carry
-- the same field; given the state and a message, it may yield an 'Int' paired
-- with a message.
data DispatchPriority s
  = PrioritiseCall
      { prioritise :: s -> P.Message -> Process (Maybe (Int, P.Message))
      }
  | PrioritiseCast
      { prioritise :: s -> P.Message -> Process (Maybe (Int, P.Message))
      }
  | PrioritiseInfo
      { prioritise :: s -> P.Message -> Process (Maybe (Int, P.Message))
      }
-- | Receive-timeout policy, expressed either as an 'Int' or as a
-- 'TimeInterval'.
data RecvTimeoutPolicy
  = RecvMaxBacklog Int
  | RecvTimer TimeInterval
  deriving (Typeable)
-- | A 'ProcessDefinition' together with its priorities, filters and
-- receive-timeout policy.
data PrioritisedProcessDefinition s = PrioritisedProcessDefinition
  { processDef  :: ProcessDefinition s
  , priorities  :: [DispatchPriority s]
  , filters     :: [DispatchFilter s]
  , recvTimeout :: RecvTimeoutPolicy
  }
-- | Policy selector for unhandled messages. Only 'DeadLetter' carries data
-- (a 'ProcessId').
data UnhandledMessagePolicy
  = Terminate
  | DeadLetter ProcessId
  | Log
  | Drop
  deriving (Show, Eq)
-- | The handler lists, timeout and shutdown handlers, and unhandled-message
-- policy that together define a process over state @s@.
data ProcessDefinition s = ProcessDefinition
  { apiHandlers            :: [Dispatcher s]
  , infoHandlers           :: [DeferredDispatcher s]
  , externHandlers         :: [ExternDispatcher s]
  , exitHandlers           :: [ExitSignalDispatcher s]
  , timeoutHandler         :: TimeoutHandler s
  , shutdownHandler        :: ShutdownHandler s
  , unhandledMessagePolicy :: UnhandledMessagePolicy
  }
-- | Start a call: resolve the address, monitor the resolved process, and send
-- it a 'CallMessage' whose 'CallRef' pairs the calling process with the new
-- monitor reference. That 'CallRef' is returned.
--
-- Resolution goes through 'resolveOrDie', which is passed the message prefix
-- below.
initCall :: forall s a b . (Addressable s, Serializable a, Serializable b)
         => s -> a -> Process (CallRef b)
initCall sid msg = do
  target <- resolveOrDie sid "initCall: unresolveable address "
  monRef <- monitor target
  us     <- getSelfPid
  let ref = makeRef (Pid us) monRef
  sendTo target (CallMessage msg ref :: Message a b)
  return ref
-- | Same steps as 'initCall', but the 'CallMessage' is sent with
-- 'unsafeSendTo', and both payload and reply type must be 'NFSerializable'.
unsafeInitCall :: forall s a b . ( Addressable s
                                 , NFSerializable a
                                 , NFSerializable b
                                 )
               => s -> a -> Process (CallRef b)
unsafeInitCall sid msg = do
  target <- resolveOrDie sid "unsafeInitCall: unresolveable address "
  monRef <- monitor target
  us     <- getSelfPid
  let ref = makeRef (Pid us) monRef
  unsafeSendTo target (CallMessage msg ref :: Message a b)
  return ref
-- | Wait for the outcome of a call identified by the given 'CallRef'.
--
-- Three kinds of message are accepted, each only when it carries the call's
-- monitor reference:
--
-- * a 'CallResponse' yields @Right reply@
-- * a 'CallRejected' yields @Left (ExitOther reason)@
-- * a 'ProcessMonitorNotification' yields 'Left' with the shown reason
--
-- With a timeout, 'receiveTimeout' is used and its result returned as is;
-- without one, 'receiveWait' is used and the result wrapped in 'Just'. In
-- both cases the monitor is removed afterwards via 'finally'.
waitResponse :: forall b. (Serializable b)
             => Maybe TimeInterval
             -> CallRef b
             -> Process (Maybe (Either ExitReason b))
waitResponse mTimeout cRef = awaitOutcome `finally` unmonitor mRef
  where
    mRef = snd (unCaller cRef)

    awaitOutcome = case mTimeout of
      Just ti -> receiveTimeout (asTimeout ti) matchers
      Nothing -> Just <$> receiveWait matchers

    matchers =
      [ matchIf isReply     takeReply
      , matchIf isRejection takeRejection
      , matchIf isDown      takeDown
      ]

    isReply :: CallResponse b -> Bool
    isReply (CallResponse _ ref) = ref == mRef

    takeReply :: CallResponse b -> Process (Either ExitReason b)
    takeReply (CallResponse reply _) = return (Right reply)

    isRejection (CallRejected _ ref) = ref == mRef

    takeRejection (CallRejected reason _) = return (Left (ExitOther reason))

    isDown (ProcessMonitorNotification ref _ _) = ref == mRef

    takeDown (ProcessMonitorNotification _ _ reason) =
      return (Left (ExitOther $ show reason))