This removes duplication of in-memory data during the periodic flushing stage of the mission control store. The existing code entirely duplicates the in-memory cache of the store, which is very wasteful when only a few additional results are being rotated into the store. This carries a significant performance penalty, especially for wallets that remain online for a long time with a low volume of payments. The worst case is a wallet that sees at most one new payment a second, where the entire in-memory cache is recreated every second. This commit improves the situation by determining the actual changes that need to be committed before initiating the db transaction, and by only keeping track of those changes to update the in-memory cache if the db tx is successful.
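The core of the change is easiest to see in isolation. Below is a minimal, self-contained sketch of the diff-then-commit flush pattern described above. The names (flush, result, commit) are illustrative only and are not part of the lnd API, and the real storeResults handles one more case (when the new results alone exceed maxRecords, some of them are dropped rather than stored).

package main

import "fmt"

// result is a stand-in for a payment result; key identifies it uniquely.
type result struct {
	key   string
	value string
}

// flush computes the exact diff (keys to add and keys to rotate out) before
// touching the database, commits only that diff in a single transaction, and
// mirrors the same diff onto the in-memory cache only if the commit succeeds.
// The behaviour this replaces rebuilt the whole cache on every flush.
func flush(pending []result, cache map[string]struct{}, keys *[]string,
	maxRecords int, commit func(add []result, del []string) error) error {

	// Dedupe pending results against the cache and against each other.
	var add []result
	seen := make(map[string]struct{}, len(pending))
	for _, r := range pending {
		if _, ok := cache[r.key]; ok {
			continue
		}
		if _, ok := seen[r.key]; ok {
			continue
		}
		seen[r.key] = struct{}{}
		add = append(add, r)
	}

	// Work out how many of the oldest stored keys must be rotated out to
	// stay within maxRecords.
	var del []string
	if over := len(*keys) + len(add) - maxRecords; maxRecords > 0 && over > 0 {
		if over > len(*keys) {
			// Simplified: the real store also drops some of the
			// new results in this case.
			over = len(*keys)
		}
		del = append(del, (*keys)[:over]...)
	}

	// Commit only the diff; commit stands in for the real db transaction.
	if err := commit(add, del); err != nil {
		return err
	}

	// The transaction succeeded, so apply the same diff to the cache
	// instead of recreating it from scratch.
	*keys = (*keys)[len(del):]
	for _, k := range del {
		delete(cache, k)
	}
	for _, r := range add {
		cache[r.key] = struct{}{}
		*keys = append(*keys, r.key)
	}

	return nil
}

func main() {
	cache := map[string]struct{}{"a": {}}
	keys := []string{"a"}
	commit := func(add []result, del []string) error {
		fmt.Printf("tx: add=%d del=%d\n", len(add), len(del))
		return nil
	}
	_ = flush([]result{{key: "b"}, {key: "a"}}, cache, &keys, 2, commit)
	fmt.Println("cached keys:", keys) // [a b]
}

The actual mission control store implementation, as synced from lnd, follows.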
package routing

import (
	"bytes"
	"container/list"
	"encoding/binary"
	"fmt"
	"math"
	"sync"
	"time"

	"github.com/btcsuite/btcd/wire"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/kvdb"
	"github.com/lightningnetwork/lnd/lnwire"
)

var (
	// resultsKey is the fixed key under which the attempt results are
	// stored.
	resultsKey = []byte("missioncontrol-results")

	// Big endian is the preferred byte order, due to cursor scans over
	// integer keys iterating in order.
	byteOrder = binary.BigEndian
)

const (
	// unknownFailureSourceIdx is the database encoding of an unknown error
	// source.
	unknownFailureSourceIdx = -1
)

// missionControlStore is a bolt db based implementation of a mission control
// store. It stores the raw payment attempt data from which the internal mission
// controls state can be rederived on startup. This allows the mission control
// internal data structure to be changed without requiring a database migration.
// Also changes to mission control parameters can be applied to historical data.
// Finally, it enables importing raw data from an external source.
type missionControlStore struct {
	done chan struct{}
	wg   sync.WaitGroup
	db   kvdb.Backend

	// queueCond is signalled when items are put into the queue.
	queueCond *sync.Cond

	// queue stores all pending payment results not yet added to the store.
	// Access is protected by the queueCond.L mutex.
	queue *list.List

	// keys holds the stored MC store item keys in the order of storage.
	// We use this list when adding/deleting items from the database to
	// avoid cursor use which may be slow in the remote DB case.
	keys *list.List

	// keysMap holds the stored MC store item keys. We use this map to check
	// if a new payment result has already been stored.
	keysMap map[string]struct{}

	// maxRecords is the maximum amount of records we will store in the db.
	maxRecords int

	// flushInterval is the configured interval we use to store new results
	// and delete outdated ones from the db.
	flushInterval time.Duration
}

func newMissionControlStore(db kvdb.Backend, maxRecords int,
	flushInterval time.Duration) (*missionControlStore, error) {

	var (
		keys    *list.List
		keysMap map[string]struct{}
	)

	// Create buckets if not yet existing.
	err := kvdb.Update(db, func(tx kvdb.RwTx) error {
		resultsBucket, err := tx.CreateTopLevelBucket(resultsKey)
		if err != nil {
			return fmt.Errorf("cannot create results bucket: %w",
				err)
		}

		// Collect all keys to be able to quickly calculate the
		// difference when updating the DB state.
		c := resultsBucket.ReadCursor()
		for k, _ := c.First(); k != nil; k, _ = c.Next() {
			keys.PushBack(string(k))
			keysMap[string(k)] = struct{}{}
		}

		return nil
	}, func() {
		keys = list.New()
		keysMap = make(map[string]struct{})
	})
	if err != nil {
		return nil, err
	}

	log.Infof("Loaded %d mission control entries", len(keysMap))

	return &missionControlStore{
		done:          make(chan struct{}),
		db:            db,
		queueCond:     sync.NewCond(&sync.Mutex{}),
		queue:         list.New(),
		keys:          keys,
		keysMap:       keysMap,
		maxRecords:    maxRecords,
		flushInterval: flushInterval,
	}, nil
}

// clear removes all results from the db.
func (b *missionControlStore) clear() error {
	b.queueCond.L.Lock()
	defer b.queueCond.L.Unlock()

	err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
		if err := tx.DeleteTopLevelBucket(resultsKey); err != nil {
			return err
		}

		_, err := tx.CreateTopLevelBucket(resultsKey)
		return err
	}, func() {})

	if err != nil {
		return err
	}

	b.queue = list.New()
	return nil
}

// fetchAll returns all results currently stored in the database.
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
	var results []*paymentResult

	err := kvdb.View(b.db, func(tx kvdb.RTx) error {
		resultBucket := tx.ReadBucket(resultsKey)
		results = make([]*paymentResult, 0)

		return resultBucket.ForEach(func(k, v []byte) error {
			result, err := deserializeResult(k, v)
			if err != nil {
				return err
			}

			results = append(results, result)

			return nil
		})

	}, func() {
		results = nil
	})
	if err != nil {
		return nil, err
	}

	return results, nil
}

// serializeResult serializes a payment result and returns a key and value byte
// slice to insert into the bucket.
func serializeResult(rp *paymentResult) ([]byte, []byte, error) {
	// Write timestamps, success status, failure source index and route.
	var b bytes.Buffer

	var dbFailureSourceIdx int32
	if rp.failureSourceIdx == nil {
		dbFailureSourceIdx = unknownFailureSourceIdx
	} else {
		dbFailureSourceIdx = int32(*rp.failureSourceIdx)
	}

	err := channeldb.WriteElements(
		&b,
		uint64(rp.timeFwd.UnixNano()),
		uint64(rp.timeReply.UnixNano()),
		rp.success, dbFailureSourceIdx,
	)
	if err != nil {
		return nil, nil, err
	}

	if err := channeldb.SerializeRoute(&b, *rp.route); err != nil {
		return nil, nil, err
	}

	// Write failure. If there is no failure message, write an empty
	// byte slice.
	var failureBytes bytes.Buffer
	if rp.failure != nil {
		err := lnwire.EncodeFailureMessage(&failureBytes, rp.failure, 0)
		if err != nil {
			return nil, nil, err
		}
	}
	err = wire.WriteVarBytes(&b, 0, failureBytes.Bytes())
	if err != nil {
		return nil, nil, err
	}

	// Compose key that identifies this result.
	key := getResultKey(rp)

	return key, b.Bytes(), nil
}

// deserializeResult deserializes a payment result.
func deserializeResult(k, v []byte) (*paymentResult, error) {
	// Parse payment id.
	result := paymentResult{
		id: byteOrder.Uint64(k[8:]),
	}

	r := bytes.NewReader(v)

	// Read timestamps, success status and failure source index.
	var (
		timeFwd, timeReply uint64
		dbFailureSourceIdx int32
	)

	err := channeldb.ReadElements(
		r, &timeFwd, &timeReply, &result.success, &dbFailureSourceIdx,
	)
	if err != nil {
		return nil, err
	}

	// Convert time stamps to local time zone for consistent logging.
	result.timeFwd = time.Unix(0, int64(timeFwd)).Local()
	result.timeReply = time.Unix(0, int64(timeReply)).Local()

	// Convert from unknown index magic number to nil value.
	if dbFailureSourceIdx != unknownFailureSourceIdx {
		failureSourceIdx := int(dbFailureSourceIdx)
		result.failureSourceIdx = &failureSourceIdx
	}

	// Read route.
	route, err := channeldb.DeserializeRoute(r)
	if err != nil {
		return nil, err
	}
	result.route = &route

	// Read failure.
	failureBytes, err := wire.ReadVarBytes(
		r, 0, math.MaxUint16, "failure",
	)
	if err != nil {
		return nil, err
	}
	if len(failureBytes) > 0 {
		result.failure, err = lnwire.DecodeFailureMessage(
			bytes.NewReader(failureBytes), 0,
		)
		if err != nil {
			return nil, err
		}
	}

	return &result, nil
}

// AddResult adds a new result to the db.
func (b *missionControlStore) AddResult(rp *paymentResult) {
	b.queueCond.L.Lock()
	b.queue.PushBack(rp)
	b.queueCond.L.Unlock()

	b.queueCond.Signal()
}

// stop stops the store ticker goroutine.
func (b *missionControlStore) stop() {
	close(b.done)

	b.queueCond.Signal()

	b.wg.Wait()
}

// run runs the MC store ticker goroutine.
func (b *missionControlStore) run() {
	b.wg.Add(1)

	go func() {
		defer b.wg.Done()

		timer := time.NewTimer(b.flushInterval)

		// Immediately stop the timer. It will be started once new
		// items are added to the store. As the doc for time.Timer
		// states, every call to Stop() done on a timer that is not
		// known to have been fired needs to be checked and the timer's
		// channel needs to be drained appropriately. This could happen
		// if the flushInterval is very small (e.g. 1 nanosecond).
		if !timer.Stop() {
			<-timer.C
		}

		for {
			// Wait for the queue to not be empty.
			b.queueCond.L.Lock()
			for b.queue.Front() == nil {
				b.queueCond.Wait()

				select {
				case <-b.done:
					b.queueCond.L.Unlock()

					return
				default:
				}
			}
			b.queueCond.L.Unlock()

			// Restart the timer.
			timer.Reset(b.flushInterval)

			select {
			case <-timer.C:
				if err := b.storeResults(); err != nil {
					log.Errorf("Failed to update mission "+
						"control store: %v", err)
				}

			case <-b.done:
				// Release the timer's resources.
				if !timer.Stop() {
					<-timer.C
				}
				return
			}
		}
	}()
}

// storeResults stores all accumulated results.
func (b *missionControlStore) storeResults() error {
	// We copy a reference to the queue and clear the original queue to be
	// able to release the lock.
	b.queueCond.L.Lock()
	l := b.queue

	if l.Len() == 0 {
		b.queueCond.L.Unlock()

		return nil
	}
	b.queue = list.New()
	b.queueCond.L.Unlock()

	var (
		newKeys    map[string]struct{}
		delKeys    []string
		storeCount int
		pruneCount int
	)

	// Create a deduped list of new entries.
	newKeys = make(map[string]struct{}, l.Len())
	for e := l.Front(); e != nil; e = e.Next() {
		pr, ok := e.Value.(*paymentResult)
		if !ok {
			return fmt.Errorf("wrong type %T (not *paymentResult)",
				e.Value)
		}
		key := string(getResultKey(pr))
		if _, ok := b.keysMap[key]; ok {
			l.Remove(e)
			continue
		}
		if _, ok := newKeys[key]; ok {
			l.Remove(e)
			continue
		}
		newKeys[key] = struct{}{}
	}

	// Create a list of entries to delete.
	toDelete := b.keys.Len() + len(newKeys) - b.maxRecords
	if b.maxRecords > 0 && toDelete > 0 {
		delKeys = make([]string, 0, toDelete)

		// Delete as many as needed from old keys.
		for e := b.keys.Front(); len(delKeys) < toDelete && e != nil; {
			key, ok := e.Value.(string)
			if !ok {
				return fmt.Errorf("wrong type %T (not string)",
					e.Value)
			}
			delKeys = append(delKeys, key)
			e = e.Next()
		}

		// If more deletions are needed, simply do not add from the
		// list of new keys.
		for e := l.Front(); len(delKeys) < toDelete && e != nil; {
			toDelete--
			pr, ok := e.Value.(*paymentResult)
			if !ok {
				return fmt.Errorf("wrong type %T (not "+
					"*paymentResult)", e.Value)
			}
			key := string(getResultKey(pr))
			delete(newKeys, key)
			l.Remove(e)
			e = l.Front()
		}
	}

	err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
		bucket := tx.ReadWriteBucket(resultsKey)

		for e := l.Front(); e != nil; e = e.Next() {
			pr, ok := e.Value.(*paymentResult)
			if !ok {
				return fmt.Errorf("wrong type %T (not "+
					"*paymentResult)", e.Value)
			}

			// Serialize result into key and value byte slices.
			k, v, err := serializeResult(pr)
			if err != nil {
				return err
			}

			// Put into results bucket.
			if err := bucket.Put(k, v); err != nil {
				return err
			}

			storeCount++
		}

		// Prune oldest entries.
		for _, key := range delKeys {
			if err := bucket.Delete([]byte(key)); err != nil {
				return err
			}
			pruneCount++
		}

		return nil
	}, func() {
		storeCount, pruneCount = 0, 0
	})

	if err != nil {
		return err
	}

	log.Debugf("Stored mission control results: %d added, %d deleted",
		storeCount, pruneCount)

	// DB Update was successful, update the in-memory cache.
	for _, key := range delKeys {
		delete(b.keysMap, key)
		b.keys.Remove(b.keys.Front())
	}
	for e := l.Front(); e != nil; e = e.Next() {
		pr, ok := e.Value.(*paymentResult)
		if !ok {
			return fmt.Errorf("wrong type %T (not *paymentResult)",
				e.Value)
		}
		key := string(getResultKey(pr))
		b.keys.PushBack(key)
	}

	return nil
}

// getResultKey returns a byte slice representing a unique key for this payment
// result.
func getResultKey(rp *paymentResult) []byte {
	var keyBytes [8 + 8 + 33]byte

	// Identify records by a combination of time, payment id and sender pub
	// key. This allows importing mission control data from an external
	// source without key collisions and keeps the records sorted
	// chronologically.
	byteOrder.PutUint64(keyBytes[:], uint64(rp.timeReply.UnixNano()))
	byteOrder.PutUint64(keyBytes[8:], rp.id)
	copy(keyBytes[16:], rp.route.SourcePubKey[:])

	return keyBytes[:]
}