Fix case where we weren't cancelling the syncFilter job onDisconnect() (#5402)

This commit is contained in:
Chris Stewart 2024-02-17 17:41:34 -06:00 committed by GitHub
parent f5087c5e3f
commit 155b4fbc76
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -158,7 +158,7 @@ case class PeerManager(
isStarted.set(false) isStarted.set(false)
val beganAt = System.currentTimeMillis() val beganAt = System.currentTimeMillis()
syncFilterCancellableOpt.map(_.cancel()) syncFilterCancellableOpt.map(_._2.cancel())
val stopF = for { val stopF = for {
_ <- queue.offer(NodeShutdown) _ <- queue.offer(NodeShutdown)
@ -321,7 +321,7 @@ case class PeerManager(
s"Disconnected peer=$peer peers=$peers state=$state forceReconnect=$forceReconnect peerDataMap=${peerDataMap s"Disconnected peer=$peer peers=$peers state=$state forceReconnect=$forceReconnect peerDataMap=${peerDataMap
.map(_._1)}") .map(_._1)}")
val finder = state.peerFinder val finder = state.peerFinder
val _ = onDisconnectSyncFiltersJob(peer)
val updateLastSeenF = PeerDAO().updateLastSeenTime(peer) val updateLastSeenF = PeerDAO().updateLastSeenTime(peer)
val stateF = { val stateF = {
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer), require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
@ -894,7 +894,8 @@ case class PeerManager(
} }
/** Scheduled job to sync compact filters */ /** Scheduled job to sync compact filters */
@volatile private[this] var syncFilterCancellableOpt: Option[Cancellable] = @volatile private[this] var syncFilterCancellableOpt: Option[
(Peer, Cancellable)] =
None None
def sync(syncPeerOpt: Option[Peer]): Future[Unit] = { def sync(syncPeerOpt: Option[Peer]): Future[Unit] = {
@ -929,7 +930,7 @@ case class PeerManager(
// //
// the filter sync job gets scheduled _after_ PeerManager.stop() has been called // the filter sync job gets scheduled _after_ PeerManager.stop() has been called
syncFilterCancellableOpt = syncFilterCancellableOpt match { syncFilterCancellableOpt = syncFilterCancellableOpt match {
case s: Some[Cancellable] => case s: Some[(Peer, Cancellable)] =>
s //do nothing as we already have a job scheduled s //do nothing as we already have a job scheduled
case None => case None =>
val c = createFilterSyncJob(chainApi, syncNodeState) val c = createFilterSyncJob(chainApi, syncNodeState)
@ -946,9 +947,27 @@ case class PeerManager(
} }
} }
/** Cancels the scheduled compact filter sync job when the peer
  * being disconnected is the same peer the job was scheduled to
  * sync filters with, clearing the stored job reference afterwards.
  * Does nothing when no job is scheduled or the peers differ.
  */
private def onDisconnectSyncFiltersJob(peer: Peer): Unit = {
  syncFilterCancellableOpt.foreach { case (scheduledPeer, job) =>
    if (peer == scheduledPeer) {
      // the disconnecting peer is the one we planned to sync with;
      // cancel the pending job and drop our reference to it
      job.cancel()
      syncFilterCancellableOpt = None
    }
  }
}
private def createFilterSyncJob( private def createFilterSyncJob(
chainApi: ChainApi, chainApi: ChainApi,
syncNodeState: SyncNodeState): Cancellable = { syncNodeState: SyncNodeState): (Peer, Cancellable) = {
require( require(
syncFilterCancellableOpt.isEmpty, syncFilterCancellableOpt.isEmpty,
s"Cannot schedule a syncFilterCancellable as one is already scheduled") s"Cannot schedule a syncFilterCancellable as one is already scheduled")
@ -958,7 +977,7 @@ case class PeerManager(
//see: https://github.com/bitcoin-s/bitcoin-s/issues/5125 //see: https://github.com/bitcoin-s/bitcoin-s/issues/5125
val oldFilterHeaderCountF = chainApi.getFilterHeaderCount() val oldFilterHeaderCountF = chainApi.getFilterHeaderCount()
val oldFilterCountF = chainApi.getFilterCount() val oldFilterCountF = chainApi.getFilterCount()
system.scheduler.scheduleOnce(10.seconds) { val cancellable = system.scheduler.scheduleOnce(10.seconds) {
val filterSyncF = { val filterSyncF = {
for { for {
oldFilterHeaderCount <- oldFilterHeaderCountF oldFilterHeaderCount <- oldFilterHeaderCountF
@ -995,6 +1014,8 @@ case class PeerManager(
} }
() ()
} }
(syncNodeState.syncPeer, cancellable)
} }
/** Returns true if filter are in sync with their old counts, but out of sync with our block count */ /** Returns true if filter are in sync with their old counts, but out of sync with our block count */