{-# LANGUAGE FlexibleContexts           #-}
{-# LANGUAGE FlexibleInstances          #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses      #-}
{-# LANGUAGE OverloadedStrings          #-}
{-# LANGUAGE RankNTypes                 #-}
{-# LANGUAGE RecordWildCards            #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE TypeFamilies               #-}
{-# LANGUAGE UndecidableInstances       #-}

module Periodic.Trans.Worker
  ( WorkerT
  , startWorkerT
  , ping
  , addFunc
  , broadcast
  , removeFunc
  , work
  , close
  , runJobT
  , getClientEnv
  ) where

import           Control.Monad                (forever, replicateM, void, when)
import           Control.Monad.Reader.Class   (MonadReader, asks)
import           Control.Monad.Trans.Class    (MonadTrans, lift)
import           Control.Monad.Trans.Reader   (ReaderT (..), runReaderT)
import           Metro.Class                  (Transport, TransportConfig,
                                               getPacketId)
import           Metro.Conn                   (initConnEnv, runConnT)
import qualified Metro.Conn                   as Conn
import           Metro.IOHashMap              (IOHashMap, newIOHashMap)
import qualified Metro.IOHashMap              as HM (delete, insert, lookup)
import           Metro.Node                   (NodeMode (..), SessionMode (..),
                                               initEnv1, newSessionEnv,
                                               nextSessionId, runSessionT_,
                                               setDefaultSessionTimeout,
                                               setNodeMode, setSessionMode,
                                               startNodeT_, withEnv,
                                               withSessionT)
import           Metro.Session                (send)
import           Periodic.IOList              (IOList, newIOList)
import qualified Periodic.IOList              as IL (append, clearSTM,
                                                     toListSTM)
import           Periodic.Node
import qualified Periodic.Trans.BaseClient    as BT (BaseClientEnv, checkHealth,
                                                     close, getClientEnv, ping)
import           Periodic.Trans.Job           (JobT, func_, name, workFail)
import           Periodic.Types               (ClientType (TypeWorker), Msgid,
                                               Packet, getClientType, getResult,
                                               packetREQ, regPacketREQ)
import           Periodic.Types.Job
import           Periodic.Types.ServerCommand
import           Periodic.Types.WorkerCommand
import           System.Log.Logger            (errorM)
import           UnliftIO
import           UnliftIO.Concurrent          (threadDelay)

-- | Mutable map from a function name to the job handler stored for it.
type TaskList tp m = IOHashMap FuncName (JobT tp m ())

-- | Mutable list of jobs, each paired with the id of the packet that
-- delivered it (see 'filterPacketM').
type JobList = IOList (Msgid, Job)

-- | State shared by everything running inside a 'WorkerT'.
data WorkerEnv tp m = WorkerEnv
    { taskList :: TaskList tp m
    -- ^ Handlers stored by 'addFunc' / 'broadcast', keyed by function name.
    , jobList  :: JobList
    -- ^ Assigned jobs queued by 'filterPacketM'.
    , taskSize :: TVar Int
    -- ^ Counter incremented by 'processJob'; 'work' blocks while it is at
    -- or above the requested size.
    }

-- | The worker monad transformer: a 'ReaderT' carrying the 'WorkerEnv',
-- layered over the 'JobT' node monad.
newtype WorkerT tp m a = WorkerT
  { unWorkerT :: ReaderT (WorkerEnv tp m) (JobT tp m) a
  }
  deriving
    ( Functor
    , Applicative
    , Monad
    , MonadIO
    , MonadReader (WorkerEnv tp m)
    )

-- | Unlifting captures the current 'WorkerEnv' and delegates to the
-- 'MonadUnliftIO' instance of the underlying 'JobT' layer.
instance MonadUnliftIO m => MonadUnliftIO (WorkerT tp m) where
  withRunInIO inner =
    WorkerT $ ReaderT $ \env ->
      withRunInIO $ \runInIO ->
        -- Re-supply the captured environment to every action run in IO.
        inner (runInIO . runWorkerT env)

-- | Lifts through both layers: first into the node monad, then into the
-- surrounding 'ReaderT'.
instance MonadTrans (WorkerT tp) where
  lift = WorkerT . lift . lift

-- | Run a 'WorkerT' action against the given environment, yielding the
-- underlying 'JobT' action.
runWorkerT :: WorkerEnv tp m -> WorkerT tp m a -> JobT tp m a
runWorkerT workerEnv = flip runReaderT workerEnv . unWorkerT

-- | Connect using @config@, register as a worker, and run the worker
-- action @m@.
--
-- Steps, in order:
--
-- 1. Open the connection and send @regPacketREQ TypeWorker@, then read the
--    reply.  A 'Data' reply supplies the node id; any other reply yields an
--    empty id.
-- 2. Create an empty task map, an empty job list and a task counter at 0.
-- 3. Start three background threads with 'async': the node loop (with
--    'filterPacketM' as packet filter), a health check that fires after
--    every 100 s delay, and 'processJobQueue'.
-- 4. Run @m@.
--
-- The 'Async' handles are discarded, so this function neither waits for
-- nor cancels the background threads itself.
startWorkerT
  :: (MonadUnliftIO m, Transport tp)
  => TransportConfig tp -> WorkerT tp m () -> m ()
startWorkerT config m = do
  connEnv <- initConnEnv config

  -- Registration handshake.
  r <- runConnT connEnv $ do
    Conn.send $ regPacketREQ TypeWorker
    Conn.receive

  let nid = case getClientType r of
              Data v -> v
              _      -> ""

  taskList <- newIOHashMap
  jobList  <- newIOList
  taskSize <- newTVarIO 0

  jobEnv1 <- initEnv1 mapEnv connEnv Nothing (Nid nid) sessionGen

  -- RecordWildCards: picks up taskList, jobList and taskSize bound above.
  let wEnv = WorkerEnv {..}

  runNodeT jobEnv1 $ do
    -- Node loop; packets for which filterPacketM returns False are assigned
    -- jobs that it has already queued on jobList.
    void $ async $ startNodeT_ (filterPacketM jobList) defaultSessionHandler

    runWorkerT wEnv $ do
      -- Periodic health check; threadDelay takes microseconds (100 s here).
      void . async $ forever $ do
        threadDelay $ 100 * 1000 * 1000
        checkHealth

      void . async . runJobT $ processJobQueue wEnv

      m

  where
    -- Node settings applied when the node environment is initialised.
    mapEnv =
          setNodeMode Multi
          . setSessionMode SingleAction
          . setDefaultSessionTimeout 100

-- | Packet filter handed to the node loop.
--
-- If the packet carries an assigned job (see 'getAssignJob'), the job is
-- appended to the job list together with the packet id and the result is
-- 'False'.  For every other packet the result is 'True' and nothing is
-- queued.
filterPacketM :: MonadIO m => JobList -> Packet ServerCommand -> m Bool
filterPacketM jl rpkt =
  case getResult Nothing getAssignJob (Just rpkt) of
    Nothing  -> pure True
    Just job -> do
      IL.append jl (getPacketId rpkt, job)
      pure False


-- | Lift a 'JobT' action into 'WorkerT'; the worker environment is ignored.
runJobT :: Monad m => JobT tp m a -> WorkerT tp m a
runJobT = WorkerT . lift

-- | Fetch the underlying base-client environment ('BT.getClientEnv' lifted
-- into 'WorkerT').
getClientEnv
  :: (Monad m, Transport tp)
  => WorkerT tp m (BT.BaseClientEnv (Maybe Job) tp)
getClientEnv = runJobT BT.getClientEnv

-- | Close the client ('BT.close' lifted into 'WorkerT').
close :: (MonadUnliftIO m, Transport tp) => WorkerT tp m ()
close = runJobT BT.close

-- | Ping the server ('BT.ping' lifted into 'WorkerT'), returning its 'Bool'
-- result.
ping :: (MonadUnliftIO m, Transport tp) => WorkerT tp m Bool
ping = runJobT BT.ping

-- | Register a handler for function @f@.
--
-- Sends a 'CanDo' request for @f@ in a fresh session, then stores the
-- handler @j@ under @f@ in the worker's task map (replacing any handler
-- previously stored under the same name).
addFunc
  :: (MonadUnliftIO m, Transport tp)
  => FuncName -> JobT tp m () -> WorkerT tp m ()
addFunc f j = do
  runJobT $ withSessionT Nothing $ send (packetREQ $ CanDo f)
  ref <- asks taskList
  HM.insert ref f j

-- | Register a broadcast handler for function @f@.
--
-- Identical to 'addFunc' except that the request sent is 'Broadcast'
-- rather than 'CanDo'; the handler @j@ is stored in the same task map.
broadcast
  :: (MonadUnliftIO m, Transport tp)
  => FuncName -> JobT tp m () -> WorkerT tp m ()
broadcast f j = do
  runJobT $ withSessionT Nothing $ send (packetREQ $ Broadcast f)
  ref <- asks taskList
  HM.insert ref f j

-- | Unregister function @f@: looks up the worker's task map and delegates
-- to 'removeFunc_'.
removeFunc
  :: (MonadUnliftIO m, Transport tp)
  => FuncName -> WorkerT tp m ()
removeFunc f = do
  tskList <- asks taskList
  runJobT $ removeFunc_ tskList f

-- | Send a 'CantDo' request for @f@ in a fresh session, then delete @f@
-- from the given task map.
removeFunc_
  :: (MonadUnliftIO m, Transport tp)
  => TaskList tp m -> FuncName -> JobT tp m ()
removeFunc_ ref f = do
  withSessionT Nothing $ send (packetREQ $ CantDo f)
  HM.delete ref f

-- | Extract the job from a 'JobAssign' command; 'Nothing' for any other
-- server command.
getAssignJob :: ServerCommand -> Maybe Job
getAssignJob (JobAssign job) = Just job
getAssignJob _               = Nothing

-- | Run the job-grabbing loop with @size@ sessions.  Never returns.
--
-- Creates @size@ session environments, each with a fresh session id, then
-- repeats forever:
--
-- 1. Block (via STM retry) while the task counter is @>= size@.
-- 2. Send a 'GrabJob' request on every session.
-- 3. Sleep for 10 seconds.
work
  :: (MonadUnliftIO m, Transport tp)
  => Int -> WorkerT tp m ()
work size = do
  tskSize <- asks taskSize
  runJobT $ do
    -- NOTE(review): the @Just (-1)@ timeout presumably means "no timeout";
    -- confirm against Metro.Node.newSessionEnv.
    envs <- mapM (newSessionEnv (Just (-1))) =<< replicateM size nextSessionId
    forever $ do
      -- Wait until the task counter drops below the requested size.
      atomically $ do
        s <- readTVar tskSize
        when (s >= size) retrySTM

      mapM_ (`runSessionT_` (send $ packetREQ GrabJob)) envs
      threadDelay 10000000 -- 10s


-- | Run one grabbed job and then ask the server for the next one.
--
-- Steps, in order:
--
-- 1. add one to the 'taskSize' counter;
-- 2. with the job installed as the environment (via 'withEnv'), look up the
--    task registered for the job's function name in 'taskList' and run it;
-- 3. subtract one from the 'taskSize' counter;
-- 4. send a @GrabJob@ request on a session created for the given message id.
--
-- NOTE(review): step 3 is an ordinary sequential action, so it appears to be
-- skipped if step 2 throws an exception that is not handled by 'catchAny'
-- (for instance one raised inside the handler itself) — confirm whether the
-- counter should be restored with a @finally@-style guard.
processJob :: (MonadUnliftIO m, Transport tp) => WorkerEnv tp m -> (Msgid, Job) -> JobT tp m ()
processJob WorkerEnv{..} (sid, job) = do
  bump (+ 1)
  withEnv (Just job) $ do
    fn <- func_
    registered <- HM.lookup taskList fn
    maybe (unknownFunc fn) (runGuarded fn) registered
  bump (subtract 1)

  nextEnv <- newSessionEnv (Just (-1)) sid
  nextEnv `runSessionT_` send (packetREQ GrabJob)
  where
    -- Apply @step@ to the 'taskSize' counter in a single STM transaction.
    bump step = atomically $ readTVar taskSize >>= writeTVar taskSize . step

    -- No task is registered under this name: drop the function from the
    -- task list and report the job as failed.
    unknownFunc fn = removeFunc_ taskList fn >> workFail

    -- Run the task; on a synchronous exception, log it together with the
    -- job name and function name, then report the job as failed.
    runGuarded fn action = catchAny action $ \err -> do
      jobName <- name
      liftIO . errorM "Periodic.Trans.Worker" . concat $
        [ "Failing on running job { name = "
        , jobName
        , ", "
        , show fn
        , " }"
        , "\nError: "
        , show err
        ]
      workFail

-- | Worker-level health check: runs the base client's 'BT.checkHealth'
-- inside the worker monad by way of 'runJobT'.
checkHealth :: (MonadUnliftIO m, Transport tp) => WorkerT tp m ()
checkHealth = runJobT BT.checkHealth

-- | Loop forever, moving jobs from the worker's 'jobList' into their own
-- asynchronous 'processJob' runs.
--
-- Each iteration blocks (via STM retry) until 'jobList' is non-empty, takes
-- every queued job while clearing the list in the same transaction, and then
-- starts one 'async' per job. The resulting 'Async' handles are discarded.
processJobQueue :: (MonadUnliftIO m, Transport tp) => WorkerEnv tp m -> JobT tp m ()
processJobQueue wEnv@WorkerEnv {..} = forever $ drainQueue >>= mapM_ spawn
  where
    -- Atomically take the whole queue; retry while it is empty.
    drainQueue = atomically $ do
      pending <- IL.toListSTM jobList
      when (null pending) retrySTM
      IL.clearSTM jobList
      return pending

    -- Start a single job in the background.
    spawn = async . processJob wEnv