Add NodeState.NoPeers, cache network messages (#5800)

* Add NodeState.NoPeers, cache network messages that need to be sent when we establish a connection with a peer

* Cleanup

* Bump maxTries to avoid spurious timeout

* Empty commit to re-run CI
This commit is contained in:
Chris Stewart 2024-12-09 13:30:13 -06:00 committed by GitHub
parent 4df0a7bc4e
commit eb6edab240
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 178 additions and 77 deletions

View file

@ -1,7 +1,7 @@
package org.bitcoins.node
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.core.currency._
import org.bitcoins.core.currency.*
import org.bitcoins.core.protocol.script.MultiSignatureScriptPubKey
import org.bitcoins.core.protocol.transaction.TransactionOutput
import org.bitcoins.core.wallet.fee.SatoshisPerByte
@ -23,6 +23,7 @@ import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
class NeutrinoNodeWithWalletTest extends NodeTestWithCachedBitcoindNewest {
val DEFAULT_ADDR_BATCH_SIZE = 7
/** Wallet config with data directory set to user temp directory */
override protected def getFreshConfig: BitcoinSAppConfig =
@ -220,7 +221,7 @@ class NeutrinoNodeWithWalletTest extends NodeTestWithCachedBitcoindNewest {
rescan <- wallet.isRescanning()
_ = assert(!rescan)
rescanState <- wallet.rescanHandling
.fullRescanNeutrinoWallet(addressBatchSize = 7)
.fullRescanNeutrinoWallet(addressBatchSize = DEFAULT_ADDR_BATCH_SIZE)
_ <- AsyncUtil.awaitConditionF(
() => condition(),
@ -303,4 +304,25 @@ class NeutrinoNodeWithWalletTest extends NodeTestWithCachedBitcoindNewest {
assert(balanceAfterSpend < initBalance)
}
}
it must "retry fetching a block if for some reason our block data source isn't available the first time" in {
param =>
val NeutrinoNodeFundedWalletBitcoind(node, wallet, bitcoind) = param
for {
initBalance <- wallet.getBalance()
_ = assert(initBalance != CurrencyUnits.zero)
peer1 <- NodeTestUtil.getBitcoindPeer(bitcoind)
_ <- node.peerManager.disconnectPeer(peer1)
_ <- NodeTestUtil.awaitConnectionCount(node, 0)
_ <- wallet.rescanHandling.fullRescanNeutrinoWallet(
DEFAULT_ADDR_BATCH_SIZE)
_ <- node.peerManager.connectPeer(peer1)
_ <- AsyncUtil.retryUntilSatisfiedF(() => {
wallet.getInfo().map(!_.rescan)
},
maxTries = 100)
balanceAfterRescan <- wallet.getBalance()
} yield assert(balanceAfterRescan == initBalance)
}
}

View file

@ -3,7 +3,6 @@ package org.bitcoins.node
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse
import NodeState.DoneSyncing
import org.apache.pekko.{Done, NotUsed}
import org.apache.pekko.actor.{ActorSystem, Cancellable}
import org.apache.pekko.stream.{
@ -106,11 +105,9 @@ case class NeutrinoNode(
queue = queue
)
val initState =
DoneSyncing(
peerWithServicesDataMap = Map.empty,
waitingForDisconnection = Set.empty,
peerFinder
)
NodeState.NoPeers(waitingForDisconnection = Set.empty,
peerFinder,
cachedOutboundMessages = Vector.empty)
val graph =
buildDataMessageStreamGraph(initState = initState, source = source)

View file

@ -1,7 +1,11 @@
package org.bitcoins.node
import org.bitcoins.core.api.node.{Peer, PeerWithServices}
import org.bitcoins.core.p2p.{CompactFilterMessage, ServiceIdentifier}
import org.bitcoins.core.p2p.{
CompactFilterMessage,
NetworkMessage,
ServiceIdentifier
}
import org.bitcoins.node.NodeState.{
DoneSyncing,
FilterHeaderSync,
@ -13,7 +17,7 @@ import org.bitcoins.node.NodeState.{
import org.bitcoins.node.networking.peer.{PeerConnection, PeerMessageSender}
import java.time.Instant
import java.time.temporal.{ChronoUnit}
import java.time.temporal.ChronoUnit
import scala.concurrent.duration.FiniteDuration
import scala.util.Random
@ -63,22 +67,31 @@ sealed trait NodeRunningState extends NodeState {
def replacePeers(
peerWithServicesDataMap: Map[PeerWithServices, PersistentPeerData]
): NodeRunningState = {
this match {
case h: NodeState.HeaderSync =>
h.copy(peerWithServicesDataMap = peerWithServicesDataMap)
case fh: NodeState.FilterHeaderSync =>
fh.copy(peerWithServicesDataMap = peerWithServicesDataMap)
case fs: NodeState.FilterSync =>
fs.copy(peerWithServicesDataMap = peerWithServicesDataMap)
case d: NodeState.DoneSyncing =>
d.copy(peerWithServicesDataMap = peerWithServicesDataMap)
case rm: NodeState.RemovePeers =>
rm.copy(peerWithServicesDataMap = peerWithServicesDataMap)
case m: NodeState.MisbehavingPeer =>
m.copy(peerWithServicesDataMap = peerWithServicesDataMap)
case s: NodeState.NodeShuttingDown =>
s.copy(peerWithServicesDataMap = peerWithServicesDataMap)
if (peerWithServicesDataMap.isEmpty) {
NodeState.NoPeers(waitingForDisconnection, peerFinder, Vector.empty)
} else {
this match {
case h: NodeState.HeaderSync =>
h.copy(peerWithServicesDataMap = peerWithServicesDataMap)
case fh: NodeState.FilterHeaderSync =>
fh.copy(peerWithServicesDataMap = peerWithServicesDataMap)
case fs: NodeState.FilterSync =>
fs.copy(peerWithServicesDataMap = peerWithServicesDataMap)
case d: NodeState.DoneSyncing =>
d.copy(peerWithServicesDataMap = peerWithServicesDataMap)
case rm: NodeState.RemovePeers =>
rm.copy(peerWithServicesDataMap = peerWithServicesDataMap)
case m: NodeState.MisbehavingPeer =>
m.copy(peerWithServicesDataMap = peerWithServicesDataMap)
case s: NodeState.NodeShuttingDown =>
s.copy(peerWithServicesDataMap = peerWithServicesDataMap)
case n: NodeState.NoPeers =>
DoneSyncing(peerWithServicesDataMap,
n.waitingForDisconnection,
n.peerFinder)
}
}
}
def addPeer(peer: Peer): NodeRunningState = {
@ -112,7 +125,7 @@ sealed trait NodeRunningState extends NodeState {
case Some(newSyncNodeState) =>
newSyncNodeState.replacePeers(filtered)
case None =>
toDoneSyncing.replacePeers(filtered)
sync.toDoneSyncing.replacePeers(filtered)
}
} else {
sync.replacePeers(filtered)
@ -120,6 +133,8 @@ sealed trait NodeRunningState extends NodeState {
case x @ (_: DoneSyncing | _: MisbehavingPeer | _: RemovePeers |
_: NodeShuttingDown) =>
x.replacePeers(filtered)
case n: NodeState.NoPeers =>
sys.error(s"Cannot remove peer=$peer when we have no peers! $n")
}
}
@ -141,6 +156,8 @@ sealed trait NodeRunningState extends NodeState {
m.copy(waitingForDisconnection = newWaitingForDisconnection)
case s: NodeState.NodeShuttingDown =>
s.copy(waitingForDisconnection = newWaitingForDisconnection)
case n: NodeState.NoPeers =>
n.copy(waitingForDisconnection = newWaitingForDisconnection)
}
}
@ -179,10 +196,6 @@ sealed trait NodeRunningState extends NodeState {
def isDisconnected(peer: Peer): Boolean = !isConnected(peer)
def toDoneSyncing: DoneSyncing = {
DoneSyncing(peerWithServicesDataMap, waitingForDisconnection, peerFinder)
}
override def toString: String = {
s"${getClass.getSimpleName}(peers=${peers},waitingForDisconnection=${waitingForDisconnection})"
}
@ -245,6 +258,10 @@ sealed abstract class SyncNodeState extends NodeRunningState {
)
}
def toDoneSyncing: DoneSyncing = {
DoneSyncing(peerWithServicesDataMap, waitingForDisconnection, peerFinder)
}
/** The time when we sent the last query */
def sentQuery: Instant
@ -323,6 +340,11 @@ object NodeState {
peersToRemove.forall(rm => peers.exists(_ == rm)),
s"peersToRemove must be subset of peers, peersToRemove=$peersToRemove peers=$peers"
)
/** Means we have no good peers are removing these peers */
def isDisconnected: Boolean = {
peerWithServicesDataMap.keys.toSet == peersToRemove.toSet
}
}
/** State to indicate we are not currently syncing with a peer */
@ -331,6 +353,9 @@ object NodeState {
waitingForDisconnection: Set[Peer],
peerFinder: PeerFinder
) extends NodeRunningState {
require(
peerWithServicesDataMap.nonEmpty,
s"Cannot have 0 peers, use NoPeers to represent state where we have 0 peers")
override val isSyncing: Boolean = false
/** Selects a random peer and returns us a header sync state returns None if
@ -378,4 +403,27 @@ object NodeState {
override val isSyncing: Boolean = false
}
case class NoPeers(
waitingForDisconnection: Set[Peer],
peerFinder: PeerFinder,
cachedOutboundMessages: Vector[NetworkMessage])
extends NodeRunningState {
override val isSyncing: Boolean = false
override val peerWithServicesDataMap: Map[PeerWithServices, Nothing] =
Map.empty
def toDoneSyncing(
map: Map[PeerWithServices, PersistentPeerData]): DoneSyncing = {
require(
map.nonEmpty,
s"Cannot convert NoPeers -> DoneSyncing with no peers to connected")
require(
cachedOutboundMessages.isEmpty,
s"Cannot drop messages that are cached, length=${cachedOutboundMessages.length}")
DoneSyncing(peerWithServicesDataMap = map,
waitingForDisconnection = waitingForDisconnection,
peerFinder = peerFinder)
}
}
}

View file

@ -326,6 +326,10 @@ case class PeerManager(
case x @ (_: RemovePeers | _: MisbehavingPeer |
_: NodeShuttingDown) =>
Future.successful(x)
case n: NoPeers =>
val exn = new RuntimeException(
s"Inconsistent state, cannot have no peers and re-connect to peer state=$n")
Future.failed(exn)
}
} else {
logger.warn(s"onInitialization called for unknown $peer")
@ -417,10 +421,14 @@ case class PeerManager(
case x @ (_: DoneSyncing | _: NodeShuttingDown | _: MisbehavingPeer |
_: RemovePeers) =>
Future.successful(x)
case n: NoPeers =>
val exn = new RuntimeException(
s"Inconsistent state, cannot have 0 peers and be disconnecting, state=$n")
Future.failed(exn)
}
} else {
// no new peers to try to sync from, transition to done syncing?
val done = state.removePeer(disconnectedPeer).toDoneSyncing
val done = state.removePeer(disconnectedPeer)
if (forceReconnect && !isShuttingDown) {
finder.reconnect(disconnectedPeer).map(_ => done)
} else if (!isShuttingDown) {
@ -451,7 +459,7 @@ case class PeerManager(
// if we are removing this peer and an existing query timed out because of that
// peerData will not have this peer
state.getPeerData(peer).map(_.updateLastFailureTime())
state.getPeerData(peer).foreach(_.updateLastFailureTime())
payload match {
case _: GetHeadersMessage =>
@ -462,8 +470,10 @@ case class PeerManager(
syncFromNewPeer(syncState)
.map(_ => ())
case s @ (_: DoneSyncing | _: MisbehavingPeer | _: RemovePeers |
_: NodeShuttingDown) =>
sys.error(s"Cannot have state=$s and have a query timeout")
_: NodeShuttingDown | _: NoPeers) =>
val exn = new RuntimeException(
s"Cannot have state=$s and have a query timeout")
Future.failed(exn)
}
}
@ -489,7 +499,7 @@ case class PeerManager(
syncFromNewPeer(x)
case _: FilterHeaderSync | _: FilterSync | _: RemovePeers |
_: NodeShuttingDown =>
_: NodeShuttingDown | _: NoPeers =>
Future.successful(Some(state))
}
}
@ -535,7 +545,7 @@ case class PeerManager(
s"Ignoring sync request for peer=${p} as its waiting for disconnection"
)
Future.successful(Some(s))
case x @ (_: MisbehavingPeer | _: RemovePeers) =>
case x @ (_: MisbehavingPeer | _: RemovePeers | _: NoPeers) =>
logger.warn(
s"Ignoring sync request for peer=${p} while we are in state=$x"
)
@ -552,7 +562,7 @@ case class PeerManager(
case None =>
state match {
case x @ (_: SyncNodeState | _: MisbehavingPeer | _: RemovePeers |
_: NodeShuttingDown) =>
_: NodeShuttingDown | _: NoPeers) =>
// we are either syncing already, or we are in a bad state to start a sync
Future.successful(Some(x))
case d: DoneSyncing =>
@ -593,8 +603,7 @@ case class PeerManager(
} else {
val hasCf = runningState
.getPeerServices(peer)
.map(_.nodeCompactFilters)
.getOrElse(false)
.exists(_.nodeCompactFilters)
logger.info(
s"Connected to peer $peer with compact filter support=$hasCf. Connected peer count ${runningState.peerDataMap.size} state=$state"
)
@ -612,6 +621,17 @@ case class PeerManager(
case d: DoneSyncing =>
val x = d.toHeaderSync(c.peer)
syncHelper(x).map(_.getOrElse(d))
case n: NoPeers =>
val peerData = peerDataMap(c.peer)
// send cached messages
logger.debug(
s"Sending ${n.cachedOutboundMessages.length} cached messages")
val sendMsgsF = Future.traverse(n.cachedOutboundMessages)(m =>
peerData.peerMessageSender.sendMsg(m.payload))
val peerWithSvcs = peerData.peerWithServicesOpt.get
val map = Vector((peerWithSvcs, peerData)).toMap
val d = n.toDoneSyncing(map)
sendMsgsF.map(_ => d)
case x @ (_: MisbehavingPeer | _: RemovePeers |
_: NodeShuttingDown) =>
Future.successful(x)
@ -667,9 +687,7 @@ case class PeerManager(
logger.debug(s"Got ${payload.commandName} from peer=${peer} in stream")
state match {
case runningState: NodeRunningState =>
val peerDataOpt = runningState.peerDataMap
.find(_._1 == peer)
.map(_._2)
val peerDataOpt = runningState.peerDataMap.get(peer)
peerDataOpt match {
case None =>
logger.debug(
@ -701,7 +719,7 @@ case class PeerManager(
)
} yield newDmh.state
case x @ (_: SyncNodeState | _: DoneSyncing |
_: NodeShuttingDown) =>
_: NodeShuttingDown | _: NoPeers) =>
Future.successful(x)
}
}
@ -890,32 +908,38 @@ case class PeerManager(
state: NodeRunningState,
stp: SendToPeer
): Future[NodeRunningState] = {
val peerMsgSenderOpt = stp.peerOpt match {
val nodeStateF: Future[NodeRunningState] = stp.peerOpt match {
case Some(p) =>
state.getPeerMsgSender(p)
case None =>
state match {
case s: SyncNodeState => Some(s.syncPeerMessageSender)
case x @ (_: DoneSyncing | _: MisbehavingPeer | _: NodeShuttingDown |
_: RemovePeers) =>
x.randomPeerMessageSender(
Set.empty,
ServiceIdentifier.NODE_COMPACT_FILTERS
)
}
}
peerMsgSenderOpt match {
case Some(pms) =>
pms
state
.getPeerMsgSender(p)
.get
.sendMsg(stp.msg.payload)
.map(_ => state)
case None =>
logger.warn(
s"Unable to find peer to send message=${stp.msg.payload} to, state=$state"
)
Future.successful(state)
state match {
case s: SyncNodeState =>
s.syncPeerMessageSender
.sendMsg(stp.msg.payload)
.map(_ => s)
case x @ (_: DoneSyncing | _: MisbehavingPeer | _: NodeShuttingDown |
_: RemovePeers) =>
val pms = x
.randomPeerMessageSender(
Set.empty,
ServiceIdentifier.NODE_COMPACT_FILTERS
)
.get
pms
.sendMsg(stp.msg.payload)
.map(_ => state)
case n: NoPeers =>
val addMsg = n.copy(cachedOutboundMessages =
n.cachedOutboundMessages.appended(stp.msg))
Future.successful(addMsg)
}
}
nodeStateF
}
private def switchSyncToPeer(
@ -1141,7 +1165,7 @@ case class PeerManager(
rm.randomPeer(excludePeers = rm.peersToRemove.toSet, svcIdentifier)
case d: DoneSyncing =>
d.randomPeer(Set.empty, svcIdentifier)
case _: NodeShuttingDown => None
case _: NodeShuttingDown | _: NoPeers => None
}
val newStateOptF: Future[Option[NodeRunningState]] = for {
newStateOpt <- syncPeerOpt match {
@ -1154,7 +1178,7 @@ case class PeerManager(
val hs = d.toHeaderSync(syncPeer)
syncHelper(hs)
case x @ (_: MisbehavingPeer | _: RemovePeers |
_: NodeShuttingDown) =>
_: NodeShuttingDown | _: NoPeers) =>
Future.successful(Some(x))
}
case None => Future.successful(None)

View file

@ -74,7 +74,7 @@ case class DataMessageHandler(
// see: https://github.com/bitcoin-s/bitcoin-s/issues/5429
logger.info(
s"Query timed out with in state=$state, received payload=${payload.commandName}")
copy(state = state.toDoneSyncing)
copy(state = syncState.toDoneSyncing)
} else {
this
}
@ -127,14 +127,21 @@ case class DataMessageHandler(
s"Cannot continue processing p2p messages from peer we were suppose to remove, peer=${peerData.peer}"
)
)
} else if (r.isDisconnected) {
val n = NoPeers(waitingForDisconnection = r.waitingForDisconnection,
peerFinder = r.peerFinder,
cachedOutboundMessages = Vector.empty)
copy(state = n).handleDataPayload(payload, peerData)
} else {
val d = r.toDoneSyncing
val d = DoneSyncing(r.peerWithServicesDataMap,
r.waitingForDisconnection,
r.peerFinder)
copy(state = d).handleDataPayload(payload, peerData)
}
case _: NodeShuttingDown =>
case s @ (_: NodeShuttingDown | _: NoPeers) =>
logger.warn(
s"Ignoring message ${payload.commandName} from peer=${peerData.peer} in state=$state because we are shuttingdown."
s"Ignoring message ${payload.commandName} from peer=${peerData.peer} in state=$s."
)
Future.successful(this)
@ -198,7 +205,7 @@ case class DataMessageHandler(
s"Ignoring filterheaders msg with size=${filterHeader.filterHashes.size} while in state=$x from peer=$peer"
)
Future.successful(copy(state = x))
case x @ (_: MisbehavingPeer | _: RemovePeers) =>
case x @ (_: MisbehavingPeer | _: RemovePeers | _: NoPeers) =>
sys.error(
s"Incorrect state for handling filter header messages, got=$x"
)
@ -236,7 +243,8 @@ case class DataMessageHandler(
s"Ignoring filter msg with blockHash=${filter.blockHashBE} while in state=$x from peer=$peer"
)
Future.successful(copy(state = x))
case x @ (_: MisbehavingPeer | _: RemovePeers | _: HeaderSync) =>
case x @ (_: MisbehavingPeer | _: RemovePeers | _: HeaderSync |
_: NoPeers) =>
sys.error(s"Incorrect state for handling filter messages, got=$x")
}
case notHandling @ (MemPoolMessage | _: GetHeadersMessage |
@ -448,14 +456,15 @@ case class DataMessageHandler(
)
val _ = peerManager.gossipGetHeadersMessage(cachedHeaders)
// switch to DoneSyncing state until we receive a valid header from our peers
val d = state.toDoneSyncing
d
DoneSyncing(state.peerWithServicesDataMap,
state.waitingForDisconnection,
state.peerFinder)
}
} yield newState
}
case _: FilterHeaderSync | _: FilterSync | _: NodeShuttingDown =>
Future.successful(state)
case m @ (_: MisbehavingPeer | _: RemovePeers) =>
case m @ (_: MisbehavingPeer | _: RemovePeers | _: NoPeers) =>
val exn = new RuntimeException(
s"Cannot recover invalid headers, got=$m"
)
@ -779,7 +788,7 @@ case class DataMessageHandler(
s"Ignoring block headers msg with size=${headers.size} while in state=$x from peer=$peer"
)
Some(x)
case x @ (_: MisbehavingPeer | _: RemovePeers) =>
case x @ (_: MisbehavingPeer | _: RemovePeers | _: NoPeers) =>
sys.error(s"Invalid state to receive headers in, got=$x")
}
newStateOpt match {
@ -798,7 +807,8 @@ case class DataMessageHandler(
case Some(x @ (_: FilterHeaderSync | _: FilterSync)) =>
Future.successful(copy(state = x))
case Some(
x @ (_: MisbehavingPeer | _: RemovePeers | _: NodeShuttingDown)
x @ (_: MisbehavingPeer | _: RemovePeers | _: NodeShuttingDown |
_: NoPeers)
) =>
Future.successful(copy(state = x))
case None =>