2020 04 02 get new address queue (#1299)

* WIP: Build queue for getnewaddress

* Implement background thread to process requests to getNewAddress so that we fix issue 1009 with it's async issues

* Run scalafmt

* Replace the mutable.ArrayBuffer in AddressHandling with ConcurrentLinkedQueue

* Put FundTransactionhandling.fundRawTransactionInternal's call to getNewChangeAddress() into the for expression so we don't generate an address when the funding of the transaction fails when selecting utxos

* Move thread initialization out of method

* Switch to using ArrayBlockingQueue, which will block thread with .take() so we don't need to loop constantly, add 'addressQueueSize' and 'addressQueueTimeout' configurations

* Update wallet-test/src/test/scala/org/bitcoins/wallet/AddressHandlingTest.scala

Co-Authored-By: rorp <rorp@users.noreply.github.com>

* Add error handling to return the failed future if the queue is full, add a unit test for when the queue is full

* Run scalafmt

* Rebase

* Add scaladoc for throwing an exception

* Run scalafmt again

Co-authored-by: rorp <rorp@users.noreply.github.com>
This commit is contained in:
Chris Stewart 2020-04-10 14:19:39 -05:00 committed by GitHub
parent 0532d97ae5
commit 76a3edd0af
8 changed files with 169 additions and 22 deletions

View File

@ -70,6 +70,13 @@ bitcoin-s {
discoveryBatchSize = 100
requiredConfirmations = 6
# How big the address queue size is before we throw an exception
# because of an overflow
addressQueueSize = 10
# How long we attempt to generate an address for
# before we timeout
addressQueueTimeout = 5 seconds
}
}

View File

@ -142,6 +142,15 @@ bitcoin-s {
discoveryBatchSize = 100
requiredConfirmations = 6
# How big the address queue size is before we throw an exception
# because of an overflow
addressQueueSize = 10
# How long we attempt to generate an address for
# before we timeout
addressQueueTimeout = 5 seconds
}
}

View File

@ -71,6 +71,14 @@ bitcoin-s {
discoveryBatchSize = 100
requiredConfirmations = 6
# How big the address queue size is before we throw an exception
# because of an overflow
addressQueueSize = 10
# How long we attempt to generate an address for
# before we timeout
addressQueueTimeout = 5 seconds
}
}

View File

@ -21,18 +21,8 @@ trait FundWalletUtil {
account: HDAccount,
wallet: Wallet)(implicit ec: ExecutionContext): Future[Wallet] = {
val init = Future.successful(Vector.empty[BitcoinAddress])
val addressesF: Future[Vector[BitcoinAddress]] = 0.until(3).foldLeft(init) {
case (accumF, _) =>
//this Thread.sleep is needed because of
//https://github.com/bitcoin-s/bitcoin-s/issues/1009
//once that is resolved we should be able to remove this
for {
accum <- accumF
address <- wallet.getNewAddress(account)
} yield {
accum.:+(address)
}
val addressesF: Future[Vector[BitcoinAddress]] = Future.sequence {
Vector.fill(3)(wallet.getNewAddress(account))
}
//construct three txs that send money to these addresses

View File

@ -3,9 +3,12 @@ package org.bitcoins.wallet
import org.bitcoins.core.currency.Satoshis
import org.bitcoins.core.wallet.fee.SatoshisPerVirtualByte
import org.bitcoins.testkit.wallet.FundWalletUtil.FundedWallet
import org.bitcoins.rpc.util.AsyncUtil
import org.bitcoins.testkit.wallet.{BitcoinSWalletTest, WalletTestUtil}
import org.scalatest.FutureOutcome
import scala.concurrent.Future
class AddressHandlingTest extends BitcoinSWalletTest {
type FixtureParam = FundedWallet
@ -68,4 +71,47 @@ class AddressHandlingTest extends BitcoinSWalletTest {
assert(address2 != address3, "Must generate a new address")
}
}
it must "be safe to call getNewAddress multiple times in a row" in {
fundedWallet: FundedWallet =>
val wallet = fundedWallet.wallet
val addressesF = Future.sequence {
Vector.fill(10)(wallet.getNewAddress())
}
for {
addresses <- addressesF
} yield {
assert(addresses.size == 10)
assert(addresses.distinct.length == addresses.length,
s"We receive an identical address!")
}
}
it must "fail with an illegal state exception if the queue is full" in {
fundedWallet: FundedWallet =>
val wallet = fundedWallet.wallet
//attempt to generate 20 addresses simultaneously
//this should overwhelm our buffer size of 10
val numAddress = 20
val generatedF = Vector.fill(numAddress)(wallet.getNewAddress())
//some hacking here so we don't get an ugly stack trace
//when the thread gets killed while processing things in the queue
//we want to make sure everything is done processing before we assert
//we failed
val allCompletedF =
AsyncUtil.retryUntilSatisfied(generatedF.forall(_.isCompleted))
val addressesF = allCompletedF.flatMap { _ =>
Future.sequence {
generatedF
}
}
recoverToSucceededIf[IllegalStateException] {
addressesF
}
}
}

View File

@ -1,6 +1,7 @@
package org.bitcoins.wallet.config
import java.nio.file.{Files, Path}
import java.util.concurrent.TimeUnit
import com.typesafe.config.Config
import org.bitcoins.core.hd._
@ -9,6 +10,7 @@ import org.bitcoins.db.AppConfig
import org.bitcoins.keymanager.{KeyManagerParams, WalletStorage}
import org.bitcoins.wallet.db.WalletDbManagement
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
/** Configuration for the Bitcoin-S wallet
@ -98,6 +100,26 @@ case class WalletAppConfig(
def kmParams: KeyManagerParams =
KeyManagerParams(seedPath, defaultAccountKind, network)
/** How much elements we can have in [[org.bitcoins.wallet.internal.AddressHandling.addressRequestQueue]]
* before we throw an exception */
def addressQueueSize: Int = {
if (config.hasPath("wallet.addressQueueSize")) {
config.getInt("wallet.addressQueueSize")
} else {
100
}
}
/** How long we wait while generating an address in [[org.bitcoins.wallet.internal.AddressHandling.addressRequestQueue]]
* before we timeout */
def addressQueueTimeout: scala.concurrent.duration.Duration = {
if (config.hasPath("wallet.addressQueueTimeout")) {
val javaDuration = config.getDuration("wallet.addressQueueTimeout")
new FiniteDuration(javaDuration.toNanos, TimeUnit.NANOSECONDS)
} else {
5.second
}
}
}
object WalletAppConfig {

View File

@ -14,7 +14,7 @@ import org.bitcoins.wallet._
import org.bitcoins.wallet.api.AddressInfo
import org.bitcoins.wallet.models.{AccountDb, AddressDb, AddressDbHelper}
import scala.concurrent.Future
import scala.concurrent.{Await, Future, Promise, TimeoutException}
import scala.util.{Failure, Success}
/**
@ -146,18 +146,21 @@ private[wallet] trait AddressHandling extends WalletLogger {
}
}
/** Queues a request to generate an address and returns a Future that will
* be completed when the request is processed in the queue. If the queue
* is full it throws an exception.
* @throws IllegalStateException
* */
private def getNewAddressHelper(
account: AccountDb,
chainType: HDChainType
): Future[BitcoinAddress] = {
val p = Promise[AddressDb]
addressRequestQueue.add((account, chainType, p))
for {
addressDb <- getNewAddressDb(account, chainType)
_ = logger.debug(s"Writing $addressDb to DB")
written <- addressDAO.create(addressDb)
addressDb <- p.future
} yield {
logger.debug(
s"Got ${chainType} address ${written.address} at key path ${written.path} with pubkey ${written.ecPublicKey}")
written.address
addressDb.address
}
}
@ -309,4 +312,66 @@ private[wallet] trait AddressHandling extends WalletLogger {
}
}
/** Background thread meant to ensure safety when calling [[getNewAddress()]]
* We to ensure independent calls to getNewAddress don't result in a race condition
* to the database that would generate the same address and cause an error.
* With this background thread, we poll the [[addressRequestQueue]] seeing if there
* are any elements in it, if there are, we process them and complete the Promise in the queue. */
lazy val walletThread = new Thread(AddressQueueRunnable)
lazy val addressRequestQueue = {
new java.util.concurrent.ArrayBlockingQueue[(
AccountDb,
HDChainType,
Promise[AddressDb])](
walletConfig.addressQueueSize
)
}
walletThread.setDaemon(true)
walletThread.setName(s"wallet-address-queue-${System.currentTimeMillis()}")
walletThread.start()
/** A runnable that drains [[addressRequestQueue]]. Currently polls every 100ms
* seeing if things are in the queue. This is needed because otherwise
* wallet address generation is not async safe.
* @see https://github.com/bitcoin-s/bitcoin-s/issues/1009
* */
private case object AddressQueueRunnable extends Runnable {
override def run(): Unit = {
while (!walletThread.isInterrupted) {
val (account, chainType, promise) = addressRequestQueue.take()
logger.debug(
s"Processing $account $chainType in our address request queue")
val addressDbF = getNewAddressDb(account, chainType)
val resultF: Future[BitcoinAddress] = addressDbF.flatMap { addressDb =>
val writeF = addressDAO.create(addressDb)
val addrF = writeF.map { w =>
promise.success(w)
w.address
}
addrF.failed.foreach { exn =>
promise.failure(exn)
}
addrF
}
//make sure this is completed before we iterate to the next one
//otherwise we will possibly have a race condition
try {
Await.result(resultF, walletConfig.addressQueueTimeout)
} catch {
case timeout: TimeoutException =>
logger.error(
s"Timeout for generating address account=$account chainType=$chainType!",
timeout)
//continue executing
case scala.util.control.NonFatal(exn) =>
logger.error(s"Failed to generate address for $account $chainType",
exn)
}
}
}
}
}

View File

@ -59,7 +59,7 @@ trait FundTransactionHandling extends WalletLogger { self: LockedWalletApi =>
keyManagerOpt: Option[BIP39KeyManager],
markAsReserved: Boolean = false): Future[BitcoinTxBuilder] = {
val utxosF = listUtxos(fromAccount.hdAccount)
val changeAddrF = getNewChangeAddress(fromAccount)
val selectedUtxosF = for {
walletUtxos <- utxosF
//currently just grab the biggest utxos
@ -82,7 +82,7 @@ trait FundTransactionHandling extends WalletLogger { self: LockedWalletApi =>
val txBuilderF = for {
addrInfosWithUtxo <- addrInfosWithUtxoF
change <- changeAddrF
change <- getNewChangeAddress(fromAccount)
utxoSpendingInfos = {
addrInfosWithUtxo.map {
case (utxo, addrInfo) =>