{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Persistence
( PersistentValue
, PersistenceConfig (..)
, getDataFile
, getValue
, apply
, loadFromBackend
, setupStorageBackend
, syncToBackend
) where
import Control.Concurrent.STM
import Control.Exception
import Control.Monad.Except
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Types as Aeson
import qualified Data.ByteString as SBS
import qualified Data.ByteString.Char8 as SBS8
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Lazy.Char8 as LBS8
import Data.Foldable
import Data.Text (Text)
import qualified Data.Text as Text
import Data.Traversable
import Database.SQLite.Simple (FromRow (..), NamedParam (..), Only (..), close, execute,
                               execute_, executeNamed, field, open, query_)
import System.Directory (getFileSize, renameFile)
import System.Exit (die)
import System.IO
import System.IO.Error (tryIOError, isDoesNotExistError, isPermissionError)

import Logger (Logger, LogLevel(..))
import qualified Logger
import qualified Metrics
import qualified Store
import Config (StorageBackend (..))
-- | An in-memory value together with the bookkeeping needed to persist it.
-- Built by 'loadFromBackend'; the constructor is not exported.
data PersistentValue = PersistentValue
{ pvConfig :: PersistenceConfig
-- ^ The configuration this value was loaded with.
, pvValue :: TVar Store.Value
-- ^ The current value; read through 'getValue' and updated by 'apply'.
, pvIsDirty :: TVar Bool
-- ^ Set to 'True' by 'apply' and reset to 'False' when a sync function
-- picks the value up for writing.
, pvJournal :: Maybe Handle
-- ^ Handle of the journal file; 'Nothing' when no journal file is configured.
}
-- | Settings that control where and how the value is persisted.
data PersistenceConfig = PersistenceConfig
{ pcDataFile :: FilePath
-- ^ Path of the data file that is read on load and written on sync.
, pcJournalFile :: Maybe FilePath
-- ^ Optional path of the journal file; 'Nothing' disables journaling.
, pcLogger :: Logger
-- ^ Logger used for messages emitted during journal recovery.
, pcMetrics :: Maybe Metrics.IcepeakMetrics
-- ^ Optional metrics sink; when present, data and journal sizes are reported.
}
-- | Read the current in-memory value as part of an STM transaction.
getValue :: PersistentValue -> STM Store.Value
getValue persistent = readTVar (pvValue persistent)
-- | Apply a modification to the persistent value.
--
-- When a journal is open, the JSON-encoded modification is appended to it
-- first (and the journal metrics are updated, if metrics are configured).
-- Afterwards the in-memory value is updated and marked dirty in a single
-- STM transaction.
apply :: Store.Modification -> PersistentValue -> IO ()
apply op val = do
  -- Append the operation to the journal, if journaling is enabled.
  for_ (pvJournal val) $ \journalHandle -> do
    let entry = Aeson.encode op
    LBS8.hPutStrLn journalHandle entry
    for_ (pcMetrics . pvConfig $ val) $ \metrics -> do
      journalPos <- hTell journalHandle
      _ <- Metrics.incrementJournalWritten (LBS8.length entry) metrics
      Metrics.setJournalSize journalPos metrics
  -- Update the in-memory value.
  atomically $ do
    -- Use the strict variant: the lazy 'modifyTVar' would store an
    -- unevaluated @applyModification op old@ thunk, and a run of writes
    -- without intervening reads would chain those thunks inside the TVar.
    modifyTVar' (pvValue val) (Store.applyModification op)
    writeTVar (pvIsDirty val) True
-- | Pick the data file to use: an explicitly supplied path always wins,
-- otherwise a backend-specific default file name is returned.
getDataFile :: StorageBackend -> Maybe FilePath -> FilePath
getDataFile backend explicitPath =
  case explicitPath of
    Just filePath -> filePath
    Nothing -> case backend of
      File -> "icepeak.json"
      Sqlite -> "icepeak.db"
-- | Make sure the storage at the given path is usable before loading from it.
--
-- For the 'File' backend: a missing or empty file is initialised with @{}@
-- (a warning is printed); an unreadable file or a file that does not contain
-- valid JSON terminates the program via 'die'.
--
-- For the 'Sqlite' backend: the @icepeak@ table is created if needed and
-- seeded with a single row holding an empty JSON object when it has no rows.
setupStorageBackend :: StorageBackend -> FilePath -> IO ()
setupStorageBackend File filePath = do
  eitherEncodedValue <- tryIOError $ SBS.readFile filePath
  case eitherEncodedValue of
    Left e | isDoesNotExistError e -> do
      let message = "WARNING: Could not read data from " ++ filePath ++
                    " because the file does not exist yet. Created an empty database instead."
      SBS.writeFile filePath "{}"
      putStrLn message
    Left e | isPermissionError e ->
      die $ "File " ++ filePath ++ " cannot be read due to a permission error. Please check the file permissions."
    Left e -> die (show e)
    Right "" -> do
      let message = "WARNING: The provided --data-file " ++ filePath ++
                    " is empty. Will write a default database of {} to this file."
      putStrLn message
      SBS.writeFile filePath "{}"
    Right encodedValue -> case Aeson.eitherDecodeStrict encodedValue of
      Left msg -> die $ "Failed to decode the initial data in " ++ filePath ++ ": " ++ show msg
      -- Only validity matters here; the decoded value is discarded.
      Right (_value :: Aeson.Value) -> pure ()
setupStorageBackend Sqlite filePath =
  -- 'bracket' guarantees the connection is closed again, also when one of
  -- the statements throws; previously the connection was never closed.
  bracket (open filePath) close $ \conn -> do
    execute_ conn "CREATE TABLE IF NOT EXISTS icepeak (value BLOB)"
    jsonRows <- query_ conn "SELECT * from icepeak" :: IO [JsonRow]
    case jsonRows of
      [] -> execute conn "INSERT INTO icepeak (value) VALUES (?)" (Only $ Aeson.encode Aeson.emptyObject)
      _ -> pure ()
-- | Load the persisted value from the given backend.
--
-- Reports the data file size to the metrics (when configured), reads the
-- initial value, opens the journal (when configured) and replays it.
-- Read, journal-open and journal-replay failures are returned as 'Left'.
loadFromBackend :: StorageBackend -> PersistenceConfig -> IO (Either String PersistentValue)
loadFromBackend backend config = runExceptT $ do
  liftIO reportDataSize
  initialValue <- readInitialValue
  valueVar <- liftIO (newTVarIO initialValue)
  dirtyVar <- liftIO (newTVarIO False)
  journalHandle <- traverse openJournal (pcJournalFile config)
  let persistent = PersistentValue
        { pvConfig = config
        , pvValue = valueVar
        , pvIsDirty = dirtyVar
        , pvJournal = journalHandle
        }
  recoverJournal persistent
  pure persistent
  where
    dataFilePath = pcDataFile config

    -- Publish the current on-disk size of the data file, if metrics are on.
    reportDataSize = for_ (pcMetrics config) $ \metric -> do
      size <- getFileSize dataFilePath
      Metrics.setDataSize size metric

    -- Dispatch to the reader that matches the chosen backend.
    readInitialValue = case backend of
      File -> readData dataFilePath
      Sqlite -> readSqliteData dataFilePath
-- | Write the value to storage using the sync routine of the given backend.
syncToBackend :: StorageBackend -> PersistentValue -> IO ()
syncToBackend backend pv =
  case backend of
    File -> syncFile pv
    Sqlite -> syncSqliteFile pv
-- | One row of the @icepeak@ table, which has a single @value@ column
-- holding the JSON-encoded data.
--
-- A single-field wrapper, so a @newtype@ is used: same constructor and
-- accessor, no extra runtime box.
newtype JsonRow = JsonRow {jsonByteString :: SBS.ByteString} deriving (Show)

instance FromRow JsonRow where
  fromRow = JsonRow <$> field
-- | Read the stored value from the @icepeak@ table of the SQLite database at
-- the given path.
--
-- An empty table yields an empty JSON object; otherwise the first row is
-- decoded. A decoding failure is returned as 'Left'.
readSqliteData :: FilePath -> ExceptT String IO Store.Value
readSqliteData filePath = ExceptT $ do
  -- The connection is only needed for this one query; 'bracket' closes it
  -- again even if the query throws (it used to be left open).
  jsonRows <- bracket (open filePath) close $ \conn ->
    query_ conn "SELECT * from icepeak" :: IO [JsonRow]
  pure $ case jsonRows of
    [] -> Right Aeson.emptyObject
    -- Match on the list shape instead of calling the partial 'head'.
    firstRow : _ -> case Aeson.eitherDecodeStrict (jsonByteString firstRow) of
      Left msg -> Left $ "Failed to decode the initial data: " ++ show msg
      Right value -> Right (value :: Store.Value)
-- | Write the current value to the SQLite database if it changed since the
-- last sync, then truncate the journal and update the metrics.
syncSqliteFile :: PersistentValue -> IO ()
syncSqliteFile val = do
  -- Read the value and clear the dirty flag in one transaction, so that a
  -- modification arriving during the write marks the value dirty again.
  (dirty, value) <- atomically $ (,) <$> readTVar (pvIsDirty val)
                                     <*> readTVar (pvValue val)
                                     <* writeTVar (pvIsDirty val) False
  when dirty $ do
    let filePath = pcDataFile $ pvConfig val
        -- 'bracket' closes the connection after the update; previously a new
        -- connection was opened on every sync and never closed.
        writeValue = bracket (open filePath) close $ \conn ->
          executeNamed conn "UPDATE icepeak SET value = :value" [":value" := Aeson.encode value]
    -- The flag was already cleared above; if the write fails, mark the value
    -- dirty again so a later sync retries instead of skipping it.
    writeValue `onException` atomically (writeTVar (pvIsDirty val) True)
    truncateJournal val
    updateMetrics val
-- | Write the current value to the JSON data file if it changed since the
-- last sync, then truncate the journal and update the metrics.
--
-- The value is first written to @<data file>.new@ and then renamed over the
-- data file.
syncFile :: PersistentValue -> IO ()
syncFile val = do
  -- Read the value and clear the dirty flag in one transaction, so that a
  -- modification arriving during the write marks the value dirty again.
  (dirty, value) <- atomically $ (,) <$> readTVar (pvIsDirty val)
                                     <*> readTVar (pvValue val)
                                     <* writeTVar (pvIsDirty val) False
  when dirty $ do
    let fileName = pcDataFile $ pvConfig val
        tempFileName = fileName ++ ".new"
        replaceDataFile = do
          LBS.writeFile tempFileName (Aeson.encode value)
          renameFile tempFileName fileName
    -- The flag was already cleared above; if writing or renaming fails, mark
    -- the value dirty again so a later sync retries instead of skipping it.
    replaceDataFile `onException` atomically (writeTVar (pvIsDirty val) True)
    truncateJournal val
    updateMetrics val
-- | Empty the journal file, if one is open: seek back to the start and set
-- the file size to zero.
truncateJournal :: PersistentValue -> IO ()
truncateJournal val = traverse_ reset (pvJournal val)
  where
    reset journal = hSeek journal AbsoluteSeek 0 >> hSetFileSize journal 0
-- | After a sync, report the data file size, a journal size of zero and the
-- number of bytes written. Does nothing when metrics are not configured.
updateMetrics :: PersistentValue -> IO ()
updateMetrics val =
  for_ (pcMetrics config) $ \metric -> do
    size <- getFileSize (pcDataFile config)
    Metrics.setDataSize size metric
    Metrics.setJournalSize (0 :: Int) metric
    Metrics.incrementDataWritten size metric
  where
    config = pvConfig val
-- | Open the journal file for reading and writing in binary mode with line
-- buffering. Any exception raised while doing so is turned into a 'Left'
-- carrying a description of the failure.
openJournal :: FilePath -> ExceptT String IO Handle
openJournal journalFile = ExceptT $ describe <$> try acquire
  where
    acquire :: IO Handle
    acquire = do
      journal <- openBinaryFile journalFile ReadWriteMode
      hSetBuffering journal LineBuffering
      pure journal

    describe :: Either SomeException Handle -> Either String Handle
    describe = either (\exc -> Left $ "Failed to open journal file: " ++ show exc) Right
-- | Replay the journal (if one is open) on top of the loaded value.
--
-- Every journal line is decoded as a modification and applied in order;
-- lines that fail to decode are logged and skipped. When at least one entry
-- was applied, the new value is stored, marked dirty and written out with
-- 'syncFile'. Any exception raised along the way is returned as 'Left'.
recoverJournal :: PersistentValue -> ExceptT String IO ()
recoverJournal pval =
  for_ (pvJournal pval) $ \journal ->
    ExceptT . fmap describeFailure . try $ do
      startValue <- readTVarIO (pvValue pval)
      hSeek journal AbsoluteSeek 0
      (finalValue, successful, total) <-
        foldJournalM journal replayEntry (startValue, 0 :: Integer, 0 :: Integer)

      when (successful > 0) $ do
        atomically $ do
          writeTVar (pvValue pval) finalValue
          writeTVar (pvIsDirty pval) True
        -- NOTE(review): this always uses the JSON file writer, although
        -- 'loadFromBackend' may have loaded the value from Sqlite -- confirm
        -- that overwriting the data file with JSON is intended in that case.
        syncFile pval

      when (total > 0) $ do
        logMessage pval "Journal replayed"
        logMessage pval $ " failed: " <> Text.pack (show $ total - successful)
        logMessage pval $ " successful: " <> Text.pack (show $ successful)
  where
    describeFailure :: Either SomeException a -> Either String a
    describeFailure = either (\exc -> Left $ "Failed to read journal: " ++ show exc) Right

    -- Fold step: the accumulator is (value so far, entries applied, entries seen).
    replayEntry line (!value, !successful, !total) = do
      when (total == 0) $
        logMessage pval "Journal not empty, recovering"
      let seen = total + 1
      case Aeson.eitherDecodeStrict line of
        Right op -> pure (Store.applyModification op value, successful + 1, seen)
        Left err -> do
          logMessage pval $
            "Failed to recover journal entry "
              <> Text.pack (show seen) <> ": " <> Text.pack err
          pure (value, successful, seen)
-- | Read and decode the JSON data file. An I/O error while reading and a
-- JSON decoding failure are both returned as 'Left'.
readData :: FilePath -> ExceptT String IO Store.Value
readData filePath =
  ExceptT $ either (Left . show) decode <$> tryIOError (SBS.readFile filePath)
  where
    decode encodedValue =
      case Aeson.eitherDecodeStrict encodedValue of
        Right value -> Right value
        Left msg -> Left $ "Failed to decode the initial data: " ++ show msg
-- | Post a message at 'LogInfo' level to the logger configured for this
-- persistent value.
logMessage :: PersistentValue -> Text -> IO ()
logMessage pval = Logger.postLogBlocking logger LogInfo
  where
    logger = pcLogger (pvConfig pval)
-- | Left fold over the lines of a handle, starting at its current position
-- and running until end of file. The accumulator is forced to weak head
-- normal form before every step.
foldJournalM :: Handle -> (SBS8.ByteString -> a -> IO a) -> a -> IO a
foldJournalM journal step = loop
  where
    loop !acc = do
      done <- hIsEOF journal
      if done
        then pure acc
        else SBS8.hGetLine journal >>= \line -> step line acc >>= loop