{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE UndecidableInstances #-}
module Network.Haskoin.Node.Chain
( chain
) where
import Control.Monad
import Control.Monad.Logger
import Control.Monad.Reader
import Control.Monad.Trans.Maybe
import Data.String.Conversions
import Data.Time.Clock.POSIX
import Network.Haskoin.Block
import Network.Haskoin.Network
import Network.Haskoin.Node.Chain.Logic
import Network.Haskoin.Node.Common
import NQE
import System.Random
import UnliftIO
import UnliftIO.Concurrent
type MonadChain m
= (MonadLoggerIO m, MonadChainLogic ChainConfig Peer m)
chain ::
(MonadUnliftIO m, MonadLoggerIO m)
=> ChainConfig
-> Inbox ChainMessage
-> m ()
chain cfg inbox = do
st <-
newTVarIO
ChainState
{ chainSyncing = Nothing
, mySynced = False
, newPeers = []
}
let rd = ChainReader {myReader = cfg, myChainDB = db, chainState = st}
withSyncLoop ch $ run `runReaderT` rd
where
net = chainConfNetwork cfg
db = chainConfDB cfg
ch = inboxToMailbox inbox
run = do
$(logDebugS) "Chain" "Initializing..."
initChainDB net
getBestBlockHeader >>= chainEvent . ChainBestBlock
$(logInfoS) "Chain" "Initialization complete"
forever $ receive inbox >>= chainMessage
chainEvent :: MonadChain m => ChainEvent -> m ()
chainEvent e = do
l <- chainConfEvents <$> asks myReader
case e of
ChainBestBlock b ->
$(logInfoS) "Chain" $
"Best block header at height " <> cs (show (nodeHeight b))
ChainSynced b ->
$(logInfoS) "Chain" $
"Headers now synced at height " <> cs (show (nodeHeight b))
atomically $ l e
processHeaders ::
MonadChain m => Peer -> [BlockHeader] -> m ()
processHeaders p hs =
void . runMaybeT $ do
net <- chainConfNetwork <$> asks myReader
$(logDebugS) "Chain" $
"Importing " <> cs (show (length hs)) <> " headers"
now <- round <$> liftIO getPOSIXTime
importHeaders net now hs >>= \case
Left e -> do
$(logErrorS) "Chain" "Could not connect received headers"
e `killPeer` p
Right done -> do
setLastReceived now
best <- getBestBlockHeader
chainEvent $ ChainBestBlock best
if done
then do
$(logDebugS)
"Chain"
"Finished importing headers from peer"
MSendHeaders `sendMessage` p
finishPeer p
syncNewPeer
syncNotif
else syncPeer p
syncNewPeer :: MonadChain m => m ()
syncNewPeer = do
$(logDebugS) "Chain" "Attempting to sync against a new peer"
getSyncingPeer >>= \case
Nothing -> do
$(logDebugS) "Chain" "Getting next peer to sync from"
nextPeer >>= \case
Nothing ->
$(logInfoS) "Chain" "Finished syncing against all peers"
Just p -> syncPeer p
Just _ -> $(logDebugS) "Chain" "Already syncing against a peer"
syncNotif :: MonadChain m => m ()
syncNotif =
round <$> liftIO getPOSIXTime >>= notifySynced >>= \x ->
when x $ getBestBlockHeader >>= chainEvent . ChainSynced
syncPeer :: MonadChain m => Peer -> m ()
syncPeer p = do
$(logInfoS) "Chain" "Syncing against selected peer"
bb <- chainSyncingPeer >>= \case
Just ChainSync {chainSyncPeer = p', chainHighest = Just g}
| p == p' -> return g
_ -> getBestBlockHeader
now <- round <$> liftIO getPOSIXTime
gh <- syncHeaders now bb p
MGetHeaders gh `sendMessage` p
chainMessage :: MonadChain m => ChainMessage -> m ()
chainMessage (ChainGetBest reply) =
getBestBlockHeader >>= atomically . reply
chainMessage (ChainHeaders p hs) = do
$(logDebugS) "Chain" $ "Processing " <> cs (show (length hs)) <> " headers"
processHeaders p hs
chainMessage (ChainPeerConnected p a) = do
$(logDebugS) "Chain" $ "Adding new peer to sync queue: " <> cs (show a)
addPeer p
syncNewPeer
chainMessage (ChainPeerDisconnected p a) = do
$(logWarnS) "Chain" $ "Removing a peer from sync queue: " <> cs (show a)
finishPeer p
syncNewPeer
chainMessage (ChainGetAncestor h n reply) =
getAncestor h n >>= atomically . reply
chainMessage (ChainGetSplit r l reply) =
splitPoint r l >>= atomically . reply
chainMessage (ChainGetBlock h reply) =
getBlockHeader h >>= atomically . reply
chainMessage (ChainIsSynced reply) =
isSynced >>= atomically . reply
chainMessage ChainPing = do
ChainConfig {chainConfTimeout = to} <- asks myReader
now <- round <$> liftIO getPOSIXTime
chainSyncingPeer >>= \case
Nothing -> return ()
Just ChainSync {chainSyncPeer = p, chainTimestamp = t}
| now - t > fromIntegral to -> do
$(logErrorS) "Chain" "Syncing peer timed out"
PeerTimeout `killPeer` p
| otherwise -> return ()
withSyncLoop :: (MonadUnliftIO m, MonadLoggerIO m) => Chain -> m a -> m a
withSyncLoop ch f = withAsync go $ \a -> link a >> f
where
go =
forever $ do
threadDelay =<<
liftIO (randomRIO (250 * 1000, 1000 * 1000))
ChainPing `send` ch