2022 07 21 Modify helper methods in BitcoindRpcBackendUtil
to not materialize streams eagerly. (#4526)
* Refactor syncWalletToBitcoind impl to be completely stream based * Add getParallelism, refactor usages in BitcoindRpcBackendUtil to use it * Fix compile * Replace usages of 'WalletApi with NeutrinoWalletApi' with NeutrinoHDWalletApi * Revert wallet sync height logic * Refactor to make things cleaner
This commit is contained in:
@ -25,7 +25,8 @@ import org.bitcoins.core.api.node.{
import org.bitcoins.core.api.wallet.{NeutrinoWalletApi, WalletApi}
import org.bitcoins.core.api.wallet.NeutrinoHDWalletApi
import org.bitcoins.core.util.TimeUtil
import org.bitcoins.core.wallet.rescan.RescanState
import org.bitcoins.dlc.node.DLCNode
@ -525,7 +526,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
private def syncWalletWithBitcoindAndStartPolling(
bitcoind: BitcoindRpcClient,
wallet: WalletApi with NeutrinoWalletApi,
wallet: NeutrinoHDWalletApi,
chainCallbacksOpt: Option[ChainCallbacks]): Future[
BitcoindPollingCancellabe] = {
val f = for {
@ -1,12 +1,16 @@
package org.bitcoins.server
import akka.Done
import akka.{Done, NotUsed}
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import grizzled.slf4j.Logging
import org.bitcoins.chain.ChainCallbacks
import org.bitcoins.core.api.node.NodeApi
import org.bitcoins.core.api.wallet.{NeutrinoWalletApi, WalletApi}
import org.bitcoins.core.api.wallet.{
import org.bitcoins.core.gcs.FilterType
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.Transaction
@ -29,51 +33,75 @@ object BitcoindRpcBackendUtil extends Logging {
/** Has the wallet process all the blocks it has not seen up until bitcoind's chain tip */
def syncWalletToBitcoind(
bitcoind: BitcoindRpcClient,
wallet: WalletApi with NeutrinoWalletApi,
wallet: NeutrinoHDWalletApi,
chainCallbacksOpt: Option[ChainCallbacks])(implicit
system: ActorSystem): Future[Unit] = {
logger.info("Syncing wallet to bitcoind")
import system.dispatcher
val res = for {
val streamF: Future[RunnableGraph[Future[NeutrinoHDWalletApi]]] = for {
_ <- setSyncingFlag(true, bitcoind, chainCallbacksOpt)
bitcoindHeight <- bitcoind.getBlockCount
walletStateOpt <- wallet.getSyncDescriptorOpt()
walletBirthdayHeight = 0 // need to come back to this likely
_ = logger.info(
s"bitcoindHeight=$bitcoindHeight walletStateOpt=$walletStateOpt")
_ <- walletStateOpt match {
case None =>
for {
txDbs <- wallet.listTransactions()
lastConfirmedOpt = txDbs.filter(_.blockHashOpt.isDefined).lastOption
_ <- lastConfirmedOpt match {
case None =>
for {
_ <- doSync(walletHeight = bitcoindHeight - 1,
bitcoindHeight = bitcoindHeight,
bitcoind = bitcoind,
wallet = wallet)
} yield ()
case Some(txDb) =>
for {
heightOpt <- bitcoind.getBlockHeight(txDb.blockHashOpt.get)
_ <- heightOpt match {
case Some(height) =>
s"Last tx occurred at block $height, syncing from there")
doSync(height, bitcoindHeight, bitcoind, wallet)
case None => Future.unit
} yield ()
} yield ()
case Some(syncHeight) =>
doSync(syncHeight.height, bitcoindHeight, bitcoind, wallet)
s"Syncing from bitcoind with bitcoindHeight=$bitcoindHeight walletHeight=${walletStateOpt
heightRange <- {
walletStateOpt match {
case None =>
getHeightRangeNoWalletState(wallet, bitcoind, bitcoindHeight)
case Some(walletState) =>
val range = walletState.height.to(bitcoindHeight).tail
} yield ()
syncFlow <- buildBitcoindSyncSink(bitcoind, wallet)
stream = Source(heightRange).toMat(syncFlow)(Keep.right)
} yield stream
//run the stream
val res = streamF.flatMap(_.run())
res.onComplete { case _ =>
setSyncingFlag(false, bitcoind, chainCallbacksOpt)
res.map(_ => ())
/** Gets the height range for syncing against bitcoind when we don't have a [[org.bitcoins.core.api.wallet.WalletStateDescriptor]]
* to read the sync height from.
private def getHeightRangeNoWalletState(
wallet: NeutrinoHDWalletApi,
bitcoind: BitcoindRpcClient,
bitcoindHeight: Int)(implicit
ex: ExecutionContext): Future[Range.Inclusive] = {
for {
txDbs <- wallet.listTransactions()
lastConfirmedOpt = txDbs
range <- lastConfirmedOpt match {
case None =>
val range = (bitcoindHeight - 1).to(bitcoindHeight)
case Some(txDb) =>
for {
heightOpt <- bitcoind.getBlockHeight(txDb.blockHashOpt.get)
range <- heightOpt match {
case Some(height) =>
s"Last tx occurred at block $height, syncing from there")
val range = height.to(bitcoindHeight)
case None =>
val range = (bitcoindHeight - 1).to(bitcoindHeight)
} yield range
} yield range
private def setSyncingFlag(
@ -87,54 +115,52 @@ object BitcoindRpcBackendUtil extends Logging {
/** Helper method to sync the wallet until the bitcoind height */
private def doSync(
walletHeight: Int,
bitcoindHeight: Int,
/** Helper method to sync the wallet until the bitcoind height.
* This method returns a Sink that you can give block heights too and
* the sink will synchronize our bitcoin-s wallet against bitcoind
private def buildBitcoindSyncSink(
bitcoind: BitcoindRpcClient,
wallet: WalletApi with NeutrinoWalletApi)(implicit
system: ActorSystem): Future[WalletApi with NeutrinoWalletApi] = {
if (walletHeight > bitcoindHeight) {
val msg = s"Bitcoind and wallet are in incompatible states, " +
s"wallet height: $walletHeight, bitcoind height: $bitcoindHeight"
Future.failed(new RuntimeException(msg))
} else {
logger.info(s"Syncing from $walletHeight to $bitcoindHeight")
wallet: NeutrinoHDWalletApi)(implicit
system: ActorSystem): Future[Sink[Int, Future[NeutrinoHDWalletApi]]] = {
import system.dispatcher
import system.dispatcher
val hasFiltersF = bitcoind
.flatMap(hash => bitcoind.getFilter(hash))
.map(_ => true)
.recover { case _: Throwable => false }
val genesisHashBEF = bitcoind.getBlockHash(0)
val hasFiltersF: Future[Boolean] = for {
genesisHash <- genesisHashBEF
bool <- bitcoind
.map(_ => true)
.recover { case _: Throwable => false }
} yield bool
val numParallelism = getParallelism
//feeding blockchain hashes into this sync
//will sync our wallet with those blockchain hashes
val syncWalletSinkF: Future[
Sink[DoubleSha256Digest, Future[NeutrinoHDWalletApi]]] = {
val blockRange = walletHeight.to(bitcoindHeight).tail
val numParallelism = Runtime.getRuntime.availableProcessors()
logger.info(s"Syncing ${blockRange.size} blocks")
logger.info(s"Fetching block hashes")
val hashFs = Source(blockRange)
.mapAsync(numParallelism) {
for {
hashes <- hashFs.map(_.toVector)
hasFilters <- hasFiltersF
_ <- {
if (hasFilters) {
filterSync(hashes, bitcoind.asInstanceOf[V19BlockFilterRpc], wallet)
} else wallet.nodeApi.downloadBlocks(hashes)
} yield {
if (hasFilters) {
filterSyncSink(bitcoind.asInstanceOf[V19BlockFilterRpc], wallet)
} else {
.batch(100, hash => Vector(hash))(_ :+ _)
.mapAsync(1)(wallet.nodeApi.downloadBlocks(_).map(_ => wallet))
} yield wallet
val fetchBlockHashesFlow: Flow[Int, DoubleSha256Digest, NotUsed] = Flow[Int]
.mapAsync[DoubleSha256Digest](numParallelism) { case height =>
for {
syncWalletSink <- syncWalletSinkF
} yield fetchBlockHashesFlow.toMat(syncWalletSink)(Keep.right)
def createWalletWithBitcoindCallbacks(
@ -226,33 +252,28 @@ object BitcoindRpcBackendUtil extends Logging {
private def filterSync(
blockHashes: Vector[DoubleSha256Digest],
private def filterSyncSink(
bitcoindRpcClient: V19BlockFilterRpc,
wallet: WalletApi with NeutrinoWalletApi)(implicit
system: ActorSystem): Future[Unit] = {
wallet: NeutrinoHDWalletApi)(implicit system: ActorSystem): Sink[
Future[NeutrinoHDWalletApi]] = {
import system.dispatcher
logger.info("Starting filter sync")
val start = System.currentTimeMillis()
val numParallelism = Runtime.getRuntime.availableProcessors()
val runStream: Future[Done] = Source(blockHashes)
.mapAsync(parallelism = numParallelism) { hash =>
bitcoindRpcClient.getBlockFilter(hash.flip, FilterType.Basic).map {
res => (hash, res.filter)
val numParallelism = getParallelism
val sink: Sink[DoubleSha256Digest, Future[NeutrinoHDWalletApi]] =
.mapAsync(parallelism = numParallelism) { hash =>
bitcoindRpcClient.getBlockFilter(hash.flip, FilterType.Basic).map {
res => (hash, res.filter)
.batch(1000, filter => Vector(filter))(_ :+ _)
.foldAsync(wallet) { case (wallet, filterRes) =>
runStream.map { _ =>
logger.info(s"Synced ${blockHashes.size} filters, it took ${System
.currentTimeMillis() - start}ms")
logger.info("We are synced!")
.batch(1000, filter => Vector(filter))(_ :+ _)
.foldAsync(wallet) { case (wallet, filterRes) =>
/** Creates an anonymous [[NodeApi]] that downloads blocks using
@ -269,7 +290,7 @@ object BitcoindRpcBackendUtil extends Logging {
override def downloadBlocks(
blockHashes: Vector[DoubleSha256Digest]): Future[Unit] = {
logger.info(s"Fetching ${blockHashes.length} blocks from bitcoind")
val numParallelism = Runtime.getRuntime.availableProcessors()
val numParallelism = getParallelism
.flatMap { wallet =>
val runStream: Future[Done] = Source(blockHashes)
@ -334,7 +355,7 @@ object BitcoindRpcBackendUtil extends Logging {
for {
walletSyncState <- wallet.getSyncState()
} yield {
val numParallelism = Runtime.getRuntime.availableProcessors()
val numParallelism = getParallelism
val atomicPrevCount: AtomicInteger = new AtomicInteger(
val processingBitcoindBlocks = new AtomicBoolean(false)
@ -504,4 +525,16 @@ object BitcoindRpcBackendUtil extends Logging {
/** Helper method to retrieve paralleism for streams
* This is needed on machines with any cores which can trigger
* open request exceptions with akka default limit of 32 open requests at a time
* So now we set the maximum parallelism to 8
private def getParallelism: Int = {
//max open requests is 32 in akka, so 1/8 of possible requests
//can be used to query the mempool, else just limit it be number of processors
//see: https://github.com/bitcoin-s/bitcoin-s/issues/4252
Math.min(Runtime.getRuntime.availableProcessors(), 8).toInt
@ -11,12 +11,12 @@ trait NeutrinoWalletApi { self: WalletApi =>
def processCompactFilter(
blockHash: DoubleSha256Digest,
blockFilter: GolombFilter): Future[WalletApi with NeutrinoWalletApi] =
blockFilter: GolombFilter): Future[NeutrinoHDWalletApi] =
processCompactFilters(Vector((blockHash, blockFilter)))
def processCompactFilters(
blockFilters: Vector[(DoubleSha256Digest, GolombFilter)]): Future[
WalletApi with NeutrinoWalletApi]
/** Recreates the account using BIP-157 approach
@ -114,8 +114,9 @@ class WalletHolder(implicit ec: ExecutionContext)
override def processCompactFilters(
blockFilters: Vector[(DoubleSha256Digest, GolombFilter)]): Future[
WalletApi with NeutrinoWalletApi] = delegate(
NeutrinoHDWalletApi] = {
override def rescanNeutrinoWallet(
startOpt: Option[BlockStamp],
@ -709,7 +710,7 @@ class WalletHolder(implicit ec: ExecutionContext)
override def processCompactFilter(
blockHash: DoubleSha256Digest,
blockFilter: GolombFilter): Future[WalletApi with NeutrinoWalletApi] =
blockFilter: GolombFilter): Future[NeutrinoHDWalletApi] =
delegate(_.processCompactFilter(blockHash, blockFilter))
override def fullRescanNeutrinoWallet(addressBatchSize: Int, force: Boolean)(
