htlcswitch+router: define PaymentResult, GetPaymentResult

This lets us distinguish an critical error from a actual payment result
(success or failure). This is important since we know that we can only
attempt another payment when a final result from the previous payment
attempt is received.
This commit is contained in:
Johan T. Halseth 2019-05-16 15:27:29 +02:00
parent be129eb7c7
commit ec087a9f73
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26
7 changed files with 251 additions and 68 deletions

View File

@ -54,6 +54,8 @@ func newConcurrentTester(t *testing.T) *concurrentTester {
}
func (c *concurrentTester) Fatalf(format string, args ...interface{}) {
c.T.Helper()
c.mtx.Lock()
defer c.mtx.Unlock()
@ -1108,13 +1110,30 @@ func TestChannelLinkMultiHopUnknownPaymentHash(t *testing.T) {
}
// Send payment and expose err channel.
_, err = n.aliceServer.htlcSwitch.SendHTLC(
err = n.aliceServer.htlcSwitch.SendHTLC(
n.firstBobChannelLink.ShortChanID(), pid, htlc,
newMockDeobfuscator(),
)
if !strings.Contains(err.Error(), lnwire.CodeUnknownPaymentHash.String()) {
t.Fatalf("expected %v got %v", err,
lnwire.CodeUnknownPaymentHash)
if err != nil {
t.Fatalf("unable to get send payment: %v", err)
}
resultChan, err := n.aliceServer.htlcSwitch.GetPaymentResult(pid)
if err != nil {
t.Fatalf("unable to get payment result: %v", err)
}
var result *PaymentResult
select {
case result = <-resultChan:
case <-time.After(5 * time.Second):
t.Fatalf("no result arrive")
}
fErr := result.Error
if !strings.Contains(fErr.Error(), lnwire.CodeUnknownPaymentHash.String()) {
t.Fatalf("expected %v got %v", lnwire.CodeUnknownPaymentHash, fErr)
}
// Wait for Alice to receive the revocation.
@ -3867,7 +3886,7 @@ func TestChannelLinkAcceptDuplicatePayment(t *testing.T) {
// With the invoice now added to Carol's registry, we'll send the
// payment. It should succeed w/o any issues as it has been crafted
// properly.
_, err = n.aliceServer.htlcSwitch.SendHTLC(
err = n.aliceServer.htlcSwitch.SendHTLC(
n.firstBobChannelLink.ShortChanID(), pid, htlc,
newMockDeobfuscator(),
)
@ -3875,9 +3894,23 @@ func TestChannelLinkAcceptDuplicatePayment(t *testing.T) {
t.Fatalf("unable to send payment to carol: %v", err)
}
resultChan, err := n.aliceServer.htlcSwitch.GetPaymentResult(pid)
if err != nil {
t.Fatalf("unable to get payment result: %v", err)
}
select {
case result := <-resultChan:
if result.Error != nil {
t.Fatalf("payment failed: %v", result.Error)
}
case <-time.After(5 * time.Second):
t.Fatalf("payment result did not arrive")
}
// Now, if we attempt to send the payment *again* it should be rejected
// as it's a duplicate request.
_, err = n.aliceServer.htlcSwitch.SendHTLC(
err = n.aliceServer.htlcSwitch.SendHTLC(
n.firstBobChannelLink.ShortChanID(), pid, htlc,
newMockDeobfuscator(),
)

View File

@ -0,0 +1,25 @@
package htlcswitch
import "errors"
var (
// ErrPaymentIDNotFound is an error returned if the given paymentID is
// not found.
ErrPaymentIDNotFound = errors.New("paymentID not found")
// ErrPaymentIDAlreadyExists is returned if we try to write a pending
// payment whose paymentID already exists.
ErrPaymentIDAlreadyExists = errors.New("paymentID already exists")
)
// PaymentResult wraps a result received from the network after a payment
// attempt was made.
type PaymentResult struct {
// Preimage is set by the switch in case a sent HTLC was settled.
Preimage [32]byte
// Error is non-nil in case a HTLC send failed, and the HTLC is now
// irrevocably cancelled. If the payment failed during forwarding, this
// error will be a *ForwardingError.
Error error
}

View File

@ -71,8 +71,7 @@ type pendingPayment struct {
paymentHash lntypes.Hash
amount lnwire.MilliSatoshi
preimage chan [sha256.Size]byte
err chan error
resultChan chan *PaymentResult
// deobfuscator is a serializable entity which is used if we received
// an error, it deobfuscates the onion failure blob, and extracts the
@ -346,32 +345,52 @@ func (s *Switch) ProcessContractResolution(msg contractcourt.ResolutionMsg) erro
return nil
}
// GetPaymentResult returns the the result of the payment attempt with the
// given paymentID. The method returns a channel where the payment result will
// be sent when available, or an error is encountered. If the paymentID is
// unknown, ErrPaymentIDNotFound will be returned.
func (s *Switch) GetPaymentResult(paymentID uint64) (<-chan *PaymentResult, error) {
s.pendingMutex.Lock()
payment, ok := s.pendingPayments[paymentID]
s.pendingMutex.Unlock()
if !ok {
return nil, ErrPaymentIDNotFound
}
return payment.resultChan, nil
}
// SendHTLC is used by other subsystems which aren't belong to htlc switch
// package in order to send the htlc update. The paymentID used MUST be unique
// for this HTLC, and MUST be used only once, otherwise the switch might reject
// it.
func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, paymentID uint64,
htlc *lnwire.UpdateAddHTLC,
deobfuscator ErrorDecrypter) ([sha256.Size]byte, error) {
htlc *lnwire.UpdateAddHTLC, deobfuscator ErrorDecrypter) error {
// Before sending, double check that we don't already have 1) an
// in-flight payment to this payment hash, or 2) a complete payment for
// the same hash.
if err := s.control.ClearForTakeoff(htlc); err != nil {
return zeroPreimage, err
return err
}
// Create payment and add to the map of payment in order later to be
// able to retrieve it and return response to the user.
payment := &pendingPayment{
err: make(chan error, 1),
preimage: make(chan [sha256.Size]byte, 1),
resultChan: make(chan *PaymentResult, 1),
paymentHash: htlc.PaymentHash,
amount: htlc.Amount,
deobfuscator: deobfuscator,
}
s.pendingMutex.Lock()
if _, ok := s.pendingPayments[paymentID]; ok {
s.pendingMutex.Unlock()
return ErrPaymentIDAlreadyExists
}
s.pendingPayments[paymentID] = payment
s.pendingMutex.Unlock()
@ -388,32 +407,13 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, paymentID uint64,
if err := s.forward(packet); err != nil {
s.removePendingPayment(paymentID)
if err := s.control.Fail(htlc.PaymentHash); err != nil {
return zeroPreimage, err
return err
}
return zeroPreimage, err
return err
}
// Returns channels so that other subsystem might wait/skip the
// waiting of handling of payment.
var preimage [sha256.Size]byte
var err error
select {
case e := <-payment.err:
err = e
case <-s.quit:
return zeroPreimage, ErrSwitchExiting
}
select {
case p := <-payment.preimage:
preimage = p
case <-s.quit:
return zeroPreimage, ErrSwitchExiting
}
return preimage, err
return nil
}
// UpdateForwardingPolicies sends a message to the switch to update the
@ -880,10 +880,7 @@ func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
// has been restarted since sending the payment.
payment := s.findPayment(pkt.incomingHTLCID)
var (
preimage [32]byte
paymentErr error
)
var result *PaymentResult
switch htlc := pkt.htlc.(type) {
@ -900,7 +897,9 @@ func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
return
}
preimage = htlc.PaymentPreimage
result = &PaymentResult{
Preimage: htlc.PaymentPreimage,
}
// We've received a fail update which means we can finalize the user
// payment and return fail response.
@ -918,10 +917,13 @@ func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
// The error reason will be unencypted in case this a local
// failure or a converted error.
unencrypted := pkt.localFailure || pkt.convertedError
paymentErr = s.parseFailedPayment(
paymentErr := s.parseFailedPayment(
payment, pkt.incomingHTLCID, payment.paymentHash,
unencrypted, pkt.isResolution, htlc,
)
result = &PaymentResult{
Error: paymentErr,
}
default:
log.Warnf("Received unknown response type: %T", pkt.htlc)
@ -931,9 +933,7 @@ func (s *Switch) handleLocalResponse(pkt *htlcPacket) {
// Deliver the payment error and preimage to the application, if it is
// waiting for a response.
if payment != nil {
payment.err <- paymentErr
payment.preimage <- preimage
s.removePendingPayment(pkt.incomingHTLCID)
payment.resultChan <- result
}
}

View File

@ -1417,7 +1417,7 @@ func testSkipLinkLocalForward(t *testing.T, eligible bool,
// We'll attempt to send out a new HTLC that has Alice as the first
// outgoing link. This should fail as Alice isn't yet able to forward
// any active HTLC's.
_, err = s.SendHTLC(aliceChannelLink.ShortChanID(), 0, addMsg, nil)
err = s.SendHTLC(aliceChannelLink.ShortChanID(), 0, addMsg, nil)
if err == nil {
t.Fatalf("local forward should fail due to inactive link")
}
@ -1738,14 +1738,39 @@ func TestSwitchSendPayment(t *testing.T) {
PaymentHash: rhash,
Amount: 1,
}
paymentID := uint64(123)
// First check that the switch will correctly respond that this payment
// ID is unknown.
_, err = s.GetPaymentResult(paymentID)
if err != ErrPaymentIDNotFound {
t.Fatalf("expected ErrPaymentIDNotFound, got %v", err)
}
// Handle the request and checks that bob channel link received it.
errChan := make(chan error)
go func() {
_, err := s.SendHTLC(
aliceChannelLink.ShortChanID(), 0, update,
err := s.SendHTLC(
aliceChannelLink.ShortChanID(), paymentID, update,
newMockDeobfuscator())
errChan <- err
if err != nil {
errChan <- err
return
}
resultChan, err := s.GetPaymentResult(paymentID)
if err != nil {
errChan <- err
return
}
result := <-resultChan
if result.Error != nil {
errChan <- result.Error
return
}
errChan <- nil
}()
select {

View File

@ -794,10 +794,23 @@ func preparePayment(sendingPeer, receivingPeer lnpeer.Peer,
// Send payment and expose err channel.
return invoice, func() error {
_, err := sender.htlcSwitch.SendHTLC(
err := sender.htlcSwitch.SendHTLC(
firstHop, pid, htlc, newMockDeobfuscator(),
)
return err
if err != nil {
return err
}
resultChan, err := sender.htlcSwitch.GetPaymentResult(pid)
if err != nil {
return err
}
result := <-resultChan
if result.Error != nil {
return result.Error
}
return nil
}, nil
}
@ -1261,10 +1274,26 @@ func (n *twoHopNetwork) makeHoldPayment(sendingPeer, receivingPeer lnpeer.Peer,
// Send payment and expose err channel.
go func() {
_, err := sender.htlcSwitch.SendHTLC(
err := sender.htlcSwitch.SendHTLC(
firstHop, pid, htlc, newMockDeobfuscator(),
)
paymentErr <- err
if err != nil {
paymentErr <- err
return
}
resultChan, err := sender.htlcSwitch.GetPaymentResult(pid)
if err != nil {
paymentErr <- err
return
}
result := <-resultChan
if result.Error != nil {
paymentErr <- result.Error
return
}
paymentErr <- nil
}()
return paymentErr

View File

@ -1,28 +1,61 @@
package routing
import (
"crypto/sha256"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnwire"
)
type mockPaymentAttemptDispatcher struct {
onPayment func(firstHop lnwire.ShortChannelID) ([32]byte, error)
results map[uint64]*htlcswitch.PaymentResult
}
var _ PaymentAttemptDispatcher = (*mockPaymentAttemptDispatcher)(nil)
func (m *mockPaymentAttemptDispatcher) SendHTLC(firstHop lnwire.ShortChannelID,
_ uint64,
pid uint64,
_ *lnwire.UpdateAddHTLC,
_ htlcswitch.ErrorDecrypter) ([sha256.Size]byte, error) {
_ htlcswitch.ErrorDecrypter) error {
if m.onPayment != nil {
return m.onPayment(firstHop)
if m.onPayment == nil {
return nil
}
return [sha256.Size]byte{}, nil
if m.results == nil {
m.results = make(map[uint64]*htlcswitch.PaymentResult)
}
var result *htlcswitch.PaymentResult
preimage, err := m.onPayment(firstHop)
if err != nil {
fwdErr, ok := err.(*htlcswitch.ForwardingError)
if !ok {
return err
}
result = &htlcswitch.PaymentResult{
Error: fwdErr,
}
} else {
result = &htlcswitch.PaymentResult{Preimage: preimage}
}
m.results[pid] = result
return nil
}
func (m *mockPaymentAttemptDispatcher) GetPaymentResult(paymentID uint64) (
<-chan *htlcswitch.PaymentResult, error) {
c := make(chan *htlcswitch.PaymentResult, 1)
res, ok := m.results[paymentID]
if !ok {
return nil, htlcswitch.ErrPaymentIDNotFound
}
c <- res
return c, nil
}
func (m *mockPaymentAttemptDispatcher) setPaymentResult(

View File

@ -2,7 +2,6 @@ package routing
import (
"bytes"
"crypto/sha256"
"fmt"
"runtime"
"sync"
@ -136,7 +135,15 @@ type PaymentAttemptDispatcher interface {
SendHTLC(firstHop lnwire.ShortChannelID,
paymentID uint64,
htlcAdd *lnwire.UpdateAddHTLC,
deobfuscator htlcswitch.ErrorDecrypter) ([sha256.Size]byte, error)
deobfuscator htlcswitch.ErrorDecrypter) error
// GetPaymentResult returns the the result of the payment attempt with
// the given paymentID. The method returns a channel where the payment
// result will be sent when available, or an error is encountered. If
// the paymentID is unknown, htlcswitch.ErrPaymentIDNotFound will be
// returned.
GetPaymentResult(paymentID uint64) (
<-chan *htlcswitch.PaymentResult, error)
}
// FeeSchema is the set fee configuration for a Lightning Node on the network.
@ -1711,19 +1718,50 @@ func (r *ChannelRouter) sendPaymentAttempt(paySession *paymentSession,
return [32]byte{}, true, err
}
preimage, err := r.cfg.Payer.SendHTLC(
err = r.cfg.Payer.SendHTLC(
firstHop, paymentID, htlcAdd, errorDecryptor,
)
if err == nil {
return preimage, true, nil
if err != nil {
log.Errorf("Failed sending attempt %d for payment %x to "+
"switch: %v", paymentID, paymentHash, err)
// We must inspect the error to know whether it was critical or
// not, to decide whether we should continue trying.
finalOutcome := r.processSendError(
paySession, route, err,
)
return [32]byte{}, finalOutcome, err
}
log.Errorf("Attempt to send payment %x failed: %v",
paymentHash, err)
// Now ask the switch to return the result of the payment when
// available.
resultChan, err := r.cfg.Payer.GetPaymentResult(paymentID)
if err != nil {
log.Errorf("Failed getting result for paymentID %d "+
"from switch: %v", paymentID, err)
return [32]byte{}, true, err
}
finalOutcome := r.processSendError(paySession, route, err)
var result *htlcswitch.PaymentResult
select {
case result = <-resultChan:
case <-r.quit:
return [32]byte{}, true, ErrRouterShuttingDown
}
return [32]byte{}, finalOutcome, err
if result.Error != nil {
log.Errorf("Attempt to send payment %x failed: %v",
paymentHash, result.Error)
finalOutcome := r.processSendError(
paySession, route, result.Error,
)
return [32]byte{}, finalOutcome, result.Error
}
return result.Preimage, true, nil
}
// processSendError analyzes the error for the payment attempt received from the