#include "fusion-phases.h"
module Data.Array.Parallel.Unlifted.Parallel.Segmented
( replicateRSUP
, appendSUP
, appendSUP_old
, appendSUPV
, foldRUP
, sumRUP)
where
import Data.Array.Parallel.Unlifted.Distributed
import Data.Array.Parallel.Unlifted.Distributed.What
import Data.Array.Parallel.Unlifted.Parallel.Basics
import Data.Array.Parallel.Unlifted.Parallel.UPSegd (UPSegd)
import Data.Array.Parallel.Unlifted.Sequential.USegd (USegd)
import Data.Array.Parallel.Unlifted.Sequential.Vector as Seq
import qualified Data.Array.Parallel.Unlifted.Vectors as Vs
import qualified Data.Array.Parallel.Unlifted.Parallel.UPSegd as UPSegd
import qualified Data.Array.Parallel.Unlifted.Sequential as Seq
import qualified Data.Array.Parallel.Unlifted.Sequential.USegd as USegd
import qualified Data.Array.Parallel.Unlifted.Sequential.USSegd as USSegd
import qualified Data.Array.Parallel.Unlifted.Parallel.UPVSegd as UPVSegd
import qualified Data.Array.Parallel.Unlifted.Parallel.UPSSegd as UPSSegd
import Data.Vector.Fusion.Stream.Monadic ( Stream(..), Step(..) )
import Data.Vector.Fusion.Stream.Size ( Size(..) )
import qualified Data.Vector.Fusion.Stream as S
import GHC.Exts
-- | Qualify a function name with this module's name, for use as the
--   location tag passed to bounds-checked indexing functions.
here :: String -> String
here fn = modulePrefix Prelude.++ fn
  where
    modulePrefix = "Data.Array.Parallel.Unlifted.Parallel.Segmented."
-- | Segmented replication.
--
--   Builds a segment descriptor whose lengths vector is
--   @replicateUP (length xs) n@ and hands it, together with @xs@, to
--   'UPSegd.replicateWithP'.
--
--   NOTE(review): presumably this repeats every element of @xs@ @n@ times,
--   e.g. @replicateRSUP 2 [1, 2, 3] = [1, 1, 2, 2, 3, 3]@; the helpers are
--   defined elsewhere, so confirm against their definitions.
replicateRSUP :: Unbox a => Int -> Vector a -> Vector a
replicateRSUP n xs
  = UPSegd.replicateWithP segd xs
  where
    -- One length entry per source element, each equal to n.
    lens = replicateUP (Seq.length xs) n
    segd = UPSegd.fromLengths lens
-- | Segmented append, evaluated chunk-wise on the gang.
--
--   The distributed form of @segd@ is mapped over: every chunk is filled by
--   unstreaming 'appendSegS' for that chunk's element count and starting
--   position, and the chunks are joined into a single vector.
appendSUP
        :: Unbox a
        => UPSegd       -- ^ Descriptor whose distribution drives the work
                        --   split (presumably that of the result array).
        -> UPSegd       -- ^ Segment descriptor of the first array.
        -> Vector a     -- ^ Data of the first array.
        -> UPSegd       -- ^ Segment descriptor of the second array.
        -> Vector a     -- ^ Data of the second array.
        -> Vector a
appendSUP segd !xd !xs !yd !ys
  = joinD theGang balanced
      (mapD (What "appendSUP/append") theGang appendChunk
            (UPSegd.takeDistributed segd))
  where
    -- A chunk is described by its local segment descriptor, the segment
    -- offset and the element offset at which it starts.
    appendChunk ((chunkSegd, segOff), elOff)
      = Seq.unstream
          (appendSegS (UPSegd.takeUSegd xd) xs
                      (UPSegd.takeUSegd yd) ys
                      (USegd.takeElements chunkSegd)
                      segOff elOff)
-- | Stream one contiguous slice of the segmented append of two arrays.
--
--   Within the appended array, result segment @i@ consists of segment @i@ of
--   @xs@ followed by segment @i@ of @ys@. The stream starts in result segment
--   @seg_off@, @el_off@ elements into it, and produces exactly @n@ elements,
--   alternating between the two sources as segments are exhausted.
--
--   NOTE(review): the guards below use @==#@ directly as a 'Bool', which
--   matches compilers where the comparison primops return 'Bool'; newer GHCs
--   return 'Int#' and would need @isTrue#@.
appendSegS
        :: Unbox a
        => USegd        -- ^ Segment descriptor of the first array.
        -> Vector a     -- ^ Data of the first array.
        -> USegd        -- ^ Segment descriptor of the second array.
        -> Vector a     -- ^ Data of the second array.
        -> Int          -- ^ Number of elements to produce.
        -> Int          -- ^ Index of the segment to start in.
        -> Int          -- ^ Element offset within that (appended) segment.
        -> S.Stream a
appendSegS !xd !xs !yd !ys !n seg_off el_off
  = Stream next state (Exact n)
  where
    !xlens = USegd.takeLengths xd
    !ylens = USegd.takeLengths yd

    -- Two copies of the same indexing function: these are point-free
    -- bindings, so each is used at a single type (index1 on the Int vectors
    -- of the descriptors, index2 on the element data).
    index1 = Seq.index (here "appendSegS")
    index2 = Seq.index (here "appendSegS")

    unbox (I# i) = i

    -- Initial state of the stream.
    state
      -- Nothing to produce. This is checked first so the descriptors are not
      -- indexed at all when the slice is empty.
      | n == 0
      = ASDo
          { as_takefrom  = 0#
          , as_seg_off   = 0#
          , as_xs_index  = 0#
          , as_ys_index  = 0#
          , as_next_swap = 0#
          , as_remain    = 0# }

      -- The starting offset lies inside the xs part of the segment.
      | el_off < xlens `index1` seg_off
      = let xi   = (USegd.takeIndices xd `index1` seg_off) + el_off
            yi   = USegd.takeIndices yd `index1` seg_off
            -- Elements of this xs segment still to be read.
            swap = (xlens `index1` seg_off) - el_off
        in  ASDo
              { as_takefrom  = 0#
              , as_seg_off   = unbox seg_off
              , as_xs_index  = unbox xi
              , as_ys_index  = unbox yi
              , as_next_swap = unbox swap
              , as_remain    = unbox n }

      -- The starting offset lies inside the ys part of the segment.
      | otherwise
      = let -- Start of the next xs segment, computed from the current
            -- segment's start and length rather than by reading entry
            -- seg_off + 1, which may not exist.
            xi      = (USegd.takeIndices xd `index1` seg_off)
                    + (xlens `index1` seg_off)
            -- Offset relative to the start of the ys segment.
            el_off' = el_off - (xlens `index1` seg_off)
            yi      = (USegd.takeIndices yd `index1` seg_off) + el_off'
            -- Elements of this ys segment still to be read.
            swap    = (ylens `index1` seg_off) - el_off'
        in  ASDo
              { as_takefrom  = 1#
              , as_seg_off   = unbox seg_off
              , as_xs_index  = unbox xi
              , as_ys_index  = unbox yi
              , as_next_swap = unbox swap
              , as_remain    = unbox n }

    -- All requested elements have been produced.
    next ASDo{as_remain = 0#} = return Done

    -- Reading from xs.
    next s@ASDo{as_takefrom = 0#}
      -- xs segment exhausted: switch to the ys segment with the same index.
      | as_next_swap s ==# 0#
      = return $ Skip (s { as_takefrom  = 1#
                         , as_next_swap = unbox (ylens `index1` I# (as_seg_off s)) })
      | otherwise
      = return $ Yield (xs `index2` I# (as_xs_index s)) (inc s)

    -- Reading from ys.
    next s
      -- ys segment exhausted: move on to the xs part of the next segment.
      | as_next_swap s ==# 0#
      = let seg' = as_seg_off s +# 1#
        in  return $ Skip (s { as_takefrom  = 0#
                             , as_seg_off   = seg'
                             , as_next_swap = unbox (xlens `index1` I# seg') })
      | otherwise
      = return $ Yield (ys `index2` I# (as_ys_index s)) (inc s)

    -- Advance the read position of the active source by one element and
    -- decrement both counters.
    inc s@ASDo{as_takefrom = 0#, as_xs_index = xi, as_next_swap = swap, as_remain = n'}
      = s { as_xs_index = xi +# 1#, as_next_swap = swap -# 1#, as_remain = n' -# 1# }
    inc s@ASDo{as_ys_index = yi, as_next_swap = swap, as_remain = n'}
      = s { as_ys_index = yi +# 1#, as_next_swap = swap -# 1#, as_remain = n' -# 1# }
-- | Loop state of the stream built by 'appendSegS'. All fields are unboxed
--   machine integers.
data AppendState
        = ASDo
        { -- | Active source: @0#@ while reading the first array, any other
          --   value (only @1#@ is ever stored) while reading the second.
          as_takefrom   :: Int#

          -- | Index of the segment currently being produced.
        , as_seg_off    :: Int#

          -- | Index of the next element to read from the first array.
        , as_xs_index   :: Int#

          -- | Index of the next element to read from the second array.
        , as_ys_index   :: Int#

          -- | Elements left in the active source's current segment before
          --   the stream switches to the other source.
        , as_next_swap  :: Int#

          -- | Elements still to be produced before the stream is done.
        , as_remain     :: Int#
        }
-- | Segmented append for arrays described by virtual segment descriptors,
--   evaluated chunk-wise on the gang.
--
--   The distributed form of @segd@ is mapped over: every chunk is filled by
--   unstreaming 'appendUPVSegS' for that chunk's element count and starting
--   position, and the chunks are joined into a single vector.
appendSUPV
        :: (Vs.Unboxes a, Unbox a)
        => UPSegd               -- ^ Descriptor whose distribution drives the
                                --   work split (presumably that of the result).
        -> UPVSegd.UPVSegd      -- ^ Segment descriptor of the first array.
        -> Vs.Vectors a         -- ^ Data of the first array.
        -> UPVSegd.UPVSegd      -- ^ Segment descriptor of the second array.
        -> Vs.Vectors a         -- ^ Data of the second array.
        -> Vector a
appendSUPV segd !xd !xs !yd !ys
  = joinD theGang balanced
      (mapD (What "appendSUPV/append") theGang appendChunk
            (UPSegd.takeDistributed segd))
  where
    -- A chunk is described by its local segment descriptor, the segment
    -- offset and the element offset at which it starts.
    appendChunk ((chunkSegd, segOff), elOff)
      = Seq.unstream
          (appendUPVSegS xd xs
                         yd ys
                         (USegd.takeElements chunkSegd)
                         segOff elOff)
-- | Stream one contiguous slice of the segmented append of two arrays whose
--   segments are described by virtual segment descriptors.
--
--   Result segment @i@ is logical segment @i@ of the first array followed by
--   logical segment @i@ of the second. Each logical segment is mapped through
--   the vsegids to a physical segment, whose source vector, start offset and
--   length locate the data inside the 'Vs.Vectors'. The stream starts in
--   result segment @seg_off@, @el_off@ elements into it, and produces exactly
--   @n@ elements.
--
--   NOTE(review): the error-location tag used by the index helpers reads
--   @appendUVSegS@, not @appendUPVSegS@; left as is because it is a runtime
--   string.
--
--   NOTE(review): the guards below use @==#@ directly as a 'Bool', which
--   matches compilers where the comparison primops return 'Bool'; newer GHCs
--   return 'Int#' and would need @isTrue#@.
appendUPVSegS
        :: Vs.Unboxes a
        => UPVSegd.UPVSegd      -- ^ Segment descriptor of the first array.
        -> Vs.Vectors a         -- ^ Data of the first array.
        -> UPVSegd.UPVSegd      -- ^ Segment descriptor of the second array.
        -> Vs.Vectors a         -- ^ Data of the second array.
        -> Int                  -- ^ Number of elements to produce.
        -> Int                  -- ^ Index of the segment to start in.
        -> Int                  -- ^ Element offset within that (appended) segment.
        -> S.Stream a
appendUPVSegS !xd !xs !yd !ys !n seg_off el_off
  = Stream next state (Exact n)
  where
    -- Components of the two descriptors, unpacked once up front.
    !xvsegs   = UPVSegd.takeVSegidsRedundant xd
    !yvsegs   = UPVSegd.takeVSegidsRedundant yd

    !xssegd   = UPSSegd.takeUSSegd $ UPVSegd.takeUPSSegdRedundant xd
    !yssegd   = UPSSegd.takeUSSegd $ UPVSegd.takeUPSSegdRedundant yd

    !xsegd    = USSegd.takeUSegd xssegd
    !ysegd    = USSegd.takeUSegd yssegd

    -- Physical segment id of a logical segment.
    xpseg s   = index1 xvsegs "xpseg" s
    ypseg s   = index1 yvsegs "ypseg" s

    !xseglens = USegd.takeLengths xsegd
    !yseglens = USegd.takeLengths ysegd

    !xsrc     = USSegd.takeSources xssegd
    !ysrc     = USSegd.takeSources yssegd

    !xstrt    = USSegd.takeStarts xssegd
    !ystrt    = USSegd.takeStarts yssegd

    -- Length of a logical segment, looked up via its physical segment.
    xplen s   = index1 xseglens "xplen1" (xpseg s)
    yplen s   = index1 yseglens "yplen1" (ypseg s)

    -- Fetch the current element: source vector avs_ssrc, at the segment's
    -- start offset plus the position within the segment.
    gdata gs st
      = let !src  = avs_ssrc st
            !strt = avs_sstart st
            !ix   = avs_index st
        in  index2 gs (I# src) (I# (strt +# ix))

    -- Source vector index and start offset of a logical segment's data.
    getscatter gpseg gsrcs gstrts segid
      = let !phys = gpseg segid in
        let !src  = index1 gsrcs  "src"  phys in
        let !strt = index1 gstrts "strt" phys in
        (src, strt)

    index1 v h i    = Seq.index (here $ "appendUVSegS:" Prelude.++ h) v i
    index2 v i1 i2  = Vs.index2 (here "appendUVSegS") v i1 i2

    unbox (I# i) = i

    -- Initial state of the stream.
    state
      -- Nothing to produce. This is checked first so the descriptors are not
      -- indexed at all when the slice is empty.
      | n == 0
      = ASUPVDo
          { avs_takefrom  = 0#
          , avs_seg_off   = 0#
          , avs_index     = 0#
          , avs_next_swap = 0#
          , avs_remain    = 0#
          , avs_sstart    = 0#
          , avs_ssrc      = 0# }

      -- The starting offset lies inside the xs part of the segment.
      | el_off < xplen seg_off
      = let (src, strt) = getscatter xpseg xsrc xstrt seg_off
            -- Elements of this xs segment still to be read.
            swap        = (xplen seg_off) - el_off
        in  ASUPVDo
              { avs_takefrom  = 0#
              , avs_seg_off   = unbox seg_off
              , avs_index     = unbox el_off
              , avs_next_swap = unbox swap
              , avs_remain    = unbox n
              , avs_sstart    = unbox strt
              , avs_ssrc      = unbox src }

      -- The starting offset lies inside the ys part of the segment.
      | otherwise
      = let (src, strt) = getscatter ypseg ysrc ystrt seg_off
            -- Offset relative to the start of the ys segment.
            el_off'     = el_off - xplen seg_off
            -- Elements of this ys segment still to be read.
            swap        = (yplen seg_off) - el_off'
        in  ASUPVDo
              { avs_takefrom  = 1#
              , avs_seg_off   = unbox seg_off
              , avs_index     = unbox el_off'
              , avs_next_swap = unbox swap
              , avs_remain    = unbox n
              , avs_sstart    = unbox strt
              , avs_ssrc      = unbox src }

    -- All requested elements have been produced.
    next ASUPVDo{avs_remain = 0#} = return Done

    -- Reading from xs.
    next s@ASUPVDo{avs_takefrom = 0#}
      -- xs segment exhausted: switch to the ys segment with the same index.
      | avs_next_swap s ==# 0#
      = let seg'        = I# (avs_seg_off s)
            (src, strt) = getscatter ypseg ysrc ystrt seg'
        in  return $ Skip $
              s { avs_takefrom  = 1#
                , avs_index     = 0#
                , avs_next_swap = unbox (yplen seg')
                , avs_sstart    = unbox strt
                , avs_ssrc      = unbox src }
      | otherwise
      = return $ Yield (gdata xs s) (inc s)

    -- Reading from ys.
    next s
      -- ys segment exhausted: move on to the xs part of the next segment.
      | avs_next_swap s ==# 0#
      = let seg'        = I# (avs_seg_off s +# 1#)
            (src, strt) = getscatter xpseg xsrc xstrt seg'
        in  return $ Skip $
              s { avs_takefrom  = 0#
                , avs_seg_off   = unbox seg'
                , avs_index     = 0#
                , avs_next_swap = unbox (xplen seg')
                , avs_sstart    = unbox strt
                , avs_ssrc      = unbox src }
      | otherwise
      = return $ Yield (gdata ys s) (inc s)

    -- Advance the position within the current segment by one element and
    -- decrement both counters.
    inc s@ASUPVDo{avs_index = ix, avs_next_swap = swap, avs_remain = n'}
      = s { avs_index = ix +# 1#, avs_next_swap = swap -# 1#, avs_remain = n' -# 1# }
-- | Loop state of the stream built by 'appendUPVSegS'. All fields are unboxed
--   machine integers.
data AppendUPVState
        = ASUPVDo
        { -- | Active source: @0#@ while reading the first array, any other
          --   value (only @1#@ is ever stored) while reading the second.
          avs_takefrom  :: Int#

          -- | Index of the logical segment currently being produced.
        , avs_seg_off   :: Int#

          -- | Position of the next element within the current segment; added
          --   to 'avs_sstart' when the element is fetched.
        , avs_index     :: Int#

          -- | Elements left in the active source's current segment before
          --   the stream switches to the other source.
        , avs_next_swap :: Int#

          -- | Elements still to be produced before the stream is done.
        , avs_remain    :: Int#

          -- | Start offset of the current segment's data in its source vector.
        , avs_sstart    :: Int#

          -- | Index of the source vector holding the current segment's data.
        , avs_ssrc      :: Int#
        }
-- | Segmented append, evaluated chunk-wise on the gang, using the
--   tuple-state stream 'appendSegS_old'.
--
--   The distributed form of @segd@ is mapped over: every chunk is filled by
--   unstreaming 'appendSegS_old' for that chunk's element count and starting
--   position, and the chunks are joined into a single vector.
appendSUP_old
        :: Unbox a
        => UPSegd                   -- ^ Descriptor whose distribution drives
                                    --   the work split (presumably the result's).
        -> UPSegd -> Vector a       -- ^ Descriptor and data of the first array.
        -> UPSegd -> Vector a       -- ^ Descriptor and data of the second array.
        -> Vector a
appendSUP_old segd !xd !xs !yd !ys
  = joinD theGang balanced
      (mapD (What "appendSUP_old/append") theGang appendChunk
            (UPSegd.takeDistributed segd))
  where
    -- A chunk is described by its local segment descriptor, the segment
    -- offset and the element offset at which it starts.
    appendChunk ((chunkSegd, segOff), elOff)
      = Seq.unstream
          (appendSegS_old (UPSegd.takeUSegd xd) xs
                          (UPSegd.takeUSegd yd) ys
                          (USegd.takeElements chunkSegd)
                          segOff elOff)
-- | Stream one contiguous slice of the segmented append of two arrays,
--   keeping the loop state in a boxed tuple.
--
--   Result segment @i@ is segment @i@ of @xs@ followed by segment @i@ of
--   @ys@. The stream starts in result segment @seg_off@, @el_off@ elements
--   into it, and produces exactly @n@ elements.
--
--   The state is @Nothing@ for an empty slice, otherwise
--   @Just (fromYs, seg, i, j, k, remaining)@ where @fromYs@ selects the active
--   source, @seg@ is the current segment, @i@ and @j@ are the next read
--   positions in @xs@ and @ys@, @k@ counts the elements left in the active
--   segment and @remaining@ the elements still to produce.
--
--   NOTE(review): the error-location tag used by the index helpers reads
--   @appendSegS@, not @appendSegS_old@; left as is because it is a runtime
--   string.
appendSegS_old
        :: Unbox a
        => USegd        -- ^ Segment descriptor of the first array.
        -> Vector a     -- ^ Data of the first array.
        -> USegd        -- ^ Segment descriptor of the second array.
        -> Vector a     -- ^ Data of the second array.
        -> Int          -- ^ Number of elements to produce.
        -> Int          -- ^ Index of the segment to start in.
        -> Int          -- ^ Element offset within that (appended) segment.
        -> S.Stream a
appendSegS_old !xd !xs !yd !ys !n seg_off el_off
  = Stream next state (Exact n)
  where
    !xlens = USegd.takeLengths xd
    !ylens = USegd.takeLengths yd

    -- Two copies of the same indexing function: these are point-free
    -- bindings, so each is used at a single type (index1 on the Int vectors
    -- of the descriptors, index2 on the element data).
    index1 = Seq.index (here "appendSegS")
    index2 = Seq.index (here "appendSegS")

    -- Initial state of the stream.
    state
      -- Nothing to produce; checked first so the descriptors are not indexed.
      | n == 0
      = Nothing

      -- The starting offset lies inside the xs part of the segment.
      | el_off < xlens `index1` seg_off
      = let i = (USegd.takeIndices xd `index1` seg_off) + el_off
            j = USegd.takeIndices yd `index1` seg_off
            -- Elements of this xs segment still to be read.
            k = (USegd.takeLengths xd `index1` seg_off) - el_off
        in  Just (False, seg_off, i, j, k, n)

      -- The starting offset lies inside the ys part of the segment.
      | otherwise
      = let -- Start of the next xs segment, computed from the current
            -- segment's start and length rather than by reading entry
            -- seg_off + 1, which may not exist.
            i       = (USegd.takeIndices xd `index1` seg_off)
                    + (USegd.takeLengths xd `index1` seg_off)
            -- Offset relative to the start of the ys segment.
            el_off' = el_off - (USegd.takeLengths xd `index1` seg_off)
            j       = (USegd.takeIndices yd `index1` seg_off) + el_off'
            -- Elements of this ys segment still to be read.
            k       = (USegd.takeLengths yd `index1` seg_off) - el_off'
        in  Just (True, seg_off, i, j, k, n)

    next Nothing = return Done

    -- Reading from xs.
    next (Just (False, seg, i, j, k, n'))
      | n' == 0   = return Done
      -- xs segment exhausted: switch to the ys segment with the same index.
      | k  == 0   = return $ Skip (Just (True, seg, i, j, ylens `index1` seg, n'))
      | otherwise = return $ Yield (xs `index2` i)
                                   (Just (False, seg, i + 1, j, k - 1, n' - 1))

    -- Reading from ys.
    next (Just (True, seg, i, j, k, n'))
      | n' == 0   = return Done
      -- ys segment exhausted: move on to the xs part of the next segment.
      | k  == 0
      = let !seg' = seg + 1
        in  return $ Skip (Just (False, seg', i, j, xlens `index1` seg', n'))
      | otherwise = return $ Yield (ys `index2` j)
                                   (Just (True, seg, i, j + 1, k - 1, n' - 1))
-- | Regular segmented fold, evaluated chunk-wise on the gang.
--
--   The number of segments is @length xs `div` segSize@. That count is split
--   across the gang, each share is scaled by @segSize@ to get an element
--   count, @xs@ is split by those counts, and every piece is folded with
--   @Seq.foldlRU f z segSize@ before the pieces are joined.
--
--   NOTE(review): @segSize@ is used as a divisor, so it is presumably
--   required to be non-zero; elements beyond the last whole segment do not
--   contribute to the segment count. Confirm against callers.
foldRUP :: (Unbox a, Unbox b) => (b -> a -> b) -> b -> Int -> Vector a -> Vector b
foldRUP f z !segSize xs
  = joinD theGang unbalanced
  . mapD (What "foldRUP/foldRU") theGang (Seq.foldlRU f z segSize)
  $ splitAsD theGang chunkLens xs
  where
    -- Whole segments available in the input.
    segCount  = Seq.length xs `div` segSize

    -- Segments assigned to each gang member.
    segsPer   = splitLenD theGang segCount

    -- The same assignment expressed in elements.
    chunkLens = mapD (What "foldRUP/segSize") theGang (* segSize) segsPer
-- | Regular segmented sum: 'foldRUP' specialised to addition, starting
--   from zero.
sumRUP :: (Num e, Unbox e) => Int -> Vector e -> Vector e
sumRUP segSize xs = foldRUP (+) 0 segSize xs