Persist whether wallet is rescanning in the database (#4326)

* Persist whether wallet is rescanning in the database

* fix cli

* fix build

* fix unit tests

* fix postgres tests

* remove wallet_state table

* fix rescan bug

* cleanup

* revert Cancellables

* Cleanup

Co-authored-by: Chris Stewart <stewart.chris1234@gmail.com>
Authored by rorp on 2022-05-23 17:03:02 -07:00, committed by GitHub
Parent: 527e3ae862
Commit: f680ab8691
13 changed files with 349 additions and 192 deletions
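In short, the rescan flag now lives in the wallet's state descriptor table instead of an in-memory AtomicBoolean, so a rescan interrupted by a shutdown can be detected and restarted when the server comes back up. A minimal, self-contained sketch of that flow (simplified stand-in types, not the bitcoin-s classes):

    import scala.collection.concurrent.TrieMap
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, ExecutionContext, Future}

    object RescanPersistenceSketch {
      // Stand-in for the wallet state descriptor table: descriptor type -> serialized value.
      private val descriptors = TrieMap.empty[String, String]

      def isRescanning()(implicit ec: ExecutionContext): Future[Boolean] =
        Future(descriptors.get("rescan").exists(_.toBoolean))

      def setRescanning(value: Boolean)(implicit ec: ExecutionContext): Future[Unit] =
        Future { descriptors.update("rescan", value.toString) }

      // A (fake) rescan that persists the flag first and clears it on success or failure.
      def rescan()(implicit ec: ExecutionContext): Future[Unit] = {
        val work = for {
          _ <- setRescanning(true)
          _ <- Future(println("rescanning..."))
          _ <- setRescanning(false)
        } yield ()
        work.recoverWith { case err => setRescanning(false).flatMap(_ => Future.failed(err)) }
      }

      // On startup: if the flag survived a restart, the previous rescan never finished, so resume it.
      def restartRescanIfNeeded()(implicit ec: ExecutionContext): Future[Unit] =
        isRescanning().flatMap { stillRescanning =>
          if (stillRescanning) rescan() else Future.unit
        }

      def main(args: Array[String]): Unit = {
        implicit val ec: ExecutionContext = ExecutionContext.global
        Await.result(rescan().flatMap(_ => restartRescanIfNeeded()), Duration.Inf)
      }
    }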


@@ -9,7 +9,9 @@ object WalletStateDescriptorType

   final case object SyncHeight extends WalletStateDescriptorType

-  val all: Vector[WalletStateDescriptorType] = Vector(SyncHeight)
+  final case object Rescan extends WalletStateDescriptorType
+
+  val all: Vector[WalletStateDescriptorType] = Vector(SyncHeight, Rescan)

   override def fromStringOpt(str: String): Option[WalletStateDescriptorType] = {
     all.find(state => str.toLowerCase() == state.toString.toLowerCase)

@@ -36,8 +38,8 @@ sealed trait WalletStateDescriptorFactory[T <: WalletStateDescriptor]

 object WalletStateDescriptor extends StringFactory[WalletStateDescriptor] {

-  val all: Vector[StringFactory[WalletStateDescriptor]] = Vector(
-    SyncHeightDescriptor)
+  val all: Vector[StringFactory[WalletStateDescriptor]] =
+    Vector(SyncHeightDescriptor, RescanDescriptor)

   override def fromString(string: String): WalletStateDescriptor = {
     all.find(f => f.fromStringT(string).isSuccess) match {

@@ -71,3 +73,22 @@ object SyncHeightDescriptor
     SyncHeightDescriptor(hash, height)
   }
 }
+
+case class RescanDescriptor(rescanning: Boolean) extends WalletStateDescriptor {
+
+  override val descriptorType: WalletStateDescriptorType =
+    WalletStateDescriptorType.Rescan
+
+  override val toString: String = rescanning.toString
+}
+
+object RescanDescriptor extends WalletStateDescriptorFactory[RescanDescriptor] {
+
+  override val tpe: WalletStateDescriptorType =
+    WalletStateDescriptorType.Rescan
+
+  override def fromString(string: String): RescanDescriptor = {
+    val rescanning = java.lang.Boolean.parseBoolean(string)
+    RescanDescriptor(rescanning)
+  }
+}
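Note: the new descriptor serializes the flag as the string "true" or "false". A tiny stand-alone model of the same round trip (not the bitcoin-s types, just the parsing behaviour):

    // java.lang.Boolean.parseBoolean treats anything other than "true" (case-insensitive) as false,
    // so a missing or malformed value reads back as "not rescanning".
    final case class RescanFlag(rescanning: Boolean) {
      override val toString: String = rescanning.toString
    }

    object RescanFlag {
      def fromString(s: String): RescanFlag = RescanFlag(java.lang.Boolean.parseBoolean(s))
    }

    object RescanFlagRoundTrip extends App {
      assert(RescanFlag.fromString(RescanFlag(true).toString).rescanning)
      assert(!RescanFlag.fromString("garbage").rescanning) // lenient parse falls back to false
    }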


@@ -1697,8 +1697,9 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
           .rescanNeutrinoWallet(_: Option[BlockStamp],
                                 _: Option[BlockStamp],
                                 _: Int,
+                                _: Boolean,
                                 _: Boolean)(_: ExecutionContext))
-        .expects(None, None, 100, false, executor)
+        .expects(None, None, 100, false, false, executor)
         .returning(Future.successful(RescanState.RescanDone))

       val route1 =

@@ -1718,6 +1719,7 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
           .rescanNeutrinoWallet(_: Option[BlockStamp],
                                 _: Option[BlockStamp],
                                 _: Int,
+                                _: Boolean,
                                 _: Boolean)(_: ExecutionContext))
         .expects(
           Some(BlockTime(

@@ -1725,6 +1727,7 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
           None,
           100,
           false,
+          false,
           executor)
         .returning(Future.successful(RescanState.RescanDone))

@@ -1747,11 +1750,13 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
           .rescanNeutrinoWallet(_: Option[BlockStamp],
                                 _: Option[BlockStamp],
                                 _: Int,
+                                _: Boolean,
                                 _: Boolean)(_: ExecutionContext))
         .expects(None,
                  Some(BlockHash(DoubleSha256DigestBE.empty)),
                  100,
                  false,
+                 false,
                  executor)
         .returning(Future.successful(RescanState.RescanDone))

@@ -1774,11 +1779,13 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
           .rescanNeutrinoWallet(_: Option[BlockStamp],
                                 _: Option[BlockStamp],
                                 _: Int,
+                                _: Boolean,
                                 _: Boolean)(_: ExecutionContext))
         .expects(Some(BlockHeight(12345)),
                  Some(BlockHeight(67890)),
                  100,
                  false,
+                 false,
                  executor)
         .returning(Future.successful(RescanState.RescanDone))

@@ -1838,8 +1845,9 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
           .rescanNeutrinoWallet(_: Option[BlockStamp],
                                 _: Option[BlockStamp],
                                 _: Int,
+                                _: Boolean,
                                 _: Boolean)(_: ExecutionContext))
-        .expects(None, None, 55, false, executor)
+        .expects(None, None, 55, false, false, executor)
         .returning(Future.successful(RescanState.RescanDone))

       val route8 =


@@ -211,6 +211,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
       _ <- startedTorConfigF
       wallet <- configuredWalletF
       _ <- handleDuplicateSpendingInfoDb(wallet)
+      _ <- restartRescanIfNeeded(wallet)
       _ <- node.sync()
     } yield {
       logger.info(

@@ -337,6 +338,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
       _ = dlcConf.addCallbacks(dlcWalletCallbacks)
       _ <- startedTorConfigF
       _ <- handleDuplicateSpendingInfoDb(wallet)
+      _ <- restartRescanIfNeeded(wallet)
     } yield {
       logger.info(s"Done starting Main!")
       ()

@@ -480,7 +482,8 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
                              endOpt = None,
                              addressBatchSize =
                                walletConf.discoveryBatchSize,
-                             useCreationTime = true)
+                             useCreationTime = true,
+                             force = true)
       } yield clearedWallet
       walletF.map(_ => ())
     }

@@ -506,7 +509,7 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
           .startBitcoindBlockPolling(wallet, bitcoind)
           .map { _ =>
             BitcoindRpcBackendUtil
-              .startBitcoindMempoolPolling(bitcoind) { tx =>
+              .startBitcoindMempoolPolling(wallet, bitcoind) { tx =>
                 nodeConf.nodeCallbacks
                   .executeOnTxReceivedCallbacks(logger, tx)
               }

@@ -565,7 +568,8 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
         .rescanNeutrinoWallet(startOpt = None,
                               endOpt = None,
                               addressBatchSize = wallet.discoveryBatchSize,
-                              useCreationTime = true)
+                              useCreationTime = true,
+                              force = true)
         .recover { case scala.util.control.NonFatal(exn) =>
           logger.error(s"Failed to handleDuplicateSpendingInfoDb rescan",
                        exn)

@@ -577,6 +581,21 @@ class BitcoinSServerMain(override val serverArgParser: ServerArgParser)(implicit
       _ <- spendingInfoDAO.createOutPointsIndexIfNeeded()
     } yield ()
   }
+
+  private def restartRescanIfNeeded(wallet: Wallet): Future[RescanState] = {
+    for {
+      isRescanning <- wallet.isRescanning()
+      res <-
+        if (isRescanning)
+          wallet.rescanNeutrinoWallet(startOpt = None,
+                                      endOpt = None,
+                                      addressBatchSize =
+                                        wallet.discoveryBatchSize,
+                                      useCreationTime = true,
+                                      force = true)
+        else Future.successful(RescanState.RescanDone)
+    } yield res
+  }
 }

 object BitcoinSServerMain extends BitcoinSAppScalaDaemon {


@@ -291,8 +291,8 @@ object BitcoindRpcBackendUtil extends Logging {
    * if it has changed, it will then request those blocks to process them
    *
    * @param startCount The starting block height of the wallet
    * @param interval The amount of time between polls, this should not be too aggressive
    *                 as the wallet will need to process the new blocks
    */
   def startBitcoindBlockPolling(
       wallet: WalletApi,
@@ -300,73 +300,91 @@ object BitcoindRpcBackendUtil extends Logging {
       interval: FiniteDuration = 10.seconds)(implicit
       system: ActorSystem,
       ec: ExecutionContext): Future[Cancellable] = {
-    val walletSyncStateF = wallet.getSyncState()
-    val resultF: Future[Cancellable] = for {
-      walletSyncState <- walletSyncStateF
+    for {
+      walletSyncState <- wallet.getSyncState()
     } yield {
       val numParallelism = Runtime.getRuntime.availableProcessors()
       val atomicPrevCount: AtomicInteger = new AtomicInteger(
         walletSyncState.height)
-      val processing = new AtomicBoolean(false)
+      val processingBitcoindBlocks = new AtomicBoolean(false)
+
+      def pollBitcoind(): Future[Unit] = {
+        if (processingBitcoindBlocks.compareAndSet(false, true)) {
+          logger.trace("Polling bitcoind for block count")
+          val res: Future[Unit] = bitcoind.getBlockCount.flatMap { count =>
+            val prevCount = atomicPrevCount.get()
+            if (prevCount < count) {
+              logger.info(
+                s"Bitcoind has new block(s), requesting... ${count - prevCount} blocks")
+
+              // use .tail so we don't process the previous block that we already did
+              val range = prevCount.to(count).tail
+              val hashFs: Future[Seq[DoubleSha256Digest]] = Source(range)
+                .mapAsync(parallelism = numParallelism) { height =>
+                  bitcoind.getBlockHash(height).map(_.flip)
+                }
+                .map { hash =>
+                  val _ = atomicPrevCount.incrementAndGet()
+                  hash
+                }
+                .toMat(Sink.seq)(Keep.right)
+                .run()
+
+              val requestsBlocksF = for {
+                hashes <- hashFs
+                _ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
+              } yield logger.debug(
+                "Successfully polled bitcoind for new blocks")
+
+              requestsBlocksF.failed.foreach { case err =>
+                val failedCount = atomicPrevCount.get
+                atomicPrevCount.set(prevCount)
+                logger.error(
+                  s"Requesting blocks from bitcoind polling failed, range=[$prevCount, $failedCount]",
+                  err)
+              }
+
+              requestsBlocksF
+            } else if (prevCount > count) {
+              Future.failed(new RuntimeException(
+                s"Bitcoind is at a block height ($count) before the wallet's ($prevCount)"))
+            } else {
+              logger.debug(s"In sync $prevCount count=$count")
+              Future.unit
+            }
+          }
+          res.onComplete(_ => processingBitcoindBlocks.set(false))
+          res
+        } else {
+          logger.info(
+            s"Skipping scanning the blockchain since a previously scheduled task is still running")
+          Future.unit
+        }
+      }

       system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () =>
         {
-          if (processing.compareAndSet(false, true)) {
-            logger.trace("Polling bitcoind for block count")
-            val res = bitcoind.getBlockCount.flatMap { count =>
-              val prevCount = atomicPrevCount.get()
-              if (prevCount < count) {
-                logger.info(
-                  s"Bitcoind has new block(s), requesting... ${count - prevCount} blocks")
-
-                // use .tail so we don't process the previous block that we already did
-                val range = prevCount.to(count).tail
-                val hashFs: Future[Seq[DoubleSha256Digest]] = Source(range)
-                  .mapAsync(parallelism = numParallelism) { height =>
-                    bitcoind.getBlockHash(height).map(_.flip)
-                  }
-                  .map { hash =>
-                    val _ = atomicPrevCount.incrementAndGet()
-                    hash
-                  }
-                  .toMat(Sink.seq)(Keep.right)
-                  .run()
-
-                val requestsBlocksF = for {
-                  hashes <- hashFs
-                  _ <- wallet.nodeApi.downloadBlocks(hashes.toVector)
-                } yield logger.debug(
-                  "Successfully polled bitcoind for new blocks")
-
-                requestsBlocksF.failed.foreach { case err =>
-                  val failedCount = atomicPrevCount.get
-                  atomicPrevCount.set(prevCount)
-                  logger.error(
-                    s"Requesting blocks from bitcoind polling failed, range=[$prevCount, $failedCount]",
-                    err)
-                }
-
-                requestsBlocksF
-              } else if (prevCount > count) {
-                Future.failed(new RuntimeException(
-                  s"Bitcoind is at a block height ($count) before the wallet's ($prevCount)"))
-              } else {
-                logger.debug(s"In sync $prevCount count=$count")
-                Future.unit
-              }
-            }
-            res.onComplete(_ => processing.set(false))
-          } else {
-            logger.info(
-              s"Skipping scanning the blockchain since a previously scheduled task is still running")
-          }
+          val f = for {
+            rescanning <- wallet.isRescanning()
+            res <-
+              if (!rescanning) {
+                pollBitcoind()
+              } else {
+                logger.info(
+                  s"Skipping scanning the blockchain during wallet rescan")
+                Future.unit
+              }
+          } yield res
+
+          f.failed.foreach(err => logger.error(s"Failed to poll bitcoind", err))
         }
       }
     }
-    resultF
   }

   def startBitcoindMempoolPolling(
+      wallet: WalletApi,
       bitcoind: BitcoindRpcClient,
       interval: FiniteDuration = 10.seconds)(
       processTx: Transaction => Future[Unit])(implicit
@@ -383,52 +401,70 @@ object BitcoindRpcBackendUtil extends Logging {
         txids
       }

-    val processing = new AtomicBoolean(false)
+    val processingMempool = new AtomicBoolean(false)
+
+    def pollMempool(): Future[Unit] = {
+      if (processingMempool.compareAndSet(false, true)) {
+        logger.debug("Polling bitcoind for mempool")
+        val numParallelism = {
+          val processors = Runtime.getRuntime.availableProcessors()
+          //max open requests is 32 in akka, so 1/8 of possible requests
+          //can be used to query the mempool, else just limit it be number of processors
+          //see: https://github.com/bitcoin-s/bitcoin-s/issues/4252
+          Math.min(4, processors)
+        }
+
+        //don't want to execute these in parallel
+        val processTxFlow = Sink.foreachAsync[Transaction](1)(processTx)
+
+        val res = for {
+          mempool <- bitcoind.getRawMemPool
+          newTxIds = getDiffAndReplace(mempool.toSet)
+          _ = logger.debug(s"Found ${newTxIds.size} new mempool transactions")
+
+          _ <- Source(newTxIds)
+            .mapAsync(parallelism = numParallelism) { txid =>
+              bitcoind
+                .getRawTransactionRaw(txid)
+                .map(Option(_))
+                .recover { case _: Throwable =>
+                  None
+                }
+                .collect { case Some(tx) =>
+                  tx
+                }
+            }
+            .toMat(processTxFlow)(Keep.right)
+            .run()
+        } yield {
+          logger.debug(
+            s"Done processing ${newTxIds.size} new mempool transactions")
+          ()
+        }
+        res.onComplete(_ => processingMempool.set(false))
+        res
+      } else {
+        logger.info(
+          s"Skipping scanning the mempool since a previously scheduled task is still running")
+        Future.unit
+      }
+    }

     system.scheduler.scheduleWithFixedDelay(0.seconds, interval) { () =>
       {
-        if (processing.compareAndSet(false, true)) {
-          logger.debug("Polling bitcoind for mempool")
-          val numParallelism = {
-            val processors = Runtime.getRuntime.availableProcessors()
-            //max open requests is 32 in akka, so 1/8 of possible requests
-            //can be used to query the mempool, else just limit it be number of processors
-            //see: https://github.com/bitcoin-s/bitcoin-s/issues/4252
-            Math.min(4, processors)
-          }
-
-          //don't want to execute these in parallel
-          val processTxFlow = Sink.foreachAsync[Transaction](1)(processTx)
-
-          val res = for {
-            mempool <- bitcoind.getRawMemPool
-            newTxIds = getDiffAndReplace(mempool.toSet)
-            _ = logger.debug(s"Found ${newTxIds.size} new mempool transactions")
-
-            _ <- Source(newTxIds)
-              .mapAsync(parallelism = numParallelism) { txid =>
-                bitcoind
-                  .getRawTransactionRaw(txid)
-                  .map(Option(_))
-                  .recover { case _: Throwable =>
-                    None
-                  }
-                  .collect { case Some(tx) =>
-                    tx
-                  }
-              }
-              .toMat(processTxFlow)(Keep.right)
-              .run()
-          } yield {
-            logger.debug(
-              s"Done processing ${newTxIds.size} new mempool transactions")
-            ()
-          }
-          res.onComplete(_ => processing.set(false))
-        } else {
-          logger.info(
-            s"Skipping scanning the mempool since a previously scheduled task is still running")
-        }
+        val f = for {
+          rescanning <- wallet.isRescanning()
+          res <-
+            if (!rescanning) {
+              pollMempool()
+            } else {
+              logger.info(s"Skipping scanning the mempool during wallet rescan")
+              Future.unit
+            }
+        } yield res
+
+        f.failed.foreach(err => logger.error(s"Failed to poll mempool", err))
+        ()
       }
     }
   }
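Note: both pollers now share the same shape. The scheduled tick first consults the persisted rescan flag and only does work when no rescan is running, and any failure is logged rather than left unobserved. A minimal model of that guard (assumed names, plain JDK scheduling rather than the akka scheduler used above):

    import java.util.concurrent.{Executors, TimeUnit}
    import scala.concurrent.{ExecutionContext, Future}

    object PollingGuardSketch {
      // Schedules `poll` every `intervalSeconds`, skipping ticks while `isRescanning` reports true.
      def schedule(
          isRescanning: () => Future[Boolean],
          poll: () => Future[Unit],
          intervalSeconds: Long)(implicit ec: ExecutionContext): Unit = {
        val scheduler = Executors.newSingleThreadScheduledExecutor()
        val tick: Runnable = () => {
          val f = isRescanning().flatMap {
            case false => poll()
            case true  => Future.successful(println("skipping poll during rescan"))
          }
          // Log failures; the Future's error would otherwise go unobserved.
          f.failed.foreach(err => println(s"poll failed: $err"))
        }
        scheduler.scheduleWithFixedDelay(tick, 0L, intervalSeconds, TimeUnit.SECONDS)
        () // the scheduler is intentionally never shut down in this sketch
      }
    }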


@@ -711,7 +711,8 @@ case class WalletRoutes(wallet: AnyDLCHDWalletApi)(implicit
                 endOpt = endBlock,
                 addressBatchSize =
                   batchSize.getOrElse(wallet.discoveryBatchSize()),
-                useCreationTime = !ignoreCreationTime)
+                useCreationTime = !ignoreCreationTime,
+                force = false)
             Future.successful("Rescan started.")
           } else {
             Future.successful(


@@ -76,16 +76,17 @@ trait NeutrinoWalletApi { self: WalletApi =>
       startOpt: Option[BlockStamp],
       endOpt: Option[BlockStamp],
       addressBatchSize: Int,
-      useCreationTime: Boolean)(implicit
-      ec: ExecutionContext): Future[RescanState]
+      useCreationTime: Boolean,
+      force: Boolean)(implicit ec: ExecutionContext): Future[RescanState]

   /** Helper method to rescan the ENTIRE blockchain. */
-  def fullRescanNeutrinoWallet(addressBatchSize: Int)(implicit
-      ec: ExecutionContext): Future[RescanState] =
+  def fullRescanNeutrinoWallet(addressBatchSize: Int, force: Boolean = false)(
+      implicit ec: ExecutionContext): Future[RescanState] =
     rescanNeutrinoWallet(startOpt = None,
                          endOpt = None,
                          addressBatchSize = addressBatchSize,
-                         useCreationTime = false)
+                         useCreationTime = false,
+                         force = force)

   def discoveryBatchSize(): Int
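For callers, force = false keeps the old behaviour (a second concurrent request is answered with RescanState.RescanInProgress), while force = true starts the rescan even if the persisted flag says one is already running, which is how the server resumes an interrupted rescan at startup. A caller-side sketch, assuming the bitcoin-s core module on the classpath and that the package path below matches the version this commit targets:

    import org.bitcoins.core.api.wallet.{NeutrinoWalletApi, WalletApi}
    import scala.concurrent.ExecutionContext.Implicits.global

    object RescanCallSketch {
      // Resume (or force) a full rescan from the wallet creation time.
      def resume(wallet: WalletApi with NeutrinoWalletApi) =
        wallet.rescanNeutrinoWallet(startOpt = None,
                                    endOpt = None,
                                    addressBatchSize = wallet.discoveryBatchSize(),
                                    useCreationTime = true,
                                    force = true)
    }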


@@ -56,7 +56,8 @@ class RescanDLCTest extends DualWalletTestCachedBitcoind {
       _ <- wallet.rescanNeutrinoWallet(startOpt = None,
                                        endOpt = Some(BlockHash(hash)),
                                        addressBatchSize = 20,
-                                       useCreationTime = false)
+                                       useCreationTime = false,
+                                       force = false)

       postStatus <- getDLCStatus(wallet)
     } yield assert(postStatus.state == DLCState.Claimed)

@@ -96,7 +97,8 @@ class RescanDLCTest extends DualWalletTestCachedBitcoind {
       _ <- wallet.rescanNeutrinoWallet(startOpt = None,
                                        endOpt = Some(BlockHash(hash)),
                                        addressBatchSize = 20,
-                                       useCreationTime = false)
+                                       useCreationTime = false,
+                                       force = false)

       postStatus <- getDLCStatus(wallet)
     } yield assert(postStatus.state == DLCState.RemoteClaimed)


@@ -82,7 +82,8 @@ class BitcoindBlockPollingTest
       txid1 <- bitcoind.sendToAddress(addr, amountToSend)

       // Setup block polling
-      _ = BitcoindRpcBackendUtil.startBitcoindMempoolPolling(bitcoind,
+      _ = BitcoindRpcBackendUtil.startBitcoindMempoolPolling(wallet,
+                                                             bitcoind,
                                                              1.second) { tx =>
         mempoolTxs += tx
         FutureUtil.unit


@@ -141,7 +141,8 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
                                              endOpt = None,
                                              addressBatchSize =
                                                DEFAULT_ADDR_BATCH_SIZE,
-                                             useCreationTime = false)
+                                             useCreationTime = false,
+                                             force = false)
       balance <- newTxWallet.getBalance()
       unconfirmedBalance <- newTxWallet.getUnconfirmedBalance()
     } yield {

@@ -250,7 +251,8 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
                                              endOpt = None,
                                              addressBatchSize =
                                                DEFAULT_ADDR_BATCH_SIZE,
-                                             useCreationTime = true)
+                                             useCreationTime = true,
+                                             force = false)
       balance <- newTxWallet.getBalance()
       unconfirmedBalance <- newTxWallet.getUnconfirmedBalance()
     } yield {

@@ -289,7 +291,8 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
                                         endOpt = end,
                                         addressBatchSize =
                                           DEFAULT_ADDR_BATCH_SIZE,
-                                        useCreationTime = false)
+                                        useCreationTime = false,
+                                        force = false)
       balanceAfterRescan <- wallet.getBalance()
     } yield {
       assert(balanceAfterRescan == CurrencyUnits.zero)

@@ -306,7 +309,8 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
                                   endOpt = None,
                                   addressBatchSize =
                                     DEFAULT_ADDR_BATCH_SIZE,
-                                  useCreationTime = true)
+                                  useCreationTime = true,
+                                  force = false)

     //slight delay to make sure other rescan is started
     val alreadyStartedF =

@@ -315,7 +319,8 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
                                     endOpt = None,
                                     addressBatchSize =
                                       DEFAULT_ADDR_BATCH_SIZE,
-                                    useCreationTime = true)
+                                    useCreationTime = true,
+                                    force = false)
       }
     for {
       start <- startF

@@ -338,7 +343,8 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
       _ <- wallet.rescanNeutrinoWallet(startOpt = None,
                                        endOpt = None,
                                        addressBatchSize = 10,
-                                       useCreationTime = true)
+                                       useCreationTime = true,
+                                       force = false)

       usedAddresses <- wallet.listFundedAddresses()

@@ -370,13 +376,15 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
       _ <- wallet.rescanNeutrinoWallet(startOpt = None,
                                        endOpt = None,
                                        addressBatchSize = 10,
-                                       useCreationTime = true)
+                                       useCreationTime = true,
+                                       force = false)
       addressNoFunds <- wallet.getNewChangeAddress()

       //rescan again
       _ <- wallet.rescanNeutrinoWallet(startOpt = None,
                                        endOpt = None,
                                        addressBatchSize = 10,
-                                       useCreationTime = true)
+                                       useCreationTime = true,
+                                       force = false)

       txid <- bitcoind.sendToAddress(addressNoFunds, amt)
       tx <- bitcoind.getRawTransactionRaw(txid)
       _ <- wallet.processTransaction(tx, None)


@@ -159,7 +159,7 @@ abstract class Wallet
   override def stop(): Future[Wallet] = Future.successful(this)

   def getSyncDescriptorOpt(): Future[Option[SyncHeightDescriptor]] = {
-    stateDescriptorDAO.getSyncDescriptorOpt()
+    stateDescriptorDAO.getSyncHeight()
   }

   override def getSyncState(): Future[BlockSyncState] = {


@@ -57,16 +57,18 @@ trait WalletDbManagement extends DbManagement {
   // Ordering matters here, tables with a foreign key should be listed after
   // the table that key references
   override lazy val allTables: List[TableQuery[Table[_]]] = {
-    List(spkTable,
-         accountTable,
-         addressTable,
-         addressTagTable,
-         txTable,
-         incomingTxTable,
-         utxoTable,
-         outgoingTxTable,
-         stateDescriptorTable,
-         masterXPubTable)
+    List(
+      spkTable,
+      accountTable,
+      addressTable,
+      addressTagTable,
+      txTable,
+      incomingTxTable,
+      utxoTable,
+      outgoingTxTable,
+      stateDescriptorTable,
+      masterXPubTable
+    )
   }
 }


@@ -15,35 +15,31 @@ import org.bitcoins.core.wallet.rescan.RescanState
 import org.bitcoins.crypto.DoubleSha256Digest
 import org.bitcoins.wallet.{Wallet, WalletLogger}

-import java.util.concurrent.atomic.AtomicBoolean
 import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}

 private[wallet] trait RescanHandling extends WalletLogger {
   self: Wallet =>

-  private val rescanning = new AtomicBoolean(false)
-
   /////////////////////
   // Public facing API

-  override def isRescanning(): Future[Boolean] =
-    Future.successful(rescanning.get())
+  override def isRescanning(): Future[Boolean] = stateDescriptorDAO.isRescanning

   /** @inheritdoc */
   override def rescanNeutrinoWallet(
       startOpt: Option[BlockStamp],
       endOpt: Option[BlockStamp],
       addressBatchSize: Int,
-      useCreationTime: Boolean)(implicit
-      ec: ExecutionContext): Future[RescanState] = {
+      useCreationTime: Boolean,
+      force: Boolean)(implicit ec: ExecutionContext): Future[RescanState] = {
     for {
       account <- getDefaultAccount()
       state <- rescanNeutrinoWallet(account.hdAccount,
                                     startOpt,
                                     endOpt,
                                     addressBatchSize,
-                                    useCreationTime)
+                                    useCreationTime,
+                                    force)
     } yield state
   }

@@ -53,45 +49,55 @@ private[wallet] trait RescanHandling extends WalletLogger {
       startOpt: Option[BlockStamp],
       endOpt: Option[BlockStamp],
       addressBatchSize: Int,
-      useCreationTime: Boolean = true): Future[RescanState] = {
-    if (rescanning.get()) {
-      logger.warn(
-        s"Rescan already started, ignoring request to start another one")
-      Future.successful(RescanState.RescanInProgress)
-    } else {
-      rescanning.set(true)
-      logger.info(
-        s"Starting rescanning the wallet from ${startOpt} to ${endOpt} useCreationTime=$useCreationTime")
-      val start = System.currentTimeMillis()
-      val res = for {
-        start <- (startOpt, useCreationTime) match {
-          case (Some(_), true) =>
-            Future.failed(new IllegalArgumentException(
-              "Cannot define a starting block and use the wallet creation time"))
-          case (Some(value), false) =>
-            Future.successful(Some(value))
-          case (None, true) =>
-            walletCreationBlockHeight.map(Some(_))
-          case (None, false) =>
-            Future.successful(None)
-        }
-        _ <- clearUtxos(account)
-        _ <- doNeutrinoRescan(account, start, endOpt, addressBatchSize)
-      } yield {
-        RescanState.RescanDone
-      }
-
-      res.onComplete {
-        case Success(_) =>
-          rescanning.set(false)
-          logger.info(
-            s"Finished rescanning the wallet. It took ${System.currentTimeMillis() - start}ms")
-        case Failure(err) =>
-          rescanning.set(false)
-          logger.error(s"Failed to rescan wallet", err)
-      }
-
-      res
-    }
+      useCreationTime: Boolean = true,
+      force: Boolean = false): Future[RescanState] = {
+    for {
+      doRescan <-
+        if (force) stateDescriptorDAO.updateRescanning(true).map(_.rescanning)
+        else
+          stateDescriptorDAO.compareAndSetRescanning(expectedValue = false,
+                                                     newValue = true)
+      rescanState <-
+        if (doRescan) {
+          logger.info(
+            s"Starting rescanning the wallet from ${startOpt} to ${endOpt} useCreationTime=$useCreationTime")
+          val startTime = System.currentTimeMillis()
+          val res = for {
+            start <- (startOpt, useCreationTime) match {
+              case (Some(_), true) =>
+                Future.failed(new IllegalArgumentException(
+                  "Cannot define a starting block and use the wallet creation time"))
+              case (Some(value), false) =>
+                Future.successful(Some(value))
+              case (None, true) =>
+                walletCreationBlockHeight.map(Some(_))
+              case (None, false) =>
+                Future.successful(None)
+            }
+            _ <- clearUtxos(account)
+            _ <- doNeutrinoRescan(account, start, endOpt, addressBatchSize)
+            _ <- stateDescriptorDAO.updateRescanning(false)
+          } yield {
+            logger.info(s"Finished rescanning the wallet. It took ${System
+              .currentTimeMillis() - startTime}ms")
+            RescanState.RescanDone
+          }
+          res.recoverWith { case err: Throwable =>
+            logger.error(s"Failed to rescan wallet", err)
+            stateDescriptorDAO
+              .updateRescanning(false)
+              .flatMap(_ => Future.failed(err))
+          }
+          res
+        } else {
+          logger.warn(
+            s"Rescan already started, ignoring request to start another one")
+          Future.successful(RescanState.RescanInProgress)
+        }
+    } yield rescanState
   }

   /** @inheritdoc */
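The key difference from the old AtomicBoolean guard is that the claim on the rescan flag is now made against the persisted descriptor: a non-forced rescan only proceeds if it can flip the flag from false to true, while force = true overwrites it unconditionally. A self-contained model of those two code paths (not the bitcoin-s DAO):

    import java.util.concurrent.atomic.AtomicBoolean

    object RescanClaimSketch extends App {
      private val persisted = new AtomicBoolean(false) // stands in for the descriptor row

      def compareAndSetRescanning(expectedValue: Boolean, newValue: Boolean): Boolean =
        persisted.compareAndSet(expectedValue, newValue)

      def updateRescanning(rescanning: Boolean): Boolean = {
        persisted.set(rescanning)
        rescanning
      }

      assert(compareAndSetRescanning(expectedValue = false, newValue = true))  // first rescan claims the flag
      assert(!compareAndSetRescanning(expectedValue = false, newValue = true)) // a second, non-forced rescan is refused
      assert(updateRescanning(true))                                           // force = true overwrites unconditionally
    }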


@@ -2,6 +2,7 @@ package org.bitcoins.wallet.models

 import org.bitcoins.commons.jsonmodels.wallet.WalletStateDescriptorType._
 import org.bitcoins.commons.jsonmodels.wallet.{
+  RescanDescriptor,
   SyncHeightDescriptor,
   WalletStateDescriptor,
   WalletStateDescriptorType

@@ -55,7 +56,7 @@ case class WalletStateDescriptorDAO()(implicit
       Seq] =
     findByPrimaryKeys(ts.map(_.tpe))

-  def getSyncDescriptorOpt(): Future[Option[SyncHeightDescriptor]] = {
+  def getSyncHeight(): Future[Option[SyncHeightDescriptor]] = {
     read(SyncHeight).map {
       case Some(db) =>
         val desc = SyncHeightDescriptor.fromString(db.descriptor.toString)

@@ -67,22 +68,73 @@ case class WalletStateDescriptorDAO()(implicit
   def updateSyncHeight(
       hash: DoubleSha256DigestBE,
       height: Int): Future[WalletStateDescriptorDb] = {
-    getSyncDescriptorOpt().flatMap {
-      case Some(old) =>
-        if (old.height > height) {
-          Future.successful(WalletStateDescriptorDb(SyncHeight, old))
-        } else {
-          val descriptor = SyncHeightDescriptor(hash, height)
-          val newDb = WalletStateDescriptorDb(SyncHeight, descriptor)
-          update(newDb)
-        }
-      case None =>
-        val descriptor = SyncHeightDescriptor(hash, height)
-        val db = WalletStateDescriptorDb(SyncHeight, descriptor)
-        create(db)
+    val tpe: WalletStateDescriptorType = SyncHeight
+    val query = table.filter(_.tpe === tpe)
+    val action = for {
+      oldOpt <- query.result.headOption
+      res: WalletStateDescriptorDb <- oldOpt match {
+        case Some(oldDb) =>
+          val old = SyncHeightDescriptor.fromString(oldDb.descriptor.toString)
+          if (old.height > height) {
+            DBIO.successful(WalletStateDescriptorDb(tpe, old))
+          } else {
+            val descriptor = SyncHeightDescriptor(hash, height)
+            val newDb = WalletStateDescriptorDb(tpe, descriptor)
+            query.update(newDb).map(_ => newDb)
+          }
+        case None =>
+          val descriptor = SyncHeightDescriptor(hash, height)
+          val db = WalletStateDescriptorDb(tpe, descriptor)
+          (table += db).map(_ => db)
+      }
+    } yield res
+    safeDatabase.run(action)
+  }
+
+  def getRescan(): Future[Option[RescanDescriptor]] = {
+    read(Rescan).map {
+      case Some(db) =>
+        val desc = RescanDescriptor.fromString(db.descriptor.toString)
+        Some(desc)
+      case None => None
     }
   }
+
+  def isRescanning: Future[Boolean] = getRescan().map(_.exists(_.rescanning))
+
+  def updateRescanning(rescanning: Boolean): Future[RescanDescriptor] = {
+    val desc = RescanDescriptor(rescanning)
+    upsert(WalletStateDescriptorDb(desc.descriptorType, desc)).map(_ => desc)
+  }
+
+  def compareAndSetRescanning(
+      expectedValue: Boolean,
+      newValue: Boolean): Future[Boolean] = {
+    val tpe: WalletStateDescriptorType = Rescan
+    val query = table.filter(_.tpe === tpe)
+    val actions = for {
+      dbs <- query.result
+      res <- dbs.headOption match {
+        case None =>
+          val desc = RescanDescriptor(newValue)
+          val db = WalletStateDescriptorDb(tpe, desc)
+          (table += db).map(_ => true)
+        case Some(db) =>
+          val oldDesc = RescanDescriptor.fromString(db.descriptor.toString)
+          if (oldDesc.rescanning == expectedValue) {
+            val newDesc = RescanDescriptor(true)
+            val newDb = WalletStateDescriptorDb(tpe, newDesc)
+            query.update(newDb).map(_ => true)
+          } else {
+            DBIO.successful(false)
+          }
+      }
+    } yield res
+    safeDatabase.run(actions)
+  }

   class WalletStateDescriptorTable(t: Tag)
       extends Table[WalletStateDescriptorDb](t,
                                              schemaName,