Call handleDataPayloadHelper explicitly on DataMessageHandlerState (#4921)

* Call handleDataPayloadHelper explicitly on DataMessageHandlerState

* revert changes from other PR

* revert logs
This commit is contained in:
Chris Stewart 2022-12-14 09:28:56 -06:00 committed by GitHub
parent 6293d45a37
commit 0d76a06331
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -70,8 +70,51 @@ case class DataMessageHandler(
payload: DataPayload,
peerMsgSender: PeerMessageSender,
peer: Peer): Future[DataMessageHandler] = {
state match {
case _: ValidatingHeaders =>
val resultF = handleDataPayloadValidState(payload, peerMsgSender, peer)
//process messages from all peers
resultF.failed.foreach { err =>
logger.error(s"Failed to handle data payload=${payload} from $peer",
err)
}
resultF.recoverWith { case NonFatal(_) =>
Future.successful(this)
}
case HeaderSync =>
if (syncPeer.isEmpty || peer != syncPeer.get) {
//ignore message from peers that we aren't syncing with during IBD
logger.warn(
s"Ignoring message ${payload.commandName} from $peer because we are syncing with this peer currently. syncPeer=$syncPeer")
Future.successful(this)
} else {
val resultF =
handleDataPayloadValidState(payload, peerMsgSender, peer)
resultF.failed.foreach { err =>
logger.error(s"Failed to handle data payload=${payload} from $peer",
err)
}
resultF.recoverWith { case NonFatal(_) =>
Future.successful(this)
}
}
case PostHeaderSync =>
val resultF = handleDataPayloadValidState(payload, peerMsgSender, peer)
resultF.recoverWith { case NonFatal(_) =>
Future.successful(this)
}
}
lazy val resultF = payload match {
}
/** Processes a [[DataPayload]] if our [[DataMessageHandlerState]] is valid.
* We ignore messages from certain peers when we are in initial block download.
*/
private def handleDataPayloadValidState(
payload: DataPayload,
peerMsgSender: PeerMessageSender,
peer: Peer): Future[DataMessageHandler] = {
payload match {
case checkpoint: CompactFilterCheckPointMessage =>
logger.debug(
s"Got ${checkpoint.filterHeaders.size} checkpoints ${checkpoint} from $peer")
@ -227,8 +270,6 @@ case class DataMessageHandler(
case HeadersMessage(count, headers) =>
logger.info(
s"Received headers message with ${count.toInt} headers from $peer")
logger.trace(
s"Received headers=${headers.map(_.hashBE.hex).mkString("[", ",", "]")}")
val chainApiHeaderProcessF: Future[DataMessageHandler] = for {
newChainApi <- chainApi.setSyncing(count.toInt > 0)
processed <- newChainApi.processHeaders(headers)
@ -275,9 +316,11 @@ case class DataMessageHandler(
for {
_ <- Future.sequence(removeFs)
newSyncing <- askF
} yield newDmh.copy(syncing = newSyncing,
state = HeaderSync,
syncPeer = newSyncPeer)
} yield {
newDmh.copy(syncing = newSyncing,
state = HeaderSync,
syncPeer = newSyncPeer)
}
case _: DataMessageHandlerState =>
Future.successful(newDmh)
@ -287,7 +330,7 @@ case class DataMessageHandler(
logger.debug(
List(s"Received headers=${count.toInt} in one message,",
"which is less than max. This means we are synced,",
"not requesting more.")
s"not requesting more. state=$state")
.mkString(" "))
// If we are in neutrino mode, we might need to start fetching filters and their headers
// if we are syncing we should do this, however, sometimes syncing isn't a good enough check,
@ -353,8 +396,9 @@ case class DataMessageHandler(
s"Starting to fetch filter headers in data message handler")
val newSyncingF =
sendFirstGetCompactFilterHeadersCommand(peerMsgSender)
newSyncingF.map(newSyncing =>
newDmh.copy(syncing = newSyncing))
newSyncingF.map { newSyncing =>
newDmh.copy(syncing = newSyncing)
}
} else {
Try(initialSyncDone.map(_.success(Done)))
Future.successful(newDmh)
@ -457,30 +501,6 @@ case class DataMessageHandler(
case invMsg: InventoryMessage =>
handleInventoryMsg(invMsg = invMsg, peerMsgSender = peerMsgSender)
}
if (state.isInstanceOf[ValidatingHeaders]) {
//process messages from all peers
resultF.failed.foreach { err =>
logger.error(s"Failed to handle data payload=${payload} from $peer",
err)
}
resultF.recoverWith { case NonFatal(_) =>
Future.successful(this)
}
} else if (syncPeer.isEmpty || peer != syncPeer.get) {
//in other states, process messages only from syncPeer
logger.debug(s"Ignoring ${payload.commandName} from $peer")
Future.successful(this)
} else {
resultF.failed.foreach { err =>
logger.error(s"Failed to handle data payload=${payload} from $peer",
err)
}
resultF.recoverWith { case NonFatal(_) =>
Future.successful(this)
}
}
}
/** syncs filter headers in case the header chain is still ahead post filter sync */