node: Fix bug where we were dropping cached outbound messages on the floor (#5853)

* node: Fix bug where we were dropping cached outbound messages on the floor

* cleanup logs

* cleanups
This commit is contained in:
Chris Stewart 2025-01-15 15:41:40 -06:00 committed by GitHub
parent 07270ba8ca
commit 4f0c3da303
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 100 additions and 77 deletions

View file

@ -35,7 +35,7 @@
<appender-ref ref="ASYNC-FILE"/>
</root>
<logger name="org.bitcoins.node" level="INFO"/>
<logger name="org.bitcoins.node" level="DEBUG"/>
<logger name="org.bitcoins.chain" level="INFO"/>

View file

@ -72,7 +72,6 @@ class PeerManagerTest extends NodeTestWithCachedBitcoindNewest {
for {
_ <- node.start()
peer <- peerF
peerManager = node.peerManager
_ <- NodeTestUtil.awaitSyncAndIBD(node = node, bitcoind = bitcoind)
// disconnect

View file

@ -86,6 +86,8 @@ sealed trait NodeRunningState extends NodeState {
case s: NodeState.NodeShuttingDown =>
s.copy(peerWithServicesDataMap = peerWithServicesDataMap)
case n: NodeState.NoPeers =>
require(n.cachedOutboundMessages.isEmpty,
s"Have to send outbound messages")
DoneSyncing(peerWithServicesDataMap,
n.waitingForDisconnection,
n.peerFinder)
@ -270,6 +272,9 @@ sealed abstract class SyncNodeState extends NodeRunningState {
sentQuery.isBefore(timeout)
}
override def addPeer(peer: Peer): SyncNodeState =
super.addPeer(peer).asInstanceOf[SyncNodeState]
override def toString: String = {
s"${getClass.getSimpleName}(syncPeer=$syncPeer,peers=${peers},waitingForDisconnection=${waitingForDisconnection})"
}
@ -392,6 +397,9 @@ object NodeState {
sentQuery = Instant.now()
)
}
override def addPeer(peer: Peer): DoneSyncing =
super.addPeer(peer).asInstanceOf[DoneSyncing]
}
/** means our node is in the process of shutting down */
@ -423,7 +431,11 @@ object NodeState {
DoneSyncing(peerWithServicesDataMap = map,
waitingForDisconnection = waitingForDisconnection,
peerFinder = peerFinder)
}
override def toString: String = {
s"NoPeers(waitingForDisconnection=$waitingForDisconnection,cachedMessages=${cachedOutboundMessages.length})"
}
}
}

View file

@ -378,7 +378,7 @@ case class PeerFinder(
val peersF = {
for {
peers <- peersToTryF
_ = logger.debug(s"Trying next set of peers $peers")
_ = logger.debug(s"Trying next set of peers ${peers.map(_.peer)}")
_ <- {
Future.traverse(peers) { p =>
// check if we already have an active connection

View file

@ -140,16 +140,6 @@ case class PeerManager(
.upsertPeer(addrBytes, peer.port, networkByte, serviceIdentifier)
}
/** Atomically swaps one persistent peer for another: first disconnects
  * `replacePeer`, then connects `withPeer`. Used when a more desirable peer
  * (e.g. one with compact-filter support) should take an occupied slot.
  * NOTE(review): the two steps are sequenced via the for-comprehension, so
  * the connect only begins after the disconnect future completes.
  */
private def replacePeer(replacePeer: Peer, withPeer: Peer): Future[Unit] = {
logger.debug(s"Replacing $replacePeer with $withPeer")
for {
_ <- disconnectPeer(replacePeer)
_ <- connectPeer(withPeer)
} yield {
()
}
}
def disconnectPeer(peer: Peer): Future[Unit] = {
logger.debug(s"Disconnecting persistent peer=$peer")
queue.offer(InitializeDisconnect(peer)).map(_ => ())
@ -244,32 +234,9 @@ case class PeerManager(
curPeerDataOpt.isDefined,
s"Could not find peer=$peer in PeerFinder!"
)
val peerData = curPeerDataOpt.get
val hasCf = peerData.serviceIdentifier.nodeCompactFilters
val notCfPeers = state.peerDataMap
.filter(p => !p._2.serviceIdentifier.nodeCompactFilters)
.keys
val availableFilterSlot = hasCf && notCfPeers.nonEmpty
val hasConnectionSlot =
state.connectedPeerCount < nodeAppConfig.maxConnectedPeers
val stateAndOfferF: (NodeRunningState, Future[Unit]) = {
if (hasConnectionSlot) {
// we want to promote this peer, so pop from cache
val newState = state.addPeer(peer)
val persistentPeerData =
newState.peerDataMap.filter(_._1 == peer).head._2
_peerDataMap.put(peer, persistentPeerData)
(newState, connectPeer(peer))
} else if (availableFilterSlot) {
val newState = state.addPeer(peer)
val persistentPeerData =
newState.peerDataMap.filter(_._1 == peer).head._2
_peerDataMap.put(peer, persistentPeerData)
(newState, replacePeer(replacePeer = notCfPeers.head, withPeer = peer))
} else {
(state, Future.unit)
}
}
val stateAndOfferF: (NodeRunningState, Future[Unit]) =
(state, connectPeer(peer))
stateAndOfferF._2.failed.foreach(err =>
logger.error(s"Failed managePeerAfterInitialization() offer to queue",
@ -356,7 +323,7 @@ case class PeerManager(
val updateLastSeenF = PeerDAO().updateLastSeenTime(peer)
val stateF: Future[NodeRunningState] = {
require(
!finder.hasPeer(peer) || !state.getPeerData(peer).isDefined,
!finder.hasPeer(peer) || state.getPeerData(peer).isEmpty,
s"$peer cannot be both a test and a persistent peer"
)
@ -601,41 +568,7 @@ case class PeerManager(
val connectF = runningState.peerFinder.connect(c.peer)
connectF.map(_ => runningState)
} else {
val hasCf = runningState
.getPeerServices(peer)
.exists(_.nodeCompactFilters)
logger.info(
s"Connected to peer $peer with compact filter support=$hasCf. Connected peer count ${runningState.peerDataMap.size} state=$state"
)
state match {
case s: SyncNodeState =>
if (s.isQueryTimedOut(nodeAppConfig.queryWaitTime)) {
// we don't want to re-request from our peer
// unless the query is timed out. This can lead to
// duplicate header requests.
// see: https://github.com/bitcoin-s/bitcoin-s/issues/5665
syncHelper(s).map(_.getOrElse(s.toDoneSyncing))
} else {
Future.successful(s)
}
case d: DoneSyncing =>
val x = d.toHeaderSync(c.peer)
syncHelper(x).map(_.getOrElse(d))
case n: NoPeers =>
val peerData = peerDataMap(c.peer)
// send cached messages
logger.debug(
s"Sending ${n.cachedOutboundMessages.length} cached messages")
val sendMsgsF = Future.traverse(n.cachedOutboundMessages)(m =>
peerData.peerMessageSender.sendMsg(m.payload))
val peerWithSvcs = peerData.peerWithServicesOpt.get
val map = Vector((peerWithSvcs, peerData)).toMap
val d = n.toDoneSyncing(map)
sendMsgsF.map(_ => d)
case x @ (_: MisbehavingPeer | _: RemovePeers |
_: NodeShuttingDown) =>
Future.successful(x)
}
handleConnectPeer(c = c, runningState = runningState)
}
}
case (state, i: InitializeDisconnect) =>
@ -895,7 +828,7 @@ case class PeerManager(
case (state, NodeStreamMessage.PeerHealthCheck) =>
state match {
case s: NodeShuttingDown =>
logger.trace(s"Ignorinng peer health check as we are shutting down")
logger.trace(s"Ignoring peer health check as we are shutting down")
Future.successful(s)
case r: NodeRunningState =>
PeerManager.handleHealthCheck(r)
@ -908,6 +841,7 @@ case class PeerManager(
state: NodeRunningState,
stp: SendToPeer
): Future[NodeRunningState] = {
logger.debug(s"sendToPeerHelper() stp=$stp state=$state")
val nodeStateF: Future[NodeRunningState] = stp.peerOpt match {
case Some(p) =>
state
@ -1222,6 +1156,84 @@ case class PeerManager(
err))
()
}
/** Handles a [[ConnectPeer]] message against the current running state.
  *
  * Decides whether the newly connected `c.peer` gets promoted to a
  * persistent peer: it does when we have a free connection slot
  * (`connectedPeerCount < maxConnectedPeers`) or when it supports compact
  * filters and an existing non-compact-filter peer can be evicted
  * (`availableFilterSlot`). Otherwise the state is returned unchanged.
  *
  * Crucially, the `NoPeers` branch drains `cachedOutboundMessages` to the
  * new peer before kicking off header sync — this is the fix for the bug
  * where cached outbound messages were dropped on the floor.
  *
  * @param c the connect event carrying the peer that just connected
  * @param runningState the node state at the time of the event
  * @return the (possibly updated) running state after promotion/sync starts
  */
private def handleConnectPeer(
c: ConnectPeer,
runningState: NodeRunningState): Future[NodeRunningState] = {
val peer = c.peer
// require(
// runningState.isDisconnected(peer),
// s"Cannot call handleConnectPeer() with a peer already connected! peer=$peer")
// Does the incoming peer advertise compact-filter support?
val hasCf = runningState.peerFinder
.getPeerData(peer)
.exists(_.peerWithServicesOpt.exists(_.services.nodeCompactFilters))
// Currently-connected peers that do NOT support compact filters;
// candidates for eviction if we want to make room for a CF peer.
val notCfPeers = runningState.peerDataMap
.filter(p => !p._2.serviceIdentifier.nodeCompactFilters)
.keys
val availableFilterSlot = hasCf && notCfPeers.nonEmpty
val hasConnectionSlot =
runningState.connectedPeerCount < nodeAppConfig.maxConnectedPeers
val newStateF: Future[NodeRunningState] = {
if (hasConnectionSlot || availableFilterSlot) {
val addPeerF: Future[NodeRunningState] = runningState match {
case s: SyncNodeState =>
val add = s.addPeer(peer)
if (add.isQueryTimedOut(nodeAppConfig.queryWaitTime)) {
// we don't want to re-request from our peer
// unless the query is timed out. This can lead to
// duplicate header requests.
// see: https://github.com/bitcoin-s/bitcoin-s/issues/5665
syncHelper(add).map(_.getOrElse(s.toDoneSyncing))
} else {
Future.successful(add)
}
case d: DoneSyncing =>
// Not syncing yet: add the peer and start a header sync with it.
val dAdd = d.addPeer(peer)
val h = dAdd.toHeaderSync(peer)
syncHelper(h).map(_.getOrElse(dAdd))
case n: NoPeers =>
// send cached messages
// Pop the peer's data out of the finder cache, promoting an
// attempt-connection to persistent data if needed.
// NOTE(review): `.get` assumes the peer is present in the
// finder cache — TODO confirm this invariant always holds here.
val peerData = n.peerFinder.popFromCache(peer).get match {
case p: PersistentPeerData => p
case a: AttemptToConnectPeerData => a.toPersistentPeerData
}
val peerWithSvcs = peerData.peerWithServicesOpt.get
val map = Vector((peerWithSvcs, peerData)).toMap
val d = DoneSyncing(map, n.waitingForDisconnection, n.peerFinder)
logger.debug(
s"Sending ${n.cachedOutboundMessages.length} cached message to peer=$peer")
// Flush every message cached while we had no peers, then sync.
val sendMsgsF = Future.traverse(n.cachedOutboundMessages)(m =>
peerData.peerMessageSender.sendMsg(m.payload))
val h = d.toHeaderSync(peer)
sendMsgsF
.flatMap(_ => syncHelper(h).map(_.getOrElse(d)))
case x @ (_: MisbehavingPeer | _: RemovePeers |
_: NodeShuttingDown) =>
// Terminal/transitional states: leave unchanged.
Future.successful(x)
}
addPeerF.flatMap { addPeer =>
// If we only got in via the filter slot, evict one non-CF peer.
if (availableFilterSlot && notCfPeers.nonEmpty) {
disconnectPeer(notCfPeers.head).map(_ => addPeer)
} else {
Future.successful(addPeer)
}
}
} else {
// No slot available and no filter-slot swap possible: no-op.
Future.successful(runningState)
}
}
newStateF.map { newState =>
// Mirror the promoted peer's data into the mutable peer map so other
// components observing _peerDataMap see the new persistent peer.
newState.peerDataMap.get(peer).foreach { persistentPeerData =>
_peerDataMap.put(peer, persistentPeerData)
}
logger.info(
s"Connected to peer $peer with compact filter support=$hasCf. Connected peer count ${runningState.peerDataMap.size} state=$newState"
)
newState
}
}
}
case class ResponseTimeout(payload: NetworkPayload)