multi: add flap count and last flap time to listpeers

This commit is contained in:
carla 2020-08-27 09:20:46 +02:00
parent 6cf66aea47
commit e2c0604657
No known key found for this signature in database
GPG Key ID: 4CA7FE54A6213C91
6 changed files with 964 additions and 750 deletions

View File

@ -56,6 +56,9 @@ type ChannelEventStore struct {
// chanInfoRequests serves requests for information about our channel.
chanInfoRequests chan channelInfoRequest
// peerRequests serves requests for information about a peer.
peerRequests chan peerRequest
quit chan struct{}
wg sync.WaitGroup
@ -108,6 +111,17 @@ type channelInfoResponse struct {
err error
}
// peerRequest is a request for information about a peer. Requests are sent to
// the store's main event loop over its peerRequests channel.
type peerRequest struct {
// peer is the vertex of the peer that is being queried.
peer route.Vertex
// responseChan is the channel that the event loop sends its response on.
responseChan chan peerResponse
}
// peerResponse is the reply that the main event loop produces for a
// peerRequest.
type peerResponse struct {
// flapCount is the flap count that was looked up for the requested peer.
flapCount int
// ts is the timestamp of the peer's last flap. It is nil when no flaps are
// recorded for the peer.
ts *time.Time
// err is the error, if any, that occurred while looking up the flap count.
err error
}
// NewChannelEventStore initializes an event store with the config provided.
// Note that this function does not start the main event loop, Start() must be
// called.
@ -116,6 +130,7 @@ func NewChannelEventStore(config *Config) *ChannelEventStore {
cfg: config,
peers: make(map[route.Vertex]peerMonitor),
chanInfoRequests: make(chan channelInfoRequest),
peerRequests: make(chan peerRequest),
quit: make(chan struct{}),
}
@ -373,6 +388,15 @@ func (c *ChannelEventStore) consume(subscriptions *subscriptions) {
resp.info, resp.err = c.getChanInfo(req)
req.responseChan <- resp
// Serve all requests for information about our peer.
case req := <-c.peerRequests:
var resp peerResponse
resp.flapCount, resp.ts, resp.err = c.flapCount(
req.peer,
)
req.responseChan <- resp
case <-c.cfg.FlapCountTicker.Ticks():
if err := c.recordFlapCount(); err != nil {
log.Errorf("could not record flap "+
@ -449,6 +473,70 @@ func (c *ChannelEventStore) getChanInfo(req channelInfoRequest) (*ChannelInfo,
}, nil
}
// FlapCount returns the flap count we have for a peer and the timestamp of its
// last flap. If we do not have any flaps recorded for the peer, the last flap
// timestamp will be nil. If the store receives a shutdown signal before the
// request is served, errShuttingDown is returned.
func (c *ChannelEventStore) FlapCount(peer route.Vertex) (int, *time.Time,
	error) {

	// The response channel is buffered with capacity one so that the main
	// event loop can always deliver its response without blocking. With an
	// unbuffered channel, the event loop would block forever on its send if
	// this call had already returned because of a shutdown signal, which
	// would in turn prevent the event loop from ever observing the quit
	// signal itself.
	request := peerRequest{
		peer:         peer,
		responseChan: make(chan peerResponse, 1),
	}

	// Send a request for the peer's information to the main event loop,
	// or return early with an error if the store has already received a
	// shutdown signal.
	select {
	case c.peerRequests <- request:
	case <-c.quit:
		return 0, nil, errShuttingDown
	}

	// Return the response we receive on the response channel or exit early
	// if the store is instructed to exit.
	select {
	case resp := <-request.responseChan:
		return resp.flapCount, resp.ts, resp.err

	case <-c.quit:
		return 0, nil, errShuttingDown
	}
}
// flapCount looks up a peer's flap count and the time of its last flap. The
// in-memory record is preferred when the peer is currently tracked; otherwise
// the value persisted on disk is used. A nil last flap time is returned when
// no flap count has been recorded for the peer at all.
func (c *ChannelEventStore) flapCount(peer route.Vertex) (int, *time.Time,
	error) {

	// A peer that we are tracking in memory has the most accurate flap
	// count, so it takes precedence over anything stored on disk. A miss
	// here is not an error, because the count may have been persisted
	// previously.
	if monitor, tracked := c.peers[peer]; tracked {
		count, lastFlap := monitor.getFlapCount()
		return count, lastFlap, nil
	}

	// Fall back to the database for peers that are not tracked in memory.
	stored, err := c.cfg.ReadFlapCount(peer)

	// A missing peer bucket means that nothing was ever recorded for this
	// peer, which we signal with a nil last flap time rather than an error.
	if err == channeldb.ErrNoPeerBucket {
		return 0, nil, nil
	}

	// Any other failure is surfaced to the caller.
	if err != nil {
		return 0, nil, err
	}

	return int(stored.Count), &stored.LastFlap, nil
}
// recordFlapCount will record our flap count for each peer that we are
// currently tracking, skipping peers that have a 0 flap count.
func (c *ChannelEventStore) recordFlapCount() error {

View File

@ -290,3 +290,54 @@ func TestGetChanInfo(t *testing.T) {
ctx.stop()
}
// TestFlapCount tests querying the store for peer flap counts, covering the
// case where the peer is tracked in memory, and the case where we need to
// lookup the peer on disk.
func TestFlapCount(t *testing.T) {
	// Use a distinct name for the test clock so that we do not shadow the
	// clock package within this test.
	testClock := clock.NewTestClock(testNow)

	var (
		peer          = route.Vertex{9, 9, 9}
		peerFlapCount = 3
		lastFlap      = testClock.Now()
	)

	// Create a test context with one peer's flap count already recorded,
	// which mocks it already having its flap count stored on disk.
	ctx := newChanEventStoreTestCtx(t)
	ctx.flapUpdates[peer] = &channeldb.FlapCount{
		Count:    uint32(peerFlapCount),
		LastFlap: lastFlap,
	}

	ctx.start()

	// Stop the store when the test exits. Deferring the call ensures that
	// the store is also stopped when one of the assertions below fails and
	// aborts the test early.
	defer ctx.stop()

	// Create a test peer, but do not add it to our store yet.
	peer1 := route.Vertex{1, 2, 3}

	// First, query for a peer that we have no record of in memory or on
	// disk and confirm that we indicate that the peer was not found.
	_, ts, err := ctx.store.FlapCount(peer1)
	require.NoError(t, err)
	require.Nil(t, ts)

	// Send an online event for our peer.
	ctx.peerEvent(peer1, true)

	// Assert that we now find a record of the peer with flap count = 1.
	count, ts, err := ctx.store.FlapCount(peer1)
	require.NoError(t, err)
	require.Equal(t, lastFlap, *ts)
	require.Equal(t, 1, count)

	// Make a request for our peer that is not tracked in memory, but does
	// have its flap count stored on disk.
	count, ts, err = ctx.store.FlapCount(peer)
	require.NoError(t, err)
	require.Equal(t, lastFlap, *ts)
	require.Equal(t, peerFlapCount, count)
}

File diff suppressed because it is too large Load Diff

View File

@ -1415,6 +1415,20 @@ message Peer {
spamming us with errors at no cost.
*/
repeated TimestampedError errors = 12;
/*
The number of times we have recorded this peer going offline or coming
online, recorded across restarts. Note that this value is decreased over
time if the peer has not recently flapped, so that we can forgive peers
with historically high flap counts.
*/
int32 flap_count = 13;
/*
The timestamp of the last flap we observed for this peer. If this value is
zero, we have not observed any flaps for this peer.
*/
int64 last_flap_ns = 14;
}
message TimestampedError {

View File

@ -4800,6 +4800,16 @@
"$ref": "#/definitions/lnrpcTimestampedError"
},
"description": "The latest errors received from our peer with timestamps, limited to the 10\nmost recent errors. These errors are tracked across peer connections, but\nare not persisted across lnd restarts. Note that these errors are only\nstored for peers that we have channels open with, to prevent peers from\nspamming us with errors at no cost."
},
"flap_count": {
"type": "integer",
"format": "int32",
"description": "The number of times we have recorded this peer going offline or coming\nonline, recorded across restarts. Note that this value is decreased over\ntime if the peer has not recently flapped, so that we can forgive peers\nwith historically high flap counts."
},
"last_flap_ns": {
"type": "string",
"format": "int64",
"description": "The timestamp of the last flap we observed for this peer. If this value is\nzero, we have not observed any flaps for this peer."
}
}
},

View File

@ -2701,6 +2701,30 @@ func (r *rpcServer) ListPeers(ctx context.Context,
rpcPeer.Errors = append(rpcPeer.Errors, rpcErr)
}
// If the server has started, we can query the event store
// for our peer's flap count. If we do so when the server has
// not started, the request will block.
if r.server.Started() {
vertex, err := route.NewVertexFromBytes(nodePub[:])
if err != nil {
return nil, err
}
flap, ts, err := r.server.chanEventStore.FlapCount(
vertex,
)
if err != nil {
return nil, err
}
// If our timestamp is non-nil, we have values for our
// peer's flap count, so we set them.
if ts != nil {
rpcPeer.FlapCount = int32(flap)
rpcPeer.LastFlapNs = ts.UnixNano()
}
}
resp.Peers = append(resp.Peers, rpcPeer)
}