module Control.CUtils.Conc (module Control.CUtils.ThreadPool, ExceptionList(..), ConcException(..), assocFold, Concurrent(..), concF_, concF, conc_, conc, concP, progressConcF, oneOfF, oneOf) where
import Prelude hiding (catch)
import Control.Exception
import Data.Typeable
import Control.Concurrent.QSemN
import Control.Concurrent.Chan
import Control.CUtils.ThreadPool
import GHC.Conc
import Data.Array.IO (newArray_, readArray, writeArray, getElems, IOArray)
import Data.Array
import Data.Array.Unsafe
import Data.Array.MArray
import Data.IORef
import Control.Monad
import Control.Arrow
import System.IO.Unsafe
data ExceptionList = ExceptionList [SomeException] deriving (Show, Typeable)
instance Exception ExceptionList
data ConcException = ConcException deriving (Show, Typeable)
instance Exception ConcException
simpleConc_ mnds = do
sem <- newQSemN 0
mapM_ (\m -> addToPool ?pool (do
m
signalQSemN sem 1))
mnds
waitQSemN sem (length mnds)
divideUp nPieces nVals = zip (0 : divisions) divisions where
divisions = if nPieces >= nVals then
[1..nVals]
else
map (`div` nPieces) $ take nPieces $ iterate (nVals +) nVals
getExceptions exs = do
writeChan exs Nothing
exslst <- let chanToList exslst = do
may <- readChan exs
case may of
Just ex -> case fromException ex of
Just (_ :: ConcException) -> throwIO ex
Nothing -> chanToList (ex : exslst)
Nothing -> return exslst in
chanToList []
unless (null exslst) $ throwIO (ExceptionList exslst)
class Concurrent a where
arr_assocFold :: (?pool :: BoxedThreadPool) => a (b, b) b -> (c -> b) -> a (b, Array Int c) b
arr_concF_ :: (?seq :: Bool, ?pool :: BoxedThreadPool) => a (t, Int) () -> a (t, Int) ()
arr_concF :: (?seq :: Bool, ?pool :: BoxedThreadPool) => a (u, Int) t -> a (u, Int) (Array Int t)
arr_oneOfF :: a (u, Int) b -> a (u, Int) b
instance Concurrent (Kleisli IO) where
arr_assocFold f g = Kleisli $ \(init, parm) -> do
let (lo, hi) = bounds parm
when (lo > hi) $ error "Conc.arr_assocFold: empty list"
exs <- newChan
caps <- getNumCapabilities
let effectiveCaps = ceiling (sqrt (fromIntegral (rangeSize (bounds parm)))) `min` caps
ar <- (newArray_ (0, effectiveCaps 1) :: IO (IOArray Int b))
let
rtnException ex = writeChan exs (Just ex) >> return undefined
innerExHandler m = catch m rtnException
outerExHandler m = catch m (\(_ :: SomeException) -> rtnException (toException ConcException)) in
outerExHandler $ simpleConc_ $ map (\(i, (x, y)) ->
innerExHandler $ foldM (\x -> runKleisli f . (,) x . g . (parm !)) init [x..y] >>= writeArray ar i) $ zip [0..] (divideUp effectiveCaps (rangeSize $ bounds parm))
getExceptions exs
ls <- getElems ar
foldM (curry (runKleisli f)) init ls
arr_concF_ mnds = Kleisli $ \(parm, n) -> do
exs <- newChan
caps <- getNumCapabilities
let
rtnException ex = writeChan exs (Just ex) >> return undefined
innerExHandler m = catch m rtnException
outerExHandler m = catch m (\(_ :: SomeException) -> rtnException (toException ConcException)) in
outerExHandler $
simpleConc_ $ map (\(x, y) -> outerExHandler $ (if ?seq then sequence_ else simpleConc_) $ map (innerExHandler . runKleisli mnds . (,) parm) [x..y1]) $ divideUp caps n
getExceptions exs
arr_concF mnds = Kleisli $ \(parm, n) -> partConcF (0, n 1) (concF_ n) (runKleisli mnds . (,) parm)
arr_oneOfF mnds = Kleisli $ \(parm, n) -> partOneOfF (0, n 1) (runKleisli mnds . (,) parm)
instance Concurrent (->) where
arr_assocFold f g x = unsafePerformIO $ assocFold (\x y -> return $! f (x, y)) g x
arr_concF_ _ = arr (const ())
arr_concF mnds (parm, n) = let ?seq = True in unsafePerformIO $ concF n ((return $!) . mnds . (,) parm)
arr_oneOfF mnds (parm, n) = unsafePerformIO $ oneOfF n ((return $!) . mnds . (,) parm)
assocFold f g = runKleisli (arr_assocFold (Kleisli (uncurry f)) g)
partConc_ f mnds = concF_ (rangeSize (bounds mnds)) $ f . (+ fst (bounds mnds))
concF_ n mnds = runKleisli (arr_concF_ (Kleisli (mnds . snd))) ((), n)
concF n mnds = runKleisli (arr_concF (Kleisli (mnds . snd))) ((), n)
conc_ mnds = partConc_ (mnds !) mnds
unsafeFreeze' :: IOArray Int e -> IO (Array Int e)
unsafeFreeze' = unsafeFreeze
partConcF bnds f mnds = do
res <- newArray_ bnds
f (\i -> do
x <- mnds i
writeArray res i x)
unsafeFreeze' res
conc mnds = partConcF (bounds mnds) (\f -> partConc_ f mnds) (mnds !)
concP m m2 = let ?seq = False in liftM ((\[Left x, Right y] -> (x, y)) . elems)
$ concF 2 (\i -> if i == 0 then
liftM Left m
else
liftM Right m2)
progressConcF n f = do
res <- concF n (\i -> f i >>= \x -> when (i * 80 `mod` n == 0) (putChar '|') >> return x)
putStrLn ""
return res
partOneOfF bnds mnds = do
thds <- newIORef []
chn <- newChan
finally (do
mapM_ (\n -> do
thd <- forkIO (catch (mnds n >>= writeChan chn . Right) (\(ex :: SomeException) -> writeChan chn (Left ex) >> return undefined))
modifyIORef thds (thd:))
(range bnds)
let chanToList n exs = if n == rangeSize bnds then
throwIO (ExceptionList exs)
else readChan chn >>=
either
(chanToList (n + 1) . (:exs))
return in
chanToList 0 [])
(catch (readIORef thds >>= mapM_ killThread) (\(_ :: SomeException) -> throwIO ConcException))
oneOfF n mnds = runKleisli (arr_oneOfF (Kleisli (mnds . snd))) ((), n)
oneOf :: Array Int (IO a) -> IO a
oneOf mnds = partOneOfF (bounds mnds) (mnds !)