-- | Static values, serialization dictionaries and closure combinators
-- exported for building 'Closure's of 'Process' actions, together with
-- the remote table that registers the labels they refer to.
module Control.Distributed.Process.Internal.Closure.BuiltIn
(
-- * Remote table
remoteTable
-- * Static dictionaries and associated operations
, staticDecode
, sdictUnit
, sdictProcessId
, sdictSendPort
-- * Some static values
, sndStatic
-- * The CP type and associated combinators
, CP
, idCP
, splitCP
, returnCP
, bindCP
, seqCP
-- * Closure versions of process primitives
, decodeProcessIdStatic
, cpLink
, cpUnlink
, cpRelay
, cpSend
, cpExpect
, cpNewChan
-- * Delayed execution and remote tracing
, cpDelay
, cpEnableTraceRemote
) where
import Data.ByteString.Lazy (ByteString)
import Data.Binary (decode, encode)
import Data.Rank1Typeable (Typeable, ANY, ANY1, ANY2, ANY3, ANY4)
import Data.Rank1Dynamic (toDynamic)
import Control.Distributed.Static
( RemoteTable
, registerStatic
, Static
, staticLabel
, staticApply
, Closure
, closure
, closureApplyStatic
, closureApply
, staticCompose
, staticClosure
)
import Control.Distributed.Process.Serializable
( SerializableDict(..)
, Serializable
)
import Control.Distributed.Process.Internal.Types
( Process
, ProcessId
, SendPort
, ReceivePort
, ProcessMonitorNotification(ProcessMonitorNotification)
)
import Control.Distributed.Process.Internal.Primitives
( link
, unlink
, relay
, send
, expect
, newChan
, monitor
, unmonitor
, match
, matchIf
, receiveWait
)
-- | Extend a remote table with every static value registered by this module.
--
-- Each entry pairs a label with a dynamically typed value; the
-- 'staticLabel' references elsewhere in this module use these same labels.
-- Polymorphic values are annotated at the types 'ANY', 'ANY1' .. 'ANY4'
-- imported from "Data.Rank1Typeable".
--
-- NOTE(review): the label @$enableTraceRemote@ used by
-- 'cpEnableTraceRemote' is not registered here; presumably another
-- module's remote table provides it -- confirm.
remoteTable :: RemoteTable -> RemoteTable
remoteTable rtable = foldr (uncurry registerStatic) rtable entries
  where
    -- Label/value pairs. 'foldr' registers the last entry first, so the
    -- head of the list ends up as the outermost registration.
    entries =
      [ ("$decodeDict",      toDynamic (decodeDict :: SerializableDict ANY -> ByteString -> ANY))
      , ("$sdictUnit",       toDynamic (SerializableDict :: SerializableDict ()))
      , ("$sdictProcessId",  toDynamic (SerializableDict :: SerializableDict ProcessId))
      , ("$sdictSendPort_",  toDynamic (sdictSendPort_ :: SerializableDict ANY -> SerializableDict (SendPort ANY)))
      , ("$returnProcess",   toDynamic (return :: ANY -> Process ANY))
      , ("$seqProcess",      toDynamic ((>>) :: Process ANY1 -> Process ANY2 -> Process ANY2))
      , ("$bindProcess",     toDynamic ((>>=) :: Process ANY1 -> (ANY1 -> Process ANY2) -> Process ANY2))
      , ("$decodeProcessId", toDynamic (decode :: ByteString -> ProcessId))
      , ("$link",            toDynamic link)
      , ("$unlink",          toDynamic unlink)
      , ("$relay",           toDynamic relay)
      , ("$sendDict",        toDynamic (sendDict :: SerializableDict ANY -> ProcessId -> ANY -> Process ()))
      , ("$expectDict",      toDynamic (expectDict :: SerializableDict ANY -> Process ANY))
      , ("$newChanDict",     toDynamic (newChanDict :: SerializableDict ANY -> Process (SendPort ANY, ReceivePort ANY)))
      , ("$cpSplit",         toDynamic (cpSplit :: (ANY1 -> Process ANY3) -> (ANY2 -> Process ANY4) -> (ANY1, ANY2) -> Process (ANY3, ANY4)))
      , ("$snd",             toDynamic (snd :: (ANY1, ANY2) -> ANY2))
      , ("$delay",           toDynamic delay)
      ]

    -- The helpers below pattern match on 'SerializableDict' so that the
    -- instance it carries is in scope for the operation on the right.

    -- Decode a value using the dictionary's instance.
    decodeDict :: forall a. SerializableDict a -> ByteString -> a
    decodeDict SerializableDict = decode

    -- Build the dictionary for @SendPort a@ from the dictionary for @a@.
    sdictSendPort_ :: forall a. SerializableDict a -> SerializableDict (SendPort a)
    sdictSendPort_ SerializableDict = SerializableDict

    -- 'send' with an explicit dictionary argument.
    sendDict :: forall a. SerializableDict a -> ProcessId -> a -> Process ()
    sendDict SerializableDict = send

    -- 'expect' with an explicit dictionary argument.
    expectDict :: forall a. SerializableDict a -> Process a
    expectDict SerializableDict = expect

    -- 'newChan' with an explicit dictionary argument.
    newChanDict :: forall a. SerializableDict a -> Process (SendPort a, ReceivePort a)
    newChanDict SerializableDict = newChan

    -- Run one function on each component of a pair, first component
    -- first, and pair up the results.
    cpSplit :: forall a b c d. (a -> Process c) -> (b -> Process d) -> (a, b) -> Process (c, d)
    cpSplit f g (a, b) =
      f a >>= \c ->
      g b >>= \d ->
      return (c, d)
-- | Turn a static serialization dictionary for @a@ into a static decoder
-- from a 'ByteString' to @a@.
--
-- The result is the static value labelled @$decodeDict@ applied to the
-- given dictionary.
staticDecode :: Typeable a => Static (SerializableDict a) -> Static (ByteString -> a)
staticDecode = staticApply decodeDictStatic
  where
    -- Static reference to the dictionary-taking decoder.
    decodeDictStatic :: Typeable a => Static (SerializableDict a -> ByteString -> a)
    decodeDictStatic = staticLabel "$decodeDict"
-- | Static reference to the serialization dictionary for @()@, looked up
-- under the label @$sdictUnit@.
sdictUnit :: Static (SerializableDict ())
sdictUnit = staticLabel "$sdictUnit"
-- | Static reference to the serialization dictionary for 'ProcessId',
-- looked up under the label @$sdictProcessId@.
sdictProcessId :: Static (SerializableDict ProcessId)
sdictProcessId = staticLabel "$sdictProcessId"
-- | Given a static serialization dictionary for @a@, produce the static
-- serialization dictionary for @'SendPort' a@.
--
-- Applies the static function labelled @$sdictSendPort_@ to the argument.
sdictSendPort :: Typeable a
              => Static (SerializableDict a) -> Static (SerializableDict (SendPort a))
sdictSendPort dict = staticLabel "$sdictSendPort_" `staticApply` dict
-- | Static reference to the label @$snd@, which 'remoteTable' registers
-- as 'snd'.
sndStatic :: Static ((a, b) -> b)
sndStatic = staticLabel "$snd"
-- | @CP a b@ is a 'Closure' of a function that takes an @a@ and yields a
-- 'Process' action returning a @b@.
type CP a b = Closure (a -> Process b)
-- | Static reference to the label @$returnProcess@, which 'remoteTable'
-- registers as 'return' at type @ANY -> Process ANY@.
returnProcessStatic :: Typeable a => Static (a -> Process a)
returnProcessStatic = staticLabel "$returnProcess"
-- | The 'CP' built directly from 'returnProcessStatic': a closure whose
-- function hands its argument back via the static value labelled
-- @$returnProcess@.
idCP :: Typeable a => CP a a
idCP = staticClosure returnProcessStatic
-- | Combine two 'CP's into one that works on pairs.
--
-- The result applies the static function labelled @$cpSplit@ to both
-- closures; 'remoteTable' registers that label as a function which runs
-- the first closure's function on the first component, then the second
-- on the second component, and returns both results as a pair.
splitCP :: (Typeable a, Typeable b, Typeable c, Typeable d)
        => CP a c -> CP b d -> CP (a, b) (c, d)
splitCP onFst onSnd = closureApply (closureApplyStatic splitStatic onFst) onSnd
  where
    -- Static reference to the pair-splitting combinator.
    splitStatic :: Static ((a -> Process c) -> (b -> Process d) -> (a, b) -> Process (c, d))
    splitStatic = staticLabel "$cpSplit"
-- | Closure of a 'Process' action that returns the given value.
--
-- The value is carried in the closure's environment in its 'encode'd
-- form; the closure's static part decodes it with 'staticDecode' (using
-- the supplied dictionary) and passes it to 'returnProcessStatic'.
returnCP :: forall a. Serializable a
         => Static (SerializableDict a) -> a -> Closure (Process a)
returnCP dict = closure decodeThenReturn . encode
  where
    -- Decode the environment, then return the decoded value.
    decodeThenReturn :: Static (ByteString -> Process a)
    decodeThenReturn = staticCompose returnProcessStatic (staticDecode dict)
-- | Sequence two closures of 'Process' actions, keeping the result of the
-- second.
--
-- Built by applying the static function labelled @$seqProcess@ (which
-- 'remoteTable' registers as '>>') to both closures.
seqCP :: (Typeable a, Typeable b)
      => Closure (Process a) -> Closure (Process b) -> Closure (Process b)
seqCP firstCP secondCP =
    closureApply (closureApplyStatic seqStatic firstCP) secondCP
  where
    -- Static reference to sequencing in 'Process'.
    seqStatic :: (Typeable a, Typeable b)
              => Static (Process a -> Process b -> Process b)
    seqStatic = staticLabel "$seqProcess"
-- | Feed the result of a closure of a 'Process' action into a 'CP'.
--
-- Built by applying the static function labelled @$bindProcess@ (which
-- 'remoteTable' registers as '>>=') to both closures.
bindCP :: forall a b. (Typeable a, Typeable b)
       => Closure (Process a) -> CP a b -> Closure (Process b)
bindCP producer consumer =
    closureApply (closureApplyStatic bindStatic producer) consumer
  where
    -- Static reference to monadic bind in 'Process'.
    bindStatic :: (Typeable a, Typeable b)
               => Static (Process a -> (a -> Process b) -> Process b)
    bindStatic = staticLabel "$bindProcess"
-- | Static reference to the label @$decodeProcessId@, which 'remoteTable'
-- registers as 'decode' at type @ByteString -> ProcessId@.
decodeProcessIdStatic :: Static (ByteString -> ProcessId)
decodeProcessIdStatic = staticLabel "$decodeProcessId"
-- | Closure of the action that applies the static function labelled
-- @$link@ (registered by 'remoteTable' as 'link') to the given process id.
--
-- The process id travels in the closure's environment in 'encode'd form
-- and is decoded via 'decodeProcessIdStatic'.
cpLink :: ProcessId -> Closure (Process ())
cpLink pid = closure decoder (encode pid)
  where
    -- Decode the process id, then hand it to the linked static function.
    decoder :: Static (ByteString -> Process ())
    decoder = staticCompose linkStatic decodeProcessIdStatic

    linkStatic :: Static (ProcessId -> Process ())
    linkStatic = staticLabel "$link"
-- | Closure of the action that applies the static function labelled
-- @$unlink@ (registered by 'remoteTable' as 'unlink') to the given
-- process id.
--
-- The process id travels in the closure's environment in 'encode'd form
-- and is decoded via 'decodeProcessIdStatic'.
cpUnlink :: ProcessId -> Closure (Process ())
cpUnlink pid = closure decoder (encode pid)
  where
    -- Decode the process id, then hand it to the static function.
    decoder :: Static (ByteString -> Process ())
    decoder = staticCompose unlinkStatic decodeProcessIdStatic

    unlinkStatic :: Static (ProcessId -> Process ())
    unlinkStatic = staticLabel "$unlink"
-- | 'CP' that sends its argument to the given process.
--
-- The static part applies the function labelled @$sendDict@ (registered
-- by 'remoteTable' as 'send' with an explicit dictionary) to the supplied
-- dictionary, composed with 'decodeProcessIdStatic'; the target process
-- id is stored in the closure's environment in 'encode'd form.
cpSend :: forall a. Typeable a
       => Static (SerializableDict a) -> ProcessId -> CP a ()
cpSend dict = closure decoder . encode
  where
    -- Decode the target process id, yielding the function awaiting the
    -- message to send.
    decoder :: Static (ByteString -> a -> Process ())
    decoder = staticCompose (staticApply sendDictStatic dict) decodeProcessIdStatic

    sendDictStatic :: Typeable a
                   => Static (SerializableDict a -> ProcessId -> a -> Process ())
    sendDictStatic = staticLabel "$sendDict"
-- | Closure of the action obtained by applying the static function
-- labelled @$expectDict@ (registered by 'remoteTable' as 'expect' with an
-- explicit dictionary) to the supplied dictionary.
cpExpect :: Typeable a => Static (SerializableDict a) -> Closure (Process a)
cpExpect = staticClosure . staticApply expectDictStatic
  where
    -- Static reference to the dictionary-taking 'expect'.
    expectDictStatic :: Typeable a => Static (SerializableDict a -> Process a)
    expectDictStatic = staticLabel "$expectDict"
-- | Closure of the action obtained by applying the static function
-- labelled @$newChanDict@ (registered by 'remoteTable' as 'newChan' with
-- an explicit dictionary) to the supplied dictionary.
cpNewChan :: Typeable a
          => Static (SerializableDict a)
          -> Closure (Process (SendPort a, ReceivePort a))
cpNewChan = staticClosure . staticApply newChanDictStatic
  where
    -- Static reference to the dictionary-taking 'newChan'.
    newChanDictStatic :: Typeable a
                      => Static (SerializableDict a -> Process (SendPort a, ReceivePort a))
    newChanDictStatic = staticLabel "$newChanDict"
-- | Closure of the action that applies the static function labelled
-- @$relay@ (registered by 'remoteTable' as 'relay') to the given
-- process id.
--
-- The process id travels in the closure's environment in 'encode'd form
-- and is decoded via 'decodeProcessIdStatic'.
cpRelay :: ProcessId -> Closure (Process ())
cpRelay pid = closure decoder (encode pid)
  where
    -- Decode the process id, then hand it to the static function.
    decoder :: Static (ByteString -> Process ())
    decoder = staticCompose relayStatic decodeProcessIdStatic

    relayStatic :: Static (ProcessId -> Process ())
    relayStatic = staticLabel "$relay"
-- | Closure of the action that applies the static function labelled
-- @$enableTraceRemote@ to the given process id.
--
-- The process id travels in the closure's environment in 'encode'd form
-- and is decoded via 'decodeProcessIdStatic'.
--
-- NOTE(review): this module's 'remoteTable' does not register
-- @$enableTraceRemote@; presumably another module's remote table does --
-- confirm, otherwise resolving this closure cannot succeed.
cpEnableTraceRemote :: ProcessId -> Closure (Process ())
cpEnableTraceRemote pid = closure decoder (encode pid)
  where
    -- Decode the process id, then hand it to the static function.
    decoder :: Static (ByteString -> Process ())
    decoder = staticCompose enableTraceStatic decodeProcessIdStatic

    enableTraceStatic :: Static (ProcessId -> Process ())
    enableTraceStatic = staticLabel "$enableTraceRemote"
-- | Monitor @them@, then wait for one of two messages:
--
-- * a @()@ message: stop monitoring and run @p@;
--
-- * a 'ProcessMonitorNotification' carrying the reference returned by
--   that 'monitor' call: return without running @p@.
delay :: ProcessId -> Process () -> Process ()
delay them p = do
    ref <- monitor them
    receiveWait
      [ match (\() -> unmonitor ref >> p)
      , matchIf (notifies ref) (\_ -> return ())
      ]
  where
    -- True when the notification belongs to the given monitor reference.
    notifies ref (ProcessMonitorNotification ref' _ _) = ref == ref'
-- | Closure version of 'delay'.
--
-- The process id is stored in 'encode'd form in a closure over the static
-- function labelled @$delay@ (registered by 'remoteTable' as 'delay');
-- that closure is then applied to the closure of the action to run.
cpDelay :: ProcessId -> Closure (Process ()) -> Closure (Process ())
cpDelay pid = closureApply delayClosure
  where
    -- 'delay' partially applied to the (encoded) process id.
    delayClosure :: Closure (Process () -> Process ())
    delayClosure = closure decoder (encode pid)

    -- Decode the process id, then hand it to the static 'delay'.
    decoder :: Static (ByteString -> Process () -> Process ())
    decoder = staticCompose delayStatic decodeProcessIdStatic

    delayStatic :: Static (ProcessId -> Process () -> Process ())
    delayStatic = staticLabel "$delay"