module Database.Haskey.Alloc.Concurrent.Database where
import Control.Applicative ((<$>))
import Control.Concurrent.STM
import Control.Monad (void, unless)
import Control.Monad.IO.Class
import Control.Monad.Catch (MonadCatch, MonadMask, SomeException,
catch, mask, onException, bracket, bracket_)
import Control.Monad.State
import Control.Monad.Trans (lift)
import Data.Proxy (Proxy(..))
import Data.List.NonEmpty (NonEmpty((:|)))
import Data.Maybe (fromMaybe)
import STMContainers.Map (Map)
import qualified STMContainers.Map as Map
import Data.BTree.Alloc.Class
import Data.BTree.Impure
import Data.BTree.Primitives
import Database.Haskey.Alloc.Concurrent.Environment
import Database.Haskey.Alloc.Concurrent.FreePages.Save
import Database.Haskey.Alloc.Concurrent.Meta
import Database.Haskey.Alloc.Concurrent.Monad
import Database.Haskey.Alloc.Concurrent.Overflow
import Database.Haskey.Alloc.Transaction
import Database.Haskey.Store
import Database.Haskey.Utils.RLock
import qualified Database.Haskey.Utils.STM.Map as Map
-- | In-memory state of an opened concurrent database, parameterised over
-- the type of its root value.
data ConcurrentDb root = ConcurrentDb
    { -- | Handles of the underlying files; they are opened and closed
      -- around every transaction.
      concurrentDbHandles     :: ConcurrentHandles

      -- | Lock taken (via 'withRLock') by 'transact', 'transactNow' and
      -- 'actAndCommit'.
    , concurrentDbWriterLock  :: RLock

      -- | Which of the two metadata slots is the current one.
    , concurrentDbCurrentMeta :: TVar CurrentMetaPage

      -- | In-memory metadata of the first slot.
    , concurrentDbMeta1       :: TVar (ConcurrentMeta root)

      -- | In-memory metadata of the second slot.
    , concurrentDbMeta2       :: TVar (ConcurrentMeta root)

      -- | Per revision, the number of read-only transactions currently
      -- running against it (maintained by 'transactReadOnly').
    , concurrentDbReaders     :: Map TxId Integer
    }
-- | Lock the database by calling 'lockHandle' on the root handle of the
-- given set of handles.
lockConcurrentDb :: ConcurrentMetaStoreM m => ConcurrentHandles -> m ()
lockConcurrentDb hnds = lockHandle (concurrentHandlesRoot hnds)
-- | Unlock the database by calling 'releaseHandle' on the root handle of
-- the given set of handles.
unlockConcurrentDb :: ConcurrentMetaStoreM m => ConcurrentHandles -> m ()
unlockConcurrentDb hnds = releaseHandle (concurrentHandlesRoot hnds)
-- | Open the data, index and both metadata handles, in that order.
openConcurrentHandles :: ConcurrentMetaStoreM m
                      => ConcurrentHandles -> m ()
openConcurrentHandles hnds = mapM_ openHandle
    [ concurrentHandlesData hnds
    , concurrentHandlesIndex hnds
    , concurrentHandlesMetadata1 hnds
    , concurrentHandlesMetadata2 hnds
    ]
-- | Create a brand new database holding the given root value.
--
-- The handles are opened for the duration of the initialisation and closed
-- again afterwards.
createConcurrentDb :: (Root root, MonadIO m, MonadMask m, ConcurrentMetaStoreM m)
                   => ConcurrentHandles
                   -> root
                   -> m (ConcurrentDb root)
createConcurrentDb hnds root =
    bracket_ (openConcurrentHandles hnds) (closeConcurrentHandles hnds) $ do
        freshDb <- newConcurrentDb hnds initialMeta
        -- Every 'setCurrentMeta' writes to the slot that is *not* current
        -- and then flips the current slot, so committing twice stores the
        -- initial metadata in both metadata handles.
        setCurrentMeta initialMeta freshDb
        setCurrentMeta initialMeta freshDb
        return freshDb
  where
    -- Revision 0: no pages, empty free/overflow trees, no cached pages.
    initialMeta = ConcurrentMeta
        { concurrentMetaRevision             = 0
        , concurrentMetaDataNumPages         = DataState 0
        , concurrentMetaIndexNumPages        = IndexState 0
        , concurrentMetaRoot                 = root
        , concurrentMetaDataFreeTree         = DataState $ Tree zeroHeight Nothing
        , concurrentMetaIndexFreeTree        = IndexState $ Tree zeroHeight Nothing
        , concurrentMetaOverflowTree         = Tree zeroHeight Nothing
        , concurrentMetaDataCachedFreePages  = DataState []
        , concurrentMetaIndexCachedFreePages = IndexState []
        }
-- | Open an existing database.
--
-- Both metadata handles are read. If neither yields metadata the result is
-- 'Nothing'. If both do, the first one is used only when its revision is
-- strictly greater than that of the second. After the database value has
-- been built, overflow files belonging to the revision following the
-- current one are removed with 'cleanupAfterException'.
openConcurrentDb :: (Root root, MonadIO m, MonadMask m, ConcurrentMetaStoreM m)
                 => ConcurrentHandles
                 -> m (Maybe (ConcurrentDb root))
openConcurrentDb hnds =
    bracket_ (openConcurrentHandles hnds) (closeConcurrentHandles hnds) $ do
        m1 <- readConcurrentMeta (concurrentHandlesMetadata1 hnds) Proxy
        m2 <- readConcurrentMeta (concurrentHandlesMetadata2 hnds) Proxy
        case newest m1 m2 of
            Nothing       -> return Nothing
            Just diskMeta -> do
                db   <- newConcurrentDb hnds diskMeta
                meta <- liftIO . atomically $ getCurrentMeta db
                cleanupAfterException hnds (concurrentMetaRevision meta + 1)
                return (Just db)
  where
    -- Select the metadata to start from; ties go to the second slot.
    newest Nothing  Nothing  = Nothing
    newest (Just m) Nothing  = Just m
    newest Nothing  (Just m) = Just m
    newest (Just x) (Just y)
        | concurrentMetaRevision x > concurrentMetaRevision y = Just x
        | otherwise                                           = Just y
-- | Close the data, index and both metadata handles, in that order.
closeConcurrentHandles :: (MonadIO m, ConcurrentMetaStoreM m)
                       => ConcurrentHandles
                       -> m ()
closeConcurrentHandles hnds = mapM_ closeHandle
    [ concurrentHandlesData hnds
    , concurrentHandlesIndex hnds
    , concurrentHandlesMetadata1 hnds
    , concurrentHandlesMetadata2 hnds
    ]
-- | Build the in-memory database value around the given metadata.
--
-- Nothing is written to the store here: an empty readers map, a new lock
-- and the transactional variables are allocated, with both metadata slots
-- initialised to the given metadata and the first slot marked as current.
newConcurrentDb :: (Root root, MonadIO m)
                => ConcurrentHandles
                -> ConcurrentMeta root
                -> m (ConcurrentDb root)
newConcurrentDb hnds meta0 = liftIO $ do
    activeReaders <- Map.newIO
    currentSlot   <- newTVarIO Meta1
    writerLock    <- newRLock
    slot1         <- newTVarIO meta0
    slot2         <- newTVarIO meta0
    return $! ConcurrentDb
        { concurrentDbHandles     = hnds
        , concurrentDbWriterLock  = writerLock
        , concurrentDbCurrentMeta = currentSlot
        , concurrentDbMeta1       = slot1
        , concurrentDbMeta2       = slot2
        , concurrentDbReaders     = activeReaders
        }
-- | Read the metadata of whichever slot is currently marked as current.
getCurrentMeta :: Root root
               => ConcurrentDb root
               -> STM (ConcurrentMeta root)
getCurrentMeta db = do
    current <- readTVar (concurrentDbCurrentMeta db)
    readTVar $ case current of
        Meta1 -> concurrentDbMeta1 db
        Meta2 -> concurrentDbMeta2 db
-- | Commit new metadata and make it the current one.
--
-- The metadata is always written to the slot that is /not/ current. The
-- data and index handles are flushed first, then the metadata is written to
-- the target metadata handle and that handle is flushed. Only afterwards are
-- the in-memory slot and the current-slot marker updated, in a single STM
-- transaction.
setCurrentMeta :: (Root root, MonadIO m, ConcurrentMetaStoreM m)
               => ConcurrentMeta root
               -> ConcurrentDb root
               -> m ()
setCurrentMeta new db = do
    current <- liftIO . atomically $ readTVar currentVar
    case current of
        Meta1 -> commitTo Meta2 (concurrentHandlesMetadata2 hnds) (concurrentDbMeta2 db)
        Meta2 -> commitTo Meta1 (concurrentHandlesMetadata1 hnds) (concurrentDbMeta1 db)
  where
    currentVar = concurrentDbCurrentMeta db
    hnds       = concurrentDbHandles db

    -- Write the metadata to the given slot and switch over to it.
    commitTo slot metaHnd slotVar = do
        flushHandle (concurrentHandlesData hnds)
        flushHandle (concurrentHandlesIndex hnds)
        putConcurrentMeta metaHnd new
        flushHandle metaHnd
        liftIO . atomically $ do
            writeTVar currentVar slot
            writeTVar slotVar new
-- | Execute a write transaction, with a result.
--
-- While holding the writer lock, outdated overflow ids are first deleted
-- from the overflow tree (committed on their own when anything changed),
-- after which the user action is run through 'transactNow'.
transact :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m, Root root)
         => (forall n. (AllocM n, MonadMask n) => root -> n (Transaction root a))
         -> ConcurrentDb root
         -> m a
transact act db =
    withRLock (concurrentDbWriterLock db) (cleanup >> transactNow act db)
  where
    cleanup :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m) => m ()
    cleanup = actAndCommit db $ \meta -> do
        pruned <- deleteOutdatedOverflowIds (concurrentMetaOverflowTree meta)
        -- 'Nothing' means there is nothing to commit; otherwise commit the
        -- metadata with the returned overflow tree.
        return ( (\tree -> meta { concurrentMetaOverflowTree = tree }) <$> pruned
               , () )
-- | Execute a write transaction, with a result, without the preceding
-- overflow cleanup that 'transact' performs.
--
-- The action receives the current root. 'Abort' leaves the metadata
-- untouched, 'Commit' stores the new root in the metadata to be committed.
transactNow :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m, Root root)
            => (forall n. (AllocM n, MonadMask n) => root -> n (Transaction root a))
            -> ConcurrentDb root
            -> m a
transactNow act db =
    withRLock (concurrentDbWriterLock db) $
    actAndCommit db $ \meta -> do
        outcome <- act (concurrentMetaRoot meta)
        return $ case outcome of
            Abort v          -> (Nothing, v)
            Commit newRoot v -> (Just (meta { concurrentMetaRoot = newRoot }), v)
-- | Execute a write transaction that produces no result; see 'transact'.
transact_ :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m, Root root)
          => (forall n. (AllocM n, MonadMask n) => root -> n (Transaction root ()))
          -> ConcurrentDb root
          -> m ()
transact_ act db = () <$ transact act db
-- | Execute a read-only transaction.
--
-- The handles are opened for the duration of the transaction. The current
-- metadata is captured atomically together with registering this reader
-- under the metadata's revision in the readers map; the registration is
-- undone again when the action finishes, normally or exceptionally (both
-- steps are tied together with 'bracket'). The action is run on the root
-- stored in the captured metadata.
transactReadOnly :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m, Root root)
                 => (forall n. (AllocReaderM n, MonadMask n) => root -> n a)
                 -> ConcurrentDb root
                 -> m a
transactReadOnly act db =
    bracket_ (openConcurrentHandles hnds)
             (closeConcurrentHandles hnds) $
    bracket acquireMeta
            releaseMeta $
            \meta -> evalConcurrentT (act $ concurrentMetaRoot meta)
                                     (ReaderEnv hnds)
  where
    hnds    = concurrentDbHandles db
    readers = concurrentDbReaders db

    -- Register one more reader for a revision.
    addOne Nothing  = Just 1
    addOne (Just x) = Just $! x + 1

    -- Unregister one reader for a revision; the entry is dropped from the
    -- map as soon as no reader is left.
    subOne Nothing  = Nothing
    subOne (Just 0) = Nothing
    subOne (Just 1) = Nothing
    subOne (Just x) = Just $! x - 1

    -- Reading the metadata and registering the reader happen in one STM
    -- transaction.
    acquireMeta = liftIO . atomically $ do
        meta <- getCurrentMeta db
        Map.alter (concurrentMetaRevision meta) addOne readers
        return meta

    releaseMeta meta =
        let rev = concurrentMetaRevision meta in
        liftIO . atomically $ Map.alter rev subOne readers
-- | Run a write action against the current metadata and, if it asks for it,
-- commit the resulting metadata as the next revision.
--
-- The whole operation runs under the writer lock with the handles opened.
-- The action is executed in a writer environment created for revision
-- @current + 1@. When it returns no metadata, the overflow values allocated
-- by the action are removed and nothing is committed. Otherwise the removed
-- overflow ids and the free pages of both files are saved, the page counts
-- and cached free pages are copied into the metadata, and the metadata is
-- committed with the new revision.
--
-- If an exception escapes, 'cleanupAfterException' is run for the new
-- revision before the exception propagates.
actAndCommit :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m, Root root)
             => ConcurrentDb root
             -> (forall n. (MonadIO n, MonadMask n, ConcurrentMetaStoreM n)
                 => ConcurrentMeta root
                 -> ConcurrentT WriterEnv ConcurrentHandles n (Maybe (ConcurrentMeta root), a)
                )
             -> m a
actAndCommit db act =
    withRLock (concurrentDbWriterLock db) $
    bracket_ (openConcurrentHandles dbHnds)
             (closeConcurrentHandles dbHnds) $ do
        meta <- liftIO . atomically $ getCurrentMeta db
        let newRevision = concurrentMetaRevision meta + 1
            writer      = newWriter dbHnds
                                    newRevision
                                    (concurrentDbReaders db)
                                    (concurrentMetaDataNumPages meta)
                                    (concurrentMetaIndexNumPages meta)
                                    (concurrentMetaDataCachedFreePages meta)
                                    (concurrentMetaIndexCachedFreePages meta)
                                    (concurrentMetaDataFreeTree meta)
                                    (concurrentMetaIndexFreeTree meta)

        wrap dbHnds newRevision $ do
            ((maybeMeta, v), env) <- runConcurrentT (act meta) writer

            case updateMeta env <$> maybeMeta of
                -- Nothing to commit: drop what the action allocated.
                Nothing ->
                    removeNewlyAllocatedOverflows env >> return v

                Just meta' -> do
                    -- Bookkeeping, threaded through a state of
                    -- (metadata, writer environment).
                    (newMeta, _) <- flip execStateT (meta', env) $ do
                        saveOverflowIds
                        saveFreePages' 0 DataState
                                       writerDataFileState
                                       (\e s -> e { writerDataFileState = s })
                        saveFreePages' 0 IndexState
                                       writerIndexFileState
                                       (\e s -> e { writerIndexFileState = s })
                        handleCachedFreePages

                    -- Commit.
                    setCurrentMeta (newMeta { concurrentMetaRevision = newRevision })
                                   db
                    return v
  where
    dbHnds = concurrentDbHandles db

    -- Run an action, cleaning up after the given transaction when an
    -- exception escapes it.
    wrap :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m)
         => ConcurrentHandles
         -> TxId
         -> m a
         -> m a
    wrap handles tx body = mask $ \restore ->
        restore body `onException` cleanupAfterException handles tx
-- | Remove the overflow files that belong to the given transaction.
--
-- The overflow directory of the transaction is listed and every entry whose
-- overflow id carries exactly this transaction id is removed. Exceptions
-- raised while removing an individual file are deliberately ignored, so
-- that the remaining files are still attempted.
cleanupAfterException :: (MonadIO m, MonadCatch m, ConcurrentMetaStoreM m)
                      => ConcurrentHandles
                      -> TxId
                      -> m ()
cleanupAfterException hnds tx = do
    candidates <- listOverflows dir
    mapM_ (\path -> removeHandle path `catch` ignore)
          (filter ownedByTx candidates)
  where
    dir = getOverflowDir (concurrentHandlesOverflowDir hnds) tx

    -- Paths that cannot be read as an overflow id are never selected.
    ownedByTx path = maybe False ((== tx) . fst) (readOverflowId path)

    ignore :: Monad m => SomeException -> m ()
    ignore _ = return ()
-- | Remove all overflow values that were allocated by the transaction of
-- the given writer environment.
--
-- One handle is removed for every counter value from @0@ up to, but not
-- including, the environment's overflow counter, each paired with the
-- environment's transaction id.
removeNewlyAllocatedOverflows :: (MonadIO m, ConcurrentMetaStoreM m)
                              => WriterEnv ConcurrentHandles
                              -> m ()
removeNewlyAllocatedOverflows env = do
    let root = concurrentHandlesOverflowDir (writerHnds env)
    -- Iterate over [1..counter] and subtract one, rather than over
    -- [0..counter - 1]: the range is then simply empty when the counter is
    -- zero, without ever computing @0 - 1@.
    sequence_ [ delete root (i - 1) | i <- [1 .. writerOverflowCounter env] ]
  where
    delete root c = do
        let i = (writerTxId env, c)
        removeHandle (getOverflowHandle root i)
-- | Copy the free trees of the data and index file states of a writer
-- environment into the metadata; all other fields are left as they are.
updateMeta :: WriterEnv ConcurrentHandles -> ConcurrentMeta root -> ConcurrentMeta root
updateMeta env m = m
    { concurrentMetaDataFreeTree  = dataFreeTree
    , concurrentMetaIndexFreeTree = indexFreeTree
    }
  where
    dataFreeTree  = fileStateFreeTree (writerDataFileState env)
    indexFreeTree = fileStateFreeTree (writerIndexFileState env)
-- | Save the overflow ids that were removed during the transaction in the
-- overflow tree of the metadata.
--
-- Does nothing when the transaction removed no overflow ids. Otherwise the
-- ids are inserted under the transaction id of the writer environment, and
-- both the metadata and the environment in the state are replaced.
saveOverflowIds :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m)
                => StateT (ConcurrentMeta root, WriterEnv ConcurrentHandles) m ()
saveOverflowIds = do
    (meta, env) <- get
    case map unwrap (writerRemovedOverflows env) of
        []       -> return ()
        (i : is) -> do
            (tree', env') <- lift $
                runConcurrentT (insertOverflowIds (writerTxId env)
                                                  (i :| is)
                                                  (concurrentMetaOverflowTree meta))
                               env
            put ( (updateMeta env meta) { concurrentMetaOverflowTree = tree' }
                , env' )
  where
    unwrap (OldOverflow i) = i
-- | Save the free pages of one file (selected through the getter/setter
-- pair) in its free tree, repeating until a fixed point is reached.
--
-- 'saveFreePages' is run with 'writerQueryFreeTreeOn' switched off in the
-- writer environment. The returned tree is wrapped with the given
-- constructor and stored in the file state, and the metadata is refreshed
-- from the resulting environment. When the set of newly freed pages differs
-- from what it was before the save, the procedure is run again.
--
-- The first argument only counts the iterations; it does not influence the
-- result.
saveFreePages' :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m)
               => Int
               -> (forall a. a -> S t a)
               -> (forall hnds. WriterEnv hnds -> FileState t)
               -> (forall hnds. WriterEnv hnds -> FileState t -> WriterEnv hnds)
               -> StateT (ConcurrentMeta root, WriterEnv ConcurrentHandles) m ()
saveFreePages' paranoid cons getState setState = do
    (meta, env) <- get
    let stateBefore = getState env

    (tree', envWithoutTree) <- lift $
        runConcurrentT (saveFreePages (writerTxId env) stateBefore)
                       (env { writerQueryFreeTreeOn = False })

    let state' = (getState envWithoutTree) { fileStateFreeTree = cons tree' }
        env'   = setState envWithoutTree state'
    put (updateMeta env' meta, env')

    -- Saving may itself have freed pages; those have to go into the free
    -- tree as well.
    unless (fileStateNewlyFreedPages state' == fileStateNewlyFreedPages stateBefore) $
        saveFreePages' (paranoid + 1) cons getState setState
-- | Copy the final per-file state of the writer environment into the
-- metadata: for both the data and the index file, the new number of pages,
-- the free tree and the cached free pages. The environment is unchanged.
handleCachedFreePages :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m)
                      => StateT (ConcurrentMeta root, WriterEnv ConcurrentHandles) m ()
handleCachedFreePages = do
    (meta, env) <- get
    let dataSt  = writerDataFileState env
        indexSt = writerIndexFileState env
    put ( meta
            { -- data file
              concurrentMetaDataNumPages         = fileStateNewNumPages dataSt
            , concurrentMetaDataFreeTree         = fileStateFreeTree dataSt
            , concurrentMetaDataCachedFreePages  = fileStateCachedFreePages dataSt
              -- index file
            , concurrentMetaIndexNumPages        = fileStateNewNumPages indexSt
            , concurrentMetaIndexFreeTree        = fileStateFreeTree indexSt
            , concurrentMetaIndexCachedFreePages = fileStateCachedFreePages indexSt
            }
        , env )