module Control.Parallel.Eden.Workpool (
workpool, workpoolSorted, workpoolSortedNonBlock, workpoolReduce,
workpoolAt, workpoolSortedAt, workpoolSortedNonBlockAt, workpoolReduceAt,
workpoolAuxAt,
wpNested,
wpDynNested, wpDNI,
distribWPAt,
masterWorker, mwNested, mwDynNested, mwDNI,
) where
#if defined( __PARALLEL_HASKELL__ ) || defined (NOT_PARALLEL)
import Control.Parallel.Eden
#else
import Control.Parallel.Eden.EdenConcHs
#endif
import Control.Parallel.Eden.Auxiliary
import Control.Parallel.Eden.Map
import Control.Parallel.Eden.Topology
import Data.List
import Data.Maybe(maybeToList,mapMaybe)
import System.IO.Unsafe
import Control.Monad
import Control.Concurrent
workpoolAt :: forall t r . (Trans t, Trans r) =>
Places
-> Int
-> Int
-> (t -> r)
-> [t] -> [r]
workpoolAt pos np prefetch wf tasks
= map snd fromWorkers
where
fromWorkers :: [(Int,r)]
fromWorkers = merge (tagWithPids (parMapAt pos worker taskss))
taskss :: [[t]]
taskss = distribute np (initialReqs ++ newReqs) tasks
initialReqs, newReqs :: [Int]
initialReqs = concat (replicate prefetch [0..np1])
newReqs = map fst fromWorkers
worker = map wf
tagWithPids :: [[r]]
-> [[(Int,r)]]
tagWithPids rss = [ zip (repeat i) rs |(i,rs) <-zip [0..] rss]
workpool :: (Trans t, Trans r) =>
Int
-> Int
-> (t -> r)
-> [t] -> [r]
workpool = workpoolAt [0]
workpoolAuxAt :: (Trans t, Trans r) =>
Places
-> Int
-> Int
-> (t -> r)
-> [t]
-> ([Int],[[Int]],[[r]])
workpoolAuxAt pos np prefetch wf tasks
= (reqs,poss,fromWorkers)
where fromWorkers = parMapAt pos (map wf) taskss
(taskss,poss) = distributeWithPos np reqs tasks
reqs = map snd $ zip tasks $ initialReqs ++ newReqs
initialReqs = concat (replicate prefetch [0..np1])
newReqs = merge $ [ [ i | j<-rs]
| (i,rs) <-zip [0..] fromWorkers]
distributeWithPos :: Int
-> [Int]
-> [t]
-> ([[t]],[[Int]])
distributeWithPos np reqs tasks
= lazyunzip [unzip (taskList reqs tasks [0..] n) | n<-[0..np1]]
where taskList (r:rs) (t:ts) (tag:tags) pe
| pe == r = (t,tag):(taskList rs ts tags pe)
| otherwise = taskList rs ts tags pe
taskList _ _ _ _ = []
lazyunzip = foldr (\(~(a,b)) ~(as,bs) -> (a:as,b:bs)) ([],[])
workpoolSortedAt :: (Trans t, Trans r) =>
Places
-> Int
-> Int
-> (t -> r)
-> [t]
-> [r]
workpoolSortedAt pos np prefetch f tasks = res
where (_, poss, ress) = workpoolAuxAt pos np prefetch f tasks
res = map snd $ mergeByPos ress'
ress' = map (uncurry zip) (zip poss ress)
mergeByPos :: [[(Int,r)]]
-> [(Int,r)]
mergeByPos [] = []
mergeByPos [wOut] = wOut
mergeByPos [w1,w2] = merge2ByTag w1 w2
mergeByPos wOuts = merge2ByTag
(mergeHalf wOuts)
(mergeHalf (tail wOuts))
where mergeHalf = mergeByPos . (takeEach 2)
merge2ByTag [] w2 = w2
merge2ByTag w1 [] = w1
merge2ByTag w1@(r1@(i,_):w1s) w2@(r2@(j,_):w2s)
| i < j = r1: merge2ByTag w1s w2
| i > j = r2: merge2ByTag w1 w2s
| otherwise = error "found tags i == j"
workpoolSorted :: (Trans t, Trans r) =>
Int
-> Int
-> (t -> r)
-> [t]
-> [r]
workpoolSorted = workpoolSortedAt [0]
workpoolSortedNonBlockAt :: (Trans t, Trans r) =>
Places
-> Int
-> Int
-> (t -> r)
-> [t] -> [r]
workpoolSortedNonBlockAt pos np prefetch f tasks
= orderBy fromWorkers reqs
where (reqs, _ ,fromWorkers) = workpoolAuxAt pos np prefetch f tasks
orderBy :: forall idx r . Integral idx =>
[[r]]
-> [idx]
-> [r]
orderBy _ [] = []
orderBy [rs] is = rs
orderBy (xs:rss) is = fst $ foldl (f is) (xs,1) rss
where f :: [idx] -> ([r],idx) -> [r] -> ([r], idx)
f is (xs,a) ys = (m xs ys is, a+1)
where m :: [r] -> [r] -> [idx] -> [r]
m _ _ [] = []
m xs ys (i:is) | i < a = head xs : m (tail xs) ys is
| i==a = head ys : m xs (tail ys) is
| otherwise = m xs ys is
orderBy' :: Integral idx =>
[[r]]
-> [idx]
-> [r]
orderBy' rss [] = []
orderBy' rss (r:reqs)
= let (rss1,(rs2:rss2)) = splitAt (fromIntegral r) rss
in (head rs2): orderBy' (rss1 ++ ((tail rs2):rss2)) reqs
workpoolSortedNonBlock :: (Trans t, Trans r) =>
Int
-> Int
-> (t -> r)
-> [t] -> [r]
workpoolSortedNonBlock = workpoolSortedNonBlockAt [0]
workpoolReduceAt :: forall t r r' . (Trans t, Trans r, Trans r') =>
Places
-> Int
-> Int
-> (r' -> r -> r)
-> r
-> (t -> r')
-> [t]
-> [r]
workpoolReduceAt pos np prefetch rf e wf tasks
= map snd fromWorkers
where
fromWorkers :: [([Int],r)]
fromWorkers = spawnFAt pos (map worker [0..np1]) taskss
taskss = distribute np (initialReqs ++ newReqs) tasks
initialReqs = concat (replicate prefetch [0..np1])
newReqs = merge (map fst fromWorkers)
worker i ts = (map (\r -> rnf r `seq` i) rs, foldr rf e rs)
where rs = map wf ts
workpoolReduce :: forall t r r' . (Trans t, Trans r, Trans r') =>
Int
-> Int
-> (r' -> r -> r)
-> r
-> (t -> r')
-> [t]
-> [r]
workpoolReduce = workpoolReduceAt [0]
wpNested :: forall t r . (Trans t, Trans r) =>
[Int]
-> [Int]
-> (t -> r)
-> [t]
-> [r]
wpNested ns pfs wf initTasks = wpDynNested ns pfs (\t -> (wf t,[])) initTasks
wpDNI :: (Trans t, Trans r) =>
Int
-> Int
-> Int
-> Int
-> (t -> (r,[t]))
-> [t]
-> [r]
wpDNI np levels l_1 pf f tasks
= let nesting = mkNesting np levels l_1
in wpDynNested nesting (mkPFs pf nesting) f tasks
wpDynNested :: forall t r . (Trans t, Trans r) =>
[Int]
-> [Int]
-> (t -> (r,[t]))
-> [t]
-> [r]
wpDynNested ns pfs wf initTasks
= topMasterStride strideTM (head ns) (head pfs) subWF initTasks
where subWF = foldr fld wf' (zip3 stds (tail ns) (tail pfs))
wf' :: [Maybe t] -> [(r,[t],Int)]
wf' xs = map(\ (r,ts) -> (r,ts,0)) (map (wf)(inp xs))
inp ((Just x):rest) = x:inp rest
inp (Nothing:_) = []
(strideTM:stds) = scanr (\x y -> ((y*x)+1)) 1 (tail ns)
fld :: (Int, Int, Int) -> ([Maybe t] -> [(r,[t],Int)])
-> [Maybe t] -> [(r,[t],Int)]
fld (stride,n,pf) wf = mwDynSubStride stride n pf wf
topMasterStride :: forall t r . (Trans t, Trans r) =>
Int -> Int -> Int -> ([Maybe t] -> [(r,[t],Int)]) -> [t] -> [r]
topMasterStride stride branch prefetch wf initTasks = finalResults
where
ress = merge (spawnAt places workers inputs)
places = nextPes branch stride
workers :: [Process [Maybe t] [(Int,(r,[t],Int))]]
workers = [process (zip [i,i..] . wf) | i <- [0..branch1]]
inputs = distribute branch (initReqs ++ reqs) tasks
initReqs = concat (replicate prefetch [0..branch1])
tasks = (map Just initTasks) ++ newTasks
initNumTasks = length initTasks
(finalResults, reqs, newTasks)
= tdetectTop ress initNumTasks
mwDynSubStride :: forall t r . (Trans t, Trans r) =>
Int -> Int -> Int -> ([Maybe t] -> [(r,[t],Int)])
-> [Maybe t] -> [(r,[t],Int)]
mwDynSubStride stride branch prefetch wf initTasks = finalResults where
fromWorkers = map flatten (spawnAt places workers inputs)
places = nextPes branch stride
workers :: [Process [Maybe t] [(Int,(r,[t],Int))]]
workers = [process (zip [i,i..] . wf) | i <- [0..branch1]]
inputs = distribute branch (initReqs ++ reqs) tasks
initReqs = concat (replicate prefetch [0..branch1])
controlInput = merge (map Right initTasks: map (map Left) fromWorkers)
(finalResults, tasks, reqs)
= tcontrol controlInput 0 0 (branch * (prefetch+1)) False
tdetectTop :: [(Int,(r,[t],Int))] -> Int -> ([r], [Int], [Maybe t])
tdetectTop ((req,(r,ts,subHoldsTs)):ress) numTs
| numTs == 1 && null ts && subHoldsTs == 0
= ([r], [], repeat Nothing)
| subHoldsTs == 1
= (r:moreRes, moreReqs, (map Just ts) ++ moreTs)
| otherwise
= (r:moreRes, req:moreReqs, (map Just ts) ++ moreTs)
where
(moreRes, moreReqs, moreTs)
= tdetectTop ress (numTs1+length ts+subHoldsTs)
tcontrol :: [Either (Int,r,[t],Int) (Maybe t)] ->
Int -> Int ->
Int -> Bool ->
([(r,[t],Int)],[Maybe t],[Int])
tcontrol ((Right Nothing):_) 0 _ _ _
= ([],repeat Nothing,[])
tcontrol ((Right (Just t)):ress) numTs hldTs pf even
= let (moreRes, moreTs, reqs) = tcontrol ress (numTs+1) hldTs pf even
in (moreRes, (Just t):moreTs, reqs)
tcontrol ((Left (i,r,ts,subHoldsTs)):ress) numTs hldTs pf even
= let (moreRes, moreTs, reqs)
= tcontrol ress (numTs + differ) (hldTs') pf evenAct
differ = length localTs + subHoldsTs 1
hldTs' = max (hldTs + differ) 0
holdInf | (hldTs+differ+1 > 0) = 1
| otherwise = 0
(localTs,fatherTs,evenAct)
= split numTs pf ts even
newreqs | (subHoldsTs == 0) = i:reqs
| otherwise = reqs
in ((r,fatherTs,holdInf):moreRes,
(map Just localTs) ++ moreTs, newreqs)
tcontrol ((Right Nothing):_) n _ _ _
= error "Received Stop signal, although not finished!"
flatten [] = []
flatten ((i,(r,ts,n)):ps) = (i,r,ts,n) : flatten ps
split :: Int -> Int -> [t] -> Bool ->([t],[t],Bool)
split num pf ts even
| num >= 2*pf = ([],ts,False)
| ((not even)||(num == 1)) = oddEven ts
| otherwise = evenOdd ts
oddEven :: [t] -> ([t],[t],Bool)
oddEven [] = ([],[],False)
oddEven (x:xs) = (x:localT ,fatherT, even)
where (localT,fatherT,even) = evenOdd xs
evenOdd :: [t] -> ([t],[t],Bool)
evenOdd [] = ([],[],True)
evenOdd (x:xs) = (localT, x:fatherT, even)
where (localT,fatherT,even) = oddEven xs
nextPes :: Int -> Int -> [Int]
nextPes n stride | start > noPe = replicate n noPe
| otherwise = concat (replicate n ps)
where ps = cycle (takeWhile (<= noPe) [start,start+stride..])
start = selfPe + 1
mkNesting :: Int -> Int -> Int -> [Int]
mkNesting np 1 _ = [np]
mkNesting np depth level1 = level1:(replicate (depth 2) 2) ++ [numWs]
where
leaves = np level1 * ( 2^(depth1) 1 )
numSubMs = level1 * 2^(depth 2)
numWs = (leaves + numSubMs 1) `div` numSubMs
mkPFs :: Int ->
[Int] ->
[Int]
mkPFs pf nesting
= [ factor * (pf+1) | factor <- scanr1 (*) (tail nesting) ] ++ [pf]
distribWPAt :: (Trans onT, Trans t, Trans r, Trans s, NFData r') =>
Places
-> ((t,s) -> (Maybe (r',s),[t]))
-> (Maybe ofT -> Maybe onT -> [t])
-> ([Maybe (r',s)] -> s -> r)
-> ([t]->[t]->s->[t])
-> ([t]->s->([t],Maybe (t,s)))
-> ([t]->s->([t],[t]))
-> (s->s->Bool)
-> s
-> [ofT]
-> [onT]
-> [r]
distribWPAt places wf inputT resT ttA ttD ttSplit sUpdate st ofTasks onTasks = res where
res = ringFlAt places id id workers onTasks'
workers = zipWith (worker) (True:(repeat False)) ofTasks'
(ofTasks',onTasks') | null ofTasks = (repeat Nothing , map Just onTasks)
| null onTasks = (map Just ofTasks, repeat Nothing )
| otherwise = (map Just ofTasks, map Just onTasks)
worker isFirst ofTasks onTasks ringIn = (resT ress sFinal,init ringOut) where
ts = (inputT ofTasks onTasks)
(ress,ntssNtoMe) = unzip $ genResNReqs $ map wf ts' --seperate
reqList = Me : mergeS [ringIn,
ntssNtoMe] rnf
(ts',ringOut) = control ttA ttSplit ttD sUpdate reqList ts st isFirst
sFinal = getState $ last ringOut
control:: (Trans t, Trans s) =>
([t]->[t]->s->[t]) ->
([t]->s->([t],[t])) ->
([t]->s->([t],Maybe (t,s))) ->
(s->s->Bool)->
[Req [t] s]->[t]->s->Bool-> --reqs,tasks,state,isFirstInRing
([(t,s)],[Req [t] s]) --localTasks,RequestsToRing
control ttA ttSplit ttD sUpdate requests initTasks st isFirst
= distribWork requests initTasks Nothing st (0,0)
where
distribWork (TasksNMe nts:rs) tS replyCh st sNr
= distribWork (Me:rs) (ttA tS nts st) replyCh st sNr
distribWork (NewState st':rs) tS replyCh st sNr
| sUpdate st' st = let (tS',wReqs') =distribWork rs tS replyCh st' sNr
in (tS',(NewState st':wReqs'))
| otherwise = distribWork rs tS replyCh st sNr
distribWork (req@(Others tag tCh):rs) [] replyCh st sNr
| tag==None = (tS',req:wReqs')
| otherwise = (tS', Others Black tCh : wReqs')
where(tS',wReqs') = distribWork rs [] replyCh st sNr
distribWork (Me:rs) [] replyCh st sNr =
new (\reqChan (newTS,replyChan) ->
let (tS',wReqs) = passWhileReceive (mergeS [rs,
newTS `pseq` [TasksNMe newTS]] r0) replyChan st sNr
tag | isFirst = Black
| otherwise = None
in(case replyCh of
Nothing -> (tS', Others tag reqChan : wReqs)
Just replyCh' -> parfill replyCh' (Others tag reqChan)
(tS',wReqs)))
distribWork (Me:rs) tS replyCh st sNr
= let (tS',tDetatch) = ttD tS st
(tsDetatch,wReqs) = case tDetatch of
Nothing -> distribWork (Me:rs) [] replyCh st sNr
Just t -> distribWork rs tS' replyCh st sNr
in ((maybeToList tDetatch)++tsDetatch,wReqs)
distribWork reqs@(Others _ tCh :rs) tS replyCh st (s,r)
= let (holdTs,sendTs) = ttSplit tS st
((tS',wReqs'),replyReq)
= new (\replyChan replyReq' ->
parfill tCh (sendTs,Just replyChan)
((distribWork rs holdTs replyCh st (s+1,r)),replyReq'))
in case sendTs of --ReplyReqToOutp
[] -> distribWork reqs [] replyCh st (s,r)
(_:_) -> (tS',mergeS [[replyReq],wReqs'] rnf)
passWhileReceive (NewState st':rs) replyCh st sNr
| sUpdate st' st =let (tS',wReqs')=passWhileReceive rs replyCh st' sNr
in (tS',(NewState st':wReqs'))
| otherwise = passWhileReceive rs replyCh st sNr
passWhileReceive (req@(Others None tCh):rs) replyCh st sNr
= let (tS',wReqs) = passWhileReceive rs replyCh st sNr
in (tS',req:wReqs)
passWhileReceive (req@(Others Black tCh ):rs) replyCh st (s,r)
| (not isFirst) = (tS',req :wReqs)
| otherwise = (tS',req':wReqs)
where (tS',wReqs) = passWhileReceive rs replyCh st (s,r)
req'= Others (White s r 0 0) tCh
passWhileReceive (Others (White s1 r1 s2 r2) tCh :rs) replyCh st (s,r)
| (not isFirst) = (tS',req':wReqs)
| otherwise = if terminate
then ([],Others Black tCh : (termRing rs ++ [NewState st]))
else (tS',req'':wReqs)
where (tS',wReqs) = passWhileReceive rs replyCh st (s,r)
req' = Others (White (s1+s) (r1+r) s2 r2) tCh
req'' = Others (White s r s1 r1) tCh
terminate = (s1==r1)&&(r1==s2)&&(s2==r2)
passWhileReceive (TasksNMe newTS:rs) replyCh st (s,r)
| null newTS = ([],(termRing rs)
++ [NewState st])
| otherwise = (distribWork (Me:rs) newTS replyCh st (s,r+1))
data Req t s =
Me |
Others { getTag :: Tag,
getReplyChan :: (ChanName (t,Maybe (ChanName(Req t s))))
} |
TasksNMe { getTask :: t} |
NewState { getState :: s }
instance (NFData t, NFData s) => NFData (Req t s)
where
rnf Me = ()
rnf (Others t c) = rnf t `pseq` rnf c
rnf (TasksNMe t) = rnf t
rnf (NewState s) = rnf s
instance (Trans t,Trans s) => Trans (Req t s)
data Tag = Black |
White Int Int Int Int |
None
instance NFData Tag
where rnf Black = ()
rnf None = ()
rnf (White a b c d) = rnf (a,b,c,d)
instance Eq Tag where
Black == Black = True
None == None = True
White a b c d == White a' b' c' d' = (a,b,c,d)==(a',b',c',d')
a == b = False
---------------------auxiliary-----------------
termRing :: (Trans t,Trans s) => [Req [t] s] -> [Req [t] s]
termRing [] = []
termRing (Others Black tCh : _ ) = parfill tCh ([],Nothing) []
termRing (Others None tCh : rs) = parfill tCh ([],Nothing) termRing rs --reply
termRing (_ : rs) = termRing rs
genResNReqs :: (NFData t,NFData r',NFData s)=>
[(Maybe (r',s),[t])] -> [(Maybe (r',s),Req [t] s)]
genResNReqs [] = []
genResNReqs ((res@(Nothing,nts)):ress)
= rnf res `pseq` (Nothing,TasksNMe nts): genResNReqs ress
genResNReqs ((res@(Just (r,st),nts)):ress)
= rnf res `pseq` (Just (r,st),NewState st): genResNReqs ((Nothing,nts):ress)
-----------------------------DEPRECATED---------------------------------
masterWorker :: (Trans a, Trans b) =>
Int -> Int -> (a -> b) -> [a] -> [b]
masterWorker = workpoolSortedNonBlock
mwNested :: forall t r . (Trans t, Trans r) =>
[Int] -> [Int] -> (t -> r) -> [t] -> [r]
mwNested = wpNested
mwDNI :: (Trans t, Trans r) =>
Int
-> Int
-> Int
-> Int
-> (t -> (r,[t]))
-> [t]
-> [r]
mwDNI = wpDNI
mwDynNested :: forall t r . (Trans t, Trans r) =>
[Int]
-> [Int]
-> (t -> (r,[t]))
-> [t]
-> [r]
mwDynNested = wpDynNested