localchans: recreate missing edge if not found

If a node contains a channel, but doesn't have a corresponding edge in
the graph database, updating the channel policy would fail. In this
commit the edge is recreated if the channel exists. This ensures a node
can recover from a missing edge in the graph database by calling
updatechanpolicy.
This commit is contained in:
Jesse de Wit 2024-06-03 12:08:29 +02:00
parent aa2ddf77d0
commit bb4d3db8bc
No known key found for this signature in database
GPG Key ID: 78A9DCCE385AE6B4
6 changed files with 177 additions and 7 deletions

2
log.go
View File

@ -46,6 +46,7 @@ import (
"github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/blindedpath"
"github.com/lightningnetwork/lnd/routing/localchans"
"github.com/lightningnetwork/lnd/rpcperms"
"github.com/lightningnetwork/lnd/signal"
"github.com/lightningnetwork/lnd/sweep"
@ -172,6 +173,7 @@ func SetupLoggers(root *build.SubLoggerManager, interceptor signal.Interceptor)
AddSubLogger(root, "CHFD", interceptor, chanfunding.UseLogger)
AddSubLogger(root, "PEER", interceptor, peer.UseLogger)
AddSubLogger(root, "CHCL", interceptor, chancloser.UseLogger)
AddSubLogger(root, "LCHN", interceptor, localchans.UseLogger)
AddSubLogger(root, routing.Subsystem, interceptor, routing.UseLogger)
AddSubLogger(root, routerrpc.Subsystem, interceptor, routerrpc.UseLogger)

31
routing/localchans/log.go Normal file
View File

@ -0,0 +1,31 @@
package localchans
import (
"github.com/btcsuite/btclog/v2"
"github.com/lightningnetwork/lnd/build"
)
// log is a logger that is initialized with no output filters. This means the
// package will not perform any logging by default until the caller requests
// it.
var log btclog.Logger
const Subsystem = "LCHN"
// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger(Subsystem, nil))
}
// DisableLog disables all library log output. Logging output is disabled by
// default until UseLogger is called.
func DisableLog() {
UseLogger(btclog.Disabled)
}
// UseLogger uses a specified Logger to output package logging info. This
// should be used in preference to SetLogWriter if the caller is also using
// btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}

View File

@ -1,10 +1,13 @@
package localchans
import (
"bytes"
"errors"
"fmt"
"sync"
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/models"
@ -19,6 +22,12 @@ import (
// Manager manages the node's local channels. The only operation that is
// currently implemented is updating forwarding policies.
type Manager struct {
// SelfPub contains the public key of the local node.
SelfPub *btcec.PublicKey
// DefaultRoutingPolicy is the default routing policy.
DefaultRoutingPolicy models.ForwardingPolicy
// UpdateForwardingPolicies is used by the manager to update active
// links with a new policy.
UpdateForwardingPolicies func(
@ -40,6 +49,9 @@ type Manager struct {
FetchChannel func(tx kvdb.RTx, chanPoint wire.OutPoint) (
*channeldb.OpenChannel, error)
// AddEdge is used to add edge/channel to the topology of the router.
AddEdge func(edge *models.ChannelEdgeInfo) error
// policyUpdateLock ensures that the database and the link do not fall
// out of sync if there are concurrent fee update calls. Without it,
// there is a chance that policy A updates the database, then policy B
@ -51,7 +63,8 @@ type Manager struct {
// UpdatePolicy updates the policy for the specified channels on disk and in
// the active links.
func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
chanPoints ...wire.OutPoint) ([]*lnrpc.FailedUpdate, error) {
createMissingEdge bool, chanPoints ...wire.OutPoint) (
[]*lnrpc.FailedUpdate, error) {
r.policyUpdateLock.Lock()
defer r.policyUpdateLock.Unlock()
@ -69,10 +82,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
var edgesToUpdate []discovery.EdgeWithInfo
policiesToUpdate := make(map[wire.OutPoint]models.ForwardingPolicy)
// Next, we'll loop over all the outgoing channels the router knows of.
// If we have a filter then we'll only collected those channels,
// otherwise we'll collect them all.
err := r.ForAllOutgoingChannels(func(
processChan := func(
tx kvdb.RTx,
info *models.ChannelEdgeInfo,
edge *models.ChannelEdgePolicy) error {
@ -125,7 +135,12 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
}
return nil
})
}
// Next, we'll loop over all the outgoing channels the router knows of.
// If we have a filter then we'll only collect those channels, otherwise
// we'll collect them all.
err := r.ForAllOutgoingChannels(processChan)
if err != nil {
return nil, err
}
@ -155,7 +170,56 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
"not yet confirmed",
))
case createMissingEdge:
// If the edge was not found, but the channel is found,
// that means the edge is missing in the graph database
// and should be recreated. The edge and policy are
// created in-memory. The edge is inserted in createEdge
// below and the policy will be added to the graph in
// the PropagateChanPolicyUpdate call below.
log.Warnf("Missing edge for active channel (%s) "+
"during policy update. Recreating edge with "+
"default policy.",
channel.FundingOutpoint.String())
info, edge, err := r.createEdge(channel, time.Now())
if err != nil {
log.Errorf("Failed to recreate missing edge "+
"for channel (%s): %v",
channel.FundingOutpoint.String(), err)
f := lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN
failedUpdates = append(failedUpdates,
makeFailureItem(chanPoint, f,
"could not update policies",
))
}
// Insert the edge into the database to avoid `edge not
// found` errors during policy update propagation.
err = r.AddEdge(info)
if err != nil {
log.Warnf("Attempt to add missing edge for "+
"channel (%s) errored with: %v",
channel.FundingOutpoint.String(), err)
f := lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN
failedUpdates = append(failedUpdates,
makeFailureItem(chanPoint, f,
"could not add edge",
))
}
err = processChan(nil, info, edge)
if err != nil {
return nil, err
}
default:
log.Warnf("Missing edge for active channel (%s) "+
"during policy update. Could not update "+
"policy.", channel.FundingOutpoint.String())
failedUpdates = append(failedUpdates,
makeFailureItem(chanPoint,
lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
@ -180,6 +244,68 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
return failedUpdates, nil
}
// createEdge recreates an edge and policy from an open channel in-memory.
func (r *Manager) createEdge(channel *channeldb.OpenChannel,
timestamp time.Time) (*models.ChannelEdgeInfo,
*models.ChannelEdgePolicy, error) {
nodeKey1Bytes := r.SelfPub.SerializeCompressed()
nodeKey2Bytes := channel.IdentityPub.SerializeCompressed()
bitcoinKey1Bytes := channel.LocalChanCfg.MultiSigKey.PubKey.
SerializeCompressed()
bitcoinKey2Bytes := channel.RemoteChanCfg.MultiSigKey.PubKey.
SerializeCompressed()
channelFlags := lnwire.ChanUpdateChanFlags(0)
// Make it such that node_id_1 is the lexicographically-lesser of the
// two compressed keys sorted in ascending lexicographic order.
if bytes.Compare(nodeKey2Bytes, nodeKey1Bytes) < 0 {
nodeKey1Bytes, nodeKey2Bytes = nodeKey2Bytes, nodeKey1Bytes
bitcoinKey1Bytes, bitcoinKey2Bytes = bitcoinKey2Bytes,
bitcoinKey1Bytes
channelFlags = 1
}
var featureBuf bytes.Buffer
err := lnwire.NewRawFeatureVector().Encode(&featureBuf)
if err != nil {
return nil, nil, fmt.Errorf("unable to encode features: %w",
err)
}
info := &models.ChannelEdgeInfo{
ChannelID: channel.ShortChanID().ToUint64(),
ChainHash: channel.ChainHash,
Features: featureBuf.Bytes(),
Capacity: channel.Capacity,
ChannelPoint: channel.FundingOutpoint,
}
copy(info.NodeKey1Bytes[:], nodeKey1Bytes)
copy(info.NodeKey2Bytes[:], nodeKey2Bytes)
copy(info.BitcoinKey1Bytes[:], bitcoinKey1Bytes)
copy(info.BitcoinKey2Bytes[:], bitcoinKey2Bytes)
// Construct a dummy channel edge policy with default values that will
// be updated with the new values in the call to processChan below.
timeLockDelta := uint16(r.DefaultRoutingPolicy.TimeLockDelta)
edge := &models.ChannelEdgePolicy{
ChannelID: channel.ShortChanID().ToUint64(),
LastUpdate: timestamp,
TimeLockDelta: timeLockDelta,
ChannelFlags: channelFlags,
MessageFlags: lnwire.ChanUpdateRequiredMaxHtlc,
FeeBaseMSat: r.DefaultRoutingPolicy.BaseFee,
FeeProportionalMillionths: r.DefaultRoutingPolicy.FeeRate,
MinHTLC: r.DefaultRoutingPolicy.MinHTLCOut,
MaxHTLC: r.DefaultRoutingPolicy.MaxHTLC,
}
copy(edge.ToNode[:], channel.IdentityPub.SerializeCompressed())
return info, edge, nil
}
// updateEdge updates the given edge with the new schema.
func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint,
edge *models.ChannelEdgePolicy,

View File

@ -156,6 +156,7 @@ func TestManager(t *testing.T) {
newPolicy routing.ChannelPolicy
channelSet []channel
specifiedChanPoints []wire.OutPoint
createMissingEdge bool
expectedNumUpdates int
expectedUpdateFailures []lnrpc.UpdateFailure
expectErr error
@ -173,6 +174,7 @@ func TestManager(t *testing.T) {
},
},
specifiedChanPoints: []wire.OutPoint{chanPointValid},
createMissingEdge: false,
expectedNumUpdates: 1,
expectedUpdateFailures: []lnrpc.UpdateFailure{},
expectErr: nil,
@ -190,6 +192,7 @@ func TestManager(t *testing.T) {
},
},
specifiedChanPoints: []wire.OutPoint{},
createMissingEdge: false,
expectedNumUpdates: 1,
expectedUpdateFailures: []lnrpc.UpdateFailure{},
expectErr: nil,
@ -207,6 +210,7 @@ func TestManager(t *testing.T) {
},
},
specifiedChanPoints: []wire.OutPoint{chanPointMissing},
createMissingEdge: false,
expectedNumUpdates: 0,
expectedUpdateFailures: []lnrpc.UpdateFailure{
lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
@ -228,6 +232,7 @@ func TestManager(t *testing.T) {
},
},
specifiedChanPoints: []wire.OutPoint{chanPointValid},
createMissingEdge: false,
expectedNumUpdates: 1,
expectedUpdateFailures: []lnrpc.UpdateFailure{},
expectErr: nil,
@ -242,6 +247,7 @@ func TestManager(t *testing.T) {
expectedNumUpdates = test.expectedNumUpdates
failedUpdates, err := manager.UpdatePolicy(test.newPolicy,
test.createMissingEdge,
test.specifiedChanPoints...)
if len(failedUpdates) != len(test.expectedUpdateFailures) {

View File

@ -7762,7 +7762,7 @@ func (r *rpcServer) UpdateChannelPolicy(ctx context.Context,
// With the scope resolved, we'll now send this to the local channel
// manager so it can propagate the new policy for our target channel(s).
failedUpdates, err := r.server.localChanMgr.UpdatePolicy(chanPolicy,
targetChans...)
req.CreateMissingEdge, targetChans...)
if err != nil {
return nil, err
}

View File

@ -1112,10 +1112,15 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
//nolint:lll
s.localChanMgr = &localchans.Manager{
SelfPub: nodeKeyDesc.PubKey,
DefaultRoutingPolicy: cc.RoutingPolicy,
ForAllOutgoingChannels: s.graphBuilder.ForAllOutgoingChannels,
PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate,
UpdateForwardingPolicies: s.htlcSwitch.UpdateForwardingPolicies,
FetchChannel: s.chanStateDB.FetchChannel,
AddEdge: func(edge *models.ChannelEdgeInfo) error {
return s.graphBuilder.AddEdge(edge)
},
}
utxnStore, err := contractcourt.NewNurseryStore(