-- | A mailbox is a locally spawned process that buffers posted messages, up
-- to a configurable 'Limit', on behalf of an owning process. The owner can ask
-- for a 'NewMail' notification ('notify') or for filtered delivery of the
-- buffered messages ('active', 'deliver'); otherwise mail is simply held.
--
-- NOTE(review): this module relies on CPP, TemplateHaskell, RecordWildCards,
-- ScopedTypeVariables, DeriveGeneric and DeriveDataTypeable, but no LANGUAGE
-- pragmas are visible here; presumably they are enabled by the build
-- configuration. TODO confirm.
module Control.Distributed.Process.Execution.Mailbox
(
Mailbox()
, startMailbox
, startSupervised
, startSupervisedMailbox
, createMailbox
, resize
, statistics
, monitor
, Limit
, BufferType(..)
, MailboxStats(..)
, post
, notify
, deliver
, active
, NewMail(..)
, Delivery(..)
, FilterResult(..)
, acceptEverything
, acceptMatching
, __remoteTable
) where
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan
( TChan
, newBroadcastTChanIO
, dupTChan
, readTChan
, writeTChan
)
import Control.Distributed.Process hiding (call, monitor)
import qualified Control.Distributed.Process as P (monitor)
import Control.Distributed.Process.Closure
( remotable
, mkStaticClosure
)
import Control.Distributed.Process.Serializable hiding (SerializableDict)
import Control.Distributed.Process.Extras.Internal.Types
( ExitReason(..)
, Resolvable(..)
, Routable(..)
, Linkable(..)
)
import Control.Distributed.Process.ManagedProcess
( call
, sendControlMessage
, channelControlPort
, handleControlChan
, handleInfo
, handleRaw
, continue
, defaultProcess
, UnhandledMessagePolicy(..)
, InitHandler
, InitResult(..)
, ProcessAction
, ProcessDefinition(..)
, ControlChannel
, ControlPort
)
import qualified Control.Distributed.Process.ManagedProcess as MP
( chanServe
)
import Control.Distributed.Process.ManagedProcess.Server
( stop
)
import Control.Distributed.Process.ManagedProcess.Server.Restricted as Restricted
( getState
, Result
, RestrictedProcess
)
import qualified Control.Distributed.Process.ManagedProcess.Server.Restricted as Restricted
( handleCall
, reply
)
import Control.Distributed.Process.Supervisor (SupervisorPid)
import Control.Distributed.Process.Extras.Time
import Control.Exception (SomeException)
import Data.Accessor
( Accessor
, accessor
, (^:)
, (.>)
, (^=)
, (^.)
)
import Data.Binary
import qualified Data.Foldable as Foldable
import Data.Sequence
( Seq
, ViewL(EmptyL, (:<))
, ViewR(EmptyR, (:>))
, (<|)
, (|>)
)
import qualified Data.Sequence as Seq
import Data.Typeable (Typeable)
import GHC.Generics
#if ! MIN_VERSION_base(4,6,0)
import Prelude hiding (catch, drop)
#else
import Prelude hiding (drop)
#endif
-- | Handle to a mailbox process. The constructor is not exported, so client
-- code can only obtain one from the start functions in this module.
data Mailbox = Mailbox
  { pid   :: !ProcessId
    -- ^ process id of the mailbox process
  , cchan :: !(ControlPort ControlMessage)
    -- ^ port over which 'ControlMessage's are sent to the mailbox
  } deriving (Typeable, Generic, Eq)

-- Serialisation uses the class defaults (the type derives 'Generic').
instance Binary Mailbox where

-- | Rendered as @Mailbox:@ followed by the mailbox's process id.
instance Show Mailbox where
  show mb = "Mailbox:" ++ show (pid mb)

-- | Linking to a mailbox links to its underlying process.
instance Linkable Mailbox where
  linkTo mb = link (pid mb)

-- | A mailbox always resolves to its own process id.
instance Resolvable Mailbox where
  resolve mb = return (Just (pid mb))

-- | Both the safe and the unsafe send go through 'post'.
instance Routable Mailbox where
  sendTo mb msg       = post mb msg
  unsafeSendTo mb msg = post mb msg
-- | Send a control message to a mailbox over its control port.
sendCtrlMsg :: Mailbox -> ControlMessage -> Process ()
sendCtrlMsg mb = sendControlMessage (cchan mb)
-- | Selects what is sacrificed when mail arrives for a buffer that is
-- already at its limit.
data BufferType
  = Queue
    -- ^ evict from the end of the buffer that 'pop' reads from, i.e. the
    -- message that has been buffered the longest
  | Stack
    -- ^ evict the most recently pushed message
  | Ring
    -- ^ do not store the incoming message; only the dropped counter is bumped
  deriving (Typeable, Eq, Show)
-- | Maximum number of messages a mailbox will hold.
type Limit = Integer
-- | A closure over the predicate applied to each buffered message when the
-- mailbox delivers in active mode.
type Filter = Closure (Message -> Process FilterResult)
-- | Notification sent to the owning process by a mailbox in notify mode. It
-- carries the mailbox handle and the number of messages currently buffered.
data NewMail = NewMail !Mailbox !Integer
  deriving (Typeable, Generic, Show)

-- Serialisation uses the class defaults (the type derives 'Generic').
instance Binary NewMail where
-- | Batch of buffered messages sent to the owning process when the mailbox
-- delivers in active mode.
data Delivery = Delivery
  { box          :: Mailbox
    -- ^ the mailbox the batch came from
  , messages     :: [Message]
    -- ^ the messages the filter chose to keep
  , count        :: Integer
    -- ^ number of messages in 'messages'
  , totalDropped :: Integer
    -- ^ messages skipped by the filter plus messages previously dropped by
    -- the buffer
  }
  deriving (Typeable, Generic)

-- Serialisation uses the class defaults (the type derives 'Generic').
instance Binary Delivery where
-- | Snapshot of a mailbox's bookkeeping, as returned by 'statistics'.
data MailboxStats = MailboxStats
  { pendingMessages :: Integer
    -- ^ buffered message count
  , droppedMessages :: Integer
    -- ^ dropped message count
  , currentLimit    :: Limit
    -- ^ the limit currently in force
  , owningProcess   :: ProcessId
    -- ^ the process the mailbox was started for
  } deriving (Typeable, Generic, Show)

-- Serialisation uses the class defaults (the type derives 'Generic').
instance Binary MailboxStats where
-- | Internal envelope used by 'post' to hand a wrapped message to the
-- mailbox process.
data Post = Post !Message
  deriving (Typeable, Generic)

-- Serialisation uses the class defaults (the type derives 'Generic').
instance Binary Post where
-- | Internal request issued by 'statistics'.
data StatsReq = StatsReq
  deriving (Typeable, Generic)

-- Serialisation uses the class defaults (the type derives 'Generic').
instance Binary StatsReq where
-- | Verdict a filter gives for one buffered message during delivery.
data FilterResult
  = Keep
    -- ^ remove the message from the buffer and include it in the delivery
  | Skip
    -- ^ remove the message from the buffer without delivering it
  | Send
    -- ^ stop filtering; this message and those behind it stay buffered
  deriving (Typeable, Generic)

-- Serialisation uses the class defaults (the type derives 'Generic').
instance Binary FilterResult
-- | Delivery mode a mailbox is operating in.
data Mode
  = Active !Filter
    -- ^ buffered mail is run through the filter and delivered to the owner
  | Notify
    -- ^ the owner is sent a 'NewMail' notification
  | Passive
    -- ^ mail is only buffered
  deriving (Typeable, Generic)

-- Serialisation uses the class defaults (the type derives 'Generic').
instance Binary Mode where

-- | Shows the constructor name only; the filter closure is not rendered.
instance Show Mode where
  show m = case m of
    Active _ -> "Active"
    Notify   -> "Notify"
    Passive  -> "Passive"
-- | Messages accepted on a mailbox's control channel.
data ControlMessage
  = Resize !Integer
    -- ^ change the buffer limit
  | SetActiveMode !Mode
    -- ^ switch the delivery mode
  deriving (Typeable, Generic)

-- Serialisation uses the class defaults (the type derives 'Generic').
instance Binary ControlMessage where
-- | Operations a bounded message buffer must provide.
class Buffered a where
  -- | The buffering discipline in use.
  tag    :: a -> BufferType
  -- | Add a message to the buffer.
  push   :: Message -> a -> a
  -- | Remove the next message, if there is one.
  pop    :: a -> Maybe (Message, a)
  -- | Apply a new limit to the buffer.
  adjust :: Limit -> a -> a
  -- | Discard the given number of messages.
  drop   :: Integer -> a -> a
-- | Bookkeeping kept alongside the buffered messages.
data BufferState = BufferState
  { _mode    :: Mode
    -- ^ current delivery mode
  , _bufferT :: BufferType
    -- ^ buffering discipline
  , _limit   :: Limit
    -- ^ maximum number of buffered messages
  , _size    :: Integer
    -- ^ number of messages currently buffered
  , _dropped :: Integer
    -- ^ number of messages dropped so far
  , _owner   :: ProcessId
    -- ^ process that notifications and deliveries are sent to
  , ctrlChan :: ControlPort ControlMessage
    -- ^ the mailbox's own control port, embedded in outgoing 'Mailbox' handles
  }
-- | Initial bookkeeping for a new mailbox: passive mode, nothing buffered and
-- nothing dropped.
defaultState :: BufferType
             -> Limit
             -> ProcessId
             -> ControlPort ControlMessage
             -> BufferState
defaultState bufferT limit' pid cc =
  -- positional order: mode, buffer type, limit, size, dropped, owner, port
  BufferState Passive bufferT limit' 0 0 pid cc
-- | Server state of the mailbox process: the buffered messages together with
-- their bookkeeping.
data State = State
  { _buffer :: Seq Message
    -- ^ the buffered messages
  , _state  :: BufferState
    -- ^ mode, limits and counters
  }
-- | Buffer operations for the mailbox 'State'.
--
-- Messages are prepended on the left and read back from the right. The
-- @size@ and @dropped@ counters are updated together with every structural
-- change, so @size@ always mirrors the length of the buffer.
instance Buffered State where
  tag = _bufferT . _state

  push m = (state .> size ^: (+1)) . (buffer ^: (m <|))

  -- Take the rightmost message and decrement the size by one.
  pop q = case getR (q ^. buffer) of
    Just (rest :> m) -> Just (m, (buffer ^= rest)
                               . (state .> size ^: subtract 1)
                               $ q)
    _                -> Nothing

  -- Record the new limit; if more messages are held than it allows, drop the
  -- surplus. The size is left to 'drop' to maintain, so it cannot be forced
  -- below zero by a negative limit.
  adjust newLimit q = (state .> limit ^= newLimit) shrunk
    where
      held = q ^. state ^. size
      shrunk
        | held > newLimit = drop (held - newLimit) q
        | otherwise       = q

  -- Discard @n@ messages, one at a time. A queue loses messages from the
  -- right end (the end 'pop' reads from); every other buffer type loses them
  -- from the left end (the most recently pushed). Non-positive counts and
  -- empty buffers are left untouched.
  drop n q
    | n <= 0    = q
    | n > 1     = drop (n - 1) (drop 1 q)
    | isQueue q = dropR q
    | otherwise = dropL q
    where
      dropR q' = case getR (q' ^. buffer) of
        Just (rest :> _) -> dropOne q' rest
        _                -> q'
      dropL q' = case getL (q' ^. buffer) of
        Just (_ :< rest) -> dropOne q' rest
        _                -> q'
      -- install the shortened buffer and account for the lost message
      dropOne q' rest = ( (buffer ^= rest)
                        . (state .> size ^: subtract 1)
                        . (state .> dropped ^: (+1))
                        $ q' )
-- | Start a mailbox owned by the calling process.
createMailbox :: BufferType -> Limit -> Process Mailbox
createMailbox buffT maxSz = do
  self <- getSelfPid
  startMailbox self buffT maxSz
-- | Start a mailbox for the given owner, without linking it to a supervisor.
startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox
startMailbox owningPid buffT maxSz =
  doStartMailbox Nothing owningPid buffT maxSz
-- | Like 'startSupervisedMailbox', but returns the mailbox's process id
-- together with the 'Mailbox' handle wrapped as a 'Message'.
startSupervised :: ProcessId
                -> BufferType
                -> Limit
                -> SupervisorPid
                -> Process (ProcessId, Message)
startSupervised p b l s =
  startSupervisedMailbox p b l s >>= \mb ->
    return (pid mb, unsafeWrapMessage mb)
-- | Start a mailbox for the given owner; the mailbox process links itself to
-- the supplied supervisor before it begins serving.
startSupervisedMailbox :: ProcessId
                       -> BufferType
                       -> Limit
                       -> SupervisorPid
                       -> Process Mailbox
startSupervisedMailbox p b l s =
  let supervisor = Just s
  in doStartMailbox supervisor p b l
-- | Spawn the mailbox process locally and build its 'Mailbox' handle.
--
-- A broadcast channel is shared with the new process; this function blocks on
-- its own duplicate of that channel until the mailbox publishes its control
-- port. When a supervisor pid is given, the new process links to it before
-- running the mailbox.
doStartMailbox :: Maybe SupervisorPid
               -> ProcessId
               -> BufferType
               -> Limit
               -> Process Mailbox
doStartMailbox mSp p b l = do
  bchan <- liftIO newBroadcastTChanIO
  -- duplicate before spawning so the port written by the child is not missed
  rchan <- liftIO (atomically (dupTChan bchan))
  mboxPid <- spawnLocal $ do
    maybe (return ()) link mSp
    runMailbox bchan p b l
  cc <- liftIO (atomically (readTChan rchan))
  return (Mailbox mboxPid cc)
-- | Body of the mailbox process: link to the owner, then serve.
--
-- The broadcast channel is duplicated for the init handler, which reads the
-- control port from its copy, while the process definition writes the port to
-- the original channel.
runMailbox :: TChan (ControlPort ControlMessage)
           -> ProcessId
           -> BufferType
           -> Limit
           -> Process ()
runMailbox tc pid buffT maxSz = do
  link pid
  initChan <- liftIO (atomically (dupTChan tc))
  let initArgs = (pid, buffT, maxSz)
  MP.chanServe initArgs (mboxInit initChan) (processDefinition pid tc)
-- | Init handler: waits for the control port to appear on the channel, then
-- starts with an empty buffer, the default bookkeeping and no timeout.
mboxInit :: TChan (ControlPort ControlMessage)
         -> InitHandler (ProcessId, BufferType, Limit) State
mboxInit tc (pid, buffT, maxSz) = do
  cc <- liftIO (atomically (readTChan tc))
  let initial = State Seq.empty (defaultState buffT maxSz pid cc)
  return (InitOk initial Infinity)
-- | Monitor the process behind a mailbox.
monitor :: Mailbox -> Process MonitorRef
monitor mb = P.monitor (pid mb)
-- | Ask the mailbox to switch to notify mode.
notify :: Mailbox -> Process ()
notify mb = sendCtrlMsg mb (SetActiveMode Notify)
-- | Ask the mailbox to switch to active mode using the given filter.
active :: Mailbox -> Filter -> Process ()
active mb f = sendCtrlMsg mb (SetActiveMode (Active f))
-- | Ask the mailbox to change its buffer limit.
resize :: Mailbox -> Integer -> Process ()
resize mb sz = sendCtrlMsg mb (Resize sz)
-- | Post a message to a mailbox. The payload is wrapped as a 'Message' and
-- sent to the mailbox process inside a 'Post' envelope.
post :: Serializable a => Mailbox -> a -> Process ()
post mb m = send (pid mb) (Post (wrapMessage m))
-- | Synchronously fetch the mailbox's current 'MailboxStats'.
statistics :: Mailbox -> Process MailboxStats
statistics mb = mb `call` StatsReq
-- | Filter that answers 'Keep' for every message.
everything :: Message -> Process FilterResult
everything = const (return Keep)
-- | Filter that resolves the given closure and applies it to the message via
-- 'handleMessage'. When 'handleMessage' yields no result the message is
-- answered with 'Skip'.
matching :: Closure (Message -> Process FilterResult)
         -> Message
         -> Process FilterResult
matching predicate msg = do
  pred' <- unClosure predicate
  outcome <- handleMessage msg pred'
  return (maybe Skip id outcome)
-- | Build the mailbox's process definition.
--
-- The control port for the supplied control channel is first published on
-- the channel so that the starter (and the init handler) can pick it up.
-- Unhandled messages are dead-lettered to the given process id.
processDefinition :: ProcessId
                  -> TChan (ControlPort ControlMessage)
                  -> ControlChannel ControlMessage
                  -> Process (ProcessDefinition State)
processDefinition pid tc cc = do
  liftIO (atomically (writeTChan tc (channelControlPort cc)))
  let definition =
        defaultProcess
          { apiHandlers =
              [ handleControlChan cc handleControlMessages
              , Restricted.handleCall handleGetStats
              ]
          , infoHandlers =
              [ handleInfo handlePost
              , handleRaw handleRawInputs
              ]
          , unhandledMessagePolicy = DeadLetter pid
          } :: ProcessDefinition State
  return definition
-- | Handle a message arriving on the control channel.
--
-- 'Resize' adjusts the buffer to the new limit. 'SetActiveMode' records the
-- new mode; if mail is already buffered it also acts on it straight away,
-- notifying the owner for 'Notify' or delivering for 'Active'. Switching to
-- 'Passive' while mail is buffered terminates the process with
-- @"IllegalState"@.
handleControlMessages :: State
                      -> ControlMessage
                      -> Process (ProcessAction State)
handleControlMessages st cm
  | SetActiveMode new <- cm = activateMode new
  | Resize sz' <- cm        = continue (adjust sz' st)
  | otherwise               = stop (ExitOther "IllegalState")
  where
    activateMode :: Mode -> Process (ProcessAction State)
    activateMode new =
      let switched = (state .> mode ^= new) st
      in if st ^. state ^. size == 0
           then continue switched
           else case new of
                  Notify   -> sendNotification switched >> continue switched
                  Active _ -> sendMail switched >>= continue
                  Passive  -> die "IllegalState"
-- | Reply to a 'StatsReq' with statistics read from the current state.
handleGetStats :: StatsReq -> RestrictedProcess State (Result MailboxStats)
handleGetStats _ = do
  current <- getState
  Restricted.reply (current ^. stats)
-- | Treat any raw message exactly as if it had been posted.
handleRawInputs :: State -> Message -> Process (ProcessAction State)
handleRawInputs st = handlePost st . Post
-- | Buffer a posted message, then act according to the mode that was in force
-- when it arrived: notify the owner, deliver, or do nothing further. The mode
-- is reset to 'Passive' afterwards in every case.
handlePost :: State -> Post -> Process (ProcessAction State)
handlePost st (Post msg) = do
  forwarded <- forward (insert msg st)
  continue ((state .> mode ^= Passive) forwarded)
  where
    forward s = case st ^. state ^. mode of
      Notify   -> sendNotification s >> return s
      Active _ -> sendMail s
      _        -> return s
-- | Tell the owner how many messages are buffered by sending it a 'NewMail'
-- that carries a handle to this mailbox.
sendNotification :: State -> Process ()
sendNotification st = do
  self <- getSelfPid
  let info   = st ^. state
      handle = Mailbox self (ctrlChan info)
  send (info ^. owner) (NewMail handle (info ^. size))
-- | Number of messages kept by the filter during a delivery.
type Count = Integer
-- | Number of messages skipped by the filter during a delivery.
type Skipped = Integer
-- | Run the active-mode filter over the buffer and send the owner a
-- 'Delivery' holding the kept messages.
--
-- Returns the state after delivery: kept and skipped messages are gone from
-- the buffer, the dropped counter is reset to zero, and the size is
-- recomputed from what actually remains buffered. The state is returned
-- unchanged when the mailbox is not in 'Active' mode or when the filter
-- closure cannot be resolved.
sendMail :: State -> Process State
sendMail st =
  case st ^. state ^. mode of
    Active f -> do
      -- deliberate best effort: an unresolvable filter leaves the mail alone
      unCl <- catch (fmap Just (unClosure f))
                    (\(_ :: SomeException) -> return Nothing)
      case unCl of
        Nothing -> return st
        Just f' -> deliverWith f'
    _ -> return st
  where
    deliverWith f' = do
      (st', cnt, skipped, msgs) <- filterMessages f' (st, 0, 0, Seq.empty)
      us <- getSelfPid
      send ownerPid $ Delivery { box          = Mailbox us (ctrlChan $ st ^. state)
                               , messages     = Foldable.toList msgs
                               , count        = cnt
                               , totalDropped = skipped + droppedMsgs
                               }
      return $ ( (state .> dropped ^= 0)
               . (state .> size ^= remaining st')
               $ st' )

    -- the buffer itself is the source of truth for the size
    remaining s = fromIntegral (Seq.length (s ^. buffer))

    -- Pop messages one at a time, collecting the ones the filter keeps and
    -- counting the ones it skips; a 'Send' verdict stops the walk and leaves
    -- the current message in the buffer.
    filterMessages :: (Message -> Process FilterResult)
                   -> (State, Count, Skipped, Seq Message)
                   -> Process (State, Count, Skipped, Seq Message)
    filterMessages f accIn@(buff, cnt, drp, acc) =
      case pop buff of
        Nothing         -> return accIn
        Just (m, buff') -> do
          res <- f m
          case res of
            Keep -> filterMessages f (buff', cnt + 1, drp, acc |> m)
            Skip -> filterMessages f (buff', cnt, drp + 1, acc)
            Send -> return accIn

    ownerPid    = st ^. state ^. owner
    droppedMsgs = st ^. state ^. dropped
-- | Add a message to the buffer, honouring the limit.
--
-- While fewer messages are held than the limit allows, the message is simply
-- pushed. Otherwise a 'Ring' buffer refuses the message and counts it as
-- dropped, and any other buffer type drops one existing message to make room.
-- The room check is a strict less-than, so a buffer that is at or above its
-- limit (for example with a limit of zero) never grows further.
insert :: Message -> State -> State
insert msg st
  | held < maxSz   = push msg st
  | Ring <- tag st = (state .> dropped ^: (+1)) st
  | otherwise      = push msg (drop 1 st)
  where
    held  = st ^. state ^. size
    maxSz = st ^. state ^. limit
-- | True when the buffer type is 'Queue'.
isQueue :: State -> Bool
isQueue st = _bufferT (_state st) == Queue
-- | True when the buffer type is 'Stack'.
isStack :: State -> Bool
isStack st = _bufferT (_state st) == Stack
-- | Right-hand view of a sequence, or 'Nothing' when the sequence is empty.
getR :: Seq a -> Maybe (ViewR a)
getR s = nonEmpty (Seq.viewr s)
  where
    nonEmpty EmptyR = Nothing
    nonEmpty view   = Just view
-- | Left-hand view of a sequence, or 'Nothing' when the sequence is empty.
getL :: Seq a -> Maybe (ViewL a)
getL s = nonEmpty (Seq.viewl s)
  where
    nonEmpty EmptyL = Nothing
    nonEmpty view   = Just view
-- | Accessor for the delivery mode.
mode :: Accessor BufferState Mode
mode = accessor _mode setMode
  where
    setMode m st = st { _mode = m }

-- | Accessor for the buffering discipline.
bufferType :: Accessor BufferState BufferType
bufferType = accessor _bufferT setBufferType
  where
    setBufferType t st = st { _bufferT = t }

-- | Accessor for the buffer limit.
limit :: Accessor BufferState Limit
limit = accessor _limit setLimit
  where
    setLimit l st = st { _limit = l }

-- | Accessor for the buffered message count.
size :: Accessor BufferState Integer
size = accessor _size setSize
  where
    setSize s st = st { _size = s }

-- | Accessor for the dropped message count.
dropped :: Accessor BufferState Integer
dropped = accessor _dropped setDropped
  where
    setDropped d st = st { _dropped = d }

-- | Accessor for the owning process id.
owner :: Accessor BufferState ProcessId
owner = accessor _owner setOwner
  where
    setOwner o st = st { _owner = o }
-- | Accessor for the buffered messages.
buffer :: Accessor State (Seq Message)
buffer = accessor _buffer setBuffer
  where
    setBuffer b qb = qb { _buffer = b }

-- | Accessor for the bookkeeping record.
state :: Accessor State BufferState
state = accessor _state setState
  where
    setState s qb = qb { _state = s }
-- | Read-only view of the state as 'MailboxStats'. Setting through this
-- accessor leaves the state untouched.
stats :: Accessor State MailboxStats
stats = accessor summarise (const id)
  where
    summarise st =
      let info = st ^. state
      in MailboxStats { pendingMessages = info ^. size
                      , droppedMessages = info ^. dropped
                      , currentLimit    = info ^. limit
                      , owningProcess   = info ^. owner
                      }
-- Template Haskell splice registering 'everything' and 'matching' as
-- remotable functions, so that static closures over them can be built with
-- 'mkStaticClosure' below. It must stay after both definitions and before
-- their closures.
$(remotable ['everything, 'matching])
-- | Static closure over 'everything': a filter that keeps every message.
acceptEverything :: Closure (Message -> Process FilterResult)
acceptEverything = $(mkStaticClosure 'everything)
-- | Static closure over 'matching': given a filter closure, yields a filter
-- that applies it to each message.
acceptMatching :: Closure (Filter -> Message -> Process FilterResult)
acceptMatching = $(mkStaticClosure 'matching)
-- | Switch the mailbox to active mode with the keep-everything filter.
deliver :: Mailbox -> Process ()
deliver = flip active acceptEverything