diff --git a/contractcourt/htlc_success_resolver.go b/contractcourt/htlc_success_resolver.go index cf2894501..213df8e4a 100644 --- a/contractcourt/htlc_success_resolver.go +++ b/contractcourt/htlc_success_resolver.go @@ -349,6 +349,25 @@ func (h *htlcSuccessResolver) broadcastReSignedSuccessTx() ( "height %v", h, h.htlc.RHash[:], waitHeight) } + // Deduct one block so this input is offered to the sweeper one block + // earlier since the sweeper will wait for one block to trigger the + // sweeping. + // + // TODO(yy): this is done so the outputs can be aggregated + // properly. Suppose CSV locks of five 2nd-level outputs all + // expire at height 840000, there is a race in block digestion + // between contractcourt and sweeper: + // - G1: block 840000 received in contractcourt, it now offers + // the outputs to the sweeper. + // - G2: block 840000 received in sweeper, it now starts to + // sweep the received outputs - there's no guarantee all + // fives have been received. + // To solve this, we either offer the outputs earlier, or + // implement `blockbeat`, and force contractcourt and sweeper + // to consume each block sequentially. + waitHeight-- + + // TODO(yy): let sweeper handles the wait? err := waitForHeight(waitHeight, h.Notifier, h.quit) if err != nil { return nil, err @@ -364,10 +383,6 @@ func (h *htlcSuccessResolver) broadcastReSignedSuccessTx() ( Index: commitSpend.SpenderInputIndex, } - // Finally, let the sweeper sweep the second-level output. - log.Infof("%T(%x): CSV lock expired, offering second-layer "+ - "output to sweeper: %v", h, h.htlc.RHash[:], op) - // Let the sweeper sweep the second-level output now that the // CSV/CLTV locks have expired. var witType input.StandardWitnessType @@ -380,7 +395,7 @@ func (h *htlcSuccessResolver) broadcastReSignedSuccessTx() ( op, witType, input.LeaseHtlcAcceptedSuccessSecondLevel, &h.htlcResolution.SweepSignDesc, - h.htlcResolution.CsvDelay, h.broadcastHeight, + h.htlcResolution.CsvDelay, uint32(commitSpend.SpendingHeight), h.htlc.RHash, ) @@ -392,7 +407,8 @@ func (h *htlcSuccessResolver) broadcastReSignedSuccessTx() ( ) log.Infof("%T(%x): offering second-level success tx output to sweeper "+ - "with no deadline and budget=%v", h, h.htlc.RHash[:], budget) + "with no deadline and budget=%v at height=%v", h, + h.htlc.RHash[:], budget, waitHeight) // TODO(roasbeef): need to update above for leased types _, err = h.Sweeper.SweepInput( diff --git a/contractcourt/htlc_timeout_resolver.go b/contractcourt/htlc_timeout_resolver.go index 0b749039d..8d3f7504d 100644 --- a/contractcourt/htlc_timeout_resolver.go +++ b/contractcourt/htlc_timeout_resolver.go @@ -706,6 +706,25 @@ func (h *htlcTimeoutResolver) handleCommitSpend( "height %v", h, h.htlc.RHash[:], waitHeight) } + // Deduct one block so this input is offered to the sweeper one + // block earlier since the sweeper will wait for one block to + // trigger the sweeping. + // + // TODO(yy): this is done so the outputs can be aggregated + // properly. Suppose CSV locks of five 2nd-level outputs all + // expire at height 840000, there is a race in block digestion + // between contractcourt and sweeper: + // - G1: block 840000 received in contractcourt, it now offers + // the outputs to the sweeper. + // - G2: block 840000 received in sweeper, it now starts to + // sweep the received outputs - there's no guarantee all + // fives have been received. + // To solve this, we either offer the outputs earlier, or + // implement `blockbeat`, and force contractcourt and sweeper + // to consume each block sequentially. + waitHeight-- + + // TODO(yy): let sweeper handles the wait? err := waitForHeight(waitHeight, h.Notifier, h.quit) if err != nil { return nil, err @@ -735,8 +754,8 @@ func (h *htlcTimeoutResolver) handleCommitSpend( op, csvWitnessType, input.LeaseHtlcOfferedTimeoutSecondLevel, &h.htlcResolution.SweepSignDesc, - h.htlcResolution.CsvDelay, h.broadcastHeight, - h.htlc.RHash, + h.htlcResolution.CsvDelay, + uint32(commitSpend.SpendingHeight), h.htlc.RHash, ) // Calculate the budget for this sweep. budget := calculateBudget( @@ -746,8 +765,8 @@ func (h *htlcTimeoutResolver) handleCommitSpend( ) log.Infof("%T(%x): offering second-level timeout tx output to "+ - "sweeper with no deadline and budget=%v", h, - h.htlc.RHash[:], budget) + "sweeper with no deadline and budget=%v at height=%v", + h, h.htlc.RHash[:], budget, waitHeight) _, err = h.Sweeper.SweepInput( inp, diff --git a/sweep/sweeper.go b/sweep/sweeper.go index 9252fb2fa..982d9ad84 100644 --- a/sweep/sweeper.go +++ b/sweep/sweeper.go @@ -652,7 +652,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { // failed, or excluded from the sweeper and return inputs that // are either new or has been published but failed back, which // will be retried again here. - inputs := s.updateSweeperInputs() + s.updateSweeperInputs() select { // A new inputs is offered to the sweeper. We check to see if @@ -670,7 +670,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { // If this input is forced, we perform an sweep // immediately. if input.params.Force { - inputs = s.updateSweeperInputs() + inputs := s.updateSweeperInputs() s.sweepPendingInputs(inputs) } @@ -716,6 +716,9 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) { // Update the sweeper to the best height. s.currentHeight = epoch.Height + // Update the inputs with the latest height. + inputs := s.updateSweeperInputs() + log.Debugf("Received new block: height=%v, attempt "+ "sweeping %d inputs", epoch.Height, len(inputs)) @@ -1497,6 +1500,17 @@ func (s *UtxoSweeper) updateSweeperInputs() InputsMap { continue } + // If the input has a CSV that's not yet reached, we will skip + // this input and wait for the expiry. + locktime = input.BlocksToMaturity() + input.HeightHint() + if s.currentHeight < int32(locktime)-1 { + log.Infof("Skipping input %v due to CSV expiry=%v not "+ + "reached, current height is %v", op, locktime, + s.currentHeight) + + continue + } + // If this input is new or has been failed to be published, // we'd retry it. The assumption here is that when an error is // returned from `PublishTransaction`, it means the tx has diff --git a/sweep/sweeper_test.go b/sweep/sweeper_test.go index fde472450..450e434e7 100644 --- a/sweep/sweeper_test.go +++ b/sweep/sweeper_test.go @@ -2479,6 +2479,8 @@ func TestUpdateSweeperInputs(t *testing.T) { defer inp1.AssertExpectations(t) inp2 := &input.MockInput{} defer inp2.AssertExpectations(t) + inp3 := &input.MockInput{} + defer inp3.AssertExpectations(t) // Create a list of inputs using all the states. // @@ -2486,6 +2488,8 @@ func TestUpdateSweeperInputs(t *testing.T) { // returned. inp1.On("RequiredLockTime").Return( uint32(s.currentHeight), false).Once() + inp1.On("BlocksToMaturity").Return(uint32(0)).Once() + inp1.On("HeightHint").Return(uint32(s.currentHeight)).Once() input0 := &SweeperInput{state: Init, Input: inp1} // These inputs won't hit RequiredLockTime so we won't mock. @@ -2496,6 +2500,8 @@ func TestUpdateSweeperInputs(t *testing.T) { // returned. inp1.On("RequiredLockTime").Return( uint32(s.currentHeight), false).Once() + inp1.On("BlocksToMaturity").Return(uint32(0)).Once() + inp1.On("HeightHint").Return(uint32(s.currentHeight)).Once() input3 := &SweeperInput{state: PublishFailed, Input: inp1} // These inputs won't hit RequiredLockTime so we won't mock. @@ -2509,6 +2515,14 @@ func TestUpdateSweeperInputs(t *testing.T) { uint32(s.currentHeight+1), true).Once() input7 := &SweeperInput{state: Init, Input: inp2} + // Mock the input to have a CSV expiry in the future so it will NOT be + // returned. + inp3.On("RequiredLockTime").Return( + uint32(s.currentHeight), false).Once() + inp3.On("BlocksToMaturity").Return(uint32(2)).Once() + inp3.On("HeightHint").Return(uint32(s.currentHeight)).Once() + input8 := &SweeperInput{state: Init, Input: inp3} + // Add the inputs to the sweeper. After the update, we should see the // terminated inputs being removed. s.inputs = map[wire.OutPoint]*SweeperInput{ @@ -2520,6 +2534,7 @@ func TestUpdateSweeperInputs(t *testing.T) { {Index: 5}: input5, {Index: 6}: input6, {Index: 7}: input7, + {Index: 8}: input8, } // We expect the inputs with `Swept`, `Excluded`, and `Failed` to be @@ -2530,6 +2545,7 @@ func TestUpdateSweeperInputs(t *testing.T) { {Index: 2}: input2, {Index: 3}: input3, {Index: 7}: input7, + {Index: 8}: input8, } // We expect only the inputs with `Init` and `PublishFailed` to be