From 9c47392dfabdfcbc86d4354a9c8103d031509d8a Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Mon, 10 Aug 2020 16:50:06 +0200 Subject: [PATCH] etcd: integrate the commitQueue to the STM commit loop This commit integrates an externally passed commitQueue instance with the STM to reduce retries for conflicting transactions. --- channeldb/kvdb/etcd/db.go | 29 +++++---- channeldb/kvdb/etcd/db_test.go | 2 +- channeldb/kvdb/etcd/stm.go | 109 ++++++++++++++++++++------------ channeldb/kvdb/etcd/stm_test.go | 40 +++++++++--- 4 files changed, 118 insertions(+), 62 deletions(-) diff --git a/channeldb/kvdb/etcd/db.go b/channeldb/kvdb/etcd/db.go index 6089ab712..9f52ad4eb 100644 --- a/channeldb/kvdb/etcd/db.go +++ b/channeldb/kvdb/etcd/db.go @@ -16,8 +16,8 @@ import ( ) const ( - // etcdConnectionTimeout is the timeout until successful connection to the - // etcd instance. + // etcdConnectionTimeout is the timeout until successful connection to + // the etcd instance. etcdConnectionTimeout = 10 * time.Second // etcdLongTimeout is a timeout for longer taking etcd operatons. @@ -34,7 +34,8 @@ type callerStats struct { func (s callerStats) String() string { return fmt.Sprintf("count: %d, retries: %d, rset: %d, wset: %d", - s.count, s.commitStats.Retries, s.commitStats.Rset, s.commitStats.Wset) + s.count, s.commitStats.Retries, s.commitStats.Rset, + s.commitStats.Wset) } // commitStatsCollector collects commit stats for commits succeeding @@ -117,6 +118,7 @@ type db struct { config BackendConfig cli *clientv3.Client commitStatsCollector *commitStatsCollector + txQueue *commitQueue } // Enforce db implements the walletdb.DB interface. @@ -188,8 +190,9 @@ func newEtcdBackend(config BackendConfig) (*db, error) { } backend := &db{ - cli: cli, - config: config, + cli: cli, + config: config, + txQueue: NewCommitQueue(config.Ctx), } if config.CollectCommitStats { @@ -201,7 +204,9 @@ func newEtcdBackend(config BackendConfig) (*db, error) { // getSTMOptions creats all STM options based on the backend config. func (db *db) getSTMOptions() []STMOptionFunc { - opts := []STMOptionFunc{WithAbortContext(db.config.Ctx)} + opts := []STMOptionFunc{ + WithAbortContext(db.config.Ctx), + } if db.config.CollectCommitStats { opts = append(opts, @@ -221,7 +226,7 @@ func (db *db) View(f func(tx walletdb.ReadTx) error) error { return f(newReadWriteTx(stm, db.config.Prefix)) } - return RunSTM(db.cli, apply, db.getSTMOptions()...) + return RunSTM(db.cli, apply, db.txQueue, db.getSTMOptions()...) } // Update opens a database read/write transaction and executes the function f @@ -235,7 +240,7 @@ func (db *db) Update(f func(tx walletdb.ReadWriteTx) error) error { return f(newReadWriteTx(stm, db.config.Prefix)) } - return RunSTM(db.cli, apply, db.getSTMOptions()...) + return RunSTM(db.cli, apply, db.txQueue, db.getSTMOptions()...) } // PrintStats returns all collected stats pretty printed into a string. @@ -247,18 +252,18 @@ func (db *db) PrintStats() string { return "" } -// BeginReadTx opens a database read transaction. +// BeginReadWriteTx opens a database read+write transaction. func (db *db) BeginReadWriteTx() (walletdb.ReadWriteTx, error) { return newReadWriteTx( - NewSTM(db.cli, db.getSTMOptions()...), + NewSTM(db.cli, db.txQueue, db.getSTMOptions()...), db.config.Prefix, ), nil } -// BeginReadWriteTx opens a database read+write transaction. +// BeginReadTx opens a database read transaction. func (db *db) BeginReadTx() (walletdb.ReadTx, error) { return newReadWriteTx( - NewSTM(db.cli, db.getSTMOptions()...), + NewSTM(db.cli, db.txQueue, db.getSTMOptions()...), db.config.Prefix, ), nil } diff --git a/channeldb/kvdb/etcd/db_test.go b/channeldb/kvdb/etcd/db_test.go index c4332db8a..8a5d623e5 100644 --- a/channeldb/kvdb/etcd/db_test.go +++ b/channeldb/kvdb/etcd/db_test.go @@ -63,7 +63,7 @@ func TestAbortContext(t *testing.T) { // Expect that the update will fail. err = db.Update(func(tx walletdb.ReadWriteTx) error { _, err := tx.CreateTopLevelBucket([]byte("bucket")) - require.NoError(t, err) + require.Error(t, err, "context canceled") return nil }) diff --git a/channeldb/kvdb/etcd/stm.go b/channeldb/kvdb/etcd/stm.go index 14bb9ca92..59ac1f457 100644 --- a/channeldb/kvdb/etcd/stm.go +++ b/channeldb/kvdb/etcd/stm.go @@ -134,6 +134,10 @@ type stm struct { // execute in the STM run loop. manual bool + // txQueue is lightweight contention manager, which is used to detect + // transaction conflicts and reduce retries. + txQueue *commitQueue + // options stores optional settings passed by the user. options *STMOptions @@ -183,18 +187,22 @@ func WithCommitStatsCallback(cb func(bool, CommitStats)) STMOptionFunc { // RunSTM runs the apply function by creating an STM using serializable snapshot // isolation, passing it to the apply and handling commit errors and retries. -func RunSTM(cli *v3.Client, apply func(STM) error, so ...STMOptionFunc) error { - return runSTM(makeSTM(cli, false, so...), apply) +func RunSTM(cli *v3.Client, apply func(STM) error, txQueue *commitQueue, + so ...STMOptionFunc) error { + + return runSTM(makeSTM(cli, false, txQueue, so...), apply) } // NewSTM creates a new STM instance, using serializable snapshot isolation. -func NewSTM(cli *v3.Client, so ...STMOptionFunc) STM { - return makeSTM(cli, true, so...) +func NewSTM(cli *v3.Client, txQueue *commitQueue, so ...STMOptionFunc) STM { + return makeSTM(cli, true, txQueue, so...) } // makeSTM is the actual constructor of the stm. It first apply all passed // options then creates the stm object and resets it before returning. -func makeSTM(cli *v3.Client, manual bool, so ...STMOptionFunc) *stm { +func makeSTM(cli *v3.Client, manual bool, txQueue *commitQueue, + so ...STMOptionFunc) *stm { + opts := &STMOptions{ ctx: cli.Ctx(), } @@ -207,6 +215,7 @@ func makeSTM(cli *v3.Client, manual bool, so ...STMOptionFunc) *stm { s := &stm{ client: cli, manual: manual, + txQueue: txQueue, options: opts, prefetch: make(map[string]stmGet), } @@ -222,50 +231,72 @@ func makeSTM(cli *v3.Client, manual bool, so ...STMOptionFunc) *stm { // CommitError which is used to indicate a necessary retry. func runSTM(s *stm, apply func(STM) error) error { var ( - retries int - stats CommitStats - err error + retries int + stats CommitStats + executeErr error ) -loop: - // In a loop try to apply and commit and roll back if the database has - // changed (CommitError). - for { - select { - // Check if the STM is aborted and break the retry loop if it is. - case <-s.options.ctx.Done(): - err = fmt.Errorf("aborted") - break loop + done := make(chan struct{}) - default: + execute := func() { + defer close(done) + + for { + select { + // Check if the STM is aborted and break the retry loop + // if it is. + case <-s.options.ctx.Done(): + executeErr = fmt.Errorf("aborted") + return + + default: + } + + stats, executeErr = s.commit() + + // Re-apply only upon commit error (meaning the + // keys were changed). + if _, ok := executeErr.(CommitError); !ok { + // Anything that's not a CommitError + // aborts the transaction. + return + } + + // Rollback before trying to re-apply. + s.Rollback() + retries++ + + // Re-apply the transaction closure. + if executeErr = apply(s); executeErr != nil { + return + } } - // Apply the transaction closure and abort the STM if there was - // an application error. - if err = apply(s); err != nil { - break loop - } - - stats, err = s.commit() - - // Retry the apply closure only upon commit error (meaning the - // database was changed). - if _, ok := err.(CommitError); !ok { - // Anything that's not a CommitError aborts the STM - // run loop. - break loop - } - - // Rollback before trying to re-apply. - s.Rollback() - retries++ } + // Run the tx closure to construct the read and write sets. + // Also we expect that if there are no conflicting transactions + // in the queue, then we only run apply once. + if preApplyErr := apply(s); preApplyErr != nil { + return preApplyErr + } + + // Queue up the transaction for execution. + s.txQueue.Add(execute, s.rset, s.wset) + + // Wait for the transaction to execute, or break if aborted. + select { + case <-done: + case <-s.options.ctx.Done(): + } + + s.txQueue.Done(s.rset, s.wset) + if s.options.commitStatsCallback != nil { stats.Retries = retries - s.options.commitStatsCallback(err == nil, stats) + s.options.commitStatsCallback(executeErr == nil, stats) } - return err + return executeErr } // add inserts a txn response to the read set. This is useful when the txn diff --git a/channeldb/kvdb/etcd/stm_test.go b/channeldb/kvdb/etcd/stm_test.go index 6beffc284..cde4abf3a 100644 --- a/channeldb/kvdb/etcd/stm_test.go +++ b/channeldb/kvdb/etcd/stm_test.go @@ -21,7 +21,11 @@ func TestPutToEmpty(t *testing.T) { t.Parallel() f := NewEtcdTestFixture(t) - defer f.Cleanup() + txQueue := NewCommitQueue(f.config.Ctx) + defer func() { + f.Cleanup() + txQueue.Wait() + }() db, err := newEtcdBackend(f.BackendConfig()) require.NoError(t, err) @@ -31,7 +35,7 @@ func TestPutToEmpty(t *testing.T) { return nil } - err = RunSTM(db.cli, apply) + err = RunSTM(db.cli, apply, txQueue) require.NoError(t, err) require.Equal(t, "abc", f.Get("123")) @@ -41,7 +45,11 @@ func TestGetPutDel(t *testing.T) { t.Parallel() f := NewEtcdTestFixture(t) - defer f.cleanup() + txQueue := NewCommitQueue(f.config.Ctx) + defer func() { + f.Cleanup() + txQueue.Wait() + }() testKeyValues := []KV{ {"a", "1"}, @@ -105,7 +113,7 @@ func TestGetPutDel(t *testing.T) { return nil } - err = RunSTM(db.cli, apply) + err = RunSTM(db.cli, apply, txQueue) require.NoError(t, err) require.Equal(t, "1", f.Get("a")) @@ -120,7 +128,11 @@ func TestFirstLastNextPrev(t *testing.T) { t.Parallel() f := NewEtcdTestFixture(t) - defer f.Cleanup() + txQueue := NewCommitQueue(f.config.Ctx) + defer func() { + f.Cleanup() + txQueue.Wait() + }() testKeyValues := []KV{ {"kb", "1"}, @@ -255,7 +267,7 @@ func TestFirstLastNextPrev(t *testing.T) { return nil } - err = RunSTM(db.cli, apply) + err = RunSTM(db.cli, apply, txQueue) require.NoError(t, err) require.Equal(t, "0", f.Get("ka")) @@ -271,7 +283,11 @@ func TestCommitError(t *testing.T) { t.Parallel() f := NewEtcdTestFixture(t) - defer f.Cleanup() + txQueue := NewCommitQueue(f.config.Ctx) + defer func() { + f.Cleanup() + txQueue.Wait() + }() db, err := newEtcdBackend(f.BackendConfig()) require.NoError(t, err) @@ -301,7 +317,7 @@ func TestCommitError(t *testing.T) { return nil } - err = RunSTM(db.cli, apply) + err = RunSTM(db.cli, apply, txQueue) require.NoError(t, err) require.Equal(t, 2, cnt) @@ -312,7 +328,11 @@ func TestManualTxError(t *testing.T) { t.Parallel() f := NewEtcdTestFixture(t) - defer f.Cleanup() + txQueue := NewCommitQueue(f.config.Ctx) + defer func() { + f.Cleanup() + txQueue.Wait() + }() db, err := newEtcdBackend(f.BackendConfig()) require.NoError(t, err) @@ -320,7 +340,7 @@ func TestManualTxError(t *testing.T) { // Preset DB state. f.Put("123", "xyz") - stm := NewSTM(db.cli) + stm := NewSTM(db.cli, txQueue) val, err := stm.Get("123") require.NoError(t, err)