sweep: make TxPublisher.currentHeight atomic

This commit is contained in:
yyforyongyu 2024-05-01 12:16:54 +08:00
parent e8a1d4876e
commit 3f8da16b77
No known key found for this signature in database
GPG key ID: 9BCD95C4FF296868

View file

@ -267,7 +267,7 @@ type TxPublisher struct {
cfg *TxPublisherConfig
// currentHeight is the current block height.
currentHeight int32
currentHeight atomic.Int32
// records is a map keyed by the requestCounter and the value is the tx
// being monitored.
@ -373,7 +373,9 @@ func (t *TxPublisher) initializeFeeFunction(
}
// Get the initial conf target.
confTarget := calcCurrentConfTarget(t.currentHeight, req.DeadlineHeight)
confTarget := calcCurrentConfTarget(
t.currentHeight.Load(), req.DeadlineHeight,
)
log.Debugf("Initializing fee function with conf target=%v, budget=%v, "+
"maxFeeRateAllowed=%v", confTarget, req.Budget,
@ -542,7 +544,7 @@ func (t *TxPublisher) broadcast(requestID uint64) (*BumpResult, error) {
tx := record.tx
log.Debugf("Publishing sweep tx %v, num_inputs=%v, height=%v",
txid, len(tx.TxIn), t.currentHeight)
txid, len(tx.TxIn), t.currentHeight.Load())
// Set the event, and change it to TxFailed if the wallet fails to
// publish it.
@ -715,7 +717,7 @@ func (t *TxPublisher) monitor(blockEvent *chainntnfs.BlockEpochEvent) {
epoch.Height)
// Update the best known height for the publisher.
t.currentHeight = epoch.Height
t.currentHeight.Store(epoch.Height)
// Check all monitored txns to see if any of them needs
// to be bumped.
@ -788,7 +790,7 @@ func (t *TxPublisher) processRecords() {
}
// Get the current height to be used in the following goroutines.
currentHeight := t.currentHeight
currentHeight := t.currentHeight.Load()
// For records that are not confirmed, we perform a fee bump if needed.
for requestID, r := range feeBumpRecords {
@ -854,7 +856,7 @@ func (t *TxPublisher) handleFeeBumpTx(requestID uint64, r *monitorRecord,
// TODO(yy): send this error back to the sweeper so it can
// re-group the inputs?
log.Errorf("Failed to increase fee rate for tx %v at "+
"height=%v: %v", oldTxid, t.currentHeight, err)
"height=%v: %v", oldTxid, t.currentHeight.Load(), err)
return
}
@ -862,7 +864,7 @@ func (t *TxPublisher) handleFeeBumpTx(requestID uint64, r *monitorRecord,
// If the fee rate was not increased, there's no need to bump the fee.
if !increased {
log.Tracef("Skip bumping tx %v at height=%v", oldTxid,
t.currentHeight)
t.currentHeight.Load())
return
}
@ -1122,7 +1124,7 @@ func (t *TxPublisher) createSweepTx(inputs []input.Input, changePkScript []byte,
// Validate and calculate the fee and change amount.
txFee, changeAmtOpt, locktimeOpt, err := prepareSweepTx(
inputs, changePkScript, feeRate, t.currentHeight,
inputs, changePkScript, feeRate, t.currentHeight.Load(),
)
if err != nil {
return nil, 0, err
@ -1178,7 +1180,7 @@ func (t *TxPublisher) createSweepTx(inputs []input.Input, changePkScript []byte,
// We'll default to using the current block height as locktime, if none
// of the inputs commits to a different locktime.
sweepTx.LockTime = uint32(locktimeOpt.UnwrapOr(t.currentHeight))
sweepTx.LockTime = uint32(locktimeOpt.UnwrapOr(t.currentHeight.Load()))
prevInputFetcher, err := input.MultiPrevOutFetcher(inputs)
if err != nil {