Move away from suredbits provided peers by default (#5390)

* Move away from suredbits provided peers by default

* Remove setting bitcoin-s.node.use-default-peers

* Don't allow peer to stay connected if it doesn't support compact filters

* Move disconnection logic into managePeerAfterInitialization()

* Remove reconnect() when we have no other peers, send getaddr message after we know we are keeping the peer connection

* Remove hasCf flag check, revert log

* Reduce inactivity-timeout to 5 minutes by default, shuffle peers in PeerFinder beforing pushing onto stack

* Add logic to disconnect AttemptToConnect peers after addr mesage is received

* Create PeerFinder.queryForPeerConnections(), call it directly from PeerManager on health checks rather than restarting PeerFinder

* scalafmt

* Move more logic into managePeerAfterInitalization()

* Don't set isPersistent for db peers by default

* Add call to queryForPeerConnections() inside of PeerManager.onDisconnect() to try to attempt to more peers when we have no more connections, fix MisBehavingPeer logic to not start sync, starting sync should happen in onDisconnect()

* Revert logback-test.xml

* Sort peers we are attempting to connect to by lastSeen parameter

* Refactor DoneSyncing.toHeaderSync() to return Option[HeaderSync] to represent the case where we don't have a peer to sync with

* scalafmt

* Remove duplicate PeerFinder.buildPeerData()
This commit is contained in:
Chris Stewart 2024-02-20 12:12:57 -06:00 committed by GitHub
parent caeb8d559c
commit 3177ee405f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 144 additions and 148 deletions

View file

@ -49,7 +49,7 @@ bitcoin-s {
# }
# enable peer discovery on the p2p network by default
enable-peer-discovery = false
enable-peer-discovery = true
# time interval for trying next set of peers in peer discovery
try-peers-interval = 12 hour

View file

@ -151,11 +151,7 @@ bitcoin-s {
# a list of peer addresses in form "hostname:portnumber"
# Port number is optional, the default value is 8333 for mainnet,
# 18333 for testnet and 18444 for regtest.
# by default we provide a testnet peer to connect to
peers = ["neutrino.testnet3.suredbits.com:18333"]
# use the defauls suredbits neutrino node as a peer
use-default-peers = true
peers = [""]
# try to connect to peers from dns seeds, database, addr messages etc
enable-peer-discovery = true
@ -185,7 +181,7 @@ bitcoin-s {
# if a node doesn't send a message in this time frame
# we disconnect them and try to connect to a new peer
inactivity-timeout = 20 minutes
inactivity-timeout = 5 minutes
}
proxy {

View file

@ -202,7 +202,7 @@ case class NeutrinoNode(
private def inactivityChecksRunnable(): Runnable = { () =>
val peers = peerManager.peers
logger.info(s"Running inactivity checks for peers=${peers}")
val resultF = if (peers.nonEmpty) {
val resultF = if (peers.nonEmpty || isStarted.get) {
queueOpt match {
case Some(q) =>
q.offer(NodeStreamMessage.PeerHealthCheck)
@ -211,11 +211,6 @@ case class NeutrinoNode(
logger.warn(s"No queue defined for inactivity check")
Future.unit
}
} else if (isStarted.get) {
//stop and restart to get more peers
stop()
.flatMap(_.start())
.map(_ => ())
} else {
start().map(_ => ())
}

View file

@ -201,16 +201,13 @@ object NodeState {
extends NodeRunningState {
override val isSyncing: Boolean = false
/** selects a random peer and returns us a header sync state */
def toHeaderSync: HeaderSync = {
/** Selects a random peer and returns us a header sync state
* returns None if we don't have a peer ot sync with
*/
def toHeaderSync: Option[HeaderSync] = {
val syncPeerOpt =
randomPeer(Set.empty, ServiceIdentifier.NODE_COMPACT_FILTERS)
syncPeerOpt match {
case Some(p) => toHeaderSync(p)
case None =>
sys.error(
s"Could not find a peer to transition from DoneSyncing -> HeaderSync")
}
syncPeerOpt.map(toHeaderSync)
}
def toHeaderSync(syncPeer: Peer): HeaderSync = {

View file

@ -48,6 +48,4 @@ object NodeStreamMessage {
/** Checks our peers are healthy, for instance checking that we are peered with compact filter peers */
case object PeerHealthCheck extends NodeStreamMessage
case class Initialized(peer: Peer)
}

View file

@ -127,8 +127,8 @@ case class PeerManager(
private def replacePeer(replacePeer: Peer, withPeer: Peer): Future[Unit] = {
logger.debug(s"Replacing $replacePeer with $withPeer")
assert(!peerDataMap(replacePeer).serviceIdentifier.nodeCompactFilters,
s"$replacePeer has cf")
require(!peerDataMap(replacePeer).serviceIdentifier.nodeCompactFilters,
s"$replacePeer has cf")
for {
_ <- disconnectPeer(replacePeer)
_ <- connectPeer(withPeer)
@ -220,49 +220,48 @@ case class PeerManager(
/** Helper method to determine what action to take after a peer is initialized, such as beginning sync with that peer */
private def managePeerAfterInitialization(
finder: PeerFinder,
peerData: PeerData,
hasCf: Boolean): Future[Unit] = {
val peer = peerData.peer
state: NodeRunningState,
peer: Peer): Future[NodeRunningState] = {
val curPeerDataOpt = state.peerFinder.getPeerData(peer)
require(curPeerDataOpt.isDefined,
s"Could not find peer=$peer in PeerFinder!")
val peerData = curPeerDataOpt.get
val hasCf = peerData.serviceIdentifier.nodeCompactFilters
val notCfPeers = peerDataMap
.filter(p => !p._2.serviceIdentifier.nodeCompactFilters)
.keys
val availableFilterSlot = hasCf && notCfPeers.nonEmpty
val hasConnectionSlot = connectedPeerCount < nodeAppConfig.maxConnectedPeers
if (hasConnectionSlot || availableFilterSlot) {
//we want to promote this peer, so pop from cache
val _ = state.peerFinder.popFromCache(peer)
val persistentPeerData = peerData match {
case p: PersistentPeerData => p
case a: AttemptToConnectPeerData => a.toPersistentPeerData
}
_peerDataMap.put(peer, persistentPeerData)
peerData match {
case _: PersistentPeerData =>
//if we have slots remaining, connect
if (connectedPeerCount < nodeAppConfig.maxConnectedPeers) {
connectPeer(peer)
} else {
val notCf = peerDataMap
.filter(p => !p._2.serviceIdentifier.nodeCompactFilters)
.keys
//try to drop another non compact filter connection for this
if (hasCf && notCf.nonEmpty)
replacePeer(replacePeer = notCf.head, withPeer = peer)
else {
peerData
.stop()
.map(_ => ())
}
}
case q: AttemptToConnectPeerData =>
if (finder.hasPeer(q.peer)) {
//if we still have an active connection with this peer, stop it
q.stop().map(_ => ())
} else {
//else it already has been deleted because of connection issues
Future.unit
}
val peerWithSvcs = persistentPeerData.peerWithServicesOpt.get
val newPdm =
state.peerDataMap.+((peerWithSvcs, persistentPeerData))
val newState = state.replacePeers(newPdm)
if (availableFilterSlot) {
replacePeer(replacePeer = notCfPeers.head, withPeer = peer)
.map(_ => newState)
} else {
connectPeer(peer).map(_ => newState)
}
} else {
Future.successful(state)
}
}
private def onInitialization(
peer: Peer,
state: NodeRunningState): Future[NodeState] = {
val finder = state.peerFinder
val stateF: Future[NodeRunningState] = {
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
s"$peer cannot be both a test and a persistent peer")
//this assumes neutrino and checks for compact filter support so should not be called for anything else
require(nodeAppConfig.nodeType == NodeType.NeutrinoNode,
@ -271,18 +270,20 @@ case class PeerManager(
if (finder.hasPeer(peer)) {
//one of the peers we tries got initialized successfully
val peerData = finder.getPeerData(peer).get
val peerMsgSender = PeerMessageSender(peerData.peerConnection)
val serviceIdentifer = peerData.serviceIdentifier
val hasCf = serviceIdentifer.nodeCompactFilters
logger.debug(s"Initialized peer $peer with $hasCf")
for {
_ <- peerMsgSender.sendGetAddrMessage()
_ <- peerData.peerMessageSender.sendGetAddrMessage()
_ <- createInDb(peer, peerData.serviceIdentifier)
_ <- managePeerAfterInitialization(finder = finder,
peerData = peerData,
hasCf = hasCf)
} yield state
newState <- managePeerAfterInitialization(state, peer)
} yield {
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
s"$peer cannot be both a test and a persistent peer")
logger.debug(
s"Initialized peer $peer with compactFilter support=$hasCf")
newState
}
} else if (peerDataMap.contains(peer)) {
//one of the persistent peers initialized again, this can happen in case of a reconnection attempt
@ -323,7 +324,7 @@ case class PeerManager(
val finder = state.peerFinder
val _ = onDisconnectSyncFiltersJob(peer)
val updateLastSeenF = PeerDAO().updateLastSeenTime(peer)
val stateF = {
val stateF: Future[NodeState] = {
require(!finder.hasPeer(peer) || !peerDataMap.contains(peer),
s"$peer cannot be both a test and a persistent peer")
@ -338,8 +339,14 @@ case class PeerManager(
case s: SyncNodeState => switchSyncToRandomPeer(s, Some(peer))
case d: DoneSyncing =>
//defensively try to sync with the new peer
val hs = d.toHeaderSync
syncHelper(hs).map(_ => hs)
val hsOpt = d.toHeaderSync
hsOpt match {
case Some(hs) => syncHelper(hs).map(_ => hs)
case None =>
//no peers available to sync with, so return DoneSyncing
Future.successful(d)
}
case x @ (_: DoneSyncing | _: NodeShuttingDown |
_: MisbehavingPeer | _: RemovePeers) =>
Future.successful(x)
@ -348,7 +355,18 @@ case class PeerManager(
} else {
if (forceReconnect && !isShuttingDown) {
finder.reconnect(peer).map(_ => state)
} else if (!isShuttingDown) {
logger.info(
s"No new peers to connect to, querying for new connections... state=${state} peers=$peers")
finder.queryForPeerConnections(Set(peer)) match {
case Some(_) => Future.successful(state)
case None =>
logger.debug(
s"Could not query for more peer connections as previous job is still running")
Future.successful(state)
}
} else {
//if shutting down, do nothing
Future.successful(state)
}
}
@ -533,43 +551,31 @@ case class PeerManager(
Future.successful(s)
case runningState: NodeRunningState =>
val peer = c.peer
val curPeerDataOpt = runningState.peerFinder.popFromCache(peer)
curPeerDataOpt match {
case None =>
val connectF = runningState.peerFinder.connect(c.peer)
connectF.map(_ => runningState)
case Some(curPeerData) =>
val persistent = curPeerData match {
case a: AttemptToConnectPeerData => a.toPersistentPeerData
case p: PersistentPeerData => p
}
_peerDataMap.put(peer, persistent)
val hasCf =
if (curPeerData.serviceIdentifier.nodeCompactFilters)
"with filters"
else ""
val peerWithSvcs = curPeerData.peerWithServicesOpt.get
val newPdm =
runningState.peerDataMap.+((peerWithSvcs, persistent))
val replacePeers = runningState.replacePeers(newPdm)
logger.info(
s"Connected to peer $peer $hasCf. Connected peer count ${replacePeers.peerDataMap.size}")
replacePeers match {
case s: SyncNodeState =>
syncHelper(s).map(_ => s)
case d: DoneSyncing =>
val x = d.toHeaderSync(c.peer)
syncHelper(x).map(_ => x)
case x @ (_: MisbehavingPeer | _: RemovePeers |
_: NodeShuttingDown) =>
Future.successful(x)
}
val isConnectedAlready = runningState.isConnected(peer)
if (!isConnectedAlready) {
val connectF = runningState.peerFinder.connect(c.peer)
connectF.map(_ => runningState)
} else {
val hasCf = runningState.peerDataMap
.filter(_._1.peer == peer)
.headOption match {
case Some(p) => p._1.services.nodeCompactFilters
case None => false
}
logger.info(
s"Connected to peer $peer with compact filter support=$hasCf. Connected peer count ${runningState.peerDataMap.size}")
state match {
case s: SyncNodeState =>
syncHelper(s).map(_ => s)
case d: DoneSyncing =>
val x = d.toHeaderSync(c.peer)
syncHelper(x).map(_ => x)
case x @ (_: MisbehavingPeer | _: RemovePeers |
_: NodeShuttingDown) =>
Future.successful(x)
}
}
}
case (state, i: InitializeDisconnect) =>
state match {
case r: NodeRunningState =>
@ -624,15 +630,17 @@ case class PeerManager(
//disconnect the misbehaving peer
for {
_ <- disconnectPeer(m.badPeer)
} yield newDmh.state
} yield {
runningState
}
case removePeers: RemovePeers =>
for {
_ <- Future.traverse(removePeers.peers)(
disconnectPeer)
} yield newDmh.state
case _: SyncNodeState | _: DoneSyncing |
_: NodeShuttingDown =>
Future.successful(newDmh.state)
case x @ (_: SyncNodeState | _: DoneSyncing |
_: NodeShuttingDown) =>
Future.successful(x)
}
}
resultF.map { r =>
@ -646,7 +654,7 @@ case class PeerManager(
case (state, ControlMessageWrapper(payload, peer)) =>
state match {
case runningState: NodeRunningState =>
val peerMsgSenderApiOpt: Option[PeerMessageSenderApi] =
val peerMsgSenderApiOpt: Option[PeerMessageSenderApi] = {
runningState.getPeerMsgSender(peer) match {
case Some(p) => Some(p)
case None =>
@ -655,14 +663,28 @@ case class PeerManager(
case None => None
}
}
}
peerMsgSenderApiOpt match {
case Some(peerMsgSenderApi) =>
val resultOptF = runningState.peerFinder.controlMessageHandler
.handleControlPayload(payload,
peerMsgSenderApi = peerMsgSenderApi)
resultOptF.flatMap {
case Some(i) =>
case Some(i: ControlMessageHandler.Initialized) =>
onInitialization(i.peer, runningState)
case Some(ControlMessageHandler.ReceivedAddrMessage) =>
if (runningState.peerFinder.hasPeer(peer)) {
//got to disconnect it since it hasn't been promoted to a persistent peer
runningState.peerFinder.getPeerData(peer) match {
case Some(pd: AttemptToConnectPeerData) =>
pd.stop().map(_ => runningState)
case None | Some(_: PersistentPeerData) =>
Future.successful(runningState)
}
} else {
//do nothing as its a persistent peer
Future.successful(runningState)
}
case None =>
Future.successful(state)
}
@ -1352,8 +1374,8 @@ object PeerManager extends Logging {
}
}
def handleHealthCheck(runningState: NodeRunningState)(implicit
ec: ExecutionContext): Future[NodeRunningState] = {
def handleHealthCheck(
runningState: NodeRunningState): Future[NodeRunningState] = {
val blockFilterPeers = runningState.peerDataMap.filter(
_._2.serviceIdentifier.hasServicesOf(
ServiceIdentifier.NODE_COMPACT_FILTERS))
@ -1362,10 +1384,8 @@ object PeerManager extends Logging {
Future.successful(runningState)
} else {
val peerFinder = runningState.peerFinder
for {
_ <- peerFinder.stop()
_ <- peerFinder.start()
} yield runningState
peerFinder.queryForPeerConnections(excludePeers = Set.empty)
Future.successful(runningState)
}
}
}

View file

@ -6,7 +6,6 @@ import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.CallbackConfig
import org.bitcoins.core.api.node.{NodeType, Peer}
import org.bitcoins.core.api.tor.Socks5ProxyParams
import org.bitcoins.core.config.{MainNet, RegTest, SigNet, TestNet3}
import org.bitcoins.core.util.TimeUtil
import org.bitcoins.db.{DbAppConfig, JdbcProfileComponent}
import org.bitcoins.node._
@ -96,28 +95,15 @@ case class NodeAppConfig(baseDatadir: Path, configOverrides: Vector[Config])(
lazy val nodeType: NodeType =
NodeType.fromString(config.getString("bitcoin-s.node.mode"))
/** List of peers
*/
/** List of peers hardcoded in our configuration files */
lazy val peers: Vector[Peer] = {
val list = config.getStringList("bitcoin-s.node.peers")
val strs = 0
.until(list.size())
.foldLeft(Vector.empty[String])((acc, i) => acc :+ list.get(i))
val result = strs.map(_.replace("localhost", "127.0.0.1"))
val strPeers = if (result.isEmpty && useDefaultPeers) {
logger.info(
s"No peers found in configuration, resorting to default peers")
network match {
case MainNet => Vector("neutrino.suredbits.com:8333")
case TestNet3 => Vector("neutrino.testnet3.suredbits.com:18333")
case n @ (RegTest | SigNet) =>
sys.error(s"Cannot configure any peers by default on $n")
}
} else {
result
}
BitcoinSNodeUtil.stringsToPeers(strPeers)(this)
BitcoinSNodeUtil.stringsToPeers(result)(this)
}
lazy val torConf: TorAppConfig =
@ -155,12 +141,6 @@ case class NodeAppConfig(baseDatadir: Path, configOverrides: Vector[Config])(
else Duration.ofMinutes(10)
}
lazy val useDefaultPeers: Boolean = {
if (config.hasPath("bitcoin-s.node.use-default-peers"))
config.getBoolean("bitcoin-s.node.use-default-peers")
else true
}
/** timeout for tcp connection in P2PClient */
lazy val connectionTimeout: FiniteDuration = {
if (config.hasPath("bitcoin-s.node.connection-timeout")) {
@ -213,7 +193,7 @@ case class NodeAppConfig(baseDatadir: Path, configOverrides: Vector[Config])(
if (config.hasPath("bitcoin-s.node.inactivity-timeout")) {
val duration = config.getDuration("bitcoin-s.node.inactivity-timeout")
TimeUtil.durationToFiniteDuration(duration)
} else 20.minute
} else 5.minute
}
/** Creates either a neutrino node or a spv node based on the [[NodeAppConfig]] given */

View file

@ -3,10 +3,10 @@ package org.bitcoins.node.networking.peer
import org.bitcoins.core.api.node.Peer
import org.bitcoins.core.p2p._
import org.bitcoins.core.util.NetworkUtil
import org.bitcoins.node.NodeStreamMessage.Initialized
import org.bitcoins.node.config.NodeAppConfig
import org.bitcoins.node.networking.peer.ControlMessageHandler.ControlMessageHandlerState
import org.bitcoins.node.util.PeerMessageSenderApi
import org.bitcoins.node.{NodeStreamMessage, P2PLogger, PeerFinder}
import org.bitcoins.node.{P2PLogger, PeerFinder}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
@ -18,7 +18,8 @@ case class ControlMessageHandler(peerFinder: PeerFinder)(implicit
def handleControlPayload(
payload: ControlPayload,
peerMsgSenderApi: PeerMessageSenderApi): Future[Option[Initialized]] = {
peerMsgSenderApi: PeerMessageSenderApi): Future[
Option[ControlMessageHandlerState]] = {
val peer = peerMsgSenderApi.peer
payload match {
@ -30,7 +31,7 @@ case class ControlMessageHandler(peerFinder: PeerFinder)(implicit
.map(_ => None)
case VerAckMessage =>
val i = NodeStreamMessage.Initialized(peer)
val i = ControlMessageHandler.Initialized(peer)
Future.successful(Some(i))
case ping: PingMessage =>
peerMsgSenderApi
@ -44,7 +45,7 @@ case class ControlMessageHandler(peerFinder: PeerFinder)(implicit
.map(_ => None)
case msg: GossipAddrMessage =>
handleGossipAddrMessage(msg)
Future.successful(None)
Future.successful(Some(ControlMessageHandler.ReceivedAddrMessage))
case SendAddrV2Message =>
peerMsgSenderApi
.sendSendAddrV2Message()
@ -103,3 +104,10 @@ case class ControlMessageHandler(peerFinder: PeerFinder)(implicit
}
}
}
object ControlMessageHandler {
sealed trait ControlMessageHandlerState
case class Initialized(peer: Peer) extends ControlMessageHandlerState
case object ReceivedAddrMessage extends ControlMessageHandlerState
}

View file

@ -61,9 +61,11 @@ bitcoin-s {
# user = postgres
# password = ""
# }
inactivity-timeout = 20 minutes
inactivity-timeout = 5 minutes
try-peers-start-delay = 1 second
enable-peer-discovery = false
}

View file

@ -48,7 +48,7 @@ object BitcoinSTestAppConfig {
| node {
| mode = neutrino
| relay = true
| use-default-peers = false
| enable-peer-discovery = false
| }
| wallet {
| allowExternalDLCAddresses = true
@ -79,7 +79,7 @@ object BitcoinSTestAppConfig {
| node {
| mode = neutrino
| relay = true
| use-default-peers = false
| enable-peer-discovery = false
| }
| proxy.enabled = $torEnabled
| tor.enabled = $torEnabled
@ -113,7 +113,7 @@ object BitcoinSTestAppConfig {
| mode = neutrino
| relay = true
| maxConnectedPeers = 8
| use-default-peers = false
| enable-peer-discovery = false
| }
| proxy.enabled = $torEnabled
| tor.enabled = $torEnabled