2024 03 01 Fix syncPeer exceptions (#5446)

* Add NodeRunningState.removePeer(), add helper method onDisconnectNodeStateUpdate() to consolidate disconnect logic

* Remove duplicate logic

* Add bitcoin-s.node.maxConnectedPeers test

* Clean up resources in test case

* Remove unnecessary replacePeers()

* Add NodeState.addPeer()

* Add NodeRunningState.connectedPeerCount, separate handling of connection slots and filter slots
This commit is contained in:
Chris Stewart 2024-03-04 09:22:16 -06:00 committed by GitHub
parent 8b23b1f4f6
commit ae3a634703
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 219 additions and 121 deletions

View File

@ -1,5 +1,6 @@
package org.bitcoins.node
import com.typesafe.config.ConfigFactory
import org.apache.pekko.actor.Cancellable
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.models.{CompactFilterDAO, CompactFilterHeaderDAO}
@ -392,4 +393,39 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
bestBlockHashBE = Some(hashes1.head))
} yield succeed
}
//Regression test: a restarted node must not keep more peer connections
//than bitcoin-s.node.maxConnectedPeers allows.
it must "honor bitcoin-s.node.maxConnectedPeers" in {
nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoinds =>
val max = 1
//restart the cached node with a config overriding maxConnectedPeers=1
val nodeF = getCustomMaxConnectedPeers(initNode =
nodeConnectedWithBitcoind.node,
maxConnectedPeers = max)
for {
node <- nodeF
//give the node time for its peer connections to settle
//NOTE(review): fixed sleep — presumably long enough for excess peers to drop
_ <- AsyncUtil.nonBlockingSleep(5.second)
connCount <- node.getConnectionCount
//clean up the node/config started by getCustomMaxConnectedPeers
_ <- node.stop()
_ <- node.nodeConfig.stop()
} yield {
assert(connCount == max)
}
}
/** Restarts [[initNode]] with `bitcoin-s.node.maxConnectedPeers` overridden
  * to [[maxConnectedPeers]].
  *
  * Requires the override to differ from the node's current setting so the
  * test actually exercises a different connection limit.
  */
private def getCustomMaxConnectedPeers(
    initNode: NeutrinoNode,
    maxConnectedPeers: Int): Future[NeutrinoNode] = {
  require(
    initNode.nodeConfig.maxConnectedPeers != maxConnectedPeers,
    s"maxConnectedPeers must differ from the node's current config, got=$maxConnectedPeers"
  )
  //make a custom config overriding only the connection limit;
  //everything else is inherited from initNode's existing config
  val str =
    s"""
       |bitcoin-s.node.maxConnectedPeers = $maxConnectedPeers
       |""".stripMargin
  val config = ConfigFactory.parseString(str)
  NodeTestUtil.getStartedNodeCustomConfig(initNode, config)
}
}

View File

@ -98,25 +98,6 @@ class ReConnectionTest extends NodeTestWithCachedBitcoindNewest {
|""".stripMargin
val config =
ConfigFactory.parseString(str)
val stoppedConfigF = initNode.nodeConfig.stop()
val newNodeAppConfigF =
stoppedConfigF.map(_ => initNode.nodeConfig.withOverrides(config))
val nodeF = {
for {
newNodeAppConfig <- newNodeAppConfigF
_ <- newNodeAppConfig.start()
} yield {
NeutrinoNode(
walletCreationTimeOpt = initNode.walletCreationTimeOpt,
nodeConfig = newNodeAppConfig,
chainConfig = initNode.chainAppConfig,
actorSystem = initNode.system,
paramPeers = initNode.paramPeers
)
}
}
val startedF = nodeF.flatMap(_.start())
startedF
NodeTestUtil.getStartedNodeCustomConfig(initNode, config)
}
}

View File

@ -2,7 +2,12 @@ package org.bitcoins.node
import org.bitcoins.core.api.node.{Peer, PeerWithServices}
import org.bitcoins.core.p2p.{CompactFilterMessage, ServiceIdentifier}
import org.bitcoins.node.NodeState.DoneSyncing
import org.bitcoins.node.NodeState.{
DoneSyncing,
MisbehavingPeer,
NodeShuttingDown,
RemovePeers
}
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.networking.peer.{PeerConnection, PeerMessageSender}
@ -24,6 +29,8 @@ sealed trait NodeRunningState extends NodeState {
def peerFinder: PeerFinder
def connectedPeerCount: Int = peers.size
def getPeerConnection(peer: Peer): Option[PeerConnection] = {
peerDataMap.find(_._1.peer == peer).map(_._2.peerConnection) match {
case Some(peerConnection) => Some(peerConnection)
@ -59,6 +66,48 @@ sealed trait NodeRunningState extends NodeState {
}
}
/** Promotes `peer` out of the [[peerFinder]]'s cache and into our
  * [[peerDataMap]]. If the finder does not know the peer, the state is
  * returned unchanged.
  */
def addPeer(peer: Peer): NodeRunningState = {
  peerFinder
    .popFromCache(peer)
    .map { cached =>
      //normalize to PersistentPeerData before storing it long-term
      val persistent = cached match {
        case p: PersistentPeerData       => p
        case a: AttemptToConnectPeerData => a.toPersistentPeerData
      }
      //NOTE(review): .get assumes the peer's services are already known — confirm
      val withServices = persistent.peerWithServicesOpt.get
      replacePeers(peerDataMap + ((withServices, persistent)))
    }
    .getOrElse {
      //peer unknown to the finder; ignore the attempt
      this
    }
}
/** Removes the peer from our [[peerDataMap]]. If the removed peer was the
  * sync peer, we try to continue syncing from another peer, otherwise we
  * fall back to [[toDoneSyncing]].
  */
def removePeer(peer: Peer): NodeRunningState = {
  val remaining = peerDataMap.filterNot { case (pws, _) => pws.peer == peer }
  this match {
    case sync: SyncNodeState if sync.syncPeer == peer =>
      //we lost our sync peer: pick a replacement, else stop syncing
      val next: NodeRunningState =
        sync.replaceSyncPeer.getOrElse(toDoneSyncing)
      next.replacePeers(remaining)
    case sync: SyncNodeState =>
      //syncing from a different peer, just drop the disconnected one
      sync.replacePeers(remaining)
    case other @ (_: DoneSyncing | _: MisbehavingPeer | _: RemovePeers |
        _: NodeShuttingDown) =>
      other.replacePeers(remaining)
  }
}
def replaceWaitingForDisconnection(
newWaitingForDisconnection: Set[Peer]): NodeRunningState = {
this match {
@ -138,6 +187,16 @@ sealed abstract class SyncNodeState extends NodeRunningState {
case fs: NodeState.FilterSync => fs.copy(syncPeer = newSyncPeer)
}
}
/** Picks a random compact-filter-capable peer (excluding the current
  * [[syncPeer]]) and makes it the new sync peer. Returns None when no
  * such peer is available.
  */
def replaceSyncPeer: Option[SyncNodeState] = {
  val candidateOpt =
    randomPeer(excludePeers = Set(syncPeer),
               ServiceIdentifier.NODE_COMPACT_FILTERS)
  candidateOpt.map(replaceSyncPeer)
}
}
object NodeState {

View File

@ -229,30 +229,26 @@ case class PeerManager(
s"Could not find peer=$peer in PeerFinder!")
val peerData = curPeerDataOpt.get
val hasCf = peerData.serviceIdentifier.nodeCompactFilters
val notCfPeers = peerDataMap
val notCfPeers = state.peerDataMap
.filter(p => !p._2.serviceIdentifier.nodeCompactFilters)
.keys
val availableFilterSlot = hasCf && notCfPeers.nonEmpty
val hasConnectionSlot = connectedPeerCount < nodeAppConfig.maxConnectedPeers
if (hasConnectionSlot || availableFilterSlot) {
val hasConnectionSlot =
state.connectedPeerCount < nodeAppConfig.maxConnectedPeers
if (hasConnectionSlot) {
//we want to promote this peer, so pop from cache
val _ = state.peerFinder.popFromCache(peer)
val persistentPeerData = peerData match {
case p: PersistentPeerData => p
case a: AttemptToConnectPeerData => a.toPersistentPeerData
}
val newState = state.addPeer(peer)
val persistentPeerData =
newState.peerDataMap.filter(_._1.peer == peer).head._2
_peerDataMap.put(peer, persistentPeerData)
val peerWithSvcs = persistentPeerData.peerWithServicesOpt.get
val newPdm =
state.peerDataMap.+((peerWithSvcs, persistentPeerData))
val newState = state.replacePeers(newPdm)
if (availableFilterSlot) {
replacePeer(replacePeer = notCfPeers.head, withPeer = peer)
.map(_ => newState)
} else {
connectPeer(peer).map(_ => newState)
}
connectPeer(peer).map(_ => newState)
} else if (availableFilterSlot) {
val newState = state.addPeer(peer)
val persistentPeerData =
newState.peerDataMap.filter(_._1.peer == peer).head._2
_peerDataMap.put(peer, persistentPeerData)
replacePeer(replacePeer = notCfPeers.head.peer, withPeer = peer)
.map(_ => newState)
} else {
Future.successful(state)
}
@ -307,9 +303,7 @@ case class PeerManager(
}
}
stateF.map { s =>
s.replacePeers(peerWithServicesDataMap)
}
stateF
}
/** @param peer the peer we were disconnected from
@ -335,89 +329,83 @@ case class PeerManager(
Future.successful(state)
} else if (peerDataMap.contains(peer)) {
_peerDataMap.remove(peer)
val rm = state.waitingForDisconnection.-(peer)
val rmWaitingForDisconnect = state.replaceWaitingForDisconnection(rm)
val isShuttingDown = state.isInstanceOf[NodeShuttingDown]
if (state.peers.exists(_ != peer)) {
rmWaitingForDisconnect match {
case s: SyncNodeState => switchSyncToRandomPeer(s, Some(peer))
case d: DoneSyncing =>
//defensively try to sync with the new peer
val hsOpt = d.toHeaderSync
hsOpt match {
case Some(hs) => syncHelper(hs).map(_ => hs)
case None =>
//no peers available to sync with, so return DoneSyncing
Future.successful(d)
}
case x @ (_: DoneSyncing | _: NodeShuttingDown |
_: MisbehavingPeer | _: RemovePeers) =>
Future.successful(x)
}
} else {
if (forceReconnect && !isShuttingDown) {
finder.reconnect(peer).map(_ => rmWaitingForDisconnect)
} else if (!isShuttingDown) {
logger.info(
s"No new peers to connect to, querying for new connections... state=${state} peers=$peers")
finder.queryForPeerConnections(Set(peer)) match {
case Some(_) => Future.successful(rmWaitingForDisconnect)
case None =>
logger.debug(
s"Could not query for more peer connections as previous job is still running")
Future.successful(rmWaitingForDisconnect)
}
} else {
//if shutting down, do nothing
Future.successful(state)
}
onDisconnectNodeStateUpdate(state = state,
disconnectedPeer = peer,
forceReconnect = forceReconnect).map {
updated =>
val rm = state.waitingForDisconnection.-(peer)
val rmWaitingForDisconnect =
updated.replaceWaitingForDisconnection(rm)
rmWaitingForDisconnect
}
} else if (state.waitingForDisconnection.contains(peer)) {
//a peer we wanted to disconnect has stopped the client actor, finally mark this as deleted
val removed = state.waitingForDisconnection.-(peer)
val newState = state.replaceWaitingForDisconnection(removed)
newState match {
case s: SyncNodeState =>
switchSyncToRandomPeer(s, Some(peer))
case x @ (_: DoneSyncing | _: NodeShuttingDown | _: MisbehavingPeer |
_: RemovePeers) =>
Future.successful(x)
}
Future.successful(newState)
} else {
logger.warn(s"onP2PClientStopped called for unknown $peer")
Future.successful(state)
}
}
val replacedPeersStateF = stateF.map {
case s: SyncNodeState =>
if (s.syncPeer == peer) {
//the peer being disconnected is our sync peer
s.randomPeer(excludePeers = Set(peer),
ServiceIdentifier.NODE_COMPACT_FILTERS) match {
case Some(p) => s.replaceSyncPeer(p)
case None =>
//switch to state DoneSyncing since we have no peers to sync from
DoneSyncing(peerDataMap = peerWithServicesDataMap,
waitingForDisconnection =
state.waitingForDisconnection,
peerFinder = s.peerFinder)
}
} else {
s.replacePeers(peerWithServicesDataMap)
}
case runningState: NodeRunningState =>
runningState.replacePeers(peerWithServicesDataMap)
}
for {
state <- replacedPeersStateF
state <- stateF
_ <- updateLastSeenF
} yield state
}
/** Consolidated state transition for a peer disconnect.
  *
  * Removes `disconnectedPeer` from the state and, when other peers remain,
  * keeps syncing (switching sync peer if needed). When no peers remain we
  * transition to DoneSyncing and either force a reconnect or query the
  * finder for fresh connections — unless we are shutting down.
  *
  * @param state the current running state before the disconnect is applied
  * @param disconnectedPeer the peer we were disconnected from
  * @param forceReconnect if true (and not shutting down), reconnect to the
  *                       same peer when it was our only connection
  */
private def onDisconnectNodeStateUpdate(
state: NodeRunningState,
disconnectedPeer: Peer,
forceReconnect: Boolean): Future[NodeRunningState] = {
val isShuttingDown = state.isInstanceOf[NodeShuttingDown]
val finder = state.peerFinder
//do we still have at least one other peer connected?
if (state.peers.exists(_ != disconnectedPeer)) {
state match {
case s: SyncNodeState =>
//we may have lost our sync peer; move sync to a random other peer
switchSyncToRandomPeer(state = s,
excludePeerOpt = Some(disconnectedPeer))
case d: DoneSyncing =>
//defensively try to sync with the new peer
//this headerSync is not safe, need to exclude peer we are disconnecting
val hsOpt = d
.removePeer(disconnectedPeer)
.asInstanceOf[DoneSyncing]
.toHeaderSync
hsOpt match {
case Some(hs) => syncHelper(hs).map(_ => hs)
case None =>
//no peers available to sync with, so return DoneSyncing
Future.successful(d)
}
//NOTE(review): _: DoneSyncing here is shadowed by the case above — confirm
//it is only kept for exhaustiveness of the sealed hierarchy
case x @ (_: DoneSyncing | _: NodeShuttingDown | _: MisbehavingPeer |
_: RemovePeers) =>
Future.successful(x.removePeer(disconnectedPeer))
}
} else {
//no new peers to try to sync from, transition to done syncing?
val done = state.removePeer(disconnectedPeer).toDoneSyncing
if (forceReconnect && !isShuttingDown) {
finder.reconnect(disconnectedPeer).map(_ => done)
} else if (!isShuttingDown) {
logger.info(
s"No new peers to connect to, querying for new connections... state=${state} peers=$peers")
//best effort: the finder may already have a connection job running
finder.queryForPeerConnections(Set(disconnectedPeer)) match {
case Some(_) => Future.successful(done)
case None =>
logger.debug(
s"Could not query for more peer connections as previous job is still running")
Future.successful(done)
}
} else {
//if shutting down, do nothing
Future.successful(done)
}
}
}
private def onQueryTimeout(
payload: ExpectsResponse,
peer: Peer,
@ -582,9 +570,9 @@ case class PeerManager(
}
case (state, i: InitializeDisconnect) =>
state match {
case r: NodeRunningState =>
case running: NodeRunningState =>
val client: PeerData =
r.peerDataMap.find(_._1.peer == i.peer) match {
running.peerDataMap.find(_._1.peer == i.peer) match {
case Some((_, p)) => p
case None =>
sys.error(
@ -596,18 +584,24 @@ case class PeerManager(
//now send request to stop actor which will be completed some time in future
val _ = _peerDataMap.remove(i.peer)
val newWaiting = r.waitingForDisconnection.+(i.peer)
val newPdm = r.peerDataMap.filterNot(_._1.peer == i.peer)
val newState = r
.replaceWaitingForDisconnection(newWaiting)
.replacePeers(newPdm)
val newStateF =
onDisconnectNodeStateUpdate(state = running,
disconnectedPeer = i.peer,
forceReconnect = false).map {
updated =>
val newWaiting = updated.waitingForDisconnection.+(i.peer)
updated
.replaceWaitingForDisconnection(newWaiting)
}
val stopF: Future[Done] = client.stop().recoverWith {
case scala.util.control.NonFatal(err) =>
logger.error(s"Failed to stop peer=${client.peer}", err)
Future.successful(Done)
}
stopF.map { _ =>
newState
stopF.flatMap { _ =>
newStateF
}
}
@ -817,7 +811,7 @@ case class PeerManager(
private def switchSyncToRandomPeer(
state: SyncNodeState,
excludePeerOpt: Option[Peer]): Future[NodeState] = {
excludePeerOpt: Option[Peer]): Future[SyncNodeState] = {
val randomPeerOpt =
state.randomPeer(excludePeers = excludePeerOpt.toSet,
ServiceIdentifier.NODE_COMPACT_FILTERS)
@ -855,7 +849,7 @@ case class PeerManager(
private def switchSyncToPeer(
oldSyncState: SyncNodeState,
newPeer: Peer): Future[NodeState] = {
newPeer: Peer): Future[SyncNodeState] = {
logger.debug(
s"switchSyncToPeer() oldSyncState=$oldSyncState newPeer=$newPeer")
val newState = oldSyncState.replaceSyncPeer(newPeer)

View File

@ -1,5 +1,6 @@
package org.bitcoins.testkit.node
import com.typesafe.config.Config
import org.apache.pekko.actor.ActorSystem
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.core.api.node.Peer
@ -278,6 +279,33 @@ abstract class NodeTestUtil extends P2PLogger {
result
}
}
/** Stops `initNode` and its node config, layers `config` as overrides on top
  * of the old config, then builds and starts a fresh [[NeutrinoNode]] reusing
  * the old node's chain config, actor system, and param peers.
  */
def getStartedNodeCustomConfig(initNode: NeutrinoNode, config: Config)(
    implicit ec: ExecutionContext): Future[NeutrinoNode] = {
  for {
    //tear down the old node before touching its config
    _ <- initNode.stop()
    _ <- initNode.nodeConfig.stop()
    overridden = initNode.nodeConfig.withOverrides(config)
    _ <- overridden.start()
    node = NeutrinoNode(
      walletCreationTimeOpt = initNode.walletCreationTimeOpt,
      nodeConfig = overridden,
      chainConfig = initNode.chainAppConfig,
      actorSystem = initNode.system,
      paramPeers = initNode.paramPeers
    )
    started <- node.start()
  } yield started
}
}
object NodeTestUtil extends NodeTestUtil