diff --git a/docs/release-notes/release-notes-0.14.0.md b/docs/release-notes/release-notes-0.14.0.md
index 515ffe2bc..df46b559f 100644
--- a/docs/release-notes/release-notes-0.14.0.md
+++ b/docs/release-notes/release-notes-0.14.0.md
@@ -48,6 +48,11 @@ addholdinvoice call](https://github.com/lightningnetwork/lnd/pull/5533).
 [Optimized payment sequence generation](https://github.com/lightningnetwork/lnd/pull/5514/)
 to make LNDs payment throughput (and latency) with better when using etcd.
 
+## Performance improvements
+
+* [Update MC store in blocks](https://github.com/lightningnetwork/lnd/pull/5515)
+  to improve payment throughput when using etcd.
+
 # Contributors (Alphabetical Order)
 * ErikEk
 * Zero-1729
diff --git a/lnrpc/routerrpc/config.go b/lnrpc/routerrpc/config.go
index 1d1a6f8d4..ec036258f 100644
--- a/lnrpc/routerrpc/config.go
+++ b/lnrpc/routerrpc/config.go
@@ -49,6 +49,7 @@ func DefaultConfig() *Config {
         AttemptCost:     routing.DefaultAttemptCost.ToSatoshis(),
         AttemptCostPPM:  routing.DefaultAttemptCostPPM,
         MaxMcHistory:    routing.DefaultMaxMcHistory,
+        McFlushInterval: routing.DefaultMcFlushInterval,
     }
 
     return &Config{
@@ -66,5 +67,6 @@ func GetRoutingConfig(cfg *Config) *RoutingConfig {
         AttemptCostPPM:  cfg.AttemptCostPPM,
         PenaltyHalfLife: cfg.PenaltyHalfLife,
         MaxMcHistory:    cfg.MaxMcHistory,
+        McFlushInterval: cfg.McFlushInterval,
     }
 }
diff --git a/lnrpc/routerrpc/routing_config.go b/lnrpc/routerrpc/routing_config.go
index dd0fe93d1..2c93cb28d 100644
--- a/lnrpc/routerrpc/routing_config.go
+++ b/lnrpc/routerrpc/routing_config.go
@@ -43,4 +43,8 @@ type RoutingConfig struct {
     // MaxMcHistory defines the maximum number of payment results that
     // are held on disk by mission control.
     MaxMcHistory int `long:"maxmchistory" description:"the maximum number of payment results that are held on disk by mission control"`
+
+    // McFlushInterval defines the timer interval to use to flush mission
+    // control state to the DB.
+    McFlushInterval time.Duration `long:"mcflushinterval" description:"the timer interval to use to flush mission control state to the DB"`
 }
diff --git a/routing/missioncontrol.go b/routing/missioncontrol.go
index d1dc4294e..a9fad3938 100644
--- a/routing/missioncontrol.go
+++ b/routing/missioncontrol.go
@@ -45,6 +45,10 @@ const (
     // DefaultMaxMcHistory is the default maximum history size.
     DefaultMaxMcHistory = 1000
 
+    // DefaultMcFlushInterval is the default interval we use to flush MC
+    // state to the database.
+    DefaultMcFlushInterval = time.Second
+
     // prevSuccessProbability is the assumed probability for node pairs that
     // successfully relayed the previous attempt.
     prevSuccessProbability = 0.95
@@ -119,6 +123,10 @@ type MissionControlConfig struct {
     // held on disk.
     MaxMcHistory int
 
+    // McFlushInterval defines the ticker interval when we flush the
+    // accumulated state to the DB.
+    McFlushInterval time.Duration
+
     // MinFailureRelaxInterval is the minimum time that must have passed
     // since the previously recorded failure before the failure amount may
     // be raised.
@@ -209,7 +217,9 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex,
         return nil, err
     }
 
-    store, err := newMissionControlStore(db, cfg.MaxMcHistory)
+    store, err := newMissionControlStore(
+        db, cfg.MaxMcHistory, cfg.McFlushInterval,
+    )
     if err != nil {
         return nil, err
     }
@@ -234,6 +244,16 @@ func NewMissionControl(db kvdb.Backend, self route.Vertex,
     return mc, nil
 }
 
+// RunStoreTicker runs the mission control store's ticker.
+func (m *MissionControl) RunStoreTicker() {
+    m.store.run()
+}
+
+// StopStoreTicker stops the mission control store's ticker.
+func (m *MissionControl) StopStoreTicker() {
+    m.store.stop()
+}
+
 // init initializes mission control with historical data.
 func (m *MissionControl) init() error {
     log.Debugf("Mission control state reconstruction started")
@@ -265,6 +285,7 @@ func (m *MissionControl) GetConfig() *MissionControlConfig {
     return &MissionControlConfig{
         ProbabilityEstimatorCfg: m.estimator.ProbabilityEstimatorCfg,
         MaxMcHistory:            m.store.maxRecords,
+        McFlushInterval:         m.store.flushInterval,
         MinFailureRelaxInterval: m.state.minFailureRelaxInterval,
     }
 }
@@ -429,9 +450,7 @@ func (m *MissionControl) processPaymentResult(result *paymentResult) (
     *channeldb.FailureReason, error) {
 
     // Store complete result in database.
-    if err := m.store.AddResult(result); err != nil {
-        return nil, err
-    }
+    m.store.AddResult(result)
 
     m.Lock()
     defer m.Unlock()
diff --git a/routing/missioncontrol_store.go b/routing/missioncontrol_store.go
index 9e9974e4a..193f7d13a 100644
--- a/routing/missioncontrol_store.go
+++ b/routing/missioncontrol_store.go
@@ -2,8 +2,10 @@ package routing
 
 import (
     "bytes"
+    "container/list"
     "encoding/binary"
     "fmt"
+    "sync"
     "time"
 
     "github.com/btcsuite/btcd/wire"
@@ -35,13 +37,38 @@ const (
 // Also changes to mission control parameters can be applied to historical data.
 // Finally, it enables importing raw data from an external source.
 type missionControlStore struct {
-    db kvdb.Backend
+    done    chan struct{}
+    wg      sync.WaitGroup
+    db      kvdb.Backend
+    queueMx sync.Mutex
+
+    // queue stores all pending payment results not yet added to the store.
+    queue *list.List
+
+    // keys holds the stored MC store item keys in the order of storage.
+    // We use this list when adding/deleting items from the database to
+    // avoid cursor use which may be slow in the remote DB case.
+    keys *list.List
+
+    // keysMap holds the stored MC store item keys. We use this map to check
+    // if a new payment result has already been stored.
+    keysMap map[string]struct{}
+
+    // maxRecords is the maximum number of records we will store in the db.
     maxRecords int
-    numRecords int
+
+    // flushInterval is the configured interval we use to store new results
+    // and delete outdated ones from the db.
+    flushInterval time.Duration
 }
 
-func newMissionControlStore(db kvdb.Backend, maxRecords int) (*missionControlStore, error) {
-    var store *missionControlStore
+func newMissionControlStore(db kvdb.Backend, maxRecords int,
+    flushInterval time.Duration) (*missionControlStore, error) {
+
+    var (
+        keys    *list.List
+        keysMap map[string]struct{}
+    )
 
     // Create buckets if not yet existing.
     err := kvdb.Update(db, func(tx kvdb.RwTx) error {
@@ -51,32 +78,40 @@ func newMissionControlStore(db kvdb.Backend, maxRecords int) (*missionControlSto
                 err)
         }
 
-        // Count initial number of results and track this number in
-        // memory to avoid calling Stats().KeyN. The reliability of
-        // Stats() is doubtful and seemed to have caused crashes in the
-        // past (see #1874).
+        // Collect all keys to be able to quickly calculate the
+        // difference when updating the DB state.
         c := resultsBucket.ReadCursor()
         for k, _ := c.First(); k != nil; k, _ = c.Next() {
-            store.numRecords++
+            keys.PushBack(k)
+            keysMap[string(k)] = struct{}{}
         }
 
         return nil
     }, func() {
-        store = &missionControlStore{
-            db:         db,
-            maxRecords: maxRecords,
-        }
+        keys = list.New()
+        keysMap = make(map[string]struct{})
     })
     if err != nil {
         return nil, err
     }
 
-    return store, nil
+    return &missionControlStore{
+        done:          make(chan struct{}),
+        db:            db,
+        queue:         list.New(),
+        keys:          keys,
+        keysMap:       keysMap,
+        maxRecords:    maxRecords,
+        flushInterval: flushInterval,
+    }, nil
 }
 
 // clear removes all results from the db.
 func (b *missionControlStore) clear() error {
-    return kvdb.Update(b.db, func(tx kvdb.RwTx) error {
+    b.queueMx.Lock()
+    defer b.queueMx.Unlock()
+
+    err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
         if err := tx.DeleteTopLevelBucket(resultsKey); err != nil {
             return err
         }
@@ -84,6 +119,13 @@ func (b *missionControlStore) clear() error {
         _, err := tx.CreateTopLevelBucket(resultsKey)
         return err
     }, func() {})
+
+    if err != nil {
+        return err
+    }
+
+    b.queue = list.New()
+    return nil
 }
 
 // fetchAll returns all results currently stored in the database.
@@ -221,39 +263,117 @@ func deserializeResult(k, v []byte) (*paymentResult, error) {
 }
 
 // AddResult adds a new result to the db.
-func (b *missionControlStore) AddResult(rp *paymentResult) error {
-    return kvdb.Update(b.db, func(tx kvdb.RwTx) error {
-        bucket := tx.ReadWriteBucket(resultsKey)
+func (b *missionControlStore) AddResult(rp *paymentResult) {
+    b.queueMx.Lock()
+    defer b.queueMx.Unlock()
+    b.queue.PushBack(rp)
+}
 
-        // Prune oldest entries.
-        if b.maxRecords > 0 {
-            for b.numRecords >= b.maxRecords {
-                cursor := bucket.ReadWriteCursor()
-                cursor.First()
-                if err := cursor.Delete(); err != nil {
-                    return err
+// stop stops the store ticker goroutine.
+func (b *missionControlStore) stop() {
+    close(b.done)
+    b.wg.Wait()
+}
+
+// run runs the MC store ticker goroutine.
+func (b *missionControlStore) run() {
+    b.wg.Add(1)
+
+    go func() {
+        ticker := time.NewTicker(b.flushInterval)
+        defer ticker.Stop()
+        defer b.wg.Done()
+
+        for {
+            select {
+            case <-ticker.C:
+                if err := b.storeResults(); err != nil {
+                    log.Errorf("Failed to update mission "+
+                        "control store: %v", err)
                 }
 
-                b.numRecords--
+            case <-b.done:
+                return
             }
         }
+    }()
+}
 
-        // Serialize result into key and value byte slices.
-        k, v, err := serializeResult(rp)
-        if err != nil {
-            return err
+// storeResults stores all accumulated results.
+func (b *missionControlStore) storeResults() error {
+    b.queueMx.Lock()
+    l := b.queue
+    b.queue = list.New()
+    b.queueMx.Unlock()
+
+    var (
+        keys    *list.List
+        keysMap map[string]struct{}
+    )
+
+    err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
+        bucket := tx.ReadWriteBucket(resultsKey)
+
+        for e := l.Front(); e != nil; e = e.Next() {
+            pr := e.Value.(*paymentResult)
+            // Serialize result into key and value byte slices.
+            k, v, err := serializeResult(pr)
+            if err != nil {
+                return err
+            }
+
+            // The store is assumed to be idempotent. It could be
+            // that the same result is added twice and in that case
+            // we don't need to put the value again.
+            if _, ok := keysMap[string(k)]; ok {
+                continue
+            }
+
+            // Put into results bucket.
+            if err := bucket.Put(k, v); err != nil {
+                return err
+            }
+
+            keys.PushBack(k)
+            keysMap[string(k)] = struct{}{}
         }
 
-        // The store is assumed to be idempotent. It could be that the
-        // same result is added twice and in that case the counter
-        // shouldn't be increased.
-        if bucket.Get(k) == nil {
-            b.numRecords++
+        // Prune oldest entries.
+        for {
+            if b.maxRecords == 0 || keys.Len() <= b.maxRecords {
+                break
+            }
+
+            front := keys.Front()
+            key := front.Value.([]byte)
+
+            if err := bucket.Delete(key); err != nil {
+                return err
+            }
+
+            keys.Remove(front)
+            delete(keysMap, string(key))
         }
 
-        // Put into results bucket.
-        return bucket.Put(k, v)
-    }, func() {})
+        return nil
+    }, func() {
+        keys = list.New()
+        keys.PushBackList(b.keys)
+
+        keysMap = make(map[string]struct{})
+        for k := range b.keysMap {
+            keysMap[k] = struct{}{}
+        }
+    })
+
+    if err != nil {
+        return err
+    }
+
+    b.keys = keys
+    b.keysMap = keysMap
+
+    return nil
 }
 
 // getResultKey returns a byte slice representing a unique key for this payment
diff --git a/routing/missioncontrol_store_test.go b/routing/missioncontrol_store_test.go
index 4653c51eb..f1e3a45f0 100644
--- a/routing/missioncontrol_store_test.go
+++ b/routing/missioncontrol_store_test.go
@@ -10,8 +10,8 @@ import (
     "github.com/davecgh/go-spew/spew"
     "github.com/lightningnetwork/lnd/kvdb"
     "github.com/lightningnetwork/lnd/lnwire"
-
     "github.com/lightningnetwork/lnd/routing/route"
+    "github.com/stretchr/testify/require"
 )
 
 const testMaxRecords = 2
@@ -40,7 +40,7 @@ func TestMissionControlStore(t *testing.T) {
     defer db.Close()
     defer os.Remove(dbPath)
 
-    store, err := newMissionControlStore(db, testMaxRecords)
+    store, err := newMissionControlStore(db, testMaxRecords, time.Second)
     if err != nil {
         t.Fatal(err)
     }
@@ -80,27 +80,21 @@ func TestMissionControlStore(t *testing.T) {
     result2.id = 2
 
     // Store result.
-    err = store.AddResult(&result2)
-    if err != nil {
-        t.Fatal(err)
-    }
+    store.AddResult(&result2)
 
     // Store again to test idempotency.
-    err = store.AddResult(&result2)
-    if err != nil {
-        t.Fatal(err)
-    }
+    store.AddResult(&result2)
 
     // Store second result which has an earlier timestamp.
-    err = store.AddResult(&result1)
-    if err != nil {
-        t.Fatal(err)
-    }
+    store.AddResult(&result1)
+    require.NoError(t, store.storeResults())
 
     results, err = store.fetchAll()
     if err != nil {
         t.Fatal(err)
     }
+    require.Equal(t, 2, len(results))
+
     if len(results) != 2 {
         t.Fatal("expected two results")
     }
@@ -116,7 +110,7 @@ func TestMissionControlStore(t *testing.T) {
     }
 
     // Recreate store to test pruning.
-    store, err = newMissionControlStore(db, testMaxRecords)
+    store, err = newMissionControlStore(db, testMaxRecords, time.Second)
     if err != nil {
         t.Fatal(err)
     }
@@ -128,16 +122,15 @@ func TestMissionControlStore(t *testing.T) {
     result3.id = 3
     result3.failure = &lnwire.FailMPPTimeout{}
 
-    err = store.AddResult(&result3)
-    if err != nil {
-        t.Fatal(err)
-    }
+    store.AddResult(&result3)
+    require.NoError(t, store.storeResults())
 
     // Check that results are pruned.
     results, err = store.fetchAll()
     if err != nil {
         t.Fatal(err)
     }
+    require.Equal(t, 2, len(results))
     if len(results) != 2 {
         t.Fatal("expected two results")
     }
diff --git a/routing/missioncontrol_test.go b/routing/missioncontrol_test.go
index 6be9c4cb6..2e79ffb0f 100644
--- a/routing/missioncontrol_test.go
+++ b/routing/missioncontrol_test.go
@@ -9,6 +9,7 @@ import (
     "github.com/lightningnetwork/lnd/kvdb"
     "github.com/lightningnetwork/lnd/lnwire"
     "github.com/lightningnetwork/lnd/routing/route"
+    "github.com/stretchr/testify/require"
 )
 
 var (
@@ -77,6 +78,12 @@ func createMcTestContext(t *testing.T) *mcTestContext {
 
 // restartMc creates a new instances of mission control on the same database.
 func (ctx *mcTestContext) restartMc() {
+    // Since we don't run a timer to store results in unit tests, we store
+    // them here before fetching back everything in NewMissionControl.
+    if ctx.mc != nil {
+        require.NoError(ctx.t, ctx.mc.store.storeResults())
+    }
+
     mc, err := NewMissionControl(
         ctx.db, mcTestSelf,
         &MissionControlConfig{
diff --git a/sample-lnd.conf b/sample-lnd.conf
index 0e6c70a4b..56fba9d67 100644
--- a/sample-lnd.conf
+++ b/sample-lnd.conf
@@ -1023,6 +1023,9 @@ litecoin.node=ltcd
 ; (default: 1000)
 ; routerrpc.maxmchistory=900
 
+; The time interval with which the MC store state is flushed to the DB.
+; routerrpc.mcflushinterval=1m
+
 ; Path to the router macaroon
 ; routerrpc.routermacaroonpath=~/.lnd/data/chain/bitcoin/simnet/router.macaroon
 
diff --git a/server.go b/server.go
index bc2d75c00..2b914b90a 100644
--- a/server.go
+++ b/server.go
@@ -746,6 +746,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
         &routing.MissionControlConfig{
             ProbabilityEstimatorCfg: estimatorCfg,
             MaxMcHistory:            routingConfig.MaxMcHistory,
+            McFlushInterval:         routingConfig.McFlushInterval,
             MinFailureRelaxInterval: routing.DefaultMinFailureRelaxInterval,
         },
     )
@@ -1663,6 +1664,12 @@ func (s *server) Start() error {
             return nil
         })
 
+        s.missionControl.RunStoreTicker()
+        cleanup.add(func() error {
+            s.missionControl.StopStoreTicker()
+            return nil
+        })
+
         // Before we start the connMgr, we'll check to see if we have
         // any backups to recover. We do this now as we want to ensure
         // that have all the information we need to handle channel
@@ -1869,6 +1876,7 @@ func (s *server) Stop() error {
             srvrLog.Warnf("failed to stop chanSubSwapper: %v", err)
         }
         s.chanEventStore.Stop()
+        s.missionControl.StopStoreTicker()
 
         // Disconnect from each active peers to ensure that
         // peerTerminationWatchers signal completion to each peer.
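A minimal usage sketch (not part of the patch above): it shows how the new McFlushInterval knob and the RunStoreTicker/StopStoreTicker calls are meant to fit together, assuming an open kvdb.Backend and the node's own route.Vertex. The wrapper function and package name are hypothetical, and required MissionControlConfig fields such as ProbabilityEstimatorCfg are elided for brevity; the server.go hunks above show the real wiring.

package mcusage

import (
    "github.com/lightningnetwork/lnd/kvdb"
    "github.com/lightningnetwork/lnd/routing"
    "github.com/lightningnetwork/lnd/routing/route"
)

// startMissionControl is a hypothetical helper that wires up mission control
// with the batched store introduced in this patch. Config fields the patch
// does not touch (e.g. ProbabilityEstimatorCfg) are omitted here.
func startMissionControl(db kvdb.Backend,
    self route.Vertex) (*routing.MissionControl, func(), error) {

    mc, err := routing.NewMissionControl(db, self,
        &routing.MissionControlConfig{
            MaxMcHistory: routing.DefaultMaxMcHistory,
            // Payment results are queued in memory and written to the
            // DB in one transaction per interval.
            McFlushInterval: routing.DefaultMcFlushInterval,
        },
    )
    if err != nil {
        return nil, nil, err
    }

    // Start the background goroutine that flushes queued results; the
    // returned stop function mirrors the cleanup added to server.go.
    mc.RunStoreTicker()

    return mc, mc.StopStoreTicker, nil
}

Compared to the previous per-result kvdb.Update call, batching results behind McFlushInterval trades a small delay in persisting mission control state for far fewer DB round trips, which is what improves payment throughput on remote backends such as etcd.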