Merge pull request #8174 from yyforyongyu/fix-inflight-payments

routing: fix stuck inflight payments
Yong 2024-08-07 23:23:27 +08:00 committed by GitHub
commit a449a5d132
GPG Key ID: B5690EEEBB952194
16 changed files with 1314 additions and 487 deletions

View File

@ -211,6 +211,11 @@ func (f *PkgFilter) Decode(r io.Reader) error {
return err
}
// String returns a human-readable string.
func (f *PkgFilter) String() string {
return fmt.Sprintf("count=%v, filter=%v", f.count, f.filter)
}
// FwdPkg records all adds, settles, and fails that were locked in as a result
// of the remote peer sending us a revocation. Each package is identified by
// the short chanid and remote commitment height corresponding to the revocation

View File

@ -424,7 +424,9 @@ func (p *PaymentControl) RegisterAttempt(paymentHash lntypes.Hash,
// Ensure we aren't sending more than the total payment amount.
sentAmt, _ := payment.SentAmt()
if sentAmt+amt > payment.Info.Value {
return ErrValueExceedsAmt
return fmt.Errorf("%w: attempted=%v, payment amount="+
"%v", ErrValueExceedsAmt, sentAmt+amt,
payment.Info.Value)
}
htlcsBucket, err := bucket.CreateBucketIfNotExists(

View File

@ -734,10 +734,7 @@ func TestPaymentControlMultiShard(t *testing.T) {
b := *attempt
b.AttemptID = 3
_, err = pControl.RegisterAttempt(info.PaymentIdentifier, &b)
if err != ErrValueExceedsAmt {
t.Fatalf("expected ErrValueExceedsAmt, got: %v",
err)
}
require.ErrorIs(t, err, ErrValueExceedsAmt)
// Fail the second attempt.
a := attempts[1]

View File

@ -48,6 +48,15 @@
bumping an anchor channel closing was not possible when no HTLCs were on the
commitment when the channel was force closed.
* [Fixed](https://github.com/lightningnetwork/lnd/pull/8174) old payments that
are stuck inflight. Though the root cause is unknown, it's possible the
network result for a given HTLC attempt was not saved, which is now fixed. See
[here](https://github.com/lightningnetwork/lnd/pull/8174#issue-1992055103)
for details of the analysis, and
[here](https://github.com/lightningnetwork/lnd/issues/8146) for a summary of
the issue.
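
As a rough illustration of the recovery flow this fix enables (a sketch only, not lnd's actual resumption code; `AttemptResultReader` and `resumeInflightAttempts` are hypothetical names), a node resuming its in-flight payments can ask the switch whether each HTLC attempt already has a stored network result via the new `HasAttemptResult` method introduced in this PR, and hand any attempt without one back to the normal result-collection path so the payment cannot stay stuck:

```go
package sketch

import "fmt"

// AttemptResultReader mirrors the single switch method relevant here; in lnd
// it is added to the PaymentAttemptDispatcher interface by this PR.
type AttemptResultReader interface {
	HasAttemptResult(attemptID uint64) (bool, error)
}

// resumeInflightAttempts is a hypothetical helper sketching the startup
// check: for every attempt found in-flight, see whether a network result was
// ever stored for it.
func resumeInflightAttempts(s AttemptResultReader, attemptIDs []uint64) error {
	for _, id := range attemptIDs {
		ok, err := s.HasAttemptResult(id)
		if err != nil {
			return fmt.Errorf("checking attempt %d: %w", id, err)
		}

		if ok {
			// A terminal result was stored, so the usual result
			// collector will settle or fail this attempt.
			fmt.Printf("attempt %d: result already stored\n", id)
			continue
		}

		// No result was stored; the attempt must be re-subscribed to
		// (or failed) so the payment cannot remain stuck in-flight.
		fmt.Printf("attempt %d: no stored result, re-collect\n", id)
	}

	return nil
}
```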
# New Features
## Functional Enhancements
## RPC Additions

View File

@ -3297,7 +3297,7 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg,
return
}
l.log.Debugf("settle-fail-filter %v", fwdPkg.SettleFailFilter)
l.log.Debugf("settle-fail-filter: %v", fwdPkg.SettleFailFilter)
var switchPackets []*htlcPacket
for i, pd := range settleFails {

View File

@ -90,32 +90,32 @@ type networkResultStore struct {
results map[uint64][]chan *networkResult
resultsMtx sync.Mutex
// paymentIDMtx is a multimutex used to make sure the database and
// result subscribers map is consistent for each payment ID in case of
// attemptIDMtx is a multimutex used to make sure the database and
// result subscribers map is consistent for each attempt ID in case of
// concurrent callers.
paymentIDMtx *multimutex.Mutex[uint64]
attemptIDMtx *multimutex.Mutex[uint64]
}
func newNetworkResultStore(db kvdb.Backend) *networkResultStore {
return &networkResultStore{
backend: db,
results: make(map[uint64][]chan *networkResult),
paymentIDMtx: multimutex.NewMutex[uint64](),
attemptIDMtx: multimutex.NewMutex[uint64](),
}
}
// storeResult stores the networkResult for the given paymentID, and
// notifies any subscribers.
func (store *networkResultStore) storeResult(paymentID uint64,
// storeResult stores the networkResult for the given attemptID, and notifies
// any subscribers.
func (store *networkResultStore) storeResult(attemptID uint64,
result *networkResult) error {
// We get a mutex for this payment ID. This is needed to ensure
// We get a mutex for this attempt ID. This is needed to ensure
// consistency between the database state and the subscribers in case
// of concurrent calls.
store.paymentIDMtx.Lock(paymentID)
defer store.paymentIDMtx.Unlock(paymentID)
store.attemptIDMtx.Lock(attemptID)
defer store.attemptIDMtx.Unlock(attemptID)
log.Debugf("Storing result for paymentID=%v", paymentID)
log.Debugf("Storing result for attemptID=%v", attemptID)
// Serialize the payment result.
var b bytes.Buffer
@ -123,8 +123,8 @@ func (store *networkResultStore) storeResult(paymentID uint64,
return err
}
var paymentIDBytes [8]byte
binary.BigEndian.PutUint64(paymentIDBytes[:], paymentID)
var attemptIDBytes [8]byte
binary.BigEndian.PutUint64(attemptIDBytes[:], attemptID)
err := kvdb.Batch(store.backend, func(tx kvdb.RwTx) error {
networkResults, err := tx.CreateTopLevelBucket(
@ -134,7 +134,7 @@ func (store *networkResultStore) storeResult(paymentID uint64,
return err
}
return networkResults.Put(paymentIDBytes[:], b.Bytes())
return networkResults.Put(attemptIDBytes[:], b.Bytes())
})
if err != nil {
return err
@ -143,28 +143,27 @@ func (store *networkResultStore) storeResult(paymentID uint64,
// Now that the result is stored in the database, we can notify any
// active subscribers.
store.resultsMtx.Lock()
for _, res := range store.results[paymentID] {
for _, res := range store.results[attemptID] {
res <- result
}
delete(store.results, paymentID)
delete(store.results, attemptID)
store.resultsMtx.Unlock()
return nil
}
// subscribeResult is used to get the payment result for the given
// payment ID. It returns a channel on which the result will be delivered when
// ready.
func (store *networkResultStore) subscribeResult(paymentID uint64) (
// subscribeResult is used to get the HTLC attempt result for the given attempt
// ID. It returns a channel on which the result will be delivered when ready.
func (store *networkResultStore) subscribeResult(attemptID uint64) (
<-chan *networkResult, error) {
// We get a mutex for this payment ID. This is needed to ensure
// consistency between the database state and the subscribers in case
// of concurrent calls.
store.paymentIDMtx.Lock(paymentID)
defer store.paymentIDMtx.Unlock(paymentID)
store.attemptIDMtx.Lock(attemptID)
defer store.attemptIDMtx.Unlock(attemptID)
log.Debugf("Subscribing to result for paymentID=%v", paymentID)
log.Debugf("Subscribing to result for attemptID=%v", attemptID)
var (
result *networkResult
@ -173,7 +172,7 @@ func (store *networkResultStore) subscribeResult(paymentID uint64) (
err := kvdb.View(store.backend, func(tx kvdb.RTx) error {
var err error
result, err = fetchResult(tx, paymentID)
result, err = fetchResult(tx, attemptID)
switch {
// Result not yet available, we will notify once a result is
@ -205,8 +204,8 @@ func (store *networkResultStore) subscribeResult(paymentID uint64) (
// Otherwise we store the result channel for when the result is
// available.
store.resultsMtx.Lock()
store.results[paymentID] = append(
store.results[paymentID], resultChan,
store.results[attemptID] = append(
store.results[attemptID], resultChan,
)
store.resultsMtx.Unlock()
@ -234,8 +233,8 @@ func (store *networkResultStore) getResult(pid uint64) (
}
func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) {
var paymentIDBytes [8]byte
binary.BigEndian.PutUint64(paymentIDBytes[:], pid)
var attemptIDBytes [8]byte
binary.BigEndian.PutUint64(attemptIDBytes[:], pid)
networkResults := tx.ReadBucket(networkResultStoreBucketKey)
if networkResults == nil {
@ -243,7 +242,7 @@ func fetchResult(tx kvdb.RTx, pid uint64) (*networkResult, error) {
}
// Check whether a result is already available.
resultBytes := networkResults.Get(paymentIDBytes[:])
resultBytes := networkResults.Get(attemptIDBytes[:])
if resultBytes == nil {
return nil, ErrPaymentIDNotFound
}
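
The renaming above is mechanical, but the comment on `attemptIDMtx` describes a pattern worth spelling out: the per-attempt mutex makes the check-the-database-then-register-a-subscriber step in `subscribeResult` atomic with respect to `storeResult`, so a result can never be stored between the lookup and the subscription without the subscriber being notified. A stripped-down, in-memory sketch of that pattern (no kvdb backend, a single mutex standing in for the multimutex) could look like this:

```go
package sketch

import "sync"

// resultStore is a simplified, in-memory stand-in for networkResultStore that
// keeps only the concurrency pattern: one mutex plays the role of the
// per-attempt multimutex and guards both the stored results and the
// subscriber map.
type resultStore struct {
	mu      sync.Mutex
	results map[uint64]string
	subs    map[uint64][]chan string
}

func newResultStore() *resultStore {
	return &resultStore{
		results: make(map[uint64]string),
		subs:    make(map[uint64][]chan string),
	}
}

// store saves the result for attemptID and notifies every subscriber that
// registered before the result arrived.
func (s *resultStore) store(attemptID uint64, res string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.results[attemptID] = res
	for _, c := range s.subs[attemptID] {
		c <- res
	}
	delete(s.subs, attemptID)
}

// subscribe returns a channel that delivers the result, whether it is already
// available or arrives later. Holding the lock across the lookup and the
// subscription is what prevents a result from being missed.
func (s *resultStore) subscribe(attemptID uint64) <-chan string {
	s.mu.Lock()
	defer s.mu.Unlock()

	c := make(chan string, 1)
	if res, ok := s.results[attemptID]; ok {
		c <- res
		return c
	}

	s.subs[attemptID] = append(s.subs[attemptID], c)
	return c
}
```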

View File

@ -431,11 +431,26 @@ func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) erro
}
}
// GetAttemptResult returns the result of the payment attempt with the given
// HasAttemptResult reads the network result store to fetch the specified
// attempt. Returns true if the attempt result exists.
func (s *Switch) HasAttemptResult(attemptID uint64) (bool, error) {
_, err := s.networkResults.getResult(attemptID)
if err == nil {
return true, nil
}
if !errors.Is(err, ErrPaymentIDNotFound) {
return false, err
}
return false, nil
}
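
To spell out the contract of the new method (a hypothetical caller, not code from this PR): a `(false, nil)` return means no result has been stored yet, since the underlying `ErrPaymentIDNotFound` is swallowed, while any non-nil error indicates a genuine failure of the result store.

```go
package sketch

import "fmt"

// attemptResultChecker captures only the method under discussion; in lnd it
// is satisfied by *htlcswitch.Switch.
type attemptResultChecker interface {
	HasAttemptResult(attemptID uint64) (bool, error)
}

// describeAttempt is a hypothetical helper showing how the three possible
// outcomes of HasAttemptResult are distinguished.
func describeAttempt(c attemptResultChecker, attemptID uint64) (string, error) {
	ok, err := c.HasAttemptResult(attemptID)
	switch {
	case err != nil:
		// The result store itself failed.
		return "", fmt.Errorf("checking attempt %d: %w", attemptID, err)

	case ok:
		// A terminal result was recorded for this attempt.
		return "result stored", nil

	default:
		// ErrPaymentIDNotFound was mapped to (false, nil): the
		// attempt is still outstanding.
		return "no result yet", nil
	}
}
```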
// GetAttemptResult returns the result of the HTLC attempt with the given
// attemptID. The paymentHash should be set to the payment's overall hash, or
// in case of AMP payments the payment's unique identifier.
//
// The method returns a channel where the payment result will be sent when
// The method returns a channel where the HTLC attempt result will be sent when
// available, or an error is encountered during forwarding. When a result is
// received on the channel, the HTLC is guaranteed to no longer be in flight.
// The switch shutting down is signaled by closing the channel. If the
@ -452,9 +467,9 @@ func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash,
}
)
// If the payment is not found in the circuit map, check whether a
// result is already available.
// Assumption: no one will add this payment ID other than the caller.
// If the HTLC is not found in the circuit map, check whether a result
// is already available.
// Assumption: no one will add this attempt ID other than the caller.
if s.circuits.LookupCircuit(inKey) == nil {
res, err := s.networkResults.getResult(attemptID)
if err != nil {
@ -464,7 +479,7 @@ func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash,
c <- res
nChan = c
} else {
// The payment was committed to the circuits, subscribe for a
// The HTLC was committed to the circuits, subscribe for a
// result.
nChan, err = s.networkResults.subscribeResult(attemptID)
if err != nil {
@ -474,7 +489,7 @@ func (s *Switch) GetAttemptResult(attemptID uint64, paymentHash lntypes.Hash,
resultChan := make(chan *PaymentResult, 1)
// Since the payment was known, we can start a goroutine that can
// Since the attempt was known, we can start a goroutine that can
// extract the result when it is available, and pass it on to the
// caller.
s.wg.Add(1)
@ -939,7 +954,7 @@ func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
// Store the result to the db. This will also notify subscribers about
// the result.
if err := s.networkResults.storeResult(attemptID, n); err != nil {
log.Errorf("Unable to complete payment for pid=%v: %v",
log.Errorf("Unable to store attempt result for pid=%v: %v",
attemptID, err)
return
}
@ -967,7 +982,7 @@ func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
// This can only happen if the circuit is still open, which is why this
// ordering is chosen.
if err := s.teardownCircuit(pkt); err != nil {
log.Warnf("Unable to teardown circuit %s: %v",
log.Errorf("Unable to teardown circuit %s: %v",
pkt.inKey(), err)
return
}
@ -1099,296 +1114,26 @@ func (s *Switch) parseFailedPayment(deobfuscator ErrorDecrypter,
// updates back. This behaviour is achieved by creation of payment circuits.
func (s *Switch) handlePacketForward(packet *htlcPacket) error {
switch htlc := packet.htlc.(type) {
// Channel link forwarded us a new htlc, therefore we initiate the
// payment circuit within our internal state so we can properly forward
// the ultimate settle message back latter.
case *lnwire.UpdateAddHTLC:
// Check if the node is set to reject all onward HTLCs and also make
// sure that HTLC is not from the source node.
if s.cfg.RejectHTLC {
failure := NewDetailedLinkError(
&lnwire.FailChannelDisabled{},
OutgoingFailureForwardsDisabled,
)
return s.handlePacketAdd(packet, htlc)
return s.failAddPacket(packet, failure)
}
case *lnwire.UpdateFulfillHTLC:
return s.handlePacketSettle(packet)
// Before we attempt to find a non-strict forwarding path for
// this htlc, check whether the htlc is being routed over the
// same incoming and outgoing channel. If our node does not
// allow forwards of this nature, we fail the htlc early. This
// check is in place to disallow inefficiently routed htlcs from
// locking up our balance. With channels where the
// option-scid-alias feature was negotiated, we also have to be
// sure that the IDs aren't the same since one or both could be
// an alias.
linkErr := s.checkCircularForward(
packet.incomingChanID, packet.outgoingChanID,
s.cfg.AllowCircularRoute, htlc.PaymentHash,
)
if linkErr != nil {
return s.failAddPacket(packet, linkErr)
}
s.indexMtx.RLock()
targetLink, err := s.getLinkByMapping(packet)
if err != nil {
s.indexMtx.RUnlock()
log.Debugf("unable to find link with "+
"destination %v", packet.outgoingChanID)
// If packet was forwarded from another channel link
// than we should notify this link that some error
// occurred.
linkError := NewLinkError(
&lnwire.FailUnknownNextPeer{},
)
return s.failAddPacket(packet, linkError)
}
targetPeerKey := targetLink.PeerPubKey()
interfaceLinks, _ := s.getLinks(targetPeerKey)
s.indexMtx.RUnlock()
// We'll keep track of any HTLC failures during the link
// selection process. This way we can return the error for
// precise link that the sender selected, while optimistically
// trying all links to utilize our available bandwidth.
linkErrs := make(map[lnwire.ShortChannelID]*LinkError)
// Find all destination channel links with appropriate
// bandwidth.
var destinations []ChannelLink
for _, link := range interfaceLinks {
var failure *LinkError
// We'll skip any links that aren't yet eligible for
// forwarding.
if !link.EligibleToForward() {
failure = NewDetailedLinkError(
&lnwire.FailUnknownNextPeer{},
OutgoingFailureLinkNotEligible,
)
} else {
// We'll ensure that the HTLC satisfies the
// current forwarding conditions of this target
// link.
currentHeight := atomic.LoadUint32(&s.bestHeight)
failure = link.CheckHtlcForward(
htlc.PaymentHash, packet.incomingAmount,
packet.amount, packet.incomingTimeout,
packet.outgoingTimeout,
packet.inboundFee,
currentHeight,
packet.originalOutgoingChanID,
)
}
// If this link can forward the htlc, add it to the set
// of destinations.
if failure == nil {
destinations = append(destinations, link)
continue
}
linkErrs[link.ShortChanID()] = failure
}
// If we had a forwarding failure due to the HTLC not
// satisfying the current policy, then we'll send back an
// error, but ensure we send back the error sourced at the
// *target* link.
if len(destinations) == 0 {
// At this point, some or all of the links rejected the
// HTLC so we couldn't forward it. So we'll try to look
// up the error that came from the source.
linkErr, ok := linkErrs[packet.outgoingChanID]
if !ok {
// If we can't find the error of the source,
// then we'll return an unknown next peer,
// though this should never happen.
linkErr = NewLinkError(
&lnwire.FailUnknownNextPeer{},
)
log.Warnf("unable to find err source for "+
"outgoing_link=%v, errors=%v",
packet.outgoingChanID,
lnutils.SpewLogClosure(linkErrs))
}
log.Tracef("incoming HTLC(%x) violated "+
"target outgoing link (id=%v) policy: %v",
htlc.PaymentHash[:], packet.outgoingChanID,
linkErr)
return s.failAddPacket(packet, linkErr)
}
// Choose a random link out of the set of links that can forward
// this htlc. The reason for randomization is to evenly
// distribute the htlc load without making assumptions about
// what the best channel is.
destination := destinations[rand.Intn(len(destinations))] // nolint:gosec
// Retrieve the incoming link by its ShortChannelID. Note that
// the incomingChanID is never set to hop.Source here.
s.indexMtx.RLock()
incomingLink, err := s.getLinkByShortID(packet.incomingChanID)
s.indexMtx.RUnlock()
if err != nil {
// If we couldn't find the incoming link, we can't
// evaluate the incoming's exposure to dust, so we just
// fail the HTLC back.
linkErr := NewLinkError(
&lnwire.FailTemporaryChannelFailure{},
)
return s.failAddPacket(packet, linkErr)
}
// Evaluate whether this HTLC would increase our fee exposure
// over the threshold on the incoming link. If it does, fail it
// backwards.
if s.dustExceedsFeeThreshold(
incomingLink, packet.incomingAmount, true,
) {
// The incoming dust exceeds the threshold, so we fail
// the add back.
linkErr := NewLinkError(
&lnwire.FailTemporaryChannelFailure{},
)
return s.failAddPacket(packet, linkErr)
}
// Also evaluate whether this HTLC would increase our fee
// exposure over the threshold on the destination link. If it
// does, fail it back.
if s.dustExceedsFeeThreshold(
destination, packet.amount, false,
) {
// The outgoing dust exceeds the threshold, so we fail
// the add back.
linkErr := NewLinkError(
&lnwire.FailTemporaryChannelFailure{},
)
return s.failAddPacket(packet, linkErr)
}
// Send the packet to the destination channel link which
// manages the channel.
packet.outgoingChanID = destination.ShortChanID()
return destination.handleSwitchPacket(packet)
case *lnwire.UpdateFailHTLC, *lnwire.UpdateFulfillHTLC:
// If the source of this packet has not been set, use the
// circuit map to lookup the origin.
circuit, err := s.closeCircuit(packet)
if err != nil {
return err
}
// closeCircuit returns a nil circuit when a settle packet returns an
// ErrUnknownCircuit error upon the inner call to CloseCircuit.
if circuit == nil {
return nil
}
fail, isFail := htlc.(*lnwire.UpdateFailHTLC)
if isFail && !packet.hasSource {
// HTLC resolutions and messages restored from disk
// don't have the obfuscator set from the original htlc
// add packet - set it here for use in blinded errors.
packet.obfuscator = circuit.ErrorEncrypter
switch {
// No message to encrypt, locally sourced payment.
case circuit.ErrorEncrypter == nil:
// If this is a resolution message, then we'll need to
// encrypt it as it's actually internally sourced.
case packet.isResolution:
var err error
// TODO(roasbeef): don't need to pass actually?
failure := &lnwire.FailPermanentChannelFailure{}
fail.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop(
failure,
)
if err != nil {
err = fmt.Errorf("unable to obfuscate "+
"error: %v", err)
log.Error(err)
}
// Alternatively, if the remote party send us an
// UpdateFailMalformedHTLC, then we'll need to convert
// this into a proper well formatted onion error as
// there's no HMAC currently.
case packet.convertedError:
log.Infof("Converting malformed HTLC error "+
"for circuit for Circuit(%x: "+
"(%s, %d) <-> (%s, %d))", packet.circuit.PaymentHash,
packet.incomingChanID, packet.incomingHTLCID,
packet.outgoingChanID, packet.outgoingHTLCID)
fail.Reason = circuit.ErrorEncrypter.EncryptMalformedError(
fail.Reason,
)
default:
// Otherwise, it's a forwarded error, so we'll perform a
// wrapper encryption as normal.
fail.Reason = circuit.ErrorEncrypter.IntermediateEncrypt(
fail.Reason,
)
}
} else if !isFail && circuit.Outgoing != nil {
// If this is an HTLC settle, and it wasn't from a
// locally initiated HTLC, then we'll log a forwarding
// event so we can flush it to disk later.
//
// TODO(roasbeef): only do this once link actually
// fully settles?
localHTLC := packet.incomingChanID == hop.Source
if !localHTLC {
log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+
"from IncomingChanID(%v) to OutgoingChanID(%v)",
circuit.PaymentHash[:], circuit.OutgoingAmount,
circuit.IncomingAmount-circuit.OutgoingAmount,
circuit.Incoming.ChanID, circuit.Outgoing.ChanID)
s.fwdEventMtx.Lock()
s.pendingFwdingEvents = append(
s.pendingFwdingEvents,
channeldb.ForwardingEvent{
Timestamp: time.Now(),
IncomingChanID: circuit.Incoming.ChanID,
OutgoingChanID: circuit.Outgoing.ChanID,
AmtIn: circuit.IncomingAmount,
AmtOut: circuit.OutgoingAmount,
},
)
s.fwdEventMtx.Unlock()
}
}
// A blank IncomingChanID in a circuit indicates that it is a pending
// user-initiated payment.
if packet.incomingChanID == hop.Source {
s.wg.Add(1)
go s.handleLocalResponse(packet)
return nil
}
// Check to see that the source link is online before removing
// the circuit.
return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
// Channel link forwarded us an update_fail_htlc message.
//
// NOTE: when the channel link receives an update_fail_malformed_htlc
// from upstream, it will convert the message into update_fail_htlc and
// forward it. Thus there's no need to catch `UpdateFailMalformedHTLC`
// here.
case *lnwire.UpdateFailHTLC:
return s.handlePacketFail(packet, htlc)
default:
return errors.New("wrong update type")
return fmt.Errorf("wrong update type: %T", htlc)
}
}
@ -1629,47 +1374,34 @@ func (s *Switch) teardownCircuit(pkt *htlcPacket) error {
case *lnwire.UpdateFailHTLC:
pktType = "FAIL"
default:
err := fmt.Errorf("cannot tear down packet of type: %T", htlc)
log.Errorf(err.Error())
return fmt.Errorf("cannot tear down packet of type: %T", htlc)
}
var paymentHash lntypes.Hash
// Perform a defensive check to make sure we don't try to access a nil
// circuit.
circuit := pkt.circuit
if circuit != nil {
copy(paymentHash[:], circuit.PaymentHash[:])
}
log.Debugf("Tearing down circuit with %s pkt, removing circuit=%v "+
"with keystone=%v", pktType, pkt.inKey(), pkt.outKey())
err := s.circuits.DeleteCircuits(pkt.inKey())
if err != nil {
log.Warnf("Failed to tear down circuit (%s, %d) <-> (%s, %d) "+
"with payment_hash=%v using %s pkt", pkt.incomingChanID,
pkt.incomingHTLCID, pkt.outgoingChanID,
pkt.outgoingHTLCID, pkt.circuit.PaymentHash, pktType)
return err
}
switch {
case pkt.circuit.HasKeystone():
log.Debugf("Tearing down open circuit with %s pkt, removing circuit=%v "+
"with keystone=%v", pktType, pkt.inKey(), pkt.outKey())
err := s.circuits.DeleteCircuits(pkt.inKey())
if err != nil {
log.Warnf("Failed to tear down open circuit (%s, %d) <-> (%s, %d) "+
"with payment_hash-%v using %s pkt",
pkt.incomingChanID, pkt.incomingHTLCID,
pkt.outgoingChanID, pkt.outgoingHTLCID,
pkt.circuit.PaymentHash, pktType)
return err
}
log.Debugf("Closed completed %s circuit for %x: "+
"(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash,
pkt.incomingChanID, pkt.incomingHTLCID,
pkt.outgoingChanID, pkt.outgoingHTLCID)
default:
log.Debugf("Tearing down incomplete circuit with %s for inkey=%v",
pktType, pkt.inKey())
err := s.circuits.DeleteCircuits(pkt.inKey())
if err != nil {
log.Warnf("Failed to tear down pending %s circuit for %x: "+
"(%s, %d)", pktType, pkt.circuit.PaymentHash,
pkt.incomingChanID, pkt.incomingHTLCID)
return err
}
log.Debugf("Removed pending onion circuit for %x: "+
"(%s, %d)", pkt.circuit.PaymentHash,
pkt.incomingChanID, pkt.incomingHTLCID)
}
log.Debugf("Closed %s circuit for %v: (%s, %d) <-> (%s, %d)", pktType,
paymentHash, pkt.incomingChanID, pkt.incomingHTLCID,
pkt.outgoingChanID, pkt.outgoingHTLCID)
return nil
}
@ -3052,3 +2784,339 @@ func (s *Switch) AddAliasForLink(chanID lnwire.ChannelID,
return nil
}
// handlePacketAdd handles forwarding an Add packet.
func (s *Switch) handlePacketAdd(packet *htlcPacket,
htlc *lnwire.UpdateAddHTLC) error {
// Check if the node is set to reject all onward HTLCs and also make
// sure that HTLC is not from the source node.
if s.cfg.RejectHTLC {
failure := NewDetailedLinkError(
&lnwire.FailChannelDisabled{},
OutgoingFailureForwardsDisabled,
)
return s.failAddPacket(packet, failure)
}
// Before we attempt to find a non-strict forwarding path for this
// htlc, check whether the htlc is being routed over the same incoming
// and outgoing channel. If our node does not allow forwards of this
// nature, we fail the htlc early. This check is in place to disallow
// inefficiently routed htlcs from locking up our balance. With
// channels where the option-scid-alias feature was negotiated, we also
// have to be sure that the IDs aren't the same since one or both could
// be an alias.
linkErr := s.checkCircularForward(
packet.incomingChanID, packet.outgoingChanID,
s.cfg.AllowCircularRoute, htlc.PaymentHash,
)
if linkErr != nil {
return s.failAddPacket(packet, linkErr)
}
s.indexMtx.RLock()
targetLink, err := s.getLinkByMapping(packet)
if err != nil {
s.indexMtx.RUnlock()
log.Debugf("unable to find link with "+
"destination %v", packet.outgoingChanID)
// If packet was forwarded from another channel link than we
// should notify this link that some error occurred.
linkError := NewLinkError(
&lnwire.FailUnknownNextPeer{},
)
return s.failAddPacket(packet, linkError)
}
targetPeerKey := targetLink.PeerPubKey()
interfaceLinks, _ := s.getLinks(targetPeerKey)
s.indexMtx.RUnlock()
// We'll keep track of any HTLC failures during the link selection
// process. This way we can return the error for precise link that the
// sender selected, while optimistically trying all links to utilize
// our available bandwidth.
linkErrs := make(map[lnwire.ShortChannelID]*LinkError)
// Find all destination channel links with appropriate bandwidth.
var destinations []ChannelLink
for _, link := range interfaceLinks {
var failure *LinkError
// We'll skip any links that aren't yet eligible for
// forwarding.
if !link.EligibleToForward() {
failure = NewDetailedLinkError(
&lnwire.FailUnknownNextPeer{},
OutgoingFailureLinkNotEligible,
)
} else {
// We'll ensure that the HTLC satisfies the current
// forwarding conditions of this target link.
currentHeight := atomic.LoadUint32(&s.bestHeight)
failure = link.CheckHtlcForward(
htlc.PaymentHash, packet.incomingAmount,
packet.amount, packet.incomingTimeout,
packet.outgoingTimeout,
packet.inboundFee,
currentHeight,
packet.originalOutgoingChanID,
)
}
// If this link can forward the htlc, add it to the set of
// destinations.
if failure == nil {
destinations = append(destinations, link)
continue
}
linkErrs[link.ShortChanID()] = failure
}
// If we had a forwarding failure due to the HTLC not satisfying the
// current policy, then we'll send back an error, but ensure we send
// back the error sourced at the *target* link.
if len(destinations) == 0 {
// At this point, some or all of the links rejected the HTLC so
// we couldn't forward it. So we'll try to look up the error
// that came from the source.
linkErr, ok := linkErrs[packet.outgoingChanID]
if !ok {
// If we can't find the error of the source, then we'll
// return an unknown next peer, though this should
// never happen.
linkErr = NewLinkError(
&lnwire.FailUnknownNextPeer{},
)
log.Warnf("unable to find err source for "+
"outgoing_link=%v, errors=%v",
packet.outgoingChanID,
lnutils.SpewLogClosure(linkErrs))
}
log.Tracef("incoming HTLC(%x) violated "+
"target outgoing link (id=%v) policy: %v",
htlc.PaymentHash[:], packet.outgoingChanID,
linkErr)
return s.failAddPacket(packet, linkErr)
}
// Choose a random link out of the set of links that can forward this
// htlc. The reason for randomization is to evenly distribute the htlc
// load without making assumptions about what the best channel is.
//nolint:gosec
destination := destinations[rand.Intn(len(destinations))]
// Retrieve the incoming link by its ShortChannelID. Note that the
// incomingChanID is never set to hop.Source here.
s.indexMtx.RLock()
incomingLink, err := s.getLinkByShortID(packet.incomingChanID)
s.indexMtx.RUnlock()
if err != nil {
// If we couldn't find the incoming link, we can't evaluate the
// incoming's exposure to dust, so we just fail the HTLC back.
linkErr := NewLinkError(
&lnwire.FailTemporaryChannelFailure{},
)
return s.failAddPacket(packet, linkErr)
}
// Evaluate whether this HTLC would increase our fee exposure over the
// threshold on the incoming link. If it does, fail it backwards.
if s.dustExceedsFeeThreshold(
incomingLink, packet.incomingAmount, true,
) {
// The incoming dust exceeds the threshold, so we fail the add
// back.
linkErr := NewLinkError(
&lnwire.FailTemporaryChannelFailure{},
)
return s.failAddPacket(packet, linkErr)
}
// Also evaluate whether this HTLC would increase our fee exposure over
// the threshold on the destination link. If it does, fail it back.
if s.dustExceedsFeeThreshold(
destination, packet.amount, false,
) {
// The outgoing dust exceeds the threshold, so we fail the add
// back.
linkErr := NewLinkError(
&lnwire.FailTemporaryChannelFailure{},
)
return s.failAddPacket(packet, linkErr)
}
// Send the packet to the destination channel link which manages the
// channel.
packet.outgoingChanID = destination.ShortChanID()
return destination.handleSwitchPacket(packet)
}
// handlePacketSettle handles forwarding a settle packet.
func (s *Switch) handlePacketSettle(packet *htlcPacket) error {
// If the source of this packet has not been set, use the circuit map
// to lookup the origin.
circuit, err := s.closeCircuit(packet)
if err != nil {
return err
}
// closeCircuit returns a nil circuit when a settle packet returns an
// ErrUnknownCircuit error upon the inner call to CloseCircuit.
//
// NOTE: We can only get a nil circuit when it has already been deleted
// and when `UpdateFulfillHTLC` is received, after which `RevokeAndAck`
// is received, which invokes `processRemoteSettleFails` in its link.
if circuit == nil {
log.Debugf("Found nil circuit: packet=%v", spew.Sdump(packet))
return nil
}
localHTLC := packet.incomingChanID == hop.Source
// If this is a locally initiated HTLC, we need to handle the packet by
// storing the network result.
//
// A blank IncomingChanID in a circuit indicates that it is a pending
// user-initiated payment.
//
// NOTE: `closeCircuit` modifies the state of `packet`.
if localHTLC {
// TODO(yy): remove the goroutine and send back the error here.
s.wg.Add(1)
go s.handleLocalResponse(packet)
// If this is a locally initiated HTLC, there's no need to
// forward it so we exit.
return nil
}
// If this is an HTLC settle, and it wasn't from a locally initiated
// HTLC, then we'll log a forwarding event so we can flush it to disk
// later.
if circuit.Outgoing != nil {
log.Infof("Forwarded HTLC(%x) of %v (fee: %v) "+
"from IncomingChanID(%v) to OutgoingChanID(%v)",
circuit.PaymentHash[:], circuit.OutgoingAmount,
circuit.IncomingAmount-circuit.OutgoingAmount,
circuit.Incoming.ChanID, circuit.Outgoing.ChanID)
s.fwdEventMtx.Lock()
s.pendingFwdingEvents = append(
s.pendingFwdingEvents,
channeldb.ForwardingEvent{
Timestamp: time.Now(),
IncomingChanID: circuit.Incoming.ChanID,
OutgoingChanID: circuit.Outgoing.ChanID,
AmtIn: circuit.IncomingAmount,
AmtOut: circuit.OutgoingAmount,
},
)
s.fwdEventMtx.Unlock()
}
// Deliver this packet.
return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
}
// handlePacketFail handles forwarding a fail packet.
func (s *Switch) handlePacketFail(packet *htlcPacket,
htlc *lnwire.UpdateFailHTLC) error {
// If the source of this packet has not been set, use the circuit map
// to lookup the origin.
circuit, err := s.closeCircuit(packet)
if err != nil {
return err
}
// If this is a locally initiated HTLC, we need to handle the packet by
// storing the network result.
//
// A blank IncomingChanID in a circuit indicates that it is a pending
// user-initiated payment.
//
// NOTE: `closeCircuit` modifies the state of `packet`.
if packet.incomingChanID == hop.Source {
// TODO(yy): remove the goroutine and send back the error here.
s.wg.Add(1)
go s.handleLocalResponse(packet)
// If this is a locally initiated HTLC, there's no need to
// forward it so we exit.
return nil
}
// Exit early if `hasSource` is true. This flag is only set via the
// mailbox's `FailAdd`, which has two callsites:
//   - the packet has timed out after `MailboxDeliveryTimeout`, which
//     defaults to 1 min.
//   - the HTLC fails the validation in `channel.AddHTLC`.
// In either case, the `Reason` field is already populated, so there's no
// need to proceed and extract the failure reason below.
if packet.hasSource {
// Deliver this packet.
return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
}
// HTLC resolutions and messages restored from disk don't have the
// obfuscator set from the original htlc add packet - set it here for
// use in blinded errors.
packet.obfuscator = circuit.ErrorEncrypter
switch {
// No message to encrypt, locally sourced payment.
case circuit.ErrorEncrypter == nil:
// TODO(yy) further check this case as we shouldn't end up here
// as `isLocal` is already false.
// If this is a resolution message, then we'll need to encrypt it as
// it's actually internally sourced.
case packet.isResolution:
var err error
// TODO(roasbeef): don't need to pass actually?
failure := &lnwire.FailPermanentChannelFailure{}
htlc.Reason, err = circuit.ErrorEncrypter.EncryptFirstHop(
failure,
)
if err != nil {
err = fmt.Errorf("unable to obfuscate error: %w", err)
log.Error(err)
}
// Alternatively, if the remote party sends us an
// UpdateFailMalformedHTLC, then we'll need to convert this into a
// proper well formatted onion error as there's no HMAC currently.
case packet.convertedError:
log.Infof("Converting malformed HTLC error for circuit for "+
"Circuit(%x: (%s, %d) <-> (%s, %d))",
packet.circuit.PaymentHash,
packet.incomingChanID, packet.incomingHTLCID,
packet.outgoingChanID, packet.outgoingHTLCID)
htlc.Reason = circuit.ErrorEncrypter.EncryptMalformedError(
htlc.Reason,
)
default:
// Otherwise, it's a forwarded error, so we'll perform a
// wrapper encryption as normal.
htlc.Reason = circuit.ErrorEncrypter.IntermediateEncrypt(
htlc.Reason,
)
}
// Deliver this packet.
return s.mailOrchestrator.Deliver(packet.incomingChanID, packet)
}

View File

@ -662,4 +662,16 @@ var allTestCases = []*lntest.TestCase{
Name: "coop close with external delivery",
TestFunc: testCoopCloseWithExternalDelivery,
},
{
Name: "payment failed htlc local swept",
TestFunc: testPaymentFailedHTLCLocalSwept,
},
{
Name: "payment succeeded htlc remote swept",
TestFunc: testPaymentSucceededHTLCRemoteSwept,
},
{
Name: "send to route failed htlc timeout",
TestFunc: testSendToRouteFailHTLCTimeout,
},
}

View File

@ -10,19 +10,331 @@ import (
"github.com/btcsuite/btcd/btcutil"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lncfg"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"
"github.com/lightningnetwork/lnd/lnrpc/routerrpc"
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lntest/node"
"github.com/lightningnetwork/lnd/lntest/rpc"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/require"
)
// testSendDirectPayment creates a topology Alice->Bob and then tests that Alice
// can send a direct payment to Bob. This test modifies the fee estimator to
// return floor fee rate(1 sat/vb).
// testPaymentSucceededHTLCRemoteSwept checks that when an outgoing HTLC is
// timed out and is swept by the remote via the direct preimage spend path, the
// payment will be marked as succeeded. This test creates a topology from Alice
// -> Bob, and lets Alice send payments to Bob. Bob then goes offline, such
// that Alice's outgoing HTLC will time out. Once the force close transaction
// is broadcast by Alice, she then goes offline and Bob comes back online to
// take her outgoing HTLC. Alice should then mark this payment as succeeded
// after she comes back online again.
func testPaymentSucceededHTLCRemoteSwept(ht *lntest.HarnessTest) {
// Set the feerate to be 10 sat/vb.
ht.SetFeeEstimate(2500)
// Open a channel with 100k satoshis between Alice and Bob with Alice
// being the sole funder of the channel.
chanAmt := btcutil.Amount(100_000)
openChannelParams := lntest.OpenChannelParams{
Amt: chanAmt,
}
// Create a two hop network: Alice -> Bob.
chanPoints, nodes := createSimpleNetwork(ht, nil, 2, openChannelParams)
chanPoint := chanPoints[0]
alice, bob := nodes[0], nodes[1]
// We now create two payments, one above dust and the other below dust,
// and we should see different behavior in terms of when the payment
// will be marked as failed due to the HTLC timeout.
//
// First, create random preimages.
preimage := ht.RandomPreimage()
dustPreimage := ht.RandomPreimage()
// Get the preimage hashes.
payHash := preimage.Hash()
dustPayHash := dustPreimage.Hash()
// Create a hold invoice for Bob which expects a payment of 10k
// satoshis from Alice.
const paymentAmt = 10_000
req := &invoicesrpc.AddHoldInvoiceRequest{
Value: paymentAmt,
Hash: payHash[:],
// Use a small CLTV value so we can mine fewer blocks.
CltvExpiry: finalCltvDelta,
}
invoice := bob.RPC.AddHoldInvoice(req)
// Create another hold invoice for Bob which expects a payment of 1k
// satoshis from Alice.
const dustAmt = 1000
req = &invoicesrpc.AddHoldInvoiceRequest{
Value: dustAmt,
Hash: dustPayHash[:],
// Use a small CLTV value so we can mine fewer blocks.
CltvExpiry: finalCltvDelta,
}
dustInvoice := bob.RPC.AddHoldInvoice(req)
// Alice now sends both payments to Bob.
payReq := &routerrpc.SendPaymentRequest{
PaymentRequest: invoice.PaymentRequest,
TimeoutSeconds: 3600,
}
dustPayReq := &routerrpc.SendPaymentRequest{
PaymentRequest: dustInvoice.PaymentRequest,
TimeoutSeconds: 3600,
}
// We expect the payments to stay in-flight from both streams.
ht.SendPaymentAssertInflight(alice, payReq)
ht.SendPaymentAssertInflight(alice, dustPayReq)
// We also check the payments are marked as IN_FLIGHT in Alice's
// database.
ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_IN_FLIGHT)
ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_IN_FLIGHT)
// Bob should have two incoming HTLCs.
ht.AssertIncomingHTLCActive(bob, chanPoint, payHash[:])
ht.AssertIncomingHTLCActive(bob, chanPoint, dustPayHash[:])
// Alice should have two outgoing HTLCs.
ht.AssertOutgoingHTLCActive(alice, chanPoint, payHash[:])
ht.AssertOutgoingHTLCActive(alice, chanPoint, dustPayHash[:])
// Let Bob go offline.
restartBob := ht.SuspendNode(bob)
// Alice force closes the channel, which puts her commitment tx into
// the mempool.
ht.CloseChannelAssertPending(alice, chanPoint, true)
// We now let Alice go offline to avoid her sweeping her outgoing htlc.
restartAlice := ht.SuspendNode(alice)
// Mine one block to confirm Alice's force closing tx.
ht.MineBlocksAndAssertNumTxes(1, 1)
// Restart Bob to settle the invoice and sweep the htlc output.
require.NoError(ht, restartBob())
// Bob now settles the invoices. Since his link with Alice is broken,
// Alice won't know the preimages.
bob.RPC.SettleInvoice(preimage[:])
bob.RPC.SettleInvoice(dustPreimage[:])
// Once Bob comes back up, he should find the force closing transaction
// from Alice and try to sweep the non-dust outgoing htlc via the
// direct preimage spend.
ht.AssertNumPendingSweeps(bob, 1)
// Mine a block to trigger the sweep.
//
// TODO(yy): remove it once `blockbeat` is implemented.
ht.MineEmptyBlocks(1)
// Mine Bob's sweeping tx.
ht.MineBlocksAndAssertNumTxes(1, 1)
// Let Alice come back up. Since the channel is now closed, we expect
// different behaviors based on whether the HTLC is dust.
// - For dust payment, it should be failed now as the HTLC won't go
// onchain.
// - For non-dust payment, it should be marked as succeeded since her
// outgoing htlc is swept by Bob.
require.NoError(ht, restartAlice())
// Since Alice is restarted, we need to track the payments again.
payStream := alice.RPC.TrackPaymentV2(payHash[:])
dustPayStream := alice.RPC.TrackPaymentV2(dustPayHash[:])
// Check that the dust payment is failed in both the stream and DB.
ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_FAILED)
ht.AssertPaymentStatusFromStream(dustPayStream, lnrpc.Payment_FAILED)
// We expect the non-dust payment to be marked as succeeded in Alice's
// database and also from her stream.
ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_SUCCEEDED)
ht.AssertPaymentStatusFromStream(payStream, lnrpc.Payment_SUCCEEDED)
}
// testPaymentFailedHTLCLocalSwept checks that when an outgoing HTLC is timed
// out and claimed onchain via the timeout path, the payment will be marked as
// failed. This test creates a topology from Alice -> Bob, and lets Alice send
// payments to Bob. Bob then goes offline, such that Alice's outgoing HTLC will
// time out. Alice will also be restarted to make sure resumed payments are
// also marked as failed.
func testPaymentFailedHTLCLocalSwept(ht *lntest.HarnessTest) {
success := ht.Run("fail payment", func(t *testing.T) {
st := ht.Subtest(t)
runTestPaymentHTLCTimeout(st, false)
})
if !success {
return
}
ht.Run("fail resumed payment", func(t *testing.T) {
st := ht.Subtest(t)
runTestPaymentHTLCTimeout(st, true)
})
}
// runTestPaymentHTLCTimeout is the helper function that actually runs the
// testPaymentFailedHTLCLocalSwept.
func runTestPaymentHTLCTimeout(ht *lntest.HarnessTest, restartAlice bool) {
// Set the feerate to be 10 sat/vb.
ht.SetFeeEstimate(2500)
// Open a channel with 100k satoshis between Alice and Bob with Alice
// being the sole funder of the channel.
chanAmt := btcutil.Amount(100_000)
openChannelParams := lntest.OpenChannelParams{
Amt: chanAmt,
}
// Create a two hop network: Alice -> Bob.
chanPoints, nodes := createSimpleNetwork(ht, nil, 2, openChannelParams)
chanPoint := chanPoints[0]
alice, bob := nodes[0], nodes[1]
// We now create two payments, one above dust and the other below dust,
// and we should see different behavior in terms of when the payment
// will be marked as failed due to the HTLC timeout.
//
// First, create random preimages.
preimage := ht.RandomPreimage()
dustPreimage := ht.RandomPreimage()
// Get the preimage hashes.
payHash := preimage.Hash()
dustPayHash := dustPreimage.Hash()
// Create a hold invoice for Bob which expects a payment of 20k
// satoshis from Alice.
const paymentAmt = 20_000
req := &invoicesrpc.AddHoldInvoiceRequest{
Value: paymentAmt,
Hash: payHash[:],
// Use a small CLTV value so we can mine fewer blocks.
CltvExpiry: finalCltvDelta,
}
invoice := bob.RPC.AddHoldInvoice(req)
// Create another hold invoice for Bob which expects a payment of 1k
// satoshis from Alice.
const dustAmt = 1000
req = &invoicesrpc.AddHoldInvoiceRequest{
Value: dustAmt,
Hash: dustPayHash[:],
// Use a small CLTV value so we can mine fewer blocks.
CltvExpiry: finalCltvDelta,
}
dustInvoice := bob.RPC.AddHoldInvoice(req)
// Alice now sends both payments to Bob.
payReq := &routerrpc.SendPaymentRequest{
PaymentRequest: invoice.PaymentRequest,
TimeoutSeconds: 3600,
}
dustPayReq := &routerrpc.SendPaymentRequest{
PaymentRequest: dustInvoice.PaymentRequest,
TimeoutSeconds: 3600,
}
// We expect the payments to stay in-flight from both streams.
ht.SendPaymentAssertInflight(alice, payReq)
ht.SendPaymentAssertInflight(alice, dustPayReq)
// We also check the payments are marked as IN_FLIGHT in Alice's
// database.
ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_IN_FLIGHT)
ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_IN_FLIGHT)
// Bob should have two incoming HTLCs.
ht.AssertIncomingHTLCActive(bob, chanPoint, payHash[:])
ht.AssertIncomingHTLCActive(bob, chanPoint, dustPayHash[:])
// Alice should have two outgoing HTLCs.
ht.AssertOutgoingHTLCActive(alice, chanPoint, payHash[:])
ht.AssertOutgoingHTLCActive(alice, chanPoint, dustPayHash[:])
// Let Bob go offline.
ht.Shutdown(bob)
// We'll now mine enough blocks to trigger Alice to broadcast her
// commitment transaction due to the fact that the HTLC is about to
// time out. With the default outgoing broadcast delta of zero, this
// will be the same height as the htlc expiry height.
numBlocks := padCLTV(
uint32(req.CltvExpiry - lncfg.DefaultOutgoingBroadcastDelta),
)
ht.MineBlocks(int(numBlocks))
// Restart Alice if requested.
if restartAlice {
// Restart Alice to test the resumed payment is canceled.
ht.RestartNode(alice)
}
// We now subscribe to the payment status.
payStream := alice.RPC.TrackPaymentV2(payHash[:])
dustPayStream := alice.RPC.TrackPaymentV2(dustPayHash[:])
// Mine a block to confirm Alice's closing transaction.
ht.MineBlocksAndAssertNumTxes(1, 1)
// Now that the channel is closed, we expect different behaviors based
// on whether the HTLC is dust. For a dust payment, it should be failed
// now as the HTLC won't go onchain. For a non-dust payment, it should
// still be inflight; it won't be marked as failed unless the outgoing
// HTLC is resolved onchain.
//
// NOTE: it's possible for Bob to race against Alice using the
// preimage path. If Bob successfully claims the HTLC, Alice should
// mark the non-dust payment as succeeded.
//
// Check that the dust payment is failed in both the stream and DB.
ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_FAILED)
ht.AssertPaymentStatusFromStream(dustPayStream, lnrpc.Payment_FAILED)
// Check that the non-dust payment is still in-flight.
//
// NOTE: we don't check the payment status from the stream here as
// there's no new status being sent.
ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_IN_FLIGHT)
// We now have two possible cases for the non-dust payment:
// - Bob stays offline, and Alice will sweep her outgoing HTLC, which
// makes the payment failed.
// - Bob comes back online, and claims the HTLC on Alice's commitment
// via direct preimage spend, hence racing against Alice onchain. If
// he succeeds, Alice should mark the payment as succeeded.
//
// TODO(yy): test the second case once we have the RPC to clean
// mempool.
// Since Alice's force close transaction has been confirmed, she should
// sweep her outgoing HTLC in the next block.
ht.MineBlocksAndAssertNumTxes(1, 1)
// Cleanup the channel.
ht.CleanupForceClose(alice)
// We expect the non-dust payment to be marked as failed in Alice's
// database and also from her stream.
ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_FAILED)
ht.AssertPaymentStatusFromStream(payStream, lnrpc.Payment_FAILED)
}
// testSendDirectPayment creates a topology Alice->Bob and then tests that
// Alice can send a direct payment to Bob. This test modifies the fee estimator
// to return the floor fee rate (1 sat/vb).
func testSendDirectPayment(ht *lntest.HarnessTest) {
// Grab Alice and Bob's nodes for convenience.
alice, bob := ht.Alice, ht.Bob
@ -900,3 +1212,219 @@ func sendPaymentInterceptAndCancel(ht *lntest.HarnessTest,
// Cancel the context, which will disconnect the above interceptor.
cancelInterceptor()
}
// testSendToRouteFailHTLCTimeout is similar to
// testPaymentFailedHTLCLocalSwept. The only difference is that `SendPayment`
// is replaced with `SendToRouteV2`. It checks that when an outgoing HTLC is
// timed out and claimed onchain via the timeout path, the payment will be
// marked as failed. This test creates a topology from Alice -> Bob, and lets
// Alice send payments to Bob. Bob then goes offline, such that Alice's
// outgoing HTLC will time out. Alice will also be restarted to make sure
// resumed payments are also marked as failed.
func testSendToRouteFailHTLCTimeout(ht *lntest.HarnessTest) {
success := ht.Run("fail payment", func(t *testing.T) {
st := ht.Subtest(t)
runSendToRouteFailHTLCTimeout(st, false)
})
if !success {
return
}
ht.Run("fail resumed payment", func(t *testing.T) {
st := ht.Subtest(t)
runSendToRouteFailHTLCTimeout(st, true)
})
}
// runSendToRouteFailHTLCTimeout is the helper function that actually runs the
// testSendToRouteFailHTLCTimeout.
func runSendToRouteFailHTLCTimeout(ht *lntest.HarnessTest, restartAlice bool) {
// Set the feerate to be 10 sat/vb.
ht.SetFeeEstimate(2500)
// Open a channel with 100k satoshis between Alice and Bob with Alice
// being the sole funder of the channel.
chanAmt := btcutil.Amount(100_000)
openChannelParams := lntest.OpenChannelParams{
Amt: chanAmt,
}
// Create a two hop network: Alice -> Bob.
chanPoints, nodes := createSimpleNetwork(ht, nil, 2, openChannelParams)
chanPoint := chanPoints[0]
alice, bob := nodes[0], nodes[1]
// We now create two payments, one above dust and the other below dust,
// and we should see different behavior in terms of when the payment
// will be marked as failed due to the HTLC timeout.
//
// First, create random preimages.
preimage := ht.RandomPreimage()
dustPreimage := ht.RandomPreimage()
// Get the preimage hashes.
payHash := preimage.Hash()
dustPayHash := dustPreimage.Hash()
// Create a hold invoice for Bob which expects a payment of 20k
// satoshis from Alice.
const paymentAmt = 20_000
req := &invoicesrpc.AddHoldInvoiceRequest{
Value: paymentAmt,
Hash: payHash[:],
// Use a small CLTV value so we can mine fewer blocks.
CltvExpiry: finalCltvDelta,
}
invoice := bob.RPC.AddHoldInvoice(req)
// Create another hold invoice for Bob which expects a payment of 1k
// satoshis from Alice.
const dustAmt = 1000
req = &invoicesrpc.AddHoldInvoiceRequest{
Value: dustAmt,
Hash: dustPayHash[:],
// Use a small CLTV value so we can mine fewer blocks.
CltvExpiry: finalCltvDelta,
}
dustInvoice := bob.RPC.AddHoldInvoice(req)
// Construct a route to send the non-dust payment.
go func() {
// Query the route to send the payment.
routesReq := &lnrpc.QueryRoutesRequest{
PubKey: bob.PubKeyStr,
Amt: paymentAmt,
FinalCltvDelta: finalCltvDelta,
}
routes := alice.RPC.QueryRoutes(routesReq)
require.Len(ht, routes.Routes, 1)
route := routes.Routes[0]
require.Len(ht, route.Hops, 1)
// Modify the hop to include MPP info.
route.Hops[0].MppRecord = &lnrpc.MPPRecord{
PaymentAddr: invoice.PaymentAddr,
TotalAmtMsat: int64(
lnwire.NewMSatFromSatoshis(paymentAmt),
),
}
// Send the payment with the modified value.
req := &routerrpc.SendToRouteRequest{
PaymentHash: payHash[:],
Route: route,
}
// Send the payment and expect no error.
attempt := alice.RPC.SendToRouteV2(req)
require.Equal(ht, lnrpc.HTLCAttempt_FAILED, attempt.Status)
}()
// Check that the payment is in-flight.
ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_IN_FLIGHT)
// Construct a route to send the dust payment.
go func() {
// Query the route to send the payment.
routesReq := &lnrpc.QueryRoutesRequest{
PubKey: bob.PubKeyStr,
Amt: dustAmt,
FinalCltvDelta: finalCltvDelta,
}
routes := alice.RPC.QueryRoutes(routesReq)
require.Len(ht, routes.Routes, 1)
route := routes.Routes[0]
require.Len(ht, route.Hops, 1)
// Modify the hop to include MPP info.
route.Hops[0].MppRecord = &lnrpc.MPPRecord{
PaymentAddr: dustInvoice.PaymentAddr,
TotalAmtMsat: int64(
lnwire.NewMSatFromSatoshis(dustAmt),
),
}
// Send the payment with the modified value.
req := &routerrpc.SendToRouteRequest{
PaymentHash: dustPayHash[:],
Route: route,
}
// Send the payment and expect no error.
attempt := alice.RPC.SendToRouteV2(req)
require.Equal(ht, lnrpc.HTLCAttempt_FAILED, attempt.Status)
}()
// Check that the dust payment is in-flight.
ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_IN_FLIGHT)
// Bob should have two incoming HTLCs.
ht.AssertIncomingHTLCActive(bob, chanPoint, payHash[:])
ht.AssertIncomingHTLCActive(bob, chanPoint, dustPayHash[:])
// Alice should have two outgoing HTLCs.
ht.AssertOutgoingHTLCActive(alice, chanPoint, payHash[:])
ht.AssertOutgoingHTLCActive(alice, chanPoint, dustPayHash[:])
// Let Bob go offline.
ht.Shutdown(bob)
// We'll now mine enough blocks to trigger Alice to broadcast her
// commitment transaction due to the fact that the HTLC is about to
// time out. With the default outgoing broadcast delta of zero, this
// will be the same height as the htlc expiry height.
numBlocks := padCLTV(
uint32(req.CltvExpiry - lncfg.DefaultOutgoingBroadcastDelta),
)
ht.MineEmptyBlocks(int(numBlocks - 1))
// Restart Alice if requested.
if restartAlice {
// Restart Alice to test the resumed payment is canceled.
ht.RestartNode(alice)
}
// We now subscribe to the payment status.
payStream := alice.RPC.TrackPaymentV2(payHash[:])
dustPayStream := alice.RPC.TrackPaymentV2(dustPayHash[:])
// Mine a block to confirm Alice's closing transaction.
ht.MineBlocksAndAssertNumTxes(1, 1)
// Now that the channel is closed, we expect different behaviors based
// on whether the HTLC is dust. For a dust payment, it should be failed
// now as the HTLC won't go onchain. For a non-dust payment, it should
// still be inflight; it won't be marked as failed unless the outgoing
// HTLC is resolved onchain.
//
// Check that the dust payment is failed in both the stream and DB.
ht.AssertPaymentStatus(alice, dustPreimage, lnrpc.Payment_FAILED)
ht.AssertPaymentStatusFromStream(dustPayStream, lnrpc.Payment_FAILED)
// Check that the non-dust payment is still in-flight.
//
// NOTE: we don't check the payment status from the stream here as
// there's no new status being sent.
ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_IN_FLIGHT)
// We now have two possible cases for the non-dust payment:
// - Bob stays offline, and Alice will sweep her outgoing HTLC, which
// makes the payment failed.
// - Bob comes back online, and claims the HTLC on Alice's commitment
// via direct preimage spend, hence racing against Alice onchain. If
// he succeeds, Alice should mark the payment as succeeded.
//
// TODO(yy): test the second case once we have the RPC to clean
// mempool.
// Since Alice's force close transaction has been confirmed, she should
// sweep her outgoing HTLC in the next block.
ht.MineBlocksAndAssertNumTxes(2, 1)
// We expect the non-dust payment to be marked as failed in Alice's
// database and also from her stream.
ht.AssertPaymentStatus(alice, preimage, lnrpc.Payment_FAILED)
ht.AssertPaymentStatusFromStream(payStream, lnrpc.Payment_FAILED)
}

View File

@ -399,6 +399,8 @@ func (h *HarnessTest) resetStandbyNodes(t *testing.T) {
// config for the coming test. This will also inherit the
// test's running context.
h.RestartNodeWithExtraArgs(hn, hn.Cfg.OriginalExtraArgs)
hn.AddToLogf("Finished test case %v", h.manager.currentTestCase)
}
}
@ -1624,31 +1626,14 @@ func (h *HarnessTest) OpenChannelPsbt(srcNode, destNode *node.HarnessNode,
return respStream, upd.PsbtFund.Psbt
}
// CleanupForceClose mines a force close commitment found in the mempool and
// the following sweep transaction from the force closing node.
// CleanupForceClose mines blocks to clean up the force close process. This is
// used by tests that don't assert the expected behavior during the force
// close process, e.g., the number of sweeps. Instead, it provides a shortcut
// to move the test forward with a clean mempool.
func (h *HarnessTest) CleanupForceClose(hn *node.HarnessNode) {
// Wait for the channel to be marked pending force close.
h.AssertNumPendingForceClose(hn, 1)
// Mine enough blocks for the node to sweep its funds from the force
// closed channel. The commit sweep resolver is able to offer the input
// to the sweeper at defaulCSV-1, and broadcast the sweep tx once one
// more block is mined.
//
// NOTE: we might empty blocks here as we don't know the exact number
// of blocks to mine. This may end up mining more blocks than needed.
h.MineEmptyBlocks(node.DefaultCSV - 1)
// Assert there is one pending sweep.
h.AssertNumPendingSweeps(hn, 1)
// Mine a block to trigger the sweep.
h.MineEmptyBlocks(1)
// The node should now sweep the funds, clean up by mining the sweeping
// tx.
h.MineBlocksAndAssertNumTxes(1, 1)
// Mine blocks to get any second level HTLC resolved. If there are no
// HTLCs, this will behave like h.AssertNumPendingCloseChannels.
h.mineTillForceCloseResolved(hn)
@ -1771,6 +1756,14 @@ func (h *HarnessTest) SendPaymentAssertSettled(hn *node.HarnessNode,
return h.SendPaymentAndAssertStatus(hn, req, lnrpc.Payment_SUCCEEDED)
}
// SendPaymentAssertInflight sends a payment from the passed node and asserts
// the payment is inflight.
func (h *HarnessTest) SendPaymentAssertInflight(hn *node.HarnessNode,
req *routerrpc.SendPaymentRequest) *lnrpc.Payment {
return h.SendPaymentAndAssertStatus(hn, req, lnrpc.Payment_IN_FLIGHT)
}
// OpenChannelRequest is used to open a channel using the method
// OpenMultiChannelsAsync.
type OpenChannelRequest struct {

View File

@ -167,16 +167,9 @@ func (h *HarnessTest) cleanMempool() {
}
// mineTillForceCloseResolved asserts that the number of pending close channels
// are zero. Each time it checks, a new block is mined using MineBlocksSlow to
// give the node some time to catch up the chain.
//
// NOTE: this method is a workaround to make sure we have a clean mempool at
// the end of a channel force closure. We cannot directly mine blocks and
// assert channels being fully closed because the subsystems in lnd don't share
// the same block height. This is especially the case when blocks are produced
// too fast.
// TODO(yy): remove this workaround when syncing blocks are unified in all the
// subsystems.
// are zero. Each time it checks, an empty block is mined, followed by a
// mempool check to see if there are any sweeping txns. If found, these txns
// are then mined to clean up the mempool.
func (h *HarnessTest) mineTillForceCloseResolved(hn *node.HarnessNode) {
_, startHeight := h.GetBestBlock()
@ -184,7 +177,15 @@ func (h *HarnessTest) mineTillForceCloseResolved(hn *node.HarnessNode) {
resp := hn.RPC.PendingChannels()
total := len(resp.PendingForceClosingChannels)
if total != 0 {
h.MineBlocks(1)
// Mine an empty block first.
h.MineEmptyBlocks(1)
// If there are new sweeping txns, mine a block to
// confirm them.
mem := h.GetRawMempool()
if len(mem) != 0 {
h.MineBlocksAndAssertNumTxes(1, len(mem))
}
return fmt.Errorf("expected num of pending force " +
"close channel to be zero")

View File

@ -60,6 +60,12 @@ func (m *mockPaymentAttemptDispatcherOld) SendHTLC(
return nil
}
func (m *mockPaymentAttemptDispatcherOld) HasAttemptResult(
attemptID uint64) (bool, error) {
return false, nil
}
func (m *mockPaymentAttemptDispatcherOld) GetAttemptResult(paymentID uint64,
_ lntypes.Hash, _ htlcswitch.ErrorDecrypter) (
<-chan *htlcswitch.PaymentResult, error) {
@ -209,6 +215,10 @@ func (m *mockPayerOld) SendHTLC(_ lnwire.ShortChannelID,
}
func (m *mockPayerOld) HasAttemptResult(attemptID uint64) (bool, error) {
return false, nil
}
func (m *mockPayerOld) GetAttemptResult(paymentID uint64, _ lntypes.Hash,
_ htlcswitch.ErrorDecrypter) (<-chan *htlcswitch.PaymentResult, error) {
@ -585,6 +595,13 @@ func (m *mockPaymentAttemptDispatcher) SendHTLC(firstHop lnwire.ShortChannelID,
return args.Error(0)
}
func (m *mockPaymentAttemptDispatcher) HasAttemptResult(
attemptID uint64) (bool, error) {
args := m.Called(attemptID)
return args.Bool(0), args.Error(1)
}
func (m *mockPaymentAttemptDispatcher) GetAttemptResult(attemptID uint64,
paymentHash lntypes.Hash, deobfuscator htlcswitch.ErrorDecrypter) (
<-chan *htlcswitch.PaymentResult, error) {

View File

@ -179,7 +179,7 @@ func (p *paymentLifecycle) resumePayment(ctx context.Context) ([32]byte,
for _, a := range payment.InFlightHTLCs() {
a := a
log.Infof("Resuming payment shard %v for payment %v",
log.Infof("Resuming HTLC attempt %v for payment %v",
a.AttemptID, p.identifier)
p.resultCollector(&a)
@ -463,6 +463,8 @@ func (p *paymentLifecycle) collectResultAsync(attempt *channeldb.HTLCAttempt) {
func (p *paymentLifecycle) collectResult(attempt *channeldb.HTLCAttempt) (
*attemptResult, error) {
log.Tracef("Collecting result for attempt %v", spew.Sdump(attempt))
// We'll retrieve the hash specific to this shard from the
// shardTracker, since it will be needed to regenerate the circuit
// below.
@ -663,8 +665,8 @@ func (p *paymentLifecycle) createNewPaymentAttempt(rt *route.Route,
func (p *paymentLifecycle) sendAttempt(
attempt *channeldb.HTLCAttempt) (*attemptResult, error) {
log.Debugf("Attempting to send payment %v (pid=%v)", p.identifier,
attempt.AttemptID)
log.Debugf("Sending HTLC attempt(id=%v, amt=%v) for payment %v",
attempt.AttemptID, attempt.Route.TotalAmount, p.identifier)
rt := attempt.Route

View File

@ -135,6 +135,16 @@ type PaymentAttemptDispatcher interface {
// NOTE: New payment attempts MUST NOT be made after the keepPids map
// has been created and this method has returned.
CleanStore(keepPids map[uint64]struct{}) error
// HasAttemptResult reads the network result store to fetch the
// specified attempt. Returns true if the attempt result exists.
//
// NOTE: This method is used and should only be used by the router to
// resume payments during startup. It can be viewed as a subset of
// `GetAttemptResult` in terms of its functionality, and can be removed
// once we move the construction of `UpdateAddHTLC` and
// `ErrorDecrypter` into `htlcswitch`.
HasAttemptResult(attemptID uint64) (bool, error)
}
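A minimal, self-contained sketch of the semantics this method is expected to provide relative to GetAttemptResult: a pure presence check against already-stored results, with no subscription or error decrypter involved. The types and names below are illustrative only, not lnd's implementation:

package main

import "fmt"

// networkResult stands in for the result the switch stores per HTLC attempt.
type networkResult struct {
	msg string
}

// toyResultStore mimics a store of network results keyed by attempt ID,
// plus subscribers waiting for results that haven't arrived yet.
type toyResultStore struct {
	results     map[uint64]*networkResult
	subscribers map[uint64][]chan *networkResult
}

// hasAttemptResult is the presence check: it only reports whether a result
// was already stored and never registers a subscriber.
func (s *toyResultStore) hasAttemptResult(attemptID uint64) bool {
	_, ok := s.results[attemptID]
	return ok
}

// getAttemptResult is the superset operation: it returns a channel that is
// served immediately if the result exists, or once it is stored later.
func (s *toyResultStore) getAttemptResult(id uint64) <-chan *networkResult {
	resultChan := make(chan *networkResult, 1)
	if res, ok := s.results[id]; ok {
		resultChan <- res
		return resultChan
	}
	s.subscribers[id] = append(s.subscribers[id], resultChan)
	return resultChan
}

func main() {
	store := &toyResultStore{
		results:     map[uint64]*networkResult{7: {msg: "settled"}},
		subscribers: make(map[uint64][]chan *networkResult),
	}

	fmt.Println("attempt 7 has result:", store.hasAttemptResult(7))
	fmt.Println("attempt 8 has result:", store.hasAttemptResult(8))

	// The subscription-based lookup still works for stored results.
	fmt.Println("attempt 7 result:", (<-store.getAttemptResult(7)).msg)
}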
// PaymentSessionSource is an interface that defines a source for the router to
@ -252,9 +262,9 @@ type Config struct {
// sessions.
SessionSource PaymentSessionSource
// QueryBandwidth is a method that allows the router to query the lower
// link layer to determine the up-to-date available bandwidth at a
// prospective link to be traversed. If the link isn't available, then
// GetLink is a method that allows the router to query the lower link
// layer to determine the up-to-date available bandwidth at a
// prospective link to be traversed. If the link isn't available, then
// a value of zero should be returned. Otherwise, the current up-to-
// date knowledge of the available bandwidth of the link should be
// returned.
@ -275,6 +285,11 @@ type Config struct {
// ApplyChannelUpdate can be called to apply a new channel update to the
// graph that we received from a payment failure.
ApplyChannelUpdate func(msg *lnwire.ChannelUpdate) bool
// ClosedSCIDs is a set of SCIDs for fully closed channels, used by the
// router to decide whether a stale HTLC attempt can safely be failed.
//
// TODO(yy): remove it once the root cause of stuck payments is found.
ClosedSCIDs map[lnwire.ShortChannelID]struct{}
}
// EdgeLocator is a struct used to identify a specific edge.
@ -336,88 +351,8 @@ func (r *ChannelRouter) Start() error {
// If any payments are still in flight, we resume, to make sure their
// results are properly handled.
payments, err := r.cfg.Control.FetchInFlightPayments()
if err != nil {
return err
}
// Before we restart existing payments and start accepting more
// payments to be made, we clean the network result store of the
// Switch. We do this here at startup to ensure no more payments can be
// made concurrently, so we know the toKeep map will be up-to-date
// until the cleaning has finished.
toKeep := make(map[uint64]struct{})
for _, p := range payments {
for _, a := range p.HTLCs {
toKeep[a.AttemptID] = struct{}{}
}
}
log.Debugf("Cleaning network result store.")
if err := r.cfg.Payer.CleanStore(toKeep); err != nil {
return err
}
for _, payment := range payments {
log.Infof("Resuming payment %v", payment.Info.PaymentIdentifier)
r.wg.Add(1)
go func(payment *channeldb.MPPayment) {
defer r.wg.Done()
// Get the hashes used for the outstanding HTLCs.
htlcs := make(map[uint64]lntypes.Hash)
for _, a := range payment.HTLCs {
a := a
// We check whether the individual attempts
// have their HTLC hash set, if not we'll fall
// back to the overall payment hash.
hash := payment.Info.PaymentIdentifier
if a.Hash != nil {
hash = *a.Hash
}
htlcs[a.AttemptID] = hash
}
// Since we are not supporting creating more shards
// after a restart (only receiving the result of the
// shards already outstanding), we create a simple
// shard tracker that will map the attempt IDs to
// hashes used for the HTLCs. This will be enough also
// for AMP payments, since we only need the hashes for
// the individual HTLCs to regenerate the circuits, and
// we don't currently persist the root share necessary
// to re-derive them.
shardTracker := shards.NewSimpleShardTracker(
payment.Info.PaymentIdentifier, htlcs,
)
// We create a dummy, empty payment session such that
// we won't make another payment attempt when the
// result for the in-flight attempt is received.
paySession := r.cfg.SessionSource.NewPaymentSessionEmpty()
// We pass in a non-timeout context, to indicate we
// don't need it to timeout. It will stop immediately
// after the existing attempt has finished anyway. We
// also set a zero fee limit, as no more routes should
// be tried.
noTimeout := time.Duration(0)
_, _, err := r.sendPayment(
context.Background(), 0,
payment.Info.PaymentIdentifier, noTimeout,
paySession, shardTracker,
)
if err != nil {
log.Errorf("Resuming payment %v failed: %v.",
payment.Info.PaymentIdentifier, err)
return
}
log.Infof("Resumed payment %v completed.",
payment.Info.PaymentIdentifier)
}(payment)
if err := r.resumePayments(); err != nil {
log.Errorf("Failed to resume payments during startup: %v", err)
}
return nil
@ -1133,6 +1068,9 @@ func (r *ChannelRouter) SendToRouteSkipTempErr(htlcHash lntypes.Hash,
func (r *ChannelRouter) sendToRoute(htlcHash lntypes.Hash, rt *route.Route,
skipTempErr bool) (*channeldb.HTLCAttempt, error) {
log.Debugf("SendToRoute for payment %v with skipTempErr=%v",
htlcHash, skipTempErr)
// Calculate amount paid to receiver.
amt := rt.ReceiverAmt()
@ -1448,6 +1386,204 @@ func (r *ChannelRouter) BuildRoute(amt fn.Option[lnwire.MilliSatoshi],
)
}
// resumePayments fetches inflight payments and resumes their payment
// lifecycles.
func (r *ChannelRouter) resumePayments() error {
// Get all payments that are inflight.
payments, err := r.cfg.Control.FetchInFlightPayments()
if err != nil {
return err
}
// Before we restart existing payments and start accepting more
// payments to be made, we clean the network result store of the
// Switch. We do this here at startup to ensure no more payments can be
// made concurrently, so we know the toKeep map will be up-to-date
// until the cleaning has finished.
toKeep := make(map[uint64]struct{})
for _, p := range payments {
for _, a := range p.HTLCs {
toKeep[a.AttemptID] = struct{}{}
// Try to fail the attempt if the route contains a dead
// channel.
r.failStaleAttempt(a, p.Info.PaymentIdentifier)
}
}
log.Debugf("Cleaning network result store.")
if err := r.cfg.Payer.CleanStore(toKeep); err != nil {
return err
}
// launchPayment is a helper closure that handles resuming the payment.
launchPayment := func(payment *channeldb.MPPayment) {
defer r.wg.Done()
// Get the hashes used for the outstanding HTLCs.
htlcs := make(map[uint64]lntypes.Hash)
for _, a := range payment.HTLCs {
a := a
// We check whether the individual attempts have their
// HTLC hash set, if not we'll fall back to the overall
// payment hash.
hash := payment.Info.PaymentIdentifier
if a.Hash != nil {
hash = *a.Hash
}
htlcs[a.AttemptID] = hash
}
payHash := payment.Info.PaymentIdentifier
// Since we are not supporting creating more shards after a
// restart (only receiving the result of the shards already
// outstanding), we create a simple shard tracker that will map
// the attempt IDs to hashes used for the HTLCs. This will be
// enough also for AMP payments, since we only need the hashes
// for the individual HTLCs to regenerate the circuits, and we
// don't currently persist the root share necessary to
// re-derive them.
shardTracker := shards.NewSimpleShardTracker(payHash, htlcs)
// We create a dummy, empty payment session such that we won't
// make another payment attempt when the result for the
// in-flight attempt is received.
paySession := r.cfg.SessionSource.NewPaymentSessionEmpty()
// We pass in a non-timeout context, to indicate we don't need
// it to timeout. It will stop immediately after the existing
// attempt has finished anyway. We also set a zero fee limit,
// as no more routes should be tried.
noTimeout := time.Duration(0)
_, _, err := r.sendPayment(
context.Background(), 0, payHash, noTimeout, paySession,
shardTracker,
)
if err != nil {
log.Errorf("Resuming payment %v failed: %v", payHash,
err)
return
}
log.Infof("Resumed payment %v completed", payHash)
}
for _, payment := range payments {
log.Infof("Resuming payment %v", payment.Info.PaymentIdentifier)
r.wg.Add(1)
go launchPayment(payment)
}
return nil
}
// failStaleAttempt will fail an HTLC attempt if it's using an unknown channel
// in its route. It first consults the switch to see if there's already a
// network result stored for this attempt. If not, it will check whether the
// first hop of this attempt is using an active channel known to us. If the
// link is gone and the channel has been fully closed, this attempt will be
// failed.
//
// NOTE: there's an unknown bug that caused the network result for a particular
// attempt to NOT be saved, resulting in a payment being stuck forever. More info:
// - https://github.com/lightningnetwork/lnd/issues/8146
// - https://github.com/lightningnetwork/lnd/pull/8174
func (r *ChannelRouter) failStaleAttempt(a channeldb.HTLCAttempt,
payHash lntypes.Hash) {
// We can only fail inflight HTLCs so we skip the settled/failed ones.
if a.Failure != nil || a.Settle != nil {
return
}
// First, check if we've already had a network result for this attempt.
// If no result is found, we'll check whether the reference link is
// still known to us.
ok, err := r.cfg.Payer.HasAttemptResult(a.AttemptID)
if err != nil {
log.Errorf("Failed to fetch network result for attempt=%v: %v",
a.AttemptID, err)
return
}
// There's already a network result, no need to fail it here as the
// payment lifecycle will take care of it, so we can exit early.
if ok {
log.Debugf("Already have network result for attempt=%v",
a.AttemptID)
return
}
// We now need to decide whether this attempt should be failed here.
// For very old payments, it's possible that the network results were
// never saved, causing the payments to be stuck inflight. We now check
// whether the first hop is referencing an active channel ID and, if
// not, we will fail the attempt as it has no way to be retried again.
var shouldFail bool
// Validate that the attempt has hop info. If this attempt has no hop
// info it indicates an error in our db.
if len(a.Route.Hops) == 0 {
log.Errorf("Found empty hop for attempt=%v", a.AttemptID)
shouldFail = true
} else {
// Get the short channel ID.
chanID := a.Route.Hops[0].ChannelID
scid := lnwire.NewShortChanIDFromInt(chanID)
// Check whether this link is active. If so, we won't fail the
// attempt but keep waiting for its result.
_, err := r.cfg.GetLink(scid)
if err == nil {
return
}
// We should get the link not found err. If not, we will log an
// error and skip failing this attempt since an unknown error
// occurred.
if !errors.Is(err, htlcswitch.ErrChannelLinkNotFound) {
log.Errorf("Failed to get link for attempt=%v for "+
"payment=%v: %v", a.AttemptID, payHash, err)
return
}
// The channel link is not active, we now check whether this
// channel is already closed. If so, we fail the HTLC attempt
// as there's no need to wait for its network result because
// there's no link. If the channel is still pending, we'll keep
// waiting for the result as we may get a contract resolution
// for this HTLC.
if _, ok := r.cfg.ClosedSCIDs[scid]; ok {
shouldFail = true
}
}
// Exit if there's no need to fail.
if !shouldFail {
return
}
log.Errorf("Failing stale attempt=%v for payment=%v", a.AttemptID,
payHash)
// Fail the attempt in db. If there's an error, there's nothing we can
// do here but log it.
failInfo := &channeldb.HTLCFailInfo{
Reason: channeldb.HTLCFailUnknown,
FailTime: r.cfg.Clock.Now(),
}
_, err = r.cfg.Control.FailAttempt(payHash, a.AttemptID, failInfo)
if err != nil {
log.Errorf("Fail attempt=%v got error: %v", a.AttemptID, err)
}
}
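The checks above reduce to a small decision table. The following self-contained restatement (not code from this change; the unknown-GetLink-error path, where the attempt is likewise left alone, is omitted) may help when reasoning about which inflight attempts get failed:

package main

import "fmt"

// shouldFailStaleAttempt mirrors the decision made by failStaleAttempt: an
// attempt is failed only if it is still inflight, has no stored network
// result, and either has an empty route or a first-hop channel that is both
// link-less and fully closed.
func shouldFailStaleAttempt(resolved, hasResult, hasHops, linkActive,
	chanClosed bool) bool {

	// Settled/failed attempts and attempts with a stored result are left
	// to the payment lifecycle.
	if resolved || hasResult {
		return false
	}

	// An attempt without hop info cannot be resolved at all.
	if !hasHops {
		return true
	}

	// If the first-hop link is still active, keep waiting for its result.
	if linkActive {
		return false
	}

	// No link: fail only if the channel's close has been finalized;
	// otherwise a contract resolution may still arrive.
	return chanClosed
}

func main() {
	// A stuck attempt: no stored result, first-hop link gone, channel
	// fully closed.
	fmt.Println(shouldFailStaleAttempt(false, false, true, false, true))

	// Channel still pending close: keep waiting.
	fmt.Println(shouldFailStaleAttempt(false, false, true, false, false))
}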
// getEdgeUnifiers returns a list of edge unifiers for the given route.
func getEdgeUnifiers(source route.Vertex, hops []route.Vertex,
outgoingChans map[uint64]struct{},

View File

@ -93,6 +93,8 @@ func (c *testCtx) getChannelIDFromAlias(t *testing.T, a, b string) uint64 {
return channelID
}
var mockClosedSCIDs map[lnwire.ShortChannelID]struct{}
func createTestCtxFromGraphInstance(t *testing.T, startingHeight uint32,
graphInstance *testGraphInstance) *testCtx {
@ -161,6 +163,7 @@ func createTestCtxFromGraphInstanceAssumeValid(t *testing.T,
PathFindingConfig: pathFindingConfig,
Clock: clock.NewTestClock(time.Unix(1, 0)),
ApplyChannelUpdate: graphBuilder.ApplyChannelUpdate,
ClosedSCIDs: mockClosedSCIDs,
})
require.NoError(t, router.Start(), "unable to start router")
@ -2170,6 +2173,7 @@ func TestSendToRouteSkipTempErrSuccess(t *testing.T) {
NextPaymentID: func() (uint64, error) {
return 0, nil
},
ClosedSCIDs: mockClosedSCIDs,
}}
// Register mockers with the expected method calls.
@ -2253,6 +2257,7 @@ func TestSendToRouteSkipTempErrNonMPP(t *testing.T) {
NextPaymentID: func() (uint64, error) {
return 0, nil
},
ClosedSCIDs: mockClosedSCIDs,
}}
// Expect an error to be returned.
@ -2307,6 +2312,7 @@ func TestSendToRouteSkipTempErrTempFailure(t *testing.T) {
NextPaymentID: func() (uint64, error) {
return 0, nil
},
ClosedSCIDs: mockClosedSCIDs,
}}
// Create the error to be returned.
@ -2389,6 +2395,7 @@ func TestSendToRouteSkipTempErrPermanentFailure(t *testing.T) {
NextPaymentID: func() (uint64, error) {
return 0, nil
},
ClosedSCIDs: mockClosedSCIDs,
}}
// Create the error to be returned.
@ -2475,6 +2482,7 @@ func TestSendToRouteTempFailure(t *testing.T) {
NextPaymentID: func() (uint64, error) {
return 0, nil
},
ClosedSCIDs: mockClosedSCIDs,
}}
// Create the error to be returned.

View File

@ -1005,6 +1005,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
PathFindingConfig: pathFindingConfig,
Clock: clock.NewDefaultClock(),
ApplyChannelUpdate: s.graphBuilder.ApplyChannelUpdate,
ClosedSCIDs: s.fetchClosedChannelSCIDs(),
})
if err != nil {
return nil, fmt.Errorf("can't create router: %w", err)
@ -4829,3 +4830,52 @@ func shouldPeerBootstrap(cfg *Config) bool {
// covering the bootstrapping process.
return !cfg.NoNetBootstrap && !isDevNetwork
}
// fetchClosedChannelSCIDs returns a set of SCIDs for channels whose force
// closes have been finalized.
func (s *server) fetchClosedChannelSCIDs() map[lnwire.ShortChannelID]struct{} {
// Get a list of closed channels.
channels, err := s.chanStateDB.FetchClosedChannels(false)
if err != nil {
srvrLog.Errorf("Failed to fetch closed channels: %v", err)
return nil
}
// Save the SCIDs in a map.
closedSCIDs := make(map[lnwire.ShortChannelID]struct{}, len(channels))
for _, c := range channels {
// If the channel is not pending, its FC has been finalized.
if !c.IsPending {
closedSCIDs[c.ShortChanID] = struct{}{}
}
}
// Double check whether the reported closed channel has indeed finished
// closing.
//
// NOTE: There are misalignments regarding when a channel's FC is
// marked as finalized. We double check the pending channels to make
// sure the returned SCIDs are indeed terminated.
//
// TODO(yy): fix the misalignments in `FetchClosedChannels`.
pendings, err := s.chanStateDB.FetchPendingChannels()
if err != nil {
srvrLog.Errorf("Failed to fetch pending channels: %v", err)
return nil
}
for _, c := range pendings {
if _, ok := closedSCIDs[c.ShortChannelID]; !ok {
continue
}
// If the channel is still reported as pending, remove it from
// the map.
delete(closedSCIDs, c.ShortChannelID)
srvrLog.Warnf("Channel=%v is prematurely marked as finalized",
c.ShortChannelID)
}
return closedSCIDs
}