From 0d76a0633145930ae50917aa6347637102a841fb Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Wed, 14 Dec 2022 09:28:56 -0600 Subject: [PATCH] Call `handleDataPayloadHelper` explicitly on `DataMessageHandlerState` (#4921) * Call handleDataPayloadHelper explicitly on DataMessageHandlerState * revert changes from other PR * revert logs --- .../networking/peer/DataMessageHandler.scala | 86 ++++++++++++------- 1 file changed, 53 insertions(+), 33 deletions(-) diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala index c96da5333a..2a87b880b9 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala @@ -70,8 +70,51 @@ case class DataMessageHandler( payload: DataPayload, peerMsgSender: PeerMessageSender, peer: Peer): Future[DataMessageHandler] = { + state match { + case _: ValidatingHeaders => + val resultF = handleDataPayloadValidState(payload, peerMsgSender, peer) + //process messages from all peers + resultF.failed.foreach { err => + logger.error(s"Failed to handle data payload=${payload} from $peer", + err) + } + resultF.recoverWith { case NonFatal(_) => + Future.successful(this) + } + case HeaderSync => + if (syncPeer.isEmpty || peer != syncPeer.get) { + //ignore message from peers that we aren't syncing with during IBD + logger.warn( + s"Ignoring message ${payload.commandName} from $peer because we are syncing with this peer currently. syncPeer=$syncPeer") + Future.successful(this) + } else { + val resultF = + handleDataPayloadValidState(payload, peerMsgSender, peer) + resultF.failed.foreach { err => + logger.error(s"Failed to handle data payload=${payload} from $peer", + err) + } + resultF.recoverWith { case NonFatal(_) => + Future.successful(this) + } + } + case PostHeaderSync => + val resultF = handleDataPayloadValidState(payload, peerMsgSender, peer) + resultF.recoverWith { case NonFatal(_) => + Future.successful(this) + } + } - lazy val resultF = payload match { + } + + /** Processes a [[DataPayload]] if our [[DataMessageHandlerState]] is valid. + * We ignore messages from certain peers when we are in initial block download. + */ + private def handleDataPayloadValidState( + payload: DataPayload, + peerMsgSender: PeerMessageSender, + peer: Peer): Future[DataMessageHandler] = { + payload match { case checkpoint: CompactFilterCheckPointMessage => logger.debug( s"Got ${checkpoint.filterHeaders.size} checkpoints ${checkpoint} from $peer") @@ -227,8 +270,6 @@ case class DataMessageHandler( case HeadersMessage(count, headers) => logger.info( s"Received headers message with ${count.toInt} headers from $peer") - logger.trace( - s"Received headers=${headers.map(_.hashBE.hex).mkString("[", ",", "]")}") val chainApiHeaderProcessF: Future[DataMessageHandler] = for { newChainApi <- chainApi.setSyncing(count.toInt > 0) processed <- newChainApi.processHeaders(headers) @@ -275,9 +316,11 @@ case class DataMessageHandler( for { _ <- Future.sequence(removeFs) newSyncing <- askF - } yield newDmh.copy(syncing = newSyncing, - state = HeaderSync, - syncPeer = newSyncPeer) + } yield { + newDmh.copy(syncing = newSyncing, + state = HeaderSync, + syncPeer = newSyncPeer) + } case _: DataMessageHandlerState => Future.successful(newDmh) @@ -287,7 +330,7 @@ case class DataMessageHandler( logger.debug( List(s"Received headers=${count.toInt} in one message,", "which is less than max. This means we are synced,", - "not requesting more.") + s"not requesting more. state=$state") .mkString(" ")) // If we are in neutrino mode, we might need to start fetching filters and their headers // if we are syncing we should do this, however, sometimes syncing isn't a good enough check, @@ -353,8 +396,9 @@ case class DataMessageHandler( s"Starting to fetch filter headers in data message handler") val newSyncingF = sendFirstGetCompactFilterHeadersCommand(peerMsgSender) - newSyncingF.map(newSyncing => - newDmh.copy(syncing = newSyncing)) + newSyncingF.map { newSyncing => + newDmh.copy(syncing = newSyncing) + } } else { Try(initialSyncDone.map(_.success(Done))) Future.successful(newDmh) @@ -457,30 +501,6 @@ case class DataMessageHandler( case invMsg: InventoryMessage => handleInventoryMsg(invMsg = invMsg, peerMsgSender = peerMsgSender) } - - if (state.isInstanceOf[ValidatingHeaders]) { - //process messages from all peers - resultF.failed.foreach { err => - logger.error(s"Failed to handle data payload=${payload} from $peer", - err) - } - resultF.recoverWith { case NonFatal(_) => - Future.successful(this) - } - } else if (syncPeer.isEmpty || peer != syncPeer.get) { - //in other states, process messages only from syncPeer - logger.debug(s"Ignoring ${payload.commandName} from $peer") - Future.successful(this) - } else { - resultF.failed.foreach { err => - logger.error(s"Failed to handle data payload=${payload} from $peer", - err) - } - resultF.recoverWith { case NonFatal(_) => - Future.successful(this) - } - - } } /** syncs filter headers in case the header chain is still ahead post filter sync */