From 276b335cf5dae5cbf2cf4e40c168d8c96bccc843 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Wed, 5 Feb 2025 09:39:33 +0200 Subject: [PATCH] graph: refactor announcement handling logic In this commit, we remove the `processUpdate` method which handles each announement type (node, channel, channel update) in a separate switch case. Each of these cases currently has a non-trivial amount of code. This commit creates separate methods for each message type we want to handle instead. This removes a level of indentation and will make things easier to review when we start editing the code for each handler. --- graph/builder.go | 605 ++++++++++++++++++++++++----------------------- 1 file changed, 307 insertions(+), 298 deletions(-) diff --git a/graph/builder.go b/graph/builder.go index 632156282..3b88cf947 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -674,7 +674,20 @@ func (b *Builder) handleNetworkUpdate(update *routingMsg) { // Process the routing update to determine if this is either a new // update from our PoV or an update to a prior vertex/edge we // previously accepted. - err := b.processUpdate(update.msg, update.op...) + var err error + switch msg := update.msg.(type) { + case *models.LightningNode: + err = b.addNode(msg, update.op...) + + case *models.ChannelEdgeInfo: + err = b.addEdge(msg, update.op...) + + case *models.ChannelEdgePolicy: + err = b.updateEdge(msg, update.op...) + + default: + err = errors.Errorf("wrong routing update message type") + } update.err <- err // If the error is not nil here, there's no need to send topology @@ -1094,303 +1107,6 @@ func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte, chanFeatures []byte, return legacyFundingScript() } -// processUpdate processes a new relate authenticated channel/edge, node or -// channel/edge update network update. If the update didn't affect the internal -// state of the draft due to either being out of date, invalid, or redundant, -// then error is returned. -// -//nolint:funlen -func (b *Builder) processUpdate(msg interface{}, - op ...batch.SchedulerOption) error { - - switch msg := msg.(type) { - case *models.LightningNode: - // Before we add the node to the database, we'll check to see - // if the announcement is "fresh" or not. If it isn't, then - // we'll return an error. - err := b.assertNodeAnnFreshness(msg.PubKeyBytes, msg.LastUpdate) - if err != nil { - return err - } - - if err := b.cfg.Graph.AddLightningNode(msg, op...); err != nil { - return errors.Errorf("unable to add node %x to the "+ - "graph: %v", msg.PubKeyBytes, err) - } - - log.Tracef("Updated vertex data for node=%x", msg.PubKeyBytes) - b.stats.incNumNodeUpdates() - - case *models.ChannelEdgeInfo: - log.Debugf("Received ChannelEdgeInfo for channel %v", - msg.ChannelID) - - // Prior to processing the announcement we first check if we - // already know of this channel, if so, then we can exit early. - _, _, exists, isZombie, err := b.cfg.Graph.HasChannelEdge( - msg.ChannelID, - ) - if err != nil && - !errors.Is(err, graphdb.ErrGraphNoEdgesFound) { - - return errors.Errorf("unable to check for edge "+ - "existence: %v", err) - } - if isZombie { - return NewErrf(ErrIgnored, "ignoring msg for zombie "+ - "chan_id=%v", msg.ChannelID) - } - if exists { - return NewErrf(ErrIgnored, "ignoring msg for known "+ - "chan_id=%v", msg.ChannelID) - } - - // If AssumeChannelValid is present, then we are unable to - // perform any of the expensive checks below, so we'll - // short-circuit our path straight to adding the edge to our - // graph. If the passed ShortChannelID is an alias, then we'll - // skip validation as it will not map to a legitimate tx. This - // is not a DoS vector as only we can add an alias - // ChannelAnnouncement from the gossiper. - scid := lnwire.NewShortChanIDFromInt(msg.ChannelID) - if b.cfg.AssumeChannelValid || b.cfg.IsAlias(scid) { - err := b.cfg.Graph.AddChannelEdge(msg, op...) - if err != nil { - return fmt.Errorf("unable to add edge: %w", err) - } - log.Tracef("New channel discovered! Link "+ - "connects %x and %x with ChannelID(%v)", - msg.NodeKey1Bytes, msg.NodeKey2Bytes, - msg.ChannelID) - b.stats.incNumEdgesDiscovered() - - break - } - - // Before we can add the channel to the channel graph, we need - // to obtain the full funding outpoint that's encoded within - // the channel ID. - channelID := lnwire.NewShortChanIDFromInt(msg.ChannelID) - fundingTx, err := lnwallet.FetchFundingTxWrapper( - b.cfg.Chain, &channelID, b.quit, - ) - if err != nil { - //nolint:ll - // - // In order to ensure we don't erroneously mark a - // channel as a zombie due to an RPC failure, we'll - // attempt to string match for the relevant errors. - // - // * btcd: - // * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316 - // * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086 - // * bitcoind: - // * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770 - // * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954 - switch { - case strings.Contains(err.Error(), "not found"): - fallthrough - - case strings.Contains(err.Error(), "out of range"): - // If the funding transaction isn't found at - // all, then we'll mark the edge itself as a - // zombie so we don't continue to request it. - // We use the "zero key" for both node pubkeys - // so this edge can't be resurrected. - zErr := b.addZombieEdge(msg.ChannelID) - if zErr != nil { - return zErr - } - - default: - } - - return NewErrf(ErrNoFundingTransaction, "unable to "+ - "locate funding tx: %v", err) - } - - // Recreate witness output to be sure that declared in channel - // edge bitcoin keys and channel value corresponds to the - // reality. - fundingPkScript, err := makeFundingScript( - msg.BitcoinKey1Bytes[:], msg.BitcoinKey2Bytes[:], - msg.Features, msg.TapscriptRoot, - ) - if err != nil { - return err - } - - // Next we'll validate that this channel is actually well - // formed. If this check fails, then this channel either - // doesn't exist, or isn't the one that was meant to be created - // according to the passed channel proofs. - fundingPoint, err := chanvalidate.Validate( - &chanvalidate.Context{ - Locator: &chanvalidate.ShortChanIDChanLocator{ - ID: channelID, - }, - MultiSigPkScript: fundingPkScript, - FundingTx: fundingTx, - }, - ) - if err != nil { - // Mark the edge as a zombie so we won't try to - // re-validate it on start up. - if err := b.addZombieEdge(msg.ChannelID); err != nil { - return err - } - - return NewErrf(ErrInvalidFundingOutput, "output "+ - "failed validation: %w", err) - } - - // Now that we have the funding outpoint of the channel, ensure - // that it hasn't yet been spent. If so, then this channel has - // been closed so we'll ignore it. - chanUtxo, err := b.cfg.Chain.GetUtxo( - fundingPoint, fundingPkScript, channelID.BlockHeight, - b.quit, - ) - if err != nil { - if errors.Is(err, btcwallet.ErrOutputSpent) { - zErr := b.addZombieEdge(msg.ChannelID) - if zErr != nil { - return zErr - } - } - - return NewErrf(ErrChannelSpent, "unable to fetch utxo "+ - "for chan_id=%v, chan_point=%v: %v", - msg.ChannelID, fundingPoint, err) - } - - // TODO(roasbeef): this is a hack, needs to be removed - // after commitment fees are dynamic. - msg.Capacity = btcutil.Amount(chanUtxo.Value) - msg.ChannelPoint = *fundingPoint - if err := b.cfg.Graph.AddChannelEdge(msg, op...); err != nil { - return errors.Errorf("unable to add edge: %v", err) - } - - log.Debugf("New channel discovered! Link "+ - "connects %x and %x with ChannelPoint(%v): "+ - "chan_id=%v, capacity=%v", - msg.NodeKey1Bytes, msg.NodeKey2Bytes, - fundingPoint, msg.ChannelID, msg.Capacity) - b.stats.incNumEdgesDiscovered() - - // As a new edge has been added to the channel graph, we'll - // update the current UTXO filter within our active - // FilteredChainView so we are notified if/when this channel is - // closed. - filterUpdate := []graphdb.EdgePoint{ - { - FundingPkScript: fundingPkScript, - OutPoint: *fundingPoint, - }, - } - err = b.cfg.ChainView.UpdateFilter( - filterUpdate, b.bestHeight.Load(), - ) - if err != nil { - return errors.Errorf("unable to update chain "+ - "view: %v", err) - } - - case *models.ChannelEdgePolicy: - log.Debugf("Received ChannelEdgePolicy for channel %v", - msg.ChannelID) - - // We make sure to hold the mutex for this channel ID, - // such that no other goroutine is concurrently doing - // database accesses for the same channel ID. - b.channelEdgeMtx.Lock(msg.ChannelID) - defer b.channelEdgeMtx.Unlock(msg.ChannelID) - - edge1Timestamp, edge2Timestamp, exists, isZombie, err := - b.cfg.Graph.HasChannelEdge(msg.ChannelID) - if err != nil && !errors.Is( - err, graphdb.ErrGraphNoEdgesFound, - ) { - - return errors.Errorf("unable to check for edge "+ - "existence: %v", err) - } - - // If the channel is marked as a zombie in our database, and - // we consider this a stale update, then we should not apply the - // policy. - isStaleUpdate := time.Since(msg.LastUpdate) > - b.cfg.ChannelPruneExpiry - - if isZombie && isStaleUpdate { - return NewErrf(ErrIgnored, "ignoring stale update "+ - "(flags=%v|%v) for zombie chan_id=%v", - msg.MessageFlags, msg.ChannelFlags, - msg.ChannelID) - } - - // If the channel doesn't exist in our database, we cannot - // apply the updated policy. - if !exists { - return NewErrf(ErrIgnored, "ignoring update "+ - "(flags=%v|%v) for unknown chan_id=%v", - msg.MessageFlags, msg.ChannelFlags, - msg.ChannelID) - } - - log.Debugf("Found edge1Timestamp=%v, edge2Timestamp=%v", - edge1Timestamp, edge2Timestamp) - - // As edges are directional edge node has a unique policy for - // the direction of the edge they control. Therefore, we first - // check if we already have the most up-to-date information for - // that edge. If this message has a timestamp not strictly - // newer than what we already know of we can exit early. - switch msg.ChannelFlags & lnwire.ChanUpdateDirection { - // A flag set of 0 indicates this is an announcement for the - // "first" node in the channel. - case 0: - // Ignore outdated message. - if !edge1Timestamp.Before(msg.LastUpdate) { - return NewErrf(ErrOutdated, "Ignoring "+ - "outdated update (flags=%v|%v) for "+ - "known chan_id=%v", msg.MessageFlags, - msg.ChannelFlags, msg.ChannelID) - } - - // Similarly, a flag set of 1 indicates this is an announcement - // for the "second" node in the channel. - case 1: - // Ignore outdated message. - if !edge2Timestamp.Before(msg.LastUpdate) { - return NewErrf(ErrOutdated, "Ignoring "+ - "outdated update (flags=%v|%v) for "+ - "known chan_id=%v", msg.MessageFlags, - msg.ChannelFlags, msg.ChannelID) - } - } - - // Now that we know this isn't a stale update, we'll apply the - // new edge policy to the proper directional edge within the - // channel graph. - if err = b.cfg.Graph.UpdateEdgePolicy(msg, op...); err != nil { - err := errors.Errorf("unable to add channel: %v", err) - log.Error(err) - return err - } - - log.Tracef("New channel update applied: %v", - lnutils.SpewLogClosure(msg)) - b.stats.incNumChannelUpdates() - - default: - return errors.Errorf("wrong routing update message type") - } - - return nil -} - // routingMsg couples a routing related routing topology update to the // error channel. type routingMsg struct { @@ -1479,6 +1195,32 @@ func (b *Builder) AddNode(node *models.LightningNode, } } +// addNode does some basic checks on the given LightningNode against what we +// currently have persisted in the graph, and then adds it to the graph. If we +// already know about the node, then we only update our DB if the new update +// has a newer timestamp than the last one we received. +func (b *Builder) addNode(node *models.LightningNode, + op ...batch.SchedulerOption) error { + + // Before we add the node to the database, we'll check to see if the + // announcement is "fresh" or not. If it isn't, then we'll return an + // error. + err := b.assertNodeAnnFreshness(node.PubKeyBytes, node.LastUpdate) + if err != nil { + return err + } + + if err := b.cfg.Graph.AddLightningNode(node, op...); err != nil { + return errors.Errorf("unable to add node %x to the "+ + "graph: %v", node.PubKeyBytes, err) + } + + log.Tracef("Updated vertex data for node=%x", node.PubKeyBytes) + b.stats.incNumNodeUpdates() + + return nil +} + // AddEdge is used to add edge/channel to the topology of the router, after all // information about channel will be gathered this edge/channel might be used // in construction of payment path. @@ -1506,6 +1248,182 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo, } } +// addEdge does some validation on the new channel edge against what we +// currently have persisted in the graph, and then adds it to the graph. The +// Chain View is updated with the new edge if it is successfully added to the +// graph. We only persist the channel if we currently dont have it at all in +// our graph. +// +// TODO(elle): this currently also does funding-transaction validation. But this +// should be moved to the gossiper instead. +func (b *Builder) addEdge(edge *models.ChannelEdgeInfo, + op ...batch.SchedulerOption) error { + + log.Debugf("Received ChannelEdgeInfo for channel %v", edge.ChannelID) + + // Prior to processing the announcement we first check if we + // already know of this channel, if so, then we can exit early. + _, _, exists, isZombie, err := b.cfg.Graph.HasChannelEdge( + edge.ChannelID, + ) + if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) { + return errors.Errorf("unable to check for edge existence: %v", + err) + } + if isZombie { + return NewErrf(ErrIgnored, "ignoring msg for zombie chan_id=%v", + edge.ChannelID) + } + if exists { + return NewErrf(ErrIgnored, "ignoring msg for known chan_id=%v", + edge.ChannelID) + } + + // If AssumeChannelValid is present, then we are unable to perform any + // of the expensive checks below, so we'll short-circuit our path + // straight to adding the edge to our graph. If the passed + // ShortChannelID is an alias, then we'll skip validation as it will + // not map to a legitimate tx. This is not a DoS vector as only we can + // add an alias ChannelAnnouncement from the gossiper. + scid := lnwire.NewShortChanIDFromInt(edge.ChannelID) + if b.cfg.AssumeChannelValid || b.cfg.IsAlias(scid) { + err := b.cfg.Graph.AddChannelEdge(edge, op...) + if err != nil { + return fmt.Errorf("unable to add edge: %w", err) + } + log.Tracef("New channel discovered! Link connects %x and %x "+ + "with ChannelID(%v)", edge.NodeKey1Bytes, + edge.NodeKey2Bytes, edge.ChannelID) + b.stats.incNumEdgesDiscovered() + + return nil + } + + // Before we can add the channel to the channel graph, we need to obtain + // the full funding outpoint that's encoded within the channel ID. + channelID := lnwire.NewShortChanIDFromInt(edge.ChannelID) + fundingTx, err := lnwallet.FetchFundingTxWrapper( + b.cfg.Chain, &channelID, b.quit, + ) + if err != nil { + //nolint:ll + // + // In order to ensure we don't erroneously mark a channel as a + // zombie due to an RPC failure, we'll attempt to string match + // for the relevant errors. + // + // * btcd: + // * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316 + // * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086 + // * bitcoind: + // * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770 + // * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954 + switch { + case strings.Contains(err.Error(), "not found"): + fallthrough + + case strings.Contains(err.Error(), "out of range"): + // If the funding transaction isn't found at all, then + // we'll mark the edge itself as a zombie so we don't + // continue to request it. We use the "zero key" for + // both node pubkeys so this edge can't be resurrected. + zErr := b.addZombieEdge(edge.ChannelID) + if zErr != nil { + return zErr + } + + default: + } + + return NewErrf(ErrNoFundingTransaction, "unable to "+ + "locate funding tx: %v", err) + } + + // Recreate witness output to be sure that declared in channel edge + // bitcoin keys and channel value corresponds to the reality. + fundingPkScript, err := makeFundingScript( + edge.BitcoinKey1Bytes[:], edge.BitcoinKey2Bytes[:], + edge.Features, edge.TapscriptRoot, + ) + if err != nil { + return err + } + + // Next we'll validate that this channel is actually well formed. If + // this check fails, then this channel either doesn't exist, or isn't + // the one that was meant to be created according to the passed channel + // proofs. + fundingPoint, err := chanvalidate.Validate( + &chanvalidate.Context{ + Locator: &chanvalidate.ShortChanIDChanLocator{ + ID: channelID, + }, + MultiSigPkScript: fundingPkScript, + FundingTx: fundingTx, + }, + ) + if err != nil { + // Mark the edge as a zombie so we won't try to re-validate it + // on start up. + if err := b.addZombieEdge(edge.ChannelID); err != nil { + return err + } + + return NewErrf(ErrInvalidFundingOutput, "output failed "+ + "validation: %w", err) + } + + // Now that we have the funding outpoint of the channel, ensure + // that it hasn't yet been spent. If so, then this channel has + // been closed so we'll ignore it. + chanUtxo, err := b.cfg.Chain.GetUtxo( + fundingPoint, fundingPkScript, channelID.BlockHeight, b.quit, + ) + if err != nil { + if errors.Is(err, btcwallet.ErrOutputSpent) { + zErr := b.addZombieEdge(edge.ChannelID) + if zErr != nil { + return zErr + } + } + + return NewErrf(ErrChannelSpent, "unable to fetch utxo for "+ + "chan_id=%v, chan_point=%v: %v", edge.ChannelID, + fundingPoint, err) + } + + // TODO(roasbeef): this is a hack, needs to be removed after commitment + // fees are dynamic. + edge.Capacity = btcutil.Amount(chanUtxo.Value) + edge.ChannelPoint = *fundingPoint + if err := b.cfg.Graph.AddChannelEdge(edge, op...); err != nil { + return errors.Errorf("unable to add edge: %v", err) + } + + log.Debugf("New channel discovered! Link connects %x and %x with "+ + "ChannelPoint(%v): chan_id=%v, capacity=%v", edge.NodeKey1Bytes, + edge.NodeKey2Bytes, fundingPoint, edge.ChannelID, edge.Capacity) + b.stats.incNumEdgesDiscovered() + + // As a new edge has been added to the channel graph, we'll update the + // current UTXO filter within our active FilteredChainView so we are + // notified if/when this channel is closed. + filterUpdate := []graphdb.EdgePoint{ + { + FundingPkScript: fundingPkScript, + OutPoint: *fundingPoint, + }, + } + + err = b.cfg.ChainView.UpdateFilter(filterUpdate, b.bestHeight.Load()) + if err != nil { + return errors.Errorf("unable to update chain "+ + "view: %v", err) + } + + return nil +} + // UpdateEdge is used to update edge information, without this message edge // considered as not fully constructed. // @@ -1532,6 +1450,97 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy, } } +// updateEdge validates the new edge policy against what we currently have +// persisted in the graph, and then applies it to the graph if the update is +// considered fresh enough and if we actually have a channel persisted for the +// given update. +func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy, + op ...batch.SchedulerOption) error { + + log.Debugf("Received ChannelEdgePolicy for channel %v", + policy.ChannelID) + + // We make sure to hold the mutex for this channel ID, such that no + // other goroutine is concurrently doing database accesses for the same + // channel ID. + b.channelEdgeMtx.Lock(policy.ChannelID) + defer b.channelEdgeMtx.Unlock(policy.ChannelID) + + edge1Timestamp, edge2Timestamp, exists, isZombie, err := + b.cfg.Graph.HasChannelEdge(policy.ChannelID) + if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) { + return errors.Errorf("unable to check for edge existence: %v", + err) + } + + // If the channel is marked as a zombie in our database, and + // we consider this a stale update, then we should not apply the + // policy. + isStaleUpdate := time.Since(policy.LastUpdate) > + b.cfg.ChannelPruneExpiry + + if isZombie && isStaleUpdate { + return NewErrf(ErrIgnored, "ignoring stale update "+ + "(flags=%v|%v) for zombie chan_id=%v", + policy.MessageFlags, policy.ChannelFlags, + policy.ChannelID) + } + + // If the channel doesn't exist in our database, we cannot apply the + // updated policy. + if !exists { + return NewErrf(ErrIgnored, "ignoring update (flags=%v|%v) for "+ + "unknown chan_id=%v", policy.MessageFlags, + policy.ChannelFlags, policy.ChannelID) + } + + log.Debugf("Found edge1Timestamp=%v, edge2Timestamp=%v", + edge1Timestamp, edge2Timestamp) + + // As edges are directional edge node has a unique policy for the + // direction of the edge they control. Therefore, we first check if we + // already have the most up-to-date information for that edge. If this + // message has a timestamp not strictly newer than what we already know + // of we can exit early. + switch policy.ChannelFlags & lnwire.ChanUpdateDirection { + // A flag set of 0 indicates this is an announcement for the "first" + // node in the channel. + case 0: + // Ignore outdated message. + if !edge1Timestamp.Before(policy.LastUpdate) { + return NewErrf(ErrOutdated, "Ignoring "+ + "outdated update (flags=%v|%v) for "+ + "known chan_id=%v", policy.MessageFlags, + policy.ChannelFlags, policy.ChannelID) + } + + // Similarly, a flag set of 1 indicates this is an announcement + // for the "second" node in the channel. + case 1: + // Ignore outdated message. + if !edge2Timestamp.Before(policy.LastUpdate) { + return NewErrf(ErrOutdated, "Ignoring "+ + "outdated update (flags=%v|%v) for "+ + "known chan_id=%v", policy.MessageFlags, + policy.ChannelFlags, policy.ChannelID) + } + } + + // Now that we know this isn't a stale update, we'll apply the new edge + // policy to the proper directional edge within the channel graph. + if err = b.cfg.Graph.UpdateEdgePolicy(policy, op...); err != nil { + err := errors.Errorf("unable to add channel: %v", err) + log.Error(err) + return err + } + + log.Tracef("New channel update applied: %v", + lnutils.SpewLogClosure(policy)) + b.stats.incNumChannelUpdates() + + return nil +} + // CurrentBlockHeight returns the block height from POV of the router subsystem. // // NOTE: This method is part of the ChannelGraphSource interface.