discovery: add method handleNetworkMessages to process messages

This commit is contained in:
yyforyongyu 2022-11-22 05:14:07 +08:00
parent 29c2458831
commit b237dbfd74
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868

View File

@ -1231,73 +1231,10 @@ func (d *AuthenticatedGossiper) networkHandler() {
validationBarrier.InitJobDependencies(announcement.msg) validationBarrier.InitJobDependencies(announcement.msg)
d.wg.Add(1) d.wg.Add(1)
go func() { go d.handleNetworkMessages(
defer d.wg.Done() announcement, &announcements,
defer validationBarrier.CompleteJob() validationBarrier, shouldBroadcast,
)
// If this message has an existing dependency,
// then we'll wait until that has been fully
// validated before we proceed.
err := validationBarrier.WaitForDependants(
announcement.msg,
)
if err != nil {
log.Debugf("Validating network message %s got err: %v",
announcement.msg.MsgType(), err)
if !routing.IsError(
err,
routing.ErrVBarrierShuttingDown,
routing.ErrParentValidationFailed,
) {
log.Warnf("unexpected error "+
"during validation "+
"barrier shutdown: %v",
err)
}
announcement.err <- err
return
}
// Process the network announcement to
// determine if this is either a new
// announcement from our PoV or an edges to a
// prior vertex/edge we previously proceeded.
emittedAnnouncements, allowDependents := d.processNetworkAnnouncement(
announcement,
)
log.Tracef("Processed network message %s, "+
"returned len(announcements)=%v, "+
"allowDependents=%v",
announcement.msg.MsgType(),
len(emittedAnnouncements),
allowDependents)
// If this message had any dependencies, then
// we can now signal them to continue.
validationBarrier.SignalDependants(
announcement.msg, allowDependents,
)
// If the announcement was accepted, then add
// the emitted announcements to our announce
// batch to be broadcast once the trickle timer
// ticks gain.
if emittedAnnouncements != nil && shouldBroadcast {
// TODO(roasbeef): exclude peer that
// sent.
announcements.AddMsgs(
emittedAnnouncements...,
)
} else if emittedAnnouncements != nil {
log.Trace("Skipping broadcast of " +
"announcements received " +
"during initial graph sync")
}
}()
// The trickle timer has ticked, which indicates we should // The trickle timer has ticked, which indicates we should
// flush to the network the pending batch of new announcements // flush to the network the pending batch of new announcements
@ -1362,6 +1299,64 @@ func (d *AuthenticatedGossiper) networkHandler() {
} }
} }
// handleNetworkMessages is responsible for waiting for dependencies for a
// given network message and processing the message. Once processed, it will
// signal its dependants and add the new announcements to the announce batch.
//
// NOTE: must be run as a goroutine.
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
deDuped *deDupedAnnouncements, vb *routing.ValidationBarrier,
shouldBroadcast bool) {
defer d.wg.Done()
defer vb.CompleteJob()
// If this message has an existing dependency, then we'll wait until
// that has been fully validated before we proceed.
err := vb.WaitForDependants(nMsg.msg)
if err != nil {
log.Debugf("Validating network message %s got err: %v",
nMsg.msg.MsgType(), err)
if !routing.IsError(
err,
routing.ErrVBarrierShuttingDown,
routing.ErrParentValidationFailed,
) {
log.Warnf("unexpected error during validation "+
"barrier shutdown: %v", err)
}
nMsg.err <- err
return
}
// Process the network announcement to determine if this is either a
// new announcement from our PoV or an edges to a prior vertex/edge we
// previously proceeded.
newAnns, allow := d.processNetworkAnnouncement(nMsg)
log.Tracef("Processed network message %s, returned "+
"len(announcements)=%v, allowDependents=%v",
nMsg.msg.MsgType(), len(newAnns), allow)
// If this message had any dependencies, then we can now signal them to
// continue.
vb.SignalDependants(nMsg.msg, allow)
// If the announcement was accepted, then add the emitted announcements
// to our announce batch to be broadcast once the trickle timer ticks
// gain.
if newAnns != nil && shouldBroadcast {
// TODO(roasbeef): exclude peer that sent.
deDuped.AddMsgs(newAnns...)
} else if newAnns != nil {
log.Trace("Skipping broadcast of announcements received " +
"during initial graph sync")
}
}
// TODO(roasbeef): d/c peers that send updates not on our chain // TODO(roasbeef): d/c peers that send updates not on our chain
// InitSyncState is called by outside sub-systems when a connection is // InitSyncState is called by outside sub-systems when a connection is