Move pollBitcoind out of startBitcoindBlockPolling (#4559)

* Move pollBitcoind out of startBitcoindBlockPolling

* Rework startBitcoindBlockPolling to not return Future[Cancellable]

* Stop NodeCallbacksStreamManager inside of NodeAppConfig.stop()

* Make BitcoindSyncState which encapsulates syncing with bitcoind and polling of mempool

* Fix bug where processingBitcoindBlocks cannot be top level val as that method may be used by different parts of the codebase

* Rename isPolling -> processingBitcoindBlocks

* Cleanup more resources in appServerTest/test

* Complete polling stream regardless if we synced blocks are not to complete the Future returned by pollBitcoind

* Empty commit to run CI
This commit is contained in:
Chris Stewart 2022-08-02 06:40:17 -05:00 committed by GitHub
parent 8b646802b3
commit 191df09196
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 193 additions and 147 deletions

View File

@ -1,9 +1,12 @@
package org.bitcoins.server
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.cli.{CliCommand, Config, ConsoleCli}
import org.bitcoins.commons.util.ServerArgParser
import org.bitcoins.testkit.fixtures.BitcoinSAppConfigBitcoinFixtureNotStarted
import scala.concurrent.duration.DurationInt
/** Test starting bitcoin-s with bitcoind as the backend for app */
class BitcoinSServerMainBitcoindTest
extends BitcoinSAppConfigBitcoinFixtureNotStarted {
@ -26,6 +29,8 @@ class BitcoinSServerMainBitcoindTest
addr = ConsoleCli.exec(CliCommand.GetNewAddress(labelOpt = None),
cliConfig)
blockHash = ConsoleCli.exec(CliCommand.GetBestBlockHash, cliConfig)
_ <- AsyncUtil.nonBlockingSleep(1.second)
_ <- server.stop() //stop to free all resources
} yield {
assert(info.isSuccess)
assert(balance.isSuccess)
@ -45,6 +50,8 @@ class BitcoinSServerMainBitcoindTest
val failF = for {
_ <- server.start()
infoT = ConsoleCli.exec(CliCommand.WalletInfo, cliConfig)
_ <- AsyncUtil.nonBlockingSleep(1.second)
_ <- server.stop()
} yield {
assert(infoT.isFailure)
assert(

View File

@ -1,10 +1,13 @@
package org.bitcoins.server
import org.bitcoins.asyncutil.AsyncUtil
import org.bitcoins.cli.{CliCommand, Config, ConsoleCli}
import org.bitcoins.commons.util.ServerArgParser
import org.bitcoins.testkit.fixtures.BitcoinSAppConfigBitcoinFixtureNotStarted
import org.bitcoins.testkit.tor.CachedTor
import scala.concurrent.duration.DurationInt
/** Test starting bitcoin-s with bitcoind as the backend for app */
class BitcoinSServerMainBitcoindTorTest
extends BitcoinSAppConfigBitcoinFixtureNotStarted
@ -29,6 +32,8 @@ class BitcoinSServerMainBitcoindTorTest
addr = ConsoleCli.exec(CliCommand.GetNewAddress(labelOpt = None),
cliConfig)
blockHash = ConsoleCli.exec(CliCommand.GetBestBlockHash, cliConfig)
_ <- AsyncUtil.nonBlockingSleep(1.second)
_ <- server.stop() //stop to free all resources
} yield {
assert(info.isSuccess)
assert(balance.isSuccess)

View File

@ -26,7 +26,6 @@ import org.bitcoins.core.api.node.{
NodeType
}
import org.bitcoins.core.api.wallet.NeutrinoHDWalletApi
import org.bitcoins.core.util.TimeUtil
import org.bitcoins.core.wallet.rescan.RescanState
import org.bitcoins.dlc.node.DLCNode
@ -39,6 +38,7 @@ import org.bitcoins.node.models.NodeStateDescriptorDAO
import org.bitcoins.rpc.BitcoindException.InWarmUp
import org.bitcoins.rpc.client.common.BitcoindRpcClient
import org.bitcoins.rpc.config.{BitcoindRpcAppConfig, ZmqConfig}
import org.bitcoins.server.bitcoind.BitcoindSyncState
import org.bitcoins.server.routes.{BitcoinSServerRunner, CommonRoutes, Server}
import org.bitcoins.server.util._
import org.bitcoins.tor.config.TorAppConfig
@ -98,8 +98,14 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
override def stop(): Future[WalletHolder] = {
logger.error(s"Exiting process")
mempoolPollingCancellableOpt.foreach(_.cancel())
for {
_ <- {
bitcoindSyncStateOpt match {
case Some(bitcoindSyncState) =>
bitcoindSyncState.stop()
case None => Future.unit
}
}
_ <- conf.stop()
_ <- walletLoaderApiOpt match {
case Some(l) => l.stop()
@ -294,13 +300,11 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
} yield info
}
/** Variable to hold the cancellable to stop polling bitcoind for blocks/txs */
private var mempoolPollingCancellableOpt: Option[BitcoindPollingCancellabe] =
None
/** The wallet loader that is being used for our wallet. */
private[this] var walletLoaderApiOpt: Option[DLCWalletLoaderApi] = None
private[this] var bitcoindSyncStateOpt: Option[BitcoindSyncState] = None
/** Start the bitcoin-s wallet server with a bitcoind backend
* @param startedTorConfigF a future that is completed when tor is fully started
* @return
@ -390,10 +394,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
} yield dlcNode
}
//intentionally Future[Future[]], the inner Future[BitcoindPollingCancellable]
//is blocked on syncing the wallet from bitcoind. This can take hours
//so don't block the rest of the appllication on that
val pollingCancellableNestedF: Future[Future[BitcoindPollingCancellabe]] =
val bitcoindSyncStateF: Future[BitcoindSyncState] = {
for {
bitcoind <- bitcoindF
bitcoindNetwork <- getBlockChainInfo(bitcoind).map(_.chain)
@ -417,13 +418,13 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
(wallet, walletConfig, dlcConfig) <- walletF
_ = walletConfig.addCallbacks(walletCallbacks)
//intentionally doesn't map on this otherwise we
//wait until we are done syncing the entire wallet
//which could take 1 hour
mempoolPollingCancellable = syncWalletWithBitcoindAndStartPolling(
bitcoindSyncState <- syncWalletWithBitcoindAndStartPolling(
bitcoind,
wallet,
Some(chainCallbacks))
_ = {
bitcoindSyncStateOpt = Some(bitcoindSyncState)
}
dlcWalletCallbacks = WebsocketUtil.buildDLCWalletCallbacks(wsQueue)
_ = dlcConfig.addCallbacks(dlcWalletCallbacks)
_ <- startedTorConfigF
@ -431,16 +432,13 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
_ <- restartRescanIfNeeded(wallet)
} yield {
logger.info(s"Done starting Main!")
mempoolPollingCancellable
bitcoindSyncState
}
//set the polling cancellable after the wallet is done syncing against bitcoind
pollingCancellableNestedF.map(
_.map(c => mempoolPollingCancellableOpt = Some(c)))
}
//don't return the Future that represents the full syncing of the wallet with bitcoind
for {
_ <- pollingCancellableNestedF //drop nested Future here
_ <- bitcoindSyncStateF //drop nested Future here
walletHolder <- walletF.map(_._1)
} yield walletHolder
}
@ -541,8 +539,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
private def syncWalletWithBitcoindAndStartPolling(
bitcoind: BitcoindRpcClient,
wallet: NeutrinoHDWalletApi,
chainCallbacksOpt: Option[ChainCallbacks]): Future[
BitcoindPollingCancellabe] = {
chainCallbacksOpt: Option[ChainCallbacks]): Future[BitcoindSyncState] = {
val f = for {
_ <- AsyncUtil.retryUntilSatisfiedF(
conditionF = { () =>
@ -556,25 +553,27 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
interval = 1.second,
maxTries = 12
)
_ <- BitcoindRpcBackendUtil.syncWalletToBitcoind(
syncF = BitcoindRpcBackendUtil.syncWalletToBitcoind(
bitcoind,
wallet,
chainCallbacksOpt)(system)
_ <- wallet.updateUtxoPendingStates()
pollingCancellable <-
if (bitcoindRpcConf.zmqConfig == ZmqConfig.empty) {
BitcoindRpcBackendUtil
.startBitcoindBlockPolling(wallet, bitcoind, chainCallbacksOpt)
.map { blockingPollingCancellable =>
val mempoolCancellable = BitcoindRpcBackendUtil
.startBitcoindMempoolPolling(wallet, bitcoind) { tx =>
nodeConf.callBacks
.executeOnTxReceivedCallbacks(logger, tx)
}
_ = syncF.map(_ => wallet.updateUtxoPendingStates())
BitcoindPollingCancellabe(blockingPollingCancellable,
mempoolCancellable)
//don't start polling until initial sync is done
pollingCancellable <- syncF.flatMap { _ =>
if (bitcoindRpcConf.zmqConfig == ZmqConfig.empty) {
val blockingPollingCancellable = BitcoindRpcBackendUtil
.startBitcoindBlockPolling(wallet, bitcoind, chainCallbacksOpt)
val mempoolCancellable = BitcoindRpcBackendUtil
.startBitcoindMempoolPolling(wallet, bitcoind) { tx =>
nodeConf.callBacks
.executeOnTxReceivedCallbacks(logger, tx)
}
val combinedCancellable =
BitcoindPollingCancellabe(blockingPollingCancellable,
mempoolCancellable)
Future.successful(combinedCancellable)
} else {
Future {
BitcoindRpcBackendUtil.startZMQWalletCallbacks(
@ -583,7 +582,9 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
BitcoindPollingCancellabe.none
}
}
} yield pollingCancellable
}
} yield BitcoindSyncState(syncF, pollingCancellable)
f.failed.foreach(err =>
logger.error(s"Error syncing bitcoin-s wallet with bitcoind", err))

View File

@ -2,6 +2,7 @@ package org.bitcoins.server
import akka.{Done, NotUsed}
import akka.actor.{ActorSystem, Cancellable}
import akka.stream.BoundedSourceQueue
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import grizzled.slf4j.Logging
import org.bitcoins.chain.ChainCallbacks
@ -43,10 +44,7 @@ object BitcoindRpcBackendUtil extends Logging {
_ <- setSyncingFlag(true, bitcoind, chainCallbacksOpt)
bitcoindHeight <- bitcoind.getBlockCount
walletStateOpt <- wallet.getSyncDescriptorOpt()
walletBirthdayHeight = 0 // need to come back to this likely
_ = logger.info(
s"Syncing from bitcoind with bitcoindHeight=$bitcoindHeight walletHeight=${walletStateOpt
.getOrElse(walletBirthdayHeight)}")
heightRange <- {
walletStateOpt match {
case None =>
@ -56,6 +54,8 @@ object BitcoindRpcBackendUtil extends Logging {
Future.successful(range)
}
}
_ = logger.info(
s"Syncing from bitcoind with bitcoindHeight=$bitcoindHeight walletHeight=${heightRange.start}")
syncFlow <- buildBitcoindSyncSink(bitcoind, wallet)
stream = Source(heightRange).toMat(syncFlow)(Keep.right)
} yield stream
@ -108,11 +108,14 @@ object BitcoindRpcBackendUtil extends Logging {
syncing: Boolean,
bitcoind: BitcoindRpcClient,
chainCallbacksOpt: Option[ChainCallbacks])(implicit
ec: ExecutionContext) = for {
_ <- bitcoind.setSyncing(false)
} yield {
chainCallbacksOpt.map(_.executeOnSyncFlagChanged(logger, syncing))
()
ec: ExecutionContext): Future[Unit] = {
logger.debug(s"Setting bitcoind syncing flag to $syncing")
for {
_ <- bitcoind.setSyncing(syncing)
} yield {
chainCallbacksOpt.map(_.executeOnSyncFlagChanged(logger, syncing))
()
}
}
/** Helper method to sync the wallet until the bitcoind height.
@ -131,7 +134,7 @@ object BitcoindRpcBackendUtil extends Logging {
.map(_ => true)
.recover { case _: Throwable => false }
val numParallelism = getParallelism
val numParallelism = FutureUtil.getParallelism
//feeding blockchain hashes into this sync
//will sync our wallet with those blockchain hashes
val syncWalletSinkF: Future[
@ -259,7 +262,7 @@ object BitcoindRpcBackendUtil extends Logging {
Future[NeutrinoHDWalletApi]] = {
import system.dispatcher
val numParallelism = getParallelism
val numParallelism = FutureUtil.getParallelism
val sink: Sink[DoubleSha256Digest, Future[NeutrinoHDWalletApi]] =
Flow[DoubleSha256Digest]
.mapAsync(parallelism = numParallelism) { hash =>
@ -290,7 +293,7 @@ object BitcoindRpcBackendUtil extends Logging {
override def downloadBlocks(
blockHashes: Vector[DoubleSha256Digest]): Future[Unit] = {
logger.info(s"Fetching ${blockHashes.length} blocks from bitcoind")
val numParallelism = getParallelism
val numParallelism = FutureUtil.getParallelism
walletF
.flatMap { wallet =>
val runStream: Future[Done] = Source(blockHashes)
@ -349,90 +352,29 @@ object BitcoindRpcBackendUtil extends Logging {
bitcoind: BitcoindRpcClient,
chainCallbacksOpt: Option[ChainCallbacks],
interval: FiniteDuration = 10.seconds)(implicit
system: ActorSystem,
ec: ExecutionContext): Future[Cancellable] = {
system: ActorSystem): Cancellable = {
import system.dispatcher
for {
walletSyncState <- wallet.getSyncState()
} yield {
val numParallelism = getParallelism
val atomicPrevCount: AtomicInteger = new AtomicInteger(
walletSyncState.height)
val processingBitcoindBlocks = new AtomicBoolean(false)
val processingBitcoindBlocks = new AtomicBoolean(false)
def pollBitcoind(): Future[Unit] = {
system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () =>
{
if (processingBitcoindBlocks.compareAndSet(false, true)) {
logger.trace("Polling bitcoind for block count")
bitcoind.setSyncing(true)
val res: Future[Unit] = for {
_ <- setSyncingFlag(true, bitcoind, chainCallbacksOpt)
count <- bitcoind.getBlockCount
retval <- {
val prevCount = atomicPrevCount.get()
if (prevCount < count) {
logger.info(
s"Bitcoind has new block(s), requesting... ${count - prevCount} blocks")
// use .tail so we don't process the previous block that we already did
val range = prevCount.to(count).tail
val hashFs: Future[Seq[DoubleSha256Digest]] = Source(range)
.mapAsync(parallelism = numParallelism) { height =>
bitcoind.getBlockHash(height).map(_.flip)
}
.map { hash =>
val _ = atomicPrevCount.incrementAndGet()
hash
}
.toMat(Sink.seq)(Keep.right)
.run()
val requestsBlocksF = for {
hashes <- hashFs
_ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
} yield logger.debug(
"Successfully polled bitcoind for new blocks")
requestsBlocksF.failed.foreach { case err =>
val failedCount = atomicPrevCount.get
atomicPrevCount.set(prevCount)
logger.error(
s"Requesting blocks from bitcoind polling failed, range=[$prevCount, $failedCount]",
err)
}
requestsBlocksF
} else if (prevCount > count) {
Future.failed(new RuntimeException(
s"Bitcoind is at a block height ($count) before the wallet's ($prevCount)"))
} else {
logger.debug(s"In sync $prevCount count=$count")
Future.unit
}
}
} yield {
retval
}
res.onComplete { _ =>
processingBitcoindBlocks.set(false)
setSyncingFlag(false, bitcoind, chainCallbacksOpt)
}
res
} else {
logger.info(
s"Skipping scanning the blockchain since a previously scheduled task is still running")
Future.unit
}
}
system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () =>
{
val f = for {
walletSyncState <- wallet.getSyncState()
rescanning <- wallet.isRescanning()
res <-
if (!rescanning) {
pollBitcoind()
val pollFOptF = pollBitcoind(wallet = wallet,
bitcoind = bitcoind,
chainCallbacksOpt =
chainCallbacksOpt,
prevCount = walletSyncState.height)
pollFOptF.flatMap {
case Some(pollF) => pollF
case None => Future.unit
}
} else {
logger.info(
s"Skipping scanning the blockchain during wallet rescan")
@ -440,12 +382,89 @@ object BitcoindRpcBackendUtil extends Logging {
}
} yield res
f.onComplete { _ =>
processingBitcoindBlocks.set(false)
BitcoindRpcBackendUtil.setSyncingFlag(false,
bitcoind,
chainCallbacksOpt)
} //reset polling variable
f.failed.foreach(err => logger.error(s"Failed to poll bitcoind", err))
} else {
logger.info(s"Previous bitcoind polling still running")
}
}
}
}
/** Polls bitcoind for syncing the blockchain
* @return None if there was nothing to sync, else the Future[Done] that is completed when the sync is finished.
*/
private def pollBitcoind(
wallet: WalletApi,
bitcoind: BitcoindRpcClient,
chainCallbacksOpt: Option[ChainCallbacks],
prevCount: Int)(implicit
system: ActorSystem): Future[Option[Future[Done]]] = {
import system.dispatcher
val atomicPrevCount = new AtomicInteger(prevCount)
val queueSource: Source[Int, BoundedSourceQueue[Int]] = Source.queue(100)
val numParallelism = FutureUtil.getParallelism
val processBlockSink = Sink.foreachAsync[Vector[DoubleSha256Digest]](1) {
case hashes: Vector[DoubleSha256Digest] =>
val requestsBlocksF = wallet.nodeApi.downloadBlocks(hashes)
requestsBlocksF.failed.foreach { case err =>
val failedCount = atomicPrevCount.get
atomicPrevCount.set(prevCount)
logger.error(
s"Requesting blocks from bitcoind polling failed, range=[$prevCount, $failedCount]",
err)
}
requestsBlocksF
}
val (queue, doneF) = queueSource
.mapAsync(parallelism = numParallelism) { height: Int =>
bitcoind.getBlockHash(height).map(_.flip)
}
.map { hash =>
val _ = atomicPrevCount.incrementAndGet()
hash
}
.batch(100, seed = hash => Vector(hash))(_ :+ _)
.toMat(processBlockSink)(Keep.both)
.run()
logger.debug("Polling bitcoind for block count")
val resF: Future[Unit] = for {
_ <- bitcoind.setSyncing(true)
_ <- setSyncingFlag(true, bitcoind, chainCallbacksOpt)
count <- bitcoind.getBlockCount
retval <- {
if (prevCount < count) {
logger.info(
s"Bitcoind has new block(s), requesting... ${count - prevCount} blocks")
// use .tail so we don't process the previous block that we already did
val range = prevCount.to(count).tail
range.foreach(r => queue.offer(r))
Future.unit
} else if (prevCount > count) {
Future.failed(new RuntimeException(
s"Bitcoind is at a block height ($count) before the wallet's ($prevCount)"))
} else {
logger.debug(s"In sync $prevCount count=$count")
Future.unit
}
}
} yield {
queue.complete() //complete the stream after offering al heights we need ot sync
retval
}
resF.map(_ => Some(doneF))
}
def startBitcoindMempoolPolling(
wallet: WalletApi,
bitcoind: BitcoindRpcClient,
@ -526,15 +545,4 @@ object BitcoindRpcBackendUtil extends Logging {
}
}
/** Helper method to retrieve paralleism for streams
* This is needed on machines with any cores which can trigger
* open request exceptions with akka default limit of 32 open requests at a time
* So now we set the maximum parallelism to 8
*/
private def getParallelism: Int = {
//max open requests is 32 in akka, so 1/8 of possible requests
//can be used to query the mempool, else just limit it be number of processors
//see: https://github.com/bitcoin-s/bitcoin-s/issues/4252
Math.min(Runtime.getRuntime.availableProcessors(), 8).toInt
}
}

View File

@ -0,0 +1,19 @@
package org.bitcoins.server.bitcoind
import org.bitcoins.server.util.BitcoindPollingCancellabe
import scala.concurrent.Future
/** @param syncF the future that will be completed when the synchronization with bitcoind is complete
* @param pollingCancellable You can cancel bitcoind polling by calling [[BitcoindPollingCancellabe.cancel()]]
*/
case class BitcoindSyncState(
syncF: Future[Unit],
pollingCancellable: BitcoindPollingCancellabe) {
/** Stops syncing and polling bitcoind */
def stop(): Future[Unit] = {
pollingCancellable.cancel()
syncF
}
}

View File

@ -87,8 +87,8 @@ class BitcoindRpcClient(override val instance: BitcoindInstance)(implicit
result.feerate match {
case Some(feeRate) => Future.successful(feeRate)
case None =>
Future.failed(
new RuntimeException("Unexpected error when getting fee rate"))
Future.failed(new RuntimeException(
s"Unexpected error when getting fee rate, errors=${result.errors}"))
}
}
// Chain Api

View File

@ -16,6 +16,7 @@ import org.bitcoins.core.config.{MainNet, RegTest, SigNet, TestNet3}
import org.bitcoins.core.util.TimeUtil
import org.bitcoins.db.{DbAppConfig, JdbcProfileComponent}
import org.bitcoins.node._
import org.bitcoins.node.callback.NodeCallbackStreamManager
import org.bitcoins.node.db.NodeDbManagement
import org.bitcoins.node.models.Peer
import org.bitcoins.node.networking.peer.DataMessageHandler
@ -90,9 +91,13 @@ case class NodeAppConfig(baseDatadir: Path, configOverrides: Vector[Config])(
}
override def stop(): Future[Unit] = {
clearCallbacks()
val stopCallbacksF = callBacks match {
case stream: NodeCallbackStreamManager => stream.stop()
case _: NodeCallbacks =>
Future.unit
}
val _ = stopHikariLogger()
super.stop()
stopCallbacksF.flatMap(_ => super.stop())
}
lazy val nodeType: NodeType =

View File

@ -18,6 +18,7 @@ trait WalletAppConfigWithBitcoindFixtures
with EmbeddedPg { _: CachedBitcoind[_] =>
override def afterAll(): Unit = {
super[EmbeddedPg].afterAll()
super[BitcoinSAsyncFixtureTest].afterAll()
}
}

View File

@ -45,10 +45,10 @@ class BitcoindBlockPollingTest
_ = assert(firstBalance == Satoshis.zero)
// Setup block polling
_ <- BitcoindRpcBackendUtil.startBitcoindBlockPolling(wallet,
bitcoind,
None,
1.second)
_ = BitcoindRpcBackendUtil.startBitcoindBlockPolling(wallet,
bitcoind,
None,
1.second)
_ <- bitcoind.generateToAddress(6, bech32Address)
// Wait for it to process

View File

@ -978,8 +978,8 @@ abstract class Wallet
override def run(): Unit = {
getFeeRate()
.map(feeRate => Some(feeRate))
.recover { case NonFatal(ex) =>
logger.error("Cannot get fee rate ", ex)
.recover { case NonFatal(_) =>
//logger.error("Cannot get fee rate ", ex)
None
}
.foreach { feeRateOpt =>