2024 02 15 Fix duplicate sync bug when we have a misbehaving peer (#5399)

* Fix bug where we were attempting to sync twice when we had a MisBehavingPeer, also simplify some logic inside of onDisconnect()

* Pull over more small changes from #5390
This commit is contained in:
Chris Stewart 2024-02-15 14:56:12 -06:00 committed by GitHub
parent fe33c2919c
commit bc94a8b01f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 33 additions and 33 deletions

View File

@ -2,6 +2,7 @@ package org.bitcoins.node
import org.bitcoins.core.api.node.{Peer, PeerWithServices}
import org.bitcoins.core.p2p.{CompactFilterMessage, ServiceIdentifier}
import org.bitcoins.node.NodeState.DoneSyncing
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.networking.peer.{PeerConnection, PeerMessageSender}
@ -105,6 +106,16 @@ sealed trait NodeRunningState extends NodeState {
getPeerMsgSender(p)
}
}
def isConnected(peer: Peer): Boolean = {
peerDataMap.filter(_._1.peer == peer).nonEmpty || peerFinder.hasPeer(peer)
}
def isDisconnected(peer: Peer): Boolean = !isConnected(peer)
def toDoneSyncing: DoneSyncing = {
DoneSyncing(peerDataMap, waitingForDisconnection, peerFinder)
}
}
/** State to indicate that we are syncing the blockchain */

View File

@ -95,4 +95,11 @@ case class AttemptToConnectPeerData(
override val system: ActorSystem,
override val nodeAppConfig: NodeAppConfig,
override val chainAppConfig: ChainAppConfig)
extends PeerData
extends PeerData {
def toPersistentPeerData: PersistentPeerData = {
val p = PersistentPeerData(peer, peerMessageSender)
p.setServiceIdentifier(serviceIdentifier = serviceIdentifier)
p
}
}

View File

@ -192,18 +192,18 @@ case class PeerManager(
stopF
}
def isConnected(peer: Peer): Future[Boolean] = {
override def isConnected(peer: Peer): Future[Boolean] = {
peerDataMap.get(peer) match {
case None => Future.successful(false)
case Some(p) => p.peerConnection.isConnected()
}
}
def isDisconnected(peer: Peer): Future[Boolean] = {
override def isDisconnected(peer: Peer): Future[Boolean] = {
isConnected(peer).map(b => !b)
}
def isInitialized(peer: Peer): Future[Boolean] = {
override def isInitialized(peer: Peer): Future[Boolean] = {
Future.successful(peerDataMap.exists(_._1 == peer))
}
@ -340,41 +340,23 @@ case class PeerManager(
s"$peer cannot be both a test and a persistent peer")
if (finder.hasPeer(peer)) {
if (peers.isEmpty) {
finder.reconnect(peer).map(_ => state)
} else {
finder.removePeer(peer).map(_ => state)
}
finder.removePeer(peer)
Future.successful(state)
} else if (peerDataMap.contains(peer)) {
_peerDataMap.remove(peer)
val syncPeerOpt = state match {
case s: SyncNodeState =>
Some(s.syncPeer)
case _: DoneSyncing | _: RemovePeers | _: NodeShuttingDown |
_: MisbehavingPeer =>
None
}
val isShuttingDown = state.isInstanceOf[NodeShuttingDown]
if (state.peers.exists(_ != peer)) {
state match {
case s: SyncNodeState => switchSyncToRandomPeer(s, Some(peer))
case d: DoneSyncing =>
//defensively try to sync with the new peer
val hs = d.toHeaderSync(peer)
switchSyncToRandomPeer(hs, Some(peer))
val hs = d.toHeaderSync
syncHelper(hs).map(_ => hs)
case x @ (_: DoneSyncing | _: NodeShuttingDown |
_: MisbehavingPeer | _: RemovePeers) =>
Future.successful(x)
}
} else if (syncPeerOpt.isDefined) {
if (forceReconnect && !isShuttingDown) {
finder.reconnect(peer).map(_ => state)
} else {
logger.warn(
s"No new peers to sync from, cannot start new sync. Terminated sync with peer=$peer current syncPeer=$syncPeerOpt state=${state} peers=$peers")
Future.successful(state)
}
} else {
if (forceReconnect && !isShuttingDown) {
finder.reconnect(peer).map(_ => state)
@ -642,7 +624,7 @@ case class PeerManager(
peerManager = this,
state = runningState
)
val resultF = dmh
val resultF: Future[NodeState] = dmh
.handleDataPayload(payload, peerData)
.flatMap { newDmh =>
newDmh.state match {
@ -650,22 +632,21 @@ case class PeerManager(
//disconnect the misbehaving peer
for {
_ <- disconnectPeer(m.badPeer)
_ <- syncFromNewPeer(m)
} yield newDmh
} yield newDmh.state
case removePeers: RemovePeers =>
for {
_ <- Future.traverse(removePeers.peers)(
disconnectPeer)
} yield newDmh
} yield newDmh.state
case _: SyncNodeState | _: DoneSyncing |
_: NodeShuttingDown =>
Future.successful(newDmh)
Future.successful(newDmh.state)
}
}
resultF.map { r =>
logger.debug(
s"Done processing ${payload.commandName} in peer=${peer} state=${r.state}")
r.state
s"Done processing ${payload.commandName} in peer=${peer} state=${r}")
r
}
}
}
@ -1105,6 +1086,7 @@ case class PeerManager(
}
}
/** Attempts to start syncing from a new peer. Returns None if we have no new peers to sync with */
private def syncFromNewPeer(
state: NodeRunningState): Future[Option[NodeRunningState]] = {
val svcIdentifier = ServiceIdentifier.NODE_COMPACT_FILTERS