{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE LambdaCase #-}
module Database.Franz.Reader where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.Serialize
import qualified Data.ByteString.Char8 as B
import qualified Data.HashMap.Strict as HM
import qualified Data.IntMap.Strict as IM
import qualified Data.Vector.Unboxed as U
import qualified Data.Vector as V
import Data.Void
import Data.Maybe (isJust)
import GHC.Clock (getMonotonicTime)
import GHC.Generics (Generic)
import System.Directory
import System.FilePath
import System.IO
import System.FSNotify

-- | Which part of the requested range a 'Query' asks for. The constructor is
-- interpreted by 'range'.
data RequestType = AllItems | LastItem deriving (Show, Generic)
-- ^ 'AllItems' selects everything between the two bounds; 'LastItem' selects
-- only the final item found between them.
instance Serialize RequestType

-- | A reference to a position in a stream, used for both bounds of a 'Query'.
data ItemRef = BySeqNum !Int -- ^ sequential number; 'handleQuery' counts negative values back from the end of the stream
  | ByIndex !B.ByteString !Int -- ^ index name and value
  deriving (Show, Generic)
instance Serialize ItemRef

-- | A request for a range of items from a single stream.
data Query = Query
  { reqStream :: !B.ByteString -- ^ name of the stream (a directory name below the reader's prefix)
  , reqFrom :: !ItemRef -- ^ lower bound of the requested range
  , reqTo :: !ItemRef -- ^ upper bound of the requested range (inclusive)
  , reqType :: !RequestType -- ^ whether all items or only the last one is wanted
  } deriving (Show, Generic)
instance Serialize Query

-- | In-memory view of one on-disk stream, kept up to date by a follower
-- thread (see 'createStream').
data Stream = Stream
  { vOffsets :: !(TVar (IM.IntMap Int)) -- ^ sequence number -> first field of the corresponding @offsets@ record
  , indexNames :: ![B.ByteString] -- ^ index names, in the order they appear in the @indices@ file
  , indices :: !(HM.HashMap B.ByteString (TVar (IM.IntMap Int))) -- ^ index name -> (index value -> sequence number)
  , vCount :: !(TVar Int) -- ^ number of records loaded so far; also the sequence number given to the next record
  , vCaughtUp :: !(TVar Bool) -- ^ set by the follower when it reaches end of file, cleared when the @offsets@ file is modified
  , followThread :: !ThreadId -- ^ thread that tails the @offsets@ file
  , indexHandle :: !Handle -- ^ handle on the @offsets@ file, read by the follower
  , payloadHandle :: !Handle -- ^ handle on the @payloads@ file
  , vActivity :: !(TVar Activity) -- ^ usage state consulted by 'reaper'
  }

-- | Usage state of a stream: @Left t@ means idle since monotonic time @t@,
-- @Right n@ means in use, with @n@ being the number of users beyond the first.
type Activity = Either Double Int

-- | Register one more user: an idle stream becomes @Right 0@, a busy one has
-- its counter incremented.
addActivity :: Activity -> Activity
addActivity = either (const (Right 0)) (Right . (+ 1))

-- | Unregister one user of the stream. When no further users remain (or the
-- stream was already idle) the state becomes @Left now@, stamping the current
-- monotonic time so that idle time can be measured from this moment.
removeActivity :: Stream -> IO ()
removeActivity str = do
  now <- getMonotonicTime
  let release (Right n) | n > 0 = Right (n - 1)
      release _ = Left now
  atomically $ modifyTVar' (vActivity str) release

-- | Stop the follower thread of a stream, then close its payload handle and
-- its index handle (in that order).
closeStream :: Stream -> IO ()
closeStream stream = do
  killThread (followThread stream)
  mapM_ hClose [payloadHandle stream, indexHandle stream]

-- | Open the stream stored in the directory @path@ and start following it.
--
-- Three files are read from the directory: @offsets@ (a sequence of records,
-- each made of @1 + number of indices@ little-endian 64-bit integers; the
-- first field is stored in 'vOffsets', the remaining ones are index values),
-- @payloads@, and @indices@ (one index name per line).
--
-- A follower thread is forked that keeps appending newly written @offsets@
-- records to the in-memory maps. A directory watch clears 'vCaughtUp'
-- whenever the @offsets@ file is modified, which wakes the follower up.
--
-- Throws 'StreamNotFound' if the @offsets@ file does not exist and
-- 'InternalError' if its contents cannot be decoded. If anything fails after
-- a file handle has been opened, that handle is closed before the exception
-- propagates.
--
-- NOTE(review): the stop action returned by 'watchDir' is discarded, so
-- 'closeStream' cannot cancel the watch; it lives as long as the manager.
createStream :: WatchManager -> FilePath -> IO Stream
createStream man path = do
  let offsetPath = path </> "offsets"
  let payloadPath = path </> "payloads"
  exist <- doesFileExist offsetPath
  unless exist $ throwIO $ StreamNotFound offsetPath
  initialOffsetsBS <- B.readFile offsetPath
  -- Reading @indices@ and decoding the offsets can both throw; bracketOnError
  -- makes sure the payload handle does not leak when that happens.
  bracketOnError (openBinaryFile payloadPath ReadMode) hClose $ \payloadHandle -> do
    indexNames <- B.lines <$> B.readFile (path </> "indices")
    -- Number of 64-bit fields per record: the offset plus one value per index.
    let icount = 1 + length indexNames
    -- Only complete records are loaded here; a trailing partial record is
    -- left for the follower.
    let count = B.length initialOffsetsBS `div` (8 * icount)
    let getI = fromIntegral <$> getInt64le
    initialIndices <- either (throwIO . InternalError) pure
      $ runGet (V.replicateM count $ U.replicateM icount getI) initialOffsetsBS
    let initialOffsets = IM.fromList $ V.toList
          $ V.zip (V.enumFromN 0 count) $ V.map U.head initialIndices
    vOffsets <- newTVarIO $! initialOffsets
    vCaughtUp <- newTVarIO False
    vCount <- newTVarIO $! IM.size initialOffsets
    _ <- watchDir man path (\case
      Modified p _ _ | p == offsetPath -> True
      _ -> False)
      $ const $ atomically $ writeTVar vCaughtUp False

    -- One map per index: index value -> sequence number.
    vIndices <- forM [1..length indexNames] $ \i -> newTVarIO
      $ IM.fromList $ V.toList $ V.zip (V.map (U.! i) initialIndices) (V.enumFromN 0 count)

    -- Same protection for the second handle. If a later step fails after the
    -- follower has been forked, closing the handle also makes the follower
    -- terminate on its next read.
    bracketOnError (openFile offsetPath ReadMode) hClose $ \indexHandle -> do
      let final :: Either SomeException Void -> IO ()
          final (Left exc) = logFollower [path, "terminated with", show exc]
          final (Right v) = absurd v

      -- TODO broadcast an exception if it exits?
      followThread <- flip forkFinally final $ do
        -- Skip the records that were already loaded above.
        forM_ (IM.maxViewWithKey initialOffsets) $ \((i, _), _) -> do
          hSeek indexHandle AbsoluteSeek $ fromIntegral $ succ i * icount * 8
        forever $ do
          bs <- B.hGet indexHandle (8 * icount)
          if B.null bs
            then do
              -- End of file: announce that we are up to date, then sleep
              -- until the watcher clears the flag again.
              atomically $ writeTVar vCaughtUp True
              atomically $ readTVar vCaughtUp >>= check . not
            else do
              ofs : indices <- either (throwIO . InternalError) pure $ runGet (replicateM icount getI) bs
              atomically $ do
                i <- readTVar vCount
                modifyTVar' vOffsets $ IM.insert i ofs
                forM_ (zip vIndices indices) $ \(v, x) -> modifyTVar' v $ IM.insert (fromIntegral x) i
                writeTVar vCount $! i + 1

      let indices = HM.fromList $ zip indexNames vIndices

      logFollower ["started", path]

      -- A fresh stream starts out idle, stamped with the current time.
      vActivity <- getMonotonicTime >>= newTVarIO . Left
      return Stream{..}
  where
    logFollower = hPutStrLn stderr . unwords . (:) "[follower]"

-- | Result of resolving a query with 'range': two @(sequence number, offset)@
-- pairs. The first pair is the entry immediately preceding the selected items
-- and the second is the last selected entry; @(-1, 0)@ stands in when there
-- is no such entry.
type QueryResult = ((Int, Int) -- starting SeqNo, byte offset
    , (Int, Int)) -- ending SeqNo, byte offset

-- | Resolve a pair of sequence-number bounds against the offsets known so far.
--
-- The 'Bool' is 'True' when the upper bound has been reached, i.e. an entry
-- at or beyond @end@ already exists; for 'LastItem' it is also 'False' when
-- nothing lies between the bounds. The 'QueryResult' holds the entry just
-- before the selection and the last selected entry, using @(-1, 0)@ where no
-- such entry exists.
range :: Int -- ^ from
  -> Int -- ^ to
  -> RequestType
  -> IM.IntMap Int -- ^ offsets
  -> (Bool, QueryResult)
range begin end rt allOffsets = case rt of
    AllItems -> (ready, (preceding, maybe preceding fst newest))
    LastItem -> maybe (False, (none, none)) lastOnly newest
  where
    -- placeholder used when there is no entry to report
    none = (-1, 0)
    (below, atEnd, above) = IM.splitLookup end allOffsets
    ready = isJust atEnd || not (IM.null above)
    -- every entry whose key is <= end
    upToEnd = maybe below (\v -> IM.insert end v below) atEnd
    -- left: keys < begin; body: keys in [begin, end]
    (left, body) = splitR begin upToEnd
    preceding = maybe none fst $ IM.maxViewWithKey left
    newest = IM.maxViewWithKey body
    -- report the final entry together with whichever entry comes right before it
    lastOnly (final, rest) =
      (ready, (maybe none fst $ IM.maxViewWithKey $ IM.union left rest, final))

-- | Split a map at a key, keeping the key itself (when present) in the right
-- half: the result is @(keys < i, keys >= i)@.
splitR :: Int -> IM.IntMap a -> (IM.IntMap a, IM.IntMap a)
splitR i m = (below, atOrAbove)
  where
    (below, hit, above) = IM.splitLookup i m
    atOrAbove = case hit of
      Nothing -> above
      Just v -> IM.insert i v above

-- | Errors reported by the reader. The type is serialisable, presumably so
-- that failures can be sent back to a client (confirm against the server code).
data FranzException = MalformedRequest !String -- ^ not raised in this module; presumably a request that could not be decoded
  | StreamNotFound !FilePath -- ^ the @offsets@ file at this path does not exist
  | IndexNotFound !B.ByteString ![B.ByteString] -- ^ requested index name, and the index names the stream does have
  | InternalError !String -- ^ offset records could not be decoded
  | ClientError !String -- ^ not raised in this module
  deriving (Show, Generic)
instance Serialize FranzException
instance Exception FranzException

-- | Shared reader state: the file watcher, the cache of open streams, and the
-- directory under which all streams live.
data FranzReader = FranzReader
  { watchManager :: WatchManager -- ^ passed to 'createStream' to watch stream directories
  , vStreams :: TVar (HM.HashMap B.ByteString Stream) -- ^ currently open streams, keyed by the id built in 'handleQuery'
  , prefix :: FilePath -- ^ root directory prepended to every stream path
  }

-- | Periodically close streams that have been idle for too long.
--
-- Runs forever. On every round, each cached stream whose activity is
-- @Left t@ with @now - t >= life@ is removed from the cache in one
-- transaction and closed afterwards; a summary line is written to stderr
-- whenever at least one stream was closed. The loop then sleeps for the
-- given interval (in seconds).
reaper :: Double -- interval
  -> Double -- lifetime
  -> FranzReader -> IO ()
reaper int life reader = forever $ do
  now <- getMonotonicTime
  let isExpired (Left t) = now - t >= life
      isExpired (Right _) = False
  (count, xs) <- atomically $ do
    m <- readTVar (vStreams reader)
    -- pair every stream with its verdict while still inside the transaction
    tagged <- forM m $ \s -> (,) s . isExpired <$> readTVar (vActivity s)
    writeTVar (vStreams reader) $! HM.map fst (HM.filter (not . snd) tagged)
    pure (HM.size m, reverse [s | (s, True) <- HM.elems tagged])
  mapM_ closeStream xs
  unless (null xs) $ hPutStrLn stderr $ unwords
    ["[reaper] closed", show (length xs), "out of", show count]
  threadDelay $ floor $ int * 1e6

-- | Run an action with a fresh 'FranzReader' rooted at the given prefix. The
-- stream cache starts out empty and the watch manager is scoped to the
-- action by 'withManager'.
withFranzReader :: FilePath -> (FranzReader -> IO ()) -> IO ()
withFranzReader prefix k =
  newTVarIO HM.empty >>= \streams ->
    withManager $ \man -> k FranzReader
      { watchManager = man
      , vStreams = streams
      , prefix = prefix
      }

-- | Look up (or open) the stream a query refers to and prepare the
-- transaction that resolves the query.
--
-- The stream lives at @prefix \</\> dir \</\> name@. It is taken from the
-- cache when present; otherwise it is opened with 'createStream' and added
-- to the cache. Either way one unit of activity is registered on it via
-- 'addActivity' in the same transaction that finds or inserts it, so a
-- stream handed out here can no longer be seen as idle by 'reaper'.
-- Presumably the caller balances this with 'removeActivity' when done.
--
-- The returned 'STM' action blocks until the follower has caught up with the
-- @offsets@ file, turns both bounds into sequence numbers and passes them to
-- 'range'. It throws 'IndexNotFound' when a bound names an unknown index.
-- Exceptions from 'createStream' propagate from the 'IO' action.
handleQuery :: FranzReader
  -> FilePath
  -> Query
  -> IO (Stream, STM (Bool, QueryResult))
handleQuery FranzReader{..} dir (Query name begin_ end_ rt) = do
  let relPath = dir </> B.unpack name
  let path = prefix </> relPath
  -- Key the cache on the joined relative path. Plain concatenation of dir and
  -- name would map e.g. ("foo", "bar") and ("foob", "ar") to the same key.
  let streamId = B.pack relPath
  -- Mark a stream as in use; always run in the transaction that obtained it.
  let claim s = s <$ modifyTVar' (vActivity s) addActivity
  cached <- atomically $ readTVar vStreams >>= traverse claim . HM.lookup streamId
  stream@Stream{..} <- case cached of
    Just s -> return s
    Nothing -> do
      fresh <- createStream watchManager path
      -- Another thread may have opened the same stream while we were doing
      -- I/O. Re-check under the transaction; if we lost, use the cached
      -- stream and dispose of ours instead of overwriting (and leaking) it.
      (winner, redundant) <- atomically $ do
        streams <- readTVar vStreams
        case HM.lookup streamId streams of
          Just s -> do
            _ <- claim s
            return (s, Just fresh)
          Nothing -> do
            writeTVar vStreams $! HM.insert streamId fresh streams
            _ <- claim fresh
            return (fresh, Nothing)
      mapM_ closeStream redundant
      return winner
  return $ (,) stream $ do
    -- Wait until the follower has read everything currently on disk.
    readTVar vCaughtUp >>= check
    allOffsets <- readTVar vOffsets
    let finalOffset = case IM.maxViewWithKey allOffsets of
          Just ((k, _), _) -> k + 1
          Nothing -> 0
    -- Negative sequence numbers count back from the end of the stream.
    let rotate i
          | i < 0 = finalOffset + i
          | otherwise = i
    -- Read the named index and turn it into a sequence number with @pick@.
    let withIndex index pick = case HM.lookup index indices of
          Nothing -> throwSTM $ IndexNotFound index $ HM.keys indices
          Just v -> do
            m <- readTVar v
            return $! pick m
    begin <- case begin_ of
      BySeqNum i -> pure $ rotate i
      -- entry with the smallest index value >= val; maxBound if there is none
      ByIndex index val -> withIndex index $ \m ->
        maybe maxBound fst $ IM.minView $ snd $ splitR val m
    end <- case end_ of
      BySeqNum i -> pure $ rotate i
      -- entry with the largest index value <= val; minBound if there is none
      ByIndex index val -> withIndex index $ \m ->
        let (body, lastItem, _) = IM.splitLookup val m
        in maybe minBound fst $ IM.maxView $ maybe id (IM.insert val) lastItem body
    return $! range begin end rt allOffsets