diff --git a/chanfitness/chanevent.go b/chanfitness/chanevent.go index 30e856ee1..6338887fe 100644 --- a/chanfitness/chanevent.go +++ b/chanfitness/chanevent.go @@ -43,9 +43,31 @@ type peerLog struct { // online stores whether the peer is currently online. online bool - // onlineEvents is a log of timestamped events observed for the peer. + // onlineEvents is a log of timestamped events observed for the peer + // that we have committed to allocating memory to. onlineEvents []*event + // stagedEvent represents an event that is pending addition to the + // events list. It has not yet been added because we rate limit the + // frequency that we store events at. We need to store this value + // in the log (rather than just ignore events) so that we can flush the + // aggregate outcome to our event log once the rate limiting period has + // ended. + // + // Take the following example: + // - Peer online event recorded + // - Peer offline event, not recorded due to rate limit + // - No more events, we incorrectly believe our peer to be online + // Instead of skipping events, we stage the most recent event during the + // rate limited period so that we know what happened (on aggregate) + // while we were rate limiting events. + // + // Note that we currently only store offline/online events so we can + // use this field to track our online state. With the addition of other + // event types, we need to only stage online/offline events, or split + // them out. + stagedEvent *event + // flapCount is the number of times this peer has been observed as // going offline. flapCount int @@ -105,7 +127,8 @@ func (p *peerLog) onlineEvent(online bool) { p.addEvent(online, eventTime) } -// addEvent records an online or offline event in our event log. +// addEvent records an online or offline event in our event log and increments +// the peer's flap count. 
func (p *peerLog) addEvent(online bool, time time.Time) { eventType := peerOnlineEvent if !online { @@ -117,7 +140,26 @@ func (p *peerLog) addEvent(online bool, time time.Time) { eventType: eventType, } - p.onlineEvents = append(p.onlineEvents, event) + // If we have no staged events, we can just stage this event and return. + if p.stagedEvent == nil { + p.stagedEvent = event + return + } + + // We get the amount of time we require between events according to + // peer flap count. + aggregation := getRateLimit(p.flapCount) + nextRecordTime := p.stagedEvent.timestamp.Add(aggregation) + flushEvent := nextRecordTime.Before(event.timestamp) + + // If enough time has passed since our last staged event, we add our + // event to our in-memory list. + if flushEvent { + p.onlineEvents = append(p.onlineEvents, p.stagedEvent) + } + + // Finally, we replace our staged event with the new event we received. + p.stagedEvent = event } // addChannel adds a channel to our log. If we have not tracked any online @@ -160,6 +202,7 @@ func (p *peerLog) removeChannel(channelPoint wire.OutPoint) error { // TODO(carla): this could be done on a per channel basis. if p.channelCount() == 0 { p.onlineEvents = nil + p.stagedEvent = nil } return nil @@ -197,6 +240,18 @@ func (p *peerLog) getFlapCount() (int, *time.Time) { return p.flapCount, p.lastFlap } +// listEvents returns all of the events that our event log has tracked, +// including events that are staged for addition to our set of events but have +// not yet been committed to (because we rate limit and store only the aggregate +// outcome over a period). +func (p *peerLog) listEvents() []*event { + if p.stagedEvent == nil { + return p.onlineEvents + } + + return append(p.onlineEvents, p.stagedEvent) +} + // onlinePeriod represents a period of time over which a peer was online. 
type onlinePeriod struct { start, end time.Time @@ -211,8 +266,10 @@ type onlinePeriod struct { // to be ordered by ascending timestamp, and can tolerate multiple consecutive // online or offline events. func (p *peerLog) getOnlinePeriods() []*onlinePeriod { + events := p.listEvents() + // Return early if there are no events, there are no online periods. - if len(p.onlineEvents) == 0 { + if len(events) == 0 { return nil } @@ -231,7 +288,7 @@ func (p *peerLog) getOnlinePeriods() []*onlinePeriod { // the online event and the present is not tracked. The type of the most // recent event is tracked using the offline bool so that we can add a // final online period if necessary. - for _, event := range p.onlineEvents { + for _, event := range events { switch event.eventType { case peerOnlineEvent: // If our previous event is nil, we just set it and diff --git a/chanfitness/chanevent_test.go b/chanfitness/chanevent_test.go index 7d75841df..03eec5157 100644 --- a/chanfitness/chanevent_test.go +++ b/chanfitness/chanevent_test.go @@ -122,6 +122,88 @@ func TestPeerLog(t *testing.T) { require.Equal(t, 0, peerLog.channelCount()) require.Len(t, peerLog.onlineEvents, 0) assertFlapCount(3, &lastFlap) + + require.Len(t, peerLog.listEvents(), 0) + require.Nil(t, peerLog.stagedEvent) +} + +// TestRateLimitAdd tests the addition of events to the event log with rate +// limiting in place. +func TestRateLimitAdd(t *testing.T) { + // Create a mock clock specifically for this test so that we can + // progress time without affecting the other tests. + mockedClock := clock.NewTestClock(testNow) + + // Create a new peer log. + peerLog := newPeerLog(mockedClock) + require.Nil(t, peerLog.stagedEvent) + + // Create a channel for our peer log, otherwise it will not track online + // events. + require.NoError(t, peerLog.addChannel(wire.OutPoint{})) + + // First, we add an event to the event log. Since we have no previous + // events, we expect this event to be staged immediately. 
+ peerEvent := &event{ + timestamp: testNow, + eventType: peerOfflineEvent, + } + + peerLog.onlineEvent(false) + require.Equal(t, peerEvent, peerLog.stagedEvent) + + // We immediately add another event to our event log. We expect our + // staged event to be replaced with this new event, because insufficient + // time has passed since our last event. + peerEvent = &event{ + timestamp: testNow, + eventType: peerOnlineEvent, + } + + peerLog.onlineEvent(true) + require.Equal(t, peerEvent, peerLog.stagedEvent) + + // We get the amount of time that we need to pass before we record an + // event from our rate limiting tiers. We then progress our test clock + // to just after this point. + delta := getRateLimit(peerLog.flapCount) + newNow := testNow.Add(delta + 1) + mockedClock.SetTime(newNow) + + // Now, when we add an event, we expect our staged event to be added + // to our events list and for our new event to be staged. + newEvent := &event{ + timestamp: newNow, + eventType: peerOfflineEvent, + } + peerLog.onlineEvent(false) + + require.Equal(t, []*event{peerEvent}, peerLog.onlineEvents) + require.Equal(t, newEvent, peerLog.stagedEvent) + + // Now, we test the case where we add many events to our log. We expect + // our set of events to be untouched, but for our staged event to be + // updated. + nextEvent := &event{ + timestamp: newNow, + eventType: peerOnlineEvent, + } + + for i := 0; i < 5; i++ { + // We flip the kind of event for each type so that we can check + // that our staged event is definitely changing each time. + if i%2 == 0 { + nextEvent.eventType = peerOfflineEvent + } else { + nextEvent.eventType = peerOnlineEvent + } + + online := nextEvent.eventType == peerOnlineEvent + + peerLog.onlineEvent(online) + require.Equal(t, []*event{peerEvent}, peerLog.onlineEvents) + require.Equal(t, nextEvent, peerLog.stagedEvent) + } } // TestGetOnlinePeriod tests the getOnlinePeriod function. 
It tests the case diff --git a/chanfitness/rate_limit.go b/chanfitness/rate_limit.go new file mode 100644 index 000000000..3fa5a3255 --- /dev/null +++ b/chanfitness/rate_limit.go @@ -0,0 +1,37 @@ +package chanfitness + +import "time" + +// rateLimitScale is the number of events we allow per rate limited tier. +// Increasing this value makes our rate limiting more lenient, decreasing it +// makes us less lenient. +const rateLimitScale = 200 + +// rateLimits is the set of rate limit tiers we apply to our peers based on +// their flap count. A peer can be placed in their tier by dividing their flap +// count by the rateLimitScale and returning the value at that index. +var rateLimits = []time.Duration{ + time.Second, + time.Second * 5, + time.Second * 30, + time.Minute, + time.Minute * 30, + time.Hour, +} + +// getRateLimit returns the value of the rate limited tier that we are on based +// on current flap count. If a peer's flap count exceeds the top tier, we just +// return our highest tier. +func getRateLimit(flapCount int) time.Duration { + // Figure out the tier we fall into based on our current flap count. + tier := flapCount / rateLimitScale + + // If we have more events than our number of tiers, we just use the + // last tier + tierLen := len(rateLimits) + if tier >= tierLen { + tier = tierLen - 1 + } + + return rateLimits[tier] +} diff --git a/chanfitness/rate_limit_test.go b/chanfitness/rate_limit_test.go new file mode 100644 index 000000000..0a9aa0e37 --- /dev/null +++ b/chanfitness/rate_limit_test.go @@ -0,0 +1,51 @@ +package chanfitness + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// TestGetRateLimit tests getting of our rate limit using the current constants. +// It creates test cases that are relative to our constants so that they +// can be adjusted without breaking the unit test. 
+func TestGetRateLimit(t *testing.T) { + tests := []struct { + name string + flapCount int + rateLimit time.Duration + }{ + { + name: "zero flaps", + flapCount: 0, + rateLimit: rateLimits[0], + }, + { + name: "middle tier", + flapCount: rateLimitScale * (len(rateLimits) / 2), + rateLimit: rateLimits[len(rateLimits)/2], + }, + { + name: "last tier", + flapCount: rateLimitScale * (len(rateLimits) - 1), + rateLimit: rateLimits[len(rateLimits)-1], + }, + { + name: "beyond last tier", + flapCount: rateLimitScale * (len(rateLimits) * 2), + rateLimit: rateLimits[len(rateLimits)-1], + }, + } + + for _, test := range tests { + test := test + + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + limit := getRateLimit(test.flapCount) + require.Equal(t, test.rateLimit, limit) + }) + } +}