2024 01 28 Refactor PeerFinder.start() to avoid initializing connections (#5376)

* Conslidate PeerFinder.start() logic into peerConnectionScheduler, add bitcoins-s.node.try-peers-start-delay config settig to indicate how long until we start attempting to connect to peers in PeerFinder

* Cleanup

* Revert logback-test.xml

* Add documentation

* Empty commit to run CI

* Rework NodeAppConfig.peers to return Vector[Peer] rather than Vector[String]

* Add higher priority to paramPeers
This commit is contained in:
Chris Stewart 2024-01-29 09:46:31 -06:00 committed by GitHub
parent f01987160d
commit 76e468c5c4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 69 additions and 46 deletions

View file

@ -173,7 +173,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
//get a node that isn't started
val nodeF = nodeConf.createNode(
peers = Vector.empty,
peers = nodeConf.peers,
walletCreationTimeOpt = Some(creationTime))(chainConf, system)
val defaultApi =

View file

@ -49,10 +49,13 @@ bitcoin-s {
# }
# enable peer discovery on the p2p network by default
enable-peer-discovery = true
enable-peer-discovery = false
# time interval for trying next set of peers in peer discovery
try-peers-interval = 12 hour
# the delay until we start attempting to connect to peers
try-peers-start-delay = 1 second
}
# You can configure SOCKS5 proxy to use Tor for outgoing connections

View file

@ -169,7 +169,10 @@ bitcoin-s {
# initialization timeout once connected, reconnections resets this
initialization-timeout = 10s
# time interval for trying next set of peers in peer discovery
try-peers-interval = 1 hour
try-peers-interval = 12 hour
# the delay until we start attempting to connect to peers
try-peers-start-delay = 1 second
# wait time for queries like getheaders etc before switching to another
query-wait-time = 120s

View file

@ -1,7 +1,7 @@
package org.bitcoins.node
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{SourceQueue}
import akka.stream.scaladsl.SourceQueue
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.chain.config.ChainAppConfig
import org.bitcoins.core.api.node.Peer
@ -14,6 +14,7 @@ import org.bitcoins.node.networking.peer.{
PeerConnection,
PeerMessageSender
}
import org.bitcoins.node.util.BitcoinSNodeUtil
import java.net.{InetAddress, UnknownHostException}
import java.util.concurrent.atomic.AtomicBoolean
@ -55,7 +56,7 @@ case class PeerFinder(
})
.distinct
.map(_.getHostAddress)
stringsToPeers(addresses.toVector)
BitcoinSNodeUtil.stringsToPeers(addresses.toVector)
}
/** Returns peers from hardcoded addresses taken from https://github.com/bitcoin/bitcoin/blob/master/contrib/seeds/nodes_main.txt */
@ -65,7 +66,7 @@ case class PeerFinder(
.getLines()
.toVector
.filter(nodeAppConfig.torConf.enabled || !_.contains(".onion"))
val peers = stringsToPeers(addresses)
val peers = BitcoinSNodeUtil.stringsToPeers(addresses)
Random.shuffle(peers)
}
@ -93,30 +94,9 @@ case class PeerFinder(
* case it returns those.
*/
private def getPeersFromConfig: Vector[Peer] = {
val addresses = nodeAppConfig.peers.filter(
nodeAppConfig.torConf.enabled || !_.contains(".onion"))
val peers = stringsToPeers(addresses)
logger.debug(s"Config peers: $peers")
peers
}
private def getPeersFromParam: Vector[Peer] = {
logger.debug(s"Param peers: $paramPeers")
paramPeers
}
private def stringsToPeers(addresses: Vector[String]): Vector[Peer] = {
val formatStrings = addresses.map { s =>
//assumes strings are valid, todo: add util functions to check fully for different addresses
if (s.count(_ == ':') > 1 && s(0) != '[') //ipv6
"[" + s + "]"
else s
}
val inetSockets = formatStrings.map(
NetworkUtil.parseInetSocketAddress(_, nodeAppConfig.network.port))
val peers =
inetSockets.map(Peer.fromSocket(_, nodeAppConfig.socks5ProxyParams))
peers
val addresses = nodeAppConfig.peers.filter(p =>
nodeAppConfig.torConf.enabled || !p.toString.contains(".onion"))
addresses
}
//for the peers we try
@ -126,9 +106,9 @@ case class PeerFinder(
private val _peersToTry: PeerStack = PeerStack()
private val maxPeerSearchCount: Int = 8
private val maxPeerSearchCount: Int = 2
private val initialDelay: FiniteDuration = 12.hour
private val initialDelay: FiniteDuration = nodeAppConfig.tryPeersStartDelay
private val isConnectionSchedulerRunning = new AtomicBoolean(false)
@ -137,14 +117,17 @@ case class PeerFinder(
initialDelay = initialDelay,
delay = nodeAppConfig.tryNextPeersInterval) { () =>
{
logger.info(s"Running peerConnectionScheduler")
if (isConnectionSchedulerRunning.compareAndSet(false, true)) {
logger.info(s"Querying p2p network for peers...")
logger.debug(s"Cache size: ${_peerData.size}. ${_peerData.keys}")
if (_peersToTry.size < maxPeerSearchCount)
_peersToTry.pushAll(getPeersFromDnsSeeds)
//in case of less _peersToTry.size than maxPeerSearchCount
val max = Math.min(maxPeerSearchCount, _peersToTry.size)
val peers = (
1.to(maxPeerSearchCount)
0.until(max)
.map(_ => _peersToTry.pop()))
.distinct
.filterNot(p => skipPeers().contains(p) || _peerData.contains(p))
@ -167,14 +150,16 @@ case class PeerFinder(
}
override def start(): Future[PeerFinder] = {
logger.debug(s"Starting PeerFinder")
logger.info(
s"Starting PeerFinder initialDelay=${initialDelay} paramPeers=$paramPeers")
val start = System.currentTimeMillis()
isStarted.set(true)
val peersToTry = (getPeersFromParam ++ getPeersFromConfig).distinct
val initPeerF = Future.traverse(peersToTry)(tryPeer)
val peersToTry = (paramPeers ++ getPeersFromConfig).distinct
//higher priority for param peers
_peersToTry.pushAll(peersToTry, priority = 2)
val peerDiscoveryF = if (nodeAppConfig.enablePeerDiscovery) {
val startedF = for {
_ <- initPeerF
(dbNonCf, dbCf) <- getPeersFromDb
} yield {
_peersToTry.pushAll(getPeersFromDnsSeeds)
@ -189,13 +174,14 @@ case class PeerFinder(
startedF
} else {
logger.info("Peer discovery disabled.")
initPeerF.map(_ => this)
peerConnectionScheduler //start scheduler
Future.successful(this)
}
for {
_ <- initPeerF
peerFinder <- peerDiscoveryF
_ = logger.info(s"Done starting PeerFinder")
_ = logger.info(
s"Done starting PeerFinder, it took ${System.currentTimeMillis() - start}ms")
} yield peerFinder
}

View file

@ -12,10 +12,11 @@ import org.bitcoins.db.{DbAppConfig, JdbcProfileComponent}
import org.bitcoins.node._
import org.bitcoins.node.callback.NodeCallbackStreamManager
import org.bitcoins.node.db.NodeDbManagement
import org.bitcoins.node.util.BitcoinSNodeUtil
import org.bitcoins.rpc.config.BitcoindRpcAppConfig
import org.bitcoins.rpc.util.AppConfigFactoryActorSystem
import org.bitcoins.tor.config.TorAppConfig
import org.bitcoins.tor.TorParams
import org.bitcoins.tor.config.TorAppConfig
import java.nio.file.Path
import java.time.{Duration, Instant}
@ -97,13 +98,13 @@ case class NodeAppConfig(baseDatadir: Path, configOverrides: Vector[Config])(
/** List of peers
*/
lazy val peers: Vector[String] = {
lazy val peers: Vector[Peer] = {
val list = config.getStringList("bitcoin-s.node.peers")
val strs = 0
.until(list.size())
.foldLeft(Vector.empty[String])((acc, i) => acc :+ list.get(i))
val result = strs.map(_.replace("localhost", "127.0.0.1"))
if (result.isEmpty && useDefaultPeers) {
val strPeers = if (result.isEmpty && useDefaultPeers) {
logger.info(
s"No peers found in configuration, resorting to default peers")
network match {
@ -115,6 +116,8 @@ case class NodeAppConfig(baseDatadir: Path, configOverrides: Vector[Config])(
} else {
result
}
BitcoinSNodeUtil.stringsToPeers(strPeers)(this)
}
lazy val torConf: TorAppConfig =
@ -174,6 +177,15 @@ case class NodeAppConfig(baseDatadir: Path, configOverrides: Vector[Config])(
} else 10.seconds
}
lazy val tryPeersStartDelay: FiniteDuration = {
if (config.hasPath("bitcoin-s.node.try-peers-start-delay")) {
val duration = config.getDuration("bitcoin-s.node.try-peers-start-delay")
TimeUtil.durationToFiniteDuration(duration)
} else {
30.seconds
}
}
/** time interval for trying next set of peers in peer discovery */
lazy val tryNextPeersInterval: FiniteDuration = {
if (config.hasPath("bitcoin-s.node.try-peers-interval")) {
@ -205,9 +217,7 @@ case class NodeAppConfig(baseDatadir: Path, configOverrides: Vector[Config])(
}
/** Creates either a neutrino node or a spv node based on the [[NodeAppConfig]] given */
def createNode(
peers: Vector[Peer] = Vector.empty[Peer],
walletCreationTimeOpt: Option[Instant])(
def createNode(peers: Vector[Peer], walletCreationTimeOpt: Option[Instant])(
chainConf: ChainAppConfig,
system: ActorSystem): Future[Node] = {
NodeAppConfig.createNode(peers, walletCreationTimeOpt)(this,

View file

@ -1,5 +1,9 @@
package org.bitcoins.node.util
import org.bitcoins.core.api.node.Peer
import org.bitcoins.core.util.NetworkUtil
import org.bitcoins.node.config.NodeAppConfig
import scala.util.Random
object BitcoinSNodeUtil {
@ -18,4 +22,19 @@ object BitcoinSNodeUtil {
*/
def createActorName(className: Class[_]): String =
createActorName(className.getSimpleName)
def stringsToPeers(addresses: Vector[String])(implicit
nodeAppConfig: NodeAppConfig): Vector[Peer] = {
val formatStrings = addresses.map { s =>
//assumes strings are valid, todo: add util functions to check fully for different addresses
if (s.count(_ == ':') > 1 && s(0) != '[') //ipv6
"[" + s + "]"
else s
}
val inetSockets = formatStrings.map(
NetworkUtil.parseInetSocketAddress(_, nodeAppConfig.network.port))
val peers =
inetSockets.map(Peer.fromSocket(_, nodeAppConfig.socks5ProxyParams))
peers
}
}

View file

@ -62,6 +62,8 @@ bitcoin-s {
# password = ""
# }
inactivity-timeout = 20 minutes
try-peers-start-delay = 1 second
}