{-# LANGUAGE LambdaCase #-} {-# OPTIONS -Wall #-} {-# OPTIONS -Werror #-} module Integrations ( tests , simple ) where import Control.Arrow (arr) import Control.Concurrent.BoundedChan (newBoundedChan, readChan, writeChan) import Control.Concurrent.MVar (MVar, modifyMVar_, newMVar, readMVar) import Control.Concurrent.ReadWriteLock (new) import Control.Monad (foldM, forM_, forever, replicateM, void) import Control.Monad.IO.Class (MonadIO(..)) import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Resource (MonadResource, resourceForkIO, runResourceT) import Crypto.Hash.BLAKE2.BLAKE2s (hash) import Data.ByteString.Base16 (encode) import Data.ByteString.Builder (toLazyByteString, word32BE) import Data.ByteString.Char8 as Byte (ByteString, length, pack, unpack) import Data.ByteString.Lazy (toStrict) import Data.ByteString.Short (fromShort) import Data.Conduit (ConduitT, (.|), awaitForever, runConduit, yield) import Data.Conduit.Internal (Pipe(..), sourceToPipe) import Data.Word (Word32, Word64) import Database.LevelDB (DB, Options(..), defaultOptions, open) import System.FilePath (()) import System.IO.Temp (withSystemTempDirectory) import Test.Tasty (TestTree, testGroup) import Test.Tasty.QuickCheck (arbitrary, generate) import Test.Tasty.HUnit (Assertion, assertEqual, testCase) import Text.Printf (printf) import DFINITY.RadixTree import DFINITY.RadixTree.Conduit tests :: TestTree tests = testGroup "integrations" [ testCase "simple-01-001000" $ simple 01 001000 , testCase "simple-02-005000" $ simple 02 005000 , testCase "simple-04-010000" $ simple 04 010000 , testCase "simple-08-025000" $ simple 08 025000 , testCase "simple-16-050000" $ simple 16 050000 , testCase "simple-32-100000" $ simple 32 100000 ] simple :: Int -> Word32 -> Assertion simple n size = withSystemTempDirectory "test" $ \ path -> do -- Create concurrent data structures. counter <- newMVar 0 senders <- replicateM n $ newBoundedChan 64 receiver <- newBoundedChan 64 -- Create the source and target database locks. sourceLock <- new targetLock <- new -- Run the deterministic resource allocator. runResourceT $ do -- Create the source and target trees. sourceTree <- create path "source" targetTree <- create path "target" -- Saturate the source tree. sourceTree' <- saturate 1 size sourceTree -- Calculate the source tree state root. sourceRoot' <- fst <$> merkleizeRadixTree sourceTree' liftIO $ printf "source: %s\n" $ pretty sourceRoot' -- Create thread to relay updates. void $ resourceForkIO $ liftIO $ forever $ do update <- readChan receiver forM_ senders $ flip writeChan update -- Define the state synchronization conduits. let masks = genMasks n let zipper mask sender = sourceRadixTree mask 2048 sender sourceTree' sourceLock let source = merge $ zipWith zipper masks senders let sink = sinkRadixTree sourceRoot' receiver targetTree targetLock -- Run the state synchronization protocol. result <- runConduit $ source .| bandwidth counter .| sink -- Inspect the result. case result of Left _ -> fail "missing subtrees" Right targetTree' -> do -- Calculate the target tree state root. targetRoot' <- fst <$> merkleizeRadixTree targetTree' liftIO $ printf "target: %s\n" $ pretty targetRoot' -- Display bandwidth utilization. total <- liftIO $ readMVar counter liftIO $ printf "bandwidth: %d bytes\n" total -- Assert the source and target tree state roots as equal. liftIO $ assertEqual "simple" sourceRoot' targetRoot' create :: MonadResource m => FilePath -> String -> m (RadixTree DB) create path name = do handle <- open database options createRadixTree 262144 2028 Nothing handle where database = path name options = defaultOptions {createIfMissing = True} saturate :: MonadIO m => RadixDatabase m database => Word32 -> Word32 -> RadixTree database -> m (RadixTree database) saturate a b tree = foldM step tree [a..b] where step accum x = do let key = hashW32 x value <- liftIO $ generate $ pack <$> arbitrary accum' <- insertRadixTree key value accum if mod x 1000 == 0 then snd <$> merkleizeRadixTree accum' else pure accum' merge :: Monad m => [ConduitT () o m ()] -> ConduitT () o m () merge = loop . map sourceToPipe where loop pipes = do pipes' <- foldM step [] pipes if null pipes' then pure () else loop pipes' step accum = \ case Done () -> pure accum HaveOutput pipe value -> do yield value pure $ pipe:accum PipeM action -> do pipe <- lift action step accum pipe _ -> fail $ "merge: undefined" bandwidth :: MonadIO m => MVar Word64 -> ConduitT ByteString ByteString m () bandwidth counter = awaitForever $ \ bytes -> do let size = fromIntegral $ Byte.length bytes liftIO $ modifyMVar_ counter $ \ accum -> pure $! accum + size yield bytes pretty :: RadixRoot -> String pretty = unpack . encode . fromShort hashW32 :: Word32 -> ByteString hashW32 = hash 32 mempty . toStrict . toLazyByteString . word32BE genMasks :: Int -> [[Bool]] genMasks n = take n $ concat $ repeat xs where i = truncate $ logBase 2 $ (realToFrac n :: Double) xs = [zipWith arr fs $ replicate i True | fs <- replicateM i [id, not]]