Add WalletCallbacks.onBlockProcessed() (#3912)

This commit is contained in:
Chris Stewart 2021-12-16 12:26:48 -06:00 committed by GitHub
parent 1969056372
commit 2d9d12816b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 72 additions and 13 deletions

View file

@ -11,6 +11,7 @@ Bitcoin-S support call backs for the following events that happen in the wallet:
2. onTransactionBroadcast 2. onTransactionBroadcast
3. onReservedUtxos 3. onReservedUtxos
4. onNewAddressGenerated 4. onNewAddressGenerated
5. onBlockProcessed
That means every time one of these events happens, we will call your callback That means every time one of these events happens, we will call your callback
so that you can be notified of the event. These callbacks will be run after the message has been so that you can be notified of the event. These callbacks will be run after the message has been

View file

@ -7,6 +7,7 @@ import org.bitcoins.core.api.chain.ChainQueryApi
import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse import org.bitcoins.core.api.chain.ChainQueryApi.FilterResponse
import org.bitcoins.core.gcs.BlockFilter import org.bitcoins.core.gcs.BlockFilter
import org.bitcoins.core.protocol.BlockStamp import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.core.protocol.blockchain.RegTestNetChainParams
import org.bitcoins.core.util.FutureUtil import org.bitcoins.core.util.FutureUtil
import org.bitcoins.crypto.DoubleSha256DigestBE import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.server.BitcoinSAppConfig import org.bitcoins.server.BitcoinSAppConfig
@ -56,10 +57,15 @@ trait BaseWalletTest extends EmbeddedPg { _: Suite with BitcoinSAkkaAsyncTest =>
/** Gets the height of the given block */ /** Gets the height of the given block */
override def getBlockHeight( override def getBlockHeight(
blockHash: DoubleSha256DigestBE): Future[Option[Int]] = blockHash: DoubleSha256DigestBE): Future[Option[Int]] = {
if (blockHash == testBlockHash) if (blockHash == testBlockHash) {
Future.successful(Some(1)) Future.successful(Some(1))
else FutureUtil.none } else if (
blockHash == RegTestNetChainParams.genesisBlock.blockHeader.hashBE
) {
Future.successful(Some(1))
} else FutureUtil.none
}
/** Gets the hash of the block that is what we consider "best" */ /** Gets the hash of the block that is what we consider "best" */
override def getBestBlockHash(): Future[DoubleSha256DigestBE] = override def getBestBlockHash(): Future[DoubleSha256DigestBE] =
@ -67,10 +73,11 @@ trait BaseWalletTest extends EmbeddedPg { _: Suite with BitcoinSAkkaAsyncTest =>
/** Gets number of confirmations for the given block hash */ /** Gets number of confirmations for the given block hash */
override def getNumberOfConfirmations( override def getNumberOfConfirmations(
blockHash: DoubleSha256DigestBE): Future[Option[Int]] = blockHash: DoubleSha256DigestBE): Future[Option[Int]] = {
if (blockHash == testBlockHash) if (blockHash == testBlockHash) {
Future.successful(Some(6)) Future.successful(Some(6))
else FutureUtil.none } else FutureUtil.none
}
/** Gets the number of compact filters in the database */ /** Gets the number of compact filters in the database */
override def getFilterCount(): Future[Int] = Future.successful(1) override def getFilterCount(): Future[Int] = Future.successful(1)

View file

@ -2,6 +2,7 @@ package org.bitcoins.wallet
import org.bitcoins.core.api.wallet.db.SpendingInfoDb import org.bitcoins.core.api.wallet.db.SpendingInfoDb
import org.bitcoins.core.protocol.BitcoinAddress import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.blockchain.{Block, RegTestNetChainParams}
import org.bitcoins.core.protocol.transaction.{EmptyTransaction, Transaction} import org.bitcoins.core.protocol.transaction.{EmptyTransaction, Transaction}
import org.bitcoins.testkit.wallet.BitcoinSWalletTest import org.bitcoins.testkit.wallet.BitcoinSWalletTest
import org.bitcoins.testkit.wallet.FundWalletUtil.FundedWallet import org.bitcoins.testkit.wallet.FundWalletUtil.FundedWallet
@ -140,4 +141,27 @@ class WalletCallbackTest extends BitcoinSWalletTest {
// just compare outPoints because states will be changed so they won't be equal // just compare outPoints because states will be changed so they won't be equal
} yield assert(result.map(_.outPoint) == reserved.map(_.outPoint)) } yield assert(result.map(_.outPoint) == reserved.map(_.outPoint))
} }
it must "verify OnBlockProcessed callbacks are executed" in {
fundedWallet: FundedWallet =>
val resultP: Promise[Block] = Promise()
val block = RegTestNetChainParams.genesisBlock
val callback: OnBlockProcessed = (b: Block) => {
Future {
resultP.success(b)
()
}
}
val callbacks = WalletCallbacks.onBlockProcessed(callback)
fundedWallet.wallet.walletConfig.addCallbacks(callbacks)
val wallet = fundedWallet.wallet
for {
_ <- wallet.processBlock(block)
result <- resultP.future
} yield assert(result == block)
}
} }

View file

@ -4,6 +4,7 @@ import grizzled.slf4j.Logger
import org.bitcoins.core.api.wallet.db.SpendingInfoDb import org.bitcoins.core.api.wallet.db.SpendingInfoDb
import org.bitcoins.core.api.{Callback, CallbackHandler} import org.bitcoins.core.api.{Callback, CallbackHandler}
import org.bitcoins.core.protocol.BitcoinAddress import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.Transaction import org.bitcoins.core.protocol.transaction.Transaction
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future}
@ -27,6 +28,8 @@ trait WalletCallbacks {
BitcoinAddress, BitcoinAddress,
OnNewAddressGenerated] OnNewAddressGenerated]
def onBlockProcessed: CallbackHandler[Block, OnBlockProcessed]
def +(other: WalletCallbacks): WalletCallbacks def +(other: WalletCallbacks): WalletCallbacks
def executeOnTransactionProcessed(logger: Logger, tx: Transaction)(implicit def executeOnTransactionProcessed(logger: Logger, tx: Transaction)(implicit
@ -68,6 +71,15 @@ trait WalletCallbacks {
err)) err))
} }
def executeOnBlockProcessed(logger: Logger, block: Block)(implicit
ec: ExecutionContext): Future[Unit] = {
onBlockProcessed.execute(
block,
(err: Throwable) =>
logger.error(s"${onBlockProcessed.name} Callback failed with error: ",
err))
}
} }
/** Callback for handling a processed transaction */ /** Callback for handling a processed transaction */
@ -79,6 +91,8 @@ trait OnReservedUtxos extends Callback[Vector[SpendingInfoDb]]
trait OnNewAddressGenerated extends Callback[BitcoinAddress] trait OnNewAddressGenerated extends Callback[BitcoinAddress]
trait OnBlockProcessed extends Callback[Block]
object WalletCallbacks { object WalletCallbacks {
private case class WalletCallbacksImpl( private case class WalletCallbacksImpl(
@ -91,7 +105,8 @@ object WalletCallbacks {
onReservedUtxos: CallbackHandler[Vector[SpendingInfoDb], OnReservedUtxos], onReservedUtxos: CallbackHandler[Vector[SpendingInfoDb], OnReservedUtxos],
onNewAddressGenerated: CallbackHandler[ onNewAddressGenerated: CallbackHandler[
BitcoinAddress, BitcoinAddress,
OnNewAddressGenerated] OnNewAddressGenerated],
onBlockProcessed: CallbackHandler[Block, OnBlockProcessed]
) extends WalletCallbacks { ) extends WalletCallbacks {
override def +(other: WalletCallbacks): WalletCallbacks = override def +(other: WalletCallbacks): WalletCallbacks =
@ -102,7 +117,8 @@ object WalletCallbacks {
onTransactionBroadcast ++ other.onTransactionBroadcast, onTransactionBroadcast ++ other.onTransactionBroadcast,
onReservedUtxos = onReservedUtxos ++ other.onReservedUtxos, onReservedUtxos = onReservedUtxos ++ other.onReservedUtxos,
onNewAddressGenerated = onNewAddressGenerated =
onNewAddressGenerated ++ other.onNewAddressGenerated onNewAddressGenerated ++ other.onNewAddressGenerated,
onBlockProcessed = onBlockProcessed ++ other.onBlockProcessed
) )
} }
@ -122,15 +138,20 @@ object WalletCallbacks {
def onNewAddressGenerated(f: OnNewAddressGenerated): WalletCallbacks = def onNewAddressGenerated(f: OnNewAddressGenerated): WalletCallbacks =
WalletCallbacks(onNewAddressGenerated = Vector(f)) WalletCallbacks(onNewAddressGenerated = Vector(f))
def onBlockProcessed(f: OnBlockProcessed): WalletCallbacks = {
WalletCallbacks(onBlockProcessed = Vector(f))
}
/** Empty callbacks that does nothing with the received data */ /** Empty callbacks that does nothing with the received data */
val empty: WalletCallbacks = val empty: WalletCallbacks =
apply(Vector.empty, Vector.empty, Vector.empty, Vector.empty) apply(Vector.empty, Vector.empty, Vector.empty, Vector.empty, Vector.empty)
def apply( def apply(
onTransactionProcessed: Vector[OnTransactionProcessed] = Vector.empty, onTransactionProcessed: Vector[OnTransactionProcessed] = Vector.empty,
onTransactionBroadcast: Vector[OnTransactionBroadcast] = Vector.empty, onTransactionBroadcast: Vector[OnTransactionBroadcast] = Vector.empty,
onReservedUtxos: Vector[OnReservedUtxos] = Vector.empty, onReservedUtxos: Vector[OnReservedUtxos] = Vector.empty,
onNewAddressGenerated: Vector[OnNewAddressGenerated] = Vector.empty onNewAddressGenerated: Vector[OnNewAddressGenerated] = Vector.empty,
onBlockProcessed: Vector[OnBlockProcessed] = Vector.empty
): WalletCallbacks = { ): WalletCallbacks = {
WalletCallbacksImpl( WalletCallbacksImpl(
onTransactionProcessed = onTransactionProcessed =
@ -148,7 +169,11 @@ object WalletCallbacks {
onNewAddressGenerated = onNewAddressGenerated =
CallbackHandler[BitcoinAddress, OnNewAddressGenerated]( CallbackHandler[BitcoinAddress, OnNewAddressGenerated](
"onNewAddressGenerated", "onNewAddressGenerated",
onNewAddressGenerated) onNewAddressGenerated),
onBlockProcessed = CallbackHandler[Block, OnBlockProcessed](
"onBlockProcessed",
onBlockProcessed
)
) )
} }
} }

View file

@ -68,11 +68,13 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
val f = for { val f = for {
res <- resF res <- resF
hash = block.blockHeader.hashBE hash = block.blockHeader.hashBE
height <- chainQueryApi.getBlockHeight(hash) height <- chainQueryApi.getBlockHeight(hash)
_ <- stateDescriptorDAO.updateSyncHeight(hash, height.get) _ <- stateDescriptorDAO.updateSyncHeight(hash, height.get)
} yield res } yield {
walletConfig.walletCallbacks.executeOnBlockProcessed(logger, block)
res
}
f.onComplete(failure => f.onComplete(failure =>
signalBlockProcessingCompletion(block.blockHeader.hash, failure)) signalBlockProcessingCompletion(block.blockHeader.hash, failure))