From ce6f0d15074dc3716e8a77689c5f12c423794d8b Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Wed, 26 Apr 2023 10:46:24 -0500 Subject: [PATCH] Refactor HeadersMessage to have helper method for HeadersMessage (#5060) --- .../scala/org/bitcoins/node/PeerManager.scala | 2 +- .../networking/peer/DataMessageHandler.scala | 322 +++++++++--------- 2 files changed, 168 insertions(+), 156 deletions(-) diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index ce9163aeb1..9b734e7115 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -123,7 +123,7 @@ case class PeerManager( randomPeerF.flatMap(peer => peerDataMap(peer).peerMessageSender) } - def createInDb( + private def createInDb( peer: Peer, serviceIdentifier: ServiceIdentifier): Future[PeerDb] = { logger.debug(s"Adding peer to db $peer") diff --git a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala index db97875073..8f92960daa 100644 --- a/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala +++ b/node/src/main/scala/org/bitcoins/node/networking/peer/DataMessageHandler.scala @@ -8,6 +8,7 @@ import org.bitcoins.core.api.node.NodeType import org.bitcoins.core.gcs.{BlockFilter, GolombFilter} import org.bitcoins.core.p2p._ import org.bitcoins.core.protocol.CompactSizeUInt +import org.bitcoins.core.protocol.blockchain.BlockHeader import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE} import org.bitcoins.node.config.NodeAppConfig import org.bitcoins.node.models._ @@ -266,160 +267,12 @@ case class DataMessageHandler( copy(chainApi = processed) } - val getHeadersF: Future[DataMessageHandler] = - chainApiHeaderProcessF - .flatMap { newDmh => - val newApi = newDmh.chainApi - if (headers.nonEmpty) { - - val lastHeader = headers.last - val lastHash = lastHeader.hash - newApi.getBlockCount().map { count => - logger.trace( - s"Processed headers, most recent has height=$count and hash=$lastHash.") - } - - if (count.toInt == HeadersMessage.MaxHeadersCount) { - - state match { - case HeaderSync => - logger.info( - s"Received maximum amount of headers in one header message. This means we are not synced, requesting more") - //ask for headers more from the same peer - peerMsgSender - .sendGetHeadersMessage(lastHash) - .map(_ => newDmh) - - case ValidatingHeaders(inSyncWith, _, _) => - //In the validation stage, some peer sent max amount of valid headers, revert to HeaderSync with that peer as syncPeer - //disconnect the ones that we have already checked since they are at least out of sync by 2000 headers - val removeFs = - inSyncWith.map(p => peerManager.removePeer(p)) - - val newSyncPeer = Some(peer) - - //ask for more headers now - val askF = peerMsgSender - .sendGetHeadersMessage(lastHash) - .map(_ => syncing) - - for { - _ <- Future.sequence(removeFs) - newSyncing <- askF - } yield { - val syncPeerOpt = if (newSyncing) { - newSyncPeer - } else { - None - } - newDmh.copy(state = HeaderSync, syncPeer = syncPeerOpt) - } - - case _: DataMessageHandlerState => - Future.successful(newDmh) - } - - } else { - logger.debug( - List(s"Received headers=${count.toInt} in one message,", - "which is less than max. This means we are synced,", - s"not requesting more. state=$state") - .mkString(" ")) - // If we are in neutrino mode, we might need to start fetching filters and their headers - // if we are syncing we should do this, however, sometimes syncing isn't a good enough check, - // so we also check if our cached filter heights have been set as well, if they haven't then - // we probably need to sync filters - state match { - case HeaderSync => - // headers are synced now with the current sync peer, now move to validating it for all peers - assert(syncPeer.get == peer) - - if (peerManager.peers.size > 1) { - val newState = - ValidatingHeaders(inSyncWith = Set(peer), - verifyingWith = - peerManager.peers.toSet, - failedCheck = Set.empty[Peer]) - - logger.info( - s"Starting to validate headers now. Verifying with ${newState.verifyingWith}") - - val getHeadersAllF = peerManager.peerDataMap - .filter(_._1 != peer) - .map( - _._2.peerMessageSender.flatMap( - _.sendGetHeadersMessage(lastHash)) - ) - - Future - .sequence(getHeadersAllF) - .map(_ => newDmh.copy(state = newState)) - } else { - //if just one peer then can proceed ahead directly - peerManager - .fetchCompactFilterHeaders(newDmh) - .map(_.copy(state = PostHeaderSync)) - } - - case headerState @ ValidatingHeaders(inSyncWith, _, _) => - //add the current peer to it - val newHeaderState = - headerState.copy(inSyncWith = inSyncWith + peer) - val newDmh2 = newDmh.copy(state = newHeaderState) - - if (newHeaderState.validated) { - // If we are in neutrino mode, we might need to start fetching filters and their headers - // if we are syncing we should do this, however, sometimes syncing isn't a good enough check, - // so we also check if our cached filter heights have been set as well, if they haven't then - // we probably need to sync filters - - peerManager - .fetchCompactFilterHeaders(newDmh2) - .map(_.copy(state = PostHeaderSync)) - } else { - //do nothing, we are still waiting for some peers to send headers or timeout - Future.successful(newDmh2) - } - - case PostHeaderSync => - //send further requests to the same one that sent this - logger.info( - s"Starting to fetch filter headers in data message handler") - val newSyncingF = - PeerManager.sendFirstGetCompactFilterHeadersCommand( - peerMsgSender, - chainApi) - newSyncingF.map { newSyncing => - val syncPeerOpt = if (newSyncing) { - syncPeer - } else { - None - } - newDmh.copy(syncPeer = syncPeerOpt) - } - } - } - } else { - //what if we are synced exactly by the 2000th header - state match { - case headerState @ ValidatingHeaders(inSyncWith, _, _) => - val newHeaderState = - headerState.copy(inSyncWith = inSyncWith + peer) - val newDmh2 = newDmh.copy(state = newHeaderState) - if (newHeaderState.validated) { - peerManager - .fetchCompactFilterHeaders(newDmh2) - .map(_.copy(state = PostHeaderSync)) - } else { - //do nothing, we are still waiting for some peers to send headers - Future.successful(newDmh2) - } - case _: DataMessageHandlerState => - Future.successful(newDmh) - } - } - } - + val getHeadersF: Future[DataMessageHandler] = { + for { + newDmh <- chainApiHeaderProcessF + dmh <- getHeaders(newDmh, headers, peerMsgSender, peer) + } yield dmh + } getHeadersF.recoverWith { case _: DuplicateHeaders => logger.warn( @@ -591,7 +444,7 @@ case class DataMessageHandler( Future.successful(newDmh) } - case _: DataMessageHandlerState => + case PostHeaderSync => Future.successful(this) } } @@ -769,6 +622,165 @@ case class DataMessageHandler( sortedBlockFiltersF } + + private def getHeaders( + newDmh: DataMessageHandler, + headers: Vector[BlockHeader], + peerMsgSender: PeerMessageSender, + peer: Peer): Future[DataMessageHandler] = { + val count = headers.length + val getHeadersF: Future[DataMessageHandler] = { + val newApi = newDmh.chainApi + if (headers.nonEmpty) { + + val lastHeader = headers.last + val lastHash = lastHeader.hash + newApi.getBlockCount().map { count => + logger.trace( + s"Processed headers, most recent has height=$count and hash=$lastHash.") + } + + if (count == HeadersMessage.MaxHeadersCount) { + + state match { + case HeaderSync => + logger.info( + s"Received maximum amount of headers in one header message. This means we are not synced, requesting more") + //ask for headers more from the same peer + peerMsgSender + .sendGetHeadersMessage(lastHash) + .map(_ => newDmh) + + case ValidatingHeaders(inSyncWith, _, _) => + //In the validation stage, some peer sent max amount of valid headers, revert to HeaderSync with that peer as syncPeer + //disconnect the ones that we have already checked since they are at least out of sync by 2000 headers + val removeFs = + inSyncWith.map(p => peerManager.removePeer(p)) + + val newSyncPeer = Some(peer) + + //ask for more headers now + val askF = peerMsgSender + .sendGetHeadersMessage(lastHash) + .map(_ => syncing) + + for { + _ <- Future.sequence(removeFs) + newSyncing <- askF + } yield { + val syncPeerOpt = if (newSyncing) { + newSyncPeer + } else { + None + } + newDmh.copy(state = HeaderSync, syncPeer = syncPeerOpt) + } + + case _: DataMessageHandlerState => + Future.successful(newDmh) + } + + } else { + logger.debug( + List(s"Received headers=${count.toInt} in one message,", + "which is less than max. This means we are synced,", + s"not requesting more. state=$state") + .mkString(" ")) + // If we are in neutrino mode, we might need to start fetching filters and their headers + // if we are syncing we should do this, however, sometimes syncing isn't a good enough check, + // so we also check if our cached filter heights have been set as well, if they haven't then + // we probably need to sync filters + state match { + case HeaderSync => + // headers are synced now with the current sync peer, now move to validating it for all peers + assert(syncPeer.get == peer) + + if (peerManager.peers.size > 1) { + val newState = + ValidatingHeaders(inSyncWith = Set(peer), + verifyingWith = peerManager.peers.toSet, + failedCheck = Set.empty[Peer]) + + logger.info( + s"Starting to validate headers now. Verifying with ${newState.verifyingWith}") + + val getHeadersAllF = peerManager.peerDataMap + .filter(_._1 != peer) + .map( + _._2.peerMessageSender.flatMap( + _.sendGetHeadersMessage(lastHash)) + ) + + Future + .sequence(getHeadersAllF) + .map(_ => newDmh.copy(state = newState)) + } else { + //if just one peer then can proceed ahead directly + peerManager + .fetchCompactFilterHeaders(newDmh) + .map(_.copy(state = PostHeaderSync)) + } + + case headerState @ ValidatingHeaders(inSyncWith, _, _) => + //add the current peer to it + val newHeaderState = + headerState.copy(inSyncWith = inSyncWith + peer) + val newDmh2 = newDmh.copy(state = newHeaderState) + + if (newHeaderState.validated) { + // If we are in neutrino mode, we might need to start fetching filters and their headers + // if we are syncing we should do this, however, sometimes syncing isn't a good enough check, + // so we also check if our cached filter heights have been set as well, if they haven't then + // we probably need to sync filters + + peerManager + .fetchCompactFilterHeaders(newDmh2) + .map(_.copy(state = PostHeaderSync)) + } else { + //do nothing, we are still waiting for some peers to send headers or timeout + Future.successful(newDmh2) + } + + case PostHeaderSync => + //send further requests to the same one that sent this + logger.info( + s"Starting to fetch filter headers in data message handler") + val newSyncingF = + PeerManager.sendFirstGetCompactFilterHeadersCommand( + peerMsgSender, + chainApi) + newSyncingF.map { newSyncing => + val syncPeerOpt = if (newSyncing) { + syncPeer + } else { + None + } + newDmh.copy(syncPeer = syncPeerOpt) + } + } + } + } else { + //what if we are synced exactly by the 2000th header + state match { + case headerState @ ValidatingHeaders(inSyncWith, _, _) => + val newHeaderState = + headerState.copy(inSyncWith = inSyncWith + peer) + val newDmh2 = newDmh.copy(state = newHeaderState) + if (newHeaderState.validated) { + peerManager + .fetchCompactFilterHeaders(newDmh2) + .map(_.copy(state = PostHeaderSync)) + } else { + //do nothing, we are still waiting for some peers to send headers + Future.successful(newDmh2) + } + case HeaderSync | PostHeaderSync => + Future.successful(newDmh) + } + } + } + getHeadersF + } } sealed trait StreamDataMessageWrapper