{-# OPTIONS -fglasgow-exts -XUndecidableInstances #-}
-------------------------------------------------
-- A transactional data cache with configurable persistence
-- (something like a little Java Hibernate or Rails for Ruby)
-- Author: Alberto Gómez Corona Nov 2006
-- Language: Haskell
-- Terms of use: See LICENSE
-- 2008:
--   some bugs fixed
-- 10/15/2007 : changes
--   Default writeResource and delResource for persistence in files
--     (only keyResource must be defined by the user if the defaults are used)
--   Coherent inserts and deletes
--   Reduced the number of accesses to the hashtable
--   hashtable access put outside of the transaction block (takeBlocks)
--   faster re-executions in case of roll-back
------------------------------------------------
module Data.TCache (
 IResource(..)        -- class interface to be implemented for the object by the user
,Resources(..)        -- data definition used to communicate object inserts and deletes to the cache
,resources            -- empty resources

,getTVars             -- :: (IResource a) => [a]        -- the list of resources to be retrieved
                      --    -> STM [Maybe (TVar a)]     -- the transactional variables
,releaseTVars         -- :: (IResource a) => [a] -> STM ()
,getTVarsIO           -- :: (IResource a) => [a] -> IO [TVar a]

,withSTMResources     -- :: (IResource a) => [a]        -- list of resources to retrieve
                      --    -> ([Maybe a] -> Resources a x) -- the function to apply, which returns a Resources structure
                      --    -> STM x                    -- return value within the STM monad

,withResources        -- :: (IResource a) => [a]        -- list of resources to be retrieved
                      --    -> ([Maybe a] -> [a])       -- function that gets the retrieved resources
                      --    -> IO ()                    -- and returns a list of objects to be inserted/modified

,withResource         -- :: (IResource a) => a          -- same as withResources, but for a single object
                      --    -> (Maybe a -> a)
                      --    -> IO ()

,getResources         -- :: (IResource a) => [a]        -- resources [a] are read from cache and returned
                      --    -> IO [Maybe a]

,getResource          -- :: (IResource a) => a          -- to retrieve one object instead of a list
                      --    -> IO (Maybe a)

,deleteResources      -- :: (IResource a) => [a] -> IO ()  -- delete the list of resources from cache and from persistent storage
,deleteResource       -- :: (IResource a) => a -> IO ()    -- delete the resource from cache and from persistent storage

-- cache handling
,Cache                -- :: IORef (Ht a, Integer)       -- the cache definition

,setCache             -- :: (Ht a, Integer) -> IO ()    -- set the cache. Useful for hot loaded modules that will use an existing cache

,newCache             -- :: IO (Ht a, Integer)          -- newCache creates a new cache

,refcache             -- :: Cache a                     -- the reference to the cache (see data definition below)

,syncCache            -- :: (IResource a) => Cache a -> IO ()
                      -- force the atomic write of all the cache objects into permanent storage
                      -- useful for termination

-- start the thread that cleans and writes to the persistent storage through clearSyncCache
,clearSyncCacheProc   -- :: (IResource a) => Cache a    -- the cache reference
                      --    -> Int                      -- number of seconds between checks
                      --    -> (Integer -> Integer -> Integer -> Bool)
                      --                                -- the user-defined check-for-cleanup-from-cache for each object
                      --                                -- (when True, the object is removed from cache)
                      --    -> Int                      -- the max number of objects in the cache; if more, the cleanup starts
                      --    -> IO ThreadId              -- identifier of the thread created

-- the default check procedure
,defaultCheck         -- :: Integer                     -- current time in seconds
                      --    -> Integer                  -- last access time for a given object
                      --    -> Integer                  -- last cache synchronization (with the persistent storage)
                      --    -> Bool                     -- True for all the elems not accessed since
                      --                                -- half the time between now and the last sync

-- auxiliary
,readFileStrict       -- :: String -> IO String         -- strict file read, needed for the default file persistence

) where

import GHC.Conc
import Control.Concurrent.STM.TMVar
import Control.Monad(when)
import Data.HashTable as H
import Data.IORef
import System.IO.Unsafe
import System.Time
import Data.Maybe(catMaybes,mapMaybe)
import Data.TCache.IResource
import Control.Exception(handle,assert)

-- A cache entry: the transactional variable holding the object, its last
-- access time and its last modification time.
type Block a = (TVar a, AccessTime, ModifTime)

-- The hashtable that maps resource keys to cache entries.
type Ht a = HashTable String (Block a)

-- Contains the hashtable and the last sync time.
type Cache a = IORef (Ht a, Integer)

-- How 'takeBlocks' treats an object that had to be read from storage:
-- register it with the current time, do not register it, or register it with
-- 'infinite' timestamps.
data CheckBlockFlags = AddToHash | NoAddToHash | MaxTime

-- | Set the cache. This is useful for hot loaded modules that will update an
-- existing cache. Experimental.
setCache :: (Ht a, Integer) -> IO ()
setCache = writeIORef refcache

-- | The cache holder, established by default.
--
-- The NOINLINE pragma is required: without it the compiler is free to inline
-- the 'unsafePerformIO' initializer at each use site, which would create more
-- than one cache.
--
-- NOTE(review): the reference is polymorphic in @a@, so nothing in the types
-- prevents it from being used at two different resource types - confirm that
-- callers stick to a single type.
{-# NOINLINE refcache #-}
refcache :: Cache a
refcache = unsafePerformIO $ newCache >>= newIORef

-- | 'newCache' creates a new, empty cache whose last sync time is 0.
-- Experimental.
newCache :: IO (Ht a, Integer)
newCache = do
  c <- H.new (==) hashString
  return (c, 0)

-- | 'getTVars' returns the TVars that wrap the resources for which the keys
-- are given. It returns @Nothing@ for a key when the object is neither in the
-- cache nor readable through 'readResource'.
--
-- These TVars can be used as usual in explicit user constructed atomic blocks.
-- Additionally, the retrieved TVars remain in the cache and can be accessed
-- and updated by the rest of the TCache methods.
-- To keep the consistency of the serialized data, the content of the TVars is
-- written every time the cache is synchronized with the storage until
-- 'releaseTVars' is called.
getTVars
  :: (IResource a)
  => [a]                    -- ^ the list of partial object definitions for which keyResource can be extracted
  -> STM [Maybe (TVar a)]   -- ^ the TVars that contain such objects
getTVars rs = do
  (cache, _) <- unsafeIOToSTM $ readIORef refcache
  takeBlocks rs cache MaxTime

-- | 'releaseTVars' permits the TVars captured by 'getTVars' to be released,
-- so they can be discarded when not used.
-- Do this when you no longer need to use them directly in atomic blocks.
releaseTVars :: (IResource a) => [a] -> STM ()
releaseTVars rs = do
  (cache, _) <- unsafeIOToSTM $ readIORef refcache
  releaseBlocks rs cache

-- | 'getTVarsIO' does not search for a TVar in the cache like 'getTVars'. Instead of this, 'getTVarsIO' creates a list of
-- TVars with the content given in the list of resources, adds these TVars to the cache and returns them.
-- The content of the TVars is written every time the cache is synchronized with the storage until 'releaseTVars' is called.
getTVarsIO :: (IResource a) => [a] -> IO [TVar a]
getTVarsIO rs = do
  tvs <- mapM newTVarIO rs
  (cache, _) <- readIORef refcache
  -- Register every new TVar under its key with 'infinite' access and
  -- modification times.
  mapM_ (\(tv, r) -> H.update cache (keyResource r) (tv, infinite, infinite)) $ zip tvs rs
  return tvs

-- | This is the main function for the *Resource calls. All the rest derive
-- from it. The results are kept in the STM monad so it can be part of a larger
-- STM transaction involving other TVars.
--
-- The 'Resources' register returned by the user-defined function is
-- interpreted as such:
--
-- 'toAdd': additional resources not read in the first parameter of
-- withSTMResources are created/updated with toAdd
--
-- 'toDelete': from the cache and from permanent storage
--
-- 'toReturn': will be returned by withSTMResources
--
-- When the user function returns 'Retry', the transaction is retried.
withSTMResources
  :: (IResource a)
  => [a]                            -- ^ the list of resources to be retrieved
  -> ([Maybe a] -> Resources a x)   -- ^ the function that processes the resources found and returns a Resources structure
  -> STM x                          -- ^ the return value in the STM monad
withSTMResources rs f = do
  (cache, _) <- unsafeIOToSTM $ readIORef refcache
  mtrs <- takeBlocks rs cache AddToHash
  mrs <- mapM mreadTVar mtrs
  case f mrs of
    Retry -> retry
    Resources adds dels ret -> do
      -- Deletions are applied to the hashtable and to storage first; the
      -- results of 'delResource' are not needed, hence mapM_.
      unsafeIOToSTM $ do
        delListFromHash cache $ map keyResource dels
        mapM_ delResource dels
      -- Then the created/updated objects are written into the cache.
      releaseBlocks adds cache
      return ret
  where
    -- Read the content of a TVar when there is one.
    mreadTVar (Just tvar) = readTVar tvar >>= return . Just
    mreadTVar Nothing = return Nothing

-- | Update of a single object in the cache.
--
-- @withResource r f= withResources [r] (\[mr]-> [f mr])@
withResource
  :: (IResource a)
  => a                -- ^ prototype of the object to be retrieved, for which keyResource can be derived
  -> (Maybe a -> a)   -- ^ update function that returns another full object
  -> IO ()
-- 'map' gives the same result as the partial lambda @\[mr] -> [f mr]@ for the
-- one-element list produced here, without an incomplete pattern.
withResource r f = withResources [r] (map f)

-- | To atomically add/modify many objects in the cache.
--
-- @ withResources rs f= atomically $ withSTMResources rs f1 >> return() where f1 mrs= let as= f mrs in Resources as [] ()@
withResources :: (IResource a) => [a] -> ([Maybe a] -> [a]) -> IO ()
withResources rs f = atomically $ withSTMResources rs f1 >> return ()
  where
    -- Everything returned by the user function is added; nothing is deleted.
    f1 mrs = Resources (f mrs) [] ()

-- | To read a resource from the cache.
--
-- @getResource r= do{mr<- getResources [r];return $! head mr}@
getResource :: (IResource a) => a -> IO (Maybe a)
getResource r = do
  mrs <- getResources [r]
  case mrs of
    (mr : _) -> return $! mr
    -- 'getResources' yields one entry per requested resource, so an empty
    -- result is not expected; it is handled to keep the function total.
    []       -> return Nothing

-- | To read a list of resources from the cache if they exist.
--
-- @getResources rs= atomically $ withSTMResources rs f1 where f1 mrs= Resources [] [] mrs@
getResources :: (IResource a) => [a] -> IO [Maybe a]
getResources rs = atomically $ withSTMResources rs f1
  where
    -- Nothing is added or deleted; the objects found are returned as they are.
    f1 mrs = Resources [] [] mrs

-- | Delete the resource from cache and from persistent storage.
--
-- @ deleteResource r= deleteResources [r] @
deleteResource :: IResource a => a -> IO ()
deleteResource r = deleteResources [r]

-- | Delete the list of resources from cache and from persistent storage.
--
-- @ deleteResources rs= atomically $ withSTMResources rs f1 where f1 mrs = Resources [] (catMaybes mrs) ()@
deleteResources :: IResource a => [a] -> IO ()
deleteResources rs = atomically (withSTMResources rs dropFound)
  where
    -- Only the objects that were actually found are scheduled for deletion.
    dropFound found = Resources [] (catMaybes found) ()

-- Fetch the TVar of every requested resource. A resource that is not in the
-- hashtable is read with 'readResource'; when that yields an object, a new
-- TVar is created and registered in the hashtable according to the flag.
takeBlocks :: (IResource a) => [a] -> Ht a -> CheckBlockFlags -> STM [Maybe (TVar a)]
takeBlocks rs cache addToHash = mapM (checkBlock cache addToHash) rs
  where
    checkBlock :: IResource a => Ht a -> CheckBlockFlags -> a -> STM (Maybe (TVar a))
    checkBlock table mode proto = do
      cached <- unsafeIOToSTM $ H.lookup table key
      case cached of
        Just (tv, _, _) -> return (Just tv)
        Nothing -> do
          stored <- unsafeIOToSTM $ readResource proto
          case stored of
            Nothing  -> return Nothing
            Just obj -> do
              tv <- newTVar obj
              register tv
              return (Just tv)
      where
        key = keyResource proto

        -- Store the freshly created TVar in the hashtable, unless the flag
        -- asks not to.
        register tv = case mode of
          NoAddToHash -> return ()
          AddToHash -> do
            now <- unsafeIOToSTM timeInteger
            -- accessed now, not modified
            _ <- unsafeIOToSTM $ H.update table key (tv, now, 0)
            return ()
          MaxTime -> do
            -- both timestamps set to 'infinite'
            _ <- unsafeIOToSTM $ H.update table key (tv, infinite, infinite)
            return ()

-- Write the given objects into the cache: an object without an entry gets a
-- new TVar stamped with the current time; an existing entry has its TVar
-- overwritten and both timestamps set to the larger of the current time and
-- the previous modification time.
--
-- NOTE(review): because of the 'max', an entry whose modification time is
-- 'infinite' presumably keeps that timestamp after this call - confirm against
-- the definition of 'infinite' and the intent of 'releaseTVars'.
releaseBlocks :: (IResource a) => [a] -> Ht a -> STM ()
releaseBlocks rs cache = mapM_ putBlock rs
  where
    putBlock obj = do
      found <- unsafeIOToSTM $ H.lookup cache key
      case found of
        Just (tv, _, modified) -> do
          writeTVar tv obj
          now <- unsafeIOToSTM timeInteger
          store tv (max now modified)
        Nothing -> do
          tv <- newTVar obj
          now <- unsafeIOToSTM timeInteger
          store tv now            -- accessed and modified
      where
        key = keyResource obj
        -- Same stamp for the access and the modification time.
        store tv stamp = unsafeIOToSTM $ H.update cache key (tv, stamp, stamp)

-- The seconds component of the current clock time.
timeInteger :: IO Integer
timeInteger = do
  TOD secs _ <- getClockTime
  return secs

-- Remove every key of the list from the hashtable.
delListFromHash :: HashTable key val -> [key] -> IO ()
delListFromHash table keys = mapM_ (delete table) keys

-- Insert or overwrite every (key, value) pair of the list in the hashtable.
updateListToHash :: HashTable key val -> [(key, val)] -> IO [Bool]
updateListToHash table pairs = mapM put pairs
  where
    put (k, v) = update table k v

{-| Cache handling -}

-- | Start the thread that clean and writes on the persistent storage.
-- Otherwise, 'clearSyncCache' must be invoked explicitly or no persistence will exist.
clearSyncCacheProc
  :: (IResource a)
  => Cache a                                   -- ^ the cache reference ('refcache' usually)
  -> Int                                       -- ^ number of seconds between checks. Objects not written to disk are written
  -> (Integer -> Integer -> Integer -> Bool)   -- ^ the user-defined check-for-cleanup-from-cache for each object. 'defaultCheck' is an example
  -> Int                                       -- ^ the max number of objects in the cache; if more, the cleanup starts
  -> IO ThreadId                               -- ^ identifier of the thread created
clearSyncCacheProc refcache time check sizeObjects = forkIO clear
  where
    -- Sleep for the requested number of seconds, run one sync/cleanup pass,
    -- and start over.
    clear = do
      threadDelay (fromIntegral $ time * 1000000)
      clearSyncCache refcache time check sizeObjects
      clear

-- Global flag that is True while a save is in progress; 'syncCache' and
-- 'clearSyncCache' wait on it so that only one of them runs at a time.
--
-- The NOINLINE pragma is required: without it the compiler is free to inline
-- the 'unsafePerformIO' initializer, which would give each use site its own
-- TVar and defeat the mutual exclusion.
{-# NOINLINE saving #-}
saving :: TVar Bool
saving = unsafePerformIO $ newTVarIO False

-- | Force the atomic write of all the cached objects into permanent storage.
-- Useful for termination.
syncCache
  :: (IResource a)
  => Cache a   -- ^ the cache reference ('refcache' usually)
  -> IO ()
syncCache refcache = do
  -- Wait until no other save is running, then take the flag.
  atomically $ do
    s <- readTVar saving
    when s retry
    writeTVar saving True
  -- NOTE(review): the flag is only cleared by 'save'. If anything below
  -- throws, 'saving' stays True and every later sync blocks; releasing it on
  -- failure needs an exception combinator that this module does not import.
  (cache, t1) <- readIORef refcache
  -- The new sync time is read before the snapshot is taken, so an object
  -- modified after the snapshot always carries a modification time >= the
  -- stored sync time and is picked up by the next save.
  t2 <- timeInteger
  list <- toList cache
  atomically $ save list t1
  writeIORef refcache (cache, t2)
  --print $ "write to persistent storage finished: "++ show (length list)++ " objects"

-- Saves the unsaved elems of the cache
-- delete some elems of the cache when the number of elems > sizeObjects
-- The deletion depends on the check criteria.
-- 'defaultCheck' is the one implemented.
--
-- The second argument (the interval in seconds) is not used by this function.
clearSyncCache :: (IResource a) => Cache a -> Int -> (Integer -> Integer -> Integer -> Bool) -> Int -> IO ()
clearSyncCache refcache time check sizeObjects = do
  -- Wait until no other save is running, then take the flag.
  atomically $ do
    s <- readTVar saving
    when s retry
    writeTVar saving True
  (cache, lastSync) <- readIORef refcache
  handle releaseOnError $ do
    -- The new sync time is read before the snapshot and the save (as
    -- 'syncCache' reads it before saving): an object modified while the save
    -- runs then has a modification time >= the stored sync time and is
    -- written by the next pass instead of being skipped.
    t <- timeInteger
    elems <- toList cache
    let size = length elems
    atomically $ save elems lastSync
    when (size > sizeObjects) (filtercache t cache lastSync elems)
    writeIORef refcache (cache, t)
  where
    -- Report the failure and clear the flag: 'save' only clears it when its
    -- transaction commits, so without this a single failed pass would make
    -- every later sync wait forever.
    releaseOnError e = do
      print e
      atomically $ writeTVar saving False

    -- Delete elems from the cache according to the check criteria.
    filtercache t cache lastSync elems = mapM_ evict elems
      where
        check1 (_, lastAccess, _) = check t lastAccess lastSync

        evict :: (String, Block a) -> IO ()
        evict (k, e) = when (check1 e) (H.delete cache k)

-- | To drop from the cache all the elems not accessed since half the time
-- between now and the last sync.
-- This is a default cache clearance procedure;
-- it is invoked when the cache size exceeds the one defined in 'clearSyncCacheProc'.
defaultCheck
  :: Integer   -- ^ current time in seconds
  -> Integer   -- ^ last access time for a given object
  -> Integer   -- ^ last cache synchronization (with the persistent storage)
  -> Bool      -- ^ True for all the elems not accessed since half the time between now and the last sync
defaultCheck now lastAccess lastSync = lastAccess <= halftime
  where
    -- Midpoint between the last sync and now.
    halftime = now - (now - lastSync) `div` 2

-- Write to storage every listed object whose modification time is not older
-- than the last save, then clear the 'saving' flag. Everything runs in one
-- transaction.
save :: (IResource a) => [(String, Block a)] -> Integer -> STM ()
save list lastSave = do
  mapM_ save1 list    -- `debug` ("saving "++ (show $ length list))
  writeTVar saving False
  where
    save1 :: IResource a => (String, Block a) -> STM ()
    save1 (_, (tvr, _, modTime)) =
      when (modTime >= lastSave) $ do    -- `debug` ("modTime="++show modTime++"lastSave="++show lastSave)
        r <- readTVar tvr
        unsafeIOToSTM $! writeResource r    -- `debug` ("saved " ++ keyResource r)