localchans: recreate missing edge if not found

If a node contains a channel, but doesn't have a corresponding edge in
the graph database, updating the channel policy would fail. In this
commit the edge is recreated if the channel exists. This ensures a node
can recover from a missing edge in the graph database by calling
updatechanpolicy.
This commit is contained in:
Jesse de Wit 2024-06-03 12:08:29 +02:00 committed by Oliver Gugger
parent 9609aa332e
commit 8cc78b371d
No known key found for this signature in database
GPG key ID: 8E4256593F177720
6 changed files with 177 additions and 7 deletions

2
log.go
View file

@ -44,6 +44,7 @@ import (
"github.com/lightningnetwork/lnd/peernotifier" "github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/blindedpath" "github.com/lightningnetwork/lnd/routing/blindedpath"
"github.com/lightningnetwork/lnd/routing/localchans"
"github.com/lightningnetwork/lnd/rpcperms" "github.com/lightningnetwork/lnd/rpcperms"
"github.com/lightningnetwork/lnd/signal" "github.com/lightningnetwork/lnd/signal"
"github.com/lightningnetwork/lnd/sweep" "github.com/lightningnetwork/lnd/sweep"
@ -167,6 +168,7 @@ func SetupLoggers(root *build.RotatingLogWriter, interceptor signal.Interceptor)
AddSubLogger(root, "CHFD", interceptor, chanfunding.UseLogger) AddSubLogger(root, "CHFD", interceptor, chanfunding.UseLogger)
AddSubLogger(root, "PEER", interceptor, peer.UseLogger) AddSubLogger(root, "PEER", interceptor, peer.UseLogger)
AddSubLogger(root, "CHCL", interceptor, chancloser.UseLogger) AddSubLogger(root, "CHCL", interceptor, chancloser.UseLogger)
AddSubLogger(root, "LCHN", interceptor, localchans.UseLogger)
AddSubLogger(root, routing.Subsystem, interceptor, routing.UseLogger) AddSubLogger(root, routing.Subsystem, interceptor, routing.UseLogger)
AddSubLogger(root, routerrpc.Subsystem, interceptor, routerrpc.UseLogger) AddSubLogger(root, routerrpc.Subsystem, interceptor, routerrpc.UseLogger)

31
routing/localchans/log.go Normal file
View file

@ -0,0 +1,31 @@
package localchans
import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)
// log is a logger that is initialized with no output filters. This means the
// package will not perform any logging by default until the caller requests
// it.
var log btclog.Logger
const Subsystem = "LCHN"
// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger(Subsystem, nil))
}
// DisableLog disables all library log output. Logging output is disabled by
// default until UseLogger is called.
func DisableLog() {
UseLogger(btclog.Disabled)
}
// UseLogger uses a specified Logger to output package logging info. This
// should be used in preference to SetLogWriter if the caller is also using
// btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}

View file

@ -1,10 +1,13 @@
package localchans package localchans
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/models" "github.com/lightningnetwork/lnd/channeldb/models"
@ -19,6 +22,12 @@ import (
// Manager manages the node's local channels. The only operation that is // Manager manages the node's local channels. The only operation that is
// currently implemented is updating forwarding policies. // currently implemented is updating forwarding policies.
type Manager struct { type Manager struct {
// SelfPub contains the public key of the local node.
SelfPub *btcec.PublicKey
// DefaultRoutingPolicy is the default routing policy.
DefaultRoutingPolicy models.ForwardingPolicy
// UpdateForwardingPolicies is used by the manager to update active // UpdateForwardingPolicies is used by the manager to update active
// links with a new policy. // links with a new policy.
UpdateForwardingPolicies func( UpdateForwardingPolicies func(
@ -40,6 +49,9 @@ type Manager struct {
FetchChannel func(tx kvdb.RTx, chanPoint wire.OutPoint) ( FetchChannel func(tx kvdb.RTx, chanPoint wire.OutPoint) (
*channeldb.OpenChannel, error) *channeldb.OpenChannel, error)
// AddEdge is used to add edge/channel to the topology of the router.
AddEdge func(edge *models.ChannelEdgeInfo) error
// policyUpdateLock ensures that the database and the link do not fall // policyUpdateLock ensures that the database and the link do not fall
// out of sync if there are concurrent fee update calls. Without it, // out of sync if there are concurrent fee update calls. Without it,
// there is a chance that policy A updates the database, then policy B // there is a chance that policy A updates the database, then policy B
@ -51,7 +63,8 @@ type Manager struct {
// UpdatePolicy updates the policy for the specified channels on disk and in // UpdatePolicy updates the policy for the specified channels on disk and in
// the active links. // the active links.
func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy, func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
chanPoints ...wire.OutPoint) ([]*lnrpc.FailedUpdate, error) { createMissingEdge bool, chanPoints ...wire.OutPoint) (
[]*lnrpc.FailedUpdate, error) {
r.policyUpdateLock.Lock() r.policyUpdateLock.Lock()
defer r.policyUpdateLock.Unlock() defer r.policyUpdateLock.Unlock()
@ -69,10 +82,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
var edgesToUpdate []discovery.EdgeWithInfo var edgesToUpdate []discovery.EdgeWithInfo
policiesToUpdate := make(map[wire.OutPoint]models.ForwardingPolicy) policiesToUpdate := make(map[wire.OutPoint]models.ForwardingPolicy)
// Next, we'll loop over all the outgoing channels the router knows of. processChan := func(
// If we have a filter then we'll only collected those channels,
// otherwise we'll collect them all.
err := r.ForAllOutgoingChannels(func(
tx kvdb.RTx, tx kvdb.RTx,
info *models.ChannelEdgeInfo, info *models.ChannelEdgeInfo,
edge *models.ChannelEdgePolicy) error { edge *models.ChannelEdgePolicy) error {
@ -125,7 +135,12 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
} }
return nil return nil
}) }
// Next, we'll loop over all the outgoing channels the router knows of.
// If we have a filter then we'll only collect those channels, otherwise
// we'll collect them all.
err := r.ForAllOutgoingChannels(processChan)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -155,7 +170,56 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
"not yet confirmed", "not yet confirmed",
)) ))
case createMissingEdge:
// If the edge was not found, but the channel is found,
// that means the edge is missing in the graph database
// and should be recreated. The edge and policy are
// created in-memory. The edge is inserted in createEdge
// below and the policy will be added to the graph in
// the PropagateChanPolicyUpdate call below.
log.Warnf("Missing edge for active channel (%s) "+
"during policy update. Recreating edge with "+
"default policy.",
channel.FundingOutpoint.String())
info, edge, err := r.createEdge(channel, time.Now())
if err != nil {
log.Errorf("Failed to recreate missing edge "+
"for channel (%s): %v",
channel.FundingOutpoint.String(), err)
f := lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN
failedUpdates = append(failedUpdates,
makeFailureItem(chanPoint, f,
"could not update policies",
))
}
// Insert the edge into the database to avoid `edge not
// found` errors during policy update propagation.
err = r.AddEdge(info)
if err != nil {
log.Warnf("Attempt to add missing edge for "+
"channel (%s) errored with: %v",
channel.FundingOutpoint.String(), err)
f := lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN
failedUpdates = append(failedUpdates,
makeFailureItem(chanPoint, f,
"could not add edge",
))
}
err = processChan(nil, info, edge)
if err != nil {
return nil, err
}
default: default:
log.Warnf("Missing edge for active channel (%s) "+
"during policy update. Could not update "+
"policy.", channel.FundingOutpoint.String())
failedUpdates = append(failedUpdates, failedUpdates = append(failedUpdates,
makeFailureItem(chanPoint, makeFailureItem(chanPoint,
lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN, lnrpc.UpdateFailure_UPDATE_FAILURE_UNKNOWN,
@ -180,6 +244,68 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
return failedUpdates, nil return failedUpdates, nil
} }
// createEdge recreates an edge and policy from an open channel in-memory.
func (r *Manager) createEdge(channel *channeldb.OpenChannel,
timestamp time.Time) (*models.ChannelEdgeInfo,
*models.ChannelEdgePolicy, error) {
nodeKey1Bytes := r.SelfPub.SerializeCompressed()
nodeKey2Bytes := channel.IdentityPub.SerializeCompressed()
bitcoinKey1Bytes := channel.LocalChanCfg.MultiSigKey.PubKey.
SerializeCompressed()
bitcoinKey2Bytes := channel.RemoteChanCfg.MultiSigKey.PubKey.
SerializeCompressed()
channelFlags := lnwire.ChanUpdateChanFlags(0)
// Make it such that node_id_1 is the lexicographically-lesser of the
// two compressed keys sorted in ascending lexicographic order.
if bytes.Compare(nodeKey2Bytes, nodeKey1Bytes) < 0 {
nodeKey1Bytes, nodeKey2Bytes = nodeKey2Bytes, nodeKey1Bytes
bitcoinKey1Bytes, bitcoinKey2Bytes = bitcoinKey2Bytes,
bitcoinKey1Bytes
channelFlags = 1
}
var featureBuf bytes.Buffer
err := lnwire.NewRawFeatureVector().Encode(&featureBuf)
if err != nil {
return nil, nil, fmt.Errorf("unable to encode features: %w",
err)
}
info := &models.ChannelEdgeInfo{
ChannelID: channel.ShortChanID().ToUint64(),
ChainHash: channel.ChainHash,
Features: featureBuf.Bytes(),
Capacity: channel.Capacity,
ChannelPoint: channel.FundingOutpoint,
}
copy(info.NodeKey1Bytes[:], nodeKey1Bytes)
copy(info.NodeKey2Bytes[:], nodeKey2Bytes)
copy(info.BitcoinKey1Bytes[:], bitcoinKey1Bytes)
copy(info.BitcoinKey2Bytes[:], bitcoinKey2Bytes)
// Construct a dummy channel edge policy with default values that will
// be updated with the new values in the call to processChan below.
timeLockDelta := uint16(r.DefaultRoutingPolicy.TimeLockDelta)
edge := &models.ChannelEdgePolicy{
ChannelID: channel.ShortChanID().ToUint64(),
LastUpdate: timestamp,
TimeLockDelta: timeLockDelta,
ChannelFlags: channelFlags,
MessageFlags: lnwire.ChanUpdateRequiredMaxHtlc,
FeeBaseMSat: r.DefaultRoutingPolicy.BaseFee,
FeeProportionalMillionths: r.DefaultRoutingPolicy.FeeRate,
MinHTLC: r.DefaultRoutingPolicy.MinHTLCOut,
MaxHTLC: r.DefaultRoutingPolicy.MaxHTLC,
}
copy(edge.ToNode[:], channel.IdentityPub.SerializeCompressed())
return info, edge, nil
}
// updateEdge updates the given edge with the new schema. // updateEdge updates the given edge with the new schema.
func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint, func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint,
edge *models.ChannelEdgePolicy, edge *models.ChannelEdgePolicy,

View file

@ -156,6 +156,7 @@ func TestManager(t *testing.T) {
newPolicy routing.ChannelPolicy newPolicy routing.ChannelPolicy
channelSet []channel channelSet []channel
specifiedChanPoints []wire.OutPoint specifiedChanPoints []wire.OutPoint
createMissingEdge bool
expectedNumUpdates int expectedNumUpdates int
expectedUpdateFailures []lnrpc.UpdateFailure expectedUpdateFailures []lnrpc.UpdateFailure
expectErr error expectErr error
@ -173,6 +174,7 @@ func TestManager(t *testing.T) {
}, },
}, },
specifiedChanPoints: []wire.OutPoint{chanPointValid}, specifiedChanPoints: []wire.OutPoint{chanPointValid},
createMissingEdge: false,
expectedNumUpdates: 1, expectedNumUpdates: 1,
expectedUpdateFailures: []lnrpc.UpdateFailure{}, expectedUpdateFailures: []lnrpc.UpdateFailure{},
expectErr: nil, expectErr: nil,
@ -190,6 +192,7 @@ func TestManager(t *testing.T) {
}, },
}, },
specifiedChanPoints: []wire.OutPoint{}, specifiedChanPoints: []wire.OutPoint{},
createMissingEdge: false,
expectedNumUpdates: 1, expectedNumUpdates: 1,
expectedUpdateFailures: []lnrpc.UpdateFailure{}, expectedUpdateFailures: []lnrpc.UpdateFailure{},
expectErr: nil, expectErr: nil,
@ -207,6 +210,7 @@ func TestManager(t *testing.T) {
}, },
}, },
specifiedChanPoints: []wire.OutPoint{chanPointMissing}, specifiedChanPoints: []wire.OutPoint{chanPointMissing},
createMissingEdge: false,
expectedNumUpdates: 0, expectedNumUpdates: 0,
expectedUpdateFailures: []lnrpc.UpdateFailure{ expectedUpdateFailures: []lnrpc.UpdateFailure{
lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND, lnrpc.UpdateFailure_UPDATE_FAILURE_NOT_FOUND,
@ -228,6 +232,7 @@ func TestManager(t *testing.T) {
}, },
}, },
specifiedChanPoints: []wire.OutPoint{chanPointValid}, specifiedChanPoints: []wire.OutPoint{chanPointValid},
createMissingEdge: false,
expectedNumUpdates: 1, expectedNumUpdates: 1,
expectedUpdateFailures: []lnrpc.UpdateFailure{}, expectedUpdateFailures: []lnrpc.UpdateFailure{},
expectErr: nil, expectErr: nil,
@ -242,6 +247,7 @@ func TestManager(t *testing.T) {
expectedNumUpdates = test.expectedNumUpdates expectedNumUpdates = test.expectedNumUpdates
failedUpdates, err := manager.UpdatePolicy(test.newPolicy, failedUpdates, err := manager.UpdatePolicy(test.newPolicy,
test.createMissingEdge,
test.specifiedChanPoints...) test.specifiedChanPoints...)
if len(failedUpdates) != len(test.expectedUpdateFailures) { if len(failedUpdates) != len(test.expectedUpdateFailures) {

View file

@ -7742,7 +7742,7 @@ func (r *rpcServer) UpdateChannelPolicy(ctx context.Context,
// With the scope resolved, we'll now send this to the local channel // With the scope resolved, we'll now send this to the local channel
// manager so it can propagate the new policy for our target channel(s). // manager so it can propagate the new policy for our target channel(s).
failedUpdates, err := r.server.localChanMgr.UpdatePolicy(chanPolicy, failedUpdates, err := r.server.localChanMgr.UpdatePolicy(chanPolicy,
targetChans...) req.CreateMissingEdge, targetChans...)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -1100,10 +1100,15 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
//nolint:lll //nolint:lll
s.localChanMgr = &localchans.Manager{ s.localChanMgr = &localchans.Manager{
SelfPub: nodeKeyDesc.PubKey,
DefaultRoutingPolicy: cc.RoutingPolicy,
ForAllOutgoingChannels: s.graphBuilder.ForAllOutgoingChannels, ForAllOutgoingChannels: s.graphBuilder.ForAllOutgoingChannels,
PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate, PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate,
UpdateForwardingPolicies: s.htlcSwitch.UpdateForwardingPolicies, UpdateForwardingPolicies: s.htlcSwitch.UpdateForwardingPolicies,
FetchChannel: s.chanStateDB.FetchChannel, FetchChannel: s.chanStateDB.FetchChannel,
AddEdge: func(edge *models.ChannelEdgeInfo) error {
return s.graphBuilder.AddEdge(edge)
},
} }
utxnStore, err := contractcourt.NewNurseryStore( utxnStore, err := contractcourt.NewNurseryStore(