mirror of
https://github.com/lightningnetwork/lnd.git
synced 2024-11-19 18:10:34 +01:00
3df216f6c3
It may happen that we do pathfinding while also attempt to change the graph. In this case changing the database, channel state, graph while reading from the same sources may create deadlocks. To resolve this we change locking policy in the graph cache when pathfinding.
511 lines
16 KiB
Go
511 lines
16 KiB
Go
package channeldb
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/btcsuite/btcutil"
|
|
|
|
"github.com/lightningnetwork/lnd/kvdb"
|
|
"github.com/lightningnetwork/lnd/lnwire"
|
|
"github.com/lightningnetwork/lnd/routing/route"
|
|
)
|
|
|
|
// GraphCacheNode is an interface for all the information the cache needs to know
|
|
// about a lightning node.
|
|
type GraphCacheNode interface {
|
|
// PubKey is the node's public identity key.
|
|
PubKey() route.Vertex
|
|
|
|
// Features returns the node's p2p features.
|
|
Features() *lnwire.FeatureVector
|
|
|
|
// ForEachChannel iterates through all channels of a given node,
|
|
// executing the passed callback with an edge info structure and the
|
|
// policies of each end of the channel. The first edge policy is the
|
|
// outgoing edge *to* the connecting node, while the second is the
|
|
// incoming edge *from* the connecting node. If the callback returns an
|
|
// error, then the iteration is halted with the error propagated back up
|
|
// to the caller.
|
|
ForEachChannel(kvdb.RTx,
|
|
func(kvdb.RTx, *ChannelEdgeInfo, *ChannelEdgePolicy,
|
|
*ChannelEdgePolicy) error) error
|
|
}
|
|
|
|
// CachedEdgePolicy is a struct that only caches the information of a
|
|
// ChannelEdgePolicy that we actually use for pathfinding and therefore need to
|
|
// store in the cache.
|
|
type CachedEdgePolicy struct {
|
|
// ChannelID is the unique channel ID for the channel. The first 3
|
|
// bytes are the block height, the next 3 the index within the block,
|
|
// and the last 2 bytes are the output index for the channel.
|
|
ChannelID uint64
|
|
|
|
// MessageFlags is a bitfield which indicates the presence of optional
|
|
// fields (like max_htlc) in the policy.
|
|
MessageFlags lnwire.ChanUpdateMsgFlags
|
|
|
|
// ChannelFlags is a bitfield which signals the capabilities of the
|
|
// channel as well as the directed edge this update applies to.
|
|
ChannelFlags lnwire.ChanUpdateChanFlags
|
|
|
|
// TimeLockDelta is the number of blocks this node will subtract from
|
|
// the expiry of an incoming HTLC. This value expresses the time buffer
|
|
// the node would like to HTLC exchanges.
|
|
TimeLockDelta uint16
|
|
|
|
// MinHTLC is the smallest value HTLC this node will forward, expressed
|
|
// in millisatoshi.
|
|
MinHTLC lnwire.MilliSatoshi
|
|
|
|
// MaxHTLC is the largest value HTLC this node will forward, expressed
|
|
// in millisatoshi.
|
|
MaxHTLC lnwire.MilliSatoshi
|
|
|
|
// FeeBaseMSat is the base HTLC fee that will be charged for forwarding
|
|
// ANY HTLC, expressed in mSAT's.
|
|
FeeBaseMSat lnwire.MilliSatoshi
|
|
|
|
// FeeProportionalMillionths is the rate that the node will charge for
|
|
// HTLCs for each millionth of a satoshi forwarded.
|
|
FeeProportionalMillionths lnwire.MilliSatoshi
|
|
|
|
// ToNodePubKey is a function that returns the to node of a policy.
|
|
// Since we only ever store the inbound policy, this is always the node
|
|
// that we query the channels for in ForEachChannel(). Therefore, we can
|
|
// save a lot of space by not storing this information in the memory and
|
|
// instead just set this function when we copy the policy from cache in
|
|
// ForEachChannel().
|
|
ToNodePubKey func() route.Vertex
|
|
|
|
// ToNodeFeatures are the to node's features. They are never set while
|
|
// the edge is in the cache, only on the copy that is returned in
|
|
// ForEachChannel().
|
|
ToNodeFeatures *lnwire.FeatureVector
|
|
}
|
|
|
|
// ComputeFee computes the fee to forward an HTLC of `amt` milli-satoshis over
|
|
// the passed active payment channel. This value is currently computed as
|
|
// specified in BOLT07, but will likely change in the near future.
|
|
func (c *CachedEdgePolicy) ComputeFee(
|
|
amt lnwire.MilliSatoshi) lnwire.MilliSatoshi {
|
|
|
|
return c.FeeBaseMSat + (amt*c.FeeProportionalMillionths)/feeRateParts
|
|
}
|
|
|
|
// ComputeFeeFromIncoming computes the fee to forward an HTLC given the incoming
|
|
// amount.
|
|
func (c *CachedEdgePolicy) ComputeFeeFromIncoming(
|
|
incomingAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {
|
|
|
|
return incomingAmt - divideCeil(
|
|
feeRateParts*(incomingAmt-c.FeeBaseMSat),
|
|
feeRateParts+c.FeeProportionalMillionths,
|
|
)
|
|
}
|
|
|
|
// NewCachedPolicy turns a full policy into a minimal one that can be cached.
|
|
func NewCachedPolicy(policy *ChannelEdgePolicy) *CachedEdgePolicy {
|
|
return &CachedEdgePolicy{
|
|
ChannelID: policy.ChannelID,
|
|
MessageFlags: policy.MessageFlags,
|
|
ChannelFlags: policy.ChannelFlags,
|
|
TimeLockDelta: policy.TimeLockDelta,
|
|
MinHTLC: policy.MinHTLC,
|
|
MaxHTLC: policy.MaxHTLC,
|
|
FeeBaseMSat: policy.FeeBaseMSat,
|
|
FeeProportionalMillionths: policy.FeeProportionalMillionths,
|
|
}
|
|
}
|
|
|
|
// DirectedChannel is a type that stores the channel information as seen from
|
|
// one side of the channel.
|
|
type DirectedChannel struct {
|
|
// ChannelID is the unique identifier of this channel.
|
|
ChannelID uint64
|
|
|
|
// IsNode1 indicates if this is the node with the smaller public key.
|
|
IsNode1 bool
|
|
|
|
// OtherNode is the public key of the node on the other end of this
|
|
// channel.
|
|
OtherNode route.Vertex
|
|
|
|
// Capacity is the announced capacity of this channel in satoshis.
|
|
Capacity btcutil.Amount
|
|
|
|
// OutPolicySet is a boolean that indicates whether the node has an
|
|
// outgoing policy set. For pathfinding only the existence of the policy
|
|
// is important to know, not the actual content.
|
|
OutPolicySet bool
|
|
|
|
// InPolicy is the incoming policy *from* the other node to this node.
|
|
// In path finding, we're walking backward from the destination to the
|
|
// source, so we're always interested in the edge that arrives to us
|
|
// from the other node.
|
|
InPolicy *CachedEdgePolicy
|
|
}
|
|
|
|
// DeepCopy creates a deep copy of the channel, including the incoming policy.
|
|
func (c *DirectedChannel) DeepCopy() *DirectedChannel {
|
|
channelCopy := *c
|
|
|
|
if channelCopy.InPolicy != nil {
|
|
inPolicyCopy := *channelCopy.InPolicy
|
|
channelCopy.InPolicy = &inPolicyCopy
|
|
|
|
// The fields for the ToNode can be overwritten by the path
|
|
// finding algorithm, which is why we need a deep copy in the
|
|
// first place. So we always start out with nil values, just to
|
|
// be sure they don't contain any old data.
|
|
channelCopy.InPolicy.ToNodePubKey = nil
|
|
channelCopy.InPolicy.ToNodeFeatures = nil
|
|
}
|
|
|
|
return &channelCopy
|
|
}
|
|
|
|
// GraphCache is a type that holds a minimal set of information of the public
|
|
// channel graph that can be used for pathfinding.
|
|
type GraphCache struct {
|
|
nodeChannels map[route.Vertex]map[uint64]*DirectedChannel
|
|
nodeFeatures map[route.Vertex]*lnwire.FeatureVector
|
|
|
|
mtx sync.RWMutex
|
|
}
|
|
|
|
// NewGraphCache creates a new graphCache.
|
|
func NewGraphCache(preAllocNumNodes int) *GraphCache {
|
|
return &GraphCache{
|
|
nodeChannels: make(
|
|
map[route.Vertex]map[uint64]*DirectedChannel,
|
|
// A channel connects two nodes, so we can look it up
|
|
// from both sides, meaning we get double the number of
|
|
// entries.
|
|
preAllocNumNodes*2,
|
|
),
|
|
nodeFeatures: make(
|
|
map[route.Vertex]*lnwire.FeatureVector,
|
|
preAllocNumNodes,
|
|
),
|
|
}
|
|
}
|
|
|
|
// Stats returns statistics about the current cache size.
|
|
func (c *GraphCache) Stats() string {
|
|
c.mtx.RLock()
|
|
defer c.mtx.RUnlock()
|
|
|
|
numChannels := 0
|
|
for node := range c.nodeChannels {
|
|
numChannels += len(c.nodeChannels[node])
|
|
}
|
|
return fmt.Sprintf("num_node_features=%d, num_nodes=%d, "+
|
|
"num_channels=%d", len(c.nodeFeatures), len(c.nodeChannels),
|
|
numChannels)
|
|
}
|
|
|
|
// AddNode adds a graph node, including all the (directed) channels of that
|
|
// node.
|
|
func (c *GraphCache) AddNode(tx kvdb.RTx, node GraphCacheNode) error {
|
|
nodePubKey := node.PubKey()
|
|
|
|
// Only hold the lock for a short time. The `ForEachChannel()` below is
|
|
// possibly slow as it has to go to the backend, so we can unlock
|
|
// between the calls. And the AddChannel() method will acquire its own
|
|
// lock anyway.
|
|
c.mtx.Lock()
|
|
c.nodeFeatures[nodePubKey] = node.Features()
|
|
c.mtx.Unlock()
|
|
|
|
return node.ForEachChannel(
|
|
tx, func(tx kvdb.RTx, info *ChannelEdgeInfo,
|
|
outPolicy *ChannelEdgePolicy,
|
|
inPolicy *ChannelEdgePolicy) error {
|
|
|
|
c.AddChannel(info, outPolicy, inPolicy)
|
|
|
|
return nil
|
|
},
|
|
)
|
|
}
|
|
|
|
// AddChannel adds a non-directed channel, meaning that the order of policy 1
|
|
// and policy 2 does not matter, the directionality is extracted from the info
|
|
// and policy flags automatically. The policy will be set as the outgoing policy
|
|
// on one node and the incoming policy on the peer's side.
|
|
func (c *GraphCache) AddChannel(info *ChannelEdgeInfo,
|
|
policy1 *ChannelEdgePolicy, policy2 *ChannelEdgePolicy) {
|
|
|
|
if info == nil {
|
|
return
|
|
}
|
|
|
|
if policy1 != nil && policy1.IsDisabled() &&
|
|
policy2 != nil && policy2.IsDisabled() {
|
|
|
|
return
|
|
}
|
|
|
|
// Create the edge entry for both nodes.
|
|
c.mtx.Lock()
|
|
c.updateOrAddEdge(info.NodeKey1Bytes, &DirectedChannel{
|
|
ChannelID: info.ChannelID,
|
|
IsNode1: true,
|
|
OtherNode: info.NodeKey2Bytes,
|
|
Capacity: info.Capacity,
|
|
})
|
|
c.updateOrAddEdge(info.NodeKey2Bytes, &DirectedChannel{
|
|
ChannelID: info.ChannelID,
|
|
IsNode1: false,
|
|
OtherNode: info.NodeKey1Bytes,
|
|
Capacity: info.Capacity,
|
|
})
|
|
c.mtx.Unlock()
|
|
|
|
// The policy's node is always the to_node. So if policy 1 has to_node
|
|
// of node 2 then we have the policy 1 as seen from node 1.
|
|
if policy1 != nil {
|
|
fromNode, toNode := info.NodeKey1Bytes, info.NodeKey2Bytes
|
|
if policy1.Node.PubKeyBytes != info.NodeKey2Bytes {
|
|
fromNode, toNode = toNode, fromNode
|
|
}
|
|
isEdge1 := policy1.ChannelFlags&lnwire.ChanUpdateDirection == 0
|
|
c.UpdatePolicy(policy1, fromNode, toNode, isEdge1)
|
|
}
|
|
if policy2 != nil {
|
|
fromNode, toNode := info.NodeKey2Bytes, info.NodeKey1Bytes
|
|
if policy2.Node.PubKeyBytes != info.NodeKey1Bytes {
|
|
fromNode, toNode = toNode, fromNode
|
|
}
|
|
isEdge1 := policy2.ChannelFlags&lnwire.ChanUpdateDirection == 0
|
|
c.UpdatePolicy(policy2, fromNode, toNode, isEdge1)
|
|
}
|
|
}
|
|
|
|
// updateOrAddEdge makes sure the edge information for a node is either updated
|
|
// if it already exists or is added to that node's list of channels.
|
|
func (c *GraphCache) updateOrAddEdge(node route.Vertex, edge *DirectedChannel) {
|
|
if len(c.nodeChannels[node]) == 0 {
|
|
c.nodeChannels[node] = make(map[uint64]*DirectedChannel)
|
|
}
|
|
|
|
c.nodeChannels[node][edge.ChannelID] = edge
|
|
}
|
|
|
|
// UpdatePolicy updates a single policy on both the from and to node. The order
|
|
// of the from and to node is not strictly important. But we assume that a
|
|
// channel edge was added beforehand so that the directed channel struct already
|
|
// exists in the cache.
|
|
func (c *GraphCache) UpdatePolicy(policy *ChannelEdgePolicy, fromNode,
|
|
toNode route.Vertex, edge1 bool) {
|
|
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
updatePolicy := func(nodeKey route.Vertex) {
|
|
if len(c.nodeChannels[nodeKey]) == 0 {
|
|
return
|
|
}
|
|
|
|
channel, ok := c.nodeChannels[nodeKey][policy.ChannelID]
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// Edge 1 is defined as the policy for the direction of node1 to
|
|
// node2.
|
|
switch {
|
|
// This is node 1, and it is edge 1, so this is the outgoing
|
|
// policy for node 1.
|
|
case channel.IsNode1 && edge1:
|
|
channel.OutPolicySet = true
|
|
|
|
// This is node 2, and it is edge 2, so this is the outgoing
|
|
// policy for node 2.
|
|
case !channel.IsNode1 && !edge1:
|
|
channel.OutPolicySet = true
|
|
|
|
// The other two cases left mean it's the inbound policy for the
|
|
// node.
|
|
default:
|
|
channel.InPolicy = NewCachedPolicy(policy)
|
|
}
|
|
}
|
|
|
|
updatePolicy(fromNode)
|
|
updatePolicy(toNode)
|
|
}
|
|
|
|
// RemoveNode completely removes a node and all its channels (including the
|
|
// peer's side).
|
|
func (c *GraphCache) RemoveNode(node route.Vertex) {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
delete(c.nodeFeatures, node)
|
|
|
|
// First remove all channels from the other nodes' lists.
|
|
for _, channel := range c.nodeChannels[node] {
|
|
c.removeChannelIfFound(channel.OtherNode, channel.ChannelID)
|
|
}
|
|
|
|
// Then remove our whole node completely.
|
|
delete(c.nodeChannels, node)
|
|
}
|
|
|
|
// RemoveChannel removes a single channel between two nodes.
|
|
func (c *GraphCache) RemoveChannel(node1, node2 route.Vertex, chanID uint64) {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
// Remove that one channel from both sides.
|
|
c.removeChannelIfFound(node1, chanID)
|
|
c.removeChannelIfFound(node2, chanID)
|
|
}
|
|
|
|
// removeChannelIfFound removes a single channel from one side.
|
|
func (c *GraphCache) removeChannelIfFound(node route.Vertex, chanID uint64) {
|
|
if len(c.nodeChannels[node]) == 0 {
|
|
return
|
|
}
|
|
|
|
delete(c.nodeChannels[node], chanID)
|
|
}
|
|
|
|
// UpdateChannel updates the channel edge information for a specific edge. We
|
|
// expect the edge to already exist and be known. If it does not yet exist, this
|
|
// call is a no-op.
|
|
func (c *GraphCache) UpdateChannel(info *ChannelEdgeInfo) {
|
|
c.mtx.Lock()
|
|
defer c.mtx.Unlock()
|
|
|
|
if len(c.nodeChannels[info.NodeKey1Bytes]) == 0 ||
|
|
len(c.nodeChannels[info.NodeKey2Bytes]) == 0 {
|
|
|
|
return
|
|
}
|
|
|
|
channel, ok := c.nodeChannels[info.NodeKey1Bytes][info.ChannelID]
|
|
if ok {
|
|
// We only expect to be called when the channel is already
|
|
// known.
|
|
channel.Capacity = info.Capacity
|
|
channel.OtherNode = info.NodeKey2Bytes
|
|
}
|
|
|
|
channel, ok = c.nodeChannels[info.NodeKey2Bytes][info.ChannelID]
|
|
if ok {
|
|
channel.Capacity = info.Capacity
|
|
channel.OtherNode = info.NodeKey1Bytes
|
|
}
|
|
}
|
|
|
|
// getChannels returns a copy of the passed node's channels or nil if there
|
|
// isn't any.
|
|
func (c *GraphCache) getChannels(node route.Vertex) []*DirectedChannel {
|
|
c.mtx.RLock()
|
|
defer c.mtx.RUnlock()
|
|
|
|
channels, ok := c.nodeChannels[node]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
features, ok := c.nodeFeatures[node]
|
|
if !ok {
|
|
// If the features were set to nil explicitly, that's fine here.
|
|
// The router will overwrite the features of the destination
|
|
// node with those found in the invoice if necessary. But if we
|
|
// didn't yet get a node announcement we want to mimic the
|
|
// behavior of the old DB based code that would always set an
|
|
// empty feature vector instead of leaving it nil.
|
|
features = lnwire.EmptyFeatureVector()
|
|
}
|
|
|
|
toNodeCallback := func() route.Vertex {
|
|
return node
|
|
}
|
|
|
|
i := 0
|
|
channelsCopy := make([]*DirectedChannel, len(channels))
|
|
for _, channel := range channels {
|
|
// We need to copy the channel and policy to avoid it being
|
|
// updated in the cache if the path finding algorithm sets
|
|
// fields on it (currently only the ToNodeFeatures of the
|
|
// policy).
|
|
channelCopy := channel.DeepCopy()
|
|
if channelCopy.InPolicy != nil {
|
|
channelCopy.InPolicy.ToNodePubKey = toNodeCallback
|
|
channelCopy.InPolicy.ToNodeFeatures = features
|
|
}
|
|
|
|
channelsCopy[i] = channelCopy
|
|
i++
|
|
}
|
|
|
|
return channelsCopy
|
|
}
|
|
|
|
// ForEachChannel invokes the given callback for each channel of the given node.
|
|
func (c *GraphCache) ForEachChannel(node route.Vertex,
|
|
cb func(channel *DirectedChannel) error) error {
|
|
|
|
// Obtain a copy of the node's channels. We need do this in order to
|
|
// avoid deadlocks caused by interaction with the graph cache, channel
|
|
// state and the graph database from multiple goroutines. This snapshot
|
|
// is only used for path finding where being stale is acceptable since
|
|
// the real world graph and our representation may always become
|
|
// slightly out of sync for a short time and the actual channel state
|
|
// is stored separately.
|
|
channels := c.getChannels(node)
|
|
for _, channel := range channels {
|
|
if err := cb(channel); err != nil {
|
|
return err
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ForEachNode iterates over the adjacency list of the graph, executing the
|
|
// call back for each node and the set of channels that emanate from the given
|
|
// node.
|
|
//
|
|
// NOTE: This method should be considered _read only_, the channels or nodes
|
|
// passed in MUST NOT be modified.
|
|
func (c *GraphCache) ForEachNode(cb func(node route.Vertex,
|
|
channels map[uint64]*DirectedChannel) error) error {
|
|
|
|
c.mtx.RLock()
|
|
defer c.mtx.RUnlock()
|
|
|
|
for node, channels := range c.nodeChannels {
|
|
// We don't make a copy here since this is a read-only RPC
|
|
// call. We also don't need the node features either for this
|
|
// call.
|
|
if err := cb(node, channels); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetFeatures returns the features of the node with the given ID. If no
|
|
// features are known for the node, an empty feature vector is returned.
|
|
func (c *GraphCache) GetFeatures(node route.Vertex) *lnwire.FeatureVector {
|
|
c.mtx.RLock()
|
|
defer c.mtx.RUnlock()
|
|
|
|
features, ok := c.nodeFeatures[node]
|
|
if !ok || features == nil {
|
|
// The router expects the features to never be nil, so we return
|
|
// an empty feature set instead.
|
|
return lnwire.EmptyFeatureVector()
|
|
}
|
|
|
|
return features
|
|
}
|