module Periodic.Trans.Job
  ( JobT
  , JobEnv
  , name
  , func
  , workload
  , name_
  , func_
  , workload_
  , count
  , timeout

  , workDone
  , workDone_
  , workFail
  , schedLater
  , schedLater'

  , acquireLock
  , releaseLock

  , withLock
  , withLock_
  , module Periodic.Trans.BaseClient
  ) where

import           Control.Monad                (when)
import           Data.ByteString              (ByteString, empty)
import           Data.Int                     (Int64)
import           Data.Maybe                   (fromJust)
import           Metro.Class                  (Transport)
import           Metro.Node                   (env, request, withSessionT)
import           Metro.Session                (send)
import           Periodic.Node
import           Periodic.Trans.BaseClient    (BaseClientT, runJob, runJob_,
                                               submitJob, submitJob_)
import           Periodic.Types               (FromBS (..), LockName, getResult,
                                               packetREQ)
import           Periodic.Types.Job
import           Periodic.Types.ServerCommand (ServerCommand (Acquired))
import           Periodic.Types.WorkerCommand
import           UnliftIO                     hiding (timeout)

type JobT = BaseClientT (Maybe Job)
type JobEnv = SessionEnv (Maybe Job) ServerCommand

job :: Monad m => JobT tp m Job
job :: JobT tp m Job
job = Maybe Job -> Job
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe Job -> Job)
-> NodeT
     (Maybe Job) Nid Msgid (Packet ServerCommand) tp m (Maybe Job)
-> JobT tp m Job
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m (Maybe Job)
forall (m :: * -> *) u nid k rpkt tp.
Monad m =>
NodeT u nid k rpkt tp m u
env

name :: (FromBS a, Show a, Monad m) => JobT tp m a
name :: JobT tp m a
name = ByteString -> a
forall a. FromBS a => ByteString -> a
fromBS (ByteString -> a) -> (JobName -> ByteString) -> JobName -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. JobName -> ByteString
unJN (JobName -> a)
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m JobName
-> JobT tp m a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m JobName
forall (m :: * -> *) tp. Monad m => JobT tp m JobName
name_

name_ :: Monad m => JobT tp m JobName
name_ :: JobT tp m JobName
name_ = Job -> JobName
getName (Job -> JobName)
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
-> JobT tp m JobName
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
forall (m :: * -> *) tp. Monad m => JobT tp m Job
job

func :: (FromBS a, Show a, Monad m) => JobT tp m a
func :: JobT tp m a
func = ByteString -> a
forall a. FromBS a => ByteString -> a
fromBS (ByteString -> a) -> (FuncName -> ByteString) -> FuncName -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FuncName -> ByteString
unFN (FuncName -> a)
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m FuncName
-> JobT tp m a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m FuncName
forall (m :: * -> *) tp. Monad m => JobT tp m FuncName
func_

func_ :: Monad m => JobT tp m FuncName
func_ :: JobT tp m FuncName
func_ = Job -> FuncName
getFuncName (Job -> FuncName)
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
-> JobT tp m FuncName
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
forall (m :: * -> *) tp. Monad m => JobT tp m Job
job

workload :: (FromBS a, Show a, Monad m) => JobT tp m a
workload :: JobT tp m a
workload = ByteString -> a
forall a. FromBS a => ByteString -> a
fromBS (ByteString -> a) -> (Workload -> ByteString) -> Workload -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Workload -> ByteString
unWL (Workload -> a)
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Workload
-> JobT tp m a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Workload
forall (m :: * -> *) tp. Monad m => JobT tp m Workload
workload_

workload_ :: Monad m => JobT tp m Workload
workload_ :: JobT tp m Workload
workload_ = Job -> Workload
getWorkload (Job -> Workload)
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
-> JobT tp m Workload
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
forall (m :: * -> *) tp. Monad m => JobT tp m Job
job

count :: Monad m => JobT tp m Int
count :: JobT tp m Int
count = Job -> Int
getCount (Job -> Int)
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
-> JobT tp m Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
forall (m :: * -> *) tp. Monad m => JobT tp m Job
job

timeout :: Monad m => JobT tp m Int
timeout :: JobT tp m Int
timeout = Job -> Int
getTimeout (Job -> Int)
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
-> JobT tp m Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
forall (m :: * -> *) tp. Monad m => JobT tp m Job
job

workDone
  :: (MonadUnliftIO m, Transport tp)
  => JobT tp m ()
workDone :: JobT tp m ()
workDone = ByteString -> JobT tp m ()
forall (m :: * -> *) tp.
(MonadUnliftIO m, Transport tp) =>
ByteString -> JobT tp m ()
workDone_ ByteString
empty

workDone_
  :: (MonadUnliftIO m, Transport tp)
  => ByteString -> JobT tp m ()
workDone_ :: ByteString -> JobT tp m ()
workDone_ w :: ByteString
w = do
  JobHandle
h <- Job -> JobHandle
getHandle (Job -> JobHandle)
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
-> NodeT
     (Maybe Job) Nid Msgid (Packet ServerCommand) tp m JobHandle
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
forall (m :: * -> *) tp. Monad m => JobT tp m Job
job
  Maybe Int64
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall (m :: * -> *) k u nid rpkt tp a.
(MonadUnliftIO m, Eq k, Hashable k) =>
Maybe Int64
-> SessionT u nid k rpkt tp m a -> NodeT u nid k rpkt tp m a
withSessionT Maybe Int64
forall a. Maybe a
Nothing (SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
 -> JobT tp m ())
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ Packet WorkerCommand
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
forall (m :: * -> *) tp spkt k u nid rpkt.
(MonadUnliftIO m, Transport tp, SendPacket spkt,
 SetPacketId k spkt) =>
spkt -> SessionT u nid k rpkt tp m ()
send (WorkerCommand -> Packet WorkerCommand
forall a. a -> Packet a
packetREQ (WorkerCommand -> Packet WorkerCommand)
-> WorkerCommand -> Packet WorkerCommand
forall a b. (a -> b) -> a -> b
$ JobHandle -> ByteString -> WorkerCommand
WorkDone JobHandle
h ByteString
w)

workFail
  :: (MonadUnliftIO m, Transport tp)
  =>  JobT tp m ()
workFail :: JobT tp m ()
workFail = do
  JobHandle
h <- Job -> JobHandle
getHandle (Job -> JobHandle)
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
-> NodeT
     (Maybe Job) Nid Msgid (Packet ServerCommand) tp m JobHandle
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
forall (m :: * -> *) tp. Monad m => JobT tp m Job
job
  Maybe Int64
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall (m :: * -> *) k u nid rpkt tp a.
(MonadUnliftIO m, Eq k, Hashable k) =>
Maybe Int64
-> SessionT u nid k rpkt tp m a -> NodeT u nid k rpkt tp m a
withSessionT Maybe Int64
forall a. Maybe a
Nothing (SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
 -> JobT tp m ())
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ Packet WorkerCommand
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
forall (m :: * -> *) tp spkt k u nid rpkt.
(MonadUnliftIO m, Transport tp, SendPacket spkt,
 SetPacketId k spkt) =>
spkt -> SessionT u nid k rpkt tp m ()
send (WorkerCommand -> Packet WorkerCommand
forall a. a -> Packet a
packetREQ (WorkerCommand -> Packet WorkerCommand)
-> WorkerCommand -> Packet WorkerCommand
forall a b. (a -> b) -> a -> b
$ JobHandle -> WorkerCommand
WorkFail JobHandle
h)

schedLater
  :: (MonadUnliftIO m, Transport tp)
  =>  Int64 -> JobT tp m ()
schedLater :: Int64 -> JobT tp m ()
schedLater later :: Int64
later = do
  JobHandle
h <- Job -> JobHandle
getHandle (Job -> JobHandle)
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
-> NodeT
     (Maybe Job) Nid Msgid (Packet ServerCommand) tp m JobHandle
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
forall (m :: * -> *) tp. Monad m => JobT tp m Job
job
  Maybe Int64
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall (m :: * -> *) k u nid rpkt tp a.
(MonadUnliftIO m, Eq k, Hashable k) =>
Maybe Int64
-> SessionT u nid k rpkt tp m a -> NodeT u nid k rpkt tp m a
withSessionT Maybe Int64
forall a. Maybe a
Nothing (SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
 -> JobT tp m ())
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ Packet WorkerCommand
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
forall (m :: * -> *) tp spkt k u nid rpkt.
(MonadUnliftIO m, Transport tp, SendPacket spkt,
 SetPacketId k spkt) =>
spkt -> SessionT u nid k rpkt tp m ()
send (WorkerCommand -> Packet WorkerCommand
forall a. a -> Packet a
packetREQ (WorkerCommand -> Packet WorkerCommand)
-> WorkerCommand -> Packet WorkerCommand
forall a b. (a -> b) -> a -> b
$ JobHandle -> Int64 -> Int -> WorkerCommand
SchedLater JobHandle
h Int64
later 0)

schedLater'
  :: (MonadUnliftIO m, Transport tp)
  =>  Int64 -> Int -> JobT tp m ()
schedLater' :: Int64 -> Int -> JobT tp m ()
schedLater' later :: Int64
later step :: Int
step = do
  JobHandle
h <- Job -> JobHandle
getHandle (Job -> JobHandle)
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
-> NodeT
     (Maybe Job) Nid Msgid (Packet ServerCommand) tp m JobHandle
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
forall (m :: * -> *) tp. Monad m => JobT tp m Job
job
  Maybe Int64
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall (m :: * -> *) k u nid rpkt tp a.
(MonadUnliftIO m, Eq k, Hashable k) =>
Maybe Int64
-> SessionT u nid k rpkt tp m a -> NodeT u nid k rpkt tp m a
withSessionT Maybe Int64
forall a. Maybe a
Nothing (SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
 -> JobT tp m ())
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ Packet WorkerCommand
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
forall (m :: * -> *) tp spkt k u nid rpkt.
(MonadUnliftIO m, Transport tp, SendPacket spkt,
 SetPacketId k spkt) =>
spkt -> SessionT u nid k rpkt tp m ()
send (WorkerCommand -> Packet WorkerCommand
forall a. a -> Packet a
packetREQ (WorkerCommand -> Packet WorkerCommand)
-> WorkerCommand -> Packet WorkerCommand
forall a b. (a -> b) -> a -> b
$ JobHandle -> Int64 -> Int -> WorkerCommand
SchedLater JobHandle
h Int64
later Int
step)

acquireLock
  :: (MonadUnliftIO m, Transport tp)
  => LockName -> Int -> JobT tp m Bool
acquireLock :: LockName -> Int -> JobT tp m Bool
acquireLock n :: LockName
n maxCount :: Int
maxCount = do
  JobHandle
h <- Job -> JobHandle
getHandle (Job -> JobHandle)
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
-> NodeT
     (Maybe Job) Nid Msgid (Packet ServerCommand) tp m JobHandle
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
forall (m :: * -> *) tp. Monad m => JobT tp m Job
job
  Bool
-> (ServerCommand -> Bool) -> Maybe (Packet ServerCommand) -> Bool
forall a b. a -> (b -> a) -> Maybe (Packet b) -> a
getResult Bool
False ServerCommand -> Bool
getAcq (Maybe (Packet ServerCommand) -> Bool)
-> NodeT
     (Maybe Job)
     Nid
     Msgid
     (Packet ServerCommand)
     tp
     m
     (Maybe (Packet ServerCommand))
-> JobT tp m Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe Int64
-> Packet WorkerCommand
-> NodeT
     (Maybe Job)
     Nid
     Msgid
     (Packet ServerCommand)
     tp
     m
     (Maybe (Packet ServerCommand))
forall (m :: * -> *) tp spkt k u nid rpkt.
(MonadUnliftIO m, Transport tp, SendPacket spkt,
 SetPacketId k spkt, Eq k, Hashable k) =>
Maybe Int64 -> spkt -> NodeT u nid k rpkt tp m (Maybe rpkt)
request Maybe Int64
forall a. Maybe a
Nothing (WorkerCommand -> Packet WorkerCommand
forall a. a -> Packet a
packetREQ (LockName -> Int -> JobHandle -> WorkerCommand
Acquire LockName
n Int
maxCount JobHandle
h))

  where getAcq :: ServerCommand -> Bool
        getAcq :: ServerCommand -> Bool
getAcq (Acquired v :: Bool
v) = Bool
v
        getAcq _            = Bool
False

releaseLock
  :: (MonadUnliftIO m, Transport tp)
  => LockName -> JobT tp m ()
releaseLock :: LockName -> JobT tp m ()
releaseLock n :: LockName
n = do
  JobHandle
h <- Job -> JobHandle
getHandle (Job -> JobHandle)
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
-> NodeT
     (Maybe Job) Nid Msgid (Packet ServerCommand) tp m JobHandle
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Job
forall (m :: * -> *) tp. Monad m => JobT tp m Job
job
  Maybe Int64
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall (m :: * -> *) k u nid rpkt tp a.
(MonadUnliftIO m, Eq k, Hashable k) =>
Maybe Int64
-> SessionT u nid k rpkt tp m a -> NodeT u nid k rpkt tp m a
withSessionT Maybe Int64
forall a. Maybe a
Nothing (SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
 -> JobT tp m ())
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ Packet WorkerCommand
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
forall (m :: * -> *) tp spkt k u nid rpkt.
(MonadUnliftIO m, Transport tp, SendPacket spkt,
 SetPacketId k spkt) =>
spkt -> SessionT u nid k rpkt tp m ()
send (WorkerCommand -> Packet WorkerCommand
forall a. a -> Packet a
packetREQ (WorkerCommand -> Packet WorkerCommand)
-> WorkerCommand -> Packet WorkerCommand
forall a b. (a -> b) -> a -> b
$ LockName -> JobHandle -> WorkerCommand
Release LockName
n JobHandle
h)

withLock_
  :: (MonadUnliftIO m, Transport tp)
  => LockName -> Int -> JobT tp m () -> JobT tp m ()
withLock_ :: LockName -> Int -> JobT tp m () -> JobT tp m ()
withLock_ n :: LockName
n maxCount :: Int
maxCount j :: JobT tp m ()
j = do
  Bool
acquired <- LockName -> Int -> JobT tp m Bool
forall (m :: * -> *) tp.
(MonadUnliftIO m, Transport tp) =>
LockName -> Int -> JobT tp m Bool
acquireLock LockName
n Int
maxCount
  Bool -> JobT tp m () -> JobT tp m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
acquired JobT tp m ()
j

withLock
  :: (MonadUnliftIO m, Transport tp)
  => LockName -> Int -> JobT tp m () -> JobT tp m ()
withLock :: LockName -> Int -> JobT tp m () -> JobT tp m ()
withLock n :: LockName
n maxCount :: Int
maxCount j :: JobT tp m ()
j = do
  LockName -> Int -> JobT tp m () -> JobT tp m ()
forall (m :: * -> *) tp.
(MonadUnliftIO m, Transport tp) =>
LockName -> Int -> JobT tp m () -> JobT tp m ()
withLock_ LockName
n Int
maxCount JobT tp m ()
j
  LockName -> JobT tp m ()
forall (m :: * -> *) tp.
(MonadUnliftIO m, Transport tp) =>
LockName -> JobT tp m ()
releaseLock LockName
n