{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
module Periodic.Trans.Worker
( WorkerT
, startWorkerT
, ping
, addFunc
, broadcast
, removeFunc
, work
, close
, runJobT
, getClientEnv
) where
import Control.Monad (forever, replicateM, void, when)
import Control.Monad.Reader.Class (MonadReader, asks)
import Control.Monad.Trans.Class (MonadTrans, lift)
import Control.Monad.Trans.Reader (ReaderT (..), runReaderT)
import Metro.Class (Transport, TransportConfig,
getPacketId)
import Metro.Conn (initConnEnv, runConnT)
import qualified Metro.Conn as Conn
import Metro.IOHashMap (IOHashMap, newIOHashMap)
import qualified Metro.IOHashMap as HM (delete, insert, lookup)
import Metro.Node (NodeMode (..), SessionMode (..),
initEnv1, newSessionEnv,
nextSessionId, runSessionT_,
setDefaultSessionTimeout,
setNodeMode, setSessionMode,
startNodeT_, withEnv,
withSessionT)
import Metro.Session (send)
import Periodic.IOList (IOList, newIOList)
import qualified Periodic.IOList as IL (append, clearSTM,
toListSTM)
import Periodic.Node
import qualified Periodic.Trans.BaseClient as BT (BaseClientEnv, checkHealth,
close, getClientEnv, ping)
import Periodic.Trans.Job (JobT, func_, name, workFail)
import Periodic.Types (ClientType (TypeWorker), Msgid,
Packet, getClientType, getResult,
packetREQ, regPacketREQ)
import Periodic.Types.Job
import Periodic.Types.ServerCommand
import Periodic.Types.WorkerCommand
import System.Log.Logger (errorM)
import UnliftIO
import UnliftIO.Concurrent (threadDelay)
type TaskList tp m = IOHashMap FuncName (JobT tp m ())
type JobList = IOList (Msgid, Job)
data WorkerEnv tp m = WorkerEnv
{ WorkerEnv tp m -> TaskList tp m
taskList :: TaskList tp m
, WorkerEnv tp m -> JobList
jobList :: JobList
, WorkerEnv tp m -> TVar Int
taskSize :: TVar Int
}
newtype WorkerT tp m a = WorkerT {WorkerT tp m a -> ReaderT (WorkerEnv tp m) (JobT tp m) a
unWorkerT :: ReaderT (WorkerEnv tp m) (JobT tp m) a}
deriving
( a -> WorkerT tp m b -> WorkerT tp m a
(a -> b) -> WorkerT tp m a -> WorkerT tp m b
(forall a b. (a -> b) -> WorkerT tp m a -> WorkerT tp m b)
-> (forall a b. a -> WorkerT tp m b -> WorkerT tp m a)
-> Functor (WorkerT tp m)
forall a b. a -> WorkerT tp m b -> WorkerT tp m a
forall a b. (a -> b) -> WorkerT tp m a -> WorkerT tp m b
forall tp (m :: * -> *) a b.
Functor m =>
a -> WorkerT tp m b -> WorkerT tp m a
forall tp (m :: * -> *) a b.
Functor m =>
(a -> b) -> WorkerT tp m a -> WorkerT tp m b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: a -> WorkerT tp m b -> WorkerT tp m a
$c<$ :: forall tp (m :: * -> *) a b.
Functor m =>
a -> WorkerT tp m b -> WorkerT tp m a
fmap :: (a -> b) -> WorkerT tp m a -> WorkerT tp m b
$cfmap :: forall tp (m :: * -> *) a b.
Functor m =>
(a -> b) -> WorkerT tp m a -> WorkerT tp m b
Functor
, Functor (WorkerT tp m)
a -> WorkerT tp m a
Functor (WorkerT tp m) =>
(forall a. a -> WorkerT tp m a)
-> (forall a b.
WorkerT tp m (a -> b) -> WorkerT tp m a -> WorkerT tp m b)
-> (forall a b c.
(a -> b -> c)
-> WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m c)
-> (forall a b. WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m b)
-> (forall a b. WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m a)
-> Applicative (WorkerT tp m)
WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m b
WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m a
WorkerT tp m (a -> b) -> WorkerT tp m a -> WorkerT tp m b
(a -> b -> c) -> WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m c
forall a. a -> WorkerT tp m a
forall a b. WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m a
forall a b. WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m b
forall a b.
WorkerT tp m (a -> b) -> WorkerT tp m a -> WorkerT tp m b
forall a b c.
(a -> b -> c) -> WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m c
forall tp (m :: * -> *). Applicative m => Functor (WorkerT tp m)
forall tp (m :: * -> *) a. Applicative m => a -> WorkerT tp m a
forall tp (m :: * -> *) a b.
Applicative m =>
WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m a
forall tp (m :: * -> *) a b.
Applicative m =>
WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m b
forall tp (m :: * -> *) a b.
Applicative m =>
WorkerT tp m (a -> b) -> WorkerT tp m a -> WorkerT tp m b
forall tp (m :: * -> *) a b c.
Applicative m =>
(a -> b -> c) -> WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m c
forall (f :: * -> *).
Functor f =>
(forall a. a -> f a)
-> (forall a b. f (a -> b) -> f a -> f b)
-> (forall a b c. (a -> b -> c) -> f a -> f b -> f c)
-> (forall a b. f a -> f b -> f b)
-> (forall a b. f a -> f b -> f a)
-> Applicative f
<* :: WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m a
$c<* :: forall tp (m :: * -> *) a b.
Applicative m =>
WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m a
*> :: WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m b
$c*> :: forall tp (m :: * -> *) a b.
Applicative m =>
WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m b
liftA2 :: (a -> b -> c) -> WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m c
$cliftA2 :: forall tp (m :: * -> *) a b c.
Applicative m =>
(a -> b -> c) -> WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m c
<*> :: WorkerT tp m (a -> b) -> WorkerT tp m a -> WorkerT tp m b
$c<*> :: forall tp (m :: * -> *) a b.
Applicative m =>
WorkerT tp m (a -> b) -> WorkerT tp m a -> WorkerT tp m b
pure :: a -> WorkerT tp m a
$cpure :: forall tp (m :: * -> *) a. Applicative m => a -> WorkerT tp m a
$cp1Applicative :: forall tp (m :: * -> *). Applicative m => Functor (WorkerT tp m)
Applicative
, Applicative (WorkerT tp m)
a -> WorkerT tp m a
Applicative (WorkerT tp m) =>
(forall a b.
WorkerT tp m a -> (a -> WorkerT tp m b) -> WorkerT tp m b)
-> (forall a b. WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m b)
-> (forall a. a -> WorkerT tp m a)
-> Monad (WorkerT tp m)
WorkerT tp m a -> (a -> WorkerT tp m b) -> WorkerT tp m b
WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m b
forall a. a -> WorkerT tp m a
forall a b. WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m b
forall a b.
WorkerT tp m a -> (a -> WorkerT tp m b) -> WorkerT tp m b
forall tp (m :: * -> *). Monad m => Applicative (WorkerT tp m)
forall tp (m :: * -> *) a. Monad m => a -> WorkerT tp m a
forall tp (m :: * -> *) a b.
Monad m =>
WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m b
forall tp (m :: * -> *) a b.
Monad m =>
WorkerT tp m a -> (a -> WorkerT tp m b) -> WorkerT tp m b
forall (m :: * -> *).
Applicative m =>
(forall a b. m a -> (a -> m b) -> m b)
-> (forall a b. m a -> m b -> m b)
-> (forall a. a -> m a)
-> Monad m
return :: a -> WorkerT tp m a
$creturn :: forall tp (m :: * -> *) a. Monad m => a -> WorkerT tp m a
>> :: WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m b
$c>> :: forall tp (m :: * -> *) a b.
Monad m =>
WorkerT tp m a -> WorkerT tp m b -> WorkerT tp m b
>>= :: WorkerT tp m a -> (a -> WorkerT tp m b) -> WorkerT tp m b
$c>>= :: forall tp (m :: * -> *) a b.
Monad m =>
WorkerT tp m a -> (a -> WorkerT tp m b) -> WorkerT tp m b
$cp1Monad :: forall tp (m :: * -> *). Monad m => Applicative (WorkerT tp m)
Monad
, Monad (WorkerT tp m)
Monad (WorkerT tp m) =>
(forall a. IO a -> WorkerT tp m a) -> MonadIO (WorkerT tp m)
IO a -> WorkerT tp m a
forall a. IO a -> WorkerT tp m a
forall tp (m :: * -> *). MonadIO m => Monad (WorkerT tp m)
forall tp (m :: * -> *) a. MonadIO m => IO a -> WorkerT tp m a
forall (m :: * -> *).
Monad m =>
(forall a. IO a -> m a) -> MonadIO m
liftIO :: IO a -> WorkerT tp m a
$cliftIO :: forall tp (m :: * -> *) a. MonadIO m => IO a -> WorkerT tp m a
$cp1MonadIO :: forall tp (m :: * -> *). MonadIO m => Monad (WorkerT tp m)
MonadIO
, MonadReader (WorkerEnv tp m)
)
instance MonadUnliftIO m => MonadUnliftIO (WorkerT tp m) where
withRunInIO :: ((forall a. WorkerT tp m a -> IO a) -> IO b) -> WorkerT tp m b
withRunInIO inner :: (forall a. WorkerT tp m a -> IO a) -> IO b
inner = ReaderT (WorkerEnv tp m) (JobT tp m) b -> WorkerT tp m b
forall tp (m :: * -> *) a.
ReaderT (WorkerEnv tp m) (JobT tp m) a -> WorkerT tp m a
WorkerT (ReaderT (WorkerEnv tp m) (JobT tp m) b -> WorkerT tp m b)
-> ReaderT (WorkerEnv tp m) (JobT tp m) b -> WorkerT tp m b
forall a b. (a -> b) -> a -> b
$
(WorkerEnv tp m -> JobT tp m b)
-> ReaderT (WorkerEnv tp m) (JobT tp m) b
forall r (m :: * -> *) a. (r -> m a) -> ReaderT r m a
ReaderT ((WorkerEnv tp m -> JobT tp m b)
-> ReaderT (WorkerEnv tp m) (JobT tp m) b)
-> (WorkerEnv tp m -> JobT tp m b)
-> ReaderT (WorkerEnv tp m) (JobT tp m) b
forall a b. (a -> b) -> a -> b
$ \r :: WorkerEnv tp m
r ->
((forall a. JobT tp m a -> IO a) -> IO b) -> JobT tp m b
forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO (((forall a. JobT tp m a -> IO a) -> IO b) -> JobT tp m b)
-> ((forall a. JobT tp m a -> IO a) -> IO b) -> JobT tp m b
forall a b. (a -> b) -> a -> b
$ \run :: forall a. JobT tp m a -> IO a
run ->
(forall a. WorkerT tp m a -> IO a) -> IO b
inner (JobT tp m a -> IO a
forall a. JobT tp m a -> IO a
run (JobT tp m a -> IO a)
-> (WorkerT tp m a -> JobT tp m a) -> WorkerT tp m a -> IO a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. WorkerEnv tp m -> WorkerT tp m a -> JobT tp m a
forall tp (m :: * -> *) a.
WorkerEnv tp m -> WorkerT tp m a -> JobT tp m a
runWorkerT WorkerEnv tp m
r)
instance MonadTrans (WorkerT tp) where
lift :: m a -> WorkerT tp m a
lift = ReaderT (WorkerEnv tp m) (JobT tp m) a -> WorkerT tp m a
forall tp (m :: * -> *) a.
ReaderT (WorkerEnv tp m) (JobT tp m) a -> WorkerT tp m a
WorkerT (ReaderT (WorkerEnv tp m) (JobT tp m) a -> WorkerT tp m a)
-> (m a -> ReaderT (WorkerEnv tp m) (JobT tp m) a)
-> m a
-> WorkerT tp m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m a
-> ReaderT (WorkerEnv tp m) (JobT tp m) a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m a
-> ReaderT (WorkerEnv tp m) (JobT tp m) a)
-> (m a
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m a)
-> m a
-> ReaderT (WorkerEnv tp m) (JobT tp m) a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift
runWorkerT :: WorkerEnv tp m -> WorkerT tp m a -> JobT tp m a
runWorkerT :: WorkerEnv tp m -> WorkerT tp m a -> JobT tp m a
runWorkerT workerEnv :: WorkerEnv tp m
workerEnv = (ReaderT (WorkerEnv tp m) (JobT tp m) a
-> WorkerEnv tp m -> JobT tp m a)
-> WorkerEnv tp m
-> ReaderT (WorkerEnv tp m) (JobT tp m) a
-> JobT tp m a
forall a b c. (a -> b -> c) -> b -> a -> c
flip ReaderT (WorkerEnv tp m) (JobT tp m) a
-> WorkerEnv tp m -> JobT tp m a
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT WorkerEnv tp m
workerEnv (ReaderT (WorkerEnv tp m) (JobT tp m) a -> JobT tp m a)
-> (WorkerT tp m a -> ReaderT (WorkerEnv tp m) (JobT tp m) a)
-> WorkerT tp m a
-> JobT tp m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. WorkerT tp m a -> ReaderT (WorkerEnv tp m) (JobT tp m) a
forall tp (m :: * -> *) a.
WorkerT tp m a -> ReaderT (WorkerEnv tp m) (JobT tp m) a
unWorkerT
startWorkerT
:: (MonadUnliftIO m, Transport tp)
=> TransportConfig tp -> WorkerT tp m () -> m ()
startWorkerT :: TransportConfig tp -> WorkerT tp m () -> m ()
startWorkerT config :: TransportConfig tp
config m :: WorkerT tp m ()
m = do
ConnEnv tp
connEnv <- TransportConfig tp -> m (ConnEnv tp)
forall (m :: * -> *) tp.
(MonadIO m, Transport tp) =>
TransportConfig tp -> m (ConnEnv tp)
initConnEnv TransportConfig tp
config
RegPacket ServerCommand
r <- ConnEnv tp
-> ConnT tp m (RegPacket ServerCommand)
-> m (RegPacket ServerCommand)
forall tp (m :: * -> *) a. ConnEnv tp -> ConnT tp m a -> m a
runConnT ConnEnv tp
connEnv (ConnT tp m (RegPacket ServerCommand)
-> m (RegPacket ServerCommand))
-> ConnT tp m (RegPacket ServerCommand)
-> m (RegPacket ServerCommand)
forall a b. (a -> b) -> a -> b
$ do
RegPacket ClientType -> ConnT tp m ()
forall (m :: * -> *) tp pkt.
(MonadUnliftIO m, Transport tp, SendPacket pkt) =>
pkt -> ConnT tp m ()
Conn.send (RegPacket ClientType -> ConnT tp m ())
-> RegPacket ClientType -> ConnT tp m ()
forall a b. (a -> b) -> a -> b
$ ClientType -> RegPacket ClientType
forall a. a -> RegPacket a
regPacketREQ ClientType
TypeWorker
ConnT tp m (RegPacket ServerCommand)
forall (m :: * -> *) tp pkt.
(MonadUnliftIO m, Transport tp, RecvPacket pkt) =>
ConnT tp m pkt
Conn.receive
let nid :: ByteString
nid = case RegPacket ServerCommand -> ServerCommand
forall a. RegPacket a -> a
getClientType RegPacket ServerCommand
r of
Data v :: ByteString
v -> ByteString
v
_ -> ""
IOHashMap FuncName (JobT tp m ())
taskList <- m (IOHashMap FuncName (JobT tp m ()))
forall (m :: * -> *) a b. MonadIO m => m (IOHashMap a b)
newIOHashMap
JobList
jobList <- m JobList
forall (m :: * -> *) a. MonadIO m => m (IOList a)
newIOList
TVar Int
taskSize <- Int -> m (TVar Int)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO 0
NodeEnv1 (Maybe Job) Nid Msgid (Packet ServerCommand) tp
jobEnv1 <- (NodeEnv (Maybe Job) Nid Msgid (Packet ServerCommand)
-> NodeEnv (Maybe Job) Nid Msgid (Packet ServerCommand))
-> ConnEnv tp
-> Maybe Job
-> Nid
-> IO Msgid
-> m (NodeEnv1 (Maybe Job) Nid Msgid (Packet ServerCommand) tp)
forall (m :: * -> *) u nid k rpkt tp.
MonadIO m =>
(NodeEnv u nid k rpkt -> NodeEnv u nid k rpkt)
-> ConnEnv tp -> u -> nid -> IO k -> m (NodeEnv1 u nid k rpkt tp)
initEnv1 NodeEnv (Maybe Job) Nid Msgid (Packet ServerCommand)
-> NodeEnv (Maybe Job) Nid Msgid (Packet ServerCommand)
forall u nid k rpkt. NodeEnv u nid k rpkt -> NodeEnv u nid k rpkt
mapEnv ConnEnv tp
connEnv Maybe Job
forall a. Maybe a
Nothing (ByteString -> Nid
Nid ByteString
nid) IO Msgid
sessionGen
let wEnv :: WorkerEnv tp m
wEnv = WorkerEnv :: forall tp (m :: * -> *).
TaskList tp m -> JobList -> TVar Int -> WorkerEnv tp m
WorkerEnv {..}
NodeEnv1 (Maybe Job) Nid Msgid (Packet ServerCommand) tp
-> JobT tp m () -> m ()
forall (m :: * -> *) u rpkt tp a.
Monad m =>
NodeEnv u rpkt tp -> NodeT u rpkt tp m a -> m a
runNodeT NodeEnv1 (Maybe Job) Nid Msgid (Packet ServerCommand) tp
jobEnv1 (JobT tp m () -> m ()) -> JobT tp m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m (Async ())
-> JobT tp m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m (Async ())
-> JobT tp m ())
-> NodeT
(Maybe Job) Nid Msgid (Packet ServerCommand) tp m (Async ())
-> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ JobT tp m ()
-> NodeT
(Maybe Job) Nid Msgid (Packet ServerCommand) tp m (Async ())
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (JobT tp m ()
-> NodeT
(Maybe Job) Nid Msgid (Packet ServerCommand) tp m (Async ()))
-> JobT tp m ()
-> NodeT
(Maybe Job) Nid Msgid (Packet ServerCommand) tp m (Async ())
forall a b. (a -> b) -> a -> b
$ (Packet ServerCommand -> m Bool)
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall (m :: * -> *) tp rpkt k u nid.
(MonadUnliftIO m, Transport tp, RecvPacket rpkt,
GetPacketId k rpkt, Eq k, Hashable k) =>
(rpkt -> m Bool)
-> SessionT u nid k rpkt tp m () -> NodeT u nid k rpkt tp m ()
startNodeT_ (JobList -> Packet ServerCommand -> m Bool
forall (m :: * -> *).
MonadIO m =>
JobList -> Packet ServerCommand -> m Bool
filterPacketM JobList
jobList) SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
forall (m :: * -> *) u rpkt tp.
MonadIO m =>
SessionT u rpkt tp m ()
defaultSessionHandler
WorkerEnv tp m -> WorkerT tp m () -> JobT tp m ()
forall tp (m :: * -> *) a.
WorkerEnv tp m -> WorkerT tp m a -> JobT tp m a
runWorkerT WorkerEnv tp m
wEnv (WorkerT tp m () -> JobT tp m ())
-> WorkerT tp m () -> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ do
WorkerT tp m (Async Any) -> WorkerT tp m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (WorkerT tp m (Async Any) -> WorkerT tp m ())
-> (WorkerT tp m Any -> WorkerT tp m (Async Any))
-> WorkerT tp m Any
-> WorkerT tp m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. WorkerT tp m Any -> WorkerT tp m (Async Any)
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (WorkerT tp m Any -> WorkerT tp m ())
-> WorkerT tp m Any -> WorkerT tp m ()
forall a b. (a -> b) -> a -> b
$ WorkerT tp m () -> WorkerT tp m Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (WorkerT tp m () -> WorkerT tp m Any)
-> WorkerT tp m () -> WorkerT tp m Any
forall a b. (a -> b) -> a -> b
$ do
Int -> WorkerT tp m ()
forall (m :: * -> *). MonadIO m => Int -> m ()
threadDelay (Int -> WorkerT tp m ()) -> Int -> WorkerT tp m ()
forall a b. (a -> b) -> a -> b
$ 100 Int -> Int -> Int
forall a. Num a => a -> a -> a
* 1000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* 1000
WorkerT tp m ()
forall (m :: * -> *) tp.
(MonadUnliftIO m, Transport tp) =>
WorkerT tp m ()
checkHealth
WorkerT tp m (Async ()) -> WorkerT tp m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (WorkerT tp m (Async ()) -> WorkerT tp m ())
-> (JobT tp m () -> WorkerT tp m (Async ()))
-> JobT tp m ()
-> WorkerT tp m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. WorkerT tp m () -> WorkerT tp m (Async ())
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (WorkerT tp m () -> WorkerT tp m (Async ()))
-> (JobT tp m () -> WorkerT tp m ())
-> JobT tp m ()
-> WorkerT tp m (Async ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. JobT tp m () -> WorkerT tp m ()
forall (m :: * -> *) tp a. Monad m => JobT tp m a -> WorkerT tp m a
runJobT (JobT tp m () -> WorkerT tp m ())
-> JobT tp m () -> WorkerT tp m ()
forall a b. (a -> b) -> a -> b
$ WorkerEnv tp m -> JobT tp m ()
forall (m :: * -> *) tp.
(MonadUnliftIO m, Transport tp) =>
WorkerEnv tp m -> JobT tp m ()
processJobQueue WorkerEnv tp m
wEnv
WorkerT tp m ()
m
where mapEnv :: NodeEnv u nid k rpkt -> NodeEnv u nid k rpkt
mapEnv =
NodeMode -> NodeEnv u nid k rpkt -> NodeEnv u nid k rpkt
forall u nid k rpkt.
NodeMode -> NodeEnv u nid k rpkt -> NodeEnv u nid k rpkt
setNodeMode NodeMode
Multi
(NodeEnv u nid k rpkt -> NodeEnv u nid k rpkt)
-> (NodeEnv u nid k rpkt -> NodeEnv u nid k rpkt)
-> NodeEnv u nid k rpkt
-> NodeEnv u nid k rpkt
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SessionMode -> NodeEnv u nid k rpkt -> NodeEnv u nid k rpkt
forall u nid k rpkt.
SessionMode -> NodeEnv u nid k rpkt -> NodeEnv u nid k rpkt
setSessionMode SessionMode
SingleAction
(NodeEnv u nid k rpkt -> NodeEnv u nid k rpkt)
-> (NodeEnv u nid k rpkt -> NodeEnv u nid k rpkt)
-> NodeEnv u nid k rpkt
-> NodeEnv u nid k rpkt
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int64 -> NodeEnv u nid k rpkt -> NodeEnv u nid k rpkt
forall u nid k rpkt.
Int64 -> NodeEnv u nid k rpkt -> NodeEnv u nid k rpkt
setDefaultSessionTimeout 100
filterPacketM :: MonadIO m => JobList -> Packet ServerCommand -> m Bool
filterPacketM :: JobList -> Packet ServerCommand -> m Bool
filterPacketM jl :: JobList
jl rpkt :: Packet ServerCommand
rpkt = do
case Maybe Job
-> (ServerCommand -> Maybe Job)
-> Maybe (Packet ServerCommand)
-> Maybe Job
forall a b. a -> (b -> a) -> Maybe (Packet b) -> a
getResult Maybe Job
forall a. Maybe a
Nothing ServerCommand -> Maybe Job
getAssignJob (Packet ServerCommand -> Maybe (Packet ServerCommand)
forall a. a -> Maybe a
Just Packet ServerCommand
rpkt) of
Nothing -> Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Just job :: Job
job -> do
JobList -> (Msgid, Job) -> m ()
forall (m :: * -> *) a. MonadIO m => IOList a -> a -> m ()
IL.append JobList
jl (Packet ServerCommand -> Msgid
forall k pkt. GetPacketId k pkt => pkt -> k
getPacketId Packet ServerCommand
rpkt, Job
job)
Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
runJobT :: Monad m => JobT tp m a -> WorkerT tp m a
runJobT :: JobT tp m a -> WorkerT tp m a
runJobT = ReaderT (WorkerEnv tp m) (JobT tp m) a -> WorkerT tp m a
forall tp (m :: * -> *) a.
ReaderT (WorkerEnv tp m) (JobT tp m) a -> WorkerT tp m a
WorkerT (ReaderT (WorkerEnv tp m) (JobT tp m) a -> WorkerT tp m a)
-> (JobT tp m a -> ReaderT (WorkerEnv tp m) (JobT tp m) a)
-> JobT tp m a
-> WorkerT tp m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. JobT tp m a -> ReaderT (WorkerEnv tp m) (JobT tp m) a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift
getClientEnv
:: (Monad m, Transport tp)
=> WorkerT tp m (BT.BaseClientEnv (Maybe Job) tp)
getClientEnv :: WorkerT tp m (BaseClientEnv (Maybe Job) tp)
getClientEnv = JobT tp m (BaseClientEnv (Maybe Job) tp)
-> WorkerT tp m (BaseClientEnv (Maybe Job) tp)
forall (m :: * -> *) tp a. Monad m => JobT tp m a -> WorkerT tp m a
runJobT JobT tp m (BaseClientEnv (Maybe Job) tp)
forall (m :: * -> *) tp u.
(Monad m, Transport tp) =>
BaseClientT u tp m (BaseClientEnv u tp)
BT.getClientEnv
close :: (MonadUnliftIO m, Transport tp) => WorkerT tp m ()
close :: WorkerT tp m ()
close = JobT tp m () -> WorkerT tp m ()
forall (m :: * -> *) tp a. Monad m => JobT tp m a -> WorkerT tp m a
runJobT JobT tp m ()
forall (m :: * -> *) tp u.
(MonadUnliftIO m, Transport tp) =>
BaseClientT u tp m ()
BT.close
ping :: (MonadUnliftIO m, Transport tp) => WorkerT tp m Bool
ping :: WorkerT tp m Bool
ping = JobT tp m Bool -> WorkerT tp m Bool
forall (m :: * -> *) tp a. Monad m => JobT tp m a -> WorkerT tp m a
runJobT JobT tp m Bool
forall (m :: * -> *) tp u.
(MonadUnliftIO m, Transport tp) =>
BaseClientT u tp m Bool
BT.ping
addFunc
:: (MonadUnliftIO m, Transport tp)
=> FuncName -> JobT tp m () -> WorkerT tp m ()
addFunc :: FuncName -> JobT tp m () -> WorkerT tp m ()
addFunc f :: FuncName
f j :: JobT tp m ()
j = do
JobT tp m () -> WorkerT tp m ()
forall (m :: * -> *) tp a. Monad m => JobT tp m a -> WorkerT tp m a
runJobT (JobT tp m () -> WorkerT tp m ())
-> JobT tp m () -> WorkerT tp m ()
forall a b. (a -> b) -> a -> b
$ Maybe Int64
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall (m :: * -> *) k u nid rpkt tp a.
(MonadUnliftIO m, Eq k, Hashable k) =>
Maybe Int64
-> SessionT u nid k rpkt tp m a -> NodeT u nid k rpkt tp m a
withSessionT Maybe Int64
forall a. Maybe a
Nothing (SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ())
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ Packet WorkerCommand
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
forall (m :: * -> *) tp spkt k u nid rpkt.
(MonadUnliftIO m, Transport tp, SendPacket spkt,
SetPacketId k spkt) =>
spkt -> SessionT u nid k rpkt tp m ()
send (WorkerCommand -> Packet WorkerCommand
forall a. a -> Packet a
packetREQ (WorkerCommand -> Packet WorkerCommand)
-> WorkerCommand -> Packet WorkerCommand
forall a b. (a -> b) -> a -> b
$ FuncName -> WorkerCommand
CanDo FuncName
f)
TaskList tp m
ref <- (WorkerEnv tp m -> TaskList tp m) -> WorkerT tp m (TaskList tp m)
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks WorkerEnv tp m -> TaskList tp m
forall tp (m :: * -> *). WorkerEnv tp m -> TaskList tp m
taskList
TaskList tp m -> FuncName -> JobT tp m () -> WorkerT tp m ()
forall a (m :: * -> *) b.
(Eq a, Hashable a, MonadIO m) =>
IOHashMap a b -> a -> b -> m ()
HM.insert TaskList tp m
ref FuncName
f JobT tp m ()
j
broadcast
:: (MonadUnliftIO m, Transport tp)
=> FuncName -> JobT tp m () -> WorkerT tp m ()
broadcast :: FuncName -> JobT tp m () -> WorkerT tp m ()
broadcast f :: FuncName
f j :: JobT tp m ()
j = do
JobT tp m () -> WorkerT tp m ()
forall (m :: * -> *) tp a. Monad m => JobT tp m a -> WorkerT tp m a
runJobT (JobT tp m () -> WorkerT tp m ())
-> JobT tp m () -> WorkerT tp m ()
forall a b. (a -> b) -> a -> b
$ Maybe Int64
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall (m :: * -> *) k u nid rpkt tp a.
(MonadUnliftIO m, Eq k, Hashable k) =>
Maybe Int64
-> SessionT u nid k rpkt tp m a -> NodeT u nid k rpkt tp m a
withSessionT Maybe Int64
forall a. Maybe a
Nothing (SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ())
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ Packet WorkerCommand
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
forall (m :: * -> *) tp spkt k u nid rpkt.
(MonadUnliftIO m, Transport tp, SendPacket spkt,
SetPacketId k spkt) =>
spkt -> SessionT u nid k rpkt tp m ()
send (WorkerCommand -> Packet WorkerCommand
forall a. a -> Packet a
packetREQ (WorkerCommand -> Packet WorkerCommand)
-> WorkerCommand -> Packet WorkerCommand
forall a b. (a -> b) -> a -> b
$ FuncName -> WorkerCommand
Broadcast FuncName
f)
TaskList tp m
ref <- (WorkerEnv tp m -> TaskList tp m) -> WorkerT tp m (TaskList tp m)
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks WorkerEnv tp m -> TaskList tp m
forall tp (m :: * -> *). WorkerEnv tp m -> TaskList tp m
taskList
TaskList tp m -> FuncName -> JobT tp m () -> WorkerT tp m ()
forall a (m :: * -> *) b.
(Eq a, Hashable a, MonadIO m) =>
IOHashMap a b -> a -> b -> m ()
HM.insert TaskList tp m
ref FuncName
f JobT tp m ()
j
removeFunc
:: (MonadUnliftIO m, Transport tp)
=> FuncName -> WorkerT tp m ()
removeFunc :: FuncName -> WorkerT tp m ()
removeFunc f :: FuncName
f = do
TaskList tp m
tskList <- (WorkerEnv tp m -> TaskList tp m) -> WorkerT tp m (TaskList tp m)
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks WorkerEnv tp m -> TaskList tp m
forall tp (m :: * -> *). WorkerEnv tp m -> TaskList tp m
taskList
JobT tp m () -> WorkerT tp m ()
forall (m :: * -> *) tp a. Monad m => JobT tp m a -> WorkerT tp m a
runJobT (JobT tp m () -> WorkerT tp m ())
-> JobT tp m () -> WorkerT tp m ()
forall a b. (a -> b) -> a -> b
$ TaskList tp m -> FuncName -> JobT tp m ()
forall (m :: * -> *) tp.
(MonadUnliftIO m, Transport tp) =>
TaskList tp m -> FuncName -> JobT tp m ()
removeFunc_ TaskList tp m
tskList FuncName
f
removeFunc_
:: (MonadUnliftIO m, Transport tp)
=> TaskList tp m -> FuncName -> JobT tp m ()
removeFunc_ :: TaskList tp m -> FuncName -> JobT tp m ()
removeFunc_ ref :: TaskList tp m
ref f :: FuncName
f = do
Maybe Int64
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall (m :: * -> *) k u nid rpkt tp a.
(MonadUnliftIO m, Eq k, Hashable k) =>
Maybe Int64
-> SessionT u nid k rpkt tp m a -> NodeT u nid k rpkt tp m a
withSessionT Maybe Int64
forall a. Maybe a
Nothing (SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ())
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ Packet WorkerCommand
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
forall (m :: * -> *) tp spkt k u nid rpkt.
(MonadUnliftIO m, Transport tp, SendPacket spkt,
SetPacketId k spkt) =>
spkt -> SessionT u nid k rpkt tp m ()
send (WorkerCommand -> Packet WorkerCommand
forall a. a -> Packet a
packetREQ (WorkerCommand -> Packet WorkerCommand)
-> WorkerCommand -> Packet WorkerCommand
forall a b. (a -> b) -> a -> b
$ FuncName -> WorkerCommand
CantDo FuncName
f)
TaskList tp m -> FuncName -> JobT tp m ()
forall a (m :: * -> *) b.
(Eq a, Hashable a, MonadIO m) =>
IOHashMap a b -> a -> m ()
HM.delete TaskList tp m
ref FuncName
f
getAssignJob :: ServerCommand -> Maybe Job
getAssignJob :: ServerCommand -> Maybe Job
getAssignJob (JobAssign job :: Job
job) = Job -> Maybe Job
forall a. a -> Maybe a
Just Job
job
getAssignJob _ = Maybe Job
forall a. Maybe a
Nothing
work
:: (MonadUnliftIO m, Transport tp)
=> Int -> WorkerT tp m ()
work :: Int -> WorkerT tp m ()
work size :: Int
size = do
TVar Int
tskSize <- (WorkerEnv tp m -> TVar Int) -> WorkerT tp m (TVar Int)
forall r (m :: * -> *) a. MonadReader r m => (r -> a) -> m a
asks WorkerEnv tp m -> TVar Int
forall tp (m :: * -> *). WorkerEnv tp m -> TVar Int
taskSize
JobT tp m () -> WorkerT tp m ()
forall (m :: * -> *) tp a. Monad m => JobT tp m a -> WorkerT tp m a
runJobT (JobT tp m () -> WorkerT tp m ())
-> JobT tp m () -> WorkerT tp m ()
forall a b. (a -> b) -> a -> b
$ do
[SessionEnv (Maybe Job) Nid Msgid (Packet ServerCommand)]
envs <- (Msgid
-> NodeT
(Maybe Job)
Nid
Msgid
(Packet ServerCommand)
tp
m
(SessionEnv (Maybe Job) Nid Msgid (Packet ServerCommand)))
-> [Msgid]
-> NodeT
(Maybe Job)
Nid
Msgid
(Packet ServerCommand)
tp
m
[SessionEnv (Maybe Job) Nid Msgid (Packet ServerCommand)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM (Maybe Int64
-> Msgid
-> NodeT
(Maybe Job)
Nid
Msgid
(Packet ServerCommand)
tp
m
(SessionEnv (Maybe Job) Nid Msgid (Packet ServerCommand))
forall (m :: * -> *) k u nid rpkt tp.
(MonadIO m, Eq k, Hashable k) =>
Maybe Int64
-> k -> NodeT u nid k rpkt tp m (SessionEnv u nid k rpkt)
newSessionEnv (Int64 -> Maybe Int64
forall a. a -> Maybe a
Just (-1))) ([Msgid]
-> NodeT
(Maybe Job)
Nid
Msgid
(Packet ServerCommand)
tp
m
[SessionEnv (Maybe Job) Nid Msgid (Packet ServerCommand)])
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m [Msgid]
-> NodeT
(Maybe Job)
Nid
Msgid
(Packet ServerCommand)
tp
m
[SessionEnv (Maybe Job) Nid Msgid (Packet ServerCommand)]
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Int
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Msgid
-> NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m [Msgid]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
size NodeT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m Msgid
forall (m :: * -> *) u nid k rpkt tp.
MonadIO m =>
NodeT u nid k rpkt tp m k
nextSessionId
JobT tp m () -> JobT tp m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (JobT tp m () -> JobT tp m ()) -> JobT tp m () -> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ do
STM () -> JobT tp m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> JobT tp m ()) -> STM () -> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ do
Int
s <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
tskSize
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
s Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
size) STM ()
forall a. STM a
retrySTM
(SessionEnv (Maybe Job) Nid Msgid (Packet ServerCommand)
-> JobT tp m ())
-> [SessionEnv (Maybe Job) Nid Msgid (Packet ServerCommand)]
-> JobT tp m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (SessionEnv (Maybe Job) Nid Msgid (Packet ServerCommand)
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall (m :: * -> *) u nid k rpkt tp a.
Monad m =>
SessionEnv u nid k rpkt
-> SessionT u nid k rpkt tp m a -> NodeT u nid k rpkt tp m a
`runSessionT_` (Packet WorkerCommand
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
forall (m :: * -> *) tp spkt k u nid rpkt.
(MonadUnliftIO m, Transport tp, SendPacket spkt,
SetPacketId k spkt) =>
spkt -> SessionT u nid k rpkt tp m ()
send (Packet WorkerCommand
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ())
-> Packet WorkerCommand
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
forall a b. (a -> b) -> a -> b
$ WorkerCommand -> Packet WorkerCommand
forall a. a -> Packet a
packetREQ WorkerCommand
GrabJob)) [SessionEnv (Maybe Job) Nid Msgid (Packet ServerCommand)]
envs
Int -> JobT tp m ()
forall (m :: * -> *). MonadIO m => Int -> m ()
threadDelay 10000000
processJob :: (MonadUnliftIO m, Transport tp) => WorkerEnv tp m -> (Msgid, Job) -> JobT tp m ()
processJob :: WorkerEnv tp m -> (Msgid, Job) -> JobT tp m ()
processJob WorkerEnv{..} (sid :: Msgid
sid, job :: Job
job) = do
STM () -> JobT tp m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> JobT tp m ()) -> STM () -> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ do
Int
s <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
taskSize
TVar Int -> Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Int
taskSize (Int
s Int -> Int -> Int
forall a. Num a => a -> a -> a
+ 1)
Maybe Job -> JobT tp m () -> JobT tp m ()
forall (m :: * -> *) u nid k rpkt tp a.
Monad m =>
u -> NodeT u nid k rpkt tp m a -> NodeT u nid k rpkt tp m a
withEnv (Job -> Maybe Job
forall a. a -> Maybe a
Just Job
job) (JobT tp m () -> JobT tp m ()) -> JobT tp m () -> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ do
FuncName
f <- JobT tp m FuncName
forall (m :: * -> *) tp. Monad m => JobT tp m FuncName
func_
Maybe (JobT tp m ())
task <- TaskList tp m
-> FuncName
-> NodeT
(Maybe Job)
Nid
Msgid
(Packet ServerCommand)
tp
m
(Maybe (JobT tp m ()))
forall a (m :: * -> *) b.
(Eq a, Hashable a, MonadIO m) =>
IOHashMap a b -> a -> m (Maybe b)
HM.lookup TaskList tp m
taskList FuncName
f
case Maybe (JobT tp m ())
task of
Nothing -> do
TaskList tp m -> FuncName -> JobT tp m ()
forall (m :: * -> *) tp.
(MonadUnliftIO m, Transport tp) =>
TaskList tp m -> FuncName -> JobT tp m ()
removeFunc_ TaskList tp m
taskList FuncName
f
JobT tp m ()
forall (m :: * -> *) tp.
(MonadUnliftIO m, Transport tp) =>
JobT tp m ()
workFail
Just task' :: JobT tp m ()
task' ->
JobT tp m () -> (SomeException -> JobT tp m ()) -> JobT tp m ()
forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> (SomeException -> m a) -> m a
catchAny JobT tp m ()
task' ((SomeException -> JobT tp m ()) -> JobT tp m ())
-> (SomeException -> JobT tp m ()) -> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ \e :: SomeException
e -> do
[Char]
n <- JobT tp m [Char]
forall a (m :: * -> *) tp.
(FromBS a, Show a, Monad m) =>
JobT tp m a
name
IO () -> JobT tp m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> JobT tp m ()) -> IO () -> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ [Char] -> [Char] -> IO ()
errorM "Periodic.Trans.Worker"
([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [[Char]] -> [Char]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [ "Failing on running job { name = "
, [Char]
n
, ", "
, FuncName -> [Char]
forall a. Show a => a -> [Char]
show FuncName
f
, " }"
, "\nError: "
, SomeException -> [Char]
forall a. Show a => a -> [Char]
show SomeException
e
]
JobT tp m ()
forall (m :: * -> *) tp.
(MonadUnliftIO m, Transport tp) =>
JobT tp m ()
workFail
STM () -> JobT tp m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> JobT tp m ()) -> STM () -> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ do
Int
s <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
taskSize
TVar Int -> Int -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar Int
taskSize (Int
s Int -> Int -> Int
forall a. Num a => a -> a -> a
- 1)
SessionEnv (Maybe Job) Nid Msgid (Packet ServerCommand)
jobEnv <- Maybe Int64
-> Msgid
-> NodeT
(Maybe Job)
Nid
Msgid
(Packet ServerCommand)
tp
m
(SessionEnv (Maybe Job) Nid Msgid (Packet ServerCommand))
forall (m :: * -> *) k u nid rpkt tp.
(MonadIO m, Eq k, Hashable k) =>
Maybe Int64
-> k -> NodeT u nid k rpkt tp m (SessionEnv u nid k rpkt)
newSessionEnv (Int64 -> Maybe Int64
forall a. a -> Maybe a
Just (-1)) Msgid
sid
SessionEnv (Maybe Job) Nid Msgid (Packet ServerCommand)
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall (m :: * -> *) u nid k rpkt tp a.
Monad m =>
SessionEnv u nid k rpkt
-> SessionT u nid k rpkt tp m a -> NodeT u nid k rpkt tp m a
runSessionT_ SessionEnv (Maybe Job) Nid Msgid (Packet ServerCommand)
jobEnv (SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ())
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
-> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ Packet WorkerCommand
-> SessionT (Maybe Job) Nid Msgid (Packet ServerCommand) tp m ()
forall (m :: * -> *) tp spkt k u nid rpkt.
(MonadUnliftIO m, Transport tp, SendPacket spkt,
SetPacketId k spkt) =>
spkt -> SessionT u nid k rpkt tp m ()
send (WorkerCommand -> Packet WorkerCommand
forall a. a -> Packet a
packetREQ WorkerCommand
GrabJob)
checkHealth
:: (MonadUnliftIO m, Transport tp)
=> WorkerT tp m ()
checkHealth :: WorkerT tp m ()
checkHealth = JobT tp m () -> WorkerT tp m ()
forall (m :: * -> *) tp a. Monad m => JobT tp m a -> WorkerT tp m a
runJobT JobT tp m ()
forall (m :: * -> *) tp u.
(MonadUnliftIO m, Transport tp) =>
BaseClientT u tp m ()
BT.checkHealth
processJobQueue :: (MonadUnliftIO m, Transport tp) => WorkerEnv tp m -> JobT tp m ()
processJobQueue :: WorkerEnv tp m -> JobT tp m ()
processJobQueue wEnv :: WorkerEnv tp m
wEnv@WorkerEnv {..} = JobT tp m () -> JobT tp m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (JobT tp m () -> JobT tp m ()) -> JobT tp m () -> JobT tp m ()
forall a b. (a -> b) -> a -> b
$ do
[(Msgid, Job)]
jobs <- STM [(Msgid, Job)]
-> NodeT
(Maybe Job) Nid Msgid (Packet ServerCommand) tp m [(Msgid, Job)]
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM [(Msgid, Job)]
-> NodeT
(Maybe Job) Nid Msgid (Packet ServerCommand) tp m [(Msgid, Job)])
-> STM [(Msgid, Job)]
-> NodeT
(Maybe Job) Nid Msgid (Packet ServerCommand) tp m [(Msgid, Job)]
forall a b. (a -> b) -> a -> b
$ do
[(Msgid, Job)]
v <- JobList -> STM [(Msgid, Job)]
forall a. IOList a -> STM [a]
IL.toListSTM JobList
jobList
if [(Msgid, Job)] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(Msgid, Job)]
v then STM [(Msgid, Job)]
forall a. STM a
retrySTM
else do
JobList -> STM ()
forall a. IOList a -> STM ()
IL.clearSTM JobList
jobList
[(Msgid, Job)] -> STM [(Msgid, Job)]
forall (m :: * -> *) a. Monad m => a -> m a
return [(Msgid, Job)]
v
((Msgid, Job)
-> NodeT
(Maybe Job) Nid Msgid (Packet ServerCommand) tp m (Async ()))
-> [(Msgid, Job)] -> JobT tp m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (JobT tp m ()
-> NodeT
(Maybe Job) Nid Msgid (Packet ServerCommand) tp m (Async ())
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (JobT tp m ()
-> NodeT
(Maybe Job) Nid Msgid (Packet ServerCommand) tp m (Async ()))
-> ((Msgid, Job) -> JobT tp m ())
-> (Msgid, Job)
-> NodeT
(Maybe Job) Nid Msgid (Packet ServerCommand) tp m (Async ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. WorkerEnv tp m -> (Msgid, Job) -> JobT tp m ()
forall (m :: * -> *) tp.
(MonadUnliftIO m, Transport tp) =>
WorkerEnv tp m -> (Msgid, Job) -> JobT tp m ()
processJob WorkerEnv tp m
wEnv) [(Msgid, Job)]
jobs