etcd: fix dereferencing issue in etcd.CommitQueue causing contention

This commit fixes an issue where subsequent transaction retries may have
changed the read/write sets inside the STM which in turn left junk
references to these keys in the CommitQueue. The leftover keys potentially
conflicted with subsequent transactions, queueing them up and causing
throughput degradation.
This commit is contained in:
Andras Banki-Horvath 2021-06-16 16:20:54 +02:00
parent 407ee84838
commit 02aa77261d
No known key found for this signature in database
GPG key ID: 80E5375C094198D8
3 changed files with 39 additions and 39 deletions

View file

@ -54,14 +54,14 @@ func (c *commitQueue) Wait() {
// Add increases lock counts and queues up tx commit closure for execution.
// Transactions that don't have any conflicts are executed immediately by
// "downgrading" the count mutex to allow concurrency.
func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) {
func (c *commitQueue) Add(commitLoop func(), rset []string, wset []string) {
c.mx.Lock()
blocked := false
// Mark as blocked if there's any writer changing any of the keys in
// the read set. Do not increment the reader counts yet as we'll need to
// use the original reader counts when scanning through the write set.
for key := range rset {
for _, key := range rset {
if c.writerMap[key] > 0 {
blocked = true
break
@ -70,7 +70,7 @@ func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) {
// Mark as blocked if there's any writer or reader for any of the keys
// in the write set.
for key := range wset {
for _, key := range wset {
blocked = blocked || c.readerMap[key] > 0 || c.writerMap[key] > 0
// Increment the writer count.
@ -78,7 +78,7 @@ func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) {
}
// Finally we can increment the reader counts for keys in the read set.
for key := range rset {
for _, key := range rset {
c.readerMap[key] += 1
}
@ -108,18 +108,18 @@ func (c *commitQueue) Add(commitLoop func(), rset readSet, wset writeSet) {
}
// Done decreases lock counts of the keys in the read/write sets.
func (c *commitQueue) Done(rset readSet, wset writeSet) {
func (c *commitQueue) Done(rset []string, wset []string) {
c.mx.Lock()
defer c.mx.Unlock()
for key := range rset {
for _, key := range rset {
c.readerMap[key] -= 1
if c.readerMap[key] == 0 {
delete(c.readerMap, key)
}
}
for key := range wset {
for _, key := range wset {
c.writerMap[key] -= 1
if c.writerMap[key] == 0 {
delete(c.writerMap, key)

View file

@ -39,28 +39,6 @@ func TestCommitQueue(t *testing.T) {
}
}
// Helper function to create a read set from the passed keys.
makeReadSet := func(keys []string) readSet {
rs := make(map[string]stmGet)
for _, key := range keys {
rs[key] = stmGet{}
}
return rs
}
// Helper function to create a write set from the passed keys.
makeWriteSet := func(keys []string) writeSet {
ws := make(map[string]stmPut)
for _, key := range keys {
ws[key] = stmPut{}
}
return ws
}
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
q := NewCommitQueue(ctx)
@ -73,26 +51,26 @@ func TestCommitQueue(t *testing.T) {
// Tx1: reads: key1, key2, writes: key3, conflict: none
q.Add(
commit("free", true),
makeReadSet([]string{"key1", "key2"}),
makeWriteSet([]string{"key3"}),
[]string{"key1", "key2"},
[]string{"key3"},
)
// Tx2: reads: key1, key2, writes: key3, conflict: Tx1
q.Add(
commit("blocked1", false),
makeReadSet([]string{"key1", "key2"}),
makeWriteSet([]string{"key3"}),
[]string{"key1", "key2"},
[]string{"key3"},
)
// Tx3: reads: key1, key2, writes: key4, conflict: none
q.Add(
commit("free", true),
makeReadSet([]string{"key1", "key2"}),
makeWriteSet([]string{"key4"}),
[]string{"key1", "key2"},
[]string{"key4"},
)
// Tx4: reads: key2, writes: key4, conflict: Tx3
q.Add(
commit("blocked2", false),
makeReadSet([]string{"key2"}),
makeWriteSet([]string{"key4"}),
[]string{"key2"},
[]string{"key4"},
)
// Wait for all commits.

View file

@ -280,8 +280,30 @@ func runSTM(s *stm, apply func(STM) error) error {
return preApplyErr
}
// Make a copy of the read/write set keys here. The reason why we need
// to do this is because subsequent applies may change (shrink) these
// sets and so when we decrease reference counts in the commit queue in
// Done(...) we'd potentially miss removing references which would
// result in queueing up transactions and contending DB access.
// Copying these strings is cheap because Go strings are immutable and
// copying one duplicates only its header, not the underlying bytes.
rkeys := make([]string, len(s.rset))
wkeys := make([]string, len(s.wset))
i := 0
for key := range s.rset {
rkeys[i] = key
i++
}
i = 0
for key := range s.wset {
wkeys[i] = key
i++
}
// Queue up the transaction for execution.
s.txQueue.Add(execute, s.rset, s.wset)
s.txQueue.Add(execute, rkeys, wkeys)
// Wait for the transaction to execute, or break if aborted.
select {
@ -289,7 +311,7 @@ func runSTM(s *stm, apply func(STM) error) error {
case <-s.options.ctx.Done():
}
s.txQueue.Done(s.rset, s.wset)
s.txQueue.Done(rkeys, wkeys)
if s.options.commitStatsCallback != nil {
stats.Retries = retries