1
0
Fork 0
mirror of https://github.com/bitcoin-s/bitcoin-s.git synced 2025-03-26 21:42:48 +01:00

Fix duplicate filter header sync by adding delay before attempting to sync filter headers ()

* Fix duplicate filter header sync by adding delay before attempting to sync filter headers

* Fix bug where we don't wait for AsyncUtil.nonBlockingSleep()
This commit is contained in:
Chris Stewart 2023-07-06 12:05:14 -05:00 committed by GitHub
parent fc99087c89
commit 9b6bca06c0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,7 +1,7 @@
package org.bitcoins.node
import akka.{Done, NotUsed}
import akka.actor.{ActorSystem}
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.{
ActorAttributes,
OverflowStrategy,
@ -36,7 +36,7 @@ import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models.{Peer, PeerDAO, PeerDb}
import org.bitcoins.node.networking.peer.DataMessageHandlerState._
import org.bitcoins.node.networking.peer._
import org.bitcoins.node.util.{PeerMessageSenderApi}
import org.bitcoins.node.util.PeerMessageSenderApi
import scodec.bits.ByteVector
import java.net.InetAddress
@ -396,6 +396,8 @@ case class PeerManager(
isStarted.set(false)
val beganAt = System.currentTimeMillis()
syncFilterCancellableOpt.map(_.cancel())
val finderStopF = finderOpt match {
case Some(finder) => finder.stop()
case None => Future.unit
@ -420,6 +422,7 @@ case class PeerManager(
_ <- watchCompletion()
_ = {
//reset all variables
syncFilterCancellableOpt = None
dataMessageQueueOpt = None
streamDoneFOpt = None
finderOpt = None
@ -915,6 +918,10 @@ case class PeerManager(
} yield ()
}
/** Scheduled job to sync compact filters */
@volatile private[this] var syncFilterCancellableOpt: Option[Cancellable] =
None
/** Helper method to sync the blockchain over the network
*
* @param syncPeerOpt if syncPeer is given, we send [[org.bitcoins.core.p2p.GetHeadersMessage]] to that peer. If None we gossip GetHeadersMessage to all peers
@ -924,7 +931,10 @@ case class PeerManager(
val headerF = chainApi.getBestBlockHeader()
for {
_ <- getHeaderSyncHelper(syncPeerOpt)
_ <- filterSyncHelper(chainApi, syncPeerOpt)
cancellable = createFilterSyncJob(chainApi, syncPeerOpt)
_ = {
syncFilterCancellableOpt = Some(cancellable)
}
header <- headerF
} yield {
logger.info(
@ -932,6 +942,35 @@ case class PeerManager(
}
}
private def createFilterSyncJob(
chainApi: ChainApi,
syncPeerOpt: Option[Peer]): Cancellable = {
//add a delay when syncing filter headers/filters for the case when we restart the node,
//our block header tip _is not_ synced with the network, but our tip is also _not_ stale
//this can result in duplicate syncing of filter headers.
//see: https://github.com/bitcoin-s/bitcoin-s/issues/5125
val cancellable = {
syncFilterCancellableOpt match {
case Some(syncFilterCancellable)
if !syncFilterCancellable.isCancelled =>
syncFilterCancellable
case Some(_) | None =>
system.scheduler.scheduleOnce(10.seconds) {
val filterSyncF = filterSyncHelper(chainApi, syncPeerOpt)
filterSyncF.onComplete {
case scala.util.Success(_) =>
syncFilterCancellableOpt = None
case scala.util.Failure(err) =>
logger.error(s"Failed to start syncing filters", err)
syncFilterCancellableOpt = None
}
()
}
}
}
cancellable
}
private def syncFilters(
bestFilterHeaderOpt: Option[CompactFilterHeaderDb],
bestFilterOpt: Option[CompactFilterDb],