module MultiSetRewrite.StoreRepresentation where
import IO
import Data.IORef
import Control.Concurrent
import Control.Concurrent.STM
import System.IO.Unsafe
import Data.List
import qualified MultiSetRewrite.ConcurrentList as L
import qualified MultiSetRewrite.ConcurrentBag as B
import MultiSetRewrite.Base
import MultiSetRewrite.RuleSyntax
import MultiSetRewrite.RuleCompiler
-- | Index produced by a 'HashOp'; it selects one of the bags in a store's
--   'msgTable'.
type HashIdx = Int
-- | Hashing scheme used to partition the messages of a store.
data HashOp msg = HashOp
  { numberOfTables :: HashIdx
    -- ^ How many bags 'newStore' allocates for the store.
  , hashMsg        :: msg -> HashIdx
    -- ^ Maps a message to the index of the bag it is kept in.
  }
-- | A message store: a list of concurrent bags together with the hashing
--   scheme that decides which bag a message belongs to.
data Store msg = Store
  { msgTable :: [B.Bag (InternalMsg msg)]
    -- ^ The bags; 'newStore' creates 'numberOfTables' of them.
  , hashOp   :: HashOp msg
    -- ^ Hashing scheme consulted by 'addMsg' and by the rule compiler.
  }
-- | Handle to a stored message: a mutable reference to the concurrent-list
--   node that holds it. 'addMsg' returns one; the 'RuleCompiler' instance
--   reads it and overwrites it when a message is deleted.
type Location msg = IORef (L.List (InternalMsg msg))
-- | Modulus used to map the calling thread's numeric id to an entry index
--   inside a bag (see 'addMsg' and the instance's @initSearch@).
distribution :: Int
distribution = 7
-- | 'RuleCompiler' instance for the hash-partitioned 'Store'.
--   Message locations are 'Location's, search indices are 'HashIdx' values
--   and the search state is a bag iterator.
instance (EMatch msg, Eq msg, Show msg) =>
         RuleCompiler (Store msg)
                      msg
                      (Location msg)
                      HashIdx
                      (B.Iterator (InternalMsg msg)) where

  -- Commit step for a candidate match.
  --
  -- 'checks' pairs each matched location (tagged 'Verify' or
  -- 'VerifyAndDelete') with a continuation to run if that location turns
  -- out to be stale.
  --
  --  1. Read every node and collect its 'L.verify' flag (a @TVar Bool@).
  --  2. In a single STM transaction, check that all flags are True.  If so,
  --     set the 'VerifyAndDelete' flags to False and yield an action that
  --     marks those nodes as 'L.DelNode' and then runs 'body' (wrapped in
  --     'Just').  If some flag is False the transaction calls 'retry',
  --     which makes 'orElse' fall through to the alternative.
  --  3. The alternative yields 'pickCont', which runs the continuation of
  --     the first check whose flag is False.
  atomicVerifyAndDeleteCnt _ body checks =
    do flags <- mapM flagOf checks
       next <- atomically $
                 (do ok <- reVerify flags
                     if ok
                       then do setFalse flags
                               return (return (Just (do {accDeletes checks; body})))
                       else retry)
                 `orElse`
                 return (pickCont checks)
       next
    where
      -- Replace the location inside a check by that node's verify flag,
      -- keeping the Verify / VerifyAndDelete tag.
      flagOf (m, _) =
        case m of
          Verify ref ->
            do node <- readIORef ref
               return (Verify (L.verify node))
          VerifyAndDelete ref ->
            do node <- readIORef ref
               return (VerifyAndDelete (L.verify node))

      -- True iff every collected flag currently reads True.
      reVerify [] = return True
      reVerify (n:ns) =
        do let flag = case n of
                        Verify v          -> v
                        VerifyAndDelete v -> v
           b <- readTVar flag
           if b
             then reVerify ns
             else return False

      -- Set the flags of all 'VerifyAndDelete' entries to False;
      -- 'Verify' entries are left untouched.
      setFalse [] = return ()
      setFalse (Verify _ : ns) = setFalse ns
      setFalse (VerifyAndDelete v : ns) =
        do writeTVar v False
           setFalse ns

      -- Overwrite the node of every 'VerifyAndDelete' location with a
      -- 'L.DelNode' that keeps the old verify flag and next pointer.
      accDeletes [] = return ()
      accDeletes ((Verify _, _) : ns) = accDeletes ns
      accDeletes ((VerifyAndDelete ref, _) : ns) =
        do node <- readIORef ref
           writeIORef ref (L.DelNode { L.verify = L.verify node
                                     , L.next   = L.next node })
           accDeletes ns

      -- 'Just cnt' if the node's flag reads False, 'Nothing' otherwise.
      -- The flag is read in its own transaction.
      checkNode (ref, cnt) =
        do node <- readIORef ref
           b <- atomically $ readTVar (L.verify node)
           if b
             then return Nothing
             else return (Just cnt)

      -- Run the continuation of the first check whose flag reads False.
      pickCont [] = error "pickCont: impossible, there must be a failed node"
      pickCont (c:rest) =
        do let entry = case c of
                         (Verify ref, cnt)          -> (ref, cnt)
                         (VerifyAndDelete ref, cnt) -> (ref, cnt)
           res <- checkNode entry
           case res of
             Nothing  -> pickCont rest
             Just cnt -> cnt

  -- Hash a message with the store's 'HashOp'.
  getIndex act m = return (hashMsg (hashOp act) m)

  -- Start iterating over the bag selected by 'idx'; the starting entry
  -- inside the bag is derived from the calling thread's id.
  --
  -- NOTE(review): the expression previously read @(idx 1)@, which applies
  -- an Int as a function.  It is reconstructed here as a 1-based index
  -- (@idx - 1@); confirm that 'hashMsg' yields values in
  -- 1..'numberOfTables'.
  initSearch act idx =
    do let bag = msgTable act !! (idx - 1)
       t_id <- myThreadId
       let bagEntryIdx = threadIdToInt t_id `mod` distribution
       B.newIterator bag bagEntryIdx

  -- Advance the bag iterator; the store argument is not used.
  nextMsg _ curIterator = B.iterateBag curIterator

  -- The message held at a location, or Nothing when the node is anything
  -- other than a 'L.Node'.
  extractMsg ptr =
    do node <- readIORef ptr
       case node of
         L.Node {} -> return (Just (L.val node))
         _         -> return Nothing

  -- Render the message stored at a location with 'show'.
  -- NOTE(review): reads 'L.val' without checking the node's constructor;
  -- confirm callers only pass locations of nodes that carry a value.
  printMsg _ ptr =
    do m <- readIORef ptr
       return (show (L.val m))

  -- Not implemented.
  printReachMsg _ it = error "TODO"
-- | Recover the number embedded in the 'show' rendering of a thread id.
--
--   The first 9 characters of the rendering are skipped (the length of the
--   @"ThreadId "@ prefix printed by GHC's 'Show' instance for 'ThreadId')
--   and the remainder is parsed as an 'Int'.  The signature is the type
--   that was previously inferred, so existing callers are unaffected.
--
--   Calls 'error' with the offending rendering when the remainder is not
--   exactly one integer (optionally followed by white space).
threadIdToInt :: Show a => a -> Int
threadIdToInt x =
  case [n | (n, rest) <- reads digits, ("", "") <- lex rest] of
    [n] -> n
    _   -> error ("threadIdToInt: cannot parse a number from " ++ show rendered)
  where
    rendered = show x
    digits   = drop 9 rendered
-- | Create an empty store with one bag per hash table
--   ('numberOfTables' of the given 'HashOp').
--
--   NOTE(review): the bag size argument previously read @distribution1@,
--   an identifier that is not defined in this module.  It is reconstructed
--   as @distribution - 1@, on the assumption that 'B.newBag' takes the
--   highest entry index (entry indices used elsewhere in this module are
--   @threadId `mod` distribution@, i.e. 0..distribution-1).  Confirm
--   against 'B.newBag'.
newStore :: (EMatch msg, Eq msg, Show msg) =>
            HashOp msg -> IO (Store msg)
newStore hash =
  do mt <- mapM (\ _ -> B.newBag (distribution - 1)) [1 .. numberOfTables hash]
     return Store { msgTable = mt
                  , hashOp   = hash }
-- | Compile a rule program into a flat list of clauses.
--
--   Each rule is a pair of match tasks and a right-hand side; every rule is
--   passed to 'compileCnt' and the resulting clause lists are concatenated
--   in program order.
compileRulePattern :: (EMatch msg, Eq msg, Show msg) =>
                      [([MatchTask msg], Code_RHS ())]
                   -> [CompClause (Store msg) (Location msg) ()]
compileRulePattern = concatMap compileRule
  where
    compileRule (tasks, body) = compileCnt body tasks
-- | Insert a message into the store and return its location.
--
--   The message is wrapped in an 'InternalMsg' carrying a fresh tag from
--   'newTag', the target bag is chosen by hashing the message, and the
--   entry inside the bag is derived from the calling thread's id.
--
--   NOTE(review): the bag index previously read @(hashIdx 1)@, which
--   applies an Int as a function.  It is reconstructed here as a 1-based
--   index (@hashIdx - 1@); confirm that 'hashMsg' yields values in
--   1..'numberOfTables'.
addMsg :: (EMatch msg, Eq msg, Show msg) =>
          Store msg -> msg -> IO (Location msg)
addMsg (Store {msgTable = mt, hashOp = hash}) msg =
  do new_tag <- newTag
     let new_msg = InternalMsg { message = msg
                               , msg_tag = new_tag }
         bag     = mt !! (hashMsg hash msg - 1)
     t_id <- myThreadId
     let bagEntryIdx = threadIdToInt t_id `mod` distribution
     B.addToBag bag bagEntryIdx new_msg
-- | Run the compiled clauses against an active message.
--
--   This is a thin wrapper: the store, the location of the active message
--   and the clause list are handed unchanged to 'select', whose result is
--   returned as is.
executeRules :: (EMatch msg, Eq msg, Show msg) =>
                Store msg
             -> Location msg
             -> [CompClause (Store msg) (Location msg) ()]
             -> IO (Maybe (Code_RHS ()))
executeRules = select