mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-01-19 05:45:21 +01:00
214 lines
5.1 KiB
Go
214 lines
5.1 KiB
Go
//go:build kvdb_etcd
|
|
// +build kvdb_etcd
|
|
|
|
package etcd
|
|
|
|
import (
|
|
"container/list"
|
|
"context"
|
|
"sync"
|
|
)
|
|
|
|
// commitQueue is a simple execution queue to manage conflicts for transactions
|
|
// and thereby reduce the number of times conflicting transactions need to be
|
|
// retried. When a new transaction is added to the queue, we first upgrade the
|
|
// read/write counts in the queue's own accounting to decide whether the new
|
|
// transaction has any conflicting dependencies. If the transaction does not
|
|
// conflict with any other, then it is committed immediately, otherwise it'll be
|
|
// queued up for later execution.
|
|
// The algorithm is described in: http://www.cs.umd.edu/~abadi/papers/vll-vldb13.pdf
|
|
type commitQueue struct {
|
|
ctx context.Context
|
|
mx sync.Mutex
|
|
readerMap map[string]int
|
|
writerMap map[string]int
|
|
|
|
queue *list.List
|
|
queueMx sync.Mutex
|
|
queueCond *sync.Cond
|
|
|
|
shutdown chan struct{}
|
|
}
|
|
|
|
type commitQueueTxn struct {
|
|
commitLoop func()
|
|
blocked bool
|
|
rset []string
|
|
wset []string
|
|
}
|
|
|
|
// NewCommitQueue creates a new commit queue, with the passed abort context.
|
|
func NewCommitQueue(ctx context.Context) *commitQueue {
|
|
q := &commitQueue{
|
|
ctx: ctx,
|
|
readerMap: make(map[string]int),
|
|
writerMap: make(map[string]int),
|
|
queue: list.New(),
|
|
shutdown: make(chan struct{}),
|
|
}
|
|
q.queueCond = sync.NewCond(&q.queueMx)
|
|
|
|
// Start the queue consumer loop.
|
|
go q.mainLoop()
|
|
|
|
return q
|
|
}
|
|
|
|
// Stop signals the queue to stop after the queue context has been canceled and
|
|
// waits until the has stopped.
|
|
func (c *commitQueue) Stop() {
|
|
// Signal the queue's condition variable to ensure the mainLoop reliably
|
|
// unblocks to check for the exit condition.
|
|
c.queueCond.Signal()
|
|
<-c.shutdown
|
|
}
|
|
|
|
// Add increases lock counts and queues up tx commit closure for execution.
|
|
// Transactions that don't have any conflicts are executed immediately by
|
|
// "downgrading" the count mutex to allow concurrency.
|
|
func (c *commitQueue) Add(commitLoop func(), rset []string, wset []string) {
|
|
c.mx.Lock()
|
|
blocked := false
|
|
|
|
// Mark as blocked if there's any writer changing any of the keys in
|
|
// the read set. Do not increment the reader counts yet as we'll need to
|
|
// use the original reader counts when scanning through the write set.
|
|
for _, key := range rset {
|
|
if c.writerMap[key] > 0 {
|
|
blocked = true
|
|
break
|
|
}
|
|
}
|
|
|
|
// Mark as blocked if there's any writer or reader for any of the keys
|
|
// in the write set.
|
|
for _, key := range wset {
|
|
blocked = blocked || c.readerMap[key] > 0 || c.writerMap[key] > 0
|
|
|
|
// Increment the writer count.
|
|
c.writerMap[key] += 1
|
|
}
|
|
|
|
// Finally we can increment the reader counts for keys in the read set.
|
|
for _, key := range rset {
|
|
c.readerMap[key] += 1
|
|
}
|
|
|
|
c.queueCond.L.Lock()
|
|
c.queue.PushBack(&commitQueueTxn{
|
|
commitLoop: commitLoop,
|
|
blocked: blocked,
|
|
rset: rset,
|
|
wset: wset,
|
|
})
|
|
c.queueCond.L.Unlock()
|
|
|
|
c.mx.Unlock()
|
|
|
|
c.queueCond.Signal()
|
|
}
|
|
|
|
// done decreases lock counts of the keys in the read/write sets.
|
|
func (c *commitQueue) done(rset []string, wset []string) {
|
|
c.mx.Lock()
|
|
defer c.mx.Unlock()
|
|
|
|
for _, key := range rset {
|
|
c.readerMap[key] -= 1
|
|
if c.readerMap[key] == 0 {
|
|
delete(c.readerMap, key)
|
|
}
|
|
}
|
|
|
|
for _, key := range wset {
|
|
c.writerMap[key] -= 1
|
|
if c.writerMap[key] == 0 {
|
|
delete(c.writerMap, key)
|
|
}
|
|
}
|
|
}
|
|
|
|
// mainLoop executes queued transaction commits for transactions that have
|
|
// dependencies. The queue ensures that the top element doesn't conflict with
|
|
// any other transactions and therefore can be executed freely.
|
|
func (c *commitQueue) mainLoop() {
|
|
defer close(c.shutdown)
|
|
|
|
for {
|
|
// Wait until there are no unblocked transactions being
|
|
// executed, and for there to be at least one blocked
|
|
// transaction in our queue.
|
|
c.queueCond.L.Lock()
|
|
for c.queue.Front() == nil {
|
|
c.queueCond.Wait()
|
|
|
|
// Check the exit condition before looping again.
|
|
select {
|
|
case <-c.ctx.Done():
|
|
c.queueCond.L.Unlock()
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
|
|
// Now collect all txns until we find the next blocking one.
|
|
// These shouldn't conflict (if the precollected read/write
|
|
// keys sets don't grow), meaning we can safely commit them
|
|
// in parallel.
|
|
work := make([]*commitQueueTxn, 1)
|
|
e := c.queue.Front()
|
|
work[0] = c.queue.Remove(e).(*commitQueueTxn)
|
|
|
|
for {
|
|
e := c.queue.Front()
|
|
if e == nil {
|
|
break
|
|
}
|
|
|
|
next := e.Value.(*commitQueueTxn)
|
|
if !next.blocked {
|
|
work = append(work, next)
|
|
c.queue.Remove(e)
|
|
} else {
|
|
// We found the next blocking txn which means
|
|
// the block of work needs to be cut here.
|
|
break
|
|
}
|
|
}
|
|
|
|
c.queueCond.L.Unlock()
|
|
|
|
// Check if we need to exit before continuing.
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(work))
|
|
|
|
// Fire up N goroutines where each will run its commit loop
|
|
// and then clean up the reader/writer maps.
|
|
for _, txn := range work {
|
|
go func(txn *commitQueueTxn) {
|
|
defer wg.Done()
|
|
txn.commitLoop()
|
|
|
|
// We can safely cleanup here as done only
|
|
// holds the main mutex.
|
|
c.done(txn.rset, txn.wset)
|
|
}(txn)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// Check if we need to exit before continuing.
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}
|