htlcswitch: change ForwardPackets to return error

As part of the preparation to the switch interceptor feature, this
function is changed  to return error instead of error channel that
is closed automatically.
Returning an error channel has become complex to maintain and
implement when adding more asynchronous flows to the switch.
The change doesn't affect the current behavior which logs the
errors as before.
This commit is contained in:
Roei Erez 2020-05-19 12:13:02 +03:00
parent 87880c0d56
commit 1a6701122c
6 changed files with 131 additions and 135 deletions

View File

@ -135,10 +135,10 @@ type ChannelLinkConfig struct {
Switch *Switch
// ForwardPackets attempts to forward the batch of htlcs through the
// switch, any failed packets will be returned to the provided
// ChannelLink. The link's quit signal should be provided to allow
// switch. The function returns and error in case it fails to send one or
// more packets. The link's quit signal should be provided to allow
// cancellation of forwarding during link shutdown.
ForwardPackets func(chan struct{}, ...*htlcPacket) chan error
ForwardPackets func(chan struct{}, ...*htlcPacket) error
// DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
// blobs, which are then used to inform how to forward an HTLC.
@ -2971,8 +2971,10 @@ func (l *channelLink) forwardBatch(packets ...*htlcPacket) {
filteredPkts = append(filteredPkts, pkt)
}
errChan := l.cfg.ForwardPackets(l.quit, filteredPkts...)
go handleBatchFwdErrs(errChan, l.log)
if err := l.cfg.ForwardPackets(l.quit, filteredPkts...); err != nil {
log.Errorf("Unhandled error while reforwarding htlc "+
"settle/fail over htlcswitch: %v", err)
}
}
// sendHTLCError functions cancels HTLC and send cancel message back to the

View File

@ -86,7 +86,7 @@ type mailBoxConfig struct {
// forwardPackets send a varidic number of htlcPackets to the switch to
// be routed. A quit channel should be provided so that the call can
// properly exit during shutdown.
forwardPackets func(chan struct{}, ...*htlcPacket) chan error
forwardPackets func(chan struct{}, ...*htlcPacket) error
// clock is a time source for the mailbox.
clock clock.Clock
@ -680,8 +680,10 @@ func (m *memoryMailBox) FailAdd(pkt *htlcPacket) {
},
}
errChan := m.cfg.forwardPackets(m.quit, failPkt)
go handleBatchFwdErrs(errChan, log)
if err := m.cfg.forwardPackets(m.quit, failPkt); err != nil {
log.Errorf("Unhandled error while reforwarding packets "+
"settle/fail over htlcswitch: %v", err)
}
}
// MessageOutBox returns a channel that any new messages ready for delivery
@ -734,7 +736,7 @@ type mailOrchConfig struct {
// forwardPackets send a varidic number of htlcPackets to the switch to
// be routed. A quit channel should be provided so that the call can
// properly exit during shutdown.
forwardPackets func(chan struct{}, ...*htlcPacket) chan error
forwardPackets func(chan struct{}, ...*htlcPacket) error
// fetchUpdate retreives the most recent channel update for the channel
// this mailbox belongs to.

View File

@ -218,16 +218,13 @@ func newMailboxContext(t *testing.T, startTime time.Time,
}
func (c *mailboxContext) forward(_ chan struct{},
pkts ...*htlcPacket) chan error {
pkts ...*htlcPacket) error {
for _, pkt := range pkts {
c.forwards <- pkt
}
errChan := make(chan error)
close(errChan)
return errChan
return nil
}
func (c *mailboxContext) sendAdds(start, num int) []*htlcPacket {
@ -555,12 +552,8 @@ func TestMailOrchestrator(t *testing.T) {
}, nil
},
forwardPackets: func(_ chan struct{},
pkts ...*htlcPacket) chan error {
// Close the channel immediately so the goroutine
// logging errors can exit.
errChan := make(chan error)
close(errChan)
return errChan
pkts ...*htlcPacket) error {
return nil
},
clock: clock.NewTestClock(time.Now()),
expiry: testExpiry,

View File

@ -9,7 +9,6 @@ import (
"time"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btclog"
"github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
@ -548,24 +547,14 @@ func (s *Switch) IsForwardedHTLC(chanID lnwire.ShortChannelID,
// given to forward them through the router. The sending link's quit channel is
// used to prevent deadlocks when the switch stops a link in the midst of
// forwarding.
//
// NOTE: This method guarantees that the returned err chan will eventually be
// closed. The receiver should read on the channel until receiving such a
// signal.
func (s *Switch) ForwardPackets(linkQuit chan struct{},
packets ...*htlcPacket) chan error {
packets ...*htlcPacket) error {
var (
// fwdChan is a buffered channel used to receive err msgs from
// the htlcPlex when forwarding this batch.
fwdChan = make(chan error, len(packets))
// errChan is a buffered channel returned to the caller, that is
// proxied by the fwdChan. This method guarantees that errChan
// will be closed eventually to alert the receiver that it can
// stop reading from the channel.
errChan = make(chan error, len(packets))
// numSent keeps a running count of how many packets are
// forwarded to the switch, which determines how many responses
// we will wait for on the fwdChan..
@ -574,8 +563,7 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{},
// No packets, nothing to do.
if len(packets) == 0 {
close(errChan)
return errChan
return nil
}
// Setup a barrier to prevent the background tasks from processing
@ -590,18 +578,13 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{},
// it is already in the process of shutting down.
select {
case <-linkQuit:
close(errChan)
return errChan
return nil
case <-s.quit:
close(errChan)
return errChan
return nil
default:
// Spawn a goroutine the proxy the errs back to the returned err
// chan. This is done to ensure the err chan returned to the
// caller closed properly, alerting the receiver of completion
// or shutdown.
// Spawn a goroutine to log the errors returned from failed packets.
s.wg.Add(1)
go s.proxyFwdErrs(&numSent, &wg, fwdChan, errChan)
go s.logFwdErrs(&numSent, &wg, fwdChan)
}
// Make a first pass over the packets, forwarding any settles or fails.
@ -619,7 +602,7 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{},
default:
err := s.routeAsync(packet, fwdChan, linkQuit)
if err != nil {
return errChan
return fmt.Errorf("failed to forward packet %v", err)
}
numSent++
}
@ -628,7 +611,7 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{},
// If this batch did not contain any circuits to commit, we can return
// early.
if len(circuits) == 0 {
return errChan
return nil
}
// Write any circuits that we found to disk.
@ -664,7 +647,7 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{},
for _, packet := range addedPackets {
err := s.routeAsync(packet, fwdChan, linkQuit)
if err != nil {
return errChan
return fmt.Errorf("failed to forward packet %v", err)
}
numSent++
}
@ -693,21 +676,12 @@ func (s *Switch) ForwardPackets(linkQuit chan struct{},
}
}
return errChan
return nil
}
// proxyFwdErrs transmits any errors received on `fwdChan` back to `errChan`,
// and guarantees that the `errChan` will be closed after 1) all errors have
// been sent, or 2) the switch has received a shutdown. The `errChan` should be
// buffered with at least the value of `num` after the barrier has been
// released.
//
// NOTE: The receiver of `errChan` should read until the channel closed, since
// this proxying guarantees that the close will happen.
func (s *Switch) proxyFwdErrs(num *int, wg *sync.WaitGroup,
fwdChan, errChan chan error) {
// logFwdErrs logs any errors received on `fwdChan`
func (s *Switch) logFwdErrs(num *int, wg *sync.WaitGroup, fwdChan chan error) {
defer s.wg.Done()
defer close(errChan)
// Wait here until the outer function has finished persisting
// and routing the packets. This guarantees we don't read from num until
@ -718,7 +692,10 @@ func (s *Switch) proxyFwdErrs(num *int, wg *sync.WaitGroup,
for i := 0; i < numSent; i++ {
select {
case err := <-fwdChan:
errChan <- err
if err != nil {
log.Errorf("Unhandled error while reforwarding htlc "+
"settle/fail over htlcswitch: %v", err)
}
case <-s.quit:
log.Errorf("unable to forward htlc packet " +
"htlc switch was stopped")
@ -1925,28 +1902,10 @@ func (s *Switch) reforwardSettleFails(fwdPkgs []*channeldb.FwdPkg) {
// Since this send isn't tied to a specific link, we pass a nil
// link quit channel, meaning the send will fail only if the
// switch receives a shutdown request.
errChan := s.ForwardPackets(nil, switchPackets...)
go handleBatchFwdErrs(errChan, log)
}
}
// handleBatchFwdErrs waits on the given errChan until it is closed, logging the
// errors returned from any unsuccessful forwarding attempts.
func handleBatchFwdErrs(errChan chan error, l btclog.Logger) {
for {
err, ok := <-errChan
if !ok {
// Err chan has been drained or switch is shutting down.
// Either way, return.
return
if err := s.ForwardPackets(nil, switchPackets...); err != nil {
log.Errorf("Unhandled error while reforwarding packets "+
"settle/fail over htlcswitch: %v", err)
}
if err == nil {
continue
}
l.Errorf("Unhandled error while reforwarding htlc "+
"settle/fail over htlcswitch: %v", err)
}
}

View File

@ -172,6 +172,13 @@ func TestSwitchSendPending(t *testing.T) {
t.Fatalf("unable to create alice server: %v", err)
}
bobPeer, err := newMockServer(
t, "bob", testStartingHeight, nil, testDefaultDelta,
)
if err != nil {
t.Fatalf("unable to create bob server: %v", err)
}
s, err := initSwitchWithDB(testStartingHeight, nil)
if err != nil {
t.Fatalf("unable to init switch: %v", err)
@ -181,7 +188,7 @@ func TestSwitchSendPending(t *testing.T) {
}
defer s.Stop()
chanID1, _, aliceChanID, bobChanID := genIDs()
chanID1, chanID2, aliceChanID, bobChanID := genIDs()
pendingChanID := lnwire.ShortChannelID{}
@ -192,6 +199,13 @@ func TestSwitchSendPending(t *testing.T) {
t.Fatalf("unable to add alice link: %v", err)
}
bobChannelLink := newMockChannelLink(
s, chanID2, bobChanID, bobPeer, true,
)
if err := s.AddLink(bobChannelLink); err != nil {
t.Fatalf("unable to add bob link: %v", err)
}
// Create request which should is being forwarded from Bob channel
// link to Alice channel link.
preimage, err := genPreimage()
@ -212,7 +226,17 @@ func TestSwitchSendPending(t *testing.T) {
// Send the ADD packet, this should not be forwarded out to the link
// since there are no eligible links.
err = forwardPackets(t, s, packet)
if err = s.ForwardPackets(nil, packet); err != nil {
t.Fatal(err)
}
select {
case p := <-bobChannelLink.packets:
if p.linkFailure != nil {
err = p.linkFailure
}
case <-time.After(time.Second):
t.Fatal("no timely reply from switch")
}
linkErr, ok := err.(*LinkError)
if !ok {
t.Fatalf("expected link error, got: %T", err)
@ -248,7 +272,7 @@ func TestSwitchSendPending(t *testing.T) {
packet.incomingHTLCID++
// Handle the request and checks that bob channel link received it.
if err := forwardPackets(t, s, packet); err != nil {
if err := s.ForwardPackets(nil, packet); err != nil {
t.Fatalf("unexpected forward failure: %v", err)
}
@ -321,7 +345,7 @@ func TestSwitchForward(t *testing.T) {
}
// Handle the request and checks that bob channel link received it.
if err := forwardPackets(t, s, packet); err != nil {
if err := s.ForwardPackets(nil, packet); err != nil {
t.Fatal(err)
}
@ -355,7 +379,7 @@ func TestSwitchForward(t *testing.T) {
}
// Handle the request and checks that payment circuit works properly.
if err := forwardPackets(t, s, packet); err != nil {
if err := s.ForwardPackets(nil, packet); err != nil {
t.Fatal(err)
}
@ -450,7 +474,7 @@ func TestSwitchForwardFailAfterFullAdd(t *testing.T) {
}
// Handle the request and checks that bob channel link received it.
if err := forwardPackets(t, s, ogPacket); err != nil {
if err := s.ForwardPackets(nil, ogPacket); err != nil {
t.Fatal(err)
}
@ -538,7 +562,7 @@ func TestSwitchForwardFailAfterFullAdd(t *testing.T) {
}
// Send the fail packet from the remote peer through the switch.
if err := <-s2.ForwardPackets(nil, fail); err != nil {
if err := s2.ForwardPackets(nil, fail); err != nil {
t.Fatalf(err.Error())
}
@ -562,9 +586,13 @@ func TestSwitchForwardFailAfterFullAdd(t *testing.T) {
}
// Send the fail packet from the remote peer through the switch.
if err := <-s2.ForwardPackets(nil, fail); err == nil {
t.Fatalf("expected failure when sending duplicate fail " +
"with no pending circuit")
if err := s.ForwardPackets(nil, fail); err != nil {
t.Fatal(err)
}
select {
case <-aliceChannelLink.packets:
t.Fatalf("expected duplicate fail to not arrive at the destination")
case <-time.After(time.Second):
}
}
@ -645,7 +673,7 @@ func TestSwitchForwardSettleAfterFullAdd(t *testing.T) {
}
// Handle the request and checks that bob channel link received it.
if err := forwardPackets(t, s, ogPacket); err != nil {
if err := s.ForwardPackets(nil, ogPacket); err != nil {
t.Fatal(err)
}
@ -735,7 +763,7 @@ func TestSwitchForwardSettleAfterFullAdd(t *testing.T) {
}
// Send the settle packet from the remote peer through the switch.
if err := <-s2.ForwardPackets(nil, settle); err != nil {
if err := s2.ForwardPackets(nil, settle); err != nil {
t.Fatalf(err.Error())
}
@ -759,10 +787,14 @@ func TestSwitchForwardSettleAfterFullAdd(t *testing.T) {
t.Fatalf("wrong amount of circuits")
}
// Send the settle packet again, which should fail.
if err := <-s2.ForwardPackets(nil, settle); err != nil {
t.Fatalf("expected success when sending duplicate settle " +
"with no pending circuit")
// Send the settle packet again, which not arrive at destination.
if err := s2.ForwardPackets(nil, settle); err != nil {
t.Fatal(err)
}
select {
case <-bobChannelLink.packets:
t.Fatalf("expected duplicate fail to not arrive at the destination")
case <-time.After(time.Second):
}
}
@ -843,7 +875,7 @@ func TestSwitchForwardDropAfterFullAdd(t *testing.T) {
}
// Handle the request and checks that bob channel link received it.
if err := forwardPackets(t, s, ogPacket); err != nil {
if err := s.ForwardPackets(nil, ogPacket); err != nil {
t.Fatal(err)
}
@ -916,7 +948,7 @@ func TestSwitchForwardDropAfterFullAdd(t *testing.T) {
// Resend the failed htlc. The packet will be dropped silently since the
// switch will detect that it has been half added previously.
if err := <-s2.ForwardPackets(nil, ogPacket); err != nil {
if err := s2.ForwardPackets(nil, ogPacket); err != nil {
t.Fatal(err)
}
@ -1008,7 +1040,7 @@ func TestSwitchForwardFailAfterHalfAdd(t *testing.T) {
}
// Handle the request and checks that bob channel link received it.
if err := forwardPackets(t, s, ogPacket); err != nil {
if err := s.ForwardPackets(nil, ogPacket); err != nil {
t.Fatal(err)
}
@ -1076,7 +1108,7 @@ func TestSwitchForwardFailAfterHalfAdd(t *testing.T) {
// Resend the failed htlc, it should be returned to alice since the
// switch will detect that it has been half added previously.
err = <-s2.ForwardPackets(nil, ogPacket)
err = s2.ForwardPackets(nil, ogPacket)
if err != nil {
t.Fatal(err)
}
@ -1174,7 +1206,7 @@ func TestSwitchForwardCircuitPersistence(t *testing.T) {
}
// Handle the request and checks that bob channel link received it.
if err := forwardPackets(t, s, ogPacket); err != nil {
if err := s.ForwardPackets(nil, ogPacket); err != nil {
t.Fatal(err)
}
@ -1264,7 +1296,7 @@ func TestSwitchForwardCircuitPersistence(t *testing.T) {
}
// Handle the request and checks that payment circuit works properly.
if err := <-s2.ForwardPackets(nil, ogPacket); err != nil {
if err := s2.ForwardPackets(nil, ogPacket); err != nil {
t.Fatal(err)
}
@ -1414,7 +1446,17 @@ func TestCircularForwards(t *testing.T) {
// Attempt to forward the packet and check for the expected
// error.
err = forwardPackets(t, s, packet)
if err = s.ForwardPackets(nil, packet); err != nil {
t.Fatal(err)
}
select {
case p := <-aliceChannelLink.packets:
if p.linkFailure != nil {
err = p.linkFailure
}
case <-time.After(time.Second):
t.Fatal("no timely reply from switch")
}
if !reflect.DeepEqual(err, test.expectedErr) {
t.Fatalf("expected: %v, got: %v",
test.expectedErr, err)
@ -1634,18 +1676,30 @@ func testSkipIneligibleLinksMultiHopForward(t *testing.T,
}
// The request to forward should fail as
err = forwardPackets(t, s, packet)
if err := s.ForwardPackets(nil, packet); err != nil {
t.Fatal(err)
}
var linkError *LinkError
select {
case p := <-aliceChannelLink.packets:
linkError = p.linkFailure
case p := <-bobChannelLink1.packets:
linkError = p.linkFailure
case p := <-bobChannelLink2.packets:
linkError = p.linkFailure
case <-time.After(time.Second):
t.Fatal("no timely reply from switch")
}
failure := obfuscator.(*mockObfuscator).failure
if testCase.expectedReply == lnwire.CodeNone {
if err != nil {
if linkError != nil {
t.Fatalf("forwarding should have succeeded")
}
if failure != nil {
t.Fatalf("unexpected failure %T", failure)
}
} else {
if err == nil {
if linkError == nil {
t.Fatalf("forwarding should have failed due to " +
"inactive link")
}
@ -1793,7 +1847,7 @@ func TestSwitchCancel(t *testing.T) {
}
// Handle the request and checks that bob channel link received it.
if err := forwardPackets(t, s, request); err != nil {
if err := s.ForwardPackets(nil, request); err != nil {
t.Fatal(err)
}
@ -1825,7 +1879,7 @@ func TestSwitchCancel(t *testing.T) {
}
// Handle the request and checks that payment circuit works properly.
if err := forwardPackets(t, s, request); err != nil {
if err := s.ForwardPackets(nil, request); err != nil {
t.Fatal(err)
}
@ -1908,7 +1962,7 @@ func TestSwitchAddSamePayment(t *testing.T) {
}
// Handle the request and checks that bob channel link received it.
if err := forwardPackets(t, s, request); err != nil {
if err := s.ForwardPackets(nil, request); err != nil {
t.Fatal(err)
}
@ -1938,7 +1992,7 @@ func TestSwitchAddSamePayment(t *testing.T) {
}
// Handle the request and checks that bob channel link received it.
if err := forwardPackets(t, s, request); err != nil {
if err := s.ForwardPackets(nil, request); err != nil {
t.Fatal(err)
}
@ -1967,7 +2021,7 @@ func TestSwitchAddSamePayment(t *testing.T) {
}
// Handle the request and checks that payment circuit works properly.
if err := forwardPackets(t, s, request); err != nil {
if err := s.ForwardPackets(nil, request); err != nil {
t.Fatal(err)
}
@ -1993,7 +2047,7 @@ func TestSwitchAddSamePayment(t *testing.T) {
}
// Handle the request and checks that payment circuit works properly.
if err := forwardPackets(t, s, request); err != nil {
if err := s.ForwardPackets(nil, request); err != nil {
t.Fatal(err)
}
@ -2136,7 +2190,7 @@ func TestSwitchSendPayment(t *testing.T) {
},
}
if err := forwardPackets(t, s, packet); err != nil {
if err := s.ForwardPackets(nil, packet); err != nil {
t.Fatalf("can't forward htlc packet: %v", err)
}
@ -2631,7 +2685,7 @@ func TestInvalidFailure(t *testing.T) {
},
}
if err := forwardPackets(t, s, packet); err != nil {
if err := s.ForwardPackets(nil, packet); err != nil {
t.Fatalf("can't forward htlc packet: %v", err)
}
@ -3057,17 +3111,3 @@ func getThreeHopEvents(channels *clusterChannels, htlcID uint64,
return aliceEvents, bobEvents, carolEvents
}
// forwardPackets forwards packets to the switch and enforces a timeout on the
// reply.
func forwardPackets(t *testing.T, s *Switch, packets ...*htlcPacket) error {
select {
case err := <-s.ForwardPackets(nil, packets...):
return err
case <-time.After(time.Second):
t.Fatal("no timely reply from switch")
return nil
}
}

View File

@ -89,13 +89,13 @@
<time> [ERR] HSWC: ChannelLink(<chan>): unhandled error while forwarding htlc packet over htlcswitch: insufficient bandwidth to route htlc
<time> [ERR] HSWC: ChannelLink(<chan>): unhandled error while forwarding htlc packet over htlcswitch: node configured to disallow forwards
<time> [ERR] HSWC: ChannelLink(<chan>): unhandled error while forwarding htlc packet over htlcswitch: UnknownNextPeer
<time> [ERR] HSWC: ChannelLink(<chan>): Unhandled error while reforwarding htlc settle/fail over htlcswitch: AmountBelowMinimum(amt=<amt>, update=(lnwire.ChannelUpdate) {
<time> [ERR] HSWC: ChannelLink(<chan>): Unhandled error while reforwarding htlc settle/fail over htlcswitch: circuit has already been closed
<time> [ERR] HSWC: ChannelLink(<chan>): Unhandled error while reforwarding htlc settle/fail over htlcswitch: FeeInsufficient(htlc_amt==<amt>, update=(lnwire.ChannelUpdate) {
<time> [ERR] HSWC: ChannelLink(<chan>): Unhandled error while reforwarding htlc settle/fail over htlcswitch: insufficient bandwidth to route htlc
<time> [ERR] HSWC: ChannelLink(<chan>): Unhandled error while reforwarding htlc settle/fail over htlcswitch: node configured to disallow forwards
<time> [ERR] HSWC: ChannelLink(<chan>): Unhandled error while reforwarding htlc settle/fail over htlcswitch: UnknownNextPeer
<time> [ERR] HSWC: ChannelLink(<chan>): failing link: unable to synchronize channel states: unable to send chan sync message for ChannelPoint(<chan_point>): set tcp <ip>: use of closed network connection with error: unable to resume channel, recovery required
<time> [ERR] HSWC: Unhandled error while reforwarding htlc settle/fail over htlcswitch: AmountBelowMinimum(amt=<amt>, update=(lnwire.ChannelUpdate) {
<time> [ERR] HSWC: Unhandled error while reforwarding htlc settle/fail over htlcswitch: circuit has already been closed
<time> [ERR] HSWC: Unhandled error while reforwarding htlc settle/fail over htlcswitch: FeeInsufficient(htlc_amt==<amt>, update=(lnwire.ChannelUpdate) {
<time> [ERR] HSWC: Unhandled error while reforwarding htlc settle/fail over htlcswitch: insufficient bandwidth to route htlc
<time> [ERR] HSWC: Unhandled error while reforwarding htlc settle/fail over htlcswitch: node configured to disallow forwards
<time> [ERR] HSWC: Unhandled error while reforwarding htlc settle/fail over htlcswitch: UnknownNextPeer
<time> [ERR] HSWC: FeeInsufficient(htlc_amt==<amt>, update=(lnwire.ChannelUpdate) {
<time> [ERR] HSWC: insufficient bandwidth to route htlc
<time> [ERR] HSWC: Link <chan> not found