{-# LANGUAGE FlexibleContexts     #-}
{-# LANGUAGE FlexibleInstances    #-}
{-# LANGUAGE GADTs                #-}
{-# LANGUAGE OverloadedStrings    #-}
{-# LANGUAGE ScopedTypeVariables  #-}
{-# LANGUAGE Strict               #-}
{-# LANGUAGE TemplateHaskell      #-}
{-# LANGUAGE TypeFamilies         #-}
{-# LANGUAGE UndecidableInstances #-}

module Experimenter.MasterSlave
    ( WorkerStatus (..)
    , createKeepAliveFork
    , waitForSlaves
    , keepAliveTimeout
    ) where

import           Control.Lens
import           Control.Monad.IO.Class
import           Control.Monad.Logger         (NoLoggingT, logInfo, runNoLoggingT)
import           Data.IORef

import           Control.Concurrent           (forkIO, threadDelay)
import           Control.Monad.Reader
import           Control.Monad.Trans.Resource
import qualified Data.Text                    as T
import           Data.Time                    (UTCTime, diffUTCTime, getCurrentTime)
import           Database.Persist.Postgresql
import           Network.HostName
import           System.Posix.Process


import           Experimenter.DatabaseSetting
import           Experimenter.DB
import           Experimenter.Models
import           Experimenter.Result
import           Experimenter.Util


keepAliveTimeout :: Num t => t
keepAliveTimeout :: t
keepAliveTimeout = t
10


data WorkerStatus = Working | Finished
  deriving (WorkerStatus -> WorkerStatus -> Bool
(WorkerStatus -> WorkerStatus -> Bool)
-> (WorkerStatus -> WorkerStatus -> Bool) -> Eq WorkerStatus
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: WorkerStatus -> WorkerStatus -> Bool
$c/= :: WorkerStatus -> WorkerStatus -> Bool
== :: WorkerStatus -> WorkerStatus -> Bool
$c== :: WorkerStatus -> WorkerStatus -> Bool
Eq, Int -> WorkerStatus -> ShowS
[WorkerStatus] -> ShowS
WorkerStatus -> String
(Int -> WorkerStatus -> ShowS)
-> (WorkerStatus -> String)
-> ([WorkerStatus] -> ShowS)
-> Show WorkerStatus
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [WorkerStatus] -> ShowS
$cshowList :: [WorkerStatus] -> ShowS
show :: WorkerStatus -> String
$cshow :: WorkerStatus -> String
showsPrec :: Int -> WorkerStatus -> ShowS
$cshowsPrec :: Int -> WorkerStatus -> ShowS
Show)

createKeepAliveFork :: DatabaseSetting -> (UTCTime -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()) -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) () -> IO (IORef WorkerStatus)
createKeepAliveFork :: DatabaseSetting
-> (UTCTime -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ())
-> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
-> IO (IORef WorkerStatus)
createKeepAliveFork DatabaseSetting
dbSetup UTCTime -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
updateFunction ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
deletionFunction = do
  IORef WorkerStatus
ref <- IO (IORef WorkerStatus) -> IO (IORef WorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef WorkerStatus) -> IO (IORef WorkerStatus))
-> IO (IORef WorkerStatus) -> IO (IORef WorkerStatus)
forall a b. (a -> b) -> a -> b
$ WorkerStatus -> IO (IORef WorkerStatus)
forall a. a -> IO (IORef a)
newIORef WorkerStatus
Working
  IO ThreadId -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO ThreadId -> IO ()) -> IO ThreadId -> IO ()
forall a b. (a -> b) -> a -> b
$ IO ThreadId -> IO ThreadId
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ThreadId -> IO ThreadId) -> IO ThreadId -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO (NoLoggingT IO () -> IO ()
forall (m :: * -> *) a. NoLoggingT m a -> m a
runNoLoggingT (NoLoggingT IO () -> IO ()) -> NoLoggingT IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ ConnectionString
-> Int -> (Pool SqlBackend -> NoLoggingT IO ()) -> NoLoggingT IO ()
forall (m :: * -> *) a.
(MonadLogger m, MonadUnliftIO m) =>
ConnectionString -> Int -> (Pool SqlBackend -> m a) -> m a
withPostgresqlPool (DatabaseSetting -> ConnectionString
connectionString DatabaseSetting
dbSetup) Int
1 ((Pool SqlBackend -> NoLoggingT IO ()) -> NoLoggingT IO ())
-> (Pool SqlBackend -> NoLoggingT IO ()) -> NoLoggingT IO ()
forall a b. (a -> b) -> a -> b
$ ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
-> Pool SqlBackend -> NoLoggingT IO ()
forall backend (m :: * -> *) a.
(MonadIO m, BackendCompatible SqlBackend backend) =>
ReaderT backend (NoLoggingT (ResourceT IO)) a
-> Pool backend -> m a
liftSqlPersistMPool (ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
 -> Pool SqlBackend -> NoLoggingT IO ())
-> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
-> Pool SqlBackend
-> NoLoggingT IO ()
forall a b. (a -> b) -> a -> b
$ IORef WorkerStatus
-> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
keepAlive IORef WorkerStatus
ref)
  IORef WorkerStatus -> IO (IORef WorkerStatus)
forall (m :: * -> *) a. Monad m => a -> m a
return IORef WorkerStatus
ref
  where
    keepAlive :: IORef WorkerStatus
-> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
keepAlive IORef WorkerStatus
ref = do
      WorkerStatus
res <- IO WorkerStatus
-> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) WorkerStatus
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO WorkerStatus
 -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) WorkerStatus)
-> IO WorkerStatus
-> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) WorkerStatus
forall a b. (a -> b) -> a -> b
$ IORef WorkerStatus -> IO WorkerStatus
forall a. IORef a -> IO a
readIORef IORef WorkerStatus
ref
      if WorkerStatus
res WorkerStatus -> WorkerStatus -> Bool
forall a. Eq a => a -> a -> Bool
== WorkerStatus
Working
        then do
          UTCTime
time <- IO UTCTime
-> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) UTCTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
          UTCTime -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
updateFunction UTCTime
time
          ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
forall (m :: * -> *). MonadIO m => ReaderT SqlBackend m ()
transactionSave
          IO () -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ())
-> IO () -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Int
1000000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
forall t. Num t => t
keepAliveTimeout)
          IORef WorkerStatus
-> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
keepAlive IORef WorkerStatus
ref
        else ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
deletionFunction

waitForSlaves :: (MonadIO m) => Experiments a -> DB m Bool
waitForSlaves :: Experiments a -> DB m Bool
waitForSlaves Experiments a
exps = do
  ProcessID
pid <- IO ProcessID
-> ReaderT SqlBackend (LoggingT (ResourceT m)) ProcessID
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ProcessID
getProcessID
  Text
hostName <- String -> Text
T.pack (String -> Text)
-> ReaderT SqlBackend (LoggingT (ResourceT m)) String
-> ReaderT SqlBackend (LoggingT (ResourceT m)) Text
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO String -> ReaderT SqlBackend (LoggingT (ResourceT m)) String
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO String
getHostName
  let notSelf :: ExpExecutionLock -> Bool
notSelf (ExpExecutionLock Key Exp
_ Text
h Int
p UTCTime
_) = Bool -> Bool
not (Text
h Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
== Text
hostName Bool -> Bool -> Bool
&& Int
p Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== ProcessID -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral ProcessID
pid)
  [Key Exp]
expIds <- [Filter Exp]
-> [SelectOpt Exp]
-> ReaderT SqlBackend (LoggingT (ResourceT m)) [Key Exp]
forall record backend (m :: * -> *).
(MonadIO m, PersistQueryRead backend,
 PersistRecordBackend record backend) =>
[Filter record]
-> [SelectOpt record] -> ReaderT backend m [Key record]
selectKeysList [EntityField Exp (Key Exps)
forall typ. (typ ~ Key Exps) => EntityField Exp typ
ExpExps EntityField Exp (Key Exps) -> Key Exps -> Filter Exp
forall v typ.
PersistField typ =>
EntityField v typ -> typ -> Filter v
==. Key Exps
expsId] []
  (ExpExecutionLock -> Bool) -> [Key Exp] -> DB m Bool
forall backend (m :: * -> *).
(MonadIO m, PersistQueryWrite backend, MonadLogger m,
 BaseBackend backend ~ SqlBackend) =>
(ExpExecutionLock -> Bool) -> [Key Exp] -> ReaderT backend m Bool
waitForSlaves' ExpExecutionLock -> Bool
notSelf [Key Exp]
expIds
  where
    expsId :: Key Exps
expsId = Experiments a
exps Experiments a
-> Getting (Key Exps) (Experiments a) (Key Exps) -> Key Exps
forall s a. s -> Getting a s a -> a
^. Getting (Key Exps) (Experiments a) (Key Exps)
forall a. Lens' (Experiments a) (Key Exps)
experimentsKey
    waitForSlaves' :: (ExpExecutionLock -> Bool) -> [Key Exp] -> ReaderT backend m Bool
waitForSlaves' ExpExecutionLock -> Bool
notSelf [Key Exp]
expIds = do
      [ExpExecutionLock]
locks <- (ExpExecutionLock -> Bool)
-> [ExpExecutionLock] -> [ExpExecutionLock]
forall a. (a -> Bool) -> [a] -> [a]
filter ExpExecutionLock -> Bool
notSelf ([ExpExecutionLock] -> [ExpExecutionLock])
-> ([Entity ExpExecutionLock] -> [ExpExecutionLock])
-> [Entity ExpExecutionLock]
-> [ExpExecutionLock]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Entity ExpExecutionLock -> ExpExecutionLock)
-> [Entity ExpExecutionLock] -> [ExpExecutionLock]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Entity ExpExecutionLock -> ExpExecutionLock
forall record. Entity record -> record
entityVal ([Entity ExpExecutionLock] -> [ExpExecutionLock])
-> ReaderT backend m [Entity ExpExecutionLock]
-> ReaderT backend m [ExpExecutionLock]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Filter ExpExecutionLock]
-> [SelectOpt ExpExecutionLock]
-> ReaderT backend m [Entity ExpExecutionLock]
forall record backend (m :: * -> *).
(MonadIO m, PersistQueryRead backend,
 PersistRecordBackend record backend) =>
[Filter record]
-> [SelectOpt record] -> ReaderT backend m [Entity record]
selectList [EntityField ExpExecutionLock (Key Exp)
forall typ. (typ ~ Key Exp) => EntityField ExpExecutionLock typ
ExpExecutionLockExp EntityField ExpExecutionLock (Key Exp)
-> [Key Exp] -> Filter ExpExecutionLock
forall v typ.
PersistField typ =>
EntityField v typ -> [typ] -> Filter v
<-. [Key Exp]
expIds] []
      if [ExpExecutionLock] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [ExpExecutionLock]
locks
        then do
        [Filter ExpProgress] -> ReaderT backend m ()
forall backend (m :: * -> *) record.
(PersistQueryWrite backend, MonadIO m,
 PersistRecordBackend record backend) =>
[Filter record] -> ReaderT backend m ()
deleteWhere [EntityField ExpProgress (Key Exp)
forall typ. (typ ~ Key Exp) => EntityField ExpProgress typ
ExpProgressExp EntityField ExpProgress (Key Exp)
-> [Key Exp] -> Filter ExpProgress
forall v typ.
PersistField typ =>
EntityField v typ -> [typ] -> Filter v
<-. [Key Exp]
expIds]
        Bool -> ReaderT backend m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
        else do
          UTCTime
time <- IO UTCTime -> ReaderT backend m UTCTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO UTCTime
getCurrentTime
          let workingSlaves :: [ExpExecutionLock]
workingSlaves = (ExpExecutionLock -> Bool)
-> [ExpExecutionLock] -> [ExpExecutionLock]
forall a. (a -> Bool) -> [a] -> [a]
filter (\ExpExecutionLock
l -> UTCTime -> UTCTime -> NominalDiffTime
diffUTCTime UTCTime
time (ExpExecutionLock
l ExpExecutionLock
-> Getting UTCTime ExpExecutionLock UTCTime -> UTCTime
forall s a. s -> Getting a s a -> a
^. Getting UTCTime ExpExecutionLock UTCTime
forall (f :: * -> *).
Functor f =>
(UTCTime -> f UTCTime) -> ExpExecutionLock -> f ExpExecutionLock
expExecutionLockLastAliveSign) NominalDiffTime -> NominalDiffTime -> Bool
forall a. Ord a => a -> a -> Bool
<= NominalDiffTime
2 NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Num a => a -> a -> a
* NominalDiffTime
forall t. Num t => t
keepAliveTimeout) [ExpExecutionLock]
locks
          if [ExpExecutionLock] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [ExpExecutionLock]
workingSlaves
            then Bool -> ReaderT backend m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False -- a slive must have died as it didn't delete the lock
            else do
              $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> ReaderT backend m ()
(Text -> ReaderT backend m ())
-> (Text -> Text) -> Text -> ReaderT backend m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
pack :: String -> Text
logInfo) Text
"Waiting for slaves. List of slaves currently working: "
              (ExpExecutionLock -> ReaderT backend m ())
-> [ExpExecutionLock] -> ReaderT backend m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ExpExecutionLock -> ReaderT backend m ()
forall (m :: * -> *). MonadLogger m => ExpExecutionLock -> m ()
printInfoSlave [ExpExecutionLock]
locks
              IO () -> ReaderT backend m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ReaderT backend m ()) -> IO () -> ReaderT backend m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Int
1000000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
forall t. Num t => t
keepAliveTimeout)
              (ExpExecutionLock -> Bool) -> [Key Exp] -> ReaderT backend m Bool
waitForSlaves' ExpExecutionLock -> Bool
notSelf [Key Exp]
expIds
    printInfoSlave :: ExpExecutionLock -> m ()
printInfoSlave (ExpExecutionLock Key Exp
_ Text
host Int
pid UTCTime
_) = $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> m ()
(Text -> m ()) -> (Text -> Text) -> Text -> m ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
pack :: String -> Text
logInfo) (Text -> m ()) -> Text -> m ()
forall a b. (a -> b) -> a -> b
$ Text
"Slave from host " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
forall a. Show a => a -> Text
tshow Text
host Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" with process ID " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Int -> Text
forall a. Show a => a -> Text
tshow Int
pid