From 7c649d39db54a8e628481db3372ddac261dd355e Mon Sep 17 00:00:00 2001
From: Shreyansh <56965282+shreyanshyad@users.noreply.github.com>
Date: Sat, 9 Jul 2022 00:14:15 +0530
Subject: [PATCH] Fix filter sync if headers received while syncing (#4463)

* fix filter sync if headers received while syncing

* remove unintended diffs
---
 .../org/bitcoins/node/NeutrinoNodeTest.scala  | 15 ++++++++
 .../networking/peer/DataMessageHandler.scala  | 38 ++++++++++++++++++-
 2 files changed, 51 insertions(+), 2 deletions(-)

diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala
index dbe24dc44e..d8391f2ffc 100644
--- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala
+++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala
@@ -253,4 +253,19 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
       }
     }
   }
+
+  //intended for test fixtures
+  it must "sync filters when multiple header messages are sent in succession" in {
+    nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoinds =>
+      val node = nodeConnectedWithBitcoind.node
+      val bitcoind = nodeConnectedWithBitcoind.bitcoinds(0)
+
+      for {
+        _ <- NodeUnitTest.syncNeutrinoNode(node, bitcoind)
+        _ <- bitcoind.generateToAddress(2, junkAddress)
+        _ <- NodeTestUtil.awaitAllSync(node, bitcoind)
+      } yield {
+        succeed
+      }
+  }
 }
diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala
index bd5fb95515..0bf6331707 100644
--- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala
+++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala
@@ -119,7 +119,6 @@ case class DataMessageHandler(
           } else {
             val syncing = newFilterHeight < newFilterHeaderHeight
             if (!syncing) {
-              logger.info(s"We are synced")
               Try(initialSyncDone.map(_.success(Done)))
             }
             syncing
@@ -147,11 +146,18 @@ case class DataMessageHandler(
               s"Received maximum amount of filters in one batch. This means we are not synced, requesting more")
             sendNextGetCompactFilterCommand(peerMsgSender, newFilterHeight)
           } else Future.unit
+        newSyncing2 <- {
+          if (!newSyncing) {
+            syncIfHeadersAhead(peerMsgSender)
+          } else {
+            Future.successful(newSyncing)
+          }
+        }
       } yield {
         this.copy(
           chainApi = newChainApi,
           currentFilterBatch = newBatch,
-          syncing = newSyncing,
+          syncing = newSyncing2,
           filterHeaderHeightOpt = Some(newFilterHeaderHeight),
           filterHeightOpt = Some(newFilterHeight)
         )
@@ -335,6 +341,34 @@ case class DataMessageHandler(
     }
   }
 
+  /** Syncs filter headers in case the header chain is still ahead after filter sync. */
+  def syncIfHeadersAhead(
+      peerMessageSender: PeerMessageSender): Future[Boolean] = {
+    for {
+      headerHeight <- chainApi.getBestHashBlockHeight()
+      filterHeaderCount <- chainApi.getFilterHeaderCount()
+      filterCount <- chainApi.getFilterCount()
+      syncing <- {
+        assert(headerHeight >= Math.max(filterHeaderCount, filterCount),
+               "Header chain cannot be behind filter or filter header chain")
+        assert(
+          filterHeaderCount >= filterCount,
+          s"Filter header height $filterHeaderCount must be at least filter height $filterCount")
+        if (headerHeight > filterHeaderCount) {
+          logger.info(
+            s"Starting to fetch filter headers in data message handler")
+          sendFirstGetCompactFilterHeadersCommand(peerMessageSender)
+        } else {
+          assert(
+            headerHeight == filterHeaderCount && headerHeight == filterCount)
+          logger.info(s"We are synced")
+          Try(initialSyncDone.map(_.success(Done)))
+          Future.successful(false)
+        }
+      }
+    } yield syncing
+  }
+
   private def sendNextGetCompactFilterHeadersCommand(
       peerMsgSender: PeerMessageSender,
       prevStopHash: DoubleSha256DigestBE): Future[Boolean] =
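Note (editorial addition, not part of the patch): the sketch below models the decision that syncIfHeadersAhead encodes, using plain Ints in place of the ChainApi futures (getBestHashBlockHeight, getFilterHeaderCount, getFilterCount). The object name, method name, and concrete heights are illustrative assumptions, not bitcoin-s API.

// Minimal, self-contained sketch of the post-filter-sync check added by this
// patch. All names and heights here are hypothetical.
object SyncIfHeadersAheadSketch extends App {

  // Returns true when filter header sync must resume because block headers
  // arrived while compact filters were still being downloaded.
  def headersAhead(
      headerHeight: Int,
      filterHeaderCount: Int,
      filterCount: Int): Boolean = {
    // Same invariants the patch asserts before deciding.
    require(headerHeight >= math.max(filterHeaderCount, filterCount),
            "Header chain cannot be behind filter or filter header chain")
    require(filterHeaderCount >= filterCount,
            "Filter header count must be at least filter count")
    headerHeight > filterHeaderCount
  }

  // Two blocks mined mid-sync (as the new test does with generateToAddress):
  // the header chain is ahead, so another round of filter header sync starts.
  println(headersAhead(headerHeight = 102, filterHeaderCount = 100, filterCount = 100)) // true
  // Everything caught up: the node is synced and initialSyncDone can complete.
  println(headersAhead(headerHeight = 102, filterHeaderCount = 102, filterCount = 102)) // false
}

The design point of the patch: once the final filter batch lands (newSyncing is false), the handler re-reads the three heights via syncIfHeadersAhead instead of declaring itself synced, so headers received during filter sync trigger sendFirstGetCompactFilterHeadersCommand rather than leaving the filter chain behind the header chain.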