{-# LANGUAGE LambdaCase, ScopedTypeVariables, GADTs, DeriveDataTypeable, Trustworthy #-}
module Control.CUtils.ThreadPool
  ( Pool
  , addToPoolMulti
  , newPool
  , stopPool_
    -- ** Global thread pools
  , globalPool
    -- ** Compatibility shims
  , ThreadPool(..)
  , Interruptible(..)
  , NoPool(..)
  , BoxedThreadPool(..)
  ) where

import Control.Exception
import Control.Concurrent
import Data.Data
import Data.IORef
import Data.List.Extras.Argmax
import Data.Maybe
import Control.Monad
import Control.Monad.Identity
import Control.CUtils.BoundedQueue
import System.IO.Unsafe
import System.IO

-- | A message placed on a worker's queue: either an action to run
-- ('NextTask') or the marker 'S'.
data Instruction
  = NextTask !(IO ())
  | S
  deriving (Typeable)

-- | A worker: the bounded queue it receives 'Instruction's on, plus a
-- counter that 'addToWorker' increments for every task it enqueues.
data Worker = Worker
  { instructions :: !(BoundedQueue Instruction)
  , counter      :: !(IORef Int)
  } deriving (Typeable)

-- NOTE(review): both instances below define no methods and therefore rely
-- entirely on the class defaults of 'Data'. Presumably they exist only so
-- that 'Pool' can derive 'Data' -- confirm before using them generically.
instance Data Instruction
instance Data Worker

-- | A pool is a list of workers.
newtype Pool = Pool { workers_ :: [Worker] }
  deriving (Typeable, Data)

-- | Hand a task to one specific worker. The worker's counter is bumped
-- first, then the task (with its result discarded) is written to the
-- worker's queue.
addToWorker :: Worker -> IO t -> IO ()
{-# INLINE addToWorker #-}
addToWorker worker task = do
  atomicModifyIORef' (counter worker) (\n -> (succ n, ()))
  writeRB (instructions worker) (NextTask (void task))

-- | Submit a task to the pool. The counters of all workers are read and the
-- task is given to a worker whose counter is smallest.
--
-- Throws an 'ErrorCall' if the pool contains no workers.
addToPoolMulti :: Pool -> IO t -> IO ()
{-# INLINABLE addToPoolMulti #-}
addToPoolMulti (Pool []) _ =
  throwIO (ErrorCall "addToPoolMulti: pool is empty")
addToPoolMulti (Pool workers) task = do
  loads <- mapM (readIORef . counter) workers
  let (leastLoaded, _) = argmin snd (zip workers loads)
  addToWorker leastLoaded task

-- Design note.
--
-- It is desirable for performance to keep each worker thread occupied with
-- work as often as possible. The simplest way of achieving this is to have
-- a single work queue from which the various threads greedily take work
-- (or to use a work-stealing approach), but this introduces a central
-- point of contention.
--
-- An observation: pretty good load balancing can be achieved with an
-- imprecise estimate of which threads are occupied; then, in principle,
-- the source of contention is eliminated: the caller of 'addToPoolMulti'
-- selects the worker thread to which the work is assigned. (The estimate
-- is imprecise because the code reads it without synchronization.)
-- | Create a worker with a queue of capacity 1000 and a zeroed counter, and
-- fork the thread that serves its queue.
newWorker :: IO Worker
newWorker = do
  queue <- newRB 1000
  pending <- newIORef 0
  let self = Worker queue pending
  _ <- forkIO (serve self)
  return $! self
  where
    -- Task processing loop: read one instruction at a time until 'S' arrives.
    serve w = do
      instruction <- readRB (instructions w)
      case instruction of
        S -> return ()
        NextTask task -> do
          -- Every task taken off the work queue decrements the counter;
          -- this happens before the task itself is run.
          atomicModifyIORef' (counter w) (\n -> (pred n, ()))
          -- The default exception handling policy for worker threads is
          -- to print the exception to stderr and carry on with the loop.
          task `catch` \(ex :: SomeException) -> hPrint stderr ex
          serve w

-- | Create a pool made of the given number of freshly started workers.
newPool :: Int -> IO Pool
newPool n = fmap Pool (replicateM n newWorker)

-- | Ask a single worker to stop by writing 'S' to its queue.
stopWorker :: Worker -> IO ()
stopWorker worker = writeRB (instructions worker) S

-- | Stop each worker thread in turn by sending it a message.
stopPool_ :: Pool -> IO ()
stopPool_ (Pool ws) = mapM_ stopWorker ws

--------------------------------------------
-- Global thread pools

-- | A shared pool, created on first use with one worker per capability as
-- reported by 'getNumCapabilities'.
globalPool :: Pool
-- NOINLINE is the usual companion of a top-level 'unsafePerformIO': it
-- keeps inlining from duplicating the pool-creating action.
{-# NOINLINE globalPool #-}
globalPool = unsafePerformIO (newPool =<< getNumCapabilities)

--------------------------------------------
-- Compatibility shims

-- | Thread pools support some standard operations...
class ThreadPool pool where
  -- | Submit an action to the pool.
  addToPool :: pool -> IO () -> IO ()

-- | Pools that can be told to stop.
class Interruptible pool where
  stopPool :: pool -> IO ()

instance ThreadPool Pool where
  addToPool = addToPoolMulti

instance Interruptible Pool where
  stopPool = stopPool_

-- | The trivial "pool": every submitted action is run in its own new thread
-- via 'forkIO'.
data NoPool = NoPool
  deriving (Typeable, Data)

instance ThreadPool NoPool where
  addToPool _ = void . forkIO

-- | A wrapper hiding the concrete type of any 'ThreadPool'.
data BoxedThreadPool where
  BoxedThreadPool :: (ThreadPool pool) => pool -> BoxedThreadPool

instance ThreadPool BoxedThreadPool where
  addToPool (BoxedThreadPool inner) = addToPool inner