mirror of
https://github.com/lightningnetwork/lnd.git
synced 2024-11-19 18:10:34 +01:00
Merge pull request #5214 from carlaKC/4788-terminalshard
routing: handle failure to launch shard after permanent failure
This commit is contained in:
commit
4d358a84e4
@ -290,18 +290,19 @@ func (p *PaymentControl) RegisterAttempt(paymentHash lntypes.Hash,
|
||||
return err
|
||||
}
|
||||
|
||||
// Ensure the payment is in-flight.
|
||||
if err := ensureInFlight(p); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We cannot register a new attempt if the payment already has
|
||||
// reached a terminal condition:
|
||||
// reached a terminal condition. We check this before
|
||||
// ensureInFlight because it is a more general check.
|
||||
settle, fail := p.TerminalInfo()
|
||||
if settle != nil || fail != nil {
|
||||
return ErrPaymentTerminal
|
||||
}
|
||||
|
||||
// Ensure the payment is in-flight.
|
||||
if err := ensureInFlight(p); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Make sure any existing shards match the new one with regards
|
||||
// to MPP options.
|
||||
mpp := attempt.Route.FinalHop().MPP
|
||||
|
@ -1013,19 +1013,15 @@ func TestPaymentControlMultiShard(t *testing.T) {
|
||||
// up in the Succeeded state. If both failed the payment should
|
||||
// also be Failed at this poinnt.
|
||||
finalStatus := StatusFailed
|
||||
expRegErr := ErrPaymentAlreadyFailed
|
||||
if test.settleFirst || test.settleLast {
|
||||
finalStatus = StatusSucceeded
|
||||
expRegErr = ErrPaymentAlreadySucceeded
|
||||
}
|
||||
|
||||
assertPaymentStatus(t, pControl, info.PaymentHash, finalStatus)
|
||||
|
||||
// Finally assert we cannot register more attempts.
|
||||
_, err = pControl.RegisterAttempt(info.PaymentHash, &b)
|
||||
if err != expRegErr {
|
||||
t.Fatalf("expected error %v, got: %v", expRegErr, err)
|
||||
}
|
||||
require.Equal(t, ErrPaymentTerminal, err)
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
|
@ -83,7 +83,8 @@ func (m *mockPaymentAttemptDispatcher) setPaymentResult(
|
||||
}
|
||||
|
||||
type mockPaymentSessionSource struct {
|
||||
routes []*route.Route
|
||||
routes []*route.Route
|
||||
routeRelease chan struct{}
|
||||
}
|
||||
|
||||
var _ PaymentSessionSource = (*mockPaymentSessionSource)(nil)
|
||||
@ -91,7 +92,10 @@ var _ PaymentSessionSource = (*mockPaymentSessionSource)(nil)
|
||||
func (m *mockPaymentSessionSource) NewPaymentSession(
|
||||
_ *LightningPayment) (PaymentSession, error) {
|
||||
|
||||
return &mockPaymentSession{m.routes}, nil
|
||||
return &mockPaymentSession{
|
||||
routes: m.routes,
|
||||
release: m.routeRelease,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *mockPaymentSessionSource) NewPaymentSessionForRoute(
|
||||
@ -137,6 +141,11 @@ func (m *mockMissionControl) GetProbability(fromNode, toNode route.Vertex,
|
||||
|
||||
type mockPaymentSession struct {
|
||||
routes []*route.Route
|
||||
|
||||
// release is a channel that optionally blocks requesting a route
|
||||
// from our mock payment channel. If this value is nil, we will just
|
||||
// release the route automatically.
|
||||
release chan struct{}
|
||||
}
|
||||
|
||||
var _ PaymentSession = (*mockPaymentSession)(nil)
|
||||
@ -144,6 +153,10 @@ var _ PaymentSession = (*mockPaymentSession)(nil)
|
||||
func (m *mockPaymentSession) RequestRoute(_, _ lnwire.MilliSatoshi,
|
||||
_, height uint32) (*route.Route, error) {
|
||||
|
||||
if m.release != nil {
|
||||
m.release <- struct{}{}
|
||||
}
|
||||
|
||||
if len(m.routes) == 0 {
|
||||
return nil, errNoPathFound
|
||||
}
|
||||
@ -155,10 +168,9 @@ func (m *mockPaymentSession) RequestRoute(_, _ lnwire.MilliSatoshi,
|
||||
}
|
||||
|
||||
type mockPayer struct {
|
||||
sendResult chan error
|
||||
paymentResultErr chan error
|
||||
paymentResult chan *htlcswitch.PaymentResult
|
||||
quit chan struct{}
|
||||
sendResult chan error
|
||||
paymentResult chan *htlcswitch.PaymentResult
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
var _ PaymentAttemptDispatcher = (*mockPayer)(nil)
|
||||
@ -180,12 +192,16 @@ func (m *mockPayer) GetPaymentResult(paymentID uint64, _ lntypes.Hash,
|
||||
_ htlcswitch.ErrorDecrypter) (<-chan *htlcswitch.PaymentResult, error) {
|
||||
|
||||
select {
|
||||
case res := <-m.paymentResult:
|
||||
case res, ok := <-m.paymentResult:
|
||||
resChan := make(chan *htlcswitch.PaymentResult, 1)
|
||||
resChan <- res
|
||||
if !ok {
|
||||
close(resChan)
|
||||
} else {
|
||||
resChan <- res
|
||||
}
|
||||
|
||||
return resChan, nil
|
||||
case err := <-m.paymentResultErr:
|
||||
return nil, err
|
||||
|
||||
case <-m.quit:
|
||||
return nil, fmt.Errorf("test quitting")
|
||||
}
|
||||
@ -248,13 +264,13 @@ func makeMockControlTower() *mockControlTower {
|
||||
func (m *mockControlTower) InitPayment(phash lntypes.Hash,
|
||||
c *channeldb.PaymentCreationInfo) error {
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
if m.init != nil {
|
||||
m.init <- initArgs{c}
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
// Don't allow re-init a successful payment.
|
||||
if _, ok := m.successful[phash]; ok {
|
||||
return channeldb.ErrAlreadyPaid
|
||||
@ -279,27 +295,49 @@ func (m *mockControlTower) InitPayment(phash lntypes.Hash,
|
||||
func (m *mockControlTower) RegisterAttempt(phash lntypes.Hash,
|
||||
a *channeldb.HTLCAttemptInfo) error {
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
if m.registerAttempt != nil {
|
||||
m.registerAttempt <- registerAttemptArgs{a}
|
||||
}
|
||||
|
||||
// Cannot register attempts for successful or failed payments.
|
||||
if _, ok := m.successful[phash]; ok {
|
||||
return channeldb.ErrPaymentAlreadySucceeded
|
||||
}
|
||||
|
||||
if _, ok := m.failed[phash]; ok {
|
||||
return channeldb.ErrPaymentAlreadyFailed
|
||||
}
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
// Lookup payment.
|
||||
p, ok := m.payments[phash]
|
||||
if !ok {
|
||||
return channeldb.ErrPaymentNotInitiated
|
||||
}
|
||||
|
||||
var inFlight bool
|
||||
for _, a := range p.attempts {
|
||||
if a.Settle != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if a.Failure != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
inFlight = true
|
||||
}
|
||||
|
||||
// Cannot register attempts for successful or failed payments.
|
||||
_, settled := m.successful[phash]
|
||||
_, failed := m.failed[phash]
|
||||
|
||||
if settled || failed {
|
||||
return channeldb.ErrPaymentTerminal
|
||||
}
|
||||
|
||||
if settled && !inFlight {
|
||||
return channeldb.ErrPaymentAlreadySucceeded
|
||||
}
|
||||
|
||||
if failed && !inFlight {
|
||||
return channeldb.ErrPaymentAlreadyFailed
|
||||
}
|
||||
|
||||
// Add attempt to payment.
|
||||
p.attempts = append(p.attempts, channeldb.HTLCAttempt{
|
||||
HTLCAttemptInfo: *a,
|
||||
})
|
||||
@ -312,13 +350,13 @@ func (m *mockControlTower) SettleAttempt(phash lntypes.Hash,
|
||||
pid uint64, settleInfo *channeldb.HTLCSettleInfo) (
|
||||
*channeldb.HTLCAttempt, error) {
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
if m.settleAttempt != nil {
|
||||
m.settleAttempt <- settleAttemptArgs{settleInfo.Preimage}
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
// Only allow setting attempts if the payment is known.
|
||||
p, ok := m.payments[phash]
|
||||
if !ok {
|
||||
@ -353,13 +391,13 @@ func (m *mockControlTower) SettleAttempt(phash lntypes.Hash,
|
||||
func (m *mockControlTower) FailAttempt(phash lntypes.Hash, pid uint64,
|
||||
failInfo *channeldb.HTLCFailInfo) (*channeldb.HTLCAttempt, error) {
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
if m.failAttempt != nil {
|
||||
m.failAttempt <- failAttemptArgs{failInfo}
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
// Only allow failing attempts if the payment is known.
|
||||
p, ok := m.payments[phash]
|
||||
if !ok {
|
||||
@ -437,13 +475,13 @@ func (m *mockControlTower) FetchPayment(phash lntypes.Hash) (
|
||||
func (m *mockControlTower) FetchInFlightPayments() (
|
||||
[]*channeldb.InFlightPayment, error) {
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
if m.fetchInFlight != nil {
|
||||
m.fetchInFlight <- struct{}{}
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
// In flight are all payments not successful or failed.
|
||||
var fl []*channeldb.InFlightPayment
|
||||
for hash, p := range m.payments {
|
||||
|
@ -115,6 +115,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
|
||||
|
||||
// We'll continue until either our payment succeeds, or we encounter a
|
||||
// critical error during path finding.
|
||||
lifecycle:
|
||||
for {
|
||||
// Start by quickly checking if there are any outcomes already
|
||||
// available to handle before we reevaluate our state.
|
||||
@ -171,7 +172,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
|
||||
if err := shardHandler.waitForShard(); err != nil {
|
||||
return [32]byte{}, nil, err
|
||||
}
|
||||
continue
|
||||
continue lifecycle
|
||||
}
|
||||
|
||||
// Before we attempt any new shard, we'll check to see if
|
||||
@ -195,7 +196,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
|
||||
return [32]byte{}, nil, saveErr
|
||||
}
|
||||
|
||||
continue
|
||||
continue lifecycle
|
||||
|
||||
case <-p.router.quit:
|
||||
return [32]byte{}, nil, ErrRouterShuttingDown
|
||||
@ -234,7 +235,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
|
||||
return [32]byte{}, nil, saveErr
|
||||
}
|
||||
|
||||
continue
|
||||
continue lifecycle
|
||||
}
|
||||
|
||||
// We still have active shards, we'll wait for an
|
||||
@ -242,12 +243,23 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
|
||||
if err := shardHandler.waitForShard(); err != nil {
|
||||
return [32]byte{}, nil, err
|
||||
}
|
||||
continue
|
||||
continue lifecycle
|
||||
}
|
||||
|
||||
// We found a route to try, launch a new shard.
|
||||
attempt, outcome, err := shardHandler.launchShard(rt)
|
||||
if err != nil {
|
||||
switch {
|
||||
// We may get a terminal error if we've processed a shard with
|
||||
// a terminal state (settled or permanent failure), while we
|
||||
// were pathfinding. We know we're in a terminal state here,
|
||||
// so we can continue and wait for our last shards to return.
|
||||
case err == channeldb.ErrPaymentTerminal:
|
||||
log.Infof("Payment: %v in terminal state, abandoning "+
|
||||
"shard", p.paymentHash)
|
||||
|
||||
continue lifecycle
|
||||
|
||||
case err != nil:
|
||||
return [32]byte{}, nil, err
|
||||
}
|
||||
|
||||
@ -270,7 +282,7 @@ func (p *paymentLifecycle) resumePayment() ([32]byte, *route.Route, error) {
|
||||
|
||||
// Error was handled successfully, continue to make a
|
||||
// new attempt.
|
||||
continue
|
||||
continue lifecycle
|
||||
}
|
||||
|
||||
// Now that the shard was successfully sent, launch a go
|
||||
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user