lnd/routing/missioncontrol_store.go
yyforyongyu 2608c0893e
multi: make sure missionControlStore catches done signal
This commit makes sure `missionControlStore` catches the shutdown signal
when draining the ticker. A few debug logs are added to aid the process.
2024-07-23 21:30:08 +08:00

package routing
import (
"bytes"
"container/list"
"encoding/binary"
"fmt"
"math"
"sync"
"time"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
)
var (
// resultsKey is the fixed key under which the attempt results are
// stored.
resultsKey = []byte("missioncontrol-results")
// Big endian is the preferred byte order, due to cursor scans over
// integer keys iterating in order.
byteOrder = binary.BigEndian
)
const (
// unknownFailureSourceIdx is the database encoding of an unknown error
// source.
unknownFailureSourceIdx = -1
)
// missionControlStore is a bolt db based implementation of a mission control
// store. It stores the raw payment attempt data from which the internal
// mission control state can be rederived on startup. This allows the mission
// control internal data structure to be changed without requiring a database
// migration. It also allows changes to mission control parameters to be
// applied to historical data. Finally, it enables importing raw data from an
// external source.
type missionControlStore struct {
// done is closed by stop to signal the run goroutine to exit.
done chan struct{}
// wg waits for the run goroutine to exit on shutdown.
wg sync.WaitGroup
// db is the database backend in which the results are persisted.
db kvdb.Backend
// queueCond is signalled when items are put into the queue.
queueCond *sync.Cond
// queue stores all pending payment results not yet added to the store.
// Access is protected by the queueCond.L mutex.
queue *list.List
// keys holds the stored MC store item keys in the order of storage.
// We use this list when adding/deleting items from the database to
// avoid cursor use which may be slow in the remote DB case.
keys *list.List
// keysMap holds the stored MC store item keys. We use this map to check
// if a new payment result has already been stored.
keysMap map[string]struct{}
// maxRecords is the maximum amount of records we will store in the db.
maxRecords int
// flushInterval is the configured interval we use to store new results
// and delete outdated ones from the db.
flushInterval time.Duration
}
func newMissionControlStore(db kvdb.Backend, maxRecords int,
flushInterval time.Duration) (*missionControlStore, error) {
var (
keys *list.List
keysMap map[string]struct{}
)
// Create buckets if not yet existing.
err := kvdb.Update(db, func(tx kvdb.RwTx) error {
resultsBucket, err := tx.CreateTopLevelBucket(resultsKey)
if err != nil {
return fmt.Errorf("cannot create results bucket: %w",
err)
}
// Collect all keys to be able to quickly calculate the
// difference when updating the DB state.
c := resultsBucket.ReadCursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() {
keys.PushBack(string(k))
keysMap[string(k)] = struct{}{}
}
return nil
}, func() {
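// Reset the in-memory key state in case the transaction above
// needs to be retried.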
keys = list.New()
keysMap = make(map[string]struct{})
})
if err != nil {
return nil, err
}
log.Infof("Loaded %d mission control entries", len(keysMap))
return &missionControlStore{
done: make(chan struct{}),
db: db,
queueCond: sync.NewCond(&sync.Mutex{}),
queue: list.New(),
keys: keys,
keysMap: keysMap,
maxRecords: maxRecords,
flushInterval: flushInterval,
}, nil
}
// clear removes all results from the db.
func (b *missionControlStore) clear() error {
b.queueCond.L.Lock()
defer b.queueCond.L.Unlock()
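// Wipe all persisted results by deleting and recreating the
// top-level results bucket in a single transaction.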
err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
if err := tx.DeleteTopLevelBucket(resultsKey); err != nil {
return err
}
_, err := tx.CreateTopLevelBucket(resultsKey)
return err
}, func() {})
if err != nil {
return err
}
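// Also drop any results that were queued but not yet flushed to
// the database.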
b.queue = list.New()
return nil
}
// fetchAll returns all results currently stored in the database.
func (b *missionControlStore) fetchAll() ([]*paymentResult, error) {
var results []*paymentResult
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
resultBucket := tx.ReadBucket(resultsKey)
results = make([]*paymentResult, 0)
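// Deserialize every stored key/value pair into a paymentResult.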
return resultBucket.ForEach(func(k, v []byte) error {
result, err := deserializeResult(k, v)
if err != nil {
return err
}
results = append(results, result)
return nil
})
}, func() {
results = nil
})
if err != nil {
return nil, err
}
return results, nil
}
// serializeResult serializes a payment result and returns a key and value byte
// slice to insert into the bucket.
func serializeResult(rp *paymentResult) ([]byte, []byte, error) {
// Write timestamps, success status, failure source index and route.
var b bytes.Buffer
var dbFailureSourceIdx int32
if rp.failureSourceIdx == nil {
dbFailureSourceIdx = unknownFailureSourceIdx
} else {
dbFailureSourceIdx = int32(*rp.failureSourceIdx)
}
err := channeldb.WriteElements(
&b,
uint64(rp.timeFwd.UnixNano()),
uint64(rp.timeReply.UnixNano()),
rp.success, dbFailureSourceIdx,
)
if err != nil {
return nil, nil, err
}
if err := channeldb.SerializeRoute(&b, *rp.route); err != nil {
return nil, nil, err
}
// Write failure. If there is no failure message, write an empty
// byte slice.
var failureBytes bytes.Buffer
if rp.failure != nil {
err := lnwire.EncodeFailureMessage(&failureBytes, rp.failure, 0)
if err != nil {
return nil, nil, err
}
}
err = wire.WriteVarBytes(&b, 0, failureBytes.Bytes())
if err != nil {
return nil, nil, err
}
// Compose key that identifies this result.
key := getResultKey(rp)
return key, b.Bytes(), nil
}
// deserializeResult deserializes a payment result.
func deserializeResult(k, v []byte) (*paymentResult, error) {
// Parse the payment id, which getResultKey stores in bytes [8:16) of the
// key.
result := paymentResult{
id: byteOrder.Uint64(k[8:]),
}
r := bytes.NewReader(v)
// Read timestamps, success status and failure source index.
var (
timeFwd, timeReply uint64
dbFailureSourceIdx int32
)
err := channeldb.ReadElements(
r, &timeFwd, &timeReply, &result.success, &dbFailureSourceIdx,
)
if err != nil {
return nil, err
}
// Convert time stamps to local time zone for consistent logging.
result.timeFwd = time.Unix(0, int64(timeFwd)).Local()
result.timeReply = time.Unix(0, int64(timeReply)).Local()
// Convert from unknown index magic number to nil value.
if dbFailureSourceIdx != unknownFailureSourceIdx {
failureSourceIdx := int(dbFailureSourceIdx)
result.failureSourceIdx = &failureSourceIdx
}
// Read route.
route, err := channeldb.DeserializeRoute(r)
if err != nil {
return nil, err
}
result.route = &route
// Read failure.
failureBytes, err := wire.ReadVarBytes(
r, 0, math.MaxUint16, "failure",
)
if err != nil {
return nil, err
}
if len(failureBytes) > 0 {
result.failure, err = lnwire.DecodeFailureMessage(
bytes.NewReader(failureBytes), 0,
)
if err != nil {
return nil, err
}
}
return &result, nil
}
// AddResult adds a new result to the db.
func (b *missionControlStore) AddResult(rp *paymentResult) {
b.queueCond.L.Lock()
b.queue.PushBack(rp)
b.queueCond.L.Unlock()
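// Wake up the run goroutine in case it is waiting for new
// results to arrive.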
b.queueCond.Signal()
}
// stop stops the store ticker goroutine.
func (b *missionControlStore) stop() {
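// Closing done tells the run goroutine to exit.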
close(b.done)
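// Wake the run goroutine in case it is blocked on the condition
// variable so that it can observe the done signal.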
b.queueCond.Signal()
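// Wait for the run goroutine to finish.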
b.wg.Wait()
}
// run runs the MC store ticker goroutine.
func (b *missionControlStore) run() {
b.wg.Add(1)
go func() {
defer b.wg.Done()
timer := time.NewTimer(b.flushInterval)
// Immediately stop the timer. It will be restarted once new
// items are added to the store. As the time.Timer docs note, if
// Stop returns false the timer has already fired, so its
// channel must be drained before the timer can be reused. This
// can happen here if the flushInterval is very small (e.g. 1
// nanosecond).
if !timer.Stop() {
select {
case <-timer.C:
case <-b.done:
log.Debugf("Stopping mission control store")
}
}
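// Main loop: wait for new results to arrive, then flush them to
// the database once the flush interval has elapsed, until
// shutdown is signalled.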
for {
// Wait for the queue to not be empty.
b.queueCond.L.Lock()
for b.queue.Front() == nil {
b.queueCond.Wait()
select {
case <-b.done:
b.queueCond.L.Unlock()
return
default:
}
}
b.queueCond.L.Unlock()
// Restart the timer.
timer.Reset(b.flushInterval)
select {
case <-timer.C:
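// The flush interval has elapsed, persist the queued results.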
if err := b.storeResults(); err != nil {
log.Errorf("Failed to update mission "+
"control store: %v", err)
}
case <-b.done:
// Release the timer's resources.
if !timer.Stop() {
select {
case <-timer.C:
case <-b.done:
log.Debugf("Mission control " +
"store stopped")
}
}
return
}
}
}()
}
// storeResults stores all accumulated results.
func (b *missionControlStore) storeResults() error {
// We copy a reference to the queue and clear the original queue to be
// able to release the lock.
b.queueCond.L.Lock()
l := b.queue
if l.Len() == 0 {
b.queueCond.L.Unlock()
return nil
}
b.queue = list.New()
b.queueCond.L.Unlock()
var (
newKeys map[string]struct{}
delKeys []string
storeCount int
pruneCount int
)
// Create a deduped list of new entries.
newKeys = make(map[string]struct{}, l.Len())
for e := l.Front(); e != nil; e = e.Next() {
pr, ok := e.Value.(*paymentResult)
if !ok {
return fmt.Errorf("wrong type %T (not *paymentResult)",
e.Value)
}
key := string(getResultKey(pr))
if _, ok := b.keysMap[key]; ok {
l.Remove(e)
continue
}
if _, ok := newKeys[key]; ok {
l.Remove(e)
continue
}
newKeys[key] = struct{}{}
}
// Create a list of entries to delete.
toDelete := b.keys.Len() + len(newKeys) - b.maxRecords
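// If the cap would be exceeded, prune the oldest stored entries
// first and, if that is still not enough, simply skip persisting
// the oldest queued entries.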
if b.maxRecords > 0 && toDelete > 0 {
delKeys = make([]string, 0, toDelete)
// Delete as many as needed from old keys.
for e := b.keys.Front(); len(delKeys) < toDelete && e != nil; {
key, ok := e.Value.(string)
if !ok {
return fmt.Errorf("wrong type %T (not string)",
e.Value)
}
delKeys = append(delKeys, key)
e = e.Next()
}
// If more deletions are needed, simply do not add from the
// list of new keys.
for e := l.Front(); len(delKeys) < toDelete && e != nil; {
toDelete--
pr, ok := e.Value.(*paymentResult)
if !ok {
return fmt.Errorf("wrong type %T (not "+
"*paymentResult)", e.Value)
}
key := string(getResultKey(pr))
delete(newKeys, key)
l.Remove(e)
e = l.Front()
}
}
err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
bucket := tx.ReadWriteBucket(resultsKey)
for e := l.Front(); e != nil; e = e.Next() {
pr, ok := e.Value.(*paymentResult)
if !ok {
return fmt.Errorf("wrong type %T (not "+
"*paymentResult)", e.Value)
}
// Serialize result into key and value byte slices.
k, v, err := serializeResult(pr)
if err != nil {
return err
}
// Put into results bucket.
if err := bucket.Put(k, v); err != nil {
return err
}
storeCount++
}
// Prune oldest entries.
for _, key := range delKeys {
if err := bucket.Delete([]byte(key)); err != nil {
return err
}
pruneCount++
}
return nil
}, func() {
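// Reset the counters in case the transaction needs to be
// retried.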
storeCount, pruneCount = 0, 0
})
if err != nil {
return err
}
log.Debugf("Stored mission control results: %d added, %d deleted",
storeCount, pruneCount)
// DB Update was successful, update the in-memory cache.
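// The pruned keys were taken from the front of b.keys, so remove
// the same number of elements from its front and drop them from
// keysMap.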
for _, key := range delKeys {
delete(b.keysMap, key)
b.keys.Remove(b.keys.Front())
}
for e := l.Front(); e != nil; e = e.Next() {
pr, ok := e.Value.(*paymentResult)
if !ok {
return fmt.Errorf("wrong type %T (not *paymentResult)",
e.Value)
}
key := string(getResultKey(pr))
b.keys.PushBack(key)
}
return nil
}
// getResultKey returns a byte slice representing a unique key for this payment
// result.
func getResultKey(rp *paymentResult) []byte {
var keyBytes [8 + 8 + 33]byte
// Identify records by a combination of time, payment id and sender pub
// key. This allows importing mission control data from an external
// source without key collisions and keeps the records sorted
// chronologically.
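// Layout: timeReply (8 bytes, big endian) || payment id (8
// bytes) || sender pubkey (33 bytes).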
byteOrder.PutUint64(keyBytes[:], uint64(rp.timeReply.UnixNano()))
byteOrder.PutUint64(keyBytes[8:], rp.id)
copy(keyBytes[16:], rp.route.SourcePubKey[:])
return keyBytes[:]
}