Use filters for bitcoind backend syncing if available (#2926)

* Use filters for bitcoind backend syncing if available

* Start bitcoind polling after sync is done

* Fix compile issue and test
This commit is contained in:
benthecarman 2021-04-20 17:28:23 -05:00 committed by GitHub
parent de5f7fc7f9
commit 105942efa2
4 changed files with 97 additions and 60 deletions

View file

@ -156,16 +156,17 @@ class BitcoinSServerMain(override val args: Array[String])
if err.getMessage.contains("If we have spent a spendinginfodb") =>
handleMissingSpendingInfoDb(err, wallet)
}
_ <- BitcoindRpcBackendUtil.syncWalletToBitcoind(bitcoind, wallet)
_ = BitcoindRpcBackendUtil
.syncWalletToBitcoind(bitcoind, wallet)
.flatMap { _ =>
if (bitcoindRpcConf.zmqConfig == ZmqConfig.empty) {
BitcoindRpcBackendUtil.startBitcoindBlockPolling(wallet, bitcoind)
} else Future.unit
}
blockCount <- bitcoind.getBlockCount
// Create callbacks for processing new blocks
_ =
if (bitcoindRpcConf.zmqConfig == ZmqConfig.empty) {
BitcoindRpcBackendUtil.startBitcoindBlockPolling(wallet,
bitcoind,
blockCount)
} else {
if (bitcoindRpcConf.zmqConfig != ZmqConfig.empty) {
BitcoindRpcBackendUtil.startZMQWalletCallbacks(wallet)
}

View file

@ -5,10 +5,12 @@ import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{Keep, Sink, Source}
import grizzled.slf4j.Logging
import org.bitcoins.core.api.node.NodeApi
import org.bitcoins.core.gcs.FilterType
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.crypto.DoubleSha256Digest
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.rpc.client.v19.V19BlockFilterRpc
import org.bitcoins.rpc.config.ZmqConfig
import org.bitcoins.wallet.Wallet
import org.bitcoins.zmq.ZMQSubscriber
@ -67,9 +69,16 @@ object BitcoindRpcBackendUtil extends Logging {
s"wallet height: $walletHeight, bitcoind height: $bitcoindHeight"))
} else {
import system.dispatcher
val hasFiltersF = bitcoind
.getFilter(wallet.walletConfig.chain.genesisHashBE)
.map(_ => true)
.recover { case _: Throwable => false }
val blockRange = walletHeight.to(bitcoindHeight).tail
val numParallelism = Runtime.getRuntime.availableProcessors()
logger.info(s"Syncing ${blockRange.size} blocks")
logger.info(s"Fetching block hashes")
val hashFs = Source(blockRange)
.mapAsync(numParallelism) {
bitcoind
@ -79,8 +88,13 @@ object BitcoindRpcBackendUtil extends Logging {
.toMat(Sink.seq)(Keep.right)
.run()
for {
hashes <- hashFs
_ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
hashes <- hashFs.map(_.toVector)
hasFilters <- hasFiltersF
_ <- {
if (hasFilters)
filterSync(hashes, bitcoind.asInstanceOf[V19BlockFilterRpc], wallet)
else wallet.nodeApi.downloadBlocks(hashes)
}
} yield wallet
}
}
@ -147,6 +161,26 @@ object BitcoindRpcBackendUtil extends Logging {
}
}
private def filterSync(
blockHashes: Vector[DoubleSha256Digest],
bitcoindRpcClient: V19BlockFilterRpc,
wallet: Wallet)(implicit system: ActorSystem): Future[Unit] = {
import system.dispatcher
val numParallelism = Runtime.getRuntime.availableProcessors()
val runStream: Future[Done] = Source(blockHashes)
.mapAsync(parallelism = numParallelism) { hash =>
bitcoindRpcClient.getBlockFilter(hash.flip, FilterType.Basic).map {
res => (hash, res.filter)
}
}
.batch(1000, filter => Vector(filter))(_ :+ _)
.foldAsync(wallet) { case (wallet, filterRes) =>
wallet.processCompactFilters(filterRes)
}
.run()
runStream.map(_ => ())
}
private def getNodeApiWalletCallback(
bitcoindRpcClient: BitcoindRpcClient,
walletF: Future[Wallet])(implicit system: ActorSystem): NodeApi = {
@ -189,53 +223,55 @@ object BitcoindRpcBackendUtil extends Logging {
def startBitcoindBlockPolling(
wallet: Wallet,
bitcoind: BitcoindRpcClient,
startCount: Int,
interval: FiniteDuration = 10.seconds)(implicit
system: ActorSystem,
ec: ExecutionContext): Cancellable = {
val numParallelism = Runtime.getRuntime.availableProcessors()
val atomicPrevCount: AtomicInteger = new AtomicInteger(startCount)
system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () =>
{
logger.debug("Polling bitcoind for block count")
bitcoind.getBlockCount.flatMap { count =>
val prevCount = atomicPrevCount.get()
if (prevCount < count) {
logger.debug("Bitcoind has new block(s), requesting...")
ec: ExecutionContext): Future[Cancellable] = {
bitcoind.getBlockCount.map { startCount =>
val numParallelism = Runtime.getRuntime.availableProcessors()
val atomicPrevCount: AtomicInteger = new AtomicInteger(startCount)
system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () =>
{
logger.debug("Polling bitcoind for block count")
bitcoind.getBlockCount.flatMap { count =>
val prevCount = atomicPrevCount.get()
if (prevCount < count) {
logger.debug("Bitcoind has new block(s), requesting...")
// use .tail so we don't process the previous block that we already did
val range = prevCount.to(count).tail
val hashFs: Future[Seq[DoubleSha256Digest]] = Source(range)
.mapAsync(parallelism = numParallelism) { height =>
bitcoind.getBlockHash(height).map(_.flip)
// use .tail so we don't process the previous block that we already did
val range = prevCount.to(count).tail
val hashFs: Future[Seq[DoubleSha256Digest]] = Source(range)
.mapAsync(parallelism = numParallelism) { height =>
bitcoind.getBlockHash(height).map(_.flip)
}
.map { hash =>
val _ = atomicPrevCount.incrementAndGet()
hash
}
.toMat(Sink.seq)(Keep.right)
.run()
val requestsBlocksF = for {
hashes <- hashFs
_ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
} yield logger.debug(
"Successfully polled bitcoind for new blocks")
requestsBlocksF.onComplete {
case Success(_) => ()
case Failure(err) =>
atomicPrevCount.set(prevCount)
logger.error("Requesting blocks from bitcoind polling failed",
err)
}
.map { hash =>
val _ = atomicPrevCount.incrementAndGet()
hash
}
.toMat(Sink.seq)(Keep.right)
.run()
val requestsBlocksF = for {
hashes <- hashFs
_ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
} yield logger.debug("Successfully polled bitcoind for new blocks")
requestsBlocksF.onComplete {
case Success(_) => ()
case Failure(err) =>
atomicPrevCount.set(prevCount)
logger.error("Requesting blocks from bitcoind polling failed",
err)
}
requestsBlocksF
} else if (prevCount > count) {
Future.failed(new RuntimeException(
s"Bitcoind is at a block height ($count) before the wallet's ($prevCount)"))
} else Future.unit
requestsBlocksF
} else if (prevCount > count) {
Future.failed(new RuntimeException(
s"Bitcoind is at a block height ($count) before the wallet's ($prevCount)"))
} else Future.unit
}
()
}
()
}
}
}

View file

@ -41,8 +41,6 @@ class BitcoindBlockPollingTest
val amountToSend = Bitcoins.one
for {
blockCount <- bitcoind.getBlockCount
// Setup wallet
tmpWallet <-
BitcoinSWalletTest.createDefaultWallet(bitcoind, bitcoind, None)
@ -56,7 +54,6 @@ class BitcoindBlockPollingTest
// Send to wallet
addr <- wallet.getNewAddress()
_ <- bitcoind.sendToAddress(addr, amountToSend)
_ <- bitcoind.generateToAddress(6, addr)
// assert wallet hasn't seen it yet
firstBalance <- wallet.getBalance()
@ -65,16 +62,17 @@ class BitcoindBlockPollingTest
// Setup block polling
_ = BitcoindRpcBackendUtil.startBitcoindBlockPolling(wallet,
bitcoind,
blockCount,
1.second)
_ <- bitcoind.generateToAddress(6, addr)
// Wait for it to process
_ <- AsyncUtil.awaitConditionF(() =>
wallet.getBalance().map(_ > Satoshis.zero))
_ <- AsyncUtil.awaitConditionF(
() => wallet.getBalance().map(_ > Satoshis.zero),
1.second)
balance <- wallet.getBalance()
_ = assert(balance == amountToSend)
//clean up
_ <- wallet.stop()
} yield succeed
} yield assert(balance == amountToSend)
}
}

View file

@ -345,8 +345,10 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
Future.failed(new RuntimeException(
s"Attempting to spend an ImmatureCoinbase ${out.outPoint.hex}, this should not be possible until it is confirmed."))
case TxoState.ConfirmedSpent =>
Future.failed(new RuntimeException(
s"Attempted to mark an already spent utxo ${out.outPoint.hex} with a new spending tx ${spendingTxId.hex}"))
if (!out.spendingTxIdOpt.contains(spendingTxId)) {
Future.failed(new RuntimeException(
s"Attempted to mark an already spent utxo ${out.outPoint.hex} with a new spending tx ${spendingTxId.hex}"))
} else Future.successful(Some(out))
case TxoState.DoesNotExist =>
Future.failed(new RuntimeException(
s"Attempted to process a transaction for a utxo that does not exist ${out.outPoint.hex} with a new spending tx ${spendingTxId.hex}"))