From 26290bf4c0b1aeb38b6acd03537c7c8122e8b91d Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Thu, 24 Aug 2023 07:49:15 -0500 Subject: [PATCH] Rework handling of `BlockMessage`, fix bug where callbacks executed when they shouldn't have been (#5201) * Rework handling of BlockMessage, fix bug where callbacks executed when they shouldn't have been * scalafmt --- .../networking/peer/DataMessageHandler.scala | 67 +++++++++---------- 1 file changed, 30 insertions(+), 37 deletions(-) diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala index 796d83ffcc..f5ffaa874d 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala @@ -357,47 +357,40 @@ case class DataMessageHandler( } case msg: BlockMessage => val block = msg.block - logger.info( - s"Received block message with hash ${block.blockHeader.hash.flip.hex}") - val newMsgHandlerF = { - chainApi + val newMsgHandlerF = for { + isIBD <- isChainIBD + headerOpt <- chainApi .getHeader(block.blockHeader.hashBE) - .flatMap { headerOpt => - if (headerOpt.isEmpty) { - logger.debug("Processing block's header...") - val headersMessage = - HeadersMessage(CompactSizeUInt.one, - Vector(block.blockHeader)) - for { - isIBD <- isChainIBD - newMsgHandler <- { - // if in IBD, do not process this header, just execute callbacks - if (!isIBD) { - handleDataPayload(payload = headersMessage, - peerData = peerData) - } else { - //else ignore it until we are done with ibd - logger.info( - s"Received block=${block.blockHeader.hashBE.hex} while in IBD, ignoring it until IBD complete state=${state}.") - Future.successful(this) - } - } - } yield { - newMsgHandler - } - } else Future.successful(this) + newMsgHandler <- { + if (isIBD && headerOpt.isEmpty) { + //ignore block, don't execute callbacks until IBD is done + logger.info( + s"Received block=${block.blockHeader.hashBE.hex} while in IBD, ignoring it until IBD complete state=${state}.") + Future.successful(this) + } else if (!isIBD && headerOpt.isEmpty) { + logger.info( + s"Received block=${block.blockHeader.hash.flip.hex}, processing block's header... state=$state") + val headersMessage = + HeadersMessage(CompactSizeUInt.one, Vector(block.blockHeader)) + val newDmhF = handleDataPayload(payload = headersMessage, + peerData = peerData) + newDmhF.flatMap { dmh => + appConfig.callBacks + .executeOnBlockReceivedCallbacks(block) + .map(_ => dmh) + } + } else { + logger.info( + s"Received block=${block.blockHeader.hash.flip.hex} state=$state") + appConfig.callBacks + .executeOnBlockReceivedCallbacks(block) + .map(_ => this) } - } + } + } yield newMsgHandler - for { - handler <- newMsgHandlerF - _ <- - appConfig.callBacks - .executeOnBlockReceivedCallbacks(block) - } yield { - handler - } + newMsgHandlerF case TransactionMessage(tx) => logger.trace( s"Received txmsg=${tx.txIdBE}, processing given callbacks")