{-# LANGUAGE RankNTypes, NamedFieldPuns, BangPatterns,
ExistentialQuantification, CPP #-}
{-# OPTIONS_GHC -Wall -fno-warn-name-shadowing -fno-warn-unused-do-bind #-}
module Control.Monad.Par.Scheds.TraceInternal (
Trace(..), Sched(..), Par(..),
IVar(..), IVarContents(..),
sched,
runPar, runParIO, runParAsync,
new, newFull, newFull_, get, put_, put,
pollIVar, yield, fixPar, FixParException (..)
) where
import Control.Monad as M hiding (mapM, sequence, join)
import Prelude hiding (mapM, sequence, head,tail)
import Data.IORef
import System.IO.Unsafe
#if MIN_VERSION_base(4,4,0)
import GHC.IO.Unsafe (unsafeDupableInterleaveIO)
#else
import GHC.IO.Unsafe (unsafeInterleaveIO)
#endif
import Control.Concurrent hiding (yield)
import GHC.Conc (numCapabilities)
import Control.DeepSeq
import Control.Monad.Fix (MonadFix (mfix))
import Control.Exception (Exception, throwIO, BlockedIndefinitelyOnMVar (..),
catch)
#if !MIN_VERSION_base(4,8,0)
import Control.Applicative
#endif
#if __GLASGOW_HASKELL__ <= 700
import GHC.Conc (forkOnIO)
-- Compatibility alias used when __GLASGOW_HASKELL__ <= 700: expose forkOnIO
-- under the name forkOn, which runPar_internal calls to start its workers.
forkOn = forkOnIO
#endif
-- | The instruction stream that a 'Par' computation is compiled to and that
-- 'sched' interprets. Every constructor carries the rest of the computation,
-- either directly or as a continuation.
data Trace
  = -- Read an IVar and pass its value to the continuation.
    forall a . Get (IVar a) (a -> Trace)
    -- Store a value into an IVar, then continue with the given trace.
  | forall a . Put (IVar a) a Trace
    -- Allocate an IVar with the given initial contents and pass it on.
  | forall a . New (IVarContents a) (IVar a -> Trace)
    -- Two traces: 'sched' queues the first and keeps running the second.
  | Fork Trace Trace
    -- End of this thread of computation.
  | Done
    -- Put the remaining trace at the back of the work pool and reschedule.
  | Yield Trace
    -- Run an IO action and pass its result to the continuation.
  | forall a . LiftIO (IO a) (a -> Trace)
-- | Interpret a 'Trace' on the worker whose state is @queue@.
--
-- The 'Bool' selects what happens when the trace reaches 'Done': when 'True'
-- the calling thread carries on with 'reschedule'; when 'False' it prints a
-- notice, forks a new thread that runs 'reschedule', and returns.
sched :: Bool -> Sched -> Trace -> IO ()
sched _doSync queue t = step t
  where
    step :: Trace -> IO ()
    -- Allocate the backing IORef and hand the new IVar to the continuation.
    step (New contents k) = do
      ref <- newIORef contents
      step (k (IVar ref))
    -- Fast path: a plain read that finds the IVar full. Otherwise decide
    -- atomically whether to continue or to park the continuation on the IVar
    -- and look for other work.
    step (Get (IVar ref) k) = do
      seen <- readIORef ref
      case seen of
        Full val -> step (k val)
        _other -> do
          next <- atomicModifyIORef ref $ \cur -> case cur of
            Empty -> (Blocked [k], reschedule queue)
            Full val -> (Full val, step (k val))
            Blocked ks -> (Blocked (k : ks), reschedule queue)
          next
    -- Fill the IVar, queue every continuation that was blocked on it, then
    -- keep running the current thread. A second put is an error.
    step (Put (IVar ref) val rest) = do
      waiters <- atomicModifyIORef ref $ \cur -> case cur of
        Empty -> (Full val, [])
        Full _ -> error "multiple put"
        Blocked ks -> (Full val, ks)
      forM_ waiters $ \k -> pushWork queue (k val)
      step rest
    -- Queue the child and continue with the parent on this thread.
    step (Fork child parent) = pushWork queue child >> step parent
    step Done
      | _doSync = reschedule queue
      | otherwise = do
          -- NOTE(review): this notice is written to stdout; confirm that is
          -- intended for library code.
          putStrLn " [par] Forking replacement thread..\n"
          _ <- forkIO (reschedule queue)
          return ()
    -- Move the current thread to the back of this worker's pool.
    step (Yield parent) = do
      atomicModifyIORef (workpool queue) $ \ts -> (ts ++ [parent], ())
      reschedule queue
    step (LiftIO io k) = io >>= step . k
-- | Exception that 'fixPar' throws in place of 'BlockedIndefinitelyOnMVar'
-- when the lazily read fixpoint result can never become available.
data FixParException = FixParException
  deriving (Show)

instance Exception FixParException
-- | Take the next trace from the front of this worker's own pool and run it
-- with 'sched'; when the pool is empty, fall back to 'steal'.
reschedule :: Sched -> IO ()
reschedule queue@Sched{ workpool } = do
  next <- atomicModifyIORef workpool pop
  maybe (steal queue) (sched True queue) next
  where
    -- Remove the head of the pool, if there is one.
    pop [] = ([], Nothing)
    pop (t : rest) = (rest, Just t)
-- | Look for work in the other workers' pools. If none is found, register as
-- idle and either trigger shutdown (when every other worker is already idle)
-- or block until another worker wakes this one.
steal :: Sched -> IO ()
steal q@Sched{ idle, scheds, no=my_no } = tryVictims scheds
  where
    -- Walk the list of schedulers, skipping our own entry, and run the first
    -- trace that can be taken from the front of someone else's pool.
    tryVictims [] = parkOrShutdown
    tryVictims (victim : others)
      | no victim == my_no = tryVictims others
      | otherwise = do
          stolen <- atomicModifyIORef (workpool victim) $ \pool ->
            case pool of
              [] -> ([], Nothing)
              (t : rest) -> (rest, Just t)
          case stolen of
            Just t -> sched True q t
            Nothing -> tryVictims others

    -- Add a fresh MVar to the shared idle list. The list as it was before the
    -- insertion tells us how many workers were already idle.
    parkOrShutdown = do
      wake <- newEmptyMVar
      alreadyIdle <- atomicModifyIORef idle $ \is -> (wake : is, is)
      if length alreadyIdle == numCapabilities - 1
        -- Everyone else is idle: send True to each of them and return.
        then mapM_ (\m -> putMVar m True) alreadyIdle
        else do
          -- True means shut down; False means new work may exist, so retry.
          done <- takeMVar wake
          unless done (tryVictims scheds)
-- | Push a trace onto the front of this worker's pool and, if any worker is
-- registered as idle, wake one of them by sending it 'False'.
pushWork :: Sched -> Trace -> IO ()
pushWork Sched { workpool, idle } t = do
  atomicModifyIORef workpool $ \ts -> (t : ts, ())
  -- Cheap non-atomic peek first; the atomic update below re-checks the list.
  sleepers <- readIORef idle
  unless (null sleepers) $ do
    wake <- atomicModifyIORef idle $ \is -> case is of
      [] -> ([], return ())
      (m : rest) -> (rest, putMVar m False)
    wake
-- | State of one worker, plus the state shared between all workers.
data Sched = Sched
  { -- Index of this worker; 'steal' uses it to skip its own entry.
    no :: {-# UNPACK #-} !Int
    -- Pending traces: 'pushWork' adds to the front, 'reschedule' and 'steal'
    -- take from the front.
  , workpool :: IORef [Trace]
    -- One MVar per parked worker. 'pushWork' sends False to wake a worker,
    -- 'steal' sends True to tell it to stop.
  , idle :: IORef [MVar Bool]
    -- The schedulers that 'steal' searches for work (including this one).
  , scheds :: [Sched]
  }
-- | A computation in continuation-passing form: given what to do with the
-- result, it yields the 'Trace' to run.
newtype Par a = Par { runCont :: (a -> Trace) -> Trace }
-- | Map over the result by pre-composing the function onto the continuation.
instance Functor Par where
  fmap f (Par m) = Par (\k -> m (k . f))
-- | Sequencing: run the first computation with a continuation that feeds its
-- result to the second and then on to the outer continuation.
instance Monad Par where
  return = pure
  Par m >>= k = Par (\c -> m (\a -> runCont (k a) c))
-- | 'pure' passes the value straight to the continuation; '<*>' is derived
-- from the 'Monad' instance.
instance Applicative Par where
  pure a = Par (\k -> k a)
  (<*>) = ap
-- | Monadic fixpoints for 'Par', implemented by 'fixPar'.
instance MonadFix Par where
  mfix f = fixPar f
-- | Fixpoint of a 'Par'-returning function.
--
-- The function receives a lazily read placeholder for its own result. The
-- placeholder is backed by an 'MVar' that is filled once the computation
-- produces its value. If reading the placeholder fails with
-- 'BlockedIndefinitelyOnMVar', 'FixParException' is thrown instead.
fixPar :: (a -> Par a) -> Par a
fixPar f = Par $ \c -> LiftIO (knot c) id
  where
    knot c = do
      box <- newEmptyMVar
      -- The read is deferred until the placeholder is actually demanded.
      ans <- unsafeDupableInterleaveIO
        (readMVar box
          `catch` \ ~BlockedIndefinitelyOnMVar -> throwIO FixParException)
      -- Final continuation: publish the result, then resume the caller.
      let fill a = LiftIO (putMVar box a) (\ ~() -> c a)
      pure (runCont (f ans) fill)
#if !MIN_VERSION_base(4,4,0)
-- Fallback for base older than 4.4, where this module imports only
-- unsafeInterleaveIO: define unsafeDupableInterleaveIO in terms of it.
unsafeDupableInterleaveIO :: IO a -> IO a
unsafeDupableInterleaveIO = unsafeInterleaveIO
#endif
-- | A variable shared between 'Par' threads: a mutable reference to its
-- current 'IVarContents'.
newtype IVar a = IVar (IORef (IVarContents a))
-- | Two IVars are equal when they wrap the same underlying 'IORef'.
instance Eq (IVar a) where
  IVar left == IVar right = left == right
-- | Forcing an 'IVar' only evaluates the reference itself, not its contents.
instance NFData (IVar a) where
  rnf v = v `seq` ()
-- | Non-blocking read of an 'IVar': 'Just' the value if it is currently
-- 'Full', otherwise 'Nothing'.
pollIVar :: IVar a -> IO (Maybe a)
pollIVar (IVar ref) =
  readIORef ref >>= \contents ->
    case contents of
      Full x -> return (Just x)
      _ -> return Nothing
-- | The state of an 'IVar'.
data IVarContents a
  = Full a                 -- holds its value
  | Empty                  -- no value, and nothing waiting for one
  | Blocked [a -> Trace]   -- no value yet; continuations waiting for it
{-# INLINE runPar_internal #-}
-- | Run a 'Par' computation and return its result.
--
-- Creates one work pool per capability and forks one thread per capability
-- with 'forkOn'. The thread on @main_cpu@ runs the computation with 'sched'
-- (passing the 'Bool' through) and then hands the result cell to the caller;
-- all other threads start in 'reschedule'. Calls 'error' with @"no result"@
-- if that cell is not 'Full'.
runPar_internal :: Bool -> Par a -> IO a
runPar_internal _doSync x = do
  workpools <- replicateM numCapabilities $ newIORef []
  idle <- newIORef []
  -- Each scheduler holds the list of all schedulers, so the list is defined
  -- recursively.
  let states =
        zipWith
          (\i wp -> Sched { no = i, workpool = wp, idle = idle, scheds = states })
          [0 ..]
          workpools
#if __GLASGOW_HASKELL__ >= 701 /* 20110301 */
  (main_cpu, _) <- threadCapability =<< myThreadId
#else
  let main_cpu = 0
#endif
  resultVar <- newEmptyMVar
  let worker cpu state
        | cpu /= main_cpu = reschedule state
        | otherwise = do
            rref <- newIORef Empty
            sched _doSync state $ runCont (x >>= put_ (IVar rref)) (const Done)
            readIORef rref >>= putMVar resultVar
  forM_ (zip [0 ..] states) $ \(cpu, state) ->
    forkOn cpu (worker cpu state)
  outcome <- takeMVar resultVar
  case outcome of
    Full a -> return a
    _ -> error "no result"
-- | Run a 'Par' computation as a pure value, via 'unsafePerformIO' over
-- 'runPar_internal' with the flag set to 'True'.
runPar :: Par a -> a
runPar = unsafePerformIO . runPar_internal True
-- | Run a 'Par' computation in 'IO'; this is 'runPar_internal' with the flag
-- set to 'True'.
runParIO :: Par a -> IO a
runParIO = runPar_internal True
-- | Like 'runPar', but passes 'False' to 'runPar_internal': when the main
-- trace reaches 'Done', 'sched' forks a replacement thread and returns
-- instead of continuing to reschedule on the same thread.
runParAsync :: Par a -> a
runParAsync = unsafePerformIO . runPar_internal False
-- | Create a new 'IVar' whose contents start out 'Empty'.
new :: Par (IVar a)
new = Par (\k -> New Empty k)
-- | Create a new 'IVar' that already holds the given value. The value is
-- forced with 'deepseq' when the computation runs, before the 'New' node is
-- produced.
newFull :: NFData a => a -> Par (IVar a)
newFull x = Par (\k -> deepseq x (New (Full x) k))
-- | Like 'newFull', but the value is only evaluated to weak head normal form.
newFull_ :: a -> Par (IVar a)
newFull_ x = x `seq` Par (New (Full x))
-- | Read an 'IVar'. This emits a 'Get' node, so the continuation receives the
-- value once the scheduler finds the IVar 'Full'.
get :: IVar a -> Par a
get v = Par (Get v)
-- | Write a value to an 'IVar', evaluating the value only to weak head normal
-- form.
put_ :: IVar a -> a -> Par ()
put_ v a = a `seq` Par (\k -> Put v a (k ()))
-- | Write a value to an 'IVar'. The value is forced with 'deepseq' before the
-- 'Put' node is produced.
put :: NFData a => IVar a -> a -> Par ()
put v a = Par (\k -> deepseq a (Put v a (k ())))
-- | Emit a 'Yield' node: the scheduler moves the rest of this thread to the
-- back of the work pool and picks other work.
yield :: Par ()
yield = Par (\k -> Yield (k ()))