discovery: ensure we prioritize sending out our own local announcements
In this commit, we modify our gossip broadcast logic to ensure that we always send out our own gossip messages regardless of the filtering/feature policies of the peer.

Before this commit, it was possible that when we went to broadcast an announcement, none of our peers actually had us as a syncer peer (lnd terminology). In this case, the FilterGossipMsgs function wouldn't do anything, as they don't have an active timestamp filter set. When we then went to merge the syncer map, we'd add all these peers we didn't send to, meaning we would skip them when it came to broadcast time.

In this commit, we now split things into two phases: we'll broadcast _our_ own announcements to all our peers, but then do the normal filtering and chunking for the announcements we got from a remote peer.

Fixes https://github.com/lightningnetwork/lnd/issues/6531
Fixes https://github.com/lightningnetwork/lnd/issues/7223
Fixes https://github.com/lightningnetwork/lnd/issues/7073
This commit is contained in:
parent 9a4701d709
commit 52451b37af
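For readers unfamiliar with the gossiper internals, the failure mode described in the commit message can be modeled with a small, self-contained sketch. The types and helpers below are simplified stand-ins, not the actual lnd code: a peer with no active timestamp filter is never written to by the filter step, yet the pre-fix code still merged it into the message's sender set, so the final broadcast (which excludes known senders) skipped it too.

package main

import "fmt"

// Simplified stand-ins for the gossiper's types; illustrative only.
type peer string

type msgWithSenders struct {
	msg     string
	senders map[peer]struct{}
}

// filterGossipMsgs models GossipSyncer.FilterGossipMsgs: a peer without an
// active gossip timestamp filter is simply not sent anything.
func filterGossipMsgs(p peer, hasTimestampFilter bool, m *msgWithSenders) {
	if !hasTimestampFilter {
		return // no-op, nothing is sent to this peer
	}
	fmt.Printf("sent %q to %s via gossip filter\n", m.msg, p)
	m.senders[p] = struct{}{}
}

// broadcast models cfg.Broadcast: it skips every peer already recorded as a
// sender of the message.
func broadcast(peers []peer, m *msgWithSenders) {
	for _, p := range peers {
		if _, ok := m.senders[p]; ok {
			continue
		}
		fmt.Printf("broadcast %q to %s\n", m.msg, p)
	}
}

func main() {
	peers := []peer{"alice", "bob"}
	m := &msgWithSenders{
		msg:     "our_chan_update",
		senders: map[peer]struct{}{},
	}

	// Neither peer has set a gossip timestamp filter, so the filter step
	// sends nothing...
	for _, p := range peers {
		filterGossipMsgs(p, false, m)
	}

	// ...but the pre-fix logic merged every syncer peer into the sender
	// set anyway, so the broadcast below skips both peers and our own
	// announcement never leaves the node.
	for _, p := range peers {
		m.senders[p] = struct{}{}
	}
	broadcast(peers, m)
}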
@@ -18,6 +18,7 @@ import (
 	"github.com/lightningnetwork/lnd/batch"
 	"github.com/lightningnetwork/lnd/chainntnfs"
 	"github.com/lightningnetwork/lnd/channeldb"
+	"github.com/lightningnetwork/lnd/fn"
 	"github.com/lightningnetwork/lnd/keychain"
 	"github.com/lightningnetwork/lnd/kvdb"
 	"github.com/lightningnetwork/lnd/lnpeer"
@@ -1035,7 +1036,8 @@ type msgsToBroadcast struct {
 	// localMsgs is the set of messages we created locally.
 	localMsgs []msgWithSenders
 
-	// remoteMsgs is the set of messages that we received from a remote party.
+	// remoteMsgs is the set of messages that we received from a remote
+	// party.
 	remoteMsgs []msgWithSenders
 }
 
@@ -1053,13 +1055,18 @@ func (m *msgsToBroadcast) isEmpty() bool {
 	return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
 }
 
+// length returns the length of the combined message set.
+func (m *msgsToBroadcast) length() int {
+	return len(m.localMsgs) + len(m.remoteMsgs)
+}
+
 // Emit returns the set of de-duplicated announcements to be sent out during
 // the next announcement epoch, in the order of channel announcements, channel
 // updates, and node announcements. Each message emitted, contains the set of
 // peers that sent us the message. This way, we can ensure that we don't waste
 // bandwidth by re-sending a message to the peer that sent it to us in the
 // first place. Additionally, the set of stored messages are reset.
-func (d *deDupedAnnouncements) Emit() []msgWithSenders {
+func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
 	d.Lock()
 	defer d.Unlock()
 
@@ -1069,21 +1076,24 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders {
 
 	// Create an empty array of lnwire.Messages with a length equal to
 	// the total number of announcements.
-	msgs := make([]msgWithSenders, 0, numAnnouncements)
+	msgs := msgsToBroadcast{
+		localMsgs:  make([]msgWithSenders, 0, numAnnouncements),
+		remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
+	}
 
 	// Add the channel announcements to the array first.
 	for _, message := range d.channelAnnouncements {
-		msgs = append(msgs, message)
+		msgs.addMsg(message)
 	}
 
 	// Then add the channel updates.
 	for _, message := range d.channelUpdates {
-		msgs = append(msgs, message)
+		msgs.addMsg(message)
 	}
 
 	// Finally add the node announcements.
 	for _, message := range d.nodeAnnouncements {
-		msgs = append(msgs, message)
+		msgs.addMsg(message)
 	}
 
 	d.reset()
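Emit now routes each stored announcement through msgs.addMsg instead of appending to a flat slice. The addMsg helper itself is not part of the hunks shown here; presumably it splits on an origin flag carried by msgWithSenders. A minimal, self-contained sketch of that idea follows (the isLocal field name is an assumption for illustration, not taken from this diff):

package main

import "fmt"

// Trimmed-down stand-ins for the gossiper's types; illustrative only.
type msgWithSenders struct {
	msg     string
	isLocal bool // assumed flag: true if we originated the announcement
}

type msgsToBroadcast struct {
	localMsgs  []msgWithSenders
	remoteMsgs []msgWithSenders
}

// addMsg files a message under the local or remote slice based on origin.
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
	if msg.isLocal {
		m.localMsgs = append(m.localMsgs, msg)
		return
	}
	m.remoteMsgs = append(m.remoteMsgs, msg)
}

func main() {
	var batch msgsToBroadcast
	batch.addMsg(msgWithSenders{msg: "our_chan_ann", isLocal: true})
	batch.addMsg(msgWithSenders{msg: "their_node_ann", isLocal: false})
	fmt.Println(len(batch.localMsgs), len(batch.remoteMsgs)) // 1 1
}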
@@ -1129,26 +1139,83 @@ func splitAnnouncementBatches(subBatchSize int,
 	return splitAnnouncementBatch
 }
 
-// sendBatch broadcasts a list of announcements to our peers.
-func (d *AuthenticatedGossiper) sendBatch(announcementBatch []msgWithSenders) {
-	syncerPeers := d.syncMgr.GossipSyncers()
+// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
+// split size, and then sends out all items to the set of target peers. If
+// isLocal is true, then we'll send them to all peers and skip any gossip
+// filter checks.
+func (d *AuthenticatedGossiper) splitAndSendAnnBatch(annBatch []msgWithSenders,
+	isLocal bool) {
 
-	// We'll first attempt to filter out this new message
-	// for all peers that have active gossip syncers
-	// active.
-	for _, syncer := range syncerPeers {
-		syncer.FilterGossipMsgs(announcementBatch...)
+	// Next, If we have new things to announce then broadcast them to all
+	// our immediately connected peers.
+	subBatchSize := calculateSubBatchSize(
+		d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
+		d.cfg.MinimumBatchSize, len(annBatch),
+	)
+
+	splitAnnouncementBatch := splitAnnouncementBatches(
+		subBatchSize, annBatch,
+	)
+
+	d.wg.Add(1)
+	go func() {
+		defer d.wg.Done()
+
+		log.Infof("Broadcasting %v new announcements in %d sub "+
+			"batches (local=%v)", len(annBatch),
+			len(splitAnnouncementBatch), isLocal)
+
+		for _, announcementBatch := range splitAnnouncementBatch {
+			d.sendBatch(announcementBatch, isLocal)
+
+			select {
+			case <-time.After(d.cfg.SubBatchDelay):
+			case <-d.quit:
+				return
+			}
+		}
+	}()
+}
+
+// sendBatch broadcasts a list of announcements to our peers.
+func (d *AuthenticatedGossiper) sendBatch(annBatch []msgWithSenders,
+	isLocal bool) {
+
+	// If this is a batch of announcements created locally, then we can
+	// skip the filter and dedup logic below, and just send the
+	// announcements out to all our connected peers.
+	if isLocal {
+		msgsToSend := fn.Map(
+			annBatch, func(m msgWithSenders) lnwire.Message {
+				return m.msg
+			},
+		)
+		err := d.cfg.Broadcast(nil, msgsToSend...)
+		if err != nil {
+			log.Errorf("Unable to send local batch "+
+				"announcements: %v", err)
+		}
+
+		return
 	}
 
-	for _, msgChunk := range announcementBatch {
-		// With the syncers taken care of, we'll merge
-		// the sender map with the set of syncers, so
-		// we don't send out duplicate messages.
+	syncerPeers := d.syncMgr.GossipSyncers()
+
+	// We'll first attempt to filter out this new message for all peers
+	// that have active gossip syncers active.
+	for _, syncer := range syncerPeers {
+		syncer.FilterGossipMsgs(annBatch...)
+	}
+
+	for _, msgChunk := range annBatch {
+		msgChunk := msgChunk
+
+		// With the syncers taken care of, we'll merge the sender map
+		// with the set of syncers, so we don't send out duplicate
+		// messages.
 		msgChunk.mergeSyncerMap(syncerPeers)
 
-		err := d.cfg.Broadcast(
-			msgChunk.senders, msgChunk.msg,
-		)
+		err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
 		if err != nil {
 			log.Errorf("Unable to send batch "+
 				"announcements: %v", err)
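The isLocal fast path above leans on the newly imported fn package to project msgWithSenders values down to plain lnwire.Message values before handing them to Broadcast with a nil sender set. As a rough idea of what such a generic map helper does, here is a sketch with the same shape (slice first, function second); it is not lnd's actual fn.Map definition:

package main

import "fmt"

// Map applies f to every element of s and returns the results. Sketch only;
// mirrors the call shape used in sendBatch's local fast path.
func Map[A, B any](s []A, f func(A) B) []B {
	out := make([]B, 0, len(s))
	for _, x := range s {
		out = append(out, f(x))
	}
	return out
}

type msgWithSenders struct{ msg string }

func main() {
	batch := []msgWithSenders{{msg: "chan_ann"}, {msg: "node_ann"}}

	// Strip the sender bookkeeping and keep only the wire messages,
	// matching how the local fast path prepares its Broadcast call.
	msgs := Map(batch, func(m msgWithSenders) string { return m.msg })
	fmt.Println(msgs) // [chan_ann node_ann]
}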
@@ -1273,39 +1340,23 @@ func (d *AuthenticatedGossiper) networkHandler() {
 
 			// If the current announcements batch is nil, then we
 			// have no further work here.
-			if len(announcementBatch) == 0 {
+			if announcementBatch.isEmpty() {
 				continue
 			}
 
-			// Next, If we have new things to announce then
-			// broadcast them to all our immediately connected
-			// peers.
-			subBatchSize := calculateSubBatchSize(
-				d.cfg.TrickleDelay, d.cfg.SubBatchDelay, d.cfg.MinimumBatchSize,
-				len(announcementBatch),
-			)
-
-			splitAnnouncementBatch := splitAnnouncementBatches(
-				subBatchSize, announcementBatch,
-			)
-
-			d.wg.Add(1)
-			go func() {
-				defer d.wg.Done()
-
-				log.Infof("Broadcasting %v new announcements in %d sub batches",
-					len(announcementBatch), len(splitAnnouncementBatch))
-
-				for _, announcementBatch := range splitAnnouncementBatch {
-					d.sendBatch(announcementBatch)
-					select {
-					case <-time.After(d.cfg.SubBatchDelay):
-					case <-d.quit:
-						return
-					}
-				}
-			}()
+			// At this point, we have the set of local and remote
+			// announcements we want to send out. We'll do the
+			// batching as normal for both, but for local
+			// announcements, we'll blast them out w/o regard for
+			// our peer's policies so we ensure they propagate
+			// properly.
+			d.splitAndSendAnnBatch(
+				announcementBatch.localMsgs, true,
+			)
+			d.splitAndSendAnnBatch(
+				announcementBatch.remoteMsgs, false,
+			)
 
 		// The retransmission timer has ticked which indicates that we
 		// should check if we need to prune or re-broadcast any of our
 		// personal channels or node announcement. This addresses the
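Both the local and remote paths now go through splitAndSendAnnBatch, which trickles the sub-batches out of a goroutine with a delay between each chunk and aborts early on shutdown. A self-contained sketch of that pacing pattern, using generic names rather than the gossiper's code:

package main

import (
	"fmt"
	"time"
)

// sendInSubBatches sends items in fixed-size chunks, pausing between chunks
// and returning early if quit is closed. This mirrors the pacing loop that
// splitAndSendAnnBatch runs in its goroutine.
func sendInSubBatches(items []string, subBatchSize int, delay time.Duration,
	quit <-chan struct{}) {

	for start := 0; start < len(items); start += subBatchSize {
		end := start + subBatchSize
		if end > len(items) {
			end = len(items)
		}
		fmt.Println("sending sub batch:", items[start:end])

		select {
		case <-time.After(delay):
		case <-quit:
			return
		}
	}
}

func main() {
	quit := make(chan struct{})
	anns := []string{"a", "b", "c", "d", "e"}
	sendInSubBatches(anns, 2, 10*time.Millisecond, quit)
}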
@@ -1910,31 +1910,37 @@ func TestDeDuplicatedAnnouncements(t *testing.T) {
 	// Ensure that announcement batch delivers channel announcements,
 	// channel updates, and node announcements in proper order.
 	batch := announcements.Emit()
-	if len(batch) != 4 {
+	if batch.length() != 4 {
 		t.Fatal("announcement batch incorrect length")
 	}
 
-	if !reflect.DeepEqual(batch[0].msg, ca2) {
+	if !reflect.DeepEqual(batch.localMsgs[0].msg, ca2) {
 		t.Fatalf("channel announcement not first in batch: got %v, "+
-			"expected %v", spew.Sdump(batch[0].msg), spew.Sdump(ca2))
+			"expected %v", spew.Sdump(batch.localMsgs[0].msg),
+			spew.Sdump(ca2))
 	}
 
-	if !reflect.DeepEqual(batch[1].msg, ua3) {
+	if !reflect.DeepEqual(batch.localMsgs[1].msg, ua3) {
 		t.Fatalf("channel update not next in batch: got %v, "+
-			"expected %v", spew.Sdump(batch[1].msg), spew.Sdump(ua2))
+			"expected %v", spew.Sdump(batch.localMsgs[1].msg),
+			spew.Sdump(ua2))
 	}
 
 	// We'll ensure that both node announcements are present. We check both
 	// indexes as due to the randomized order of map iteration they may be
 	// in either place.
-	if !reflect.DeepEqual(batch[2].msg, na) && !reflect.DeepEqual(batch[3].msg, na) {
+	if !reflect.DeepEqual(batch.localMsgs[2].msg, na) &&
+		!reflect.DeepEqual(batch.localMsgs[3].msg, na) {
+
 		t.Fatalf("first node announcement not in last part of batch: "+
-			"got %v, expected %v", batch[2].msg,
+			"got %v, expected %v", batch.localMsgs[2].msg,
			na)
 	}
-	if !reflect.DeepEqual(batch[2].msg, na5) && !reflect.DeepEqual(batch[3].msg, na5) {
+	if !reflect.DeepEqual(batch.localMsgs[2].msg, na5) &&
+		!reflect.DeepEqual(batch.localMsgs[3].msg, na5) {
+
 		t.Fatalf("second node announcement not in last part of batch: "+
-			"got %v, expected %v", batch[3].msg,
+			"got %v, expected %v", batch.localMsgs[3].msg,
			na5)
 	}
 