{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.Franz (
WriterHandle,
openWriter,
closeWriter,
withWriter,
write,
flush,
getLastSeqNo
) where
import Control.Concurrent
import Control.Exception
import Control.Monad
import qualified Data.ByteString.FastBuilder as BB
import qualified Data.Vector.Storable.Mutable as MV
import Data.Foldable (toList)
import Data.Int
import Data.Word (Word64)
import Data.IORef
import Data.Kind (Type)
import Foreign.Ptr (Ptr)
import Foreign.Storable (Storable(..))
import GHC.IO.Handle.FD (openFileBlocking)
import System.Directory
import System.Endian (toLE64)
import System.FilePath
import System.IO
-- | Mutable state of an open stream writer.
--
-- The type parameter @f@ is phantom: it only fixes the container shape used
-- for index names in 'openWriter' and for index values in 'write'.
data WriterHandle (f :: Type -> Type) = WriterHandle
  { hPayload :: Handle
    -- ^ Handle of the @payloads@ file, opened in append mode.
  , hOffset :: Handle
    -- ^ Handle of the @offsets@ file, opened in append mode.
  , vOffset :: MVar (Int, Word64)
    -- ^ Pair of (number of records written so far, total payload size in
    -- bytes). 'write' and 'flush' take this 'MVar' for their whole duration,
    -- so it also acts as the writer lock.
  , offsetBuf :: MV.IOVector Word64
    -- ^ In-memory buffer of offset\/index words not yet written to 'hOffset'.
  , offsetPtr :: IORef Int
    -- ^ Number of words currently pending in 'offsetBuf'.
  , indexCount :: Int
    -- ^ Words stored per record: one payload offset plus one word per
    -- index name.
  }
-- | Sequence number of the most recently written record, i.e. the current
-- record count minus one. Yields @-1@ when no record has been written.
getLastSeqNo :: WriterHandle f -> IO Int
getLastSeqNo h = do
  st <- readMVar (vOffset h)
  return (fst st - 1)
-- | Open (creating it if necessary) the stream stored in the given directory.
--
-- The directory holds three files: @payloads@, @offsets@ and @indices@.
-- The @indices@ file is rewritten on every call with the supplied names,
-- one per line.
--
-- When a @payloads@ file already exists, the writer resumes from the sizes
-- of the existing files. Every record occupies @length idents + 1@ 64-bit
-- words in @offsets@ (see 'write'), so the record count is the file size
-- divided by @8 * indexCount@ rather than by 8.
--
-- Throws an 'IOException' if any of the files cannot be read or opened.
-- Handles that were already opened are closed again when a later step fails.
openWriter :: Foldable f
  => f String
  -- ^ Index names; their number fixes how many index values each record has.
  -> FilePath
  -- ^ Directory of the stream.
  -> IO (WriterHandle f)
openWriter idents path = do
  createDirectoryIfMissing True path
  let payloadPath = path </> "payloads"
      offsetPath = path </> "offsets"
      indexPath = path </> "indices"
      -- One word for the payload offset plus one per index name.
      indexCount = length idents + 1
  alreadyExists <- doesFileExist payloadPath
  vOffset <- if alreadyExists
    then do
      count <- withFile offsetPath ReadMode hFileSize
      size <- withFile payloadPath ReadMode hFileSize
      newMVar (fromIntegral count `div` (8 * indexCount), fromIntegral size)
    else newMVar (0, 0)
  writeFile indexPath $ unlines $ toList idents
  -- bracketOnError releases each handle only if a later step throws;
  -- on success ownership passes to the returned WriterHandle.
  bracketOnError (openFileBlocking payloadPath AppendMode) hClose $ \hPayload ->
    bracketOnError (openFileBlocking offsetPath AppendMode) hClose $ \hOffset -> do
      offsetBuf <- MV.new offsetBufferSize
      offsetPtr <- newIORef 0
      return WriterHandle{..}
-- | Flush any buffered offsets and close both underlying file handles.
--
-- Both handles are closed even if the flush (or closing the first handle)
-- throws; the exception is still propagated to the caller.
closeWriter :: Foldable f => WriterHandle f -> IO ()
closeWriter h@WriterHandle{..} =
  flush h `finally` (hClose hPayload `finally` hClose hOffset)
-- | Run an action with a writer opened via 'openWriter'. The writer is
-- passed to 'closeWriter' when the action finishes, whether it returns
-- normally or throws.
withWriter :: Foldable f => f String -> FilePath -> (WriterHandle f -> IO a) -> IO a
withWriter idents path action =
  bracket (openWriter idents path) closeWriter action
-- | Capacity, in 64-bit words, of the in-memory offset buffer allocated by
-- 'openWriter'. 'write' flushes the buffer before it would reach this size.
offsetBufferSize :: Int
offsetBufferSize = 256
-- | Append one record to the stream and return its sequence number.
--
-- The payload bytes go to the payload handle right away. The record's end
-- offset and its index values are staged in the offset buffer as
-- little-endian 64-bit words and reach the offsets file on the next flush.
--
-- The whole operation runs while holding the writer's 'MVar'.
--
-- NOTE(review): nothing here checks that @ixs@ has exactly @indexCount - 1@
-- elements; the caller is expected to pass one value per index name.
write :: Foldable f
  => WriterHandle f
  -> f Int64
  -- ^ Index values of the record.
  -> BB.Builder
  -- ^ Payload.
  -> IO Int
write h@WriterHandle{..} ixs !bs = modifyMVar vOffset $ \(seqNo, payloadEnd) -> do
  written <- BB.hPutBuilderLen hPayload bs
  let payloadEnd' = payloadEnd + fromIntegral written
  cursor <- readIORef offsetPtr
  -- Make room first: if this record would reach the end of the buffer,
  -- flush what is pending and start again from slot 0.
  base <- if cursor + indexCount >= offsetBufferSize
    then unsafeFlush h >> return 0
    else return cursor
  -- Slot 0 of the record is the payload end offset, followed by the indices.
  MV.write offsetBuf base $ toLE64 $ fromIntegral payloadEnd'
  zipWithM_
    (\slot v -> MV.write offsetBuf slot $ toLE64 $ fromIntegral v)
    [base + 1 ..]
    (toList ixs)
  writeIORef offsetPtr (base + indexCount)
  let !seqNo' = seqNo + 1
  return ((seqNo', payloadEnd'), seqNo)
{-# INLINE write #-}
-- | Write out all buffered offsets, holding the writer's 'MVar' so the
-- flush cannot interleave with a concurrent 'write'.
flush :: WriterHandle f -> IO ()
flush h = withMVar (vOffset h) (\_ -> unsafeFlush h)
-- | Write the pending words of the offset buffer to the offsets file.
--
-- Does nothing when the buffer is empty. Otherwise the payload handle is
-- flushed before the offsets are written, then the buffer is reset and the
-- offsets handle is flushed.
--
-- This function takes no lock itself; the callers in this module invoke it
-- while holding the writer's 'MVar'.
unsafeFlush :: WriterHandle f -> IO ()
unsafeFlush WriterHandle{..} = do
  pending <- readIORef offsetPtr
  unless (pending <= 0) $ do
    hFlush hPayload
    MV.unsafeWith offsetBuf (\buf -> hPutElems hOffset buf pending)
    writeIORef offsetPtr 0
    hFlush hOffset
-- | Write @len@ consecutive elements starting at @ptr@ to the handle as raw
-- bytes. The byte count is @len@ times the 'sizeOf' of the element type.
hPutElems :: forall a. Storable a => Handle -> Ptr a -> Int -> IO ()
hPutElems hdl ptr len = hPutBuf hdl ptr byteCount
  where
    -- sizeOf only inspects the type, so 'undefined' is never evaluated.
    byteCount = len * sizeOf (undefined :: a)
{-# INLINE hPutElems #-}