-- | Scheduler-side bindings: a callback class ('ToScheduler'), functions for
-- creating and destroying the native scheduler and driver objects, and
-- wrappers around the driver calls, each of which reports a 'Status'.
module System.Mesos.Scheduler (
-- * Scheduler callbacks
ToScheduler(..),
-- * Driver lifecycle
SchedulerDriver,
withSchedulerDriver,
start,
stop,
abort,
await,
run,
-- * Driver actions
requestResources,
launchTasks,
killTask,
declineOffer,
reviveOffers,
sendFrameworkMessage,
reconcileTasks,
-- * Manual resource management
createDriver,
destroyDriver,
Scheduler,
createScheduler,
destroyScheduler
) where
import Control.Exception (bracket)
import Control.Monad.Managed
import Data.ByteString (ByteString, packCStringLen)
import Data.ByteString.Unsafe (unsafeUseAsCStringLen)
import Foreign.Ptr
import System.Mesos.Internal hiding (marshal, requestResources)
import System.Mesos.Raw
import System.Mesos.Raw.Scheduler
import System.Mesos.Types hiding (requestResources)
-- | Callback interface for a scheduler implementation.
--
-- Every method has a default that ignores all of its arguments and returns
-- @()@, so an instance only has to define the callbacks it is interested in.
-- 'createScheduler' turns each method into a foreign callback.
class ToScheduler a where
  -- | Handler for the @registered@ callback; receives the driver, a
  -- 'FrameworkID' and a 'MasterInfo'.
  registered :: a -> SchedulerDriver -> FrameworkID -> MasterInfo -> IO ()
  registered _scheduler _driver _frameworkId _masterInfo = return ()

  -- | Handler for the @reRegistered@ callback; receives the driver and a
  -- 'MasterInfo'.
  reRegistered :: a -> SchedulerDriver -> MasterInfo -> IO ()
  reRegistered _scheduler _driver _masterInfo = return ()

  -- | Handler for the @disconnected@ callback; receives only the driver.
  disconnected :: a -> SchedulerDriver -> IO ()
  disconnected _scheduler _driver = return ()

  -- | Handler for the @resourceOffers@ callback; receives the driver and the
  -- list of unmarshalled 'Offer's.
  resourceOffers :: a -> SchedulerDriver -> [Offer] -> IO ()
  resourceOffers _scheduler _driver _offers = return ()

  -- | Handler for the @offerRescinded@ callback; receives the driver and the
  -- affected 'OfferID'.
  offerRescinded :: a -> SchedulerDriver -> OfferID -> IO ()
  offerRescinded _scheduler _driver _offerId = return ()

  -- | Handler for the @statusUpdate@ callback; receives the driver and a
  -- 'TaskStatus'.
  statusUpdate :: a -> SchedulerDriver -> TaskStatus -> IO ()
  statusUpdate _scheduler _driver _taskStatus = return ()

  -- | Handler for the @frameworkMessage@ callback; receives the driver, an
  -- 'ExecutorID', a 'SlaveID' and the message payload as a 'ByteString'.
  frameworkMessage :: a -> SchedulerDriver -> ExecutorID -> SlaveID -> ByteString -> IO ()
  frameworkMessage _scheduler _driver _executorId _slaveId _payload = return ()

  -- | Handler for the @slaveLost@ callback; receives the driver and a
  -- 'SlaveID'.
  slaveLost :: a -> SchedulerDriver -> SlaveID -> IO ()
  slaveLost _scheduler _driver _slaveId = return ()

  -- | Handler for the @executorLost@ callback; receives the driver, an
  -- 'ExecutorID', a 'SlaveID' and a 'Status' decoded from the raw integer
  -- supplied by the native side.
  executorLost :: a -> SchedulerDriver -> ExecutorID -> SlaveID -> Status -> IO ()
  executorLost _scheduler _driver _executorId _slaveId _status = return ()

  -- | Handler for the @error@ callback; receives the driver and the error
  -- text as a 'ByteString'.
  errorMessage :: a -> SchedulerDriver -> ByteString -> IO ()
  errorMessage _scheduler _driver _message = return ()
-- | Build a native 'Scheduler' from any 'ToScheduler' instance.
--
-- Each class method is wrapped as a foreign callback. A callback converts
-- the raw arguments it is handed into Haskell values and then invokes the
-- matching method of @s@. The ten callbacks are passed to
-- @c_createScheduler@; the pointer it returns and the callbacks themselves
-- are stored in the resulting 'Scheduler', which is what 'destroyScheduler'
-- later uses to release them.
createScheduler :: ToScheduler a => a -> IO Scheduler
createScheduler s = do
  onRegistered <- wrapSchedulerRegistered $ \driverP frameworkP masterP ->
    runManaged $ do
      frameworkId <- peekCPP frameworkP
      masterInfo <- peekCPP masterP
      liftIO $ registered s (SchedulerDriver driverP) frameworkId masterInfo
  onReRegistered <- wrapSchedulerReRegistered $ \driverP masterP ->
    runManaged $ do
      masterInfo <- unmarshal masterP
      liftIO $ reRegistered s (SchedulerDriver driverP) masterInfo
  onDisconnected <- wrapSchedulerDisconnected $ \driverP ->
    runManaged $
      liftIO $ disconnected s (SchedulerDriver driverP)
  onResourceOffers <- wrapSchedulerResourceOffers $ \driverP offersP count ->
    runManaged $ do
      -- Read @count@ raw offer pointers, then unmarshal each of them.
      offerPtrs <- peekArray' (offersP, fromIntegral count)
      offers <- mapM unmarshal offerPtrs
      liftIO $ resourceOffers s (SchedulerDriver driverP) offers
  onOfferRescinded <- wrapSchedulerOfferRescinded $ \driverP offerIdP ->
    runManaged $ do
      offerId <- unmarshal offerIdP
      liftIO $ offerRescinded s (SchedulerDriver driverP) offerId
  onStatusUpdate <- wrapSchedulerStatusUpdate $ \driverP statusP ->
    runManaged $ do
      taskStatus <- unmarshal statusP
      liftIO $ statusUpdate s (SchedulerDriver driverP) taskStatus
  onFrameworkMessage <- wrapSchedulerFrameworkMessage $ \driverP executorP slaveP dataP dataLen ->
    runManaged $ do
      executorId <- unmarshal executorP
      slaveId <- unmarshal slaveP
      -- packCStringLen copies the buffer into a fresh ByteString.
      payload <- liftIO $ packCStringLen (dataP, dataLen)
      liftIO $ frameworkMessage s (SchedulerDriver driverP) executorId slaveId payload
  onSlaveLost <- wrapSchedulerSlaveLost $ \driverP slaveP ->
    runManaged $ do
      slaveId <- unmarshal slaveP
      liftIO $ slaveLost s (SchedulerDriver driverP) slaveId
  onExecutorLost <- wrapSchedulerExecutorLost $ \driverP executorP slaveP rawStatus ->
    runManaged $ do
      executorId <- unmarshal executorP
      slaveId <- unmarshal slaveP
      liftIO $
        executorLost s (SchedulerDriver driverP) executorId slaveId
          (toEnum (fromIntegral rawStatus))
  onError <- wrapSchedulerError $ \driverP msgP msgLen ->
    packCStringLen (msgP, fromIntegral msgLen)
      >>= errorMessage s (SchedulerDriver driverP)
  impl <-
    c_createScheduler
      onRegistered
      onReRegistered
      onDisconnected
      onResourceOffers
      onOfferRescinded
      onStatusUpdate
      onFrameworkMessage
      onSlaveLost
      onExecutorLost
      onError
  return
    ( Scheduler
        impl
        onRegistered
        onReRegistered
        onDisconnected
        onResourceOffers
        onOfferRescinded
        onStatusUpdate
        onFrameworkMessage
        onSlaveLost
        onExecutorLost
        onError
    )
-- | Release everything owned by a 'Scheduler'.
--
-- The native object is passed to @c_destroyScheduler@ first; afterwards each
-- of the ten callback function pointers stored in the record is released
-- with 'freeHaskellFunPtr'.
destroyScheduler :: Scheduler -> IO ()
destroyScheduler sched = do
  c_destroyScheduler (schedulerImpl sched)
  freeHaskellFunPtr (rawSchedulerRegistered sched)
  freeHaskellFunPtr (rawSchedulerReRegistered sched)
  freeHaskellFunPtr (rawSchedulerDisconnected sched)
  freeHaskellFunPtr (rawSchedulerResourceOffers sched)
  freeHaskellFunPtr (rawSchedulerOfferRescinded sched)
  freeHaskellFunPtr (rawSchedulerStatusUpdate sched)
  freeHaskellFunPtr (rawSchedulerFrameworkMessage sched)
  freeHaskellFunPtr (rawSchedulerSlaveLost sched)
  freeHaskellFunPtr (rawSchedulerExecutorLost sched)
  freeHaskellFunPtr (rawSchedulerError sched)
-- | Run a raw driver call against the pointer inside a 'SchedulerDriver' and
-- decode the integer it returns as a 'Status' via 'toEnum'.
withDriver :: (SchedulerDriverPtr -> IO CInt) -> SchedulerDriver -> IO Status
withDriver call (SchedulerDriver ptr) = do
  code <- call ptr
  return (toEnum (fromIntegral code))
-- | Create a scheduler and a driver for it, run an action with the driver,
-- and release both afterwards.
--
-- Arguments, in order:
--
-- * the 'ToScheduler' instance whose methods become the callbacks;
-- * the 'FrameworkInfo' passed on to 'createDriver';
-- * a 'ByteString' passed on to 'createDriver' (presumably the master
--   address -- confirm against the native API);
-- * optional 'Credential's passed on to 'createDriver';
-- * the action to run with the live driver.
--
-- Returns the result of the action.
--
-- Both resources are managed with 'bracket', so they are released even when
-- the action (or driver creation) throws. The driver is destroyed before the
-- scheduler, the reverse of the order in which they were created.
withSchedulerDriver :: ToScheduler a => a
                    -> FrameworkInfo
                    -> ByteString
                    -> Maybe Credential
                    -> (SchedulerDriver -> IO b)
                    -> IO b
withSchedulerDriver s i h c f =
  bracket (createScheduler s) destroyScheduler $ \scheduler ->
    bracket (createDriver scheduler i h c) destroyDriver f
-- | Create a 'SchedulerDriver' for an existing 'Scheduler'.
--
-- The 'FrameworkInfo' and the 'ByteString' are marshalled only for the
-- duration of the native call. When a 'Credential' is supplied it is
-- marshalled as well and @c_createSchedulerDriverWithCredentials@ is used;
-- otherwise @c_createSchedulerDriver@ is called.
createDriver :: Scheduler -> FrameworkInfo -> ByteString -> Maybe Credential -> IO SchedulerDriver
createDriver s i h mc =
  with (cppValue i) $ \frameworkP ->
    with (cstring h) $ \(hostP, hostLen) -> do
      let impl = schedulerImpl s
      rawDriver <- case mc of
        Nothing ->
          c_createSchedulerDriver impl frameworkP hostP (fromIntegral hostLen)
        Just credential ->
          with (cppValue credential) $ \credentialP ->
            c_createSchedulerDriverWithCredentials
              impl
              frameworkP
              hostP
              (fromIntegral hostLen)
              credentialP
      return (SchedulerDriver rawDriver)
-- | Hand the raw pointer held by a 'SchedulerDriver' to
-- @c_destroySchedulerDriver@.
destroyDriver :: SchedulerDriver -> IO ()
destroyDriver driver = c_destroySchedulerDriver (fromSchedulerDriver driver)
-- | Invoke @c_startSchedulerDriver@ on the driver and report the resulting
-- 'Status'.
start :: SchedulerDriver -> IO Status
start driver = withDriver c_startSchedulerDriver driver
-- | Invoke @c_stopSchedulerDriver@ on the driver and report the resulting
-- 'Status'.
--
-- The 'Bool' is forwarded to the native call as @1@ for 'True' and @0@ for
-- 'False' (presumably the failover flag -- confirm against the native API).
stop :: SchedulerDriver
     -> Bool
     -> IO Status
stop d f = withDriver (`c_stopSchedulerDriver` flag) d
  where
    flag
      | f = 1
      | otherwise = 0
-- | Invoke @c_abortSchedulerDriver@ on the driver and report the resulting
-- 'Status'.
abort :: SchedulerDriver -> IO Status
abort driver = withDriver c_abortSchedulerDriver driver
-- | Invoke @c_joinSchedulerDriver@ on the driver and report the resulting
-- 'Status'.
await :: SchedulerDriver -> IO Status
await driver = withDriver c_joinSchedulerDriver driver
-- | Invoke @c_runSchedulerDriver@ on the driver and report the resulting
-- 'Status'.
run :: SchedulerDriver -> IO Status
run driver = withDriver c_runSchedulerDriver driver
-- | Marshal a list of 'Request's into a native array, pass it with its
-- length to @c_requestResources@, and decode the returned code as a
-- 'Status'.
requestResources :: SchedulerDriver -> [Request] -> IO Status
requestResources (SchedulerDriver p) rs =
  with (mapM cppValue rs >>= arrayLen) $ \(requestsP, count) -> do
    code <- c_requestResources p requestsP (fromIntegral count)
    return (toEnum (fromIntegral code))
-- | Marshal the 'Filters', the 'OfferID's and the 'TaskInfo's, call
-- @c_launchTasks@ with them, and decode the returned code as a 'Status'.
--
-- The three arguments are marshalled in one 'Managed' block, in the order
-- filters, offers, tasks.
launchTasks :: SchedulerDriver -> [OfferID] -> [TaskInfo] -> Filters -> IO Status
launchTasks (SchedulerDriver p) os ts f =
  with marshalled $ \(filtersP, (offersP, offerCount), (tasksP, taskCount)) ->
    fmap (toEnum . fromIntegral) $
      c_launchTasks
        p
        offersP
        (fromIntegral offerCount)
        tasksP
        (fromIntegral taskCount)
        filtersP
  where
    marshalled = do
      filtersP <- cppValue f
      offers <- mapM cppValue os >>= arrayLen
      tasks <- mapM cppValue ts >>= arrayLen
      return (filtersP, offers, tasks)
-- | Marshal a 'TaskID', pass it to @c_killTask@, and decode the returned
-- code as a 'Status'.
killTask :: SchedulerDriver -> TaskID -> IO Status
killTask (SchedulerDriver p) t =
  with (cppValue t) $ \taskIdP ->
    fmap (toEnum . fromIntegral) (c_killTask p taskIdP)
-- | Marshal an 'OfferID' and 'Filters', pass them to @c_declineOffer@, and
-- decode the returned code as a 'Status'.
declineOffer :: SchedulerDriver -> OfferID -> Filters -> IO Status
declineOffer (SchedulerDriver p) o f =
  with (cppValue o) $ \offerIdP ->
    with (cppValue f) $ \filtersP ->
      fmap (toEnum . fromIntegral) (c_declineOffer p offerIdP filtersP)
-- | Invoke @c_reviveOffers@ on the driver and report the resulting 'Status'.
reviveOffers :: SchedulerDriver -> IO Status
reviveOffers driver = withDriver c_reviveOffers driver
-- | Marshal an 'ExecutorID', a 'SlaveID' and a 'ByteString' payload, pass
-- them (with the payload length) to @c_sendFrameworkMessage@, and decode the
-- returned code as a 'Status'.
sendFrameworkMessage :: SchedulerDriver -> ExecutorID -> SlaveID -> ByteString -> IO Status
sendFrameworkMessage (SchedulerDriver p) e s bs =
  with (cppValue e) $ \executorP ->
    with (cppValue s) $ \slaveP ->
      with (cstring bs) $ \(payloadP, payloadLen) ->
        fmap (toEnum . fromIntegral) $
          c_sendFrameworkMessage p executorP slaveP payloadP (fromIntegral payloadLen)
-- | Marshal a list of 'TaskStatus' values into a native array, pass it with
-- its length to @c_reconcileTasks@, and decode the returned code as a
-- 'Status'.
reconcileTasks :: SchedulerDriver -> [TaskStatus] -> IO Status
reconcileTasks (SchedulerDriver p) ts =
  with (mapM cppValue ts >>= arrayLen) $ \(statusesP, count) ->
    fmap (toEnum . fromIntegral) $
      c_reconcileTasks p statusesP (fromIntegral count)