{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Network.Haskoin.Node.Chain
( chain
) where
import Control.Monad.Except
import Control.Monad.Logger
import Control.Monad.Reader
import Control.Monad.Trans.Maybe
import qualified Data.ByteString as B
import Data.Default
import Data.List
import Data.Maybe
import Data.Serialize as S
import Data.String.Conversions
import Data.Time.Clock.POSIX
import Data.Word
import Database.RocksDB (DB)
import qualified Database.RocksDB as R
import Database.RocksDB.Query as R
import Haskoin
import Network.Haskoin.Node.Common
import NQE
import System.Random
import UnliftIO
import UnliftIO.Concurrent
import UnliftIO.Resource
type MonadChain m
= (MonadLoggerIO m, MonadChainLogic ChainConfig Peer m)
chain ::
(MonadUnliftIO m, MonadLoggerIO m)
=> ChainConfig
-> Inbox ChainMessage
-> m ()
chain cfg inbox = do
st <-
newTVarIO
ChainState
{ chainSyncing = Nothing
, mySynced = False
, newPeers = []
}
let rd = ChainReader {myReader = cfg, myChainDB = db, chainState = st}
withSyncLoop ch $ run `runReaderT` rd
where
net = chainConfNetwork cfg
db = chainConfDB cfg
ch = inboxToMailbox inbox
run = do
$(logDebugS) "Chain" "Initializing..."
initChainDB net
getBestBlockHeader >>= chainEvent . ChainBestBlock
$(logInfoS) "Chain" "Initialization complete"
forever $ receive inbox >>= chainMessage
chainEvent :: MonadChain m => ChainEvent -> m ()
chainEvent e = do
l <- chainConfEvents <$> asks myReader
case e of
ChainBestBlock b ->
$(logInfoS) "Chain" $
"Best block header at height " <> cs (show (nodeHeight b))
ChainSynced b ->
$(logInfoS) "Chain" $
"Headers now synced at height " <> cs (show (nodeHeight b))
atomically $ l e
processHeaders ::
MonadChain m => Peer -> [BlockHeader] -> m ()
processHeaders p hs =
void . runMaybeT $ do
net <- chainConfNetwork <$> asks myReader
$(logDebugS) "Chain" $
"Importing " <> cs (show (length hs)) <> " headers"
now <- round <$> liftIO getPOSIXTime
pbest <- getBestBlockHeader
importHeaders net now hs >>= \case
Left e -> do
$(logErrorS) "Chain" "Could not connect received headers"
e `killPeer` p
Right done -> do
setLastReceived now
best <- getBestBlockHeader
when (nodeHeader pbest /= nodeHeader best) . chainEvent $
ChainBestBlock best
if done
then do
$(logDebugS)
"Chain"
"Finished importing headers from peer"
MSendHeaders `sendMessage` p
finishPeer p
syncNewPeer
syncNotif
else syncPeer p
syncNewPeer :: MonadChain m => m ()
syncNewPeer = do
$(logDebugS) "Chain" "Attempting to sync against a new peer"
getSyncingPeer >>= \case
Nothing -> do
$(logDebugS) "Chain" "Getting next peer to sync from"
nextPeer >>= \case
Nothing ->
$(logInfoS) "Chain" "Finished syncing against all peers"
Just p -> syncPeer p
Just _ -> $(logDebugS) "Chain" "Already syncing against a peer"
syncNotif :: MonadChain m => m ()
syncNotif =
round <$> liftIO getPOSIXTime >>= notifySynced >>= \x ->
when x $ getBestBlockHeader >>= chainEvent . ChainSynced
syncPeer :: MonadChain m => Peer -> m ()
syncPeer p = do
$(logInfoS) "Chain" "Syncing against selected peer"
bb <- chainSyncingPeer >>= \case
Just ChainSync {chainSyncPeer = p', chainHighest = Just g}
| p == p' -> return g
_ -> getBestBlockHeader
now <- round <$> liftIO getPOSIXTime
gh <- syncHeaders now bb p
MGetHeaders gh `sendMessage` p
chainMessage :: MonadChain m => ChainMessage -> m ()
chainMessage (ChainGetBest reply) =
getBestBlockHeader >>= atomically . reply
chainMessage (ChainHeaders p hs) = do
$(logDebugS) "Chain" $ "Processing " <> cs (show (length hs)) <> " headers"
processHeaders p hs
chainMessage (ChainPeerConnected p a) = do
$(logDebugS) "Chain" $ "Adding new peer to sync queue: " <> cs (show a)
addPeer p
syncNewPeer
chainMessage (ChainPeerDisconnected p a) = do
$(logWarnS) "Chain" $ "Removing a peer from sync queue: " <> cs (show a)
finishPeer p
syncNewPeer
chainMessage (ChainGetAncestor h n reply) =
getAncestor h n >>= atomically . reply
chainMessage (ChainGetSplit r l reply) =
splitPoint r l >>= atomically . reply
chainMessage (ChainGetBlock h reply) =
getBlockHeader h >>= atomically . reply
chainMessage (ChainIsSynced reply) =
isSynced >>= atomically . reply
chainMessage ChainPing = do
ChainConfig {chainConfTimeout = to} <- asks myReader
now <- round <$> liftIO getPOSIXTime
chainSyncingPeer >>= \case
Nothing -> return ()
Just ChainSync {chainSyncPeer = p, chainTimestamp = t}
| now - t > fromIntegral to -> do
$(logErrorS) "Chain" "Syncing peer timed out"
PeerTimeout `killPeer` p
| otherwise -> return ()
withSyncLoop :: (MonadUnliftIO m, MonadLoggerIO m) => Chain -> m a -> m a
withSyncLoop ch f = withAsync go $ \a -> link a >> f
where
go =
forever $ do
threadDelay =<<
liftIO (randomRIO (250 * 1000, 1000 * 1000))
ChainPing `send` ch
dataVersion :: Word32
dataVersion = 1
data ChainDataVersionKey = ChainDataVersionKey
deriving (Eq, Ord, Show)
instance Key ChainDataVersionKey
instance KeyValue ChainDataVersionKey Word32
instance Serialize ChainDataVersionKey where
get = do
guard . (== 0x92) =<< S.getWord8
return ChainDataVersionKey
put ChainDataVersionKey = S.putWord8 0x92
data ChainSync p = ChainSync
{ chainSyncPeer :: !p
, chainTimestamp :: !Timestamp
, chainHighest :: !(Maybe BlockNode)
}
data ChainState p = ChainState
{ chainSyncing :: !(Maybe (ChainSync p))
, newPeers :: ![p]
, mySynced :: !Bool
}
newtype BlockHeaderKey = BlockHeaderKey BlockHash deriving (Eq, Show)
instance Serialize BlockHeaderKey where
get = do
guard . (== 0x90) =<< getWord8
BlockHeaderKey <$> get
put (BlockHeaderKey bh) = do
putWord8 0x90
put bh
data BestBlockKey = BestBlockKey deriving (Eq, Show)
instance KeyValue BlockHeaderKey BlockNode
instance KeyValue BestBlockKey BlockNode
instance Serialize BestBlockKey where
get = do
guard . (== 0x91) =<< getWord8
return BestBlockKey
put BestBlockKey = putWord8 0x91
type MonadChainLogic a p m
= (BlockHeaders m, MonadReader (ChainReader a p) m)
data ChainReader a p = ChainReader
{ myReader :: !a
, myChainDB :: !DB
, chainState :: !(TVar (ChainState p))
}
instance (Monad m, MonadIO m, MonadReader (ChainReader a p) m) =>
BlockHeaders m where
addBlockHeader bn = do
db <- asks myChainDB
R.insert db (BlockHeaderKey (headerHash (nodeHeader bn))) bn
getBlockHeader bh = do
db <- asks myChainDB
retrieve db def (BlockHeaderKey bh)
getBestBlockHeader = do
db <- asks myChainDB
retrieve db def BestBlockKey >>= \case
Nothing -> error "Could not get best block from database"
Just b -> return b
setBestBlockHeader bn = do
db <- asks myChainDB
R.insert db BestBlockKey bn
addBlockHeaders bns = do
db <- asks myChainDB
writeBatch db (map f bns)
where
f bn = insertOp (BlockHeaderKey (headerHash (nodeHeader bn))) bn
initChainDB :: (MonadChainLogic a p m, MonadUnliftIO m) => Network -> m ()
initChainDB net = do
db <- asks myChainDB
ver <- retrieve db def ChainDataVersionKey
when (ver /= Just dataVersion) purgeChainDB
R.insert db ChainDataVersionKey dataVersion
retrieve db def BestBlockKey >>= \b ->
when (isNothing (b :: Maybe BlockNode)) $ do
addBlockHeader (genesisNode net)
setBestBlockHeader (genesisNode net)
purgeChainDB :: (MonadChainLogic a p m, MonadUnliftIO m) => m ()
purgeChainDB = do
db <- asks myChainDB
runResourceT . R.withIterator db def $ \it -> do
R.iterSeek it $ B.singleton 0x90
recurse_delete it db
where
recurse_delete it db =
R.iterKey it >>= \case
Nothing -> return ()
Just k
| B.head k == 0x90 || B.head k == 0x91 -> do
R.delete db def k
R.iterNext it
recurse_delete it db
| otherwise -> return ()
importHeaders ::
(MonadIO m, BlockHeaders m, MonadChainLogic a p m)
=> Network
-> Timestamp
-> [BlockHeader]
-> m (Either PeerException Bool)
importHeaders net now hs =
runExceptT $
lift (connectBlocks net now hs) >>= \case
Right _ -> do
case hs of
[] -> return ()
_ -> do
bb <- getBlockHeader (headerHash (last hs))
box <- asks chainState
atomically . modifyTVar box $ \s ->
s
{ chainSyncing =
(\x -> x {chainHighest = bb}) <$>
chainSyncing s
}
case length hs of
2000 -> return False
_ -> return True
Left _ -> throwError PeerSentBadHeaders
notifySynced :: (MonadIO m, MonadChainLogic a p m) => Timestamp -> m Bool
notifySynced now =
fmap isJust $
runMaybeT $ do
bb <- getBestBlockHeader
guard (now - blockTimestamp (nodeHeader bb) < 2 * 60 * 60)
st <- asks chainState
MaybeT . atomically . runMaybeT $ do
s <- lift $ readTVar st
guard . isNothing $ chainSyncing s
guard . null $ newPeers s
guard . not $ mySynced s
lift $ writeTVar st s {mySynced = True}
return ()
nextPeer :: (MonadIO m, MonadChainLogic a p m) => m (Maybe p)
nextPeer = listToMaybe . newPeers <$> (asks chainState >>= readTVarIO)
syncHeaders ::
(Eq p, MonadChainLogic a p m, MonadIO m)
=> Timestamp
-> BlockNode
-> p
-> m GetHeaders
syncHeaders now bb p = do
st <- asks chainState
atomically . modifyTVar st $ \s ->
s
{ chainSyncing =
Just
ChainSync
{ chainSyncPeer = p
, chainTimestamp = now
, chainHighest = Nothing
}
, newPeers = delete p (newPeers s)
}
loc <- blockLocator bb
return
GetHeaders
{ getHeadersVersion = myVersion
, getHeadersBL = loc
, getHeadersHashStop =
"0000000000000000000000000000000000000000000000000000000000000000"
}
setLastReceived :: (MonadChainLogic a p m, MonadIO m) => Timestamp -> m ()
setLastReceived now = do
st <- asks chainState
atomically . modifyTVar st $ \s ->
s {chainSyncing = (\p -> p {chainTimestamp = now}) <$> chainSyncing s}
addPeer :: (Eq p, MonadIO m, MonadChainLogic a p m) => p -> m ()
addPeer p = do
st <- asks chainState
atomically . modifyTVar st $ \s -> s {newPeers = nub (p : newPeers s)}
getSyncingPeer :: (MonadChainLogic a p m, MonadIO m) => m (Maybe p)
getSyncingPeer = fmap chainSyncPeer . chainSyncing <$> (readTVarIO =<< asks chainState)
isSynced :: (MonadChainLogic a p m, MonadIO m) => m Bool
isSynced = mySynced <$> (asks chainState >>= readTVarIO)
finishPeer :: (Eq p, MonadIO m, MonadChainLogic a p m) => p -> m ()
finishPeer p =
asks chainState >>= \st ->
atomically . modifyTVar st $ \s ->
s
{ newPeers = delete p (newPeers s)
, chainSyncing =
case chainSyncing s of
Just ChainSync { chainSyncPeer = p'
, chainTimestamp = _
, chainHighest = _
}
| p == p' -> Nothing
_ -> chainSyncing s
}
chainSyncingPeer :: (MonadChainLogic a p m, MonadIO m) => m (Maybe (ChainSync p))
chainSyncingPeer = chainSyncing <$> (readTVarIO =<< asks chainState)