Header sync validation (#4456)

* add header sync validation

* fix docs, minor fixes

* Refactor to us InvalidBlockHeader ChainException, also refactor recovery to private helper method

* changes from comments

Co-authored-by: Chris Stewart <stewart.chris1234@gmail.com>
This commit is contained in:
Shreyansh 2022-08-23 03:35:41 +05:30 committed by GitHub
parent 451b525be5
commit 2c2e03b279
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 625 additions and 142 deletions

View File

@ -112,14 +112,12 @@ val nodeF = for {
chainApi <- chainApiF
peer <- peerF
} yield {
//you can set this to only sync compact filters after the timestamp
val walletCreationTimeOpt = None
val dataMessageHandler = DataMessageHandler(chainApi, walletCreationTimeOpt)
NeutrinoNode(paramPeers = Vector(peer),
dataMessageHandler = dataMessageHandler,
nodeConfig = nodeConfig,
chainConfig = chainConfig,
actorSystem = system)
NeutrinoNode(chainApi = chainApi,
walletCreationTimeOpt = None, //you can set this to only sync compact filters after the timestamp
paramPeers = Vector(peer),
nodeConfig = nodeConfig,
chainConfig = chainConfig,
actorSystem = system)
}
//let's start it

View File

@ -13,7 +13,7 @@ import org.bitcoins.testkit.node.{
NodeTestWithCachedBitcoindPair,
NodeUnitTest
}
import org.bitcoins.testkit.util.TorUtil
import org.bitcoins.testkit.util.{AkkaUtil, TorUtil}
import org.scalatest.{Assertion, FutureOutcome, Outcome}
import scala.concurrent.Future
@ -108,7 +108,7 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
_ <- bothOurs
_ <- allConnected
_ <- allInitialized
_ = peers.map(peerManager.removePeer)
_ <- Future.sequence(peers.map(peerManager.removePeer))
_ <- allDisconn
} yield {
succeed
@ -264,6 +264,7 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
for {
_ <- NodeUnitTest.syncNeutrinoNode(node, bitcoind)
_ <- AkkaUtil.nonBlockingSleep(3.seconds)
_ <- bitcoind.generateToAddress(2, junkAddress)
_ <- NodeTestUtil.awaitAllSync(node, bitcoind)
} yield {

View File

@ -1,7 +1,7 @@
package org.bitcoins.node
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.core.p2p.GetHeadersMessage
import org.bitcoins.core.p2p.{GetHeadersMessage, HeadersMessage}
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.P2PClient.ExpectResponseCommand
import org.bitcoins.server.BitcoinSAppConfig
@ -20,7 +20,9 @@ import scala.concurrent.{Await, Future}
class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
lazy val bitcoindsF =
BitcoindRpcTestUtil.createNodePair().map(p => Vector(p._1, p._2))
BitcoindRpcTestUtil
.createUnconnectedNodePairWithBlocks()
.map(p => Vector(p._1, p._2))
lazy val bitcoinPeersF: Future[Vector[Peer]] = {
bitcoindsF.flatMap { bitcoinds =>
@ -85,6 +87,99 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
}
}
it must "have the best header chain post sync from all peers" in {
nodeConnectedWithBitcoinds =>
val node = nodeConnectedWithBitcoinds.node
val bitcoinds = nodeConnectedWithBitcoinds.bitcoinds
val peerManager = node.peerManager
def peers = peerManager.peers
for {
_ <- AsyncUtil.retryUntilSatisfied(peers.size == 2)
_ <- bitcoinds(1).generateToAddress(1, junkAddress)
h1 <- bitcoinds(0).getBestHashBlockHeight()
h2 <- bitcoinds(1).getBestHashBlockHeight()
//out of sync by 1 block, h2 ahead
_ = assert(h2 - h1 == 1)
_ <- node.sync()
_ <- NodeTestUtil.awaitSync(node, bitcoinds(1))
} yield {
succeed
}
}
//note: now bitcoinds(1) is ahead by 1 block compared to bitcoinds(0)
it must "re-query in case invalid headers are sent" in {
nodeConnectedWithBitcoinds =>
val node = nodeConnectedWithBitcoinds.node
val bitcoinds = nodeConnectedWithBitcoinds.bitcoinds
for {
_ <- AsyncUtil.retryUntilSatisfied(node.peerManager.peers.size == 2)
peers <- bitcoinPeersF
peer = peers.head
_ = node.updateDataMessageHandler(
node.getDataMessageHandler.copy(syncPeer = Some(peer))(
executionContext,
node.nodeConfig,
node.chainConfig))
invalidHeader = node.chainAppConfig.chain.genesisBlock.blockHeader
invalidHeaderMessage = HeadersMessage(headers = Vector(invalidHeader))
sender <- node.peerManager.peerData(peer).peerMessageSender
_ <- node.getDataMessageHandler.addToStream(invalidHeaderMessage,
sender,
peer)
bestChain = bitcoinds(1)
_ <- NodeTestUtil.awaitSync(node, bestChain)
} yield {
succeed
}
}
it must "must disconnect a peer that keeps sending invalid headers" in {
nodeConnectedWithBitcoinds =>
val node = nodeConnectedWithBitcoinds.node
val peerManager = node.peerManager
def sendInvalidHeaders(peer: Peer): Future[Unit] = {
val invalidHeader = node.chainAppConfig.chain.genesisBlock.blockHeader
val invalidHeaderMessage =
HeadersMessage(headers = Vector(invalidHeader))
val senderF = node.peerManager.peerData(peer).peerMessageSender
for {
sender <- senderF
sendFs = 1
.to(node.nodeConfig.maxInvalidResponsesAllowed + 1)
.map(_ =>
node.getDataMessageHandler.addToStream(invalidHeaderMessage,
sender,
peer))
_ <- Future.sequence(sendFs)
} yield ()
}
for {
_ <- AsyncUtil.retryUntilSatisfied(peerManager.peers.size == 2)
peers <- bitcoinPeersF
peer = peers(0)
_ <- node.peerManager.isConnected(peer).map(assert(_))
_ = node.updateDataMessageHandler(
node.getDataMessageHandler.copy(syncPeer = Some(peer))(
executionContext,
node.nodeConfig,
node.chainConfig))
_ <- sendInvalidHeaders(peer)
_ <- AsyncUtil.retryUntilSatisfied(
!node.peerManager.peers.contains(peer))
} yield {
succeed
}
}
override def afterAll(): Unit = {
val stopF = for {
bitcoinds <- bitcoindsF

View File

@ -9,6 +9,7 @@ import org.bitcoins.core.protocol.blockchain.{Block, BlockHeader}
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.crypto.DoubleSha256Digest
import org.bitcoins.node._
import org.bitcoins.node.networking.peer.DataMessageHandlerState.HeaderSync
import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.node.NodeUnitTest
@ -39,6 +40,8 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
chainApi <- node.chainApiFromDb()
dataMessageHandler = DataMessageHandler(chainApi,
None,
node,
HeaderSync,
syncPeer = Some(peer))(
node.executionContext,
node.nodeAppConfig,
@ -83,10 +86,13 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
_ = node.nodeAppConfig.addCallbacks(nodeCallbacks)
dataMessageHandler =
DataMessageHandler(genesisChainApi, None, syncPeer = Some(peer))(
node.executionContext,
node.nodeAppConfig,
node.chainConfig)
DataMessageHandler(genesisChainApi,
None,
node,
HeaderSync,
syncPeer = Some(peer))(node.executionContext,
node.nodeAppConfig,
node.chainConfig)
sender <- senderF
_ <- dataMessageHandler.handleDataPayload(payload, sender, peer)
result <- resultP.future
@ -120,10 +126,13 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
_ = node.nodeAppConfig.addCallbacks(callbacks)
dataMessageHandler =
DataMessageHandler(genesisChainApi, None, syncPeer = Some(peer))(
node.executionContext,
node.nodeAppConfig,
node.chainConfig)
DataMessageHandler(genesisChainApi,
None,
node,
HeaderSync,
syncPeer = Some(peer))(node.executionContext,
node.nodeAppConfig,
node.chainConfig)
sender <- senderF
_ <- dataMessageHandler.handleDataPayload(payload, sender, peer)
result <- resultP.future
@ -155,10 +164,13 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
nodeCallbacks = NodeCallbacks.onCompactFilterReceived(callback)
_ = node.nodeAppConfig.addCallbacks(nodeCallbacks)
dataMessageHandler =
DataMessageHandler(genesisChainApi, None, syncPeer = Some(peer))(
node.executionContext,
node.nodeAppConfig,
node.chainConfig)
DataMessageHandler(genesisChainApi,
None,
node,
HeaderSync,
syncPeer = Some(peer))(node.executionContext,
node.nodeAppConfig,
node.chainConfig)
sender <- senderF
_ <- dataMessageHandler.handleDataPayload(payload, sender, peer)
result <- resultP.future
@ -191,10 +203,13 @@ class DataMessageHandlerTest extends NodeUnitTest with CachedTor {
_ = node.nodeAppConfig.addCallbacks(nodeCallbacks)
dataMessageHandler =
DataMessageHandler(genesisChainApi, None, syncPeer = Some(peer))(
node.executionContext,
node.nodeAppConfig,
node.chainConfig)
DataMessageHandler(genesisChainApi,
None,
node,
HeaderSync,
syncPeer = Some(peer))(node.executionContext,
node.nodeAppConfig,
node.chainConfig)
sender <- senderF
_ <- dataMessageHandler.handleDataPayload(payload, sender, peer)
result <- resultP.future

View File

@ -15,15 +15,18 @@ import org.bitcoins.core.p2p.ServiceIdentifier
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.DataMessageHandlerState.HeaderSync
import org.bitcoins.node.networking.peer.{
ControlMessageHandler,
DataMessageHandler
}
import java.time.Instant
import scala.concurrent.Future
case class NeutrinoNode(
private var dataMessageHandler: DataMessageHandler,
chainApi: ChainApi,
walletCreationTimeOpt: Option[Instant],
nodeConfig: NodeAppConfig,
chainConfig: ChainAppConfig,
actorSystem: ActorSystem,
@ -41,6 +44,9 @@ case class NeutrinoNode(
val controlMessageHandler: ControlMessageHandler = ControlMessageHandler(this)
private var dataMessageHandler: DataMessageHandler =
DataMessageHandler(chainApi, walletCreationTimeOpt, this, HeaderSync)
override def getDataMessageHandler: DataMessageHandler = dataMessageHandler
override def updateDataMessageHandler(

View File

@ -51,6 +51,12 @@ case class PeerData(
_serviceIdentifier = Some(serviceIdentifier)
}
private var _invalidMessagesCount: Int = 0
def updateInvalidMessageCount(): Unit = {
_invalidMessagesCount += 1
}
private var lastTimedOut: Long = 0
def updateLastFailureTime(): Unit = {
@ -63,4 +69,8 @@ case class PeerData(
val timePast = System.currentTimeMillis() - lastTimedOut
timePast < 30.minutes.toMillis
}
def exceededMaxInvalidMessages: Boolean = {
_invalidMessagesCount > nodeAppConfig.maxInvalidResponsesAllowed
}
}

View File

@ -1,18 +1,15 @@
package org.bitcoins.node
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete}
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.core.api.node.NodeType
import org.bitcoins.core.p2p.{
AddrV2Message,
ExpectsResponse,
ServiceIdentifier,
VersionMessage
}
import org.bitcoins.core.p2p._
import org.bitcoins.core.util.{NetworkUtil, StartStopAsync}
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.{Peer, PeerDAO, PeerDb}
import org.bitcoins.node.networking.peer.PeerMessageSender
import org.bitcoins.node.networking.peer._
import org.bitcoins.node.networking.{P2PClient, P2PClientSupervisor}
import org.bitcoins.node.util.BitcoinSNodeUtil
import scodec.bits.ByteVector
@ -149,8 +146,12 @@ case class PeerManager(
logger.debug(s"Replacing $replacePeer with $withPeer")
assert(!peerData(replacePeer).serviceIdentifier.nodeCompactFilters,
s"$replacePeer has cf")
removePeer(replacePeer)
addPeer(withPeer)
for {
_ <- removePeer(replacePeer)
_ <- addPeer(withPeer)
} yield {
()
}
}
def removePeer(peer: Peer): Future[Unit] = {
@ -192,7 +193,7 @@ case class PeerManager(
interval = 1.seconds,
maxTries = 30)
for {
val stopF = for {
_ <- removeF
_ <- finderStopF
_ <- managerStopF
@ -201,6 +202,14 @@ case class PeerManager(
s"Stopped PeerManager. Took ${System.currentTimeMillis() - beganAt} ms ")
this
}
stopF.failed.foreach { e =>
logger.error(
s"Failed to stop peer manager. Peers: $peers, waiting for deletion: $waitingForDeletion",
e)
}
stopF
}
def isConnected(peer: Peer): Future[Boolean] = {
@ -309,7 +318,7 @@ case class PeerManager(
_peerData.remove(peer)
val syncPeer = node.getDataMessageHandler.syncPeer
if (syncPeer.isDefined && syncPeer.get == peer)
syncFromNewPeer()
syncFromNewPeer().map(_ => ())
else Future.unit
} else if (waitingForDeletion.contains(peer)) {
//a peer we wanted to disconnect has remove has stopped the client actor, finally mark this as deleted
@ -337,12 +346,19 @@ case class PeerManager(
def onQueryTimeout(payload: ExpectsResponse, peer: Peer): Future[Unit] = {
logger.debug(s"Query timeout out for $peer")
//if we are removing this peer and an existing query timed out because of that
// peerData will not have this peer
if (peerData.contains(peer)) {
peerData(peer).updateLastFailureTime()
}
payload match {
case _ => //if any times out, try a new peer
peerData(peer).updateLastFailureTime()
val syncPeer = node.getDataMessageHandler.syncPeer
if (syncPeer.isDefined && syncPeer.get == peer)
syncFromNewPeer()
case _: GetHeadersMessage =>
dataMessageStream.offer(HeaderTimeoutWrapper(peer)).map(_ => ())
case _ =>
if (peer == node.getDataMessageHandler.syncPeer.get)
syncFromNewPeer().map(_ => ())
else Future.unit
}
}
@ -352,10 +368,41 @@ case class PeerManager(
Future.unit
}
def syncFromNewPeer(): Future[Unit] = {
def syncFromNewPeer(): Future[DataMessageHandler] = {
logger.debug(s"Trying to sync from new peer")
val newNode =
node.updateDataMessageHandler(node.getDataMessageHandler.reset)
newNode.sync()
newNode.sync().map(_ => node.getDataMessageHandler)
}
private val dataMessageStreamSource = Source
.queue[StreamDataMessageWrapper](1000,
overflowStrategy = OverflowStrategy.fail)
.mapAsync(1) {
case msg @ DataMessageWrapper(payload, peerMsgSender, peer) =>
logger.debug(s"Got ${payload.commandName} from ${peer} in stream")
node.getDataMessageHandler
.handleDataPayload(payload, peerMsgSender, peer)
.map { newDmh =>
node.updateDataMessageHandler(newDmh)
msg
}
case msg @ HeaderTimeoutWrapper(peer) =>
logger.debug(s"Processing timeout header for $peer")
node.getDataMessageHandler.onHeaderRequestTimeout(peer).map { newDmh =>
node.updateDataMessageHandler(newDmh)
logger.debug(s"Done processing timeout header for $peer")
msg
}
}
private val dataMessageStreamSink =
Sink.foreach[StreamDataMessageWrapper] {
case DataMessageWrapper(payload, _, peer) =>
logger.debug(s"Done processing ${payload.commandName} in ${peer}")
case HeaderTimeoutWrapper(_) =>
}
val dataMessageStream: SourceQueueWithComplete[StreamDataMessageWrapper] =
dataMessageStreamSource.to(dataMessageStreamSink).run()
}

View File

@ -19,7 +19,6 @@ import org.bitcoins.node._
import org.bitcoins.node.callback.NodeCallbackStreamManager
import org.bitcoins.node.db.NodeDbManagement
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.DataMessageHandler
import org.bitcoins.rpc.config.BitcoindRpcAppConfig
import org.bitcoins.rpc.util.AppConfigFactoryActorSystem
import org.bitcoins.tor.config.TorAppConfig
@ -198,6 +197,13 @@ case class NodeAppConfig(baseDatadir: Path, configOverrides: Vector[Config])(
} else 15.seconds
}
/** maximum consecutive number of invalid responses allowed from the same peer */
lazy val maxInvalidResponsesAllowed: Int = {
if (config.hasPath("bitcoin-s.node.max-invalid-response-count")) {
config.getInt("bitcoin-s.node.max-invalid-response-count")
} else 10
}
/** Creates either a neutrino node or a spv node based on the [[NodeAppConfig]] given */
def createNode(
peers: Vector[Peer] = Vector.empty[Peer],
@ -234,14 +240,18 @@ object NodeAppConfig extends AppConfigFactoryActorSystem[NodeAppConfig] {
val filterDAO = CompactFilterDAO()
val stateDAO = ChainStateDescriptorDAO()
val dmhF = ChainHandlerCached
val chainF = ChainHandlerCached
.fromDatabase(blockHeaderDAO, filterHeaderDAO, filterDAO, stateDAO)
.map(handler => DataMessageHandler(handler, walletCreationTimeOpt))
nodeConf.nodeType match {
case NodeType.NeutrinoNode =>
dmhF.map(dmh =>
NeutrinoNode(dmh, nodeConf, chainConf, system, paramPeers = peers))
chainF.map(chain =>
NeutrinoNode(chain,
walletCreationTimeOpt,
nodeConf,
chainConf,
system,
paramPeers = peers))
case NodeType.FullNode =>
Future.failed(new RuntimeException("Not implemented"))
case NodeType.BitcoindBackend =>

View File

@ -22,12 +22,7 @@ import org.bitcoins.node.networking.P2PClient.{
NodeCommand
}
import org.bitcoins.node.networking.peer.PeerMessageReceiver.NetworkMessageReceived
import org.bitcoins.node.networking.peer.PeerMessageReceiverState.{
Disconnected,
Initializing,
Normal,
Waiting
}
import org.bitcoins.node.networking.peer.PeerMessageReceiverState._
import org.bitcoins.node.networking.peer.{
PeerMessageReceiver,
PeerMessageReceiverState
@ -116,9 +111,10 @@ case class P2PClientActor(
unalignedBytes: ByteVector): Receive =
LoggingReceive {
case message: NetworkMessage =>
message match {
message.payload match {
case _: ExpectsResponse =>
logger.debug(s"${message.payload.commandName} expects response")
logger.debug(
s"${message.payload.commandName} expects response from $peer")
Await.result(handleExpectResponse(message.payload), timeout)
case _ =>
}
@ -150,8 +146,9 @@ case class P2PClientActor(
private def ignoreNetworkMessages(
peerConnectionOpt: Option[ActorRef],
unalignedBytes: ByteVector): Receive = LoggingReceive {
case _ @(_: NetworkMessage | _: NetworkPayload |
case msg @ (_: NetworkMessage | _: NetworkPayload |
_: ExpectResponseCommand) =>
logger.debug(s"Ignoring $msg for $peer as disconnecting.")
case message: Tcp.Event if peerConnectionOpt.isDefined =>
val newUnalignedBytes =
handleEvent(message, peerConnectionOpt.get, unalignedBytes)
@ -191,6 +188,10 @@ case class P2PClientActor(
case P2PClient.CloseAnyStateCommand =>
handleNodeCommand(cmd = P2PClient.CloseAnyStateCommand,
peerConnectionOpt = None)
case msg: NetworkMessage =>
logger.debug(s"$peer got ${msg.payload.commandName} while reconnecting.")
if (msg.payload.isInstanceOf[ExpectsResponse])
Await.result(handleExpectResponse(msg.payload), timeout)
case ExpectResponseCommand(msg) =>
Await.result(handleExpectResponse(msg), timeout)
case metaMsg: P2PClient.MetaMsg =>
@ -202,6 +203,9 @@ case class P2PClientActor(
case P2PClient.CloseAnyStateCommand =>
handleNodeCommand(cmd = P2PClient.CloseAnyStateCommand,
peerConnectionOpt = None)
case msg: NetworkMessage =>
if (msg.payload.isInstanceOf[ExpectsResponse])
Await.result(handleExpectResponse(msg.payload), timeout)
case ExpectResponseCommand(msg) =>
Await.result(handleExpectResponse(msg), timeout)
case Tcp.CommandFailed(c: Tcp.Connect) =>
@ -246,6 +250,14 @@ case class P2PClientActor(
proxy: ActorRef,
remoteAddress: InetSocketAddress,
proxyAddress: InetSocketAddress): Receive = LoggingReceive {
case P2PClient.CloseAnyStateCommand =>
handleNodeCommand(cmd = P2PClient.CloseAnyStateCommand,
peerConnectionOpt = None)
case msg: NetworkMessage =>
if (msg.payload.isInstanceOf[ExpectsResponse])
Await.result(handleExpectResponse(msg.payload), timeout)
case ExpectResponseCommand(msg) =>
Await.result(handleExpectResponse(msg), timeout)
case Tcp.CommandFailed(_: Socks5Connect) =>
logger.debug(
s"connection failed to ${remoteAddress} via SOCKS5 ${proxyAddress}")
@ -421,7 +433,15 @@ case class P2PClientActor(
logger.trace(s"Processing message=${m}")
val msg = NetworkMessageReceived(m, P2PClient(self, peer))
if (peerMsgRecv.isConnected) {
peerMsgRecv.handleNetworkMessageReceived(msg)
currentPeerMsgHandlerRecv.state match {
case _ @(_: Normal | _: Waiting | Preconnection |
_: Initializing) =>
peerMsgRecv.handleNetworkMessageReceived(msg)
case _: PeerMessageReceiverState =>
logger.debug(
s"Ignoring ${msg.msg.payload.commandName} from $peer as in state=${currentPeerMsgHandlerRecv.state}")
Future.successful(peerMsgRecv)
}
} else {
Future.successful(peerMsgRecv)
}
@ -519,7 +539,7 @@ case class P2PClientActor(
require(
msg.isInstanceOf[ExpectsResponse],
s"Tried to wait for response to message which is not a query, got=$msg")
logger.info(s"Expecting response for ${msg.commandName} for $peer")
logger.debug(s"Expecting response for ${msg.commandName} for $peer")
currentPeerMsgHandlerRecv.handleExpectResponse(msg).map { newReceiver =>
currentPeerMsgHandlerRecv = newReceiver
}

View File

@ -1,16 +1,19 @@
package org.bitcoins.node.networking.peer
import akka.Done
import org.bitcoins.chain.blockchain.InvalidBlockHeader
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.chain.models.BlockHeaderDAO
import org.bitcoins.core.api.chain.ChainApi
import org.bitcoins.core.api.node.NodeType
import org.bitcoins.core.gcs.BlockFilter
import org.bitcoins.core.p2p._
import org.bitcoins.core.protocol.CompactSizeUInt
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.node.P2PLogger
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models._
import org.bitcoins.node.networking.peer.DataMessageHandlerState._
import org.bitcoins.node.{Node, P2PLogger, PeerManager}
import java.time.Instant
import scala.concurrent.{ExecutionContext, Future, Promise}
@ -27,6 +30,8 @@ import scala.util.control.NonFatal
case class DataMessageHandler(
chainApi: ChainApi,
walletCreationTimeOpt: Option[Instant],
node: Node,
state: DataMessageHandlerState,
initialSyncDone: Option[Promise[Done]] = None,
currentFilterBatch: Vector[CompactFilterMessage] = Vector.empty,
filterHeaderHeightOpt: Option[Int] = None,
@ -38,8 +43,8 @@ case class DataMessageHandler(
chainConfig: ChainAppConfig)
extends P2PLogger {
require(appConfig.nodeType != NodeType.BitcoindBackend,
"Bitcoind should handle the P2P interactions")
require(appConfig.nodeType == NodeType.NeutrinoNode,
"DataMessageHandler is meant to be used with NeutrinoNode")
private val txDAO = BroadcastAbleTransactionDAO()
@ -48,7 +53,18 @@ case class DataMessageHandler(
filterHeaderHeightOpt = None,
filterHeightOpt = None,
syncPeer = None,
syncing = false)
syncing = false,
state = HeaderSync)
def manager: PeerManager = node.peerManager
def addToStream(
payload: DataPayload,
peerMsgSender: PeerMessageSender,
peer: Peer): Future[Unit] = {
val msg = DataMessageWrapper(payload, peerMsgSender, peer)
manager.dataMessageStream.offer(msg).map(_ => ())
}
def handleDataPayload(
payload: DataPayload,
@ -212,72 +228,180 @@ case class DataMessageHandler(
s"Received headers message with ${count.toInt} headers from $peer")
logger.trace(
s"Received headers=${headers.map(_.hashBE.hex).mkString("[", ",", "]")}")
val chainApiF = for {
val chainApiHeaderProcessF: Future[DataMessageHandler] = for {
newChainApi <- chainApi.setSyncing(count.toInt > 0)
processed <- newChainApi.processHeaders(headers)
} yield {
processed
copy(chainApi = processed)
}
val getHeadersF = chainApiF
.flatMap { newApi =>
if (headers.nonEmpty) {
val recoveredDMHF: Future[DataMessageHandler] =
chainApiHeaderProcessF.recoverWith {
case _: InvalidBlockHeader =>
logger.debug(
s"Invalid headers of count $count sent from ${syncPeer.get} in state $state")
recoverInvalidHeader(peer, peerMsgSender)
case throwable: Throwable => throw throwable
}
val lastHeader = headers.last
val lastHash = lastHeader.hash
newApi.getBlockCount().map { count =>
logger.trace(
s"Processed headers, most recent has height=$count and hash=$lastHash.")
}
val getHeadersF: Future[DataMessageHandler] =
recoveredDMHF
.flatMap { newDmh =>
val newApi = newDmh.chainApi
if (headers.nonEmpty) {
val lastHeader = headers.last
val lastHash = lastHeader.hash
newApi.getBlockCount().map { count =>
logger.trace(
s"Processed headers, most recent has height=$count and hash=$lastHash.")
}
if (count.toInt == HeadersMessage.MaxHeadersCount) {
state match {
case HeaderSync =>
logger.info(
s"Received maximum amount of headers in one header message. This means we are not synced, requesting more")
//ask for headers more from the same peer
peerMsgSender
.sendGetHeadersMessage(lastHash)
.map(_ => newDmh)
case ValidatingHeaders(inSyncWith, _, _) =>
//In the validation stage, some peer sent max amount of valid headers, revert to HeaderSync with that peer as syncPeer
//disconnect the ones that we have already checked since they are at least out of sync by 2000 headers
val removeFs = inSyncWith.map(p => manager.removePeer(p))
val newSyncPeer = Some(peer)
//ask for more headers now
val askF = peerMsgSender
.sendGetHeadersMessage(lastHash)
.map(_ => syncing)
for {
_ <- Future.sequence(removeFs)
newSyncing <- askF
} yield newDmh.copy(syncing = newSyncing,
state = HeaderSync,
syncPeer = newSyncPeer)
case _: DataMessageHandlerState =>
Future.successful(newDmh)
}
if (count.toInt == HeadersMessage.MaxHeadersCount) {
logger.info(
s"Received maximum amount of headers in one header message. This means we are not synced, requesting more")
//the same one should sync headers
//expect a message with a timeout now
peerMsgSender
.sendGetHeadersMessage(lastHash)
.map(_ => syncing)
} else {
logger.debug(
List(s"Received headers=${count.toInt} in one message,",
"which is less than max. This means we are synced,",
"not requesting more.")
.mkString(" "))
// If we are in neutrino mode, we might need to start fetching filters and their headers
// if we are syncing we should do this, however, sometimes syncing isn't a good enough check,
// so we also check if our cached filter heights have been set as well, if they haven't then
// we probably need to sync filters
if (
appConfig.nodeType == NodeType.NeutrinoNode && (!syncing ||
(filterHeaderHeightOpt.isEmpty &&
filterHeightOpt.isEmpty))
) {
logger.info(
s"Starting to fetch filter headers in data message handler")
sendFirstGetCompactFilterHeadersCommand(peerMsgSender)
} else {
Try(initialSyncDone.map(_.success(Done)))
Future.successful(syncing)
logger.debug(
List(s"Received headers=${count.toInt} in one message,",
"which is less than max. This means we are synced,",
"not requesting more.")
.mkString(" "))
// If we are in neutrino mode, we might need to start fetching filters and their headers
// if we are syncing we should do this, however, sometimes syncing isn't a good enough check,
// so we also check if our cached filter heights have been set as well, if they haven't then
// we probably need to sync filters
state match {
case HeaderSync =>
// headers are synced now with the current sync peer, now move to validating it for all peers
assert(syncPeer.get == peer)
if (manager.peers.size > 1) {
val newState =
ValidatingHeaders(inSyncWith = Set(peer),
verifyingWith = manager.peers.toSet,
failedCheck = Set.empty[Peer])
logger.info(
s"Starting to validate headers now. Verifying with ${newState.verifyingWith}")
val getHeadersAllF = manager.peerData
.filter(_._1 != peer)
.map(
_._2.peerMessageSender.flatMap(
_.sendGetHeadersMessage(lastHash))
)
Future
.sequence(getHeadersAllF)
.map(_ => newDmh.copy(state = newState))
} else {
//if just one peer then can proceed ahead directly
fetchCompactFilters(newDmh).map(
_.copy(state = PostHeaderSync))
}
case headerState @ ValidatingHeaders(inSyncWith, _, _) =>
//add the current peer to it
val newHeaderState =
headerState.copy(inSyncWith = inSyncWith + peer)
val newDmh2 = newDmh.copy(state = newHeaderState)
if (newHeaderState.validated) {
// If we are in neutrino mode, we might need to start fetching filters and their headers
// if we are syncing we should do this, however, sometimes syncing isn't a good enough check,
// so we also check if our cached filter heights have been set as well, if they haven't then
// we probably need to sync filters
fetchCompactFilters(newDmh2).map(
_.copy(state = PostHeaderSync))
} else {
//do nothing, we are still waiting for some peers to send headers or timeout
Future.successful(newDmh2)
}
case PostHeaderSync =>
//send further requests to the same one that sent this
if (
!syncing ||
(filterHeaderHeightOpt.isEmpty &&
filterHeightOpt.isEmpty)
) {
logger.info(
s"Starting to fetch filter headers in data message handler")
val newSyncingF =
sendFirstGetCompactFilterHeadersCommand(peerMsgSender)
newSyncingF.map(newSyncing =>
newDmh.copy(syncing = newSyncing))
} else {
Try(initialSyncDone.map(_.success(Done)))
Future.successful(newDmh)
}
}
}
} else {
//what if we are synced exactly by the 2000th header
state match {
case headerState @ ValidatingHeaders(inSyncWith, _, _) =>
val newHeaderState =
headerState.copy(inSyncWith = inSyncWith + peer)
val newDmh2 = newDmh.copy(state = newHeaderState)
if (newHeaderState.validated) {
fetchCompactFilters(newDmh2).map(
_.copy(state = PostHeaderSync))
} else {
//do nothing, we are still waiting for some peers to send headers
Future.successful(newDmh2)
}
case _: DataMessageHandlerState =>
Future.successful(newDmh)
}
}
} else {
Try(initialSyncDone.map(_.success(Done)))
Future.successful(syncing)
}
}
getHeadersF.failed.map { err =>
logger.error(s"Error when processing headers message", err)
}
for {
newApi <- chainApiF
newSyncing <- getHeadersF
_ <- chainApiHeaderProcessF
newDmh <- getHeadersF
_ <- appConfig.callBacks.executeOnBlockHeadersReceivedCallbacks(
headers)
} yield {
this.copy(chainApi = newApi, syncing = newSyncing)
newDmh
}
case msg: BlockMessage =>
val block = msg.block
@ -293,13 +417,18 @@ case class DataMessageHandler(
val headersMessage =
HeadersMessage(CompactSizeUInt.one, Vector(block.blockHeader))
for {
newMsgHandler <- handleDataPayload(headersMessage,
peerMsgSender,
peer)
_ <-
appConfig.callBacks
.executeOnBlockHeadersReceivedCallbacks(
Vector(block.blockHeader))
newMsgHandler <- {
// if in IBD, do not process this header, just execute callbacks
if (
initialSyncDone.isDefined && initialSyncDone.get.isCompleted
) handleDataPayload(headersMessage, peerMsgSender, peer)
else {
appConfig.callBacks
.executeOnBlockHeadersReceivedCallbacks(
Vector(block.blockHeader))
.map(_ => this)
}
}
} yield newMsgHandler
} else Future.successful(this)
}
@ -325,11 +454,20 @@ case class DataMessageHandler(
handleInventoryMsg(invMsg = invMsg, peerMsgSender = peerMsgSender)
}
if (syncPeer.isEmpty || peer != syncPeer.get) {
if (state.isInstanceOf[ValidatingHeaders]) {
//process messages from all peers
resultF.failed.foreach { err =>
logger.error(s"Failed to handle data payload=${payload} from $peer",
err)
}
resultF.recoverWith { case NonFatal(_) =>
Future.successful(this)
}
} else if (syncPeer.isEmpty || peer != syncPeer.get) {
//in other states, process messages only from syncPeer
logger.debug(s"Ignoring ${payload.commandName} from $peer")
Future.successful(this)
} else {
resultF.failed.foreach { err =>
logger.error(s"Failed to handle data payload=${payload} from $peer",
err)
@ -369,6 +507,98 @@ case class DataMessageHandler(
} yield syncing
}
/** Recover the data message handler if we received an invalid block header from a peer */
private def recoverInvalidHeader(
peer: Peer,
peerMsgSender: PeerMessageSender): Future[DataMessageHandler] = {
state match {
case HeaderSync =>
manager.peerData(peer).updateInvalidMessageCount()
if (
manager
.peerData(peer)
.exceededMaxInvalidMessages && manager.peers.size > 1
) {
logger.info(
s"$peer exceeded max limit of invalid messages. Disconnecting.")
for {
_ <- manager.removePeer(peer)
newDmh <- manager.syncFromNewPeer()
} yield newDmh.copy(state = HeaderSync)
} else {
logger.info(s"Re-querying headers from $peer.")
for {
blockchains <- BlockHeaderDAO().getBlockchains()
cachedHeaders = blockchains
.flatMap(_.headers)
.map(_.hashBE.flip)
_ <- peerMsgSender.sendGetHeadersMessage(cachedHeaders)
} yield this
}
case headerState @ ValidatingHeaders(_, failedCheck, _) =>
//if a peer sends invalid data then mark it as failed, dont disconnect
logger.debug(
s"Got invalid headers from $peer while validating. Marking as failed.")
val newHeaderState =
headerState.copy(failedCheck = failedCheck + peer)
val newDmh = copy(state = newHeaderState)
if (newHeaderState.validated) {
logger.info(
s"Done validating headers, inSyncWith=${newHeaderState.inSyncWith}, failedCheck=${newHeaderState.failedCheck}")
fetchCompactFilters(newDmh).map(_.copy(state = PostHeaderSync))
} else {
Future.successful(newDmh)
}
case _: DataMessageHandlerState =>
Future.successful(this)
}
}
private def fetchCompactFilters(
currentDmh: DataMessageHandler): Future[DataMessageHandler] = {
if (
!syncing ||
(filterHeaderHeightOpt.isEmpty &&
filterHeightOpt.isEmpty)
) {
logger.info(s"Starting to fetch filter headers in data message handler.")
for {
peer <- manager.randomPeerWithService(
ServiceIdentifier.NODE_COMPACT_FILTERS)
newDmh = currentDmh.copy(syncPeer = Some(peer))
_ = logger.info(s"Now syncing filters from $peer")
sender <- manager.peerData(peer).peerMessageSender
newSyncing <- sendFirstGetCompactFilterHeadersCommand(sender)
} yield newDmh.copy(syncing = newSyncing)
} else {
Try(initialSyncDone.map(_.success(Done)))
Future.successful(this)
}
}
def onHeaderRequestTimeout(peer: Peer): Future[DataMessageHandler] = {
logger.info(s"Header request timed out from $peer in state $state")
state match {
case HeaderSync =>
manager.syncFromNewPeer()
case headerState @ ValidatingHeaders(_, failedCheck, _) =>
val newHeaderState = headerState.copy(failedCheck = failedCheck + peer)
val newDmh = copy(state = newHeaderState)
if (newHeaderState.validated) {
fetchCompactFilters(newDmh).map(_.copy(state = PostHeaderSync))
} else Future.successful(newDmh)
case _: DataMessageHandlerState => Future.successful(this)
}
}
private def sendNextGetCompactFilterHeadersCommand(
peerMsgSender: PeerMessageSender,
prevStopHash: DoubleSha256DigestBE): Future[Boolean] =
@ -481,3 +711,13 @@ case class DataMessageHandler(
}
}
}
sealed trait StreamDataMessageWrapper
case class DataMessageWrapper(
payload: DataPayload,
peerMsgSender: PeerMessageSender,
peer: Peer)
extends StreamDataMessageWrapper
case class HeaderTimeoutWrapper(peer: Peer) extends StreamDataMessageWrapper

View File

@ -0,0 +1,20 @@
package org.bitcoins.node.networking.peer
import org.bitcoins.node.models.Peer
sealed abstract class DataMessageHandlerState
object DataMessageHandlerState {
final case object HeaderSync extends DataMessageHandlerState
case class ValidatingHeaders(
inSyncWith: Set[Peer],
failedCheck: Set[Peer],
verifyingWith: Set[Peer]
) extends DataMessageHandlerState {
def validated: Boolean = inSyncWith ++ failedCheck == verifyingWith
}
final case object PostHeaderSync extends DataMessageHandlerState
}

View File

@ -238,11 +238,9 @@ class PeerMessageReceiver(
curReceiver: PeerMessageReceiver): Future[PeerMessageReceiver] = {
//else it means we are receiving this data payload from a peer,
//we need to handle it
node.getDataMessageHandler.handleDataPayload(payload, sender, peer).map {
handler =>
val newNode = node.updateDataMessageHandler(handler)
new PeerMessageReceiver(newNode, curReceiver.state, peer)
}
node.getDataMessageHandler
.addToStream(payload, sender, peer)
.map(_ => new PeerMessageReceiver(node, curReceiver.state, peer))
}
/** Handles control payloads defined here https://bitcoin.org/en/developer-reference#control-messages
@ -266,7 +264,8 @@ class PeerMessageReceiver(
def onResponseTimeout(networkPayload: NetworkPayload): Future[Unit] = {
assert(networkPayload.isInstanceOf[ExpectsResponse])
logger.debug(s"Handling response timeout for ${networkPayload.commandName}")
logger.debug(
s"Handling response timeout for ${networkPayload.commandName} from $peer")
//isn't this redundant? No, on response timeout may be called when not cancel timeout
state match {
@ -287,6 +286,9 @@ class PeerMessageReceiver(
}
def handleExpectResponse(msg: NetworkPayload): Future[PeerMessageReceiver] = {
require(
msg.isInstanceOf[ExpectsResponse],
s"Cannot expect response for ${msg.commandName} from $peer as ${msg.commandName} does not expect a response.")
state match {
case good: Normal =>
logger.debug(s"Handling expected response for ${msg.commandName}")

View File

@ -190,11 +190,12 @@ object NodeUnitTest extends P2PLogger {
chainConf: ChainAppConfig,
nodeConf: NodeAppConfig,
system: ActorSystem): NeutrinoNode = {
import system.dispatcher
val dmh = DataMessageHandler(chainApi, walletCreationTimeOpt)
NeutrinoNode(dmh, nodeConf, chainConf, system, paramPeers = Vector(peer))
NeutrinoNode(chainApi,
walletCreationTimeOpt,
nodeConf,
chainConf,
system,
paramPeers = Vector(peer))
}
def buildPeerMessageReceiver(
@ -424,9 +425,9 @@ object NodeUnitTest extends P2PLogger {
peer <- createPeer(bitcoind)
chainApi <- chainApiF
} yield {
val dmh = DataMessageHandler(chainApi, walletCreationTimeOpt)
NeutrinoNode(paramPeers = Vector(peer),
dataMessageHandler = dmh,
NeutrinoNode(chainApi,
walletCreationTimeOpt,
paramPeers = Vector(peer),
nodeConfig = nodeAppConfig,
chainConfig = chainAppConfig,
actorSystem = system)
@ -456,9 +457,9 @@ object NodeUnitTest extends P2PLogger {
_ <- nodeAppConfig.start()
chainApi <- chainApiF
} yield {
val dmh = DataMessageHandler(chainApi, walletCreationTimeOpt)
NeutrinoNode(paramPeers = Vector(peer),
dataMessageHandler = dmh,
NeutrinoNode(chainApi,
walletCreationTimeOpt,
paramPeers = Vector(peer),
nodeConfig = nodeAppConfig,
chainConfig = chainAppConfig,
actorSystem = system)
@ -491,9 +492,9 @@ object NodeUnitTest extends P2PLogger {
chainApi <- chainApiF
peers <- Future.sequence(peersF)
} yield {
val dmh = DataMessageHandler(chainApi, creationTimeOpt)
NeutrinoNode(paramPeers = peers,
dataMessageHandler = dmh,
NeutrinoNode(chainApi,
creationTimeOpt,
paramPeers = peers,
nodeConfig = nodeAppConfig,
chainConfig = chainAppConfig,
actorSystem = system)

View File

@ -790,6 +790,24 @@ trait BitcoindRpcTestUtil extends Logging {
createNodePairInternal(version)
}
/** Returns a pair of [[org.bitcoins.rpc.client.common.BitcoindRpcClient BitcoindRpcClient]]
* that are not connected but have the same blocks in the chain
*/
def createUnconnectedNodePairWithBlocks[T <: BitcoindRpcClient](
clientAccum: RpcClientAccum = Vector.newBuilder)(implicit
system: ActorSystem): Future[(BitcoindRpcClient, BitcoindRpcClient)] = {
import system.dispatcher
for {
(first, second) <- createNodePair(clientAccum)
_ <- first.addNode(second.getDaemon.uri, AddNodeArgument.Remove)
_ <- first.disconnectNode(second.getDaemon.uri)
_ <- awaitDisconnected(first, second)
_ <- awaitDisconnected(second, first)
} yield {
(first, second)
}
}
/** Returns a pair of [[org.bitcoins.rpc.client.v17.BitcoindV17RpcClient BitcoindV17RpcClient]]
* that are connected with some blocks in the chain
*/