channeldb/graph+db: integrate reject and channel caches

This commit is contained in:
Conner Fromknecht 2019-04-01 16:32:52 -07:00
parent b20a254faa
commit ae3a00a5da
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
2 changed files with 207 additions and 44 deletions

View File

@ -899,7 +899,12 @@ type ChannelShell struct {
func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error {
chanGraph := d.ChannelGraph()
return d.Update(func(tx *bbolt.Tx) error {
// TODO(conner): find way to do this w/o accessing internal members?
chanGraph.cacheMu.Lock()
defer chanGraph.cacheMu.Unlock()
var chansRestored []uint64
err := d.Update(func(tx *bbolt.Tx) error {
for _, channelShell := range channelShells {
channel := channelShell.Chan
@ -984,10 +989,22 @@ func (d *DB) RestoreChannelShells(channelShells ...*ChannelShell) error {
if err != nil {
return err
}
chansRestored = append(chansRestored, edgeInfo.ChannelID)
}
return nil
})
if err != nil {
return err
}
for _, chanid := range chansRestored {
chanGraph.rejectCache.remove(chanid)
chanGraph.chanCache.remove(chanid)
}
return nil
}
// AddrsForNode consults the graph and channel database for all addresses known

View File

@ -10,6 +10,7 @@ import (
"io"
"math"
"net"
"sync"
"time"
"github.com/btcsuite/btcd/btcec"
@ -157,15 +158,18 @@ const (
type ChannelGraph struct {
db *DB
// TODO(roasbeef): store and update bloom filter to reduce disk access
// due to current gossip model
// * LRU cache for edges?
cacheMu sync.RWMutex
rejectCache *rejectCache
chanCache *channelCache
}
// newChannelGraph allocates a new ChannelGraph backed by a DB instance.
// newChannelGraph allocates a new ChannelGraph backed by a DB instance. The
// returned instance has its own unique reject cache and channel cache.
func newChannelGraph(db *DB) *ChannelGraph {
return &ChannelGraph{
db: db,
db: db,
rejectCache: newRejectCache(50000),
chanCache: newChannelCache(20000),
}
}
@ -498,9 +502,20 @@ func (c *ChannelGraph) deleteLightningNode(nodes *bbolt.Bucket,
// the channel supports. The chanPoint and chanID are used to uniquely identify
// the edge globally within the database.
func (c *ChannelGraph) AddChannelEdge(edge *ChannelEdgeInfo) error {
return c.db.Update(func(tx *bbolt.Tx) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
err := c.db.Update(func(tx *bbolt.Tx) error {
return c.addChannelEdge(tx, edge)
})
if err != nil {
return err
}
c.rejectCache.remove(edge.ChannelID)
c.chanCache.remove(edge.ChannelID)
return nil
}
// addChannelEdge is the private form of AddChannelEdge that allows callers to
@ -605,17 +620,42 @@ func (c *ChannelGraph) addChannelEdge(tx *bbolt.Tx, edge *ChannelEdgeInfo) error
// was updated for both directed edges are returned along with the boolean. If
// it is not found, then the zombie index is checked and its result is returned
// as the second boolean.
func (c *ChannelGraph) HasChannelEdge(chanID uint64,
) (time.Time, time.Time, bool, bool, error) {
func (c *ChannelGraph) HasChannelEdge(
chanID uint64) (time.Time, time.Time, bool, bool, error) {
var (
node1UpdateTime time.Time
node2UpdateTime time.Time
exists bool
isZombie bool
upd1Time time.Time
upd2Time time.Time
exists bool
isZombie bool
)
err := c.db.View(func(tx *bbolt.Tx) error {
// We'll query the cache with the shared lock held to allow multiple
// readers to access values in the cache concurrently if they exist.
c.cacheMu.RLock()
if entry, ok := c.rejectCache.get(chanID); ok {
c.cacheMu.RUnlock()
upd1Time = time.Unix(entry.upd1Time, 0)
upd2Time = time.Unix(entry.upd2Time, 0)
exists, isZombie = entry.flags.unpack()
return upd1Time, upd2Time, exists, isZombie, nil
}
c.cacheMu.RUnlock()
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
// The item was not found with the shared lock, so we'll acquire the
// exclusive lock and check the cache again in case another method added
// the entry to the cache while no lock was held.
if entry, ok := c.rejectCache.get(chanID); ok {
upd1Time = time.Unix(entry.upd1Time, 0)
upd2Time = time.Unix(entry.upd2Time, 0)
exists, isZombie = entry.flags.unpack()
return upd1Time, upd2Time, exists, isZombie, nil
}
if err := c.db.View(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
@ -634,7 +674,9 @@ func (c *ChannelGraph) HasChannelEdge(chanID uint64,
exists = false
zombieIndex := edges.Bucket(zombieBucket)
if zombieIndex != nil {
isZombie, _, _ = isZombieEdge(zombieIndex, chanID)
isZombie, _, _ = isZombieEdge(
zombieIndex, chanID,
)
}
return nil
@ -660,16 +702,24 @@ func (c *ChannelGraph) HasChannelEdge(chanID uint64,
// As we may have only one of the edges populated, only set the
// update time if the edge was found in the database.
if e1 != nil {
node1UpdateTime = e1.LastUpdate
upd1Time = e1.LastUpdate
}
if e2 != nil {
node2UpdateTime = e2.LastUpdate
upd2Time = e2.LastUpdate
}
return nil
}); err != nil {
return time.Time{}, time.Time{}, exists, isZombie, err
}
c.rejectCache.insert(chanID, rejectCacheEntry{
upd1Time: upd1Time.Unix(),
upd2Time: upd2Time.Unix(),
flags: packRejectFlags(exists, isZombie),
})
return node1UpdateTime, node2UpdateTime, exists, isZombie, err
return upd1Time, upd2Time, exists, isZombie, nil
}
// UpdateChannelEdge retrieves and update edge of the graph database. Method
@ -720,6 +770,9 @@ const (
func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
blockHash *chainhash.Hash, blockHeight uint32) ([]*ChannelEdgeInfo, error) {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
var chansClosed []*ChannelEdgeInfo
err := c.db.Update(func(tx *bbolt.Tx) error {
@ -823,6 +876,11 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint,
return nil, err
}
for _, channel := range chansClosed {
c.rejectCache.remove(channel.ChannelID)
c.chanCache.remove(channel.ChannelID)
}
return chansClosed, nil
}
@ -973,6 +1031,9 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf
var chanIDEnd [8]byte
byteOrder.PutUint64(chanIDEnd[:], endShortChanID.ToUint64())
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
// Keep track of the channels that are removed from the graph.
var removedChans []*ChannelEdgeInfo
@ -1051,6 +1112,11 @@ func (c *ChannelGraph) DisconnectBlockAtHeight(height uint32) ([]*ChannelEdgeInf
return nil, err
}
for _, channel := range removedChans {
c.rejectCache.remove(channel.ChannelID)
c.chanCache.remove(channel.ChannelID)
}
return removedChans, nil
}
@ -1106,7 +1172,17 @@ func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error {
// channels
// TODO(roasbeef): don't delete both edges?
return c.db.Update(func(tx *bbolt.Tx) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
var chanID uint64
err := c.db.Update(func(tx *bbolt.Tx) error {
var err error
chanID, err = getChanID(tx, chanPoint)
if err != nil {
return err
}
// First grab the edges bucket which houses the information
// we'd like to delete
edges := tx.Bucket(edgeBucket)
@ -1138,6 +1214,14 @@ func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error {
chanPoint, true,
)
})
if err != nil {
return err
}
c.rejectCache.remove(chanID)
c.chanCache.remove(chanID)
return nil
}
// ChannelID attempt to lookup the 8-byte compact channel ID which maps to the
@ -1145,33 +1229,39 @@ func (c *ChannelGraph) DeleteChannelEdge(chanPoint *wire.OutPoint) error {
// the database, then ErrEdgeNotFound is returned.
func (c *ChannelGraph) ChannelID(chanPoint *wire.OutPoint) (uint64, error) {
var chanID uint64
if err := c.db.View(func(tx *bbolt.Tx) error {
var err error
chanID, err = getChanID(tx, chanPoint)
return err
}); err != nil {
return 0, err
}
return chanID, nil
}
// getChanID returns the assigned channel ID for a given channel point.
func getChanID(tx *bbolt.Tx, chanPoint *wire.OutPoint) (uint64, error) {
var b bytes.Buffer
if err := writeOutpoint(&b, chanPoint); err != nil {
return 0, nil
}
if err := c.db.View(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
}
chanIndex := edges.Bucket(channelPointBucket)
if chanIndex == nil {
return ErrGraphNoEdgesFound
}
chanIDBytes := chanIndex.Get(b.Bytes())
if chanIDBytes == nil {
return ErrEdgeNotFound
}
chanID = byteOrder.Uint64(chanIDBytes)
return nil
}); err != nil {
return 0, err
edges := tx.Bucket(edgeBucket)
if edges == nil {
return 0, ErrGraphNoEdgesFound
}
chanIndex := edges.Bucket(channelPointBucket)
if chanIndex == nil {
return 0, ErrGraphNoEdgesFound
}
chanIDBytes := chanIndex.Get(b.Bytes())
if chanIDBytes == nil {
return 0, ErrEdgeNotFound
}
chanID := byteOrder.Uint64(chanIDBytes)
return chanID, nil
}
@ -1241,8 +1331,13 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha
// additional map to keep track of the edges already seen to prevent
// re-adding it.
edgesSeen := make(map[uint64]struct{})
edgesToCache := make(map[uint64]ChannelEdge)
var edgesInHorizon []ChannelEdge
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
var hits int
err := c.db.View(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
@ -1292,6 +1387,13 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha
continue
}
if channel, ok := c.chanCache.get(chanIDInt); ok {
hits++
edgesSeen[chanIDInt] = struct{}{}
edgesInHorizon = append(edgesInHorizon, channel)
continue
}
// First, we'll fetch the static edge information.
edgeInfo, err := fetchChanEdgeInfo(edgeIndex, chanID)
if err != nil {
@ -1316,11 +1418,13 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha
// Finally, we'll collate this edge with the rest of
// edges to be returned.
edgesSeen[chanIDInt] = struct{}{}
edgesInHorizon = append(edgesInHorizon, ChannelEdge{
channel := ChannelEdge{
Info: &edgeInfo,
Policy1: edge1,
Policy2: edge2,
})
}
edgesInHorizon = append(edgesInHorizon, channel)
edgesToCache[chanIDInt] = channel
}
return nil
@ -1335,6 +1439,15 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]Cha
return nil, err
}
// Insert any edges loaded from disk into the cache.
for chanid, channel := range edgesToCache {
c.chanCache.insert(chanid, channel)
}
log.Debugf("ChanUpdatesInHorizon hit percentage: %f (%d/%d)",
float64(hits)/float64(len(edgesInHorizon)), hits,
len(edgesInHorizon))
return edgesInHorizon, nil
}
@ -1699,9 +1812,20 @@ func delChannelByEdge(edges, edgeIndex, chanIndex, zombieIndex,
// determined by the lexicographical ordering of the identity public keys of
// the nodes on either side of the channel.
func (c *ChannelGraph) UpdateEdgePolicy(edge *ChannelEdgePolicy) error {
return c.db.Update(func(tx *bbolt.Tx) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
err := c.db.Update(func(tx *bbolt.Tx) error {
return updateEdgePolicy(tx, edge)
})
if err != nil {
return err
}
c.rejectCache.remove(edge.ChannelID)
c.chanCache.remove(edge.ChannelID)
return nil
}
// updateEdgePolicy attempts to update an edge's policy within the relevant
@ -2888,7 +3012,10 @@ func (c *ChannelGraph) NewChannelEdgePolicy() *ChannelEdgePolicy {
func (c *ChannelGraph) MarkEdgeZombie(chanID uint64, pubKey1,
pubKey2 [33]byte) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
err := c.db.Update(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
@ -2899,6 +3026,14 @@ func (c *ChannelGraph) MarkEdgeZombie(chanID uint64, pubKey1,
}
return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2)
})
if err != nil {
return err
}
c.rejectCache.remove(chanID)
c.chanCache.remove(chanID)
return nil
}
// markEdgeZombie marks an edge as a zombie within our zombie index. The public
@ -2919,7 +3054,10 @@ func markEdgeZombie(zombieIndex *bbolt.Bucket, chanID uint64, pubKey1,
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
return c.db.Batch(func(tx *bbolt.Tx) error {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
err := c.db.Update(func(tx *bbolt.Tx) error {
edges := tx.Bucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
@ -2933,6 +3071,14 @@ func (c *ChannelGraph) MarkEdgeLive(chanID uint64) error {
byteOrder.PutUint64(k[:], chanID)
return zombieIndex.Delete(k[:])
})
if err != nil {
return err
}
c.rejectCache.remove(chanID)
c.chanCache.remove(chanID)
return nil
}
// IsZombieEdge returns whether the edge is considered zombie. If it is a