Merge pull request #7239 from Roasbeef/gossip-scoping-issues

discovery: ensure we prioritize sending out our own local announcements
Commit de3e0d7875 by Olaoluwa Osuntokun, 2022-12-16 13:51:17 -08:00 (committed via GitHub)
4 changed files with 185 additions and 67 deletions

discovery/gossiper.go

@@ -18,6 +18,7 @@ import (
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnpeer"
@@ -826,6 +827,11 @@ type msgWithSenders struct {
// msg is the wire message itself.
msg lnwire.Message
// isLocal is true if this was a message that originated locally. We'll
// use this to bypass our normal checks to ensure we prioritize sending
// out our own updates.
isLocal bool
// senders is the set of peers that sent us this message.
senders map[route.Vertex]struct{}
}
@@ -903,6 +909,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
if !ok {
mws = msgWithSenders{
msg: msg,
isLocal: !message.isRemote,
senders: make(map[route.Vertex]struct{}),
}
mws.senders[sender] = struct{}{}
@@ -949,6 +956,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
if oldTimestamp < msg.Timestamp {
mws = msgWithSenders{
msg: msg,
isLocal: !message.isRemote,
senders: make(map[route.Vertex]struct{}),
}
@@ -992,6 +1000,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
if oldTimestamp < msg.Timestamp {
mws = msgWithSenders{
msg: msg,
isLocal: !message.isRemote,
senders: make(map[route.Vertex]struct{}),
}
@@ -1020,13 +1029,44 @@ func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
}
}
// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
// to broadcast next into messages that are locally sourced and those that are
// sourced remotely.
type msgsToBroadcast struct {
// localMsgs is the set of messages we created locally.
localMsgs []msgWithSenders
// remoteMsgs is the set of messages that we received from a remote
// party.
remoteMsgs []msgWithSenders
}
// addMsg adds a new message to the appropriate sub-slice.
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
if msg.isLocal {
m.localMsgs = append(m.localMsgs, msg)
} else {
m.remoteMsgs = append(m.remoteMsgs, msg)
}
}
// isEmpty returns true if the batch is empty.
func (m *msgsToBroadcast) isEmpty() bool {
return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
}
// length returns the length of the combined message set.
func (m *msgsToBroadcast) length() int {
return len(m.localMsgs) + len(m.remoteMsgs)
}
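
For illustration, a minimal sketch (not part of the diff) of how the partitioning behaves; the msgWithSenders literals are pared down to just the routing flag:

	func examplePartition() {
		var batch msgsToBroadcast

		// A message we originated ourselves lands in localMsgs...
		batch.addMsg(msgWithSenders{isLocal: true})

		// ...while one gossiped to us by a peer lands in remoteMsgs.
		batch.addMsg(msgWithSenders{isLocal: false})

		// At this point batch.length() == 2 and batch.isEmpty() == false.
	}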
// Emit returns the set of de-duplicated announcements to be sent out during
// the next announcement epoch, in the order of channel announcements, channel
// updates, and node announcements. Each message emitted contains the set of
// peers that sent us the message. This way, we can ensure that we don't waste
// bandwidth by re-sending a message to the peer that sent it to us in the
// first place. Additionally, the set of stored messages is reset.
func (d *deDupedAnnouncements) Emit() []msgWithSenders {
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
d.Lock()
defer d.Unlock()
@@ -1036,21 +1076,24 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders {
// Create an empty array of lnwire.Messages with a length equal to
// the total number of announcements.
msgs := make([]msgWithSenders, 0, numAnnouncements)
msgs := msgsToBroadcast{
localMsgs: make([]msgWithSenders, 0, numAnnouncements),
remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
}
// Add the channel announcements to the array first.
for _, message := range d.channelAnnouncements {
msgs = append(msgs, message)
msgs.addMsg(message)
}
// Then add the channel updates.
for _, message := range d.channelUpdates {
msgs = append(msgs, message)
msgs.addMsg(message)
}
// Finally add the node announcements.
for _, message := range d.nodeAnnouncements {
msgs = append(msgs, message)
msgs.addMsg(message)
}
d.reset()
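
Condensing the pieces above into the consumption pattern this enables (a sketch of the networkHandler call sites that appear later in this diff):

	// Inside the trickle timer case of networkHandler:
	batch := announcements.Emit()
	if batch.isEmpty() {
		continue
	}

	// Local announcements are sent first and bypass the gossip filters;
	// remote announcements follow the usual filtered path.
	d.splitAndSendAnnBatch(batch.localMsgs, true)
	d.splitAndSendAnnBatch(batch.remoteMsgs, false)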
@@ -1096,26 +1139,83 @@ func splitAnnouncementBatches(subBatchSize int,
return splitAnnouncementBatch
}
// sendBatch broadcasts a list of announcements to our peers.
func (d *AuthenticatedGossiper) sendBatch(announcementBatch []msgWithSenders) {
syncerPeers := d.syncMgr.GossipSyncers()
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
// split size, and then sends out all items to the set of target peers. If
// isLocal is true, then we'll send them to all peers and skip any gossip
// filter checks.
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(annBatch []msgWithSenders,
isLocal bool) {
// We'll first attempt to filter out this new message
// for all peers that have active gossip syncers.
for _, syncer := range syncerPeers {
syncer.FilterGossipMsgs(announcementBatch...)
// Next, if we have new things to announce then broadcast them to all
// our immediately connected peers.
subBatchSize := calculateSubBatchSize(
d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
d.cfg.MinimumBatchSize, len(annBatch),
)
splitAnnouncementBatch := splitAnnouncementBatches(
subBatchSize, annBatch,
)
d.wg.Add(1)
go func() {
defer d.wg.Done()
log.Infof("Broadcasting %v new announcements in %d sub "+
"batches (local=%v)", len(annBatch),
len(splitAnnouncementBatch), isLocal)
for _, announcementBatch := range splitAnnouncementBatch {
d.sendBatch(announcementBatch, isLocal)
select {
case <-time.After(d.cfg.SubBatchDelay):
case <-d.quit:
return
}
}
}()
}
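
As a worked example of the batching math (assuming calculateSubBatchSize rounds up so the whole batch drains within roughly one trickle interval, and using lnd's usual 90s trickle delay and 5s sub-batch delay; the concrete numbers here are illustrative):

	// 300 announcements with TrickleDelay=90s, SubBatchDelay=5s, and
	// MinimumBatchSize=10: each sub batch holds ceil(300*5/90) = 17
	// messages, so ~18 sub batches go out, one every 5 seconds.
	subBatchSize := calculateSubBatchSize(
		90*time.Second, 5*time.Second, 10, 300,
	)
	batches := splitAnnouncementBatches(subBatchSize, annBatch)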
// sendBatch broadcasts a list of announcements to our peers.
func (d *AuthenticatedGossiper) sendBatch(annBatch []msgWithSenders,
isLocal bool) {
// If this is a batch of announcements created locally, then we can
// skip the filter and dedup logic below, and just send the
// announcements out to all our connected peers.
if isLocal {
msgsToSend := fn.Map(
annBatch, func(m msgWithSenders) lnwire.Message {
return m.msg
},
)
err := d.cfg.Broadcast(nil, msgsToSend...)
if err != nil {
log.Errorf("Unable to send local batch "+
"announcements: %v", err)
}
return
}
for _, msgChunk := range announcementBatch {
// With the syncers taken care of, we'll merge
// the sender map with the set of syncers, so
// we don't send out duplicate messages.
syncerPeers := d.syncMgr.GossipSyncers()
// We'll first attempt to filter out this new message for all peers
// that have active gossip syncers.
for _, syncer := range syncerPeers {
syncer.FilterGossipMsgs(annBatch...)
}
for _, msgChunk := range annBatch {
msgChunk := msgChunk
// With the syncers taken care of, we'll merge the sender map
// with the set of syncers, so we don't send out duplicate
// messages.
msgChunk.mergeSyncerMap(syncerPeers)
err := d.cfg.Broadcast(
msgChunk.senders, msgChunk.msg,
)
err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
if err != nil {
log.Errorf("Unable to send batch "+
"announcements: %v", err)
@@ -1240,39 +1340,23 @@ func (d *AuthenticatedGossiper) networkHandler() {
// If the current announcements batch is nil, then we
// have no further work here.
if len(announcementBatch) == 0 {
if announcementBatch.isEmpty() {
continue
}
// Next, if we have new things to announce then
// broadcast them to all our immediately connected
// peers.
subBatchSize := calculateSubBatchSize(
d.cfg.TrickleDelay, d.cfg.SubBatchDelay, d.cfg.MinimumBatchSize,
len(announcementBatch),
// At this point, we have the set of local and remote
// announcements we want to send out. We'll do the
// batching as normal for both, but for local
// announcements, we'll blast them out without regard
// for our peers' policies to ensure they propagate
// properly.
d.splitAndSendAnnBatch(
announcementBatch.localMsgs, true,
)
splitAnnouncementBatch := splitAnnouncementBatches(
subBatchSize, announcementBatch,
d.splitAndSendAnnBatch(
announcementBatch.remoteMsgs, false,
)
d.wg.Add(1)
go func() {
defer d.wg.Done()
log.Infof("Broadcasting %v new announcements in %d sub batches",
len(announcementBatch), len(splitAnnouncementBatch))
for _, announcementBatch := range splitAnnouncementBatch {
d.sendBatch(announcementBatch)
select {
case <-time.After(d.cfg.SubBatchDelay):
case <-d.quit:
return
}
}
}()
// The retransmission timer has ticked which indicates that we
// should check if we need to prune or re-broadcast any of our
// personal channels or node announcement. This addresses the
@@ -1611,8 +1695,9 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
// We set ourselves as the source of this message to indicate
// that we shouldn't skip any peers when sending this message.
chanUpdates = append(chanUpdates, networkMsg{
source: d.selfKey,
msg: chanUpdate,
source: d.selfKey,
isRemote: false,
msg: chanUpdate,
})
}
@@ -2182,9 +2267,10 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
// be broadcast to the rest of our peers.
if isPublic {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
msg: nodeAnn,
peer: nMsg.peer,
isRemote: nMsg.isRemote,
source: nMsg.source,
msg: nodeAnn,
})
} else {
log.Tracef("Skipping broadcasting node announcement for %x "+
@@ -2460,9 +2546,10 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
if proof != nil {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
msg: ann,
peer: nMsg.peer,
isRemote: nMsg.isRemote,
source: nMsg.source,
msg: ann,
})
}
@@ -2848,9 +2935,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
var announcements []networkMsg
if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
msg: upd,
peer: nMsg.peer,
source: nMsg.source,
isRemote: nMsg.isRemote,
msg: upd,
})
}
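
Each handler above now threads nMsg.isRemote into the networkMsg it queues for broadcast. A pared-down sketch of the fields in play, as assumed from this diff (the real struct carries additional fields):

	type networkMsg struct {
		peer     lnpeer.Peer
		source   *btcec.PublicKey
		isRemote bool
		msg      lnwire.Message
	}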
@@ -2949,6 +3037,7 @@ func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
} else {
remotePubKey = chanInfo.NodeKey1Bytes
}
// Since the remote peer might not be online we'll call a
// method that will attempt to deliver the proof when it comes
// online.

discovery/gossiper_test.go

@@ -824,6 +824,8 @@ func TestProcessAnnouncement(t *testing.T) {
require.NoError(t, err, "can't create context")
assertSenderExistence := func(sender *btcec.PublicKey, msg msgWithSenders) {
t.Helper()
if _, ok := msg.senders[route.NewVertex(sender)]; !ok {
t.Fatalf("sender=%x not present in %v",
sender.SerializeCompressed(), spew.Sdump(msg))
@@ -1910,31 +1912,37 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
// Ensure that announcement batch delivers channel announcements,
// channel updates, and node announcements in proper order.
batch := announcements.Emit()
if len(batch) != 4 {
if batch.length() != 4 {
t.Fatal("announcement batch incorrect length")
}
if !reflect.DeepEqual(batch[0].msg, ca2) {
if !reflect.DeepEqual(batch.localMsgs[0].msg, ca2) {
t.Fatalf("channel announcement not first in batch: got %v, "+
"expected %v", spew.Sdump(batch[0].msg), spew.Sdump(ca2))
"expected %v", spew.Sdump(batch.localMsgs[0].msg),
spew.Sdump(ca2))
}
if !reflect.DeepEqual(batch[1].msg, ua3) {
if !reflect.DeepEqual(batch.localMsgs[1].msg, ua3) {
t.Fatalf("channel update not next in batch: got %v, "+
"expected %v", spew.Sdump(batch[1].msg), spew.Sdump(ua2))
"expected %v", spew.Sdump(batch.localMsgs[1].msg),
spew.Sdump(ua3))
}
// We'll ensure that both node announcements are present. We check both
// indexes as, due to the randomized order of map iteration, they may be
// in either place.
if !reflect.DeepEqual(batch[2].msg, na) && !reflect.DeepEqual(batch[3].msg, na) {
if !reflect.DeepEqual(batch.localMsgs[2].msg, na) &&
!reflect.DeepEqual(batch.localMsgs[3].msg, na) {
t.Fatalf("first node announcement not in last part of batch: "+
"got %v, expected %v", batch[2].msg,
"got %v, expected %v", batch.localMsgs[2].msg,
na)
}
if !reflect.DeepEqual(batch[2].msg, na5) && !reflect.DeepEqual(batch[3].msg, na5) {
if !reflect.DeepEqual(batch.localMsgs[2].msg, na5) &&
!reflect.DeepEqual(batch.localMsgs[3].msg, na5) {
t.Fatalf("second node announcement not in last part of batch: "+
"got %v, expected %v", batch[3].msg,
"got %v, expected %v", batch.localMsgs[3].msg,
na5)
}

docs/release-notes/release-notes-0.16.0.md

@@ -1,5 +1,13 @@
# Release Notes
## Peer to Peer Behavior
* `lnd` will now [properly prioritize sending out gossip updates generated
  locally to all connected
  peers](https://github.com/lightningnetwork/lnd/pull/7239), regardless of
  their current gossip sync query status.
## BOLT Specs
* Warning messages from peers are now recognized and

fn/stream.go (new file, 13 lines)

@@ -0,0 +1,13 @@
package fn
// Map takes an input slice and applies the function f to each element,
// yielding a new slice.
func Map[T1, T2 any](s []T1, f func(T1) T2) []T2 {
r := make([]T2, len(s))
for i, v := range s {
r[i] = f(v)
}
return r
}
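
A quick usage example of the new helper, mirroring the call added in sendBatch above; the type parameters are inferred from the arguments:

	doubled := fn.Map([]int{1, 2, 3}, func(i int) int { return i * 2 })
	// doubled == []int{2, 4, 6}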