{-# LANGUAGE CPP #-}
module Data.Array.Repa.Eval.Gang
( theGang
, Gang, forkGang, gangSize, gangIO, gangST)
where
import GHC.IO
import GHC.ST
import GHC.Conc (forkOn)
import Control.Concurrent.MVar
import Control.Exception (assert)
import Control.Monad
import GHC.Conc (numCapabilities)
import System.IO
-- | The globally shared gang, with one worker thread per capability
--   reported by 'numCapabilities'.
--
--   The gang is forked via 'unsafePerformIO' the first time this value is
--   demanded. The NOINLINE pragma keeps the binding from being duplicated at
--   use sites, which could otherwise fork more than one gang.
theGang :: Gang
{-# NOINLINE theGang #-}
theGang = unsafePerformIO (forkGang numCapabilities)
-- | A work request sent to an individual member of a gang.
data Req
  -- | Run the given action; 'gangWorker' applies it to the worker's index.
  = ReqDo (Int -> IO ())
  -- | Ask the worker to stop. 'gangWorker' acknowledges by writing to its
  --   completion var and then returns instead of looping.
  | ReqShutdown
-- | A group of worker threads together with the MVars used to talk to them.
data Gang
  = Gang
  { -- | Number of threads in the gang (the value returned by 'gangSize').
    _gangThreads     :: !Int

    -- | One request var per worker; each worker blocks on its own var
    --   waiting for the next 'Req'.
  , _gangRequestVars :: [MVar Req]

    -- | One completion var per worker; a worker writes @()@ here after
    --   handling each request.
  , _gangResultVars  :: [MVar ()]

    -- | Holds 'True' while a parallel job started by 'gangIO' is running.
  , _gangBusy        :: MVar Bool
  }
-- | Renders a gang as @<<N threads>>@, where @N@ is the thread count shown
--   at the caller's precedence. The MVars are not shown.
instance Show Gang where
  showsPrec p (Gang n _ _ _)
    = showString "<<"
    . showsPrec p n
    . showString " threads>>"
-- | O(1). Yield the number of threads in the 'Gang'.
gangSize :: Gang -> Int
gangSize (Gang n _ _ _) = n
-- | Fork a 'Gang' with the given number of worker threads.
--
--   The thread count is checked to be positive with 'assert', so the check
--   only fires when assertions are enabled.
forkGang :: Int -> IO Gang
forkGang n
  = assert (n > 0)
  $ do
      -- One request var and one completion var per worker, all initially empty.
      mvsRequest <- replicateM n newEmptyMVar
      mvsDone    <- replicateM n newEmptyMVar

      -- Attach a finaliser to every request var that asks the matching
      -- worker to shut down (see 'finaliseWorker').
      zipWithM_
        (\varReq varDone -> mkWeakMVar varReq (finaliseWorker varReq varDone))
        mvsRequest
        mvsDone

      -- Start worker i with 'forkOn' i, handing it index i and its own
      -- pair of vars.
      zipWithM_ forkOn [0 ..]
        $ zipWith3 gangWorker [0 .. n - 1] mvsRequest mvsDone

      -- The gang starts out idle.
      busy <- newMVar False

      return $ Gang n mvsRequest mvsDone busy
-- | The loop run by each worker thread.
--
--   Blocks until a request arrives on @varRequest@. For 'ReqDo' it runs the
--   action with this worker's index, signals completion on @varDone@ and
--   waits for the next request. For 'ReqShutdown' it signals on @varDone@
--   and returns.
--
--   NOTE(review): if the action throws, nothing is written to @varDone@ and
--   the loop ends; confirm that callers never pass actions that can throw.
gangWorker :: Int -> MVar Req -> MVar () -> IO ()
gangWorker threadId varRequest varDone = do
  req <- takeMVar varRequest
  case req of
    ReqDo action -> do
      action threadId
      putMVar varDone ()
      gangWorker threadId varRequest varDone

    ReqShutdown ->
      putMVar varDone ()
-- | Finaliser for a worker: send 'ReqShutdown' on its request var, then
--   block until the worker acknowledges on its completion var.
finaliseWorker :: MVar Req -> MVar () -> IO ()
finaliseWorker varReq varDone = do
  putMVar varReq ReqShutdown
  takeMVar varDone
-- | Run an action on the gang, passing each invocation its worker index.
--
--   The busy flag is atomically set to 'True' first. If it was already
--   'True', another job holds the gang, so the action is run sequentially in
--   the calling thread via 'seqIO' and the flag is left alone. Otherwise the
--   action is handed to the workers via 'parIO' and the flag is cleared
--   afterwards.
--
--   NOTE(review): the flag is not reset if 'parIO' is interrupted by an
--   exception; confirm whether that can happen in practice.
gangIO
  :: Gang
  -> (Int -> IO ())
  -> IO ()
{-# NOINLINE gangIO #-}
gangIO gang@(Gang _ _ _ busy) action = do
  alreadyBusy <- swapMVar busy True
  if alreadyBusy
    then seqIO gang action
    else do
      parIO gang action
      -- Only the call that claimed the gang releases it.
      void (swapMVar busy False)
-- | Run the action sequentially in the calling thread, once for every
--   worker index from 0 to the gang size minus one, after printing a warning
--   about nested parallel computation to stderr.
seqIO :: Gang -> (Int -> IO ()) -> IO ()
seqIO (Gang n _ _ _) action = do
  hPutStr stderr $ unlines
    [ "Data.Array.Repa: Performing nested parallel computation sequentially."
    , " You've probably called the 'compute' or 'copy' function while another"
    , " instance was already running. This can happen if the second version"
    , " was suspended due to lazy evaluation. Use 'deepSeqArray' to ensure"
    , " that each array is fully evaluated before you 'compute' the next one."
    , "" ]

  mapM_ action [0 .. n - 1]
-- | Run the action on every worker of the gang and wait for all of them.
--
--   A 'ReqDo' carrying the action is written to each request var, then each
--   completion var is taken in turn, so the call returns only after every
--   worker has signalled that it is done.
parIO :: Gang -> (Int -> IO ()) -> IO ()
parIO (Gang _ mvsRequest mvsResult _) action = do
  mapM_ (\v -> putMVar v (ReqDo action)) mvsRequest
  mapM_ takeMVar mvsResult
-- | Same as 'gangIO' but for actions in the 'ST' monad.
--
--   Each per-worker 'ST' action is converted with 'unsafeSTToIO', run through
--   'gangIO', and the result is converted back with 'unsafeIOToST'.
gangST :: Gang -> (Int -> ST s ()) -> ST s ()
gangST g p = unsafeIOToST . gangIO g $ unsafeSTToIO . p