Rework randomPeerWithServices() to be inside of NodeState (#5331)

* Rework randomPeerWithServices() to be inside of NodeState, try to move more state into NodeState

* Don't try to sync from peer waitingForDisconnection

* Add PeerWithServices, keep track of ServiceIdentifier in PeerManager and NodeState

* Try to make reorg test more reliable

* Empty commit to run CI
This commit is contained in:
Chris Stewart 2023-12-29 14:47:29 -06:00 committed by GitHub
parent f1775c46d3
commit 06dfd9cea4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 261 additions and 122 deletions

View File

@ -118,7 +118,8 @@ class ChainHandler(
headersWeAlreadyHave.exists(_.hashBE == h.hashBE))
if (filteredHeaders.isEmpty) {
return Future.failed(DuplicateHeaders(s"Received duplicate headers."))
return Future.failed(
DuplicateHeaders(s"Received duplicate block headers."))
}
val blockchainUpdates: Vector[BlockchainUpdate] = {

View File

@ -1,23 +1,34 @@
package org.bitcoins.core.api.node
import org.bitcoins.core.p2p.CompactFilterMessage
import org.bitcoins.core.p2p.{CompactFilterMessage, ServiceIdentifier}
import scala.util.Random
sealed abstract class NodeState {
def isSyncing: Boolean
def peersWithServices: Set[PeerWithServices]
/** All peers the node is currently connected to */
def peers: Set[Peer]
def peers: Set[Peer] = peersWithServices.map(_.peer)
def waitingForDisconnection: Set[Peer]
def replacePeers(newPeers: Set[Peer]): NodeState = this match {
case h: NodeState.HeaderSync => h.copy(peers = newPeers)
case fh: NodeState.FilterHeaderSync => fh.copy(peers = newPeers)
case fs: NodeState.FilterSync => fs.copy(peers = newPeers)
case d: NodeState.DoneSyncing => d.copy(peers = newPeers)
case rm: NodeState.RemovePeers => rm.copy(peers = newPeers)
case m: NodeState.MisbehavingPeer => m.copy(peers = newPeers)
}
def replacePeers(peerWithServices: Set[PeerWithServices]): NodeState =
this match {
case h: NodeState.HeaderSync =>
h.copy(peersWithServices = peerWithServices)
case fh: NodeState.FilterHeaderSync =>
fh.copy(peersWithServices = peerWithServices)
case fs: NodeState.FilterSync =>
fs.copy(peersWithServices = peerWithServices)
case d: NodeState.DoneSyncing =>
d.copy(peersWithServices = peerWithServices)
case rm: NodeState.RemovePeers =>
rm.copy(peersWithServices = peerWithServices)
case m: NodeState.MisbehavingPeer =>
m.copy(peersWithServices = peerWithServices)
}
def replaceWaitingForDisconnection(
newWaitingForDisconnection: Set[Peer]): NodeState = {
@ -37,6 +48,24 @@ sealed abstract class NodeState {
}
}
def randomPeer(
excludePeers: Set[Peer],
services: ServiceIdentifier): Option[Peer] = {
val filteredPeers =
peersWithServices
.filterNot(p => excludePeers.exists(_ == p.peer))
//don't give out a peer that we are waiting to disconnect from
.filterNot(p => waitingForDisconnection.exists(_ == p.peer))
.filter(p => p.services.hasServicesOf(services))
.toVector
val peerOpt = if (filteredPeers.nonEmpty) {
Some(filteredPeers(Random.nextInt(filteredPeers.length)))
} else {
None
}
peerOpt.map(_.peer)
}
}
/** State to indicate that we are syncing the blockchain */
@ -61,19 +90,19 @@ object NodeState {
case class HeaderSync(
syncPeer: Peer,
peers: Set[Peer],
peersWithServices: Set[PeerWithServices],
waitingForDisconnection: Set[Peer])
extends SyncNodeState
case class FilterHeaderSync(
syncPeer: Peer,
peers: Set[Peer],
peersWithServices: Set[PeerWithServices],
waitingForDisconnection: Set[Peer])
extends SyncNodeState
case class FilterSync(
syncPeer: Peer,
peers: Set[Peer],
peersWithServices: Set[PeerWithServices],
waitingForDisconnection: Set[Peer],
filterBatchCache: Set[CompactFilterMessage])
extends SyncNodeState {
@ -85,7 +114,7 @@ object NodeState {
case class MisbehavingPeer(
badPeer: Peer,
peers: Set[Peer],
peersWithServices: Set[PeerWithServices],
waitingForDisconnection: Set[Peer])
extends NodeState {
if (peers.nonEmpty) {
@ -100,7 +129,7 @@ object NodeState {
case class RemovePeers(
peersToRemove: Vector[Peer],
peers: Set[Peer],
peersWithServices: Set[PeerWithServices],
waitingForDisconnection: Set[Peer],
isSyncing: Boolean)
extends NodeState {
@ -110,7 +139,9 @@ object NodeState {
}
/** State to indicate we are not currently syncing with a peer */
case class DoneSyncing(peers: Set[Peer], waitingForDisconnection: Set[Peer])
case class DoneSyncing(
peersWithServices: Set[PeerWithServices],
waitingForDisconnection: Set[Peer])
extends NodeState {
override val isSyncing: Boolean = false
}

View File

@ -0,0 +1,12 @@
package org.bitcoins.core.api.node
import org.bitcoins.core.api.tor.Socks5ProxyParams
import org.bitcoins.core.p2p.ServiceIdentifier
import java.net.InetSocketAddress
case class PeerWithServices(peer: Peer, services: ServiceIdentifier) {
val id: Option[Long] = peer.id
val socket: InetSocketAddress = peer.socket
val socks5ProxyParams: Option[Socks5ProxyParams] = peer.socks5ProxyParams
}

View File

@ -85,7 +85,7 @@ sealed abstract class ServiceIdentifier extends NetworkElement {
val innerText =
if (nodeNone) "none"
else
s"network=$nodeNetwork, compactFilters=$nodeCompactFilters, getUtxo=$nodeGetUtxo, bloom=$nodeBloom, witness=$nodeWitness, xthin=$nodeXthin, networkLimited=$nodeNetworkLimited"
s"network=$nodeNetwork,compactFilters=$nodeCompactFilters,getUtxo=$nodeGetUtxo,bloom=$nodeBloom,witness=$nodeWitness,xthin=$nodeXthin,networkLimited=$nodeNetworkLimited"
s"ServiceIdentifier($innerText)"
}

View File

@ -396,8 +396,10 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
bestBlockHash0 <- bitcoind0.getBestBlockHash()
_ <- bitcoind0.invalidateBlock(bestBlockHash0)
//now generate a block, make sure we sync with them
_ <- bitcoind0.generate(1)
_ <- AsyncUtil.nonBlockingSleep(1.second)
hashes <- bitcoind0.generate(1)
chainApi <- node.chainApiFromDb()
_ <- AsyncUtil.retryUntilSatisfiedF(() =>
chainApi.getHeader(hashes.head).map(_.isDefined))
//generate another block to make sure the reorg is complete
_ <- bitcoind0.generate(1)
_ <- NodeTestUtil.awaitAllSync(node, bitcoind0)

View File

@ -227,7 +227,12 @@ class NeutrinoNodeWithWalletTest extends NodeTestWithCachedBitcoindNewest {
val bitcoindAddrF = bitcoind.getNewAddress
val sendAmt = Bitcoins.one
//stop the node to take us offline
val stopF = node.stop()
val stopF = {
for {
_ <- NodeTestUtil.awaitAllSync(node, bitcoind)
n <- node.stop()
} yield n
}
for {
initBalance <- initBalanceF
receiveAddr <- receivedAddrF

View File

@ -58,7 +58,7 @@ class DataMessageHandlerTest extends NodeTestWithCachedBitcoindNewest {
walletCreationTimeOpt = None,
peerMessageSenderApi = peerMsgSender,
peerManager = peerManager,
state = HeaderSync(peer, peerManager.peers, Set.empty)
state = HeaderSync(peer, peerManager.peersWithServices, Set.empty)
)(node.executionContext, node.nodeAppConfig, node.chainConfig)
// Use signet genesis block header, this should be invalid for regtest

View File

@ -1,13 +1,6 @@
package org.bitcoins.node
import akka.{Done, NotUsed}
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.{
ActorAttributes,
OverflowStrategy,
QueueOfferResult,
Supervision
}
import akka.stream.scaladsl.{
Keep,
RunnableGraph,
@ -15,20 +8,26 @@ import akka.stream.scaladsl.{
SourceQueue,
SourceQueueWithComplete
}
import akka.stream.{
ActorAttributes,
OverflowStrategy,
QueueOfferResult,
Supervision
}
import akka.{Done, NotUsed}
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse
import org.bitcoins.core.api.node.NodeState.DoneSyncing
import org.bitcoins.core.api.node.{NodeState, NodeType, Peer}
import org.bitcoins.core.config.{MainNet, RegTest, SigNet, TestNet3}
import org.bitcoins.core.p2p.ServiceIdentifier
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.node.config.NodeAppConfig
import java.time.Instant
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
case class NeutrinoNode(
walletCreationTimeOpt: Option[Instant],
@ -95,7 +94,8 @@ case class NeutrinoNode(
override def start(): Future[NeutrinoNode] = {
isStarted.set(true)
val initState =
DoneSyncing(peers = Set.empty, waitingForDisconnection = Set.empty)
DoneSyncing(peersWithServices = Set.empty,
waitingForDisconnection = Set.empty)
val (queue, source) =
dataMessageStreamSource.preMaterialize()
@ -157,12 +157,11 @@ case class NeutrinoNode(
* @return
*/
override def sync(): Future[Unit] = {
val serviceIdentifier = ServiceIdentifier.NODE_COMPACT_FILTERS
//wait for a peer to be available to sync from...
//due to underlying mutability in PeerManager/PeerFinder
//we may not have a peer available for selection immediately
val peerAvailableF = AsyncUtil.retryUntilSatisfied(
peerManager.randomPeerWithService(serviceIdentifier).isDefined)
val peerAvailableF =
AsyncUtil.retryUntilSatisfiedF(() => getConnectionCount.map(_ > 0))
for {
_ <- peerAvailableF
_ <- peerManager.sync(None)

View File

@ -3,7 +3,7 @@ package org.bitcoins.node
import akka.Done
import akka.actor.ActorSystem
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.node.Peer
import org.bitcoins.core.api.node.{Peer, PeerWithServices}
import org.bitcoins.core.p2p.ServiceIdentifier
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.networking.peer._
@ -21,6 +21,10 @@ sealed trait PeerData {
implicit protected def system: ActorSystem
def peer: Peer
def peerWithServicesOpt: Option[PeerWithServices] = {
_serviceIdentifier.map(PeerWithServices(peer, _))
}
def peerMessageSender: PeerMessageSender
def stop(): Future[Done] = {

View File

@ -64,6 +64,10 @@ case class PeerManager(
override def peers: Set[Peer] = _peerDataMap.keys.toSet
def peersWithServices: Set[PeerWithServices] = {
peerDataMap.map(_._2.peerWithServicesOpt).flatten.toSet
}
/** Starts sync compact filter headers.
* Only starts syncing compact filters if our compact filter headers are in sync with block headers
*/
@ -121,7 +125,18 @@ case class PeerManager(
private def randomPeerConnection(
services: ServiceIdentifier): Option[PeerConnection] = {
val peerOpt = randomPeerWithService(services)
val potentialPeers =
peerDataMap.filter(_._2.serviceIdentifier.hasServicesOf(services))
val peerOpt = {
if (potentialPeers.nonEmpty) {
val randIdx = Random.nextInt(potentialPeers.size)
Some(potentialPeers.keys.toVector(randIdx))
} else {
None
}
}
val peerConnectionOpt = peerOpt.flatMap(getPeerConnection(_))
peerConnectionOpt
}
@ -131,23 +146,6 @@ case class PeerManager(
randomPeerOpt.map(PeerMessageSender(_))
}
def randomPeerWithService(services: ServiceIdentifier): Option[Peer] = {
val filteredPeers =
peerDataMap
.filter(p => p._2.serviceIdentifier.hasServicesOf(services))
.keys
.toVector
val (good, _) =
filteredPeers.partition(p => !peerDataMap(p).hasFailedRecently)
val peerOpt = if (good.nonEmpty) {
Some(good(Random.nextInt(good.length)))
} else {
None
}
peerOpt
}
private def createInDb(
peer: Peer,
serviceIdentifier: ServiceIdentifier): Future[PeerDb] = {
@ -274,7 +272,7 @@ case class PeerManager(
//if we have slots remaining, connect
if (connectedPeerCount < nodeAppConfig.maxConnectedPeers) {
connectPeer(peer)
.flatMap(_ => syncHelper(Some(peer)))
.flatMap(_ => syncHelper(peer))
} else {
val notCf = peerDataMap
.filter(p => !p._2.serviceIdentifier.nodeCompactFilters)
@ -283,7 +281,7 @@ case class PeerManager(
//try to drop another non compact filter connection for this
if (hasCf && notCf.nonEmpty)
replacePeer(replacePeer = notCf.head, withPeer = peer)
.flatMap(_ => syncHelper(Some(peer)))
.flatMap(_ => syncHelper(peer))
else {
peerData
.stop()
@ -332,14 +330,16 @@ case class PeerManager(
} else if (peerDataMap.contains(peer)) {
//one of the persistent peers initialized again, this can happen in case of a reconnection attempt
//which succeeded which is all good, do nothing
syncHelper(Some(peer)).map(_ => state)
syncHelper(peer).map(_ => state) //does this state need to be modified?
} else {
logger.warn(s"onInitialization called for unknown $peer")
Future.successful(state)
}
}
stateF.map(_.replacePeers(peers))
stateF.map { s =>
s.replacePeers(peersWithServices)
}
}
/** @param peer the peer we were disconnected from
@ -376,7 +376,8 @@ case class PeerManager(
(forceReconnect || connectedPeerCount == 0) && isStarted.get
if (peers.exists(_ != peer)) {
val randomPeerOpt =
randomPeerWithService(ServiceIdentifier.NODE_COMPACT_FILTERS)
state.randomPeer(excludePeers = Set(peer),
ServiceIdentifier.NODE_COMPACT_FILTERS)
randomPeerOpt match {
case Some(peer) =>
state match {
@ -384,7 +385,7 @@ case class PeerManager(
switchSyncToPeer(oldSyncState = syncState, newPeer = peer)
case d: DoneSyncing =>
//defensively try to sync with the new peer
syncHelper(Some(peer)).map(_ => d)
syncHelper(peer).map(_ => d)
case x @ (_: MisbehavingPeer | _: RemovePeers) =>
Future.successful(x)
}
@ -421,17 +422,18 @@ case class PeerManager(
case s: SyncNodeState =>
if (s.syncPeer == peer) {
//the peer being disconnected is our sync peer
randomPeerWithService(ServiceIdentifier.NODE_COMPACT_FILTERS) match {
s.randomPeer(excludePeers = Set(peer),
ServiceIdentifier.NODE_COMPACT_FILTERS) match {
case Some(p) => s.replaceSyncPeer(p)
case None =>
//switch to state DoneSyncing since we have no peers to sync from
DoneSyncing(peers, state.waitingForDisconnection)
DoneSyncing(peersWithServices, state.waitingForDisconnection)
}
} else {
s.replacePeers(peers)
s.replacePeers(peersWithServices)
}
case s @ (_: RemovePeers | _: MisbehavingPeer | _: DoneSyncing) =>
s.replacePeers(peers)
s.replacePeers(peersWithServices)
}
}
@ -451,28 +453,33 @@ case class PeerManager(
case _: GetHeadersMessage =>
queue.offer(HeaderTimeoutWrapper(peer)).map(_ => ())
case _ =>
val syncPeer = state match {
state match {
case syncState: SyncNodeState =>
syncState.syncPeer
syncFromNewPeer(syncState)
.map(_ => ())
case s @ (_: DoneSyncing | _: MisbehavingPeer | _: RemovePeers) =>
sys.error(s"Cannot have state=$s and have a query timeout")
}
if (peer == syncPeer)
syncFromNewPeer().map(_ => ())
else Future.unit
}
}
/** @param peer
* @param state
* @return a NodeState that contains the new peer we are syncing with, None if we couldn't find a new peer to sync with
*/
private def onHeaderRequestTimeout(
peer: Peer,
state: NodeState): Future[NodeState] = {
state: NodeState): Future[Option[NodeState]] = {
logger.info(s"Header request timed out from $peer in state $state")
state match {
case _: HeaderSync | _: MisbehavingPeer | _: DoneSyncing =>
syncFromNewPeer().map(_ => state)
case h: HeaderSync =>
syncFromNewPeer(h)
case x @ (_: MisbehavingPeer | _: DoneSyncing) =>
syncFromNewPeer(x)
case _: FilterHeaderSync | _: FilterSync | _: RemovePeers =>
Future.successful(state)
Future.successful(Some(state))
}
}
@ -506,7 +513,50 @@ case class PeerManager(
initState: NodeState): Sink[NodeStreamMessage, Future[NodeState]] = {
Sink.foldAsync(initState) {
case (state, s: StartSync) =>
syncHelper(s.peerOpt).map(_ => state)
val nodeStateOptF: Future[Option[NodeState]] = s.peerOpt match {
case Some(p) =>
state match {
case s: SyncNodeState if !s.waitingForDisconnection.contains(p) =>
switchSyncToPeer(s, p).map(Some(_))
case s: SyncNodeState =>
logger.warn(
s"Ignoring sync request for peer=${p} as its waiting for disconnection")
Future.successful(Some(s))
case x @ (_: MisbehavingPeer | _: RemovePeers) =>
logger.warn(
s"Ignoring sync request for peer=${p} while we are in state=$x")
Future.successful(Some(x)) //ignore sync request?
case d: DoneSyncing =>
val h =
HeaderSync(p, d.peersWithServices, d.waitingForDisconnection)
syncFromNewPeer(h)
}
case None =>
state match {
case x @ (_: SyncNodeState | _: MisbehavingPeer |
_: RemovePeers) =>
//we are either syncing already, or we are in a bad state to start a sync
Future.successful(Some(x))
case d: DoneSyncing =>
state.randomPeer(Set.empty,
ServiceIdentifier.NODE_COMPACT_FILTERS) match {
case Some(p) =>
val h = HeaderSync(p,
d.peersWithServices,
d.waitingForDisconnection)
syncFromNewPeer(h)
case None =>
Future.successful(None)
}
}
}
nodeStateOptF.map {
case Some(ns) => ns
case None =>
logger.warn(
s"Cannot find a new peer to fulfill sync request, reverting to old state=$state")
state
}
case (state, i: InitializeDisconnect) =>
val client: PeerData = peerDataMap(i.peer)
_peerDataMap.remove(i.peer)
@ -544,7 +594,7 @@ case class PeerManager(
//disconnect the misbehaving peer
for {
_ <- disconnectPeer(m.badPeer)
_ <- syncFromNewPeer()
_ <- syncFromNewPeer(m)
} yield newDmh
case removePeers: RemovePeers =>
for {
@ -590,13 +640,19 @@ case class PeerManager(
case (state, HeaderTimeoutWrapper(peer)) =>
logger.debug(s"Processing timeout header for $peer")
for {
newDmh <- {
onHeaderRequestTimeout(peer, state).map { s =>
logger.debug(s"Done processing timeout header for $peer")
s
newState <- {
onHeaderRequestTimeout(peer, state).map {
case Some(s) => s
case None =>
//we don't have a state to represent no connected peers atm, so switch to DoneSyncing?
DoneSyncing(peersWithServices = Set.empty,
state.waitingForDisconnection)
}
}
} yield newDmh
} yield {
logger.debug(s"Done processing timeout header for $peer")
newState
}
case (state, DisconnectedPeer(peer, forceReconnect)) =>
onDisconnect(peer, forceReconnect, state)
case (state, i: InitializationTimeout) =>
@ -645,7 +701,7 @@ case class PeerManager(
oldSyncState match {
case s: HeaderSync =>
if (s.syncPeer != newPeer) {
syncHelper(syncPeerOpt = Some(newPeer)).map(_ => newState)
syncHelper(newPeer).map(_ => newState)
} else {
//if its same peer we don't need to switch
Future.successful(oldSyncState)
@ -698,7 +754,7 @@ case class PeerManager(
Future.unit
} else {
val fhs = FilterHeaderSync(syncPeer = syncPeer,
peers = peers,
peersWithServices = peersWithServices,
waitingForDisconnection = Set.empty)
syncFilters(
bestFilterHeaderOpt = bestFilterHeaderOpt,
@ -725,16 +781,16 @@ case class PeerManager(
*
* @param syncPeerOpt if syncPeer is given, we send [[org.bitcoins.core.p2p.GetHeadersMessage]] to that peer. If None we gossip GetHeadersMessage to all peers
*/
private def syncHelper(syncPeerOpt: Option[Peer]): Future[Unit] = {
private def syncHelper(syncPeer: Peer): Future[Unit] = {
logger.debug(
s"syncHelper() syncPeerOpt=$syncPeerOpt isStarted.get=${isStarted.get} syncFilterCancellableOpt.isDefined=${syncFilterCancellableOpt.isDefined}")
s"syncHelper() syncPeer=$syncPeer isStarted.get=${isStarted.get} syncFilterCancellableOpt.isDefined=${syncFilterCancellableOpt.isDefined}")
val chainApi: ChainApi = ChainHandler.fromDatabase()
val headerF = chainApi.getBestBlockHeader()
val filterHeaderCountF = chainApi.getFilterHeaderCount()
val filterCountF = chainApi.getFilterCount()
for {
_ <- chainApi.setSyncing(true)
_ <- getHeaderSyncHelper(syncPeerOpt)
_ <- getHeaderSyncHelper(Some(syncPeer))
_ = {
if (isStarted.get) {
//in certain cases, we can schedule this job while the peer manager is attempting to shutdown
@ -750,16 +806,8 @@ case class PeerManager(
case s: Some[Cancellable] =>
s //do nothing as we already have a job scheduled
case None =>
syncPeerOpt match {
case Some(syncPeer) =>
val c = createFilterSyncJob(chainApi, syncPeer)
Some(c)
case None =>
//no sync peer to schedule the job with
logger.warn(
s"Unable to createFilterSyncJob as we have no sync peer!")
None
}
val c = createFilterSyncJob(chainApi, syncPeer)
Some(c)
}
}
}
@ -768,7 +816,7 @@ case class PeerManager(
filterCount <- filterCountF
} yield {
logger.info(
s"Starting sync node, height=${header.height} hash=${header.hashBE.hex} filterHeaderCount=$filterHeaderCount filterCount=$filterCount peerOpt=$syncPeerOpt")
s"Starting sync node, height=${header.height} hash=${header.hashBE.hex} filterHeaderCount=$filterHeaderCount filterCount=$filterCount syncPeer=$syncPeer")
}
}
@ -900,12 +948,38 @@ case class PeerManager(
}
}
private def syncFromNewPeer(): Future[Option[Peer]] = {
val syncPeerOpt = randomPeerWithService(
ServiceIdentifier.NODE_COMPACT_FILTERS)
private def syncFromNewPeer(state: NodeState): Future[Option[NodeState]] = {
val svcIdentifier = ServiceIdentifier.NODE_COMPACT_FILTERS
val syncPeerOpt = state match {
case s: SyncNodeState =>
s.randomPeer(excludePeers = Set(s.syncPeer), svcIdentifier)
case m: MisbehavingPeer =>
m.randomPeer(excludePeers = Set(m.badPeer), svcIdentifier)
case rm: RemovePeers =>
rm.randomPeer(excludePeers = rm.peersToRemove.toSet, svcIdentifier)
case d: DoneSyncing =>
d.randomPeer(Set.empty, svcIdentifier)
}
for {
_ <- syncHelper(syncPeerOpt)
} yield syncPeerOpt
newStateOpt <- syncPeerOpt match {
case Some(syncPeer) =>
syncHelper(syncPeer).map { _ =>
val newState = state match {
case s: SyncNodeState =>
s.replaceSyncPeer(syncPeer)
case d: DoneSyncing =>
HeaderSync(syncPeer,
d.peersWithServices,
d.waitingForDisconnection)
case x @ (_: MisbehavingPeer | _: RemovePeers) => x
}
Some(newState)
}
case None => Future.successful(None)
}
} yield {
newStateOpt
}
}
/** Gossips the given message to all peers except the excluded peer. If None given as excluded peer, gossip message to all peers */
@ -986,9 +1060,10 @@ object PeerManager extends Logging {
.sendGetCompactFilterHeadersMessage(filterSyncMarker)
.map(_ =>
Some(
FilterHeaderSync(state.syncPeer,
state.peers,
state.waitingForDisconnection)))
FilterHeaderSync(syncPeer = state.syncPeer,
peersWithServices = state.peersWithServices,
waitingForDisconnection =
state.waitingForDisconnection)))
case None =>
logger.info(
s"Filter headers are synced! filterHeader.blockHashBE=$blockHash")

View File

@ -91,7 +91,7 @@ case class DataMessageHandler(
case m: MisbehavingPeer =>
val badPeer = m.badPeer
val peers = m.peers
val peersWithSvcs = m.peersWithServices
if (badPeer == peerData.peer) {
Future.failed(
new RuntimeException(
@ -99,17 +99,17 @@ case class DataMessageHandler(
} else {
//re-review this, we should probably pattern match on old state so we can continue syncing
//from where we left off?
val d = DoneSyncing(peers, m.waitingForDisconnection)
val d = DoneSyncing(peersWithSvcs, m.waitingForDisconnection)
copy(state = d).handleDataPayload(payload, peerData)
}
case r: RemovePeers =>
val badPeers = r.peersToRemove
val peers = r.peers
val peersWithSvcs = r.peersWithServices
if (badPeers.exists(_ == peerData.peer)) {
Future.failed(new RuntimeException(
s"Cannot continue processing p2p messages from peer we were suppose to remove, peer=${peerData.peer}"))
} else {
val d = DoneSyncing(peers, r.waitingForDisconnection)
val d = DoneSyncing(peersWithSvcs, r.waitingForDisconnection)
copy(state = d).handleDataPayload(payload, peerData)
}
@ -141,7 +141,9 @@ case class DataMessageHandler(
s"Got ${filterHeader.filterHashes.size} compact filter header hashes, state=$state")
val filterHeaderSync = state match {
case s @ (_: HeaderSync | _: DoneSyncing) =>
FilterHeaderSync(peer, s.peers, s.waitingForDisconnection)
FilterHeaderSync(peer,
s.peersWithServices,
s.waitingForDisconnection)
case filterHeaderSync: FilterHeaderSync => filterHeaderSync
case x @ (_: FilterSync | _: MisbehavingPeer | _: RemovePeers) =>
sys.error(
@ -160,7 +162,10 @@ case class DataMessageHandler(
val filterSyncState = state match {
case f: FilterSync => f
case s @ (_: DoneSyncing | _: FilterHeaderSync) =>
FilterSync(peer, s.peers, s.waitingForDisconnection, Set.empty)
FilterSync(peer,
s.peersWithServices,
s.waitingForDisconnection,
Set.empty)
case x @ (_: MisbehavingPeer | _: RemovePeers | _: HeaderSync) =>
sys.error(s"Incorrect state for handling filter messages, got=$x")
}
@ -214,7 +219,7 @@ case class DataMessageHandler(
case Some(filterSyncState) =>
filterSyncState.copy(filterBatchCache = newBatch)
case None =>
val d = DoneSyncing(filterSyncState.peers,
val d = DoneSyncing(filterSyncState.peersWithServices,
filterSyncState.waitingForDisconnection)
d
}
@ -281,8 +286,8 @@ case class DataMessageHandler(
case d: DoneSyncing =>
val s = if (count.toInt != 0) {
//why do we sometimes get empty HeadersMessage?
HeaderSync(peer, d.peers, d.waitingForDisconnection)
} else DoneSyncing(d.peers, d.waitingForDisconnection)
HeaderSync(peer, d.peersWithServices, d.waitingForDisconnection)
} else DoneSyncing(d.peersWithServices, d.waitingForDisconnection)
Some(s)
case headerSync: HeaderSync =>
if (headerSync.syncPeer == peer) {
@ -385,7 +390,7 @@ case class DataMessageHandler(
logger.info(
s"Starting to fetch filter headers in data message handler")
val fhs = FilterHeaderSync(syncNodeState.syncPeer,
syncNodeState.peers,
syncNodeState.peersWithServices,
syncNodeState.waitingForDisconnection)
for {
@ -397,7 +402,7 @@ case class DataMessageHandler(
stopBlockHeaderDb = bestBlockHeaderDb)
} yield {
syncingFilterHeadersState.getOrElse(
DoneSyncing(syncNodeState.peers,
DoneSyncing(syncNodeState.peersWithServices,
syncNodeState.waitingForDisconnection))
}
@ -412,7 +417,7 @@ case class DataMessageHandler(
//was ongoing, see: https://github.com/bitcoin-s/bitcoin-s/issues/5036
for {
bestBlockHash <- chainApi.getBestBlockHash()
d = DoneSyncing(syncNodeState.peers,
d = DoneSyncing(syncNodeState.peersWithServices,
syncNodeState.waitingForDisconnection)
newState <- {
peerManager
@ -441,7 +446,7 @@ case class DataMessageHandler(
s"$peer exceeded max limit of invalid messages. Disconnecting. peers=${state.peers}")
val m = MisbehavingPeer(badPeer = peer,
peers = state.peers,
peersWithServices = state.peersWithServices,
waitingForDisconnection =
state.waitingForDisconnection)
Future.successful(m)
@ -459,7 +464,8 @@ case class DataMessageHandler(
peerManager.gossipGetHeadersMessage(cachedHeaders)
//switch to DoneSyncing state until we receive a valid header from our peers
val d =
DoneSyncing(state.peers, state.waitingForDisconnection)
DoneSyncing(state.peersWithServices,
state.waitingForDisconnection)
queryF.map(_ => d)
}
} yield newState
@ -508,7 +514,7 @@ case class DataMessageHandler(
if (isSyncing) {
val newState = NodeState.FilterSync(
syncPeer = fs.syncPeer,
peers = fs.peers,
peersWithServices = fs.peersWithServices,
waitingForDisconnection = fs.waitingForDisconnection,
filterBatchCache = fs.filterBatchCache)
Some(newState)
@ -525,7 +531,10 @@ case class DataMessageHandler(
val fs = syncNodeState match {
case x @ (_: HeaderSync | _: FilterHeaderSync) =>
FilterSync(x.syncPeer, x.peers, x.waitingForDisconnection, Set.empty)
FilterSync(x.syncPeer,
x.peersWithServices,
x.waitingForDisconnection,
Set.empty)
case fs: FilterSync => fs
}
@ -707,7 +716,8 @@ case class DataMessageHandler(
case Some(s) => s
case None =>
//is this right? If we don't send cfheaders to our peers, are we done syncing?
DoneSyncing(state.peers, state.waitingForDisconnection)
DoneSyncing(state.peersWithServices,
state.waitingForDisconnection)
}
}
} else {
@ -809,7 +819,7 @@ case class DataMessageHandler(
case Some(filterSyncState) => filterSyncState
case None =>
val d =
DoneSyncing(filterHeaderSync.peers,
DoneSyncing(filterHeaderSync.peersWithServices,
filterHeaderSync.waitingForDisconnection)
d
}