#if defined(NOT_PARALLEL)
#warning !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\
Eden BUILD WITH CONCURRENT HASKELL SIMULATION OF PARALLEL PRIMITIVES, \
DON'T EXPECT SPEEDUPS! USE THE EDEN VERSION OF GHC FROM \
http://www.mathematik.unimarburg.de/~eden \
FOR A PARALLEL BUILD \
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!'
#endif
module Control.Parallel.Eden.Auxiliary (
unshuffle, shuffle,
splitIntoN, unSplit,
chunk, unchunk,
distribute,
lazy,
lazy1ZipWith, lazy2ZipWith,
lazy1Zip, lazy2Zip,
lazyTranspose,
takeEach, transposeRt,
unLiftRD, unLiftRD2, unLiftRD3, unLiftRD4,
spawnPss, fetch2, fetchRDss, mergeS
) where
import Data.List
import Data.Maybe(maybeToList,mapMaybe)
import Control.Concurrent
import System.IO.Unsafe(unsafePerformIO,unsafeInterleaveIO)
#if defined( __PARALLEL_HASKELL__ ) || defined (NOT_PARALLEL)
import Control.Parallel.Eden
#else
import Control.Parallel.Eden.EdenConcHs
#endif
unshuffle :: Int
-> [a]
-> [[a]]
unshuffle n xs = [takeEach n (drop i xs) | i <- [0..n1]]
takeEach :: Int -> [a] -> [a]
takeEach n [] = []
takeEach n (x:xs) = x : takeEach n (drop (n1) xs)
shuffle :: [[a]]
-> [a]
shuffle = concat . transpose
transposeRt :: [[a]] -> [[a]]
transposeRt [] = []
transposeRt ([] : xss) = []
transposeRt ((x:xs) : xss) = (x : [h | (h:_) <- xss]) : transposeRt (xs : [ t | (_:t) <- xss])
splitIntoN :: Int
-> [a]
-> [[a]]
splitIntoN n xs = chunkBalance (l `div` n) (l `mod` n) xs where
l = length xs
unSplit :: [[a]]
-> [a]
unSplit = concat
chunkBalance :: Int
-> Int
-> [a]
-> [[a]]
chunkBalance d = chunk' where
chunk' _ [] = []
chunk' 0 xs = ys : chunk' 0 zs where
(ys,zs) = splitAt d xs
chunk' m xs = ys : chunk' (m1) zs where
(ys,zs) = splitAt (d+1) xs
chunk :: Int
-> [a]
-> [[a]]
chunk l = chunkBalance l 0
unchunk :: [[a]]
-> [a]
unchunk = concat
distribute :: Int
-> [Int]
-> [t]
-> [[t]]
distribute np reqs tasks = [taskList reqs tasks n | n<-[0..np1]]
where taskList (r:rs) (t:ts) pe
| pe == r = t:(taskList rs ts pe)
| otherwise = taskList rs ts pe
taskList _ _ _ = []
lazy :: [a] -> [a]
lazy ~(x:xs) = x : lazy xs
lazy1ZipWith :: (a->b->c) -> [a] -> [b] -> [c]
lazy1ZipWith f xs = zipWith f (lazy xs)
lazy2ZipWith :: (a->b->c) -> [a] -> [b] -> [c]
lazy2ZipWith f xs ys = zipWith f xs (lazy ys)
lazy1Zip :: [a] -> [b] -> [(a,b)]
lazy1Zip xs ys = zip (lazy xs) ys
lazy2Zip :: [a] -> [b] -> [(a,b)]
lazy2Zip xs ys = zip xs (lazy ys)
lazyTranspose :: [[a]] -> [[a]]
lazyTranspose = foldr (lazy2ZipWith (:)) (repeat [])
---------------------------unLiftRDs------------------------------
unLiftRD :: (Trans a, Trans b) =>
(RD a -> RD b)
-> a
-> b
unLiftRD f = fetch . f . release
unLiftRD2 :: (Trans a, Trans b, Trans c)
=> (RD a -> RD b -> RD c)
-> a
-> b
-> c
unLiftRD2 f x = unLiftRD (f (release x))
unLiftRD3 :: (Trans a, Trans b, Trans c, Trans d) => (RD a -> RD b -> RD c -> RD d) -> a -> b -> c -> d
unLiftRD3 f x = unLiftRD2 (f (release x))
unLiftRD4 :: (Trans a, Trans b, Trans c, Trans d, Trans e) => (RD a -> RD b -> RD c -> RD d -> RD e) -> a -> b -> c -> d -> e
unLiftRD4 f x = unLiftRD3 (f (release x))
spawnPss :: (Trans a, Trans b) => [[Process a b]] -> [[a]] -> [[b]]
spawnPss pss xss = runPA $ sequence $ zipWith3 (\is ps xs -> sequence (zipWith3 instantiateAt is ps xs)) iss pss xss where
iss = (unshuffle (length (zip pss xss)) [selfPe+1..])
fetch2 :: (Trans a, Trans b) => RD a -> RD b -> (a,b)
fetch2 a b = runPA $
do a' <- fetchPA a
b' <- fetchPA b
return (a',b')
fetchRDss :: Trans a => [[RD a]] -> [[a]]
fetchRDss rda = runPA $ mapM (mapM fetchPA) rda
mergeS:: [[a]] -> Strategy a -> [a]
mergeS l st = unsafePerformIO (nmergeIO_S l st)
nmergeIO_S :: [[a]] -> Strategy a -> IO [a]
nmergeIO_S lss st
= let
len = length lss
in
newEmptyMVar >>= \ tail_node ->
newMVar tail_node >>= \ tail_list ->
newQSem max_buff_size >>= \ e ->
newMVar len >>= \ branches_running ->
let
buff = (tail_list,e)
in
mapIO (\ x -> forkIO (suckIO_S branches_running buff x st)) lss >>
takeMVar tail_node >>= \ val ->
signalQSem e >>
return val
where
mapIO f xs = sequence (map f xs)
suckIO_S :: MVar Int -> Buffer a -> [a] -> Strategy a -> IO ()
suckIO_S branches_running buff@(tail_list,e) vs st
= do count <- takeMVar branches_running
if count == 1 then
takeMVar tail_list >>= \ node ->
putMVar node vs >>
putMVar tail_list node
else putMVar branches_running count >>
case vs of
[] -> takeMVar branches_running >>= \ val ->
if val == 1 then
takeMVar tail_list >>= \ node ->
putMVar node [] >>
putMVar tail_list node
else
putMVar branches_running (val1)
(x:xs) ->
(st x `seq` waitQSem e) >>
takeMVar tail_list >>= \ node ->
newEmptyMVar >>= \ next_node ->
unsafeInterleaveIO (
takeMVar next_node >>= \ y ->
signalQSem e >>
return y) >>= \ next_node_val ->
putMVar node (x:next_node_val) >>
putMVar tail_list next_node >>
suckIO_S branches_running buff xs st
type Buffer a
= (MVar (MVar [a]), QSem)
max_buff_size :: Int
max_buff_size = 1