module Control.Distributed.Process.Extras.Internal.Types
(
Tag
, TagPool
, newTagPool
, getTag
, Linkable(..)
, Killable(..)
, Resolvable(..)
, Routable(..)
, Addressable
, sendToRecipient
, Recipient(..)
, RegisterSelf(..)
, whereisRemote
, resolveOrDie
, CancelWait(..)
, Channel
, Shutdown(..)
, ExitReason(..)
, ServerDisconnected(..)
, NFSerializable
, __remoteTable
) where
import Control.Concurrent.MVar
( MVar
, newMVar
, modifyMVar
)
import Control.DeepSeq (NFData(..), ($!!))
import Control.Distributed.Process hiding (send)
import qualified Control.Distributed.Process as P
( send
, unsafeSend
, unsafeNSend
)
import Control.Distributed.Process.Closure
( remotable
, mkClosure
, functionTDict
)
import Control.Distributed.Process.Serializable
import Data.Binary
import Data.Typeable (Typeable)
import GHC.Generics
class (NFData a, Serializable a) => NFSerializable a
instance (NFData a, Serializable a) => NFSerializable a
type Tag = Int
type TagPool = MVar Tag
newTagPool :: Process TagPool
newTagPool = liftIO $ newMVar 0
getTag :: TagPool -> Process Tag
getTag tp = liftIO $ modifyMVar tp (\tag -> return (tag+1,tag))
data CancelWait = CancelWait
deriving (Eq, Show, Typeable, Generic)
instance Binary CancelWait where
instance NFData CancelWait where
type Channel a = (SendPort a, ReceivePort a)
data RegisterSelf = RegisterSelf
deriving (Typeable, Generic)
instance Binary RegisterSelf where
instance NFData RegisterSelf where
data Shutdown = Shutdown
deriving (Typeable, Generic, Show, Eq)
instance Binary Shutdown where
instance NFData Shutdown where
data ExitReason =
ExitNormal
| ExitShutdown
| ExitOther !String
deriving (Typeable, Generic, Eq, Show)
instance Binary ExitReason where
instance NFData ExitReason where
data Recipient =
Pid !ProcessId
| Registered !String
| RemoteRegistered !String !NodeId
deriving (Typeable, Generic, Show, Eq)
instance Binary Recipient where
instance NFData Recipient where
rnf (Pid p) = rnf p `seq` ()
rnf (Registered s) = rnf s `seq` ()
rnf (RemoteRegistered s n) = rnf s `seq` rnf n `seq` ()
data ServerDisconnected = ServerDisconnected !DiedReason
deriving (Typeable, Generic)
instance Binary ServerDisconnected where
instance NFData ServerDisconnected where
$(remotable ['whereis])
whereisRemote :: NodeId -> String -> Process (Maybe ProcessId)
whereisRemote node name =
call $(functionTDict 'whereis) node ($(mkClosure 'whereis) name)
sendToRecipient :: (Serializable m) => Recipient -> m -> Process ()
sendToRecipient (Pid p) m = P.send p m
sendToRecipient (Registered s) m = nsend s m
sendToRecipient (RemoteRegistered s n) m = nsendRemote n s m
unsafeSendToRecipient :: (NFSerializable m) => Recipient -> m -> Process ()
unsafeSendToRecipient (Pid p) m = P.unsafeSend p $!! m
unsafeSendToRecipient (Registered s) m = P.unsafeNSend s $!! m
unsafeSendToRecipient (RemoteRegistered s n) m = nsendRemote n s m
baseAddressableErrorMessage :: (Routable a) => a -> String
baseAddressableErrorMessage _ = "CannotResolveAddressable"
class Linkable a where
linkTo :: a -> Process ()
class Resolvable a where
resolve :: a -> Process (Maybe ProcessId)
class Killable a where
killProc :: a -> String -> Process ()
exitProc :: (Serializable m) => a -> m -> Process ()
instance Killable ProcessId where
killProc = kill
exitProc = exit
instance Resolvable r => Killable r where
killProc r s = resolve r >>= maybe (return ()) (flip kill $ s)
exitProc r m = resolve r >>= maybe (return ()) (flip exit $ m)
class Routable a where
sendTo :: (Serializable m) => a -> m -> Process ()
unsafeSendTo :: (NFSerializable m) => a -> m -> Process ()
unresolvableMessage :: a -> String
unresolvableMessage = baseAddressableErrorMessage
instance (Resolvable a) => Routable a where
sendTo a m = do
mPid <- resolve a
maybe (die (unresolvableMessage a))
(\p -> P.send p m)
mPid
unsafeSendTo a m = do
mPid <- resolve a
maybe (die (unresolvableMessage a))
(\p -> P.unsafeSend p $!! m)
mPid
unresolvableMessage = baseAddressableErrorMessage
instance Resolvable Recipient where
resolve (Pid p) = return (Just p)
resolve (Registered n) = whereis n
resolve (RemoteRegistered s n) = whereisRemote n s
instance Routable Recipient where
sendTo = sendToRecipient
unsafeSendTo = unsafeSendToRecipient
unresolvableMessage (Pid p) = unresolvableMessage p
unresolvableMessage (Registered n) = unresolvableMessage n
unresolvableMessage (RemoteRegistered s n) = unresolvableMessage (n, s)
instance Resolvable ProcessId where
resolve p = return (Just p)
instance Routable ProcessId where
sendTo = P.send
unsafeSendTo pid msg = P.unsafeSend pid $!! msg
unresolvableMessage p = "CannotResolvePid[" ++ (show p) ++ "]"
instance Resolvable String where
resolve = whereis
instance Routable String where
sendTo = nsend
unsafeSendTo name msg = P.unsafeNSend name $!! msg
unresolvableMessage s = "CannotResolveRegisteredName[" ++ s ++ "]"
instance Resolvable (NodeId, String) where
resolve (nid, pname) = whereisRemote nid pname
instance Routable (NodeId, String) where
sendTo (nid, pname) msg = nsendRemote nid pname msg
unsafeSendTo = sendTo
unresolvableMessage (n, s) =
"CannotResolveRemoteRegisteredName[name: " ++ s ++ ", node: " ++ (show n) ++ "]"
instance Routable (Message -> Process ()) where
sendTo f = f . wrapMessage
unsafeSendTo f = f . unsafeWrapMessage
class (Resolvable a, Routable a) => Addressable a
instance (Resolvable a, Routable a) => Addressable a
resolveOrDie :: (Routable a, Resolvable a) => a -> String -> Process ProcessId
resolveOrDie resolvable failureMsg = do
result <- resolve resolvable
case result of
Nothing -> die $ failureMsg ++ " " ++ unresolvableMessage resolvable
Just pid -> return pid