-- | A work queue served by a pool of worker threads whose size can be
-- changed while it runs.
module Data.Poolboy
(
-- * Configuration
PoolboySettings (..),
WorkersCountSettings (..),
defaultPoolboySettings,
poolboySettingsWith,
simpleSerializedLogger,
-- * Running
WorkQueue,
withPoolboy,
newPoolboy,
-- * Driving
changeDesiredWorkersCount,
waitReadyQueue,
-- * Stopping
stopWorkQueue,
isStopedWorkQueue,
WaitingStopStrategy,
waitingStopTimeout,
waitingStopFinishWorkers,
-- * Enqueueing
enqueue,
enqueueAfter,
)
where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM.TQueue
import Control.Monad
import Control.Monad.STM
import Data.Maybe (isNothing)
import System.Timeout (timeout)
import UnliftIO.Exception (bracket, tryAny)
-- | Settings used by 'newPoolboy' / 'withPoolboy' to create a work queue.
data PoolboySettings = PoolboySettings
  { -- | How many workers are requested when the queue is created.
    workersCount :: WorkersCountSettings,
    -- | Logging callback; it is handed to the controller and to every worker.
    log :: String -> IO ()
  }
-- | How the initial number of workers is chosen.
data WorkersCountSettings
  = -- | Use the value returned by 'getNumCapabilities' at creation time.
    CapabilitiesWCS
  | -- | Use the given fixed number.
    FixedWCS Int
  deriving stock (Eq, Show)
-- | Default settings: 'CapabilitiesWCS' workers and a logger that prints
-- nothing.
defaultPoolboySettings :: PoolboySettings
defaultPoolboySettings =
  PoolboySettings
    { workersCount = CapabilitiesWCS,
      -- The message is still forced (to WHNF) before being discarded.
      log = \message -> message `seq` return ()
    }
-- | 'defaultPoolboySettings' with a fixed number of workers ('FixedWCS').
poolboySettingsWith :: Int -> PoolboySettings
poolboySettingsWith c = defaultPoolboySettings {workersCount = FixedWCS c}
-- | Build a logger that writes each message with 'putStrLn' while holding a
-- private 'MVar' lock, so that concurrent calls are serialized.
simpleSerializedLogger :: IO (String -> IO ())
simpleSerializedLogger = do
  logLock <- newMVar ()
  return $ \message ->
    withMVar logLock $ \() ->
      putStrLn message
-- | Run an action with a freshly created 'WorkQueue'.
--
-- When the action ends (the release step of 'bracket'), a stop is requested
-- with 'stopWorkQueue' and then the given 'WaitingStopStrategy' is run on the
-- queue.
withPoolboy :: PoolboySettings -> WaitingStopStrategy -> (WorkQueue -> IO a) -> IO a
withPoolboy settings waitStopWorkQueue =
  bracket
    (newPoolboy settings)
    (\wq -> stopWorkQueue wq >> waitStopWorkQueue wq)
-- | Create a 'WorkQueue' and start its controller thread.
--
-- The initial number of workers is taken from @settings.workersCount@ and is
-- requested through 'changeDesiredWorkersCount' before the controller is
-- forked, so it is the first command the controller handles.
--
-- The caller is responsible for stopping the queue (see 'stopWorkQueue');
-- 'withPoolboy' does it automatically.
newPoolboy :: PoolboySettings -> IO WorkQueue
newPoolboy settings = do
  wq <-
    WorkQueue
      <$> newTQueueIO
      <*> newTQueueIO
      <*> newEmptyMVar
      <*> newQSemN 0
  count <-
    case settings.workersCount of
      CapabilitiesWCS -> getNumCapabilities
      FixedWCS x -> return x
  changeDesiredWorkersCount wq count
  void $
    forkIO $
      controller $
        WorkQueueControllerState
          { commands = wq.commands,
            queue = wq.queue,
            stopped = wq.stopped,
            log = settings.log,
            workers = [],
            waitingWorkers = wq.waitingWorkers,
            capabilityCount = 0
          }
  return wq
-- | Ask the controller to adjust the pool to the given number of workers.
--
-- This only enqueues a command; it returns without waiting for the
-- controller to act on it.
changeDesiredWorkersCount :: WorkQueue -> Int -> IO ()
changeDesiredWorkersCount wq =
  atomically . writeTQueue wq.commands . ChangeDesiredWorkersCount
-- | Request the queue to stop.
--
-- This only enqueues the stop command for the controller and returns
-- immediately; use a 'WaitingStopStrategy' to wait for completion.
stopWorkQueue :: WorkQueue -> IO ()
stopWorkQueue wq =
  atomically $ writeTQueue wq.commands Stop
-- | Non-blocking check: 'True' once the queue's @stopped@ 'MVar' has been
-- filled, which the controller does after handling the stop command.
isStopedWorkQueue :: WorkQueue -> IO Bool
isStopedWorkQueue wq =
  not <$> isEmptyMVar wq.stopped
-- | An action run on a 'WorkQueue' to wait for it to stop; 'withPoolboy'
-- runs it right after requesting the stop.
type WaitingStopStrategy = WorkQueue -> IO ()
-- | Block until the queue is flagged as stopped (its @stopped@ 'MVar' is
-- filled).
--
-- Synchronous exceptions raised while waiting are caught by 'tryAny' and
-- discarded, so this returns normally in that case too.
waitingStopFinishWorkers :: WaitingStopStrategy
waitingStopFinishWorkers wq =
  void $ tryAny $ readMVar wq.stopped
-- | Like 'waitingStopFinishWorkers', but gives up after the given delay.
--
-- The delay is passed unchanged to 'timeout' (which interprets it as
-- microseconds); whether the wait finished or timed out is not reported.
waitingStopTimeout :: Int -> WaitingStopStrategy
waitingStopTimeout delay wq =
  void $ timeout delay $ waitingStopFinishWorkers wq
-- | Add one job to the queue; it is run by whichever worker dequeues it.
--
-- Returns as soon as the job is enqueued, without waiting for it to run.
enqueue :: WorkQueue -> IO () -> IO ()
enqueue wq =
  atomically . writeTQueue wq.queue . Right
-- | Block until at least one unit is available on the idle-workers
-- semaphore (workers signal it each time they start waiting for a job).
--
-- The unit is given back immediately, so the semaphore count is unchanged
-- once this returns.
waitReadyQueue :: WorkQueue -> IO ()
waitReadyQueue wq = do
  waitQSemN wq.waitingWorkers 1
  signalQSemN wq.waitingWorkers 1
-- | Enqueue a job that first runs @x@ and then enqueues every job of @xs@.
--
-- The follow-up jobs are enqueued from inside the first job, so they are
-- only added once @x@ has returned; if @x@ throws, they are never enqueued.
enqueueAfter :: (Foldable f) => WorkQueue -> IO () -> f (IO ()) -> IO ()
enqueueAfter wq x xs =
  enqueue wq $ do
    x
    forM_ xs $ enqueue wq
-- | Handle on a running pool, shared between clients, the controller and
-- the workers.
data WorkQueue = WorkQueue
  { -- | Commands addressed to the controller thread.
    commands :: TQueue Commands,
    -- | Items consumed by workers: @Right job@ to run a job, @Left ()@ to
    -- make the worker that reads it stop.
    queue :: TQueue (Either () (IO ())),
    -- | Filled by the controller once the stop command has been handled.
    stopped :: MVar (),
    -- | Semaphore signalled by workers when they start waiting for a job.
    waitingWorkers :: QSemN
  }
-- | Commands understood by the controller thread.
data Commands
  = -- | Adjust the pool to the given number of workers.
    ChangeDesiredWorkersCount Int
  | -- | Stop the workers and terminate the controller.
    Stop
  deriving stock (Show)
-- | State threaded through the controller loop.
data WorkQueueControllerState = WorkQueueControllerState
  { -- | Incoming commands.
    commands :: TQueue Commands,
    -- | Queue shared with the workers (jobs and stop tokens).
    queue :: TQueue (Either () (IO ())),
    -- | Filled once the stop command has been handled.
    stopped :: MVar (),
    -- | Logging callback.
    log :: String -> IO (),
    -- | Workers started so far that were still running at the last check.
    workers :: [Async ()],
    -- | Semaphore signalled by workers when they start waiting for a job.
    waitingWorkers :: QSemN,
    -- | Total number of workers spawned since the controller started.
    capabilityCount :: Int
  }
-- | Controller loop: reads one command, applies it, and (except for 'Stop')
-- recurses with the updated state.
--
-- * 'ChangeDesiredWorkersCount': compares the number of still-running
--   workers with the desired count and either enqueues stop tokens or
--   spawns the missing workers.
-- * 'Stop': waits until every live worker has signalled that it is idle,
--   sends each of them a stop token, flags the queue as stopped and ends the
--   loop. The flag is set without waiting for the workers to actually exit.
controller :: WorkQueueControllerState -> IO ()
controller wq = do
  command <- atomically $ readTQueue wq.commands
  let stopOneWorker = atomically $ writeTQueue wq.queue $ Left ()
      -- A worker is live as long as 'poll' reports no result for it.
      getLiveWorkers = filterM (fmap isNothing . poll) wq.workers
      prefix = "Controller: "
  wq.log $ prefix <> "Command: " <> show command
  case command of
    ChangeDesiredWorkersCount n -> do
      liveWorkers <- getLiveWorkers
      -- A negative request is treated as 0: otherwise more stop tokens than
      -- live workers would be enqueued, and the leftovers would immediately
      -- stop workers spawned by a later scale-up.
      let desired = max 0 n
          diff = length liveWorkers - desired
      (newWorkers, newCapabilityCount) <-
        if diff > 0
          then do
            replicateM_ diff stopOneWorker
            return ([], wq.capabilityCount)
          else do
            let newWorkersCount = abs diff
            spawned <- forM [1 .. newWorkersCount] $ \capability -> do
              wq.log $ prefix <> "Pre-fork"
              -- Offset by the number of workers spawned so far, so that
              -- successive scale-ups are spread over capabilities instead of
              -- always starting again at capability 0 ('asyncOn' takes the
              -- capability number modulo the number of capabilities).
              asyncOn (wq.capabilityCount + capability - 1) $
                worker $
                  WorkQueueWorkerState
                    { queue = wq.queue,
                      waitingWorkers = wq.waitingWorkers,
                      log = wq.log
                    }
            return (spawned, wq.capabilityCount + newWorkersCount)
      controller $
        wq
          { workers = newWorkers <> liveWorkers,
            capabilityCount = newCapabilityCount
          }
    Stop -> do
      liveWorkers <- getLiveWorkers
      let currentCount = length liveWorkers
      wq.log $ prefix <> "Stopping " <> show currentCount <> " workers"
      -- Take one "idle" unit per live worker before sending the stop tokens.
      waitQSemN wq.waitingWorkers currentCount
      replicateM_ currentCount stopOneWorker
      void $ tryPutMVar wq.stopped ()
-- | What a worker thread needs to run.
data WorkQueueWorkerState = WorkQueueWorkerState
  { -- | Queue the worker reads from (jobs and stop tokens).
    queue :: TQueue (Either () (IO ())),
    -- | Semaphore the worker signals when it starts waiting for a job.
    waitingWorkers :: QSemN,
    -- | Logging callback.
    log :: String -> IO ()
  }
-- | Worker loop.
--
-- Each iteration signals the idle semaphore, then blocks on the queue:
--
-- * @Left ()@ is a stop token: the worker logs and returns.
-- * @Right job@: the worker takes its idle unit back, runs the job and loops.
--   Synchronous exceptions thrown by the job are caught with 'tryAny' and
--   discarded, so a failing job does not end the worker.
worker :: WorkQueueWorkerState -> IO ()
worker wq = do
  workerId <- show <$> myThreadId
  let prefix = "Worker [" <> workerId <> "]: "
  wq.log $ prefix <> "Starting"
  let loop = do
        -- Advertise this worker as idle before waiting for work.
        signalQSemN wq.waitingWorkers 1
        command <- atomically $ readTQueue wq.queue
        case command of
          Left () ->
            wq.log $ prefix <> "Stopping"
          Right act -> do
            -- No longer idle: take the unit back.
            -- NOTE(review): if the controller's stop handling has already
            -- taken this unit, this wait looks like it could block and the
            -- job never run -- confirm.
            waitQSemN wq.waitingWorkers 1
            wq.log (prefix <> "pop")
            void (tryAny act)
            wq.log (prefix <> "poped")
            loop
  loop