From ce4cd5dfc61565c1db01d3eaab357d43da3e5ee3 Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Thu, 6 Mar 2025 12:23:46 +0200
Subject: [PATCH 1/4] graph/db: adjust TestPartialNode

The test as it stands today does not make sense: it adds a
Partial/Shell node to the graph via AddLightningNode, which will never
happen in practice since that method is only ever triggered by the
gossiper, and the gossiper only ever calls it with a full node
announcement.

Shell/Partial nodes are only ever added via AddChannelEdge, which will
insert a partial node if we are adding a channel edge whose node pub
keys we don't yet have a node entry for. So we adjust the test to use
this more accurate flow.
---
 graph/db/graph_test.go | 68 +++++++++++++++++++++++-------------------
 1 file changed, 38 insertions(+), 30 deletions(-)
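[ Note for reviewers, not part of the commit: a sketch of the distinction
the adjusted test relies on. The full node's field set and alias value
below are illustrative only. ]

	// A shell/partial node, as upserted by AddChannelEdge when a channel
	// references a pubkey that we hold no node entry for: only the key
	// is populated.
	shell := &models.LightningNode{
		HaveNodeAnnouncement: false,
		PubKeyBytes:          pubKey1,
	}

	// By contrast, AddLightningNode is only ever fed by the gossiper
	// with a full node announcement, so the stored node also carries
	// announcement data.
	full := &models.LightningNode{
		HaveNodeAnnouncement: true,
		LastUpdate:           time.Unix(1741255426, 0),
		Alias:                "alice",
		PubKeyBytes:          pubKey1,
	}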
diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go
index dc41f02a2..87214fc77 100644
--- a/graph/db/graph_test.go
+++ b/graph/db/graph_test.go
@@ -155,62 +155,70 @@ func TestNodeInsertionAndDeletion(t *testing.T) {
 }
 
 // TestPartialNode checks that we can add and retrieve a LightningNode where
-// where only the pubkey is known to the database.
+// only the pubkey is known to the database.
 func TestPartialNode(t *testing.T) {
 	t.Parallel()
 
 	graph, err := MakeTestGraph(t)
 	require.NoError(t, err, "unable to make test database")
 
-	// We want to be able to insert nodes into the graph that only has the
-	// PubKey set.
-	node := &models.LightningNode{
-		HaveNodeAnnouncement: false,
-		PubKeyBytes:          testPub,
-	}
+	// To insert a partial node, we need to add a channel edge that has
+	// node keys for nodes we are not yet aware of.
+	var node1, node2 models.LightningNode
+	copy(node1.PubKeyBytes[:], pubKey1Bytes)
+	copy(node2.PubKeyBytes[:], pubKey2Bytes)
 
-	if err := graph.AddLightningNode(node); err != nil {
-		t.Fatalf("unable to add node: %v", err)
-	}
-	assertNodeInCache(t, graph, node, nil)
+	// Create an edge attached to these nodes and add it to the graph.
+	edgeInfo, _ := createEdge(140, 0, 0, 0, &node1, &node2)
+	require.NoError(t, graph.AddChannelEdge(&edgeInfo))
 
-	// Next, fetch the node from the database to ensure everything was
+	// Both of the nodes should now be in the graph (as partial/shell
+	// nodes) _and_ the cache should also have an awareness of both nodes.
+	assertNodeInCache(t, graph, &node1, nil)
+	assertNodeInCache(t, graph, &node2, nil)
+
+	// Next, fetch the nodes from the database to ensure everything was
 	// serialized properly.
-	dbNode, err := graph.FetchLightningNode(testPub)
-	require.NoError(t, err, "unable to locate node")
+	dbNode1, err := graph.FetchLightningNode(pubKey1)
+	require.NoError(t, err)
+	dbNode2, err := graph.FetchLightningNode(pubKey2)
+	require.NoError(t, err)
 
-	_, exists, err := graph.HasLightningNode(dbNode.PubKeyBytes)
-	if err != nil {
-		t.Fatalf("unable to query for node: %v", err)
-	} else if !exists {
-		t.Fatalf("node should be found but wasn't")
-	}
+	_, exists, err := graph.HasLightningNode(dbNode1.PubKeyBytes)
+	require.NoError(t, err)
+	require.True(t, exists)
 
 	// The two nodes should match exactly! (with default values for
 	// LastUpdate and db set to satisfy compareNodes())
-	node = &models.LightningNode{
+	expectedNode1 := &models.LightningNode{
 		HaveNodeAnnouncement: false,
 		LastUpdate:           time.Unix(0, 0),
-		PubKeyBytes:          testPub,
+		PubKeyBytes:          pubKey1,
 	}
+	require.NoError(t, compareNodes(dbNode1, expectedNode1))
 
-	if err := compareNodes(node, dbNode); err != nil {
-		t.Fatalf("nodes don't match: %v", err)
+	_, exists, err = graph.HasLightningNode(dbNode2.PubKeyBytes)
+	require.NoError(t, err)
+	require.True(t, exists)
+
+	// The two nodes should match exactly! (with default values for
+	// LastUpdate and db set to satisfy compareNodes())
+	expectedNode2 := &models.LightningNode{
+		HaveNodeAnnouncement: false,
+		LastUpdate:           time.Unix(0, 0),
+		PubKeyBytes:          pubKey2,
 	}
+	require.NoError(t, compareNodes(dbNode2, expectedNode2))
 
 	// Next, delete the node from the graph, this should purge all data
 	// related to the node.
-	if err := graph.DeleteLightningNode(testPub); err != nil {
-		t.Fatalf("unable to delete node: %v", err)
-	}
+	require.NoError(t, graph.DeleteLightningNode(pubKey1))
 	assertNodeNotInCache(t, graph, testPub)
 
 	// Finally, attempt to fetch the node again. This should fail as the
 	// node should have been deleted from the database.
 	_, err = graph.FetchLightningNode(testPub)
-	if err != ErrGraphNodeNotFound {
-		t.Fatalf("fetch after delete should fail!")
-	}
+	require.ErrorIs(t, err, ErrGraphNodeNotFound)
 }
 
 func TestAliasLookup(t *testing.T) {

From fa4cfc82d8aa2a5e3531505fd3f57c50aed5f8b2 Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Wed, 19 Feb 2025 08:48:30 -0300
Subject: [PATCH 2/4] graph/db: move topology client management to ChannelGraph

We plan to later add an option for a remote graph source which will be
managed by the ChannelGraph. In such a set-up, a node would rely on the
remote graph source for graph updates instead of on gossip sync. In
this scenario, however, our topology subscription logic should still
notify clients of all updates, and so it makes more sense to have that
logic as part of the ChannelGraph so that we can send updates we
receive from the remote graph.
---
 autopilot/manager.go            |   4 +-
 graph/builder.go                | 117 ++--------------------
 graph/db/graph.go               | 130 +++++++++++++++++++++++++++++---
 graph/db/graph_test.go          |  20 +++++
 graph/{ => db}/notifications.go |  59 ++++++++++-----
 graph/notifications_test.go     |  24 +++---
 pilot.go                        |   2 +-
 rpcserver.go                    |   9 +--
 server.go                       |   2 +-
 9 files changed, 209 insertions(+), 158 deletions(-)
 rename graph/{ => db}/notifications.go (91%)
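[ Note for reviewers, not applied by git am: a minimal sketch of a topology
consumer after this patch, assuming a started *graphdb.ChannelGraph. Only
API surface visible in this diff is used (SubscribeTopology,
TopologyChanges, Cancel, NodeUpdates, ChannelEdgeUpdates); the function
name and quit channel are illustrative. ]

	func watchTopology(g *graphdb.ChannelGraph, quit <-chan struct{}) error {
		client, err := g.SubscribeTopology()
		if err != nil {
			return err
		}
		// Cancelling lets the ChannelGraph tear down this client's
		// dispatch state, so always do it on the way out.
		defer client.Cancel()

		for {
			select {
			case change, ok := <-client.TopologyChanges:
				if !ok {
					// The subscription was torn down by
					// the ChannelGraph itself.
					return nil
				}
				for _, n := range change.NodeUpdates {
					log.Infof("Node update: alias=%v",
						n.Alias)
				}
				for _, e := range change.ChannelEdgeUpdates {
					log.Infof("Edge update: chan_id=%v",
						e.ChanID)
				}

			case <-quit:
				return nil
			}
		}
	}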
diff --git a/autopilot/manager.go b/autopilot/manager.go
index dba4cc6cc..0463f98d9 100644
--- a/autopilot/manager.go
+++ b/autopilot/manager.go
@@ -6,7 +6,7 @@ import (
 
 	"github.com/btcsuite/btcd/btcec/v2"
 	"github.com/btcsuite/btcd/wire"
-	"github.com/lightningnetwork/lnd/graph"
+	graphdb "github.com/lightningnetwork/lnd/graph/db"
 	"github.com/lightningnetwork/lnd/lnwallet"
 	"github.com/lightningnetwork/lnd/lnwire"
 )
@@ -36,7 +36,7 @@ type ManagerCfg struct {
 
 	// SubscribeTopology is used to get a subscription for topology changes
 	// on the network.
-	SubscribeTopology func() (*graph.TopologyClient, error)
+	SubscribeTopology func() (*graphdb.TopologyClient, error)
 }
 
 // Manager is struct that manages an autopilot agent, making it possible to
diff --git a/graph/builder.go b/graph/builder.go
index 3e1115553..f92b523b0 100644
--- a/graph/builder.go
+++ b/graph/builder.go
@@ -109,8 +109,7 @@ type Builder struct {
 	started atomic.Bool
 	stopped atomic.Bool
 
-	ntfnClientCounter atomic.Uint64
-	bestHeight        atomic.Uint32
+	bestHeight atomic.Uint32
 
 	cfg *Config
 
@@ -123,22 +122,6 @@ type Builder struct {
 	// of our currently known best chain are sent over.
 	staleBlocks <-chan *chainview.FilteredBlock
 
-	// topologyUpdates is a channel that carries new topology updates
-	// messages from outside the Builder to be processed by the
-	// networkHandler.
-	topologyUpdates chan any
-
-	// topologyClients maps a client's unique notification ID to a
-	// topologyClient client that contains its notification dispatch
-	// channel.
-	topologyClients *lnutils.SyncMap[uint64, *topologyClient]
-
-	// ntfnClientUpdates is a channel that's used to send new updates to
-	// topology notification clients to the Builder. Updates either
-	// add a new notification client, or cancel notifications for an
-	// existing client.
-	ntfnClientUpdates chan *topologyClientUpdate
-
 	// channelEdgeMtx is a mutex we use to make sure we process only one
 	// ChannelEdgePolicy at a time for a given channelID, to ensure
 	// consistency between the various database accesses.
@@ -163,14 +146,11 @@ var _ ChannelGraphSource = (*Builder)(nil)
 
 // NewBuilder constructs a new Builder.
 func NewBuilder(cfg *Config) (*Builder, error) {
 	return &Builder{
-		cfg:               cfg,
-		topologyUpdates:   make(chan any),
-		topologyClients:   &lnutils.SyncMap[uint64, *topologyClient]{},
-		ntfnClientUpdates: make(chan *topologyClientUpdate),
-		channelEdgeMtx:    multimutex.NewMutex[uint64](),
-		statTicker:        ticker.New(defaultStatInterval),
-		stats:             new(builderStats),
-		quit:              make(chan struct{}),
+		cfg:            cfg,
+		channelEdgeMtx: multimutex.NewMutex[uint64](),
+		statTicker:     ticker.New(defaultStatInterval),
+		stats:          new(builderStats),
+		quit:           make(chan struct{}),
 	}, nil
 }
 
@@ -656,28 +636,6 @@ func (b *Builder) pruneZombieChans() error {
 	return nil
 }
 
-// handleTopologyUpdate is responsible for sending any topology changes
-// notifications to registered clients.
-//
-// NOTE: must be run inside goroutine.
-func (b *Builder) handleTopologyUpdate(update any) {
-	defer b.wg.Done()
-
-	topChange := &TopologyChange{}
-	err := addToTopologyChange(b.cfg.Graph, topChange, update)
-	if err != nil {
-		log.Errorf("unable to update topology change notification: %v",
-			err)
-		return
-	}
-
-	if topChange.isEmpty() {
-		return
-	}
-
-	b.notifyTopologyChange(topChange)
-}
-
 // networkHandler is the primary goroutine for the Builder. The roles of
 // this goroutine include answering queries related to the state of the
 // network, pruning the graph on new block notification, applying network
@@ -701,16 +659,6 @@ func (b *Builder) networkHandler() {
 		}
 
 		select {
-		// A new fully validated topology update has just arrived.
-		// We'll notify any registered clients.
-		case update := <-b.topologyUpdates:
-			b.wg.Add(1)
-			go b.handleTopologyUpdate(update)
-
-			// TODO(roasbeef): remove all unconnected vertexes
-			// after N blocks pass with no corresponding
-			// announcements.
-
 		case chainUpdate, ok := <-b.staleBlocks:
 			// If the channel has been closed, then this indicates
 			// the daemon is shutting down, so we exit ourselves.
@@ -783,31 +731,6 @@ func (b *Builder) networkHandler() {
 					" processed.", chainUpdate.Height)
 			}
 
-		// A new notification client update has arrived. We're either
-		// gaining a new client, or cancelling notifications for an
-		// existing client.
-		case ntfnUpdate := <-b.ntfnClientUpdates:
-			clientID := ntfnUpdate.clientID
-
-			if ntfnUpdate.cancel {
-				client, ok := b.topologyClients.LoadAndDelete(
-					clientID,
-				)
-				if ok {
-					close(client.exit)
-					client.wg.Wait()
-
-					close(client.ntfnChan)
-				}
-
-				continue
-			}
-
-			b.topologyClients.Store(clientID, &topologyClient{
-				ntfnChan: ntfnUpdate.ntfnChan,
-				exit:     make(chan struct{}),
-			})
-
 		// The graph prune ticker has ticked, so we'll examine the
 		// state of the known graph to filter out any zombie channels
 		// for pruning.
@@ -934,16 +857,6 @@ func (b *Builder) updateGraphWithClosedChannels(
 	log.Infof("Block %v (height=%v) closed %v channels",
 		chainUpdate.Hash, blockHeight, len(chansClosed))
 
-	if len(chansClosed) == 0 {
-		return err
-	}
-
-	// Notify all currently registered clients of the newly closed channels.
-	closeSummaries := createCloseSummaries(blockHeight, chansClosed...)
-	b.notifyTopologyChange(&TopologyChange{
-		ClosedChannels: closeSummaries,
-	})
-
 	return nil
 }
 
@@ -1067,12 +980,6 @@ func (b *Builder) AddNode(node *models.LightningNode,
 		return err
 	}
 
-	select {
-	case b.topologyUpdates <- node:
-	case <-b.quit:
-		return ErrGraphBuilderShuttingDown
-	}
-
 	return nil
 }
 
@@ -1117,12 +1024,6 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
 		return err
 	}
 
-	select {
-	case b.topologyUpdates <- edge:
-	case <-b.quit:
-		return ErrGraphBuilderShuttingDown
-	}
-
 	return nil
 }
 
@@ -1224,12 +1125,6 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
 		return err
 	}
 
-	select {
-	case b.topologyUpdates <- update:
-	case <-b.quit:
-		return ErrGraphBuilderShuttingDown
-	}
-
 	return nil
 }
 
diff --git a/graph/db/graph.go b/graph/db/graph.go
index b9891ffc2..fa8a6c29c 100644
--- a/graph/db/graph.go
+++ b/graph/db/graph.go
@@ -12,10 +12,15 @@ import (
 	"github.com/lightningnetwork/lnd/batch"
 	"github.com/lightningnetwork/lnd/graph/db/models"
 	"github.com/lightningnetwork/lnd/kvdb"
+	"github.com/lightningnetwork/lnd/lnutils"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/lightningnetwork/lnd/routing/route"
 )
 
+// ErrChanGraphShuttingDown indicates that the ChannelGraph has shut down or is
+// busy shutting down.
+var ErrChanGraphShuttingDown = fmt.Errorf("ChannelGraph shutting down")
+
 // Config is a struct that holds all the necessary dependencies for a
 // ChannelGraph.
 type Config struct {
@@ -46,6 +51,26 @@ type ChannelGraph struct {
 
 	*KVStore
 
+	// ntfnClientCounter is an atomic counter that's used to assign unique
+	// notification client IDs to new clients.
+	ntfnClientCounter atomic.Uint64
+
+	// topologyUpdate is a channel that carries new topology update
+	// messages from outside the ChannelGraph to be processed by the
+	// networkHandler.
+	topologyUpdate chan any
+
+	// topologyClients maps a client's unique notification ID to a
+	// topologyClient client that contains its notification dispatch
+	// channel.
+	topologyClients *lnutils.SyncMap[uint64, *topologyClient]
+
+	// ntfnClientUpdates is a channel that's used to send new updates to
+	// topology notification clients to the ChannelGraph. Updates either
+	// add a new notification client, or cancel notifications for an
+	// existing client.
+	ntfnClientUpdates chan *topologyClientUpdate
+
 	quit chan struct{}
 	wg   sync.WaitGroup
 }
@@ -65,8 +90,11 @@ func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph,
 	}
 
 	g := &ChannelGraph{
-		KVStore: store,
-		quit:    make(chan struct{}),
+		KVStore:           store,
+		topologyUpdate:    make(chan any),
+		topologyClients:   &lnutils.SyncMap[uint64, *topologyClient]{},
+		ntfnClientUpdates: make(chan *topologyClientUpdate),
+		quit:              make(chan struct{}),
 	}
 
 	// The graph cache can be turned off (e.g. for mobile users) for a
@@ -95,6 +123,9 @@ func (c *ChannelGraph) Start() error {
 		}
 	}
 
+	c.wg.Add(1)
+	go c.handleTopologySubscriptions()
+
 	return nil
 }
 
@@ -113,6 +144,60 @@ func (c *ChannelGraph) Stop() error {
 	return nil
 }
 
+// handleTopologySubscriptions ensures that topology client subscriptions,
+// subscription cancellations and topology notifications are handled
+// synchronously.
+//
+// NOTE: this MUST be run in a goroutine.
+func (c *ChannelGraph) handleTopologySubscriptions() {
+	defer c.wg.Done()
+
+	for {
+		select {
+		// A new fully validated topology update has just arrived.
+		// We'll notify any registered clients.
+		case update := <-c.topologyUpdate:
+			// TODO(elle): change topology handling to be handled
+			// synchronously so that we can guarantee the order of
+			// notification delivery.
+			c.wg.Add(1)
+			go c.handleTopologyUpdate(update)
+
+			// TODO(roasbeef): remove all unconnected vertexes
+			// after N blocks pass with no corresponding
+			// announcements.
+
+		// A new notification client update has arrived. We're either
+		// gaining a new client, or cancelling notifications for an
+		// existing client.
+		case ntfnUpdate := <-c.ntfnClientUpdates:
+			clientID := ntfnUpdate.clientID
+
+			if ntfnUpdate.cancel {
+				client, ok := c.topologyClients.LoadAndDelete(
+					clientID,
+				)
+				if ok {
+					close(client.exit)
+					client.wg.Wait()
+
+					close(client.ntfnChan)
+				}
+
+				continue
+			}
+
+			c.topologyClients.Store(clientID, &topologyClient{
+				ntfnChan: ntfnUpdate.ntfnChan,
+				exit:     make(chan struct{}),
+			})
+
+		case <-c.quit:
+			return
+		}
+	}
+}
+
 // populateCache loads the entire channel graph into the in-memory graph cache.
 //
 // NOTE: This should only be called if the graphCache has been constructed.
@@ -234,6 +319,12 @@ func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
 		)
 	}
 
+	select {
+	case c.topologyUpdate <- node:
+	case <-c.quit:
+		return ErrChanGraphShuttingDown
+	}
+
 	return nil
 }
 
@@ -276,6 +367,12 @@ func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
 		c.graphCache.AddChannel(edge, nil, nil)
 	}
 
+	select {
+	case c.topologyUpdate <- edge:
+	case <-c.quit:
+		return ErrChanGraphShuttingDown
+	}
+
 	return nil
 }
 
@@ -411,6 +508,17 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
 			c.graphCache.Stats())
 	}
 
+	if len(edges) != 0 {
+		// Notify all currently registered clients of the newly closed
+		// channels.
+		closeSummaries := createCloseSummaries(
+			blockHeight, edges...,
+		)
+		c.notifyTopologyChange(&TopologyChange{
+			ClosedChannels: closeSummaries,
+		})
+	}
+
 	return edges, nil
 }
 
@@ -527,16 +635,20 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
 		return err
 	}
 
-	if c.graphCache == nil {
-		return nil
+	if c.graphCache != nil {
+		var isUpdate1 bool
+		if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
+			isUpdate1 = true
+		}
+
+		c.graphCache.UpdatePolicy(edge, from, to, isUpdate1)
 	}
 
-	var isUpdate1 bool
-	if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
-		isUpdate1 = true
+	select {
+	case c.topologyUpdate <- edge:
+	case <-c.quit:
+		return ErrChanGraphShuttingDown
 	}
 
-	c.graphCache.UpdatePolicy(edge, from, to, isUpdate1)
-
 	return nil
 }
 
diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go
index 87214fc77..754da5eef 100644
--- a/graph/db/graph_test.go
+++ b/graph/db/graph_test.go
@@ -972,6 +972,23 @@ func randEdgePolicy(chanID uint64, db kvdb.Backend) *models.ChannelEdgePolicy {
 	return newEdgePolicy(chanID, db, update)
 }
 
+func copyEdgePolicy(p *models.ChannelEdgePolicy) *models.ChannelEdgePolicy {
+	return &models.ChannelEdgePolicy{
+		SigBytes:                  p.SigBytes,
+		ChannelID:                 p.ChannelID,
+		LastUpdate:                p.LastUpdate,
+		MessageFlags:              p.MessageFlags,
+		ChannelFlags:              p.ChannelFlags,
+		TimeLockDelta:             p.TimeLockDelta,
+		MinHTLC:                   p.MinHTLC,
+		MaxHTLC:                   p.MaxHTLC,
+		FeeBaseMSat:               p.FeeBaseMSat,
+		FeeProportionalMillionths: p.FeeProportionalMillionths,
+		ToNode:                    p.ToNode,
+		ExtraOpaqueData:           p.ExtraOpaqueData,
+	}
+}
+
 func newEdgePolicy(chanID uint64, db kvdb.Backend,
 	updateTime int64) *models.ChannelEdgePolicy {
 
@@ -2937,6 +2954,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) {
 	if err := graph.UpdateEdgePolicy(edge1); err != nil {
 		t.Fatalf("unable to update edge: %v", err)
 	}
+	edge1 = copyEdgePolicy(edge1) // Avoid read/write race conditions.
 
 	edge2 := randEdgePolicy(chanID.ToUint64(), graph.db)
 	edge2.ChannelFlags = 1
@@ -2945,6 +2963,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) {
 	if err := graph.UpdateEdgePolicy(edge2); err != nil {
 		t.Fatalf("unable to update edge: %v", err)
 	}
+	edge2 = copyEdgePolicy(edge2) // Avoid read/write race conditions.
 
 	// checkIndexTimestamps is a helper function that checks the edge update
 	// index only includes the given timestamps.
@@ -4052,6 +4071,7 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) {
 		253, 217, 3, 8, 0, 0, 0, 10, 0, 0, 0, 20,
 	}
 	require.NoError(t, graph.UpdateEdgePolicy(edge1))
+	edge1 = copyEdgePolicy(edge1) // Avoid read/write race conditions.
 
 	directedChan := getSingleChannel()
 	require.NotNil(t, directedChan)
diff --git a/graph/notifications.go b/graph/db/notifications.go
similarity index 91%
rename from graph/notifications.go
rename to graph/db/notifications.go
index 76eabdb02..7d54a7431 100644
--- a/graph/notifications.go
+++ b/graph/db/notifications.go
@@ -1,4 +1,4 @@
-package graph
+package graphdb
 
 import (
 	"fmt"
@@ -54,16 +54,16 @@ type topologyClientUpdate struct {
 // topology occurs. Changes that will be sent at notifications include: new
 // nodes appearing, node updating their attributes, new channels, channels
 // closing, and updates in the routing policies of a channel's directed edges.
-func (b *Builder) SubscribeTopology() (*TopologyClient, error) {
+func (c *ChannelGraph) SubscribeTopology() (*TopologyClient, error) {
 	// If the router is not yet started, return an error to avoid a
 	// deadlock waiting for it to handle the subscription request.
-	if !b.started.Load() {
+	if !c.started.Load() {
 		return nil, fmt.Errorf("router not started")
 	}
 
 	// We'll first atomically obtain the next ID for this client from the
 	// incrementing client ID counter.
-	clientID := b.ntfnClientCounter.Add(1)
+	clientID := c.ntfnClientCounter.Add(1)
 
 	log.Debugf("New graph topology client subscription, client %v",
 		clientID)
 
 	ntfnChan := make(chan *TopologyChange, 10)
 
 	select {
-	case b.ntfnClientUpdates <- &topologyClientUpdate{
+	case c.ntfnClientUpdates <- &topologyClientUpdate{
 		cancel:   false,
 		clientID: clientID,
 		ntfnChan: ntfnChan,
 	}:
-	case <-b.quit:
+	case <-c.quit:
 		return nil, errors.New("ChannelRouter shutting down")
 	}
 
 	return &TopologyClient{
 		TopologyChanges: ntfnChan,
 		Cancel: func() {
 			select {
-			case b.ntfnClientUpdates <- &topologyClientUpdate{
+			case c.ntfnClientUpdates <- &topologyClientUpdate{
 				cancel:   true,
 				clientID: clientID,
 			}:
-			case <-b.quit:
+			case <-c.quit:
 				return
 			}
 		},
@@ -114,7 +114,7 @@ type topologyClient struct {
 
 // notifyTopologyChange notifies all registered clients of a new change in
 // graph topology in a non-blocking.
-func (b *Builder) notifyTopologyChange(topologyDiff *TopologyChange) {
+func (c *ChannelGraph) notifyTopologyChange(topologyDiff *TopologyChange) {
 	// notifyClient is a helper closure that will send topology updates to
 	// the given client.
 	notifyClient := func(clientID uint64, client *topologyClient) bool {
@@ -127,23 +127,22 @@
 			len(topologyDiff.ChannelEdgeUpdates),
 			len(topologyDiff.ClosedChannels))
 
-		go func(c *topologyClient) {
-			defer c.wg.Done()
+		go func(t *topologyClient) {
+			defer t.wg.Done()
 
 			select {
 
 			// In this case we'll try to send the notification
 			// directly to the upstream client consumer.
-			case c.ntfnChan <- topologyDiff:
+			case t.ntfnChan <- topologyDiff:
 
 			// If the client cancels the notifications, then we'll
 			// exit early.
-			case <-c.exit:
+			case <-t.exit:
 
 			// Similarly, if the ChannelRouter itself exists early,
 			// then we'll also exit ourselves.
-			case <-b.quit:
-
+			case <-c.quit:
 			}
 		}(client)
 
@@ -154,7 +153,29 @@
 
 	// Range over the set of active clients, and attempt to send the
 	// topology updates.
-	b.topologyClients.Range(notifyClient)
+	c.topologyClients.Range(notifyClient)
+}
+
+// handleTopologyUpdate is responsible for sending any topology change
+// notifications to registered clients.
+//
+// NOTE: must be run inside goroutine.
+func (c *ChannelGraph) handleTopologyUpdate(update any) {
+	defer c.wg.Done()
+
+	topChange := &TopologyChange{}
+	err := c.addToTopologyChange(topChange, update)
+	if err != nil {
+		log.Errorf("unable to update topology change notification: %v",
+			err)
+		return
+	}
+
+	if topChange.isEmpty() {
+		return
+	}
+
+	c.notifyTopologyChange(topChange)
 }
 
 // TopologyChange represents a new set of modifications to the channel graph.
@@ -310,8 +331,8 @@ type ChannelEdgeUpdate struct {
 // constitutes. This function will also fetch any required auxiliary
 // information required to create the topology change update from the graph
 // database.
-func addToTopologyChange(graph DB, update *TopologyChange,
-	msg interface{}) error {
+func (c *ChannelGraph) addToTopologyChange(update *TopologyChange,
+	msg any) error {
 
 	switch m := msg.(type) {
 
@@ -345,7 +366,7 @@ func addToTopologyChange(graph DB, update *TopologyChange,
 		// We'll need to fetch the edge's information from the database
 		// in order to get the information concerning which nodes are
 		// being connected.
-		edgeInfo, _, _, err := graph.FetchChannelEdgesByID(m.ChannelID)
+		edgeInfo, _, _, err := c.FetchChannelEdgesByID(m.ChannelID)
 		if err != nil {
 			return errors.Errorf("unable fetch channel edge: %v",
 				err)
diff --git a/graph/notifications_test.go b/graph/notifications_test.go
index 807b3fea7..0e2ec7afb 100644
--- a/graph/notifications_test.go
+++ b/graph/notifications_test.go
@@ -469,7 +469,7 @@ func TestEdgeUpdateNotification(t *testing.T) {
 
 	// With the channel edge now in place, we'll subscribe for topology
 	// notifications.
-	ntfnClient, err := ctx.builder.SubscribeTopology()
+	ntfnClient, err := ctx.graph.SubscribeTopology()
 	require.NoError(t, err, "unable to subscribe for channel notifications")
 
 	// Create random policy edges that are stemmed to the channel id
@@ -489,7 +489,8 @@ func TestEdgeUpdateNotification(t *testing.T) {
 		t.Fatalf("unable to add edge update: %v", err)
 	}
 
-	assertEdgeCorrect := func(t *testing.T, edgeUpdate *ChannelEdgeUpdate,
+	assertEdgeCorrect := func(t *testing.T,
+		edgeUpdate *graphdb.ChannelEdgeUpdate,
 		edgeAnn *models.ChannelEdgePolicy) {
 
 		if edgeUpdate.ChanID != edgeAnn.ChannelID {
@@ -659,7 +660,7 @@ func TestNodeUpdateNotification(t *testing.T) {
 	}
 
 	// Create a new client to receive notifications.
-	ntfnClient, err := ctx.builder.SubscribeTopology()
+	ntfnClient, err := ctx.graph.SubscribeTopology()
 	require.NoError(t, err, "unable to subscribe for channel notifications")
 
 	// Change network topology by adding the updated info for the two nodes
@@ -672,7 +673,7 @@
 
 	assertNodeNtfnCorrect := func(t *testing.T, ann *models.LightningNode,
-		nodeUpdate *NetworkNodeUpdate) {
+		nodeUpdate *graphdb.NetworkNodeUpdate) {
 
 		nodeKey, _ := ann.PubKey()
 
@@ -699,9 +700,10 @@
 			t.Fatalf("node alias doesn't match: expected %v, got %v",
 				ann.Alias, nodeUpdate.Alias)
 		}
-		if nodeUpdate.Color != EncodeHexColor(ann.Color) {
-			t.Fatalf("node color doesn't match: expected %v, got %v",
-				EncodeHexColor(ann.Color), nodeUpdate.Color)
+		if nodeUpdate.Color != graphdb.EncodeHexColor(ann.Color) {
+			t.Fatalf("node color doesn't match: expected %v, "+
+				"got %v", graphdb.EncodeHexColor(ann.Color),
+				nodeUpdate.Color)
 		}
 	}
 
@@ -793,7 +795,7 @@ func TestNotificationCancellation(t *testing.T) {
 	ctx := createTestCtxSingleNode(t, startingBlockHeight)
 
 	// Create a new client to receive notifications.
-	ntfnClient, err := ctx.builder.SubscribeTopology()
+	ntfnClient, err := ctx.graph.SubscribeTopology()
 	require.NoError(t, err, "unable to subscribe for channel notifications")
 
 	// We'll create the utxo for a new channel.
@@ -919,7 +921,7 @@ func TestChannelCloseNotification(t *testing.T) {
 
 	// With the channel edge now in place, we'll subscribe for topology
 	// notifications.
-	ntfnClient, err := ctx.builder.SubscribeTopology()
+	ntfnClient, err := ctx.graph.SubscribeTopology()
 	require.NoError(t, err, "unable to subscribe for channel notifications")
 
 	// Next, we'll simulate the closure of our channel by generating a new
@@ -1002,7 +1004,9 @@ func TestEncodeHexColor(t *testing.T) {
 	}
 
 	for _, tc := range colorTestCases {
-		encoded := EncodeHexColor(color.RGBA{tc.R, tc.G, tc.B, 0})
+		encoded := graphdb.EncodeHexColor(
+			color.RGBA{tc.R, tc.G, tc.B, 0},
+		)
 		if (encoded == tc.encoded) != tc.isValid {
 			t.Fatalf("incorrect color encoding, "+
 				"want: %v, got: %v", tc.encoded, encoded)
diff --git a/pilot.go b/pilot.go
index 11333a072..8cbf23cc6 100644
--- a/pilot.go
+++ b/pilot.go
@@ -295,6 +295,6 @@ func initAutoPilot(svr *server, cfg *lncfg.AutoPilot,
 			}, nil
 		},
 		SubscribeTransactions: svr.cc.Wallet.SubscribeTransactions,
-		SubscribeTopology:     svr.graphBuilder.SubscribeTopology,
+		SubscribeTopology:     svr.graphDB.SubscribeTopology,
 	}, nil
 }
diff --git a/rpcserver.go b/rpcserver.go
index 7236a5dad..692ba74d2 100644
--- a/rpcserver.go
+++ b/rpcserver.go
@@ -48,7 +48,6 @@ import (
 	"github.com/lightningnetwork/lnd/feature"
 	"github.com/lightningnetwork/lnd/fn/v2"
 	"github.com/lightningnetwork/lnd/funding"
-	"github.com/lightningnetwork/lnd/graph"
 	graphdb "github.com/lightningnetwork/lnd/graph/db"
 	"github.com/lightningnetwork/lnd/graph/db/models"
 	"github.com/lightningnetwork/lnd/htlcswitch"
@@ -3294,7 +3293,7 @@ func (r *rpcServer) GetInfo(_ context.Context,
 	// TODO(roasbeef): add synced height n stuff
 	isTestNet := chainreg.IsTestnet(&r.cfg.ActiveNetParams)
-	nodeColor := graph.EncodeHexColor(nodeAnn.RGBColor)
+	nodeColor := graphdb.EncodeHexColor(nodeAnn.RGBColor)
 	version := build.Version() + " commit=" + build.Commit
 
 	return &lnrpc.GetInfoResponse{
@@ -6886,7 +6885,7 @@ func marshalNode(node *models.LightningNode) *lnrpc.LightningNode {
 		PubKey:        hex.EncodeToString(node.PubKeyBytes[:]),
 		Addresses:     nodeAddrs,
 		Alias:         node.Alias,
-		Color:         graph.EncodeHexColor(node.Color),
+		Color:         graphdb.EncodeHexColor(node.Color),
 		Features:      features,
 		CustomRecords: customRecords,
 	}
@@ -7084,7 +7083,7 @@ func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription,
 	// First, we start by subscribing to a new intent to receive
 	// notifications from the channel router.
-	client, err := r.server.graphBuilder.SubscribeTopology()
+	client, err := r.server.graphDB.SubscribeTopology()
 	if err != nil {
 		return err
 	}
@@ -7137,7 +7136,7 @@ func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription,
 // returned by the router to the form of notifications expected by the current
 // gRPC service.
 func marshallTopologyChange(
-	topChange *graph.TopologyChange) *lnrpc.GraphTopologyUpdate {
+	topChange *graphdb.TopologyChange) *lnrpc.GraphTopologyUpdate {
 
 	// encodeKey is a simple helper function that converts a live public
 	// key into a hex-encoded version of the compressed serialization for
diff --git a/server.go b/server.go
index dba560bb2..deae1f591 100644
--- a/server.go
+++ b/server.go
@@ -368,7 +368,7 @@ type server struct {
 // updatePersistentPeerAddrs subscribes to topology changes and stores
 // advertised addresses for any NodeAnnouncements from our persisted peers.
 func (s *server) updatePersistentPeerAddrs() error {
-	graphSub, err := s.graphBuilder.SubscribeTopology()
+	graphSub, err := s.graphDB.SubscribeTopology()
 	if err != nil {
 		return err
 	}

From 2614110684aad4013a093ec35bd3a5afc95343f0 Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Wed, 5 Mar 2025 07:58:14 +0200
Subject: [PATCH 3/4] graph/db: refactor to group all topology notification
 fields

A clean-up commit that just separates out all topology-related fields
in ChannelGraph into a dedicated struct which is then embedded in the
ChannelGraph.
---
 graph/db/graph.go         | 30 ++++--------------------------
 graph/db/notifications.go | 35 +++++++++++++++++++++++++++++++++++
 2 files changed, 39 insertions(+), 26 deletions(-)
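[ Note for reviewers, not applied by git am: the refactor relies on Go
struct embedding. Because *topologyManager is embedded, its fields are
promoted onto ChannelGraph, which is why call sites such as
c.topologyUpdate and c.ntfnClientUpdates keep compiling unchanged. A
stripped-down sketch of the pattern, with hypothetical names: ]

	type inner struct {
		updates chan any
	}

	type outer struct {
		*inner // fields of inner are promoted onto outer
	}

	func demo() {
		o := &outer{inner: &inner{updates: make(chan any, 1)}}

		// Promoted field access; equivalent to o.inner.updates.
		o.updates <- struct{}{}
	}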
diff --git a/graph/db/graph.go b/graph/db/graph.go
index fa8a6c29c..9e35e58dd 100644
--- a/graph/db/graph.go
+++ b/graph/db/graph.go
@@ -12,7 +12,6 @@ import (
 	"github.com/lightningnetwork/lnd/batch"
 	"github.com/lightningnetwork/lnd/graph/db/models"
 	"github.com/lightningnetwork/lnd/kvdb"
-	"github.com/lightningnetwork/lnd/lnutils"
 	"github.com/lightningnetwork/lnd/lnwire"
 	"github.com/lightningnetwork/lnd/routing/route"
 )
@@ -50,26 +49,7 @@ type ChannelGraph struct {
 	graphCache *GraphCache
 
 	*KVStore
-
-	// ntfnClientCounter is an atomic counter that's used to assign unique
-	// notification client IDs to new clients.
-	ntfnClientCounter atomic.Uint64
-
-	// topologyUpdate is a channel that carries new topology update
-	// messages from outside the ChannelGraph to be processed by the
-	// networkHandler.
-	topologyUpdate chan any
-
-	// topologyClients maps a client's unique notification ID to a
-	// topologyClient client that contains its notification dispatch
-	// channel.
-	topologyClients *lnutils.SyncMap[uint64, *topologyClient]
-
-	// ntfnClientUpdates is a channel that's used to send new updates to
-	// topology notification clients to the ChannelGraph. Updates either
-	// add a new notification client, or cancel notifications for an
-	// existing client.
-	ntfnClientUpdates chan *topologyClientUpdate
+	*topologyManager
 
 	quit chan struct{}
 	wg   sync.WaitGroup
@@ -90,11 +70,9 @@ func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph,
 	}
 
 	g := &ChannelGraph{
-		KVStore:           store,
-		topologyUpdate:    make(chan any),
-		topologyClients:   &lnutils.SyncMap[uint64, *topologyClient]{},
-		ntfnClientUpdates: make(chan *topologyClientUpdate),
-		quit:              make(chan struct{}),
+		KVStore:         store,
+		topologyManager: newTopologyManager(),
+		quit:            make(chan struct{}),
 	}
 
 	// The graph cache can be turned off (e.g. for mobile users) for a
diff --git a/graph/db/notifications.go b/graph/db/notifications.go
index 7d54a7431..2ed2be16f 100644
--- a/graph/db/notifications.go
+++ b/graph/db/notifications.go
@@ -5,15 +5,50 @@ import (
 	"image/color"
 	"net"
 	"sync"
+	"sync/atomic"
 
 	"github.com/btcsuite/btcd/btcec/v2"
 	"github.com/btcsuite/btcd/btcutil"
 	"github.com/btcsuite/btcd/wire"
 	"github.com/go-errors/errors"
 	"github.com/lightningnetwork/lnd/graph/db/models"
+	"github.com/lightningnetwork/lnd/lnutils"
 	"github.com/lightningnetwork/lnd/lnwire"
 )
 
+// topologyManager holds all the fields required to manage the network topology
+// subscriptions and notifications.
+type topologyManager struct {
+	// ntfnClientCounter is an atomic counter that's used to assign unique
+	// notification client IDs to new clients.
+	ntfnClientCounter atomic.Uint64
+
+	// topologyUpdate is a channel that carries new topology update
+	// messages from outside the ChannelGraph to be processed by the
+	// networkHandler.
+	topologyUpdate chan any
+
+	// topologyClients maps a client's unique notification ID to a
+	// topologyClient client that contains its notification dispatch
+	// channel.
+	topologyClients *lnutils.SyncMap[uint64, *topologyClient]
+
+	// ntfnClientUpdates is a channel that's used to send new updates to
+	// topology notification clients to the ChannelGraph. Updates either
+	// add a new notification client, or cancel notifications for an
+	// existing client.
+	ntfnClientUpdates chan *topologyClientUpdate
+}
+
+// newTopologyManager creates a new instance of the topologyManager.
+func newTopologyManager() *topologyManager {
+	return &topologyManager{
+		topologyUpdate:    make(chan any),
+		topologyClients:   &lnutils.SyncMap[uint64, *topologyClient]{},
+		ntfnClientUpdates: make(chan *topologyClientUpdate),
+	}
+}
+
 // TopologyClient represents an intent to receive notifications from the
 // channel router regarding changes to the topology of the channel graph. The
 // TopologyChanges channel will be sent upon with new updates to the channel

From 220eac2f0f39cdfbf006ba0fdd101cceb496b064 Mon Sep 17 00:00:00 2001
From: Elle Mouton
Date: Wed, 5 Mar 2025 08:01:47 +0200
Subject: [PATCH 4/4] docs: update release notes

---
 docs/release-notes/release-notes-0.19.0.md | 2 --
 docs/release-notes/release-notes-0.20.0.md | 9 +++++++++
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md
index c4efc2d99..118c1fde5 100644
--- a/docs/release-notes/release-notes-0.19.0.md
+++ b/docs/release-notes/release-notes-0.19.0.md
@@ -262,8 +262,6 @@ The underlying functionality between those two options remain the same.
 - [Abstract autopilot access](https://github.com/lightningnetwork/lnd/pull/9480)
 - [Abstract invoicerpc server access](https://github.com/lightningnetwork/lnd/pull/9516)
 - [Refactor to hide DB transactions](https://github.com/lightningnetwork/lnd/pull/9513)
-  - Move the [graph cache out of the graph
-    CRUD](https://github.com/lightningnetwork/lnd/pull/9544) layer.
 
 * [Golang was updated to
   `v1.22.11`](https://github.com/lightningnetwork/lnd/pull/9462).
diff --git a/docs/release-notes/release-notes-0.20.0.md b/docs/release-notes/release-notes-0.20.0.md
index 1ee2e05fa..1e528bc8d 100644
--- a/docs/release-notes/release-notes-0.20.0.md
+++ b/docs/release-notes/release-notes-0.20.0.md
@@ -69,6 +69,15 @@
 
 ## Code Health
 
+* Graph abstraction and refactoring work:
+  - Move the [graph cache out of the graph
+    CRUD](https://github.com/lightningnetwork/lnd/pull/9544) layer.
+  - Move [topology
+    subscription](https://github.com/lightningnetwork/lnd/pull/9577) and
+    notification handling from the graph.Builder to the ChannelGraph.
+
 ## Tooling and Documentation
 
 # Contributors (Alphabetical Order)
+
+* Elle Mouton