Merge pull request #5873 from Roasbeef/rpc-graph-cache

channeldb+rpc: optimize graph related RPC calls
Olaoluwa Osuntokun 2021-11-04 17:38:28 -07:00 committed by GitHub
commit 5404fe82c1
9 changed files with 206 additions and 9 deletions


@@ -458,6 +458,72 @@ func (c *ChannelGraph) FetchNodeFeatures(
}
}
// ForEachNodeCached is similar to ForEachNode, but it utilizes the channel
// graph cache instead. Note that this doesn't return all the information the
// regular ForEachNode method does.
//
// NOTE: The callback contents MUST NOT be modified.
func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
chans map[uint64]*DirectedChannel) error) error {
if c.graphCache != nil {
return c.graphCache.ForEachNode(cb)
}
// Otherwise, fall back to a version that uses the database directly.
// We'll iterate over each node, then the set of channels for each
// node, constructing the same callback signature that the cached
// version above expects.
return c.ForEachNode(func(tx kvdb.RTx, node *LightningNode) error {
channels := make(map[uint64]*DirectedChannel)
err := node.ForEachChannel(tx, func(tx kvdb.RTx,
e *ChannelEdgeInfo, p1 *ChannelEdgePolicy,
p2 *ChannelEdgePolicy) error {
toNodeCallback := func() route.Vertex {
return node.PubKeyBytes
}
toNodeFeatures, err := c.FetchNodeFeatures(
node.PubKeyBytes,
)
if err != nil {
return err
}
var cachedInPolicy *CachedEdgePolicy
if p2 != nil {
cachedInPolicy = NewCachedPolicy(p2)
cachedInPolicy.ToNodePubKey = toNodeCallback
cachedInPolicy.ToNodeFeatures = toNodeFeatures
}
directedChannel := &DirectedChannel{
ChannelID: e.ChannelID,
IsNode1: node.PubKeyBytes == e.NodeKey1Bytes,
OtherNode: e.NodeKey2Bytes,
Capacity: e.Capacity,
OutPolicySet: p1 != nil,
InPolicy: cachedInPolicy,
}
if node.PubKeyBytes == e.NodeKey2Bytes {
directedChannel.OtherNode = e.NodeKey1Bytes
}
channels[e.ChannelID] = directedChannel
return nil
})
if err != nil {
return err
}
return cb(node.PubKeyBytes, channels)
})
}
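
For illustration, here is a minimal sketch of how a caller outside the package might consume ForEachNodeCached, e.g. to tally the channel count per node. The countNodeChannels helper and its package name are hypothetical, not part of this PR:

package graphutil

import (
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/routing/route"
)

// countNodeChannels tallies the number of channels per node using the
// cache-aware iterator (illustrative helper only).
func countNodeChannels(graph *channeldb.ChannelGraph) (map[route.Vertex]int,
	error) {

	counts := make(map[route.Vertex]int)
	err := graph.ForEachNodeCached(func(node route.Vertex,
		chans map[uint64]*channeldb.DirectedChannel) error {

		// Read only: the callback contents MUST NOT be modified.
		counts[node] = len(chans)
		return nil
	})
	if err != nil {
		return nil, err
	}

	return counts, nil
}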
// DisabledChannelIDs returns the channel ids of disabled channels.
// A channel is disabled when both of the associated ChannelEdgePolicies
// have their disabled bit on.


@@ -447,6 +447,30 @@ func (c *GraphCache) ForEachChannel(node route.Vertex,
return nil
}
// ForEachNode iterates over the adjacency list of the graph, executing the
// callback for each node and the set of channels that emanate from the
// given node.
//
// NOTE: This method should be considered _read only_; the channels or
// nodes passed in MUST NOT be modified.
func (c *GraphCache) ForEachNode(cb func(node route.Vertex,
channels map[uint64]*DirectedChannel) error) error {
c.mtx.RLock()
defer c.mtx.RUnlock()
for node, channels := range c.nodeChannels {
// We don't make a copy here since this is a read-only RPC
// call. We also don't need the node features for this call.
if err := cb(node, channels); err != nil {
return err
}
}
return nil
}
// GetFeatures returns the features of the node with the given ID. If no
// features are known for the node, an empty feature vector is returned.
func (c *GraphCache) GetFeatures(node route.Vertex) *lnwire.FeatureVector {


@@ -120,6 +120,24 @@ func TestGraphCacheAddNode(t *testing.T) {
require.Equal(t, inPolicy1 != nil, toChannels[0].OutPolicySet)
assertCachedPolicyEqual(t, outPolicy1, toChannels[0].InPolicy)
// Now that we've inserted two nodes into the graph, check that
// we'll recover the same set of channels during ForEachNode.
nodes := make(map[route.Vertex]struct{})
chans := make(map[uint64]struct{})
_ = cache.ForEachNode(func(node route.Vertex,
edges map[uint64]*DirectedChannel) error {
nodes[node] = struct{}{}
for chanID := range edges {
chans[chanID] = struct{}{}
}
return nil
})
require.Len(t, nodes, 2)
require.Len(t, chans, 1)
}
runTest(pubKey1, pubKey2)


@@ -1103,6 +1103,34 @@ func TestGraphTraversal(t *testing.T) {
const numChannels = 5
chanIndex, nodeList := fillTestGraph(t, graph, numNodes, numChannels)
// Make an index of the node list for easy look up below.
nodeIndex := make(map[route.Vertex]struct{})
for _, node := range nodeList {
nodeIndex[node.PubKeyBytes] = struct{}{}
}
// If we turn the channel graph cache _off_, then iterate through the
// set of channels (to force the fallback), we should find all the
// channels as well as the nodes included.
graph.graphCache = nil
err = graph.ForEachNodeCached(func(node route.Vertex,
chans map[uint64]*DirectedChannel) error {
if _, ok := nodeIndex[node]; !ok {
return fmt.Errorf("node %x not found in graph", node)
}
for chanID := range chans {
if _, ok := chanIndex[chanID]; !ok {
return fmt.Errorf("chan %v not found in "+
"graph", chanID)
}
}
return nil
})
require.NoError(t, err)
// Iterate through all the known channels within the graph DB; once
// again, if the map is empty at the end, that indicates all edges
// have properly been reached.


@@ -520,6 +520,11 @@ messages directly. There is no routing/path finding involved.
buffer each time we decrypt an incoming message, as we
recycle these buffers in the peer.
* [The `DescribeGraph` and `GetNetworkInfo` calls have been
optimized](https://github.com/lightningnetwork/lnd/pull/5873) by caching the
response (with periodic eviction), or by using the new channel graph cache
directly. This should significantly cut down on the garbage these two calls
generate.
## Log system
* [Save compressed log files from logrotate during


@@ -1,6 +1,9 @@
package lncfg
import "fmt"
import (
"fmt"
"time"
)
const (
// MinRejectCacheSize is a floor on the maximum capacity allowed for
@@ -10,6 +13,10 @@ const (
// MinChannelCacheSize is a floor on the maximum capacity allowed for
// channeldb's channel cache. This amounts to roughly 2 MB when full.
MinChannelCacheSize = 1000
// DefaultRPCGraphCacheDuration is the default interval that the RPC
// response to DescribeGraph should be cached for.
DefaultRPCGraphCacheDuration = time.Minute
)
// Caches holds the configuration for various caches within lnd.
@@ -24,6 +31,10 @@ type Caches struct {
// peers querying for gossip traffic. Memory usage is roughly 2Kb per
// entry.
ChannelCacheSize int `long:"channel-cache-size" description:"Maximum number of entries contained in the channel cache, which is used to reduce memory allocations from gossip queries from peers. Each entry requires roughly 2Kb."`
// RPCGraphCacheDuration is used to control the flush interval of the
// channel graph cache.
RPCGraphCacheDuration time.Duration `long:"rpc-graph-cache-duration" description:"The period of time expressed as a duration (1s, 1m, 1h, etc) that the RPC response to DescribeGraph should be cached for."`
}
// Validate checks the Caches configuration for values that are too small to be


@@ -303,6 +303,7 @@ func (cfg NodeConfig) genArgs() []string {
args = append(args, fmt.Sprintf("--invoicemacaroonpath=%v", cfg.InvoiceMacPath))
args = append(args, fmt.Sprintf("--trickledelay=%v", trickleDelay))
args = append(args, fmt.Sprintf("--profile=%d", cfg.ProfilePort))
args = append(args, fmt.Sprintf("--caches.rpc-graph-cache-duration=0"))
if !cfg.HasSeed {
args = append(args, "--noseedbackup")


@@ -628,6 +628,10 @@ type rpcServer struct {
// interceptor is used to request a shutdown.
interceptor signal.Interceptor
// graphCache is the mutex that guards the cached DescribeGraph
// response below.
graphCache sync.RWMutex

// describeGraphResp is the cached response to DescribeGraph, purged
// periodically by graphCacheEvictor if caching is enabled.
describeGraphResp *lnrpc.ChannelGraph

// graphCacheEvictor is a self-resetting timer that clears
// describeGraphResp each time the configured cache duration elapses.
graphCacheEvictor *time.Timer
}
// A compile time check to ensure that rpcServer fully implements the
@@ -813,6 +817,23 @@ func (r *rpcServer) addDeps(s *server, macService *macaroons.Service,
r.chanPredicate = chanPredicate
r.macService = macService
r.selfNode = selfNode.PubKeyBytes
graphCacheDuration := r.cfg.Caches.RPCGraphCacheDuration
if graphCacheDuration != 0 {
r.graphCacheEvictor = time.AfterFunc(graphCacheDuration, func() {
// Grab the mutex and purge the currently populated
// describe graph response.
r.graphCache.Lock()
defer r.graphCache.Unlock()
r.describeGraphResp = nil
// Reset ourselves as well at the end so we run again
// after the duration.
r.graphCacheEvictor.Reset(graphCacheDuration)
})
}
return nil
}
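
For context, the evictor above is the standard self-re-arming timer pattern: time.AfterFunc fires once, the callback purges the cached response under the write lock, then Reset re-arms the timer so the purge repeats each interval. Below is a minimal self-contained sketch of the same pattern; the cachedResp type and its fields are illustrative stand-ins, not part of this PR:

package cache

import (
	"sync"
	"time"
)

// cachedResp holds a response that is purged on a fixed interval.
type cachedResp struct {
	mtx     sync.RWMutex
	resp    []byte // Stand-in for *lnrpc.ChannelGraph.
	evictor *time.Timer
}

// newCachedResp starts the eviction loop for the given (non-trivial)
// interval.
func newCachedResp(interval time.Duration) *cachedResp {
	c := &cachedResp{}
	c.evictor = time.AfterFunc(interval, func() {
		// Purge the cached response under the write lock.
		c.mtx.Lock()
		defer c.mtx.Unlock()
		c.resp = nil

		// Re-arm the timer so the purge repeats after the next
		// interval.
		c.evictor.Reset(interval)
	})

	return c
}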
@@ -5381,6 +5402,20 @@ func (r *rpcServer) DescribeGraph(ctx context.Context,
resp := &lnrpc.ChannelGraph{}
includeUnannounced := req.IncludeUnannounced
// Check to see if the cache is already populated; if so, we can
// just return it directly.
//
// TODO(roasbeef): move this to an interceptor level feature?
graphCacheActive := r.cfg.Caches.RPCGraphCacheDuration != 0
if graphCacheActive {
r.graphCache.Lock()
defer r.graphCache.Unlock()
if r.describeGraphResp != nil {
return r.describeGraphResp, nil
}
}
// Obtain the pointer to the global singleton channel graph; this will
// provide a consistent view of the graph due to bolt db's
// transactional model.
@@ -5439,6 +5474,13 @@ func (r *rpcServer) DescribeGraph(ctx context.Context,
return nil, err
}
// We still have the mutex held, so we can safely populate the cache
// now to save on GC churn for this query, but only if the cache isn't
// disabled.
if graphCacheActive {
r.describeGraphResp = resp
}
return resp, nil
}
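
Continuing the illustrative cachedResp sketch from above, the DescribeGraph flow maps onto a simple read-through accessor: take the write lock for the whole call (since a miss will populate the cache), return the cached value on a hit, and otherwise build the response and store it while the mutex is still held:

// get returns the cached response, building and caching it on a miss.
// This extends the hypothetical cachedResp type sketched earlier.
func (c *cachedResp) get(build func() ([]byte, error)) ([]byte, error) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	// Cache hit: return the previously built response directly.
	if c.resp != nil {
		return c.resp, nil
	}

	// Cache miss: build the response, then populate the cache while
	// we still hold the mutex to save on GC churn for later calls.
	resp, err := build()
	if err != nil {
		return nil, err
	}
	c.resp = resp

	return resp, nil
}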
@@ -5708,7 +5750,9 @@ func (r *rpcServer) GetNetworkInfo(ctx context.Context,
// network, tallying up the total number of nodes, and also gathering
// each node so we can measure the graph diameter and degree stats
// below.
if err := graph.ForEachNode(func(tx kvdb.RTx, node *channeldb.LightningNode) error {
err := graph.ForEachNodeCached(func(node route.Vertex,
edges map[uint64]*channeldb.DirectedChannel) error {
// Increment the total number of nodes with each iteration.
numNodes++
@@ -5718,9 +5762,7 @@ func (r *rpcServer) GetNetworkInfo(ctx context.Context,
// through the db transaction from the outer view so we can
// re-use it within this inner view.
var outDegree uint32
if err := node.ForEachChannel(tx, func(_ kvdb.RTx,
edge *channeldb.ChannelEdgeInfo, _, _ *channeldb.ChannelEdgePolicy) error {
for _, edge := range edges {
// Bump up the out degree for this node for each
// channel encountered.
outDegree++
@@ -5751,9 +5793,6 @@ func (r *rpcServer) GetNetworkInfo(ctx context.Context,
seenChans[edge.ChannelID] = struct{}{}
allChans = append(allChans, edge.Capacity)
return nil
}); err != nil {
return err
}
// Finally, if the out degree of this node is greater than what
@@ -5763,7 +5802,8 @@ func (r *rpcServer) GetNetworkInfo(ctx context.Context,
}
return nil
}); err != nil {
})
if err != nil {
return nil, err
}


@@ -1088,6 +1088,10 @@ litecoin.node=ltcd
; roughly 2Kb. (default: 20000)
; caches.channel-cache-size=9000000
; The duration that the response to DescribeGraph should be cached for. Setting
; the value to zero disables the cache. (default: 1m)
; caches.rpc-graph-cache-duration=10m
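; Setting the duration to zero disables the cache entirely; the integration
; test harness above does the same thing via the equivalent command line flag
; --caches.rpc-graph-cache-duration=0, so every call exercises the uncached
; path.
; caches.rpc-graph-cache-duration=0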
[protocol]