1
0
Fork 0
mirror of https://github.com/bitcoin-s/bitcoin-s.git synced 2025-03-26 21:42:48 +01:00

Rework various private methods in DataMessagehandler to return NodeState ()

* Rework various private methods in DataMessagehandler to return NodeState rather than DataMessageHandler

* Refactor compact filter header handling into private helper method

* Refactor handling of headersmsg in case where the headermsg came from a different peer than our current syncPeer

* Add DEBUG log
This commit is contained in:
Chris Stewart 2023-12-19 09:24:09 -06:00 committed by GitHub
parent ce154fc5fd
commit 54f303efb0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 168 additions and 165 deletions
node/src/main/scala/org/bitcoins/node

View file

@ -932,7 +932,8 @@ object PeerManager extends Logging {
chainApi: ChainApi,
state: SyncNodeState)(implicit
ec: ExecutionContext,
chainConfig: ChainAppConfig): Future[Option[NodeState]] = {
chainConfig: ChainAppConfig): Future[
Option[NodeState.FilterHeaderSync]] = {
for {
bestFilterHeaderOpt <-
chainApi
@ -1013,33 +1014,21 @@ object PeerManager extends Logging {
}
def fetchCompactFilterHeaders(
currentDmh: DataMessageHandler,
state: SyncNodeState, //can we tighten this type up?
chainApi: ChainApi,
peerMessageSenderApi: PeerMessageSenderApi)(implicit
ec: ExecutionContext,
nodeAppConfig: NodeAppConfig,
chainAppConfig: ChainAppConfig): Future[DataMessageHandler] = {
val syncNodeState = currentDmh.state match {
case s: SyncNodeState => s
case state @ (_: DoneSyncing | _: MisbehavingPeer | _: RemovePeers) =>
sys.error(
s"Cannot fetch compact filter headers when we are in state=$state")
}
chainAppConfig: ChainAppConfig): Future[
Option[NodeState.FilterHeaderSync]] = {
logger.info(
s"Now syncing filter headers from ${syncNodeState.syncPeer} in state=${currentDmh.state}")
s"Now syncing filter headers from ${state.syncPeer} in state=${state}")
for {
newSyncingStateOpt <- PeerManager.sendFirstGetCompactFilterHeadersCommand(
peerMessageSenderApi = peerMessageSenderApi,
chainApi = currentDmh.chainApi,
state = syncNodeState)
chainApi = chainApi,
state = state)
} yield {
newSyncingStateOpt match {
case Some(newSyncingState) =>
currentDmh.copy(state = newSyncingState)
case None =>
val state = DoneSyncing(currentDmh.state.peers,
currentDmh.state.waitingForDisconnection)
currentDmh.copy(state = state)
}
newSyncingStateOpt
}
}

View file

@ -140,7 +140,6 @@ case class DataMessageHandler(
logger.debug(
s"Got ${filterHeader.filterHashes.size} compact filter header hashes, state=$state")
val filterHeaderSync = state match {
//is validating headers right here? Re-review these state transitions
case s @ (_: HeaderSync | _: DoneSyncing) =>
FilterHeaderSync(peer, s.peers, s.waitingForDisconnection)
case filterHeaderSync: FilterHeaderSync => filterHeaderSync
@ -148,47 +147,13 @@ case class DataMessageHandler(
sys.error(
s"Incorrect state for handling filter header messages, got=$x")
}
val filterHeaders = filterHeader.filterHeaders
val blockCountF = chainApi.getBlockCount()
for {
newChainApi <- chainApi.processFilterHeaders(
filterHeaders,
filterHeader.stopHash.flip)
filterHeaderCount <- newChainApi.getFilterHeaderCount()
blockCount <- blockCountF
newState <-
if (blockCount != filterHeaderCount) {
logger.debug(
s"Received maximum amount of filter headers in one header message. This means we are not synced, requesting more")
sendNextGetCompactFilterHeadersCommand(
peerMessageSenderApi = peerMessageSenderApi,
syncPeer = peer,
prevStopHash = filterHeader.stopHash.flip).map(_ =>
filterHeaderSync)
} else {
for {
startHeight <- PeerManager.getCompactFilterStartHeight(
chainApi,
walletCreationTimeOpt)
filterSyncStateOpt <- sendFirstGetCompactFilterCommand(
peerMessageSenderApi = peerMessageSenderApi,
startHeight = startHeight,
syncNodeState = filterHeaderSync)
} yield {
filterSyncStateOpt match {
case Some(filterSyncState) => filterSyncState
case None =>
val d =
DoneSyncing(filterHeaderSync.peers,
filterHeaderSync.waitingForDisconnection)
d
}
}
}
newChainApi <- newChainApi.setSyncing(newState.isSyncing)
} yield {
this.copy(chainApi = newChainApi, state = newState)
}
handleFilterHeadersMessage(filterHeaderSync,
filterHeader,
chainApi,
peer)
.map(s => copy(state = s))
case filter: CompactFilterMessage =>
logger.debug(
s"Received ${filter.commandName}, filter.blockHash=${filter.blockHash.flip} state=$state")
@ -305,69 +270,41 @@ case class DataMessageHandler(
case HeadersMessage(count, headers) =>
logger.info(
s"Received headers message with ${count.toInt} headers from peer=$peer state=$state")
val headerSyncState = state match {
val newStateOpt: Option[NodeState] = state match {
case d: DoneSyncing =>
if (count.toInt != 0) {
val s = if (count.toInt != 0) {
//why do we sometimes get empty HeadersMessage?
HeaderSync(peer, d.peers, d.waitingForDisconnection)
} else DoneSyncing(d.peers, d.waitingForDisconnection)
Some(s)
case headerSync: HeaderSync =>
if (headerSync.syncPeer == peer) {
headerSync
Some(headerSync)
} else {
val fhs = FilterHeaderSync(syncPeer = headerSync.syncPeer,
peers = headerSync.peers,
waitingForDisconnection =
headerSync.waitingForDisconnection)
fhs
//means we received a headers message from a peer we aren't syncing with, so ignore for now
logger.debug(
s"Ignoring headers from peer=$peer while we are syncing with syncPeer=${headerSync.syncPeer}")
None
}
case x @ (_: FilterHeaderSync | _: FilterSync | _: MisbehavingPeer |
_: RemovePeers) =>
case x @ (_: FilterHeaderSync | _: FilterSync) =>
logger.warn(
s"Ignoring headers msg with size=${headers.size} while in state=$x")
Some(x)
case x @ (_: MisbehavingPeer | _: RemovePeers) =>
sys.error(s"Invalid state to receive headers in, got=$x")
}
val chainApiHeaderProcessF: Future[DataMessageHandler] = for {
newChainApi <- chainApi.setSyncing(count.toInt > 0)
processed <- newChainApi.processHeaders(headers)
} yield {
copy(state = headerSyncState, chainApi = processed)
}
val getHeadersF: Future[DataMessageHandler] = {
for {
newDmh <- chainApiHeaderProcessF
dmh <- getHeaders(newDmh = newDmh, headers = headers, peer = peer)
} yield dmh
}
val recoveredDmhF = getHeadersF.recoverWith {
case _: DuplicateHeaders =>
logger.warn(
s"Received duplicate headers from ${peer} in state=$state")
newStateOpt match {
case Some(h: HeaderSync) =>
handleHeadersMessage(h, headers, peerData)
.map(s => copy(state = s))
case Some(
x @ (_: FilterHeaderSync | _: FilterSync | _: DoneSyncing |
_: MisbehavingPeer | _: RemovePeers)) =>
Future.successful(copy(state = x))
case None =>
Future.successful(this)
case _: InvalidBlockHeader =>
logger.warn(
s"Invalid headers of count $count sent from ${peer} in state=$state")
recoverInvalidHeader(peerData)
case e: Throwable => throw e
}
recoveredDmhF.failed.map { err =>
logger.error(
s"Error when processing headers message: ${err.getMessage}")
}
for {
newDmh <- recoveredDmhF
_ <- {
if (count.toInt == 0) {
Future.unit //don't execute callbacks if we receive 0 headers from peer
} else {
appConfig.callBacks.executeOnBlockHeadersReceivedCallbacks(
headers)
}
}
} yield {
newDmh
}
case msg: BlockMessage =>
val block = msg.block
@ -488,7 +425,7 @@ case class DataMessageHandler(
/** Recover the data message handler if we received an invalid block header from a peer */
private def recoverInvalidHeader(
peerData: PersistentPeerData): Future[DataMessageHandler] = {
peerData: PersistentPeerData): Future[NodeState] = {
val result = state match {
case state @ (_: HeaderSync | _: DoneSyncing) =>
val peer = peerData.peer
@ -501,7 +438,7 @@ case class DataMessageHandler(
peers = state.peers,
waitingForDisconnection =
state.waitingForDisconnection)
Future.successful(copy(state = m))
Future.successful(m)
} else {
for {
@ -519,10 +456,10 @@ case class DataMessageHandler(
DoneSyncing(state.peers, state.waitingForDisconnection)
queryF.map(_ => d)
}
} yield this.copy(state = newState)
} yield newState
}
case _: FilterHeaderSync | _: FilterSync =>
Future.successful(this)
Future.successful(state)
case m @ (_: MisbehavingPeer | _: RemovePeers) =>
val exn = new RuntimeException(
s"Cannot recover invalid headers, got=$m")
@ -705,41 +642,29 @@ case class DataMessageHandler(
}
private def getHeaders(
newDmh: DataMessageHandler,
state: HeaderSync,
headers: Vector[BlockHeader],
peer: Peer): Future[DataMessageHandler] = {
logger.debug(
s"getHeaders() newDmh.state=${newDmh.state} peer=$peer peers=$peer")
val state = newDmh.state
peer: Peer,
chainApi: ChainApi): Future[NodeState] = {
logger.debug(s"getHeaders() newDmh.state=${state} peer=$peer peers=$peer")
val count = headers.length
val getHeadersF: Future[DataMessageHandler] = {
val newApi = newDmh.chainApi
val getHeadersF: Future[NodeState] = {
if (headers.nonEmpty) {
val lastHeader = headers.last
val lastHash = lastHeader.hash
newApi.getBlockCount().map { count =>
chainApi.getBlockCount().map { count =>
logger.trace(
s"Processed headers, most recent has height=$count and hash=$lastHash.")
}
if (count == HeadersMessage.MaxHeadersCount) {
state match {
case _: HeaderSync =>
logger.debug(
s"Received maximum amount of headers in one header message. This means we are not synced, requesting more")
//ask for headers more from the same peer
peerMessageSenderApi
.sendGetHeadersMessage(lastHash.flip)
.map(_ => newDmh)
case x @ (_: FilterHeaderSync | _: FilterSync | _: DoneSyncing |
_: MisbehavingPeer | _: RemovePeers) =>
val exn = new RuntimeException(
s"Cannot be in state=$x while retrieving block headers")
Future.failed(exn)
}
logger.debug(
s"Received maximum amount of headers in one header message. This means we are not synced, requesting more")
//ask for headers more from the same peer
peerMessageSenderApi
.sendGetHeadersMessage(lastHash.flip)
.map(_ => state)
} else {
logger.debug(
@ -751,37 +676,126 @@ case class DataMessageHandler(
// if we are syncing we should do this, however, sometimes syncing isn't a good enough check,
// so we also check if our cached filter heights have been set as well, if they haven't then
// we probably need to sync filters
state match {
case h: HeaderSync =>
val syncPeer = h.syncPeer
// headers are synced now with the current sync peer, now move to validating it for all peers
require(syncPeer == peer, s"syncPeer=$syncPeer peer=$peer")
val syncPeer = state.syncPeer
// headers are synced now with the current sync peer, now move to validating it for all peers
require(syncPeer == peer, s"syncPeer=$syncPeer peer=$peer")
PeerManager.fetchCompactFilterHeaders(newDmh,
peerMessageSenderApi)
case x @ (_: FilterHeaderSync | _: FilterSync | _: DoneSyncing |
_: MisbehavingPeer | _: RemovePeers) =>
val exn = new RuntimeException(
s"Cannot be in state=$x while we are about to begin syncing compact filter headers")
Future.failed(exn)
val fhsOptF = PeerManager.fetchCompactFilterHeaders(
state = state,
chainApi = chainApi,
peerMessageSenderApi = peerMessageSenderApi)
fhsOptF.map {
case Some(s) => s
case None =>
//is this right? If we don't send cfheaders to our peers, are we done syncing?
DoneSyncing(state.peers, state.waitingForDisconnection)
}
}
} else {
//what if we are synced exactly by the 2000th header
state match {
case _: HeaderSync =>
Future.successful(newDmh)
case _: DoneSyncing =>
Future.successful(newDmh)
case x @ (_: FilterHeaderSync | _: FilterSync | _: MisbehavingPeer |
_: RemovePeers) =>
val exn = new RuntimeException(
s"Invalid state to complete block header sync in, got=$x")
Future.failed(exn)
}
Future.successful(state)
}
}
getHeadersF
}
private def handleHeadersMessage(
headerSyncState: HeaderSync,
headers: Vector[BlockHeader],
peerData: PersistentPeerData): Future[NodeState] = {
val peer = headerSyncState.syncPeer
val count = headers.size
val chainApiHeaderProcessF: Future[DataMessageHandler] = for {
newChainApi <- chainApi.setSyncing(count > 0)
processed <- newChainApi.processHeaders(headers)
} yield {
copy(state = headerSyncState, chainApi = processed)
}
val getHeadersF: Future[NodeState] = {
for {
newDmh <- chainApiHeaderProcessF
dmh <- getHeaders(state = headerSyncState,
headers = headers,
peer = peer,
newDmh.chainApi)
} yield dmh
}
val recoveredStateF: Future[NodeState] = getHeadersF.recoverWith {
case _: DuplicateHeaders =>
logger.warn(s"Received duplicate headers from ${peer} in state=$state")
Future.successful(headerSyncState)
case _: InvalidBlockHeader =>
logger.warn(
s"Invalid headers of count $count sent from ${peer} in state=$state")
recoverInvalidHeader(peerData)
case e: Throwable => throw e
}
recoveredStateF.failed.map { err =>
logger.error(s"Error when processing headers message: ${err.getMessage}")
}
for {
newState <- recoveredStateF
_ <- {
if (count == 0) {
Future.unit //don't execute callbacks if we receive 0 headers from peer
} else {
appConfig.callBacks.executeOnBlockHeadersReceivedCallbacks(headers)
}
}
} yield {
newState
}
}
private def handleFilterHeadersMessage(
filterHeaderSync: FilterHeaderSync,
filterHeader: CompactFilterHeadersMessage,
chainApi: ChainApi,
peer: Peer): Future[NodeState] = {
val filterHeaders = filterHeader.filterHeaders
val blockCountF = chainApi.getBlockCount()
for {
_ <- chainApi.processFilterHeaders(filterHeaders,
filterHeader.stopHash.flip)
filterHeaderCount <- chainApi.getFilterHeaderCount()
blockCount <- blockCountF
newState <-
if (blockCount != filterHeaderCount) {
logger.debug(
s"Received maximum amount of filter headers in one header message. This means we are not synced, requesting more")
sendNextGetCompactFilterHeadersCommand(
peerMessageSenderApi = peerMessageSenderApi,
syncPeer = peer,
prevStopHash = filterHeader.stopHash.flip).map(_ =>
filterHeaderSync)
} else {
for {
startHeight <- PeerManager.getCompactFilterStartHeight(
chainApi,
walletCreationTimeOpt)
filterSyncStateOpt <- sendFirstGetCompactFilterCommand(
peerMessageSenderApi = peerMessageSenderApi,
startHeight = startHeight,
syncNodeState = filterHeaderSync)
} yield {
filterSyncStateOpt match {
case Some(filterSyncState) => filterSyncState
case None =>
val d =
DoneSyncing(filterHeaderSync.peers,
filterHeaderSync.waitingForDisconnection)
d
}
}
}
_ <- chainApi.setSyncing(newState.isSyncing)
} yield {
newState
}
}
}