{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE Trustworthy #-}

-- | A module of concurrent higher order functions.
module Control.CUtils.CPUMultiThreading
  ( ExceptionList(..)
  , ConcException(..)
  , ConcurrentMethod
  , ConcurrentMethodAdapter
  , module Control.CUtils.ThreadPool
  , simpleConc_
  , concurrent_
  , throwToCallerAdapter_
  , toNewStyle
  ) where

import Control.Exception
import Control.Concurrent
import Control.Concurrent.Chan
import Data.Typeable
import Control.Monad.Loops
import Data.IORef
import Data.List.Extra
import Data.Maybe
import Control.Monad
import Control.Monad.Identity
import Control.Category
import Control.Arrow
import System.IO
import Control.CUtils.Semaphore
import Control.CUtils.ThreadPool
import Prelude hiding (catch, (.), id)

-- | For exceptions caused by caller code.
data ExceptionList = ExceptionList [SomeException]
  deriving (Show, Typeable)

instance Exception ExceptionList

-- | For internal errors. If a procedure throws this, some threads it created may still be running.
-- This exception type is provided out of an abundance of caution, in case you want to
-- take precautions against the activities of threads that for whatever reason cannot
-- be terminated. If thrown it is never among the exceptions listed inside an 'ExceptionList'.
data ConcException = ConcException
  deriving (Show, Typeable)

instance Exception ConcException

-- Optic-shaped helper aliases used to spell out 'ConcurrentMethod'.
type Over p f t u v w = p v (f w) -> t -> f u

type Over' p f t u = Over p f t t u u

-- | A type for methods that accept a list of tasks.
--
-- Expanded, this is @Kleisli ((->) Int) () (IO t2) -> () -> IO t@: the tasks are
-- given as a function from an 'Int' index to an action, wrapped in a 'Kleisli' arrow.
type ConcurrentMethod t t2 = Over (Kleisli ((->) Int)) IO () t () t2

-- | A type for functions that transform a base method, taking a thread pool also.
type ConcurrentMethodAdapter t t2 =
  Pool -> (Int -> ConcurrentMethod t t2) -> Int -> ConcurrentMethod t t2

-- | Run the collection of tasks in the specified thread pool. Each task does not necessarily
-- get its own thread--a best effort is made to spread the load over all threads in the
-- thread pool.
--
-- Every task is wrapped so that it signals the semaphore when it ends (normally or
-- by exception); the caller then takes one unit per submitted task before returning.
-- If submitting the tasks raises an exception, the exception is printed to 'stderr',
-- the tasks submitted so far are waited for, and the exception is rethrown.
-- The trailing unit argument is ignored.
simpleConc_ :: (Foldable f) => Pool -> f (IO ()) -> () -> IO ()
simpleConc_ pool tasks _unit = do
  sem <- newSem
  -- The number of tasks successfully spawned is maintained in a reference; in the
  -- event of an exception this indicates how many tasks to wait for during teardown.
  spawned <- newIORef 0
  let submit task = do
        addToPoolMulti pool (task `finally` putSem sem 1)
        modifyIORef' spawned succ
      -- Wait for as many completions as there were successful submissions.
      awaitSpawned = readIORef spawned >>= takeSem sem
      onFailure (ex :: SomeException) = do
        -- Print the exception immediately so that information on the exception is
        -- available in case one of the threads went away indefinitely.
        hPrint stderr ex
        awaitSpawned
        throwIO ex
  catch (mapM_ submit tasks) onFailure
  awaitSpawned

-- | Number of consecutive indices that 'concurrent_' bundles into one pool task.
chunkSize :: Int
chunkSize = 10000

-- | This function implements some policy for 'simpleConc_'. 'simpleConc_' makes the guarantee
-- that the tasks given as an argument, map one-to-one to the tasks run on the thread pool.
-- This guarantee is dropped for the function 'concurrent_', which allows more optimization.
--
-- In order to get the tasks, the integer argument 'n' is accepted and the functional
-- 'mnds' is evaluated in the range 0 to n-1 inclusive. Indices are grouped into runs of
-- at most 'chunkSize'; each run is executed sequentially as a single pool task.
concurrent_ :: Pool -> Int -> ConcurrentMethod () t
concurrent_ pool n mnds = simpleConc_ pool (map runChunk chunkStarts)
  where
    task = runKleisli mnds ()
    chunkStarts = [0, chunkSize .. n - 1]
    -- The upper bound is computed from the remaining count (n - start >= 1 for every
    -- chunk start) rather than as @min n (start + chunkSize)@, so it cannot overflow
    -- 'Int' when n is close to 'maxBound'.
    runChunk start = mapM_ task [start .. start + min chunkSize (n - start) - 1]

--------------------------------------------
-- Exception handling

-- Drain the exceptions recorded on the channel and, if there are any, throw them
-- to the caller wrapped in an 'ExceptionList'. A 'Nothing' is written first to act
-- as an end marker, so reading the lazy channel contents stops there instead of
-- blocking for further input.
getExceptions :: Chan (Maybe SomeException) -> IO ()
getExceptions exs = do
  writeChan exs Nothing
  pending <- getChanContents exs
  let collected = catMaybes (takeWhile isJust pending)
  unless (null collected) $ throwIO (ExceptionList collected)

-- | An adapter which modifies the argument concurrent method, to implement an exception
-- handling policy in which exceptions are collected in a list, which is thrown as an
-- exception in the caller. This appropriates the exception handling behavior of old
-- versions of this package.
--
-- The 'Pool' argument is ignored. Each task is wrapped so that any exception it raises
-- is written to a channel instead of propagating; after the method returns, the
-- collected exceptions (if any) are thrown as an 'ExceptionList'.
throwToCallerAdapter_ :: Pool -> (t1 -> ConcurrentMethod () ()) -> t1 -> ConcurrentMethod () ()
throwToCallerAdapter_ _ method arg mnds unit = do
  exs <- newChan
  _ <- method arg (arr (record exs) . mnds) unit
  getExceptions exs
  where
    -- Run a task, diverting any exception into the channel.
    record exs task = task `catch` \(ex :: SomeException) -> writeChan exs (Just ex)

------------------------------------------
-- Optic adapter

-- | Turn a function that takes its tasks as a plain @Int -> IO t2@ into a
-- 'ConcurrentMethod', by unwrapping the 'Kleisli' arrow with the unit argument.
toNewStyle :: ((Int -> IO t2) -> IO t) -> ConcurrentMethod t t2
{-# INLINE toNewStyle #-}
toNewStyle x x2 unit = x (runKleisli x2 unit)