Cleanup use of syncPeerOpt as a param in parts of PeerManager (#5209)

This commit is contained in:
Chris Stewart 2023-08-26 12:37:27 -05:00 committed by GitHub
parent 2672c2223c
commit aa02683db9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 92 additions and 72 deletions

View file

@ -57,4 +57,23 @@ class PeerManagerTest extends NodeTestWithCachedBitcoindNewest {
) //make sure we had a peer passed as a param
}
}
it must "determine if filters are out of sync" in { _ =>
val blockCount = 804934
val oldFilterHeaderCount = 804934
val currentFilterHeaderCount = 804934
val oldFilterCount = 771760
val currentFilterCount = 771760
val isOutOfSync = PeerManager.isFiltersOutOfSync(
blockCount = blockCount,
oldFilterHeaderCount = oldFilterHeaderCount,
currentFilterHeaderCount = currentFilterHeaderCount,
oldFilterCount = oldFilterCount,
currentFilterCount = currentFilterCount
)
assert(isOutOfSync)
}
}

View file

@ -888,7 +888,7 @@ case class PeerManager(
case s @ (_: FilterHeaderSync | _: FilterSync) =>
if (s.syncPeer != newPeer) {
filterSyncHelper(chainApi = ChainHandler.fromDatabase(),
syncPeerOpt = Some(newPeer)).map(_ => newState)
syncPeer = newPeer).map(_ => newState)
} else {
//if its same peer we don't need to switch
Future.successful(oldSyncState)
@ -897,6 +897,7 @@ case class PeerManager(
}
}
/** If [[syncPeerOpt]] is given, we send getheaders to only that peer, if no sync peer given we gossip getheaders to all our peers */
private def getHeaderSyncHelper(syncPeerOpt: Option[Peer]): Future[Unit] = {
val blockchainsF =
BlockHeaderDAO()(ec, chainAppConfig).getBlockchains()
@ -917,7 +918,7 @@ case class PeerManager(
private def filterSyncHelper(
chainApi: ChainApi,
syncPeerOpt: Option[Peer]): Future[Unit] = {
syncPeer: Peer): Future[Unit] = {
for {
header <- chainApi.getBestBlockHeader()
bestFilterHeaderOpt <- chainApi.getBestFilterHeader()
@ -930,23 +931,13 @@ case class PeerManager(
//after we are done syncing block headers
Future.unit
} else {
val peerOpt = syncPeerOpt match {
case Some(p) => Some(p)
case None =>
randomPeerWithService(ServiceIdentifier.NODE_COMPACT_FILTERS)
}
peerOpt match {
case Some(p) =>
syncFilters(bestFilterHeaderOpt = bestFilterHeaderOpt,
bestFilterOpt = bestFilterOpt,
bestBlockHeader = header,
chainApi = chainApi,
nodeState = FilterHeaderSync(p, peers))
case None =>
Future.failed(
new RuntimeException(
"Could not find peer to sync filters with!"))
}
syncFilters(
bestFilterHeaderOpt = bestFilterHeaderOpt,
bestFilterOpt = bestFilterOpt,
bestBlockHeader = header,
chainApi = chainApi,
nodeState = FilterHeaderSync(syncPeer, peers)
)
}
}
} yield ()
@ -979,8 +970,21 @@ case class PeerManager(
// 2. Shutting down the peer manager.
//
// the filter sync job gets scheduled _after_ PeerManager.stop() has been called
val cancellable = createFilterSyncJob(chainApi, syncPeerOpt)
syncFilterCancellableOpt = Some(cancellable)
syncFilterCancellableOpt = syncFilterCancellableOpt match {
case s: Some[Cancellable] =>
s //do nothing as we already have a job scheduled
case None =>
syncPeerOpt match {
case Some(syncPeer) =>
val c = createFilterSyncJob(chainApi, syncPeer)
Some(c)
case None =>
//no sync peer to schedule the job with
logger.warn(
s"Unable to createFilterSyncJob as we have no sync peer!")
None
}
}
}
}
header <- headerF
@ -994,68 +998,55 @@ case class PeerManager(
private def createFilterSyncJob(
chainApi: ChainApi,
syncPeerOpt: Option[Peer]): Cancellable = {
syncPeer: Peer): Cancellable = {
require(
syncFilterCancellableOpt.isEmpty,
s"Cannot schedule a syncFilterCancellable as one is already scheduled")
//add a delay when syncing filter headers/filters for the case when we restart the node,
//our block header tip _is not_ synced with the network, but our tip is also _not_ stale
//this can result in duplicate syncing of filter headers.
//see: https://github.com/bitcoin-s/bitcoin-s/issues/5125
val cancellable = {
syncFilterCancellableOpt match {
case Some(syncFilterCancellable)
if !syncFilterCancellable.isCancelled =>
syncFilterCancellable
case Some(_) | None =>
val oldFilterHeaderCountF = chainApi.getFilterHeaderCount()
val oldFilterCountF = chainApi.getFilterCount()
system.scheduler.scheduleOnce(10.seconds) {
val filterSyncF = {
for {
oldFilterHeaderCount <- oldFilterHeaderCountF
oldFilterCount <- oldFilterCountF
blockCount <- chainApi.getBlockCount()
currentFilterHeaderCount <- chainApi.getFilterHeaderCount()
currentFilterCount <- chainApi.getFilterCount()
_ <- {
//make sure filter sync hasn't started since we schedule the job...
//see: https://github.com/bitcoin-s/bitcoin-s/issues/5167
val isOutOfSync = isFiltersOutOfSync(blockCount,
oldFilterHeaderCount,
currentFilterHeaderCount,
oldFilterCount,
currentFilterCount)
if (isOutOfSync) {
//if it hasn't started it, start it
filterSyncHelper(chainApi, syncPeerOpt)
} else {
Future.unit
}
}
} yield ()
val oldFilterHeaderCountF = chainApi.getFilterHeaderCount()
val oldFilterCountF = chainApi.getFilterCount()
system.scheduler.scheduleOnce(10.seconds) {
val filterSyncF = {
for {
oldFilterHeaderCount <- oldFilterHeaderCountF
oldFilterCount <- oldFilterCountF
blockCount <- chainApi.getBlockCount()
currentFilterHeaderCount <- chainApi.getFilterHeaderCount()
currentFilterCount <- chainApi.getFilterCount()
_ <- {
//make sure filter sync hasn't started since we schedule the job...
//see: https://github.com/bitcoin-s/bitcoin-s/issues/5167
val isOutOfSync = PeerManager.isFiltersOutOfSync(
blockCount = blockCount,
oldFilterHeaderCount = oldFilterHeaderCount,
currentFilterHeaderCount = currentFilterHeaderCount,
oldFilterCount = oldFilterCount,
currentFilterCount = currentFilterCount
)
if (isOutOfSync) {
//if it hasn't started it, start it
filterSyncHelper(chainApi, syncPeer)
} else {
Future.unit
}
filterSyncF.onComplete {
case scala.util.Success(_) =>
syncFilterCancellableOpt = None
case scala.util.Failure(err) =>
logger.error(s"Failed to start syncing filters", err)
syncFilterCancellableOpt = None
}
()
}
} yield ()
}
filterSyncF.onComplete {
case scala.util.Success(_) =>
syncFilterCancellableOpt = None
case scala.util.Failure(err) =>
logger.error(s"Failed to start syncing filters", err)
syncFilterCancellableOpt = None
}
()
}
cancellable
}
/** Returns true if filter are in sync with their old counts, but out of sync with our block count */
private def isFiltersOutOfSync(
blockCount: Int,
oldFilterHeaderCount: Int,
currentFilterHeaderCount: Int,
oldFilterCount: Int,
currentFilterCount: Int): Boolean = {
(oldFilterHeaderCount == currentFilterHeaderCount && oldFilterCount == currentFilterCount) &&
(blockCount != currentFilterHeaderCount || blockCount != currentFilterCount)
}
private def syncFilters(
bestFilterHeaderOpt: Option[CompactFilterHeaderDb],
@ -1287,4 +1278,14 @@ object PeerManager extends Logging {
currentDmh.copy(state = newSyncingState)
}
}
def isFiltersOutOfSync(
blockCount: Int,
oldFilterHeaderCount: Int,
currentFilterHeaderCount: Int,
oldFilterCount: Int,
currentFilterCount: Int): Boolean = {
(oldFilterHeaderCount == currentFilterHeaderCount && oldFilterCount == currentFilterCount) &&
(blockCount != currentFilterHeaderCount || blockCount != currentFilterCount)
}
}