module Data.Poolboy
(
PoolboySettings (..),
WorkersCountSettings (..),
defaultPoolboySettings,
poolboySettingsWith,
simpleSerializedLogger,
WorkQueue,
withPoolboy,
newPoolboy,
changeDesiredWorkersCount,
waitReadyQueue,
stopWorkQueue,
isStopedWorkQueue,
WaitingStopStrategy,
waitingStopTimeout,
waitingStopFinishWorkers,
enqueue,
enqueueAfter,
)
where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM.TQueue
import Control.Exception.Safe (bracket, tryAny)
import Control.Monad
import Control.Monad.STM
import Data.Maybe (isNothing)
import System.Timeout (timeout)
data PoolboySettings = PoolboySettings
{ PoolboySettings -> WorkersCountSettings
workersCount :: WorkersCountSettings,
PoolboySettings -> String -> IO ()
log :: String -> IO ()
}
data WorkersCountSettings
=
CapabilitiesWCS
| FixedWCS Int
deriving stock (WorkersCountSettings -> WorkersCountSettings -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: WorkersCountSettings -> WorkersCountSettings -> Bool
$c/= :: WorkersCountSettings -> WorkersCountSettings -> Bool
== :: WorkersCountSettings -> WorkersCountSettings -> Bool
$c== :: WorkersCountSettings -> WorkersCountSettings -> Bool
Eq, Int -> WorkersCountSettings -> ShowS
[WorkersCountSettings] -> ShowS
WorkersCountSettings -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [WorkersCountSettings] -> ShowS
$cshowList :: [WorkersCountSettings] -> ShowS
show :: WorkersCountSettings -> String
$cshow :: WorkersCountSettings -> String
showsPrec :: Int -> WorkersCountSettings -> ShowS
$cshowsPrec :: Int -> WorkersCountSettings -> ShowS
Show)
defaultPoolboySettings :: PoolboySettings
defaultPoolboySettings :: PoolboySettings
defaultPoolboySettings =
PoolboySettings
{ $sel:workersCount:PoolboySettings :: WorkersCountSettings
workersCount = WorkersCountSettings
CapabilitiesWCS,
$sel:log:PoolboySettings :: String -> IO ()
log = \String
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
}
poolboySettingsWith :: Int -> PoolboySettings
poolboySettingsWith :: Int -> PoolboySettings
poolboySettingsWith Int
c = PoolboySettings
defaultPoolboySettings {$sel:workersCount:PoolboySettings :: WorkersCountSettings
workersCount = Int -> WorkersCountSettings
FixedWCS Int
c}
simpleSerializedLogger :: IO (String -> IO ())
simpleSerializedLogger :: IO (String -> IO ())
simpleSerializedLogger = do
MVar ()
logLock <- forall a. a -> IO (MVar a)
newMVar ()
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ \String
x ->
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar ()
logLock forall a b. (a -> b) -> a -> b
$ \() -> do
String -> IO ()
putStrLn String
x
forall (m :: * -> *) a. Monad m => a -> m a
return ()
withPoolboy :: PoolboySettings -> WaitingStopStrategy -> (WorkQueue -> IO a) -> IO a
withPoolboy :: forall a.
PoolboySettings
-> WaitingStopStrategy -> (WorkQueue -> IO a) -> IO a
withPoolboy PoolboySettings
settings WaitingStopStrategy
waitStopWorkQueue = forall (m :: * -> *) a b c.
MonadMask m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (PoolboySettings -> IO WorkQueue
newPoolboy PoolboySettings
settings) (\WorkQueue
wq -> WaitingStopStrategy
stopWorkQueue WorkQueue
wq forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> WaitingStopStrategy
waitStopWorkQueue WorkQueue
wq)
newPoolboy :: PoolboySettings -> IO WorkQueue
newPoolboy :: PoolboySettings -> IO WorkQueue
newPoolboy PoolboySettings
settings = do
WorkQueue
wq <-
TQueue Commands
-> TQueue (Either () (IO ()))
-> MVar ()
-> (String -> IO ())
-> WorkQueue
WorkQueue
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. IO (TQueue a)
newTQueueIO
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> forall a. IO (TQueue a)
newTQueueIO
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> forall a. IO (MVar a)
newEmptyMVar
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> forall (m :: * -> *) a. Monad m => a -> m a
return PoolboySettings
settings.log
Int
count <-
case PoolboySettings
settings.workersCount of
WorkersCountSettings
CapabilitiesWCS -> IO Int
getNumCapabilities
FixedWCS Int
x -> forall (m :: * -> *) a. Monad m => a -> m a
return Int
x
WorkQueue -> Int -> IO ()
changeDesiredWorkersCount WorkQueue
wq Int
count
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$ WorkQueue -> [Async ()] -> IO ()
controller WorkQueue
wq []
forall (m :: * -> *) a. Monad m => a -> m a
return WorkQueue
wq
changeDesiredWorkersCount :: WorkQueue -> Int -> IO ()
changeDesiredWorkersCount :: WorkQueue -> Int -> IO ()
changeDesiredWorkersCount WorkQueue
wq =
forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TQueue a -> a -> STM ()
writeTQueue WorkQueue
wq.commands forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Commands
ChangeDesiredWorkersCount
stopWorkQueue :: WorkQueue -> IO ()
stopWorkQueue :: WaitingStopStrategy
stopWorkQueue WorkQueue
wq =
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> a -> STM ()
writeTQueue WorkQueue
wq.commands Commands
Stop
isStopedWorkQueue :: WorkQueue -> IO Bool
isStopedWorkQueue :: WorkQueue -> IO Bool
isStopedWorkQueue WorkQueue
wq =
Bool -> Bool
not forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. MVar a -> IO Bool
isEmptyMVar WorkQueue
wq.stopped
type WaitingStopStrategy = WorkQueue -> IO ()
waitingStopFinishWorkers :: WaitingStopStrategy
waitingStopFinishWorkers :: WaitingStopStrategy
waitingStopFinishWorkers WorkQueue
wq =
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> IO a
readMVar WorkQueue
wq.stopped
waitingStopTimeout :: Int -> WaitingStopStrategy
waitingStopTimeout :: Int -> WaitingStopStrategy
waitingStopTimeout Int
delay WorkQueue
wq = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. Int -> IO a -> IO (Maybe a)
timeout Int
delay forall a b. (a -> b) -> a -> b
$ WaitingStopStrategy
waitingStopFinishWorkers WorkQueue
wq
enqueue :: WorkQueue -> IO () -> IO ()
enqueue :: WorkQueue -> IO () -> IO ()
enqueue WorkQueue
wq =
forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TQueue a -> a -> STM ()
writeTQueue WorkQueue
wq.queue forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. b -> Either a b
Right
waitReadyQueue :: WorkQueue -> IO ()
waitReadyQueue :: WaitingStopStrategy
waitReadyQueue WorkQueue
wq = do
MVar ()
ready <- forall a. IO (MVar a)
newEmptyMVar
WorkQueue -> IO () -> IO ()
enqueue WorkQueue
wq forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> a -> IO ()
putMVar MVar ()
ready ()
forall a. MVar a -> IO a
readMVar MVar ()
ready
enqueueAfter :: Foldable f => WorkQueue -> IO () -> f (IO ()) -> IO ()
enqueueAfter :: forall (f :: * -> *).
Foldable f =>
WorkQueue -> IO () -> f (IO ()) -> IO ()
enqueueAfter WorkQueue
wq IO ()
x f (IO ())
xs =
WorkQueue -> IO () -> IO ()
enqueue WorkQueue
wq forall a b. (a -> b) -> a -> b
$ do
IO ()
x
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ f (IO ())
xs forall a b. (a -> b) -> a -> b
$ WorkQueue -> IO () -> IO ()
enqueue WorkQueue
wq
data WorkQueue = WorkQueue
{ WorkQueue -> TQueue Commands
commands :: TQueue Commands,
WorkQueue -> TQueue (Either () (IO ()))
queue :: TQueue (Either () (IO ())),
WorkQueue -> MVar ()
stopped :: MVar (),
WorkQueue -> String -> IO ()
log :: String -> IO ()
}
data Commands
= ChangeDesiredWorkersCount Int
| Stop
deriving stock (Int -> Commands -> ShowS
[Commands] -> ShowS
Commands -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [Commands] -> ShowS
$cshowList :: [Commands] -> ShowS
show :: Commands -> String
$cshow :: Commands -> String
showsPrec :: Int -> Commands -> ShowS
$cshowsPrec :: Int -> Commands -> ShowS
Show)
controller :: WorkQueue -> [Async ()] -> IO ()
controller :: WorkQueue -> [Async ()] -> IO ()
controller WorkQueue
wq [Async ()]
workers = do
Commands
command <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> STM a
readTQueue WorkQueue
wq.commands
let stopOneWorker :: IO ()
stopOneWorker = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> a -> STM ()
writeTQueue WorkQueue
wq.queue forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left ()
getLiveWorkers :: IO [Async ()]
getLiveWorkers = forall (m :: * -> *) a.
Applicative m =>
(a -> m Bool) -> [a] -> m [a]
filterM (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a. Maybe a -> Bool
isNothing forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> IO (Maybe (Either SomeException a))
poll) [Async ()]
workers
WorkQueue
wq.log forall a b. (a -> b) -> a -> b
$ String
"Command: " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Commands
command
case Commands
command of
ChangeDesiredWorkersCount Int
n -> do
[Async ()]
liveWorkers <- IO [Async ()]
getLiveWorkers
let diff :: Int
diff = forall (t :: * -> *) a. Foldable t => t a -> Int
length [Async ()]
liveWorkers forall a. Num a => a -> a -> a
- Int
n
[Async ()]
newWorkers <-
if Int
diff forall a. Ord a => a -> a -> Bool
> Int
0
then do
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
diff IO ()
stopOneWorker
forall (m :: * -> *) a. Monad m => a -> m a
return []
else forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM (forall a. Num a => a -> a
abs Int
diff) forall a b. (a -> b) -> a -> b
$ do
WorkQueue
wq.log String
"Pre-fork"
forall a. IO a -> IO (Async a)
async forall a b. (a -> b) -> a -> b
$ WaitingStopStrategy
worker WorkQueue
wq
WorkQueue -> [Async ()] -> IO ()
controller WorkQueue
wq forall a b. (a -> b) -> a -> b
$ [Async ()]
newWorkers forall a. Semigroup a => a -> a -> a
<> [Async ()]
liveWorkers
Commands
Stop -> do
[Async ()]
liveWorkers <- IO [Async ()]
getLiveWorkers
let currentCount :: Int
currentCount = forall (t :: * -> *) a. Foldable t => t a -> Int
length [Async ()]
liveWorkers
WorkQueue
wq.log forall a b. (a -> b) -> a -> b
$ String
"Stopping " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Int
currentCount forall a. Semigroup a => a -> a -> a
<> String
" workers"
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
currentCount IO ()
stopOneWorker
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Async ()]
liveWorkers forall a. Async a -> IO (Either SomeException a)
waitCatch
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> a -> IO Bool
tryPutMVar WorkQueue
wq.stopped ()
worker :: WorkQueue -> IO ()
worker :: WaitingStopStrategy
worker WorkQueue
wq = do
WorkQueue
wq.log String
"New worker"
let loop :: IO ()
loop = do
Either () (IO ())
command <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> STM a
readTQueue WorkQueue
wq.queue
case Either () (IO ())
command of
Left () -> do
WorkQueue
wq.log String
"Stopping"
Right IO ()
act -> WorkQueue
wq.log String
"pop" forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (f :: * -> *) a. Functor f => f a -> f ()
void (forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny IO ()
act) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> WorkQueue
wq.log String
"poped" forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
loop
IO ()
loop