module Control.Parallel.Eden.Merge (
nmergeIO_E
)
where
#if __GLASGOW_HASKELL__ < 707
import Control.Concurrent(nmergeIO)
-- In this branch of the CPP conditional the merge is simply the nmergeIO
-- imported from Control.Concurrent on the line above.
nmergeIO_E = nmergeIO
#else
import Control.Concurrent
import Control.Exception( mask_ )
import System.IO.Unsafe
-- | A minimal counting semaphore.
--
-- The MVar holds the number of free units together with a FIFO queue of
-- wake-up variables, one for each thread currently blocked in @qsem_P@.
-- @qsem_P@ treats a non-empty queue combined with a non-zero count as an
-- internal error.
type QSem_E = MVar (Int, [MVar ()])

-- | Create a semaphore holding @n@ free units.
--
-- Calls @error@ when @n@ is negative.
newQSem_E :: Int -> IO QSem_E
newQSem_E n
  | n < 0     = error "QSem: negative."
  | otherwise = newMVar (n, [])

-- | @qsem_P@ acquires one unit, blocking while none is free.
-- @qsem_Q@ releases one unit, waking the longest-waiting thread if there
-- is one.  Both update the semaphore state under @mask_@.
qsem_P, qsem_Q :: QSem_E -> IO ()

qsem_P sem = mask_ $ do
  state <- takeMVar sem
  case state of
    (0, blocked) -> do
      -- No unit available: enqueue a private wake-up variable at the back
      -- of the queue, release the semaphore state, then wait to be woken.
      -- NOTE(review): takeMVar is interruptible even under mask_, so a
      -- thread killed while waiting here stays in the queue -- confirm
      -- whether producers can receive asynchronous exceptions.
      b <- newEmptyMVar
      putMVar sem (0, blocked ++ [b])
      takeMVar b
    -- A unit is free and nobody is waiting: consume it.
    (n, []) -> putMVar sem (n - 1, [])
    -- Waiters together with a non-zero count: report the inconsistent state.
    (n, other) -> error ("QSem: " ++ show (n, length other))

qsem_Q sem = mask_ $ do
  state <- takeMVar sem
  case state of
    (n, blocker : blockers) -> do
      -- Hand the unit straight to the oldest waiter; the count is unchanged.
      putMVar sem (n, blockers)
      putMVar blocker ()
    (n, []) -> putMVar sem (n + 1, [])
-- | Merge several lists into a single list, in whatever order the producer
-- threads deliver their elements.
--
-- One thread is forked per input list (running @suckIO_E@); each appends
-- its elements to a shared chain of MVar nodes whose current tail is kept
-- in @tl_list@.  The semaphore starts with a single unit; a unit is
-- released here once the first node has been read.
--
-- An empty list of inputs yields the empty list.  Without that case no
-- producer would ever fill the first node and the call would block forever.
nmergeIO_E :: [[a]] -> IO [a]
nmergeIO_E [] = return []
nmergeIO_E lss = do
  tl_node <- newEmptyMVar
  tl_list <- newMVar tl_node
  sem <- newQSem_E 1
  -- The producer count is forced before any thread is forked.
  count_var <- newMVar $! length lss
  mapM_ (forkIO . suckIO_E count_var (tl_list, sem)) lss
  -- Wait for the first node to be filled, then release one unit.
  val <- takeMVar tl_node
  qsem_Q sem
  return val
-- | Producer loop run by each thread forked in @nmergeIO_E@.
--
-- @count_var@ holds the number of producers still running, @tl_list@ holds
-- the currently empty tail node of the merged list, and @sem@ is acquired
-- before every element that is appended.
--
-- When this thread is the only producer left, the rest of its input is
-- stored in the tail node as it is, without being evaluated here, and the
-- thread finishes.  In that case @count_var@ is left empty.
suckIO_E :: MVar Int -> (MVar (MVar [a]), QSem_E) -> [a] -> IO ()
suckIO_E count_var buff@(tl_list, sem) vs = do
  count <- takeMVar count_var
  if count == 1
    then do
      -- Last producer: splice the remaining input directly into the tail.
      node <- takeMVar tl_list
      putMVar node vs
      putMVar tl_list node
    else do
      putMVar count_var count
      case vs of
        [] -> do
          -- This producer is finished.  Re-read the counter, since other
          -- producers may have finished since the check above.
          remaining <- takeMVar count_var
          if remaining == 1
            then do
              -- Nobody else is left: terminate the merged list.
              node <- takeMVar tl_list
              putMVar node []
              putMVar tl_list node
            else putMVar count_var (remaining - 1)
        (x : xs) -> do
          qsem_P sem
          node <- takeMVar tl_list
          next <- newEmptyMVar
          -- The tail of the new cell is read lazily: only when the consumer
          -- demands it is the next node taken and a semaphore unit released.
          next_val <- unsafeInterleaveIO $ do
            y <- takeMVar next
            qsem_Q sem
            return y
          putMVar node (x : next_val)
          putMVar tl_list next
          suckIO_E count_var buff xs
#endif