Merge pull request #9577 from ellemouton/graph19

graph: move topology subscription to `ChannelGraph`
This commit is contained in:
Elle 2025-03-06 15:41:34 +02:00 committed by GitHub
commit bb4d964149
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 269 additions and 190 deletions

View file

@ -6,7 +6,7 @@ import (
"github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/graph" graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
@ -36,7 +36,7 @@ type ManagerCfg struct {
// SubscribeTopology is used to get a subscription for topology changes // SubscribeTopology is used to get a subscription for topology changes
// on the network. // on the network.
SubscribeTopology func() (*graph.TopologyClient, error) SubscribeTopology func() (*graphdb.TopologyClient, error)
} }
// Manager is struct that manages an autopilot agent, making it possible to // Manager is struct that manages an autopilot agent, making it possible to

View file

@ -262,8 +262,6 @@ The underlying functionality between those two options remain the same.
- [Abstract autopilot access](https://github.com/lightningnetwork/lnd/pull/9480) - [Abstract autopilot access](https://github.com/lightningnetwork/lnd/pull/9480)
- [Abstract invoicerpc server access](https://github.com/lightningnetwork/lnd/pull/9516) - [Abstract invoicerpc server access](https://github.com/lightningnetwork/lnd/pull/9516)
- [Refactor to hide DB transactions](https://github.com/lightningnetwork/lnd/pull/9513) - [Refactor to hide DB transactions](https://github.com/lightningnetwork/lnd/pull/9513)
- Move the [graph cache out of the graph
CRUD](https://github.com/lightningnetwork/lnd/pull/9544) layer.
* [Golang was updated to * [Golang was updated to
`v1.22.11`](https://github.com/lightningnetwork/lnd/pull/9462). `v1.22.11`](https://github.com/lightningnetwork/lnd/pull/9462).

View file

@ -69,6 +69,15 @@
## Code Health ## Code Health
* Graph abstraction and refactoring work:
- Move the [graph cache out of the graph
CRUD](https://github.com/lightningnetwork/lnd/pull/9544) layer.
- Move [topology
subscription](https://github.com/lightningnetwork/lnd/pull/9577) and
notification handling from the graph.Builder to the ChannelGraph.
## Tooling and Documentation ## Tooling and Documentation
# Contributors (Alphabetical Order) # Contributors (Alphabetical Order)
* Elle Mouton

View file

@ -109,7 +109,6 @@ type Builder struct {
started atomic.Bool started atomic.Bool
stopped atomic.Bool stopped atomic.Bool
ntfnClientCounter atomic.Uint64
bestHeight atomic.Uint32 bestHeight atomic.Uint32
cfg *Config cfg *Config
@ -123,22 +122,6 @@ type Builder struct {
// of our currently known best chain are sent over. // of our currently known best chain are sent over.
staleBlocks <-chan *chainview.FilteredBlock staleBlocks <-chan *chainview.FilteredBlock
// topologyUpdates is a channel that carries new topology updates
// messages from outside the Builder to be processed by the
// networkHandler.
topologyUpdates chan any
// topologyClients maps a client's unique notification ID to a
// topologyClient client that contains its notification dispatch
// channel.
topologyClients *lnutils.SyncMap[uint64, *topologyClient]
// ntfnClientUpdates is a channel that's used to send new updates to
// topology notification clients to the Builder. Updates either
// add a new notification client, or cancel notifications for an
// existing client.
ntfnClientUpdates chan *topologyClientUpdate
// channelEdgeMtx is a mutex we use to make sure we process only one // channelEdgeMtx is a mutex we use to make sure we process only one
// ChannelEdgePolicy at a time for a given channelID, to ensure // ChannelEdgePolicy at a time for a given channelID, to ensure
// consistency between the various database accesses. // consistency between the various database accesses.
@ -164,9 +147,6 @@ var _ ChannelGraphSource = (*Builder)(nil)
func NewBuilder(cfg *Config) (*Builder, error) { func NewBuilder(cfg *Config) (*Builder, error) {
return &Builder{ return &Builder{
cfg: cfg, cfg: cfg,
topologyUpdates: make(chan any),
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
ntfnClientUpdates: make(chan *topologyClientUpdate),
channelEdgeMtx: multimutex.NewMutex[uint64](), channelEdgeMtx: multimutex.NewMutex[uint64](),
statTicker: ticker.New(defaultStatInterval), statTicker: ticker.New(defaultStatInterval),
stats: new(builderStats), stats: new(builderStats),
@ -656,28 +636,6 @@ func (b *Builder) pruneZombieChans() error {
return nil return nil
} }
// handleTopologyUpdate is responsible for sending any topology changes
// notifications to registered clients.
//
// NOTE: must be run inside goroutine.
func (b *Builder) handleTopologyUpdate(update any) {
defer b.wg.Done()
topChange := &TopologyChange{}
err := addToTopologyChange(b.cfg.Graph, topChange, update)
if err != nil {
log.Errorf("unable to update topology change notification: %v",
err)
return
}
if topChange.isEmpty() {
return
}
b.notifyTopologyChange(topChange)
}
// networkHandler is the primary goroutine for the Builder. The roles of // networkHandler is the primary goroutine for the Builder. The roles of
// this goroutine include answering queries related to the state of the // this goroutine include answering queries related to the state of the
// network, pruning the graph on new block notification, applying network // network, pruning the graph on new block notification, applying network
@ -701,16 +659,6 @@ func (b *Builder) networkHandler() {
} }
select { select {
// A new fully validated topology update has just arrived.
// We'll notify any registered clients.
case update := <-b.topologyUpdates:
b.wg.Add(1)
go b.handleTopologyUpdate(update)
// TODO(roasbeef): remove all unconnected vertexes
// after N blocks pass with no corresponding
// announcements.
case chainUpdate, ok := <-b.staleBlocks: case chainUpdate, ok := <-b.staleBlocks:
// If the channel has been closed, then this indicates // If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves. // the daemon is shutting down, so we exit ourselves.
@ -783,31 +731,6 @@ func (b *Builder) networkHandler() {
" processed.", chainUpdate.Height) " processed.", chainUpdate.Height)
} }
// A new notification client update has arrived. We're either
// gaining a new client, or cancelling notifications for an
// existing client.
case ntfnUpdate := <-b.ntfnClientUpdates:
clientID := ntfnUpdate.clientID
if ntfnUpdate.cancel {
client, ok := b.topologyClients.LoadAndDelete(
clientID,
)
if ok {
close(client.exit)
client.wg.Wait()
close(client.ntfnChan)
}
continue
}
b.topologyClients.Store(clientID, &topologyClient{
ntfnChan: ntfnUpdate.ntfnChan,
exit: make(chan struct{}),
})
// The graph prune ticker has ticked, so we'll examine the // The graph prune ticker has ticked, so we'll examine the
// state of the known graph to filter out any zombie channels // state of the known graph to filter out any zombie channels
// for pruning. // for pruning.
@ -934,16 +857,6 @@ func (b *Builder) updateGraphWithClosedChannels(
log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash, log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash,
blockHeight, len(chansClosed)) blockHeight, len(chansClosed))
if len(chansClosed) == 0 {
return err
}
// Notify all currently registered clients of the newly closed channels.
closeSummaries := createCloseSummaries(blockHeight, chansClosed...)
b.notifyTopologyChange(&TopologyChange{
ClosedChannels: closeSummaries,
})
return nil return nil
} }
@ -1067,12 +980,6 @@ func (b *Builder) AddNode(node *models.LightningNode,
return err return err
} }
select {
case b.topologyUpdates <- node:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
return nil return nil
} }
@ -1117,12 +1024,6 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
return err return err
} }
select {
case b.topologyUpdates <- edge:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
return nil return nil
} }
@ -1224,12 +1125,6 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
return err return err
} }
select {
case b.topologyUpdates <- update:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
return nil return nil
} }

View file

@ -16,6 +16,10 @@ import (
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
) )
// ErrChanGraphShuttingDown indicates that the ChannelGraph has shutdown or is
// busy shutting down.
var ErrChanGraphShuttingDown = fmt.Errorf("ChannelGraph shutting down")
// Config is a struct that holds all the necessary dependencies for a // Config is a struct that holds all the necessary dependencies for a
// ChannelGraph. // ChannelGraph.
type Config struct { type Config struct {
@ -45,6 +49,7 @@ type ChannelGraph struct {
graphCache *GraphCache graphCache *GraphCache
*KVStore *KVStore
*topologyManager
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -66,6 +71,7 @@ func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph,
g := &ChannelGraph{ g := &ChannelGraph{
KVStore: store, KVStore: store,
topologyManager: newTopologyManager(),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -95,6 +101,9 @@ func (c *ChannelGraph) Start() error {
} }
} }
c.wg.Add(1)
go c.handleTopologySubscriptions()
return nil return nil
} }
@ -113,6 +122,60 @@ func (c *ChannelGraph) Stop() error {
return nil return nil
} }
// handleTopologySubscriptions ensures that topology client subscriptions,
// subscription cancellations and topology notifications are handled
// synchronously.
//
// NOTE: this MUST be run in a goroutine.
func (c *ChannelGraph) handleTopologySubscriptions() {
defer c.wg.Done()
for {
select {
// A new fully validated topology update has just arrived.
// We'll notify any registered clients.
case update := <-c.topologyUpdate:
// TODO(elle): change topology handling to be handled
// synchronously so that we can guarantee the order of
// notification delivery.
c.wg.Add(1)
go c.handleTopologyUpdate(update)
// TODO(roasbeef): remove all unconnected vertexes
// after N blocks pass with no corresponding
// announcements.
// A new notification client update has arrived. We're either
// gaining a new client, or cancelling notifications for an
// existing client.
case ntfnUpdate := <-c.ntfnClientUpdates:
clientID := ntfnUpdate.clientID
if ntfnUpdate.cancel {
client, ok := c.topologyClients.LoadAndDelete(
clientID,
)
if ok {
close(client.exit)
client.wg.Wait()
close(client.ntfnChan)
}
continue
}
c.topologyClients.Store(clientID, &topologyClient{
ntfnChan: ntfnUpdate.ntfnChan,
exit: make(chan struct{}),
})
case <-c.quit:
return
}
}
}
// populateCache loads the entire channel graph into the in-memory graph cache. // populateCache loads the entire channel graph into the in-memory graph cache.
// //
// NOTE: This should only be called if the graphCache has been constructed. // NOTE: This should only be called if the graphCache has been constructed.
@ -234,6 +297,12 @@ func (c *ChannelGraph) AddLightningNode(node *models.LightningNode,
) )
} }
select {
case c.topologyUpdate <- node:
case <-c.quit:
return ErrChanGraphShuttingDown
}
return nil return nil
} }
@ -276,6 +345,12 @@ func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo,
c.graphCache.AddChannel(edge, nil, nil) c.graphCache.AddChannel(edge, nil, nil)
} }
select {
case c.topologyUpdate <- edge:
case <-c.quit:
return ErrChanGraphShuttingDown
}
return nil return nil
} }
@ -411,6 +486,17 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
c.graphCache.Stats()) c.graphCache.Stats())
} }
if len(edges) != 0 {
// Notify all currently registered clients of the newly closed
// channels.
closeSummaries := createCloseSummaries(
blockHeight, edges...,
)
c.notifyTopologyChange(&TopologyChange{
ClosedChannels: closeSummaries,
})
}
return edges, nil return edges, nil
} }
@ -527,16 +613,20 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
return err return err
} }
if c.graphCache == nil { if c.graphCache != nil {
return nil
}
var isUpdate1 bool var isUpdate1 bool
if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 { if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
isUpdate1 = true isUpdate1 = true
} }
c.graphCache.UpdatePolicy(edge, from, to, isUpdate1) c.graphCache.UpdatePolicy(edge, from, to, isUpdate1)
}
select {
case c.topologyUpdate <- edge:
case <-c.quit:
return ErrChanGraphShuttingDown
}
return nil return nil
} }

View file

@ -155,62 +155,70 @@ func TestNodeInsertionAndDeletion(t *testing.T) {
} }
// TestPartialNode checks that we can add and retrieve a LightningNode where // TestPartialNode checks that we can add and retrieve a LightningNode where
// where only the pubkey is known to the database. // only the pubkey is known to the database.
func TestPartialNode(t *testing.T) { func TestPartialNode(t *testing.T) {
t.Parallel() t.Parallel()
graph, err := MakeTestGraph(t) graph, err := MakeTestGraph(t)
require.NoError(t, err, "unable to make test database") require.NoError(t, err, "unable to make test database")
// We want to be able to insert nodes into the graph that only has the // To insert a partial node, we need to add a channel edge that has
// PubKey set. // node keys for nodes we are not yet aware
node := &models.LightningNode{ var node1, node2 models.LightningNode
HaveNodeAnnouncement: false, copy(node1.PubKeyBytes[:], pubKey1Bytes)
PubKeyBytes: testPub, copy(node2.PubKeyBytes[:], pubKey2Bytes)
}
if err := graph.AddLightningNode(node); err != nil { // Create an edge attached to these nodes and add it to the graph.
t.Fatalf("unable to add node: %v", err) edgeInfo, _ := createEdge(140, 0, 0, 0, &node1, &node2)
} require.NoError(t, graph.AddChannelEdge(&edgeInfo))
assertNodeInCache(t, graph, node, nil)
// Next, fetch the node from the database to ensure everything was // Both of the nodes should now be in both the graph (as partial/shell)
// nodes _and_ the cache should also have an awareness of both nodes.
assertNodeInCache(t, graph, &node1, nil)
assertNodeInCache(t, graph, &node2, nil)
// Next, fetch the node2 from the database to ensure everything was
// serialized properly. // serialized properly.
dbNode, err := graph.FetchLightningNode(testPub) dbNode1, err := graph.FetchLightningNode(pubKey1)
require.NoError(t, err, "unable to locate node") require.NoError(t, err)
dbNode2, err := graph.FetchLightningNode(pubKey2)
require.NoError(t, err)
_, exists, err := graph.HasLightningNode(dbNode.PubKeyBytes) _, exists, err := graph.HasLightningNode(dbNode1.PubKeyBytes)
if err != nil { require.NoError(t, err)
t.Fatalf("unable to query for node: %v", err) require.True(t, exists)
} else if !exists {
t.Fatalf("node should be found but wasn't")
}
// The two nodes should match exactly! (with default values for // The two nodes should match exactly! (with default values for
// LastUpdate and db set to satisfy compareNodes()) // LastUpdate and db set to satisfy compareNodes())
node = &models.LightningNode{ expectedNode1 := &models.LightningNode{
HaveNodeAnnouncement: false, HaveNodeAnnouncement: false,
LastUpdate: time.Unix(0, 0), LastUpdate: time.Unix(0, 0),
PubKeyBytes: testPub, PubKeyBytes: pubKey1,
} }
require.NoError(t, compareNodes(dbNode1, expectedNode1))
if err := compareNodes(node, dbNode); err != nil { _, exists, err = graph.HasLightningNode(dbNode2.PubKeyBytes)
t.Fatalf("nodes don't match: %v", err) require.NoError(t, err)
require.True(t, exists)
// The two nodes should match exactly! (with default values for
// LastUpdate and db set to satisfy compareNodes())
expectedNode2 := &models.LightningNode{
HaveNodeAnnouncement: false,
LastUpdate: time.Unix(0, 0),
PubKeyBytes: pubKey2,
} }
require.NoError(t, compareNodes(dbNode2, expectedNode2))
// Next, delete the node from the graph, this should purge all data // Next, delete the node from the graph, this should purge all data
// related to the node. // related to the node.
if err := graph.DeleteLightningNode(testPub); err != nil { require.NoError(t, graph.DeleteLightningNode(pubKey1))
t.Fatalf("unable to delete node: %v", err)
}
assertNodeNotInCache(t, graph, testPub) assertNodeNotInCache(t, graph, testPub)
// Finally, attempt to fetch the node again. This should fail as the // Finally, attempt to fetch the node again. This should fail as the
// node should have been deleted from the database. // node should have been deleted from the database.
_, err = graph.FetchLightningNode(testPub) _, err = graph.FetchLightningNode(testPub)
if err != ErrGraphNodeNotFound { require.ErrorIs(t, err, ErrGraphNodeNotFound)
t.Fatalf("fetch after delete should fail!")
}
} }
func TestAliasLookup(t *testing.T) { func TestAliasLookup(t *testing.T) {
@ -964,6 +972,23 @@ func randEdgePolicy(chanID uint64, db kvdb.Backend) *models.ChannelEdgePolicy {
return newEdgePolicy(chanID, db, update) return newEdgePolicy(chanID, db, update)
} }
func copyEdgePolicy(p *models.ChannelEdgePolicy) *models.ChannelEdgePolicy {
return &models.ChannelEdgePolicy{
SigBytes: p.SigBytes,
ChannelID: p.ChannelID,
LastUpdate: p.LastUpdate,
MessageFlags: p.MessageFlags,
ChannelFlags: p.ChannelFlags,
TimeLockDelta: p.TimeLockDelta,
MinHTLC: p.MinHTLC,
MaxHTLC: p.MaxHTLC,
FeeBaseMSat: p.FeeBaseMSat,
FeeProportionalMillionths: p.FeeProportionalMillionths,
ToNode: p.ToNode,
ExtraOpaqueData: p.ExtraOpaqueData,
}
}
func newEdgePolicy(chanID uint64, db kvdb.Backend, func newEdgePolicy(chanID uint64, db kvdb.Backend,
updateTime int64) *models.ChannelEdgePolicy { updateTime int64) *models.ChannelEdgePolicy {
@ -2929,6 +2954,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) {
if err := graph.UpdateEdgePolicy(edge1); err != nil { if err := graph.UpdateEdgePolicy(edge1); err != nil {
t.Fatalf("unable to update edge: %v", err) t.Fatalf("unable to update edge: %v", err)
} }
edge1 = copyEdgePolicy(edge1) // Avoid read/write race conditions.
edge2 := randEdgePolicy(chanID.ToUint64(), graph.db) edge2 := randEdgePolicy(chanID.ToUint64(), graph.db)
edge2.ChannelFlags = 1 edge2.ChannelFlags = 1
@ -2937,6 +2963,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) {
if err := graph.UpdateEdgePolicy(edge2); err != nil { if err := graph.UpdateEdgePolicy(edge2); err != nil {
t.Fatalf("unable to update edge: %v", err) t.Fatalf("unable to update edge: %v", err)
} }
edge2 = copyEdgePolicy(edge2) // Avoid read/write race conditions.
// checkIndexTimestamps is a helper function that checks the edge update // checkIndexTimestamps is a helper function that checks the edge update
// index only includes the given timestamps. // index only includes the given timestamps.
@ -4044,6 +4071,7 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) {
253, 217, 3, 8, 0, 0, 0, 10, 0, 0, 0, 20, 253, 217, 3, 8, 0, 0, 0, 10, 0, 0, 0, 20,
} }
require.NoError(t, graph.UpdateEdgePolicy(edge1)) require.NoError(t, graph.UpdateEdgePolicy(edge1))
edge1 = copyEdgePolicy(edge1) // Avoid read/write race conditions.
directedChan := getSingleChannel() directedChan := getSingleChannel()
require.NotNil(t, directedChan) require.NotNil(t, directedChan)

View file

@ -1,19 +1,54 @@
package graph package graphdb
import ( import (
"fmt" "fmt"
"image/color" "image/color"
"net" "net"
"sync" "sync"
"sync/atomic"
"github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/go-errors/errors" "github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/graph/db/models"
"github.com/lightningnetwork/lnd/lnutils"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
) )
// topologyManager holds all the fields required to manage the network topology
// subscriptions and notifications.
type topologyManager struct {
// ntfnClientCounter is an atomic counter that's used to assign unique
// notification client IDs to new clients.
ntfnClientCounter atomic.Uint64
// topologyUpdate is a channel that carries new topology updates
// messages from outside the ChannelGraph to be processed by the
// networkHandler.
topologyUpdate chan any
// topologyClients maps a client's unique notification ID to a
// topologyClient client that contains its notification dispatch
// channel.
topologyClients *lnutils.SyncMap[uint64, *topologyClient]
// ntfnClientUpdates is a channel that's used to send new updates to
// topology notification clients to the ChannelGraph. Updates either
// add a new notification client, or cancel notifications for an
// existing client.
ntfnClientUpdates chan *topologyClientUpdate
}
// newTopologyManager creates a new instance of the topologyManager.
func newTopologyManager() *topologyManager {
return &topologyManager{
topologyUpdate: make(chan any),
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
ntfnClientUpdates: make(chan *topologyClientUpdate),
}
}
// TopologyClient represents an intent to receive notifications from the // TopologyClient represents an intent to receive notifications from the
// channel router regarding changes to the topology of the channel graph. The // channel router regarding changes to the topology of the channel graph. The
// TopologyChanges channel will be sent upon with new updates to the channel // TopologyChanges channel will be sent upon with new updates to the channel
@ -54,16 +89,16 @@ type topologyClientUpdate struct {
// topology occurs. Changes that will be sent at notifications include: new // topology occurs. Changes that will be sent at notifications include: new
// nodes appearing, node updating their attributes, new channels, channels // nodes appearing, node updating their attributes, new channels, channels
// closing, and updates in the routing policies of a channel's directed edges. // closing, and updates in the routing policies of a channel's directed edges.
func (b *Builder) SubscribeTopology() (*TopologyClient, error) { func (c *ChannelGraph) SubscribeTopology() (*TopologyClient, error) {
// If the router is not yet started, return an error to avoid a // If the router is not yet started, return an error to avoid a
// deadlock waiting for it to handle the subscription request. // deadlock waiting for it to handle the subscription request.
if !b.started.Load() { if !c.started.Load() {
return nil, fmt.Errorf("router not started") return nil, fmt.Errorf("router not started")
} }
// We'll first atomically obtain the next ID for this client from the // We'll first atomically obtain the next ID for this client from the
// incrementing client ID counter. // incrementing client ID counter.
clientID := b.ntfnClientCounter.Add(1) clientID := c.ntfnClientCounter.Add(1)
log.Debugf("New graph topology client subscription, client %v", log.Debugf("New graph topology client subscription, client %v",
clientID) clientID)
@ -71,12 +106,12 @@ func (b *Builder) SubscribeTopology() (*TopologyClient, error) {
ntfnChan := make(chan *TopologyChange, 10) ntfnChan := make(chan *TopologyChange, 10)
select { select {
case b.ntfnClientUpdates <- &topologyClientUpdate{ case c.ntfnClientUpdates <- &topologyClientUpdate{
cancel: false, cancel: false,
clientID: clientID, clientID: clientID,
ntfnChan: ntfnChan, ntfnChan: ntfnChan,
}: }:
case <-b.quit: case <-c.quit:
return nil, errors.New("ChannelRouter shutting down") return nil, errors.New("ChannelRouter shutting down")
} }
@ -84,11 +119,11 @@ func (b *Builder) SubscribeTopology() (*TopologyClient, error) {
TopologyChanges: ntfnChan, TopologyChanges: ntfnChan,
Cancel: func() { Cancel: func() {
select { select {
case b.ntfnClientUpdates <- &topologyClientUpdate{ case c.ntfnClientUpdates <- &topologyClientUpdate{
cancel: true, cancel: true,
clientID: clientID, clientID: clientID,
}: }:
case <-b.quit: case <-c.quit:
return return
} }
}, },
@ -114,7 +149,7 @@ type topologyClient struct {
// notifyTopologyChange notifies all registered clients of a new change in // notifyTopologyChange notifies all registered clients of a new change in
// graph topology in a non-blocking. // graph topology in a non-blocking.
func (b *Builder) notifyTopologyChange(topologyDiff *TopologyChange) { func (c *ChannelGraph) notifyTopologyChange(topologyDiff *TopologyChange) {
// notifyClient is a helper closure that will send topology updates to // notifyClient is a helper closure that will send topology updates to
// the given client. // the given client.
notifyClient := func(clientID uint64, client *topologyClient) bool { notifyClient := func(clientID uint64, client *topologyClient) bool {
@ -127,23 +162,22 @@ func (b *Builder) notifyTopologyChange(topologyDiff *TopologyChange) {
len(topologyDiff.ChannelEdgeUpdates), len(topologyDiff.ChannelEdgeUpdates),
len(topologyDiff.ClosedChannels)) len(topologyDiff.ClosedChannels))
go func(c *topologyClient) { go func(t *topologyClient) {
defer c.wg.Done() defer t.wg.Done()
select { select {
// In this case we'll try to send the notification // In this case we'll try to send the notification
// directly to the upstream client consumer. // directly to the upstream client consumer.
case c.ntfnChan <- topologyDiff: case t.ntfnChan <- topologyDiff:
// If the client cancels the notifications, then we'll // If the client cancels the notifications, then we'll
// exit early. // exit early.
case <-c.exit: case <-t.exit:
// Similarly, if the ChannelRouter itself exists early, // Similarly, if the ChannelRouter itself exists early,
// then we'll also exit ourselves. // then we'll also exit ourselves.
case <-b.quit: case <-c.quit:
} }
}(client) }(client)
@ -154,7 +188,29 @@ func (b *Builder) notifyTopologyChange(topologyDiff *TopologyChange) {
// Range over the set of active clients, and attempt to send the // Range over the set of active clients, and attempt to send the
// topology updates. // topology updates.
b.topologyClients.Range(notifyClient) c.topologyClients.Range(notifyClient)
}
// handleTopologyUpdate is responsible for sending any topology changes
// notifications to registered clients.
//
// NOTE: must be run inside goroutine.
func (c *ChannelGraph) handleTopologyUpdate(update any) {
defer c.wg.Done()
topChange := &TopologyChange{}
err := c.addToTopologyChange(topChange, update)
if err != nil {
log.Errorf("unable to update topology change notification: %v",
err)
return
}
if topChange.isEmpty() {
return
}
c.notifyTopologyChange(topChange)
} }
// TopologyChange represents a new set of modifications to the channel graph. // TopologyChange represents a new set of modifications to the channel graph.
@ -310,8 +366,8 @@ type ChannelEdgeUpdate struct {
// constitutes. This function will also fetch any required auxiliary // constitutes. This function will also fetch any required auxiliary
// information required to create the topology change update from the graph // information required to create the topology change update from the graph
// database. // database.
func addToTopologyChange(graph DB, update *TopologyChange, func (c *ChannelGraph) addToTopologyChange(update *TopologyChange,
msg interface{}) error { msg any) error {
switch m := msg.(type) { switch m := msg.(type) {
@ -345,7 +401,7 @@ func addToTopologyChange(graph DB, update *TopologyChange,
// We'll need to fetch the edge's information from the database // We'll need to fetch the edge's information from the database
// in order to get the information concerning which nodes are // in order to get the information concerning which nodes are
// being connected. // being connected.
edgeInfo, _, _, err := graph.FetchChannelEdgesByID(m.ChannelID) edgeInfo, _, _, err := c.FetchChannelEdgesByID(m.ChannelID)
if err != nil { if err != nil {
return errors.Errorf("unable fetch channel edge: %v", return errors.Errorf("unable fetch channel edge: %v",
err) err)

View file

@ -469,7 +469,7 @@ func TestEdgeUpdateNotification(t *testing.T) {
// With the channel edge now in place, we'll subscribe for topology // With the channel edge now in place, we'll subscribe for topology
// notifications. // notifications.
ntfnClient, err := ctx.builder.SubscribeTopology() ntfnClient, err := ctx.graph.SubscribeTopology()
require.NoError(t, err, "unable to subscribe for channel notifications") require.NoError(t, err, "unable to subscribe for channel notifications")
// Create random policy edges that are stemmed to the channel id // Create random policy edges that are stemmed to the channel id
@ -489,7 +489,8 @@ func TestEdgeUpdateNotification(t *testing.T) {
t.Fatalf("unable to add edge update: %v", err) t.Fatalf("unable to add edge update: %v", err)
} }
assertEdgeCorrect := func(t *testing.T, edgeUpdate *ChannelEdgeUpdate, assertEdgeCorrect := func(t *testing.T,
edgeUpdate *graphdb.ChannelEdgeUpdate,
edgeAnn *models.ChannelEdgePolicy) { edgeAnn *models.ChannelEdgePolicy) {
if edgeUpdate.ChanID != edgeAnn.ChannelID { if edgeUpdate.ChanID != edgeAnn.ChannelID {
@ -659,7 +660,7 @@ func TestNodeUpdateNotification(t *testing.T) {
} }
// Create a new client to receive notifications. // Create a new client to receive notifications.
ntfnClient, err := ctx.builder.SubscribeTopology() ntfnClient, err := ctx.graph.SubscribeTopology()
require.NoError(t, err, "unable to subscribe for channel notifications") require.NoError(t, err, "unable to subscribe for channel notifications")
// Change network topology by adding the updated info for the two nodes // Change network topology by adding the updated info for the two nodes
@ -672,7 +673,7 @@ func TestNodeUpdateNotification(t *testing.T) {
} }
assertNodeNtfnCorrect := func(t *testing.T, ann *models.LightningNode, assertNodeNtfnCorrect := func(t *testing.T, ann *models.LightningNode,
nodeUpdate *NetworkNodeUpdate) { nodeUpdate *graphdb.NetworkNodeUpdate) {
nodeKey, _ := ann.PubKey() nodeKey, _ := ann.PubKey()
@ -699,9 +700,10 @@ func TestNodeUpdateNotification(t *testing.T) {
t.Fatalf("node alias doesn't match: expected %v, got %v", t.Fatalf("node alias doesn't match: expected %v, got %v",
ann.Alias, nodeUpdate.Alias) ann.Alias, nodeUpdate.Alias)
} }
if nodeUpdate.Color != EncodeHexColor(ann.Color) { if nodeUpdate.Color != graphdb.EncodeHexColor(ann.Color) {
t.Fatalf("node color doesn't match: expected %v, got %v", t.Fatalf("node color doesn't match: expected %v, "+
EncodeHexColor(ann.Color), nodeUpdate.Color) "got %v", graphdb.EncodeHexColor(ann.Color),
nodeUpdate.Color)
} }
} }
@ -793,7 +795,7 @@ func TestNotificationCancellation(t *testing.T) {
ctx := createTestCtxSingleNode(t, startingBlockHeight) ctx := createTestCtxSingleNode(t, startingBlockHeight)
// Create a new client to receive notifications. // Create a new client to receive notifications.
ntfnClient, err := ctx.builder.SubscribeTopology() ntfnClient, err := ctx.graph.SubscribeTopology()
require.NoError(t, err, "unable to subscribe for channel notifications") require.NoError(t, err, "unable to subscribe for channel notifications")
// We'll create the utxo for a new channel. // We'll create the utxo for a new channel.
@ -919,7 +921,7 @@ func TestChannelCloseNotification(t *testing.T) {
// With the channel edge now in place, we'll subscribe for topology // With the channel edge now in place, we'll subscribe for topology
// notifications. // notifications.
ntfnClient, err := ctx.builder.SubscribeTopology() ntfnClient, err := ctx.graph.SubscribeTopology()
require.NoError(t, err, "unable to subscribe for channel notifications") require.NoError(t, err, "unable to subscribe for channel notifications")
// Next, we'll simulate the closure of our channel by generating a new // Next, we'll simulate the closure of our channel by generating a new
@ -1002,7 +1004,9 @@ func TestEncodeHexColor(t *testing.T) {
} }
for _, tc := range colorTestCases { for _, tc := range colorTestCases {
encoded := EncodeHexColor(color.RGBA{tc.R, tc.G, tc.B, 0}) encoded := graphdb.EncodeHexColor(
color.RGBA{tc.R, tc.G, tc.B, 0},
)
if (encoded == tc.encoded) != tc.isValid { if (encoded == tc.encoded) != tc.isValid {
t.Fatalf("incorrect color encoding, "+ t.Fatalf("incorrect color encoding, "+
"want: %v, got: %v", tc.encoded, encoded) "want: %v, got: %v", tc.encoded, encoded)

View file

@ -295,6 +295,6 @@ func initAutoPilot(svr *server, cfg *lncfg.AutoPilot,
}, nil }, nil
}, },
SubscribeTransactions: svr.cc.Wallet.SubscribeTransactions, SubscribeTransactions: svr.cc.Wallet.SubscribeTransactions,
SubscribeTopology: svr.graphBuilder.SubscribeTopology, SubscribeTopology: svr.graphDB.SubscribeTopology,
}, nil }, nil
} }

View file

@ -48,7 +48,6 @@ import (
"github.com/lightningnetwork/lnd/feature" "github.com/lightningnetwork/lnd/feature"
"github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/funding" "github.com/lightningnetwork/lnd/funding"
"github.com/lightningnetwork/lnd/graph"
graphdb "github.com/lightningnetwork/lnd/graph/db" graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/graph/db/models"
"github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/htlcswitch"
@ -3294,7 +3293,7 @@ func (r *rpcServer) GetInfo(_ context.Context,
// TODO(roasbeef): add synced height n stuff // TODO(roasbeef): add synced height n stuff
isTestNet := chainreg.IsTestnet(&r.cfg.ActiveNetParams) isTestNet := chainreg.IsTestnet(&r.cfg.ActiveNetParams)
nodeColor := graph.EncodeHexColor(nodeAnn.RGBColor) nodeColor := graphdb.EncodeHexColor(nodeAnn.RGBColor)
version := build.Version() + " commit=" + build.Commit version := build.Version() + " commit=" + build.Commit
return &lnrpc.GetInfoResponse{ return &lnrpc.GetInfoResponse{
@ -6886,7 +6885,7 @@ func marshalNode(node *models.LightningNode) *lnrpc.LightningNode {
PubKey: hex.EncodeToString(node.PubKeyBytes[:]), PubKey: hex.EncodeToString(node.PubKeyBytes[:]),
Addresses: nodeAddrs, Addresses: nodeAddrs,
Alias: node.Alias, Alias: node.Alias,
Color: graph.EncodeHexColor(node.Color), Color: graphdb.EncodeHexColor(node.Color),
Features: features, Features: features,
CustomRecords: customRecords, CustomRecords: customRecords,
} }
@ -7084,7 +7083,7 @@ func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription,
// First, we start by subscribing to a new intent to receive // First, we start by subscribing to a new intent to receive
// notifications from the channel router. // notifications from the channel router.
client, err := r.server.graphBuilder.SubscribeTopology() client, err := r.server.graphDB.SubscribeTopology()
if err != nil { if err != nil {
return err return err
} }
@ -7137,7 +7136,7 @@ func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription,
// returned by the router to the form of notifications expected by the current // returned by the router to the form of notifications expected by the current
// gRPC service. // gRPC service.
func marshallTopologyChange( func marshallTopologyChange(
topChange *graph.TopologyChange) *lnrpc.GraphTopologyUpdate { topChange *graphdb.TopologyChange) *lnrpc.GraphTopologyUpdate {
// encodeKey is a simple helper function that converts a live public // encodeKey is a simple helper function that converts a live public
// key into a hex-encoded version of the compressed serialization for // key into a hex-encoded version of the compressed serialization for

View file

@ -368,7 +368,7 @@ type server struct {
// updatePersistentPeerAddrs subscribes to topology changes and stores // updatePersistentPeerAddrs subscribes to topology changes and stores
// advertised addresses for any NodeAnnouncements from our persisted peers. // advertised addresses for any NodeAnnouncements from our persisted peers.
func (s *server) updatePersistentPeerAddrs() error { func (s *server) updatePersistentPeerAddrs() error {
graphSub, err := s.graphBuilder.SubscribeTopology() graphSub, err := s.graphDB.SubscribeTopology()
if err != nil { if err != nil {
return err return err
} }