{-# LANGUAGE DeriveDataTypeable, BangPatterns, CPP #-}
module Data.Acid.Local
( openLocalState
, openLocalStateFrom
, openLocalStateWithSerialiser
, prepareLocalState
, prepareLocalStateFrom
, prepareLocalStateWithSerialiser
, defaultStateDirectory
, scheduleLocalUpdate'
, scheduleLocalColdUpdate'
, createCheckpointAndClose
, LocalState(..)
, Checkpoint(..)
, SerialisationLayer(..)
, defaultSerialisationLayer
, mkEventsLogKey
, mkCheckpointsLogKey
) where
import Data.Acid.Archive
import Data.Acid.Log as Log
import Data.Acid.Core
import Data.Acid.Common
import Data.Acid.Abstract
import Control.Concurrent ( newEmptyMVar, putMVar, takeMVar, MVar )
import Control.Exception ( onException, evaluate, Exception, throwIO )
import Control.Monad.State ( runState )
import Control.Monad ( join )
import Control.Applicative ( (<$>), (<*>) )
import Data.ByteString.Lazy ( ByteString )
import qualified Data.ByteString.Lazy as Lazy ( length )
import Data.Serialize ( runPutLazy, runGetLazy )
import Data.SafeCopy ( SafeCopy(..), safeGet, safePut
, primitive, contain )
import Data.Typeable ( Typeable, typeOf )
import Data.IORef
import System.FilePath ( (</>), takeDirectory )
import System.FileLock
import System.Directory ( createDirectoryIfMissing )
data LocalState st
= LocalState { localCore :: Core st
, localCopy :: IORef st
, localEvents :: FileLog (Tagged ByteString)
, localCheckpoints :: FileLog (Checkpoint st)
, localLock :: FileLock
} deriving (Typeable)
newtype StateIsLocked = StateIsLocked FilePath deriving (Show, Typeable)
instance Exception StateIsLocked
scheduleLocalUpdate :: UpdateEvent event => LocalState (EventState event) -> event -> IO (MVar (EventResult event))
scheduleLocalUpdate acidState event
= do mvar <- newEmptyMVar
let encoded = encodeMethod ms event
evaluate (Lazy.length encoded)
modifyCoreState_ (localCore acidState) $ \st ->
do let !(result, !st') = runState hotMethod st
pushEntry (localEvents acidState) (methodTag event, encoded) $ do writeIORef (localCopy acidState) st'
putMVar mvar result
return st'
return mvar
where (hotMethod, ms) = lookupHotMethodAndSerialiser (coreMethods (localCore acidState)) event
scheduleLocalUpdate' :: UpdateEvent event => LocalState (EventState event) -> event -> MVar (EventResult event) -> IO (IO ())
scheduleLocalUpdate' acidState event mvar
= do
let encoded = encodeMethod ms event
evaluate (Lazy.length encoded)
act <- modifyCoreState (localCore acidState) $ \st ->
do let !(result, !st') = runState hotMethod st
pushEntry (localEvents acidState) (methodTag event, encoded) $ return ()
let action = do writeIORef (localCopy acidState) st'
putMVar mvar result
return (st', action)
return act
where (hotMethod, ms) = lookupHotMethodAndSerialiser (coreMethods (localCore acidState)) event
scheduleLocalColdUpdate :: LocalState st -> Tagged ByteString -> IO (MVar ByteString)
scheduleLocalColdUpdate acidState event
= do mvar <- newEmptyMVar
modifyCoreState_ (localCore acidState) $ \st ->
do let !(result, !st') = runState coldMethod st
pushEntry (localEvents acidState) event $ do writeIORef (localCopy acidState) st'
putMVar mvar result
return st'
return mvar
where coldMethod = lookupColdMethod (localCore acidState) event
scheduleLocalColdUpdate' :: LocalState st -> Tagged ByteString -> MVar ByteString -> IO (IO ())
scheduleLocalColdUpdate' acidState event mvar
= do act <- modifyCoreState (localCore acidState) $ \st ->
do let !(result, !st') = runState coldMethod st
pushEntry (localEvents acidState) event $ return ()
let action = do writeIORef (localCopy acidState) st'
putMVar mvar result
return (st', action)
return act
where coldMethod = lookupColdMethod (localCore acidState) event
localQuery :: QueryEvent event => LocalState (EventState event) -> event -> IO (EventResult event)
localQuery acidState event
= do st <- readIORef (localCopy acidState)
let (result, _st) = runState hotMethod st
return result
where hotMethod = lookupHotMethod (coreMethods (localCore acidState)) event
localQueryCold :: LocalState st -> Tagged ByteString -> IO ByteString
localQueryCold acidState event
= do st <- readIORef (localCopy acidState)
let (result, _st) = runState coldMethod st
return result
where coldMethod = lookupColdMethod (localCore acidState) event
createLocalCheckpoint :: IsAcidic st => LocalState st -> IO ()
createLocalCheckpoint acidState
= do cutFileLog (localEvents acidState)
mvar <- newEmptyMVar
withCoreState (localCore acidState) $ \st ->
do eventId <- askCurrentEntryId (localEvents acidState)
pushAction (localEvents acidState) $
pushEntry (localCheckpoints acidState) (Checkpoint eventId st) (putMVar mvar ())
takeMVar mvar
createCheckpointAndClose :: (IsAcidic st, Typeable st) => AcidState st -> IO ()
createCheckpointAndClose abstract_state
= do mvar <- newEmptyMVar
closeCore' (localCore acidState) $ \st ->
do eventId <- askCurrentEntryId (localEvents acidState)
pushAction (localEvents acidState) $
pushEntry (localCheckpoints acidState) (Checkpoint eventId st) (putMVar mvar ())
takeMVar mvar
closeFileLog (localEvents acidState)
closeFileLog (localCheckpoints acidState)
unlockFile (localLock acidState)
where acidState = downcast abstract_state
data Checkpoint s = Checkpoint EntryId s
instance SafeCopy s => SafeCopy (Checkpoint s) where
kind = primitive
putCopy (Checkpoint eventEntryId content)
= contain $
do safePut eventEntryId
safePut (runPutLazy (safePut content))
getCopy = contain $ Checkpoint <$> safeGet <*> (fromNested <$> safeGet)
where
fromNested b = case runGetLazy safeGet b of
Left msg -> checkpointRestoreError msg
Right v -> v
errorTypeName s = "Checkpoint " ++ errorTypeName s
openLocalState :: (Typeable st, IsAcidic st, SafeCopy st)
=> st
-> IO (AcidState st)
openLocalState initialState =
openLocalStateFrom (defaultStateDirectory initialState) initialState
prepareLocalState :: (Typeable st, IsAcidic st, SafeCopy st)
=> st
-> IO (IO (AcidState st))
prepareLocalState initialState =
prepareLocalStateFrom (defaultStateDirectory initialState) initialState
defaultStateDirectory :: Typeable st => st -> FilePath
defaultStateDirectory initialState = "state" </> show (typeOf initialState)
openLocalStateFrom :: (IsAcidic st, SafeCopy st)
=> FilePath
-> st
-> IO (AcidState st)
openLocalStateFrom directory initialState =
openLocalStateWithSerialiser directory initialState defaultSerialisationLayer
openLocalStateWithSerialiser :: (IsAcidic st)
=> FilePath
-> st
-> SerialisationLayer st
-> IO (AcidState st)
openLocalStateWithSerialiser directory initialState serialisationLayer =
join $ resumeLocalStateFrom directory initialState False serialisationLayer
prepareLocalStateFrom :: (IsAcidic st, SafeCopy st)
=> FilePath
-> st
-> IO (IO (AcidState st))
prepareLocalStateFrom directory initialState =
prepareLocalStateWithSerialiser directory initialState defaultSerialisationLayer
prepareLocalStateWithSerialiser :: (IsAcidic st)
=> FilePath
-> st
-> SerialisationLayer st
-> IO (IO (AcidState st))
prepareLocalStateWithSerialiser directory initialState serialisationLayer =
resumeLocalStateFrom directory initialState True serialisationLayer
data SerialisationLayer st =
SerialisationLayer
{ checkpointSerialiser :: Serialiser (Checkpoint st)
, eventSerialiser :: Serialiser (Tagged ByteString)
, archiver :: Archiver
}
defaultSerialisationLayer :: SafeCopy st => SerialisationLayer st
defaultSerialisationLayer = SerialisationLayer safeCopySerialiser safeCopySerialiser defaultArchiver
mkEventsLogKey :: FilePath -> SerialisationLayer object -> LogKey (Tagged ByteString)
mkEventsLogKey directory serialisationLayer =
LogKey { logDirectory = directory
, logPrefix = "events"
, logSerialiser = eventSerialiser serialisationLayer
, logArchiver = archiver serialisationLayer }
mkCheckpointsLogKey :: FilePath -> SerialisationLayer object -> LogKey (Checkpoint object)
mkCheckpointsLogKey directory serialisationLayer =
LogKey { logDirectory = directory
, logPrefix = "checkpoints"
, logSerialiser = checkpointSerialiser serialisationLayer
, logArchiver = archiver serialisationLayer }
resumeLocalStateFrom :: (IsAcidic st)
=> FilePath
-> st
-> Bool
-> SerialisationLayer st
-> IO (IO (AcidState st))
resumeLocalStateFrom directory initialState delayLocking serialisationLayer =
case delayLocking of
True -> do
(n, st) <- loadCheckpoint
return $ do
lock <- maybeLockFile lockFile
replayEvents lock n st
False -> do
lock <- maybeLockFile lockFile
(n, st) <- loadCheckpoint `onException` unlockFile lock
return $ do
replayEvents lock n st
where
lockFile = directory </> "open.lock"
eventsLogKey = mkEventsLogKey directory serialisationLayer
checkpointsLogKey = mkCheckpointsLogKey directory serialisationLayer
loadCheckpoint = do
mbLastCheckpoint <- Log.newestEntry checkpointsLogKey
case mbLastCheckpoint of
Nothing ->
return (0, initialState)
Just (Checkpoint eventCutOff !val) ->
return (eventCutOff, val)
replayEvents lock n st = do
core <- mkCore (eventsToMethods acidEvents) st
eventsLog <- openFileLog eventsLogKey
events <- readEntriesFrom eventsLog n
mapM_ (runColdMethod core) events
ensureLeastEntryId eventsLog n
checkpointsLog <- openFileLog checkpointsLogKey
stateCopy <- newIORef undefined
withCoreState core (writeIORef stateCopy)
return $ toAcidState LocalState { localCore = core
, localCopy = stateCopy
, localEvents = eventsLog
, localCheckpoints = checkpointsLog
, localLock = lock
}
maybeLockFile path = do
createDirectoryIfMissing True (takeDirectory path)
maybe (throwIO (StateIsLocked path))
return =<< tryLockFile path Exclusive
checkpointRestoreError msg
= error $ "Could not parse saved checkpoint due to the following error: " ++ msg
closeLocalState :: LocalState st -> IO ()
closeLocalState acidState
= do closeCore (localCore acidState)
closeFileLog (localEvents acidState)
closeFileLog (localCheckpoints acidState)
unlockFile (localLock acidState)
createLocalArchive :: LocalState st -> IO ()
createLocalArchive state
= do
currentCheckpointId <- cutFileLog (localCheckpoints state)
let durableCheckpointId = currentCheckpointId-1
checkpoints <- readEntriesFrom (localCheckpoints state) durableCheckpointId
case checkpoints of
[] -> return ()
(Checkpoint entryId _content : _)
-> do
archiveFileLog (localEvents state) entryId
archiveFileLog (localCheckpoints state) durableCheckpointId
toAcidState :: IsAcidic st => LocalState st -> AcidState st
toAcidState local
= AcidState { _scheduleUpdate = scheduleLocalUpdate local
, scheduleColdUpdate = scheduleLocalColdUpdate local
, _query = localQuery local
, queryCold = localQueryCold local
, createCheckpoint = createLocalCheckpoint local
, createArchive = createLocalArchive local
, closeAcidState = closeLocalState local
, acidSubState = mkAnyState local
}