{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE Rank2Types #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# OPTIONS -Wall #-} {-# OPTIONS -Werror=incomplete-patterns #-} -- | -- Module : Network.DFINITY.RadixTree.Conduit -- Copyright : 2018 DFINITY Stiftung -- License : GPL-3 -- Maintainer : Enzo Haussecker -- Stability : Stable -- -- A parallel download protocol. module Network.DFINITY.RadixTree.Conduit ( -- ** Combinators sourceRadixTree , sinkRadixTree ) where import Codec.Serialise (deserialise, deserialiseOrFail) import Control.Concurrent (forkIO, killThread) import Control.Concurrent.BoundedChan (BoundedChan, readChan, tryWriteChan) import Control.Concurrent.MVar (modifyMVar_, newMVar, readMVar) import Control.Concurrent.ReadWriteLock (RWLock) import Control.Exception (throw) import Control.Monad (foldM, forM_, forever, void, when) import Control.Monad.IO.Class (liftIO) import Control.Monad.Trans.Resource (MonadResource, ResourceT, allocate, release) import Crypto.Hash.SHA256 (hash) import Data.ByteString.Lazy (fromStrict) import Data.ByteString.Short (ShortByteString, fromShort, toShort) import Data.ByteString.Char8 as Byte (ByteString, take) import Data.Conduit (ConduitT, await, yield) import Data.List as List (delete, null) import Data.LruCache as LRU (empty, insert, lookup) import Data.Map as Map (Map, (!), delete, empty, insert, keys, lookup, member, null, singleton) import Data.Maybe (isJust) import Data.Void (Void) import Database.LevelDB (DB) import Network.DFINITY.RadixTree.Bits import Network.DFINITY.RadixTree.Lenses import Network.DFINITY.RadixTree.Lock import Network.DFINITY.RadixTree.Types -- | -- Create a conduit from a radix tree. sourceRadixTree :: forall m database. MonadResource m => RadixDatabase (ConduitT () ByteString m) database => [Bool] -- ^ Bit mask. -> Int -- ^ LRU cache size in items. -> BoundedChan RadixRoot -- ^ Terminal state root producer. -> RadixTree database -- ^ Radix tree. -> RWLock -- ^ Database lock. -> ConduitT () ByteString m () {-# SPECIALISE sourceRadixTree :: [Bool] -> Int -> BoundedChan RadixRoot -> RadixTree DB -> RWLock -> ConduitT () ByteString (ResourceT IO) () #-} sourceRadixTree mask cacheSize chan tree lock | cacheSize <= 0 = throw $ InvalidArgument "invalid LRU cache size" | otherwise = do cache <- liftIO $ newMVar $ LRU.empty cacheSize action <- fmap fst $ flip allocate killThread $ forkIO $ forever $ do root <- readChan chan modifyMVar_ cache $ pure . LRU.insert root () loop cache tree [] release action where loop cache subtree@RadixTree {..} accum = do let accum' = _radixCheckpoint:accum seen <- liftIO $ readMVar cache if flip any accum' $ isJust . flip LRU.lookup seen then pure () else do let key = fromShort _radixCheckpoint result <- withReadLock lock $ load _radixDatabase key case result of Nothing -> pure () Just bytes -> do let RadixNode {..} = deserialise $ fromStrict bytes let success = all id $ zipWith (==) mask $ toBits $ fromShort _radixCheckpoint when success $ yield bytes forM_ [_radixLeft, _radixRight] $ \ case Nothing -> pure () Just root -> loop cache `flip` accum' $ setCheckpoint root subtree -- | -- Create a radix tree from a conduit. sinkRadixTree :: forall m database. MonadResource m => RadixDatabase (ConduitT ByteString Void m) database => RadixRoot -- ^ Target state root. -> BoundedChan RadixRoot -- ^ Terminal state root consumer. -> RadixTree database -- ^ Radix tree. -> RWLock -- ^ Database lock. -> ConduitT ByteString Void m (Either [RadixRoot] (RadixTree database)) {-# SPECIALISE sinkRadixTree :: RadixRoot -> BoundedChan RadixRoot -> RadixTree DB -> RWLock -> ConduitT ByteString Void (ResourceT IO) (Either [RadixRoot] (RadixTree DB)) #-} sinkRadixTree checkpoint chan tree@RadixTree {..} lock = loop1 Map.empty $ singleton checkpoint Nothing where -- Loop 1: The accumulation loop. loop1 :: Map RadixRoot (ShortByteString, [RadixRoot]) -> Map RadixRoot (Maybe RadixRoot) -> ConduitT ByteString Void m (Either [RadixRoot] (RadixTree database)) loop1 buffer targets = -- Have we found all the subtrees? if Map.null targets then pure $ Right $ setCheckpoint checkpoint $ setRoot checkpoint tree else do -- Wait for a node. mval <- await case mval of Nothing -> pure $ Left $ keys targets Just bytes -> case deserialiseOrFail $ fromStrict bytes of Left _ -> loop1 buffer targets Right RadixNode {..} -> do -- Does the node already exist in the database? let key = Byte.take 20 $ hash bytes let root = toShort key let want = member root targets exists <- if want then pure False else do result <- withReadLock lock $ load _radixDatabase key pure $ isJust result if exists then loop1 buffer $ Map.delete root targets else do -- Update the buffer to include the node. children <- foldM step [] $ maybe id (:) _radixLeft $ maybe id (:) _radixRight [] let buffer' = Map.insert root (toShort bytes, children) buffer -- Can we trace the node back to the target state root? if want then loop3 buffer' `uncurry` loop2 buffer' root (targets, []) else loop1 buffer' targets where step accum root = do let key = fromShort root result <- withReadLock lock $ load _radixDatabase key if isJust result then pure accum else pure $ root:accum -- Loop 2: The aggregation loop. loop2 :: Map RadixRoot (ShortByteString, [RadixRoot]) -> RadixRoot -> (Map RadixRoot (Maybe RadixRoot), [(RadixRoot, ShortByteString)]) -> (Map RadixRoot (Maybe RadixRoot), [(RadixRoot, ShortByteString)]) loop2 buffer root accum@(targets, candidates) = -- Get the node from the buffer and analyze. case Map.lookup root buffer of Nothing -> accum Just (bytes, []) -> -- The node is now a candidate. let candidates' = (root, bytes):candidates in (targets, candidates') Just (_, children) -> -- The children are now targets. let targets' = foldr step1 targets children in foldl step2 (targets', candidates) children where step1 = flip Map.insert $ Just root step2 = flip $ loop2 buffer -- Loop 3: The write loop. loop3 :: Map RadixRoot (ShortByteString, [RadixRoot]) -> Map RadixRoot (Maybe RadixRoot) -> [(RadixRoot, ShortByteString)] -> ConduitT ByteString Void m (Either [RadixRoot] (RadixTree database)) loop3 buffer targets = \ case [] -> loop1 buffer targets (root, bytes):candidates' -> do -- Write the node to the database. let key = fromShort root withWriteLock lock $ store _radixDatabase key $ fromShort bytes -- Remove all references to the node. let buffer' = Map.delete root buffer let targets' = Map.delete root targets -- Inspect the parent node. case targets ! root of Nothing -> loop3 buffer' targets' candidates' Just root' -> do let (bytes', siblings') = buffer ! root' let children' = List.delete root siblings' -- Has the sibling node been written to the database? if List.null children' then loop3 buffer' targets' $ (root', bytes'):candidates' else do -- Announce a terminal state root. liftIO $ void $ tryWriteChan chan root -- Update the parent node. let buffer'' = Map.insert root' (bytes', children') buffer' loop3 buffer'' targets' candidates'