multi: let ForEachChannel be a method on ChannelGraph

Having a `ForEachChannel` method on the `LightningNode` struct itself
results in a `kvdb.Backend` object needing to be stored within the
LightningNode struct. In this commit, this method is replaced with a
`ForEachNodeChannel` method on the `ChannelGraph` struct will perform
the same function without needing the db pointer to be stored within the
LightningNode. This change, the LightningNode struct more closely
represents the schema on disk.

The existing `ForEachNodeChannel` method on `ChannelGraph` is renamed to
`ForEachNodeDirectedChannel`. It performs a slightly different function
since it's call-back operates on Cached policies.
This commit is contained in:
Elle Mouton 2023-10-23 13:44:40 +02:00
parent dacb86fdbc
commit fa7c1e250b
No known key found for this signature in database
GPG Key ID: D7D916376026F177
7 changed files with 207 additions and 171 deletions

View File

@ -53,6 +53,8 @@ func ChannelGraphFromDatabase(db *channeldb.ChannelGraph) ChannelGraph {
// channeldb.LightningNode. The wrapper method implement the autopilot.Node
// interface.
type dbNode struct {
db *channeldb.ChannelGraph
tx kvdb.RTx
node *channeldb.LightningNode
@ -86,31 +88,36 @@ func (d dbNode) Addrs() []net.Addr {
//
// NOTE: Part of the autopilot.Node interface.
func (d dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
return d.node.ForEachChannel(d.tx, func(tx kvdb.RTx,
ei *channeldb.ChannelEdgeInfo, ep, _ *channeldb.ChannelEdgePolicy) error {
return d.db.ForEachNodeChannel(d.tx, d.node.PubKeyBytes,
func(tx kvdb.RTx, ei *channeldb.ChannelEdgeInfo, ep,
_ *channeldb.ChannelEdgePolicy) error {
// Skip channels for which no outgoing edge policy is available.
//
// TODO(joostjager): Ideally the case where channels have a nil
// policy should be supported, as autopilot is not looking at
// the policies. For now, it is not easily possible to get a
// reference to the other end LightningNode object without
// retrieving the policy.
if ep == nil {
return nil
}
// Skip channels for which no outgoing edge policy is
// available.
//
// TODO(joostjager): Ideally the case where channels
// have a nil policy should be supported, as autopilot
// is not looking at the policies. For now, it is not
// easily possible to get a reference to the other end
// LightningNode object without retrieving the policy.
if ep == nil {
return nil
}
edge := ChannelEdge{
ChanID: lnwire.NewShortChanIDFromInt(ep.ChannelID),
Capacity: ei.Capacity,
Peer: dbNode{
tx: tx,
node: ep.Node,
},
}
edge := ChannelEdge{
ChanID: lnwire.NewShortChanIDFromInt(
ep.ChannelID,
),
Capacity: ei.Capacity,
Peer: dbNode{
tx: tx,
db: d.db,
node: ep.Node,
},
}
return cb(edge)
})
return cb(edge)
})
}
// ForEachNode is a higher-order function that should be called once for each
@ -128,6 +135,7 @@ func (d *databaseChannelGraph) ForEachNode(cb func(Node) error) error {
}
node := dbNode{
db: d.db,
tx: tx,
node: n,
}
@ -266,6 +274,7 @@ func (d *databaseChannelGraph) addRandChannel(node1, node2 *btcec.PublicKey,
ChanID: chanID,
Capacity: capacity,
Peer: dbNode{
db: d.db,
node: vertex1,
},
},
@ -273,6 +282,7 @@ func (d *databaseChannelGraph) addRandChannel(node1, node2 *btcec.PublicKey,
ChanID: chanID,
Capacity: capacity,
Peer: dbNode{
db: d.db,
node: vertex2,
},
},

View File

@ -459,16 +459,14 @@ func (c *ChannelGraph) ForEachChannel(cb func(*ChannelEdgeInfo,
}, func() {})
}
// ForEachNodeChannel iterates through all channels of a given node, executing the
// passed callback with an edge info structure and the policies of each end
// of the channel. The first edge policy is the outgoing edge *to* the
// connecting node, while the second is the incoming edge *from* the
// connecting node. If the callback returns an error, then the iteration is
// halted with the error propagated back up to the caller.
// ForEachNodeDirectedChannel iterates through all channels of a given node,
// executing the passed callback on the directed edge representing the channel
// and its incoming policy. If the callback returns an error, then the iteration
// is halted with the error propagated back up to the caller.
//
// Unknown policies are passed into the callback as nil values.
func (c *ChannelGraph) ForEachNodeChannel(tx kvdb.RTx, node route.Vertex,
cb func(channel *DirectedChannel) error) error {
func (c *ChannelGraph) ForEachNodeDirectedChannel(tx kvdb.RTx,
node route.Vertex, cb func(channel *DirectedChannel) error) error {
if c.graphCache != nil {
return c.graphCache.ForEachChannel(node, cb)
@ -557,44 +555,49 @@ func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
return c.ForEachNode(func(tx kvdb.RTx, node *LightningNode) error {
channels := make(map[uint64]*DirectedChannel)
err := node.ForEachChannel(tx, func(tx kvdb.RTx,
e *ChannelEdgeInfo, p1 *ChannelEdgePolicy,
p2 *ChannelEdgePolicy) error {
err := c.ForEachNodeChannel(tx, node.PubKeyBytes,
func(tx kvdb.RTx, e *ChannelEdgeInfo,
p1 *ChannelEdgePolicy,
p2 *ChannelEdgePolicy) error {
toNodeCallback := func() route.Vertex {
return node.PubKeyBytes
}
toNodeFeatures, err := c.FetchNodeFeatures(
node.PubKeyBytes,
)
if err != nil {
return err
}
toNodeCallback := func() route.Vertex {
return node.PubKeyBytes
}
toNodeFeatures, err := c.FetchNodeFeatures(
node.PubKeyBytes,
)
if err != nil {
return err
}
var cachedInPolicy *CachedEdgePolicy
if p2 != nil {
cachedInPolicy := NewCachedPolicy(p2)
cachedInPolicy.ToNodePubKey = toNodeCallback
cachedInPolicy.ToNodeFeatures = toNodeFeatures
}
var cachedInPolicy *CachedEdgePolicy
if p2 != nil {
cachedInPolicy := NewCachedPolicy(p2)
cachedInPolicy.ToNodePubKey =
toNodeCallback
cachedInPolicy.ToNodeFeatures =
toNodeFeatures
}
directedChannel := &DirectedChannel{
ChannelID: e.ChannelID,
IsNode1: node.PubKeyBytes == e.NodeKey1Bytes,
OtherNode: e.NodeKey2Bytes,
Capacity: e.Capacity,
OutPolicySet: p1 != nil,
InPolicy: cachedInPolicy,
}
directedChannel := &DirectedChannel{
ChannelID: e.ChannelID,
IsNode1: node.PubKeyBytes ==
e.NodeKey1Bytes,
OtherNode: e.NodeKey2Bytes,
Capacity: e.Capacity,
OutPolicySet: p1 != nil,
InPolicy: cachedInPolicy,
}
if node.PubKeyBytes == e.NodeKey2Bytes {
directedChannel.OtherNode = e.NodeKey1Bytes
}
if node.PubKeyBytes == e.NodeKey2Bytes {
directedChannel.OtherNode =
e.NodeKey1Bytes
}
channels[e.ChannelID] = directedChannel
channels[e.ChannelID] = directedChannel
return nil
})
return nil
})
if err != nil {
return err
}
@ -2740,15 +2743,18 @@ func (l *LightningNode) NodeAnnouncement(signed bool) (*lnwire.NodeAnnouncement,
// isPublic determines whether the node is seen as public within the graph from
// the source node's point of view. An existing database transaction can also be
// specified.
func (l *LightningNode) isPublic(tx kvdb.RTx, sourcePubKey []byte) (bool, error) {
func (c *ChannelGraph) isPublic(tx kvdb.RTx, nodePub route.Vertex,
sourcePubKey []byte) (bool, error) {
// In order to determine whether this node is publicly advertised within
// the graph, we'll need to look at all of its edges and check whether
// they extend to any other node than the source node. errDone will be
// used to terminate the check early.
nodeIsPublic := false
errDone := errors.New("done")
err := l.ForEachChannel(tx, func(_ kvdb.RTx, info *ChannelEdgeInfo,
_, _ *ChannelEdgePolicy) error {
err := c.ForEachNodeChannel(tx, nodePub, func(tx kvdb.RTx,
info *ChannelEdgeInfo, _ *ChannelEdgePolicy,
_ *ChannelEdgePolicy) error {
// If this edge doesn't extend to the source node, we'll
// terminate our search as we can now conclude that the node is
@ -3003,10 +3009,10 @@ func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend,
return traversal(tx)
}
// ForEachChannel iterates through all channels of this node, executing the
// passed callback with an edge info structure and the policies of each end
// of the channel. The first edge policy is the outgoing edge *to* the
// connecting node, while the second is the incoming edge *from* the
// ForEachNodeChannel iterates through all channels of the given node,
// executing the passed callback with an edge info structure and the policies
// of each end of the channel. The first edge policy is the outgoing edge *to*
// the connecting node, while the second is the incoming edge *from* the
// connecting node. If the callback returns an error, then the iteration is
// halted with the error propagated back up to the caller.
//
@ -3016,14 +3022,11 @@ func nodeTraversal(tx kvdb.RTx, nodePub []byte, db kvdb.Backend,
// should be passed as the first argument. Otherwise the first argument should
// be nil and a fresh transaction will be created to execute the graph
// traversal.
func (l *LightningNode) ForEachChannel(tx kvdb.RTx,
func (c *ChannelGraph) ForEachNodeChannel(tx kvdb.RTx, nodePub route.Vertex,
cb func(kvdb.RTx, *ChannelEdgeInfo, *ChannelEdgePolicy,
*ChannelEdgePolicy) error) error {
nodePub := l.PubKeyBytes[:]
db := l.db
return nodeTraversal(tx, nodePub, db, cb)
return nodeTraversal(tx, nodePub[:], c.db, cb)
}
// ChannelEdgeInfo represents a fully authenticated channel along with all its
@ -3718,7 +3721,7 @@ func (c *ChannelGraph) IsPublicNode(pubKey [33]byte) (bool, error) {
return err
}
nodeIsPublic, err = node.isPublic(tx, ourPubKey)
nodeIsPublic, err = c.isPublic(tx, node.PubKeyBytes, ourPubKey)
return err
}, func() {
nodeIsPublic = false

View File

@ -1056,30 +1056,39 @@ func TestGraphTraversal(t *testing.T) {
// outgoing channels for a particular node.
numNodeChans := 0
firstNode, secondNode := nodeList[0], nodeList[1]
err = firstNode.ForEachChannel(nil, func(_ kvdb.RTx, _ *ChannelEdgeInfo,
outEdge, inEdge *ChannelEdgePolicy) error {
err = graph.ForEachNodeChannel(nil, firstNode.PubKeyBytes,
func(_ kvdb.RTx, _ *ChannelEdgeInfo, outEdge,
inEdge *ChannelEdgePolicy) error {
// All channels between first and second node should have fully
// (both sides) specified policies.
if inEdge == nil || outEdge == nil {
return fmt.Errorf("channel policy not present")
}
// All channels between first and second node should
// have fully (both sides) specified policies.
if inEdge == nil || outEdge == nil {
return fmt.Errorf("channel policy not present")
}
// Each should indicate that it's outgoing (pointed
// towards the second node).
if !bytes.Equal(outEdge.Node.PubKeyBytes[:], secondNode.PubKeyBytes[:]) {
return fmt.Errorf("wrong outgoing edge")
}
// Each should indicate that it's outgoing (pointed
// towards the second node).
if !bytes.Equal(
outEdge.Node.PubKeyBytes[:],
secondNode.PubKeyBytes[:],
) {
// The incoming edge should also indicate that it's pointing to
// the origin node.
if !bytes.Equal(inEdge.Node.PubKeyBytes[:], firstNode.PubKeyBytes[:]) {
return fmt.Errorf("wrong outgoing edge")
}
return fmt.Errorf("wrong outgoing edge")
}
numNodeChans++
return nil
})
// The incoming edge should also indicate that it's
// pointing to the origin node.
if !bytes.Equal(
inEdge.Node.PubKeyBytes[:],
firstNode.PubKeyBytes[:],
) {
return fmt.Errorf("wrong outgoing edge")
}
numNodeChans++
return nil
})
require.NoError(t, err)
require.Equal(t, numChannels, numNodeChans)
}
@ -2280,29 +2289,30 @@ func TestIncompleteChannelPolicies(t *testing.T) {
// Ensure that channel is reported with unknown policies.
checkPolicies := func(node *LightningNode, expectedIn, expectedOut bool) {
calls := 0
err := node.ForEachChannel(nil, func(_ kvdb.RTx, _ *ChannelEdgeInfo,
outEdge, inEdge *ChannelEdgePolicy) error {
err := graph.ForEachNodeChannel(nil, node.PubKeyBytes,
func(_ kvdb.RTx, _ *ChannelEdgeInfo, outEdge,
inEdge *ChannelEdgePolicy) error {
if !expectedOut && outEdge != nil {
t.Fatalf("Expected no outgoing policy")
}
if !expectedOut && outEdge != nil {
t.Fatalf("Expected no outgoing policy")
}
if expectedOut && outEdge == nil {
t.Fatalf("Expected an outgoing policy")
}
if expectedOut && outEdge == nil {
t.Fatalf("Expected an outgoing policy")
}
if !expectedIn && inEdge != nil {
t.Fatalf("Expected no incoming policy")
}
if !expectedIn && inEdge != nil {
t.Fatalf("Expected no incoming policy")
}
if expectedIn && inEdge == nil {
t.Fatalf("Expected an incoming policy")
}
if expectedIn && inEdge == nil {
t.Fatalf("Expected an incoming policy")
}
calls++
calls++
return nil
})
return nil
})
if err != nil {
t.Fatalf("unable to scan channels: %v", err)
}
@ -3470,8 +3480,8 @@ func BenchmarkForEachChannel(b *testing.B) {
}
}
// TestGraphCacheForEachNodeChannel tests that the ForEachNodeChannel method
// works as expected, and is able to handle nil self edges.
// TestGraphCacheForEachNodeChannel tests that the ForEachNodeDirectedChannel
// method works as expected, and is able to handle nil self edges.
func TestGraphCacheForEachNodeChannel(t *testing.T) {
graph, err := MakeTestGraph(t)
require.NoError(t, err)
@ -3498,11 +3508,13 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) {
// We should be able to accumulate the single channel added, even
// though we have a nil edge policy here.
var numChans int
err = graph.ForEachNodeChannel(nil, node1.PubKeyBytes,
func(channel *DirectedChannel) error {
err = graph.ForEachNodeDirectedChannel(nil, node1.PubKeyBytes,
func(_ *DirectedChannel) error {
numChans++
return nil
})
},
)
require.NoError(t, err)
require.Equal(t, numChans, 1)

View File

@ -75,7 +75,7 @@ func (g *CachedGraph) Close() error {
func (g *CachedGraph) forEachNodeChannel(nodePub route.Vertex,
cb func(channel *channeldb.DirectedChannel) error) error {
return g.graph.ForEachNodeChannel(g.tx, nodePub, cb)
return g.graph.ForEachNodeDirectedChannel(g.tx, nodePub, cb)
}
// sourceNode returns the source node of the graph.

View File

@ -2824,16 +2824,19 @@ func (r *ChannelRouter) ForEachNode(
func (r *ChannelRouter) ForAllOutgoingChannels(cb func(kvdb.RTx,
*channeldb.ChannelEdgeInfo, *channeldb.ChannelEdgePolicy) error) error {
return r.selfNode.ForEachChannel(nil, func(tx kvdb.RTx,
c *channeldb.ChannelEdgeInfo,
e, _ *channeldb.ChannelEdgePolicy) error {
return r.cfg.Graph.ForEachNodeChannel(nil, r.selfNode.PubKeyBytes,
func(tx kvdb.RTx, c *channeldb.ChannelEdgeInfo,
e *channeldb.ChannelEdgePolicy,
_ *channeldb.ChannelEdgePolicy) error {
if e == nil {
return fmt.Errorf("channel from self node has no policy")
}
if e == nil {
return fmt.Errorf("channel from self node " +
"has no policy")
}
return cb(tx, c, e)
})
return cb(tx, c, e)
},
)
}
// AddProof updates the channel edge info with proof which is needed to

View File

@ -6151,30 +6151,33 @@ func (r *rpcServer) GetNodeInfo(ctx context.Context,
channels []*lnrpc.ChannelEdge
)
if err := node.ForEachChannel(nil, func(_ kvdb.RTx,
edge *channeldb.ChannelEdgeInfo,
c1, c2 *channeldb.ChannelEdgePolicy) error {
err = graph.ForEachNodeChannel(nil, node.PubKeyBytes,
func(_ kvdb.RTx, edge *channeldb.ChannelEdgeInfo,
c1, c2 *channeldb.ChannelEdgePolicy) error {
numChannels++
totalCapacity += edge.Capacity
numChannels++
totalCapacity += edge.Capacity
// Only populate the node's channels if the user requested them.
if in.IncludeChannels {
// Do not include unannounced channels - private
// channels or public channels whose authentication
// proof were not confirmed yet.
if edge.AuthProof == nil {
return nil
// Only populate the node's channels if the user
// requested them.
if in.IncludeChannels {
// Do not include unannounced channels - private
// channels or public channels whose
// authentication proof were not confirmed yet.
if edge.AuthProof == nil {
return nil
}
// Convert the database's edge format into the
// network/RPC edge format.
channelEdge := marshalDbEdge(edge, c1, c2)
channels = append(channels, channelEdge)
}
// Convert the database's edge format into the
// network/RPC edge format.
channelEdge := marshalDbEdge(edge, c1, c2)
channels = append(channels, channelEdge)
}
return nil
}); err != nil {
return nil
},
)
if err != nil {
return nil, err
}
@ -6763,34 +6766,39 @@ func (r *rpcServer) FeeReport(ctx context.Context,
}
var feeReports []*lnrpc.ChannelFeeReport
err = selfNode.ForEachChannel(nil, func(_ kvdb.RTx, chanInfo *channeldb.ChannelEdgeInfo,
edgePolicy, _ *channeldb.ChannelEdgePolicy) error {
err = channelGraph.ForEachNodeChannel(nil, selfNode.PubKeyBytes,
func(_ kvdb.RTx, chanInfo *channeldb.ChannelEdgeInfo,
edgePolicy, _ *channeldb.ChannelEdgePolicy) error {
// Self node should always have policies for its channels.
if edgePolicy == nil {
return fmt.Errorf("no policy for outgoing channel %v ",
chanInfo.ChannelID)
}
// Self node should always have policies for its
// channels.
if edgePolicy == nil {
return fmt.Errorf("no policy for outgoing "+
"channel %v ", chanInfo.ChannelID)
}
// We'll compute the effective fee rate by converting from a
// fixed point fee rate to a floating point fee rate. The fee
// rate field in the database the amount of mSAT charged per
// 1mil mSAT sent, so will divide by this to get the proper fee
// rate.
feeRateFixedPoint := edgePolicy.FeeProportionalMillionths
feeRate := float64(feeRateFixedPoint) / feeBase
// We'll compute the effective fee rate by converting
// from a fixed point fee rate to a floating point fee
// rate. The fee rate field in the database the amount
// of mSAT charged per 1mil mSAT sent, so will divide by
// this to get the proper fee rate.
feeRateFixedPoint :=
edgePolicy.FeeProportionalMillionths
feeRate := float64(feeRateFixedPoint) / feeBase
// TODO(roasbeef): also add stats for revenue for each channel
feeReports = append(feeReports, &lnrpc.ChannelFeeReport{
ChanId: chanInfo.ChannelID,
ChannelPoint: chanInfo.ChannelPoint.String(),
BaseFeeMsat: int64(edgePolicy.FeeBaseMSat),
FeePerMil: int64(feeRateFixedPoint),
FeeRate: feeRate,
})
// TODO(roasbeef): also add stats for revenue for each
// channel
feeReports = append(feeReports, &lnrpc.ChannelFeeReport{
ChanId: chanInfo.ChannelID,
ChannelPoint: chanInfo.ChannelPoint.String(),
BaseFeeMsat: int64(edgePolicy.FeeBaseMSat),
FeePerMil: int64(feeRateFixedPoint),
FeeRate: feeRate,
})
return nil
})
return nil
},
)
if err != nil {
return nil, err
}

View File

@ -3092,7 +3092,7 @@ func (s *server) establishPersistentConnections() error {
// TODO(roasbeef): instead iterate over link nodes and query graph for
// each of the nodes.
selfPub := s.identityECDH.PubKey().SerializeCompressed()
err = sourceNode.ForEachChannel(nil, func(
err = s.graphDB.ForEachNodeChannel(nil, sourceNode.PubKeyBytes, func(
tx kvdb.RTx,
chanInfo *channeldb.ChannelEdgeInfo,
policy, _ *channeldb.ChannelEdgePolicy) error {