2022 08 11 issue 4600 and only emit syncing ws event when we are actually syncing (#4604)

* Check if bitcoind is syncing before running scheduled job to poll bitcoind

* Set syncing flag to true if initialblockdownload is true

* Implement logic so we only emit websocket events when state is changed, rather than everytime we poll bitcoind

* Implement logic to only emit websocket event when descriptor flag changes rather than when it is set
This commit is contained in:
Chris Stewart 2022-08-12 14:34:03 -05:00 committed by GitHub
parent dea99457b5
commit d0dadfa423
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 112 additions and 49 deletions

View File

@ -368,10 +368,10 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
nodeApi <- nodeApiF
feeProvider <- feeProviderF
} yield {
val l = DLCWalletBitcoindBackendLoader(walletHolder,
bitcoind,
nodeApi,
feeProvider)
val l = DLCWalletBitcoindBackendLoader(walletHolder = walletHolder,
bitcoind = bitcoind,
nodeApi = nodeApi,
feeProvider = feeProvider)
walletLoaderApiOpt = Some(l)
l
@ -399,7 +399,9 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
val bitcoindSyncStateF: Future[BitcoindSyncState] = {
for {
bitcoind <- bitcoindF
bitcoindNetwork <- getBlockChainInfo(bitcoind).map(_.chain)
blockchainInfo <- getBlockChainInfo(bitcoind)
_ <- bitcoind.setSyncing(blockchainInfo.initialblockdownload)
bitcoindNetwork = blockchainInfo.chain
_ = require(
bitcoindNetwork == network,
s"bitcoind ($bitcoindNetwork) on different network than wallet ($network)")

View File

@ -65,7 +65,19 @@ object BitcoindRpcBackendUtil extends Logging {
//run the stream
val res = streamF.flatMap(_.run())
res.onComplete { case _ =>
setSyncingFlag(false, bitcoind, chainCallbacksOpt)
val isBitcoindInSyncF = BitcoindRpcBackendUtil.isBitcoindInSync(bitcoind)
isBitcoindInSyncF.flatMap { isBitcoindInSync =>
if (isBitcoindInSync) {
//if bitcoind is in sync, and we are in sync with bitcoind, set the syncing flag to false
setSyncingFlag(false, bitcoind, chainCallbacksOpt)
} else {
//if bitcoind is not in sync, we cannot be done syncing. Keep the syncing flag to true
//so do nothing in this case
logger.warn(
s"We synced against bitcoind, but bitcoind is not in sync with the network.")
Future.unit
}
}
}
res.map(_ => ())
@ -111,11 +123,23 @@ object BitcoindRpcBackendUtil extends Logging {
bitcoind: BitcoindRpcClient,
chainCallbacksOpt: Option[ChainCallbacks])(implicit
ec: ExecutionContext): Future[Unit] = {
logger.debug(s"Setting bitcoind syncing flag to $syncing")
val oldSyncingFlagF = bitcoind.isSyncing()
for {
oldFlag <- oldSyncingFlagF
_ <- bitcoind.setSyncing(syncing)
_ <- {
if (oldFlag != syncing) {
val executeCallbackOpt =
chainCallbacksOpt.map(_.executeOnSyncFlagChanged(syncing))
executeCallbackOpt match {
case Some(f) => f
case None => Future.unit
}
} else {
Future.unit
}
}
} yield {
chainCallbacksOpt.map(_.executeOnSyncFlagChanged(syncing))
()
}
}
@ -379,39 +403,47 @@ object BitcoindRpcBackendUtil extends Logging {
system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () =>
{
if (processingBitcoindBlocks.compareAndSet(false, true)) {
val f = for {
walletSyncState <- wallet.getSyncState()
rescanning <- wallet.isRescanning()
res <-
if (!rescanning) {
val pollFOptF = pollBitcoind(wallet = wallet,
bitcoind = bitcoind,
chainCallbacksOpt =
chainCallbacksOpt,
prevCount = walletSyncState.height)
val isBitcoindSyncedF = isBitcoindInSync(bitcoind)
pollFOptF.flatMap {
case Some(pollF) => pollF
case None => Future.unit
isBitcoindSyncedF.map { isBitcoindSynced =>
if (!isBitcoindSynced) {
logger.info(s"Bitcoind is not synced, waiting for IBD to complete.")
} else if (processingBitcoindBlocks.compareAndSet(false, true)) {
val f = for {
walletSyncState <- wallet.getSyncState()
rescanning <- wallet.isRescanning()
res <-
if (!rescanning) {
val pollFOptF =
pollBitcoind(wallet = wallet,
bitcoind = bitcoind,
chainCallbacksOpt = chainCallbacksOpt,
prevCount = walletSyncState.height)
pollFOptF.flatMap {
case Some(pollF) => pollF
case None => Future.unit
}
} else {
logger.info(
s"Skipping scanning the blockchain during wallet rescan")
Future.unit
}
} else {
logger.info(
s"Skipping scanning the blockchain during wallet rescan")
Future.unit
}
} yield res
} yield res
f.onComplete { _ =>
processingBitcoindBlocks.set(false)
BitcoindRpcBackendUtil.setSyncingFlag(false,
bitcoind,
chainCallbacksOpt)
} //reset polling variable
f.failed.foreach(err => logger.error(s"Failed to poll bitcoind", err))
} else {
logger.info(s"Previous bitcoind polling still running")
f.onComplete { _ =>
processingBitcoindBlocks.set(false)
BitcoindRpcBackendUtil.setSyncingFlag(false,
bitcoind,
chainCallbacksOpt)
} //reset polling variable
f.failed.foreach(err =>
logger.error(s"Failed to poll bitcoind", err))
} else {
logger.info(s"Previous bitcoind polling still running")
}
}
()
}
}
}
@ -470,19 +502,17 @@ object BitcoindRpcBackendUtil extends Logging {
logger.debug("Polling bitcoind for block count")
val resF: Future[Unit] = for {
_ <- bitcoind.setSyncing(true)
_ <- setSyncingFlag(true, bitcoind, chainCallbacksOpt)
count <- bitcoind.getBlockCount
retval <- {
if (prevCount < count) {
logger.info(
s"Bitcoind has new block(s), requesting... ${count - prevCount} blocks")
// use .tail so we don't process the previous block that we already did
val range = prevCount.to(count).tail
range.foreach(r => queue.offer(r))
Future.unit
val setSyncFlagF = setSyncingFlag(true, bitcoind, chainCallbacksOpt)
setSyncFlagF.map { _ =>
// use .tail so we don't process the previous block that we already did
val range = prevCount.to(count).tail
range.foreach(r => queue.offer(r))
}
} else if (prevCount > count) {
Future.failed(new RuntimeException(
s"Bitcoind is at a block height ($count) before the wallet's ($prevCount)"))
@ -578,4 +608,14 @@ object BitcoindRpcBackendUtil extends Logging {
}
}
/** Checks if bitcoind has all blocks for the headers it has seen on the network */
private def isBitcoindInSync(bitcoind: BitcoindRpcClient)(implicit
ec: ExecutionContext): Future[Boolean] = {
for {
blockchainInfo <- bitcoind.getBlockChainInfo
} yield {
blockchainInfo.headers == blockchainInfo.blocks
}
}
}

View File

@ -1020,13 +1020,34 @@ class ChainHandler(
}
override def setSyncing(value: Boolean): Future[ChainApi] = {
val isSyncingF = stateDAO.isSyncing
for {
isSyncing <- isSyncingF
_ <- {
if (isSyncing == value) {
//do nothing as we are already at this state
Future.unit
} else {
updateSyncingAndExecuteCallback(value)
}
}
} yield {
this
}
}
private def updateSyncingAndExecuteCallback(value: Boolean): Future[Unit] = {
for {
changed <- stateDAO.updateSyncing(value)
} yield {
if (changed && chainConfig.callBacks.onSyncFlagChanged.nonEmpty) {
chainConfig.callBacks.executeOnSyncFlagChanged(value)
_ <- {
if (changed && chainConfig.callBacks.onSyncFlagChanged.nonEmpty) {
chainConfig.callBacks.executeOnSyncFlagChanged(value)
} else {
Future.unit
}
}
this
} yield {
()
}
}
}