Move filterBatchCache into NodeState.FilterSync (#5253)

This commit is contained in:
Chris Stewart 2023-10-05 14:56:08 -05:00 committed by GitHub
parent a40be1a6bc
commit 522821869d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 26 additions and 19 deletions

View file

@ -1,5 +1,7 @@
package org.bitcoins.core.api.node
import org.bitcoins.core.p2p.CompactFilterMessage
sealed abstract class NodeState {
def isSyncing: Boolean
@ -72,7 +74,8 @@ object NodeState {
case class FilterSync(
syncPeer: Peer,
peers: Set[Peer],
waitingForDisconnection: Set[Peer])
waitingForDisconnection: Set[Peer],
filterBatchCache: Set[CompactFilterMessage])
extends SyncNodeState
case class MisbehavingPeer(

View file

@ -57,8 +57,7 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest {
walletCreationTimeOpt = None,
queue = peerManager.dataMessageQueueOpt.get,
peerMessageSenderApi = peerManager,
state = HeaderSync(peer, peerManager.peers, Set.empty),
filterBatchCache = Set.empty
state = HeaderSync(peer, peerManager.peers, Set.empty)
)(node.executionContext, node.nodeAppConfig, node.chainConfig)
// Use signet genesis block header, this should be invalid for regtest

View file

@ -745,8 +745,7 @@ case class PeerManager(
walletCreationTimeOpt = walletCreationTimeOpt,
queue = queue,
peerMessageSenderApi = this,
state = DoneSyncing(peers = peers, waitingForDisconnection = Set.empty),
filterBatchCache = Set.empty
state = DoneSyncing(peers = peers, waitingForDisconnection = Set.empty)
)
}

View file

@ -39,8 +39,7 @@ case class DataMessageHandler(
walletCreationTimeOpt: Option[Instant],
queue: SourceQueue[NodeStreamMessage],
peerMessageSenderApi: PeerMessageSenderApi,
state: NodeState,
filterBatchCache: Set[CompactFilterMessage])(implicit
state: NodeState)(implicit
ec: ExecutionContext,
appConfig: NodeAppConfig,
chainConfig: ChainAppConfig)
@ -202,11 +201,11 @@ case class DataMessageHandler(
val filterSyncState = state match {
case f: FilterSync => f
case s @ (_: DoneSyncing | _: FilterHeaderSync) =>
FilterSync(peer, s.peers, s.waitingForDisconnection)
FilterSync(peer, s.peers, s.waitingForDisconnection, Set.empty)
case x @ (_: MisbehavingPeer | _: RemovePeers | _: HeaderSync) =>
sys.error(s"Incorrect state for handling filter messages, got=$x")
}
val filterBatch = filterBatchCache.+(filter)
val filterBatch = filterSyncState.filterBatchCache.+(filter)
val batchSizeFull: Boolean =
filterBatch.size == chainConfig.filterBatchSize
for {
@ -246,7 +245,8 @@ case class DataMessageHandler(
syncIfHeadersAhead(filterSyncState)
} else {
val res = filterHeaderSyncStateOpt match {
case Some(filterSyncState) => filterSyncState
case Some(filterSyncState) =>
filterSyncState.copy(filterBatchCache = newBatch)
case None =>
val d = DoneSyncing(filterSyncState.peers,
filterSyncState.waitingForDisconnection)
@ -260,7 +260,6 @@ case class DataMessageHandler(
} yield {
this.copy(
chainApi = newChainApi,
filterBatchCache = newBatch,
state = newDmhState
)
}
@ -570,7 +569,7 @@ case class DataMessageHandler(
private def sendNextGetCompactFilterCommand(
peerMessageSenderApi: PeerMessageSenderApi,
startHeight: Int,
syncNodeState: SyncNodeState): Future[Option[NodeState.FilterSync]] = {
fs: NodeState.FilterSync): Future[Option[NodeState.FilterSync]] = {
PeerManager
.sendNextGetCompactFilterCommand(
@ -578,15 +577,16 @@ case class DataMessageHandler(
chainApi = chainApi,
filterBatchSize = chainConfig.filterBatchSize,
startHeight = startHeight,
peer = syncNodeState.syncPeer
peer = fs.syncPeer
)
.map { isSyncing =>
if (isSyncing) {
val fs = NodeState.FilterSync(syncPeer = syncNodeState.syncPeer,
peers = syncNodeState.peers,
waitingForDisconnection =
syncNodeState.waitingForDisconnection)
Some(fs)
val newState = NodeState.FilterSync(
syncPeer = fs.syncPeer,
peers = fs.peers,
waitingForDisconnection = fs.waitingForDisconnection,
filterBatchCache = fs.filterBatchCache)
Some(newState)
} else None
}
}
@ -597,9 +597,15 @@ case class DataMessageHandler(
syncNodeState: SyncNodeState): Future[Option[NodeState.FilterSync]] = {
logger.info(s"Beginning to sync filters from startHeight=$startHeight")
val fs = syncNodeState match {
case x @ (_: HeaderSync | _: FilterHeaderSync) =>
FilterSync(x.syncPeer, x.peers, x.waitingForDisconnection, Set.empty)
case fs: FilterSync => fs
}
sendNextGetCompactFilterCommand(peerMessageSenderApi = peerMessageSenderApi,
startHeight = startHeight,
syncNodeState = syncNodeState)
fs = fs)
}
private def handleInventoryMsg(