From d0dadfa423d21f21b5a199147a692265e03c1fbd Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Fri, 12 Aug 2022 14:34:03 -0500 Subject: [PATCH] 2022 08 11 issue 4600 and only emit `syncing` ws event when we are actually syncing (#4604) * Check if bitcoind is syncing before running scheduled job to poll bitcoind * Set syncing flag to true if initialblockdownload is true * Implement logic so we only emit websocket events when state is changed, rather than everytime we poll bitcoind * Implement logic to only emit websocket event when descriptor flag changes rather than when it is set --- .../bitcoins/server/BitcoinSServerMain.scala | 12 +- .../server/BitcoindRpcBackendUtil.scala | 120 ++++++++++++------ .../chain/blockchain/ChainHandler.scala | 29 ++++- 3 files changed, 112 insertions(+), 49 deletions(-) diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala index 5c539426e3..66b1716527 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoinSServerMain.scala @@ -368,10 +368,10 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit nodeApi <- nodeApiF feeProvider <- feeProviderF } yield { - val l = DLCWalletBitcoindBackendLoader(walletHolder, - bitcoind, - nodeApi, - feeProvider) + val l = DLCWalletBitcoindBackendLoader(walletHolder = walletHolder, + bitcoind = bitcoind, + nodeApi = nodeApi, + feeProvider = feeProvider) walletLoaderApiOpt = Some(l) l @@ -399,7 +399,9 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit val bitcoindSyncStateF: Future[BitcoindSyncState] = { for { bitcoind <- bitcoindF - bitcoindNetwork <- getBlockChainInfo(bitcoind).map(_.chain) + blockchainInfo <- getBlockChainInfo(bitcoind) + _ <- bitcoind.setSyncing(blockchainInfo.initialblockdownload) + bitcoindNetwork = blockchainInfo.chain _ = require( bitcoindNetwork == network, s"bitcoind ($bitcoindNetwork) on different network than wallet ($network)") diff --git a/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala b/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala index 58f2daff40..571065d1ea 100644 --- a/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala +++ b/app/server/src/main/scala/org/bitcoins/server/BitcoindRpcBackendUtil.scala @@ -65,7 +65,19 @@ object BitcoindRpcBackendUtil extends Logging { //run the stream val res = streamF.flatMap(_.run()) res.onComplete { case _ => - setSyncingFlag(false, bitcoind, chainCallbacksOpt) + val isBitcoindInSyncF = BitcoindRpcBackendUtil.isBitcoindInSync(bitcoind) + isBitcoindInSyncF.flatMap { isBitcoindInSync => + if (isBitcoindInSync) { + //if bitcoind is in sync, and we are in sync with bitcoind, set the syncing flag to false + setSyncingFlag(false, bitcoind, chainCallbacksOpt) + } else { + //if bitcoind is not in sync, we cannot be done syncing. Keep the syncing flag to true + //so do nothing in this case + logger.warn( + s"We synced against bitcoind, but bitcoind is not in sync with the network.") + Future.unit + } + } } res.map(_ => ()) @@ -111,11 +123,23 @@ object BitcoindRpcBackendUtil extends Logging { bitcoind: BitcoindRpcClient, chainCallbacksOpt: Option[ChainCallbacks])(implicit ec: ExecutionContext): Future[Unit] = { - logger.debug(s"Setting bitcoind syncing flag to $syncing") + val oldSyncingFlagF = bitcoind.isSyncing() for { + oldFlag <- oldSyncingFlagF _ <- bitcoind.setSyncing(syncing) + _ <- { + if (oldFlag != syncing) { + val executeCallbackOpt = + chainCallbacksOpt.map(_.executeOnSyncFlagChanged(syncing)) + executeCallbackOpt match { + case Some(f) => f + case None => Future.unit + } + } else { + Future.unit + } + } } yield { - chainCallbacksOpt.map(_.executeOnSyncFlagChanged(syncing)) () } } @@ -379,39 +403,47 @@ object BitcoindRpcBackendUtil extends Logging { system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () => { - if (processingBitcoindBlocks.compareAndSet(false, true)) { - val f = for { - walletSyncState <- wallet.getSyncState() - rescanning <- wallet.isRescanning() - res <- - if (!rescanning) { - val pollFOptF = pollBitcoind(wallet = wallet, - bitcoind = bitcoind, - chainCallbacksOpt = - chainCallbacksOpt, - prevCount = walletSyncState.height) + val isBitcoindSyncedF = isBitcoindInSync(bitcoind) - pollFOptF.flatMap { - case Some(pollF) => pollF - case None => Future.unit + isBitcoindSyncedF.map { isBitcoindSynced => + if (!isBitcoindSynced) { + logger.info(s"Bitcoind is not synced, waiting for IBD to complete.") + } else if (processingBitcoindBlocks.compareAndSet(false, true)) { + val f = for { + walletSyncState <- wallet.getSyncState() + rescanning <- wallet.isRescanning() + res <- + if (!rescanning) { + val pollFOptF = + pollBitcoind(wallet = wallet, + bitcoind = bitcoind, + chainCallbacksOpt = chainCallbacksOpt, + prevCount = walletSyncState.height) + + pollFOptF.flatMap { + case Some(pollF) => pollF + case None => Future.unit + } + } else { + logger.info( + s"Skipping scanning the blockchain during wallet rescan") + Future.unit } - } else { - logger.info( - s"Skipping scanning the blockchain during wallet rescan") - Future.unit - } - } yield res + } yield res - f.onComplete { _ => - processingBitcoindBlocks.set(false) - BitcoindRpcBackendUtil.setSyncingFlag(false, - bitcoind, - chainCallbacksOpt) - } //reset polling variable - f.failed.foreach(err => logger.error(s"Failed to poll bitcoind", err)) - } else { - logger.info(s"Previous bitcoind polling still running") + f.onComplete { _ => + processingBitcoindBlocks.set(false) + BitcoindRpcBackendUtil.setSyncingFlag(false, + bitcoind, + chainCallbacksOpt) + } //reset polling variable + f.failed.foreach(err => + logger.error(s"Failed to poll bitcoind", err)) + } else { + logger.info(s"Previous bitcoind polling still running") + } } + () } } } @@ -470,19 +502,17 @@ object BitcoindRpcBackendUtil extends Logging { logger.debug("Polling bitcoind for block count") val resF: Future[Unit] = for { - _ <- bitcoind.setSyncing(true) - _ <- setSyncingFlag(true, bitcoind, chainCallbacksOpt) count <- bitcoind.getBlockCount retval <- { if (prevCount < count) { logger.info( s"Bitcoind has new block(s), requesting... ${count - prevCount} blocks") - - // use .tail so we don't process the previous block that we already did - val range = prevCount.to(count).tail - - range.foreach(r => queue.offer(r)) - Future.unit + val setSyncFlagF = setSyncingFlag(true, bitcoind, chainCallbacksOpt) + setSyncFlagF.map { _ => + // use .tail so we don't process the previous block that we already did + val range = prevCount.to(count).tail + range.foreach(r => queue.offer(r)) + } } else if (prevCount > count) { Future.failed(new RuntimeException( s"Bitcoind is at a block height ($count) before the wallet's ($prevCount)")) @@ -578,4 +608,14 @@ object BitcoindRpcBackendUtil extends Logging { } } + /** Checks if bitcoind has all blocks for the headers it has seen on the network */ + private def isBitcoindInSync(bitcoind: BitcoindRpcClient)(implicit + ec: ExecutionContext): Future[Boolean] = { + for { + blockchainInfo <- bitcoind.getBlockChainInfo + } yield { + blockchainInfo.headers == blockchainInfo.blocks + } + } + } diff --git a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala index 49e8d65dc1..2971958291 100644 --- a/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala +++ b/chain/src/main/scala/org/bitcoins/chain/blockchain/ChainHandler.scala @@ -1020,13 +1020,34 @@ class ChainHandler( } override def setSyncing(value: Boolean): Future[ChainApi] = { + val isSyncingF = stateDAO.isSyncing + for { + isSyncing <- isSyncingF + _ <- { + if (isSyncing == value) { + //do nothing as we are already at this state + Future.unit + } else { + updateSyncingAndExecuteCallback(value) + } + } + } yield { + this + } + } + + private def updateSyncingAndExecuteCallback(value: Boolean): Future[Unit] = { for { changed <- stateDAO.updateSyncing(value) - } yield { - if (changed && chainConfig.callBacks.onSyncFlagChanged.nonEmpty) { - chainConfig.callBacks.executeOnSyncFlagChanged(value) + _ <- { + if (changed && chainConfig.callBacks.onSyncFlagChanged.nonEmpty) { + chainConfig.callBacks.executeOnSyncFlagChanged(value) + } else { + Future.unit + } } - this + } yield { + () } } }