-- | Accessors and actions available to a worker while it is processing a
-- single job: reading the job's fields, reporting the outcome to the server,
-- rescheduling, and acquiring/releasing named locks.
module Periodic.Trans.Job
  ( JobT
  , JobEnv
  , name
  , func
  , workload

  , name_
  , func_
  , workload_
  , count
  , timeout

  , workDone
  , workDone_
  , workFail
  , schedLater
  , schedLater'

  , acquireLock
  , releaseLock

  , withLock
  , withLock_

  , module Periodic.Trans.BaseClient
  ) where

import           Control.Monad                 (when)
import           Data.ByteString               (ByteString, empty)
import           Data.Int                      (Int64)
import           Data.Maybe                    (fromJust)
import           Metro.Class                   (Transport)
import           Metro.Node                    (env, request, withSessionT)
import           Metro.Session                 (send)
import           Periodic.Node
import           Periodic.Trans.BaseClient     (BaseClientT, runJob, runJob_,
                                                submitJob, submitJob_)
import           Periodic.Types                (FromBS (..), LockName,
                                                getResult, packetREQ)
import           Periodic.Types.Job
import           Periodic.Types.ServerCommand  (ServerCommand (Acquired))
import           Periodic.Types.WorkerCommand
import           UnliftIO                      hiding (timeout)

-- | Client monad whose user environment is the job currently being
-- processed (if any).
type JobT = BaseClientT (Maybe Job)

-- | Session environment specialised to the 'JobT' user environment and
-- 'ServerCommand' replies.
type JobEnv = SessionEnv (Maybe Job) ServerCommand

-- | The job stored in the environment.
--
-- Partial: uses 'fromJust', so it fails at runtime when the environment
-- holds 'Nothing'. Every accessor and action below goes through this.
job :: Monad m => JobT tp m Job
job = fromJust <$> env

-- | The job name, decoded from its raw bytes with 'fromBS'.
name :: (FromBS a, Show a, Monad m) => JobT tp m a
name = fromBS . unJN <$> name_

-- | The job name as the raw 'JobName' value.
name_ :: Monad m => JobT tp m JobName
name_ = getName <$> job

-- | The function name, decoded from its raw bytes with 'fromBS'.
func :: (FromBS a, Show a, Monad m) => JobT tp m a
func = fromBS . unFN <$> func_

-- | The function name as the raw 'FuncName' value.
func_ :: Monad m => JobT tp m FuncName
func_ = getFuncName <$> job

-- | The job workload, decoded from its raw bytes with 'fromBS'.
workload :: (FromBS a, Show a, Monad m) => JobT tp m a
workload = fromBS . unWL <$> workload_

-- | The job workload as the raw 'Workload' value.
workload_ :: Monad m => JobT tp m Workload
workload_ = getWorkload <$> job

-- | The value of 'getCount' for the current job.
count :: Monad m => JobT tp m Int
count = getCount <$> job

-- | The value of 'getTimeout' for the current job.
timeout :: Monad m => JobT tp m Int
timeout = getTimeout <$> job

-- | Report the current job as done with an empty result payload.
workDone
  :: (MonadUnliftIO m, Transport tp)
  => JobT tp m ()
workDone = workDone_ empty

-- | Report the current job as done, sending a 'WorkDone' command that
-- carries the job handle and the given payload.
workDone_
  :: (MonadUnliftIO m, Transport tp)
  => ByteString -> JobT tp m ()
workDone_ w = do
  h <- getHandle <$> job
  withSessionT Nothing $ send (packetREQ $ WorkDone h w)

-- | Report the current job as failed by sending a 'WorkFail' command with
-- the job handle.
workFail
  :: (MonadUnliftIO m, Transport tp)
  => JobT tp m ()
workFail = do
  h <- getHandle <$> job
  withSessionT Nothing $ send (packetREQ $ WorkFail h)

-- | Send a 'SchedLater' command for the current job with the given delay
-- and a step of @0@.
--
-- NOTE(review): the unit of @later@ is not visible in this module
-- (presumably seconds) -- confirm against the server.
schedLater
  :: (MonadUnliftIO m, Transport tp)
  => Int64 -> JobT tp m ()
schedLater later = do
  h <- getHandle <$> job
  withSessionT Nothing $ send (packetREQ $ SchedLater h later 0)

-- | Like 'schedLater', but with an explicit @step@ passed as the last
-- field of 'SchedLater'.
--
-- NOTE(review): presumably @step@ is added to the job's run counter --
-- confirm against the server.
schedLater'
  :: (MonadUnliftIO m, Transport tp)
  => Int64 -> Int -> JobT tp m ()
schedLater' later step = do
  h <- getHandle <$> job
  withSessionT Nothing $ send (packetREQ $ SchedLater h later step)

-- | Send an 'Acquire' request for the named lock on behalf of the current
-- job and wait for the reply.
--
-- Returns the flag carried by an 'Acquired' reply. Any other reply, or no
-- reply at all, yields 'False'.
acquireLock
  :: (MonadUnliftIO m, Transport tp)
  => LockName -> Int -> JobT tp m Bool
acquireLock n maxCount = do
  h <- getHandle <$> job
  getResult False getAcq <$> request Nothing (packetREQ (Acquire n maxCount h))

  where getAcq :: ServerCommand -> Bool
        getAcq (Acquired v) = v
        getAcq _            = False

-- | Send a 'Release' command for the named lock on behalf of the current
-- job. No reply is awaited.
releaseLock
  :: (MonadUnliftIO m, Transport tp)
  => LockName -> JobT tp m ()
releaseLock n = do
  h <- getHandle <$> job
  withSessionT Nothing $ send (packetREQ $ Release n h)

-- | Try to acquire the named lock and run the action only if the lock was
-- acquired. The lock is NOT released afterwards.
withLock_
  :: (MonadUnliftIO m, Transport tp)
  => LockName -> Int -> JobT tp m () -> JobT tp m ()
withLock_ n maxCount j = do
  acquired <- acquireLock n maxCount
  when acquired j

-- | Run the action under the named lock via 'withLock_', then send a
-- release for that lock.
--
-- NOTE(review): the release is sent even when the lock was not acquired,
-- and it is not reached if @j@ throws an exception. Confirm both are
-- intended before relying on this for exception safety.
withLock
  :: (MonadUnliftIO m, Transport tp)
  => LockName -> Int -> JobT tp m () -> JobT tp m ()
withLock n maxCount j = do
  withLock_ n maxCount j
  releaseLock n