sweep: start tracking inputs spent by unknown tx

This commit adds a new field `InputsSpent` to the `BumpResult` so they
can be used to track inputs spent by txns not recoginized by the fee
bumper.
This commit is contained in:
yyforyongyu 2025-01-24 05:49:19 +08:00
parent 388183e173
commit 2f1205a394
No known key found for this signature in database
GPG key ID: 9BCD95C4FF296868
2 changed files with 83 additions and 7 deletions

View file

@ -273,6 +273,10 @@ type BumpResult struct {
// Err is the error that occurred during the broadcast.
Err error
// SpentInputs are the inputs spent by another tx which caused the
// current tx to be failed.
SpentInputs map[wire.OutPoint]*wire.MsgTx
// requestID is the ID of the request that created this record.
requestID uint64
}
@ -812,6 +816,10 @@ type monitorRecord struct {
// outpointToTxIndex is a map of outpoint to tx index.
outpointToTxIndex map[wire.OutPoint]int
// spentInputs are the inputs spent by another tx which caused the
// current tx failed.
spentInputs map[wire.OutPoint]*wire.MsgTx
}
// Start starts the publisher by subscribing to block epoch updates and kicking
@ -910,6 +918,9 @@ func (t *TxPublisher) processRecords() {
// If the any of the inputs has been spent, the record will be
// marked as failed or confirmed.
if len(spends) != 0 {
// Attach the spending txns.
r.spentInputs = spends
// When tx is nil, it means we haven't tried the initial
// broadcast yet the input is already spent. This could
// happen when the node shuts down, a previous sweeping
@ -1159,16 +1170,71 @@ func (t *TxPublisher) handleUnknownSpent(r *monitorRecord) {
"bumper, failing it now:\n%v", r.requestID,
inputTypeSummary(r.req.Inputs))
// Create a result that will be sent to the resultChan which is
// listened by the caller.
// Create a result that will be sent to the resultChan which is listened
// by the caller.
result := &BumpResult{
Event: TxUnknownSpend,
Tx: r.tx,
requestID: r.requestID,
Err: ErrUnknownSpent,
Event: TxUnknownSpend,
Tx: r.tx,
requestID: r.requestID,
Err: ErrUnknownSpent,
SpentInputs: r.spentInputs,
}
// Notify that this tx is confirmed and remove the record from the map.
// Get the fee function, which will be used to decided the next fee rate
// to use if the sweeper decides to retry sweeping this input.
feeFunc := r.feeFunction
// When the record is failed before the initial broadcast is attempted,
// it will have a nil fee func. In this case, we'll create the fee func
// here.
//
// NOTE: Since the current record is failed and will be deleted, we
// don't need to update the record on this fee function. We only need
// the fee rate data so the sweeper can pick up where we left off.
if feeFunc == nil {
f, err := t.initializeFeeFunction(r.req)
// TODO(yy): The only error we would receive here is when the
// pkScript is not recognized by the weightEstimator. What we
// should do instead is to check the pkScript immediately after
// receiving a sweep request so we don't need to check it again,
// which will also save us from error checking from several
// callsites.
if err != nil {
log.Errorf("Failed to create fee func for record %v: "+
"%v", r.requestID, err)
// Overwrite the event and error so the sweeper will
// remove this input.
result.Event = TxFatal
result.Err = err
// Notify the sweeper about this result in the end.
t.handleResult(result)
return
}
feeFunc = f
}
// Since the sweeping tx has been replaced by another party's tx, we
// missed this block window to increase its fee rate. To make sure the
// fee rate stays in the initial line, we now ask the fee function to
// give us the next fee rate as if the sweeping tx were RBFed. This new
// fee rate will be used as the starting fee rate if the upper system
// decides to continue sweeping the rest of the inputs.
_, err := feeFunc.Increment()
if err != nil {
// The fee function has reached its max position - nothing we
// can do here other than letting the user increase the budget.
log.Errorf("Failed to calculate the next fee rate for "+
"Record(%v): %v", r.requestID, err)
}
// Attach the new fee rate to be used for the next sweeping attempt.
result.FeeRate = feeFunc.FeeRate()
// Notify the sweeper about this result in the end.
t.handleResult(result)
}

View file

@ -1772,6 +1772,13 @@ func TestProcessRecordsSpent(t *testing.T) {
tp.subscriberChans.Store(requestID, subscriber)
tp.records.Store(requestID, recordConfirmed)
// Mock the fee function to increase feerate.
m.feeFunc.On("Increment").Return(true, nil).Once()
// Create a test feerate and return it from the mock fee function.
feerate := chainfee.SatPerKWeight(1000)
m.feeFunc.On("FeeRate").Return(feerate)
// Call processRecords and expect the results are notified back.
tp.processRecords()
@ -1785,6 +1792,9 @@ func TestProcessRecordsSpent(t *testing.T) {
require.Equal(t, TxUnknownSpend, result.Event)
require.Equal(t, tx, result.Tx)
// We expect the fee rate to be updated.
require.Equal(t, feerate, result.FeeRate)
// No error should be set.
require.ErrorIs(t, result.Err, ErrUnknownSpent)
require.Equal(t, requestID, result.requestID)