{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

{-# OPTIONS -Wall #-}
{-# OPTIONS -Werror #-}
{-# OPTIONS -fno-warn-partial-type-signatures #-}

-- |
-- Module     : DFINITY.RadixTree.Conduit
-- Copyright  : 2018 DFINITY Stiftung
-- License    : GPL-3
-- Maintainer : Enzo Haussecker <enzo@dfinity.org>
-- Stability  : Stable
--
-- A parallel download protocol.
module DFINITY.RadixTree.Conduit (

   -- ** Combinators
     sourceRadixTree
   , sinkRadixTree

   ) where

import Codec.Serialise (deserialise, deserialiseOrFail, serialise)
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.BoundedChan (BoundedChan, readChan, tryWriteChan)
import Control.Concurrent.MVar (MVar, modifyMVar_, newMVar, readMVar)
import Control.Concurrent.ReadWriteLock (RWLock)
import Control.Exception (throw)
import Control.Monad (filterM, foldM, forM_, forever, void, when)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (MonadResource, ResourceT, allocate, release)
import Crypto.Hash.BLAKE2.BLAKE2s (hash)
import Data.ByteString (ByteString)
import Data.ByteString.Lazy (fromStrict, toStrict)
import Data.ByteString.Short (fromShort, toShort)
import Data.Conduit (ConduitT, await, yield)
import Data.HashTable.IO as Cuckoo (CuckooHashTable, delete, fromList, insert, lookup)
import Data.List as List (delete)
import Data.LruCache as LRU (LruCache, empty, insert, lookup)
import Data.Maybe (isJust, isNothing)
import Data.Void (Void)
import Database.LevelDB as LevelDB (DB, Options(..), defaultOptions, defaultWriteOptions, delete)
import Database.LevelDB.Base (open)
import Database.LevelDB.Internal (unsafeClose)
import System.Directory (canonicalizePath, getTemporaryDirectory, removeDirectoryRecursive)
import System.IO.Temp (createTempDirectory)

import DFINITY.RadixTree.Bits
import DFINITY.RadixTree.Lenses
import DFINITY.RadixTree.Lock
import DFINITY.RadixTree.Types
import DFINITY.RadixTree.Utilities

--------------------------------------------------------------------------------

-- |
-- Create a conduit from a radix tree.
sourceRadixTree
   :: forall m database. MonadResource m
   => RadixDatabase (ConduitT () ByteString m) database
   => [Bool] -- ^ Bit mask.
   -> Int -- ^ LRU cache size in items.
   -> BoundedChan RadixRoot -- ^ Terminal state root producer.
   -> RadixTree database -- ^ Radix tree.
   -> RWLock -- ^ Radix database lock.
   -> ConduitT () ByteString m ()
sourceRadixTree mask cacheSize chan tree radixLock
   | cacheSize <= 0 = throw $ InvalidArgument "invalid LRU cache size"
   | otherwise = do
      cache <- liftIO $ newMVar $ empty cacheSize
      action <- fmap fst $ flip allocate killThread $ forkIO $ forever $ do
         root <- readChan chan
         modifyMVar_ cache $ pure . LRU.insert root ()
      loop cache tree []
      release action
      where

      loop
         :: MVar (LruCache RadixRoot ())
         -> RadixTree database
         -> [RadixRoot]
         -> ConduitT () ByteString m ()
      loop cache subtree@RadixTree {..} accum = do
         let accum' = _radixCheckpoint:accum
         seen <- liftIO $ readMVar cache
         if flip any accum' $ isJust . flip LRU.lookup seen
         then pure ()
         else do
            let key = fromShort _radixCheckpoint
            result <- withReadLock radixLock $ load _radixDatabase key
            case result of
               Nothing -> pure ()
               Just bytes -> do
                  let RadixNode {..} = deserialise $ fromStrict bytes
                  let success = all id $ zipWith (==) mask $ toBits $ fromShort _radixCheckpoint
                  when success $ yield bytes
                  forM_ [_radixLeft, _radixRight] $ \ case
                     Nothing -> pure ()
                     Just root -> loop cache `flip` accum' $ setCheckpoint root subtree

{-# SPECIALISE sourceRadixTree
               :: [Bool]
               -> Int
               -> BoundedChan RadixRoot
               -> RadixTree DB
               -> RWLock
               -> ConduitT () ByteString (ResourceT IO) () #-}

--------------------------------------------------------------------------------

-- |
-- Create a radix tree from a conduit.
sinkRadixTree
   :: forall m database. MonadResource m
   => RadixDatabase (ConduitT ByteString Void m) database
   => RadixRoot -- ^ Target state root.
   -> BoundedChan RadixRoot -- ^ Terminal state root consumer.
   -> RadixTree database -- ^ Radix tree.
   -> RWLock -- ^ Radix database lock.
   -> ConduitT ByteString Void m (Either String (RadixTree database))
sinkRadixTree checkpoint chan tree@RadixTree {..} radixLock = do
   -- Create a temporary directory.
   relative <- liftIO getTemporaryDirectory
   absolute <- liftIO $ canonicalizePath relative
   let createTempDir = createTempDirectory absolute "dfinity"
   let destroyTempDir = ignoreIOErrors . removeDirectoryRecursive
   (tempDirKey, tempDir) <- allocate createTempDir destroyTempDir
   -- Create a temporary database.
   let createTempDatabase = open tempDir defaultOptions {createIfMissing = True}
   (tempDatabaseKey, tempDatabase) <- allocate createTempDatabase unsafeClose
   -- Create a hash table.
   table <- liftIO $ fromList [(checkpoint, Nothing)]
   -- Consume the radix nodes.
   result <- loop1 tempDatabase table
   -- Remove the temporary database.
   release tempDatabaseKey
   release tempDirKey
   -- Return the result.
   pure result
   where

   -- Loop 1: The collection loop.
   loop1
      :: DB
      -> CuckooHashTable RadixRoot (Maybe RadixRoot)
      -> ConduitT ByteString Void m (Either String (RadixTree database))
   loop1 tempDatabase table = do
      -- Have we collected all the radix nodes?
      done <- liftIO $ isNothing <$> Cuckoo.lookup table checkpoint
      if done
      then pure $ Right $ setCheckpoint checkpoint $ setRoot checkpoint tree
      else do
         -- Wait for a radix node.
         mval <- await
         case mval of
            Nothing -> pure $ Left "EOF"
            Just node ->
               case deserialiseOrFail $ fromStrict node of
                  Left _ -> loop1 tempDatabase table
                  Right RadixNode {..} -> do
                     -- Does the radix node already exist in the radix database?
                     let key = hash 20 mempty node
                     let root = toShort key
                     want <- liftIO $ isJust <$> Cuckoo.lookup table root
                     exists <- if want
                        then pure False
                        else withReadLock radixLock $ isJust <$> load _radixDatabase key
                     if exists
                     then do
                        -- Announce a terminal state root.
                        liftIO $ void $ tryWriteChan chan root
                        -- Discard the radix node.
                        liftIO $ Cuckoo.delete table root
                        loop1 tempDatabase table
                     else do
                        -- Identify any children not present in the radix database.
                        let absent = fmap isNothing . withReadLock radixLock . load _radixDatabase . fromShort
                        let children = maybe id (:) _radixLeft $ maybe id (:) _radixRight []
                        targets <- filterM absent children
                        -- Write the radix node and its targets to the temporary database.
                        let value = toStrict $ serialise (node, targets)
                        store tempDatabase key value
                        -- Does the radix node have any gaps in its lineage?
                        if not want
                        then loop1 tempDatabase table
                        else do
                           -- Write all eligible radix nodes to the radix database.
                           eligible <- loop2 tempDatabase table root []
                           loop3 tempDatabase table eligible
                           loop1 tempDatabase table

   -- Loop 2: The aggregation loop.
   loop2
      :: DB
      -> CuckooHashTable RadixRoot (Maybe RadixRoot)
      -> RadixRoot
      -> [(RadixRoot, ByteString)]
      -> ConduitT ByteString Void m [(RadixRoot, ByteString)]
   loop2 tempDatabase table root eligible = do
      -- Read the radix node and its targets from the temporary database.
      result <- load tempDatabase $ fromShort root
      case deserialise . fromStrict <$> result of
         Nothing -> pure eligible
         Just (bytes, targets :: [] _) ->
            -- Is the radix node eligible to be written to disk?
            if null targets
            then pure $ (root, bytes):eligible
            else do
               -- Write the child-parent pairs to the hash table.
               liftIO $ forM_ targets $ \ child ->
                  Cuckoo.insert table child $ Just root
               -- Recurse.
               foldM step eligible targets
               where step = flip $ loop2 tempDatabase table

   -- Loop 3: The write loop.
   loop3
      :: DB
      -> CuckooHashTable RadixRoot (Maybe RadixRoot)
      -> [(RadixRoot, ByteString)]
      -> ConduitT ByteString Void m ()
   loop3 tempDatabase table = \ case
      [] -> pure ()
      (root, bytes):eligible -> do
         -- Write the radix node to the radix database.
         let key = fromShort root
         withWriteLock radixLock $ store _radixDatabase key bytes
         -- Delete the radix node from the temporary database.
         LevelDB.delete tempDatabase defaultWriteOptions $ fromShort root
         Just parent <- liftIO $ Cuckoo.lookup table root
         liftIO $ Cuckoo.delete table root
         case parent of
            Nothing -> pure ()
            Just root' -> do
               let key' = fromShort root'
               Just value <- load tempDatabase key'
               let (bytes', targets') = deserialise $ fromStrict value
               let targets'' = List.delete root targets'
               -- Have both siblings been written to the radix database?
               if null targets''
               then do
                  let eligible' = (root', bytes'):eligible
                  loop3 tempDatabase table eligible'
               else do
                  -- Update the parent node.
                  let value' = toStrict $ serialise (bytes', targets'')
                  store tempDatabase key' value'
                  loop3 tempDatabase table eligible

{-# SPECIALISE sinkRadixTree
               :: RadixRoot
               -> BoundedChan RadixRoot
               -> RadixTree DB
               -> RWLock
               -> ConduitT ByteString
                           Void
                           (ResourceT IO)
                           (Either String (RadixTree DB)) #-}

--------------------------------------------------------------------------------