From 76a3edd0afa55a7c498cb69169cf8471be354d3b Mon Sep 17 00:00:00 2001 From: Chris Stewart Date: Fri, 10 Apr 2020 14:19:39 -0500 Subject: [PATCH] 2020 04 02 get new address queue (#1299) * WIP: Build queue for getnewaddress * Implement background thread to process requests to getNewAddress so that we fix issue 1009 with it's async issues * Run scalafmt * Replace the mutable.ArrayBuffer in AddressHandling with ConcurrentLinkedQueue * Put FundTransactionhandling.fundRawTransactionInternal's call to getNewChangeAddress() into the for expression so we don't generate an address when the funding of the transaction fails when selecting utxos * Move thread initialization out of method * Switch to using ArrayBlockingQueue, which will block thread with .take() so we don't need to loop constantly, add 'addressQueueSize' and 'addressQueueTimeout' configurations * Update wallet-test/src/test/scala/org/bitcoins/wallet/AddressHandlingTest.scala Co-Authored-By: rorp * Add error handling to return the failed future if the queue is full, add a unit test for when the queue is full * Run scalafmt * Rebase * Add scaladoc for throwing an exception * Run scalafmt again Co-authored-by: rorp --- app/server/src/main/resources/reference.conf | 7 ++ docs/config/configuration.md | 11 ++- testkit/src/main/resources/reference.conf | 8 ++ .../testkit/wallet/FundWalletUtil.scala | 14 +--- .../bitcoins/wallet/AddressHandlingTest.scala | 46 +++++++++++ .../wallet/config/WalletAppConfig.scala | 22 ++++++ .../wallet/internal/AddressHandling.scala | 79 +++++++++++++++++-- .../internal/FundTransactionHandling.scala | 4 +- 8 files changed, 169 insertions(+), 22 deletions(-) diff --git a/app/server/src/main/resources/reference.conf b/app/server/src/main/resources/reference.conf index 00589e140d..720e64ca30 100644 --- a/app/server/src/main/resources/reference.conf +++ b/app/server/src/main/resources/reference.conf @@ -70,6 +70,13 @@ bitcoin-s { discoveryBatchSize = 100 requiredConfirmations = 6 + # How big the address queue size is before we throw an exception + # because of an overflow + addressQueueSize = 10 + + # How long we attempt to generate an address for + # before we timeout + addressQueueTimeout = 5 seconds } } diff --git a/docs/config/configuration.md b/docs/config/configuration.md index cc4bb8142d..1333abf404 100644 --- a/docs/config/configuration.md +++ b/docs/config/configuration.md @@ -142,7 +142,16 @@ bitcoin-s { discoveryBatchSize = 100 requiredConfirmations = 6 - } + + # How big the address queue size is before we throw an exception + # because of an overflow + addressQueueSize = 10 + + # How long we attempt to generate an address for + # before we timeout + addressQueueTimeout = 5 seconds + + } } diff --git a/testkit/src/main/resources/reference.conf b/testkit/src/main/resources/reference.conf index e3a05561fa..6767e313b8 100644 --- a/testkit/src/main/resources/reference.conf +++ b/testkit/src/main/resources/reference.conf @@ -71,6 +71,14 @@ bitcoin-s { discoveryBatchSize = 100 requiredConfirmations = 6 + + # How big the address queue size is before we throw an exception + # because of an overflow + addressQueueSize = 10 + + # How long we attempt to generate an address for + # before we timeout + addressQueueTimeout = 5 seconds } } diff --git a/testkit/src/main/scala/org/bitcoins/testkit/wallet/FundWalletUtil.scala b/testkit/src/main/scala/org/bitcoins/testkit/wallet/FundWalletUtil.scala index 1d13e808b2..ca815d7104 100644 --- a/testkit/src/main/scala/org/bitcoins/testkit/wallet/FundWalletUtil.scala +++ b/testkit/src/main/scala/org/bitcoins/testkit/wallet/FundWalletUtil.scala @@ -21,18 +21,8 @@ trait FundWalletUtil { account: HDAccount, wallet: Wallet)(implicit ec: ExecutionContext): Future[Wallet] = { - val init = Future.successful(Vector.empty[BitcoinAddress]) - val addressesF: Future[Vector[BitcoinAddress]] = 0.until(3).foldLeft(init) { - case (accumF, _) => - //this Thread.sleep is needed because of - //https://github.com/bitcoin-s/bitcoin-s/issues/1009 - //once that is resolved we should be able to remove this - for { - accum <- accumF - address <- wallet.getNewAddress(account) - } yield { - accum.:+(address) - } + val addressesF: Future[Vector[BitcoinAddress]] = Future.sequence { + Vector.fill(3)(wallet.getNewAddress(account)) } //construct three txs that send money to these addresses diff --git a/wallet-test/src/test/scala/org/bitcoins/wallet/AddressHandlingTest.scala b/wallet-test/src/test/scala/org/bitcoins/wallet/AddressHandlingTest.scala index d3f3418291..7165079ec2 100644 --- a/wallet-test/src/test/scala/org/bitcoins/wallet/AddressHandlingTest.scala +++ b/wallet-test/src/test/scala/org/bitcoins/wallet/AddressHandlingTest.scala @@ -3,9 +3,12 @@ package org.bitcoins.wallet import org.bitcoins.core.currency.Satoshis import org.bitcoins.core.wallet.fee.SatoshisPerVirtualByte import org.bitcoins.testkit.wallet.FundWalletUtil.FundedWallet +import org.bitcoins.rpc.util.AsyncUtil import org.bitcoins.testkit.wallet.{BitcoinSWalletTest, WalletTestUtil} import org.scalatest.FutureOutcome +import scala.concurrent.Future + class AddressHandlingTest extends BitcoinSWalletTest { type FixtureParam = FundedWallet @@ -68,4 +71,47 @@ class AddressHandlingTest extends BitcoinSWalletTest { assert(address2 != address3, "Must generate a new address") } } + + it must "be safe to call getNewAddress multiple times in a row" in { + fundedWallet: FundedWallet => + val wallet = fundedWallet.wallet + val addressesF = Future.sequence { + Vector.fill(10)(wallet.getNewAddress()) + } + + for { + addresses <- addressesF + } yield { + assert(addresses.size == 10) + assert(addresses.distinct.length == addresses.length, + s"We receive an identical address!") + + } + } + + it must "fail with an illegal state exception if the queue is full" in { + fundedWallet: FundedWallet => + val wallet = fundedWallet.wallet + //attempt to generate 20 addresses simultaneously + //this should overwhelm our buffer size of 10 + val numAddress = 20 + val generatedF = Vector.fill(numAddress)(wallet.getNewAddress()) + + //some hacking here so we don't get an ugly stack trace + //when the thread gets killed while processing things in the queue + //we want to make sure everything is done processing before we assert + //we failed + val allCompletedF = + AsyncUtil.retryUntilSatisfied(generatedF.forall(_.isCompleted)) + val addressesF = allCompletedF.flatMap { _ => + Future.sequence { + generatedF + } + } + + recoverToSucceededIf[IllegalStateException] { + addressesF + } + } + } diff --git a/wallet/src/main/scala/org/bitcoins/wallet/config/WalletAppConfig.scala b/wallet/src/main/scala/org/bitcoins/wallet/config/WalletAppConfig.scala index eadbd3ccf0..e25cff243e 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/config/WalletAppConfig.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/config/WalletAppConfig.scala @@ -1,6 +1,7 @@ package org.bitcoins.wallet.config import java.nio.file.{Files, Path} +import java.util.concurrent.TimeUnit import com.typesafe.config.Config import org.bitcoins.core.hd._ @@ -9,6 +10,7 @@ import org.bitcoins.db.AppConfig import org.bitcoins.keymanager.{KeyManagerParams, WalletStorage} import org.bitcoins.wallet.db.WalletDbManagement +import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.concurrent.{ExecutionContext, Future} /** Configuration for the Bitcoin-S wallet @@ -98,6 +100,26 @@ case class WalletAppConfig( def kmParams: KeyManagerParams = KeyManagerParams(seedPath, defaultAccountKind, network) + /** How much elements we can have in [[org.bitcoins.wallet.internal.AddressHandling.addressRequestQueue]] + * before we throw an exception */ + def addressQueueSize: Int = { + if (config.hasPath("wallet.addressQueueSize")) { + config.getInt("wallet.addressQueueSize") + } else { + 100 + } + } + + /** How long we wait while generating an address in [[org.bitcoins.wallet.internal.AddressHandling.addressRequestQueue]] + * before we timeout */ + def addressQueueTimeout: scala.concurrent.duration.Duration = { + if (config.hasPath("wallet.addressQueueTimeout")) { + val javaDuration = config.getDuration("wallet.addressQueueTimeout") + new FiniteDuration(javaDuration.toNanos, TimeUnit.NANOSECONDS) + } else { + 5.second + } + } } object WalletAppConfig { diff --git a/wallet/src/main/scala/org/bitcoins/wallet/internal/AddressHandling.scala b/wallet/src/main/scala/org/bitcoins/wallet/internal/AddressHandling.scala index efc3a0221a..9925bb6731 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/internal/AddressHandling.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/internal/AddressHandling.scala @@ -14,7 +14,7 @@ import org.bitcoins.wallet._ import org.bitcoins.wallet.api.AddressInfo import org.bitcoins.wallet.models.{AccountDb, AddressDb, AddressDbHelper} -import scala.concurrent.Future +import scala.concurrent.{Await, Future, Promise, TimeoutException} import scala.util.{Failure, Success} /** @@ -146,18 +146,21 @@ private[wallet] trait AddressHandling extends WalletLogger { } } + /** Queues a request to generate an address and returns a Future that will + * be completed when the request is processed in the queue. If the queue + * is full it throws an exception. + * @throws IllegalStateException + * */ private def getNewAddressHelper( account: AccountDb, chainType: HDChainType ): Future[BitcoinAddress] = { + val p = Promise[AddressDb] + addressRequestQueue.add((account, chainType, p)) for { - addressDb <- getNewAddressDb(account, chainType) - _ = logger.debug(s"Writing $addressDb to DB") - written <- addressDAO.create(addressDb) + addressDb <- p.future } yield { - logger.debug( - s"Got ${chainType} address ${written.address} at key path ${written.path} with pubkey ${written.ecPublicKey}") - written.address + addressDb.address } } @@ -309,4 +312,66 @@ private[wallet] trait AddressHandling extends WalletLogger { } } + /** Background thread meant to ensure safety when calling [[getNewAddress()]] + * We to ensure independent calls to getNewAddress don't result in a race condition + * to the database that would generate the same address and cause an error. + * With this background thread, we poll the [[addressRequestQueue]] seeing if there + * are any elements in it, if there are, we process them and complete the Promise in the queue. */ + lazy val walletThread = new Thread(AddressQueueRunnable) + + lazy val addressRequestQueue = { + new java.util.concurrent.ArrayBlockingQueue[( + AccountDb, + HDChainType, + Promise[AddressDb])]( + walletConfig.addressQueueSize + ) + } + walletThread.setDaemon(true) + walletThread.setName(s"wallet-address-queue-${System.currentTimeMillis()}") + walletThread.start() + + /** A runnable that drains [[addressRequestQueue]]. Currently polls every 100ms + * seeing if things are in the queue. This is needed because otherwise + * wallet address generation is not async safe. + * @see https://github.com/bitcoin-s/bitcoin-s/issues/1009 + * */ + private case object AddressQueueRunnable extends Runnable { + override def run(): Unit = { + while (!walletThread.isInterrupted) { + val (account, chainType, promise) = addressRequestQueue.take() + logger.debug( + s"Processing $account $chainType in our address request queue") + + val addressDbF = getNewAddressDb(account, chainType) + val resultF: Future[BitcoinAddress] = addressDbF.flatMap { addressDb => + val writeF = addressDAO.create(addressDb) + + val addrF = writeF.map { w => + promise.success(w) + w.address + } + addrF.failed.foreach { exn => + promise.failure(exn) + } + addrF + } + //make sure this is completed before we iterate to the next one + //otherwise we will possibly have a race condition + + try { + Await.result(resultF, walletConfig.addressQueueTimeout) + } catch { + case timeout: TimeoutException => + logger.error( + s"Timeout for generating address account=$account chainType=$chainType!", + timeout) + //continue executing + case scala.util.control.NonFatal(exn) => + logger.error(s"Failed to generate address for $account $chainType", + exn) + } + } + } + } } diff --git a/wallet/src/main/scala/org/bitcoins/wallet/internal/FundTransactionHandling.scala b/wallet/src/main/scala/org/bitcoins/wallet/internal/FundTransactionHandling.scala index 709b5fdf7e..9b3c4da63d 100644 --- a/wallet/src/main/scala/org/bitcoins/wallet/internal/FundTransactionHandling.scala +++ b/wallet/src/main/scala/org/bitcoins/wallet/internal/FundTransactionHandling.scala @@ -59,7 +59,7 @@ trait FundTransactionHandling extends WalletLogger { self: LockedWalletApi => keyManagerOpt: Option[BIP39KeyManager], markAsReserved: Boolean = false): Future[BitcoinTxBuilder] = { val utxosF = listUtxos(fromAccount.hdAccount) - val changeAddrF = getNewChangeAddress(fromAccount) + val selectedUtxosF = for { walletUtxos <- utxosF //currently just grab the biggest utxos @@ -82,7 +82,7 @@ trait FundTransactionHandling extends WalletLogger { self: LockedWalletApi => val txBuilderF = for { addrInfosWithUtxo <- addrInfosWithUtxoF - change <- changeAddrF + change <- getNewChangeAddress(fromAccount) utxoSpendingInfos = { addrInfosWithUtxo.map { case (utxo, addrInfo) =>