2024 10 21 Replace Future.sequence() usage with Future.traverse() (#5732)

* Use more Future.traverse()

* More Future.traverse()
This commit is contained in:
Chris Stewart 2024-10-22 16:57:23 -05:00 committed by GitHub
parent 528ceae9d4
commit 65e67287f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 53 additions and 63 deletions

View File

@ -223,9 +223,10 @@ class BitcoindRpcClient(override val instance: BitcoindInstance)(implicit
from: BlockHeaderDb,
to: BlockHeaderDb
): Future[Vector[BlockHeaderDb]] = {
val range = from.height.to(to.height).toVector
val headerFs =
from.height.to(to.height).map(height => getHeaderAtHeight(height))
Future.sequence(headerFs).map(_.toVector)
Future.traverse(range)(height => getHeaderAtHeight(height))
headerFs
}
private def getHeaderAtHeight(height: Int): Future[BlockHeaderDb] =

View File

@ -332,7 +332,7 @@ trait BlockchainRpc extends ChainApi { self: Client =>
val allHeights = startHeight.to(endHeight)
def f(range: Vector[Int]): Future[Vector[FilterResponse]] = {
val filterFs = range.map { height =>
val filterFs = Future.traverse(range) { height =>
for {
hash <- getBlockHash(height)
filter <- getBlockFilter(hash, FilterType.Basic)
@ -340,7 +340,7 @@ trait BlockchainRpc extends ChainApi { self: Client =>
FilterResponse(filter.filter, hash, height)
}
}
Future.sequence(filterFs)
filterFs
}
FutureUtil.batchAndSyncExecute(

View File

@ -952,10 +952,9 @@ class ChainHandler(
case Some(blockHeight) =>
for {
tips <- getBestChainTips()
getNAncestorsFs = tips.map { tip =>
ancestorChains <- Future.traverse(tips) { tip =>
blockHeaderDAO.getNAncestors(tip.hashBE, tip.height - blockHeight)
}
ancestorChains <- Future.sequence(getNAncestorsFs)
} yield {
val confs = ancestorChains.flatMap { chain =>
if (chain.last.hashBE == blockHash) {

View File

@ -162,14 +162,11 @@ abstract class FilterSync extends ChainVerificationLogger {
)(implicit ec: ExecutionContext): Future[ChainApi] = {
// now that we have headers that are missing filters, let's fetch the filters
val fetchNested = missingHeaders.map { b =>
val filterF = getFilterFunc(b.blockHeader)
filterF.map(f => (b, f))
}
val fetchFiltersF: Future[Vector[(BlockHeaderDb, FilterWithHeaderHash)]] = {
Future.sequence(fetchNested)
}
val fetchFiltersF: Future[Vector[(BlockHeaderDb, FilterWithHeaderHash)]] =
Future.traverse(missingHeaders) { b =>
val filterF = getFilterFunc(b.blockHeader)
filterF.map(f => (b, f))
}
// now let's build filter headers
val blockFiltersAggF: Future[Vector[BlockFilterAggregated]] = {

View File

@ -400,17 +400,19 @@ case class BlockHeaderDAO()(implicit
val chainTipsF = getForkedChainTips
val bestTipF = getBestChainTips
val staleChainsF = chainTipsF.flatMap { tips =>
val nestedFuture: Vector[Future[Option[Blockchain]]] = tips.map { tip =>
getBlockchainFrom(tip)
}
Future.sequence(nestedFuture).map(_.flatten)
Future
.traverse(tips) { tip =>
getBlockchainFrom(tip)
}
.map(_.flatten)
}
val bestChainsF = bestTipF.flatMap { tips =>
val nestedFuture: Vector[Future[Option[Blockchain]]] = tips.map { tip =>
getBlockchainFrom(tip)
}
Future.sequence(nestedFuture).map(_.flatten)
Future
.traverse(tips) { tip =>
getBlockchainFrom(tip)
}
.map(_.flatten)
}
for {

View File

@ -67,7 +67,7 @@ trait BitcoinSCryptoAsyncTest
.toVector
.flatten
val testRunsF = Future.sequence(samples.map(func))
val testRunsF = Future.traverse(samples)(func)
checkRunResults(testRunsF)
}
@ -84,7 +84,7 @@ trait BitcoinSCryptoAsyncTest
(a, b)
}
val testRunsF = Future.sequence(samples.map(x => func(x._1, x._2)))
val testRunsF = Future.traverse(samples)(x => func(x._1, x._2))
checkRunResults(testRunsF)
}

View File

@ -80,12 +80,12 @@ trait LndRouterClient { self: LndRpcClient =>
routeHints: Vector[LnRoute]
): Future[Vector[Route]] = {
queryRoutes(amount, node, routeHints).map(_.routes).flatMap { routes =>
val fs = routes.toVector.map { route =>
val fs = Future.traverse(routes.toVector) { route =>
val fakeHash = CryptoUtil.sha256(ECPrivateKey.freshPrivateKey.bytes)
sendToRoute(fakeHash, route).map(t => (route, t))
}
Future.sequence(fs).map { results =>
fs.map { results =>
results
.filter(
_._2.failure.exists(_.code == INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS)

View File

@ -75,9 +75,8 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
val node = nodeConnectedWithBitcoind.node
def peerManager = node.peerManager
def peers = peerManager.peers
val ourPeersF: Future[Vector[Peer]] = Future.sequence(
nodeConnectedWithBitcoind.bitcoinds.map(NodeTestUtil.getBitcoindPeer)
)
val ourPeersF: Future[Vector[Peer]] = Future.traverse(
nodeConnectedWithBitcoind.bitcoinds)(NodeTestUtil.getBitcoindPeer)
def has2Peers: Future[Unit] =
AsyncUtil.retryUntilSatisfied(
@ -89,10 +88,10 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
assert(ours.map(peers.contains(_)).forall(_ == true))
}
def allConnected: Future[Assertion] = for {
conns <- Future.sequence(peers.map(peerManager.isConnected))
conns <- Future.traverse(peers)(peerManager.isConnected)
} yield assert(conns.forall(_ == true))
def allInitialized: Future[Assertion] = for {
inits <- Future.sequence(peers.map(peerManager.isInitialized))
inits <- Future.traverse(peers)(peerManager.isInitialized)
} yield assert(inits.forall(_ == true))
for {
@ -144,9 +143,8 @@ class NeutrinoNodeTest extends NodeTestWithCachedBitcoindPair {
for {
_ <- assertConnAndInit
ourPeers <- Future.sequence(
nodeConnectedWithBitcoind.bitcoinds.map(NodeTestUtil.getBitcoindPeer)
)
ourPeers <- Future.traverse(nodeConnectedWithBitcoind.bitcoinds)(
NodeTestUtil.getBitcoindPeer)
peerDbs <- PeerDAO()(node.nodeAppConfig, executionContext).findAll()
} yield {

View File

@ -27,8 +27,7 @@ class NeutrinoNodeWithUncachedBitcoindTest extends NodeUnitTest with CachedTor {
lazy val bitcoinPeersF: Future[Vector[Peer]] = {
bitcoindsF.flatMap { bitcoinds =>
val peersF = bitcoinds.map(NodeTestUtil.getBitcoindPeer)
Future.sequence(peersF)
Future.traverse(bitcoinds)(NodeTestUtil.getBitcoindPeer)
}
}

View File

@ -1152,7 +1152,7 @@ trait DLCTest {
constructAndSetupDLC(contractParams)
.flatMap {
case (dlcOffer, offerSetup, dlcAccept, acceptSetup, outcomes) =>
val testFs = outcomeIndices.map {
val testFs = Future.traverse(outcomeIndices) {
case (contractIndex, outcomeIndex) =>
executeForOutcome(
outcomeIndex = outcomeIndex,
@ -1164,8 +1164,7 @@ trait DLCTest {
contractIndex = contractIndex
)
}
Future.sequence(testFs).map(_ => succeed)
testFs.map(_ => succeed)
}
}

View File

@ -659,14 +659,12 @@ trait EclairRpcTestUtil extends BitcoinSLogger {
def sendPayments(c1: EclairApi, c2: EclairApi, numPayments: Int = 5)(implicit
ec: ExecutionContext
): Future[Vector[PaymentId]] = {
val payments = (1 to numPayments)
.map(MilliSatoshis(_))
.map(sats =>
val range = 1.to(numPayments).toVector
val amounts = range.map(MilliSatoshis(_))
val resultF =
Future.traverse(amounts)(sats =>
c1.createInvoice(s"this is a note for $sats")
.flatMap(invoice => c2.payInvoice(invoice, sats)))
val resultF = Future.sequence(payments).map(_.toVector)
resultF.onComplete {
case Success(_) =>
case Failure(_) =>
@ -832,7 +830,7 @@ trait EclairRpcTestUtil extends BitcoinSLogger {
def shutdown()(implicit ec: ExecutionContext): Future[Unit] =
for {
_ <- Future.sequence(networkEclairNodes.map(_.stop()))
_ <- Future.traverse(networkEclairNodes)(_.stop())
_ <- testEclairNode.stop()
_ <- bitcoind.stop()
} yield ()
@ -882,13 +880,10 @@ trait EclairRpcTestUtil extends BitcoinSLogger {
)
)
)
_ <- Future.sequence(networkEclairNodes.map(_.start()))
_ <- Future.sequence(
networkEclairNodes.map(awaitEclairInSync(_, bitcoind))
)
_ <- Future.sequence(
networkEclairNodes.map(connectLNNodes(_, testEclairNode))
)
_ <- Future.traverse(networkEclairNodes)(_.start())
_ <- Future.traverse(networkEclairNodes)(awaitEclairInSync(_, bitcoind))
_ <- Future.traverse(networkEclairNodes)(
connectLNNodes(_, testEclairNode))
channelIds <- networkEclairNodes.foldLeft(
Future.successful(Vector.empty[FundedChannelId])
) { (accF, node) =>

View File

@ -488,10 +488,10 @@ trait BitcoindRpcTestUtil extends BitcoinSLogger {
hashes <- clients.head.generateToAddress(blocks, address)
_ <- {
val pairs = ListUtil.uniquePairs(clients)
val syncFuts = pairs.map { case (first, second) =>
val syncFuts = Future.traverse(pairs) { case (first, second) =>
awaitSynced(first, second)
}
Future.sequence(syncFuts)
syncFuts
}
} yield hashes
}
@ -635,10 +635,10 @@ trait BitcoindRpcTestUtil extends BitcoinSLogger {
pairs: Vector[(BitcoindRpcClient, BitcoindRpcClient)]
)(implicit system: ActorSystem): Future[Unit] = {
import system.dispatcher
val futures = pairs.map { case (first, second) =>
val futures = Future.traverse(pairs) { case (first, second) =>
BitcoindRpcTestUtil.awaitSynced(first, second)
}
Future.sequence(futures).map(_ => ())
futures.map(_ => ())
}
/** Connects and waits non-blockingly until all the provided pairs of clients
@ -649,18 +649,18 @@ trait BitcoindRpcTestUtil extends BitcoinSLogger {
)(implicit system: ActorSystem): Future[Unit] = {
import system.dispatcher
val addNodesF: Future[Vector[Unit]] = {
val addedF = pairs.map { case (first, second) =>
val addedF = Future.traverse(pairs) { case (first, second) =>
first.addNode(second.getDaemon.uri, AddNodeArgument.Add)
}
Future.sequence(addedF)
addedF
}
val connectedPairsF = addNodesF.flatMap { _ =>
val futures = pairs.map { case (first, second) =>
val futures = Future.traverse(pairs) { case (first, second) =>
BitcoindRpcTestUtil
.awaitConnection(first, second, interval = 1.second)
}
Future.sequence(futures)
futures
}
connectedPairsF.map(_ => ())
@ -1025,7 +1025,7 @@ trait BitcoindRpcTestUtil extends BitcoinSLogger {
def deleteNodePair(client1: BitcoindRpcClient, client2: BitcoindRpcClient)(
implicit executionContext: ExecutionContext
): Future[Unit] = {
val stopsF = List(client1, client2).map { client =>
val stopsF = Future.traverse(List(client1, client2)) { client =>
implicit val sys = client.system
for {
_ <- client.stop()
@ -1033,7 +1033,7 @@ trait BitcoindRpcTestUtil extends BitcoinSLogger {
_ <- removeDataDirectory(client)
} yield ()
}
Future.sequence(stopsF).map(_ => ())
stopsF.map(_ => ())
}
/** Checks whether the provided client has seen the given block hash