{-# LANGUAGE CPP, ScopedTypeVariables #-}
-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Parallel.Eden.Workpool
-- Copyright   :  (c) Philipps Universitaet Marburg 2009-2014
-- License     :  BSD-style (see the file LICENSE)
-- 
-- Maintainer  :  eden@mathematik.uni-marburg.de
-- Stability   :  beta
-- Portability :  not portable
--
-- This Haskell module defines workpool skeletons for dynamic task
-- distribution for the parallel functional language Eden.
--
-- Depends on GHC. Using standard GHC, you will get a threaded simulation of Eden. 
-- Use the forked GHC-Eden compiler from http:\/\/www.mathematik.uni-marburg.de/~eden 
-- for a parallel build.
--
-- Eden Group ( http:\/\/www.mathematik.uni-marburg.de/~eden )


module Control.Parallel.Eden.Workpool (
  -- * Workpool skeletons with a single master
  -- ** Simple workpool skeletons
  -- | The workpool skeletons use the non-deterministic merge function to achieve dynamic load balancing.
                  workpool, workpoolSorted, workpoolSortedNonBlock, workpoolReduce,
  -- ** Simple workpool skeletons - versions using explicit placement
  -- | Like the skeletons above, but these take a list of places for explicit process placement.
                  workpoolAt, workpoolSortedAt, workpoolSortedNonBlockAt, workpoolReduceAt,
                  workpoolAuxAt,
  -- * Hierarchical workpool skeleton
  -- |These skeletons can be nested with an arbitrary number of submaster
  -- levels to unload the top master.
                  wpNested, 
  -- ** Hierarchical workpool skeleton with dynamic task creation
  -- |The worker function is extended such that dynamic creation of
  -- new tasks is possible. New tasks are added to the end of the
-- task list, thus tasks are traversed breadth-first (not strictly
  -- because of the skeletons' nondeterminism). Furthermore, these
  -- skeletons can be nested with an arbitrary number of submaster
  -- levels to unload the top master.
                  wpDynNested, wpDNI,
-- * Distributed workpool skeletons with state and dynamic task creation
                  distribWPAt,
-- * Deprecated skeletons
                  masterWorker, mwNested, mwDynNested, mwDNI,
                 ) where
#if defined( __PARALLEL_HASKELL__ ) || defined (NOT_PARALLEL)
import Control.Parallel.Eden
#else
import Control.Parallel.Eden.EdenConcHs
#endif
import Control.Parallel.Eden.Auxiliary
import Control.Parallel.Eden.Map
import Control.Parallel.Eden.Topology
import Data.List
import Data.Maybe(maybeToList,mapMaybe)

import System.IO.Unsafe
import Control.Monad

import Control.Concurrent


-- | Simple workpool. This version takes a list of places for explicit
-- process instantiation.
--
-- Notice: Result list in non-deterministic order.
workpoolAt :: forall t r . (Trans t, Trans r) =>
	    Places
            -> Int        -- ^number of child processes (workers)
	    -> Int        -- ^prefetch of tasks (for workers)
	    -> (t -> r)   -- ^worker function (mapped to tasks) 
            -> [t] -> [r] -- ^tasks, results
workpoolAt pos np prefetch wf tasks
   = map snd fromWorkers
    where   
            fromWorkers :: [(Int,r)]
            fromWorkers = merge (tagWithPids (parMapAt pos worker taskss))
            taskss :: [[t]]
            taskss      = distribute np (initialReqs ++ newReqs) tasks
            initialReqs, newReqs :: [Int]
            initialReqs = concat (replicate prefetch [0..np-1])
            newReqs     = map fst fromWorkers
            worker      = map wf


-- | Tag each element of each inner list with the index of that list.
tagWithPids :: [[r]]          -- ^untagged input
               -> [[(Int,r)]] -- ^tagged output
tagWithPids rss = [ zip (repeat i) rs | (i,rs) <- zip [0..] rss ]
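
-- Illustration (added example):
--
-- > tagWithPids [["a","b"],["c"]] == [[(0,"a"),(0,"b")],[(1,"c")]]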


-- | Simple workpool.
--
-- Notice: Result list in non-deterministic order.
workpool :: (Trans t, Trans r) =>
	    Int              -- ^number of child processes (workers)
	    -> Int           -- ^prefetch of tasks (for workers)
	    -> (t -> r)      -- ^worker function (mapped to tasks) 
            -> [t] -> [r]    -- ^tasks, results
workpool = workpoolAt [0]
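
-- A minimal usage sketch (illustrative; with standard GHC this runs as
-- a threaded simulation of Eden): 4 workers with a prefetch of 8 tasks
-- each, results in non-deterministic order:
--
-- > workpool 4 8 ((*2) :: Int -> Int) [1..100]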



-- | Workpool version with one result stream for each worker and meta
-- information about the task distribution.
-- This meta-skeleton can be used to define workpool-skeletons which
-- can reestablish the result list order.
--
-- Notice: Result list in non-deterministic order.
workpoolAuxAt :: (Trans t, Trans r) =>
	    Places
            -> Int                   -- ^number of child processes
                                     -- (workers)
	    -> Int                   -- ^prefetch of tasks (for
                                     -- workers)
            -> (t -> r)              -- ^worker function (tasks to
                                     -- results mapping)
            -> [t]                   -- ^tasks
            -> ([Int],[[Int]],[[r]]) -- ^(input distribution (input i
                                     -- is in sub-list distribs!i),
                                     -- task positions (element i of
                                     -- result-sub-list j was in the
                                     -- input list at (poss!j)!i ),
                                     -- result streams of workers)
workpoolAuxAt pos np prefetch wf tasks
   = (reqs,poss,fromWorkers)
    where   fromWorkers     = parMapAt pos (map wf) taskss
            (taskss,poss)   = distributeWithPos np reqs tasks
            -- generate only as many reqs as there are tasks
            reqs            = map snd $ zip tasks $ initialReqs ++ newReqs
            initialReqs     = concat (replicate prefetch [0..np-1])
            newReqs         = merge $ [ [ i | j<-rs] 
                                      | (i,rs) <-zip [0..] fromWorkers]


-- | Task distribution according to worker requests. In addition to the
-- result of 'distribute', this returns a nested list of positions
-- indicating where each task was located in the original task list.
distributeWithPos ::  Int  -- ^number of workers
                      -> [Int] -- ^request stream (worker IDs ranging from 0 to n-1)
                      -> [t]   -- ^task list
                     -> ([[t]],[[Int]]) -- ^(task distribution, task positions in the original list), one inner list per worker
distributeWithPos np reqs tasks 
   = unzip [unzip (taskList reqs tasks [0..] n) | n<-[0..np-1]]
    where taskList (r:rs) (t:ts) (tag:tags) pe
                        | pe == r    = (t,tag):(taskList rs ts tags pe)
                        | otherwise  =  taskList rs ts tags pe
          taskList _    _    _    _  = []
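
-- Illustration (added example): with two workers and an alternating
-- request stream, tasks and their original positions alternate, too:
--
-- > distributeWithPos 2 [0,1,0,1] "abcd" == (["ac","bd"],[[0,2],[1,3]])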


-- | Sorted workpool (results in the order of the tasks).
-- This version takes places for instantiation. 
workpoolSortedAt  :: (Trans t, Trans r) =>
                     Places
                     -> Int        -- ^number of child processes
                                   -- (workers)
                     -> Int        -- ^prefetch of tasks (for workers)
                     -> (t -> r)   -- ^worker function
                     -> [t]        -- ^tasks
                     -> [r]        -- ^results
workpoolSortedAt pos np prefetch f tasks = res
    where (_, poss, ress) = workpoolAuxAt pos np prefetch f tasks
          res = map snd $ mergeByPos ress'
          ress' = map (uncurry zip) (zip poss ress)


-- | Join sorted lists into one sorted list.
--  This function uses a balanced binary combination scheme to merge sublists.
mergeByPos :: [[(Int,r)]] -- ^ tagged input 
              -> [(Int,r)] -- ^ output sorted by tags
mergeByPos [] = []
mergeByPos [wOut] = wOut
mergeByPos [w1,w2] = merge2ByTag w1 w2
mergeByPos wOuts = merge2ByTag
                      (mergeHalf wOuts)
                      (mergeHalf (tail wOuts))
    where mergeHalf = mergeByPos . (takeEach 2)

-- | Merge two tag-sorted lists into one tag-sorted list.
merge2ByTag :: [(Int,r)] -> [(Int,r)] -> [(Int,r)]
merge2ByTag [] w2 = w2
merge2ByTag w1 [] = w1
merge2ByTag w1@(r1@(i,_):w1s) w2@(r2@(j,_):w2s)
                        | i < j = r1: merge2ByTag w1s w2
                        | i > j = r2: merge2ByTag w1 w2s
                        | otherwise = error "merge2ByTag: found equal tags"
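
-- Illustration (added example):
--
-- > mergeByPos [[(0,'a'),(3,'d')],[(1,'b'),(2,'c')]]
-- >     == [(0,'a'),(1,'b'),(2,'c'),(3,'d')]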



-- | Sorted workpool: efficient implementation using the distribution
-- lookup list.
--
-- Notice: Results in the order of the tasks.
workpoolSorted  :: (Trans t, Trans r) =>
	    Int             -- ^number of child processes (workers)
	    -> Int          -- ^prefetch of tasks (for workers)
            -> (t -> r)     -- ^worker function (mapped to tasks)  
	    -> [t]          -- ^tasks
            -> [r]          -- ^results
workpoolSorted = workpoolSortedAt [0]
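
-- Since the result order is reestablished, 'workpoolSorted' behaves
-- (up to evaluation order) like a parallel 'map':
--
-- > workpoolSorted 4 2 ((*2) :: Int -> Int) [1..10] == map (*2) [1..10]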

-- | Non-blocking sorted workpool. The result list is structurally
-- defined up to the position where tasks are distributed, independent
-- of the received worker results. This version still needs performance
-- testing. This version takes places for instantiation.
--
-- Notice: Results in the order of the tasks.
workpoolSortedNonBlockAt  :: (Trans t, Trans r) =>
	    Places
            -> Int         -- ^number of child processes (workers)
	    -> Int         -- ^prefetch of tasks (for workers)
            -> (t -> r)    -- ^worker function (mapped to tasks)  
            -> [t] -> [r]  -- ^tasks, results
workpoolSortedNonBlockAt pos np prefetch f tasks
  = orderBy fromWorkers reqs
   where (reqs, _ ,fromWorkers) = workpoolAuxAt pos np prefetch f tasks


-- | Orders a nested list (whose sublists are each ordered) by a given distribution (non-blocking on result elements of other sublists).
orderBy :: forall idx r . Integral idx => 
           [[r]]    -- ^ nested input list (@inputss@)
           -> [idx] -- ^ distribution (result i is in sublist @inputss!(idxs!i)@)
           -> [r]   -- ^ ordered result list
orderBy _   [] = []
orderBy [rs] is = rs
orderBy (xs:rss) is =  fst $ foldl (f is) (xs,1) rss
  where f :: [idx] -> ([r],idx) -> [r] -> ([r], idx)
        f is (xs,a) ys = (m xs ys is, a+1)
          where m :: [r] -> [r] -> [idx] -> [r]
                m _  _  [] = []
                m xs ys (i:is) | i < a = head xs : m (tail xs) ys is
                               | i==a  = head ys : m xs (tail ys) is
                               | otherwise = m xs ys is
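
-- Illustration (added example): results 0 and 2 come from the first
-- sublist, result 1 from the second:
--
-- > orderBy [["a","c"],["b"]] [0,1,0::Int] == ["a","b","c"]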
                                              
                                                       
-- | Orders a nested list by a given distribution (alternative implementation).
orderBy' :: Integral idx => 
                       [[r]]    -- ^ nested input list (@inputss@)
                       -> [idx] -- ^ distribution (result i is in sublist @inputss!(idxs!i)@)
                       -> [r]   -- ^ ordered result list
orderBy' rss [] = []
orderBy' rss (r:reqs)
  = let (rss1,(rs2:rss2)) = splitAt (fromIntegral r) rss
    in  (head rs2): orderBy' (rss1 ++ ((tail rs2):rss2)) reqs
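
-- Both variants agree, e.g.
--
-- > orderBy' [["a","c"],["b"]] [0,1,0::Int] == ["a","b","c"]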


-- | Non-blocking sorted workpool (results in the order of the
-- tasks). The result list is structurally defined up to the position
-- where tasks are distributed, independent of the received worker
-- results. This version still needs performance testing.
--
--  Notice: Results in the order of the tasks.
workpoolSortedNonBlock :: (Trans t, Trans r) =>
	    Int             -- ^number of child processes (workers)
	    -> Int          -- ^prefetch of tasks (for workers)
            -> (t -> r)     -- ^worker function (mapped to tasks)  
            -> [t] -> [r]   -- ^tasks, results
workpoolSortedNonBlock = workpoolSortedNonBlockAt [0]





-- | Simple workpool with additional reduce function for worker outputs.
-- This version takes places for instantiation.
--
-- Notice: Result list in non-deterministic order.
workpoolReduceAt :: forall t r r' . (Trans t, Trans r, Trans r') =>
	    Places
            -> Int             -- ^number of child processes (workers)
	    -> Int             -- ^prefetch of tasks (for workers)
	    -> (r' -> r -> r)  -- ^reduce function
            -> r               -- ^neutral for reduce function
            -> (t -> r')       -- ^worker function (mapped to tasks) 
	    -> [t]             -- ^tasks 
            -> [r]             -- ^results (one from each worker)
workpoolReduceAt pos np prefetch rf e wf tasks
   = map snd fromWorkers
    where   
            fromWorkers :: [([Int],r)]
            fromWorkers = spawnFAt pos (map worker [0..np-1]) taskss
            taskss      = distribute np (initialReqs ++ newReqs) tasks
            initialReqs = concat (replicate prefetch [0..np-1])
            newReqs     = merge (map fst fromWorkers)
            worker i ts = (map (\r -> rnf r `seq` i) rs, foldr rf e rs)
                 where rs = map wf ts


-- | Simple workpool with additional reduce function for worker outputs.
--
-- Notice: Result list in non-deterministic order.
workpoolReduce :: forall t r r' . (Trans t, Trans r, Trans r') =>
            Int                -- ^number of child processes (workers)
	    -> Int             -- ^prefetch of tasks (for workers)
	    -> (r' -> r -> r)  -- ^reduce function
            -> r               -- ^neutral for reduce function	    
            -> (t -> r')       -- ^worker function (mapped to tasks) 
	    -> [t]             -- ^tasks 
            -> [r]             -- ^results (one from each worker)
workpoolReduce = workpoolReduceAt [0]
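
-- Illustration (added example): each worker folds its results locally
-- with the reduce function, so summing the per-worker results recovers
-- the total:
--
-- > sum (workpoolReduce 4 2 (+) 0 (id :: Int -> Int) [1..100]) == sum [1..100]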

-- |Hierarchical WP-Skeleton. The worker
-- function is mapped to the worker input stream (list type). A worker
-- produces a result. The workers are located on the leaves of a
-- WP-hierarchy; the intermediate levels consist of submasters which
-- unload the top master by merging the result streams of their child
-- processes into a single result stream.
--
-- Notice: Result list in non-deterministic order.
wpNested :: forall t r . (Trans t, Trans r) => 
               [Int]             -- ^branching degrees: the i-th
                                 -- element defines the branching
                                 -- degree for the i-th level of
                                 -- the WP-hierarchy. Use a singleton
                                 -- list for a flat WP-skeleton.
               -> [Int]          -- ^Prefetches for the
                                 -- sub-master/worker levels
               -> (t -> r)       -- ^worker function
               -> [t]            -- ^initial tasks
               -> [r]            -- ^results
wpNested ns pfs wf initTasks = wpDynNested ns pfs (\t -> (wf t,[])) initTasks
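
-- Usage sketch (illustrative; @wf@ and @tasks@ are assumed bindings):
-- a two-level hierarchy with 2 submasters, each serving 4 workers,
-- prefetch 16 per submaster and 4 per worker:
--
-- > wpNested [2,4] [16,4] wf tasks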


-- |Simple interface for 'wpDynNested'. Parameters are the number of
-- child processes, the first level branching degree, the nesting depth
-- (use 1 for a single master), and the task prefetch amount for the
-- worker level. All processes that are not needed for the submasters
-- are used for the workers. If the number of remaining processes is not
-- evenly divisible by the number of lowest-level submasters, the number
-- of workers per submaster is rounded up.
--
-- Notice: Result list in non-deterministic order.
wpDNI :: (Trans t, Trans r) =>
         Int               -- ^number of processes (submasters and workers)
         -> Int            -- ^nesting depth
         -> Int            -- ^branching degree of the first submaster
                           -- level (further submaster levels branch
                           -- with degree 2)
         -> Int            -- ^task prefetch for the workers
         -> (t -> (r,[t])) -- ^worker function - produces a tuple of
                           -- result and tasks for the processed task
         -> [t]            -- ^initial tasks
         -> [r]            -- ^results
wpDNI np levels l_1 pf f tasks 
    = let nesting = mkNesting np levels l_1
      in  wpDynNested nesting (mkPFs pf nesting) f tasks
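
-- Usage sketch (illustrative; @pf@, @wf@ and @tasks@ are assumed
-- bindings): with 14 processes, nesting depth 3 and first-level
-- branching 2,
--
-- > wpDNI 14 3 2 pf wf tasks
--
-- unfolds to the hierarchy @[2,2,2]@, i.e. 2 + 4 submasters and 8
-- workers (see 'mkNesting' below).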
          

-- |Hierarchical WP-Skeleton with dynamic task creation. The worker
-- function is mapped to the worker input stream (list type). A worker
-- produces a tuple of result and dynamically created tasks for each
-- processed task. The workers are located on the leaves of a
-- WP-hierarchy; the intermediate levels consist of submasters which
-- unload the top master by merging the 'result/newtask' streams of
-- their child processes into a single result/newtask stream.
-- Furthermore, the submasters locally retain half of the tasks which
-- are dynamically created by the workers in their subtree.
--
-- Notice: Result list in non-deterministic order.
wpDynNested :: forall t r . (Trans t, Trans r) => 
               [Int]             -- ^branching degrees: the i-th
                                 -- element defines the branching
                                 -- degree for the i-th level of
                                 -- the WP-hierarchy. Use a singleton
                                 -- list for a flat WP-skeleton.
               -> [Int]          -- ^Prefetches for the
                                 -- sub-master/worker levels
               -> (t -> (r,[t])) -- ^worker function - produces a
                                 -- tuple of result and new tasks for
                                 -- the processed task
               -> [t]            -- ^initial tasks
               -> [r]            -- ^results
wpDynNested ns pfs wf initTasks 
  = topMasterStride strideTM (head ns) (head pfs) subWF initTasks
    where subWF = foldr fld wf' (zip3  stds (tail ns) (tail pfs))
          wf' :: [Maybe t] -> [(r,[t],Int)]
          -- explicit Nothing-Request after each result
          wf' xs = map (\(r,ts) -> (r,ts,0)) (map wf (inp xs))
          inp ((Just x):rest) =  x:inp rest
          inp (Nothing:_) = [] -- STOP!!!
          -- placement:
          (strideTM:stds) = scanr (\x y -> ((y*x)+1)) 1 (tail ns)
          fld :: (Int, Int, Int) -> ([Maybe t] -> [(r,[t],Int)])
                 -> [Maybe t] -> [(r,[t],Int)]
          fld (stride,n,pf) wf = mwDynSubStride stride n pf wf



--------------------------wpDynNested auxiliary---------------------------------

topMasterStride :: forall t r . (Trans t, Trans r) =>
         Int -> Int -> Int -> ([Maybe t] -> [(r,[t],Int)]) -> [t] -> [r]
topMasterStride stride branch prefetch wf initTasks = finalResults
 where
   -- identical to static task pool except for the type of workers
   ress         =  merge (spawnAt places workers inputs)
   places       =  nextPes branch stride
   workers      :: [Process [Maybe t] [(Int,(r,[t],Int))]]
   workers      =  [process (zip [i,i..] . wf) | i <- [0..branch-1]]
   inputs       =  distribute branch (initReqs ++ reqs) tasks
   initReqs     =  concat (replicate prefetch [0..branch-1])
   -- additions for task queue management and
   -- termination detection
   tasks        =  (map Just initTasks) ++ newTasks
   -----------------------------
   -- length forces the complete initial task list
   -- => might lead to deadlock!
   initNumTasks =  length initTasks
   -----------------------------
   (finalResults, reqs, newTasks) 
                = tdetectTop ress initNumTasks


mwDynSubStride ::  forall t r . (Trans t, Trans r) =>
            Int -> Int -> Int -> ([Maybe t] -> [(r,[t],Int)])
            -> [Maybe t] -> [(r,[t],Int)]
mwDynSubStride stride branch prefetch wf initTasks = finalResults where
  fromWorkers  = map flatten (spawnAt places workers inputs)
  places       =  nextPes branch stride
  workers      :: [Process [Maybe t] [(Int,(r,[t],Int))]]
  workers      =  [process (zip [i,i..] . wf) | i <- [0..branch-1]]
  inputs       =  distribute branch (initReqs ++ reqs) tasks
  initReqs     =  concat (replicate prefetch [0..branch-1])
  -- task queue management
  controlInput =  merge (map Right initTasks: map (map Left) fromWorkers)
  (finalResults, tasks, reqs)
               = tcontrol controlInput 0 0 (branch * (prefetch+1)) False


-- task queue control for termination detection
tdetectTop :: [(Int,(r,[t],Int))] -> Int -> ([r], [Int], [Maybe t])
tdetectTop ((req,(r,ts,subHoldsTs)):ress) numTs
  | numTs == 1 && null ts && subHoldsTs == 0
    = ([r], [], repeat Nothing) -- final result
  | subHoldsTs == 1
    = (r:moreRes, moreReqs,  (map Just ts) ++ moreTs)
  | otherwise
    = (r:moreRes, req:moreReqs, (map Just ts) ++ moreTs)
  where -- subHoldsTs is 0 or 1; if it is 1 -> no request
        -- -> numTs will not be decreased
    (moreRes, moreReqs,  moreTs) 
      = tdetectTop ress (numTs-1+length ts+subHoldsTs)


tcontrol :: [Either (Int,r,[t],Int) (Maybe t)] -> -- controlInput
         Int -> Int ->                   -- task / hold counter
         Int -> Bool ->                  -- prefetch, split mode
         ([(r,[t],Int)],[Maybe t],[Int]) -- (results,tasks,requests)
tcontrol ((Right Nothing):_) 0 _ _ _
  = ([],repeat Nothing,[])               -- Final termination
tcontrol ((Right (Just t)):ress) numTs hldTs pf even  -- task from above
  = let (moreRes, moreTs, reqs) = tcontrol ress (numTs+1) hldTs pf even
    in (moreRes, (Just t):moreTs, reqs)
tcontrol ((Left (i,r,ts,subHoldsTs)):ress) numTs hldTs pf even
    =  let (moreRes, moreTs, reqs)
             = tcontrol ress (numTs + differ) (hldTs') pf evenAct
           differ = length localTs + subHoldsTs - 1
           hldTs' = max (hldTs + differ) 0
           holdInf | (hldTs+differ+1 > 0) = 1
                   | otherwise            = 0
           (localTs,fatherTs,evenAct)
             = split numTs pf ts even -- part of tasks to parent
           newreqs | (subHoldsTs == 0) = i:reqs 
                   | otherwise         = reqs -- no tasks kept below?
       in ((r,fatherTs,holdInf):moreRes,
           (map Just localTs) ++ moreTs, newreqs)

-- error case, not shown in paper
tcontrol ((Right Nothing):_) n _ _ _
  = error "Received Stop signal, although not finished!"
    
    
flatten :: [(Int,(r,[t],Int))] -> [(Int,r,[t],Int)]
flatten [] = []
flatten ((i,(r,ts,n)):ps) = (i,r,ts,n) : flatten ps

split :: Int -> Int -> [t] -> Bool ->([t],[t],Bool)
split num pf ts even -- = splitAt (2*pf - num) ts
    | num >= 2*pf      = ([],ts,False)
    --no tasks or odd -> keep first 
    | ((not even)||(num == 1)) = oddEven ts
    | otherwise                = evenOdd ts
    --  | num < pf `div` 2 = (ts,[])


oddEven :: [t] -> ([t],[t],Bool)
oddEven []     = ([],[],False)
oddEven (x:xs) = (x:localT ,fatherT, even)
    where (localT,fatherT,even) = evenOdd xs

evenOdd :: [t] -> ([t],[t],Bool)
evenOdd []   = ([],[],True)
evenOdd (x:xs) = (localT, x:fatherT, even)
    where (localT,fatherT,even) = oddEven xs
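
-- Illustration (added examples): alternating split of a task list; the
-- Bool is the parity flag that the next 'split' continues with:
--
-- > oddEven [1..5] == ([1,3,5],[2,4],True)
-- > evenOdd [1..5] == ([2,4],[1,3,5],False)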

nextPes :: Int -> Int -> [Int]
nextPes n stride | start > noPe = replicate n noPe
                 | otherwise    = concat (replicate n ps)
    where ps    = cycle (takeWhile (<= noPe) [start,start+stride..])
          start = selfPe + 1
          
mkNesting :: Int -> Int -> Int -> [Int]
mkNesting np 1 _ = [np]
mkNesting np depth level1 = level1:(replicate (depth - 2) 2) ++ [numWs]
  where -- leaves   = np - #submasters
        leaves      = np - level1 * ( 2^(depth-1) - 1 )
        -- num of lowest submasters
        numSubMs    = level1 * 2^(depth - 2)
        -- workers per branch (rounded up)
        numWs       = (leaves + numSubMs - 1) `div` numSubMs
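
-- Illustration (added example): @mkNesting 14 3 2@ yields @[2,2,2]@:
-- leaves = 14 - 2*(2^2-1) = 8 worker processes, numSubMs = 2*2^1 = 4
-- lowest-level submasters, numWs = ceiling(8/4) = 2 workers per
-- lowest-level submaster.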

mkPFs :: Int ->    -- prefetch value for worker processes
         [Int] ->  -- branching per level top-down
         [Int]     -- list of prefetches
mkPFs pf nesting
    = [ factor * (pf+1) | factor <- scanr1 (*) (tail nesting) ] ++ [pf]
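
-- Illustration (added example): for a worker prefetch of 10 and nesting
-- @[2,2,2]@, the higher levels buffer proportionally more tasks:
--
-- > mkPFs 10 [2,2,2] == [44,22,10]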



---------------------------distributed workpool skeletons-------------------------------------------

-- | A distributed workpool skeleton that uses task generation and a global state (s) with a total order.
-- The split and detach policies must give tasks away (they may not produce empty lists), unless all tasks are pruned!
distribWPAt :: (Trans onT, Trans t, Trans r, Trans s, NFData r') =>
       Places                             -- ^ where to put workers
       -> ((t,s) -> (Maybe (r',s),[t]))   -- ^ worker function
       -> (Maybe ofT -> Maybe onT -> [t]) -- ^ input transformation (e.g. (fetch . fromJust . snd) for online input of type [RD[t]])
       -> ([Maybe (r',s)] -> s -> r)      -- ^ result transformation (prior to release results in the workers)
       -> ([t]->[t]->s->[t])              -- ^ taskpool transform attach function
       -> ([t]->s->([t],Maybe (t,s)))     -- ^ taskpool transform detach function (local request)
       -> ([t]->s->([t],[t]))             -- ^ taskpool transform split function (remote request)
       -> (s->s->Bool)                    -- ^ state comparison (checks if new state is better than old state)
       -> s                               -- ^ initial state (offline input)
       -> [ofT]                           -- ^ offline input (if non-empty, the outer list defines the number of workers; otherwise the shorter list does)
       -> [onT]                           -- ^ dynamic input (if non-empty, the outer list defines the number of workers; otherwise the shorter list does)
       -> [r]                             -- ^ results of workers
distribWPAt places wf inputT resT ttA ttD ttSplit sUpdate st ofTasks onTasks = res where
   res = ringFlAt places id id workers onTasks'
   workers = zipWith (worker) (True:(repeat False)) ofTasks'
   -- the lengths of onTasks and ofTasks are adjusted such that the desired number of workers is instantiated
   (ofTasks',onTasks') | null ofTasks = (repeat Nothing  , map Just onTasks)
                       | null onTasks = (map Just ofTasks, repeat Nothing  )
                       | otherwise    = (map Just ofTasks, map Just onTasks)
   -- worker functionality
   worker isFirst ofTasks onTasks ringIn = (resT ress sFinal,init ringOut) where
     ts = (inputT ofTasks onTasks)
     (ress,ntssNtoMe) = unzip $ genResNReqs $ map wf ts' --separate
     reqList = Me : mergeS [ringIn,           --merge external Requests
                            ntssNtoMe] rnf    --and nf reduced local Requests
     -- manage Taskpool & Requests
     (ts',ringOut) = control ttA ttSplit ttD sUpdate reqList ts st isFirst
     sFinal = getState $ last ringOut
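
-- An illustrative instantiation of 'distribWPAt' (a sketch under assumed
-- semantics, not part of the original API): tasks are Ints, the state is
-- the smallest square seen so far, and workers create no new tasks.
-- 'splitIntoN' is assumed to come from Control.Parallel.Eden.Auxiliary.
-- All worker results should equal the globally smallest square.
exampleDistWP :: [Int] -> [Int]
exampleDistWP xs = distribWPAt [0]
    wf inputT resT ttA ttD ttSplit (<) maxBound ofTasks ([] :: [[Int]])
  where
    wf (t,s)     = (Just (t*t, min s (t*t)), []) -- square t, update best state
    inputT ofT _ = concat (maybeToList ofT)      -- offline input only
    resT ress s  = minimum (s : [r | Just (r,_) <- ress])
    ttA tp nts _ = tp ++ nts                     -- append new tasks at the end
    ttD []     _ = ([], Nothing)                 -- detach one task locally
    ttD (t:ts) s = (ts, Just (t,s))
    ttSplit tp _ = splitAt (length tp `div` 2) tp -- give away the upper half
    ofTasks      = splitIntoN noPe xs            -- one task list per worker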


control:: (Trans t, Trans s) =>
           ([t]->[t]->s->[t]) ->                     --Taskpool Transform Attach
           ([t]->s->([t],[t])) ->                    --Split Policy (remote Req)
           ([t]->s->([t],Maybe (t,s))) ->            --tt Detach (local Req)
           (s->s->Bool)->           --Checks if newState is better than oldState
           [Req [t] s]->[t]->s->Bool->              --reqs,tasks,state,isFirstInRing
           ([(t,s)],[Req [t] s])                    --localTasks,RequestsToRing
control ttA ttSplit ttD sUpdate requests initTasks st isFirst 
  = distribWork requests initTasks Nothing st (0,0)
  where
    --until no tasks are left: react to own and others' requests & add new tasks
    --distribWork :: (Trans t, Trans s) => [Req [t] s] -> [t]
    --      -> Maybe (ChanName (Req [t] s)) -> s -> (Int,Int)
    --      -> ([(t,s)],[Req [t] s])
    distribWork (TasksNMe nts:rs) tS replyCh st sNr      --selfmade Tasks arrive
      = distribWork (Me:rs) (ttA tS nts st) replyCh st sNr --add Tasks and MeReq 
    
    distribWork (NewState st':rs) tS replyCh st sNr      --Updated State arrives
      | sUpdate st' st = let (tS',wReqs') =distribWork rs tS replyCh st' sNr
                         in (tS',(NewState st':wReqs'))    --then check and send 
      | otherwise      = distribWork rs tS replyCh st sNr  --or discard
    
    distribWork (req@(Others tag tCh):rs) [] replyCh st sNr  --Others Request &
      | tag==None = (tS',req:wReqs')            --no tasks left --> pass Request    
      | otherwise = (tS', Others Black tCh : wReqs') --I'm still working -> Black
      where(tS',wReqs') = distribWork rs [] replyCh st sNr 
    
    distribWork (Me:rs) [] replyCh st sNr = --last own Task solved and no new ones
      new (\reqChan (newTS,replyChan) -> --gen new reqChan to get newTS & replyC
       let (tS',wReqs)     = passWhileReceive (mergeS [rs, --wait for fst newTask
                              newTS `pseq` [TasksNMe newTS]] r0) replyChan st sNr --to merge
           tag | isFirst   = Black --First Worker starts Black (For Termination)
               | otherwise = None  --normal Workers with None Tag
       in(case replyCh of          --First own Request into Ring- others dynamic
               Nothing       -> (tS', Others tag reqChan : wReqs)
               Just replyCh' -> parfill replyCh' (Others tag reqChan) 
                                          (tS',wReqs)))
    
    distribWork (Me:rs) tS replyCh st sNr      --local Request and Tasks present
      = let (tS',tDetatch) = ttD tS st         --taskpool transform detach
            (tsDetatch,wReqs) = case tDetatch of 
                                 Nothing -> distribWork (Me:rs) [] replyCh st sNr 
                                 Just t  -> distribWork rs tS' replyCh st sNr
        in ((maybeToList tDetatch)++tsDetatch,wReqs) --add Maybe one Task to wf
    
    distribWork reqs@(Others _ tCh :rs) tS replyCh st (s,r)  --foreign Req & have Ts
      = let (holdTs,sendTs) = ttSplit tS st    --split ts to send and ts to hold
            ((tS',wReqs'),replyReq) 
              = new (\replyChan replyReq' ->   --gen ReplyC and send Tasks & new
                 parfill tCh (sendTs,Just replyChan) --ReplyC in Chan of the Req
                 ((distribWork rs holdTs replyCh st (s+1,r)),replyReq')) 
        in case sendTs of                                       --ReplyReqToOutp
            []    -> distribWork reqs [] replyCh st (s,r)       --No tasks left
            (_:_) -> (tS',mergeS [[replyReq],wReqs'] rnf)

--  Pass all requests on until foreign tasks arrive or termination starts
--  passWhileReceive :: (Trans t, Trans s) => [Req [t] s]
--        -> Maybe (ChanName (Req [t] s)) -> s -> (Int,Int)
--        -> ([(t,s)],[Req [t] s])
    passWhileReceive (NewState st':rs) replyCh st sNr --Updated State arrives
      | sUpdate st' st =let (tS',wReqs')=passWhileReceive rs replyCh st' sNr
                        in (tS',(NewState st':wReqs'))    --then check and send 
      | otherwise      = passWhileReceive rs replyCh st sNr     --or discard
    
    passWhileReceive (req@(Others None tCh):rs) replyCh st sNr --Req of normal 
      = let (tS',wReqs) = passWhileReceive rs replyCh st sNr --Worker -> pass it
        in (tS',req:wReqs)
    
    passWhileReceive (req@(Others Black tCh ):rs) replyCh st (s,r) --Black Req
      | (not isFirst) = (tS',req :wReqs)  --normal Workers: pass it
      | otherwise     = (tS',req':wReqs)  --First Worker: new Round starts White
      where (tS',wReqs) = passWhileReceive rs replyCh st (s,r)
            req'= Others (White s r 0 0) tCh          --Start with own Counters
    
    passWhileReceive (Others (White s1 r1 s2 r2) tCh :rs) replyCh st (s,r) 
      | (not isFirst) = (tS',req':wReqs)  --Normal Workers: add Counter and pass
      --4 counters equal -> pass Black as end of reqs Symbol, start Termination
      | otherwise = if terminate 
                    then ([],Others Black tCh : (termRing rs ++ [NewState st]))
                    else (tS',req'':wReqs) --no Termination
      where (tS',wReqs) = passWhileReceive rs replyCh st (s,r)
            req'        = Others (White (s1+s) (r1+r) s2 r2) tCh --add Counters
            req''       = Others (White s r s1 r1) tCh --Move Counters->NewTurn 
            terminate   = (s1==r1)&&(r1==s2)&&(s2==r2)       --Check Termination

    passWhileReceive (TasksNMe newTS:rs) replyCh st (s,r)    --Task List arrives
      | null newTS = ([],(termRing rs)    --got empty newTs -> begin Termination
                          ++ [NewState st]) --attach final State at the End
      | otherwise  = (distribWork (Me:rs) newTS replyCh st (s,r+1)) --have newTs


data Req t s = 
       Me |                          -- work request of the local worker function
               -- request of another worker, with tag and a channel to send tasks
       Others { getTag       :: Tag, 
                getReplyChan :: (ChanName (t,Maybe (ChanName(Req t s))))
              } |     -- and a reply channel
       TasksNMe { getTask :: t} |  -- new tasks and an additional Me request to add
       NewState { getState :: s }

instance (NFData t, NFData s) => NFData (Req t s)
  where 
   rnf Me = ()      
   rnf (Others t c) = rnf t `pseq` rnf c
   rnf (TasksNMe t) = rnf t
   rnf (NewState s) = rnf s
            
instance (Trans t,Trans s) => Trans (Req t s)

data Tag = Black |                  -- no termination situation / in termination
                                    -- mode: the last request in the ring
           White Int Int Int Int |  -- checks the termination situation
                                    -- (send & receive counters of two rounds)
           None                     -- request of a normal worker

instance NFData Tag 
    where rnf Black = ()
          rnf None  = ()
          rnf (White a b c d) = rnf (a,b,c,d)

instance Eq Tag where
    Black   == Black   = True
    None    == None    = True
    White a b c d == White a' b' c' d' = (a,b,c,d)==(a',b',c',d')
    _       == _       = False


---------------------auxiliary-----------------
termRing :: (Trans t,Trans s) => [Req [t] s] -> [Req [t] s]
termRing []                       = []       -- predecessor sends no more reqs
termRing (Others Black tCh : _ ) = parfill tCh ([],Nothing) []   --reply last req
termRing (Others None  tCh : rs) = parfill tCh ([],Nothing) (termRing rs) --reply
termRing (_                : rs) = termRing rs          --ignore isFirst's reply

genResNReqs :: (NFData t,NFData r',NFData s)=>
               [(Maybe (r',s),[t])] -> [(Maybe (r',s),Req [t] s)]
genResNReqs [] = []                           --No more tasks
genResNReqs ((res@(Nothing,nts)):ress)     --No new State -> Attach new Tasks 
  = rnf res `pseq` (Nothing,TasksNMe nts): genResNReqs ress
genResNReqs ((res@(Just (r,st),nts)):ress)   --New State found -> Attach
  = rnf res `pseq` (Just (r,st),NewState st): genResNReqs ((Nothing,nts):ress)


-----------------------------DEPRECATED---------------------------------
-- | Deprecated, same as 'workpoolSortedNonBlock'
{-# DEPRECATED masterWorker "better use workpoolSortedNonBlock instead" #-}
masterWorker :: (Trans a, Trans b) => 
                Int -> Int -> (a -> b) -> [a] -> [b]
masterWorker = workpoolSortedNonBlock

-- | Deprecated, same as 'wpNested'
{-# DEPRECATED mwNested "better use wpNested instead" #-}
mwNested :: forall t r . (Trans t, Trans r) => 
               [Int] -> [Int] -> (t -> r) -> [t] -> [r]           
mwNested = wpNested

-- | Deprecated, same as 'wpDNI'
{-# DEPRECATED mwDNI "better use wpDNI instead" #-}
mwDNI :: (Trans t, Trans r) =>
         Int              
         -> Int           
         -> Int           
         -> Int           
         -> (t -> (r,[t]))
         -> [t]          
         -> [r]          
mwDNI = wpDNI
          
-- | Deprecated, same as 'wpDynNested'
{-# DEPRECATED mwDynNested "better use wpDynNested instead" #-}
mwDynNested :: forall t r . (Trans t, Trans r) => 
               [Int]           
               -> [Int]        
               -> (t -> (r,[t]))
               -> [t]           
               -> [r]           
mwDynNested = wpDynNested