{-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE UndecidableInstances #-} module Database.Liszt.Tracker ( Offset(..) , Request(..) , defRequest , handleRequest , LisztError(..) , LisztReader , withLisztReader , Tracker , withTracker )where import Control.Applicative import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.STM.Delay import Control.Exception import Control.Monad import Database.Liszt.Internal import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Internal as B import qualified Data.IntMap.Strict as IM import qualified Data.HashMap.Strict as HM import Data.Scientific (Scientific) import Data.Reflection (Given(..), give) import Data.Text (Text) import Data.Winery import Foreign.ForeignPtr import qualified Data.Winery.Internal.Builder as WB import GHC.Generics (Generic) import System.Directory import System.FilePath import System.FSNotify import System.IO data Offset = SeqNo !Int | FromEnd !Int | WineryTag !Schema ![Text] !Scientific deriving (Show, Generic) instance Serialise Offset data Request = Request { reqKey :: !Key , reqTimeout :: !Int , reqLimit :: !Int , reqFrom :: !Offset , reqTo :: !Offset } deriving (Show, Generic) instance Serialise Request defRequest :: Key -> Request defRequest k = Request { reqKey = k , reqTimeout = 0 , reqFrom = FromEnd 1 , reqTo = FromEnd 1 , reqLimit = maxBound } data LisztError = MalformedRequest | InvalidRequest | StreamNotFound | FileNotFound | IndexNotFound | WinerySchemaError !String | WineryError !DecodeException deriving (Show, Read) instance Exception LisztError data Tracker = Tracker { vRoot :: !(TVar (Frame CachePointer)) , vUpdated :: !(TVar Bool) , vPending :: !(TVar [STM (IO ())]) , vReaders :: !(TVar Int) , followThread :: !ThreadId , trackerPath :: !B.ByteString , streamHandle :: !LisztHandle , cache :: !Cache } data Cache = Cache { primaryCache :: TVar (IM.IntMap (Frame CachePointer)) , secondaryCache :: TVar (IM.IntMap (Frame CachePointer)) } newtype CachePointer = CachePointer RawPointer instance Given Cache => Fetchable CachePointer where fetchFrame h (CachePointer p@(RP ofs _)) = join $ atomically $ do let Cache{..} = given pcache <- readTVar primaryCache case IM.lookup ofs pcache of Just x -> return (pure x) Nothing -> do scache <- readTVar secondaryCache case IM.lookup ofs scache of Just x -> do writeTVar primaryCache $! IM.insert ofs x pcache return (pure x) Nothing -> return $ do x <- fmap CachePointer <$> fetchFrame h p atomically $ modifyTVar' primaryCache $ IM.insert ofs x return x flipCache :: Cache -> STM () flipCache Cache{..} = do readTVar primaryCache >>= writeTVar secondaryCache writeTVar primaryCache IM.empty createTracker :: WatchManager -> FilePath -> B.ByteString -> IO Tracker createTracker man prefix trackerPath = do let filePath = prefix B.unpack trackerPath exist <- doesFileExist filePath unless exist $ throwIO FileNotFound streamHandle <- openLiszt filePath vRoot <- newTVarIO Empty vPending <- newTVarIO [] vUpdated <- newTVarIO True vReaders <- newTVarIO 1 stopWatch <- watchDir man (takeDirectory filePath) (\case Modified path' _ _ | filePath == path' -> True _ -> False) $ const $ void $ atomically $ writeTVar vUpdated True fptr <- B.mallocByteString 4096 let wait = atomically $ do b <- readTVar vUpdated unless b retry writeTVar vUpdated False hSeek (hPayload streamHandle) SeekFromEnd (-fromIntegral footerSize) let seekRoot prevSize = do n <- withForeignPtr fptr $ \p -> hGetBuf (hPayload streamHandle) p 4096 if n == 0 then do let bs = B.PS fptr (max 0 $ prevSize - footerSize) footerSize if isFooter bs then try (evaluate $ decodeFrame bs) >>= \case Left (_ :: DecodeException) -> do wait seekRoot 0 Right a -> forceSpine a else do wait seekRoot 0 else seekRoot n cache <- Cache <$> newTVarIO IM.empty <*> newTVarIO IM.empty followThread <- forkFinally (forever $ do newRoot <- fmap CachePointer <$> seekRoot 0 join $ atomically $ do flipCache cache writeTVar vRoot newRoot pending <- readTVar vPending writeTVar vPending [] ms <- sequence pending return $ sequence_ ms wait) $ const $ stopWatch >> closeLiszt streamHandle return Tracker{..} data LisztReader = LisztReader { watchManager :: WatchManager , vTrackers :: TVar (HM.HashMap B.ByteString Tracker) , prefix :: FilePath } withLisztReader :: FilePath -> (LisztReader -> IO ()) -> IO () withLisztReader prefix k = do vTrackers <- newTVarIO HM.empty withManager $ \watchManager -> k LisztReader{..} acquireTracker :: LisztReader -> B.ByteString -> IO Tracker acquireTracker LisztReader{..} path = join $ atomically $ do streams <- readTVar vTrackers case HM.lookup path streams of Just s -> do modifyTVar' (vReaders s) (+1) return (return s) Nothing -> return $ do s <- createTracker watchManager prefix path atomically $ modifyTVar vTrackers (HM.insert path s) return s releaseTracker :: LisztReader -> Tracker -> IO () releaseTracker LisztReader{..} Tracker{..} = join $ atomically $ do n <- readTVar vReaders if n <= 1 then do modifyTVar' vTrackers (HM.delete trackerPath) return $ do killThread followThread closeLiszt streamHandle else return () <$ writeTVar vReaders (n - 1) withTracker :: LisztReader -> B.ByteString -> (Tracker -> IO a) -> IO a withTracker env path = bracket (acquireTracker env path) (releaseTracker env) handleRequest :: Tracker -> Request -> (LisztHandle -> Int -> [QueryResult] -> IO ()) -> IO () handleRequest str@Tracker{..} req@Request{..} cont = do root <- atomically $ do b <- readTVar vUpdated when b retry readTVar vRoot give cache $ lookupSpine streamHandle reqKey root >>= \case Nothing -> throwIO StreamNotFound Just spine -> do let len = spineLength spine let goSeqNo ofs | ofs >= len = do delay <- newDelay reqTimeout atomically $ do modifyTVar vPending $ (:) $ cont streamHandle 0 [] <$ waitDelay delay <|> pure (handleRequest str req { reqTo = SeqNo ofs } cont) | otherwise = do spine' <- dropSpine streamHandle (len - ofs - 1) spine case reqFrom of FromEnd n -> takeSpine streamHandle (min reqLimit $ ofs - (len - n) + 1) spine' [] >>= cont streamHandle ofs SeqNo n -> takeSpine streamHandle (min reqLimit $ ofs - n + 1) spine' [] >>= cont streamHandle ofs WineryTag sch name p -> do dec <- handleWinery sch name takeSpineWhile ((>=p) . dec . WB.toByteString) streamHandle spine' [] >>= cont streamHandle ofs case reqTo of FromEnd ofs -> goSeqNo (len - ofs) SeqNo ofs -> goSeqNo ofs WineryTag sch name p -> do dec <- handleWinery sch name dropSpineWhile ((>=p) . dec . WB.toByteString) streamHandle spine >>= \case Nothing -> cont streamHandle 0 [] Just (dropped, e, spine') -> case reqFrom of FromEnd n -> takeSpine streamHandle (min reqLimit $ n - dropped + 1) spine' [e] >>= cont streamHandle (len - dropped) SeqNo n -> takeSpine streamHandle (min reqLimit $ len - dropped - n + 1) spine' [e] >>= cont streamHandle (len - dropped) WineryTag sch' name' q -> do dec' <- handleWinery sch' name' takeSpineWhile ((>=q) . dec' . WB.toByteString) streamHandle spine' [e] >>= cont streamHandle (len - dropped) where handleWinery :: Schema -> [Text] -> IO (B.ByteString -> Scientific) handleWinery sch names = either (throwIO . WinerySchemaError . show) pure $ getDecoderBy (foldr (flip extractFieldBy) deserialiser names) sch