module Control.Parallel.Eden.ParPrimConcHs
(noPe, selfPe
, ChanName'
, fork
, createC
, connectToPort
, sendData
, Mode(..)
, simInitPes
)
where
import GHC.Base(unsafeCoerce# )
import qualified Data.Map as Map
import Data.Map(Map)
import System.IO.Unsafe
import Control.Concurrent
import GHC.Conc(numCapabilities)
import Control.DeepSeq
-- | Debug output hook.  When the CPP symbol TRACE is defined, the message is
-- printed prefixed with the caller's (PE, process id, thread id) triple;
-- otherwise the call does nothing.
trace :: String -> IO ()
#ifdef TRACE
trace msg = do
    self <- myThreadId
    (pe, pid, _) <- myInfo
    putStrLn (shows (pe, pid, self) msg)
#else
trace = const (return ())
#endif
-- | Reinterpret an arbitrary value as an @IO ()@ action by way of 'cast'.
-- The caller is responsible for the value really being such an action; the
-- error branch only fires if 'cast' yields 'Nothing'.
toIO :: a -> IO ()
toIO payload = maybe (error "IO? wrong cast") id (cast payload)
-- | Unchecked cast between any two types.  The result is always 'Just'; no
-- type check is performed, the value is simply coerced.
cast :: a -> Maybe b
cast value = Just $ unsafeCoerce# value
-- | Erase the type of a value so it can be stored in a channel variable.
-- This is an unchecked coercion; the inverse is 'fromDyn'.
toDyn :: a -> Untyped
toDyn = \payload -> unsafeCoerce# payload
-- | Recover a typed value from an 'Untyped' one by unchecked coercion.
-- The first argument only fixes the result type and is never evaluated.
fromDyn :: a -> Untyped -> a
fromDyn witness boxed = unsafeCoerce# boxed `asTypeOf` witness
-- | Global counter backing 'freshId'; starts at 1.
--
-- Created with 'unsafePerformIO', so it must never be inlined: inlining
-- could create several independent counters.  The NOINLINE pragma
-- guarantees a single shared 'MVar'.
idSupply :: MVar Int
idSupply = unsafePerformIO (newMVar 1)
{-# NOINLINE idSupply #-}
-- | Hand out the next unused identifier from 'idSupply'.
--
-- Uses 'modifyMVar' so the supply is put back even if the thread is
-- interrupted between taking and refilling it.  The incremented counter is
-- forced before it is stored, so no chain of @(+1)@ thunks builds up.
freshId :: IO Int
freshId = modifyMVar idSupply $ \current ->
    let next = current + 1
    in next `seq` return (next, current)
-- | Per-thread bookkeeping: (PE number, process id, id of the channel the
-- thread is currently connected to, if any).
type ThreadInfo = (Int,Int,Maybe Int)
-- | Global registry mapping every known thread to its 'ThreadInfo'.
--
-- The thread that first forces this value is registered as PE 1,
-- process 1, not connected to any channel.  NOINLINE is required because
-- the registry is created with 'unsafePerformIO' and must exist only once.
thrs :: MVar (Map ThreadId ThreadInfo)
thrs = unsafePerformIO $ do
    self <- myThreadId
    newMVar (Map.singleton self (1, 1, Nothing))
{-# NOINLINE thrs #-}
-- | Look up the calling thread's entry in the thread registry.
-- Fails with an error naming the thread if it was never registered.
myInfo :: IO ThreadInfo
myInfo = do
    self <- myThreadId
    registry <- readMVar thrs
    maybe (error (show self ++ " not found!")) return (Map.lookup self registry)
-- | Id of the channel the calling thread is connected to.
-- Fails with an error naming the thread if it has no connection.
myChan :: IO Int
myChan = myInfo >>= \(_, _, connected) -> requireChan connected
  where
    requireChan (Just chanId) = return chanId
    requireChan Nothing = do
        self <- myThreadId
        error (show self ++ " not connected!")
-- | Drop a thread from the thread registry (after emitting a trace line).
removeThread :: ThreadId -> IO ()
removeThread tid = do
    trace ("Kill " ++ show tid)
    takeMVar thrs >>= putMVar thrs . Map.delete tid
-- | Placeholder type for type-erased channel contents.  Values are coerced
-- to and from this type with 'toDyn' and 'fromDyn'; the unit type itself
-- carries no meaning.
type Untyped = ()
-- | Global channel table: channel id -> (registered sender thread, if any;
-- the variable the next message is written to).
--
-- Created with 'unsafePerformIO'; the NOINLINE pragma keeps the table from
-- being duplicated by inlining.
chs :: MVar (Map Int (Maybe ThreadId, MVar Untyped))
chs = unsafePerformIO (newMVar Map.empty)
{-# NOINLINE chs #-}
-- | Record the calling thread as the sender of the given channel.
--
-- Succeeds if the channel has no sender yet or the caller already is the
-- sender.  Fails with an error if the channel is unknown or another thread
-- is registered.
--
-- The table is updated through 'modifyMVar_', so it is restored on the
-- error paths instead of staying empty and blocking every other thread.
registerSender :: Int -> IO ()
registerSender cid = do
    tid <- myThreadId
    modifyMVar_ chs $ \cMap ->
        case Map.lookup cid cMap of
            Nothing -> error $ "missing MVar for Id " ++ show cid
            Just (owner, var)
                | owner == Nothing || owner == Just tid ->
                    return (Map.insert cid (Just tid, var) cMap)
                | otherwise ->
                    error ("duplicate connect message: "
                           ++ show tid ++ "->"
                           ++ show cid)
-- | Remove a channel from the channel table and return its variable.
--
-- Fails with an error if the channel id is unknown.  Because the table is
-- accessed through 'modifyMVar', it is put back unchanged in that case
-- rather than being left empty.
getRemoveCVar :: Int -> IO (MVar Untyped)
getRemoveCVar cid = modifyMVar chs $ \cMap ->
    case Map.lookup cid cMap of
        Nothing -> error ("missing MVar for Id "
                          ++ show cid)
        Just (_, var) -> return (Map.delete cid cMap, var)
-- | Replace the variable stored for a channel and return the previous one.
-- The calling thread becomes (or stays) the registered sender.
--
-- Fails with an error if the channel id is unknown or a different thread is
-- already registered as sender.  The table is accessed through
-- 'modifyMVar', so it is restored on those error paths.
updateGetCVar :: MVar Untyped -> Int -> IO (MVar Untyped )
updateGetCVar newVar cid = do
    tid <- myThreadId
    modifyMVar chs $ \cMap ->
        case Map.lookup cid cMap of
            Nothing -> error $ "missing MVar for Id " ++ show cid
            Just (owner, var)
                | owner == Nothing || owner == Just tid ->
                    return (Map.insert cid (Just tid, newVar) cMap, var)
                | otherwise -> error "1:1 restriction violated"
-- | Global placement state.
--
-- The list has one entry per simulated PE; entry @i@ (1-based) is the PE on
-- which the next process instantiated from PE @i@ is placed (see
-- 'choosePe').  Its length is the number of PEs: 4 when only one
-- capability is available, otherwise the number of capabilities.
--
-- The unit component is a guard against repeated initialisation:
-- 'simInitPes' forces it and then replaces it with an error value.
--
-- Created with 'unsafePerformIO'; NOINLINE keeps the state unique.
pesVar :: MVar ([Int],())
pesVar = unsafePerformIO (newMVar (placementList,()))
  where
    placementList = leftrotate peNums
    peNums = if numCapabilities == 1
               then [1..4]
               else [1..numCapabilities]
{-# NOINLINE pesVar #-}
-- | Rotate a list one position to the left: the head moves to the end.
-- The empty list is returned unchanged.
leftrotate :: [a] -> [a]
leftrotate items = drop 1 items ++ take 1 items
-- | Set the number of simulated PEs.  May be called at most once; a second
-- call fails with @double simInitPes@.  Requesting fewer than one PE is an
-- error.
--
-- The placement list is initialised to @leftrotate [1..pes]@, the same
-- shape the default 'pesVar' uses, so every entry names an existing PE and
-- the last PE wraps around to PE 1.  (The former @[2..pes+1]@ pointed the
-- last PE at the nonexistent PE @pes+1@.)
--
-- 'modifyMVar_' puts the previous state back when the double-init guard
-- fires, so later readers of 'pesVar' are not blocked.
simInitPes :: Int -> IO ()
simInitPes pes
    | pes < 1 = error "invalid number of PEs requested"
    | otherwise = modifyMVar_ pesVar $ \(_, test) -> do
        trace ("Init. with " ++ show pes ++ " PEs.")
        -- Forcing the guard raises the error on a repeated initialisation.
        test `seq`
            return (leftrotate [1 .. pes], error "double simInitPes")
choosePe :: IO Int
choosePe = do pe <- selfPe
trace "choosing PE"
(list,test) <- takeMVar pesVar
let place = list!!(pe1)
pes = length list
new = if place == pes then 1 else place+1
newList = take (pe1) list ++ new:drop pe list
putMVar pesVar (newList,test)
trace "chosen"
return place
-- | Number of simulated PEs, i.e. the length of the placement list.
noPe :: IO Int
noPe = readMVar pesVar >>= \(placement, _) -> return (length placement)
-- | PE number recorded for the calling thread.
selfPe :: IO Int
selfPe = myInfo >>= \(pe, _, _) -> return pe
-- | Name of a channel.  The fields are, in order, the PE and process id of
-- the thread that created the channel and the channel id (see 'createC').
-- The type parameter is phantom: it does not occur in the fields.
data ChanName' a = Chan Int Int Int
    deriving (Show)
-- | Run an action in a new thread belonging to the caller's PE and process.
--
-- The thread registry is held while the thread is forked and is released
-- only after the new thread has been entered into it.  The new thread
-- removes its own entry once the action has finished.
fork :: IO () -> IO ()
fork action = do
    (pe, pid, _) <- myInfo
    trace ("new thread")
    registry <- takeMVar thrs
    child <- forkIO wrapped
    putMVar thrs (Map.insert child (pe, pid, Nothing) registry)
    trace ("forked! ID=" ++ show child)
  where
    wrapped = do
        self <- myThreadId
        trace ("run thread " ++ show self)
        action >> removeThread self
-- | Create a new channel owned by the calling thread.
--
-- Returns the channel's name together with a lazy placeholder for the
-- value that will arrive on it: evaluating the placeholder reads the
-- channel's variable, so it blocks until something has been written.
createC :: IO ( ChanName' a, a )
createC = do
    (!pe, !pid, _) <- myInfo
    !cid <- freshId
    slot <- newEmptyMVar
    trace ("new channel in " ++ show (pe,pid) ++ ", ID=" ++ show cid)
    registry <- takeMVar chs
    putMVar chs (Map.insert cid (Nothing, slot) registry)
    trace "channel created!"
    let received = fromDyn (error "createC cast")
                           (unsafePerformIO (readMVar slot))
    return (Chan pe pid cid, received)
-- | Connect the calling thread to the given channel by storing the channel
-- id in the thread's registry entry.  Only the channel id of the name is
-- used.  A thread without a registry entry is left unregistered.
connectToPort :: ChanName' a -> IO ()
connectToPort (Chan _ _ cid) = do
    self <- myThreadId
    registry <- takeMVar thrs
    putMVar thrs (Map.update attach self registry)
  where
    attach (pe, pid, _) = Just (pe, pid, Just cid)
-- | What 'sendData' should do with its payload.
data Mode = Connect         -- ^ register the caller as sender of its channel; payload ignored
    | Data                  -- ^ write the payload to the channel and remove the channel
    | Stream                -- ^ write the payload as the next list element; the channel stays open for the tail
    | Instantiate Int       -- ^ run the payload as a new process on the given PE (0: let the placement list decide)
-- | Send a value on the channel the calling thread is connected to, or
-- instantiate a new process, depending on the 'Mode'.
--
-- * 'Connect': register the caller as the channel's sender; the payload is
--   ignored.
--
-- * 'Data': take the channel out of the channel table and fill its
--   variable with the payload.
--
-- * 'Stream': fill the channel's current variable with a list cell whose
--   head is the payload and whose tail is a lazy read of a fresh variable;
--   the fresh variable becomes the channel's current one.
--
-- * 'Instantiate': treat the payload as an @IO ()@ action and run it in a
--   new thread registered as a new process.  A PE argument of 0 asks
--   'choosePe' for a placement; any other value is mapped into the range
--   1 to 'noPe' with @1 + ((pe - 1) `mod` noPe)@.
sendData :: Mode -> a -> IO ()
sendData Connect _ = do
    ch <- myChan
    registerSender ch
sendData Data d = do
    cd <- myChan
    var <- getRemoveCVar cd
    putMVar var $ toDyn d
sendData Stream d = do
    cd <- myChan
    v2 <- newEmptyMVar
    var <- updateGetCVar v2 cd
    -- The tail is only read from v2 when the receiver demands it.
    let x = unsafePerformIO $ readMVar v2
        newList = d : fromDyn undefined x
    putMVar var $ toDyn newList
sendData (Instantiate maybePe) d = do
    newPid <- freshId
    pes <- noPe
    pe <- if maybePe == 0
            then choosePe
            else return (1 + ((maybePe - 1) `mod` pes))
    trace ("new process on PE " ++ show pe)
    -- Hold the registry while forking so the new thread is registered
    -- before it can look itself up.
    tlist <- takeMVar thrs
    tid <- forkIO action
    putMVar thrs (Map.insert tid (pe, newPid, Nothing) tlist)
    trace ("process,thread: " ++ show (newPid, tid))
  where
    action = do
        self <- myThreadId
        trace ("process starting")
        toIO d
        removeThread self