routing: split launchShard into registerAttempt and sendAttempt

This commit removes the method `launchShard` and splits its original
functionality into two steps - first create the attempt, second send the
attempt. This enables us to have finer control over "which error is
returned from which system and how to handle it".
This commit is contained in:
yyforyongyu 2023-03-07 18:43:14 +08:00
parent 49bafc0207
commit 7209c65ccf
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
5 changed files with 146 additions and 194 deletions

View File

@ -264,6 +264,7 @@ func (m *MPPayment) InFlightHTLCs() []HTLCAttempt {
// GetAttempt returns the specified htlc attempt on the payment.
func (m *MPPayment) GetAttempt(id uint64) (*HTLCAttempt, error) {
// TODO(yy): iteration can be slow, make it into a tree or use binary search.
for _, htlc := range m.HTLCs {
htlc := htlc
if htlc.AttemptID == id {

View File

@ -273,48 +273,23 @@ lifecycle:
log.Tracef("Found route: %s", spew.Sdump(rt.Hops))
// We found a route to try, launch a new shard.
attempt, outcome, err := p.launchShard(rt, ps.RemainingAmt)
switch {
// We may get a terminal error if we've processed a shard with
// a terminal state (settled or permanent failure), while we
// were pathfinding. We know we're in a terminal state here,
// so we can continue and wait for our last shards to return.
case err == channeldb.ErrPaymentTerminal:
log.Infof("Payment %v in terminal state, abandoning "+
"shard", p.identifier)
continue lifecycle
case err != nil:
// We found a route to try, create a new HTLC attempt to try.
attempt, err := p.registerAttempt(rt, ps.RemainingAmt)
if err != nil {
return exitWithErr(err)
}
// If we encountered a non-critical error when launching the
// shard, handle it.
if outcome.err != nil {
log.Warnf("Failed to launch shard %v for "+
"payment %v: %v", attempt.AttemptID,
p.identifier, outcome.err)
// We must inspect the error to know whether it was
// critical or not, to decide whether we should
// continue trying.
_, err := p.handleSwitchErr(
attempt, outcome.err,
)
if err != nil {
return exitWithErr(err)
}
// Error was handled successfully, continue to make a
// new attempt.
continue lifecycle
// Once the attempt is created, send it to the htlcswitch.
result, err := p.sendAttempt(attempt)
if err != nil {
return exitWithErr(err)
}
// Now that the shard was successfully sent, launch a
// goroutine that will handle its result when it's back.
p.collectResultAsync(attempt)
if result.err == nil {
p.collectResultAsync(attempt)
}
}
}
@ -373,44 +348,6 @@ type attemptResult struct {
attempt *channeldb.HTLCAttempt
}
// launchShard creates and sends an HTLC attempt along the given route,
// registering it via registerAttempt before sending it. The remainingAmt
// argument is passed through unchanged to registerAttempt. It returns the
// HTLCAttempt that was created for the shard, along with an attemptResult.
// The attemptResult is used to indicate whether the attempt was successfully
// sent. If the attemptResult wraps a non-nil error, it means that the attempt
// was not sent onto the network, so no result will be available in the future
// for it; in that case the attempt has already been failed via failAttempt.
//
// A non-nil error return means that either registering the attempt or
// failing it after a send error returned an error; both other return values
// are nil in that case.
func (p *paymentLifecycle) launchShard(rt *route.Route,
remainingAmt lnwire.MilliSatoshi) (*channeldb.HTLCAttempt,
*attemptResult, error) {
// Create the attempt first; nothing is sent if this step fails.
attempt, err := p.registerAttempt(rt, remainingAmt)
if err != nil {
return nil, nil, err
}
// Now that the attempt is created and checkpointed to the DB, we send
// it.
//
// NOTE: the result returned by sendAttempt is discarded here, only its
// error is inspected.
_, sendErr := p.sendAttempt(attempt)
if sendErr != nil {
// TODO(joostjager): Distinguish unexpected internal errors
// from real send errors.
htlcAttempt, err := p.failAttempt(attempt.AttemptID, sendErr)
if err != nil {
return nil, nil, err
}
// Return an attemptResult indicating the shard failed. The send
// error is carried inside the result rather than in the error
// return value.
return attempt, &attemptResult{
attempt: htlcAttempt.attempt,
err: sendErr,
}, nil
}
// The send succeeded, so return an empty attemptResult (nil err).
return attempt, &attemptResult{}, nil
}
// collectResultAsync launches a goroutine that will wait for the result of the
// given HTLC attempt to be available then handle its result. It will fail the
// payment with the control tower if a terminal error is encountered.
@ -659,12 +596,8 @@ func (p *paymentLifecycle) createNewPaymentAttempt(rt *route.Route,
func (p *paymentLifecycle) sendAttempt(
attempt *channeldb.HTLCAttempt) (*attemptResult, error) {
log.Tracef("Attempting to send payment %v (pid=%v), "+
"using route: %v", p.identifier, attempt.AttemptID,
newLogClosure(func() string {
return spew.Sdump(attempt.Route)
}),
)
log.Debugf("Attempting to send payment %v (pid=%v)", p.identifier,
attempt.AttemptID)
rt := attempt.Route
@ -708,8 +641,8 @@ func (p *paymentLifecycle) sendAttempt(
return p.handleSwitchErr(attempt, err)
}
log.Debugf("Payment %v (pid=%v) successfully sent to switch, route: %v",
p.identifier, attempt.AttemptID, &attempt.Route)
log.Debugf("Attempt %v for payment %v successfully sent to switch, "+
"route: %v", attempt.AttemptID, p.identifier, &attempt.Route)
return &attemptResult{
attempt: attempt,

View File

@ -800,6 +800,15 @@ func makeSettledAttempt(total, fee int,
}
}
// makeFailedAttempt builds an HTLC attempt that carries a failure with the
// reason HTLCFailInternal. The attempt info is created by makeAttemptInfo
// using total as the total amount and total-fee as the forwarded amount.
func makeFailedAttempt(total, fee int) *channeldb.HTLCAttempt {
	// Every attempt built by this helper is marked as an internal failure.
	failInfo := &channeldb.HTLCFailInfo{
		Reason: channeldb.HTLCFailInternal,
	}

	amtForwarded := total - fee

	failed := &channeldb.HTLCAttempt{
		HTLCAttemptInfo: makeAttemptInfo(total, amtForwarded),
		Failure:         failInfo,
	}

	return failed
}
func makeAttemptInfo(total, amtForwarded int) channeldb.HTLCAttemptInfo {
hop := &route.Hop{AmtToForward: lnwire.MilliSatoshi(amtForwarded)}
return channeldb.HTLCAttemptInfo{

View File

@ -2,7 +2,6 @@ package routing
import (
"bytes"
goErrors "errors"
"fmt"
"math"
"runtime"
@ -2505,81 +2504,82 @@ func (r *ChannelRouter) sendToRoute(htlcHash lntypes.Hash, rt *route.Route,
r, 0, paymentIdentifier, nil, shardTracker, 0, 0,
)
var shardError error
attempt, outcome, err := p.launchShard(rt, 0)
// With SendToRoute, it can happen that the route exceeds protocol
// constraints. Mark the payment as failed with an internal error.
if err == route.ErrMaxRouteHopsExceeded ||
err == sphinx.ErrMaxRoutingInfoSizeExceeded {
log.Debugf("Invalid route provided for payment %x: %v",
paymentIdentifier, err)
controlErr := r.cfg.Control.FailPayment(
paymentIdentifier, channeldb.FailureReasonError,
)
if controlErr != nil {
return nil, controlErr
}
}
// In any case, don't continue if there is an error.
// We found a route to try, create a new HTLC attempt to try.
//
// NOTE: we use a zero `remainingAmt` here to simulate the same effect as
// setting lastShard to false, which was used by the previous
// implementation.
attempt, err := p.registerAttempt(rt, 0)
if err != nil {
return nil, err
}
var htlcAttempt *channeldb.HTLCAttempt
switch {
// Failed to launch shard.
case outcome.err != nil:
shardError = outcome.err
htlcAttempt = outcome.attempt
// Once the attempt is created, send it to the htlcswitch. Notice that
// the `err` returned here has already been processed by
// `handleSwitchErr`, which means if there's a terminal failure, the
// payment has been failed.
result, err := p.sendAttempt(attempt)
if err != nil {
return nil, err
}
// Shard successfully launched, wait for the result to be available.
default:
result, err := p.collectResult(attempt)
// We now lookup the payment to see if it's already failed.
payment, err := p.router.cfg.Control.FetchPayment(p.identifier)
if err != nil {
return result.attempt, err
}
// Exit if the above error has caused the payment to be failed, we also
// return the error from sending attempt to mimic the old behavior of
// this method.
if payment.GetFailureReason() != nil {
return result.attempt, result.err
}
// Since for SendToRoute we won't retry in case the shard fails, we'll
// mark the payment failed with the control tower immediately if the
// skipTempErr is false.
reason := channeldb.FailureReasonError
// If we failed to send the HTLC, we need to further decide if we want
// to fail the payment.
if result.err != nil {
// If skipTempErr, we'll return the attempt and the temp error.
if skipTempErr {
return result.attempt, result.err
}
// Otherwise we need to fail the payment.
err := r.cfg.Control.FailPayment(paymentIdentifier, reason)
if err != nil {
return nil, err
}
// We got a successful result.
if result.err == nil {
return result.attempt, nil
}
// The shard failed, break switch to handle it.
shardError = result.err
htlcAttempt = result.attempt
return result.attempt, result.err
}
// Since for SendToRoute we won't retry in case the shard fails, we'll
// mark the payment failed with the control tower immediately. Process
// the error to check if it maps into a terminal error code, if not use
// a generic NO_ROUTE error.
var failureReason *channeldb.FailureReason
_, err = p.handleSwitchErr(attempt, shardError)
switch {
// If a non-terminal error is returned and `skipTempErr` is false, then
// we'll use the normal no route error.
case err == nil && !skipTempErr:
err = r.cfg.Control.FailPayment(
paymentIdentifier, channeldb.FailureReasonNoRoute,
)
// If this is a failure reason, then we'll apply the failure directly
// to the control tower, and return the normal response to the caller.
case goErrors.As(err, &failureReason):
err = r.cfg.Control.FailPayment(
paymentIdentifier, *failureReason,
)
}
// The attempt was successfully sent, wait for the result to be
// available.
result, err = p.collectResult(attempt)
if err != nil {
return nil, err
}
return htlcAttempt, shardError
// We got a successful result.
if result.err == nil {
return result.attempt, nil
}
// An error returned from collecting the result, we'll mark the payment
// as failed if we don't skip temp error.
if !skipTempErr {
err := r.cfg.Control.FailPayment(paymentIdentifier, reason)
if err != nil {
return nil, err
}
}
return result.attempt, result.err
}
// sendPayment attempts to send a payment to the passed payment hash. This

View File

@ -3025,8 +3025,8 @@ func TestSendToRouteMaxHops(t *testing.T) {
// Send off the payment request to the router. We expect an error back
// indicating that the route is too long.
var payment lntypes.Hash
_, err = ctx.router.SendToRoute(payment, rt)
var payHash lntypes.Hash
_, err = ctx.router.SendToRoute(payHash, rt)
if err != route.ErrMaxRouteHopsExceeded {
t.Fatalf("expected ErrMaxRouteHopsExceeded, but got %v", err)
}
@ -4272,11 +4272,13 @@ func TestBlockDifferenceFix(t *testing.T) {
// TestSendToRouteSkipTempErrSuccess validates a successful payment send.
func TestSendToRouteSkipTempErrSuccess(t *testing.T) {
var (
payHash lntypes.Hash
payAmt = lnwire.MilliSatoshi(10000)
testAttempt = &channeldb.HTLCAttempt{}
payHash lntypes.Hash
payAmt = lnwire.MilliSatoshi(10000)
)
preimage := lntypes.Preimage{1}
testAttempt := makeSettledAttempt(int(payAmt), 0, preimage)
node, err := createTestNode()
require.NoError(t, err)
@ -4313,7 +4315,7 @@ func TestSendToRouteSkipTempErrSuccess(t *testing.T) {
controlTower.On("RegisterAttempt", payHash, mock.Anything).Return(nil)
controlTower.On("SettleAttempt",
payHash, mock.Anything, mock.Anything,
).Return(testAttempt, nil)
).Return(&testAttempt, nil)
payer.On("SendHTLC",
mock.Anything, mock.Anything, mock.Anything,
@ -4332,15 +4334,23 @@ func TestSendToRouteSkipTempErrSuccess(t *testing.T) {
mock.Anything, rt,
).Return(nil)
// Mock the control tower to return the mocked payment.
payment := &mockMPPayment{}
controlTower.On("FetchPayment", payHash).Return(payment, nil).Once()
// Mock the payment to return a nil failure reason.
payment.On("GetFailureReason").Return(nil)
// Expect a successful send to route.
attempt, err := router.SendToRouteSkipTempErr(payHash, rt)
require.NoError(t, err)
require.Equal(t, testAttempt, attempt)
require.Equal(t, &testAttempt, attempt)
// Assert the above methods are called as expected.
controlTower.AssertExpectations(t)
payer.AssertExpectations(t)
missionControl.AssertExpectations(t)
payment.AssertExpectations(t)
}
// TestSendToRouteSkipTempErrTempFailure validates a temporary failure won't
@ -4413,11 +4423,19 @@ func TestSendToRouteSkipTempErrTempFailure(t *testing.T) {
}
})
// Return a nil reason to mock a temporary failure.
// Mock the control tower to return the mocked payment.
payment := &mockMPPayment{}
controlTower.On("FetchPayment", payHash).Return(payment, nil).Once()
// Mock the mission control to return a nil reason from reporting the
// attempt failure.
missionControl.On("ReportPaymentFail",
mock.Anything, rt, mock.Anything, mock.Anything,
).Return(nil, nil)
// Mock the payment to return a nil failure reason.
payment.On("GetFailureReason").Return(nil)
// Expect a failed send to route.
attempt, err := router.SendToRouteSkipTempErr(payHash, rt)
require.Equal(t, tempErr, err)
@ -4427,17 +4445,18 @@ func TestSendToRouteSkipTempErrTempFailure(t *testing.T) {
controlTower.AssertExpectations(t)
payer.AssertExpectations(t)
missionControl.AssertExpectations(t)
payment.AssertExpectations(t)
}
// TestSendToRouteSkipTempErrPermanentFailure validates a permanent failure
// will fail the payment.
func TestSendToRouteSkipTempErrPermanentFailure(t *testing.T) {
var (
payHash lntypes.Hash
payAmt = lnwire.MilliSatoshi(10000)
testAttempt = &channeldb.HTLCAttempt{}
payHash lntypes.Hash
payAmt = lnwire.MilliSatoshi(10000)
)
testAttempt := makeFailedAttempt(int(payAmt), 0)
node, err := createTestNode()
require.NoError(t, err)
@ -4469,9 +4488,15 @@ func TestSendToRouteSkipTempErrPermanentFailure(t *testing.T) {
},
}}
// Create the error to be returned.
permErr := htlcswitch.NewForwardingError(
&lnwire.FailIncorrectDetails{}, 1,
)
// Register mockers with the expected method calls.
controlTower.On("InitPayment", payHash, mock.Anything).Return(nil)
controlTower.On("RegisterAttempt", payHash, mock.Anything).Return(nil)
controlTower.On("FailAttempt",
payHash, mock.Anything, mock.Anything,
).Return(testAttempt, nil)
@ -4479,34 +4504,23 @@ func TestSendToRouteSkipTempErrPermanentFailure(t *testing.T) {
// Expect the payment to be failed.
controlTower.On("FailPayment", payHash, mock.Anything).Return(nil)
// Mock an error to be returned from sending the htlc.
payer.On("SendHTLC",
mock.Anything, mock.Anything, mock.Anything,
).Return(nil)
).Return(permErr)
// Create a buffered chan and it will be returned by GetAttemptResult.
payer.resultChan = make(chan *htlcswitch.PaymentResult, 1)
// Create the error to be returned.
permErr := htlcswitch.NewForwardingError(
&lnwire.FailIncorrectDetails{}, 1,
)
// Mock GetAttemptResult to return a failure.
payer.On("GetAttemptResult",
mock.Anything, mock.Anything, mock.Anything,
).Run(func(_ mock.Arguments) {
// Send a permanent failure.
payer.resultChan <- &htlcswitch.PaymentResult{
Error: permErr,
}
})
// Return a reason to mock a permanent failure.
failureReason := channeldb.FailureReasonPaymentDetails
missionControl.On("ReportPaymentFail",
mock.Anything, rt, mock.Anything, mock.Anything,
).Return(&failureReason, nil)
// Mock the control tower to return the mocked payment.
payment := &mockMPPayment{}
controlTower.On("FetchPayment", payHash).Return(payment, nil).Once()
// Mock the payment to return a failure reason.
payment.On("GetFailureReason").Return(&failureReason)
// Expect a failed send to route.
attempt, err := router.SendToRouteSkipTempErr(payHash, rt)
require.Equal(t, permErr, err)
@ -4516,17 +4530,18 @@ func TestSendToRouteSkipTempErrPermanentFailure(t *testing.T) {
controlTower.AssertExpectations(t)
payer.AssertExpectations(t)
missionControl.AssertExpectations(t)
payment.AssertExpectations(t)
}
// TestSendToRouteTempFailure validates a temporary failure will cause the
// payment to be failed.
func TestSendToRouteTempFailure(t *testing.T) {
var (
payHash lntypes.Hash
payAmt = lnwire.MilliSatoshi(10000)
testAttempt = &channeldb.HTLCAttempt{}
payHash lntypes.Hash
payAmt = lnwire.MilliSatoshi(10000)
)
testAttempt := makeFailedAttempt(int(payAmt), 0)
node, err := createTestNode()
require.NoError(t, err)
@ -4558,6 +4573,11 @@ func TestSendToRouteTempFailure(t *testing.T) {
},
}}
// Create the error to be returned.
tempErr := htlcswitch.NewForwardingError(
&lnwire.FailTemporaryChannelFailure{}, 1,
)
// Register mockers with the expected method calls.
controlTower.On("InitPayment", payHash, mock.Anything).Return(nil)
controlTower.On("RegisterAttempt", payHash, mock.Anything).Return(nil)
@ -4570,26 +4590,14 @@ func TestSendToRouteTempFailure(t *testing.T) {
payer.On("SendHTLC",
mock.Anything, mock.Anything, mock.Anything,
).Return(nil)
).Return(tempErr)
// Create a buffered chan and it will be returned by GetAttemptResult.
payer.resultChan = make(chan *htlcswitch.PaymentResult, 1)
// Mock the control tower to return the mocked payment.
payment := &mockMPPayment{}
controlTower.On("FetchPayment", payHash).Return(payment, nil).Once()
// Create the error to be returned.
tempErr := htlcswitch.NewForwardingError(
&lnwire.FailTemporaryChannelFailure{},
1,
)
// Mock GetAttemptResult to return a failure.
payer.On("GetAttemptResult",
mock.Anything, mock.Anything, mock.Anything,
).Run(func(_ mock.Arguments) {
// Send an attempt failure.
payer.resultChan <- &htlcswitch.PaymentResult{
Error: tempErr,
}
})
// Mock the payment to return a nil failure reason.
payment.On("GetFailureReason").Return(nil)
// Return a nil reason to mock a temporary failure.
missionControl.On("ReportPaymentFail",
@ -4605,6 +4613,7 @@ func TestSendToRouteTempFailure(t *testing.T) {
controlTower.AssertExpectations(t)
payer.AssertExpectations(t)
missionControl.AssertExpectations(t)
payment.AssertExpectations(t)
}
// TestNewRouteRequest tests creation of route requests for blinded and