Mempool support for the bitcoind RPC client (#4196)

* Mempool support for the bitcoind RPC client

* fix unit tests

* fix race conditions

* fix compile error

* add more logging
This commit is contained in:
rorp 2022-03-22 13:40:27 -07:00 committed by GitHub
parent 413dbcacbb
commit 0770fe0550
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 155 additions and 44 deletions

View file

@ -43,7 +43,6 @@ import org.bitcoins.server.util.{
WebsocketUtil,
WsServerConfig
}
import org.bitcoins.tor.config.TorAppConfig
import org.bitcoins.wallet._
import org.bitcoins.wallet.config.WalletAppConfig
@ -461,7 +460,14 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
if (bitcoindRpcConf.zmqConfig == ZmqConfig.empty) {
BitcoindRpcBackendUtil
.startBitcoindBlockPolling(wallet, bitcoind)
.map(_ => ())
.map { _ =>
BitcoindRpcBackendUtil
.startBitcoindMempoolPolling(bitcoind) { tx =>
nodeConf.nodeCallbacks
.executeOnTxReceivedCallbacks(logger, tx)
}
()
}
} else {
Future {
BitcoindRpcBackendUtil.startZMQWalletCallbacks(

View file

@ -4,14 +4,14 @@ import akka.Done
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.scaladsl.{Keep, Sink, Source}
import grizzled.slf4j.Logging
import org.bitcoins.chain.{ChainCallbacks}
import org.bitcoins.chain.ChainCallbacks
import org.bitcoins.core.api.node.NodeApi
import org.bitcoins.core.api.wallet.WalletApi
import org.bitcoins.core.gcs.FilterType
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.crypto.DoubleSha256Digest
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.crypto.{DoubleSha256Digest, DoubleSha256DigestBE}
import org.bitcoins.dlc.wallet.DLCWallet
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.rpc.client.v19.V19BlockFilterRpc
@ -19,7 +19,7 @@ import org.bitcoins.rpc.config.ZmqConfig
import org.bitcoins.wallet.Wallet
import org.bitcoins.zmq.ZMQSubscriber
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future, Promise}
@ -308,56 +308,112 @@ object BitcoindRpcBackendUtil extends Logging {
val numParallelism = Runtime.getRuntime.availableProcessors()
val atomicPrevCount: AtomicInteger = new AtomicInteger(
walletSyncState.height)
val processing = new AtomicBoolean(false)
system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () =>
{
logger.trace("Polling bitcoind for block count")
bitcoind.getBlockCount.flatMap { count =>
val prevCount = atomicPrevCount.get()
if (prevCount < count) {
logger.info(
s"Bitcoind has new block(s), requesting... ${count - prevCount} blocks")
if (processing.compareAndSet(false, true)) {
logger.trace("Polling bitcoind for block count")
val res = bitcoind.getBlockCount.flatMap { count =>
val prevCount = atomicPrevCount.get()
if (prevCount < count) {
logger.info(
s"Bitcoind has new block(s), requesting... ${count - prevCount} blocks")
// use .tail so we don't process the previous block that we already did
val range = prevCount.to(count).tail
val hashFs: Future[Seq[DoubleSha256Digest]] = Source(range)
.mapAsync(parallelism = numParallelism) { height =>
bitcoind.getBlockHash(height).map(_.flip)
// use .tail so we don't process the previous block that we already did
val range = prevCount.to(count).tail
val hashFs: Future[Seq[DoubleSha256Digest]] = Source(range)
.mapAsync(parallelism = numParallelism) { height =>
bitcoind.getBlockHash(height).map(_.flip)
}
.map { hash =>
val _ = atomicPrevCount.incrementAndGet()
hash
}
.toMat(Sink.seq)(Keep.right)
.run()
val requestsBlocksF = for {
hashes <- hashFs
_ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
} yield logger.debug(
"Successfully polled bitcoind for new blocks")
requestsBlocksF.failed.foreach { case err =>
val failedCount = atomicPrevCount.get
atomicPrevCount.set(prevCount)
logger.error(
s"Requesting blocks from bitcoind polling failed, range=[$prevCount, $failedCount]",
err)
}
.map { hash =>
val _ = atomicPrevCount.incrementAndGet()
hash
}
.toMat(Sink.seq)(Keep.right)
.run()
val requestsBlocksF = for {
hashes <- hashFs
_ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
} yield logger.debug(
"Successfully polled bitcoind for new blocks")
requestsBlocksF.failed.foreach { case err =>
val failedCount = atomicPrevCount.get
atomicPrevCount.set(prevCount)
logger.error(
s"Requesting blocks from bitcoind polling failed, range=[$prevCount, $failedCount]",
err)
requestsBlocksF
} else if (prevCount > count) {
Future.failed(new RuntimeException(
s"Bitcoind is at a block height ($count) before the wallet's ($prevCount)"))
} else {
logger.debug(s"In sync $prevCount count=$count")
Future.unit
}
requestsBlocksF
} else if (prevCount > count) {
Future.failed(new RuntimeException(
s"Bitcoind is at a block height ($count) before the wallet's ($prevCount)"))
} else {
logger.debug(s"In sync $prevCount count=$count")
Future.unit
}
res.onComplete(_ => processing.set(false))
} else {
logger.info(
s"Skipping scanning the blockchain since a previously scheduled task is still running")
}
()
}
}
}
resultF
}
def startBitcoindMempoolPolling(
bitcoind: BitcoindRpcClient,
interval: FiniteDuration = 10.seconds)(
processTx: Transaction => Future[Unit])(implicit
system: ActorSystem,
ec: ExecutionContext): Cancellable = {
@volatile var prevMempool: Set[DoubleSha256DigestBE] =
Set.empty[DoubleSha256DigestBE]
def getDiffAndReplace(
newMempool: Set[DoubleSha256DigestBE]): Set[DoubleSha256DigestBE] =
synchronized {
val txids = newMempool.diff(prevMempool)
prevMempool = newMempool
txids
}
val processing = new AtomicBoolean(false)
system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () =>
{
if (processing.compareAndSet(false, true)) {
logger.debug("Polling bitcoind for mempool")
val res = for {
mempool <- bitcoind.getRawMemPool
newTxIds = getDiffAndReplace(mempool.toSet)
_ = logger.debug(s"Found ${newTxIds.size} new mempool transactions")
newTxsOpt <- FutureUtil.sequentially(newTxIds)(
bitcoind.getRawTransactionRaw(_).map(Option(_)).recover {
case _: Throwable => None
})
newTxs = newTxsOpt.collect { case Some(tx) =>
tx
}
_ <- FutureUtil.sequentially(newTxs)(processTx)
} yield {
logger.debug(
s"Done processing ${newTxIds.size} new mempool transactions")
()
}
res.onComplete(_ => processing.set(false))
} else {
logger.info(
s"Skipping scanning the mempool since a previously scheduled task is still running")
}
}
}
}
}

View file

@ -2,6 +2,8 @@ package org.bitcoins.wallet
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.core.currency._
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.server.BitcoindRpcBackendUtil
import org.bitcoins.testkit.wallet.{
BitcoinSWalletTest,
@ -9,6 +11,7 @@ import org.bitcoins.testkit.wallet.{
}
import org.bitcoins.testkitcore.util.TestUtil.bech32Address
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.DurationInt
class BitcoindBlockPollingTest
@ -57,4 +60,50 @@ class BitcoindBlockPollingTest
_ <- wallet.walletConfig.stop()
} yield assert(balance == amountToSend)
}
it must "properly setup and poll mempool transactions from bitcoind" in {
walletAppConfigWithBitcoind =>
val bitcoind = walletAppConfigWithBitcoind.bitcoind
implicit val walletAppConfig = walletAppConfigWithBitcoind.walletAppConfig
val amountToSend = Bitcoins.one
val mempoolTxs = ArrayBuffer.empty[Transaction]
for {
// Setup wallet
tmpWallet <-
BitcoinSWalletTest.createDefaultWallet(bitcoind, bitcoind, None)
wallet =
BitcoindRpcBackendUtil.createWalletWithBitcoindCallbacks(bitcoind,
tmpWallet,
None)
// populate initial mempool
addr <- wallet.getNewAddress()
txid1 <- bitcoind.sendToAddress(addr, amountToSend)
// Setup block polling
_ = BitcoindRpcBackendUtil.startBitcoindMempoolPolling(bitcoind,
1.second) { tx =>
mempoolTxs += tx
FutureUtil.unit
}
// Send to wallet
addr <- wallet.getNewAddress()
txid2 <- bitcoind.sendToAddress(addr, amountToSend)
// Wait for it to process
_ <- AsyncUtil.awaitCondition(
() => {
mempoolTxs.exists(_.txIdBE == txid1) && mempoolTxs.exists(
_.txIdBE == txid2)
},
1.second)
//clean up
_ <- wallet.walletConfig.stop()
} yield succeed
}
}