routing: store missioncontrol state in blocks and eliminate cursor use

This commit changes missioncontrol's store update from a write on
every payment to a periodic flush (every second by default). Updating
the missioncontrol store on every payment caused a gradual slowdown
when using etcd.
We also eliminate the use of the cursor when updating the store,
further reducing the performance bottleneck.
This commit is contained in:
Andras Banki-Horvath 2021-06-25 22:22:12 +02:00
parent 79010cc097
commit 6d80ddfe91
GPG Key ID: 80E5375C094198D8
7 changed files with 214 additions and 61 deletions
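
For orientation, the following is a rough, self-contained sketch (not the lnd code itself) of the flow the diff below introduces: AddResult only appends to an in-memory queue, a single goroutine started by run() flushes that queue on a ticker every flushInterval, and stop() shuts the goroutine down. The result type and the persistence step are simplified placeholders.

package main

import (
    "container/list"
    "fmt"
    "sync"
    "time"
)

// result stands in for routing.paymentResult; persistence is replaced by a
// print statement so the sketch stays runnable on its own.
type result struct{ id int }

// store mirrors the shape missionControlStore takes after this commit: an
// in-memory queue guarded by a mutex, flushed by a ticker goroutine.
type store struct {
    done          chan struct{}
    wg            sync.WaitGroup
    queueMx       sync.Mutex
    queue         *list.List
    flushInterval time.Duration
}

func newStore(flushInterval time.Duration) *store {
    return &store{
        done:          make(chan struct{}),
        queue:         list.New(),
        flushInterval: flushInterval,
    }
}

// AddResult only appends to the in-memory queue; nothing touches the
// database on the payment path anymore.
func (s *store) AddResult(r *result) {
    s.queueMx.Lock()
    defer s.queueMx.Unlock()
    s.queue.PushBack(r)
}

// run starts the goroutine that periodically persists the queued results.
func (s *store) run() {
    s.wg.Add(1)
    go func() {
        defer s.wg.Done()
        ticker := time.NewTicker(s.flushInterval)
        defer ticker.Stop()

        for {
            select {
            case <-ticker.C:
                s.flush()
            case <-s.done:
                return
            }
        }
    }()
}

// flush swaps the queue out under the mutex and writes the whole batch in
// one go, which is where the real store performs a single kvdb.Update.
func (s *store) flush() {
    s.queueMx.Lock()
    batch := s.queue
    s.queue = list.New()
    s.queueMx.Unlock()

    for e := batch.Front(); e != nil; e = e.Next() {
        fmt.Printf("persisting result %d\n", e.Value.(*result).id)
    }
}

// stop terminates the flush goroutine and waits for it to exit.
func (s *store) stop() {
    close(s.done)
    s.wg.Wait()
}

func main() {
    s := newStore(time.Second)
    s.run()
    s.AddResult(&result{id: 1})
    s.AddResult(&result{id: 2})
    time.Sleep(1500 * time.Millisecond) // allow one flush to fire
    s.stop()
}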

View File

@@ -49,6 +49,7 @@ func DefaultConfig() *Config {
AttemptCost: routing.DefaultAttemptCost.ToSatoshis(),
AttemptCostPPM: routing.DefaultAttemptCostPPM,
MaxMcHistory: routing.DefaultMaxMcHistory,
McFlushInterval: routing.DefaultMcFlushInterval,
}
return &Config{
@@ -66,5 +67,6 @@ func GetRoutingConfig(cfg *Config) *RoutingConfig {
AttemptCostPPM: cfg.AttemptCostPPM,
PenaltyHalfLife: cfg.PenaltyHalfLife,
MaxMcHistory: cfg.MaxMcHistory,
McFlushInterval: cfg.McFlushInterval,
}
}

View File

@@ -43,4 +43,8 @@ type RoutingConfig struct {
// MaxMcHistory defines the maximum number of payment results that
// are held on disk by mission control.
MaxMcHistory int `long:"maxmchistory" description:"the maximum number of payment results that are held on disk by mission control"`
// McFlushInterval defines the timer interval to use to flush mission
// control state to the DB.
McFlushInterval time.Duration `long:"mcflushinterval" description:"the timer interval to use to flush mission control state to the DB"`
}
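
The new option is a plain time.Duration parsed through the same go-flags struct tag mechanism as the surrounding fields, so it should accept standard Go duration strings ("500ms", "10s", "1m"). A minimal, standalone parsing sketch follows; the exposed flag in lnd is presumably prefixed with the routerrpc group (e.g. routerrpc.mcflushinterval), which is an assumption based on the struct tag rather than something shown in this diff.

package main

import (
    "fmt"
    "time"

    "github.com/jessevdk/go-flags"
)

// opts mirrors only the new field from RoutingConfig; in lnd the struct is
// registered under the routerrpc option group, which is omitted here.
type opts struct {
    McFlushInterval time.Duration `long:"mcflushinterval" description:"the timer interval to use to flush mission control state to the DB"`
}

func main() {
    var o opts
    if _, err := flags.ParseArgs(&o, []string{"--mcflushinterval=10s"}); err != nil {
        panic(err)
    }
    fmt.Println(o.McFlushInterval) // prints 10s
}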

View File

@@ -45,6 +45,10 @@ const (
// DefaultMaxMcHistory is the default maximum history size.
DefaultMaxMcHistory = 1000
// DefaultMcFlushInterval is the default interval we use to flush MC state
// to the database.
DefaultMcFlushInterval = time.Second
// prevSuccessProbability is the assumed probability for node pairs that
// successfully relayed the previous attempt.
prevSuccessProbability = 0.95
@@ -119,6 +123,10 @@ type MissionControlConfig struct {
// held on disk.
MaxMcHistory int
// McFlushInterval defines the ticker interval when we flush the
// accumulated state to the DB.
McFlushInterval time.Duration
// MinFailureRelaxInterval is the minimum time that must have passed
// since the previously recorded failure before the failure amount may
// be raised.
@@ -209,7 +217,9 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex,
return nil, err
}
store, err := newMissionControlStore(db, cfg.MaxMcHistory)
store, err := newMissionControlStore(
db, cfg.MaxMcHistory, cfg.McFlushInterval,
)
if err != nil {
return nil, err
}
@@ -234,6 +244,16 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex,
return mc, nil
}
// RunStoreTicker runs the mission control store's ticker.
func (m *MissionControl) RunStoreTicker() {
m.store.run()
}
// StopStoreTicker stops the mission control store's ticker.
func (m *MissionControl) StopStoreTicker() {
m.store.stop()
}
// init initializes mission control with historical data.
func (m *MissionControl) init() error {
log.Debugf("Mission control state reconstruction started")
@@ -265,6 +285,7 @@ func (m *MissionControl) GetConfig() *MissionControlConfig {
return &MissionControlConfig{
ProbabilityEstimatorCfg: m.estimator.ProbabilityEstimatorCfg,
MaxMcHistory: m.store.maxRecords,
McFlushInterval: m.store.flushInterval,
MinFailureRelaxInterval: m.state.minFailureRelaxInterval,
}
}
@@ -429,9 +450,7 @@ func (m *MissionControl) processPaymentResult(result *paymentResult) (
*channeldb.FailureReason, error) {
// Store complete result in database.
if err := m.store.AddResult(result); err != nil {
return nil, err
}
m.store.AddResult(result)
m.Lock()
defer m.Unlock()

View File

@@ -2,8 +2,10 @@ package routing
import (
"bytes"
"container/list"
"encoding/binary"
"fmt"
"sync"
"time"
"github.com/btcsuite/btcd/wire"
@@ -35,13 +37,38 @@ const (
// Also changes to mission control parameters can be applied to historical data.
// Finally, it enables importing raw data from an external source.
type missionControlStore struct {
done chan struct{}
wg sync.WaitGroup
db kvdb.Backend
queueMx sync.Mutex
// queue stores all pending payment results not yet added to the store.
queue *list.List
// keys holds the stored MC store item keys in the order of storage.
// We use this list when adding/deleting items from the database to
// avoid cursor use which may be slow in the remote DB case.
keys *list.List
// keysMap holds the stored MC store item keys. We use this map to check
// if a new payment result has already been stored.
keysMap map[string]struct{}
// maxRecords is the maximum number of records we will store in the db.
maxRecords int
numRecords int
// flushInterval is the configured interval we use to store new results
// and delete outdated ones from the db.
flushInterval time.Duration
}
func newMissionControlStore(db kvdb.Backend, maxRecords int) (*missionControlStore, error) {
var store *missionControlStore
func newMissionControlStore(db kvdb.Backend, maxRecords int,
flushInterval time.Duration) (*missionControlStore, error) {
var (
keys *list.List
keysMap map[string]struct{}
)
// Create buckets if not yet existing.
err := kvdb.Update(db, func(tx kvdb.RwTx) error {
@@ -51,32 +78,40 @@ func newMissionControlStore(db kvdb.Backend, maxRecords int) (*missionControlSto
err)
}
// Count initial number of results and track this number in
// memory to avoid calling Stats().KeyN. The reliability of
// Stats() is doubtful and seemed to have caused crashes in the
// past (see #1874).
// Collect all keys to be able to quickly calculate the
// difference when updating the DB state.
c := resultsBucket.ReadCursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() {
store.numRecords++
keys.PushBack(k)
keysMap[string(k)] = struct{}{}
}
return nil
}, func() {
store = &missionControlStore{
db: db,
maxRecords: maxRecords,
}
keys = list.New()
keysMap = make(map[string]struct{})
})
if err != nil {
return nil, err
}
return store, nil
return &missionControlStore{
done: make(chan struct{}),
db: db,
queue: list.New(),
keys: keys,
keysMap: keysMap,
maxRecords: maxRecords,
flushInterval: flushInterval,
}, nil
}
// clear removes all results from the db.
func (b *missionControlStore) clear() error {
return kvdb.Update(b.db, func(tx kvdb.RwTx) error {
b.queueMx.Lock()
defer b.queueMx.Unlock()
err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
if err := tx.DeleteTopLevelBucket(resultsKey); err != nil {
return err
}
@@ -84,6 +119,13 @@ func (b *missionControlStore) clear() error {
_, err := tx.CreateTopLevelBucket(resultsKey)
return err
}, func() {})
if err != nil {
return err
}
b.queue = list.New()
return nil
}
// fetchAll returns all results currently stored in the database.
@@ -221,39 +263,117 @@ func deserializeResult(k, v []byte) (*paymentResult, error) {
}
// AddResult adds a new result to the db.
func (b *missionControlStore) AddResult(rp *paymentResult) error {
return kvdb.Update(b.db, func(tx kvdb.RwTx) error {
func (b *missionControlStore) AddResult(rp *paymentResult) {
b.queueMx.Lock()
defer b.queueMx.Unlock()
b.queue.PushBack(rp)
}
// stop stops the store ticker goroutine.
func (b *missionControlStore) stop() {
close(b.done)
b.wg.Wait()
}
// run runs the MC store ticker goroutine.
func (b *missionControlStore) run() {
b.wg.Add(1)
go func() {
ticker := time.NewTicker(b.flushInterval)
defer ticker.Stop()
defer b.wg.Done()
for {
select {
case <-ticker.C:
if err := b.storeResults(); err != nil {
log.Errorf("Failed to update mission "+
"control store: %v", err)
}
case <-b.done:
return
}
}
}()
}
// storeResults stores all accumulated results.
func (b *missionControlStore) storeResults() error {
b.queueMx.Lock()
l := b.queue
b.queue = list.New()
b.queueMx.Unlock()
var (
keys *list.List
keysMap map[string]struct{}
)
err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
bucket := tx.ReadWriteBucket(resultsKey)
// Prune oldest entries.
if b.maxRecords > 0 {
for b.numRecords >= b.maxRecords {
cursor := bucket.ReadWriteCursor()
cursor.First()
if err := cursor.Delete(); err != nil {
return err
}
b.numRecords--
}
}
for e := l.Front(); e != nil; e = e.Next() {
pr := e.Value.(*paymentResult)
// Serialize result into key and value byte slices.
k, v, err := serializeResult(rp)
k, v, err := serializeResult(pr)
if err != nil {
return err
}
// The store is assumed to be idempotent. It could be that the
// same result is added twice and in that case the counter
// shouldn't be increased.
if bucket.Get(k) == nil {
b.numRecords++
// The store is assumed to be idempotent. It could be
// that the same result is added twice and in that case
// we don't need to put the value again.
if _, ok := keysMap[string(k)]; ok {
continue
}
// Put into results bucket.
return bucket.Put(k, v)
}, func() {})
if err := bucket.Put(k, v); err != nil {
return err
}
keys.PushBack(k)
keysMap[string(k)] = struct{}{}
}
// Prune oldest entries.
for {
if b.maxRecords == 0 || keys.Len() <= b.maxRecords {
break
}
front := keys.Front()
key := front.Value.([]byte)
if err := bucket.Delete(key); err != nil {
return err
}
keys.Remove(front)
delete(keysMap, string(key))
}
return nil
}, func() {
keys = list.New()
keys.PushBackList(b.keys)
keysMap = make(map[string]struct{})
for k := range b.keysMap {
keysMap[k] = struct{}{}
}
})
if err != nil {
return err
}
b.keys = keys
b.keysMap = keysMap
return nil
}
// getResultKey returns a byte slice representing a unique key for this payment
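
The pruning step that previously walked a ReadWriteCursor now relies purely on the in-memory keys list and keysMap, deleting the oldest entries by key. Below is a self-contained sketch of that bookkeeping, with a plain map standing in for the kvdb results bucket and string keys instead of byte slices.

package main

import (
    "container/list"
    "fmt"
)

// prune sketches the cursorless pruning storeResults now performs: since the
// store remembers every key it has written in insertion order, it can delete
// the oldest entries by key instead of walking a database cursor.
func prune(bucket map[string][]byte, keys *list.List,
    keysMap map[string]struct{}, maxRecords int) {

    for maxRecords > 0 && keys.Len() > maxRecords {
        front := keys.Front()
        key := front.Value.(string)

        // bucket.Delete(key) in the real store.
        delete(bucket, key)

        keys.Remove(front)
        delete(keysMap, key)
    }
}

func main() {
    bucket := make(map[string][]byte)
    keys := list.New()
    keysMap := make(map[string]struct{})

    // Simulate five stored results.
    for i := 0; i < 5; i++ {
        k := fmt.Sprintf("result-%d", i)
        bucket[k] = []byte("payload")
        keys.PushBack(k)
        keysMap[k] = struct{}{}
    }

    // Keep only the two newest entries, as maxRecords would.
    prune(bucket, keys, keysMap, 2)
    fmt.Println(len(bucket), keys.Len(), len(keysMap)) // 2 2 2
}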

View File

@@ -10,8 +10,8 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/stretchr/testify/require"
)
const testMaxRecords = 2
@@ -40,7 +40,7 @@ func TestMissionControlStore(t *testing.T) {
defer db.Close()
defer os.Remove(dbPath)
store, err := newMissionControlStore(db, testMaxRecords)
store, err := newMissionControlStore(db, testMaxRecords, time.Second)
if err != nil {
t.Fatal(err)
}
@@ -80,27 +80,21 @@ func TestMissionControlStore(t *testing.T) {
result2.id = 2
// Store result.
err = store.AddResult(&result2)
if err != nil {
t.Fatal(err)
}
store.AddResult(&result2)
// Store again to test idempotency.
err = store.AddResult(&result2)
if err != nil {
t.Fatal(err)
}
store.AddResult(&result2)
// Store second result which has an earlier timestamp.
err = store.AddResult(&result1)
if err != nil {
t.Fatal(err)
}
store.AddResult(&result1)
require.NoError(t, store.storeResults())
results, err = store.fetchAll()
if err != nil {
t.Fatal(err)
}
require.Equal(t, 2, len(results))
if len(results) != 2 {
t.Fatal("expected two results")
}
@@ -116,7 +110,7 @@ func TestMissionControlStore(t *testing.T) {
}
// Recreate store to test pruning.
store, err = newMissionControlStore(db, testMaxRecords)
store, err = newMissionControlStore(db, testMaxRecords, time.Second)
if err != nil {
t.Fatal(err)
}
@@ -128,16 +122,15 @@ func TestMissionControlStore(t *testing.T) {
result3.id = 3
result3.failure = &lnwire.FailMPPTimeout{}
err = store.AddResult(&result3)
if err != nil {
t.Fatal(err)
}
store.AddResult(&result3)
require.NoError(t, store.storeResults())
// Check that results are pruned.
results, err = store.fetchAll()
if err != nil {
t.Fatal(err)
}
require.Equal(t, 2, len(results))
if len(results) != 2 {
t.Fatal("expected two results")
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/stretchr/testify/require"
)
var (
@@ -77,6 +78,12 @@ func createMcTestContext(t *testing.T) *mcTestContext {
// restartMc creates a new instance of mission control on the same database.
func (ctx *mcTestContext) restartMc() {
// Since we don't run a timer to store results in unit tests, we store
// them here before fetching back everything in NewMissionControl.
if ctx.mc != nil {
require.NoError(ctx.t, ctx.mc.store.storeResults())
}
mc, err := NewMissionControl(
ctx.db, mcTestSelf,
&MissionControlConfig{

View File

@@ -746,6 +746,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
&routing.MissionControlConfig{
ProbabilityEstimatorCfg: estimatorCfg,
MaxMcHistory: routingConfig.MaxMcHistory,
McFlushInterval: routingConfig.McFlushInterval,
MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval,
},
)
@@ -1663,6 +1664,12 @@ func (s *server) Start() error {
return nil
})
s.missionControl.RunStoreTicker()
cleanup.add(func() error {
s.missionControl.StopStoreTicker()
return nil
})
// Before we start the connMgr, we'll check to see if we have
// any backups to recover. We do this now as we want to ensure
// that we have all the information we need to handle channel
@@ -1869,6 +1876,7 @@ func (s *server) Stop() error {
srvrLog.Warnf("failed to stop chanSubSwapper: %v", err)
}
s.chanEventStore.Stop()
s.missionControl.StopStoreTicker()
// Disconnect from each active peer to ensure that
// peerTerminationWatchers signal completion to each peer.