contractcourt: offer second-level outputs at CSV-1

This commit moves the offering of second-level outputs one block
earlier. The sweeper will check the required locktime and wait until it
matures. This is needed so the second-level outputs can be aggregated
properly.
This commit is contained in:
yyforyongyu 2024-04-08 15:45:03 +08:00
parent c644deb49f
commit acde08c65a
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
4 changed files with 77 additions and 12 deletions

View File

@ -349,6 +349,25 @@ func (h *htlcSuccessResolver) broadcastReSignedSuccessTx() (
"height %v", h, h.htlc.RHash[:], waitHeight)
}
// Deduct one block so this input is offered to the sweeper one block
// earlier since the sweeper will wait for one block to trigger the
// sweeping.
//
// TODO(yy): this is done so the outputs can be aggregated
// properly. Suppose CSV locks of five 2nd-level outputs all
// expire at height 840000, there is a race in block digestion
// between contractcourt and sweeper:
// - G1: block 840000 received in contractcourt, it now offers
// the outputs to the sweeper.
// - G2: block 840000 received in sweeper, it now starts to
// sweep the received outputs - there's no guarantee all
// fives have been received.
// To solve this, we either offer the outputs earlier, or
// implement `blockbeat`, and force contractcourt and sweeper
// to consume each block sequentially.
waitHeight--
// TODO(yy): let sweeper handles the wait?
err := waitForHeight(waitHeight, h.Notifier, h.quit)
if err != nil {
return nil, err
@ -364,10 +383,6 @@ func (h *htlcSuccessResolver) broadcastReSignedSuccessTx() (
Index: commitSpend.SpenderInputIndex,
}
// Finally, let the sweeper sweep the second-level output.
log.Infof("%T(%x): CSV lock expired, offering second-layer "+
"output to sweeper: %v", h, h.htlc.RHash[:], op)
// Let the sweeper sweep the second-level output now that the
// CSV/CLTV locks have expired.
var witType input.StandardWitnessType
@ -380,7 +395,7 @@ func (h *htlcSuccessResolver) broadcastReSignedSuccessTx() (
op, witType,
input.LeaseHtlcAcceptedSuccessSecondLevel,
&h.htlcResolution.SweepSignDesc,
h.htlcResolution.CsvDelay, h.broadcastHeight,
h.htlcResolution.CsvDelay, uint32(commitSpend.SpendingHeight),
h.htlc.RHash,
)
@ -392,7 +407,8 @@ func (h *htlcSuccessResolver) broadcastReSignedSuccessTx() (
)
log.Infof("%T(%x): offering second-level success tx output to sweeper "+
"with no deadline and budget=%v", h, h.htlc.RHash[:], budget)
"with no deadline and budget=%v at height=%v", h,
h.htlc.RHash[:], budget, waitHeight)
// TODO(roasbeef): need to update above for leased types
_, err = h.Sweeper.SweepInput(

View File

@ -706,6 +706,25 @@ func (h *htlcTimeoutResolver) handleCommitSpend(
"height %v", h, h.htlc.RHash[:], waitHeight)
}
// Deduct one block so this input is offered to the sweeper one
// block earlier since the sweeper will wait for one block to
// trigger the sweeping.
//
// TODO(yy): this is done so the outputs can be aggregated
// properly. Suppose CSV locks of five 2nd-level outputs all
// expire at height 840000, there is a race in block digestion
// between contractcourt and sweeper:
// - G1: block 840000 received in contractcourt, it now offers
// the outputs to the sweeper.
// - G2: block 840000 received in sweeper, it now starts to
// sweep the received outputs - there's no guarantee all
// fives have been received.
// To solve this, we either offer the outputs earlier, or
// implement `blockbeat`, and force contractcourt and sweeper
// to consume each block sequentially.
waitHeight--
// TODO(yy): let sweeper handles the wait?
err := waitForHeight(waitHeight, h.Notifier, h.quit)
if err != nil {
return nil, err
@ -735,8 +754,8 @@ func (h *htlcTimeoutResolver) handleCommitSpend(
op, csvWitnessType,
input.LeaseHtlcOfferedTimeoutSecondLevel,
&h.htlcResolution.SweepSignDesc,
h.htlcResolution.CsvDelay, h.broadcastHeight,
h.htlc.RHash,
h.htlcResolution.CsvDelay,
uint32(commitSpend.SpendingHeight), h.htlc.RHash,
)
// Calculate the budget for this sweep.
budget := calculateBudget(
@ -746,8 +765,8 @@ func (h *htlcTimeoutResolver) handleCommitSpend(
)
log.Infof("%T(%x): offering second-level timeout tx output to "+
"sweeper with no deadline and budget=%v", h,
h.htlc.RHash[:], budget)
"sweeper with no deadline and budget=%v at height=%v",
h, h.htlc.RHash[:], budget, waitHeight)
_, err = h.Sweeper.SweepInput(
inp,

View File

@ -652,7 +652,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
// failed, or excluded from the sweeper and return inputs that
// are either new or has been published but failed back, which
// will be retried again here.
inputs := s.updateSweeperInputs()
s.updateSweeperInputs()
select {
// A new inputs is offered to the sweeper. We check to see if
@ -670,7 +670,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
// If this input is forced, we perform an sweep
// immediately.
if input.params.Force {
inputs = s.updateSweeperInputs()
inputs := s.updateSweeperInputs()
s.sweepPendingInputs(inputs)
}
@ -716,6 +716,9 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
// Update the sweeper to the best height.
s.currentHeight = epoch.Height
// Update the inputs with the latest height.
inputs := s.updateSweeperInputs()
log.Debugf("Received new block: height=%v, attempt "+
"sweeping %d inputs", epoch.Height, len(inputs))
@ -1497,6 +1500,17 @@ func (s *UtxoSweeper) updateSweeperInputs() InputsMap {
continue
}
// If the input has a CSV that's not yet reached, we will skip
// this input and wait for the expiry.
locktime = input.BlocksToMaturity() + input.HeightHint()
if s.currentHeight < int32(locktime)-1 {
log.Infof("Skipping input %v due to CSV expiry=%v not "+
"reached, current height is %v", op, locktime,
s.currentHeight)
continue
}
// If this input is new or has been failed to be published,
// we'd retry it. The assumption here is that when an error is
// returned from `PublishTransaction`, it means the tx has

View File

@ -2479,6 +2479,8 @@ func TestUpdateSweeperInputs(t *testing.T) {
defer inp1.AssertExpectations(t)
inp2 := &input.MockInput{}
defer inp2.AssertExpectations(t)
inp3 := &input.MockInput{}
defer inp3.AssertExpectations(t)
// Create a list of inputs using all the states.
//
@ -2486,6 +2488,8 @@ func TestUpdateSweeperInputs(t *testing.T) {
// returned.
inp1.On("RequiredLockTime").Return(
uint32(s.currentHeight), false).Once()
inp1.On("BlocksToMaturity").Return(uint32(0)).Once()
inp1.On("HeightHint").Return(uint32(s.currentHeight)).Once()
input0 := &SweeperInput{state: Init, Input: inp1}
// These inputs won't hit RequiredLockTime so we won't mock.
@ -2496,6 +2500,8 @@ func TestUpdateSweeperInputs(t *testing.T) {
// returned.
inp1.On("RequiredLockTime").Return(
uint32(s.currentHeight), false).Once()
inp1.On("BlocksToMaturity").Return(uint32(0)).Once()
inp1.On("HeightHint").Return(uint32(s.currentHeight)).Once()
input3 := &SweeperInput{state: PublishFailed, Input: inp1}
// These inputs won't hit RequiredLockTime so we won't mock.
@ -2509,6 +2515,14 @@ func TestUpdateSweeperInputs(t *testing.T) {
uint32(s.currentHeight+1), true).Once()
input7 := &SweeperInput{state: Init, Input: inp2}
// Mock the input to have a CSV expiry in the future so it will NOT be
// returned.
inp3.On("RequiredLockTime").Return(
uint32(s.currentHeight), false).Once()
inp3.On("BlocksToMaturity").Return(uint32(2)).Once()
inp3.On("HeightHint").Return(uint32(s.currentHeight)).Once()
input8 := &SweeperInput{state: Init, Input: inp3}
// Add the inputs to the sweeper. After the update, we should see the
// terminated inputs being removed.
s.inputs = map[wire.OutPoint]*SweeperInput{
@ -2520,6 +2534,7 @@ func TestUpdateSweeperInputs(t *testing.T) {
{Index: 5}: input5,
{Index: 6}: input6,
{Index: 7}: input7,
{Index: 8}: input8,
}
// We expect the inputs with `Swept`, `Excluded`, and `Failed` to be
@ -2530,6 +2545,7 @@ func TestUpdateSweeperInputs(t *testing.T) {
{Index: 2}: input2,
{Index: 3}: input3,
{Index: 7}: input7,
{Index: 8}: input8,
}
// We expect only the inputs with `Init` and `PublishFailed` to be