multi: stop casting peer warning messages as errors

Split the logic for processing `error` and `warning` messages from our
peers.
This commit is contained in:
positiveblue 2022-08-24 12:26:42 -07:00
parent e65f05360e
commit 4d4d8e480c
No known key found for this signature in database
GPG key ID: 4FFF2510928804DC
8 changed files with 282 additions and 57 deletions

View file

@ -859,15 +859,23 @@ func (f *Manager) reservationCoordinator() {
switch msg := fmsg.msg.(type) {
case *lnwire.OpenChannel:
f.handleFundingOpen(fmsg.peer, msg)
case *lnwire.AcceptChannel:
f.handleFundingAccept(fmsg.peer, msg)
case *lnwire.FundingCreated:
f.handleFundingCreated(fmsg.peer, msg)
case *lnwire.FundingSigned:
f.handleFundingSigned(fmsg.peer, msg)
case *lnwire.FundingLocked:
f.wg.Add(1)
go f.handleFundingLocked(fmsg.peer, msg)
case *lnwire.Warning:
f.handleWarningMsg(fmsg.peer, msg)
case *lnwire.Error:
f.handleErrorMsg(fmsg.peer, msg)
}
@ -4193,12 +4201,16 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
}
}
// handleWarningMsg processes the warning which was received from remote peer.
func (f *Manager) handleWarningMsg(peer lnpeer.Peer, msg *lnwire.Warning) {
log.Warnf("received warning message from peer %x: %v",
peer.IdentityKey().SerializeCompressed(), msg.Warning())
}
// handleErrorMsg processes the error which was received from remote peer,
// depending on the type of error we should do different clean up steps and
// inform the user about it.
func (f *Manager) handleErrorMsg(peer lnpeer.Peer,
msg *lnwire.Error) {
func (f *Manager) handleErrorMsg(peer lnpeer.Peer, msg *lnwire.Error) {
chanID := msg.ChanID
peerKey := peer.IdentityKey()

View file

@ -774,6 +774,14 @@ func fundChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
// Forward the response to Alice.
alice.fundingMgr.ProcessFundingMsg(acceptChannelResponse, bob)
// Check that sending warning messages does not abort the funding
// process.
warningMsg := &lnwire.Warning{
Data: []byte("random warning"),
}
alice.fundingMgr.ProcessFundingMsg(warningMsg, bob)
bob.fundingMgr.ProcessFundingMsg(warningMsg, alice)
// Alice responds with a FundingCreated message.
fundingCreated := assertFundingMsgSent(
t, alice.msgChan, "FundingCreated",

View file

@ -2069,7 +2069,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// although we "MAY" do so according to the specification.
case *lnwire.Warning:
l.log.Warnf("received warning message from peer: %v",
msg.Error.Error())
msg.Warning())
case *lnwire.Error:
// Error received from remote, MUST fail channel, but should

View file

@ -5196,13 +5196,21 @@ func TestChannelLinkFail(t *testing.T) {
t.Parallel()
testCases := []struct {
// name is the description for this test case.
name string
// options is used to set up mocks and configure the link
// before it is started.
options func(*channelLink)
// link test is used to execute the given test on the channel
// link after it is started.
linkTest func(*testing.T, *channelLink, *lnwallet.LightningChannel)
linkTest func(*testing.T, *channelLink,
*lnwallet.LightningChannel)
// shouldFail indicates whether or not the link should fail
// during this test case.
shouldFail bool
// shouldForceClose indicates whether we expect the link to
// force close the channel in response to the actions performed
@ -5215,8 +5223,24 @@ func TestChannelLinkFail(t *testing.T) {
permanentFailure bool
}{
{
// Test that we don't force close if syncing states
// fails at startup.
"don't fail the channel if we receive a warning " +
"message",
func(c *channelLink) {
},
func(_ *testing.T, c *channelLink,
_ *lnwallet.LightningChannel) {
warningMsg := &lnwire.Warning{
Data: []byte("random warning"),
}
c.HandleChannelUpdate(warningMsg)
},
false,
false,
false,
},
{
"don't force close if syncing states fails at startup",
func(c *channelLink) {
c.cfg.SyncStates = true
@ -5224,15 +5248,18 @@ func TestChannelLinkFail(t *testing.T) {
// the SendMessage call fail.
c.cfg.Peer.(*mockPeer).disconnected = true
},
func(t *testing.T, c *channelLink, _ *lnwallet.LightningChannel) {
func(*testing.T, *channelLink,
*lnwallet.LightningChannel) { // nolint:whitespace,lll
// Should fail at startup.
},
true,
false,
false,
},
{
// Test that we don't force closes the channel if
// resolving forward packages fails at startup.
"we don't force closes the channel if resolving " +
"forward packages fails at startup",
func(c *channelLink) {
// We make the call to resolveFwdPkgs fail by
// making the underlying forwarder fail.
@ -5241,18 +5268,23 @@ func TestChannelLinkFail(t *testing.T) {
}
c.channel.State().Packager = pkg
},
func(t *testing.T, c *channelLink, _ *lnwallet.LightningChannel) {
func(*testing.T, *channelLink,
*lnwallet.LightningChannel) { // nolint:whitespace,lll
// Should fail at startup.
},
true,
false,
false,
},
{
// Test that we don't force close the channel if we
// receive an invalid Settle message.
"don't force close the channel if we receive an " +
"invalid Settle message",
func(c *channelLink) {
},
func(t *testing.T, c *channelLink, _ *lnwallet.LightningChannel) {
func(_ *testing.T, c *channelLink,
_ *lnwallet.LightningChannel) {
// Recevive an htlc settle for an htlc that was
// never added.
htlcSettle := &lnwire.UpdateFulfillHTLC{
@ -5261,16 +5293,17 @@ func TestChannelLinkFail(t *testing.T) {
}
c.HandleChannelUpdate(htlcSettle)
},
true,
false,
false,
},
{
// Test that we force close the channel if we receive
// an invalid CommitSig, not containing enough HTLC
// sigs.
"force close the channel if we receive an invalid " +
"CommitSig, not containing enough HTLC sigs",
func(c *channelLink) {
},
func(t *testing.T, c *channelLink, remoteChannel *lnwallet.LightningChannel) {
func(_ *testing.T, c *channelLink,
remoteChannel *lnwallet.LightningChannel) {
// Generate an HTLC and send to the link.
htlc1 := generateHtlc(t, c, 0)
@ -5300,15 +5333,18 @@ func TestChannelLinkFail(t *testing.T) {
c.HandleChannelUpdate(commitSig)
},
true,
true,
false,
},
{
// Test that we force close the channel if we receive
// an invalid CommitSig, where the sig itself is
// corrupted.
"force close the channel if we receive an invalid " +
"CommitSig, where the sig itself is corrupted",
func(c *channelLink) {
},
func(t *testing.T, c *channelLink, remoteChannel *lnwallet.LightningChannel) {
func(t *testing.T, c *channelLink,
remoteChannel *lnwallet.LightningChannel) {
t.Helper()
// Generate an HTLC and send to the link.
htlc1 := generateHtlc(t, c, 0)
@ -5340,17 +5376,21 @@ func TestChannelLinkFail(t *testing.T) {
c.HandleChannelUpdate(commitSig)
},
true,
true,
false,
},
{
// Test that we consider the failure permanent if we
// receive a link error from the remote.
"consider the failure permanent if we receive a link " +
"error from the remote",
func(c *channelLink) {
},
func(t *testing.T, c *channelLink, remoteChannel *lnwallet.LightningChannel) {
func(_ *testing.T, c *channelLink,
remoteChannel *lnwallet.LightningChannel) {
err := &lnwire.Error{}
c.HandleChannelUpdate(err)
},
true,
false,
// TODO(halseth) For compatibility with CL we currently
// don't treat Errors as permanent errors.
@ -5361,12 +5401,10 @@ func TestChannelLinkFail(t *testing.T) {
const chanAmt = btcutil.SatoshiPerBitcoin * 5
// Execute each test case.
for i, test := range testCases {
for _, test := range testCases {
link, remoteChannel, _, start, _, err :=
newSingleLinkTestHarness(t, chanAmt, 0)
if err != nil {
t.Fatalf("unable to create link: %v", err)
}
require.NoError(t, err, test.name)
coreLink := link.(*channelLink)
@ -5375,41 +5413,50 @@ func TestChannelLinkFail(t *testing.T) {
linkErrors := make(chan LinkFailureError, 1)
coreLink.cfg.OnChannelFailure = func(_ lnwire.ChannelID,
_ lnwire.ShortChannelID, linkErr LinkFailureError) {
linkErrors <- linkErr
}
// Set up the link before starting it.
test.options(coreLink)
if err := start(); err != nil {
t.Fatalf("unable to start test harness: %v", err)
}
err = start()
require.NoError(t, err, test.name)
// Execute the test case.
test.linkTest(t, coreLink, remoteChannel)
// Currently we expect all test cases to lead to link error.
var linkErr LinkFailureError
errReceived := false
select {
case linkErr = <-linkErrors:
errReceived = true
case <-time.After(10 * time.Second):
t.Fatalf("%d) Alice did not fail"+
"channel", i)
// If we do not receive a link error in 10s we assume
// that we won't receive any.
}
require.Equal(t, test.shouldFail, errReceived, test.name)
// Check that the link is up and return.
if !test.shouldFail {
require.False(t, coreLink.failed)
return
}
require.True(t, coreLink.failed)
// If we expect the link to force close the channel in this
// case, check that it happens. If not, make sure it does not
// happen.
if test.shouldForceClose != linkErr.ForceClose {
t.Fatalf("%d) Expected Alice to force close(%v), "+
"instead got(%v)", i, test.shouldForceClose,
linkErr.ForceClose)
}
if test.permanentFailure != linkErr.PermanentFailure {
t.Fatalf("%d) Expected Alice set permanent failure(%v), "+
"instead got(%v)", i, test.permanentFailure,
linkErr.PermanentFailure)
}
require.Equal(
t, test.shouldForceClose, linkErr.ForceClose, test.name,
)
require.Equal(
t, test.permanentFailure, linkErr.PermanentFailure,
test.name,
)
}
}

View file

@ -91,60 +91,70 @@ func WriteElement(w *bytes.Buffer, element interface{}) error {
if _, err := w.Write(b[:]); err != nil {
return err
}
case uint8:
var b [1]byte
b[0] = e
if _, err := w.Write(b[:]); err != nil {
return err
}
case FundingFlag:
var b [1]byte
b[0] = uint8(e)
if _, err := w.Write(b[:]); err != nil {
return err
}
case uint16:
var b [2]byte
binary.BigEndian.PutUint16(b[:], e)
if _, err := w.Write(b[:]); err != nil {
return err
}
case ChanUpdateMsgFlags:
var b [1]byte
b[0] = uint8(e)
if _, err := w.Write(b[:]); err != nil {
return err
}
case ChanUpdateChanFlags:
var b [1]byte
b[0] = uint8(e)
if _, err := w.Write(b[:]); err != nil {
return err
}
case MilliSatoshi:
var b [8]byte
binary.BigEndian.PutUint64(b[:], uint64(e))
if _, err := w.Write(b[:]); err != nil {
return err
}
case btcutil.Amount:
var b [8]byte
binary.BigEndian.PutUint64(b[:], uint64(e))
if _, err := w.Write(b[:]); err != nil {
return err
}
case uint32:
var b [4]byte
binary.BigEndian.PutUint32(b[:], e)
if _, err := w.Write(b[:]); err != nil {
return err
}
case uint64:
var b [8]byte
binary.BigEndian.PutUint64(b[:], e)
if _, err := w.Write(b[:]); err != nil {
return err
}
case *btcec.PublicKey:
if e == nil {
return fmt.Errorf("cannot write nil pubkey")
@ -156,6 +166,7 @@ func WriteElement(w *bytes.Buffer, element interface{}) error {
if _, err := w.Write(b[:]); err != nil {
return err
}
case []Sig:
var b [2]byte
numSigs := uint16(len(e))
@ -169,11 +180,13 @@ func WriteElement(w *bytes.Buffer, element interface{}) error {
return err
}
}
case Sig:
// Write buffer
if _, err := w.Write(e[:]); err != nil {
return err
}
case PingPayload:
var l [2]byte
binary.BigEndian.PutUint16(l[:], uint16(len(e)))
@ -184,6 +197,7 @@ func WriteElement(w *bytes.Buffer, element interface{}) error {
if _, err := w.Write(e[:]); err != nil {
return err
}
case PongPayload:
var l [2]byte
binary.BigEndian.PutUint16(l[:], uint16(len(e)))
@ -194,6 +208,18 @@ func WriteElement(w *bytes.Buffer, element interface{}) error {
if _, err := w.Write(e[:]); err != nil {
return err
}
case WarningData:
var l [2]byte
binary.BigEndian.PutUint16(l[:], uint16(len(e)))
if _, err := w.Write(l[:]); err != nil {
return err
}
if _, err := w.Write(e[:]); err != nil {
return err
}
case ErrorData:
var l [2]byte
binary.BigEndian.PutUint16(l[:], uint16(len(e)))
@ -204,6 +230,7 @@ func WriteElement(w *bytes.Buffer, element interface{}) error {
if _, err := w.Write(e[:]); err != nil {
return err
}
case OpaqueReason:
var l [2]byte
binary.BigEndian.PutUint16(l[:], uint16(len(e)))
@ -214,14 +241,17 @@ func WriteElement(w *bytes.Buffer, element interface{}) error {
if _, err := w.Write(e[:]); err != nil {
return err
}
case [33]byte:
if _, err := w.Write(e[:]); err != nil {
return err
}
case []byte:
if _, err := w.Write(e[:]); err != nil {
return err
}
case PkScript:
// The largest script we'll accept is a p2wsh which is exactly
// 34 bytes long.
@ -233,6 +263,7 @@ func WriteElement(w *bytes.Buffer, element interface{}) error {
if err := wire.WriteVarBytes(w, 0, e); err != nil {
return err
}
case *RawFeatureVector:
if e == nil {
return fmt.Errorf("cannot write nil feature vector")
@ -265,10 +296,12 @@ func WriteElement(w *bytes.Buffer, element interface{}) error {
if _, err := w.Write(e[:]); err != nil {
return err
}
case FailCode:
if err := WriteElement(w, uint16(e)); err != nil {
return err
}
case ShortChannelID:
// Check that field fit in 3 bytes and write the blockHeight
if e.BlockHeight > ((1 << 24) - 1) {
@ -399,6 +432,7 @@ func WriteElement(w *bytes.Buffer, element interface{}) error {
return err
}
}
case color.RGBA:
if err := WriteElements(w, e.R, e.G, e.B); err != nil {
return err
@ -473,68 +507,78 @@ func ReadElement(r io.Reader, element interface{}) error {
if err != nil {
return err
}
*e = alias
case *ShortChanIDEncoding:
var b [1]uint8
if _, err := r.Read(b[:]); err != nil {
return err
}
*e = ShortChanIDEncoding(b[0])
case *uint8:
var b [1]uint8
if _, err := r.Read(b[:]); err != nil {
return err
}
*e = b[0]
case *FundingFlag:
var b [1]uint8
if _, err := r.Read(b[:]); err != nil {
return err
}
*e = FundingFlag(b[0])
case *uint16:
var b [2]byte
if _, err := io.ReadFull(r, b[:]); err != nil {
return err
}
*e = binary.BigEndian.Uint16(b[:])
case *ChanUpdateMsgFlags:
var b [1]uint8
if _, err := r.Read(b[:]); err != nil {
return err
}
*e = ChanUpdateMsgFlags(b[0])
case *ChanUpdateChanFlags:
var b [1]uint8
if _, err := r.Read(b[:]); err != nil {
return err
}
*e = ChanUpdateChanFlags(b[0])
case *uint32:
var b [4]byte
if _, err := io.ReadFull(r, b[:]); err != nil {
return err
}
*e = binary.BigEndian.Uint32(b[:])
case *uint64:
var b [8]byte
if _, err := io.ReadFull(r, b[:]); err != nil {
return err
}
*e = binary.BigEndian.Uint64(b[:])
case *MilliSatoshi:
var b [8]byte
if _, err := io.ReadFull(r, b[:]); err != nil {
return err
}
*e = MilliSatoshi(int64(binary.BigEndian.Uint64(b[:])))
case *btcutil.Amount:
var b [8]byte
if _, err := io.ReadFull(r, b[:]); err != nil {
return err
}
*e = btcutil.Amount(int64(binary.BigEndian.Uint64(b[:])))
case **btcec.PublicKey:
var b [btcec.PubKeyBytesLenCompressed]byte
if _, err = io.ReadFull(r, b[:]); err != nil {
@ -546,13 +590,13 @@ func ReadElement(r io.Reader, element interface{}) error {
return err
}
*e = pubKey
case **RawFeatureVector:
f := NewRawFeatureVector()
err = f.Decode(r)
if err != nil {
return err
}
*e = f
case *[]Sig:
@ -571,13 +615,13 @@ func ReadElement(r io.Reader, element interface{}) error {
}
}
}
*e = sigs
case *Sig:
if _, err := io.ReadFull(r, e[:]); err != nil {
return err
}
case *OpaqueReason:
var l [2]byte
if _, err := io.ReadFull(r, l[:]); err != nil {
@ -589,6 +633,19 @@ func ReadElement(r io.Reader, element interface{}) error {
if _, err := io.ReadFull(r, *e); err != nil {
return err
}
case *WarningData:
var l [2]byte
if _, err := io.ReadFull(r, l[:]); err != nil {
return err
}
errorLen := binary.BigEndian.Uint16(l[:])
*e = WarningData(make([]byte, errorLen))
if _, err := io.ReadFull(r, *e); err != nil {
return err
}
case *ErrorData:
var l [2]byte
if _, err := io.ReadFull(r, l[:]); err != nil {
@ -600,6 +657,7 @@ func ReadElement(r io.Reader, element interface{}) error {
if _, err := io.ReadFull(r, *e); err != nil {
return err
}
case *PingPayload:
var l [2]byte
if _, err := io.ReadFull(r, l[:]); err != nil {
@ -611,6 +669,7 @@ func ReadElement(r io.Reader, element interface{}) error {
if _, err := io.ReadFull(r, *e); err != nil {
return err
}
case *PongPayload:
var l [2]byte
if _, err := io.ReadFull(r, l[:]); err != nil {
@ -622,20 +681,24 @@ func ReadElement(r io.Reader, element interface{}) error {
if _, err := io.ReadFull(r, *e); err != nil {
return err
}
case *[33]byte:
if _, err := io.ReadFull(r, e[:]); err != nil {
return err
}
case []byte:
if _, err := io.ReadFull(r, e); err != nil {
return err
}
case *PkScript:
pkScript, err := wire.ReadVarBytes(r, 0, 34, "pkscript")
if err != nil {
return err
}
*e = pkScript
case *wire.OutPoint:
var h [32]byte
if _, err = io.ReadFull(r, h[:]); err != nil {
@ -657,10 +720,12 @@ func ReadElement(r io.Reader, element interface{}) error {
Hash: *hash,
Index: uint32(index),
}
case *FailCode:
if err := ReadElement(r, (*uint16)(e)); err != nil {
return err
}
case *ChannelID:
if _, err := io.ReadFull(r, e[:]); err != nil {
return err
@ -833,6 +898,7 @@ func ReadElement(r io.Reader, element interface{}) error {
}
*e = addresses
case *color.RGBA:
err := ReadElements(r,
&e.R,
@ -842,6 +908,7 @@ func ReadElement(r io.Reader, element interface{}) error {
if err != nil {
return err
}
case *DeliveryAddress:
var addrLen [2]byte
if _, err = io.ReadFull(r, addrLen[:]); err != nil {

View file

@ -1,16 +1,70 @@
package lnwire
import (
"bytes"
"fmt"
"io"
)
// WarningData is a set of bytes associated with a particular sent warning. A
// receiving node SHOULD only print out data verbatim if the string is composed
// solely of printable ASCII characters. For reference, the printable character
// set includes byte values 32 through 127 inclusive.
type WarningData []byte
// Warning is used to express non-critical errors in the protocol, providing
// a "soft" way for nodes to communicate failures.
type Warning struct {
// ChanID references the active channel in which the warning occurred
// within. If the ChanID is all zeros, then this warning applies to the
// entire established connection.
ChanID ChannelID
// Data is the attached warning data that describes the exact failure
// which caused the warning message to be sent.
Data WarningData
}
// A compile time check to ensure Warning implements the lnwire.Message
// interface.
var _ Message = (*Warning)(nil)
// Warning is used to express non-critical errors in the protocol, providing
// a "soft" way for nodes to communicate failures. Since it has the same
// structure as errors, warnings simply include an Error so that we can leverage
// their encode/decode functionality, and over write the MsgType function to
// differentiate them.
type Warning struct {
Error
// NewWarning creates a new Warning message.
func NewWarning() *Warning {
return &Warning{}
}
// Warning returns the string representation to Warning.
func (c *Warning) Warning() string {
errMsg := "non-ascii data"
if isASCII(c.Data) {
errMsg = string(c.Data)
}
return fmt.Sprintf("chan_id=%v, err=%v", c.ChanID, errMsg)
}
// Decode deserializes a serialized Warning message stored in the passed
// io.Reader observing the specified protocol version.
//
// This is part of the lnwire.Message interface.
func (c *Warning) Decode(r io.Reader, pver uint32) error {
return ReadElements(r,
&c.ChanID,
&c.Data,
)
}
// Encode serializes the target Warning into the passed io.Writer observing the
// protocol version specified.
//
// This is part of the lnwire.Message interface.
func (c *Warning) Encode(w *bytes.Buffer, pver uint32) error {
if err := WriteBytes(w, c.ChanID[:]); err != nil {
return err
}
return WriteWarningData(w, c.Data)
}
// MsgType returns the integer uniquely identifying an Warning message on the

View file

@ -241,6 +241,11 @@ func WritePongPayload(buf *bytes.Buffer, payload PongPayload) error {
return writeDataWithLength(buf, payload)
}
// WriteWarningData appends the data to the provided buffer.
func WriteWarningData(buf *bytes.Buffer, data WarningData) error {
return writeDataWithLength(buf, data)
}
// WriteErrorData appends the data to the provided buffer.
func WriteErrorData(buf *bytes.Buffer, data ErrorData) error {
return writeDataWithLength(buf, data)

View file

@ -1539,7 +1539,7 @@ out:
case *lnwire.Warning:
targetChan = msg.ChanID
isLinkUpdate = p.handleError(&msg.Error)
isLinkUpdate = p.handleWarning(msg)
case *lnwire.Error:
targetChan = msg.ChanID
@ -1675,6 +1675,38 @@ func (p *Brontide) storeError(err error) {
)
}
// handleWarning processes a warning message read from the remote peer. The
// boolean return indicates whether the message should be delivered to a
// targeted peer or not. The message gets stored in memory as an error if we
// have open channels with the peer we received it from.
//
// NOTE: This method should only be called from within the readHandler.
func (p *Brontide) handleWarning(msg *lnwire.Warning) bool {
switch {
// Connection wide messages should be forward the warning to all the
// channels with this peer.
case msg.ChanID == lnwire.ConnectionWideID:
for _, chanStream := range p.activeMsgStreams {
chanStream.AddMsg(msg)
}
return false
// If the channel ID for the warning message corresponds to a pending
// channel, then the funding manager will handle the warning.
case p.cfg.FundingManager.IsPendingChannel(msg.ChanID, p):
p.cfg.FundingManager.ProcessFundingMsg(msg, p)
return false
// If not we hand the warning to the channel link for this channel.
case p.isActiveChannel(msg.ChanID):
return true
default:
return false
}
}
// handleError processes an error message read from the remote peer. The boolean
// returns indicates whether the message should be delivered to a targeted peer.
// It stores the error we received from the peer in memory if we have a channel
@ -1775,7 +1807,7 @@ func messageSummary(msg lnwire.Message) string {
msg.ChanID, msg.ID, msg.FailureCode)
case *lnwire.Warning:
return fmt.Sprintf("%v", msg.Error.Error())
return fmt.Sprintf("%v", msg.Warning())
case *lnwire.Error:
return fmt.Sprintf("%v", msg.Error())