diff --git a/channeldb/forwarding_package.go b/channeldb/forwarding_package.go index 09f5671cc..4c447fc03 100644 --- a/channeldb/forwarding_package.go +++ b/channeldb/forwarding_package.go @@ -211,6 +211,11 @@ func (f *PkgFilter) Decode(r io.Reader) error { return err } +// String returns a human-readable string. +func (f *PkgFilter) String() string { + return fmt.Sprintf("count=%v, filter=%v", f.count, f.filter) +} + // FwdPkg records all adds, settles, and fails that were locked in as a result // of the remote peer sending us a revocation. Each package is identified by // the short chanid and remote commitment height corresponding to the revocation diff --git a/channeldb/payment_control.go b/channeldb/payment_control.go index 0eadf4b1f..6e688cf51 100644 --- a/channeldb/payment_control.go +++ b/channeldb/payment_control.go @@ -424,7 +424,9 @@ func (p *PaymentControl) RegisterAttempt(paymentHash lntypes.Hash, // Ensure we aren't sending more than the total payment amount. sentAmt, _ := payment.SentAmt() if sentAmt+amt > payment.Info.Value { - return ErrValueExceedsAmt + return fmt.Errorf("%w: attempted=%v, payment amount="+ + "%v", ErrValueExceedsAmt, sentAmt+amt, + payment.Info.Value) } htlcsBucket, err := bucket.CreateBucketIfNotExists( diff --git a/channeldb/payment_control_test.go b/channeldb/payment_control_test.go index 5b394edae..7751ea367 100644 --- a/channeldb/payment_control_test.go +++ b/channeldb/payment_control_test.go @@ -734,10 +734,7 @@ func TestPaymentControlMultiShard(t *testing.T) { b := *attempt b.AttemptID = 3 _, err = pControl.RegisterAttempt(info.PaymentIdentifier, &b) - if err != ErrValueExceedsAmt { - t.Fatalf("expected ErrValueExceedsAmt, got: %v", - err) - } + require.ErrorIs(t, err, ErrValueExceedsAmt) // Fail the second attempt. a := attempts[1] diff --git a/docs/release-notes/release-notes-0.18.3.md b/docs/release-notes/release-notes-0.18.3.md index dad3e6cb0..a1a3209c0 100644 --- a/docs/release-notes/release-notes-0.18.3.md +++ b/docs/release-notes/release-notes-0.18.3.md @@ -48,6 +48,15 @@ bumping an anchor channel closing was not possible when no HTLCs were on the commitment when the channel was force closed. +* [Fixed](https://github.com/lightningnetwork/lnd/pull/8174) old payments that + are stuck inflight. Though the root cause is unknown, it's possible the + network result for a given HTLC attempt was not saved, which is now fixed. + Check + [here](https://github.com/lightningnetwork/lnd/pull/8174#issue-1992055103) + for the details about the analysis, and + [here](https://github.com/lightningnetwork/lnd/issues/8146) for a summary of + the issue. + # New Features ## Functional Enhancements ## RPC Additions diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 7b7688d9c..337ce636c 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -3297,7 +3297,7 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg, return } - l.log.Debugf("settle-fail-filter %v", fwdPkg.SettleFailFilter) + l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter) var switchPackets []*htlcPacket for i, pd := range settleFails { diff --git a/htlcswitch/payment_result.go b/htlcswitch/payment_result.go index cd982b8bb..db959e2d1 100644 --- a/htlcswitch/payment_result.go +++ b/htlcswitch/payment_result.go @@ -90,32 +90,32 @@ type networkResultStore struct { results map[uint64][]chan *networkResult resultsMtx sync.Mutex - // paymentIDMtx is a multimutex used to make sure the database and - // result subscribers map is consistent for each payment ID in case of + // attemptIDMtx is a multimutex used to make sure the database and + // result subscribers map is consistent for each attempt ID in case of // concurrent callers. - paymentIDMtx *multimutex.Mutex[uint64] + attemptIDMtx *multimutex.Mutex[uint64] } func newNetworkResultStore(db kvdb.Backend) *networkResultStore { return &networkResultStore{ backend: db, results: make(map[uint64][]chan *networkResult), - paymentIDMtx: multimutex.NewMutex[uint64](), + attemptIDMtx: multimutex.NewMutex[uint64](), } } -// storeResult stores the networkResult for the given paymentID, and -// notifies any subscribers. -func (store *networkResultStore) storeResult(paymentID uint64, +// storeResult stores the networkResult for the given attemptID, and notifies +// any subscribers. +func (store *networkResultStore) storeResult(attemptID uint64, result *networkResult) error { - // We get a mutex for this payment ID. This is needed to ensure + // We get a mutex for this attempt ID. This is needed to ensure // consistency between the database state and the subscribers in case // of concurrent calls. - store.paymentIDMtx.Lock(paymentID) - defer store.paymentIDMtx.Unlock(paymentID) + store.attemptIDMtx.Lock(attemptID) + defer store.attemptIDMtx.Unlock(attemptID) - log.Debugf("Storing result for paymentID=%v", paymentID) + log.Debugf("Storing result for attemptID=%v", attemptID) // Serialize the payment result. var b bytes.Buffer @@ -123,8 +123,8 @@ func (store *networkResultStore) storeResult(paymentID uint64, return err } - var paymentIDBytes [8]byte - binary.BigEndian.PutUint64(paymentIDBytes[:], paymentID) + var attemptIDBytes [8]byte + binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID) err := kvdb.Batch(store.backend, func(tx kvdb.RwTx) error { networkResults, err := tx.CreateTopLevelBucket( @@ -134,7 +134,7 @@ func (store *networkResultStore) storeResult(paymentID uint64, return err } - return networkResults.Put(paymentIDBytes[:], b.Bytes()) + return networkResults.Put(attemptIDBytes[:], b.Bytes()) }) if err != nil { return err @@ -143,28 +143,27 @@ func (store *networkResultStore) storeResult(paymentID uint64, // Now that the result is stored in the database, we can notify any // active subscribers. store.resultsMtx.Lock() - for _, res := range store.results[paymentID] { + for _, res := range store.results[attemptID] { res <- result } - delete(store.results, paymentID) + delete(store.results, attemptID) store.resultsMtx.Unlock() return nil } -// subscribeResult is used to get the payment result for the given -// payment ID. It returns a channel on which the result will be delivered when -// ready. -func (store *networkResultStore) subscribeResult(paymentID uint64) ( +// subscribeResult is used to get the HTLC attempt result for the given attempt +// ID. It returns a channel on which the result will be delivered when ready. +func (store *networkResultStore) subscribeResult(attemptID uint64) ( <-chan *networkResult, error) { // We get a mutex for this payment ID. This is needed to ensure // consistency between the database state and the subscribers in case // of concurrent calls. - store.paymentIDMtx.Lock(paymentID) - defer store.paymentIDMtx.Unlock(paymentID) + store.attemptIDMtx.Lock(attemptID) + defer store.attemptIDMtx.Unlock(attemptID) - log.Debugf("Subscribing to result for paymentID=%v", paymentID) + log.Debugf("Subscribing to result for attemptID=%v", attemptID) var ( result *networkResult @@ -173,7 +172,7 @@ func (store *networkResultStore) subscribeResult(paymentID uint64) ( err := kvdb.View(store.backend, func(tx kvdb.RTx) error { var err error - result, err = fetchResult(tx, paymentID) + result, err = fetchResult(tx, attemptID) switch { // Result not yet available, we will notify once a result is @@ -205,8 +204,8 @@ func (store *networkResultStore) subscribeResult(paymentID uint64) ( // Otherwise we store the result channel for when the result is // available. store.resultsMtx.Lock() - store.results[paymentID] = append( - store.results[paymentID], resultChan, + store.results[attemptID] = append( + store.results[attemptID], resultChan, ) store.resultsMtx.Unlock() @@ -234,8 +233,8 @@ func (store *networkResultStore) getResult(pid uint64) ( } func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) { - var paymentIDBytes [8]byte - binary.BigEndian.PutUint64(paymentIDBytes[:], pid) + var attemptIDBytes [8]byte + binary.BigEndian.PutUint64(attemptIDBytes[:], pid) networkResults := tx.ReadBucket(networkResultStoreBucketKey) if networkResults == nil { @@ -243,7 +242,7 @@ func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) { } // Check whether a result is already available. - resultBytes := networkResults.Get(paymentIDBytes[:]) + resultBytes := networkResults.Get(attemptIDBytes[:]) if resultBytes == nil { return nil, ErrPaymentIDNotFound } diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 793da57db..efd469f78 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -431,11 +431,26 @@ func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) erro } } -// GetAttemptResult returns the result of the payment attempt with the given +// HasAttemptResult reads the network result store to fetch the specified +// attempt. Returns true if the attempt result exists. +func (s *Switch) HasAttemptResult(attemptID uint64) (bool, error) { + _, err := s.networkResults.getResult(attemptID) + if err == nil { + return true, nil + } + + if !errors.Is(err, ErrPaymentIDNotFound) { + return false, err + } + + return false, nil +} + +// GetAttemptResult returns the result of the HTLC attempt with the given // attemptID. The paymentHash should be set to the payment's overall hash, or // in case of AMP payments the payment's unique identifier. // -// The method returns a channel where the payment result will be sent when +// The method returns a channel where the HTLC attempt result will be sent when // available, or an error is encountered during forwarding. When a result is // received on the channel, the HTLC is guaranteed to no longer be in flight. // The switch shutting down is signaled by closing the channel. If the @@ -452,9 +467,9 @@ func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash, } ) - // If the payment is not found in the circuit map, check whether a - // result is already available. - // Assumption: no one will add this payment ID other than the caller. + // If the HTLC is not found in the circuit map, check whether a result + // is already available. + // Assumption: no one will add this attempt ID other than the caller. if s.circuits.LookupCircuit(inKey) == nil { res, err := s.networkResults.getResult(attemptID) if err != nil { @@ -464,7 +479,7 @@ func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash, c <- res nChan = c } else { - // The payment was committed to the circuits, subscribe for a + // The HTLC was committed to the circuits, subscribe for a // result. nChan, err = s.networkResults.subscribeResult(attemptID) if err != nil { @@ -474,7 +489,7 @@ func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash, resultChan := make(chan *PaymentResult, 1) - // Since the payment was known, we can start a goroutine that can + // Since the attempt was known, we can start a goroutine that can // extract the result when it is available, and pass it on to the // caller. s.wg.Add(1) @@ -939,7 +954,7 @@ func (s *Switch) handleLocalResponse(pkt *htlcPacket) { // Store the result to the db. This will also notify subscribers about // the result. if err := s.networkResults.storeResult(attemptID, n); err != nil { - log.Errorf("Unable to complete payment for pid=%v: %v", + log.Errorf("Unable to store attempt result for pid=%v: %v", attemptID, err) return } @@ -967,7 +982,7 @@ func (s *Switch) handleLocalResponse(pkt *htlcPacket) { // This can only happen if the circuit is still open, which is why this // ordering is chosen. if err := s.teardownCircuit(pkt); err != nil { - log.Warnf("Unable to teardown circuit %s: %v", + log.Errorf("Unable to teardown circuit %s: %v", pkt.inKey(), err) return } @@ -1099,296 +1114,26 @@ func (s *Switch) parseFailedPayment(deobfuscator ErrorDecrypter, // updates back. This behaviour is achieved by creation of payment circuits. func (s *Switch) handlePacketForward(packet *htlcPacket) error { switch htlc := packet.htlc.(type) { - // Channel link forwarded us a new htlc, therefore we initiate the // payment circuit within our internal state so we can properly forward // the ultimate settle message back latter. case *lnwire.UpdateAddHTLC: - // Check if the node is set to reject all onward HTLCs and also make - // sure that HTLC is not from the source node. - if s.cfg.RejectHTLC { - failure := NewDetailedLinkError( - &lnwire.FailChannelDisabled{}, - OutgoingFailureForwardsDisabled, - ) + return s.handlePacketAdd(packet, htlc) - return s.failAddPacket(packet, failure) - } + case *lnwire.UpdateFulfillHTLC: + return s.handlePacketSettle(packet) - // Before we attempt to find a non-strict forwarding path for - // this htlc, check whether the htlc is being routed over the - // same incoming and outgoing channel. If our node does not - // allow forwards of this nature, we fail the htlc early. This - // check is in place to disallow inefficiently routed htlcs from - // locking up our balance. With channels where the - // option-scid-alias feature was negotiated, we also have to be - // sure that the IDs aren't the same since one or both could be - // an alias. - linkErr := s.checkCircularForward( - packet.incomingChanID, packet.outgoingChanID, - s.cfg.AllowCircularRoute, htlc.PaymentHash, - ) - if linkErr != nil { - return s.failAddPacket(packet, linkErr) - } - - s.indexMtx.RLock() - targetLink, err := s.getLinkByMapping(packet) - if err != nil { - s.indexMtx.RUnlock() - - log.Debugf("unable to find link with "+ - "destination %v", packet.outgoingChanID) - - // If packet was forwarded from another channel link - // than we should notify this link that some error - // occurred. - linkError := NewLinkError( - &lnwire.FailUnknownNextPeer{}, - ) - - return s.failAddPacket(packet, linkError) - } - targetPeerKey := targetLink.PeerPubKey() - interfaceLinks, _ := s.getLinks(targetPeerKey) - s.indexMtx.RUnlock() - - // We'll keep track of any HTLC failures during the link - // selection process. This way we can return the error for - // precise link that the sender selected, while optimistically - // trying all links to utilize our available bandwidth. - linkErrs := make(map[lnwire.ShortChannelID]*LinkError) - - // Find all destination channel links with appropriate - // bandwidth. - var destinations []ChannelLink - for _, link := range interfaceLinks { - var failure *LinkError - - // We'll skip any links that aren't yet eligible for - // forwarding. - if !link.EligibleToForward() { - failure = NewDetailedLinkError( - &lnwire.FailUnknownNextPeer{}, - OutgoingFailureLinkNotEligible, - ) - } else { - // We'll ensure that the HTLC satisfies the - // current forwarding conditions of this target - // link. - currentHeight := atomic.LoadUint32(&s.bestHeight) - failure = link.CheckHtlcForward( - htlc.PaymentHash, packet.incomingAmount, - packet.amount, packet.incomingTimeout, - packet.outgoingTimeout, - packet.inboundFee, - currentHeight, - packet.originalOutgoingChanID, - ) - } - - // If this link can forward the htlc, add it to the set - // of destinations. - if failure == nil { - destinations = append(destinations, link) - continue - } - - linkErrs[link.ShortChanID()] = failure - } - - // If we had a forwarding failure due to the HTLC not - // satisfying the current policy, then we'll send back an - // error, but ensure we send back the error sourced at the - // *target* link. - if len(destinations) == 0 { - // At this point, some or all of the links rejected the - // HTLC so we couldn't forward it. So we'll try to look - // up the error that came from the source. - linkErr, ok := linkErrs[packet.outgoingChanID] - if !ok { - // If we can't find the error of the source, - // then we'll return an unknown next peer, - // though this should never happen. - linkErr = NewLinkError( - &lnwire.FailUnknownNextPeer{}, - ) - log.Warnf("unable to find err source for "+ - "outgoing_link=%v, errors=%v", - packet.outgoingChanID, - lnutils.SpewLogClosure(linkErrs)) - } - - log.Tracef("incoming HTLC(%x) violated "+ - "target outgoing link (id=%v) policy: %v", - htlc.PaymentHash[:], packet.outgoingChanID, - linkErr) - - return s.failAddPacket(packet, linkErr) - } - - // Choose a random link out of the set of links that can forward - // this htlc. The reason for randomization is to evenly - // distribute the htlc load without making assumptions about - // what the best channel is. - destination := destinations[rand.Intn(len(destinations))] // nolint:gosec - - // Retrieve the incoming link by its ShortChannelID. Note that - // the incomingChanID is never set to hop.Source here. - s.indexMtx.RLock() - incomingLink, err := s.getLinkByShortID(packet.incomingChanID) - s.indexMtx.RUnlock() - if err != nil { - // If we couldn't find the incoming link, we can't - // evaluate the incoming's exposure to dust, so we just - // fail the HTLC back. - linkErr := NewLinkError( - &lnwire.FailTemporaryChannelFailure{}, - ) - - return s.failAddPacket(packet, linkErr) - } - - // Evaluate whether this HTLC would increase our fee exposure - // over the threshold on the incoming link. If it does, fail it - // backwards. - if s.dustExceedsFeeThreshold( - incomingLink, packet.incomingAmount, true, - ) { - // The incoming dust exceeds the threshold, so we fail - // the add back. - linkErr := NewLinkError( - &lnwire.FailTemporaryChannelFailure{}, - ) - - return s.failAddPacket(packet, linkErr) - } - - // Also evaluate whether this HTLC would increase our fee - // exposure over the threshold on the destination link. If it - // does, fail it back. - if s.dustExceedsFeeThreshold( - destination, packet.amount, false, - ) { - // The outgoing dust exceeds the threshold, so we fail - // the add back. - linkErr := NewLinkError( - &lnwire.FailTemporaryChannelFailure{}, - ) - - return s.failAddPacket(packet, linkErr) - } - - // Send the packet to the destination channel link which - // manages the channel. - packet.outgoingChanID = destination.ShortChanID() - return destination.handleSwitchPacket(packet) - - case *lnwire.UpdateFailHTLC, *lnwire.UpdateFulfillHTLC: - // If the source of this packet has not been set, use the - // circuit map to lookup the origin. - circuit, err := s.closeCircuit(packet) - if err != nil { - return err - } - - // closeCircuit returns a nil circuit when a settle packet returns an - // ErrUnknownCircuit error upon the inner call to CloseCircuit. - if circuit == nil { - return nil - } - - fail, isFail := htlc.(*lnwire.UpdateFailHTLC) - if isFail && !packet.hasSource { - // HTLC resolutions and messages restored from disk - // don't have the obfuscator set from the original htlc - // add packet - set it here for use in blinded errors. - packet.obfuscator = circuit.ErrorEncrypter - - switch { - // No message to encrypt, locally sourced payment. - case circuit.ErrorEncrypter == nil: - - // If this is a resolution message, then we'll need to - // encrypt it as it's actually internally sourced. - case packet.isResolution: - var err error - // TODO(roasbeef): don't need to pass actually? - failure := &lnwire.FailPermanentChannelFailure{} - fail.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop( - failure, - ) - if err != nil { - err = fmt.Errorf("unable to obfuscate "+ - "error: %v", err) - log.Error(err) - } - - // Alternatively, if the remote party send us an - // UpdateFailMalformedHTLC, then we'll need to convert - // this into a proper well formatted onion error as - // there's no HMAC currently. - case packet.convertedError: - log.Infof("Converting malformed HTLC error "+ - "for circuit for Circuit(%x: "+ - "(%s, %d) <-> (%s, %d))", packet.circuit.PaymentHash, - packet.incomingChanID, packet.incomingHTLCID, - packet.outgoingChanID, packet.outgoingHTLCID) - - fail.Reason = circuit.ErrorEncrypter.EncryptMalformedError( - fail.Reason, - ) - - default: - // Otherwise, it's a forwarded error, so we'll perform a - // wrapper encryption as normal. - fail.Reason = circuit.ErrorEncrypter.IntermediateEncrypt( - fail.Reason, - ) - } - } else if !isFail && circuit.Outgoing != nil { - // If this is an HTLC settle, and it wasn't from a - // locally initiated HTLC, then we'll log a forwarding - // event so we can flush it to disk later. - // - // TODO(roasbeef): only do this once link actually - // fully settles? - localHTLC := packet.incomingChanID == hop.Source - if !localHTLC { - log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+ - "from IncomingChanID(%v) to OutgoingChanID(%v)", - circuit.PaymentHash[:], circuit.OutgoingAmount, - circuit.IncomingAmount-circuit.OutgoingAmount, - circuit.Incoming.ChanID, circuit.Outgoing.ChanID) - s.fwdEventMtx.Lock() - s.pendingFwdingEvents = append( - s.pendingFwdingEvents, - channeldb.ForwardingEvent{ - Timestamp: time.Now(), - IncomingChanID: circuit.Incoming.ChanID, - OutgoingChanID: circuit.Outgoing.ChanID, - AmtIn: circuit.IncomingAmount, - AmtOut: circuit.OutgoingAmount, - }, - ) - s.fwdEventMtx.Unlock() - } - } - - // A blank IncomingChanID in a circuit indicates that it is a pending - // user-initiated payment. - if packet.incomingChanID == hop.Source { - s.wg.Add(1) - go s.handleLocalResponse(packet) - return nil - } - - // Check to see that the source link is online before removing - // the circuit. - return s.mailOrchestrator.Deliver(packet.incomingChanID, packet) + // Channel link forwarded us an update_fail_htlc message. + // + // NOTE: when the channel link receives an update_fail_malformed_htlc + // from upstream, it will convert the message into update_fail_htlc and + // forward it. Thus there's no need to catch `UpdateFailMalformedHTLC` + // here. + case *lnwire.UpdateFailHTLC: + return s.handlePacketFail(packet, htlc) default: - return errors.New("wrong update type") + return fmt.Errorf("wrong update type: %T", htlc) } } @@ -1629,47 +1374,34 @@ func (s *Switch) teardownCircuit(pkt *htlcPacket) error { case *lnwire.UpdateFailHTLC: pktType = "FAIL" default: - err := fmt.Errorf("cannot tear down packet of type: %T", htlc) - log.Errorf(err.Error()) + return fmt.Errorf("cannot tear down packet of type: %T", htlc) + } + + var paymentHash lntypes.Hash + + // Perform a defensive check to make sure we don't try to access a nil + // circuit. + circuit := pkt.circuit + if circuit != nil { + copy(paymentHash[:], circuit.PaymentHash[:]) + } + + log.Debugf("Tearing down circuit with %s pkt, removing circuit=%v "+ + "with keystone=%v", pktType, pkt.inKey(), pkt.outKey()) + + err := s.circuits.DeleteCircuits(pkt.inKey()) + if err != nil { + log.Warnf("Failed to tear down circuit (%s, %d) <-> (%s, %d) "+ + "with payment_hash=%v using %s pkt", pkt.incomingChanID, + pkt.incomingHTLCID, pkt.outgoingChanID, + pkt.outgoingHTLCID, pkt.circuit.PaymentHash, pktType) + return err } - switch { - case pkt.circuit.HasKeystone(): - log.Debugf("Tearing down open circuit with %s pkt, removing circuit=%v "+ - "with keystone=%v", pktType, pkt.inKey(), pkt.outKey()) - - err := s.circuits.DeleteCircuits(pkt.inKey()) - if err != nil { - log.Warnf("Failed to tear down open circuit (%s, %d) <-> (%s, %d) "+ - "with payment_hash-%v using %s pkt", - pkt.incomingChanID, pkt.incomingHTLCID, - pkt.outgoingChanID, pkt.outgoingHTLCID, - pkt.circuit.PaymentHash, pktType) - return err - } - - log.Debugf("Closed completed %s circuit for %x: "+ - "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash, - pkt.incomingChanID, pkt.incomingHTLCID, - pkt.outgoingChanID, pkt.outgoingHTLCID) - - default: - log.Debugf("Tearing down incomplete circuit with %s for inkey=%v", - pktType, pkt.inKey()) - - err := s.circuits.DeleteCircuits(pkt.inKey()) - if err != nil { - log.Warnf("Failed to tear down pending %s circuit for %x: "+ - "(%s, %d)", pktType, pkt.circuit.PaymentHash, - pkt.incomingChanID, pkt.incomingHTLCID) - return err - } - - log.Debugf("Removed pending onion circuit for %x: "+ - "(%s, %d)", pkt.circuit.PaymentHash, - pkt.incomingChanID, pkt.incomingHTLCID) - } + log.Debugf("Closed %s circuit for %v: (%s, %d) <-> (%s, %d)", pktType, + paymentHash, pkt.incomingChanID, pkt.incomingHTLCID, + pkt.outgoingChanID, pkt.outgoingHTLCID) return nil } @@ -3052,3 +2784,339 @@ func (s *Switch) AddAliasForLink(chanID lnwire.ChannelID, return nil } + +// handlePacketAdd handles forwarding an Add packet. +func (s *Switch) handlePacketAdd(packet *htlcPacket, + htlc *lnwire.UpdateAddHTLC) error { + + // Check if the node is set to reject all onward HTLCs and also make + // sure that HTLC is not from the source node. + if s.cfg.RejectHTLC { + failure := NewDetailedLinkError( + &lnwire.FailChannelDisabled{}, + OutgoingFailureForwardsDisabled, + ) + + return s.failAddPacket(packet, failure) + } + + // Before we attempt to find a non-strict forwarding path for this + // htlc, check whether the htlc is being routed over the same incoming + // and outgoing channel. If our node does not allow forwards of this + // nature, we fail the htlc early. This check is in place to disallow + // inefficiently routed htlcs from locking up our balance. With + // channels where the option-scid-alias feature was negotiated, we also + // have to be sure that the IDs aren't the same since one or both could + // be an alias. + linkErr := s.checkCircularForward( + packet.incomingChanID, packet.outgoingChanID, + s.cfg.AllowCircularRoute, htlc.PaymentHash, + ) + if linkErr != nil { + return s.failAddPacket(packet, linkErr) + } + + s.indexMtx.RLock() + targetLink, err := s.getLinkByMapping(packet) + if err != nil { + s.indexMtx.RUnlock() + + log.Debugf("unable to find link with "+ + "destination %v", packet.outgoingChanID) + + // If packet was forwarded from another channel link than we + // should notify this link that some error occurred. + linkError := NewLinkError( + &lnwire.FailUnknownNextPeer{}, + ) + + return s.failAddPacket(packet, linkError) + } + targetPeerKey := targetLink.PeerPubKey() + interfaceLinks, _ := s.getLinks(targetPeerKey) + s.indexMtx.RUnlock() + + // We'll keep track of any HTLC failures during the link selection + // process. This way we can return the error for precise link that the + // sender selected, while optimistically trying all links to utilize + // our available bandwidth. + linkErrs := make(map[lnwire.ShortChannelID]*LinkError) + + // Find all destination channel links with appropriate bandwidth. + var destinations []ChannelLink + for _, link := range interfaceLinks { + var failure *LinkError + + // We'll skip any links that aren't yet eligible for + // forwarding. + if !link.EligibleToForward() { + failure = NewDetailedLinkError( + &lnwire.FailUnknownNextPeer{}, + OutgoingFailureLinkNotEligible, + ) + } else { + // We'll ensure that the HTLC satisfies the current + // forwarding conditions of this target link. + currentHeight := atomic.LoadUint32(&s.bestHeight) + failure = link.CheckHtlcForward( + htlc.PaymentHash, packet.incomingAmount, + packet.amount, packet.incomingTimeout, + packet.outgoingTimeout, + packet.inboundFee, + currentHeight, + packet.originalOutgoingChanID, + ) + } + + // If this link can forward the htlc, add it to the set of + // destinations. + if failure == nil { + destinations = append(destinations, link) + continue + } + + linkErrs[link.ShortChanID()] = failure + } + + // If we had a forwarding failure due to the HTLC not satisfying the + // current policy, then we'll send back an error, but ensure we send + // back the error sourced at the *target* link. + if len(destinations) == 0 { + // At this point, some or all of the links rejected the HTLC so + // we couldn't forward it. So we'll try to look up the error + // that came from the source. + linkErr, ok := linkErrs[packet.outgoingChanID] + if !ok { + // If we can't find the error of the source, then we'll + // return an unknown next peer, though this should + // never happen. + linkErr = NewLinkError( + &lnwire.FailUnknownNextPeer{}, + ) + log.Warnf("unable to find err source for "+ + "outgoing_link=%v, errors=%v", + packet.outgoingChanID, + lnutils.SpewLogClosure(linkErrs)) + } + + log.Tracef("incoming HTLC(%x) violated "+ + "target outgoing link (id=%v) policy: %v", + htlc.PaymentHash[:], packet.outgoingChanID, + linkErr) + + return s.failAddPacket(packet, linkErr) + } + + // Choose a random link out of the set of links that can forward this + // htlc. The reason for randomization is to evenly distribute the htlc + // load without making assumptions about what the best channel is. + //nolint:gosec + destination := destinations[rand.Intn(len(destinations))] + + // Retrieve the incoming link by its ShortChannelID. Note that the + // incomingChanID is never set to hop.Source here. + s.indexMtx.RLock() + incomingLink, err := s.getLinkByShortID(packet.incomingChanID) + s.indexMtx.RUnlock() + if err != nil { + // If we couldn't find the incoming link, we can't evaluate the + // incoming's exposure to dust, so we just fail the HTLC back. + linkErr := NewLinkError( + &lnwire.FailTemporaryChannelFailure{}, + ) + + return s.failAddPacket(packet, linkErr) + } + + // Evaluate whether this HTLC would increase our fee exposure over the + // threshold on the incoming link. If it does, fail it backwards. + if s.dustExceedsFeeThreshold( + incomingLink, packet.incomingAmount, true, + ) { + // The incoming dust exceeds the threshold, so we fail the add + // back. + linkErr := NewLinkError( + &lnwire.FailTemporaryChannelFailure{}, + ) + + return s.failAddPacket(packet, linkErr) + } + + // Also evaluate whether this HTLC would increase our fee exposure over + // the threshold on the destination link. If it does, fail it back. + if s.dustExceedsFeeThreshold( + destination, packet.amount, false, + ) { + // The outgoing dust exceeds the threshold, so we fail the add + // back. + linkErr := NewLinkError( + &lnwire.FailTemporaryChannelFailure{}, + ) + + return s.failAddPacket(packet, linkErr) + } + + // Send the packet to the destination channel link which manages the + // channel. + packet.outgoingChanID = destination.ShortChanID() + + return destination.handleSwitchPacket(packet) +} + +// handlePacketSettle handles forwarding a settle packet. +func (s *Switch) handlePacketSettle(packet *htlcPacket) error { + // If the source of this packet has not been set, use the circuit map + // to lookup the origin. + circuit, err := s.closeCircuit(packet) + if err != nil { + return err + } + + // closeCircuit returns a nil circuit when a settle packet returns an + // ErrUnknownCircuit error upon the inner call to CloseCircuit. + // + // NOTE: We can only get a nil circuit when it has already been deleted + // and when `UpdateFulfillHTLC` is received. After which `RevokeAndAck` + // is received, which invokes `processRemoteSettleFails` in its link. + if circuit == nil { + log.Debugf("Found nil circuit: packet=%v", spew.Sdump(packet)) + return nil + } + + localHTLC := packet.incomingChanID == hop.Source + + // If this is a locally initiated HTLC, we need to handle the packet by + // storing the network result. + // + // A blank IncomingChanID in a circuit indicates that it is a pending + // user-initiated payment. + // + // NOTE: `closeCircuit` modifies the state of `packet`. + if localHTLC { + // TODO(yy): remove the goroutine and send back the error here. + s.wg.Add(1) + go s.handleLocalResponse(packet) + + // If this is a locally initiated HTLC, there's no need to + // forward it so we exit. + return nil + } + + // If this is an HTLC settle, and it wasn't from a locally initiated + // HTLC, then we'll log a forwarding event so we can flush it to disk + // later. + if circuit.Outgoing != nil { + log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+ + "from IncomingChanID(%v) to OutgoingChanID(%v)", + circuit.PaymentHash[:], circuit.OutgoingAmount, + circuit.IncomingAmount-circuit.OutgoingAmount, + circuit.Incoming.ChanID, circuit.Outgoing.ChanID) + + s.fwdEventMtx.Lock() + s.pendingFwdingEvents = append( + s.pendingFwdingEvents, + channeldb.ForwardingEvent{ + Timestamp: time.Now(), + IncomingChanID: circuit.Incoming.ChanID, + OutgoingChanID: circuit.Outgoing.ChanID, + AmtIn: circuit.IncomingAmount, + AmtOut: circuit.OutgoingAmount, + }, + ) + s.fwdEventMtx.Unlock() + } + + // Deliver this packet. + return s.mailOrchestrator.Deliver(packet.incomingChanID, packet) +} + +// handlePacketFail handles forwarding a fail packet. +func (s *Switch) handlePacketFail(packet *htlcPacket, + htlc *lnwire.UpdateFailHTLC) error { + + // If the source of this packet has not been set, use the circuit map + // to lookup the origin. + circuit, err := s.closeCircuit(packet) + if err != nil { + return err + } + + // If this is a locally initiated HTLC, we need to handle the packet by + // storing the network result. + // + // A blank IncomingChanID in a circuit indicates that it is a pending + // user-initiated payment. + // + // NOTE: `closeCircuit` modifies the state of `packet`. + if packet.incomingChanID == hop.Source { + // TODO(yy): remove the goroutine and send back the error here. + s.wg.Add(1) + go s.handleLocalResponse(packet) + + // If this is a locally initiated HTLC, there's no need to + // forward it so we exit. + return nil + } + + // Exit early if this hasSource is true. This flag is only set via + // mailbox's `FailAdd`. This method has two callsites, + // - the packet has timed out after `MailboxDeliveryTimeout`, defaults + // to 1 min. + // - the HTLC fails the validation in `channel.AddHTLC`. + // In either case, the `Reason` field is populated. Thus there's no + // need to proceed and extract the failure reason below. + if packet.hasSource { + // Deliver this packet. + return s.mailOrchestrator.Deliver(packet.incomingChanID, packet) + } + + // HTLC resolutions and messages restored from disk don't have the + // obfuscator set from the original htlc add packet - set it here for + // use in blinded errors. + packet.obfuscator = circuit.ErrorEncrypter + + switch { + // No message to encrypt, locally sourced payment. + case circuit.ErrorEncrypter == nil: + // TODO(yy) further check this case as we shouldn't end up here + // as `isLocal` is already false. + + // If this is a resolution message, then we'll need to encrypt it as + // it's actually internally sourced. + case packet.isResolution: + var err error + // TODO(roasbeef): don't need to pass actually? + failure := &lnwire.FailPermanentChannelFailure{} + htlc.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop( + failure, + ) + if err != nil { + err = fmt.Errorf("unable to obfuscate error: %w", err) + log.Error(err) + } + + // Alternatively, if the remote party sends us an + // UpdateFailMalformedHTLC, then we'll need to convert this into a + // proper well formatted onion error as there's no HMAC currently. + case packet.convertedError: + log.Infof("Converting malformed HTLC error for circuit for "+ + "Circuit(%x: (%s, %d) <-> (%s, %d))", + packet.circuit.PaymentHash, + packet.incomingChanID, packet.incomingHTLCID, + packet.outgoingChanID, packet.outgoingHTLCID) + + htlc.Reason = circuit.ErrorEncrypter.EncryptMalformedError( + htlc.Reason, + ) + + default: + // Otherwise, it's a forwarded error, so we'll perform a + // wrapper encryption as normal. + htlc.Reason = circuit.ErrorEncrypter.IntermediateEncrypt( + htlc.Reason, + ) + } + + // Deliver this packet. + return s.mailOrchestrator.Deliver(packet.incomingChanID, packet) +} diff --git a/itest/list_on_test.go b/itest/list_on_test.go index 063460d21..8fa4352b2 100644 --- a/itest/list_on_test.go +++ b/itest/list_on_test.go @@ -662,4 +662,16 @@ var allTestCases = []*lntest.TestCase{ Name: "coop close with external delivery", TestFunc: testCoopCloseWithExternalDelivery, }, + { + Name: "payment failed htlc local swept", + TestFunc: testPaymentFailedHTLCLocalSwept, + }, + { + Name: "payment succeeded htlc remote swept", + TestFunc: testPaymentSucceededHTLCRemoteSwept, + }, + { + Name: "send to route failed htlc timeout", + TestFunc: testSendToRouteFailHTLCTimeout, + }, } diff --git a/itest/lnd_payment_test.go b/itest/lnd_payment_test.go index cdd163605..317ca02ed 100644 --- a/itest/lnd_payment_test.go +++ b/itest/lnd_payment_test.go @@ -10,19 +10,331 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/lightningnetwork/lnd/input" + "github.com/lightningnetwork/lnd/lncfg" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc" "github.com/lightningnetwork/lnd/lnrpc/routerrpc" "github.com/lightningnetwork/lnd/lntest" "github.com/lightningnetwork/lnd/lntest/node" "github.com/lightningnetwork/lnd/lntest/rpc" "github.com/lightningnetwork/lnd/lntest/wait" "github.com/lightningnetwork/lnd/lntypes" + "github.com/lightningnetwork/lnd/lnwire" "github.com/stretchr/testify/require" ) -// testSendDirectPayment creates a topology Alice->Bob and then tests that Alice -// can send a direct payment to Bob. This test modifies the fee estimator to -// return floor fee rate(1 sat/vb). +// testPaymentSucceededHTLCRemoteSwept checks that when an outgoing HTLC is +// timed out and is swept by the remote via the direct preimage spend path, the +// payment will be marked as succeeded. This test creates a topology from Alice +// -> Bob, and let Alice send payments to Bob. Bob then goes offline, such that +// Alice's outgoing HTLC will time out. Once the force close transaction is +// broadcast by Alice, she then goes offline and Bob comes back online to take +// her outgoing HTLC. And Alice should mark this payment as succeeded after she +// comes back online again. +func testPaymentSucceededHTLCRemoteSwept(ht *lntest.HarnessTest) { + // Set the feerate to be 10 sat/vb. + ht.SetFeeEstimate(2500) + + // Open a channel with 100k satoshis between Alice and Bob with Alice + // being the sole funder of the channel. + chanAmt := btcutil.Amount(100_000) + openChannelParams := lntest.OpenChannelParams{ + Amt: chanAmt, + } + + // Create a two hop network: Alice -> Bob. + chanPoints, nodes := createSimpleNetwork(ht, nil, 2, openChannelParams) + chanPoint := chanPoints[0] + alice, bob := nodes[0], nodes[1] + + // We now create two payments, one above dust and the other below dust, + // and we should see different behavior in terms of when the payment + // will be marked as failed due to the HTLC timeout. + // + // First, create random preimages. + preimage := ht.RandomPreimage() + dustPreimage := ht.RandomPreimage() + + // Get the preimage hashes. + payHash := preimage.Hash() + dustPayHash := dustPreimage.Hash() + + // Create an hold invoice for Bob which expects a payment of 10k + // satoshis from Alice. + const paymentAmt = 10_000 + req := &invoicesrpc.AddHoldInvoiceRequest{ + Value: paymentAmt, + Hash: payHash[:], + // Use a small CLTV value so we can mine fewer blocks. + CltvExpiry: finalCltvDelta, + } + invoice := bob.RPC.AddHoldInvoice(req) + + // Create another hold invoice for Bob which expects a payment of 1k + // satoshis from Alice. + const dustAmt = 1000 + req = &invoicesrpc.AddHoldInvoiceRequest{ + Value: dustAmt, + Hash: dustPayHash[:], + // Use a small CLTV value so we can mine fewer blocks. + CltvExpiry: finalCltvDelta, + } + dustInvoice := bob.RPC.AddHoldInvoice(req) + + // Alice now sends both payments to Bob. + payReq := &routerrpc.SendPaymentRequest{ + PaymentRequest: invoice.PaymentRequest, + TimeoutSeconds: 3600, + } + dustPayReq := &routerrpc.SendPaymentRequest{ + PaymentRequest: dustInvoice.PaymentRequest, + TimeoutSeconds: 3600, + } + + // We expect the payment to stay in-flight from both streams. + ht.SendPaymentAssertInflight(alice, payReq) + ht.SendPaymentAssertInflight(alice, dustPayReq) + + // We also check the payments are marked as IN_FLIGHT in Alice's + // database. + ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_IN_FLIGHT) + ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_IN_FLIGHT) + + // Bob should have two incoming HTLC. + ht.AssertIncomingHTLCActive(bob, chanPoint, payHash[:]) + ht.AssertIncomingHTLCActive(bob, chanPoint, dustPayHash[:]) + + // Alice should have two outgoing HTLCs. + ht.AssertOutgoingHTLCActive(alice, chanPoint, payHash[:]) + ht.AssertOutgoingHTLCActive(alice, chanPoint, dustPayHash[:]) + + // Let Bob go offline. + restartBob := ht.SuspendNode(bob) + + // Alice force closes the channel, which puts her commitment tx into + // the mempool. + ht.CloseChannelAssertPending(alice, chanPoint, true) + + // We now let Alice go offline to avoid her sweeping her outgoing htlc. + restartAlice := ht.SuspendNode(alice) + + // Mine one block to confirm Alice's force closing tx. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Restart Bob to settle the invoice and sweep the htlc output. + require.NoError(ht, restartBob()) + + // Bob now settles the invoices, since his link with Alice is broken, + // Alice won't know the preimages. + bob.RPC.SettleInvoice(preimage[:]) + bob.RPC.SettleInvoice(dustPreimage[:]) + + // Once Bob comes back up, he should find the force closing transaction + // from Alice and try to sweep the non-dust outgoing htlc via the + // direct preimage spend. + ht.AssertNumPendingSweeps(bob, 1) + + // Mine a block to trigger the sweep. + // + // TODO(yy): remove it once `blockbeat` is implemented. + ht.MineEmptyBlocks(1) + + // Mine Bob's sweeping tx. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Let Alice come back up. Since the channel is now closed, we expect + // different behaviors based on whether the HTLC is a dust. + // - For dust payment, it should be failed now as the HTLC won't go + // onchain. + // - For non-dust payment, it should be marked as succeeded since her + // outgoing htlc is swept by Bob. + require.NoError(ht, restartAlice()) + + // Since Alice is restarted, we need to track the payments again. + payStream := alice.RPC.TrackPaymentV2(payHash[:]) + dustPayStream := alice.RPC.TrackPaymentV2(dustPayHash[:]) + + // Check that the dust payment is failed in both the stream and DB. + ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_FAILED) + ht.AssertPaymentStatusFromStream(dustPayStream, lnrpc.Payment_FAILED) + + // We expect the non-dust payment to marked as succeeded in Alice's + // database and also from her stream. + ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_SUCCEEDED) + ht.AssertPaymentStatusFromStream(payStream, lnrpc.Payment_SUCCEEDED) +} + +// testPaymentFailedHTLCLocalSwept checks that when an outgoing HTLC is timed +// out and claimed onchain via the timeout path, the payment will be marked as +// failed. This test creates a topology from Alice -> Bob, and let Alice send +// payments to Bob. Bob then goes offline, such that Alice's outgoing HTLC will +// time out. Alice will also be restarted to make sure resumed payments are +// also marked as failed. +func testPaymentFailedHTLCLocalSwept(ht *lntest.HarnessTest) { + success := ht.Run("fail payment", func(t *testing.T) { + st := ht.Subtest(t) + runTestPaymentHTLCTimeout(st, false) + }) + if !success { + return + } + + ht.Run("fail resumed payment", func(t *testing.T) { + st := ht.Subtest(t) + runTestPaymentHTLCTimeout(st, true) + }) +} + +// runTestPaymentHTLCTimeout is the helper function that actually runs the +// testPaymentFailedHTLCLocalSwept. +func runTestPaymentHTLCTimeout(ht *lntest.HarnessTest, restartAlice bool) { + // Set the feerate to be 10 sat/vb. + ht.SetFeeEstimate(2500) + + // Open a channel with 100k satoshis between Alice and Bob with Alice + // being the sole funder of the channel. + chanAmt := btcutil.Amount(100_000) + openChannelParams := lntest.OpenChannelParams{ + Amt: chanAmt, + } + + // Create a two hop network: Alice -> Bob. + chanPoints, nodes := createSimpleNetwork(ht, nil, 2, openChannelParams) + chanPoint := chanPoints[0] + alice, bob := nodes[0], nodes[1] + + // We now create two payments, one above dust and the other below dust, + // and we should see different behavior in terms of when the payment + // will be marked as failed due to the HTLC timeout. + // + // First, create random preimages. + preimage := ht.RandomPreimage() + dustPreimage := ht.RandomPreimage() + + // Get the preimage hashes. + payHash := preimage.Hash() + dustPayHash := dustPreimage.Hash() + + // Create an hold invoice for Bob which expects a payment of 10k + // satoshis from Alice. + const paymentAmt = 20_000 + req := &invoicesrpc.AddHoldInvoiceRequest{ + Value: paymentAmt, + Hash: payHash[:], + // Use a small CLTV value so we can mine fewer blocks. + CltvExpiry: finalCltvDelta, + } + invoice := bob.RPC.AddHoldInvoice(req) + + // Create another hold invoice for Bob which expects a payment of 1k + // satoshis from Alice. + const dustAmt = 1000 + req = &invoicesrpc.AddHoldInvoiceRequest{ + Value: dustAmt, + Hash: dustPayHash[:], + // Use a small CLTV value so we can mine fewer blocks. + CltvExpiry: finalCltvDelta, + } + dustInvoice := bob.RPC.AddHoldInvoice(req) + + // Alice now sends both the payments to Bob. + payReq := &routerrpc.SendPaymentRequest{ + PaymentRequest: invoice.PaymentRequest, + TimeoutSeconds: 3600, + } + dustPayReq := &routerrpc.SendPaymentRequest{ + PaymentRequest: dustInvoice.PaymentRequest, + TimeoutSeconds: 3600, + } + + // We expect the payment to stay in-flight from both streams. + ht.SendPaymentAssertInflight(alice, payReq) + ht.SendPaymentAssertInflight(alice, dustPayReq) + + // We also check the payments are marked as IN_FLIGHT in Alice's + // database. + ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_IN_FLIGHT) + ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_IN_FLIGHT) + + // Bob should have two incoming HTLC. + ht.AssertIncomingHTLCActive(bob, chanPoint, payHash[:]) + ht.AssertIncomingHTLCActive(bob, chanPoint, dustPayHash[:]) + + // Alice should have two outgoing HTLCs. + ht.AssertOutgoingHTLCActive(alice, chanPoint, payHash[:]) + ht.AssertOutgoingHTLCActive(alice, chanPoint, dustPayHash[:]) + + // Let Bob go offline. + ht.Shutdown(bob) + + // We'll now mine enough blocks to trigger Alice to broadcast her + // commitment transaction due to the fact that the HTLC is about to + // timeout. With the default outgoing broadcast delta of zero, this + // will be the same height as the htlc expiry height. + numBlocks := padCLTV( + uint32(req.CltvExpiry - lncfg.DefaultOutgoingBroadcastDelta), + ) + ht.MineBlocks(int(numBlocks)) + + // Restart Alice if requested. + if restartAlice { + // Restart Alice to test the resumed payment is canceled. + ht.RestartNode(alice) + } + + // We now subscribe to the payment status. + payStream := alice.RPC.TrackPaymentV2(payHash[:]) + dustPayStream := alice.RPC.TrackPaymentV2(dustPayHash[:]) + + // Mine a block to confirm Alice's closing transaction. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Now the channel is closed, we expect different behaviors based on + // whether the HTLC is a dust. For dust payment, it should be failed + // now as the HTLC won't go onchain. For non-dust payment, it should + // still be inflight. It won't be marked as failed unless the outgoing + // HTLC is resolved onchain. + // + // NOTE: it's possible for Bob to race against Alice using the + // preimage path. If Bob successfully claims the HTLC, Alice should + // mark the non-dust payment as succeeded. + // + // Check that the dust payment is failed in both the stream and DB. + ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_FAILED) + ht.AssertPaymentStatusFromStream(dustPayStream, lnrpc.Payment_FAILED) + + // Check that the non-dust payment is still in-flight. + // + // NOTE: we don't check the payment status from the stream here as + // there's no new status being sent. + ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_IN_FLIGHT) + + // We now have two possible cases for the non-dust payment: + // - Bob stays offline, and Alice will sweep her outgoing HTLC, which + // makes the payment failed. + // - Bob comes back online, and claims the HTLC on Alice's commitment + // via direct preimage spend, hence racing against Alice onchain. If + // he succeeds, Alice should mark the payment as succeeded. + // + // TODO(yy): test the second case once we have the RPC to clean + // mempool. + + // Since Alice's force close transaction has been confirmed, she should + // sweep her outgoing HTLC in next block. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Cleanup the channel. + ht.CleanupForceClose(alice) + + // We expect the non-dust payment to marked as failed in Alice's + // database and also from her stream. + ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_FAILED) + ht.AssertPaymentStatusFromStream(payStream, lnrpc.Payment_FAILED) +} + +// testSendDirectPayment creates a topology Alice->Bob and then tests that +// Alice can send a direct payment to Bob. This test modifies the fee estimator +// to return floor fee rate(1 sat/vb). func testSendDirectPayment(ht *lntest.HarnessTest) { // Grab Alice and Bob's nodes for convenience. alice, bob := ht.Alice, ht.Bob @@ -900,3 +1212,219 @@ func sendPaymentInterceptAndCancel(ht *lntest.HarnessTest, // Cancel the context, which will disconnect the above interceptor. cancelInterceptor() } + +// testSendToRouteFailHTLCTimeout is similar to +// testPaymentFailedHTLCLocalSwept. The only difference is the `SendPayment` is +// replaced with `SendToRouteV2`. It checks that when an outgoing HTLC is timed +// out and claimed onchain via the timeout path, the payment will be marked as +// failed. This test creates a topology from Alice -> Bob, and let Alice send +// payments to Bob. Bob then goes offline, such that Alice's outgoing HTLC will +// time out. Alice will also be restarted to make sure resumed payments are +// also marked as failed. +func testSendToRouteFailHTLCTimeout(ht *lntest.HarnessTest) { + success := ht.Run("fail payment", func(t *testing.T) { + st := ht.Subtest(t) + runSendToRouteFailHTLCTimeout(st, false) + }) + if !success { + return + } + + ht.Run("fail resumed payment", func(t *testing.T) { + st := ht.Subtest(t) + runTestPaymentHTLCTimeout(st, true) + }) +} + +// runSendToRouteFailHTLCTimeout is the helper function that actually runs the +// testSendToRouteFailHTLCTimeout. +func runSendToRouteFailHTLCTimeout(ht *lntest.HarnessTest, restartAlice bool) { + // Set the feerate to be 10 sat/vb. + ht.SetFeeEstimate(2500) + + // Open a channel with 100k satoshis between Alice and Bob with Alice + // being the sole funder of the channel. + chanAmt := btcutil.Amount(100_000) + openChannelParams := lntest.OpenChannelParams{ + Amt: chanAmt, + } + + // Create a two hop network: Alice -> Bob. + chanPoints, nodes := createSimpleNetwork(ht, nil, 2, openChannelParams) + chanPoint := chanPoints[0] + alice, bob := nodes[0], nodes[1] + + // We now create two payments, one above dust and the other below dust, + // and we should see different behavior in terms of when the payment + // will be marked as failed due to the HTLC timeout. + // + // First, create random preimages. + preimage := ht.RandomPreimage() + dustPreimage := ht.RandomPreimage() + + // Get the preimage hashes. + payHash := preimage.Hash() + dustPayHash := dustPreimage.Hash() + + // Create an hold invoice for Bob which expects a payment of 10k + // satoshis from Alice. + const paymentAmt = 20_000 + req := &invoicesrpc.AddHoldInvoiceRequest{ + Value: paymentAmt, + Hash: payHash[:], + // Use a small CLTV value so we can mine fewer blocks. + CltvExpiry: finalCltvDelta, + } + invoice := bob.RPC.AddHoldInvoice(req) + + // Create another hold invoice for Bob which expects a payment of 1k + // satoshis from Alice. + const dustAmt = 1000 + req = &invoicesrpc.AddHoldInvoiceRequest{ + Value: dustAmt, + Hash: dustPayHash[:], + // Use a small CLTV value so we can mine fewer blocks. + CltvExpiry: finalCltvDelta, + } + dustInvoice := bob.RPC.AddHoldInvoice(req) + + // Construct a route to send the non-dust payment. + go func() { + // Query the route to send the payment. + routesReq := &lnrpc.QueryRoutesRequest{ + PubKey: bob.PubKeyStr, + Amt: paymentAmt, + FinalCltvDelta: finalCltvDelta, + } + routes := alice.RPC.QueryRoutes(routesReq) + require.Len(ht, routes.Routes, 1) + + route := routes.Routes[0] + require.Len(ht, route.Hops, 1) + + // Modify the hop to include MPP info. + route.Hops[0].MppRecord = &lnrpc.MPPRecord{ + PaymentAddr: invoice.PaymentAddr, + TotalAmtMsat: int64( + lnwire.NewMSatFromSatoshis(paymentAmt), + ), + } + + // Send the payment with the modified value. + req := &routerrpc.SendToRouteRequest{ + PaymentHash: payHash[:], + Route: route, + } + + // Send the payment and expect no error. + attempt := alice.RPC.SendToRouteV2(req) + require.Equal(ht, lnrpc.HTLCAttempt_FAILED, attempt.Status) + }() + + // Check that the payment is in-flight. + ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_IN_FLIGHT) + + // Construct a route to send the dust payment. + go func() { + // Query the route to send the payment. + routesReq := &lnrpc.QueryRoutesRequest{ + PubKey: bob.PubKeyStr, + Amt: dustAmt, + FinalCltvDelta: finalCltvDelta, + } + routes := alice.RPC.QueryRoutes(routesReq) + require.Len(ht, routes.Routes, 1) + + route := routes.Routes[0] + require.Len(ht, route.Hops, 1) + + // Modify the hop to include MPP info. + route.Hops[0].MppRecord = &lnrpc.MPPRecord{ + PaymentAddr: dustInvoice.PaymentAddr, + TotalAmtMsat: int64( + lnwire.NewMSatFromSatoshis(dustAmt), + ), + } + + // Send the payment with the modified value. + req := &routerrpc.SendToRouteRequest{ + PaymentHash: dustPayHash[:], + Route: route, + } + + // Send the payment and expect no error. + attempt := alice.RPC.SendToRouteV2(req) + require.Equal(ht, lnrpc.HTLCAttempt_FAILED, attempt.Status) + }() + + // Check that the dust payment is in-flight. + ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_IN_FLIGHT) + + // Bob should have two incoming HTLC. + ht.AssertIncomingHTLCActive(bob, chanPoint, payHash[:]) + ht.AssertIncomingHTLCActive(bob, chanPoint, dustPayHash[:]) + + // Alice should have two outgoing HTLCs. + ht.AssertOutgoingHTLCActive(alice, chanPoint, payHash[:]) + ht.AssertOutgoingHTLCActive(alice, chanPoint, dustPayHash[:]) + + // Let Bob go offline. + ht.Shutdown(bob) + + // We'll now mine enough blocks to trigger Alice to broadcast her + // commitment transaction due to the fact that the HTLC is about to + // timeout. With the default outgoing broadcast delta of zero, this + // will be the same height as the htlc expiry height. + numBlocks := padCLTV( + uint32(req.CltvExpiry - lncfg.DefaultOutgoingBroadcastDelta), + ) + ht.MineEmptyBlocks(int(numBlocks - 1)) + + // Restart Alice if requested. + if restartAlice { + // Restart Alice to test the resumed payment is canceled. + ht.RestartNode(alice) + } + + // We now subscribe to the payment status. + payStream := alice.RPC.TrackPaymentV2(payHash[:]) + dustPayStream := alice.RPC.TrackPaymentV2(dustPayHash[:]) + + // Mine a block to confirm Alice's closing transaction. + ht.MineBlocksAndAssertNumTxes(1, 1) + + // Now the channel is closed, we expect different behaviors based on + // whether the HTLC is a dust. For dust payment, it should be failed + // now as the HTLC won't go onchain. For non-dust payment, it should + // still be inflight. It won't be marked as failed unless the outgoing + // HTLC is resolved onchain. + // + // Check that the dust payment is failed in both the stream and DB. + ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_FAILED) + ht.AssertPaymentStatusFromStream(dustPayStream, lnrpc.Payment_FAILED) + + // Check that the non-dust payment is still in-flight. + // + // NOTE: we don't check the payment status from the stream here as + // there's no new status being sent. + ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_IN_FLIGHT) + + // We now have two possible cases for the non-dust payment: + // - Bob stays offline, and Alice will sweep her outgoing HTLC, which + // makes the payment failed. + // - Bob comes back online, and claims the HTLC on Alice's commitment + // via direct preimage spend, hence racing against Alice onchain. If + // he succeeds, Alice should mark the payment as succeeded. + // + // TODO(yy): test the second case once we have the RPC to clean + // mempool. + + // Since Alice's force close transaction has been confirmed, she should + // sweep her outgoing HTLC in next block. + ht.MineBlocksAndAssertNumTxes(2, 1) + + // We expect the non-dust payment to marked as failed in Alice's + // database and also from her stream. + ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_FAILED) + ht.AssertPaymentStatusFromStream(payStream, lnrpc.Payment_FAILED) +} diff --git a/lntest/harness.go b/lntest/harness.go index 9a11eaa88..e8996e9a6 100644 --- a/lntest/harness.go +++ b/lntest/harness.go @@ -399,6 +399,8 @@ func (h *HarnessTest) resetStandbyNodes(t *testing.T) { // config for the coming test. This will also inherit the // test's running context. h.RestartNodeWithExtraArgs(hn, hn.Cfg.OriginalExtraArgs) + + hn.AddToLogf("Finished test case %v", h.manager.currentTestCase) } } @@ -1624,31 +1626,14 @@ func (h *HarnessTest) OpenChannelPsbt(srcNode, destNode *node.HarnessNode, return respStream, upd.PsbtFund.Psbt } -// CleanupForceClose mines a force close commitment found in the mempool and -// the following sweep transaction from the force closing node. +// CleanupForceClose mines blocks to clean up the force close process. This is +// used for tests that are not asserting the expected behavior is found during +// the force close process, e.g., num of sweeps, etc. Instead, it provides a +// shortcut to move the test forward with a clean mempool. func (h *HarnessTest) CleanupForceClose(hn *node.HarnessNode) { // Wait for the channel to be marked pending force close. h.AssertNumPendingForceClose(hn, 1) - // Mine enough blocks for the node to sweep its funds from the force - // closed channel. The commit sweep resolver is able to offer the input - // to the sweeper at defaulCSV-1, and broadcast the sweep tx once one - // more block is mined. - // - // NOTE: we might empty blocks here as we don't know the exact number - // of blocks to mine. This may end up mining more blocks than needed. - h.MineEmptyBlocks(node.DefaultCSV - 1) - - // Assert there is one pending sweep. - h.AssertNumPendingSweeps(hn, 1) - - // Mine a block to trigger the sweep. - h.MineEmptyBlocks(1) - - // The node should now sweep the funds, clean up by mining the sweeping - // tx. - h.MineBlocksAndAssertNumTxes(1, 1) - // Mine blocks to get any second level HTLC resolved. If there are no // HTLCs, this will behave like h.AssertNumPendingCloseChannels. h.mineTillForceCloseResolved(hn) @@ -1771,6 +1756,14 @@ func (h *HarnessTest) SendPaymentAssertSettled(hn *node.HarnessNode, return h.SendPaymentAndAssertStatus(hn, req, lnrpc.Payment_SUCCEEDED) } +// SendPaymentAssertInflight sends a payment from the passed node and asserts +// the payment is inflight. +func (h *HarnessTest) SendPaymentAssertInflight(hn *node.HarnessNode, + req *routerrpc.SendPaymentRequest) *lnrpc.Payment { + + return h.SendPaymentAndAssertStatus(hn, req, lnrpc.Payment_IN_FLIGHT) +} + // OpenChannelRequest is used to open a channel using the method // OpenMultiChannelsAsync. type OpenChannelRequest struct { diff --git a/lntest/harness_miner.go b/lntest/harness_miner.go index 65994d254..bc9aef180 100644 --- a/lntest/harness_miner.go +++ b/lntest/harness_miner.go @@ -167,16 +167,9 @@ func (h *HarnessTest) cleanMempool() { } // mineTillForceCloseResolved asserts that the number of pending close channels -// are zero. Each time it checks, a new block is mined using MineBlocksSlow to -// give the node some time to catch up the chain. -// -// NOTE: this method is a workaround to make sure we have a clean mempool at -// the end of a channel force closure. We cannot directly mine blocks and -// assert channels being fully closed because the subsystems in lnd don't share -// the same block height. This is especially the case when blocks are produced -// too fast. -// TODO(yy): remove this workaround when syncing blocks are unified in all the -// subsystems. +// are zero. Each time it checks, an empty block is mined, followed by a +// mempool check to see if there are any sweeping txns. If found, these txns +// are then mined to clean up the mempool. func (h *HarnessTest) mineTillForceCloseResolved(hn *node.HarnessNode) { _, startHeight := h.GetBestBlock() @@ -184,7 +177,15 @@ func (h *HarnessTest) mineTillForceCloseResolved(hn *node.HarnessNode) { resp := hn.RPC.PendingChannels() total := len(resp.PendingForceClosingChannels) if total != 0 { - h.MineBlocks(1) + // Mine an empty block first. + h.MineEmptyBlocks(1) + + // If there are new sweeping txns, mine a block to + // confirm it. + mem := h.GetRawMempool() + if len(mem) != 0 { + h.MineBlocksAndAssertNumTxes(1, len(mem)) + } return fmt.Errorf("expected num of pending force " + "close channel to be zero") diff --git a/routing/mock_test.go b/routing/mock_test.go index f712c420d..306c18210 100644 --- a/routing/mock_test.go +++ b/routing/mock_test.go @@ -60,6 +60,12 @@ func (m *mockPaymentAttemptDispatcherOld) SendHTLC( return nil } +func (m *mockPaymentAttemptDispatcherOld) HasAttemptResult( + attemptID uint64) (bool, error) { + + return false, nil +} + func (m *mockPaymentAttemptDispatcherOld) GetAttemptResult(paymentID uint64, _ lntypes.Hash, _ htlcswitch.ErrorDecrypter) ( <-chan *htlcswitch.PaymentResult, error) { @@ -209,6 +215,10 @@ func (m *mockPayerOld) SendHTLC(_ lnwire.ShortChannelID, } +func (m *mockPayerOld) HasAttemptResult(attemptID uint64) (bool, error) { + return false, nil +} + func (m *mockPayerOld) GetAttemptResult(paymentID uint64, _ lntypes.Hash, _ htlcswitch.ErrorDecrypter) (<-chan *htlcswitch.PaymentResult, error) { @@ -585,6 +595,13 @@ func (m *mockPaymentAttemptDispatcher) SendHTLC(firstHop lnwire.ShortChannelID, return args.Error(0) } +func (m *mockPaymentAttemptDispatcher) HasAttemptResult( + attemptID uint64) (bool, error) { + + args := m.Called(attemptID) + return args.Bool(0), args.Error(1) +} + func (m *mockPaymentAttemptDispatcher) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash, deobfuscator htlcswitch.ErrorDecrypter) ( <-chan *htlcswitch.PaymentResult, error) { diff --git a/routing/payment_lifecycle.go b/routing/payment_lifecycle.go index 1d103c5ac..5244d4d63 100644 --- a/routing/payment_lifecycle.go +++ b/routing/payment_lifecycle.go @@ -179,7 +179,7 @@ func (p *paymentLifecycle) resumePayment(ctx context.Context) ([32]byte, for _, a := range payment.InFlightHTLCs() { a := a - log.Infof("Resuming payment shard %v for payment %v", + log.Infof("Resuming HTLC attempt %v for payment %v", a.AttemptID, p.identifier) p.resultCollector(&a) @@ -463,6 +463,8 @@ func (p *paymentLifecycle) collectResultAsync(attempt *channeldb.HTLCAttempt) { func (p *paymentLifecycle) collectResult(attempt *channeldb.HTLCAttempt) ( *attemptResult, error) { + log.Tracef("Collecting result for attempt %v", spew.Sdump(attempt)) + // We'll retrieve the hash specific to this shard from the // shardTracker, since it will be needed to regenerate the circuit // below. @@ -663,8 +665,8 @@ func (p *paymentLifecycle) createNewPaymentAttempt(rt *route.Route, func (p *paymentLifecycle) sendAttempt( attempt *channeldb.HTLCAttempt) (*attemptResult, error) { - log.Debugf("Attempting to send payment %v (pid=%v)", p.identifier, - attempt.AttemptID) + log.Debugf("Sending HTLC attempt(id=%v, amt=%v) for payment %v", + attempt.AttemptID, attempt.Route.TotalAmount, p.identifier) rt := attempt.Route diff --git a/routing/router.go b/routing/router.go index 4169548c5..ec134a15b 100644 --- a/routing/router.go +++ b/routing/router.go @@ -135,6 +135,16 @@ type PaymentAttemptDispatcher interface { // NOTE: New payment attempts MUST NOT be made after the keepPids map // has been created and this method has returned. CleanStore(keepPids map[uint64]struct{}) error + + // HasAttemptResult reads the network result store to fetch the + // specified attempt. Returns true if the attempt result exists. + // + // NOTE: This method is used and should only be used by the router to + // resume payments during startup. It can be viewed as a subset of + // `GetAttemptResult` in terms of its functionality, and can be removed + // once we move the construction of `UpdateAddHTLC` and + // `ErrorDecrypter` into `htlcswitch`. + HasAttemptResult(attemptID uint64) (bool, error) } // PaymentSessionSource is an interface that defines a source for the router to @@ -252,9 +262,9 @@ type Config struct { // sessions. SessionSource PaymentSessionSource - // QueryBandwidth is a method that allows the router to query the lower - // link layer to determine the up-to-date available bandwidth at a - // prospective link to be traversed. If the link isn't available, then + // GetLink is a method that allows the router to query the lower link + // layer to determine the up-to-date available bandwidth at a + // prospective link to be traversed. If the link isn't available, then // a value of zero should be returned. Otherwise, the current up-to- // date knowledge of the available bandwidth of the link should be // returned. @@ -275,6 +285,11 @@ type Config struct { // ApplyChannelUpdate can be called to apply a new channel update to the // graph that we received from a payment failure. ApplyChannelUpdate func(msg *lnwire.ChannelUpdate) bool + + // ClosedSCIDs is used by the router to fetch closed channels. + // + // TODO(yy): remove it once the root cause of stuck payments is found. + ClosedSCIDs map[lnwire.ShortChannelID]struct{} } // EdgeLocator is a struct used to identify a specific edge. @@ -336,88 +351,8 @@ func (r *ChannelRouter) Start() error { // If any payments are still in flight, we resume, to make sure their // results are properly handled. - payments, err := r.cfg.Control.FetchInFlightPayments() - if err != nil { - return err - } - - // Before we restart existing payments and start accepting more - // payments to be made, we clean the network result store of the - // Switch. We do this here at startup to ensure no more payments can be - // made concurrently, so we know the toKeep map will be up-to-date - // until the cleaning has finished. - toKeep := make(map[uint64]struct{}) - for _, p := range payments { - for _, a := range p.HTLCs { - toKeep[a.AttemptID] = struct{}{} - } - } - - log.Debugf("Cleaning network result store.") - if err := r.cfg.Payer.CleanStore(toKeep); err != nil { - return err - } - - for _, payment := range payments { - log.Infof("Resuming payment %v", payment.Info.PaymentIdentifier) - r.wg.Add(1) - go func(payment *channeldb.MPPayment) { - defer r.wg.Done() - - // Get the hashes used for the outstanding HTLCs. - htlcs := make(map[uint64]lntypes.Hash) - for _, a := range payment.HTLCs { - a := a - - // We check whether the individual attempts - // have their HTLC hash set, if not we'll fall - // back to the overall payment hash. - hash := payment.Info.PaymentIdentifier - if a.Hash != nil { - hash = *a.Hash - } - - htlcs[a.AttemptID] = hash - } - - // Since we are not supporting creating more shards - // after a restart (only receiving the result of the - // shards already outstanding), we create a simple - // shard tracker that will map the attempt IDs to - // hashes used for the HTLCs. This will be enough also - // for AMP payments, since we only need the hashes for - // the individual HTLCs to regenerate the circuits, and - // we don't currently persist the root share necessary - // to re-derive them. - shardTracker := shards.NewSimpleShardTracker( - payment.Info.PaymentIdentifier, htlcs, - ) - - // We create a dummy, empty payment session such that - // we won't make another payment attempt when the - // result for the in-flight attempt is received. - paySession := r.cfg.SessionSource.NewPaymentSessionEmpty() - - // We pass in a non-timeout context, to indicate we - // don't need it to timeout. It will stop immediately - // after the existing attempt has finished anyway. We - // also set a zero fee limit, as no more routes should - // be tried. - noTimeout := time.Duration(0) - _, _, err := r.sendPayment( - context.Background(), 0, - payment.Info.PaymentIdentifier, noTimeout, - paySession, shardTracker, - ) - if err != nil { - log.Errorf("Resuming payment %v failed: %v.", - payment.Info.PaymentIdentifier, err) - return - } - - log.Infof("Resumed payment %v completed.", - payment.Info.PaymentIdentifier) - }(payment) + if err := r.resumePayments(); err != nil { + log.Error("Failed to resume payments during startup") } return nil @@ -1133,6 +1068,9 @@ func (r *ChannelRouter) SendToRouteSkipTempErr(htlcHash lntypes.Hash, func (r *ChannelRouter) sendToRoute(htlcHash lntypes.Hash, rt *route.Route, skipTempErr bool) (*channeldb.HTLCAttempt, error) { + log.Debugf("SendToRoute for payment %v with skipTempErr=%v", + htlcHash, skipTempErr) + // Calculate amount paid to receiver. amt := rt.ReceiverAmt() @@ -1448,6 +1386,204 @@ func (r *ChannelRouter) BuildRoute(amt fn.Option[lnwire.MilliSatoshi], ) } +// resumePayments fetches inflight payments and resumes their payment +// lifecycles. +func (r *ChannelRouter) resumePayments() error { + // Get all payments that are inflight. + payments, err := r.cfg.Control.FetchInFlightPayments() + if err != nil { + return err + } + + // Before we restart existing payments and start accepting more + // payments to be made, we clean the network result store of the + // Switch. We do this here at startup to ensure no more payments can be + // made concurrently, so we know the toKeep map will be up-to-date + // until the cleaning has finished. + toKeep := make(map[uint64]struct{}) + for _, p := range payments { + for _, a := range p.HTLCs { + toKeep[a.AttemptID] = struct{}{} + + // Try to fail the attempt if the route contains a dead + // channel. + r.failStaleAttempt(a, p.Info.PaymentIdentifier) + } + } + + log.Debugf("Cleaning network result store.") + if err := r.cfg.Payer.CleanStore(toKeep); err != nil { + return err + } + + // launchPayment is a helper closure that handles resuming the payment. + launchPayment := func(payment *channeldb.MPPayment) { + defer r.wg.Done() + + // Get the hashes used for the outstanding HTLCs. + htlcs := make(map[uint64]lntypes.Hash) + for _, a := range payment.HTLCs { + a := a + + // We check whether the individual attempts have their + // HTLC hash set, if not we'll fall back to the overall + // payment hash. + hash := payment.Info.PaymentIdentifier + if a.Hash != nil { + hash = *a.Hash + } + + htlcs[a.AttemptID] = hash + } + + payHash := payment.Info.PaymentIdentifier + + // Since we are not supporting creating more shards after a + // restart (only receiving the result of the shards already + // outstanding), we create a simple shard tracker that will map + // the attempt IDs to hashes used for the HTLCs. This will be + // enough also for AMP payments, since we only need the hashes + // for the individual HTLCs to regenerate the circuits, and we + // don't currently persist the root share necessary to + // re-derive them. + shardTracker := shards.NewSimpleShardTracker(payHash, htlcs) + + // We create a dummy, empty payment session such that we won't + // make another payment attempt when the result for the + // in-flight attempt is received. + paySession := r.cfg.SessionSource.NewPaymentSessionEmpty() + + // We pass in a non-timeout context, to indicate we don't need + // it to timeout. It will stop immediately after the existing + // attempt has finished anyway. We also set a zero fee limit, + // as no more routes should be tried. + noTimeout := time.Duration(0) + _, _, err := r.sendPayment( + context.Background(), 0, payHash, noTimeout, paySession, + shardTracker, + ) + if err != nil { + log.Errorf("Resuming payment %v failed: %v", payHash, + err) + + return + } + + log.Infof("Resumed payment %v completed", payHash) + } + + for _, payment := range payments { + log.Infof("Resuming payment %v", payment.Info.PaymentIdentifier) + + r.wg.Add(1) + go launchPayment(payment) + } + + return nil +} + +// failStaleAttempt will fail an HTLC attempt if it's using an unknown channel +// in its route. It first consults the switch to see if there's already a +// network result stored for this attempt. If not, it will check whether the +// first hop of this attempt is using an active channel known to us. If +// inactive, this attempt will be failed. +// +// NOTE: there's an unknown bug that caused the network result for a particular +// attempt to NOT be saved, resulting a payment being stuck forever. More info: +// - https://github.com/lightningnetwork/lnd/issues/8146 +// - https://github.com/lightningnetwork/lnd/pull/8174 +func (r *ChannelRouter) failStaleAttempt(a channeldb.HTLCAttempt, + payHash lntypes.Hash) { + + // We can only fail inflight HTLCs so we skip the settled/failed ones. + if a.Failure != nil || a.Settle != nil { + return + } + + // First, check if we've already had a network result for this attempt. + // If no result is found, we'll check whether the reference link is + // still known to us. + ok, err := r.cfg.Payer.HasAttemptResult(a.AttemptID) + if err != nil { + log.Errorf("Failed to fetch network result for attempt=%v", + a.AttemptID) + return + } + + // There's already a network result, no need to fail it here as the + // payment lifecycle will take care of it, so we can exit early. + if ok { + log.Debugf("Already have network result for attempt=%v", + a.AttemptID) + return + } + + // We now need to decide whether this attempt should be failed here. + // For very old payments, it's possible that the network results were + // never saved, causing the payments to be stuck inflight. We now check + // whether the first hop is referencing an active channel ID and, if + // not, we will fail the attempt as it has no way to be retried again. + var shouldFail bool + + // Validate that the attempt has hop info. If this attempt has no hop + // info it indicates an error in our db. + if len(a.Route.Hops) == 0 { + log.Errorf("Found empty hop for attempt=%v", a.AttemptID) + + shouldFail = true + } else { + // Get the short channel ID. + chanID := a.Route.Hops[0].ChannelID + scid := lnwire.NewShortChanIDFromInt(chanID) + + // Check whether this link is active. If so, we won't fail the + // attempt but keep waiting for its result. + _, err := r.cfg.GetLink(scid) + if err == nil { + return + } + + // We should get the link not found err. If not, we will log an + // error and skip failing this attempt since an unknown error + // occurred. + if !errors.Is(err, htlcswitch.ErrChannelLinkNotFound) { + log.Errorf("Failed to get link for attempt=%v for "+ + "payment=%v: %v", a.AttemptID, payHash, err) + + return + } + + // The channel link is not active, we now check whether this + // channel is already closed. If so, we fail the HTLC attempt + // as there's no need to wait for its network result because + // there's no link. If the channel is still pending, we'll keep + // waiting for the result as we may get a contract resolution + // for this HTLC. + if _, ok := r.cfg.ClosedSCIDs[scid]; ok { + shouldFail = true + } + } + + // Exit if there's no need to fail. + if !shouldFail { + return + } + + log.Errorf("Failing stale attempt=%v for payment=%v", a.AttemptID, + payHash) + + // Fail the attempt in db. If there's an error, there's nothing we can + // do here but logging it. + failInfo := &channeldb.HTLCFailInfo{ + Reason: channeldb.HTLCFailUnknown, + FailTime: r.cfg.Clock.Now(), + } + _, err = r.cfg.Control.FailAttempt(payHash, a.AttemptID, failInfo) + if err != nil { + log.Errorf("Fail attempt=%v got error: %v", a.AttemptID, err) + } +} + // getEdgeUnifiers returns a list of edge unifiers for the given route. func getEdgeUnifiers(source route.Vertex, hops []route.Vertex, outgoingChans map[uint64]struct{}, diff --git a/routing/router_test.go b/routing/router_test.go index 3a9b97d29..073761fa0 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -93,6 +93,8 @@ func (c *testCtx) getChannelIDFromAlias(t *testing.T, a, b string) uint64 { return channelID } +var mockClosedSCIDs map[lnwire.ShortChannelID]struct{} + func createTestCtxFromGraphInstance(t *testing.T, startingHeight uint32, graphInstance *testGraphInstance) *testCtx { @@ -161,6 +163,7 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T, PathFindingConfig: pathFindingConfig, Clock: clock.NewTestClock(time.Unix(1, 0)), ApplyChannelUpdate: graphBuilder.ApplyChannelUpdate, + ClosedSCIDs: mockClosedSCIDs, }) require.NoError(t, router.Start(), "unable to start router") @@ -2170,6 +2173,7 @@ func TestSendToRouteSkipTempErrSuccess(t *testing.T) { NextPaymentID: func() (uint64, error) { return 0, nil }, + ClosedSCIDs: mockClosedSCIDs, }} // Register mockers with the expected method calls. @@ -2253,6 +2257,7 @@ func TestSendToRouteSkipTempErrNonMPP(t *testing.T) { NextPaymentID: func() (uint64, error) { return 0, nil }, + ClosedSCIDs: mockClosedSCIDs, }} // Expect an error to be returned. @@ -2307,6 +2312,7 @@ func TestSendToRouteSkipTempErrTempFailure(t *testing.T) { NextPaymentID: func() (uint64, error) { return 0, nil }, + ClosedSCIDs: mockClosedSCIDs, }} // Create the error to be returned. @@ -2389,6 +2395,7 @@ func TestSendToRouteSkipTempErrPermanentFailure(t *testing.T) { NextPaymentID: func() (uint64, error) { return 0, nil }, + ClosedSCIDs: mockClosedSCIDs, }} // Create the error to be returned. @@ -2475,6 +2482,7 @@ func TestSendToRouteTempFailure(t *testing.T) { NextPaymentID: func() (uint64, error) { return 0, nil }, + ClosedSCIDs: mockClosedSCIDs, }} // Create the error to be returned. diff --git a/server.go b/server.go index a99591997..555ee2627 100644 --- a/server.go +++ b/server.go @@ -1005,6 +1005,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr, PathFindingConfig: pathFindingConfig, Clock: clock.NewDefaultClock(), ApplyChannelUpdate: s.graphBuilder.ApplyChannelUpdate, + ClosedSCIDs: s.fetchClosedChannelSCIDs(), }) if err != nil { return nil, fmt.Errorf("can't create router: %w", err) @@ -4829,3 +4830,52 @@ func shouldPeerBootstrap(cfg *Config) bool { // covering the bootstrapping process. return !cfg.NoNetBootstrap && !isDevNetwork } + +// fetchClosedChannelSCIDs returns a set of SCIDs that have their force closing +// finished. +func (s *server) fetchClosedChannelSCIDs() map[lnwire.ShortChannelID]struct{} { + // Get a list of closed channels. + channels, err := s.chanStateDB.FetchClosedChannels(false) + if err != nil { + srvrLog.Errorf("Failed to fetch closed channels: %v", err) + return nil + } + + // Save the SCIDs in a map. + closedSCIDs := make(map[lnwire.ShortChannelID]struct{}, len(channels)) + for _, c := range channels { + // If the channel is not pending, its FC has been finalized. + if !c.IsPending { + closedSCIDs[c.ShortChanID] = struct{}{} + } + } + + // Double check whether the reported closed channel has indeed finished + // closing. + // + // NOTE: There are misalignments regarding when a channel's FC is + // marked as finalized. We double check the pending channels to make + // sure the returned SCIDs are indeed terminated. + // + // TODO(yy): fix the misalignments in `FetchClosedChannels`. + pendings, err := s.chanStateDB.FetchPendingChannels() + if err != nil { + srvrLog.Errorf("Failed to fetch pending channels: %v", err) + return nil + } + + for _, c := range pendings { + if _, ok := closedSCIDs[c.ShortChannelID]; !ok { + continue + } + + // If the channel is still reported as pending, remove it from + // the map. + delete(closedSCIDs, c.ShortChannelID) + + srvrLog.Warnf("Channel=%v is prematurely marked as finalized", + c.ShortChannelID) + } + + return closedSCIDs +}