Merge pull request #5901 from Roasbeef/dedup-interceptor

Dedup interceptor
This commit is contained in:
Olaoluwa Osuntokun 2021-10-28 17:44:49 -07:00 committed by GitHub
commit 66ca2a994b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 178 additions and 4 deletions

View file

@ -580,6 +580,10 @@ messages directly. There is no routing/path finding involved.
fixed](https://github.com/lightningnetwork/lnd/pull/5893) in the
[`btcwallet` dependency](https://github.com/btcsuite/btcwallet/pull/773).
* [A bug has been fixed that would at times cause intercepted HTLCs to be
re-notified](https://github.com/lightningnetwork/lnd/pull/5901), which could
lead to higher-level HTLC mismanagement issues.
## Documentation
The [code contribution guidelines have been updated to mention the new

View file

@ -150,6 +150,11 @@ func (r *forwardInterceptor) holdAndForwardToClient(
htlc := forward.Packet()
inKey := htlc.IncomingCircuit
// Ignore already held htlcs.
if _, ok := r.holdForwards[inKey]; ok {
return nil
}
// First hold the forward, then send to client.
r.holdForwards[inKey] = forward
interceptionRequest := &ForwardHtlcInterceptRequest{

View file

@ -33,7 +33,168 @@ type interceptorTestCase struct {
interceptorAction routerrpc.ResolveHoldForwardAction
}
// testForwardInterceptor tests the forward interceptor RPC layer.
// testForwardInterceptorDedupHtlc tests that upon reconnection, duplicate
// HTLCs aren't re-notified using the HTLC interceptor API.
func testForwardInterceptorDedupHtlc(net *lntest.NetworkHarness, t *harnessTest) {
// Initialize the test context with 3 connected nodes.
alice := net.NewNode(t.t, "alice", nil)
defer shutdownAndAssert(net, t, alice)
bob := net.NewNode(t.t, "bob", nil)
defer shutdownAndAssert(net, t, alice)
carol := net.NewNode(t.t, "carol", nil)
defer shutdownAndAssert(net, t, alice)
tc := newInterceptorTestContext(t, net, alice, bob, carol)
const (
chanAmt = btcutil.Amount(300000)
)
// Open and wait for channels.
tc.openChannel(tc.alice, tc.bob, chanAmt)
tc.openChannel(tc.bob, tc.carol, chanAmt)
defer tc.closeChannels()
tc.waitForChannels()
ctxb := context.Background()
ctxt, cancelInterceptor := context.WithCancel(ctxb)
interceptor, err := tc.bob.RouterClient.HtlcInterceptor(ctxt)
require.NoError(tc.t.t, err, "failed to create HtlcInterceptor")
addResponse, err := tc.carol.AddInvoice(ctxb, &lnrpc.Invoice{
ValueMsat: 1000,
})
require.NoError(tc.t.t, err, "unable to add invoice")
invoice, err := tc.carol.LookupInvoice(ctxb, &lnrpc.PaymentHash{
RHashStr: hex.EncodeToString(addResponse.RHash),
})
require.NoError(tc.t.t, err, "unable to find invoice")
// We start the htlc interceptor with a simple implementation that
// saves all intercepted packets. These packets are held to simulate a
// pending payment.
interceptedPacketstMap := &sync.Map{}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
packet, err := interceptor.Recv()
if err != nil {
// If it is just the error result of the
// context cancellation the we exit silently.
status, ok := status.FromError(err)
if ok && status.Code() == codes.Canceled {
return
}
// Otherwise it an unexpected error, we fail
// the test.
require.NoError(
tc.t.t, err,
"unexpected error in interceptor.Recv()",
)
return
}
interceptedPacketstMap.Store(
packet.IncomingCircuitKey.HtlcId, packet,
)
}
}()
// We initiate a payment from Alice.
wg.Add(1)
go func() {
defer wg.Done()
_, _ = tc.sendAliceToCarolPayment(
ctxb, 1000,
invoice.RHash, invoice.PaymentAddr,
)
}()
// Here we should wait for the channel to contain a pending htlc, and
// also be shown as being active.
err = wait.Predicate(func() bool {
channels, err := tc.bob.ListChannels(ctxt, &lnrpc.ListChannelsRequest{
ActiveOnly: true,
Peer: tc.alice.PubKey[:],
})
if err != nil {
return false
}
if len(channels.Channels) == 0 {
return false
}
aliceChan := channels.Channels[0]
if len(aliceChan.PendingHtlcs) == 0 {
return false
}
return aliceChan.Active
}, defaultTimeout)
require.NoError(
tc.t.t, err, "alice <> bob channel pending htlc never arrived",
)
// At this point we want to make bob's link send all pending htlcs to
// the switch again. We force this behavior by disconnecting and
// connecting to the peer.
if err := tc.net.DisconnectNodes(tc.bob, tc.alice); err != nil {
tc.t.Fatalf("failed to disconnect alice and bob")
}
tc.net.EnsureConnected(tc.t.t, tc.bob, tc.alice)
// Here we wait for the channel to be active again.
err = wait.Predicate(func() bool {
req := &lnrpc.ListChannelsRequest{
ActiveOnly: true,
Peer: tc.alice.PubKey[:],
}
channels, err := tc.bob.ListChannels(ctxt, req)
return err == nil && len(channels.Channels) > 0
}, defaultTimeout)
require.NoError(
tc.t.t, err, "alice <> bob channel didn't re-activate",
)
// Now that the channel is active we make sure the test passes as
// expected.
payments, err := tc.alice.ListPayments(ctxb, &lnrpc.ListPaymentsRequest{
IncludeIncomplete: true,
})
require.NoError(tc.t.t, err, "failed to fetch payment")
// We expect one in flight payment since we held the htlcs.
require.Equal(tc.t.t, len(payments.Payments), 1)
require.Equal(tc.t.t, payments.Payments[0].Status, lnrpc.Payment_IN_FLIGHT)
// We now fail all htlcs to cancel the payment.
packetsCount := 0
interceptedPacketstMap.Range(func(_, packet interface{}) bool {
p := packet.(*routerrpc.ForwardHtlcInterceptRequest)
_ = interceptor.Send(&routerrpc.ForwardHtlcInterceptResponse{
IncomingCircuitKey: p.IncomingCircuitKey,
Action: routerrpc.ResolveHoldForwardAction_FAIL,
})
packetsCount++
return true
})
// At this point if we have more than one held htlcs then we should
// fail. This means we hold the same htlc twice which is a risk we
// want to eliminate. If we don't have the same htlc twice in theory we
// can cancel one and settle the other by mistake.
require.Equal(tc.t.t, packetsCount, 1)
cancelInterceptor()
wg.Wait()
}
// testForwardInterceptorBasic tests the forward interceptor RPC layer.
// The test creates a cluster of 3 connected nodes: Alice -> Bob -> Carol
// Alice sends 4 different payments to Carol while the interceptor handles
// differently the htlcs.
@ -43,7 +204,7 @@ type interceptorTestCase struct {
// 3. Intercepted held htlcs result in no payment (invoice is not settled).
// 4. When Interceptor disconnects it resumes all held htlcs, which result in
// valid payment (invoice is settled).
func testForwardInterceptor(net *lntest.NetworkHarness, t *harnessTest) {
func testForwardInterceptorBasic(net *lntest.NetworkHarness, t *harnessTest) {
// Initialize the test context with 3 connected nodes.
alice := net.NewNode(t.t, "alice", nil)
defer shutdownAndAssert(net, t, alice)

View file

@ -316,8 +316,12 @@ var allTestCases = []*testCase{
test: testRestAPI,
},
{
name: "intercept forwarded htlc packets",
test: testForwardInterceptor,
name: "forward interceptor",
test: testForwardInterceptorBasic,
},
{
name: "forward interceptor dedup htlcs",
test: testForwardInterceptorDedupHtlc,
},
{
name: "wumbo channels",