From abeaaa05dea2314b9295ea32b71153f07a548a60 Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Wed, 7 Jun 2023 04:47:39 -0500 Subject: [PATCH] Remove `awaitPeerWithServices()` (#5093) * Remove awaitPeerWithServices() * Empty commit to run CI * Rework Node.sync() to return Future[Option[Peer]] rather than Future[Unit]. This returns the peer we are syncing with, if we could find one to sync with * Turn logging OFF again * Empty commit to re-run CI * Use AsyncUtil.retryUntilSatisfied() when calling node.sync() after starting node to make sure we have a peer to sync from in a test case * Await on re-started node not stale reference in NeutrinoNodeWithWalletTest * Fix second reference * Empty commit to re-run CI --- .../org/bitcoins/node/NeutrinoNodeTest.scala | 2 +- .../node/NeutrinoNodeWithWalletTest.scala | 7 +- .../node/NeutrinoUnsupportedPeerTest.scala | 60 -------- .../org/bitcoins/node/NeutrinoNode.scala | 26 ++-- .../main/scala/org/bitcoins/node/Node.scala | 7 +- .../scala/org/bitcoins/node/PeerFinder.scala | 1 - .../scala/org/bitcoins/node/PeerManager.scala | 130 +++++++----------- .../.jvm/src/main/resources/logback-test.xml | 3 + .../bitcoins/testkit/node/NodeUnitTest.scala | 2 +- 9 files changed, 78 insertions(+), 160 deletions(-) delete mode 100644 node-test/src/test/scala/org/bitcoins/node/NeutrinoUnsupportedPeerTest.scala diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala index db01fe80c6..12ab1717e8 100644 --- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeTest.scala @@ -299,7 +299,7 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair { _ <- bitcoind.generate(1) //restart the node _ <- node.start() - _ <- node.sync() + _ <- AsyncUtil.retryUntilSatisfiedF(() => node.sync().map(_.isDefined)) //await for us to sync compact filter headers filters //the sync process should get kicked off after we see the //newly mined block header diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala index d8df622a9e..f8eb4677b3 100644 --- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala +++ b/node-test/src/test/scala/org/bitcoins/node/NeutrinoNodeWithWalletTest.scala @@ -240,7 +240,7 @@ class NeutrinoNodeWithWalletTest extends NodeTestWithCachedBitcoindNewest { //restart the node now that we have received funds startedNode <- stoppedNode.start() _ <- startedNode.sync() - _ <- NodeTestUtil.awaitSync(node = node, rpc = bitcoind) + _ <- NodeTestUtil.awaitSync(node = startedNode, rpc = bitcoind) _ <- AsyncUtil.retryUntilSatisfiedF(() => { for { balance <- wallet.getBalance() @@ -278,8 +278,9 @@ class NeutrinoNodeWithWalletTest extends NodeTestWithCachedBitcoindNewest { //bring node back online startedNode <- stoppedNode.start() - _ <- startedNode.sync() - _ <- NodeTestUtil.awaitSync(node, bitcoind) + _ <- AsyncUtil.retryUntilSatisfiedF(() => + startedNode.sync().map(_.isDefined)) + _ <- NodeTestUtil.awaitSync(startedNode, bitcoind) balanceAfterSpend <- wallet.getBalance() } yield { assert(balanceAfterSpend < initBalance) diff --git a/node-test/src/test/scala/org/bitcoins/node/NeutrinoUnsupportedPeerTest.scala b/node-test/src/test/scala/org/bitcoins/node/NeutrinoUnsupportedPeerTest.scala deleted file mode 100644 index 18ab0546a0..0000000000 --- a/node-test/src/test/scala/org/bitcoins/node/NeutrinoUnsupportedPeerTest.scala +++ /dev/null @@ -1,60 +0,0 @@ -package org.bitcoins.node - -import com.typesafe.config.ConfigFactory -import org.bitcoins.server.BitcoinSAppConfig -import org.bitcoins.testkit.BitcoinSTestAppConfig -import org.bitcoins.testkit.node.{NodeTestWithCachedBitcoindNoP2pBlockFilters} -import org.bitcoins.testkit.node.fixture.NeutrinoNodeConnectedWithBitcoind -import org.bitcoins.testkit.util.TorUtil -import org.scalatest.{FutureOutcome, Outcome} - -import scala.concurrent.Future - -class NeutrinoUnsupportedPeerTest - extends NodeTestWithCachedBitcoindNoP2pBlockFilters { - - override protected def getFreshConfig: BitcoinSAppConfig = { - val config = ConfigFactory.parseString( - """ - |bitcoin-s.node.peer-discovery-timeout = 10s - """.stripMargin - ) - BitcoinSTestAppConfig.getNeutrinoWithEmbeddedDbTestConfig(pgUrl, - Vector(config)) - } - - override type FixtureParam = NeutrinoNodeConnectedWithBitcoind - - override def withFixture(test: OneArgAsyncTest): FutureOutcome = { - if (TorUtil.torEnabled) { - // We must skip this test for tor enabled - // because bitcoind only supported tor v2 at the time - // which is now deprecated and no longer supported by - // the tor network - FutureOutcome.succeeded - } else { - val outcomeF: Future[Outcome] = for { - bitcoind <- cachedBitcoindWithFundsF - outcome = withNeutrinoNodeUnstarted(test, bitcoind)(system, - getFreshConfig) - f <- outcome.toFuture - } yield f - new FutureOutcome(outcomeF) - } - } - - behavior of "NeutrinoNode" - - it must "throw RuntimeException if peer does not support compact filters" in { - nodeConnectedWithBitcoind: NeutrinoNodeConnectedWithBitcoind => - val node = nodeConnectedWithBitcoind.node - val exception = recoverToExceptionIf[RuntimeException] { - for { - startedNode <- node.start() - _ <- startedNode.sync() - } yield () - } - exception.map(e => - assert(e.getMessage.startsWith("No supported peers found!"))) - } -} diff --git a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala index 6c2fd8d2ac..36a3fffaa4 100644 --- a/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala +++ b/node/src/main/scala/org/bitcoins/node/NeutrinoNode.scala @@ -53,6 +53,9 @@ case class NeutrinoNode( res } + override def stop(): Future[NeutrinoNode] = + super.stop().map(_.asInstanceOf[NeutrinoNode]) + /** Starts to sync our node with our peer * If our local best block hash is the same as our peers * we will not sync, otherwise we will keep syncing @@ -60,16 +63,18 @@ case class NeutrinoNode( * * @return */ - override def sync(): Future[Unit] = { - + override def sync(): Future[Option[Peer]] = { for { chainApi <- chainApiFromDb() _ <- chainApi.setSyncing(true) _ = logger.info(s"Fetching peers to sync with...") - syncPeer <- peerManager.randomPeerWithService( + syncPeerOpt <- peerManager.randomPeerWithService( ServiceIdentifier.NODE_COMPACT_FILTERS) - _ <- syncHelper(syncPeer) - } yield () + _ <- syncPeerOpt match { + case Some(syncPeer) => syncHelper(syncPeer) + case None => Future.unit + } + } yield syncPeerOpt } private def syncHelper(syncPeer: Peer): Future[Unit] = { @@ -142,12 +147,15 @@ case class NeutrinoNode( } } - override def syncFromNewPeer(): Future[Unit] = { + override def syncFromNewPeer(): Future[Option[Peer]] = { for { - syncPeer <- peerManager.randomPeerWithService( + syncPeerOpt <- peerManager.randomPeerWithService( ServiceIdentifier.NODE_COMPACT_FILTERS) - _ <- syncHelper(syncPeer) - } yield () + _ <- syncPeerOpt match { + case Some(p) => syncHelper(p) + case None => Future.unit + } + } yield syncPeerOpt } /** Gets the number of compact filters in the database */ diff --git a/node/src/main/scala/org/bitcoins/node/Node.scala b/node/src/main/scala/org/bitcoins/node/Node.scala index 720d5cd518..d16abfce44 100644 --- a/node/src/main/scala/org/bitcoins/node/Node.scala +++ b/node/src/main/scala/org/bitcoins/node/Node.scala @@ -122,9 +122,12 @@ trait Node extends NodeApi with ChainQueryApi with P2PLogger { * * @return */ - def sync(): Future[Unit] + def sync(): Future[Option[Peer]] - def syncFromNewPeer(): Future[Unit] + /** Sync from a new peer + * @return the new peer we are syncing from else none if we could not start syncing with another peer + */ + def syncFromNewPeer(): Future[Option[Peer]] /** Broadcasts the given transaction over the P2P network */ override def broadcastTransactions( diff --git a/node/src/main/scala/org/bitcoins/node/PeerFinder.scala b/node/src/main/scala/org/bitcoins/node/PeerFinder.scala index a5a686218e..1573d21818 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerFinder.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerFinder.scala @@ -221,7 +221,6 @@ case class PeerFinder( _peerData.put(peer, PeerData(peer, controlMessageHandler, queue, supervisor)) _peerData(peer).peerMessageSender.map(_.connect()) - } private def tryToReconnectPeer(peer: Peer): Future[Unit] = { diff --git a/node/src/main/scala/org/bitcoins/node/PeerManager.scala b/node/src/main/scala/org/bitcoins/node/PeerManager.scala index 52eefdd6c6..fc2852b2f3 100644 --- a/node/src/main/scala/org/bitcoins/node/PeerManager.scala +++ b/node/src/main/scala/org/bitcoins/node/PeerManager.scala @@ -1,7 +1,7 @@ package org.bitcoins.node import akka.Done -import akka.actor.{ActorRef, ActorSystem, Cancellable, Props} +import akka.actor.{ActorRef, ActorSystem, Props} import akka.stream.scaladsl.{ Keep, RunnableGraph, @@ -34,11 +34,11 @@ import org.bitcoins.node.util.{BitcoinSNodeUtil, PeerMessageSenderApi} import scodec.bits.ByteVector import java.net.InetAddress -import java.time.{Duration, Instant} +import java.time.{Instant} import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.concurrent.duration.DurationInt -import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.{ExecutionContext, Future} import scala.util.Random case class PeerManager( @@ -113,13 +113,21 @@ case class PeerManager( val peerMsgSenderF = peerOpt match { case Some(peer) => val peerMsgSenderF = peerDataMap(peer).peerMessageSender - peerMsgSenderF + peerMsgSenderF.map(Some(_)) case None => val peerMsgSenderF = randomPeerMsgSenderWithService( ServiceIdentifier.NODE_NETWORK) peerMsgSenderF } - peerMsgSenderF.flatMap(_.sendMsg(msg)) + peerMsgSenderF.flatMap { peerMsgSenderOpt => + peerMsgSenderOpt match { + case Some(peerMsgSender) => peerMsgSender.sendMsg(msg) + case None => + val exn = new RuntimeException( + s"Unable to send message=${msg.commandName} because we couldn't find a peer to send it to") + Future.failed(exn) + } + } } /** Gossips the given message to all peers except the excluded peer. If None given as excluded peer, gossip message to all peers */ @@ -142,16 +150,8 @@ case class PeerManager( override def sendGetHeadersMessage( hashes: Vector[DoubleSha256DigestBE], peerOpt: Option[Peer]): Future[Unit] = { - val peerMsgSenderF = peerOpt match { - case Some(peer) => - val peerMsgSenderF = peerDataMap(peer).peerMessageSender - peerMsgSenderF - case None => - val peerMsgSenderF = randomPeerMsgSenderWithService( - ServiceIdentifier.NODE_NETWORK) - peerMsgSenderF - } - peerMsgSenderF.flatMap(_.sendGetHeadersMessage(hashes.map(_.flip))) + val headersMsg = GetHeadersMessage(hashes.distinct.take(101).map(_.flip)) + sendMsg(headersMsg, peerOpt) } override def sendGetDataMessages( @@ -167,10 +167,15 @@ case class PeerManager( case None => val peerMsgSenderF = randomPeerMsgSenderWithService( ServiceIdentifier.NODE_NETWORK) - peerMsgSenderF.flatMap( - _.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock, - hashes.map(_.flip): _*)) - + peerMsgSenderF.flatMap { + case Some(peerMsgSender) => + peerMsgSender.sendGetDataMessage(TypeIdentifier.MsgWitnessBlock, + hashes.map(_.flip): _*) + case None => + val exn = new RuntimeException( + s"Unable to send getdatamessage because we couldn't find a peer to send it to, hashes=$hashes") + Future.failed(exn) + } } } @@ -238,32 +243,33 @@ case class PeerManager( } } - def randomPeerWithService(services: ServiceIdentifier): Future[Peer] = { - //wait when requested - val waitF = - awaitPeerWithService(services, - timeout = nodeAppConfig.peerDiscoveryTimeout) + def randomPeerWithService( + services: ServiceIdentifier): Future[Option[Peer]] = { + val filteredPeers = + peerDataMap + .filter(p => p._2.serviceIdentifier.hasServicesOf(services)) + .keys + .toVector + val (good, _) = + filteredPeers.partition(p => !peerDataMap(p).hasFailedRecently) - waitF.map { _ => - val filteredPeers = - peerDataMap - .filter(p => p._2.serviceIdentifier.hasServicesOf(services)) - .keys - .toVector - require(filteredPeers.nonEmpty) - val (good, failedRecently) = - filteredPeers.partition(p => !peerDataMap(p).hasFailedRecently) - - if (good.nonEmpty) good(Random.nextInt(good.length)) - else - failedRecently(Random.nextInt(failedRecently.length)) + val peerOpt = if (good.nonEmpty) { + Some(good(Random.nextInt(good.length))) + } else { + None } + Future.successful(peerOpt) } - def randomPeerMsgSenderWithService( - services: ServiceIdentifier): Future[PeerMessageSender] = { - val randomPeerF = randomPeerWithService(services) - randomPeerF.flatMap(peer => peerDataMap(peer).peerMessageSender) + private def randomPeerMsgSenderWithService( + services: ServiceIdentifier): Future[Option[PeerMessageSender]] = { + val randomPeerOptF = randomPeerWithService(services) + randomPeerOptF.flatMap { peerOpt => + peerOpt match { + case Some(peer) => peerDataMap(peer).peerMessageSender.map(Some(_)) + case None => Future.successful(None) + } + } } private def createInDb( @@ -290,46 +296,6 @@ case class PeerManager( serviceIdentifier) } - private var peerServicesQueries: Vector[Cancellable] = Vector.empty - - private def awaitPeerWithService( - services: ServiceIdentifier, - timeout: Duration): Future[Unit] = { - logger.debug(s"Waiting for peer connection. ${_peerDataMap.keys}") - val promise = Promise[Unit]() - var counter = 0 - val cancellable = - system.scheduler.scheduleAtFixedRate(0.seconds, 1.second) { () => - if ( - _peerDataMap.exists(x => - x._2.serviceIdentifier.hasServicesOf(services)) - ) { - promise.success(()) - } else if (counter == timeout.getSeconds.toInt) { - promise.failure( - new RuntimeException( - s"No supported peers found! Requested: ${services}")) - } else { - counter += 1 - } - } - - peerServicesQueries = peerServicesQueries.appended(cancellable) - - //remove the cancellable from the peerServicesQueries - //when our promise is completed from the scheduled job - promise.future.onComplete { _ => - val _: Boolean = cancellable.cancel() - val idx = peerServicesQueries.indexOf(cancellable) - if (idx >= 0) { - peerServicesQueries = peerServicesQueries.zipWithIndex - .filter(_._2 != idx) - .map(_._1) - } - } - promise.future - } - def replacePeer(replacePeer: Peer, withPeer: Peer): Future[Unit] = { logger.debug(s"Replacing $replacePeer with $withPeer") assert(!peerDataMap(replacePeer).serviceIdentifier.nodeCompactFilters, @@ -392,8 +358,6 @@ case class PeerManager( case None => Future.unit } - peerServicesQueries.foreach(_.cancel()) //reset the peerServicesQueries var? - val stopF = for { _ <- finderStopF _ <- Future.traverse(peers)(removePeer) diff --git a/testkit-core/.jvm/src/main/resources/logback-test.xml b/testkit-core/.jvm/src/main/resources/logback-test.xml index 03035e48d5..b0a502d5a5 100644 --- a/testkit-core/.jvm/src/main/resources/logback-test.xml +++ b/testkit-core/.jvm/src/main/resources/logback-test.xml @@ -59,6 +59,7 @@ + @@ -66,6 +67,8 @@ + + diff --git a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala index 6e05291d34..cb2b5bf37f 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/node/NodeUnitTest.scala @@ -518,7 +518,7 @@ object NodeUnitTest extends P2PLogger { //see: https://github.com/bitcoin/bitcoin/issues/27085 //see: https://github.com/bitcoin-s/bitcoin-s/issues/4976 _ <- bitcoind.syncWithValidationInterfaceQueue() - _ <- node.sync() + _ <- AsyncUtil.retryUntilSatisfiedF(() => node.sync().map(_.isDefined)) syncing <- node.chainApiFromDb().flatMap(_.isSyncing()) _ = require(syncing) _ <- NodeTestUtil.awaitSync(node, bitcoind)