channeldb: write chan updates through reject+channel cache

This commit is contained in:
Conner Fromknecht 2019-04-01 16:34:51 -07:00
parent 63b15fd8fb
commit 5d98a94d60
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
3 changed files with 48 additions and 12 deletions

View File

@ -992,7 +992,7 @@ func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error {
chanEdge.ChannelFlags |= lnwire.ChanUpdateDirection chanEdge.ChannelFlags |= lnwire.ChanUpdateDirection
} }
err = updateEdgePolicy(tx, &chanEdge) _, err = updateEdgePolicy(tx, &chanEdge)
if err != nil { if err != nil {
return err return err
} }

View File

@ -1815,34 +1815,62 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
c.cacheMu.Lock() c.cacheMu.Lock()
defer c.cacheMu.Unlock() defer c.cacheMu.Unlock()
var isUpdate1 bool
err := c.db.Update(func(tx *bbolt.Tx) error { err := c.db.Update(func(tx *bbolt.Tx) error {
return updateEdgePolicy(tx, edge) var err error
isUpdate1, err = updateEdgePolicy(tx, edge)
return err
}) })
if err != nil { if err != nil {
return err return err
} }
c.rejectCache.remove(edge.ChannelID) // If an entry for this channel is found in reject cache, we'll modify
c.chanCache.remove(edge.ChannelID) // the entry with the updated timestamp for the direction that was just
// written. If the edge doesn't exist, we'll load the cache entry lazily
// during the next query for this edge.
if entry, ok := c.rejectCache.get(edge.ChannelID); ok {
if isUpdate1 {
entry.upd1Time = edge.LastUpdate.Unix()
} else {
entry.upd2Time = edge.LastUpdate.Unix()
}
c.rejectCache.insert(edge.ChannelID, entry)
}
// If an entry for this channel is found in channel cache, we'll modify
// the entry with the updated policy for the direction that was just
// written. If the edge doesn't exist, we'll defer loading the info and
// policies and lazily read from disk during the next query.
if channel, ok := c.chanCache.get(edge.ChannelID); ok {
if isUpdate1 {
channel.Policy1 = edge
} else {
channel.Policy2 = edge
}
c.chanCache.insert(edge.ChannelID, channel)
}
return nil return nil
} }
// updateEdgePolicy attempts to update an edge's policy within the relevant // updateEdgePolicy attempts to update an edge's policy within the relevant
// buckets using an existing database transaction. // buckets using an existing database transaction. The returned boolean will be
func updateEdgePolicy(tx *bbolt.Tx, edge *ChannelEdgePolicy) error { // true if the updated policy belongs to node1, and false if the policy belonged
// to node2.
func updateEdgePolicy(tx *bbolt.Tx, edge *ChannelEdgePolicy) (bool, error) {
edges := tx.Bucket(edgeBucket) edges := tx.Bucket(edgeBucket)
if edges == nil { if edges == nil {
return ErrEdgeNotFound return false, ErrEdgeNotFound
} }
edgeIndex := edges.Bucket(edgeIndexBucket) edgeIndex := edges.Bucket(edgeIndexBucket)
if edgeIndex == nil { if edgeIndex == nil {
return ErrEdgeNotFound return false, ErrEdgeNotFound
} }
nodes, err := tx.CreateBucketIfNotExists(nodeBucket) nodes, err := tx.CreateBucketIfNotExists(nodeBucket)
if err != nil { if err != nil {
return err return false, err
} }
// Create the channelID key be converting the channel ID // Create the channelID key be converting the channel ID
@ -1854,23 +1882,31 @@ func updateEdgePolicy(tx *bbolt.Tx, edge *ChannelEdgePolicy) error {
// nodes which connect this channel edge. // nodes which connect this channel edge.
nodeInfo := edgeIndex.Get(chanID[:]) nodeInfo := edgeIndex.Get(chanID[:])
if nodeInfo == nil { if nodeInfo == nil {
return ErrEdgeNotFound return false, ErrEdgeNotFound
} }
// Depending on the flags value passed above, either the first // Depending on the flags value passed above, either the first
// or second edge policy is being updated. // or second edge policy is being updated.
var fromNode, toNode []byte var fromNode, toNode []byte
var isUpdate1 bool
if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 { if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
fromNode = nodeInfo[:33] fromNode = nodeInfo[:33]
toNode = nodeInfo[33:66] toNode = nodeInfo[33:66]
isUpdate1 = true
} else { } else {
fromNode = nodeInfo[33:66] fromNode = nodeInfo[33:66]
toNode = nodeInfo[:33] toNode = nodeInfo[:33]
isUpdate1 = false
} }
// Finally, with the direction of the edge being updated // Finally, with the direction of the edge being updated
// identified, we update the on-disk edge representation. // identified, we update the on-disk edge representation.
return putChanEdgePolicy(edges, nodes, edge, fromNode, toNode) err = putChanEdgePolicy(edges, nodes, edge, fromNode, toNode)
if err != nil {
return false, err
}
return isUpdate1, nil
} }
// LightningNode represents an individual vertex/node within the channel graph. // LightningNode represents an individual vertex/node within the channel graph.

View File

@ -564,7 +564,7 @@ func migratePruneEdgeUpdateIndex(tx *bbolt.Tx) error {
return err return err
} }
err = updateEdgePolicy(tx, edgePolicy) _, err = updateEdgePolicy(tx, edgePolicy)
if err != nil { if err != nil {
return err return err
} }