From bb4d3db8bc3693dfbd999b96f06dfee41b674a2c Mon Sep 17 00:00:00 2001 From: Jesse de Wit Date: Mon, 3 Jun 2024 12:08:29 +0200 Subject: [PATCH] localchans: recreate missing edge if not found If a node contains a channel, but doesn't have a corresponding edge in the graph database, updating the channel policy would fail. In this commit the edge is recreated if the channel exists. This ensures a node can recover from a missing edge in the graph database by calling updatechanpolicy. --- log.go | 2 + routing/localchans/log.go | 31 +++++++ routing/localchans/manager.go | 138 +++++++++++++++++++++++++++-- routing/localchans/manager_test.go | 6 ++ rpcserver.go | 2 +- server.go | 5 ++ 6 files changed, 177 insertions(+), 7 deletions(-) create mode 100644 routing/localchans/log.go diff --git a/log.go b/log.go index c88208ef3..343d86f38 100644 --- a/log.go +++ b/log.go @@ -46,6 +46,7 @@ import ( "github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing/blindedpath" + "github.com/lightningnetwork/lnd/routing/localchans" "github.com/lightningnetwork/lnd/rpcperms" "github.com/lightningnetwork/lnd/signal" "github.com/lightningnetwork/lnd/sweep" @@ -172,6 +173,7 @@ func SetupLoggers(root *build.SubLoggerManager, interceptor signal.Interceptor) AddSubLogger(root, "CHFD", interceptor, chanfunding.UseLogger) AddSubLogger(root, "PEER", interceptor, peer.UseLogger) AddSubLogger(root, "CHCL", interceptor, chancloser.UseLogger) + AddSubLogger(root, "LCHN", interceptor, localchans.UseLogger) AddSubLogger(root, routing.Subsystem, interceptor, routing.UseLogger) AddSubLogger(root, routerrpc.Subsystem, interceptor, routerrpc.UseLogger) diff --git a/routing/localchans/log.go b/routing/localchans/log.go new file mode 100644 index 000000000..fdd233c04 --- /dev/null +++ b/routing/localchans/log.go @@ -0,0 +1,31 @@ +package localchans + +import ( + "github.com/btcsuite/btclog/v2" + "github.com/lightningnetwork/lnd/build" +) + +// log is a logger that is initialized with no output filters. This means the +// package will not perform any logging by default until the caller requests +// it. +var log btclog.Logger + +const Subsystem = "LCHN" + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger(Subsystem, nil)) +} + +// DisableLog disables all library log output. Logging output is disabled by +// default until UseLogger is called. +func DisableLog() { + UseLogger(btclog.Disabled) +} + +// UseLogger uses a specified Logger to output package logging info. This +// should be used in preference to SetLogWriter if the caller is also using +// btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/routing/localchans/manager.go b/routing/localchans/manager.go index f0f9b88de..25df1002a 100644 --- a/routing/localchans/manager.go +++ b/routing/localchans/manager.go @@ -1,10 +1,13 @@ package localchans import ( + "bytes" "errors" "fmt" "sync" + "time" + "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb/models" @@ -19,6 +22,12 @@ import ( // Manager manages the node's local channels. The only operation that is // currently implemented is updating forwarding policies. type Manager struct { + // SelfPub contains the public key of the local node. + SelfPub *btcec.PublicKey + + // DefaultRoutingPolicy is the default routing policy. + DefaultRoutingPolicy models.ForwardingPolicy + // UpdateForwardingPolicies is used by the manager to update active // links with a new policy. UpdateForwardingPolicies func( @@ -40,6 +49,9 @@ type Manager struct { FetchChannel func(tx kvdb.RTx, chanPoint wire.OutPoint) ( *channeldb.OpenChannel, error) + // AddEdge is used to add edge/channel to the topology of the router. + AddEdge func(edge *models.ChannelEdgeInfo) error + // policyUpdateLock ensures that the database and the link do not fall // out of sync if there are concurrent fee update calls. Without it, // there is a chance that policy A updates the database, then policy B @@ -51,7 +63,8 @@ type Manager struct { // UpdatePolicy updates the policy for the specified channels on disk and in // the active links. func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, - chanPoints ...wire.OutPoint) ([]*lnrpc.FailedUpdate, error) { + createMissingEdge bool, chanPoints ...wire.OutPoint) ( + []*lnrpc.FailedUpdate, error) { r.policyUpdateLock.Lock() defer r.policyUpdateLock.Unlock() @@ -69,10 +82,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, var edgesToUpdate []discovery.EdgeWithInfo policiesToUpdate := make(map[wire.OutPoint]models.ForwardingPolicy) - // Next, we'll loop over all the outgoing channels the router knows of. - // If we have a filter then we'll only collected those channels, - // otherwise we'll collect them all. - err := r.ForAllOutgoingChannels(func( + processChan := func( tx kvdb.RTx, info *models.ChannelEdgeInfo, edge *models.ChannelEdgePolicy) error { @@ -125,7 +135,12 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, } return nil - }) + } + + // Next, we'll loop over all the outgoing channels the router knows of. + // If we have a filter then we'll only collect those channels, otherwise + // we'll collect them all. + err := r.ForAllOutgoingChannels(processChan) if err != nil { return nil, err } @@ -155,7 +170,56 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, "not yet confirmed", )) + case createMissingEdge: + // If the edge was not found, but the channel is found, + // that means the edge is missing in the graph database + // and should be recreated. The edge and policy are + // created in-memory. The edge is inserted in createEdge + // below and the policy will be added to the graph in + // the PropagateChanPolicyUpdate call below. + log.Warnf("Missing edge for active channel (%s) "+ + "during policy update. Recreating edge with "+ + "default policy.", + channel.FundingOutpoint.String()) + + info, edge, err := r.createEdge(channel, time.Now()) + if err != nil { + log.Errorf("Failed to recreate missing edge "+ + "for channel (%s): %v", + channel.FundingOutpoint.String(), err) + + f := lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN + failedUpdates = append(failedUpdates, + makeFailureItem(chanPoint, f, + "could not update policies", + )) + } + + // Insert the edge into the database to avoid `edge not + // found` errors during policy update propagation. + err = r.AddEdge(info) + if err != nil { + log.Warnf("Attempt to add missing edge for "+ + "channel (%s) errored with: %v", + channel.FundingOutpoint.String(), err) + + f := lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN + failedUpdates = append(failedUpdates, + makeFailureItem(chanPoint, f, + "could not add edge", + )) + } + + err = processChan(nil, info, edge) + if err != nil { + return nil, err + } + default: + log.Warnf("Missing edge for active channel (%s) "+ + "during policy update. Could not update "+ + "policy.", channel.FundingOutpoint.String()) + failedUpdates = append(failedUpdates, makeFailureItem(chanPoint, lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN, @@ -180,6 +244,68 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, return failedUpdates, nil } +// createEdge recreates an edge and policy from an open channel in-memory. +func (r *Manager) createEdge(channel *channeldb.OpenChannel, + timestamp time.Time) (*models.ChannelEdgeInfo, + *models.ChannelEdgePolicy, error) { + + nodeKey1Bytes := r.SelfPub.SerializeCompressed() + nodeKey2Bytes := channel.IdentityPub.SerializeCompressed() + bitcoinKey1Bytes := channel.LocalChanCfg.MultiSigKey.PubKey. + SerializeCompressed() + bitcoinKey2Bytes := channel.RemoteChanCfg.MultiSigKey.PubKey. + SerializeCompressed() + channelFlags := lnwire.ChanUpdateChanFlags(0) + + // Make it such that node_id_1 is the lexicographically-lesser of the + // two compressed keys sorted in ascending lexicographic order. + if bytes.Compare(nodeKey2Bytes, nodeKey1Bytes) < 0 { + nodeKey1Bytes, nodeKey2Bytes = nodeKey2Bytes, nodeKey1Bytes + bitcoinKey1Bytes, bitcoinKey2Bytes = bitcoinKey2Bytes, + bitcoinKey1Bytes + channelFlags = 1 + } + + var featureBuf bytes.Buffer + err := lnwire.NewRawFeatureVector().Encode(&featureBuf) + if err != nil { + return nil, nil, fmt.Errorf("unable to encode features: %w", + err) + } + + info := &models.ChannelEdgeInfo{ + ChannelID: channel.ShortChanID().ToUint64(), + ChainHash: channel.ChainHash, + Features: featureBuf.Bytes(), + Capacity: channel.Capacity, + ChannelPoint: channel.FundingOutpoint, + } + + copy(info.NodeKey1Bytes[:], nodeKey1Bytes) + copy(info.NodeKey2Bytes[:], nodeKey2Bytes) + copy(info.BitcoinKey1Bytes[:], bitcoinKey1Bytes) + copy(info.BitcoinKey2Bytes[:], bitcoinKey2Bytes) + + // Construct a dummy channel edge policy with default values that will + // be updated with the new values in the call to processChan below. + timeLockDelta := uint16(r.DefaultRoutingPolicy.TimeLockDelta) + edge := &models.ChannelEdgePolicy{ + ChannelID: channel.ShortChanID().ToUint64(), + LastUpdate: timestamp, + TimeLockDelta: timeLockDelta, + ChannelFlags: channelFlags, + MessageFlags: lnwire.ChanUpdateRequiredMaxHtlc, + FeeBaseMSat: r.DefaultRoutingPolicy.BaseFee, + FeeProportionalMillionths: r.DefaultRoutingPolicy.FeeRate, + MinHTLC: r.DefaultRoutingPolicy.MinHTLCOut, + MaxHTLC: r.DefaultRoutingPolicy.MaxHTLC, + } + + copy(edge.ToNode[:], channel.IdentityPub.SerializeCompressed()) + + return info, edge, nil +} + // updateEdge updates the given edge with the new schema. func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint, edge *models.ChannelEdgePolicy, diff --git a/routing/localchans/manager_test.go b/routing/localchans/manager_test.go index 7594eef04..a60d782c8 100644 --- a/routing/localchans/manager_test.go +++ b/routing/localchans/manager_test.go @@ -156,6 +156,7 @@ func TestManager(t *testing.T) { newPolicy routing.ChannelPolicy channelSet []channel specifiedChanPoints []wire.OutPoint + createMissingEdge bool expectedNumUpdates int expectedUpdateFailures []lnrpc.UpdateFailure expectErr error @@ -173,6 +174,7 @@ func TestManager(t *testing.T) { }, }, specifiedChanPoints: []wire.OutPoint{chanPointValid}, + createMissingEdge: false, expectedNumUpdates: 1, expectedUpdateFailures: []lnrpc.UpdateFailure{}, expectErr: nil, @@ -190,6 +192,7 @@ func TestManager(t *testing.T) { }, }, specifiedChanPoints: []wire.OutPoint{}, + createMissingEdge: false, expectedNumUpdates: 1, expectedUpdateFailures: []lnrpc.UpdateFailure{}, expectErr: nil, @@ -207,6 +210,7 @@ func TestManager(t *testing.T) { }, }, specifiedChanPoints: []wire.OutPoint{chanPointMissing}, + createMissingEdge: false, expectedNumUpdates: 0, expectedUpdateFailures: []lnrpc.UpdateFailure{ lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND, @@ -228,6 +232,7 @@ func TestManager(t *testing.T) { }, }, specifiedChanPoints: []wire.OutPoint{chanPointValid}, + createMissingEdge: false, expectedNumUpdates: 1, expectedUpdateFailures: []lnrpc.UpdateFailure{}, expectErr: nil, @@ -242,6 +247,7 @@ func TestManager(t *testing.T) { expectedNumUpdates = test.expectedNumUpdates failedUpdates, err := manager.UpdatePolicy(test.newPolicy, + test.createMissingEdge, test.specifiedChanPoints...) if len(failedUpdates) != len(test.expectedUpdateFailures) { diff --git a/rpcserver.go b/rpcserver.go index 43d922c38..cace7fb94 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -7762,7 +7762,7 @@ func (r *rpcServer) UpdateChannelPolicy(ctx context.Context, // With the scope resolved, we'll now send this to the local channel // manager so it can propagate the new policy for our target channel(s). failedUpdates, err := r.server.localChanMgr.UpdatePolicy(chanPolicy, - targetChans...) + req.CreateMissingEdge, targetChans...) if err != nil { return nil, err } diff --git a/server.go b/server.go index 73456735a..105484c20 100644 --- a/server.go +++ b/server.go @@ -1112,10 +1112,15 @@ func newServer(cfg *Config, listenAddrs []net.Addr, //nolint:lll s.localChanMgr = &localchans.Manager{ + SelfPub: nodeKeyDesc.PubKey, + DefaultRoutingPolicy: cc.RoutingPolicy, ForAllOutgoingChannels: s.graphBuilder.ForAllOutgoingChannels, PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate, UpdateForwardingPolicies: s.htlcSwitch.UpdateForwardingPolicies, FetchChannel: s.chanStateDB.FetchChannel, + AddEdge: func(edge *models.ChannelEdgeInfo) error { + return s.graphBuilder.AddEdge(edge) + }, } utxnStore, err := contractcourt.NewNurseryStore(