From 4f0c3da303d2902d9bb5f50deb15949e853126c1 Mon Sep 17 00:00:00 2001
From: Chris Stewart
Date: Wed, 15 Jan 2025 15:41:40 -0600
Subject: [PATCH] node: Fix bug where we were dropping cached outbound messages
 on the floor (#5853)

* node: Fix bug where we were dropping cached outbound messages on the floor

* cleanup logs

* cleanups
---
 app/server/src/main/resources/logback.xml     |   2 +-
 .../org/bitcoins/node/PeerManagerTest.scala   |   1 -
 .../scala/org/bitcoins/node/NodeState.scala   |  14 +-
 .../scala/org/bitcoins/node/PeerFinder.scala  |   2 +-
 .../scala/org/bitcoins/node/PeerManager.scala | 158 ++++++++++--------
 5 files changed, 100 insertions(+), 77 deletions(-)

diff --git a/app/server/src/main/resources/logback.xml b/app/server/src/main/resources/logback.xml
index 49d2eafb99..3a19170aee 100644
--- a/app/server/src/main/resources/logback.xml
+++ b/app/server/src/main/resources/logback.xml
@@ -35,7 +35,7 @@
-
+
diff --git a/node-test/src/test/scala/org/bitcoins/node/PeerManagerTest.scala b/node-test/src/test/scala/org/bitcoins/node/PeerManagerTest.scala
index cef846eebc..fecc6a96d9 100644
--- a/node-test/src/test/scala/org/bitcoins/node/PeerManagerTest.scala
+++ b/node-test/src/test/scala/org/bitcoins/node/PeerManagerTest.scala
@@ -72,7 +72,6 @@ class PeerManagerTest extends NodeTestWithCachedBitcoindNewest {
     for {
       _ <- node.start()
       peer <- peerF
-      peerManager = node.peerManager
       _ <- NodeTestUtil.awaitSyncAndIBD(node = node, bitcoind = bitcoind)
       // disconnect
diff --git a/node/src/main/scala/org/bitcoins/node/NodeState.scala b/node/src/main/scala/org/bitcoins/node/NodeState.scala
index f13353943f..665a50bf0c 100644
--- a/node/src/main/scala/org/bitcoins/node/NodeState.scala
+++ b/node/src/main/scala/org/bitcoins/node/NodeState.scala
@@ -86,6 +86,8 @@ sealed trait NodeRunningState extends NodeState {
       case s: NodeState.NodeShuttingDown =>
         s.copy(peerWithServicesDataMap = peerWithServicesDataMap)
       case n: NodeState.NoPeers =>
+        require(n.cachedOutboundMessages.isEmpty,
+                s"Have to send outbound messages")
         DoneSyncing(peerWithServicesDataMap,
                     n.waitingForDisconnection,
                     n.peerFinder)
@@ -270,6 +272,9 @@ sealed abstract class SyncNodeState extends NodeRunningState {
     sentQuery.isBefore(timeout)
   }

+  override def addPeer(peer: Peer): SyncNodeState =
+    super.addPeer(peer).asInstanceOf[SyncNodeState]
+
   override def toString: String = {
     s"${getClass.getSimpleName}(syncPeer=$syncPeer,peers=${peers},waitingForDisconnection=${waitingForDisconnection})"
   }
@@ -392,6 +397,9 @@ object NodeState {
         sentQuery = Instant.now()
       )
     }
+
+    override def addPeer(peer: Peer): DoneSyncing =
+      super.addPeer(peer).asInstanceOf[DoneSyncing]
   }

   /** means our node is in the process of shutting down */
@@ -423,7 +431,11 @@ object NodeState {
       DoneSyncing(peerWithServicesDataMap = map,
                   waitingForDisconnection = waitingForDisconnection,
                   peerFinder = peerFinder)
+
+    }
+
+    override def toString: String = {
+      s"NoPeers(waitingForDisconnection=$waitingForDisconnection,cachedMessages=${cachedOutboundMessages.length})"
     }
   }
-
 }
diff --git a/node/src/main/scala/org/bitcoins/node/PeerFinder.scala b/node/src/main/scala/org/bitcoins/node/PeerFinder.scala
index 49e36dc72e..91d18f424d 100644
--- a/node/src/main/scala/org/bitcoins/node/PeerFinder.scala
+++ b/node/src/main/scala/org/bitcoins/node/PeerFinder.scala
@@ -378,7 +378,7 @@ case class PeerFinder(
     val peersF = {
       for {
         peers <- peersToTryF
-        _ = logger.debug(s"Trying next set of peers $peers")
+        _ = logger.debug(s"Trying next set of peers ${peers.map(_.peer)}")
         _ <- {
           Future.traverse(peers) { p =>
             // check if we already have an active connection
diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala
index 9de1cf273b..536550b558 100644
--- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala
+++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala
@@ -140,16 +140,6 @@ case class PeerManager(
       .upsertPeer(addrBytes, peer.port, networkByte, serviceIdentifier)
   }
-  private def replacePeer(replacePeer: Peer, withPeer: Peer): Future[Unit] = {
-    logger.debug(s"Replacing $replacePeer with $withPeer")
-    for {
-      _ <- disconnectPeer(replacePeer)
-      _ <- connectPeer(withPeer)
-    } yield {
-      ()
-    }
-  }
-
   def disconnectPeer(peer: Peer): Future[Unit] = {
     logger.debug(s"Disconnecting persistent peer=$peer")
     queue.offer(InitializeDisconnect(peer)).map(_ => ())
   }
@@ -244,32 +234,9 @@ case class PeerManager(
       curPeerDataOpt.isDefined,
       s"Could not find peer=$peer in PeerFinder!"
     )
-    val peerData = curPeerDataOpt.get
-    val hasCf = peerData.serviceIdentifier.nodeCompactFilters
-    val notCfPeers = state.peerDataMap
-      .filter(p => !p._2.serviceIdentifier.nodeCompactFilters)
-      .keys
-    val availableFilterSlot = hasCf && notCfPeers.nonEmpty
-    val hasConnectionSlot =
-      state.connectedPeerCount < nodeAppConfig.maxConnectedPeers
-    val stateAndOfferF: (NodeRunningState, Future[Unit]) = {
-      if (hasConnectionSlot) {
-        // we want to promote this peer, so pop from cache
-        val newState = state.addPeer(peer)
-        val persistentPeerData =
-          newState.peerDataMap.filter(_._1 == peer).head._2
-        _peerDataMap.put(peer, persistentPeerData)
-        (newState, connectPeer(peer))
-      } else if (availableFilterSlot) {
-        val newState = state.addPeer(peer)
-        val persistentPeerData =
-          newState.peerDataMap.filter(_._1 == peer).head._2
-        _peerDataMap.put(peer, persistentPeerData)
-        (newState, replacePeer(replacePeer = notCfPeers.head, withPeer = peer))
-      } else {
-        (state, Future.unit)
-      }
-    }
+
+    val stateAndOfferF: (NodeRunningState, Future[Unit]) =
+      (state, connectPeer(peer))

     stateAndOfferF._2.failed.foreach(err =>
       logger.error(s"Failed managePeerAfterInitialization() offer to queue",
@@ -356,7 +323,7 @@ case class PeerManager(
     val updateLastSeenF = PeerDAO().updateLastSeenTime(peer)
     val stateF: Future[NodeRunningState] = {
       require(
-        !finder.hasPeer(peer) || !state.getPeerData(peer).isDefined,
+        !finder.hasPeer(peer) || state.getPeerData(peer).isEmpty,
         s"$peer cannot be both a test and a persistent peer"
       )

@@ -601,41 +568,7 @@ case class PeerManager(
             val connectF = runningState.peerFinder.connect(c.peer)
             connectF.map(_ => runningState)
           } else {
-            val hasCf = runningState
-              .getPeerServices(peer)
-              .exists(_.nodeCompactFilters)
-            logger.info(
-              s"Connected to peer $peer with compact filter support=$hasCf. Connected peer count ${runningState.peerDataMap.size} state=$state"
-            )
-            state match {
-              case s: SyncNodeState =>
-                if (s.isQueryTimedOut(nodeAppConfig.queryWaitTime)) {
-                  // we don't want to re-request from our peer
-                  // unless the query is timed out. This can lead to
-                  // duplicate header requests.
-                  // see: https://github.com/bitcoin-s/bitcoin-s/issues/5665
-                  syncHelper(s).map(_.getOrElse(s.toDoneSyncing))
-                } else {
-                  Future.successful(s)
-                }
-              case d: DoneSyncing =>
-                val x = d.toHeaderSync(c.peer)
-                syncHelper(x).map(_.getOrElse(d))
-              case n: NoPeers =>
-                val peerData = peerDataMap(c.peer)
-                // send cached messages
-                logger.debug(
-                  s"Sending ${n.cachedOutboundMessages.length} cached messages")
-                val sendMsgsF = Future.traverse(n.cachedOutboundMessages)(m =>
-                  peerData.peerMessageSender.sendMsg(m.payload))
-                val peerWithSvcs = peerData.peerWithServicesOpt.get
-                val map = Vector((peerWithSvcs, peerData)).toMap
-                val d = n.toDoneSyncing(map)
-                sendMsgsF.map(_ => d)
-              case x @ (_: MisbehavingPeer | _: RemovePeers |
-                  _: NodeShuttingDown) =>
-                Future.successful(x)
-            }
+            handleConnectPeer(c = c, runningState = runningState)
           }
         }
       case (state, i: InitializeDisconnect) =>
@@ -895,7 +828,7 @@ case class PeerManager(
       case (state, NodeStreamMessage.PeerHealthCheck) =>
         state match {
           case s: NodeShuttingDown =>
-            logger.trace(s"Ignorinng peer health check as we are shutting down")
+            logger.trace(s"Ignoring peer health check as we are shutting down")
             Future.successful(s)
           case r: NodeRunningState =>
             PeerManager.handleHealthCheck(r)
@@ -908,6 +841,7 @@ case class PeerManager(
       state: NodeRunningState,
       stp: SendToPeer
   ): Future[NodeRunningState] = {
+    logger.debug(s"sendToPeerHelper() stp=$stp state=$state")
     val nodeStateF: Future[NodeRunningState] = stp.peerOpt match {
       case Some(p) =>
         state
@@ -1222,6 +1156,84 @@ case class PeerManager(
           err))
       ()
   }
+
+  private def handleConnectPeer(
+      c: ConnectPeer,
+      runningState: NodeRunningState): Future[NodeRunningState] = {
+    val peer = c.peer
+//    require(
+//      runningState.isDisconnected(peer),
+//      s"Cannot call handleConnectPeer() with a peer already connected! peer=$peer")
+    val hasCf = runningState.peerFinder
+      .getPeerData(peer)
+      .exists(_.peerWithServicesOpt.exists(_.services.nodeCompactFilters))
+    val notCfPeers = runningState.peerDataMap
+      .filter(p => !p._2.serviceIdentifier.nodeCompactFilters)
+      .keys
+    val availableFilterSlot = hasCf && notCfPeers.nonEmpty
+    val hasConnectionSlot =
+      runningState.connectedPeerCount < nodeAppConfig.maxConnectedPeers
+    val newStateF: Future[NodeRunningState] = {
+      if (hasConnectionSlot || availableFilterSlot) {
+        val addPeerF: Future[NodeRunningState] = runningState match {
+          case s: SyncNodeState =>
+            val add = s.addPeer(peer)
+            if (add.isQueryTimedOut(nodeAppConfig.queryWaitTime)) {
+              // we don't want to re-request from our peer
+              // unless the query is timed out. This can lead to
+              // duplicate header requests.
+              // see: https://github.com/bitcoin-s/bitcoin-s/issues/5665
+              syncHelper(add).map(_.getOrElse(s.toDoneSyncing))
+            } else {
+              Future.successful(add)
+            }
+          case d: DoneSyncing =>
+            val dAdd = d.addPeer(peer)
+            val h = dAdd.toHeaderSync(peer)
+            syncHelper(h).map(_.getOrElse(dAdd))
+          case n: NoPeers =>
+            // send cached messages
+            val peerData = n.peerFinder.popFromCache(peer).get match {
+              case p: PersistentPeerData       => p
+              case a: AttemptToConnectPeerData => a.toPersistentPeerData
+            }
+            val peerWithSvcs = peerData.peerWithServicesOpt.get
+            val map = Vector((peerWithSvcs, peerData)).toMap
+            val d = DoneSyncing(map, n.waitingForDisconnection, n.peerFinder)
+            logger.debug(
+              s"Sending ${n.cachedOutboundMessages.length} cached messages to peer=$peer")
+            val sendMsgsF = Future.traverse(n.cachedOutboundMessages)(m =>
+              peerData.peerMessageSender.sendMsg(m.payload))
+            val h = d.toHeaderSync(peer)
+            sendMsgsF
+              .flatMap(_ => syncHelper(h).map(_.getOrElse(d)))
+          case x @ (_: MisbehavingPeer | _: RemovePeers |
+              _: NodeShuttingDown) =>
+            Future.successful(x)
+        }
+
+        addPeerF.flatMap { addPeer =>
+          if (availableFilterSlot && notCfPeers.nonEmpty) {
+            disconnectPeer(notCfPeers.head).map(_ => addPeer)
+          } else {
+            Future.successful(addPeer)
+          }
+        }
+      } else {
+        Future.successful(runningState)
+      }
+    }
+
+    newStateF.map { newState =>
+      newState.peerDataMap.get(peer).foreach { persistentPeerData =>
+        _peerDataMap.put(peer, persistentPeerData)
+      }
+      logger.info(
+        s"Connected to peer $peer with compact filter support=$hasCf. Connected peer count ${runningState.peerDataMap.size} state=$newState"
+      )
+      newState
+    }
+  }
 }

 case class ResponseTimeout(payload: NetworkPayload)
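The gist of the fix, distilled into a standalone sketch: while the node is in the NoPeers state, outbound payloads are appended to a cache instead of being dropped on the floor, and when the first peer finishes connecting the cached payloads are replayed to it before the node transitions onward. The names below (Payload, Peer, Connected, sendMsg, onPeerConnected) are illustrative stand-ins rather than the bitcoin-s API; only NoPeers and the cache/flush idea mirror the patch.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object CachedOutboundSketch {

  // Illustrative stand-ins for the node's outbound payloads and peer handles.
  final case class Payload(hex: String)
  final case class Peer(id: String) {
    def send(p: Payload): Future[Unit] = Future(println(s"$id <- ${p.hex}"))
  }

  sealed trait State
  // No connections yet: outbound messages are cached, not dropped.
  final case class NoPeers(cachedOutboundMessages: Vector[Payload]) extends State
  // At least one peer: messages go straight out.
  final case class Connected(peers: Vector[Peer]) extends State

  /** Queue or send a message depending on the current state. */
  def sendMsg(state: State, payload: Payload): (State, Future[Unit]) =
    state match {
      case n: NoPeers =>
        (n.copy(cachedOutboundMessages = n.cachedOutboundMessages :+ payload),
         Future.unit)
      case c: Connected =>
        (c, c.peers.head.send(payload))
    }

  /** Flush the cache to the newly connected peer before leaving NoPeers,
    * so nothing queued while we had no connections is lost.
    */
  def onPeerConnected(state: State, peer: Peer): Future[State] =
    state match {
      case n: NoPeers =>
        Future
          .traverse(n.cachedOutboundMessages)(peer.send)
          .map(_ => Connected(Vector(peer)))
      case c: Connected =>
        Future.successful(Connected(c.peers :+ peer))
    }
}

Keeping the cache on the NoPeers state itself is what the new require() in NodeState.scala leans on: NoPeers may only be folded into DoneSyncing once cachedOutboundMessages has been drained, which handleConnectPeer() now guarantees by sending the cached payloads before making that transition.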
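The connection-slot check inside the new handleConnectPeer() reduces to a small decision, sketched here as a pure function with illustrative names (ConnectedPeer, Decision, decide are not part of the codebase): a newly initialized peer is kept when a connection slot is free (connectedPeerCount < maxConnectedPeers) or when it serves compact filters while some connected peer does not; in the compact-filter case one non-compact-filter peer is disconnected to make room.

object PeerSlotSketch {

  // Illustrative view of a connected peer; not the bitcoin-s PeerData type.
  final case class ConnectedPeer(id: String, servesCompactFilters: Boolean)

  sealed trait Decision
  case object Reject extends Decision
  // evict is the id of a non-compact-filter peer to disconnect, if any.
  final case class Accept(evict: Option[String]) extends Decision

  /** Accept if a slot is free or the newcomer can take a "filter slot";
    * whenever the newcomer serves compact filters and a connected peer
    * does not, that peer is chosen for eviction.
    */
  def decide(
      connected: Vector[ConnectedPeer],
      newPeerServesCompactFilters: Boolean,
      maxConnectedPeers: Int): Decision = {
    val notCfPeers = connected.filterNot(_.servesCompactFilters)
    val hasConnectionSlot = connected.size < maxConnectedPeers
    val availableFilterSlot = newPeerServesCompactFilters && notCfPeers.nonEmpty
    if (hasConnectionSlot || availableFilterSlot) {
      Accept(evict = if (availableFilterSlot) Some(notCfPeers.head.id) else None)
    } else {
      Reject
    }
  }
}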