module Data.Poolboy
  ( -- * Configuration
    PoolboySettings (..),
    WorkersCountSettings (..),
    defaultPoolboySettings,
    poolboySettingsWith,
    simpleSerializedLogger,

    -- * Running
    WorkQueue,
    withPoolboy,
    newPoolboy,

    -- * Driving
    changeDesiredWorkersCount,
    waitReadyQueue,

    -- * Stopping
    stopWorkQueue,
    isStopedWorkQueue,
    WaitingStopStrategy,
    waitingStopTimeout,
    waitingStopFinishWorkers,

    -- * Enqueueing
    enqueue,
    enqueueAfter,
  )
where

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM.TQueue
import Control.Exception.Safe (bracket, tryAny)
import Control.Monad
import Control.Monad.STM
import Data.Maybe (isNothing)
import System.Timeout (timeout)

-- | Initial settings used to create a work queue.
data PoolboySettings = PoolboySettings
  { -- | How many workers are requested when the queue is created.
    workersCount :: WorkersCountSettings,
    -- | Callback receiving the diagnostic messages emitted by the
    -- controller and the workers.
    log :: String -> IO ()
  }

-- | Initial number of threads
data WorkersCountSettings
  = -- | 'getNumCapabilities' based number
    CapabilitiesWCS
  | -- | Arbitrary, explicitly given number
    FixedWCS Int
  deriving stock (Eq, Show)

-- | Usual configuration: 'CapabilitiesWCS' workers and a logger that
-- discards every message.
defaultPoolboySettings :: PoolboySettings
defaultPoolboySettings =
  PoolboySettings
    { workersCount = CapabilitiesWCS,
      log = \_ -> return ()
    }

-- | Arbitrary-numbered settings: 'defaultPoolboySettings' with the workers
-- count replaced by @'FixedWCS' c@.
poolboySettingsWith :: Int -> PoolboySettings
poolboySettingsWith c = defaultPoolboySettings {workersCount = FixedWCS c}

-- | Simple (but not particularly performant) serialized logger.
--
-- The returned function prints its argument with 'putStrLn' while holding
-- an 'MVar' created once per call of 'simpleSerializedLogger', so messages
-- written through the same returned function are not interleaved.
simpleSerializedLogger :: IO (String -> IO ())
simpleSerializedLogger = do
  logLock <- newMVar ()
  return $ \x ->
    withMVar logLock $ \() ->
      putStrLn x

-- | 'bracket'-based usage (recommended).
--
-- Creates a queue with 'newPoolboy' and hands it to the supplied action.
-- When that action returns or throws, 'stopWorkQueue' is called, followed
-- by the given waiting strategy.
withPoolboy :: PoolboySettings -> WaitingStopStrategy -> (WorkQueue -> IO a) -> IO a
withPoolboy settings waitStopWorkQueue =
  bracket
    (newPoolboy settings)
    (\wq -> stopWorkQueue wq >> waitStopWorkQueue wq)

-- | Standalone/manual usage.
--
-- Builds a fresh 'WorkQueue', enqueues the initial
-- 'changeDesiredWorkersCount' command derived from the settings and forks
-- the controller thread that will process it. The caller is responsible
-- for calling 'stopWorkQueue' when done.
newPoolboy :: PoolboySettings -> IO WorkQueue
newPoolboy settings = do
  wq <-
    WorkQueue
      <$> newTQueueIO
      <*> newTQueueIO
      <*> newEmptyMVar
      <*> return settings.log

  count <-
    case settings.workersCount of
      CapabilitiesWCS -> getNumCapabilities
      FixedWCS x -> return x

  -- The command is buffered in the commands queue until the controller
  -- (forked just below) starts reading it.
  changeDesiredWorkersCount wq count
  void $ forkIO $ controller wq []

  return wq

-- | Request a worker number adjustment.
--
-- Only posts the request on the commands queue; the controller applies it
-- asynchronously.
changeDesiredWorkersCount :: WorkQueue -> Int -> IO ()
changeDesiredWorkersCount wq =
  atomically . writeTQueue wq.commands . ChangeDesiredWorkersCount

-- | Request stopping workers.
--
-- Only posts the 'Stop' command; use a 'WaitingStopStrategy' to wait for
-- the workers to actually finish.
stopWorkQueue :: WorkQueue -> IO ()
stopWorkQueue wq =
  atomically $ writeTQueue wq.commands Stop

-- | Non-blocking check of the work queue's running status.
--
-- Returns 'True' once the @stopped@ 'MVar' of the queue has been filled.
isStopedWorkQueue :: WorkQueue -> IO Bool
isStopedWorkQueue wq =
  not <$> isEmptyMVar wq.stopped

-- | How to wait for a 'WorkQueue' after a stop has been requested.
-- 'withPoolboy' runs the given strategy right after 'stopWorkQueue'.
type WaitingStopStrategy = WorkQueue -> IO ()

-- | Block until the queue is totally stopped (no more running worker).
--
-- Waits for the @stopped@ 'MVar' to be filled. Any exception caught by
-- 'tryAny' while waiting is discarded, so this function returns normally
-- in that case too.
waitingStopFinishWorkers :: WaitingStopStrategy
waitingStopFinishWorkers wq =
  void $ tryAny $ readMVar wq.stopped

-- | Block until the queue is totally stopped or the deadline (in
-- microseconds) is reached, whichever comes first. The result of the
-- underlying 'timeout' call is ignored, so callers cannot tell which of
-- the two happened.
waitingStopTimeout :: Int -> WaitingStopStrategy
waitingStopTimeout delay wq =
  void $ timeout delay $ waitingStopFinishWorkers wq

-- | Enqueue one action in the work queue (non-blocking).
--
-- The action is wrapped in 'Right' and appended to the work queue, where a
-- worker will pick it up.
enqueue :: WorkQueue -> IO () -> IO ()
enqueue wq =
  atomically . writeTQueue wq.queue . Right

-- | Block until one worker is available.
--
-- Enqueues a marker action that fills a fresh 'MVar' and waits for it, so
-- this returns once a worker has reached and run that marker (i.e. after
-- everything enqueued before it has been picked up).
waitReadyQueue :: WorkQueue -> IO ()
waitReadyQueue wq = do
  ready <- newEmptyMVar
  enqueue wq $ putMVar ready ()
  readMVar ready

-- | Enqueue an action and some actions to be run after it.
--
-- The follow-up actions are not run inline: once the first action has
-- completed, each of them is enqueued on the same queue. If the first
-- action throws, the follow-up actions are never enqueued.
enqueueAfter :: Foldable f => WorkQueue -> IO () -> f (IO ()) -> IO ()
enqueueAfter wq x xs =
  enqueue wq $ do
    x
    forM_ xs $ enqueue wq

-- Support (internal)

-- | Handle on a pool: its queues, stop flag and logger.
data WorkQueue = WorkQueue
  { -- | Commands consumed by the controller thread.
    commands :: TQueue Commands,
    -- | Items consumed by the workers: @Right act@ is an action to run,
    -- @Left ()@ tells the worker that reads it to stop.
    queue :: TQueue (Either () (IO ())),
    -- | Filled by the controller once a 'Stop' command has been fully
    -- processed.
    stopped :: MVar (),
    -- | Logger copied from the 'PoolboySettings'.
    log :: String -> IO ()
  }

-- | Requests understood by the controller thread.
data Commands
  = -- | Adjust the number of workers towards the given count.
    ChangeDesiredWorkersCount Int
  | -- | Stop every live worker and terminate the controller.
    Stop
  deriving stock (Show)

-- | Controller loop: reads one command at a time from the commands queue
-- and adjusts the set of workers accordingly.
--
-- The second argument is the list of workers spawned so far; workers that
-- have already terminated are filtered out (via 'poll') each time a
-- command is handled. The loop ends after a 'Stop' command.
controller :: WorkQueue -> [Async ()] -> IO ()
controller wq workers = do
  command <- atomically $ readTQueue wq.commands
  let stopOneWorker = atomically $ writeTQueue wq.queue $ Left ()
      getLiveWorkers = filterM (fmap isNothing . poll) workers
  wq.log $ "Command: " <> show command
  case command of
    ChangeDesiredWorkersCount n -> do
      liveWorkers <- getLiveWorkers
      -- A negative request is treated as zero: otherwise more stop tokens
      -- than live workers would be queued, and the surplus tokens would
      -- terminate workers spawned by a later command.
      let diff = length liveWorkers - max 0 n
      newWorkers <-
        if diff > 0
          then do
            -- Too many workers: queue one stop token per surplus worker.
            replicateM_ diff stopOneWorker
            return []
          else replicateM (abs diff) $ do
            wq.log "Pre-fork"
            async $ worker wq
      controller wq $ newWorkers <> liveWorkers
    Stop -> do
      liveWorkers <- getLiveWorkers
      let currentCount = length liveWorkers
      wq.log $ "Stopping " <> show currentCount <> " workers"
      replicateM_ currentCount stopOneWorker
      -- Wait for every worker to terminate before signalling completion.
      forM_ liveWorkers waitCatch
      void $ tryPutMVar wq.stopped ()

-- | Worker loop: repeatedly takes one item from the work queue.
--
-- A @Right act@ item is executed and the loop continues; a @Left ()@ item
-- terminates the worker. Exceptions caught by 'tryAny' while running an
-- action do not stop the worker; they are reported through the queue's
-- logger instead of being silently dropped.
worker :: WorkQueue -> IO ()
worker wq = do
  wq.log "New worker"
  let loop = do
        command <- atomically $ readTQueue wq.queue
        case command of
          Left () ->
            wq.log "Stopping"
          Right act -> do
            wq.log "pop"
            outcome <- tryAny act
            case outcome of
              Left err -> wq.log $ "Action failed: " <> show err
              Right () -> return ()
            wq.log "poped"
            loop
  loop