Remove PeerMessageSenderApi param from DataMessageHandler (#5453)

This commit is contained in:
Chris Stewart 2024-03-05 11:12:17 -06:00 committed by GitHub
parent ddb6f01d65
commit 238948e185
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 35 additions and 25 deletions

View File

@ -52,7 +52,6 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest {
peer = node.peerManager.peers.head
chainApi <- node.chainApiFromDb()
_ = require(peerManager.getPeerData(peer).isDefined)
peerMsgSender = peerManager.getPeerData(peer).get.peerMessageSender
peerFinder = PeerFinder(peerManagerApi = peerManager,
paramPeers = Vector.empty,
queue = node)(system.dispatcher,
@ -62,7 +61,6 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest {
dataMessageHandler = DataMessageHandler(
chainApi = chainApi,
walletCreationTimeOpt = None,
peerMessageSenderApi = peerMsgSender,
peerManager = peerManager,
state = HeaderSync(peer,
peerManager.peerWithServicesDataMap,

View File

@ -616,11 +616,9 @@ case class PeerManager(
s"Ignoring received msg=${payload.commandName} from peer=$peer because it was disconnected, peers=$peers state=${state}")
Future.successful(state)
case Some(peerData) =>
val peerMsgSender = PeerMessageSender(peerData.peerConnection)
val dmh = DataMessageHandler(
chainApi = ChainHandler.fromDatabase(),
walletCreationTimeOpt = walletCreationTimeOpt,
peerMessageSenderApi = peerMsgSender,
peerManager = this,
state = runningState
)

View File

@ -31,7 +31,6 @@ import scala.util.control.NonFatal
case class DataMessageHandler(
chainApi: ChainApi,
walletCreationTimeOpt: Option[Instant],
peerMessageSenderApi: PeerMessageSenderApi,
peerManager: PeerManager,
state: NodeRunningState)(implicit
ec: ExecutionContext,
@ -148,16 +147,18 @@ case class DataMessageHandler(
s.peerDataMap,
s.waitingForDisconnection,
s.peerFinder)
handleFilterHeadersMessage(filterHeaderSync,
filterHeader,
chainApi,
peer)
handleFilterHeadersMessage(filterHeaderSync = filterHeaderSync,
filterHeader = filterHeader,
chainApi = chainApi,
peerMessageSenderApi =
peerData.peerMessageSender)
.map(s => copy(state = s))
case filterHeaderSync: FilterHeaderSync =>
handleFilterHeadersMessage(filterHeaderSync,
filterHeader,
chainApi,
peer)
handleFilterHeadersMessage(filterHeaderSync = filterHeaderSync,
filterHeader = filterHeader,
chainApi = chainApi,
peerMessageSenderApi =
peerData.peerMessageSender)
.map(s => copy(state = s))
case x @ (_: NodeShuttingDown | _: FilterSync) =>
logger.warn(
@ -173,7 +174,10 @@ case class DataMessageHandler(
s"Received ${filter.commandName}, filter.blockHash=${filter.blockHash.flip} state=$state")
state match {
case f: FilterSync =>
handleFilterMessage(f, filter)
handleFilterMessage(filterSyncState = f,
filter = filter,
peerMessageSenderApi =
peerData.peerMessageSender)
.map(s => copy(state = s))
case s @ (_: DoneSyncing | _: FilterHeaderSync) =>
val f = FilterSync(peer,
@ -181,7 +185,10 @@ case class DataMessageHandler(
s.waitingForDisconnection,
Set.empty,
s.peerFinder)
handleFilterMessage(f, filter)
handleFilterMessage(filterSyncState = f,
filter = filter,
peerMessageSenderApi =
peerData.peerMessageSender)
.map(s => copy(state = s))
case x @ (_: HeaderSync | _: NodeShuttingDown) =>
logger.warn(
@ -213,8 +220,8 @@ case class DataMessageHandler(
tx.toBaseTx
} else tx // send normal serialization
peerMessageSenderApi.sendTransactionMessage(transaction =
txToBroadcast)
peerData.peerMessageSender
.sendTransactionMessage(transaction = txToBroadcast)
case None =>
logger.warn(
s"Got request to send data with hash=${inv.hash}, but found nothing")
@ -329,7 +336,8 @@ case class DataMessageHandler(
logger.warn(s"Merkleblock is not supported")
Future.successful(this)
case invMsg: InventoryMessage =>
handleInventoryMsg(invMsg = invMsg)
handleInventoryMsg(invMsg = invMsg,
peerMessageSenderApi = peerData.peerMessageSender)
}
}
@ -338,7 +346,8 @@ case class DataMessageHandler(
/** syncs filter headers in case the header chain is still ahead post filter sync */
private def syncIfHeadersAhead(
syncNodeState: SyncNodeState): Future[NodeRunningState] = {
syncNodeState: SyncNodeState,
peerMessageSenderApi: PeerMessageSenderApi): Future[NodeRunningState] = {
val bestBlockHeaderDbF = chainApi.getBestBlockHeader()
for {
headerHeight <- chainApi.getBestHashBlockHeight()
@ -518,7 +527,9 @@ case class DataMessageHandler(
}
private def handleInventoryMsg(
invMsg: InventoryMessage): Future[DataMessageHandler] = {
invMsg: InventoryMessage,
peerMessageSenderApi: PeerMessageSenderApi): Future[
DataMessageHandler] = {
logger.debug(s"Received inv=${invMsg}")
val invsOptF: Future[Seq[Option[Inventory]]] =
Future.traverse(invMsg.inventories) {
@ -655,8 +666,9 @@ case class DataMessageHandler(
private def getHeaders(
state: HeaderSync,
headers: Vector[BlockHeader],
peer: Peer,
peerMessageSenderApi: PeerMessageSenderApi,
chainApi: ChainApi): Future[NodeRunningState] = {
val peer = peerMessageSenderApi.peer
logger.debug(s"getHeaders() newDmh.state=${state} peer=$peer peers=$peer")
val count = headers.length
val getHeadersF: Future[NodeRunningState] = {
@ -739,7 +751,7 @@ case class DataMessageHandler(
newDmh <- chainApiHeaderProcessF
dmh <- getHeaders(state = headerSyncState,
headers = headers,
peer = peer,
peerMessageSenderApi = peerData.peerMessageSender,
newDmh.chainApi)
} yield dmh
}
@ -780,7 +792,8 @@ case class DataMessageHandler(
filterHeaderSync: FilterHeaderSync,
filterHeader: CompactFilterHeadersMessage,
chainApi: ChainApi,
peer: Peer): Future[NodeRunningState] = {
peerMessageSenderApi: PeerMessageSenderApi): Future[NodeRunningState] = {
val peer = peerMessageSenderApi.peer
val filterHeaders = filterHeader.filterHeaders
val blockCountF = chainApi.getBlockCount()
val bestBlockHashF = chainApi.getBestBlockHash()
@ -828,7 +841,8 @@ case class DataMessageHandler(
private def handleFilterMessage(
filterSyncState: FilterSync,
filter: CompactFilterMessage): Future[NodeRunningState] = {
filter: CompactFilterMessage,
peerMessageSenderApi: PeerMessageSenderApi): Future[NodeRunningState] = {
val filterBatch = filterSyncState.filterBatchCache.+(filter)
val batchSizeFull: Boolean =
filterBatch.size == chainConfig.filterBatchSize
@ -871,7 +885,7 @@ case class DataMessageHandler(
} else Future.successful(Some(filterSyncState))
newDmhState <- {
if (isFiltersSynced) {
syncIfHeadersAhead(filterSyncState)
syncIfHeadersAhead(filterSyncState, peerMessageSenderApi)
} else {
val res = filterHeaderSyncStateOpt match {
case Some(filterSyncState) =>