{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
-- | This module implements data structures and functions related to the database.
module Database.Haskey.Alloc.Concurrent.Internal.Database where

import Control.Applicative ((<$>))
import Control.Concurrent.STM
import Control.Monad (void, unless)
import Control.Monad.IO.Class
import Control.Monad.Catch (MonadCatch, MonadMask, SomeException,
                            catch, mask, onException, bracket, bracket_)
import Control.Monad.State
import Control.Monad.Trans (lift)

import Data.Proxy (Proxy(..))
import Data.List.NonEmpty (NonEmpty((:|)))
import Data.Maybe (fromMaybe)

import STMContainers.Map (Map)
import qualified STMContainers.Map as Map

import Data.BTree.Alloc.Class
import Data.BTree.Impure (Tree(..))
import Data.BTree.Primitives

import Database.Haskey.Alloc.Concurrent.Internal.Environment
import Database.Haskey.Alloc.Concurrent.Internal.FreePages.Save
import Database.Haskey.Alloc.Concurrent.Internal.Meta
import Database.Haskey.Alloc.Concurrent.Internal.Monad
import Database.Haskey.Alloc.Concurrent.Internal.Overflow
import Database.Haskey.Alloc.Transaction
import Database.Haskey.Store
import Database.Haskey.Utils.RLock
import qualified Database.Haskey.Utils.STM.Map as Map

-- | An active concurrent database.
--
-- This can be shared amongst threads.
data ConcurrentDb root = ConcurrentDb
    { concurrentDbHandles :: ConcurrentHandles
    , concurrentDbWriterLock :: RLock
    , concurrentDbCurrentMeta :: TVar CurrentMetaPage
    , concurrentDbMeta1 :: TVar (ConcurrentMeta root)
    , concurrentDbMeta2 :: TVar (ConcurrentMeta root)
    , concurrentDbReaders :: Map TxId Integer
    }

-- | Lock the database.
--
-- This needs to be called manually, if you want exclusive access, before
-- calling either 'createConcurrentDb' or 'openConcurrentDb'
--
-- Use 'unlockConcurrentDb' using the 'bracket' pattern to properly unlock the
-- database.
lockConcurrentDb :: ConcurrentMetaStoreM m => ConcurrentHandles -> m ()
lockConcurrentDb = lockHandle . concurrentHandlesRoot

-- | Unlock the database.
unlockConcurrentDb :: ConcurrentMetaStoreM m => ConcurrentHandles -> m ()
unlockConcurrentDb = releaseHandle . concurrentHandlesRoot

-- | Open all concurrent handles.
openConcurrentHandles :: ConcurrentMetaStoreM m
                      => ConcurrentHandles -> m ()
openConcurrentHandles ConcurrentHandles{..} = do
    openHandle concurrentHandlesData
    openHandle concurrentHandlesIndex
    openHandle concurrentHandlesMetadata1
    openHandle concurrentHandlesMetadata2

-- | Open a new concurrent database, with the given handles.
createConcurrentDb :: (Root root, MonadIO m, MonadMask m, ConcurrentMetaStoreM m)
                   => ConcurrentHandles
                   -> root
                   -> m (ConcurrentDb root)
createConcurrentDb hnds root =
    bracket_ (openConcurrentHandles hnds)
             (closeConcurrentHandles hnds) $ do

    db <- newConcurrentDb hnds meta0
    setCurrentMeta meta0 db
    setCurrentMeta meta0 db
    return db
  where
    meta0 = ConcurrentMeta {
        concurrentMetaRevision = 0
      , concurrentMetaDataNumPages = DataState 0
      , concurrentMetaIndexNumPages = IndexState 0
      , concurrentMetaRoot = root
      , concurrentMetaDataFreeTree = DataState $ Tree zeroHeight Nothing
      , concurrentMetaIndexFreeTree = IndexState $ Tree zeroHeight Nothing
      , concurrentMetaOverflowTree = Tree zeroHeight Nothing
      , concurrentMetaDataCachedFreePages = DataState []
      , concurrentMetaIndexCachedFreePages = IndexState []
      }

-- | Open the an existing database, with the given handles.
openConcurrentDb :: (Root root, MonadIO m, MonadMask m, ConcurrentMetaStoreM m)
                 => ConcurrentHandles
                 -> m (Maybe (ConcurrentDb root))
openConcurrentDb hnds@ConcurrentHandles{..} =
    bracket_ (openConcurrentHandles hnds)
             (closeConcurrentHandles hnds) $ do

    m1 <- readConcurrentMeta concurrentHandlesMetadata1 Proxy
    m2 <- readConcurrentMeta concurrentHandlesMetadata2 Proxy
    maybeDb <- case (m1, m2) of
        (Nothing, Nothing) -> return Nothing
        (Just m , Nothing) -> Just <$> newConcurrentDb hnds m
        (Nothing, Just m ) -> Just <$> newConcurrentDb hnds m
        (Just x , Just y ) -> if concurrentMetaRevision x > concurrentMetaRevision y
                                  then Just <$> newConcurrentDb hnds x
                                  else Just <$> newConcurrentDb hnds y
    case maybeDb of
        Nothing -> return Nothing
        Just db -> do
            meta <- liftIO . atomically $ getCurrentMeta db
            cleanupAfterException hnds (concurrentMetaRevision meta + 1)
            return (Just db)

-- | Close the handles of the database.
closeConcurrentHandles :: (MonadIO m, ConcurrentMetaStoreM m)
                       => ConcurrentHandles
                       -> m ()
closeConcurrentHandles ConcurrentHandles{..} = do
    closeHandle concurrentHandlesData
    closeHandle concurrentHandlesIndex
    closeHandle concurrentHandlesMetadata1
    closeHandle concurrentHandlesMetadata2

-- | Create a new concurrent database with handles and metadata provided.
newConcurrentDb :: (Root root, MonadIO m)
                => ConcurrentHandles
                -> ConcurrentMeta root
                -> m (ConcurrentDb root)
newConcurrentDb hnds meta0 = do
    readers <- liftIO Map.newIO
    meta    <- liftIO $ newTVarIO Meta1
    lock    <- liftIO   newRLock
    meta1   <- liftIO $ newTVarIO meta0
    meta2   <- liftIO $ newTVarIO meta0
    return $! ConcurrentDb
        { concurrentDbHandles = hnds
        , concurrentDbWriterLock = lock
        , concurrentDbCurrentMeta = meta
        , concurrentDbMeta1 = meta1
        , concurrentDbMeta2 = meta2
        , concurrentDbReaders = readers
        }

-- | Get the current meta data.
getCurrentMeta :: Root root
               => ConcurrentDb root
               -> STM (ConcurrentMeta root)
getCurrentMeta db
    | ConcurrentDb { concurrentDbCurrentMeta = v } <- db
    = readTVar v >>= \case
        Meta1 -> readTVar $ concurrentDbMeta1 db
        Meta2 -> readTVar $ concurrentDbMeta2 db

-- | Write the new metadata, and switch the pointer to the current one.
setCurrentMeta :: (Root root, MonadIO m, ConcurrentMetaStoreM m)
               => ConcurrentMeta root
               -> ConcurrentDb root
               -> m ()
setCurrentMeta new db
    | ConcurrentDb
      { concurrentDbCurrentMeta = v
      , concurrentDbHandles = hnds
      } <- db
    = liftIO (atomically $ readTVar v) >>= \case
        Meta1 -> do
            flushHandle (concurrentHandlesData hnds)
            flushHandle (concurrentHandlesIndex hnds)
            putConcurrentMeta (concurrentHandlesMetadata2 hnds) new
            flushHandle (concurrentHandlesMetadata2 hnds)
            liftIO . atomically $ do
                writeTVar v Meta2
                writeTVar (concurrentDbMeta2 db) new
        Meta2 -> do
            flushHandle (concurrentHandlesData hnds)
            flushHandle (concurrentHandlesIndex hnds)
            putConcurrentMeta (concurrentHandlesMetadata1 hnds) new
            flushHandle (concurrentHandlesMetadata1 hnds)
            liftIO . atomically $ do
                writeTVar v Meta1
                writeTVar (concurrentDbMeta1 db) new

-- | Execute a write transaction, with a result.
transact :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m, Root root)
         => (forall n. (AllocM n, MonadMask n) => root -> n (Transaction root a))
         -> ConcurrentDb root
         -> m a
transact act db = withRLock (concurrentDbWriterLock db) $ do
    cleanup
    transactNow act db
  where
    cleanup :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m) => m ()
    cleanup = actAndCommit db $ \meta -> do
        v <- deleteOutdatedOverflowIds (concurrentMetaOverflowTree meta)
        case v of
            Nothing -> return (Nothing, ())
            Just tree -> do
                let meta' = meta { concurrentMetaOverflowTree = tree }
                return (Just meta', ())

-- | Execute a write transaction, without cleaning up old overflow pages.
transactNow :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m, Root root)
            => (forall n. (AllocM n, MonadMask n) => root -> n (Transaction root a))
            -> ConcurrentDb root
            -> m a
transactNow act db = withRLock (concurrentDbWriterLock db) $
    actAndCommit db $ \meta -> do
        tx <- act (concurrentMetaRoot meta)
        case tx of
            Abort v -> return (Nothing, v)
            Commit root v ->
                let meta' = meta { concurrentMetaRoot = root } in
                return (Just meta', v)

-- | Execute a write transaction, without a result.
transact_ :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m, Root root)
          => (forall n. (AllocM n, MonadMask n) => root -> n (Transaction root ()))
          -> ConcurrentDb root
          -> m ()
transact_ act db = void $ transact act db

-- | Execute a read-only transaction.
transactReadOnly :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m, Root root)
                 => (forall n. (AllocReaderM n, MonadMask n) => root -> n a)
                 -> ConcurrentDb root
                 -> m a
transactReadOnly act db =
    bracket_ (openConcurrentHandles hnds)
             (closeConcurrentHandles hnds) $

    bracket acquireMeta
            releaseMeta $
            \meta -> evalConcurrentT (act $ concurrentMetaRoot meta)
                                     (ReaderEnv hnds)
  where
    hnds    = concurrentDbHandles db
    readers = concurrentDbReaders db

    addOne Nothing = Just 1
    addOne (Just x) = Just $! x + 1
    subOne Nothing = Nothing
    subOne (Just 0) = Nothing
    subOne (Just 1) = Nothing
    subOne (Just x) = Just $! x - 1

    acquireMeta = liftIO . atomically $ do
        meta <- getCurrentMeta db
        Map.alter (concurrentMetaRevision meta) addOne readers
        return meta

    releaseMeta meta =
        let rev = concurrentMetaRevision meta in
        liftIO . atomically $ Map.alter rev subOne readers

--------------------------------------------------------------------------------

-- | Run a write action that takes the current meta-data and returns new
-- meta-data to be commited, or 'Nothing' if the write transaction should be
-- aborted.
actAndCommit :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m, Root root)
             => ConcurrentDb root
             -> (forall n. (MonadIO n, MonadMask n, ConcurrentMetaStoreM n)
                 => ConcurrentMeta root
                 -> ConcurrentT WriterEnv ConcurrentHandles n (Maybe (ConcurrentMeta root), a)
                )
             -> m a
actAndCommit db act
    | ConcurrentDb
      { concurrentDbHandles = hnds
      , concurrentDbWriterLock = lock
      , concurrentDbReaders = readers
      } <- db
    = withRLock lock $
        bracket_ (openConcurrentHandles hnds)
                 (closeConcurrentHandles hnds) $ do

    meta <- liftIO . atomically $ getCurrentMeta db
    let newRevision = concurrentMetaRevision meta + 1
    wrap hnds newRevision $ do
        ((maybeMeta, v), env) <- runConcurrentT (act meta) $
                                   newWriter hnds
                                             newRevision
                                             readers
                                             (concurrentMetaDataNumPages meta)
                                             (concurrentMetaIndexNumPages meta)
                                             (concurrentMetaDataCachedFreePages meta)
                                             (concurrentMetaIndexCachedFreePages meta)
                                             (concurrentMetaDataFreeTree meta)
                                             (concurrentMetaIndexFreeTree meta)

        let maybeMeta' = updateMeta env <$> maybeMeta

        case maybeMeta' of
            Nothing -> do
                removeNewlyAllocatedOverflows env
                return v

            Just meta' -> do
                -- Bookkeeping
                (newMeta, _) <- flip execStateT (meta', env) $ do
                    saveOverflowIds
                    saveFreePages' 0 DataState
                                     writerDataFileState
                                     (\e s -> e { writerDataFileState = s })
                    saveFreePages' 0 IndexState
                                     writerIndexFileState
                                     (\e s -> e { writerIndexFileState = s })
                    handleCachedFreePages

                -- Commit
                setCurrentMeta (newMeta { concurrentMetaRevision = newRevision })
                               db
                return v
  where
    wrap :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m)
         => ConcurrentHandles
         -> TxId
         -> m a
         -> m a
    wrap hnds tx action = mask $ \restore ->
        restore action `onException` cleanupAfterException hnds tx

-- | Cleanup after an exception occurs, or after a program crash.
--
-- The 'TxId' of the aborted transaction should be passed.
cleanupAfterException :: (MonadIO m, MonadCatch m, ConcurrentMetaStoreM m)
                      => ConcurrentHandles
                      -> TxId
                      -> m ()
cleanupAfterException hnds tx = do
    let dir = getOverflowDir (concurrentHandlesOverflowDir hnds) tx
    overflows <- filter filter' <$> listOverflows dir
    mapM_ (\fp -> removeHandle fp `catch` ignore) overflows
  where
    filter' fp = fromMaybe False $ (== tx) . fst <$> readOverflowId fp

    ignore :: Monad m => SomeException -> m ()
    ignore _ = return ()

-- | Remove all overflow pages that were written in the transaction.
--
-- If the transaction is aborted, all written pages should be deleted.
removeNewlyAllocatedOverflows :: (MonadIO m, ConcurrentMetaStoreM m)
                              => WriterEnv ConcurrentHandles
                              -> m ()
removeNewlyAllocatedOverflows env = do
    let root = concurrentHandlesOverflowDir (writerHnds env)
    sequence_ [ delete root (i - 1) | i <- [1..(writerOverflowCounter env)] ]
  where
    delete root c = do
        let i = (writerTxId env, c)
        removeHandle (getOverflowHandle root i)

-- | Update the meta-data from a writer environment
updateMeta :: WriterEnv ConcurrentHandles -> ConcurrentMeta root -> ConcurrentMeta root
updateMeta env m = m {
    concurrentMetaDataFreeTree = fileStateFreeTree (writerDataFileState env)
  , concurrentMetaIndexFreeTree = fileStateFreeTree (writerIndexFileState env) }


-- | Save the newly free'd overflow pages, for deletion on the next tx.
saveOverflowIds :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m)
                => StateT (ConcurrentMeta root, WriterEnv ConcurrentHandles) m ()
saveOverflowIds = do
    (meta, env) <- get
    case map (\(OldOverflow i) ->i) (writerRemovedOverflows env) of
        [] -> return ()
        x:xs -> do
            (tree', env') <- lift $ flip runConcurrentT env $
                insertOverflowIds (writerTxId env)
                                  (x :| xs)
                                  (concurrentMetaOverflowTree meta)
            let meta' = (updateMeta env meta)
                            { concurrentMetaOverflowTree = tree' }
            put (meta', env')

-- | Save the free'd pages to the free page database
saveFreePages' :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m)
               => Int
               -> (forall a. a -> S t a)
               -> (forall hnds. WriterEnv hnds -> FileState t)
               -> (forall hnds. WriterEnv hnds -> FileState t -> WriterEnv hnds)
               -> StateT (ConcurrentMeta root, WriterEnv ConcurrentHandles) m ()
saveFreePages' paranoid cons getState setState
    {- paranoid >= 100 = error "paranoid: looping!"
    | otherwise-}
    = do

    (meta, env) <- get
    let tx = writerTxId env
    (tree', envWithoutTree) <- lift $
        runConcurrentT (saveFreePages tx (getState env)) $
            env { writerQueryFreeTreeOn = False }

    let state' = (getState envWithoutTree) { fileStateFreeTree = cons tree' }
    let env'   = setState envWithoutTree state'
    let meta'  = updateMeta env' meta
    put (meta', env')

    -- Did we free any new pages? We have to put them in the free tree!
    unless (fileStateNewlyFreedPages state' == fileStateNewlyFreedPages (getState env)) $
       saveFreePages' (paranoid + 1) cons getState setState

-- | Handle the cached free pages.
--
-- Save the cached free pages to the metadata for later use.
--
-- Update the database size.
handleCachedFreePages :: (MonadIO m, MonadMask m, ConcurrentMetaStoreM m)
                      => StateT (ConcurrentMeta root, WriterEnv ConcurrentHandles) m ()
handleCachedFreePages = do
    (meta, env) <- get

    let dataEnv  = writerDataFileState env
    let indexEnv = writerIndexFileState env

    let meta' = meta { concurrentMetaDataNumPages =
                            fileStateNewNumPages dataEnv
                     , concurrentMetaDataFreeTree =
                            fileStateFreeTree dataEnv
                     , concurrentMetaDataCachedFreePages =
                            fileStateCachedFreePages dataEnv

                     , concurrentMetaIndexNumPages =
                            fileStateNewNumPages indexEnv
                     , concurrentMetaIndexFreeTree =
                            fileStateFreeTree indexEnv
                     , concurrentMetaIndexCachedFreePages =
                            fileStateCachedFreePages indexEnv
                     }
    put (meta', env)

--------------------------------------------------------------------------------