Wallet callbacks (#1543)

* Wallet Callbacks

* Move noop to Callback

* Add docs

* Add tests for wallet callbacks

* Use BitcoinAddress instead of AddressDb, improve tests
This commit is contained in:
Ben Carman 2020-06-17 17:21:19 -05:00 committed by GitHub
parent ba0f38ccf6
commit 5319b4c927
10 changed files with 405 additions and 12 deletions

View file

@ -487,7 +487,7 @@ class RoutesSpec
.expects(testAddress, Bitcoins(100), *)
.returning(Future.successful(EmptyTransaction))
(mockNode.broadcastTransaction _)
(mockWalletApi.broadcastTransaction _)
.expects(EmptyTransaction)
.returning(FutureUtil.unit)
.anyNumberOfTimes()
@ -556,7 +556,7 @@ class RoutesSpec
*)
.returning(Future.successful(EmptyTransaction))
(mockNode.broadcastTransaction _)
(mockWalletApi.broadcastTransaction _)
.expects(EmptyTransaction)
.returning(FutureUtil.unit)
.anyNumberOfTimes()
@ -635,7 +635,7 @@ class RoutesSpec
CoinSelectionAlgo.AccumulateSmallestViable)
.returning(Future.successful(EmptyTransaction))
(mockNode.broadcastTransaction _)
(mockWalletApi.broadcastTransaction _)
.expects(EmptyTransaction)
.returning(FutureUtil.unit)
.anyNumberOfTimes()
@ -712,7 +712,7 @@ class RoutesSpec
.expects(message, false, *)
.returning(Future.successful(EmptyTransaction))
(mockNode.broadcastTransaction _)
(mockWalletApi.broadcastTransaction _)
.expects(EmptyTransaction)
.returning(FutureUtil.unit)
.anyNumberOfTimes()

View file

@ -98,7 +98,7 @@ case class WalletRoutes(wallet: WalletApi, node: Node)(
tx <- wallet.sendToAddress(address,
bitcoins,
satoshisPerVirtualByteOpt)
_ <- node.broadcastTransaction(tx)
_ <- wallet.broadcastTransaction(tx)
} yield {
Server.httpSuccess(tx.txIdBE)
}
@ -120,7 +120,7 @@ case class WalletRoutes(wallet: WalletApi, node: Node)(
address,
bitcoins,
satoshisPerVirtualByteOpt)
_ <- node.broadcastTransaction(tx)
_ <- wallet.broadcastTransaction(tx)
} yield Server.httpSuccess(tx.txIdBE)
}
}
@ -137,7 +137,7 @@ case class WalletRoutes(wallet: WalletApi, node: Node)(
bitcoins,
satoshisPerVirtualByteOpt,
algo)
_ <- node.broadcastTransaction(tx)
_ <- wallet.broadcastTransaction(tx)
} yield Server.httpSuccess(tx.txIdBE)
}
}
@ -153,7 +153,7 @@ case class WalletRoutes(wallet: WalletApi, node: Node)(
tx <- wallet.makeOpReturnCommitment(message,
hashMessage,
satoshisPerVirtualByteOpt)
_ <- node.broadcastTransaction(tx)
_ <- wallet.broadcastTransaction(tx)
} yield {
Server.httpSuccess(tx.txIdBE)
}

View file

@ -1,6 +1,6 @@
package org.bitcoins.core.api
import org.bitcoins.core.util.SeqWrapper
import org.bitcoins.core.util.{FutureUtil, SeqWrapper}
import org.slf4j.Logger
import scala.concurrent.{ExecutionContext, Future}
@ -26,6 +26,12 @@ trait Callback3[T1, T2, T3] extends Callback[(T1, T2, T3)] {
apply(param._1, param._2, param._3)
}
object Callback {
/** Does nothing */
def noop[T]: T => Future[Unit] = _ => FutureUtil.unit
}
/** Manages a set of callbacks, should be used to manage execution and logging if needed */
case class CallbackHandler[C, T <: Callback[C]](
name: String,

View file

@ -0,0 +1,88 @@
---
title: Wallet Callbacks
id: wallet-callbacks
---
#### Callbacks
Bitcoin-S support call backs for the following events that happen in the wallet:
1. onTransactionProcessed
2. onTransactionBroadcast
3. onReservedUtxos
4. onNewAddressGenerated
That means every time one of these events happens, we will call your callback
so that you can be notified of the event. These callbacks will be run after the message has been
recieved and will execute synchronously. If any of them fail an error log will be output, and the remainder of the callbacks will continue.
Let's make an easy one:
#### Example
Here is an example of constructing a wallet and registering a callback, so you can be notified of an event.
```scala mdoc:invisible
import akka.actor.ActorSystem
import org.bitcoins.core.api._
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.wallet.fee._
import org.bitcoins.feeprovider._
import org.bitcoins.keymanager.bip39.BIP39KeyManager
import org.bitcoins.rpc.client.v19.BitcoindV19RpcClient
import org.bitcoins.rpc.config.BitcoindInstance
import org.bitcoins.testkit.BitcoinSTestAppConfig
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.bitcoins.wallet._
import org.bitcoins.wallet.config.WalletAppConfig
import java.time.Instant
import scala.concurrent.{ExecutionContextExecutor, Future}
```
```scala mdoc:compile-only
implicit val system: ActorSystem = ActorSystem("example")
implicit val ec: ExecutionContextExecutor = system.dispatcher
implicit val walletConf: WalletAppConfig =
BitcoinSTestAppConfig.getNeutrinoTestConfig().walletConf
// let's use a helper method to get a v19 bitcoind
// and a ChainApi
val bitcoind = BitcoindV19RpcClient(BitcoindInstance.fromConfigFile())
// Create our key manager
val keyManagerE = BIP39KeyManager.initialize(kmParams = walletConf.kmParams,
bip39PasswordOpt = None)
val keyManager = keyManagerE match {
case Right(keyManager) => keyManager
case Left(err) =>
throw new RuntimeException(s"Cannot initialize key manager err=$err")
}
// Here is a super simple example of a callback, this could be replaced with anything, from
// relaying the transaction on the network, finding relevant wallet outputs, verifying the transaction,
// or writing it to disk
val exampleProcessTx: OnTransactionProcessed = (tx: Transaction) =>
Future.successful(println(s"Processed Tx: ${tx.txIdBE}"))
// Create our WalletCallbacks that
val exampleCallbacks = WalletCallbacks(
onTransactionProcessed = Vector(exampleProcessTx))
// Now we can create a wallet
val wallet =
Wallet(keyManager = keyManager,
nodeApi = bitcoind,
chainQueryApi = bitcoind,
feeRateApi = ConstantFeeRateProvider(SatoshisPerVirtualByte.one),
creationTime = Instant.now)
// Finally, we can add the callbacks to our wallet
wallet.addCallbacks(exampleCallbacks)
// Then to trigger the event we can run
val exampleTx = Transaction(
"0200000000010258e87a21b56daf0c23be8e7070456c336f7cbaa5c8757924f545887bb2abdd7500000000da00473044022074018ad4180097b873323c0015720b3684cc8123891048e7dbcd9b55ad679c99022073d369b740e3eb53dcefa33823c8070514ca55a7dd9544f157c167913261118c01483045022100f61038b308dc1da865a34852746f015772934208c6d24454393cd99bdf2217770220056e675a675a6d0a02b85b14e5e29074d8a25a9b5760bea2816f661910a006ea01475221029583bf39ae0a609747ad199addd634fa6108559d6c5cd39b4c2183f1ab96e07f2102dab61ff49a14db6a7d02b0cd1fbb78fc4b18312b5b4e54dae4dba2fbfef536d752aeffffffff838d0427d0ec650a68aa46bb0b098aea4422c071b2ca78352a077959d07cea1d01000000232200208c2353173743b595dfb4a07b72ba8e42e3797da74e87fe7d9d7497e3b2028903ffffffff0270aaf00800000000160014d85c2b71d0060b09c9886aeb815e50991dda124d00e1f5050000000016001400aea9a2e5f0f876a588df5546e8742d1d87008f000400473044022062eb7a556107a7c73f45ac4ab5a1dddf6f7075fb1275969a7f383efff784bcb202200c05dbb7470dbf2f08557dd356c7325c1ed30913e996cd3840945db12228da5f01473044022065f45ba5998b59a27ffe1a7bed016af1f1f90d54b3aa8f7450aa5f56a25103bd02207f724703ad1edb96680b284b56d4ffcb88f7fb759eabbe08aa30f29b851383d20147522103089dc10c7ac6db54f91329af617333db388cead0c231f723379d1b99030b02dc21023add904f3d6dcf59ddb906b0dee23529b7ffb9ed50e5e86151926860221f0e7352ae00000000")
wallet.processTransaction(exampleTx, None)
```

View file

@ -0,0 +1,144 @@
package org.bitcoins.wallet
import org.bitcoins.core.currency._
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.script.P2PKHScriptPubKey
import org.bitcoins.core.protocol.transaction.{
EmptyTransaction,
Transaction,
TransactionOutput
}
import org.bitcoins.crypto.ECPublicKey
import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.bitcoins.testkit.wallet.FundWalletUtil.FundedWallet
import org.bitcoins.wallet.models.SpendingInfoDb
import org.scalatest.FutureOutcome
import scala.concurrent.{Future, Promise}
class WalletCallbackTest extends BitcoinSWalletTest {
type FixtureParam = FundedWallet
override def withFixture(test: OneArgAsyncTest): FutureOutcome = {
withFundedWallet(test, getBIP39PasswordOpt())
}
behavior of "WalletCallbacks"
it must "verify OnNewAddressGenerated callbacks are executed" in {
fundedWallet: FundedWallet =>
val resultP: Promise[BitcoinAddress] = Promise()
val callback: OnNewAddressGenerated = (addr: BitcoinAddress) => {
Future {
resultP.success(addr)
()
}
}
val callbacks = WalletCallbacks.onNewAddressGenerated(callback)
val wallet = fundedWallet.wallet.addCallbacks(callbacks)
for {
address <- wallet.getNewAddress()
exists <- wallet.contains(address, None)
_ = assert(exists, "Wallet must contain address after generating it")
result <- resultP.future
} yield assert(result == address)
}
it must "verify OnTransactionProcessed callbacks are executed" in {
fundedWallet: FundedWallet =>
val resultP: Promise[Transaction] = Promise()
val callback: OnTransactionProcessed = (tx: Transaction) => {
Future {
resultP.success(tx)
()
}
}
val callbacks = WalletCallbacks.onTransactionProcessed(callback)
val wallet = fundedWallet.wallet.addCallbacks(callbacks)
for {
_ <- wallet.processTransaction(EmptyTransaction, None)
result <- resultP.future
} yield assert(result == EmptyTransaction)
}
it must "verify OnTransactionBroadcast callbacks are executed" in {
fundedWallet: FundedWallet =>
val resultP: Promise[Transaction] = Promise()
val callback: OnTransactionBroadcast = (tx: Transaction) => {
Future {
resultP.success(tx)
()
}
}
val callbacks = WalletCallbacks.onTransactionBroadcast(callback)
val wallet = fundedWallet.wallet.addCallbacks(callbacks)
for {
_ <- wallet.broadcastTransaction(EmptyTransaction)
result <- resultP.future
} yield assert(result == EmptyTransaction)
}
private val dummyOutput = TransactionOutput(
10000.satoshis,
P2PKHScriptPubKey(ECPublicKey.freshPublicKey))
it must "verify OnReservedUtxos callbacks are executed when reserving" in {
fundedWallet: FundedWallet =>
val resultP: Promise[Vector[SpendingInfoDb]] = Promise()
val callback: OnReservedUtxos = (infos: Vector[SpendingInfoDb]) => {
Future {
resultP.success(infos)
()
}
}
val callbacks = WalletCallbacks.onReservedUtxos(callback)
val wallet = fundedWallet.wallet.addCallbacks(callbacks)
for {
utxos <- wallet.listUtxos()
_ <- wallet.markUTXOsAsReserved(Vector(utxos.head))
result <- resultP.future
} yield assert(
// just compare outPoints because states will be changed so they won't be equal
result.map(_.outPoint) == Vector(utxos.head).map(_.outPoint))
}
it must "verify OnReservedUtxos callbacks are executed when un-reserving" in {
fundedWallet: FundedWallet =>
val resultP: Promise[Vector[SpendingInfoDb]] = Promise()
val callback: OnReservedUtxos = (infos: Vector[SpendingInfoDb]) => {
Future {
resultP.success(infos)
()
}
}
val callbacks = WalletCallbacks.onReservedUtxos(callback)
for {
utxos <- fundedWallet.wallet.listUtxos()
reserved <- fundedWallet.wallet.markUTXOsAsReserved(Vector(utxos.head))
wallet = fundedWallet.wallet.addCallbacks(callbacks)
_ <- wallet.unmarkUTXOsAsReserved(reserved)
result <- resultP.future
// just compare outPoints because states will be changed so they won't be equal
} yield assert(result.map(_.outPoint) == reserved.map(_.outPoint))
}
}

View file

@ -13,7 +13,7 @@ import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.protocol.transaction._
import org.bitcoins.core.script.constant.ScriptConstant
import org.bitcoins.core.script.control.OP_RETURN
import org.bitcoins.core.util.BitcoinScriptUtil
import org.bitcoins.core.util.{BitcoinScriptUtil, Mutable}
import org.bitcoins.core.wallet.builder.{
RawTxBuilderWithFinalizer,
RawTxSigner,
@ -64,11 +64,26 @@ abstract class Wallet
val chainQueryApi: ChainQueryApi
val creationTime: Instant = keyManager.creationTime
private val callbacks = new Mutable(WalletCallbacks.empty)
def walletCallbacks: WalletCallbacks = callbacks.atomicGet
def addCallbacks(newCallbacks: WalletCallbacks): Wallet = {
callbacks.atomicUpdate(newCallbacks)(_ + _)
this
}
override def stop(): Unit = {
walletConfig.stop()
stopWalletThread()
}
override def broadcastTransaction(transaction: Transaction): Future[Unit] =
for {
_ <- nodeApi.broadcastTransaction(transaction)
_ <- walletCallbacks.executeOnTransactionBroadcast(logger, transaction)
} yield ()
override def isEmpty(): Future[Boolean] =
for {
addressCount <- addressDAO.count()

View file

@ -0,0 +1,131 @@
package org.bitcoins.wallet
import org.bitcoins.core.api.{Callback, CallbackHandler}
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.wallet.models.SpendingInfoDb
import org.slf4j.Logger
import scala.concurrent.{ExecutionContext, Future}
/**
* Callbacks for responding to events in the wallet.
* The appropriate callback is executed whenever the wallet finishes,
* the corresponding function.
*/
trait WalletCallbacks {
def onTransactionProcessed: CallbackHandler[
Transaction,
OnTransactionProcessed]
def onTransactionBroadcast: CallbackHandler[
Transaction,
OnTransactionBroadcast]
def onReservedUtxos: CallbackHandler[Vector[SpendingInfoDb], OnReservedUtxos]
def onNewAddressGenerated: CallbackHandler[
BitcoinAddress,
OnNewAddressGenerated]
def +(other: WalletCallbacks): WalletCallbacks
def executeOnTransactionProcessed(logger: Logger, tx: Transaction)(
implicit ec: ExecutionContext): Future[Unit] = {
onTransactionProcessed.execute(logger, tx)
}
def executeOnTransactionBroadcast(logger: Logger, tx: Transaction)(
implicit ec: ExecutionContext): Future[Unit] = {
onTransactionBroadcast.execute(logger, tx)
}
def executeOnReservedUtxos(logger: Logger, utxos: Vector[SpendingInfoDb])(
implicit ec: ExecutionContext): Future[Unit] = {
onReservedUtxos.execute(logger, utxos)
}
def executeOnNewAddressGenerated(logger: Logger, address: BitcoinAddress)(
implicit ec: ExecutionContext): Future[Unit] = {
onNewAddressGenerated.execute(logger, address)
}
}
/** Callback for handling a processed transaction */
trait OnTransactionProcessed extends Callback[Transaction]
trait OnTransactionBroadcast extends Callback[Transaction]
trait OnReservedUtxos extends Callback[Vector[SpendingInfoDb]]
trait OnNewAddressGenerated extends Callback[BitcoinAddress]
object WalletCallbacks {
private case class WalletCallbacksImpl(
onTransactionProcessed: CallbackHandler[
Transaction,
OnTransactionProcessed],
onTransactionBroadcast: CallbackHandler[
Transaction,
OnTransactionBroadcast],
onReservedUtxos: CallbackHandler[Vector[SpendingInfoDb], OnReservedUtxos],
onNewAddressGenerated: CallbackHandler[
BitcoinAddress,
OnNewAddressGenerated]
) extends WalletCallbacks {
override def +(other: WalletCallbacks): WalletCallbacks = copy(
onTransactionProcessed = onTransactionProcessed ++ other.onTransactionProcessed,
onTransactionBroadcast = onTransactionBroadcast ++ other.onTransactionBroadcast,
onReservedUtxos = onReservedUtxos ++ other.onReservedUtxos,
onNewAddressGenerated = onNewAddressGenerated ++ other.onNewAddressGenerated
)
}
/** Constructs a set of callbacks that only acts on processed transaction */
def onTransactionProcessed(f: OnTransactionProcessed): WalletCallbacks =
WalletCallbacks(onTransactionProcessed = Vector(f))
/** Constructs a set of callbacks that only acts on broadcasted transaction */
def onTransactionBroadcast(f: OnTransactionBroadcast): WalletCallbacks =
WalletCallbacks(onTransactionBroadcast = Vector(f))
/** Constructs a set of callbacks that only acts on utxos becoming reserved or unreserved */
def onReservedUtxos(f: OnReservedUtxos): WalletCallbacks =
WalletCallbacks(onReservedUtxos = Vector(f))
/** Constructs a set of callbacks that only acts on new address generation */
def onNewAddressGenerated(f: OnNewAddressGenerated): WalletCallbacks =
WalletCallbacks(onNewAddressGenerated = Vector(f))
/** Empty callbacks that does nothing with the received data */
val empty: WalletCallbacks =
apply(Vector.empty, Vector.empty, Vector.empty, Vector.empty)
def apply(
onTransactionProcessed: Vector[OnTransactionProcessed] = Vector.empty,
onTransactionBroadcast: Vector[OnTransactionBroadcast] = Vector.empty,
onReservedUtxos: Vector[OnReservedUtxos] = Vector.empty,
onNewAddressGenerated: Vector[OnNewAddressGenerated] = Vector.empty
): WalletCallbacks = {
WalletCallbacksImpl(
onTransactionProcessed =
CallbackHandler[Transaction, OnTransactionProcessed](
"onTransactionProcessed",
onTransactionProcessed),
onTransactionBroadcast =
CallbackHandler[Transaction, OnTransactionBroadcast](
"onTransactionBroadcast",
onTransactionBroadcast),
onReservedUtxos =
CallbackHandler[Vector[SpendingInfoDb], OnReservedUtxos](
"onReservedUtxos",
onReservedUtxos),
onNewAddressGenerated =
CallbackHandler[BitcoinAddress, OnNewAddressGenerated](
"onNewAddressGenerated",
onNewAddressGenerated)
)
}
}

View file

@ -204,6 +204,8 @@ private[wallet] trait AddressHandling extends WalletLogger {
addressRequestQueue.add((account, chainType, p))
for {
addressDb <- p.future
_ <- walletCallbacks.executeOnNewAddressGenerated(logger,
addressDb.address)
} yield {
addressDb.address
}

View file

@ -196,6 +196,8 @@ private[wallet] trait TransactionProcessing extends WalletLogger {
for {
incoming <- incomingTxoFut
outgoing <- outgoingTxFut
_ <- walletCallbacks.executeOnTransactionProcessed(logger,
transaction)
} yield {
ProcessTxResult(incoming.toList, outgoing.toList)
}

View file

@ -221,7 +221,10 @@ private[wallet] trait UtxoHandling extends WalletLogger {
override def markUTXOsAsReserved(
utxos: Vector[SpendingInfoDb]): Future[Vector[SpendingInfoDb]] = {
val updated = utxos.map(_.copyWithState(TxoState.Reserved))
spendingInfoDAO.updateAll(updated)
for {
utxos <- spendingInfoDAO.updateAll(updated)
_ <- walletCallbacks.executeOnReservedUtxos(logger, utxos)
} yield utxos
}
override def unmarkUTXOsAsReserved(
@ -252,7 +255,9 @@ private[wallet] trait UtxoHandling extends WalletLogger {
case (hash, utxos) =>
updateUtxoConfirmedStates(utxos, hash)
}
} yield updatedMempoolUtxos ++ updatedBlockUtxos.flatten
updated = updatedMempoolUtxos ++ updatedBlockUtxos.flatten
_ <- walletCallbacks.executeOnReservedUtxos(logger, updated)
} yield updated
}
/** @inheritdoc */