diff --git a/core/src/main/scala/org/bitcoins/core/api/node/NodeState.scala b/core/src/main/scala/org/bitcoins/core/api/node/NodeState.scala index f3fefe1dbd..5a6411bf2e 100644 --- a/core/src/main/scala/org/bitcoins/core/api/node/NodeState.scala +++ b/core/src/main/scala/org/bitcoins/core/api/node/NodeState.scala @@ -1,5 +1,7 @@ package org.bitcoins.core.api.node +import org.bitcoins.core.p2p.CompactFilterMessage + sealed abstract class NodeState { def isSyncing: Boolean @@ -72,7 +74,8 @@ object NodeState { case class FilterSync( syncPeer: Peer, peers: Set[Peer], - waitingForDisconnection: Set[Peer]) + waitingForDisconnection: Set[Peer], + filterBatchCache: Set[CompactFilterMessage]) extends SyncNodeState case class MisbehavingPeer( diff --git a/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala b/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala index 9ef9c9bd65..6c9ca448d8 100644 --- a/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/networking/peer/DataMessageHandlerTest.scala @@ -57,8 +57,7 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest { walletCreationTimeOpt = None, queue = peerManager.dataMessageQueueOpt.get, peerMessageSenderApi = peerManager, - state = HeaderSync(peer, peerManager.peers, Set.empty), - filterBatchCache = Set.empty + state = HeaderSync(peer, peerManager.peers, Set.empty) )(node.executionContext, node.nodeAppConfig, node.chainConfig) // Use signet genesis block header, this should be invalid for regtest diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index 70fcf71f30..dd655a54cb 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -745,8 +745,7 @@ case class PeerManager( walletCreationTimeOpt = walletCreationTimeOpt, queue = queue, peerMessageSenderApi = this, - state = DoneSyncing(peers = peers, waitingForDisconnection = Set.empty), - filterBatchCache = Set.empty + state = DoneSyncing(peers = peers, waitingForDisconnection = Set.empty) ) } diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala index b10d42af96..cb63aab995 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala @@ -39,8 +39,7 @@ case class DataMessageHandler( walletCreationTimeOpt: Option[Instant], queue: SourceQueue[NodeStreamMessage], peerMessageSenderApi: PeerMessageSenderApi, - state: NodeState, - filterBatchCache: Set[CompactFilterMessage])(implicit + state: NodeState)(implicit ec: ExecutionContext, appConfig: NodeAppConfig, chainConfig: ChainAppConfig) @@ -202,11 +201,11 @@ case class DataMessageHandler( val filterSyncState = state match { case f: FilterSync => f case s @ (_: DoneSyncing | _: FilterHeaderSync) => - FilterSync(peer, s.peers, s.waitingForDisconnection) + FilterSync(peer, s.peers, s.waitingForDisconnection, Set.empty) case x @ (_: MisbehavingPeer | _: RemovePeers | _: HeaderSync) => sys.error(s"Incorrect state for handling filter messages, got=$x") } - val filterBatch = filterBatchCache.+(filter) + val filterBatch = filterSyncState.filterBatchCache.+(filter) val batchSizeFull: Boolean = filterBatch.size == chainConfig.filterBatchSize for { @@ -246,7 +245,8 @@ case class DataMessageHandler( syncIfHeadersAhead(filterSyncState) } else { val res = filterHeaderSyncStateOpt match { - case Some(filterSyncState) => filterSyncState + case Some(filterSyncState) => + filterSyncState.copy(filterBatchCache = newBatch) case None => val d = DoneSyncing(filterSyncState.peers, filterSyncState.waitingForDisconnection) @@ -260,7 +260,6 @@ case class DataMessageHandler( } yield { this.copy( chainApi = newChainApi, - filterBatchCache = newBatch, state = newDmhState ) } @@ -570,7 +569,7 @@ case class DataMessageHandler( private def sendNextGetCompactFilterCommand( peerMessageSenderApi: PeerMessageSenderApi, startHeight: Int, - syncNodeState: SyncNodeState): Future[Option[NodeState.FilterSync]] = { + fs: NodeState.FilterSync): Future[Option[NodeState.FilterSync]] = { PeerManager .sendNextGetCompactFilterCommand( @@ -578,15 +577,16 @@ case class DataMessageHandler( chainApi = chainApi, filterBatchSize = chainConfig.filterBatchSize, startHeight = startHeight, - peer = syncNodeState.syncPeer + peer = fs.syncPeer ) .map { isSyncing => if (isSyncing) { - val fs = NodeState.FilterSync(syncPeer = syncNodeState.syncPeer, - peers = syncNodeState.peers, - waitingForDisconnection = - syncNodeState.waitingForDisconnection) - Some(fs) + val newState = NodeState.FilterSync( + syncPeer = fs.syncPeer, + peers = fs.peers, + waitingForDisconnection = fs.waitingForDisconnection, + filterBatchCache = fs.filterBatchCache) + Some(newState) } else None } } @@ -597,9 +597,15 @@ case class DataMessageHandler( syncNodeState: SyncNodeState): Future[Option[NodeState.FilterSync]] = { logger.info(s"Beginning to sync filters from startHeight=$startHeight") + val fs = syncNodeState match { + case x @ (_: HeaderSync | _: FilterHeaderSync) => + FilterSync(x.syncPeer, x.peers, x.waitingForDisconnection, Set.empty) + case fs: FilterSync => fs + } + sendNextGetCompactFilterCommand(peerMessageSenderApi = peerMessageSenderApi, startHeight = startHeight, - syncNodeState = syncNodeState) + fs = fs) } private def handleInventoryMsg(