{-# LANGUAGE CPP, ScopedTypeVariables #-}
-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Parallel.Eden.EdenSkel.TopoSkels
-- Copyright   :  (c) Philipps Universitaet Marburg 2009-2012
-- License     :  BSD-style (see the file LICENSE)
-- 
-- Maintainer  :  eden@mathematik.uni-marburg.de
-- Stability   :  beta
-- Portability :  not portable
--
-- This Haskell module defines topology skeletons for the parallel functional
-- language Eden. Topology skeletons are skeletons that implement a network of
-- processes interconnected by a characteristic communication topology.
--
-- Depends on GHC. Using standard GHC, you will get a threaded simulation of Eden. 
-- Use the forked GHC-Eden compiler from http:\/\/www.mathematik.uni-marburg.de/~eden 
-- for a parallel build.
--
-- Eden Group ( http:\/\/www.mathematik.uni-marburg.de/~eden )


module Control.Parallel.Eden.EdenSkel.TopoSkels (
  -- * Skeletons that are primarily characterized by their topology.
    
  -- ** Pipeline skeletons
  -- |
  pipe, pipeRD
  -- ** Ring skeletons
  -- |
  ,ringSimple, ring, ringFl, ringAt, ringFlAt
  -- ** Torus skeleton
  -- |  
  ,torus 
  -- ** The Hypercube skeleton
  -- |

  -- ** The All-To-All skeleton 
  -- | The allToAll skeleton allows distributed data exchange and
  -- transformation including data of all processes. Input and output
  -- are provided as remote data. A typical application is the
  -- distributed transposition of a distributed matrix.
  ,allToAllRD, parTransposeRD           
  -- ** The All-Reduce skeleton   
  -- | The All-Reduce skeleton uses a butterfly topology to reduce the data of
  -- participating processes P in log(|P|) communication stages. Input
  -- and output are provided as remote data.
  --
  -- Notice: The number of processes has to be a power of 2!
  ,allReduceRD
  
    ) where
#if defined( __PARALLEL_HASKELL__ ) || defined (NOT_PARALLEL)
import Control.Parallel.Eden
#else
import Control.Parallel.Eden.EdenConcHs
#endif
import Control.Parallel.Eden.EdenSkel.Auxiliary
import Control.Parallel.Eden.EdenSkel.MapSkels
import Data.List

   
   
-- |Simple pipe where the parent process creates all pipe processes. The processes communicate their results via the caller process. 
pipe :: forall a . Trans a => 
        [a -> a]    -- ^functions of the pipe
        -> a        -- ^input
        -> a        -- ^output
pipe fs = unLiftRD (pipeRD fs)
  
-- |Process pipe where the processes communicate their Remote Data handles via the caller process but fetch the actual data from their predecessor processes
pipeRD :: forall a . Trans a => 
          [a -> a]    -- ^functions of the pipe
          -> RD a     -- ^remote input
          -> RD a     -- ^remote output
pipeRD fs xs = (last outs) where 
  outs = spawn ps $ lazy $ xs : outs
  ps :: [Process (RD a) (RD a)]
  ps = map (process . liftRD) fs


-- | Simple ring skeleton (tutorial version) 
-- using remote data for providing direct inter-ring communication  
-- without input distribution and output combination  
ringSimple      :: (Trans i, Trans o, Trans r) =>
               (i -> r -> (o,r))  -- ^ ring process function
               -> [i] -> [o]      -- ^ input output mapping
ringSimple f is =  os
  where
    (os,ringOuts)  = unzip (parMap (toRD $ uncurry f)
                                   (zip is $ lazy ringIns))
    ringIns        = rightRotate ringOuts

toRD :: (Trans i, Trans o, Trans r) =>
        ((i,r) -> (o,r))          -- ^ ring process function
        -> ((i, RD r) -> (o, RD r)) -- ^ -- with remote data
toRD  f (i, ringIn)  = (o, release ringOut)
  where (o, ringOut) = f (i, fetch ringIn)

rightRotate    :: [a] -> [a]
rightRotate [] =  []
rightRotate xs =  last xs : init xs

-- | The ringFlAt establishes a ring topology, the ring process functions
-- transform the initial input of a ring process and the input stream from the ring into the 
-- ring output stream and the ring processes' final result. Every ring process  
-- applies its individual function which e.g. allows to route individual offline input into the 
-- ring processes. This version uses explicit placement.
ringFlAt :: (Trans a,Trans b,Trans r) =>
        Places                     -- ^where to put workers
        -> (i -> [a])              -- ^distribute input
        -> ([b] -> o)              -- ^combine output
        -> [(a -> [r] -> (b,[r]))] -- ^ring process fcts
        -> i                       -- ^ring input
        -> o                       -- ^ring output
ringFlAt places distrib combine fs i = combine os where
  (os, ringOuts) = unzip $ spawnFAt places (map (toRD . uncurry) fs) 
                                           (zip (distrib i) $ lazy ringIns)
  ringIns        = rightRotate ringOuts

-- | The ringFl establishes a ring topology, the ring process functions
-- transform the initial input of a ring process and the input stream from the ring into the 
-- ring output stream and the ring processes' final result. Every ring process 
-- applies an individual function which e.g. allows to route individual offline input into the 
-- ring processes. Use ringFlAt if explicit placement is desired.
ringFl  :: (Trans a,Trans b,Trans r) =>
        (i -> [a])                 -- ^distribute input
        -> ([b] -> o)              -- ^combine output
        -> [(a -> [r] -> (b,[r]))] -- ^ring process fcts
        -> i                       -- ^ring input
        -> o                       -- ^ring output
ringFl = ringFlAt [0]

-- | Skeleton @ringAt@ establishes a ring topology, the ring process function
-- transforms the initial input of a ring process and the input stream from the ring into the 
-- ring output stream and the ring processes' final result. The 
-- same function is used by every ring process. Use ringFlAt
-- if you need different functions in the processes. This version uses explicit placement.
ringAt :: (Trans a,Trans b,Trans r) =>
        Places                    -- ^where to put workers
        -> (i -> [a])             -- ^distribute input
        -> ([b] -> o)             -- ^combine output
        -> (a -> [r] -> (b,[r]))  -- ^ring process fct
        -> i                      -- ^ring input
        -> o                      -- ^ring output 
ringAt places distrib combine f i =
  ringFlAt places distrib combine [f] i 

-- | The ring establishes a ring topology, the ring process function
-- transforms the initial input of a ring process and the input stream from the ring into the 
-- ring output stream and the ring processes final result. The 
-- same function is used by every ring process. Use ringFl
-- if you need different functions in the processes. Use ringAt if 
-- explicit placement is desired.
ring :: (Trans a,Trans b,Trans r) =>
        (i -> [a])                -- ^distribute input
        -> ([b] -> o)             -- ^combine output
        -> (a -> [r] -> (b,[r]))  -- ^ring process fct
        -> i                      -- ^ring input
        -> o                      -- ^ring output
ring = ringAt [0]



-- | Parallel torus skeleton (tutorial version) with stream rotation in 2 directions: initial inputs for each torus element are given. The node function is used on each torus element to transform the initial input and a stream of inputs from each direction to a stream of outputs to each direction. Each torus input should have the same size in both dimensions, otherwise the smaller input will determine the size of the torus.
torus :: (Trans a, Trans b, Trans c, Trans d) =>
         (c -> [a] -> [b] -> (d,[a],[b])) -- ^ node function
         -> [[c]] -> [[d]]                -- ^ input-output mapping
torus f inss = outss
  where
    t_outss = spawnPss (repeat (repeat (ptorus f))) t_inss    -- optimised
    (outss,outssA,outssB) = unzip3 (map unzip3 t_outss)
    inssA   = map rightRotate outssA
    inssB   = rightRotate outssB
    t_inss  = zipWith3 lazyzip3 inss (lazy inssA) (lazy inssB)
    lazyzip3 as bs cs = zip3 as (lazy bs) (lazy cs)

-- each individual process of the torus (tutorial version)
ptorus :: (Trans a, Trans b, Trans c, Trans d) =>
          (c -> [a] -> [b] -> (d,[a],[b])) ->
          Process (c,RD [a],RD [b])
                  (d,RD [a],RD [b])
ptorus f 
 = process (\ (fromParent,          inA,           inB) ->
               let (toParent, outA, outB) = f fromParent inA' inB'
                   (inA',inB')            = fetch2 inA inB
               in  (toParent,   release outA,  release outB))


-- | The allToAllRD skeleton creates as many processes as elements in the input list (@np@). 
-- The processes are all-to-all connected, each process' input is transformed to 
-- @np@ intermediate values by the first parameter function, where the @i@-th value
-- will be sent to process @i@. The second transformation function combines the initial
-- input and the @np@ received intermediate values to the final output.
allToAllRD :: forall a b i. (Trans a, Trans b, Trans i) =>
            (Int -> a -> [i])      -- ^transform before bcast (num procs, input, sync-data out)
            -> (a -> [i] ->b)      -- ^transform after bcast (input, sync-data in, output)
            -> [RD a]              -- ^remote input for each process
            -> [RD b]              -- ^remote output for each process
allToAllRD t1 t2 xs = res where
  n = length xs           --same amount of procs as #xs
  (res,iss) = n `pseq` unzip $ parMap (uncurry p) inp
  inp       = zip xs $ lazy $ transpose iss

  p :: RD a-> [RD i]-> (RD b,[RD i])
  p xRD theirIs = (resF theirIs, myIsF x) where
    x      = fetch xRD
    myIsF  = releaseAll . t1 n
    resF   = release . t2 x . fetchAll


-- works similar for splitIntoN and unsplit (concat)??? 
-- |Parallel transposition for matrices which are row-wise round robin distributed among the machines, the transposed result matrix is also row-wise round robin distributed.
parTransposeRD :: Trans b =>  
                  [RD [[b]]]    -- ^input list of remote partial matrices
                  -> [RD [[b]]] -- ^output list of remote partial matrices
parTransposeRD = allToAllRD (\ n -> unshuffle n . transpose)
                            (\ _ -> map shuffle . transpose)


-- | Performs an all-reduce with the reduce function using a butterfly scheme.
-- The input list should have length 2^i, where i is an arbitrary natural number.
-- If not, the input list will be truncated to the next smaller power of two.
-- The initial transformation is applied in the processes to obtain the values
-- that will be reduced. The final combine function is used to create a processes
-- result from the initial input and the reduced value.
allReduceRD :: forall a b c. (Trans a, Trans b, Trans c) =>
             (a -> b)               -- ^initial transform function
             -> (b -> b -> b)       -- ^reduce function
             -> (a -> b -> c)       -- ^final combine function
             -> [RD a]              -- ^remote input
             -> [RD c]              -- ^remote output
allReduceRD initF redF resF rdAs = rdCs where
  steps = (floor . logBase 2 . fromIntegral . length) rdAs
  rdAs' = take (2^steps) rdAs          --cut input to power of 2

 -- topology, inputs and instantiation
  (rdBss,rdCs) = unzip $ parMap (uncurry p) inp         --steps in rows
  bufly = zipWith bitFlipF [1..steps] $ transpose rdBss
  inp   = zip rdAs' $ lazy $ transpose bufly            --steps in cols

 -- process functionality and abstraction
  p :: RD a -> [RD b] -> ([RD b], RD c)
  p rdA theirReds  = (reduced, release $ resF a $ head res) where
    reduced        = (releaseAll . scanl1 redF) toReduce
    (toReduce,res) = splitAt steps $ initF a  : fetchAll theirReds'
    theirReds'     = zipWith (curry snd) [1..steps] $ lazy theirReds
    a              = fetch rdA

bitFlipF :: Int -> [a] -> [a]
bitFlipF step xs = (shuffle . flipAtHalfF . unshuffle d) xs where
  d = (2 ^ step)
  flipAtHalfF xs = let (xs1, xs2) = splitAt (d `div` 2) xs
                   in xs2 ++ xs1