diff --git a/discovery/gossiper.go b/discovery/gossiper.go index e1fa53843..f8510e93a 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -18,6 +18,7 @@ import ( "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnpeer" @@ -1035,7 +1036,8 @@ type msgsToBroadcast struct { // localMsgs is the set of messages we created locally. localMsgs []msgWithSenders - // remoteMsgs is the set of messages that we received from a remote party. + // remoteMsgs is the set of messages that we received from a remote + // party. remoteMsgs []msgWithSenders } @@ -1053,13 +1055,18 @@ func (m *msgsToBroadcast) isEmpty() bool { return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0 } +// length returns the length of the combined message set. +func (m *msgsToBroadcast) length() int { + return len(m.localMsgs) + len(m.remoteMsgs) +} + // Emit returns the set of de-duplicated announcements to be sent out during // the next announcement epoch, in the order of channel announcements, channel // updates, and node announcements. Each message emitted, contains the set of // peers that sent us the message. This way, we can ensure that we don't waste // bandwidth by re-sending a message to the peer that sent it to us in the // first place. Additionally, the set of stored messages are reset. -func (d *deDupedAnnouncements) Emit() []msgWithSenders { +func (d *deDupedAnnouncements) Emit() msgsToBroadcast { d.Lock() defer d.Unlock() @@ -1069,21 +1076,24 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders { // Create an empty array of lnwire.Messages with a length equal to // the total number of announcements. - msgs := make([]msgWithSenders, 0, numAnnouncements) + msgs := msgsToBroadcast{ + localMsgs: make([]msgWithSenders, 0, numAnnouncements), + remoteMsgs: make([]msgWithSenders, 0, numAnnouncements), + } // Add the channel announcements to the array first. for _, message := range d.channelAnnouncements { - msgs = append(msgs, message) + msgs.addMsg(message) } // Then add the channel updates. for _, message := range d.channelUpdates { - msgs = append(msgs, message) + msgs.addMsg(message) } // Finally add the node announcements. for _, message := range d.nodeAnnouncements { - msgs = append(msgs, message) + msgs.addMsg(message) } d.reset() @@ -1129,26 +1139,83 @@ func splitAnnouncementBatches(subBatchSize int, return splitAnnouncementBatch } -// sendBatch broadcasts a list of announcements to our peers. -func (d *AuthenticatedGossiper) sendBatch(announcementBatch []msgWithSenders) { - syncerPeers := d.syncMgr.GossipSyncers() +// splitAndSendAnnBatch takes a batch of messages, computes the proper batch +// split size, and then sends out all items to the set of target peers. If +// isLocal is true, then we'll send them to all peers and skip any gossip +// filter checks. +func (d *AuthenticatedGossiper) splitAndSendAnnBatch(annBatch []msgWithSenders, + isLocal bool) { - // We'll first attempt to filter out this new message - // for all peers that have active gossip syncers - // active. - for _, syncer := range syncerPeers { - syncer.FilterGossipMsgs(announcementBatch...) + // Next, If we have new things to announce then broadcast them to all + // our immediately connected peers. + subBatchSize := calculateSubBatchSize( + d.cfg.TrickleDelay, d.cfg.SubBatchDelay, + d.cfg.MinimumBatchSize, len(annBatch), + ) + + splitAnnouncementBatch := splitAnnouncementBatches( + subBatchSize, annBatch, + ) + + d.wg.Add(1) + go func() { + defer d.wg.Done() + + log.Infof("Broadcasting %v new announcements in %d sub "+ + "batches (local=%v)", len(annBatch), + len(splitAnnouncementBatch), isLocal) + + for _, announcementBatch := range splitAnnouncementBatch { + d.sendBatch(announcementBatch, isLocal) + + select { + case <-time.After(d.cfg.SubBatchDelay): + case <-d.quit: + return + } + } + }() +} + +// sendBatch broadcasts a list of announcements to our peers. +func (d *AuthenticatedGossiper) sendBatch(annBatch []msgWithSenders, + isLocal bool) { + + // If this is a batch of announcements created locally, then we can + // skip the filter and deup logic below, and just send the + // announcements out to all our coonnected peers. + if isLocal { + msgsToSend := fn.Map( + annBatch, func(m msgWithSenders) lnwire.Message { + return m.msg + }, + ) + err := d.cfg.Broadcast(nil, msgsToSend...) + if err != nil { + log.Errorf("Unable to send local batch "+ + "announcements: %v", err) + } + + return } - for _, msgChunk := range announcementBatch { - // With the syncers taken care of, we'll merge - // the sender map with the set of syncers, so - // we don't send out duplicate messages. + syncerPeers := d.syncMgr.GossipSyncers() + + // We'll first attempt to filter out this new message for all peers + // that have active gossip syncers active. + for _, syncer := range syncerPeers { + syncer.FilterGossipMsgs(annBatch...) + } + + for _, msgChunk := range annBatch { + msgChunk := msgChunk + + // With the syncers taken care of, we'll merge the sender map + // with the set of syncers, so we don't send out duplicate + // messages. msgChunk.mergeSyncerMap(syncerPeers) - err := d.cfg.Broadcast( - msgChunk.senders, msgChunk.msg, - ) + err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg) if err != nil { log.Errorf("Unable to send batch "+ "announcements: %v", err) @@ -1273,39 +1340,23 @@ func (d *AuthenticatedGossiper) networkHandler() { // If the current announcements batch is nil, then we // have no further work here. - if len(announcementBatch) == 0 { + if announcementBatch.isEmpty() { continue } - // Next, If we have new things to announce then - // broadcast them to all our immediately connected - // peers. - subBatchSize := calculateSubBatchSize( - d.cfg.TrickleDelay, d.cfg.SubBatchDelay, d.cfg.MinimumBatchSize, - len(announcementBatch), + // At this point, we have the set of local and remote + // announcements we want to send out. We'll do the + // batching as normal for both, but for local + // announcements, we'll blast them out w/o regard for + // our peer's policies so we ensure they propagate + // properly. + d.splitAndSendAnnBatch( + announcementBatch.localMsgs, true, ) - - splitAnnouncementBatch := splitAnnouncementBatches( - subBatchSize, announcementBatch, + d.splitAndSendAnnBatch( + announcementBatch.remoteMsgs, false, ) - d.wg.Add(1) - go func() { - defer d.wg.Done() - - log.Infof("Broadcasting %v new announcements in %d sub batches", - len(announcementBatch), len(splitAnnouncementBatch)) - - for _, announcementBatch := range splitAnnouncementBatch { - d.sendBatch(announcementBatch) - select { - case <-time.After(d.cfg.SubBatchDelay): - case <-d.quit: - return - } - } - }() - // The retransmission timer has ticked which indicates that we // should check if we need to prune or re-broadcast any of our // personal channels or node announcement. This addresses the diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index a426620fe..4e220896d 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -1910,31 +1910,37 @@ func TestDeDuplicatedAnnouncements(t *testing.T) { // Ensure that announcement batch delivers channel announcements, // channel updates, and node announcements in proper order. batch := announcements.Emit() - if len(batch) != 4 { + if batch.length() != 4 { t.Fatal("announcement batch incorrect length") } - if !reflect.DeepEqual(batch[0].msg, ca2) { + if !reflect.DeepEqual(batch.localMsgs[0].msg, ca2) { t.Fatalf("channel announcement not first in batch: got %v, "+ - "expected %v", spew.Sdump(batch[0].msg), spew.Sdump(ca2)) + "expected %v", spew.Sdump(batch.localMsgs[0].msg), + spew.Sdump(ca2)) } - if !reflect.DeepEqual(batch[1].msg, ua3) { + if !reflect.DeepEqual(batch.localMsgs[1].msg, ua3) { t.Fatalf("channel update not next in batch: got %v, "+ - "expected %v", spew.Sdump(batch[1].msg), spew.Sdump(ua2)) + "expected %v", spew.Sdump(batch.localMsgs[1].msg), + spew.Sdump(ua2)) } // We'll ensure that both node announcements are present. We check both // indexes as due to the randomized order of map iteration they may be // in either place. - if !reflect.DeepEqual(batch[2].msg, na) && !reflect.DeepEqual(batch[3].msg, na) { + if !reflect.DeepEqual(batch.localMsgs[2].msg, na) && + !reflect.DeepEqual(batch.localMsgs[3].msg, na) { + t.Fatalf("first node announcement not in last part of batch: "+ - "got %v, expected %v", batch[2].msg, + "got %v, expected %v", batch.localMsgs[2].msg, na) } - if !reflect.DeepEqual(batch[2].msg, na5) && !reflect.DeepEqual(batch[3].msg, na5) { + if !reflect.DeepEqual(batch.localMsgs[2].msg, na5) && + !reflect.DeepEqual(batch.localMsgs[3].msg, na5) { + t.Fatalf("second node announcement not in last part of batch: "+ - "got %v, expected %v", batch[3].msg, + "got %v, expected %v", batch.localMsgs[3].msg, na5) }