module Load.Core (runLoadN, Config (..)) where import Control.Concurrent ( ThreadId, forkIO, threadDelay, ) import Control.Concurrent.STM ( STM, TMVar, atomically, newEmptyTMVar, putTMVar, readTMVar, ) import Control.Concurrent.STM.TBMQueue ( TBMQueue, closeTBMQueue, newTBMQueue, readTBMQueue, writeTBMQueue, ) import Control.Concurrent.STM.TSem ( TSem, newTSem, signalTSem, waitTSem, ) import Control.Exception.Lifted ( finally, onException, ) import Control.Monad ( replicateM_, ) import Control.Monad.Base ( liftBase, ) import Control.Monad.Trans.Control ( MonadBaseControl, liftBaseDiscard, ) import Data.Default ( Default, def, ) import Numeric.Natural data Config m req res col = Config { supplier :: m req, performer :: req -> IO res, collector :: res -> col -> col, workers :: Natural, queue :: Natural, rate :: Natural, burst :: Natural } {- | 'runLoadN n cfg' will generate n requests according to the given config For example, to print random strings to the console: :~$ cat app/example/Main.hs module Main where import Control.Monad import Control.Monad.Trans.State import Load.Core import System.Random main :: IO () main = do let cfg = Config { workers = 1, queue = 1, rate = 2, burst = 2, supplier = randomASCIIString 4, performer = \s -> putStrLn $ "random: " ++ s, collector = \_ _ -> () } rndGen <- getStdGen evalStateT (runLoadN 4 cfg) rndGen randomASCIIString :: Int -> StateT StdGen IO String randomASCIIString n = replicateM n randomChar where randomChar = do gen <- get let (val, nxt) = randomR ('a', 'z') gen put nxt return val :~$ stack run example random: qpur random: rpwn random: ifqy random: cngs -} runLoadN :: (MonadBaseControl IO m, Default col) => Natural -> Config m req res col -> m col runLoadN nn cfg = do let n = toInt nn let w = toInt $ workers cfg let q = toInt $ queue cfg let r = toInt $ rate cfg let b = toInt $ burst cfg let perf = performer cfg let coll = collector cfg let supp = supplier cfg jobq <- atomically' $ newTBMQueue q resq <- atomically' $ newTBMQueue q pulq <- atomically' $ newTBMQueue b mvar <- atomically' newEmptyTMVar sema <- atomically' $ newTSem 0 replicateM_ w $ fork' $ signaling sema $ performRequests perf pulq jobq resq _ <- fork' $ closing jobq $ supplyRequests n supp _ <- fork' $ closing pulq $ startPulse r b _ <- fork' $ closing resq $ \_ -> blockUntilWorkersFinish w sema _ <- fork' $ collectResults coll def resq mvar atomically' . readTMVar $ mvar performRequests :: MonadBaseControl IO m => (req -> IO res) -> TBMQueue () -> TBMQueue req -> TBMQueue res -> m () performRequests perf pulq jobq resq = loop where loop = run `onException` loop run = do mbreq <- atomically' (readTBMQueue jobq) _ <- atomically' (readTBMQueue pulq) case mbreq of Nothing -> return () Just request -> do result <- liftBase $ perf request atomically' $ writeTBMQueue resq result run blockUntilWorkersFinish :: MonadBaseControl IO m => Int -> TSem -> m () blockUntilWorkersFinish w sema = atomically' (replicateM_ w (waitTSem sema)) supplyRequests :: MonadBaseControl IO m => Int -> m req -> TBMQueue req -> m () supplyRequests n supp q = replicateM_ n (supp >>= \r -> atomically' $ writeTBMQueue q r) collectResults :: MonadBaseControl IO m => (res -> col -> col) -> col -> TBMQueue res -> TMVar col -> m () collectResults f z resq mv = do mbnxt <- atomically' $ readTBMQueue resq case mbnxt of Nothing -> atomically' $ putTMVar mv z Just nxt -> let z' = f nxt z in seq z' $ collectResults f z' resq mv startPulse :: MonadBaseControl IO m => Int -> Int -> TBMQueue () -> m () startPulse r b q = loop where loop = do atomically' (replicateM_ b $ writeTBMQueue q ()) threadDelay' tickDelay loop tickDelay = floor $ oncePerSecondDelay / toRational r oncePerSecondDelay = toRational b * microSecondsPerSecond microSecondsPerSecond = 1e6 closing :: (MonadBaseControl IO m) => TBMQueue a -> (TBMQueue a -> m ()) -> m () closing q f = f q `finally` atomically' (closeTBMQueue q) signaling :: (MonadBaseControl IO m) => TSem -> m () -> m () signaling sem f = f `finally` atomically' (signalTSem sem) fork' :: MonadBaseControl IO m => m () -> m ThreadId fork' = liftBaseDiscard forkIO threadDelay' :: MonadBaseControl IO m => Int -> m () threadDelay' d = liftBase $ threadDelay d atomically' :: (MonadBaseControl IO m) => STM a -> m a atomically' stm = liftBase $ atomically stm toInt :: Natural -> Int toInt = fromIntegral . toInteger