Refactor DataMessageHandler: extract a helper method for handling HeadersMessage (#5060)

This commit is contained in:
Chris Stewart 2023-04-26 10:46:24 -05:00 committed by GitHub
parent 1fc6edf825
commit ce6f0d1507
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 168 additions and 156 deletions

View file

@@ -123,7 +123,7 @@ case class PeerManager(
randomPeerF.flatMap(peer => peerDataMap(peer).peerMessageSender)
}
def createInDb(
private def createInDb(
peer: Peer,
serviceIdentifier: ServiceIdentifier): Future[PeerDb] = {
logger.debug(s"Adding peer to db $peer")

View file

@@ -8,6 +8,7 @@ import org.bitcoins.core.api.node.NodeType
import org.bitcoins.core.gcs.{BlockFilter, GolombFilter}
import org.bitcoins.core.p2p._
import org.bitcoins.core.protocol.CompactSizeUInt
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.models._
@@ -266,160 +267,12 @@ case class DataMessageHandler(
copy(chainApi = processed)
}
val getHeadersF: Future[DataMessageHandler] =
chainApiHeaderProcessF
.flatMap { newDmh =>
val newApi = newDmh.chainApi
if (headers.nonEmpty) {
val lastHeader = headers.last
val lastHash = lastHeader.hash
newApi.getBlockCount().map { count =>
logger.trace(
s"Processed headers, most recent has height=$count and hash=$lastHash.")
}
if (count.toInt == HeadersMessage.MaxHeadersCount) {
state match {
case HeaderSync =>
logger.info(
s"Received maximum amount of headers in one header message. This means we are not synced, requesting more")
//ask for headers more from the same peer
peerMsgSender
.sendGetHeadersMessage(lastHash)
.map(_ => newDmh)
case ValidatingHeaders(inSyncWith, _, _) =>
//In the validation stage, some peer sent max amount of valid headers, revert to HeaderSync with that peer as syncPeer
//disconnect the ones that we have already checked since they are at least out of sync by 2000 headers
val removeFs =
inSyncWith.map(p => peerManager.removePeer(p))
val newSyncPeer = Some(peer)
//ask for more headers now
val askF = peerMsgSender
.sendGetHeadersMessage(lastHash)
.map(_ => syncing)
for {
_ <- Future.sequence(removeFs)
newSyncing <- askF
} yield {
val syncPeerOpt = if (newSyncing) {
newSyncPeer
} else {
None
}
newDmh.copy(state = HeaderSync, syncPeer = syncPeerOpt)
}
case _: DataMessageHandlerState =>
Future.successful(newDmh)
}
} else {
logger.debug(
List(s"Received headers=${count.toInt} in one message,",
"which is less than max. This means we are synced,",
s"not requesting more. state=$state")
.mkString(" "))
// If we are in neutrino mode, we might need to start fetching filters and their headers
// if we are syncing we should do this, however, sometimes syncing isn't a good enough check,
// so we also check if our cached filter heights have been set as well, if they haven't then
// we probably need to sync filters
state match {
case HeaderSync =>
// headers are synced now with the current sync peer, now move to validating it for all peers
assert(syncPeer.get == peer)
if (peerManager.peers.size > 1) {
val newState =
ValidatingHeaders(inSyncWith = Set(peer),
verifyingWith =
peerManager.peers.toSet,
failedCheck = Set.empty[Peer])
logger.info(
s"Starting to validate headers now. Verifying with ${newState.verifyingWith}")
val getHeadersAllF = peerManager.peerDataMap
.filter(_._1 != peer)
.map(
_._2.peerMessageSender.flatMap(
_.sendGetHeadersMessage(lastHash))
)
Future
.sequence(getHeadersAllF)
.map(_ => newDmh.copy(state = newState))
} else {
//if just one peer then can proceed ahead directly
peerManager
.fetchCompactFilterHeaders(newDmh)
.map(_.copy(state = PostHeaderSync))
}
case headerState @ ValidatingHeaders(inSyncWith, _, _) =>
//add the current peer to it
val newHeaderState =
headerState.copy(inSyncWith = inSyncWith + peer)
val newDmh2 = newDmh.copy(state = newHeaderState)
if (newHeaderState.validated) {
// If we are in neutrino mode, we might need to start fetching filters and their headers
// if we are syncing we should do this, however, sometimes syncing isn't a good enough check,
// so we also check if our cached filter heights have been set as well, if they haven't then
// we probably need to sync filters
peerManager
.fetchCompactFilterHeaders(newDmh2)
.map(_.copy(state = PostHeaderSync))
} else {
//do nothing, we are still waiting for some peers to send headers or timeout
Future.successful(newDmh2)
}
case PostHeaderSync =>
//send further requests to the same one that sent this
logger.info(
s"Starting to fetch filter headers in data message handler")
val newSyncingF =
PeerManager.sendFirstGetCompactFilterHeadersCommand(
peerMsgSender,
chainApi)
newSyncingF.map { newSyncing =>
val syncPeerOpt = if (newSyncing) {
syncPeer
} else {
None
}
newDmh.copy(syncPeer = syncPeerOpt)
}
}
}
} else {
//what if we are synced exactly by the 2000th header
state match {
case headerState @ ValidatingHeaders(inSyncWith, _, _) =>
val newHeaderState =
headerState.copy(inSyncWith = inSyncWith + peer)
val newDmh2 = newDmh.copy(state = newHeaderState)
if (newHeaderState.validated) {
peerManager
.fetchCompactFilterHeaders(newDmh2)
.map(_.copy(state = PostHeaderSync))
} else {
//do nothing, we are still waiting for some peers to send headers
Future.successful(newDmh2)
}
case _: DataMessageHandlerState =>
Future.successful(newDmh)
}
}
}
val getHeadersF: Future[DataMessageHandler] = {
for {
newDmh <- chainApiHeaderProcessF
dmh <- getHeaders(newDmh, headers, peerMsgSender, peer)
} yield dmh
}
getHeadersF.recoverWith {
case _: DuplicateHeaders =>
logger.warn(
@@ -591,7 +444,7 @@ case class DataMessageHandler(
Future.successful(newDmh)
}
case _: DataMessageHandlerState =>
case PostHeaderSync =>
Future.successful(this)
}
}
@@ -769,6 +622,165 @@ case class DataMessageHandler(
sortedBlockFiltersF
}
/** Drives the header-sync state machine after a batch of [[BlockHeader]]s from
  * `peer` has already been stored in `newDmh.chainApi`.
  *
  * Behavior by batch size (as visible in this method):
  *   - Full batch (`HeadersMessage.MaxHeadersCount`): the peer likely has more
  *     headers, so request more; while validating headers, revert to
  *     [[HeaderSync]] with this peer as the sync peer and remove already-checked
  *     peers.
  *   - Partial, non-empty batch: we are synced with this peer; either start
  *     validating headers against other peers, fetch compact filter headers,
  *     or (in [[PostHeaderSync]]) request filter headers from this peer.
  *   - Empty batch: only meaningful while validating headers (marks the peer
  *     as in sync); otherwise the handler state is returned unchanged.
  *
  * @param newDmh        handler state after the headers were processed into the chain API
  * @param headers       the headers delivered in the p2p `headers` message
  * @param peerMsgSender message sender for the peer that delivered these headers
  * @param peer          the peer that delivered these headers
  * @return the updated [[DataMessageHandler]] reflecting the new sync state
  */
private def getHeaders(
    newDmh: DataMessageHandler,
    headers: Vector[BlockHeader],
    peerMsgSender: PeerMessageSender,
    peer: Peer): Future[DataMessageHandler] = {
  // Number of headers in this batch; compared against the protocol maximum below.
  val count = headers.length
  val getHeadersF: Future[DataMessageHandler] = {
    val newApi = newDmh.chainApi
    if (headers.nonEmpty) {
      val lastHeader = headers.last
      val lastHash = lastHeader.hash
      // NOTE(review): this Future is discarded (fire-and-forget trace logging),
      // so the log line may run after the rest of this method. Also, the
      // lambda's `count` (block height) shadows the outer `count` (batch size).
      newApi.getBlockCount().map { count =>
        logger.trace(
          s"Processed headers, most recent has height=$count and hash=$lastHash.")
      }
      if (count == HeadersMessage.MaxHeadersCount) {
        // A maxed-out batch implies the peer has more headers for us.
        state match {
          case HeaderSync =>
            logger.info(
              s"Received maximum amount of headers in one header message. This means we are not synced, requesting more")
            //ask for headers more from the same peer
            peerMsgSender
              .sendGetHeadersMessage(lastHash)
              .map(_ => newDmh)
          case ValidatingHeaders(inSyncWith, _, _) =>
            //In the validation stage, some peer sent max amount of valid headers, revert to HeaderSync with that peer as syncPeer
            //disconnect the ones that we have already checked since they are at least out of sync by 2000 headers
            val removeFs =
              inSyncWith.map(p => peerManager.removePeer(p))
            val newSyncPeer = Some(peer)
            //ask for more headers now
            val askF = peerMsgSender
              .sendGetHeadersMessage(lastHash)
              .map(_ => syncing)
            for {
              _ <- Future.sequence(removeFs)
              newSyncing <- askF
            } yield {
              // Only keep a sync peer if the getheaders request marked us as syncing.
              val syncPeerOpt = if (newSyncing) {
                newSyncPeer
              } else {
                None
              }
              newDmh.copy(state = HeaderSync, syncPeer = syncPeerOpt)
            }
          case _: DataMessageHandlerState =>
            // Any other state: a full batch triggers no state transition here.
            Future.successful(newDmh)
        }
      } else {
        logger.debug(
          List(s"Received headers=${count.toInt} in one message,",
               "which is less than max. This means we are synced,",
               s"not requesting more. state=$state")
            .mkString(" "))
        // If we are in neutrino mode, we might need to start fetching filters and their headers
        // if we are syncing we should do this, however, sometimes syncing isn't a good enough check,
        // so we also check if our cached filter heights have been set as well, if they haven't then
        // we probably need to sync filters
        state match {
          case HeaderSync =>
            // headers are synced now with the current sync peer, now move to validating it for all peers
            assert(syncPeer.get == peer)
            if (peerManager.peers.size > 1) {
              // Ask every other connected peer for headers from the same tip
              // so their responses can be cross-checked.
              val newState =
                ValidatingHeaders(inSyncWith = Set(peer),
                                  verifyingWith = peerManager.peers.toSet,
                                  failedCheck = Set.empty[Peer])
              logger.info(
                s"Starting to validate headers now. Verifying with ${newState.verifyingWith}")
              val getHeadersAllF = peerManager.peerDataMap
                .filter(_._1 != peer)
                .map(
                  _._2.peerMessageSender.flatMap(
                    _.sendGetHeadersMessage(lastHash))
                )
              Future
                .sequence(getHeadersAllF)
                .map(_ => newDmh.copy(state = newState))
            } else {
              //if just one peer then can proceed ahead directly
              peerManager
                .fetchCompactFilterHeaders(newDmh)
                .map(_.copy(state = PostHeaderSync))
            }
          case headerState @ ValidatingHeaders(inSyncWith, _, _) =>
            //add the current peer to it
            val newHeaderState =
              headerState.copy(inSyncWith = inSyncWith + peer)
            val newDmh2 = newDmh.copy(state = newHeaderState)
            if (newHeaderState.validated) {
              // If we are in neutrino mode, we might need to start fetching filters and their headers
              // if we are syncing we should do this, however, sometimes syncing isn't a good enough check,
              // so we also check if our cached filter heights have been set as well, if they haven't then
              // we probably need to sync filters
              peerManager
                .fetchCompactFilterHeaders(newDmh2)
                .map(_.copy(state = PostHeaderSync))
            } else {
              //do nothing, we are still waiting for some peers to send headers or timeout
              Future.successful(newDmh2)
            }
          case PostHeaderSync =>
            //send further requests to the same one that sent this
            logger.info(
              s"Starting to fetch filter headers in data message handler")
            val newSyncingF =
              PeerManager.sendFirstGetCompactFilterHeadersCommand(
                peerMsgSender,
                chainApi)
            newSyncingF.map { newSyncing =>
              // Keep the existing sync peer only if we are actually syncing filters.
              val syncPeerOpt = if (newSyncing) {
                syncPeer
              } else {
                None
              }
              newDmh.copy(syncPeer = syncPeerOpt)
            }
        }
      }
    } else {
      //what if we are synced exactly by the 2000th header
      state match {
        case headerState @ ValidatingHeaders(inSyncWith, _, _) =>
          // An empty response while validating means this peer agrees with our tip.
          val newHeaderState =
            headerState.copy(inSyncWith = inSyncWith + peer)
          val newDmh2 = newDmh.copy(state = newHeaderState)
          if (newHeaderState.validated) {
            peerManager
              .fetchCompactFilterHeaders(newDmh2)
              .map(_.copy(state = PostHeaderSync))
          } else {
            //do nothing, we are still waiting for some peers to send headers
            Future.successful(newDmh2)
          }
        case HeaderSync | PostHeaderSync =>
          Future.successful(newDmh)
      }
    }
  }
  getHeadersF
}
}
/** Marker trait for messages flowing through the data-message stream.
  * NOTE(review): its case implementations are presumably declared below this
  * point — outside the visible chunk; confirm against the full file.
  */
sealed trait StreamDataMessageWrapper