graph: change ValidationBarrier usage in the builder code

This removes the calls to InitJobDependencies, SignalDependants, and
WaitForDependants from the builder code. The router / builder code
does not actually need job dependency management: calls into the
builder (i.e. AddNode, AddEdge, UpdateEdge) are all blocking in the
gossiper, and child jobs are only run after their parent jobs there,
so the calls to the router already happen in the proper dependency
order.
Eugene Siegel 2024-10-14 19:50:05 -04:00
parent 6cabc74c20
commit 2731d09a0b
2 changed files with 6 additions and 118 deletions
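The ordering argument in the commit message can be illustrated with a small, self-contained Go sketch. The names below (toyGraph, AddEdge, UpdateEdge as a parent/child pair) are illustrative stand-ins, not lnd's gossiper or graph.Builder API; the point is only that when calls into the graph are synchronous and a child update is handed over only after its parent's call returns, the graph code sees jobs in dependency order without needing a barrier of its own.

```go
// Illustrative sketch only: stand-ins for the gossiper/builder interaction,
// not lnd's actual API.
package main

import "fmt"

// toyGraph stands in for graph.Builder. Its methods block until the update
// has been applied, mirroring the blocking AddEdge/UpdateEdge calls.
type toyGraph struct {
	edges map[string]bool
}

// AddEdge is the "parent" job: the edge exists once this call returns.
func (g *toyGraph) AddEdge(chanID string) error {
	g.edges[chanID] = true
	return nil
}

// UpdateEdge is the "child" job: it only succeeds if the parent already ran.
func (g *toyGraph) UpdateEdge(chanID string) error {
	if !g.edges[chanID] {
		return fmt.Errorf("no edge %s to update", chanID)
	}
	return nil
}

func main() {
	g := &toyGraph{edges: make(map[string]bool)}

	// The caller only dispatches the child job after the parent job's
	// blocking call has returned, so no extra dependency tracking is
	// needed on the graph side.
	if err := g.AddEdge("chan-1"); err != nil {
		fmt.Println("parent failed:", err)
		return
	}
	fmt.Println("child result:", g.UpdateEdge("chan-1"))
}
```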

graph/builder.go

@@ -3,7 +3,6 @@ package graph
import (
"bytes"
"fmt"
"runtime"
"strings"
"sync"
"sync/atomic"
@@ -669,51 +668,21 @@ func (b *Builder) pruneZombieChans() error {
// notifies topology changes, if any.
//
// NOTE: must be run inside goroutine.
func (b *Builder) handleNetworkUpdate(vb *ValidationBarrier,
update *routingMsg) {
func (b *Builder) handleNetworkUpdate(update *routingMsg) {
defer b.wg.Done()
defer vb.CompleteJob()
// If this message has an existing dependency, then we'll wait until
// that has been fully validated before we proceed.
err := vb.WaitForDependants(update.msg)
if err != nil {
switch {
case IsError(err, ErrVBarrierShuttingDown):
update.err <- err
case IsError(err, ErrParentValidationFailed):
update.err <- NewErrf(ErrIgnored, err.Error()) //nolint
default:
log.Warnf("unexpected error during validation "+
"barrier shutdown: %v", err)
update.err <- err
}
return
}
// Process the routing update to determine if this is either a new
// update from our PoV or an update to a prior vertex/edge we
// previously accepted.
err = b.processUpdate(update.msg, update.op...)
err := b.processUpdate(update.msg, update.op...)
update.err <- err
// If this message had any dependencies, then we can now signal them to
// continue.
allowDependents := err == nil || IsError(err, ErrIgnored, ErrOutdated)
vb.SignalDependants(update.msg, allowDependents)
// If the error is not nil here, there's no need to send topology
// change.
if err != nil {
// We now decide to log an error or not. If allowDependents is
// false, it means there is an error and the error is neither
// ErrIgnored or ErrOutdated. In this case, we'll log an error.
// Otherwise, we'll add debug log only.
if allowDependents {
// Log as a debug message if this is not an error we need to be
// concerned about.
if IsError(err, ErrIgnored, ErrOutdated) {
log.Debugf("process network updates got: %v", err)
} else {
log.Errorf("process network updates got: %v", err)
@@ -753,31 +722,6 @@ func (b *Builder) networkHandler() {
b.stats.Reset()
// We'll use this validation barrier to ensure that we process all jobs
// in the proper order during parallel validation.
//
// NOTE: For AssumeChannelValid, we bump up the maximum number of
// concurrent validation requests since there are no blocks being
// fetched. This significantly increases the performance of IGD for
// neutrino nodes.
//
// However, we dial back to use multiple of the number of cores when
// fully validating, to avoid fetching up to 1000 blocks from the
// backend. On bitcoind, this will empirically cause massive latency
// spikes when executing this many concurrent RPC calls. Critical
// subsystems or basic rpc calls that rely on calls such as GetBestBlock
// will hang due to excessive load.
//
// See https://github.com/lightningnetwork/lnd/issues/4892.
var validationBarrier *ValidationBarrier
if b.cfg.AssumeChannelValid {
validationBarrier = NewValidationBarrier(1000, b.quit)
} else {
validationBarrier = NewValidationBarrier(
4*runtime.NumCPU(), b.quit,
)
}
for {
// If there are stats, resume the statTicker.
if !b.stats.Empty() {
@@ -789,13 +733,8 @@ func (b *Builder) networkHandler() {
// result we'll modify the channel graph accordingly depending
// on the exact type of the message.
case update := <-b.networkUpdates:
// We'll set up any dependants, and wait until a free
// slot for this job opens up, this allows us to not
// have thousands of goroutines active.
validationBarrier.InitJobDependencies(update.msg)
b.wg.Add(1)
go b.handleNetworkUpdate(validationBarrier, update)
go b.handleNetworkUpdate(update)
// TODO(roasbeef): remove all unconnected vertexes
// after N blocks pass with no corresponding

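Pieced together from the hunks above, the dispatch that remains in networkHandler reduces to the pattern below. This is a simplified, self-contained stand-in (toyBuilder, a string payload instead of lnwire messages), not the actual builder code; it only shows that each update now gets its own goroutine directly, with no InitJobDependencies call and no barrier passed to the handler.

```go
package main

import (
	"fmt"
	"sync"
)

// routingMsg mirrors the shape implied by the diff: a payload plus a channel
// for reporting the processing result back to the caller.
type routingMsg struct {
	msg string
	err chan error
}

// toyBuilder is a simplified stand-in for graph.Builder.
type toyBuilder struct {
	wg             sync.WaitGroup
	networkUpdates chan *routingMsg
	quit           chan struct{}
}

// handleNetworkUpdate processes one update and reports the result; with the
// barrier gone there is no WaitForDependants/SignalDependants bracketing.
func (b *toyBuilder) handleNetworkUpdate(update *routingMsg) {
	defer b.wg.Done()

	// Stand-in for processUpdate.
	fmt.Println("processing:", update.msg)
	update.err <- nil
}

// networkHandler dispatches each incoming update straight to a goroutine.
func (b *toyBuilder) networkHandler() {
	defer b.wg.Done()

	for {
		select {
		case update := <-b.networkUpdates:
			b.wg.Add(1)
			go b.handleNetworkUpdate(update)

		case <-b.quit:
			return
		}
	}
}

func main() {
	b := &toyBuilder{
		networkUpdates: make(chan *routingMsg),
		quit:           make(chan struct{}),
	}
	b.wg.Add(1)
	go b.networkHandler()

	u := &routingMsg{msg: "channel update", err: make(chan error, 1)}
	b.networkUpdates <- u
	fmt.Println("result:", <-u.err)

	close(b.quit)
	b.wg.Wait()
}
```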
graph/validation_barrier.go

@@ -4,7 +4,6 @@ import (
"fmt"
"sync"
"github.com/lightningnetwork/lnd/graph/db/models"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)
@@ -125,33 +124,14 @@ func (v *ValidationBarrier) InitJobDependencies(job interface{}) {
v.nodeAnnDependencies[route.Vertex(msg.NodeID1)] = signals
v.nodeAnnDependencies[route.Vertex(msg.NodeID2)] = signals
}
case *models.ChannelEdgeInfo:
shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
if _, ok := v.chanAnnFinSignal[shortID]; !ok {
signals := &validationSignals{
allow: make(chan struct{}),
deny: make(chan struct{}),
}
v.chanAnnFinSignal[shortID] = signals
v.chanEdgeDependencies[shortID] = signals
v.nodeAnnDependencies[route.Vertex(msg.NodeKey1Bytes)] = signals
v.nodeAnnDependencies[route.Vertex(msg.NodeKey2Bytes)] = signals
}
// These other types don't have any dependants, so no further
// initialization needs to be done beyond just occupying a job slot.
case *models.ChannelEdgePolicy:
return
case *lnwire.ChannelUpdate1:
return
case *lnwire.NodeAnnouncement:
// TODO(roasbeef): node ann needs to wait on existing channel updates
return
case *models.LightningNode:
return
case *lnwire.AnnounceSignatures1:
// TODO(roasbeef): need to wait on chan ann?
return
@@ -187,20 +167,6 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error {
switch msg := job.(type) {
// Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the
// completion of any active ChannelAnnouncement jobs related to them.
case *models.ChannelEdgePolicy:
shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
signals, ok = v.chanEdgeDependencies[shortID]
jobDesc = fmt.Sprintf("job=lnwire.ChannelEdgePolicy, scid=%v",
msg.ChannelID)
case *models.LightningNode:
vertex := route.Vertex(msg.PubKeyBytes)
signals, ok = v.nodeAnnDependencies[vertex]
jobDesc = fmt.Sprintf("job=channeldb.LightningNode, pub=%s",
vertex)
case *lnwire.ChannelUpdate1:
signals, ok = v.chanEdgeDependencies[msg.ShortChannelID]
@@ -217,7 +183,6 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error {
// return directly.
case *lnwire.AnnounceSignatures1:
// TODO(roasbeef): need to wait on chan ann?
case *models.ChannelEdgeInfo:
case *lnwire.ChannelAnnouncement1:
}
@@ -263,17 +228,6 @@ func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) {
// If we've just finished executing a ChannelAnnouncement, then we'll
// close out the signal, and remove the signal from the map of active
// ones. This will allow/deny any dependent jobs to continue execution.
case *models.ChannelEdgeInfo:
shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
finSignals, ok := v.chanAnnFinSignal[shortID]
if ok {
if allow {
close(finSignals.allow)
} else {
close(finSignals.deny)
}
delete(v.chanAnnFinSignal, shortID)
}
case *lnwire.ChannelAnnouncement1:
finSignals, ok := v.chanAnnFinSignal[msg.ShortChannelID]
if ok {
@@ -290,15 +244,10 @@ func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) {
// For all other job types, we'll delete the tracking entries from the
// map, as if we reach this point, then all dependants have already
// finished executing and we can proceed.
case *models.LightningNode:
delete(v.nodeAnnDependencies, route.Vertex(msg.PubKeyBytes))
case *lnwire.NodeAnnouncement:
delete(v.nodeAnnDependencies, route.Vertex(msg.NodeID))
case *lnwire.ChannelUpdate1:
delete(v.chanEdgeDependencies, msg.ShortChannelID)
case *models.ChannelEdgePolicy:
shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
delete(v.chanEdgeDependencies, shortID)
case *lnwire.AnnounceSignatures1:
return
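For context on what the removed cases were doing, the allow/deny channel pattern behind ValidationBarrier can be reduced to the self-contained sketch below. The validationSignals name is taken from the diff, but the wrapper functions (newValidationSignals, signal, wait) and the single-job setup are simplified stand-ins for the real map-based bookkeeping in InitJobDependencies, WaitForDependants, and SignalDependants.

```go
package main

import (
	"errors"
	"fmt"
)

// validationSignals mirrors the struct shown in the removed lines: a parent
// job closes exactly one of these channels, and dependent jobs select on
// them to learn whether they may proceed.
type validationSignals struct {
	allow chan struct{}
	deny  chan struct{}
}

func newValidationSignals() *validationSignals {
	return &validationSignals{
		allow: make(chan struct{}),
		deny:  make(chan struct{}),
	}
}

// signal plays the SignalDependants role for a single parent job.
func (s *validationSignals) signal(allow bool) {
	if allow {
		close(s.allow)
		return
	}
	close(s.deny)
}

// wait plays the WaitForDependants role for a single dependent job.
func (s *validationSignals) wait() error {
	select {
	case <-s.allow:
		return nil
	case <-s.deny:
		return errors.New("parent validation failed")
	}
}

func main() {
	sig := newValidationSignals()

	done := make(chan error, 1)
	go func() { done <- sig.wait() }() // dependent job blocks here

	sig.signal(true) // parent job validated successfully
	fmt.Println("dependent job result:", <-done)
}
```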