-- | A task queue server that limits how many tasks run at once.
--
-- Tasks are submitted as closures with 'executeTask'. Up to the configured
-- 'SizeLimit' of them run concurrently as asynchronous workers; further
-- submissions are held in a queue and started as running tasks complete.
-- Each caller receives its reply only once its own task has finished.
module Control.Distributed.Process.Task.Queue.BlockingQueue
( BlockingQueue()
, SizeLimit
, BlockingQueueStats(..)
, start
, pool
, executeTask
, stats
) where
import Control.Distributed.Process hiding (call)
import Control.Distributed.Process.Closure()
import Control.Distributed.Process.Extras.Internal.Types
import Control.Distributed.Process.Async
import Control.Distributed.Process.ManagedProcess
import qualified Control.Distributed.Process.ManagedProcess as ManagedProcess
import Control.Distributed.Process.Extras.Time
import Control.Distributed.Process.Serializable
import Data.Binary
import Data.List
( deleteBy
, find
)
import Data.Sequence
( Seq
, ViewR(..)
, (<|)
, viewr
)
import qualified Data.Sequence as Seq (empty, length)
import Data.Typeable
import GHC.Generics (Generic)
-- | Upper bound on the number of tasks the queue keeps active at once.
type SizeLimit = Int
-- | Request message asking the queue server for its current statistics.
-- Internal to this module; clients use 'stats'.
data GetStats = GetStats
  deriving (Typeable, Generic)

-- The wire format is the Generic-derived default.
instance Binary GetStats
-- | Snapshot of the queue's load, as returned by 'stats'.
data BlockingQueueStats = BlockingQueueStats
  { maxJobs    :: Int  -- ^ The configured pool size.
  , activeJobs :: Int  -- ^ Number of tasks currently running.
  , queuedJobs :: Int  -- ^ Number of tasks accepted but not yet started.
  } deriving (Typeable, Generic)

-- The wire format is the Generic-derived default.
instance Binary BlockingQueueStats
-- | Server state for the queue.
data BlockingQueue a = BlockingQueue
  { poolSize :: SizeLimit
    -- ^ Maximum number of entries allowed in 'active' before new tasks
    -- are queued instead of started.
  , active :: [(MonitorRef, CallRef (Either ExitReason a), Async a)]
    -- ^ Running tasks: the monitor reference obtained for the worker, the
    -- caller awaiting the reply, and the async handle.
  , accepted :: Seq (CallRef (Either ExitReason a), Closure (Process a))
    -- ^ Tasks waiting for a free slot, each paired with its caller.
  } deriving Typeable
-- | Run a blocking queue server in the calling process.
--
-- The supplied initialiser produces the server's starting state (see
-- 'pool'). The server handles two kinds of call: a @Closure (Process a)@,
-- which is handed to 'storeTask', and a stats request, which is answered by
-- 'poolStatsRequest'. Monitor notifications arriving as info messages are
-- handled by 'taskComplete'.
start :: forall a . (Serializable a)
      => Process (InitResult (BlockingQueue a))
      -> Process ()
start init' = ManagedProcess.serve () (\() -> init') queueServer
  where
    queueServer :: ProcessDefinition (BlockingQueue a)
    queueServer = defaultProcess
      { apiHandlers =
          [ handleCallFrom
              -- The annotation pins the request type this handler accepts.
              (\st from (closure :: Closure (Process a)) ->
                 storeTask st from closure)
          , handleCall poolStatsRequest
          ]
      , infoHandlers = [ handleInfo taskComplete ]
      }
-- | Initialiser for use with 'start': an empty queue (no active and no
-- pending tasks) with the given size limit, returned via 'InitOk' with an
-- 'Infinity' delay.
--
-- NOTE(review): the limit is not validated. 'acceptTask' only starts a task
-- while the number of active tasks is below the limit, so a limit of zero or
-- less means every submitted task is queued and none is ever started.
pool :: forall a . Serializable a
     => SizeLimit
     -> Process (InitResult (BlockingQueue a))
pool sz' = return (InitOk emptyQueue Infinity)
  where
    emptyQueue = BlockingQueue sz' [] Seq.empty
-- | Submit a task to the queue server at the given address using 'call'.
--
-- The server does not reply when the task is accepted; the reply is sent
-- from 'taskComplete' once the task's worker has terminated, and carries
-- either the task's result ('Right') or the reason it failed ('Left').
executeTask :: forall s a . (Addressable s, Serializable a)
            => s
            -> Closure (Process a)
            -> Process (Either ExitReason a)
executeTask = call
-- | Ask the queue server at the given address for its current
-- 'BlockingQueueStats', using 'tryCall'.
stats :: forall s . Addressable s => s -> Process (Maybe BlockingQueueStats)
stats = (`tryCall` GetStats)
-- | Call handler for 'GetStats': replies with a snapshot of the pool size,
-- the number of active tasks and the number of queued tasks. The server
-- state is left unchanged.
poolStatsRequest :: (Serializable a)
                 => BlockingQueue a
                 -> GetStats
                 -> Process (ProcessReply BlockingQueueStats (BlockingQueue a))
poolStatsRequest st GetStats = reply snapshot st
  where
    snapshot = BlockingQueueStats
      { maxJobs    = poolSize st
      , activeJobs = length (active st)
      , queuedJobs = Seq.length (accepted st)
      }
-- | Call handler for task submission. The task is passed to 'acceptTask'
-- and the handler finishes with 'noReply_': the caller is answered later,
-- from 'taskComplete', rather than here.
storeTask :: Serializable a
          => BlockingQueue a
          -> CallRef (Either ExitReason a)
          -> Closure (Process a)
          -> Process (ProcessReply (Either ExitReason a) (BlockingQueue a))
storeTask queue caller closure = noReply_ =<< acceptTask queue caller closure
-- | Take on a new task, either starting it or queueing it.
--
-- When the number of active tasks has reached the pool size, the task and
-- its caller are added to the pending queue. Otherwise the closure is
-- resolved with 'unClosure', started with 'async', monitored with
-- 'monitorAsync', and recorded at the head of the active list.
--
-- Returns the updated server state.
acceptTask :: Serializable a
           => BlockingQueue a
           -> CallRef (Either ExitReason a)
           -> Closure (Process a)
           -> Process (BlockingQueue a)
acceptTask s@(BlockingQueue sz' runQueue taskQueue) from task'
  -- No free slot: park the task until a running one completes.
  | length runQueue >= sz' =
      return s { accepted = enqueue taskQueue (from, task') }
  -- Free slot: start the task now and remember who is waiting on it.
  | otherwise = do
      job         <- unClosure task'
      asyncHandle <- async (task job)
      ref         <- monitorAsync asyncHandle
      return s { active = (ref, from, asyncHandle) : runQueue }
-- | Info handler for monitor notifications.
--
-- If the notification's monitor reference belongs to an active task, the
-- task's outcome is collected with 'wait', the waiting caller is answered,
-- the task is removed from the active list and, if any task is pending, the
-- oldest one is handed to 'acceptTask'. Notifications for unknown
-- references leave the state unchanged.
--
-- A successful outcome is sent as 'Right'; 'AsyncFailed' and
-- 'AsyncLinkFailed' are sent as @Left (ExitOther ...)@ carrying the shown
-- reason. Any other outcome makes the server 'die' with
-- @ExitOther "IllegalState"@ without replying to the caller.
taskComplete :: forall a . Serializable a
             => BlockingQueue a
             -> ProcessMonitorNotification
             -> Process (ProcessAction (BlockingQueue a))
taskComplete s@(BlockingQueue _ runQ _)
             (ProcessMonitorNotification ref _ _) =
  maybe (continue s) finish (findWorker ref runQ)
  where
    -- Answer the caller first, then free the slot and carry on serving.
    finish :: (MonitorRef, CallRef (Either ExitReason a), Async a)
           -> Process (ProcessAction (BlockingQueue a))
    finish worker@(_, caller, job) = do
      outcome <- wait job
      respond caller outcome
      s' <- bump s worker
      continue s'

    -- Send the reply for an outcome, or die if it has no reply.
    respond :: CallRef (Either ExitReason a)
            -> AsyncResult a
            -> Process ()
    respond c = maybe (die (ExitOther "IllegalState")) (replyTo c) . toReply

    -- The reply owed to the caller for an outcome, if there is one.
    toReply :: AsyncResult a -> Maybe (Either ExitReason a)
    toReply (AsyncDone r)       = Just (Right r)
    toReply (AsyncFailed d)     = Just (Left (ExitOther (show d)))
    toReply (AsyncLinkFailed d) = Just (Left (ExitOther (show d)))
    toReply _                   = Nothing

    -- Drop the finished worker and, if a task is pending, accept the
    -- next one from the queue.
    bump :: BlockingQueue a
         -> (MonitorRef, CallRef (Either ExitReason a), Async a)
         -> Process (BlockingQueue a)
    bump st@(BlockingQueue _ runQueue acc) worker =
      case dequeue acc of
        Nothing             -> return vacated
        Just ((tr, tc), ts) -> acceptTask (vacated { accepted = ts }) tr tc
      where
        vacated = st { active = deleteFromRunQueue worker runQueue }
-- | Find the first active task whose monitor reference equals the given
-- one, if any.
findWorker :: MonitorRef
           -> [(MonitorRef, CallRef (Either ExitReason a), Async a)]
           -> Maybe (MonitorRef, CallRef (Either ExitReason a), Async a)
findWorker key workers = find hasKey workers
  where
    hasKey (ref, _, _) = ref == key
-- | Remove the first entry with the same monitor reference as the given
-- worker from the list of active tasks. The list is returned unchanged when
-- there is no such entry.
deleteFromRunQueue :: (MonitorRef, CallRef (Either ExitReason a), Async a)
                   -> [(MonitorRef, CallRef (Either ExitReason a), Async a)]
                   -> [(MonitorRef, CallRef (Either ExitReason a), Async a)]
deleteFromRunQueue c runQ = deleteBy sameMonitor c runQ
  where
    -- Compare both entries by monitor reference. Being symmetric, this
    -- does not depend on which argument 'deleteBy' passes as the target
    -- and which as the list element.
    sameMonitor (a, _, _) (b, _, _) = a == b
-- | Add an element at the left end of the sequence. 'dequeue' removes from
-- the right end, so the two together give first-in, first-out order.
enqueue :: Seq a -> a -> Seq a
enqueue = flip (<|)
-- | Remove the element at the right end of the sequence.
--
-- Returns 'Nothing' for an empty sequence, otherwise the removed element
-- paired with the remaining sequence. Since 'enqueue' adds at the left end,
-- this yields the element that has been in the sequence longest.
dequeue :: Seq a -> Maybe (a, Seq a)
dequeue s =
  -- Every view shape is matched explicitly, so the function is total
  -- without relying on 'getR' never returning @Just EmptyR@.
  case getR s of
    Just (rest :> oldest) -> Just (oldest, rest)
    _                     -> Nothing
-- | View the sequence from its right end. Returns 'Nothing' when the
-- sequence is empty, otherwise the non-empty view.
getR :: Seq a -> Maybe (ViewR a)
getR s = nonEmpty (viewr s)
  where
    nonEmpty EmptyR = Nothing
    nonEmpty view   = Just view