module Control.Monad.Par.Scheds.Direct (
Sched(..), Par,
IVar(..), IVarContents(..),
runPar,
new, get, put_, fork,
newFull, newFull_, put,
spawn, spawn_, spawnP,
spawn1_
) where
import Control.Applicative
import Control.Concurrent hiding (yield)
import Debug.Trace
import Data.IORef
import Text.Printf
import GHC.Conc
import "mtl" Control.Monad.Cont as C
import qualified "mtl" Control.Monad.Reader as RD
import qualified Data.Sequence as Seq
import System.Random.MWC as Random
import System.IO.Unsafe (unsafePerformIO)
import System.Mem.StableName
import qualified Control.Monad.Par.Class as PC
import qualified Control.Monad.Par.Unsafe as UN
import Control.DeepSeq
#ifdef REACTOR_DEQUE
import Data.Concurrent.Deque.ChaseLev
import Data.Concurrent.Deque.ChaseLev.DequeInstance
import qualified Data.Concurrent.Deque.ReactorDeque as R
import Data.Array.IO
#else
import Data.Concurrent.Deque.Class (WSDeque)
import Data.Concurrent.Deque.Reference.DequeInstance
import Data.Concurrent.Deque.Reference as R
#endif
import Prelude hiding (null)
import qualified Prelude
-- | Compile-time switch for the scheduler's diagnostic output: 'True' when
-- the module is built with the debug macro defined, 'False' otherwise.
dbg :: Bool
#ifdef DEBUG
dbg = True
#else
dbg = False
#endif
-- Selects the 'fork' implementation that pushes the parent continuation
-- onto the work pool and runs the forked task directly.
#define FORKPARENT
-- Makes 'pushWork' call 'tryWakeIdle' after each push.
#define WAKEIDLE
-- | The 'Par' monad: a continuation-passing computation with final answer
-- type @()@, layered over 'ROnly' (a reader of the current 'Sched' on 'IO').
newtype Par a = Par
  { unPar :: C.ContT () ROnly a
  }
  deriving (Monad, MonadCont, RD.MonadReader Sched)
-- | Base monad beneath 'Par': 'IO' with read-only access to a 'Sched'.
type ROnly = RD.ReaderT Sched IO
-- | Scheduler state for one worker.  'makeScheds' creates one value per
-- capability; 'killflag' and 'idle' are shared by all of them.
data Sched = Sched
  { no :: !Int
    -- ^ Index of this worker; also its position in 'scheds'.
#ifdef REACTOR_DEQUE
  , workpool :: R.Deque IOArray (Par ())
#else
  , workpool :: WSDeque (Par ())
#endif
    -- ^ Deque of pending tasks: the owner pushes and pops on the left
    -- ('pushWork', 'popWork'), thieves pop on the right ('steal').
  , rng :: HotVar GenIO
    -- ^ Random generator that 'steal' uses (via 'rand') to choose victims.
  , isMain :: Bool
    -- ^ 'True' for the worker whose index equals the argument of 'makeScheds'.
  , killflag :: HotVar Bool
    -- ^ Set by 'runPar' once the main computation has finished; polled by
    -- 'rescheduleR' when the local deque is empty.
  , idle :: HotVar [MVar Bool]
    -- ^ One 'MVar' per worker currently blocked in 'steal'.
  , scheds :: [Sched]
    -- ^ Every scheduler, including this one.
  }
-- | A write-once variable: a mutable cell holding an 'IVarContents'.
newtype IVar a = IVar (IORef (IVarContents a))
-- | State of an 'IVar': already written ('Full'), untouched ('Empty'), or
-- unwritten with a list of callbacks ('Blocked') that 'put_' invokes with
-- the value once it arrives.
data IVarContents a = Full a | Empty | Blocked [a -> IO ()]
-- | Run an arbitrary 'IO' action inside 'Par' by lifting it through both
-- transformer layers (the reader, then the continuation monad).
unsafeParIO :: IO a -> Par a
unsafeParIO = Par . lift . lift
-- | Short internal alias for 'unsafeParIO'.
io :: IO a -> Par a
io = unsafeParIO
-- Default backend for hot variables when the build does not choose one:
-- 1 = IORef, 2 = MVar, 3 = TVar.
#ifndef HOTVAR
#define HOTVAR 1
#endif
-- Common interface to the "hot" mutable variables used by the scheduler.
-- Exactly one of the three backends below is compiled in, chosen by the
-- numeric backend macro (1 = IORef, 2 = MVar, 3 = TVar).

-- | Allocate a variable holding the given initial value.
newHotVar      :: a -> IO (HotVar a)
-- | Atomically replace the contents and return the extra result.
modifyHotVar   :: HotVar a -> (a -> (a,b)) -> IO b
-- | Atomically replace the contents.
modifyHotVar_  :: HotVar a -> (a -> a) -> IO ()
-- | Overwrite the contents.
writeHotVar    :: HotVar a -> a -> IO ()
-- | Read the current contents.
readHotVar     :: HotVar a -> IO a

#if HOTVAR == 1
type HotVar a = IORef a
newHotVar     = newIORef
modifyHotVar  = atomicModifyIORef
modifyHotVar_ v fn = atomicModifyIORef v (\a -> (fn a, ()))
readHotVar    = readIORef
writeHotVar   = writeIORef
instance Show (IORef a) where
  show _ = "<ioref>"

hotVarTransaction = error "Transactions not currently possible for IO refs"
readHotVarRaw  = readHotVar
writeHotVarRaw = writeHotVar

#elif HOTVAR == 2
#warning "Using MVars for hot atomic variables."
type HotVar a = MVar a
-- 'newMVar' already creates a full MVar; the previous definition bound the
-- unapplied 'newMVar' function in a do block and could not typecheck.
newHotVar     = newMVar
modifyHotVar  v fn = modifyMVar  v (return . fn)
modifyHotVar_ v fn = modifyMVar_ v (return . fn)
readHotVar    = readMVar
writeHotVar v x = do swapMVar v x; return ()
instance Show (MVar a) where
  show _ = "<mvar>"

hotVarTransaction = error "Transactions not currently possible for MVars"
readHotVarRaw  = readHotVar
writeHotVarRaw = writeHotVar

#elif HOTVAR == 3
#warning "Using TVars for hot atomic variables."
type HotVar a = TVar a
newHotVar = newTVarIO
modifyHotVar tv fn = atomically $ do
  x <- readTVar tv
  let (x2,b) = fn x
  writeTVar tv x2
  return b
modifyHotVar_ tv fn = atomically (do x <- readTVar tv; writeTVar tv (fn x))
readHotVar x = atomically $ readTVar x
writeHotVar v x = atomically $ writeTVar v x
instance Show (TVar a) where
  show _ = "<tvar>"

-- Only this backend supports composing several accesses in one transaction.
hotVarTransaction = atomically
readHotVarRaw  = readTVar
writeHotVarRaw = writeTVar
#endif
-- | Pop a task from the left end of this worker's own deque.
--
-- Returns 'Nothing' when the deque is empty.  When 'dbg' is on, a line
-- identifying the popped task is printed.  The stable name is taken from the
-- task itself (not from the enclosing 'Maybe'), so the printed hash is
-- comparable with the one printed by 'pushWork' for the same task.
popWork :: Sched -> IO (Maybe (Par ()))
popWork Sched{ workpool, no } = do
  mb <- R.tryPopL workpool
  when dbg $
    case mb of
      Nothing   -> return ()
      Just task -> do
        sn <- makeStableName task
        printf " [%d] -> POP work unit %d\n" no (hashStableName sn)
  return mb
-- | Push a task onto the left end of the given worker's deque.  With 'dbg'
-- on, the push is logged; when the wake-idle macro is defined, one sleeping
-- worker (if any) is then woken through 'tryWakeIdle'.
pushWork :: Sched -> Par () -> IO ()
pushWork Sched{ workpool = pool, idle = sleepers, no = workerId } task = do
  R.pushL pool task
  when dbg $ do
    sn <- makeStableName task
    printf " [%d] -> PUSH work unit %d\n" workerId (hashStableName sn)
#ifdef WAKEIDLE
  tryWakeIdle sleepers
#endif
-- | Wake at most one sleeping worker.  A cheap non-atomic read checks whether
-- anybody is registered at all; only then is the head of the list removed
-- atomically and its 'MVar' filled with 'False' (meaning "woken up", as
-- opposed to the 'True' that 'steal' uses for shutdown).
tryWakeIdle :: HotVar [MVar Bool] -> IO ()
tryWakeIdle idle = do
  sleepers <- readHotVar idle
  unless (Prelude.null sleepers) $ do
    when dbg $ printf "Waking %d idle thread(s).\n" (length sleepers)
    wake <- modifyHotVar idle popSleeper
    wake
  where
    -- The list may have been emptied between the read and the modify.
    popSleeper []         = ([], return ())
    popSleeper (m : rest) = (rest, putMVar m False)
-- | Draw a uniformly distributed worker index in the inclusive range
-- @[0, numCapabilities - 1]@ from the generator stored in the given variable.
-- 'steal' uses the result to index into the list of schedulers.
rand :: HotVar GenIO -> IO Int
rand ref = Random.uniformR (0, numCapabilities - 1) =<< readHotVar ref
-- | Forcing an 'IVar' does nothing: the argument is ignored entirely.
instance NFData (IVar a) where
  rnf = const ()
-- | Run a 'Par' computation and return its result as a pure value.
--
-- One scheduler is created per capability ('makeScheds') and one thread is
-- forked per scheduler, pinned to its capability.  The thread whose index
-- equals the capability of the calling thread runs the user computation and
-- stores the result in an 'MVar'; every other thread enters the scheduling
-- loop.  After the user computation returns, the shared kill flag is raised
-- so that workers stop once they run out of local work.
runPar userComp = unsafePerformIO $ do
#if __GLASGOW_HASKELL__ >= 701 /* 20110301 */
  (main_cpu, _) <- threadCapability =<< myThreadId
#else
  let main_cpu = 0
#endif
  allscheds <- makeScheds main_cpu
  m <- newEmptyMVar
  forM_ (zip [0..] allscheds) $ \(cpu, sched) ->
    forkOnIO cpu $
      if cpu /= main_cpu
        then do
          when dbg $ printf " [%d] Entering scheduling loop.\n" cpu
          runReaderWith sched $ rescheduleR errK
          when dbg $ printf " [%d] Exited scheduling loop. FINISHED.\n" cpu
        else do
          let userComp' = do
                when dbg $ io $ printf " [%d] Starting Par computation on main thread.\n" main_cpu
                res <- userComp
                finalSched <- RD.ask
                when dbg $ io $ printf " [%d] Out of Par computation on main thread. Writing MVar...\n" (no finalSched)
                when dbg $ io $ sanityCheck allscheds
                io $ putMVar m res
          RD.runReaderT (C.runContT (unPar userComp') trivialCont) sched
          when dbg $ do
            putStrLn " *** Out of entire runContT user computation on main thread."
            sanityCheck allscheds
          -- The kill flag is a HotVar, so it is written through the HotVar
          -- interface; a raw IORef write only typechecks for one backend.
          writeHotVar (killflag sched) True
  when dbg $ do putStrLn " *** Reading final MVar on main thread."
  takeMVar m
-- | Debugging aid: print a warning for every scheduler whose deque is not
-- empty, then print a completion message.
sanityCheck :: [Sched] -> IO ()
sanityCheck allscheds = do
  forM_ allscheds $ \Sched{ no = workerId, workpool = pool } -> do
    isEmpty <- R.nullQ pool
    unless isEmpty $
      printf "WARNING: After main thread exited non-empty queue remains for worker %d\n" workerId
  putStrLn "Sanity check complete."
-- | Build one 'Sched' per capability.  Each gets its own deque and random
-- generator; the idle list and the kill flag are shared by all of them.  The
-- argument is the index of the scheduler to be marked as the main one.
makeScheds :: Int -> IO [Sched]
makeScheds main = do
  workpools <- replicateM numCapabilities R.newQ
  rngs      <- replicateM numCapabilities (Random.create >>= newHotVar)
  idle      <- newHotVar []
  killflag  <- newHotVar False
  let mkSched ix pool gen = Sched
        { no       = ix
        , idle     = idle
        , killflag = killflag
        , isMain   = ix == main
        , workpool = pool
        , scheds   = allscheds   -- knot-tied: every scheduler sees the full list
        , rng      = gen
        }
      allscheds = zipWith3 mkSched [0..] workpools rngs
  return allscheds
-- | Create a fresh, empty 'IVar'.
new :: Par (IVar a)
new = io (fmap IVar (newIORef Empty))
-- | Read an 'IVar'.  If it is already full the value is returned at once.
-- Otherwise the current continuation is captured and registered on the
-- 'IVar' as a callback that pushes it (applied to the value) onto this
-- worker's deque, and the worker goes on to other work via 'reschedule'.
-- The registration re-checks the contents atomically, so a value written
-- between the first read and the registration is returned directly.
get iv@(IVar ref) =
  callCC $ \cont -> do
    seen <- io $ readIORef ref
    case seen of
      Full a -> return a
      _ -> do
        sch <- RD.ask
        let enqueue = pushWork sch . cont
#ifdef DEBUG
        sn <- io $ makeStableName iv
        let resched = trace (" ["++ show (no sch) ++ "] - Rescheduling on unavailable ivar "++show (hashStableName sn)++"!")
#else
        let resched =
#endif
              reschedule
        next <- io $ atomicModifyIORef ref $ \st -> case st of
          Empty      -> (Blocked [enqueue], resched)
          Full a     -> (Full a, return a)
          Blocked ks -> (Blocked (enqueue : ks), resched)
        next
-- | Non-blocking look at an 'IVar': 'Just' the value if it is full at the
-- moment of the read, 'Nothing' otherwise.
unsafePeek :: IVar a -> Par (Maybe a)
unsafePeek (IVar ref) = io $ do
  contents <- readIORef ref
  return $ case contents of
    Full a -> Just a
    _      -> Nothing
-- | Write a value (forced to weak head normal form by the bang pattern) into
-- an 'IVar' and run every callback that was waiting on it.  Writing to an
-- 'IVar' that is already full raises the error @multiple put@.
put_ iv@(IVar ref) !content = do
  sched <- RD.ask
  io $ do
    waiters <- atomicModifyIORef ref $ \st -> case st of
      Empty      -> (Full content, [])
      Full _     -> error "multiple put"
      Blocked ks -> (Full content, ks)
#ifdef DEBUG
    sn <- makeStableName iv
    printf " [%d] Put value %s into IVar %d. Waking up %d continuations.\n"
           (no sched) (show content) (hashStableName sn) (length waiters)
#endif
    forM_ waiters $ \wake -> wake content
-- | Like 'put_', but tolerant of an 'IVar' that is already full: in that
-- case the existing value is kept and returned.  Otherwise the supplied
-- value is stored, waiting callbacks are run with it, and it is returned.
unsafeTryPut iv@(IVar ref) !content = do
  sched <- RD.ask
  io $ do
    (waiters, winner) <- atomicModifyIORef ref $ \st -> case st of
      Empty      -> (Full content, ([], content))
      Full x     -> (Full x, ([], x))
      Blocked ks -> (Full content, (ks, content))
#ifdef DEBUG
    sn <- makeStableName iv
    printf " [%d] unsafeTryPut: value %s in IVar %d. Waking up %d continuations.\n"
           (no sched) (show content) (hashStableName sn) (length waiters)
#endif
    forM_ waiters $ \wake -> wake content
    return winner
-- | Start a task that may run in parallel with the caller.
--
-- Two policies exist, chosen at compile time.  Under the fork-parent policy
-- the continuation of the caller is pushed onto the local deque (where it
-- can be stolen) and the forked task runs immediately on this worker; after
-- it, the worker reschedules, so the statement following 'reschedule' is
-- not expected to run.  Under the other policy the task itself is pushed
-- and the caller simply continues.
fork :: Par () -> Par ()
#ifdef FORKPARENT
#warning "FORK PARENT POLICY USED"
fork task = do
  sched <- RD.ask
  callCC $ \parent -> do
    io $ pushWork sched (parent ())
    task
    reschedule
    io $ putStrLn " !!! ERROR: Should not reach this point #1"
  -- Reached only through the captured parent continuation.
  when dbg $ do
    sched2 <- RD.ask
    io $ printf " called parent continuation... was on cpu %d now on cpu %d\n" (no sched) (no sched2)
#else
fork task = do
  sch <- RD.ask
  io $ when dbg $ printf " [%d] forking task...\n" (no sch)
  io $ pushWork sch task
#endif
-- | Abandon the current continuation and enter the scheduling loop
-- ('rescheduleR'), which ignores the continuation it is handed.
reschedule :: Par a
reschedule = Par (C.ContT rescheduleR)
-- | The scheduling loop of one worker.  The continuation argument is never
-- used.
--
-- A task is popped from the worker's own deque and run with a continuation
-- that re-enters this loop.  If the deque is empty, the shared kill flag is
-- consulted: when it is set the loop returns, otherwise the worker tries to
-- 'steal' and then loops again.
rescheduleR :: ignoredCont -> ROnly ()
rescheduleR _ = do
  mysched <- RD.ask
  when dbg $ liftIO $ printf " [%d] - Reschedule...\n" (no mysched)
  mtask <- liftIO $ popWork mysched
  case mtask of
    Nothing -> do
      -- Read through the HotVar interface so every backend typechecks.
      killed <- liftIO $ readHotVar (killflag mysched)
      unless killed $ do
        liftIO $ steal mysched
        rescheduleR errK
    Just task -> do
      when dbg $ do
        sn <- liftIO $ makeStableName task
        liftIO $ printf " [%d] popped work %d from own queue\n" (no mysched) (hashStableName sn)
      let C.ContT fn = unPar task
      fn (\ () -> do
            sch <- RD.ask
            when dbg $ liftIO $ printf " + task finished successfully on cpu %d, calling reschedule continuation..\n" (no sch)
            rescheduleR errK)
-- | 'RD.runReaderT' with its arguments swapped: environment first.
runReaderWith :: r -> RD.ReaderT r m a -> m a
runReaderWith = flip RD.runReaderT
-- | Try to take work from another worker.
--
-- Victims are chosen at random ('rand'); each probe of a victim (or of
-- ourselves, which is skipped) uses up one of @20 * numCapabilities@ tries.
-- A stolen task is run to completion on this worker and 'steal' returns.
--
-- When the tries are used up the worker registers an 'MVar' on the shared
-- idle list.  If all other @numCapabilities - 1@ workers were already
-- registered, this worker tells them to shut down by filling their MVars
-- with 'True'.  Otherwise it blocks on its own MVar: 'True' means shut
-- down, 'False' (written by 'tryWakeIdle') means new work was pushed and
-- stealing is retried with a fresh budget.
steal :: Sched -> IO ()
steal mysched@Sched{ idle, scheds, rng, no=my_no } = do
  when dbg $ printf " [%d] + stealing\n" my_no
  i <- getnext (1 :: Int)
  go maxtries i
 where
  maxtries = 20 * numCapabilities

  -- The argument is ignored; the next victim is always random.
  getnext _ = rand rng

  -- Out of tries: go to sleep, or start the shutdown if we are the last.
  go 0 _ = do
    m <- newEmptyMVar
    r <- modifyHotVar idle $ \is -> (m:is, is)
    if length r == numCapabilities - 1
      then do
        when dbg $ printf " [%d] | initiating shutdown\n" my_no
        mapM_ (\sleeper -> putMVar sleeper True) r
      else do
        done <- takeMVar m
        if done
          then do
            when dbg $ printf " [%d] | shutting down\n" my_no
            return ()
          else do
            when dbg $ printf " [%d] | woken up\n" my_no
            i <- getnext (1::Int)
            go maxtries i
  go tries i
    | i == my_no = do
        i' <- getnext i
        go (tries - 1) i'
    | otherwise = do
        let schd = scheds !! i
        when dbg $ printf " [%d] | trying steal from %d\n" my_no (no schd)
        let dq = workpool schd
        r <- R.tryPopR dq
        case r of
          Just task -> do
            when dbg $ do
              sn <- makeStableName task
              printf " [%d] | stole work (unit %d) from cpu %d\n" my_no (hashStableName sn) (no schd)
            runReaderWith mysched $
              C.runContT (unPar task)
                (\_ -> do
                    when dbg $ do
                      sn <- liftIO $ makeStableName task
                      liftIO $ printf " [%d] | DONE running stolen work (unit %d) from %d\n" my_no (hashStableName sn) (no schd)
                    return ())
          Nothing -> do
            i' <- getnext i
            go (tries - 1) i'
-- | Placeholder continuation handed to 'rescheduleR', which ignores it;
-- evaluating it raises an error.
errK :: a
errK = error "this closure shouldn't be used"
-- | Final continuation for the main computation: discards its argument and
-- returns unit (emitting a trace message first in debug builds).
trivialCont :: a -> ROnly ()
trivialCont _ =
#ifdef DEBUG
  trace "trivialCont evaluated!"
#endif
  return ()
-- | Apply a 'Par'-returning function to an argument and 'spawn_' the
-- resulting computation.  Debug builds also log the function and argument.
spawn1_ f x = do
#ifdef DEBUG
  sn  <- io $ makeStableName f
  sch <- RD.ask
  when dbg $ io $ printf " [%d] spawning fn %d with arg %s\n" (no sch) (hashStableName sn) (show x)
#endif
  spawn_ (f x)
-- | Create an 'IVar' and immediately fill it with the given value via 'put_'.
newFull_ a =
  new >>= \v ->
    put_ v a >> return v
-- | Like 'newFull_', but the value is fully evaluated first.
newFull a = a `deepseq` newFull_ a
-- | Like 'put_', but the value is fully evaluated first.
put v a = a `deepseq` put_ v a
-- | Fork a computation and return an 'IVar' that will receive its fully
-- evaluated result (written with 'put').
spawn p = do
  r <- new
  fork (put r =<< p)
  return r
-- | Fork a computation and return an 'IVar' that will receive its result,
-- written with 'put_'.
spawn_ p = do
  r <- new
  fork (put_ r =<< p)
  return r
-- | 'spawn' a computation that just returns the given value.
spawnP a = spawn $ return a
#ifdef DEBUG
-- Debug builds print the values they handle, so these signatures carry
-- extra 'Show' constraints and no class instances are provided.
runPar   :: Show a => Par a -> a
get      :: Show a => IVar a -> Par a
put_     :: Show a => IVar a -> a -> Par ()
put      :: (Show a, NFData a) => IVar a -> a -> Par ()
newFull_ :: Show a => a -> Par (IVar a)
newFull  :: (Show a, NFData a) => a -> Par (IVar a)
spawn_   :: Show a => Par a -> Par (IVar a)
spawn    :: (Show a, NFData a) => Par a -> Par (IVar a)
spawn1_  :: (Show a, Show b) => (a -> Par b) -> a -> Par (IVar b)
#else
runPar   :: Par a -> a
get      :: IVar a -> Par a
put_     :: IVar a -> a -> Par ()
put      :: NFData a => IVar a -> a -> Par ()
newFull_ :: a -> Par (IVar a)
newFull  :: NFData a => a -> Par (IVar a)
spawn_   :: Par a -> Par (IVar a)
spawn    :: NFData a => Par a -> Par (IVar a)
spawn1_  :: (a -> Par b) -> a -> Par (IVar b)

-- Class instances: each method is the top-level function of the same name.
instance PC.ParFuture IVar Par where
  get    = get
  spawn  = spawn
  spawn_ = spawn_
  spawnP = spawnP

instance PC.ParIVar IVar Par where
  fork     = fork
  new      = new
  put_     = put_
  newFull  = newFull
  newFull_ = newFull_

instance UN.ParUnsafe IVar Par where
  unsafePeek   = unsafePeek
  unsafeTryPut = unsafeTryPut
  unsafeParIO  = unsafeParIO
#endif
-- | 'fmap' defined through the monad operations of 'Par'.
instance Functor Par where
  fmap f xs = do
    x <- xs
    return (f x)
-- | Applicative operations defined through the monad operations of 'Par':
-- the function is computed first, then the argument.
instance Applicative Par where
  pure x = return x
  pf <*> px = do
    f <- pf
    x <- px
    return (f x)