graph: refactor announcement handling logic

In this commit, we remove the `processUpdate` method which handles each
announement type (node, channel, channel update) in a separate switch
case. Each of these cases currently has a non-trivial amount of code.
This commit creates separate methods for each message type we want to
handle instead. This removes a level of indentation and will make things
easier to review when we start editing the code for each handler.
This commit is contained in:
Elle Mouton 2025-02-05 09:39:33 +02:00
parent 1974903fb2
commit 276b335cf5
No known key found for this signature in database
GPG key ID: D7D916376026F177

View file

@ -674,7 +674,20 @@ func (b *Builder) handleNetworkUpdate(update *routingMsg) {
// Process the routing update to determine if this is either a new // Process the routing update to determine if this is either a new
// update from our PoV or an update to a prior vertex/edge we // update from our PoV or an update to a prior vertex/edge we
// previously accepted. // previously accepted.
err := b.processUpdate(update.msg, update.op...) var err error
switch msg := update.msg.(type) {
case *models.LightningNode:
err = b.addNode(msg, update.op...)
case *models.ChannelEdgeInfo:
err = b.addEdge(msg, update.op...)
case *models.ChannelEdgePolicy:
err = b.updateEdge(msg, update.op...)
default:
err = errors.Errorf("wrong routing update message type")
}
update.err <- err update.err <- err
// If the error is not nil here, there's no need to send topology // If the error is not nil here, there's no need to send topology
@ -1094,303 +1107,6 @@ func makeFundingScript(bitcoinKey1, bitcoinKey2 []byte, chanFeatures []byte,
return legacyFundingScript() return legacyFundingScript()
} }
// processUpdate processes a new relate authenticated channel/edge, node or
// channel/edge update network update. If the update didn't affect the internal
// state of the draft due to either being out of date, invalid, or redundant,
// then error is returned.
//
//nolint:funlen
func (b *Builder) processUpdate(msg interface{},
op ...batch.SchedulerOption) error {
switch msg := msg.(type) {
case *models.LightningNode:
// Before we add the node to the database, we'll check to see
// if the announcement is "fresh" or not. If it isn't, then
// we'll return an error.
err := b.assertNodeAnnFreshness(msg.PubKeyBytes, msg.LastUpdate)
if err != nil {
return err
}
if err := b.cfg.Graph.AddLightningNode(msg, op...); err != nil {
return errors.Errorf("unable to add node %x to the "+
"graph: %v", msg.PubKeyBytes, err)
}
log.Tracef("Updated vertex data for node=%x", msg.PubKeyBytes)
b.stats.incNumNodeUpdates()
case *models.ChannelEdgeInfo:
log.Debugf("Received ChannelEdgeInfo for channel %v",
msg.ChannelID)
// Prior to processing the announcement we first check if we
// already know of this channel, if so, then we can exit early.
_, _, exists, isZombie, err := b.cfg.Graph.HasChannelEdge(
msg.ChannelID,
)
if err != nil &&
!errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
return errors.Errorf("unable to check for edge "+
"existence: %v", err)
}
if isZombie {
return NewErrf(ErrIgnored, "ignoring msg for zombie "+
"chan_id=%v", msg.ChannelID)
}
if exists {
return NewErrf(ErrIgnored, "ignoring msg for known "+
"chan_id=%v", msg.ChannelID)
}
// If AssumeChannelValid is present, then we are unable to
// perform any of the expensive checks below, so we'll
// short-circuit our path straight to adding the edge to our
// graph. If the passed ShortChannelID is an alias, then we'll
// skip validation as it will not map to a legitimate tx. This
// is not a DoS vector as only we can add an alias
// ChannelAnnouncement from the gossiper.
scid := lnwire.NewShortChanIDFromInt(msg.ChannelID)
if b.cfg.AssumeChannelValid || b.cfg.IsAlias(scid) {
err := b.cfg.Graph.AddChannelEdge(msg, op...)
if err != nil {
return fmt.Errorf("unable to add edge: %w", err)
}
log.Tracef("New channel discovered! Link "+
"connects %x and %x with ChannelID(%v)",
msg.NodeKey1Bytes, msg.NodeKey2Bytes,
msg.ChannelID)
b.stats.incNumEdgesDiscovered()
break
}
// Before we can add the channel to the channel graph, we need
// to obtain the full funding outpoint that's encoded within
// the channel ID.
channelID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
fundingTx, err := lnwallet.FetchFundingTxWrapper(
b.cfg.Chain, &channelID, b.quit,
)
if err != nil {
//nolint:ll
//
// In order to ensure we don't erroneously mark a
// channel as a zombie due to an RPC failure, we'll
// attempt to string match for the relevant errors.
//
// * btcd:
// * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316
// * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086
// * bitcoind:
// * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770
// * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954
switch {
case strings.Contains(err.Error(), "not found"):
fallthrough
case strings.Contains(err.Error(), "out of range"):
// If the funding transaction isn't found at
// all, then we'll mark the edge itself as a
// zombie so we don't continue to request it.
// We use the "zero key" for both node pubkeys
// so this edge can't be resurrected.
zErr := b.addZombieEdge(msg.ChannelID)
if zErr != nil {
return zErr
}
default:
}
return NewErrf(ErrNoFundingTransaction, "unable to "+
"locate funding tx: %v", err)
}
// Recreate witness output to be sure that declared in channel
// edge bitcoin keys and channel value corresponds to the
// reality.
fundingPkScript, err := makeFundingScript(
msg.BitcoinKey1Bytes[:], msg.BitcoinKey2Bytes[:],
msg.Features, msg.TapscriptRoot,
)
if err != nil {
return err
}
// Next we'll validate that this channel is actually well
// formed. If this check fails, then this channel either
// doesn't exist, or isn't the one that was meant to be created
// according to the passed channel proofs.
fundingPoint, err := chanvalidate.Validate(
&chanvalidate.Context{
Locator: &chanvalidate.ShortChanIDChanLocator{
ID: channelID,
},
MultiSigPkScript: fundingPkScript,
FundingTx: fundingTx,
},
)
if err != nil {
// Mark the edge as a zombie so we won't try to
// re-validate it on start up.
if err := b.addZombieEdge(msg.ChannelID); err != nil {
return err
}
return NewErrf(ErrInvalidFundingOutput, "output "+
"failed validation: %w", err)
}
// Now that we have the funding outpoint of the channel, ensure
// that it hasn't yet been spent. If so, then this channel has
// been closed so we'll ignore it.
chanUtxo, err := b.cfg.Chain.GetUtxo(
fundingPoint, fundingPkScript, channelID.BlockHeight,
b.quit,
)
if err != nil {
if errors.Is(err, btcwallet.ErrOutputSpent) {
zErr := b.addZombieEdge(msg.ChannelID)
if zErr != nil {
return zErr
}
}
return NewErrf(ErrChannelSpent, "unable to fetch utxo "+
"for chan_id=%v, chan_point=%v: %v",
msg.ChannelID, fundingPoint, err)
}
// TODO(roasbeef): this is a hack, needs to be removed
// after commitment fees are dynamic.
msg.Capacity = btcutil.Amount(chanUtxo.Value)
msg.ChannelPoint = *fundingPoint
if err := b.cfg.Graph.AddChannelEdge(msg, op...); err != nil {
return errors.Errorf("unable to add edge: %v", err)
}
log.Debugf("New channel discovered! Link "+
"connects %x and %x with ChannelPoint(%v): "+
"chan_id=%v, capacity=%v",
msg.NodeKey1Bytes, msg.NodeKey2Bytes,
fundingPoint, msg.ChannelID, msg.Capacity)
b.stats.incNumEdgesDiscovered()
// As a new edge has been added to the channel graph, we'll
// update the current UTXO filter within our active
// FilteredChainView so we are notified if/when this channel is
// closed.
filterUpdate := []graphdb.EdgePoint{
{
FundingPkScript: fundingPkScript,
OutPoint: *fundingPoint,
},
}
err = b.cfg.ChainView.UpdateFilter(
filterUpdate, b.bestHeight.Load(),
)
if err != nil {
return errors.Errorf("unable to update chain "+
"view: %v", err)
}
case *models.ChannelEdgePolicy:
log.Debugf("Received ChannelEdgePolicy for channel %v",
msg.ChannelID)
// We make sure to hold the mutex for this channel ID,
// such that no other goroutine is concurrently doing
// database accesses for the same channel ID.
b.channelEdgeMtx.Lock(msg.ChannelID)
defer b.channelEdgeMtx.Unlock(msg.ChannelID)
edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
b.cfg.Graph.HasChannelEdge(msg.ChannelID)
if err != nil && !errors.Is(
err, graphdb.ErrGraphNoEdgesFound,
) {
return errors.Errorf("unable to check for edge "+
"existence: %v", err)
}
// If the channel is marked as a zombie in our database, and
// we consider this a stale update, then we should not apply the
// policy.
isStaleUpdate := time.Since(msg.LastUpdate) >
b.cfg.ChannelPruneExpiry
if isZombie && isStaleUpdate {
return NewErrf(ErrIgnored, "ignoring stale update "+
"(flags=%v|%v) for zombie chan_id=%v",
msg.MessageFlags, msg.ChannelFlags,
msg.ChannelID)
}
// If the channel doesn't exist in our database, we cannot
// apply the updated policy.
if !exists {
return NewErrf(ErrIgnored, "ignoring update "+
"(flags=%v|%v) for unknown chan_id=%v",
msg.MessageFlags, msg.ChannelFlags,
msg.ChannelID)
}
log.Debugf("Found edge1Timestamp=%v, edge2Timestamp=%v",
edge1Timestamp, edge2Timestamp)
// As edges are directional edge node has a unique policy for
// the direction of the edge they control. Therefore, we first
// check if we already have the most up-to-date information for
// that edge. If this message has a timestamp not strictly
// newer than what we already know of we can exit early.
switch msg.ChannelFlags & lnwire.ChanUpdateDirection {
// A flag set of 0 indicates this is an announcement for the
// "first" node in the channel.
case 0:
// Ignore outdated message.
if !edge1Timestamp.Before(msg.LastUpdate) {
return NewErrf(ErrOutdated, "Ignoring "+
"outdated update (flags=%v|%v) for "+
"known chan_id=%v", msg.MessageFlags,
msg.ChannelFlags, msg.ChannelID)
}
// Similarly, a flag set of 1 indicates this is an announcement
// for the "second" node in the channel.
case 1:
// Ignore outdated message.
if !edge2Timestamp.Before(msg.LastUpdate) {
return NewErrf(ErrOutdated, "Ignoring "+
"outdated update (flags=%v|%v) for "+
"known chan_id=%v", msg.MessageFlags,
msg.ChannelFlags, msg.ChannelID)
}
}
// Now that we know this isn't a stale update, we'll apply the
// new edge policy to the proper directional edge within the
// channel graph.
if err = b.cfg.Graph.UpdateEdgePolicy(msg, op...); err != nil {
err := errors.Errorf("unable to add channel: %v", err)
log.Error(err)
return err
}
log.Tracef("New channel update applied: %v",
lnutils.SpewLogClosure(msg))
b.stats.incNumChannelUpdates()
default:
return errors.Errorf("wrong routing update message type")
}
return nil
}
// routingMsg couples a routing related routing topology update to the // routingMsg couples a routing related routing topology update to the
// error channel. // error channel.
type routingMsg struct { type routingMsg struct {
@ -1479,6 +1195,32 @@ func (b *Builder) AddNode(node *models.LightningNode,
} }
} }
// addNode does some basic checks on the given LightningNode against what we
// currently have persisted in the graph, and then adds it to the graph. If we
// already know about the node, then we only update our DB if the new update
// has a newer timestamp than the last one we received.
func (b *Builder) addNode(node *models.LightningNode,
op ...batch.SchedulerOption) error {
// Before we add the node to the database, we'll check to see if the
// announcement is "fresh" or not. If it isn't, then we'll return an
// error.
err := b.assertNodeAnnFreshness(node.PubKeyBytes, node.LastUpdate)
if err != nil {
return err
}
if err := b.cfg.Graph.AddLightningNode(node, op...); err != nil {
return errors.Errorf("unable to add node %x to the "+
"graph: %v", node.PubKeyBytes, err)
}
log.Tracef("Updated vertex data for node=%x", node.PubKeyBytes)
b.stats.incNumNodeUpdates()
return nil
}
// AddEdge is used to add edge/channel to the topology of the router, after all // AddEdge is used to add edge/channel to the topology of the router, after all
// information about channel will be gathered this edge/channel might be used // information about channel will be gathered this edge/channel might be used
// in construction of payment path. // in construction of payment path.
@ -1506,6 +1248,182 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
} }
} }
// addEdge does some validation on the new channel edge against what we
// currently have persisted in the graph, and then adds it to the graph. The
// Chain View is updated with the new edge if it is successfully added to the
// graph. We only persist the channel if we currently dont have it at all in
// our graph.
//
// TODO(elle): this currently also does funding-transaction validation. But this
// should be moved to the gossiper instead.
func (b *Builder) addEdge(edge *models.ChannelEdgeInfo,
op ...batch.SchedulerOption) error {
log.Debugf("Received ChannelEdgeInfo for channel %v", edge.ChannelID)
// Prior to processing the announcement we first check if we
// already know of this channel, if so, then we can exit early.
_, _, exists, isZombie, err := b.cfg.Graph.HasChannelEdge(
edge.ChannelID,
)
if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
return errors.Errorf("unable to check for edge existence: %v",
err)
}
if isZombie {
return NewErrf(ErrIgnored, "ignoring msg for zombie chan_id=%v",
edge.ChannelID)
}
if exists {
return NewErrf(ErrIgnored, "ignoring msg for known chan_id=%v",
edge.ChannelID)
}
// If AssumeChannelValid is present, then we are unable to perform any
// of the expensive checks below, so we'll short-circuit our path
// straight to adding the edge to our graph. If the passed
// ShortChannelID is an alias, then we'll skip validation as it will
// not map to a legitimate tx. This is not a DoS vector as only we can
// add an alias ChannelAnnouncement from the gossiper.
scid := lnwire.NewShortChanIDFromInt(edge.ChannelID)
if b.cfg.AssumeChannelValid || b.cfg.IsAlias(scid) {
err := b.cfg.Graph.AddChannelEdge(edge, op...)
if err != nil {
return fmt.Errorf("unable to add edge: %w", err)
}
log.Tracef("New channel discovered! Link connects %x and %x "+
"with ChannelID(%v)", edge.NodeKey1Bytes,
edge.NodeKey2Bytes, edge.ChannelID)
b.stats.incNumEdgesDiscovered()
return nil
}
// Before we can add the channel to the channel graph, we need to obtain
// the full funding outpoint that's encoded within the channel ID.
channelID := lnwire.NewShortChanIDFromInt(edge.ChannelID)
fundingTx, err := lnwallet.FetchFundingTxWrapper(
b.cfg.Chain, &channelID, b.quit,
)
if err != nil {
//nolint:ll
//
// In order to ensure we don't erroneously mark a channel as a
// zombie due to an RPC failure, we'll attempt to string match
// for the relevant errors.
//
// * btcd:
// * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1316
// * https://github.com/btcsuite/btcd/blob/master/rpcserver.go#L1086
// * bitcoind:
// * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L770
// * https://github.com/bitcoin/bitcoin/blob/7fcf53f7b4524572d1d0c9a5fdc388e87eb02416/src/rpc/blockchain.cpp#L954
switch {
case strings.Contains(err.Error(), "not found"):
fallthrough
case strings.Contains(err.Error(), "out of range"):
// If the funding transaction isn't found at all, then
// we'll mark the edge itself as a zombie so we don't
// continue to request it. We use the "zero key" for
// both node pubkeys so this edge can't be resurrected.
zErr := b.addZombieEdge(edge.ChannelID)
if zErr != nil {
return zErr
}
default:
}
return NewErrf(ErrNoFundingTransaction, "unable to "+
"locate funding tx: %v", err)
}
// Recreate witness output to be sure that declared in channel edge
// bitcoin keys and channel value corresponds to the reality.
fundingPkScript, err := makeFundingScript(
edge.BitcoinKey1Bytes[:], edge.BitcoinKey2Bytes[:],
edge.Features, edge.TapscriptRoot,
)
if err != nil {
return err
}
// Next we'll validate that this channel is actually well formed. If
// this check fails, then this channel either doesn't exist, or isn't
// the one that was meant to be created according to the passed channel
// proofs.
fundingPoint, err := chanvalidate.Validate(
&chanvalidate.Context{
Locator: &chanvalidate.ShortChanIDChanLocator{
ID: channelID,
},
MultiSigPkScript: fundingPkScript,
FundingTx: fundingTx,
},
)
if err != nil {
// Mark the edge as a zombie so we won't try to re-validate it
// on start up.
if err := b.addZombieEdge(edge.ChannelID); err != nil {
return err
}
return NewErrf(ErrInvalidFundingOutput, "output failed "+
"validation: %w", err)
}
// Now that we have the funding outpoint of the channel, ensure
// that it hasn't yet been spent. If so, then this channel has
// been closed so we'll ignore it.
chanUtxo, err := b.cfg.Chain.GetUtxo(
fundingPoint, fundingPkScript, channelID.BlockHeight, b.quit,
)
if err != nil {
if errors.Is(err, btcwallet.ErrOutputSpent) {
zErr := b.addZombieEdge(edge.ChannelID)
if zErr != nil {
return zErr
}
}
return NewErrf(ErrChannelSpent, "unable to fetch utxo for "+
"chan_id=%v, chan_point=%v: %v", edge.ChannelID,
fundingPoint, err)
}
// TODO(roasbeef): this is a hack, needs to be removed after commitment
// fees are dynamic.
edge.Capacity = btcutil.Amount(chanUtxo.Value)
edge.ChannelPoint = *fundingPoint
if err := b.cfg.Graph.AddChannelEdge(edge, op...); err != nil {
return errors.Errorf("unable to add edge: %v", err)
}
log.Debugf("New channel discovered! Link connects %x and %x with "+
"ChannelPoint(%v): chan_id=%v, capacity=%v", edge.NodeKey1Bytes,
edge.NodeKey2Bytes, fundingPoint, edge.ChannelID, edge.Capacity)
b.stats.incNumEdgesDiscovered()
// As a new edge has been added to the channel graph, we'll update the
// current UTXO filter within our active FilteredChainView so we are
// notified if/when this channel is closed.
filterUpdate := []graphdb.EdgePoint{
{
FundingPkScript: fundingPkScript,
OutPoint: *fundingPoint,
},
}
err = b.cfg.ChainView.UpdateFilter(filterUpdate, b.bestHeight.Load())
if err != nil {
return errors.Errorf("unable to update chain "+
"view: %v", err)
}
return nil
}
// UpdateEdge is used to update edge information, without this message edge // UpdateEdge is used to update edge information, without this message edge
// considered as not fully constructed. // considered as not fully constructed.
// //
@ -1532,6 +1450,97 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
} }
} }
// updateEdge validates the new edge policy against what we currently have
// persisted in the graph, and then applies it to the graph if the update is
// considered fresh enough and if we actually have a channel persisted for the
// given update.
func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy,
op ...batch.SchedulerOption) error {
log.Debugf("Received ChannelEdgePolicy for channel %v",
policy.ChannelID)
// We make sure to hold the mutex for this channel ID, such that no
// other goroutine is concurrently doing database accesses for the same
// channel ID.
b.channelEdgeMtx.Lock(policy.ChannelID)
defer b.channelEdgeMtx.Unlock(policy.ChannelID)
edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
b.cfg.Graph.HasChannelEdge(policy.ChannelID)
if err != nil && !errors.Is(err, graphdb.ErrGraphNoEdgesFound) {
return errors.Errorf("unable to check for edge existence: %v",
err)
}
// If the channel is marked as a zombie in our database, and
// we consider this a stale update, then we should not apply the
// policy.
isStaleUpdate := time.Since(policy.LastUpdate) >
b.cfg.ChannelPruneExpiry
if isZombie && isStaleUpdate {
return NewErrf(ErrIgnored, "ignoring stale update "+
"(flags=%v|%v) for zombie chan_id=%v",
policy.MessageFlags, policy.ChannelFlags,
policy.ChannelID)
}
// If the channel doesn't exist in our database, we cannot apply the
// updated policy.
if !exists {
return NewErrf(ErrIgnored, "ignoring update (flags=%v|%v) for "+
"unknown chan_id=%v", policy.MessageFlags,
policy.ChannelFlags, policy.ChannelID)
}
log.Debugf("Found edge1Timestamp=%v, edge2Timestamp=%v",
edge1Timestamp, edge2Timestamp)
// As edges are directional edge node has a unique policy for the
// direction of the edge they control. Therefore, we first check if we
// already have the most up-to-date information for that edge. If this
// message has a timestamp not strictly newer than what we already know
// of we can exit early.
switch policy.ChannelFlags & lnwire.ChanUpdateDirection {
// A flag set of 0 indicates this is an announcement for the "first"
// node in the channel.
case 0:
// Ignore outdated message.
if !edge1Timestamp.Before(policy.LastUpdate) {
return NewErrf(ErrOutdated, "Ignoring "+
"outdated update (flags=%v|%v) for "+
"known chan_id=%v", policy.MessageFlags,
policy.ChannelFlags, policy.ChannelID)
}
// Similarly, a flag set of 1 indicates this is an announcement
// for the "second" node in the channel.
case 1:
// Ignore outdated message.
if !edge2Timestamp.Before(policy.LastUpdate) {
return NewErrf(ErrOutdated, "Ignoring "+
"outdated update (flags=%v|%v) for "+
"known chan_id=%v", policy.MessageFlags,
policy.ChannelFlags, policy.ChannelID)
}
}
// Now that we know this isn't a stale update, we'll apply the new edge
// policy to the proper directional edge within the channel graph.
if err = b.cfg.Graph.UpdateEdgePolicy(policy, op...); err != nil {
err := errors.Errorf("unable to add channel: %v", err)
log.Error(err)
return err
}
log.Tracef("New channel update applied: %v",
lnutils.SpewLogClosure(policy))
b.stats.incNumChannelUpdates()
return nil
}
// CurrentBlockHeight returns the block height from POV of the router subsystem. // CurrentBlockHeight returns the block height from POV of the router subsystem.
// //
// NOTE: This method is part of the ChannelGraphSource interface. // NOTE: This method is part of the ChannelGraphSource interface.