diff --git a/graph/builder.go b/graph/builder.go index 1aa948800..632156282 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -3,7 +3,6 @@ package graph import ( "bytes" "fmt" - "runtime" "strings" "sync" "sync/atomic" @@ -669,51 +668,21 @@ func (b *Builder) pruneZombieChans() error { // notifies topology changes, if any. // // NOTE: must be run inside goroutine. -func (b *Builder) handleNetworkUpdate(vb *ValidationBarrier, - update *routingMsg) { - +func (b *Builder) handleNetworkUpdate(update *routingMsg) { defer b.wg.Done() - defer vb.CompleteJob() - - // If this message has an existing dependency, then we'll wait until - // that has been fully validated before we proceed. - err := vb.WaitForDependants(update.msg) - if err != nil { - switch { - case IsError(err, ErrVBarrierShuttingDown): - update.err <- err - - case IsError(err, ErrParentValidationFailed): - update.err <- NewErrf(ErrIgnored, err.Error()) //nolint - - default: - log.Warnf("unexpected error during validation "+ - "barrier shutdown: %v", err) - update.err <- err - } - - return - } // Process the routing update to determine if this is either a new // update from our PoV or an update to a prior vertex/edge we // previously accepted. - err = b.processUpdate(update.msg, update.op...) + err := b.processUpdate(update.msg, update.op...) update.err <- err - // If this message had any dependencies, then we can now signal them to - // continue. - allowDependents := err == nil || IsError(err, ErrIgnored, ErrOutdated) - vb.SignalDependants(update.msg, allowDependents) - // If the error is not nil here, there's no need to send topology // change. if err != nil { - // We now decide to log an error or not. If allowDependents is - // false, it means there is an error and the error is neither - // ErrIgnored or ErrOutdated. In this case, we'll log an error. - // Otherwise, we'll add debug log only. - if allowDependents { + // Log as a debug message if this is not an error we need to be + // concerned about. + if IsError(err, ErrIgnored, ErrOutdated) { log.Debugf("process network updates got: %v", err) } else { log.Errorf("process network updates got: %v", err) @@ -753,31 +722,6 @@ func (b *Builder) networkHandler() { b.stats.Reset() - // We'll use this validation barrier to ensure that we process all jobs - // in the proper order during parallel validation. - // - // NOTE: For AssumeChannelValid, we bump up the maximum number of - // concurrent validation requests since there are no blocks being - // fetched. This significantly increases the performance of IGD for - // neutrino nodes. - // - // However, we dial back to use multiple of the number of cores when - // fully validating, to avoid fetching up to 1000 blocks from the - // backend. On bitcoind, this will empirically cause massive latency - // spikes when executing this many concurrent RPC calls. Critical - // subsystems or basic rpc calls that rely on calls such as GetBestBlock - // will hang due to excessive load. - // - // See https://github.com/lightningnetwork/lnd/issues/4892. - var validationBarrier *ValidationBarrier - if b.cfg.AssumeChannelValid { - validationBarrier = NewValidationBarrier(1000, b.quit) - } else { - validationBarrier = NewValidationBarrier( - 4*runtime.NumCPU(), b.quit, - ) - } - for { // If there are stats, resume the statTicker. if !b.stats.Empty() { @@ -789,13 +733,8 @@ func (b *Builder) networkHandler() { // result we'll modify the channel graph accordingly depending // on the exact type of the message. case update := <-b.networkUpdates: - // We'll set up any dependants, and wait until a free - // slot for this job opens up, this allows us to not - // have thousands of goroutines active. - validationBarrier.InitJobDependencies(update.msg) - b.wg.Add(1) - go b.handleNetworkUpdate(validationBarrier, update) + go b.handleNetworkUpdate(update) // TODO(roasbeef): remove all unconnected vertexes // after N blocks pass with no corresponding diff --git a/graph/validation_barrier.go b/graph/validation_barrier.go index a97709605..a5a8da8bb 100644 --- a/graph/validation_barrier.go +++ b/graph/validation_barrier.go @@ -4,7 +4,6 @@ import ( "fmt" "sync" - "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" ) @@ -125,33 +124,14 @@ func (v *ValidationBarrier) InitJobDependencies(job interface{}) { v.nodeAnnDependencies[route.Vertex(msg.NodeID1)] = signals v.nodeAnnDependencies[route.Vertex(msg.NodeID2)] = signals } - case *models.ChannelEdgeInfo: - - shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - if _, ok := v.chanAnnFinSignal[shortID]; !ok { - signals := &validationSignals{ - allow: make(chan struct{}), - deny: make(chan struct{}), - } - - v.chanAnnFinSignal[shortID] = signals - v.chanEdgeDependencies[shortID] = signals - - v.nodeAnnDependencies[route.Vertex(msg.NodeKey1Bytes)] = signals - v.nodeAnnDependencies[route.Vertex(msg.NodeKey2Bytes)] = signals - } // These other types don't have any dependants, so no further // initialization needs to be done beyond just occupying a job slot. - case *models.ChannelEdgePolicy: - return case *lnwire.ChannelUpdate1: return case *lnwire.NodeAnnouncement: // TODO(roasbeef): node ann needs to wait on existing channel updates return - case *models.LightningNode: - return case *lnwire.AnnounceSignatures1: // TODO(roasbeef): need to wait on chan ann? return @@ -187,20 +167,6 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { switch msg := job.(type) { // Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the // completion of any active ChannelAnnouncement jobs related to them. - case *models.ChannelEdgePolicy: - shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - signals, ok = v.chanEdgeDependencies[shortID] - - jobDesc = fmt.Sprintf("job=lnwire.ChannelEdgePolicy, scid=%v", - msg.ChannelID) - - case *models.LightningNode: - vertex := route.Vertex(msg.PubKeyBytes) - signals, ok = v.nodeAnnDependencies[vertex] - - jobDesc = fmt.Sprintf("job=channeldb.LightningNode, pub=%s", - vertex) - case *lnwire.ChannelUpdate1: signals, ok = v.chanEdgeDependencies[msg.ShortChannelID] @@ -217,7 +183,6 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { // return directly. case *lnwire.AnnounceSignatures1: // TODO(roasbeef): need to wait on chan ann? - case *models.ChannelEdgeInfo: case *lnwire.ChannelAnnouncement1: } @@ -263,17 +228,6 @@ func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) { // If we've just finished executing a ChannelAnnouncement, then we'll // close out the signal, and remove the signal from the map of active // ones. This will allow/deny any dependent jobs to continue execution. - case *models.ChannelEdgeInfo: - shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - finSignals, ok := v.chanAnnFinSignal[shortID] - if ok { - if allow { - close(finSignals.allow) - } else { - close(finSignals.deny) - } - delete(v.chanAnnFinSignal, shortID) - } case *lnwire.ChannelAnnouncement1: finSignals, ok := v.chanAnnFinSignal[msg.ShortChannelID] if ok { @@ -290,15 +244,10 @@ func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) { // For all other job types, we'll delete the tracking entries from the // map, as if we reach this point, then all dependants have already // finished executing and we can proceed. - case *models.LightningNode: - delete(v.nodeAnnDependencies, route.Vertex(msg.PubKeyBytes)) case *lnwire.NodeAnnouncement: delete(v.nodeAnnDependencies, route.Vertex(msg.NodeID)) case *lnwire.ChannelUpdate1: delete(v.chanEdgeDependencies, msg.ShortChannelID) - case *models.ChannelEdgePolicy: - shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - delete(v.chanEdgeDependencies, shortID) case *lnwire.AnnounceSignatures1: return