module Data.Poolboy
  ( -- * Configuration
    PoolboySettings (..),
    WorkersCountSettings (..),
    defaultPoolboySettings,
    poolboySettingsWith,
    simpleSerializedLogger,

    -- * Running
    WorkQueue,
    withPoolboy,
    newPoolboy,

    -- * Driving
    changeDesiredWorkersCount,
    waitReadyQueue,

    -- * Stopping
    stopWorkQueue,
    isStopedWorkQueue,
    WaitingStopStrategy,
    waitingStopTimeout,
    waitingStopFinishWorkers,

    -- * Enqueueing
    enqueue,
    enqueueAfter,
  )
where

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM.TQueue
import Control.Monad
import Control.Monad.STM
import Data.Maybe (isNothing)
import System.Timeout (timeout)
import UnliftIO.Exception (bracket, tryAny)

-- | Initial settings
data PoolboySettings = PoolboySettings
  { PoolboySettings -> WorkersCountSettings
workersCount :: WorkersCountSettings,
    PoolboySettings -> String -> IO ()
log :: String -> IO ()
  }

-- | Initial number of threads
data WorkersCountSettings
  = -- | 'getNumCapabilities' based number
    CapabilitiesWCS
  | FixedWCS Int -- arbitrary number
  deriving stock (WorkersCountSettings -> WorkersCountSettings -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: WorkersCountSettings -> WorkersCountSettings -> Bool
$c/= :: WorkersCountSettings -> WorkersCountSettings -> Bool
== :: WorkersCountSettings -> WorkersCountSettings -> Bool
$c== :: WorkersCountSettings -> WorkersCountSettings -> Bool
Eq, Int -> WorkersCountSettings -> ShowS
[WorkersCountSettings] -> ShowS
WorkersCountSettings -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [WorkersCountSettings] -> ShowS
$cshowList :: [WorkersCountSettings] -> ShowS
show :: WorkersCountSettings -> String
$cshow :: WorkersCountSettings -> String
showsPrec :: Int -> WorkersCountSettings -> ShowS
$cshowsPrec :: Int -> WorkersCountSettings -> ShowS
Show)

-- | Usual configuration 'CapabilitiesWCS' and no log
defaultPoolboySettings :: PoolboySettings
defaultPoolboySettings :: PoolboySettings
defaultPoolboySettings =
  PoolboySettings
    { $sel:workersCount:PoolboySettings :: WorkersCountSettings
workersCount = WorkersCountSettings
CapabilitiesWCS,
      $sel:log:PoolboySettings :: String -> IO ()
log = \String
x -> seq :: forall a b. a -> b -> b
seq String
x forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => a -> m a
return ()
    }

-- | Arbitrary-numbered settings
poolboySettingsWith :: Int -> PoolboySettings
poolboySettingsWith :: Int -> PoolboySettings
poolboySettingsWith Int
c = PoolboySettings
defaultPoolboySettings {$sel:workersCount:PoolboySettings :: WorkersCountSettings
workersCount = Int -> WorkersCountSettings
FixedWCS Int
c}

-- | Simple (but not particularly performant) serialized logger
simpleSerializedLogger :: IO (String -> IO ())
simpleSerializedLogger :: IO (String -> IO ())
simpleSerializedLogger = do
  MVar ()
logLock <- forall a. a -> IO (MVar a)
newMVar ()
  forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ \String
x ->
    forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar ()
logLock forall a b. (a -> b) -> a -> b
$ \() -> do
      String -> IO ()
putStrLn String
x
      forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | 'backet'-based usage (recommended)
withPoolboy :: PoolboySettings -> WaitingStopStrategy -> (WorkQueue -> IO a) -> IO a
withPoolboy :: forall a.
PoolboySettings
-> WaitingStopStrategy -> (WorkQueue -> IO a) -> IO a
withPoolboy PoolboySettings
settings WaitingStopStrategy
waitStopWorkQueue = forall (m :: * -> *) a b c.
MonadUnliftIO m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (PoolboySettings -> IO WorkQueue
newPoolboy PoolboySettings
settings) (\WorkQueue
wq -> WaitingStopStrategy
stopWorkQueue WorkQueue
wq forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> WaitingStopStrategy
waitStopWorkQueue WorkQueue
wq)

-- | Standalone/manual usage
newPoolboy :: PoolboySettings -> IO WorkQueue
newPoolboy :: PoolboySettings -> IO WorkQueue
newPoolboy PoolboySettings
settings = do
  WorkQueue
wq <-
    TQueue Commands
-> TQueue (Either () (IO ())) -> MVar () -> QSemN -> WorkQueue
WorkQueue
      forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. IO (TQueue a)
newTQueueIO
      forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> forall a. IO (TQueue a)
newTQueueIO
      forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> forall a. IO (MVar a)
newEmptyMVar
      forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Int -> IO QSemN
newQSemN Int
0

  Int
count <-
    case PoolboySettings
settings.workersCount of
      WorkersCountSettings
CapabilitiesWCS -> IO Int
getNumCapabilities
      FixedWCS Int
x -> forall (m :: * -> *) a. Monad m => a -> m a
return Int
x

  WorkQueue -> Int -> IO ()
changeDesiredWorkersCount WorkQueue
wq Int
count
  forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$
    IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$
      WorkQueueControllerState -> IO ()
controller forall a b. (a -> b) -> a -> b
$
        WorkQueueControllerState
          { $sel:commands:WorkQueueControllerState :: TQueue Commands
commands = WorkQueue
wq.commands,
            $sel:queue:WorkQueueControllerState :: TQueue (Either () (IO ()))
queue = WorkQueue
wq.queue,
            $sel:stopped:WorkQueueControllerState :: MVar ()
stopped = WorkQueue
wq.stopped,
            $sel:log:WorkQueueControllerState :: String -> IO ()
log = PoolboySettings
settings.log,
            $sel:workers:WorkQueueControllerState :: [Async ()]
workers = [],
            $sel:waitingWorkers:WorkQueueControllerState :: QSemN
waitingWorkers = WorkQueue
wq.waitingWorkers,
            $sel:capabilityCount:WorkQueueControllerState :: Int
capabilityCount = Int
0
          }

  forall (m :: * -> *) a. Monad m => a -> m a
return WorkQueue
wq

-- | Request a worker number adjustment
changeDesiredWorkersCount :: WorkQueue -> Int -> IO ()
changeDesiredWorkersCount :: WorkQueue -> Int -> IO ()
changeDesiredWorkersCount WorkQueue
wq =
  forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TQueue a -> a -> STM ()
writeTQueue WorkQueue
wq.commands forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Commands
ChangeDesiredWorkersCount

-- | Request stopping wrokers
stopWorkQueue :: WorkQueue -> IO ()
stopWorkQueue :: WaitingStopStrategy
stopWorkQueue WorkQueue
wq =
  forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> a -> STM ()
writeTQueue WorkQueue
wq.commands Commands
Stop

-- | Non-blocking check of the work queue's running status
isStopedWorkQueue :: WorkQueue -> IO Bool
isStopedWorkQueue :: WorkQueue -> IO Bool
isStopedWorkQueue WorkQueue
wq =
  Bool -> Bool
not forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. MVar a -> IO Bool
isEmptyMVar WorkQueue
wq.stopped

type WaitingStopStrategy = WorkQueue -> IO ()

-- | Block until the queue is totally stopped (no more running worker)
waitingStopFinishWorkers :: WaitingStopStrategy
waitingStopFinishWorkers :: WaitingStopStrategy
waitingStopFinishWorkers WorkQueue
wq =
  forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> m (Either SomeException a)
tryAny forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> IO a
readMVar WorkQueue
wq.stopped

-- | Block until the queue is totally stopped or deadline (in micro seconds) is reached
waitingStopTimeout :: Int -> WaitingStopStrategy
waitingStopTimeout :: Int -> WaitingStopStrategy
waitingStopTimeout Int
delay WorkQueue
wq = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. Int -> IO a -> IO (Maybe a)
timeout Int
delay forall a b. (a -> b) -> a -> b
$ WaitingStopStrategy
waitingStopFinishWorkers WorkQueue
wq

-- | Enqueue one action in the work queue (non-blocking)
enqueue :: WorkQueue -> IO () -> IO ()
enqueue :: WorkQueue -> IO () -> IO ()
enqueue WorkQueue
wq =
  forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TQueue a -> a -> STM ()
writeTQueue WorkQueue
wq.queue forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. b -> Either a b
Right

-- | Block until one worker is available
waitReadyQueue :: WorkQueue -> IO ()
waitReadyQueue :: WaitingStopStrategy
waitReadyQueue WorkQueue
wq = do
  QSemN -> Int -> IO ()
waitQSemN WorkQueue
wq.waitingWorkers Int
1
  QSemN -> Int -> IO ()
signalQSemN WorkQueue
wq.waitingWorkers Int
1

-- | Enqueue action and some actions to be run after it
enqueueAfter :: (Foldable f) => WorkQueue -> IO () -> f (IO ()) -> IO ()
enqueueAfter :: forall (f :: * -> *).
Foldable f =>
WorkQueue -> IO () -> f (IO ()) -> IO ()
enqueueAfter WorkQueue
wq IO ()
x f (IO ())
xs =
  WorkQueue -> IO () -> IO ()
enqueue WorkQueue
wq forall a b. (a -> b) -> a -> b
$ do
    IO ()
x
    forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ f (IO ())
xs forall a b. (a -> b) -> a -> b
$ WorkQueue -> IO () -> IO ()
enqueue WorkQueue
wq

-- Support (internal)
data WorkQueue = WorkQueue
  { WorkQueue -> TQueue Commands
commands :: TQueue Commands,
    WorkQueue -> TQueue (Either () (IO ()))
queue :: TQueue (Either () (IO ())),
    WorkQueue -> MVar ()
stopped :: MVar (),
    WorkQueue -> QSemN
waitingWorkers :: QSemN
  }

data Commands
  = ChangeDesiredWorkersCount Int
  | Stop
  deriving stock (Int -> Commands -> ShowS
[Commands] -> ShowS
Commands -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Commands] -> ShowS
$cshowList :: [Commands] -> ShowS
show :: Commands -> String
$cshow :: Commands -> String
showsPrec :: Int -> Commands -> ShowS
$cshowsPrec :: Int -> Commands -> ShowS
Show)

data WorkQueueControllerState = WorkQueueControllerState
  { WorkQueueControllerState -> TQueue Commands
commands :: TQueue Commands,
    WorkQueueControllerState -> TQueue (Either () (IO ()))
queue :: TQueue (Either () (IO ())),
    WorkQueueControllerState -> MVar ()
stopped :: MVar (),
    WorkQueueControllerState -> String -> IO ()
log :: String -> IO (),
    WorkQueueControllerState -> [Async ()]
workers :: [Async ()],
    WorkQueueControllerState -> QSemN
waitingWorkers :: QSemN,
    WorkQueueControllerState -> Int
capabilityCount :: Int
  }

controller :: WorkQueueControllerState -> IO ()
controller :: WorkQueueControllerState -> IO ()
controller WorkQueueControllerState
wq = do
  Commands
command <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> STM a
readTQueue WorkQueueControllerState
wq.commands
  let stopOneWorker :: IO ()
stopOneWorker = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> a -> STM ()
writeTQueue WorkQueueControllerState
wq.queue forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left ()
      getLiveWorkers :: IO [Async ()]
getLiveWorkers = forall (m :: * -> *) a.
Applicative m =>
(a -> m Bool) -> [a] -> m [a]
filterM (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a. Maybe a -> Bool
isNothing forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> IO (Maybe (Either SomeException a))
poll) WorkQueueControllerState
wq.workers
      prefix :: String
prefix = String
"Controller: "
  WorkQueueControllerState
wq.log forall a b. (a -> b) -> a -> b
$ String
prefix forall a. Semigroup a => a -> a -> a
<> String
"Command: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Commands
command
  case Commands
command of
    ChangeDesiredWorkersCount Int
n -> do
      [Async ()]
liveWorkers <- IO [Async ()]
getLiveWorkers
      let diff :: Int
diff = forall (t :: * -> *) a. Foldable t => t a -> Int
length [Async ()]
liveWorkers forall a. Num a => a -> a -> a
- Int
n
      ([Async ()]
newWorkers, Int
newCapabilityCount) <-
        if Int
diff forall a. Ord a => a -> a -> Bool
> Int
0
          then do
            forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
diff IO ()
stopOneWorker
            forall (m :: * -> *) a. Monad m => a -> m a
return ([], WorkQueueControllerState
wq.capabilityCount)
          else do
            let newWorkersCount :: Int
newWorkersCount = forall a. Num a => a -> a
abs Int
diff
            [Async ()]
newWorkers <- forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Int
1 .. Int
newWorkersCount] forall a b. (a -> b) -> a -> b
$ \Int
capability -> do
              WorkQueueControllerState
wq.log forall a b. (a -> b) -> a -> b
$ String
prefix forall a. Semigroup a => a -> a -> a
<> String
"Pre-fork"
              forall a. Int -> IO a -> IO (Async a)
asyncOn (Int
capability forall a. Num a => a -> a -> a
- Int
1) forall a b. (a -> b) -> a -> b
$ WorkQueueWorkerState -> IO ()
worker forall a b. (a -> b) -> a -> b
$ WorkQueueWorkerState {$sel:queue:WorkQueueWorkerState :: TQueue (Either () (IO ()))
queue = WorkQueueControllerState
wq.queue, $sel:waitingWorkers:WorkQueueWorkerState :: QSemN
waitingWorkers = WorkQueueControllerState
wq.waitingWorkers, $sel:log:WorkQueueWorkerState :: String -> IO ()
log = WorkQueueControllerState
wq.log}
            forall (m :: * -> *) a. Monad m => a -> m a
return ([Async ()]
newWorkers, WorkQueueControllerState
wq.capabilityCount forall a. Num a => a -> a -> a
+ Int
newWorkersCount)
      WorkQueueControllerState -> IO ()
controller forall a b. (a -> b) -> a -> b
$ WorkQueueControllerState
wq {$sel:workers:WorkQueueControllerState :: [Async ()]
workers = [Async ()]
newWorkers forall a. Semigroup a => a -> a -> a
<> [Async ()]
liveWorkers, $sel:capabilityCount:WorkQueueControllerState :: Int
capabilityCount = Int
newCapabilityCount}
    Commands
Stop -> do
      [Async ()]
liveWorkers <- IO [Async ()]
getLiveWorkers
      let currentCount :: Int
currentCount = forall (t :: * -> *) a. Foldable t => t a -> Int
length [Async ()]
liveWorkers
      WorkQueueControllerState
wq.log forall a b. (a -> b) -> a -> b
$ String
prefix forall a. Semigroup a => a -> a -> a
<> String
"Stopping " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Int
currentCount forall a. Semigroup a => a -> a -> a
<> String
" workers"
      QSemN -> Int -> IO ()
waitQSemN WorkQueueControllerState
wq.waitingWorkers Int
currentCount
      forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
currentCount IO ()
stopOneWorker
      forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> a -> IO Bool
tryPutMVar WorkQueueControllerState
wq.stopped ()

data WorkQueueWorkerState = WorkQueueWorkerState
  { WorkQueueWorkerState -> TQueue (Either () (IO ()))
queue :: TQueue (Either () (IO ())),
    WorkQueueWorkerState -> QSemN
waitingWorkers :: QSemN,
    WorkQueueWorkerState -> String -> IO ()
log :: String -> IO ()
  }

worker :: WorkQueueWorkerState -> IO ()
worker :: WorkQueueWorkerState -> IO ()
worker WorkQueueWorkerState
wq = do
  String
workerId <- forall a. Show a => a -> String
show forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO ThreadId
myThreadId
  let prefix :: String
prefix = String
"Worker [" forall a. Semigroup a => a -> a -> a
<> String
workerId forall a. Semigroup a => a -> a -> a
<> String
"]: "
  WorkQueueWorkerState
wq.log forall a b. (a -> b) -> a -> b
$ String
prefix forall a. Semigroup a => a -> a -> a
<> String
"Starting"
  let loop :: IO ()
loop = do
        QSemN -> Int -> IO ()
signalQSemN WorkQueueWorkerState
wq.waitingWorkers Int
1
        Either () (IO ())
command <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> STM a
readTQueue WorkQueueWorkerState
wq.queue
        case Either () (IO ())
command of
          Left () -> do
            WorkQueueWorkerState
wq.log forall a b. (a -> b) -> a -> b
$ String
prefix forall a. Semigroup a => a -> a -> a
<> String
"Stopping"
          Right IO ()
act -> do
            QSemN -> Int -> IO ()
waitQSemN WorkQueueWorkerState
wq.waitingWorkers Int
1
            WorkQueueWorkerState
wq.log (String
prefix forall a. Semigroup a => a -> a -> a
<> String
"pop")
            forall (f :: * -> *) a. Functor f => f a -> f ()
void (forall (m :: * -> *) a.
MonadUnliftIO m =>
m a -> m (Either SomeException a)
tryAny IO ()
act)
            WorkQueueWorkerState
wq.log (String
prefix forall a. Semigroup a => a -> a -> a
<> String
"poped")
            IO ()
loop
  IO ()
loop