discovery: send both local and remote anns in the same goroutine

This commit changes the sending of anns from using separate goroutines
to always sending both local and remote announcements in the same
goroutine. In addition, the local announcements are always sent first.
This change is to fix the following case:

1. Alice and Bob have a channel
2. Alice receives Bob's NodeAnnouncement
3. Alice goes to broadcast the channel
4. The broadcast is split into a local and remote broadcast due to PR
   #7239. Bob's NodeAnnouncement is in the remote batch. Everything else
   (ChannelAnnouncement, ChannelUpdate x2, and Alice's NodeAnnouncement)
   is in the local batch.
5. The remote batch (containing Bob's NodeAnnouncement) runs before the
   local batch since they are spawned in separate goroutines. This means
   that Alice sends Carol the NodeAnnouncement before Carol knows of the
   channel.

In step 2), Bob's NodeAnnouncement (isRemote = true) replaces Bob's
NodeAnnouncement that Alice was going to relay (isRemote = false) after
processing the AnnouncementSignatures.
This commit is contained in:
yyforyongyu 2023-02-02 19:05:56 +08:00
parent b73cfc5998
commit e34a088608
No known key found for this signature in database
GPG key ID: 9BCD95C4FF296868
2 changed files with 39 additions and 28 deletions

View file

@ -1154,35 +1154,49 @@ func (d *AuthenticatedGossiper) splitAnnouncementBatches(
}
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
// split size, and then sends out all items to the set of target peers. If
// isLocal is true, then we'll send them to all peers and skip any gossip
// filter checks.
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(annBatch []msgWithSenders,
isLocal bool) {
splitAnnouncementBatch := d.splitAnnouncementBatches(annBatch)
d.wg.Add(1)
go func() {
defer d.wg.Done()
log.Infof("Broadcasting %v new announcements in %d sub "+
"batches (local=%v)", len(annBatch),
len(splitAnnouncementBatch), isLocal)
for _, announcementBatch := range splitAnnouncementBatch {
if isLocal {
d.sendLocalBatch(announcementBatch)
} else {
d.sendRemoteBatch(announcementBatch)
}
// split size, and then sends out all items to the set of target peers. Locally
// generated announcements are always sent before remotely generated
// announcements.
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(
annBatch msgsToBroadcast) {
// delayNextBatch is a helper closure that blocks for `SubBatchDelay`
// duration to delay the sending of next announcement batch.
delayNextBatch := func() {
select {
case <-time.After(d.cfg.SubBatchDelay):
case <-d.quit:
return
}
}
// Fetch the local and remote announcements.
localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)
d.wg.Add(1)
go func() {
defer d.wg.Done()
log.Debugf("Broadcasting %v new local announcements in %d "+
"sub batches", len(annBatch.localMsgs),
len(localBatches))
// Send out the local announcements first.
for _, annBatch := range localBatches {
d.sendLocalBatch(annBatch)
delayNextBatch()
}
log.Debugf("Broadcasting %v new remote announcements in %d "+
"sub batches", len(annBatch.remoteMsgs),
len(remoteBatches))
// Now send the remote announcements.
for _, annBatch := range remoteBatches {
d.sendRemoteBatch(annBatch)
delayNextBatch()
}
}()
}
@ -1354,12 +1368,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
// announcements, we'll blast them out w/o regard for
// our peer's policies so we ensure they propagate
// properly.
d.splitAndSendAnnBatch(
announcementBatch.localMsgs, true,
)
d.splitAndSendAnnBatch(
announcementBatch.remoteMsgs, false,
)
d.splitAndSendAnnBatch(announcementBatch)
// The retransmission timer has ticked which indicates that we
// should check if we need to prune or re-broadcast any of our

View file

@ -784,7 +784,7 @@ func createTestCtx(t *testing.T, startHeight uint32) (*testCtx, error) {
HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
NumActiveSyncers: 3,
AnnSigner: &mock.SingleSigner{Privkey: selfKeyPriv},
SubBatchDelay: time.Second * 5,
SubBatchDelay: 1 * time.Millisecond,
MinimumBatchSize: 10,
MaxChannelUpdateBurst: DefaultMaxChannelUpdateBurst,
ChannelUpdateInterval: DefaultChannelUpdateInterval,
@ -3371,7 +3371,9 @@ out:
case <-sentMsgs:
case err := <-notifyErr:
t.Fatal(err)
default:
// Give it 5 seconds to drain out.
case <-time.After(5 * time.Second):
break out
}
}