Dont initiate disconnect logic from bitcoind, its flakey for some reason (#5343)

* Dont initiate disconnect logic from bitcoind, its falky for some reason

* Fix bug where we weren't switching sync to another peer on disconnect for the case where we were waiting for disconnection

* Empty commit to run CI

* Empty commit to run CI
This commit is contained in:
Chris Stewart 2024-01-05 11:52:39 -06:00 committed by GitHub
parent d6c1491ba8
commit 3c5bace825
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 33 deletions

View File

@ -309,8 +309,8 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
val numBlocks = 5 val numBlocks = 5
val genBlocksF = { val genBlocksF = {
for { for {
nodeUri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoind1) peer1 <- NodeTestUtil.getBitcoindPeer(bitcoind1)
_ <- bitcoind1.disconnectNode(nodeUri) _ <- node.peerManager.disconnectPeer(peer1)
_ <- AsyncUtil.retryUntilSatisfiedF( _ <- AsyncUtil.retryUntilSatisfiedF(
() => node.getConnectionCount.map(_ == 1), () => node.getConnectionCount.map(_ == 1),
1.second) 1.second)
@ -391,8 +391,8 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
for { for {
_ <- NodeTestUtil.awaitSyncAndIBD(node, bitcoind0) _ <- NodeTestUtil.awaitSyncAndIBD(node, bitcoind0)
//disconnect bitcoind1 as we don't need it //disconnect bitcoind1 as we don't need it
nodeUri1 <- NodeTestUtil.getNodeURIFromBitcoind(bitcoind1) peer1 <- NodeTestUtil.getBitcoindPeer(bitcoind1)
_ <- bitcoind1.disconnectNode(nodeUri1) _ <- node.peerManager.disconnectPeer(peer1)
bestBlockHash0 <- bitcoind0.getBestBlockHash() bestBlockHash0 <- bitcoind0.getBestBlockHash()
//invalidate blockhash to force a reorg when next block is generated //invalidate blockhash to force a reorg when next block is generated
_ <- bitcoind0.invalidateBlock(bestBlockHash0) _ <- bitcoind0.invalidateBlock(bestBlockHash0)

View File

@ -63,20 +63,19 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
nodeConnectedWithBitcoinds => nodeConnectedWithBitcoinds =>
val node = nodeConnectedWithBitcoinds.node val node = nodeConnectedWithBitcoinds.node
val bitcoinds = nodeConnectedWithBitcoinds.bitcoinds val bitcoinds = nodeConnectedWithBitcoinds.bitcoinds
def peers = node.peerManager.peers
for { for {
bitcoindPeers <- bitcoinPeersF bitcoindPeers <- bitcoinPeersF
_ <- node.start() _ <- node.start()
_ <- AsyncUtil.retryUntilSatisfied(peers.size == 2, _ <- AsyncUtil.retryUntilSatisfiedF(
maxTries = 30, () => node.getConnectionCount.map(_ == 2),
interval = 1.second) maxTries = 30,
interval = 1.second)
//sync from first bitcoind //sync from first bitcoind
peer0 = bitcoindPeers(0) peer0 = bitcoindPeers(0)
nodeUri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoinds(0)) _ <- node.peerManager.disconnectPeer(peer0)
_ <- bitcoinds(0).disconnectNode(nodeUri)
_ = logger.debug( _ = logger.debug(
s"Disconnected $nodeUri from bitcoind bitcoind(0).p2pPort=${peer0.socket.getPort} bitcoind(1).p2pPort=${bitcoinds( s"Disconnected $peer0 from node bitcoind(0).p2pPort=${peer0.socket.getPort} bitcoind(1).p2pPort=${bitcoinds(
1).instance.p2pPort}") 1).instance.p2pPort}")
//old peer we were syncing with that just disconnected us //old peer we were syncing with that just disconnected us
_ <- NodeTestUtil.awaitAllSync(node, bitcoinds(1)) _ <- NodeTestUtil.awaitAllSync(node, bitcoinds(1))
@ -167,8 +166,8 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
bitcoind1 = bitcoinds(1) bitcoind1 = bitcoinds(1)
_ <- NodeTestUtil.awaitAllSync(node, bitcoind1) _ <- NodeTestUtil.awaitAllSync(node, bitcoind1)
//disconnect bitcoind(0) as its not needed for this test //disconnect bitcoind(0) as its not needed for this test
node0Uri <- NodeTestUtil.getNodeURIFromBitcoind(bitcoind0) peer0 <- NodeTestUtil.getBitcoindPeer(bitcoind0)
_ <- bitcoinds(0).disconnectNode(node0Uri) _ <- node.peerManager.disconnectPeer(peer0)
_ <- AsyncUtil.retryUntilSatisfied(peerManager.peers.size == 1) _ <- AsyncUtil.retryUntilSatisfied(peerManager.peers.size == 1)
_ <- NodeTestUtil.awaitAllSync(node, bitcoind1) _ <- NodeTestUtil.awaitAllSync(node, bitcoind1)
_ <- sendInvalidHeaders(peer) _ <- sendInvalidHeaders(peer)

View File

@ -366,26 +366,14 @@ case class PeerManager(
val isShuttingDown = state.isInstanceOf[NodeShuttingDown] val isShuttingDown = state.isInstanceOf[NodeShuttingDown]
val shouldReconnect = val shouldReconnect =
(forceReconnect || connectedPeerCount == 0) && isStarted.get && !isShuttingDown (forceReconnect || connectedPeerCount == 0) && isStarted.get && !isShuttingDown
if (peers.exists(_ != peer)) { if (state.peers.exists(_ != peer)) {
val randomPeerOpt = state match {
state.randomPeer(excludePeers = Set(peer), case s: SyncNodeState => switchSyncToRandomPeer(s, Some(peer))
ServiceIdentifier.NODE_COMPACT_FILTERS) case x @ (_: DoneSyncing | _: NodeShuttingDown |
randomPeerOpt match { _: MisbehavingPeer | _: RemovePeers) =>
case Some(peer) => Future.successful(x)
state match {
case syncState: SyncNodeState =>
switchSyncToPeer(oldSyncState = syncState, newPeer = peer)
case d: DoneSyncing =>
//defensively try to sync with the new peer
syncHelper(peer).map(_ => d)
case x @ (_: MisbehavingPeer | _: RemovePeers |
_: NodeShuttingDown) =>
Future.successful(x)
}
case None =>
//if we have no new peers should we just switch to DoneSyncing?
Future.successful(state)
} }
} else if (syncPeerOpt.isDefined) { } else if (syncPeerOpt.isDefined) {
if (shouldReconnect) { if (shouldReconnect) {
finder.reconnect(peer).map(_ => state) finder.reconnect(peer).map(_ => state)
@ -404,7 +392,14 @@ case class PeerManager(
} else if (state.waitingForDisconnection.contains(peer)) { } else if (state.waitingForDisconnection.contains(peer)) {
//a peer we wanted to disconnect has remove has stopped the client actor, finally mark this as deleted //a peer we wanted to disconnect has remove has stopped the client actor, finally mark this as deleted
val removed = state.waitingForDisconnection.removedAll(Set(peer)) val removed = state.waitingForDisconnection.removedAll(Set(peer))
Future.successful(state.replaceWaitingForDisconnection(removed)) val newState = state.replaceWaitingForDisconnection(removed)
newState match {
case s: SyncNodeState =>
switchSyncToRandomPeer(s, Some(peer))
case x @ (_: DoneSyncing | _: NodeShuttingDown | _: MisbehavingPeer |
_: RemovePeers) =>
Future.successful(x)
}
} else { } else {
logger.warn(s"onP2PClientStopped called for unknown $peer") logger.warn(s"onP2PClientStopped called for unknown $peer")
Future.successful(state) Future.successful(state)
@ -749,6 +744,21 @@ case class PeerManager(
} }
} }
private def switchSyncToRandomPeer(
state: SyncNodeState,
excludePeerOpt: Option[Peer]): Future[NodeState] = {
val randomPeerOpt =
state.randomPeer(excludePeers = excludePeerOpt.toSet,
ServiceIdentifier.NODE_COMPACT_FILTERS)
randomPeerOpt match {
case Some(peer) =>
switchSyncToPeer(oldSyncState = state, newPeer = peer)
case None =>
//if we have no new peers should we just switch to DoneSyncing?
Future.successful(state)
}
}
private def switchSyncToPeer( private def switchSyncToPeer(
oldSyncState: SyncNodeState, oldSyncState: SyncNodeState,
newPeer: Peer): Future[NodeState] = { newPeer: Peer): Future[NodeState] = {