Rework TransactionProcessing to be a has-a relationship rather than is-a relationship in the codebase (#5659)

Refactor TransactionProcessing into case class

Get walletTest/test passing

DLCWallet WIP

Get dlcWalletTest/test passing

Small cleanups

Fix RoutesSpec gettransaction unit test

revert logback-test.xml

cleanup
This commit is contained in:
Chris Stewart 2024-09-18 12:51:21 -05:00 committed by GitHub
parent 6bf69de119
commit 625e790477
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 573 additions and 312 deletions

View File

@ -1013,10 +1013,9 @@ class RoutesSpec extends AnyWordSpec with ScalatestRouteTest with MockFactory {
val txDb = TransactionDbHelper.fromTransaction(tx, None)
(mockWalletApi
.findTransaction(_: DoubleSha256DigestBE))
.expects(tx.txIdBE)
.returning(Future.successful(Some(txDb)))
.anyNumberOfTimes()
.findByTxIds(_: Vector[DoubleSha256DigestBE]))
.expects(Vector(tx.txIdBE))
.returning(Future.successful(Vector(txDb)))
val route =
walletRoutes.handleCommand(

View File

@ -360,18 +360,12 @@ object BitcoindRpcBackendUtil extends BitcoinSLogger {
parallelism = numParallelism
)
val sinkF
: Future[Sink[(Block, GetBlockHeaderResult), Future[WalletApi]]] = {
walletF.map { initWallet =>
Sink.foldAsync[WalletApi, (Block, GetBlockHeaderResult)](
initWallet
) {
case (
wallet: WalletApi,
(block: Block, blockHeaderResult: GetBlockHeaderResult)
) =>
val sinkF: Future[Sink[(Block, GetBlockHeaderResult), Future[Done]]] = {
walletF.map { wallet =>
Sink.foreachAsync(1) {
case (block: Block, blockHeaderResult: GetBlockHeaderResult) =>
val blockProcessedF = wallet.processBlock(block)
val executeCallbackF: Future[WalletApi] = {
val executeCallbackF: Future[Unit] = {
for {
wallet <- blockProcessedF
_ <- handleChainCallbacks(
@ -386,7 +380,7 @@ object BitcoindRpcBackendUtil extends BitcoinSLogger {
}
}
val doneF: Future[WalletApi] = sinkF.flatMap { sink =>
val doneF: Future[Done] = sinkF.flatMap { sink =>
source
.via(fetchBlocksFlow)
.toMat(sink)(Keep.right)
@ -394,7 +388,8 @@ object BitcoindRpcBackendUtil extends BitcoinSLogger {
}
for {
w <- doneF
_ <- doneF
w <- walletF
_ <- w.updateUtxoPendingStates()
} yield ()
}

View File

@ -189,12 +189,14 @@ case class WalletRoutes(loadWalletApi: DLCWalletLoaderApi)(implicit
withValidServerCommand(GetTransaction.fromJsArr(arr)) {
case GetTransaction(txId) =>
complete {
wallet.findTransaction(txId).map {
val resultF = wallet.findByTxId(txId).map {
case None =>
Server.httpSuccess(ujson.Null)
case Some(txDb) =>
Server.httpSuccess(txDb.transaction.hex)
}
resultF.failed.foreach(err => logger.error(s"resultF", err))
resultF
}
}

View File

@ -0,0 +1,7 @@
package org.bitcoins.core.api.wallet
import scala.concurrent.Future
trait RescanHandlingApi {
def isRescanning(): Future[Boolean]
}

View File

@ -0,0 +1,59 @@
package org.bitcoins.core.api.wallet
import org.bitcoins.core.api.wallet.db.{SpendingInfoDb, TransactionDb}
import org.bitcoins.core.currency.CurrencyUnit
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.transaction.{OutputWithIndex, Transaction}
import org.bitcoins.core.wallet.fee.FeeUnit
import org.bitcoins.core.wallet.utxo.AddressTag
import org.bitcoins.crypto.DoubleSha256DigestBE
import scala.concurrent.Future
trait TransactionProcessingApi {
def processTransaction(
transaction: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE]
): Future[Unit]
/** Processes TXs originating from our wallet. This is called right after
* we've signed a TX, updating our UTXO state.
*/
def processOurTransaction(
transaction: Transaction,
feeRate: FeeUnit,
inputAmount: CurrencyUnit,
sentAmount: CurrencyUnit,
blockHashOpt: Option[DoubleSha256DigestBE],
newTags: Vector[AddressTag]
): Future[ProcessTxResult]
def processBlock(block: Block): Future[Unit]
def findTransaction(
txId: DoubleSha256DigestBE
): Future[Option[TransactionDb]]
def listTransactions(): Future[Vector[TransactionDb]]
def subscribeForBlockProcessingCompletionSignal(
blockHash: DoubleSha256DigestBE
): Future[DoubleSha256DigestBE]
def processReceivedUtxos(
tx: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE],
spendingInfoDbs: Vector[SpendingInfoDb],
newTags: Vector[AddressTag],
relevantReceivedOutputs: Vector[OutputWithIndex]
): Future[Vector[SpendingInfoDb]]
def processSpentUtxos(
transaction: Transaction,
outputsBeingSpent: Vector[SpendingInfoDb],
blockHashOpt: Option[DoubleSha256DigestBE]
): Future[Vector[SpendingInfoDb]]
def insertTransaction(
tx: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE]
): Future[TransactionDb]
}

View File

@ -2,7 +2,7 @@ package org.bitcoins.core.api.wallet
import org.bitcoins.core.api.wallet.db.SpendingInfoDb
import org.bitcoins.core.hd.HDAccount
import org.bitcoins.core.protocol.transaction.Transaction
import org.bitcoins.core.protocol.transaction.{Transaction, TransactionOutPoint}
import org.bitcoins.core.wallet.utxo.{AddressTag, TxoState}
import scala.concurrent.Future
@ -27,6 +27,10 @@ trait UtxoHandlingApi {
hdAccount: HDAccount,
state: TxoState): Future[Vector[SpendingInfoDb]]
def listUtxos(
outPoints: Vector[TransactionOutPoint]
): Future[Vector[SpendingInfoDb]]
def markUTXOsAsReserved(
utxos: Vector[SpendingInfoDb]): Future[Vector[SpendingInfoDb]]

View File

@ -48,31 +48,25 @@ trait WalletApi extends StartStopAsync[WalletApi] {
def broadcastTransaction(transaction: Transaction): Future[Unit] =
nodeApi.broadcastTransaction(transaction)
def getTransactionsToBroadcast: Future[Vector[Transaction]]
def getFeeRate(): Future[FeeUnit] = feeRateApi.getFeeRate()
def start(): Future[WalletApi]
def stop(): Future[WalletApi]
/** Processes the given transaction, updating our DB state if it's relevant to
* us.
* @param transaction
* The transaction we're processing
* @param blockHash
* Containing block hash
/** Processes the give block, updating our DB state if it's relevant to us.
*
* @param block
* The block we're processing
*/
def processBlock(block: Block): Future[Unit]
def processTransaction(
transaction: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE]): Future[WalletApi]
def processTransactions(
transactions: Vector[Transaction],
blockHashOpt: Option[DoubleSha256DigestBE])(implicit
ec: ExecutionContext): Future[WalletApi] = {
transactions.foldLeft(Future.successful(this)) { case (wallet, tx) =>
wallet.flatMap(_.processTransaction(tx, blockHashOpt))
}
}
blockHashOpt: Option[DoubleSha256DigestBE]
): Future[Unit]
/** Processes TXs originating from our wallet. This is called right after
* we've signed a TX, updating our UTXO state.
@ -83,18 +77,8 @@ trait WalletApi extends StartStopAsync[WalletApi] {
inputAmount: CurrencyUnit,
sentAmount: CurrencyUnit,
blockHashOpt: Option[DoubleSha256DigestBE],
newTags: Vector[AddressTag]): Future[ProcessTxResult]
/** Processes the give block, updating our DB state if it's relevant to us.
*
* @param block
* The block we're processing
*/
def processBlock(block: Block): Future[WalletApi]
def findTransaction(txId: DoubleSha256DigestBE): Future[Option[TransactionDb]]
def listTransactions(): Future[Vector[TransactionDb]]
newTags: Vector[AddressTag]
): Future[ProcessTxResult]
/** Gets the sum of all UTXOs in this wallet */
def getBalance()(implicit ec: ExecutionContext): Future[CurrencyUnit] = {
@ -139,6 +123,8 @@ trait WalletApi extends StartStopAsync[WalletApi] {
def listScriptPubKeys(): Future[Vector[ScriptPubKeyDb]]
def listTransactions(): Future[Vector[TransactionDb]]
def listUtxos(): Future[Vector[SpendingInfoDb]]
def listUtxos(state: TxoState): Future[Vector[SpendingInfoDb]]

View File

@ -40,6 +40,8 @@ import org.bitcoins.dlc.wallet.util.{
IntermediaryDLCStatus
}
import org.bitcoins.wallet.config.WalletAppConfig
import org.bitcoins.wallet.internal.TransactionProcessing
import org.bitcoins.wallet.models.WalletDAOs
import org.bitcoins.wallet.{Wallet, WalletLogger}
import scodec.bits.ByteVector
import slick.dbio.*
@ -51,7 +53,6 @@ import scala.concurrent.Future
abstract class DLCWallet
extends Wallet
with DLCNeutrinoHDWalletApi
with DLCTransactionProcessing
with IncomingDLCOffersHandling {
implicit val dlcConfig: DLCAppConfig
@ -81,6 +82,15 @@ abstract class DLCWallet
private[bitcoins] val contactDAO: DLCContactDAO =
DLCContactDAO()
private val walletDAOs: WalletDAOs = WalletDAOs(accountDAO,
addressDAO,
addressTagDAO,
spendingInfoDAO,
transactionDAO,
incomingTxDAO,
outgoingTxDAO,
scriptPubKeyDAO,
stateDescriptorDAO)
private[wallet] val dlcWalletDAOs = DLCWalletDAOs(
dlcDAO,
contractDataDAO,
@ -103,6 +113,24 @@ abstract class DLCWallet
DLCActionBuilder(dlcWalletDAOs)
}
override lazy val transactionProcessing: DLCTransactionProcessing = {
val txProcessing = TransactionProcessing(
walletApi = this,
chainQueryApi = chainQueryApi,
utxoHandling = utxoHandling,
walletDAOs = walletDAOs
)
DLCTransactionProcessing(
txProcessing = txProcessing,
dlcWalletDAOs = dlcWalletDAOs,
dlcDataManagement = dlcDataManagement,
keyManager = keyManager,
transactionDAO = transactionDAO,
rescanHandling = this,
utxoHandling = utxoHandling,
dlcWalletApi = this
)
}
private lazy val safeDLCDatabase: SafeDatabase = dlcDAO.safeDatabase
private lazy val walletDatabase: SafeDatabase = addressDAO.safeDatabase
@ -1516,26 +1544,11 @@ abstract class DLCWallet
}
}
private[wallet] def getScriptSigParams(
private def getScriptSigParams(
dlcDb: DLCDb,
fundingInputs: Vector[DLCFundingInputDb]
): Future[Vector[ScriptSignatureParams[InputInfo]]] = {
val outPoints =
fundingInputs.filter(_.isInitiator == dlcDb.isInitiator).map(_.outPoint)
val utxosF = utxoHandling.listUtxos(outPoints)
for {
utxos <- utxosF
scriptSigParams <-
FutureUtil.foldLeftAsync(
Vector.empty[ScriptSignatureParams[InputInfo]],
utxos
) { (accum, utxo) =>
transactionDAO
.findByOutPoint(utxo.outPoint)
.map(txOpt =>
utxo.toUTXOInfo(keyManager, txOpt.get.transaction) +: accum)
}
} yield scriptSigParams
transactionProcessing.getScriptSigParams(dlcDb, fundingInputs)
}
override def getDLCFundingTx(contractId: ByteVector): Future[Transaction] = {
@ -1820,7 +1833,6 @@ abstract class DLCWallet
_ <- updateDLCOracleSigs(sigsUsed)
_ <- updateDLCState(contractId, DLCState.Claimed)
dlcDb <- updateClosingTxId(contractId, tx.txIdBE)
oracleSigSum =
OracleSignatures.computeAggregateSignature(outcome, sigsUsed)
aggSig = SchnorrDigitalSignature(
@ -1829,7 +1841,7 @@ abstract class DLCWallet
)
_ <- updateAggregateSignature(contractId, aggSig)
_ <- processTransaction(tx, None)
_ <- transactionProcessing.processTransaction(tx, None)
dlcStatusOpt <- findDLC(dlcId = dlcDb.dlcId)
_ <- dlcConfig.walletCallbacks.executeOnDLCStateChange(dlcStatusOpt.get)
} yield tx
@ -1888,7 +1900,8 @@ abstract class DLCWallet
_ <- updateDLCState(contractId, DLCState.Refunded)
_ <- updateClosingTxId(contractId, refundTx.txIdBE)
_ <- processTransaction(refundTx, blockHashOpt = None)
_ <- transactionProcessing.processTransaction(refundTx,
blockHashOpt = None)
status <- findDLC(dlcDb.dlcId)
_ <- dlcConfig.walletCallbacks.executeOnDLCStateChange(status.get)
} yield refundTx

View File

@ -700,7 +700,7 @@ case class DLCDataManagement(dlcWalletDAOs: DLCWalletDAOs)(implicit
* @return
* the executor and setup if we still have CET signatures else return None
*/
private[wallet] def executorAndSetupFromDb(
def executorAndSetupFromDb(
contractId: ByteVector,
txDAO: TransactionDAO,
fundingUtxoScriptSigParams: Vector[ScriptSignatureParams[InputInfo]],
@ -750,7 +750,7 @@ case class DLCDataManagement(dlcWalletDAOs: DLCWalletDAOs)(implicit
}
}
private[wallet] def executorAndSetupFromDb(
def executorAndSetupFromDb(
dlcDb: DLCDb,
refundSigsDb: DLCRefundSigsDb,
fundingInputs: Vector[DLCFundingInputDb],

View File

@ -1,12 +1,22 @@
package org.bitcoins.dlc.wallet.internal
import org.bitcoins.core.api.dlc.wallet.db._
import org.bitcoins.core.api.wallet.db.SpendingInfoDb
import org.bitcoins.commons.util.BitcoinSLogger
import org.bitcoins.core.api.dlc.wallet.DLCWalletApi
import org.bitcoins.core.api.dlc.wallet.db.*
import org.bitcoins.core.api.wallet.{
ProcessTxResult,
RescanHandlingApi,
TransactionProcessingApi,
UtxoHandlingApi
}
import org.bitcoins.core.api.wallet.db.{SpendingInfoDb, TransactionDb}
import org.bitcoins.core.currency.CurrencyUnit
import org.bitcoins.core.protocol.blockchain.Block
import org.bitcoins.core.protocol.dlc.execution.SetupDLC
import org.bitcoins.core.protocol.dlc.models.DLCMessage._
import org.bitcoins.core.protocol.dlc.models._
import org.bitcoins.core.protocol.script._
import org.bitcoins.core.protocol.tlv._
import org.bitcoins.core.protocol.dlc.models.DLCMessage.*
import org.bitcoins.core.protocol.dlc.models.*
import org.bitcoins.core.protocol.script.*
import org.bitcoins.core.protocol.tlv.*
import org.bitcoins.core.protocol.transaction.{
OutputWithIndex,
Transaction,
@ -14,24 +24,47 @@ import org.bitcoins.core.protocol.transaction.{
}
import org.bitcoins.core.psbt.InputPSBTRecord.PartialSignature
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.core.wallet.utxo.AddressTag
import org.bitcoins.core.wallet.fee.FeeUnit
import org.bitcoins.core.wallet.utxo.{
AddressTag,
InputInfo,
ScriptSignatureParams
}
import org.bitcoins.crypto.{
DoubleSha256DigestBE,
SchnorrDigitalSignature,
Sha256Digest
}
import org.bitcoins.db.SafeDatabase
import org.bitcoins.dlc.wallet.DLCWallet
import org.bitcoins.dlc.wallet.models._
import org.bitcoins.wallet.internal.TransactionProcessing
import org.bitcoins.dlc.wallet.DLCAppConfig
import org.bitcoins.dlc.wallet.models.*
import org.bitcoins.keymanager.bip39.BIP39KeyManager
import org.bitcoins.wallet.models.TransactionDAO
import scala.concurrent._
import scala.concurrent.*
/** Overrides TransactionProcessing from Wallet to add extra logic to process
* transactions that could from our own DLC.
*/
private[bitcoins] trait DLCTransactionProcessing extends TransactionProcessing {
self: DLCWallet =>
case class DLCTransactionProcessing(
txProcessing: TransactionProcessingApi,
dlcWalletDAOs: DLCWalletDAOs,
dlcDataManagement: DLCDataManagement,
keyManager: BIP39KeyManager,
transactionDAO: TransactionDAO,
rescanHandling: RescanHandlingApi,
utxoHandling: UtxoHandlingApi,
dlcWalletApi: DLCWalletApi)(implicit
dlcConfig: DLCAppConfig,
ec: ExecutionContext)
extends TransactionProcessingApi
with BitcoinSLogger {
private val dlcDAO: DLCDAO = dlcWalletDAOs.dlcDAO
private val dlcInputsDAO: DLCFundingInputDAO = dlcWalletDAOs.dlcInputsDAO
private val dlcSigsDAO: DLCCETSignaturesDAO = dlcWalletDAOs.dlcSigsDAO
private val oracleNonceDAO: OracleNonceDAO = dlcWalletDAOs.oracleNonceDAO
private val dlcAnnouncementDAO: DLCAnnouncementDAO =
dlcWalletDAOs.dlcAnnouncementDAO
private lazy val safeDLCDatabase: SafeDatabase = dlcDAO.safeDatabase
/** Calculates the new state of the DLCDb based on the closing transaction,
@ -95,7 +128,7 @@ private[bitcoins] trait DLCTransactionProcessing extends TransactionProcessing {
val withState = dlcDb.updateState(DLCState.RemoteClaimed)
for {
withOutcomeOpt <- calculateAndSetOutcome(withState)
dlc <- findDLC(dlcDb.dlcId)
dlc <- dlcWalletApi.findDLC(dlcDb.dlcId)
_ = dlcConfig.walletCallbacks.executeOnDLCStateChange(dlc.get)
} yield {
withOutcomeOpt
@ -248,63 +281,48 @@ private[bitcoins] trait DLCTransactionProcessing extends TransactionProcessing {
}
}
/** Process incoming utxos as normal, and then update the DLC states if
* applicable
*/
override protected def processReceivedUtxos(
/** Updates DLC states for a funding transaction we've received */
private def processFundingTx(
tx: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE],
spendingInfoDbs: Vector[SpendingInfoDb],
newTags: Vector[AddressTag],
relevantReceivedOutputs: Vector[OutputWithIndex]
): Future[Vector[SpendingInfoDb]] = {
blockHashOpt: Option[DoubleSha256DigestBE]
): Future[Vector[DLCDb]] = {
val dlcDbsF = dlcDAO.findByFundingTxId(tx.txIdBE)
super
.processReceivedUtxos(
tx,
blockHashOpt,
spendingInfoDbs,
newTags,
relevantReceivedOutputs
)
.flatMap { res =>
for {
dlcDbs <- dlcDbsF
_ <-
if (dlcDbs.nonEmpty) {
logger.info(
s"Processing received utxos in tx ${tx.txIdBE.hex} for ${dlcDbs.size} DLC(s)"
)
insertTransaction(tx, blockHashOpt)
} else FutureUtil.unit
for {
dlcDbs <- dlcDbsF
_ <-
if (dlcDbs.nonEmpty) {
logger.info(
s"Processing received utxos in tx ${tx.txIdBE.hex} for ${dlcDbs.size} DLC(s)"
)
txProcessing.insertTransaction(tx, blockHashOpt)
} else FutureUtil.unit
// Update the state to be confirmed or broadcasted
updated = dlcDbs.map { dlcDb =>
dlcDb.state match {
case DLCState.Offered | DLCState.Accepted | DLCState.Signed |
DLCState.Broadcasted =>
if (blockHashOpt.isDefined)
dlcDb.updateState(DLCState.Confirmed)
else dlcDb.copy(state = DLCState.Broadcasted)
case _: DLCState.AdaptorSigComputationState =>
val contractIdOpt = dlcDb.contractIdOpt.map(_.toHex)
throw new IllegalStateException(
s"Cannot be settling a DLC when we are computing adaptor sigs! contractId=${contractIdOpt}"
)
case DLCState.Confirmed | DLCState.Claimed |
DLCState.RemoteClaimed | DLCState.Refunded =>
dlcDb
}
}
_ <- dlcDAO.updateAll(updated)
dlcIds = updated.map(_.dlcId).distinct
isRescanning <- isRescanning()
_ <- sendWsDLCStateChange(dlcIds, isRescanning)
} yield {
res
// Update the state to be confirmed or broadcasted
updated = dlcDbs.map { dlcDb =>
dlcDb.state match {
case DLCState.Offered | DLCState.Accepted | DLCState.Signed |
DLCState.Broadcasted =>
if (blockHashOpt.isDefined)
dlcDb.updateState(DLCState.Confirmed)
else dlcDb.copy(state = DLCState.Broadcasted)
case _: DLCState.AdaptorSigComputationState =>
val contractIdOpt = dlcDb.contractIdOpt.map(_.toHex)
throw new IllegalStateException(
s"Cannot be settling a DLC when we are computing adaptor sigs! contractId=${contractIdOpt}"
)
case DLCState.Confirmed | DLCState.Claimed | DLCState.RemoteClaimed |
DLCState.Refunded =>
dlcDb
}
}
_ <- dlcDAO.updateAll(updated)
dlcIds = updated.map(_.dlcId).distinct
isRescanning <- rescanHandling.isRescanning()
_ <- sendWsDLCStateChange(dlcIds, isRescanning)
} yield {
updated
}
}
/** Sends out a websocket event for the given dlcIds since their [[DLCState]]
@ -323,7 +341,7 @@ private[bitcoins] trait DLCTransactionProcessing extends TransactionProcessing {
// don't send ws events if we are rescanning the wallet
Future.unit
} else {
val updatedDlcDbsF = Future.sequence(dlcIds.map(findDLC))
val updatedDlcDbsF = Future.traverse(dlcIds)(dlcWalletApi.findDLC)
val sendF = updatedDlcDbsF.flatMap { updatedDlcDbs =>
Future.sequence {
updatedDlcDbs.map(u =>
@ -334,34 +352,29 @@ private[bitcoins] trait DLCTransactionProcessing extends TransactionProcessing {
}
}
override protected def processSpentUtxos(
/** Processes all settled DLCs in this transaction and updates their states */
private def processSettledDLCs(
transaction: Transaction,
outputsBeingSpent: Vector[SpendingInfoDb],
blockHashOpt: Option[DoubleSha256DigestBE]
): Future[Vector[SpendingInfoDb]] = {
): Future[Vector[DLCDb]] = {
val outPoints = transaction.inputs.map(_.previousOutput).toVector
val dlcDbsF = dlcDAO.findByFundingOutPoints(outPoints)
super
.processSpentUtxos(transaction, outputsBeingSpent, blockHashOpt)
.flatMap { res =>
for {
dlcDbs <- dlcDbsF
_ <-
if (dlcDbs.nonEmpty) {
logger.info(
s"Processing spent utxos in tx ${transaction.txIdBE.hex} for ${dlcDbs.size} DLC(s)"
)
insertTransaction(transaction, blockHashOpt)
} else FutureUtil.unit
for {
dlcDbs <- dlcDbsF
_ <-
if (dlcDbs.nonEmpty) {
logger.info(
s"Processing spent utxos in tx ${transaction.txIdBE.hex} for ${dlcDbs.size} DLC(s)"
)
txProcessing.insertTransaction(transaction, blockHashOpt)
} else FutureUtil.unit
withTx = dlcDbs.map(_.updateClosingTxId(transaction.txIdBE))
updatedFs = withTx.map(calculateAndSetState)
updated <- Future.sequence(updatedFs)
_ <- dlcDAO.updateAll(updated.flatten)
} yield {
res
}
}
withTx = dlcDbs.map(_.updateClosingTxId(transaction.txIdBE))
updated <- Future.traverse(withTx)(calculateAndSetState)
_ <- dlcDAO.updateAll(updated.flatten)
} yield {
updated.flatten
}
}
private def getOutcomeDbInfo(
@ -476,4 +489,98 @@ private[bitcoins] trait DLCTransactionProcessing extends TransactionProcessing {
sigsAndOutcomeOpt.get
}
def getScriptSigParams(
dlcDb: DLCDb,
fundingInputs: Vector[DLCFundingInputDb]
): Future[Vector[ScriptSignatureParams[InputInfo]]] = {
val outPoints =
fundingInputs.filter(_.isInitiator == dlcDb.isInitiator).map(_.outPoint)
val utxosF = utxoHandling.listUtxos(outPoints)
for {
utxos <- utxosF
scriptSigParams <-
FutureUtil.foldLeftAsync(
Vector.empty[ScriptSignatureParams[InputInfo]],
utxos
) { (accum, utxo) =>
transactionDAO
.findByOutPoint(utxo.outPoint)
.map(txOpt =>
utxo.toUTXOInfo(keyManager, txOpt.get.transaction) +: accum)
}
} yield scriptSigParams
}
override def processTransaction(
transaction: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE]): Future[Unit] = {
txProcessing
.processTransaction(transaction, blockHashOpt)
.flatMap(_ => processFundingTx(transaction, blockHashOpt))
.flatMap(_ => processSettledDLCs(transaction, blockHashOpt))
.map(_ => ())
}
override def processReceivedUtxos(
tx: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE],
spendingInfoDbs: Vector[SpendingInfoDb],
newTags: Vector[AddressTag],
relevantReceivedOutputs: Vector[OutputWithIndex])
: Future[Vector[SpendingInfoDb]] = {
txProcessing.processReceivedUtxos(tx,
blockHashOpt,
spendingInfoDbs,
newTags,
relevantReceivedOutputs)
}
override def processSpentUtxos(
transaction: Transaction,
outputsBeingSpent: Vector[SpendingInfoDb],
blockHashOpt: Option[DoubleSha256DigestBE])
: Future[Vector[SpendingInfoDb]] = {
txProcessing.processSpentUtxos(transaction, outputsBeingSpent, blockHashOpt)
}
/** Processes TXs originating from our wallet. This is called right after
* we've signed a TX, updating our UTXO state.
*/
override def processOurTransaction(
transaction: Transaction,
feeRate: FeeUnit,
inputAmount: CurrencyUnit,
sentAmount: CurrencyUnit,
blockHashOpt: Option[DoubleSha256DigestBE],
newTags: Vector[AddressTag]): Future[ProcessTxResult] = {
txProcessing.processOurTransaction(transaction,
feeRate,
inputAmount,
sentAmount,
blockHashOpt,
newTags)
}
override def processBlock(block: Block): Future[Unit] =
txProcessing.processBlock(block)
override def listTransactions(): Future[Vector[TransactionDb]] =
txProcessing.listTransactions()
override def findTransaction(
txId: DoubleSha256DigestBE): Future[Option[TransactionDb]] =
txProcessing.findTransaction(txId)
override def subscribeForBlockProcessingCompletionSignal(
blockHash: DoubleSha256DigestBE): Future[DoubleSha256DigestBE] = {
txProcessing.subscribeForBlockProcessingCompletionSignal(blockHash)
}
override def insertTransaction(
tx: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE]
): Future[TransactionDb] = {
txProcessing.insertTransaction(tx, blockHashOpt)
}
}

View File

@ -483,7 +483,6 @@ object DLCWalletUtil extends BitcoinSLogger {
}
_ <- dlcA.broadcastTransaction(tx)
dlcDb <- dlcA.dlcDAO.findByContractId(contractId)
_ <- verifyProperlySetTxIds(contractId = contractId, wallet = dlcA)
_ <- verifyProperlySetTxIds(contractId = contractId, wallet = dlcB)
} yield {

View File

@ -9,6 +9,7 @@ import org.bitcoins.core.currency.CurrencyUnit
import org.bitcoins.core.hd.HDAccount
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.transaction.{Transaction, TransactionOutput}
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.dlc.wallet.DLCWallet
import org.bitcoins.rpc.client.common.BitcoindRpcClient
@ -85,9 +86,10 @@ trait FundWalletUtil extends BitcoinSLogger {
}
val fundedWalletF =
txsF.flatMap(txs => wallet.processTransactions(txs, None))
txsF.flatMap(txs =>
FutureUtil.sequentially(txs)(tx => wallet.processTransaction(tx, None)))
fundedWalletF.map(_.asInstanceOf[Wallet])
fundedWalletF.map(_ => wallet)
}
def fundAccountForWalletWithBitcoind(

View File

@ -168,7 +168,7 @@ class ProcessTransactionTest extends BitcoinSWalletTest {
)
} yield fundingTx
val processedFundingTxF: Future[Wallet] = for {
val processedFundingTxF: Future[Unit] = for {
(fundingTx, _) <- fundingTxF
// make sure wallet is empty
balance <- wallet.getBalance()
@ -181,7 +181,7 @@ class ProcessTransactionTest extends BitcoinSWalletTest {
// build spending tx
val spendingTxF = for {
receivingAddress <- receivingAddressF
wallet <- processedFundingTxF
_ <- processedFundingTxF
destinations = Vector(
TransactionOutput(amount, receivingAddress.scriptPubKey)
)
@ -191,11 +191,11 @@ class ProcessTransactionTest extends BitcoinSWalletTest {
fromTagOpt = None,
markAsReserved = true
)
processedSpendingTx <- wallet.processTransaction(
_ <- wallet.processTransaction(
transaction = rawTxHelper.signedTx,
blockHashOpt = None
)
balance <- processedSpendingTx.getBalance()
balance <- wallet.getBalance()
} yield assert(balance == amount)
spendingTxF

View File

@ -109,18 +109,18 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
bitcoindAddr <- bitcoindAddrF
blockHashes <-
bitcoind.generateToAddress(blocks = numBlocks, address = bitcoindAddr)
newTxWallet <- wallet.processTransaction(
_ <- wallet.processTransaction(
transaction = tx,
blockHashOpt = blockHashes.headOption
)
balance <- newTxWallet.getBalance()
unconfirmedBalance <- newTxWallet.getUnconfirmedBalance()
balance <- wallet.getBalance()
unconfirmedBalance <- wallet.getUnconfirmedBalance()
} yield {
// balance doesn't have to exactly equal, as there was money in the
// wallet before hand.
assert(balance >= amt)
assert(amt == unconfirmedBalance)
newTxWallet
()
}
// let's clear the wallet and then do a rescan for the last numBlocks
@ -170,12 +170,12 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
bitcoindAddr <- bitcoindAddrF
blockHashes <-
bitcoind.generateToAddress(blocks = numBlocks, address = bitcoindAddr)
newTxWallet <- wallet.processTransaction(
_ <- wallet.processTransaction(
transaction = tx,
blockHashOpt = blockHashes.headOption
)
balance <- newTxWallet.getBalance()
unconfirmedBalance <- newTxWallet.getUnconfirmedBalance()
balance <- wallet.getBalance()
unconfirmedBalance <- wallet.getUnconfirmedBalance()
} yield {
// balance doesn't have to exactly equal, as there was money in the
// wallet before hand.
@ -200,7 +200,9 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
_ <- wallet.clearAllUtxos()
_ <- wallet.clearAllAddresses()
balanceAfterClear <- wallet.getBalance()
rescanState <- wallet.fullRescanNeutrinoWallet(1, true)
rescanState <- wallet.fullRescanNeutrinoWallet(addressBatchSize = 1,
force = true)
_ <- RescanState.awaitRescanDone(rescanState)
_ <- AsyncUtil.awaitConditionF(
() => wallet.getBalance().map(_ == balanceAfterPayment1),
@ -231,18 +233,18 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
bitcoindAddr <- bitcoindAddrF
blockHashes <-
bitcoind.generateToAddress(blocks = numBlocks, address = bitcoindAddr)
newTxWallet <- wallet.processTransaction(
_ <- wallet.processTransaction(
transaction = tx,
blockHashOpt = blockHashes.headOption
)
balance <- newTxWallet.getBalance()
unconfirmedBalance <- newTxWallet.getUnconfirmedBalance()
balance <- wallet.getBalance()
unconfirmedBalance <- wallet.getUnconfirmedBalance()
} yield {
// balance doesn't have to exactly equal, as there was money in the
// wallet before hand.
assert(balance >= amt)
assert(amt == unconfirmedBalance)
newTxWallet
()
}
for {
@ -519,12 +521,12 @@ class RescanHandlingTest extends BitcoinSWalletTestCachedBitcoindNewest {
bitcoindAddr <- bitcoindAddrF
blockHashes <-
bitcoind.generateToAddress(blocks = numBlocks, address = bitcoindAddr)
newTxWallet <- wallet.processTransaction(
_ <- wallet.processTransaction(
transaction = tx,
blockHashOpt = blockHashes.headOption
)
balance <- newTxWallet.getBalance()
unconfirmedBalance <- newTxWallet.getUnconfirmedBalance()
balance <- wallet.getBalance()
unconfirmedBalance <- wallet.getUnconfirmedBalance()
} yield {
// balance doesn't have to exactly equal, as there was money in the
// wallet before hand.

View File

@ -75,7 +75,6 @@ class WalletCallbackTest extends BitcoinSWalletTest {
for {
address <- wallet.getNewAddress()
tx <- wallet.sendToAddress(address, Satoshis(1000), None)
_ <- wallet.processTransaction(tx, None)
result <- resultP.future
} yield assert(result == tx)
}

View File

@ -17,7 +17,7 @@ import org.bitcoins.core.gcs.{GolombFilter, SimpleFilterMatcher}
import org.bitcoins.core.hd.*
import org.bitcoins.core.number.UInt32
import org.bitcoins.core.protocol.BitcoinAddress
import org.bitcoins.core.protocol.blockchain.ChainParams
import org.bitcoins.core.protocol.blockchain.{Block, ChainParams}
import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.protocol.transaction.*
import org.bitcoins.core.psbt.PSBT
@ -49,7 +49,6 @@ import scala.util.{Failure, Random, Success}
abstract class Wallet
extends NeutrinoHDWalletApi
with AddressHandling
with TransactionProcessing
with RescanHandling
with WalletLogger {
@ -86,6 +85,16 @@ abstract class Wallet
private[bitcoins] val stateDescriptorDAO: WalletStateDescriptorDAO =
WalletStateDescriptorDAO()
private def walletDAOs: WalletDAOs = WalletDAOs(accountDAO,
addressDAO,
addressTagDAO,
spendingInfoDAO,
transactionDAO,
incomingTxDAO,
outgoingTxDAO,
scriptPubKeyDAO,
stateDescriptorDAO)
protected lazy val safeDatabase: SafeDatabase = spendingInfoDAO.safeDatabase
val nodeApi: NodeApi
@ -105,6 +114,15 @@ abstract class Wallet
)
def accountHandling: AccountHandlingApi = AccountHandling(accountDAO)
override lazy val transactionProcessing: TransactionProcessingApi = {
TransactionProcessing(
walletApi = this,
chainQueryApi = chainQueryApi,
utxoHandling = utxoHandling,
walletDAOs = walletDAOs
)
}
def walletCallbacks: WalletCallbacks = walletConfig.callBacks
private def checkRootAccount: Future[Unit] = {
@ -158,6 +176,35 @@ abstract class Wallet
}
}
override def processBlock(block: Block): Future[Unit] = {
transactionProcessing.processBlock(block)
}
override def processTransaction(
transaction: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE]): Future[Unit] = {
transactionProcessing.processTransaction(transaction, blockHashOpt)
}
/** Processes TXs originating from our wallet. This is called right after
* we've signed a TX, updating our UTXO state.
*/
override def processOurTransaction(
transaction: Transaction,
feeRate: FeeUnit,
inputAmount: CurrencyUnit,
sentAmount: CurrencyUnit,
blockHashOpt: Option[DoubleSha256DigestBE],
newTags: Vector[AddressTag]
): Future[ProcessTxResult] = {
transactionProcessing.processOurTransaction(transaction,
feeRate,
inputAmount,
sentAmount,
blockHashOpt,
newTags)
}
override def processCompactFilters(
blockFilters: Vector[(DoubleSha256DigestBE, GolombFilter)]
): Future[Wallet] = {
@ -235,10 +282,21 @@ abstract class Wallet
override def broadcastTransaction(transaction: Transaction): Future[Unit] =
for {
_ <- nodeApi.broadcastTransaction(transaction)
_ <- processTransaction(transaction, blockHashOpt = None)
_ <- transactionProcessing.processTransaction(transaction,
blockHashOpt = None)
_ <- walletCallbacks.executeOnTransactionBroadcast(transaction)
} yield ()
override def getTransactionsToBroadcast: Future[Vector[Transaction]] = {
for {
mempoolUtxos <- spendingInfoDAO.findAllInMempool
txIds = mempoolUtxos.map { utxo =>
utxo.spendingTxIdOpt.getOrElse(utxo.txid)
}
txDbs <- transactionDAO.findByTxIdBEs(txIds)
} yield txDbs.map(_.transaction)
}
override def isEmpty(): Future[Boolean] =
for {
addressCount <- addressDAO.count()
@ -359,7 +417,7 @@ abstract class Wallet
creditingAmount = rawTxHelper.scriptSigParams.foldLeft(
CurrencyUnits.zero
)(_ + _.amount)
_ <- processOurTransaction(
_ <- transactionProcessing.processOurTransaction(
transaction = signed,
feeRate = feeRate,
inputAmount = creditingAmount,
@ -1000,6 +1058,9 @@ abstract class Wallet
override def updateUtxoPendingStates(): Future[Vector[SpendingInfoDb]] =
utxoHandling.updateUtxoPendingStates()
override def listTransactions(): Future[Vector[TransactionDb]] =
transactionProcessing.listTransactions()
override def listUtxos(): Future[Vector[SpendingInfoDb]] =
utxoHandling.listUtxos()

View File

@ -106,9 +106,35 @@ class WalletHolder(initWalletOpt: Option[DLCNeutrinoHDWalletApi])(implicit
Future(wallet).flatMap[T](_)
}
override def processBlock(block: Block): Future[WalletApi] =
override def processBlock(block: Block): Future[Unit] =
delegate(_.processBlock(block))
override def processTransaction(
transaction: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE]): Future[Unit] = {
delegate(_.processTransaction(transaction, blockHashOpt))
}
/** Processes TXs originating from our wallet. This is called right after
* we've signed a TX, updating our UTXO state.
*/
override def processOurTransaction(
transaction: Transaction,
feeRate: FeeUnit,
inputAmount: CurrencyUnit,
sentAmount: CurrencyUnit,
blockHashOpt: Option[DoubleSha256DigestBE],
newTags: Vector[AddressTag]
): Future[ProcessTxResult] = {
delegate(
_.processOurTransaction(transaction,
feeRate,
inputAmount,
sentAmount,
blockHashOpt,
newTags))
}
override def processCompactFilters(
blockFilters: Vector[(DoubleSha256DigestBE, GolombFilter)]
): Future[NeutrinoHDWalletApi] = {
@ -153,24 +179,6 @@ class WalletHolder(initWalletOpt: Option[DLCNeutrinoHDWalletApi])(implicit
res
}
override def processTransaction(
transaction: Transaction,
blockHash: Option[DoubleSha256DigestBE]
): Future[WalletApi] = delegate(_.processTransaction(transaction, blockHash))
override def findTransaction(
txId: DoubleSha256DigestBE
): Future[Option[TransactionDb]] = delegate(_.findTransaction(txId))
// override def fundRawTransaction(
// destinations: Vector[TransactionOutput],
// feeRate: FeeUnit,
// fromTagOpt: Option[AddressTag],
// markAsReserved: Boolean
// ): Future[FundRawTxHelper[ShufflingNonInteractiveFinalizer]] = delegate(
// _.fundRawTransaction(destinations, feeRate, fromTagOpt, markAsReserved)
// )
override def fundRawTransaction(
destinations: Vector[TransactionOutput],
feeRate: FeeUnit,
@ -182,10 +190,6 @@ class WalletHolder(initWalletOpt: Option[DLCNeutrinoHDWalletApi])(implicit
)
}
override def listTransactions(): Future[Vector[TransactionDb]] = delegate(
_.listTransactions()
)
override def updateUtxoPendingStates(): Future[Vector[SpendingInfoDb]] =
delegate(_.updateUtxoPendingStates())
@ -206,6 +210,10 @@ class WalletHolder(initWalletOpt: Option[DLCNeutrinoHDWalletApi])(implicit
override def listDefaultAccountUtxos(): Future[Vector[SpendingInfoDb]] = {
delegate(_.listDefaultAccountUtxos())
}
override def listTransactions(): Future[Vector[TransactionDb]] = {
delegate(_.listTransactions())
}
override def listUtxos(): Future[Vector[SpendingInfoDb]] = delegate(
_.listUtxos()
)
@ -676,14 +684,11 @@ class WalletHolder(initWalletOpt: Option[DLCNeutrinoHDWalletApi])(implicit
override def broadcastTransaction(transaction: Transaction): Future[Unit] =
delegate(_.broadcastTransaction(transaction))
override def getFeeRate(): Future[FeeUnit] = delegate(_.getFeeRate())
override def getTransactionsToBroadcast: Future[Vector[Transaction]] = {
delegate(_.getTransactionsToBroadcast)
}
override def processTransactions(
transactions: Vector[Transaction],
blockHash: Option[DoubleSha256DigestBE]
)(implicit ec: ExecutionContext): Future[WalletApi] = delegate(
_.processTransactions(transactions, blockHash)
)
override def getFeeRate(): Future[FeeUnit] = delegate(_.getFeeRate())
override def getBalance()(implicit
ec: ExecutionContext
@ -1077,26 +1082,6 @@ class WalletHolder(initWalletOpt: Option[DLCNeutrinoHDWalletApi])(implicit
): Future[Vector[SpendingInfoDb]] = {
delegate(_.findByScriptPubKey(scriptPubKey))
}
override def processOurTransaction(
transaction: Transaction,
feeRate: FeeUnit,
inputAmount: CurrencyUnit,
sentAmount: CurrencyUnit,
blockHashOpt: Option[DoubleSha256DigestBE],
newTags: Vector[AddressTag]
): Future[ProcessTxResult] = {
delegate(
_.processOurTransaction(
transaction = transaction,
feeRate = feeRate,
inputAmount = inputAmount,
sentAmount = sentAmount,
blockHashOpt = blockHashOpt,
newTags = newTags
)
)
}
}
object WalletHolder {

View File

@ -8,6 +8,10 @@ import org.bitcoins.core.api.chain.ChainQueryApi.{
InvalidBlockRange
}
import org.bitcoins.core.api.wallet.NeutrinoWalletApi.BlockMatchingResponse
import org.bitcoins.core.api.wallet.{
RescanHandlingApi,
TransactionProcessingApi
}
import org.bitcoins.core.gcs.SimpleFilterMatcher
import org.bitcoins.core.hd.{HDAccount, HDChainType}
import org.bitcoins.core.protocol.BlockStamp.BlockHeight
@ -23,12 +27,16 @@ import slick.dbio.{DBIOAction, Effect, NoStream}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
private[wallet] trait RescanHandling extends WalletLogger {
private[wallet] trait RescanHandling
extends RescanHandlingApi
with WalletLogger {
self: Wallet =>
/////////////////////
// Public facing API
def transactionProcessing: TransactionProcessingApi
override def isRescanning(): Future[Boolean] = stateDescriptorDAO.isRescanning
/** @inheritdoc */
@ -224,7 +232,8 @@ private[wallet] trait RescanHandling extends WalletLogger {
Flow[Int]
.batch[Vector[Int]](filterBatchSize, seed)(aggregate)
.via(fetchFiltersFlow)
.mapAsync(1) { case filterResponse =>
.mapAsync(1) { filterResponse =>
val heightRange = filterResponse.map(_.blockHeight)
val f =
scriptsF.flatMap { scripts =>
searchFiltersForMatches(scripts, filterResponse, parallelism)(
@ -232,15 +241,15 @@ private[wallet] trait RescanHandling extends WalletLogger {
)
}
val heightRange = filterResponse.map(_.blockHeight)
f.onComplete {
case Success(_) =>
if (heightRange.lastOption == range.lastOption) {
// complete the stream if we processed the last filter
rescanCompletePromise.success(())
}
case Failure(_) => // do nothing, the stream will fail on its own
case Failure(err) =>
// do nothing, the stream will fail on its own
logger.error(s"Failed to search filters for matches", err)
}
f
}
@ -297,7 +306,7 @@ private[wallet] trait RescanHandling extends WalletLogger {
account: HDAccount,
forceGenerateSpks: Boolean,
parallelismLevel: Int = Runtime.getRuntime.availableProcessors()
)(implicit ec: ExecutionContext): Future[RescanState.RescanStarted] = {
): Future[RescanState.RescanStarted] = {
require(addressBatchSize > 0, "batch size must be greater than zero")
require(parallelismLevel > 0, "parallelism level must be greater than zero")
for {
@ -309,10 +318,11 @@ private[wallet] trait RescanHandling extends WalletLogger {
endHeight <- endOpt.fold(chainQueryApi.getFilterCount())(
chainQueryApi.getHeightByBlockStamp
)
_ = if (startHeight > endHeight)
_ = if (startHeight > endHeight) {
throw InvalidBlockRange(
s"End position cannot precede start: $startHeight:$endHeight"
)
}
_ = logger.info(
s"Beginning to search for matches between ${startHeight}:${endHeight}"
)
@ -441,12 +451,16 @@ private[wallet] trait RescanHandling extends WalletLogger {
): Future[Unit] = {
logger.debug(s"Requesting ${blocks.size} block(s)")
blocks.foldLeft(Future.unit) { (prevF, blockHash) =>
val completedF = subscribeForBlockProcessingCompletionSignal(blockHash)
val completedF =
transactionProcessing.subscribeForBlockProcessingCompletionSignal(
blockHash)
for {
_ <- prevF
_ <- nodeApi.downloadBlocks(Vector(blockHash))
_ <- completedF
} yield ()
} yield {
()
}
}
}
@ -464,7 +478,7 @@ private[wallet] trait RescanHandling extends WalletLogger {
addressBatchSize = addressBatchSize,
account = account,
forceGenerateSpks = forceGenerateSpks
)(ExecutionContext.fromExecutor(walletConfig.rescanThreadPool))
)
} yield {
rescanState
}

View File

@ -1,7 +1,13 @@
package org.bitcoins.wallet.internal
import org.bitcoins.core.api.wallet.ProcessTxResult
import org.bitcoins.core.api.wallet.db._
import org.bitcoins.core.api.chain.ChainQueryApi
import org.bitcoins.core.api.wallet.{
ProcessTxResult,
TransactionProcessingApi,
WalletApi
}
import org.bitcoins.core.api.wallet.db.*
import org.bitcoins.core.config.NetworkParameters
import org.bitcoins.core.currency.CurrencyUnit
import org.bitcoins.core.number.UInt32
import org.bitcoins.core.protocol.BitcoinAddress
@ -14,25 +20,57 @@ import org.bitcoins.core.protocol.transaction.{
}
import org.bitcoins.core.util.TimeUtil
import org.bitcoins.core.wallet.fee.FeeUnit
import org.bitcoins.core.wallet.utxo.TxoState._
import org.bitcoins.core.wallet.utxo.TxoState.*
import org.bitcoins.core.wallet.utxo.{AddressTag, TxoState}
import org.bitcoins.crypto.DoubleSha256DigestBE
import org.bitcoins.wallet._
import org.bitcoins.db.SafeDatabase
import org.bitcoins.wallet.*
import org.bitcoins.wallet.callback.WalletCallbacks
import org.bitcoins.wallet.config.WalletAppConfig
import org.bitcoins.wallet.models.{
AddressDAO,
AddressTagDAO,
IncomingTransactionDAO,
OutgoingTransactionDAO,
ScriptPubKeyDAO,
SpendingInfoDAO,
TransactionDAO,
WalletDAOs,
WalletStateDescriptorDAO
}
import slick.dbio.{DBIOAction, Effect, NoStream}
import scala.concurrent.{Future, Promise}
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}
/** Provides functionality for processing transactions. This includes importing
* UTXOs spent to our wallet, updating confirmation counts and marking UTXOs as
* spent when spending from our wallet
*/
private[bitcoins] trait TransactionProcessing extends WalletLogger {
self: Wallet =>
import walletConfig.profile.api._
case class TransactionProcessing(
walletApi: WalletApi,
chainQueryApi: ChainQueryApi,
utxoHandling: UtxoHandling,
walletDAOs: WalletDAOs)(implicit
walletConfig: WalletAppConfig,
ec: ExecutionContext)
extends TransactionProcessingApi
with WalletLogger {
import org.bitcoins.core.currency.currencyUnitNumeric
private def walletCallbacks: WalletCallbacks = walletConfig.callBacks
private val stateDescriptorDAO: WalletStateDescriptorDAO =
walletDAOs.stateDescriptorDAO
private val spendingInfoDAO: SpendingInfoDAO = walletDAOs.utxoDAO
private val transactionDAO: TransactionDAO = walletDAOs.transactionDAO
private val scriptPubKeyDAO: ScriptPubKeyDAO = walletDAOs.scriptPubKeyDAO
private val addressDAO: AddressDAO = walletDAOs.addressDAO
private val incomingTxDAO: IncomingTransactionDAO = walletDAOs.incomingTxDAO
private val outgoingTxDAO: OutgoingTransactionDAO = walletDAOs.outgoingTxDAO
private val addressTagDAO: AddressTagDAO = walletDAOs.addressTagDAO
private val safeDatabase: SafeDatabase = stateDescriptorDAO.safeDatabase
private val networkParameters: NetworkParameters = walletConfig.network
def utxoHandling: UtxoHandling
/////////////////////
// Public facing API
@ -40,7 +78,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
override def processTransaction(
transaction: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE]
): Future[Wallet] = {
): Future[Unit] = {
for {
relevantReceivedOutputs <- getRelevantOutputs(transaction)
result <- processTransactionImpl(
@ -49,7 +87,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
newTags = Vector.empty,
receivedSpendingInfoDbsOpt = None,
spentSpendingInfoDbsOpt = None,
relevantReceivedOutputs
relevantReceivedOutputs = relevantReceivedOutputs
)
} yield {
if (result.updatedIncoming.nonEmpty || result.updatedOutgoing.nonEmpty) {
@ -58,17 +96,17 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
)
}
this
()
}
}
override def processBlock(block: Block): Future[Wallet] = {
override def processBlock(block: Block): Future[Unit] = {
val start = TimeUtil.currentEpochMs
val isEmptyF = isEmpty()
val isEmptyF = walletApi.isEmpty()
val heightF = chainQueryApi.getBlockHeight(block.blockHeader.hashBE)
heightF.foreach { heightOpt =>
logger.debug(
s"Processing block=${block.blockHeader.hash.flip.hex} heightOpt=$heightOpt"
s"Processing block=${block.blockHeader.hashBE.hex} heightOpt=$heightOpt"
)
}
@ -76,7 +114,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
case Some(height) =>
val resF = for {
isEmpty <- isEmptyF
newWallet <- {
_ <- {
if (!isEmpty) {
processBlockCachedUtxos(block)
} else {
@ -86,15 +124,15 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
Future.successful(this)
}
}
} yield newWallet
} yield ()
val f = for {
res <- resF
_ <- resF
hash = block.blockHeader.hashBE
_ <- stateDescriptorDAO.updateSyncHeight(hash, height)
_ <- walletConfig.callBacks.executeOnBlockProcessed(block)
} yield {
res
()
}
f.onComplete(failure =>
@ -116,7 +154,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
logger.warn(
s"Could not find blockheight for blockHash=${block.blockHeader.hashBE.hex}, skipping processing in wallet"
)
Future.successful(this)
Future.unit
}
}
@ -126,7 +164,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
* [[processTransaction]] is called. This significantly improves performance
* on rescans or IBD with an existing wallet
*/
private def processBlockCachedUtxos(block: Block): Future[Wallet] = {
private def processBlockCachedUtxos(block: Block): Future[Unit] = {
// fetch all received spending info dbs relevant to txs in this block to improve performance
val receivedSpendingInfoDbsF =
spendingInfoDAO
@ -145,7 +183,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
// as an optimization
val relevantReceivedOutputsForBlockF = getRelevantOutputsForBlock(block)
val resultF: Future[Future[Wallet]] = for {
val resultF: Future[Future[Unit]] = for {
// map on these first so we don't have to call
// .map everytime we iterate through a tx
// which is costly (thread overhead)
@ -161,15 +199,15 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
Some(spentSpendingInfoDbs)
}
val blockInputs = block.transactions.flatMap(_.inputs)
val wallet: Future[Wallet] = {
block.transactions.foldLeft(Future.successful(this)) {
val processedF: Future[Unit] = {
block.transactions.foldLeft(Future.successful(())) {
(walletF, transaction) =>
for {
wallet <- walletF
_ <- walletF
relevantReceivedOutputsForTx = relevantReceivedOutputsForBlock
.getOrElse(transaction.txIdBE, Vector.empty)
processTxResult <- {
wallet.processTransactionImpl(
processTransactionImpl(
transaction = transaction,
blockHashOpt = blockHashOpt,
newTags = Vector.empty,
@ -201,11 +239,11 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
cachedSpentOpt = newCachedSpentOpt
}
} yield {
this
()
}
}
}
wallet
processedF
}
resultF.flatten
@ -223,7 +261,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
/////////////////////
// Internal wallet API
protected def insertTransaction(
override def insertTransaction(
tx: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE]
): Future[TransactionDb] = {
@ -307,10 +345,10 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
/////////////////////
// Private methods
private var blockProcessingSignals =
Map.empty[DoubleSha256DigestBE, Promise[DoubleSha256DigestBE]]
private val blockProcessingSignals =
mutable.Map.empty[DoubleSha256DigestBE, Promise[DoubleSha256DigestBE]]
private[wallet] def subscribeForBlockProcessingCompletionSignal(
override def subscribeForBlockProcessingCompletionSignal(
blockHash: DoubleSha256DigestBE
): Future[DoubleSha256DigestBE] =
synchronized {
@ -318,30 +356,28 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
case Some(existingSignal) => existingSignal.future
case None =>
val newSignal = Promise[DoubleSha256DigestBE]()
blockProcessingSignals =
blockProcessingSignals.updated(blockHash, newSignal)
blockProcessingSignals.addOne((blockHash, newSignal))
newSignal.future
}
}
private def signalBlockProcessingCompletion(
blockHash: DoubleSha256DigestBE,
failure: Try[_]
): Unit =
failure: Try[?]
): Unit = {
synchronized {
logger.debug(
s"Updating wallet signal completion for ${blockHash.flip.hex}"
)
blockProcessingSignals.get(blockHash).foreach { signal =>
blockProcessingSignals =
blockProcessingSignals.filterNot(_._1 == blockHash)
failure match {
case Success(_) =>
signal.success(blockHash)
case Failure(exception) => signal.failure(exception)
}
blockProcessingSignals.remove(blockHash) match {
case Some(signal) =>
failure match {
case Success(_) =>
signal.success(blockHash)
case Failure(exception) => signal.failure(exception)
}
()
case None => ()
}
}
}
/** Processes received utxos that are contained in the given transaction
* @param transaction
@ -354,7 +390,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
* tags associated with this tx
* @return
*/
protected def processReceivedUtxos(
override def processReceivedUtxos(
transaction: Transaction,
blockHashOpt: Option[DoubleSha256DigestBE],
spendingInfoDbs: Vector[SpendingInfoDb],
@ -387,7 +423,7 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
/** Searches for outputs on the given transaction that are being spent from
* our wallet
*/
protected def processSpentUtxos(
override def processSpentUtxos(
transaction: Transaction,
outputsBeingSpent: Vector[SpendingInfoDb],
blockHashOpt: Option[DoubleSha256DigestBE]
@ -483,9 +519,9 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
}
_ <-
// only notify about our transactions
if (incoming.nonEmpty || outgoing.nonEmpty)
if (incoming.nonEmpty || outgoing.nonEmpty) {
walletCallbacks.executeOnTransactionProcessed(transaction)
else Future.unit
} else Future.unit
} yield {
ProcessTxResult(incoming, outgoing)
}
@ -809,15 +845,4 @@ private[bitcoins] trait TransactionProcessing extends WalletLogger {
}
}
}
private[wallet] def getTransactionsToBroadcast
: Future[Vector[Transaction]] = {
for {
mempoolUtxos <- spendingInfoDAO.findAllInMempool
txIds = mempoolUtxos.map { utxo =>
utxo.spendingTxIdOpt.getOrElse(utxo.txid)
}
txDbs <- transactionDAO.findByTxIdBEs(txIds)
} yield txDbs.map(_.transaction)
}
}

View File

@ -55,7 +55,7 @@ case class UtxoHandling(
}
/** Returns all the utxos originating from the given outpoints */
def listUtxos(
override def listUtxos(
outPoints: Vector[TransactionOutPoint]
): Future[Vector[SpendingInfoDb]] = {
spendingInfoDAO

View File

@ -44,7 +44,9 @@ trait WalletSync extends BitcoinSLogger {
blocksToSync <- blocksToSyncF
syncedWallet <- FutureUtil.foldLeftAsync(wallet, blocksToSync) {
case (wallet, nextBlock) =>
wallet.processBlock(nextBlock)
wallet
.processBlock(nextBlock)
.map(_ => wallet)
}
} yield syncedWallet