Remove awaitPeerWithServices() (#5093)

* Remove awaitPeerWithServices()

* Empty commit to run CI

* Rework Node.sync() to return Future[Option[Peer]] rather than Future[Unit]. This returns the peer we are syncing with, if one could be found

* Turn logging OFF again

* Empty commit to re-run CI

* Use AsyncUtil.retryUntilSatisfiedF() when calling node.sync() after starting the node, to make sure we have a peer to sync from in a test case (see the sketch below)

* Await on the re-started node, not a stale reference, in NeutrinoNodeWithWalletTest

* Fix second reference

* Empty commit to re-run CI
Author: Chris Stewart, 2023-06-07 04:47:39 -05:00 (committed by GitHub)
Parent: 4fd7af04ca
Commit: abeaaa05de
9 changed files with 78 additions and 160 deletions
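
The retry pattern the test changes rely on is sketched below: sync() now completes with None when no suitable peer is connected yet, so the tests poll it with AsyncUtil.retryUntilSatisfiedF until a sync peer is found. A minimal sketch, not taken verbatim from the diff; the import paths and the implicit ExecutionContext are assumptions:

import org.bitcoins.asyncutil.AsyncUtil // assumed import path
import org.bitcoins.node.NeutrinoNode   // assumed import path
import scala.concurrent.{ExecutionContext, Future}

// Keep calling sync() until it reports a peer we can sync from.
// sync() returns Future[Option[Peer]]; isDefined means a sync peer was found.
def syncWithRetry(node: NeutrinoNode)(implicit
    ec: ExecutionContext): Future[Unit] =
  AsyncUtil.retryUntilSatisfiedF(() => node.sync().map(_.isDefined))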

@@ -299,7 +299,7 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
       _ <- bitcoind.generate(1)
       //restart the node
       _ <- node.start()
-      _ <- node.sync()
+      _ <- AsyncUtil.retryUntilSatisfiedF(() => node.sync().map(_.isDefined))
       //await for us to sync compact filter headers filters
       //the sync process should get kicked off after we see the
       //newly mined block header

@@ -240,7 +240,7 @@ class NeutrinoNodeWithWalletTest extends NodeTestWithCachedBitcoindNewest {
       //restart the node now that we have received funds
       startedNode <- stoppedNode.start()
       _ <- startedNode.sync()
-      _ <- NodeTestUtil.awaitSync(node = node, rpc = bitcoind)
+      _ <- NodeTestUtil.awaitSync(node = startedNode, rpc = bitcoind)
       _ <- AsyncUtil.retryUntilSatisfiedF(() => {
         for {
           balance <- wallet.getBalance()
@@ -278,8 +278,9 @@ class NeutrinoNodeWithWalletTest extends NodeTestWithCachedBitcoindNewest {
       //bring node back online
       startedNode <- stoppedNode.start()
-      _ <- startedNode.sync()
-      _ <- NodeTestUtil.awaitSync(node, bitcoind)
+      _ <- AsyncUtil.retryUntilSatisfiedF(() =>
+        startedNode.sync().map(_.isDefined))
+      _ <- NodeTestUtil.awaitSync(startedNode, bitcoind)
       balanceAfterSpend <- wallet.getBalance()
     } yield {
       assert(balanceAfterSpend < initBalance)

@@ -1,60 +0,0 @@
-package org.bitcoins.node
-
-import com.typesafe.config.ConfigFactory
-import org.bitcoins.server.BitcoinSAppConfig
-import org.bitcoins.testkit.BitcoinSTestAppConfig
-import org.bitcoins.testkit.node.{NodeTestWithCachedBitcoindNoP2pBlockFilters}
-import org.bitcoins.testkit.node.fixture.NeutrinoNodeConnectedWithBitcoind
-import org.bitcoins.testkit.util.TorUtil
-import org.scalatest.{FutureOutcome, Outcome}
-
-import scala.concurrent.Future
-
-class NeutrinoUnsupportedPeerTest
-    extends NodeTestWithCachedBitcoindNoP2pBlockFilters {
-
-  override protected def getFreshConfig: BitcoinSAppConfig = {
-    val config = ConfigFactory.parseString(
-      """
-        |bitcoin-s.node.peer-discovery-timeout = 10s
-        """.stripMargin
-    )
-    BitcoinSTestAppConfig.getNeutrinoWithEmbeddedDbTestConfig(pgUrl,
-                                                              Vector(config))
-  }
-
-  override type FixtureParam = NeutrinoNodeConnectedWithBitcoind
-
-  override def withFixture(test: OneArgAsyncTest): FutureOutcome = {
-    if (TorUtil.torEnabled) {
-      // We must skip this test for tor enabled
-      // because bitcoind only supported tor v2 at the time
-      // which is now deprecated and no longer supported by
-      // the tor network
-      FutureOutcome.succeeded
-    } else {
-      val outcomeF: Future[Outcome] = for {
-        bitcoind <- cachedBitcoindWithFundsF
-        outcome = withNeutrinoNodeUnstarted(test, bitcoind)(system,
-                                                            getFreshConfig)
-        f <- outcome.toFuture
-      } yield f
-      new FutureOutcome(outcomeF)
-    }
-  }
-
-  behavior of "NeutrinoNode"
-
-  it must "throw RuntimeException if peer does not support compact filters" in {
-    nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoind =>
-      val node = nodeConnectedWithBitcoind.node
-      val exception = recoverToExceptionIf[RuntimeException] {
-        for {
-          startedNode <- node.start()
-          _ <- startedNode.sync()
-        } yield ()
-      }
-      exception.map(e =>
-        assert(e.getMessage.startsWith("No supported peers found!")))
-  }
-}
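
With awaitPeerWithService() gone there is no scheduled job left to fail with "No supported peers found!", which is why the test above is deleted outright. Under the reworked API the same condition surfaces as sync() completing with None instead of a failed Future; an illustrative check (an assumption about how such a test could be rewritten, not part of this commit) would be:

for {
  startedNode <- node.start()
  syncPeerOpt <- startedNode.sync()
  // no connected peer advertises NODE_COMPACT_FILTERS, so no sync peer is chosen
} yield assert(syncPeerOpt.isEmpty)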

@@ -53,6 +53,9 @@ case class NeutrinoNode(
     res
   }
 
+  override def stop(): Future[NeutrinoNode] =
+    super.stop().map(_.asInstanceOf[NeutrinoNode])
+
   /** Starts to sync our node with our peer
     * If our local best block hash is the same as our peers
     * we will not sync, otherwise we will keep syncing
@@ -60,16 +63,18 @@ case class NeutrinoNode(
     *
     * @return
     */
-  override def sync(): Future[Unit] = {
+  override def sync(): Future[Option[Peer]] = {
     for {
       chainApi <- chainApiFromDb()
       _ <- chainApi.setSyncing(true)
       _ = logger.info(s"Fetching peers to sync with...")
-      syncPeer <- peerManager.randomPeerWithService(
+      syncPeerOpt <- peerManager.randomPeerWithService(
         ServiceIdentifier.NODE_COMPACT_FILTERS)
-      _ <- syncHelper(syncPeer)
-    } yield ()
+      _ <- syncPeerOpt match {
+        case Some(syncPeer) => syncHelper(syncPeer)
+        case None           => Future.unit
+      }
+    } yield syncPeerOpt
   }
 
   private def syncHelper(syncPeer: Peer): Future[Unit] = {
@@ -142,12 +147,15 @@ case class NeutrinoNode(
     }
   }
 
-  override def syncFromNewPeer(): Future[Unit] = {
+  override def syncFromNewPeer(): Future[Option[Peer]] = {
     for {
-      syncPeer <- peerManager.randomPeerWithService(
+      syncPeerOpt <- peerManager.randomPeerWithService(
         ServiceIdentifier.NODE_COMPACT_FILTERS)
-      _ <- syncHelper(syncPeer)
-    } yield ()
+      _ <- syncPeerOpt match {
+        case Some(p) => syncHelper(p)
+        case None    => Future.unit
+      }
+    } yield syncPeerOpt
   }
 
   /** Gets the number of compact filters in the database */
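
Callers that still want a hard failure when no compact-filter peer is available now have to handle the Option themselves. Below is a sketch of one possible caller of the reworked sync()/syncFromNewPeer() shown above; the fallback policy and the import paths are assumptions, not something this commit adds:

import org.bitcoins.node.NeutrinoNode // assumed import path
import org.bitcoins.node.models.Peer  // assumed import path
import scala.concurrent.{ExecutionContext, Future}

// Sync from the randomly selected peer, fall back to syncFromNewPeer() once,
// and fail the Future if neither attempt finds a usable peer.
def syncOrFail(node: NeutrinoNode)(implicit
    ec: ExecutionContext): Future[Peer] = {
  node.sync().flatMap {
    case Some(peer) => Future.successful(peer)
    case None =>
      node.syncFromNewPeer().flatMap {
        case Some(peer) => Future.successful(peer)
        case None =>
          Future.failed(new RuntimeException(
            "No peers with NODE_COMPACT_FILTERS available"))
      }
  }
}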

@@ -122,9 +122,12 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger {
    *
    * @return
    */
-  def sync(): Future[Unit]
+  def sync(): Future[Option[Peer]]
 
-  def syncFromNewPeer(): Future[Unit]
+  /** Sync from a new peer
+    * @return the new peer we are syncing from else none if we could not start syncing with another peer
+    */
+  def syncFromNewPeer(): Future[Option[Peer]]
 
   /** Broadcasts the given transaction over the P2P network */
   override def broadcastTransactions(

@@ -221,7 +221,6 @@ case class PeerFinder(
     _peerData.put(peer,
                   PeerData(peer, controlMessageHandler, queue, supervisor))
     _peerData(peer).peerMessageSender.map(_.connect())
-
   }
 
   private def tryToReconnectPeer(peer: Peer): Future[Unit] = {

@@ -1,7 +1,7 @@
 package org.bitcoins.node
 
 import akka.Done
-import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
+import akka.actor.{ActorRef, ActorSystem, Props}
 import akka.stream.scaladsl.{
   Keep,
   RunnableGraph,
@@ -34,11 +34,11 @@ import org.bitcoins.node.util.{BitcoinSNodeUtil, PeerMessageSenderApi}
 import scodec.bits.ByteVector
 
 import java.net.InetAddress
-import java.time.{Duration, Instant}
+import java.time.{Instant}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
 import scala.concurrent.duration.DurationInt
-import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.concurrent.{ExecutionContext, Future}
 import scala.util.Random
 
 case class PeerManager(
@@ -113,13 +113,21 @@ case class PeerManager(
     val peerMsgSenderF = peerOpt match {
       case Some(peer) =>
         val peerMsgSenderF = peerDataMap(peer).peerMessageSender
-        peerMsgSenderF
+        peerMsgSenderF.map(Some(_))
       case None =>
         val peerMsgSenderF = randomPeerMsgSenderWithService(
           ServiceIdentifier.NODE_NETWORK)
         peerMsgSenderF
     }
-    peerMsgSenderF.flatMap(_.sendMsg(msg))
+    peerMsgSenderF.flatMap { peerMsgSenderOpt =>
+      peerMsgSenderOpt match {
+        case Some(peerMsgSender) => peerMsgSender.sendMsg(msg)
+        case None =>
+          val exn = new RuntimeException(
+            s"Unable to send message=${msg.commandName} because we couldn't find a peer to send it to")
+          Future.failed(exn)
+      }
+    }
   }
 
   /** Gossips the given message to all peers except the excluded peer. If None given as excluded peer, gossip message to all peers */
@@ -142,16 +150,8 @@ case class PeerManager(
   override def sendGetHeadersMessage(
       hashes: Vector[DoubleSha256DigestBE],
       peerOpt: Option[Peer]): Future[Unit] = {
-    val peerMsgSenderF = peerOpt match {
-      case Some(peer) =>
-        val peerMsgSenderF = peerDataMap(peer).peerMessageSender
-        peerMsgSenderF
-      case None =>
-        val peerMsgSenderF = randomPeerMsgSenderWithService(
-          ServiceIdentifier.NODE_NETWORK)
-        peerMsgSenderF
-    }
-    peerMsgSenderF.flatMap(_.sendGetHeadersMessage(hashes.map(_.flip)))
+    val headersMsg = GetHeadersMessage(hashes.distinct.take(101).map(_.flip))
+    sendMsg(headersMsg, peerOpt)
   }
 
   override def sendGetDataMessages(
@@ -167,10 +167,15 @@ case class PeerManager(
       case None =>
         val peerMsgSenderF = randomPeerMsgSenderWithService(
           ServiceIdentifier.NODE_NETWORK)
-        peerMsgSenderF.flatMap(
-          _.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock,
-                               hashes.map(_.flip): _*))
+        peerMsgSenderF.flatMap {
+          case Some(peerMsgSender) =>
+            peerMsgSender.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock,
+                                             hashes.map(_.flip): _*)
+          case None =>
+            val exn = new RuntimeException(
+              s"Unable to send getdatamessage because we couldn't find a peer to send it to, hashes=$hashes")
+            Future.failed(exn)
+        }
     }
   }
@@ -238,32 +243,33 @@ case class PeerManager(
     }
   }
 
-  def randomPeerWithService(services: ServiceIdentifier): Future[Peer] = {
-    //wait when requested
-    val waitF =
-      awaitPeerWithService(services,
-                           timeout = nodeAppConfig.peerDiscoveryTimeout)
-
-    waitF.map { _ =>
-      val filteredPeers =
-        peerDataMap
-          .filter(p => p._2.serviceIdentifier.hasServicesOf(services))
-          .keys
-          .toVector
-      require(filteredPeers.nonEmpty)
-      val (good, failedRecently) =
-        filteredPeers.partition(p => !peerDataMap(p).hasFailedRecently)
-      if (good.nonEmpty) good(Random.nextInt(good.length))
-      else
-        failedRecently(Random.nextInt(failedRecently.length))
-    }
+  def randomPeerWithService(
+      services: ServiceIdentifier): Future[Option[Peer]] = {
+    val filteredPeers =
+      peerDataMap
+        .filter(p => p._2.serviceIdentifier.hasServicesOf(services))
+        .keys
+        .toVector
+    val (good, _) =
+      filteredPeers.partition(p => !peerDataMap(p).hasFailedRecently)
+
+    val peerOpt = if (good.nonEmpty) {
+      Some(good(Random.nextInt(good.length)))
+    } else {
+      None
+    }
+    Future.successful(peerOpt)
   }
 
-  def randomPeerMsgSenderWithService(
-      services: ServiceIdentifier): Future[PeerMessageSender] = {
-    val randomPeerF = randomPeerWithService(services)
-    randomPeerF.flatMap(peer => peerDataMap(peer).peerMessageSender)
+  private def randomPeerMsgSenderWithService(
+      services: ServiceIdentifier): Future[Option[PeerMessageSender]] = {
+    val randomPeerOptF = randomPeerWithService(services)
+    randomPeerOptF.flatMap { peerOpt =>
+      peerOpt match {
+        case Some(peer) => peerDataMap(peer).peerMessageSender.map(Some(_))
+        case None       => Future.successful(None)
+      }
+    }
   }
 
   private def createInDb(
@@ -290,46 +296,6 @@ case class PeerManager(
                serviceIdentifier)
   }
 
-  private var peerServicesQueries: Vector[Cancellable] = Vector.empty
-
-  private def awaitPeerWithService(
-      services: ServiceIdentifier,
-      timeout: Duration): Future[Unit] = {
-    logger.debug(s"Waiting for peer connection. ${_peerDataMap.keys}")
-    val promise = Promise[Unit]()
-    var counter = 0
-    val cancellable =
-      system.scheduler.scheduleAtFixedRate(0.seconds, 1.second) { () =>
-        if (
-          _peerDataMap.exists(x =>
-            x._2.serviceIdentifier.hasServicesOf(services))
-        ) {
-          promise.success(())
-        } else if (counter == timeout.getSeconds.toInt) {
-          promise.failure(
-            new RuntimeException(
-              s"No supported peers found! Requested: ${services}"))
-        } else {
-          counter += 1
-        }
-      }
-
-    peerServicesQueries = peerServicesQueries.appended(cancellable)
-
-    //remove the cancellable from the peerServicesQueries
-    //when our promise is completed from the scheduled job
-    promise.future.onComplete { _ =>
-      val _: Boolean = cancellable.cancel()
-      val idx = peerServicesQueries.indexOf(cancellable)
-      if (idx >= 0) {
-        peerServicesQueries = peerServicesQueries.zipWithIndex
-          .filter(_._2 != idx)
-          .map(_._1)
-      }
-    }
-
-    promise.future
-  }
-
   def replacePeer(replacePeer: Peer, withPeer: Peer): Future[Unit] = {
     logger.debug(s"Replacing $replacePeer with $withPeer")
     assert(!peerDataMap(replacePeer).serviceIdentifier.nodeCompactFilters,
@@ -392,8 +358,6 @@ case class PeerManager(
       case None => Future.unit
     }
 
-    peerServicesQueries.foreach(_.cancel()) //reset the peerServicesQueries var?
-
     val stopF = for {
       _ <- finderStopF
      _ <- Future.traverse(peers)(removePeer)
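
randomPeerWithService() is now a plain lookup over the currently connected peers instead of a scheduled wait, so any "wait until a suitable peer shows up" behaviour moves to the call site. Below is a sketch of how a caller could recover the old waiting semantics with a retry; the helper name and import paths are assumptions:

import org.bitcoins.asyncutil.AsyncUtil        // assumed import path
import org.bitcoins.core.p2p.ServiceIdentifier // assumed import path
import org.bitcoins.node.PeerManager
import org.bitcoins.node.models.Peer           // assumed import path
import scala.concurrent.{ExecutionContext, Future}

// Poll until some connected peer advertises the requested services, then return it.
def waitForPeerWithService(peerManager: PeerManager, services: ServiceIdentifier)(
    implicit ec: ExecutionContext): Future[Peer] = {
  for {
    _ <- AsyncUtil.retryUntilSatisfiedF(() =>
      peerManager.randomPeerWithService(services).map(_.isDefined))
    peerOpt <- peerManager.randomPeerWithService(services)
    peer <- peerOpt match {
      case Some(p) => Future.successful(p)
      case None =>
        Future.failed(new RuntimeException(
          s"No peer with services=$services found"))
    }
  } yield peer
}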

@@ -59,6 +59,7 @@
     <!-- Inspect handling of headers and inventory messages -->
     <logger name="org.bitcoins.node.networking.peer.DataMessageHandler" level="WARN"/>
+    <logger name="org.bitcoins.node.networking.peer.ControlMessageHandler" level="WARN"/>
 
     <!-- inspect TCP details -->
     <logger name="org.bitcoins.node.networking.P2PClientActor" level="WARN"/>
@@ -66,6 +67,8 @@
     <logger name="org.bitcoins.node.networking.P2PClientSupervisor" level="WARN"/>
 
     <logger name="org.bitcoins.node.PeerManager" level="WARN"/>
+    <logger name="org.bitcoins.node.PeerFinder" level="WARN"/>
+
     <!-- ╔════════════════════╗ -->
     <!-- ║ Chain module       ║ -->
     <!-- ╚════════════════════╝ -->

@@ -518,7 +518,7 @@ object NodeUnitTest extends P2PLogger {
       //see: https://github.com/bitcoin/bitcoin/issues/27085
       //see: https://github.com/bitcoin-s/bitcoin-s/issues/4976
       _ <- bitcoind.syncWithValidationInterfaceQueue()
-      _ <- node.sync()
+      _ <- AsyncUtil.retryUntilSatisfiedF(() => node.sync().map(_.isDefined))
       syncing <- node.chainApiFromDb().flatMap(_.isSyncing())
       _ = require(syncing)
       _ <- NodeTestUtil.awaitSync(node, bitcoind)