diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index b5c71fd34e..34767524b6 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -158,7 +158,7 @@ case class PeerManager( isStarted.set(false) val beganAt = System.currentTimeMillis() - syncFilterCancellableOpt.map(_.cancel()) + syncFilterCancellableOpt.map(_._2.cancel()) val stopF = for { _ <- queue.offer(NodeShutdown) @@ -321,7 +321,7 @@ case class PeerManager( s"Disconnected peer=$peer peers=$peers state=$state forceReconnect=$forceReconnect peerDataMap=${peerDataMap .map(_._1)}") val finder = state.peerFinder - + val _ = onDisconnectSyncFiltersJob(peer) val updateLastSeenF = PeerDAO().updateLastSeenTime(peer) val stateF = { require(!finder.hasPeer(peer) || !peerDataMap.contains(peer), @@ -894,7 +894,8 @@ case class PeerManager( } /** Scheduled job to sync compact filters */ - @volatile private[this] var syncFilterCancellableOpt: Option[Cancellable] = + @volatile private[this] var syncFilterCancellableOpt: Option[ + (Peer, Cancellable)] = None def sync(syncPeerOpt: Option[Peer]): Future[Unit] = { @@ -929,7 +930,7 @@ case class PeerManager( // // the filter sync job gets scheduled _after_ PeerManager.stop() has been called syncFilterCancellableOpt = syncFilterCancellableOpt match { - case s: Some[Cancellable] => + case s: Some[(Peer, Cancellable)] => s //do nothing as we already have a job scheduled case None => val c = createFilterSyncJob(chainApi, syncNodeState) @@ -946,9 +947,27 @@ case class PeerManager( } } + /** If we are disconnecting a peer, we want to cancel the associated + * sync filters job if the peer we are disconnecting is the one + * we are scheduled to sync filters with later + */ + private def onDisconnectSyncFiltersJob(peer: Peer): Unit = { + syncFilterCancellableOpt match { + case Some((p, cancellable)) => + if (peer == p) { + cancellable.cancel() + syncFilterCancellableOpt = None + () + } else { + () + } + case None => () + } + } + private def createFilterSyncJob( chainApi: ChainApi, - syncNodeState: SyncNodeState): Cancellable = { + syncNodeState: SyncNodeState): (Peer, Cancellable) = { require( syncFilterCancellableOpt.isEmpty, s"Cannot schedule a syncFilterCancellable as one is already scheduled") @@ -958,7 +977,7 @@ case class PeerManager( //see: https://github.com/bitcoin-s/bitcoin-s/issues/5125 val oldFilterHeaderCountF = chainApi.getFilterHeaderCount() val oldFilterCountF = chainApi.getFilterCount() - system.scheduler.scheduleOnce(10.seconds) { + val cancellable = system.scheduler.scheduleOnce(10.seconds) { val filterSyncF = { for { oldFilterHeaderCount <- oldFilterHeaderCountF @@ -995,6 +1014,8 @@ case class PeerManager( } () } + + (syncNodeState.syncPeer, cancellable) } /** Returns true if filter are in sync with their old counts, but out of sync with our block count */