discovery: flatten future message cache

This commit removes the slice used when saving future messages into the
cache. Instead, each message is now saved independently into the cache
with a monotonically increasing integer as its ID.
This commit is contained in:
yyforyongyu 2023-02-24 04:29:35 +08:00
parent 52facd3e5a
commit 78a983c014
No known key found for this signature in database
GPG key ID: 9BCD95C4FF296868

View file

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcec/v2"
@ -428,8 +429,9 @@ type AuthenticatedGossiper struct {
// height specified in the future. We will save them and resend it to
// the chan networkMsgs once the block height has reached. The cached
// map format is,
// {blockHeight: [msg1, msg2, ...], ...}
futureMsgs *lru.Cache[uint32, *cachedNetworkMsg]
// {msgID1: msg1, msgID2: msg2, ...}
futureMsgs *lru.Cache[uint64, *cachedFutureMsg]
futureMsgID atomic.Uint64
// chanPolicyUpdates is a channel that requests to update the
// forwarding policy of a set of channels is sent over.
@ -486,7 +488,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
selfKeyLoc: selfKeyDesc.KeyLocator,
cfg: &cfg,
networkMsgs: make(chan *networkMsg),
futureMsgs: lru.NewCache[uint32, *cachedNetworkMsg](
futureMsgs: lru.NewCache[uint64, *cachedFutureMsg](
maxFutureMessages,
),
quit: make(chan struct{}),
@ -636,33 +638,64 @@ func (d *AuthenticatedGossiper) syncBlockHeight() {
}
}
// cachedFutureMsg is a future message that's saved to the `futureMsgs` cache.
type cachedFutureMsg struct {
// msg is the network message.
msg *networkMsg
// height is the block height.
height uint32
}
// Size returns the size of the message.
func (c *cachedFutureMsg) Size() (uint64, error) {
// Return a constant 1.
return 1, nil
}
// resendFutureMessages takes a block height, resends all the future messages
// found at that height and deletes those messages found in the gossiper's
// futureMsgs.
// found below and equal to that height and deletes those messages found in the
// gossiper's futureMsgs.
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
result, err := d.futureMsgs.Get(height)
var (
// msgs are the target messages.
msgs []*networkMsg
// keys are the target messages' caching keys.
keys []uint64
)
// filterMsgs is the visitor used when iterating the future cache.
filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
if cmsg.height <= height {
msgs = append(msgs, cmsg.msg)
keys = append(keys, k)
}
return true
}
// Filter out the target messages.
d.futureMsgs.Range(filterMsgs)
// Return early if no messages found.
if err == cache.ErrElementNotFound {
if len(msgs) == 0 {
return
}
// The error must nil, we will log an error and exit.
if err != nil {
log.Errorf("Reading future messages got error: %v", err)
return
// Remove the filtered messages.
for _, key := range keys {
d.futureMsgs.Delete(key)
}
msgs := result.msgs
log.Debugf("Resending %d network messages at height %d",
len(msgs), height)
for _, pMsg := range msgs {
for _, msg := range msgs {
select {
case d.networkMsgs <- pMsg.msg:
case d.networkMsgs <- msg:
case <-d.quit:
pMsg.msg.err <- ErrGossiperShuttingDown
msg.err <- ErrGossiperShuttingDown
}
}
}
@ -1879,23 +1912,11 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
return false
}
// Add the premature message to our future messages which will
// be resent once the block height has reached.
// Add the premature message to our future messages which will be
// resent once the block height has reached.
//
// Init an empty cached message and overwrite it if there are cached
// messages found.
cachedMsgs := &cachedNetworkMsg{
msgs: make([]*processedNetworkMsg, 0),
}
result, err := d.futureMsgs.Get(msgHeight)
// No error returned means we have old messages cached.
if err == nil {
cachedMsgs = result
}
// Copy the networkMsgs since the old message's err chan will
// be consumed.
// Copy the networkMsgs since the old message's err chan will be
// consumed.
copied := &networkMsg{
peer: msg.peer,
source: msg.source,
@ -1905,15 +1926,15 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
err: make(chan error, 1),
}
// The processed boolean is unused in the futureMsgs case.
pMsg := &processedNetworkMsg{msg: copied}
// Create the cached message.
cachedMsg := &cachedFutureMsg{
msg: copied,
height: msgHeight,
}
// Add the network message.
msgs := cachedMsgs.msgs
msgs = append(msgs, pMsg)
_, err = d.futureMsgs.Put(msgHeight, &cachedNetworkMsg{
msgs: msgs,
})
// Increment the msg ID and add it to the cache.
nextMsgID := d.futureMsgID.Add(1)
_, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
if err != nil {
log.Errorf("Adding future message got error: %v", err)
}