Merge pull request #5515 from bhandras/mc_store_optimization

routing: store missioncontrol state in blocks and eliminate cursor use
This commit is contained in:
András Bánki-Horváth 2021-07-26 23:36:51 +02:00 committed by GitHub
commit 5de6af798d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 222 additions and 61 deletions

View file

@ -48,6 +48,11 @@ addholdinvoice call](https://github.com/lightningnetwork/lnd/pull/5533).
[Optimized payment sequence generation](https://github.com/lightningnetwork/lnd/pull/5514/) [Optimized payment sequence generation](https://github.com/lightningnetwork/lnd/pull/5514/)
to make LND's payment throughput (and latency) better when using etcd. to make LND's payment throughput (and latency) better when using etcd.
## Performance improvements
* [Update MC store in blocks](https://github.com/lightningnetwork/lnd/pull/5515)
to make payment throughput better when using etcd.
# Contributors (Alphabetical Order) # Contributors (Alphabetical Order)
* ErikEk * ErikEk
* Zero-1729 * Zero-1729

View file

@ -49,6 +49,7 @@ func DefaultConfig() *Config {
AttemptCost: routing.DefaultAttemptCost.ToSatoshis(), AttemptCost: routing.DefaultAttemptCost.ToSatoshis(),
AttemptCostPPM: routing.DefaultAttemptCostPPM, AttemptCostPPM: routing.DefaultAttemptCostPPM,
MaxMcHistory: routing.DefaultMaxMcHistory, MaxMcHistory: routing.DefaultMaxMcHistory,
McFlushInterval: routing.DefaultMcFlushInterval,
} }
return &Config{ return &Config{
@ -66,5 +67,6 @@ func GetRoutingConfig(cfg *Config) *RoutingConfig {
AttemptCostPPM: cfg.AttemptCostPPM, AttemptCostPPM: cfg.AttemptCostPPM,
PenaltyHalfLife: cfg.PenaltyHalfLife, PenaltyHalfLife: cfg.PenaltyHalfLife,
MaxMcHistory: cfg.MaxMcHistory, MaxMcHistory: cfg.MaxMcHistory,
McFlushInterval: cfg.McFlushInterval,
} }
} }

View file

@ -43,4 +43,8 @@ type RoutingConfig struct {
// MaxMcHistory defines the maximum number of payment results that // MaxMcHistory defines the maximum number of payment results that
// are held on disk by mission control. // are held on disk by mission control.
MaxMcHistory int `long:"maxmchistory" description:"the maximum number of payment results that are held on disk by mission control"` MaxMcHistory int `long:"maxmchistory" description:"the maximum number of payment results that are held on disk by mission control"`
// McFlushInterval defines the timer interval to use to flush mission
// control state to the DB.
McFlushInterval time.Duration `long:"mcflushinterval" description:"the timer interval to use to flush mission control state to the DB"`
} }

View file

@ -45,6 +45,10 @@ const (
// DefaultMaxMcHistory is the default maximum history size. // DefaultMaxMcHistory is the default maximum history size.
DefaultMaxMcHistory = 1000 DefaultMaxMcHistory = 1000
// DefaultMcFlushInterval is the default interval we use to flush MC state
// to the database.
DefaultMcFlushInterval = time.Second
// prevSuccessProbability is the assumed probability for node pairs that // prevSuccessProbability is the assumed probability for node pairs that
// successfully relayed the previous attempt. // successfully relayed the previous attempt.
prevSuccessProbability = 0.95 prevSuccessProbability = 0.95
@ -119,6 +123,10 @@ type MissionControlConfig struct {
// held on disk. // held on disk.
MaxMcHistory int MaxMcHistory int
// McFlushInterval defines the ticker interval when we flush the
// accumulated state to the DB.
McFlushInterval time.Duration
// MinFailureRelaxInterval is the minimum time that must have passed // MinFailureRelaxInterval is the minimum time that must have passed
// since the previously recorded failure before the failure amount may // since the previously recorded failure before the failure amount may
// be raised. // be raised.
@ -209,7 +217,9 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex,
return nil, err return nil, err
} }
store, err := newMissionControlStore(db, cfg.MaxMcHistory) store, err := newMissionControlStore(
db, cfg.MaxMcHistory, cfg.McFlushInterval,
)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -234,6 +244,16 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex,
return mc, nil return mc, nil
} }
// RunStoreTicker runs the mission control store's ticker.
//
// It starts the store's background goroutine which periodically flushes
// accumulated payment results to the database (see missionControlStore.run).
// Pair every call with a matching StopStoreTicker to shut the goroutine down.
func (m *MissionControl) RunStoreTicker() {
	m.store.run()
}
// StopStoreTicker stops the mission control store's ticker.
//
// This blocks until the store's flush goroutine has exited.
//
// NOTE(review): the underlying stop() closes a channel, so a second call
// panics. server.go appears to invoke this from both the startup cleanup
// path and Stop() — confirm those two paths are mutually exclusive.
func (m *MissionControl) StopStoreTicker() {
	m.store.stop()
}
// init initializes mission control with historical data. // init initializes mission control with historical data.
func (m *MissionControl) init() error { func (m *MissionControl) init() error {
log.Debugf("Mission control state reconstruction started") log.Debugf("Mission control state reconstruction started")
@ -265,6 +285,7 @@ func (m *MissionControl) GetConfig() *MissionControlConfig {
return &MissionControlConfig{ return &MissionControlConfig{
ProbabilityEstimatorCfg: m.estimator.ProbabilityEstimatorCfg, ProbabilityEstimatorCfg: m.estimator.ProbabilityEstimatorCfg,
MaxMcHistory: m.store.maxRecords, MaxMcHistory: m.store.maxRecords,
McFlushInterval: m.store.flushInterval,
MinFailureRelaxInterval: m.state.minFailureRelaxInterval, MinFailureRelaxInterval: m.state.minFailureRelaxInterval,
} }
} }
@ -429,9 +450,7 @@ func (m *MissionControl) processPaymentResult(result *paymentResult) (
*channeldb.FailureReason, error) { *channeldb.FailureReason, error) {
// Store complete result in database. // Store complete result in database.
if err := m.store.AddResult(result); err != nil { m.store.AddResult(result)
return nil, err
}
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()

View file

@ -2,8 +2,10 @@ package routing
import ( import (
"bytes" "bytes"
"container/list"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
@ -35,13 +37,38 @@ const (
// Also changes to mission control parameters can be applied to historical data. // Also changes to mission control parameters can be applied to historical data.
// Finally, it enables importing raw data from an external source. // Finally, it enables importing raw data from an external source.
type missionControlStore struct { type missionControlStore struct {
done chan struct{}
wg sync.WaitGroup
db kvdb.Backend db kvdb.Backend
queueMx sync.Mutex
// queue stores all pending payment results not yet added to the store.
queue *list.List
// keys holds the stored MC store item keys in the order of storage.
// We use this list when adding/deleting items from the database to
// avoid cursor use which may be slow in the remote DB case.
keys *list.List
// keysMap holds the stored MC store item keys. We use this map to check
// if a new payment result has already been stored.
keysMap map[string]struct{}
// maxRecords is the maximum amount of records we will store in the db.
maxRecords int maxRecords int
numRecords int
// flushInterval is the configured interval we use to store new results
// and delete outdated ones from the db.
flushInterval time.Duration
} }
func newMissionControlStore(db kvdb.Backend, maxRecords int) (*missionControlStore, error) { func newMissionControlStore(db kvdb.Backend, maxRecords int,
var store *missionControlStore flushInterval time.Duration) (*missionControlStore, error) {
var (
keys *list.List
keysMap map[string]struct{}
)
// Create buckets if not yet existing. // Create buckets if not yet existing.
err := kvdb.Update(db, func(tx kvdb.RwTx) error { err := kvdb.Update(db, func(tx kvdb.RwTx) error {
@ -51,32 +78,40 @@ func newMissionControlStore(db kvdb.Backend, maxRecords int) (*missionControlSto
err) err)
} }
// Count initial number of results and track this number in // Collect all keys to be able to quickly calculate the
// memory to avoid calling Stats().KeyN. The reliability of // difference when updating the DB state.
// Stats() is doubtful and seemed to have caused crashes in the
// past (see #1874).
c := resultsBucket.ReadCursor() c := resultsBucket.ReadCursor()
for k, _ := c.First(); k != nil; k, _ = c.Next() { for k, _ := c.First(); k != nil; k, _ = c.Next() {
store.numRecords++ keys.PushBack(k)
keysMap[string(k)] = struct{}{}
} }
return nil return nil
}, func() { }, func() {
store = &missionControlStore{ keys = list.New()
db: db, keysMap = make(map[string]struct{})
maxRecords: maxRecords,
}
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
return store, nil return &missionControlStore{
done: make(chan struct{}),
db: db,
queue: list.New(),
keys: keys,
keysMap: keysMap,
maxRecords: maxRecords,
flushInterval: flushInterval,
}, nil
} }
// clear removes all results from the db. // clear removes all results from the db.
func (b *missionControlStore) clear() error { func (b *missionControlStore) clear() error {
return kvdb.Update(b.db, func(tx kvdb.RwTx) error { b.queueMx.Lock()
defer b.queueMx.Unlock()
err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
if err := tx.DeleteTopLevelBucket(resultsKey); err != nil { if err := tx.DeleteTopLevelBucket(resultsKey); err != nil {
return err return err
} }
@ -84,6 +119,13 @@ func (b *missionControlStore) clear() error {
_, err := tx.CreateTopLevelBucket(resultsKey) _, err := tx.CreateTopLevelBucket(resultsKey)
return err return err
}, func() {}) }, func() {})
if err != nil {
return err
}
b.queue = list.New()
return nil
} }
// fetchAll returns all results currently stored in the database. // fetchAll returns all results currently stored in the database.
@ -221,39 +263,117 @@ func deserializeResult(k, v []byte) (*paymentResult, error) {
} }
// AddResult adds a new result to the db. // AddResult adds a new result to the db.
func (b *missionControlStore) AddResult(rp *paymentResult) error { func (b *missionControlStore) AddResult(rp *paymentResult) {
return kvdb.Update(b.db, func(tx kvdb.RwTx) error { b.queueMx.Lock()
defer b.queueMx.Unlock()
b.queue.PushBack(rp)
}
// stop stops the store ticker goroutine.
//
// It signals the goroutine started by run to exit by closing the done
// channel, then blocks until that goroutine has finished. Must be called
// at most once: closing an already-closed channel panics.
func (b *missionControlStore) stop() {
	close(b.done)
	b.wg.Wait()
}
// run runs the MC store ticker goroutine.
//
// The spawned goroutine flushes the queued payment results to the
// database every flushInterval and exits once the done channel is
// closed (see stop). Flush failures are logged but do not terminate
// the loop.
func (b *missionControlStore) run() {
	b.wg.Add(1)

	go func() {
		defer b.wg.Done()

		ticker := time.NewTicker(b.flushInterval)
		defer ticker.Stop()

		for {
			select {
			case <-b.done:
				// Shutdown was requested.
				return

			case <-ticker.C:
				// Persist whatever has accumulated since the
				// last flush; keep ticking even on failure.
				err := b.storeResults()
				if err != nil {
					log.Errorf("Failed to update mission "+
						"control store: %v", err)
				}
			}
		}
	}()
}
// storeResults stores all accumulated results.
func (b *missionControlStore) storeResults() error {
b.queueMx.Lock()
l := b.queue
b.queue = list.New()
b.queueMx.Unlock()
var (
keys *list.List
keysMap map[string]struct{}
)
err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
bucket := tx.ReadWriteBucket(resultsKey) bucket := tx.ReadWriteBucket(resultsKey)
// Prune oldest entries. for e := l.Front(); e != nil; e = e.Next() {
if b.maxRecords > 0 { pr := e.Value.(*paymentResult)
for b.numRecords >= b.maxRecords {
cursor := bucket.ReadWriteCursor()
cursor.First()
if err := cursor.Delete(); err != nil {
return err
}
b.numRecords--
}
}
// Serialize result into key and value byte slices. // Serialize result into key and value byte slices.
k, v, err := serializeResult(rp) k, v, err := serializeResult(pr)
if err != nil { if err != nil {
return err return err
} }
// The store is assumed to be idempotent. It could be that the // The store is assumed to be idempotent. It could be
// same result is added twice and in that case the counter // that the same result is added twice and in that case
// shouldn't be increased. // we don't need to put the value again.
if bucket.Get(k) == nil { if _, ok := keysMap[string(k)]; ok {
b.numRecords++ continue
} }
// Put into results bucket. // Put into results bucket.
return bucket.Put(k, v) if err := bucket.Put(k, v); err != nil {
}, func() {}) return err
}
keys.PushBack(k)
keysMap[string(k)] = struct{}{}
}
// Prune oldest entries.
for {
if b.maxRecords == 0 || keys.Len() <= b.maxRecords {
break
}
front := keys.Front()
key := front.Value.([]byte)
if err := bucket.Delete(key); err != nil {
return err
}
keys.Remove(front)
delete(keysMap, string(key))
}
return nil
}, func() {
keys = list.New()
keys.PushBackList(b.keys)
keysMap = make(map[string]struct{})
for k := range b.keysMap {
keysMap[k] = struct{}{}
}
})
if err != nil {
return err
}
b.keys = keys
b.keysMap = keysMap
return nil
} }
// getResultKey returns a byte slice representing a unique key for this payment // getResultKey returns a byte slice representing a unique key for this payment

View file

@ -10,8 +10,8 @@ import (
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
"github.com/stretchr/testify/require"
) )
const testMaxRecords = 2 const testMaxRecords = 2
@ -40,7 +40,7 @@ func TestMissionControlStore(t *testing.T) {
defer db.Close() defer db.Close()
defer os.Remove(dbPath) defer os.Remove(dbPath)
store, err := newMissionControlStore(db, testMaxRecords) store, err := newMissionControlStore(db, testMaxRecords, time.Second)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -80,27 +80,21 @@ func TestMissionControlStore(t *testing.T) {
result2.id = 2 result2.id = 2
// Store result. // Store result.
err = store.AddResult(&result2) store.AddResult(&result2)
if err != nil {
t.Fatal(err)
}
// Store again to test idempotency. // Store again to test idempotency.
err = store.AddResult(&result2) store.AddResult(&result2)
if err != nil {
t.Fatal(err)
}
// Store second result which has an earlier timestamp. // Store second result which has an earlier timestamp.
err = store.AddResult(&result1) store.AddResult(&result1)
if err != nil { require.NoError(t, store.storeResults())
t.Fatal(err)
}
results, err = store.fetchAll() results, err = store.fetchAll()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
require.Equal(t, 2, len(results))
if len(results) != 2 { if len(results) != 2 {
t.Fatal("expected two results") t.Fatal("expected two results")
} }
@ -116,7 +110,7 @@ func TestMissionControlStore(t *testing.T) {
} }
// Recreate store to test pruning. // Recreate store to test pruning.
store, err = newMissionControlStore(db, testMaxRecords) store, err = newMissionControlStore(db, testMaxRecords, time.Second)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -128,16 +122,15 @@ func TestMissionControlStore(t *testing.T) {
result3.id = 3 result3.id = 3
result3.failure = &lnwire.FailMPPTimeout{} result3.failure = &lnwire.FailMPPTimeout{}
err = store.AddResult(&result3) store.AddResult(&result3)
if err != nil { require.NoError(t, store.storeResults())
t.Fatal(err)
}
// Check that results are pruned. // Check that results are pruned.
results, err = store.fetchAll() results, err = store.fetchAll()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
require.Equal(t, 2, len(results))
if len(results) != 2 { if len(results) != 2 {
t.Fatal("expected two results") t.Fatal("expected two results")
} }

View file

@ -9,6 +9,7 @@ import (
"github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
"github.com/stretchr/testify/require"
) )
var ( var (
@ -77,6 +78,12 @@ func createMcTestContext(t *testing.T) *mcTestContext {
// restartMc creates a new instances of mission control on the same database. // restartMc creates a new instances of mission control on the same database.
func (ctx *mcTestContext) restartMc() { func (ctx *mcTestContext) restartMc() {
// Since we don't run a timer to store results in unit tests, we store
// them here before fetching back everything in NewMissionControl.
if ctx.mc != nil {
require.NoError(ctx.t, ctx.mc.store.storeResults())
}
mc, err := NewMissionControl( mc, err := NewMissionControl(
ctx.db, mcTestSelf, ctx.db, mcTestSelf,
&MissionControlConfig{ &MissionControlConfig{

View file

@ -1023,6 +1023,9 @@ litecoin.node=ltcd
; (default: 1000) ; (default: 1000)
; routerrpc.maxmchistory=900 ; routerrpc.maxmchistory=900
; The time interval with which the MC store state is flushed to the DB.
; routerrpc.mcflushinterval=1m
; Path to the router macaroon ; Path to the router macaroon
; routerrpc.routermacaroonpath=~/.lnd/data/chain/bitcoin/simnet/router.macaroon ; routerrpc.routermacaroonpath=~/.lnd/data/chain/bitcoin/simnet/router.macaroon

View file

@ -746,6 +746,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
&routing.MissionControlConfig{ &routing.MissionControlConfig{
ProbabilityEstimatorCfg: estimatorCfg, ProbabilityEstimatorCfg: estimatorCfg,
MaxMcHistory: routingConfig.MaxMcHistory, MaxMcHistory: routingConfig.MaxMcHistory,
McFlushInterval: routingConfig.McFlushInterval,
MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval, MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval,
}, },
) )
@ -1663,6 +1664,12 @@ func (s *server) Start() error {
return nil return nil
}) })
s.missionControl.RunStoreTicker()
cleanup.add(func() error {
s.missionControl.StopStoreTicker()
return nil
})
// Before we start the connMgr, we'll check to see if we have // Before we start the connMgr, we'll check to see if we have
// any backups to recover. We do this now as we want to ensure // any backups to recover. We do this now as we want to ensure
// that have all the information we need to handle channel // that have all the information we need to handle channel
@ -1869,6 +1876,7 @@ func (s *server) Stop() error {
srvrLog.Warnf("failed to stop chanSubSwapper: %v", err) srvrLog.Warnf("failed to stop chanSubSwapper: %v", err)
} }
s.chanEventStore.Stop() s.chanEventStore.Stop()
s.missionControl.StopStoreTicker()
// Disconnect from each active peers to ensure that // Disconnect from each active peers to ensure that
// peerTerminationWatchers signal completion to each peer. // peerTerminationWatchers signal completion to each peer.