mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-01-19 05:45:21 +01:00
Merge pull request #4051 from carlaKC/3000-peererrors
peers: Track errors across connections
This commit is contained in:
commit
b03c3c2df3
@ -1335,7 +1335,13 @@ var listPeersCommand = cli.Command{
|
||||
Name: "listpeers",
|
||||
Category: "Peers",
|
||||
Usage: "List all active, currently connected peers.",
|
||||
Action: actionDecorator(listPeers),
|
||||
Flags: []cli.Flag{
|
||||
cli.BoolFlag{
|
||||
Name: "list_errors",
|
||||
Usage: "list a full set of most recent errors for the peer",
|
||||
},
|
||||
},
|
||||
Action: actionDecorator(listPeers),
|
||||
}
|
||||
|
||||
func listPeers(ctx *cli.Context) error {
|
||||
@ -1343,7 +1349,11 @@ func listPeers(ctx *cli.Context) error {
|
||||
client, cleanUp := getClient(ctx)
|
||||
defer cleanUp()
|
||||
|
||||
req := &lnrpc.ListPeersRequest{}
|
||||
// By default, we display a single error on the cli. If the user
|
||||
// specifically requests a full error set, then we will provide it.
|
||||
req := &lnrpc.ListPeersRequest{
|
||||
LatestError: !ctx.IsSet("list_errors"),
|
||||
}
|
||||
resp, err := client.ListPeers(ctxb, req)
|
||||
if err != nil {
|
||||
return err
|
||||
|
1700
lnrpc/rpc.pb.go
1700
lnrpc/rpc.pb.go
File diff suppressed because it is too large
Load Diff
@ -269,10 +269,18 @@ func request_Lightning_DisconnectPeer_0(ctx context.Context, marshaler runtime.M
|
||||
|
||||
}
|
||||
|
||||
var (
|
||||
filter_Lightning_ListPeers_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)}
|
||||
)
|
||||
|
||||
func request_Lightning_ListPeers_0(ctx context.Context, marshaler runtime.Marshaler, client LightningClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
|
||||
var protoReq ListPeersRequest
|
||||
var metadata runtime.ServerMetadata
|
||||
|
||||
if err := runtime.PopulateQueryParameters(&protoReq, req.URL.Query(), filter_Lightning_ListPeers_0); err != nil {
|
||||
return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err)
|
||||
}
|
||||
|
||||
msg, err := client.ListPeers(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
|
||||
return msg, metadata, err
|
||||
|
||||
|
@ -1571,9 +1571,32 @@ message Peer {
|
||||
|
||||
/// Features advertised by the remote peer in their init message.
|
||||
map<uint32, Feature> features = 11;
|
||||
|
||||
/*
|
||||
The latest errors received from our peer with timestamps, limited to the 10
|
||||
most recent errors. These errors are tracked across peer connections, but
|
||||
are not persisted across lnd restarts. Note that these errors are only
|
||||
stored for peers that we have channels open with, to prevent peers from
|
||||
spamming us with errors at no cost.
|
||||
*/
|
||||
repeated TimestampedError errors = 12;
|
||||
}
|
||||
|
||||
message TimestampedError {
|
||||
// The unix timestamp in seconds when the error occurred.
|
||||
uint64 timestamp = 1;
|
||||
|
||||
// The string representation of the error sent by our peer.
|
||||
string error = 2;
|
||||
}
|
||||
|
||||
message ListPeersRequest {
|
||||
/*
|
||||
If true, only the last error that our peer sent us will be returned with
|
||||
the peer's information, rather than the full set of historic errors we have
|
||||
stored.
|
||||
*/
|
||||
bool latest_error = 1;
|
||||
}
|
||||
message ListPeersResponse {
|
||||
/// The list of currently connected peers
|
||||
|
@ -1186,6 +1186,16 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"parameters": [
|
||||
{
|
||||
"name": "latest_error",
|
||||
"description": "If true, only the last error that our peer sent us will be returned with\nthe peer's information, rather than the full set of historic errors we have\nstored.",
|
||||
"in": "query",
|
||||
"required": false,
|
||||
"type": "boolean",
|
||||
"format": "boolean"
|
||||
}
|
||||
],
|
||||
"tags": [
|
||||
"Lightning"
|
||||
]
|
||||
@ -3819,6 +3829,13 @@
|
||||
"$ref": "#/definitions/lnrpcFeature"
|
||||
},
|
||||
"description": "/ Features advertised by the remote peer in their init message."
|
||||
},
|
||||
"errors": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/lnrpcTimestampedError"
|
||||
},
|
||||
"description": "The latest errors received from our peer with timestamps, limited to the 10\nmost recent errors. These errors are tracked across peer connections, but\nare not persisted across lnd restarts. Note that these errors are only\nstored for peers that we have channels open with, to prevent peers from\nspamming us with errors at no cost."
|
||||
}
|
||||
}
|
||||
},
|
||||
@ -4270,6 +4287,20 @@
|
||||
"lnrpcStopResponse": {
|
||||
"type": "object"
|
||||
},
|
||||
"lnrpcTimestampedError": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"timestamp": {
|
||||
"type": "string",
|
||||
"format": "uint64",
|
||||
"description": "The unix timestamp in seconds when the error occurred."
|
||||
},
|
||||
"error": {
|
||||
"type": "string",
|
||||
"description": "The string representation of the error sent by our peer."
|
||||
}
|
||||
}
|
||||
},
|
||||
"lnrpcTransaction": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
81
peer.go
81
peer.go
@ -28,6 +28,7 @@ import (
|
||||
"github.com/lightningnetwork/lnd/lnwallet"
|
||||
"github.com/lightningnetwork/lnd/lnwire"
|
||||
"github.com/lightningnetwork/lnd/pool"
|
||||
"github.com/lightningnetwork/lnd/queue"
|
||||
"github.com/lightningnetwork/lnd/ticker"
|
||||
)
|
||||
|
||||
@ -52,6 +53,9 @@ const (
|
||||
// messages to be sent across the wire, requested by objects outside
|
||||
// this struct.
|
||||
outgoingQueueLen = 50
|
||||
|
||||
// errorBufferSize is the number of historic peer errors that we store.
|
||||
errorBufferSize = 10
|
||||
)
|
||||
|
||||
// outgoingMsg packages an lnwire.Message to be sent out on the wire, along with
|
||||
@ -91,6 +95,13 @@ type channelCloseUpdate struct {
|
||||
Success bool
|
||||
}
|
||||
|
||||
// timestampedError is a timestamped error that is used to store the most recent
|
||||
// errors we have experienced with our peers.
|
||||
type timestampedError struct {
|
||||
error error
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
// peer is an active peer on the Lightning Network. This struct is responsible
|
||||
// for managing any channel state related to this peer. To do so, it has
|
||||
// several helper goroutines to handle events such as HTLC timeouts, new
|
||||
@ -216,6 +227,14 @@ type peer struct {
|
||||
// peer's chansync message with its own over and over again.
|
||||
resentChanSyncMsg map[lnwire.ChannelID]struct{}
|
||||
|
||||
// errorBuffer stores a set of errors related to a peer. It contains
|
||||
// error messages that our peer has recently sent us over the wire and
|
||||
// records of unknown messages that were sent to us and, so that we can
|
||||
// track a full record of the communication errors we have had with our
|
||||
// peer. If we choose to disconnect from a peer, it also stores the
|
||||
// reason we had for disconnecting.
|
||||
errorBuffer *queue.CircularBuffer
|
||||
|
||||
// writePool is the task pool to that manages reuse of write buffers.
|
||||
// Write tasks are submitted to the pool in order to conserve the total
|
||||
// number of write buffers allocated at any one time, and decouple write
|
||||
@ -233,12 +252,15 @@ type peer struct {
|
||||
var _ lnpeer.Peer = (*peer)(nil)
|
||||
|
||||
// newPeer creates a new peer from an establish connection object, and a
|
||||
// pointer to the main server.
|
||||
// pointer to the main server. It takes an error buffer which may contain errors
|
||||
// from a previous connection with the peer if we have been connected to them
|
||||
// before.
|
||||
func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
|
||||
addr *lnwire.NetAddress, inbound bool,
|
||||
features, legacyFeatures *lnwire.FeatureVector,
|
||||
chanActiveTimeout time.Duration,
|
||||
outgoingCltvRejectDelta uint32) (
|
||||
outgoingCltvRejectDelta uint32,
|
||||
errBuffer *queue.CircularBuffer) (
|
||||
*peer, error) {
|
||||
|
||||
nodePub := addr.IdentityKey
|
||||
@ -276,6 +298,8 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
|
||||
|
||||
chanActiveTimeout: chanActiveTimeout,
|
||||
|
||||
errorBuffer: errBuffer,
|
||||
|
||||
writePool: server.writePool,
|
||||
readPool: server.readPool,
|
||||
|
||||
@ -338,6 +362,7 @@ func (p *peer) Start() error {
|
||||
msg := <-msgChan
|
||||
if msg, ok := msg.(*lnwire.Init); ok {
|
||||
if err := p.handleInitMsg(msg); err != nil {
|
||||
p.storeError(err)
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
@ -668,7 +693,10 @@ func (p *peer) Disconnect(reason error) {
|
||||
return
|
||||
}
|
||||
|
||||
peerLog.Infof("Disconnecting %s, reason: %v", p, reason)
|
||||
err := fmt.Errorf("disconnecting %s, reason: %v", p, reason)
|
||||
p.storeError(err)
|
||||
|
||||
peerLog.Infof(err.Error())
|
||||
|
||||
// Ensure that the TCP connection is properly closed before continuing.
|
||||
p.conn.Close()
|
||||
@ -1026,12 +1054,17 @@ out:
|
||||
peerLog.Infof("unable to read message from %v: %v",
|
||||
p, err)
|
||||
|
||||
switch err.(type) {
|
||||
// If we could not read our peer's message due to an
|
||||
// unknown type or invalid alias, we continue processing
|
||||
// as normal. We store unknown message and address
|
||||
// types, as they may provide debugging insight.
|
||||
switch e := err.(type) {
|
||||
// If this is just a message we don't yet recognize,
|
||||
// we'll continue processing as normal as this allows
|
||||
// us to introduce new messages in a forwards
|
||||
// compatible manner.
|
||||
case *lnwire.UnknownMessage:
|
||||
p.storeError(e)
|
||||
idleTimer.Reset(idleTimeout)
|
||||
continue
|
||||
|
||||
@ -1040,12 +1073,15 @@ out:
|
||||
// simply continue parsing the remainder of their
|
||||
// messages.
|
||||
case *lnwire.ErrUnknownAddrType:
|
||||
p.storeError(e)
|
||||
idleTimer.Reset(idleTimeout)
|
||||
continue
|
||||
|
||||
// If the NodeAnnouncement has an invalid alias, then
|
||||
// we'll log that error above and continue so we can
|
||||
// continue to read messges from the peer.
|
||||
// continue to read messages from the peer. We do not
|
||||
// store this error because it is of little debugging
|
||||
// value.
|
||||
case *lnwire.ErrInvalidNodeAlias:
|
||||
idleTimer.Reset(idleTimeout)
|
||||
continue
|
||||
@ -1141,8 +1177,13 @@ out:
|
||||
discStream.AddMsg(msg)
|
||||
|
||||
default:
|
||||
peerLog.Errorf("unknown message %v received from peer "+
|
||||
"%v", uint16(msg.MsgType()), p)
|
||||
// If the message we received is unknown to us, store
|
||||
// the type to track the failure.
|
||||
err := fmt.Errorf("unknown message type %v received",
|
||||
uint16(msg.MsgType()))
|
||||
p.storeError(err)
|
||||
|
||||
peerLog.Errorf("peer: %v, %v", p, err)
|
||||
}
|
||||
|
||||
if isLinkUpdate {
|
||||
@ -1181,13 +1222,39 @@ func (p *peer) isActiveChannel(chanID lnwire.ChannelID) bool {
|
||||
return ok
|
||||
}
|
||||
|
||||
// storeError stores an error in our peer's buffer of recent errors with the
|
||||
// current timestamp. Errors are only stored if we have at least one active
|
||||
// channel with the peer to mitigate dos attack vectors where a peer costlessly
|
||||
// connects to us and spams us with errors.
|
||||
func (p *peer) storeError(err error) {
|
||||
p.activeChanMtx.RLock()
|
||||
channelCount := len(p.activeChannels)
|
||||
p.activeChanMtx.RUnlock()
|
||||
|
||||
// If we do not have any active channels with the peer, we do not store
|
||||
// errors as a dos mitigation.
|
||||
if channelCount == 0 {
|
||||
peerLog.Tracef("no channels with peer: %v, not storing err", p)
|
||||
return
|
||||
}
|
||||
|
||||
p.errorBuffer.Add(
|
||||
×tampedError{timestamp: time.Now(), error: err},
|
||||
)
|
||||
}
|
||||
|
||||
// handleError processes an error message read from the remote peer. The boolean
|
||||
// returns indicates whether the message should be delivered to a targeted peer.
|
||||
// It stores the error we received from the peer in memory if we have a channel
|
||||
// open with the peer.
|
||||
//
|
||||
// NOTE: This method should only be called from within the readHandler.
|
||||
func (p *peer) handleError(msg *lnwire.Error) bool {
|
||||
key := p.addr.IdentityKey
|
||||
|
||||
// Store the error we have received.
|
||||
p.storeError(msg)
|
||||
|
||||
switch {
|
||||
|
||||
// In the case of an all-zero channel ID we want to forward the error to
|
||||
|
116
queue/circular_buf.go
Normal file
116
queue/circular_buf.go
Normal file
@ -0,0 +1,116 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
// errInvalidSize is returned when an invalid size for a buffer is provided.
|
||||
var errInvalidSize = errors.New("buffer size must be > 0")
|
||||
|
||||
// CircularBuffer is a buffer which retains a set of values in memory, and
|
||||
// overwrites the oldest item in the buffer when a new item needs to be
|
||||
// written.
|
||||
type CircularBuffer struct {
|
||||
// total is the total number of items that have been added to the
|
||||
// buffer.
|
||||
total int
|
||||
|
||||
// items is the set of buffered items.
|
||||
items []interface{}
|
||||
}
|
||||
|
||||
// NewCircularBuffer returns a new circular buffer with the size provided. It
|
||||
// will fail if a zero or negative size parameter is provided.
|
||||
func NewCircularBuffer(size int) (*CircularBuffer, error) {
|
||||
if size <= 0 {
|
||||
return nil, errInvalidSize
|
||||
}
|
||||
|
||||
return &CircularBuffer{
|
||||
total: 0,
|
||||
|
||||
// Create a slice with length and capacity equal to the size of
|
||||
// the buffer so that we do not need to resize the underlying
|
||||
// array when we add items.
|
||||
items: make([]interface{}, size),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// index returns the index that should be written to next.
|
||||
func (c *CircularBuffer) index() int {
|
||||
return c.total % len(c.items)
|
||||
}
|
||||
|
||||
// Add adds an item to the buffer, overwriting the oldest item if the buffer
|
||||
// is full.
|
||||
func (c *CircularBuffer) Add(item interface{}) {
|
||||
// Set the item in the next free index in the items array.
|
||||
c.items[c.index()] = item
|
||||
|
||||
// Increment the total number of items that we have stored.
|
||||
c.total++
|
||||
}
|
||||
|
||||
// List returns a copy of the items in the buffer ordered from the oldest to
|
||||
// newest item.
|
||||
func (c *CircularBuffer) List() []interface{} {
|
||||
size := cap(c.items)
|
||||
index := c.index()
|
||||
|
||||
switch {
|
||||
// If no items have been stored yet, we can just return a nil list.
|
||||
case c.total == 0:
|
||||
return nil
|
||||
|
||||
// If we have added fewer items than the buffer size, we can simply
|
||||
// return the total number of items from the beginning of the list
|
||||
// to the index. This special case is added because the oldest item
|
||||
// is at the beginning of the underlying array, not at the index when
|
||||
// we have not filled the array yet.
|
||||
case c.total < size:
|
||||
resp := make([]interface{}, c.total)
|
||||
copy(resp, c.items[:c.index()])
|
||||
return resp
|
||||
}
|
||||
|
||||
resp := make([]interface{}, size)
|
||||
|
||||
// Get the items in the underlying array from index to end, the first
|
||||
// item in this slice will be the oldest item in the list.
|
||||
firstHalf := c.items[index:]
|
||||
|
||||
// Copy the first set into our response slice from index 0, so that
|
||||
// the response returned is from oldest to newest.
|
||||
copy(resp, firstHalf)
|
||||
|
||||
// Get the items in the underlying array from beginning until the write
|
||||
// index, the last item in this slice will be the newest item in the
|
||||
// list.
|
||||
secondHalf := c.items[:index]
|
||||
|
||||
// Copy the second set of items into the response slice offset by the
|
||||
// length of the first set of items so that we return a response which
|
||||
// is ordered from oldest to newest entry.
|
||||
copy(resp[len(firstHalf):], secondHalf)
|
||||
|
||||
return resp
|
||||
}
|
||||
|
||||
// Total returns the total number of items that have been added to the buffer.
|
||||
func (c *CircularBuffer) Total() int {
|
||||
return c.total
|
||||
}
|
||||
|
||||
// Latest returns the item that was most recently added to the buffer.
|
||||
func (c *CircularBuffer) Latest() interface{} {
|
||||
// If no items have been added yet, return nil.
|
||||
if c.total == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// The latest item is one before our total, mod by length.
|
||||
latest := (c.total - 1) % len(c.items)
|
||||
|
||||
// Return the latest item added.
|
||||
return c.items[latest]
|
||||
}
|
198
queue/circular_buf_test.go
Normal file
198
queue/circular_buf_test.go
Normal file
@ -0,0 +1,198 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestNewCircularBuffer tests the size parameter check when creating a circular
|
||||
// buffer.
|
||||
func TestNewCircularBuffer(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
size int
|
||||
expectedError error
|
||||
}{
|
||||
{
|
||||
name: "zero size",
|
||||
size: 0,
|
||||
expectedError: errInvalidSize,
|
||||
},
|
||||
{
|
||||
name: "negative size",
|
||||
size: -1,
|
||||
expectedError: errInvalidSize,
|
||||
},
|
||||
{
|
||||
name: "ok size",
|
||||
size: 1,
|
||||
expectedError: nil,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
test := test
|
||||
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
_, err := NewCircularBuffer(test.size)
|
||||
if err != test.expectedError {
|
||||
t.Fatalf("expected: %v, got: %v",
|
||||
test.expectedError, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestCircularBuffer tests the adding and listing of items in a circular
|
||||
// buffer.
|
||||
func TestCircularBuffer(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
size int
|
||||
itemCount int
|
||||
expectedItems []interface{}
|
||||
}{
|
||||
{
|
||||
name: "no elements",
|
||||
size: 5,
|
||||
itemCount: 0,
|
||||
expectedItems: nil,
|
||||
},
|
||||
{
|
||||
name: "single element",
|
||||
size: 5,
|
||||
itemCount: 1,
|
||||
expectedItems: []interface{}{
|
||||
0,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no wrap, not full",
|
||||
size: 5,
|
||||
itemCount: 4,
|
||||
expectedItems: []interface{}{
|
||||
0, 1, 2, 3,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no wrap, exactly full",
|
||||
size: 5,
|
||||
itemCount: 5,
|
||||
expectedItems: []interface{}{
|
||||
0, 1, 2, 3, 4,
|
||||
},
|
||||
},
|
||||
{
|
||||
// The underlying array should contain {5, 1, 2, 3, 4}.
|
||||
name: "wrap, one over",
|
||||
size: 5,
|
||||
itemCount: 6,
|
||||
expectedItems: []interface{}{
|
||||
1, 2, 3, 4, 5,
|
||||
},
|
||||
},
|
||||
{
|
||||
// The underlying array should contain {5, 6, 2, 3, 4}.
|
||||
name: "wrap, two over",
|
||||
size: 5,
|
||||
itemCount: 7,
|
||||
expectedItems: []interface{}{
|
||||
2, 3, 4, 5, 6,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
test := test
|
||||
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
buffer, err := NewCircularBuffer(test.size)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < test.itemCount; i++ {
|
||||
buffer.Add(i)
|
||||
}
|
||||
|
||||
// List the items in the buffer and check that the list
|
||||
// is as expected.
|
||||
list := buffer.List()
|
||||
if !reflect.DeepEqual(test.expectedItems, list) {
|
||||
t.Fatalf("expected %v, got: %v",
|
||||
test.expectedItems, list)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestLatest tests fetching of the last item added to a circular buffer.
|
||||
func TestLatest(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
size int
|
||||
|
||||
// items is the number of items to add to the buffer.
|
||||
items int
|
||||
|
||||
// expectedItem is the value we expect from Latest().
|
||||
expectedItem interface{}
|
||||
}{
|
||||
{
|
||||
name: "no items",
|
||||
size: 3,
|
||||
items: 0,
|
||||
expectedItem: nil,
|
||||
},
|
||||
{
|
||||
name: "one item",
|
||||
size: 3,
|
||||
items: 1,
|
||||
expectedItem: 0,
|
||||
},
|
||||
{
|
||||
name: "exactly full",
|
||||
size: 3,
|
||||
items: 3,
|
||||
expectedItem: 2,
|
||||
},
|
||||
{
|
||||
name: "overflow to index 0",
|
||||
size: 3,
|
||||
items: 4,
|
||||
expectedItem: 3,
|
||||
},
|
||||
{
|
||||
name: "overflow twice to index 0",
|
||||
size: 3,
|
||||
items: 7,
|
||||
expectedItem: 6,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
test := test
|
||||
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
//t.Parallel()
|
||||
|
||||
buffer, err := NewCircularBuffer(test.size)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
for i := 0; i < test.items; i++ {
|
||||
buffer.Add(i)
|
||||
}
|
||||
|
||||
latest := buffer.Latest()
|
||||
|
||||
if !reflect.DeepEqual(latest, test.expectedItem) {
|
||||
t.Fatalf("expected: %v, got: %v",
|
||||
test.expectedItem, latest)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
27
rpcserver.go
27
rpcserver.go
@ -2459,6 +2459,33 @@ func (r *rpcServer) ListPeers(ctx context.Context,
|
||||
Features: features,
|
||||
}
|
||||
|
||||
var peerErrors []interface{}
|
||||
|
||||
// If we only want the most recent error, get the most recent
|
||||
// error from the buffer and add it to our list of errors if
|
||||
// it is non-nil. If we want all the stored errors, simply
|
||||
// add the full list to our set of errors.
|
||||
if in.LatestError {
|
||||
latestErr := serverPeer.errorBuffer.Latest()
|
||||
if latestErr != nil {
|
||||
peerErrors = []interface{}{latestErr}
|
||||
}
|
||||
} else {
|
||||
peerErrors = serverPeer.errorBuffer.List()
|
||||
}
|
||||
|
||||
// Add the relevant peer errors to our response.
|
||||
for _, error := range peerErrors {
|
||||
tsError := error.(*timestampedError)
|
||||
|
||||
rpcErr := &lnrpc.TimestampedError{
|
||||
Timestamp: uint64(tsError.timestamp.Unix()),
|
||||
Error: tsError.error.Error(),
|
||||
}
|
||||
|
||||
peer.Errors = append(peer.Errors, rpcErr)
|
||||
}
|
||||
|
||||
resp.Peers = append(resp.Peers, peer)
|
||||
}
|
||||
|
||||
|
34
server.go
34
server.go
@ -53,6 +53,7 @@ import (
|
||||
"github.com/lightningnetwork/lnd/netann"
|
||||
"github.com/lightningnetwork/lnd/peernotifier"
|
||||
"github.com/lightningnetwork/lnd/pool"
|
||||
"github.com/lightningnetwork/lnd/queue"
|
||||
"github.com/lightningnetwork/lnd/routing"
|
||||
"github.com/lightningnetwork/lnd/routing/localchans"
|
||||
"github.com/lightningnetwork/lnd/routing/route"
|
||||
@ -176,6 +177,12 @@ type server struct {
|
||||
persistentConnReqs map[string][]*connmgr.ConnReq
|
||||
persistentRetryCancels map[string]chan struct{}
|
||||
|
||||
// peerErrors keeps a set of peer error buffers for peers that have
|
||||
// disconnected from us. This allows us to track historic peer errors
|
||||
// over connections. The string of the peer's compressed pubkey is used
|
||||
// as a key for this map.
|
||||
peerErrors map[string]*queue.CircularBuffer
|
||||
|
||||
// ignorePeerTermination tracks peers for which the server has initiated
|
||||
// a disconnect. Adding a peer to this map causes the peer termination
|
||||
// watcher to short circuit in the event that peers are purposefully
|
||||
@ -425,6 +432,7 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
|
||||
persistentPeersBackoff: make(map[string]time.Duration),
|
||||
persistentConnReqs: make(map[string][]*connmgr.ConnReq),
|
||||
persistentRetryCancels: make(map[string]chan struct{}),
|
||||
peerErrors: make(map[string]*queue.CircularBuffer),
|
||||
ignorePeerTermination: make(map[*peer]struct{}),
|
||||
scheduledPeerConnection: make(map[string]func()),
|
||||
|
||||
@ -2782,6 +2790,19 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
|
||||
initFeatures := s.featureMgr.Get(feature.SetInit)
|
||||
legacyFeatures := s.featureMgr.Get(feature.SetLegacyGlobal)
|
||||
|
||||
// Lookup past error caches for the peer in the server. If no buffer is
|
||||
// found, create a fresh buffer.
|
||||
pkStr := string(peerAddr.IdentityKey.SerializeCompressed())
|
||||
errBuffer, ok := s.peerErrors[pkStr]
|
||||
if !ok {
|
||||
var err error
|
||||
errBuffer, err = queue.NewCircularBuffer(errorBufferSize)
|
||||
if err != nil {
|
||||
srvrLog.Errorf("unable to create peer %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Now that we've established a connection, create a peer, and it to the
|
||||
// set of currently active peers. Configure the peer with the incoming
|
||||
// and outgoing broadcast deltas to prevent htlcs from being accepted or
|
||||
@ -2791,7 +2812,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
|
||||
p, err := newPeer(
|
||||
conn, connReq, s, peerAddr, inbound, initFeatures,
|
||||
legacyFeatures, cfg.ChanEnableTimeout,
|
||||
defaultOutgoingCltvRejectDelta,
|
||||
defaultOutgoingCltvRejectDelta, errBuffer,
|
||||
)
|
||||
if err != nil {
|
||||
srvrLog.Errorf("unable to create peer %v", err)
|
||||
@ -2803,6 +2824,11 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
|
||||
|
||||
s.addPeer(p)
|
||||
|
||||
// Once we have successfully added the peer to the server, we can
|
||||
// delete the previous error buffer from the server's map of error
|
||||
// buffers.
|
||||
delete(s.peerErrors, pkStr)
|
||||
|
||||
// Dispatch a goroutine to asynchronously start the peer. This process
|
||||
// includes sending and receiving Init messages, which would be a DOS
|
||||
// vector if we held the server's mutex throughout the procedure.
|
||||
@ -3097,6 +3123,12 @@ func (s *server) removePeer(p *peer) {
|
||||
delete(s.outboundPeers, pubStr)
|
||||
}
|
||||
|
||||
// Copy the peer's error buffer across to the server if it has any items
|
||||
// in it so that we can restore peer errors across connections.
|
||||
if p.errorBuffer.Total() > 0 {
|
||||
s.peerErrors[pubStr] = p.errorBuffer
|
||||
}
|
||||
|
||||
// Inform the peer notifier of a peer offline event so that it can be
|
||||
// reported to clients listening for peer events.
|
||||
var pubKey [33]byte
|
||||
|
Loading…
Reference in New Issue
Block a user