From aa02683db9aa4dba816a5dd5353a6f2325578271 Mon Sep 17 00:00:00 2001
From: Chris Stewart
Date: Sat, 26 Aug 2023 12:37:27 -0500
Subject: [PATCH] Cleanup use of syncPeerOpt as a param in parts of PeerManager
 (#5209)

---
 .../org/bitcoins/node/PeerManagerTest.scala   |  19 +++
 .../scala/org/bitcoins/node/PeerManager.scala | 145 +++++++++---------
 2 files changed, 92 insertions(+), 72 deletions(-)

diff --git a/node-test/src/test/scala/org/bitcoins/node/PeerManagerTest.scala b/node-test/src/test/scala/org/bitcoins/node/PeerManagerTest.scala
index c9ed954180..201dbc89fe 100644
--- a/node-test/src/test/scala/org/bitcoins/node/PeerManagerTest.scala
+++ b/node-test/src/test/scala/org/bitcoins/node/PeerManagerTest.scala
@@ -57,4 +57,23 @@ class PeerManagerTest extends NodeTestWithCachedBitcoindNewest {
       ) //make sure we had a peer passed as a param
     }
   }
+
+  it must "determine if filters are out of sync" in { _ =>
+    val blockCount = 804934
+    val oldFilterHeaderCount = 804934
+    val currentFilterHeaderCount = 804934
+    val oldFilterCount = 771760
+    val currentFilterCount = 771760
+
+    val isOutOfSync = PeerManager.isFiltersOutOfSync(
+      blockCount = blockCount,
+      oldFilterHeaderCount = oldFilterHeaderCount,
+      currentFilterHeaderCount = currentFilterHeaderCount,
+      oldFilterCount = oldFilterCount,
+      currentFilterCount = currentFilterCount
+    )
+
+    assert(isOutOfSync)
+
+  }
 }
diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala
index 6da9cb3a12..d19870dfb5 100644
--- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala
+++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala
@@ -888,7 +888,7 @@ case class PeerManager(
         case s @ (_: FilterHeaderSync | _: FilterSync) =>
           if (s.syncPeer != newPeer) {
             filterSyncHelper(chainApi = ChainHandler.fromDatabase(),
-                             syncPeerOpt = Some(newPeer)).map(_ => newState)
+                             syncPeer = newPeer).map(_ => newState)
           } else {
             //if its same peer we don't need to switch
             Future.successful(oldSyncState)
@@ -897,6 +897,7 @@ case class PeerManager(
     }
   }
 
+  /** If [[syncPeerOpt]] is given, we send getheaders to only that peer, if no sync peer given we gossip getheaders to all our peers */
   private def getHeaderSyncHelper(syncPeerOpt: Option[Peer]): Future[Unit] = {
     val blockchainsF = BlockHeaderDAO()(ec, chainAppConfig).getBlockchains()
 
@@ -917,7 +918,7 @@ case class PeerManager(
 
   private def filterSyncHelper(
       chainApi: ChainApi,
-      syncPeerOpt: Option[Peer]): Future[Unit] = {
+      syncPeer: Peer): Future[Unit] = {
     for {
       header <- chainApi.getBestBlockHeader()
       bestFilterHeaderOpt <- chainApi.getBestFilterHeader()
@@ -930,23 +931,13 @@ case class PeerManager(
           //after we are done syncing block headers
          Future.unit
        } else {
-        val peerOpt = syncPeerOpt match {
-          case Some(p) => Some(p)
-          case None =>
-            randomPeerWithService(ServiceIdentifier.NODE_COMPACT_FILTERS)
-        }
-        peerOpt match {
-          case Some(p) =>
-            syncFilters(bestFilterHeaderOpt = bestFilterHeaderOpt,
-                        bestFilterOpt = bestFilterOpt,
-                        bestBlockHeader = header,
-                        chainApi = chainApi,
-                        nodeState = FilterHeaderSync(p, peers))
-          case None =>
-            Future.failed(
-              new RuntimeException(
-                "Could not find peer to sync filters with!"))
-        }
+        syncFilters(
+          bestFilterHeaderOpt = bestFilterHeaderOpt,
+          bestFilterOpt = bestFilterOpt,
+          bestBlockHeader = header,
+          chainApi = chainApi,
+          nodeState = FilterHeaderSync(syncPeer, peers)
+        )
        }
      }
    } yield ()
@@ -979,8 +970,21 @@ case class PeerManager(
           // 2. Shutting down the peer manager.
           //
           // the filter sync job gets scheduled _after_ PeerManager.stop() has been called
-          val cancellable = createFilterSyncJob(chainApi, syncPeerOpt)
-          syncFilterCancellableOpt = Some(cancellable)
+          syncFilterCancellableOpt = syncFilterCancellableOpt match {
+            case s: Some[Cancellable] =>
+              s //do nothing as we already have a job scheduled
+            case None =>
+              syncPeerOpt match {
+                case Some(syncPeer) =>
+                  val c = createFilterSyncJob(chainApi, syncPeer)
+                  Some(c)
+                case None =>
+                  //no sync peer to schedule the job with
+                  logger.warn(
+                    s"Unable to createFilterSyncJob as we have no sync peer!")
+                  None
+              }
+          }
         }
       }
       header <- headerF
@@ -994,68 +998,55 @@ case class PeerManager(
 
   private def createFilterSyncJob(
       chainApi: ChainApi,
-      syncPeerOpt: Option[Peer]): Cancellable = {
+      syncPeer: Peer): Cancellable = {
+    require(
+      syncFilterCancellableOpt.isEmpty,
+      s"Cannot schedule a syncFilterCancellable as one is already scheduled")
     //add a delay when syncing filter headers/filters for the case when we restart the node,
     //our block header tip _is not_ synced with the network, but our tip is also _not_ stale
     //this can result in duplicate syncing of filter headers.
     //see: https://github.com/bitcoin-s/bitcoin-s/issues/5125
-    val cancellable = {
-      syncFilterCancellableOpt match {
-        case Some(syncFilterCancellable)
-            if !syncFilterCancellable.isCancelled =>
-          syncFilterCancellable
-        case Some(_) | None =>
-          val oldFilterHeaderCountF = chainApi.getFilterHeaderCount()
-          val oldFilterCountF = chainApi.getFilterCount()
-          system.scheduler.scheduleOnce(10.seconds) {
-            val filterSyncF = {
-              for {
-                oldFilterHeaderCount <- oldFilterHeaderCountF
-                oldFilterCount <- oldFilterCountF
-                blockCount <- chainApi.getBlockCount()
-                currentFilterHeaderCount <- chainApi.getFilterHeaderCount()
-                currentFilterCount <- chainApi.getFilterCount()
-                _ <- {
-                  //make sure filter sync hasn't started since we schedule the job...
-                  //see: https://github.com/bitcoin-s/bitcoin-s/issues/5167
-                  val isOutOfSync = isFiltersOutOfSync(blockCount,
-                                                       oldFilterHeaderCount,
-                                                       currentFilterHeaderCount,
-                                                       oldFilterCount,
-                                                       currentFilterCount)
-                  if (isOutOfSync) {
-                    //if it hasn't started it, start it
-                    filterSyncHelper(chainApi, syncPeerOpt)
-                  } else {
-                    Future.unit
-                  }
-                }
-              } yield ()
+    val oldFilterHeaderCountF = chainApi.getFilterHeaderCount()
+    val oldFilterCountF = chainApi.getFilterCount()
+    system.scheduler.scheduleOnce(10.seconds) {
+      val filterSyncF = {
+        for {
+          oldFilterHeaderCount <- oldFilterHeaderCountF
+          oldFilterCount <- oldFilterCountF
+          blockCount <- chainApi.getBlockCount()
+          currentFilterHeaderCount <- chainApi.getFilterHeaderCount()
+          currentFilterCount <- chainApi.getFilterCount()
+          _ <- {
+            //make sure filter sync hasn't started since we schedule the job...
+            //see: https://github.com/bitcoin-s/bitcoin-s/issues/5167
+            val isOutOfSync = PeerManager.isFiltersOutOfSync(
+              blockCount = blockCount,
+              oldFilterHeaderCount = oldFilterHeaderCount,
+              currentFilterHeaderCount = currentFilterHeaderCount,
+              oldFilterCount = oldFilterCount,
+              currentFilterCount = currentFilterCount
+            )
+            if (isOutOfSync) {
+              //if it hasn't started it, start it
+              filterSyncHelper(chainApi, syncPeer)
+            } else {
+              Future.unit
             }
-            filterSyncF.onComplete {
-              case scala.util.Success(_) =>
-                syncFilterCancellableOpt = None
-              case scala.util.Failure(err) =>
-                logger.error(s"Failed to start syncing filters", err)
-                syncFilterCancellableOpt = None
-            }
-            ()
           }
+        } yield ()
       }
+      filterSyncF.onComplete {
+        case scala.util.Success(_) =>
+          syncFilterCancellableOpt = None
+        case scala.util.Failure(err) =>
+          logger.error(s"Failed to start syncing filters", err)
+          syncFilterCancellableOpt = None
+      }
+      ()
     }
-    cancellable
   }
 
   /** Returns true if filter are in sync with their old counts, but out of sync with our block count */
-  private def isFiltersOutOfSync(
-      blockCount: Int,
-      oldFilterHeaderCount: Int,
-      currentFilterHeaderCount: Int,
-      oldFilterCount: Int,
-      currentFilterCount: Int): Boolean = {
-    (oldFilterHeaderCount == currentFilterHeaderCount && oldFilterCount == currentFilterCount) &&
-    (blockCount != currentFilterHeaderCount || blockCount != currentFilterCount)
-  }
 
   private def syncFilters(
       bestFilterHeaderOpt: Option[CompactFilterHeaderDb],
@@ -1287,4 +1278,14 @@ object PeerManager extends Logging {
       currentDmh.copy(state = newSyncingState)
     }
   }
+
+  def isFiltersOutOfSync(
+      blockCount: Int,
+      oldFilterHeaderCount: Int,
+      currentFilterHeaderCount: Int,
+      oldFilterCount: Int,
+      currentFilterCount: Int): Boolean = {
+    (oldFilterHeaderCount == currentFilterHeaderCount && oldFilterCount == currentFilterCount) &&
+    (blockCount != currentFilterHeaderCount || blockCount != currentFilterCount)
+  }
 }
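
As a quick sanity check on the numbers in the new test, here is a minimal standalone sketch (not part of the patch) that copies the predicate body from the PeerManager companion object and drives it with the same counts; the OutOfSyncCheck wrapper and its main method are illustrative only.

// Standalone sketch: the isFiltersOutOfSync logic copied from the patch above.
// The OutOfSyncCheck wrapper and main method are illustrative, not part of bitcoin-s.
object OutOfSyncCheck {

  def isFiltersOutOfSync(
      blockCount: Int,
      oldFilterHeaderCount: Int,
      currentFilterHeaderCount: Int,
      oldFilterCount: Int,
      currentFilterCount: Int): Boolean = {
    //filter counts have not moved since the sync job was scheduled...
    (oldFilterHeaderCount == currentFilterHeaderCount && oldFilterCount == currentFilterCount) &&
    //...but they still trail our block header count, so filters are out of sync
    (blockCount != currentFilterHeaderCount || blockCount != currentFilterCount)
  }

  def main(args: Array[String]): Unit = {
    //values from the new PeerManagerTest case: headers at 804934, filters stuck at 771760
    println(isFiltersOutOfSync(804934, 804934, 804934, 771760, 771760)) //true
    //everything at the same height: nothing to sync
    println(isFiltersOutOfSync(804934, 804934, 804934, 804934, 804934)) //false
  }
}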
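
The other behavioral change worth noting is that the guard against scheduling duplicate filter sync jobs moves out of createFilterSyncJob and up to the call site: a job is only created when syncFilterCancellableOpt is None and a sync peer is actually available, and createFilterSyncJob now require()s an empty slot. A rough standalone sketch of that shape follows, using a plain ScheduledExecutorService in place of the node's actor-system scheduler and a String in place of Peer; everything except the guard structure is illustrative.

import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

// Sketch of the "at most one pending filter sync job" guard used in the patch.
// Names and the String peer type are illustrative, not the bitcoin-s API.
class FilterSyncJobGuard {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  private var syncFilterCancellableOpt: Option[ScheduledFuture[_]] = None

  def maybeScheduleFilterSync(syncPeerOpt: Option[String])(
      job: String => Unit): Unit = {
    syncFilterCancellableOpt = syncFilterCancellableOpt match {
      case s: Some[ScheduledFuture[_]] =>
        s //do nothing, a job is already scheduled
      case None =>
        syncPeerOpt match {
          case Some(syncPeer) =>
            val c = createFilterSyncJob(syncPeer)(job)
            Some(c)
          case None =>
            println("Unable to createFilterSyncJob as we have no sync peer!")
            None
        }
    }
  }

  private def createFilterSyncJob(syncPeer: String)(
      job: String => Unit): ScheduledFuture[_] = {
    require(syncFilterCancellableOpt.isEmpty,
            "Cannot schedule a syncFilterCancellable as one is already scheduled")
    scheduler.schedule(new Runnable { def run(): Unit = job(syncPeer) },
                       10,
                       TimeUnit.SECONDS)
  }
}

In the patch itself the slot is cleared again in filterSyncF.onComplete, so a later header sync can schedule a fresh job once the previous one has finished or failed.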