etcd: extend kvdb_etcd with a real ForAll implementation

This commit is contained in:
Andras Banki-Horvath 2022-01-05 17:11:30 +01:00
parent 4d9a05c2f4
commit 40af029413
No known key found for this signature in database
GPG Key ID: 80E5375C094198D8
3 changed files with 83 additions and 9 deletions

2
go.mod
View File

@ -45,7 +45,7 @@ require (
github.com/lightningnetwork/lnd/cert v1.1.0
github.com/lightningnetwork/lnd/clock v1.1.0
github.com/lightningnetwork/lnd/healthcheck v1.2.0
github.com/lightningnetwork/lnd/kvdb v1.3.0
github.com/lightningnetwork/lnd/kvdb v1.3.1
github.com/lightningnetwork/lnd/queue v1.1.0
github.com/lightningnetwork/lnd/ticker v1.1.0
github.com/lightningnetwork/lnd/tlv v1.0.1

View File

@ -17,6 +17,9 @@ type readWriteBucket struct {
// appropriate prefix to prefix the key.
id []byte
// key is the bucket key.
key []byte
// tx holds the parent transaction.
tx *readWriteTx
}
@ -26,6 +29,7 @@ type readWriteBucket struct {
func newReadWriteBucket(tx *readWriteTx, key, id []byte) *readWriteBucket {
return &readWriteBucket{
id: id,
key: key,
tx: tx,
}
}
@ -66,6 +70,31 @@ func (b *readWriteBucket) ForEach(cb func(k, v []byte) error) error {
return nil
}
// ForAll is an optimized version of ForEach for the case when we know we will
// fetch all (or almost all) items.
//
// NOTE: ForAll differs from ForEach in that no additional queries can
// be executed within the callback.
func (b *readWriteBucket) ForAll(cb func(k, v []byte) error) error {
// When we opened this bucket, we fetched the bucket key using the STM
// which put a revision "lock" in the read set. We can leverage this
// by incrementing the revision on the bucket, making any transaction
// retry that'd touch this same bucket. This way we can safely read all
// keys from the bucket and not cache them in the STM.
// To increment the bucket's revision, we simply put in the bucket key
// value again (which is idempotent if the bucket has just been created).
b.tx.stm.Put(string(b.key), string(b.id))
// TODO(bhandras): page size should be configurable in ForAll.
return b.tx.stm.FetchRangePaginatedRaw(
string(b.id), 1000,
func(kv KV) error {
key, val := getKeyVal(&kv)
return cb(key, val)
},
)
}
// Get returns the value for the given key. Returns nil if the key does
// not exist in this bucket.
func (b *readWriteBucket) Get(key []byte) []byte {
@ -406,9 +435,3 @@ func (b *readWriteBucket) Prefetch(paths ...[]string) {
b.tx.stm.Prefetch(flattenMap(keys), flattenMap(ranges))
}
// ForAll is an optimized version of ForEach with the limitation that no
// additional queries can be executed within the callback.
func (b *readWriteBucket) ForAll(cb func(k, v []byte) error) error {
return b.ForEach(cb)
}

View File

@ -81,6 +81,11 @@ type STM interface {
// Prefetch prefetches the passed keys and prefixes. For prefixes it'll
// fetch the whole range.
Prefetch(keys []string, prefix []string)
// FetchRangePaginatedRaw will fetch the range with the passed prefix up
// to the passed limit per page.
FetchRangePaginatedRaw(prefix string, limit int64,
cb func(kv KV) error) error
}
// CommitError is used to check if there was an error
@ -589,6 +594,52 @@ func (ws writeSet) puts() []v3.Op {
return puts
}
// FetchRangePaginatedRaw will fetch the range with the passed prefix up to the
// passed limit per page.
func (s *stm) FetchRangePaginatedRaw(prefix string, limit int64,
cb func(kv KV) error) error {
s.callCount++
opts := []v3.OpOption{
v3.WithSort(v3.SortByKey, v3.SortAscend),
v3.WithRange(v3.GetPrefixRangeEnd(prefix)),
v3.WithLimit(limit),
}
key := prefix
for {
resp, err := s.client.Get(
s.options.ctx, key, append(opts, s.getOpts...)...,
)
if err != nil {
return DatabaseError{
msg: "stm.fetch() failed",
err: err,
}
}
// Fill the read set with key/values returned.
for _, kv := range resp.Kvs {
err := cb(KV{string(kv.Key), string(kv.Value)})
if err != nil {
return err
}
}
// We've reached the range end.
if !resp.More {
break
}
// Continue from the page end + "\x00".
key = string(resp.Kvs[len(resp.Kvs)-1].Key) + "\x00"
}
return nil
}
// fetch is a helper to fetch key/value given options. If a value is returned
// then fetch will try to fix the STM's snapshot revision (if not already set).
// We'll also cache the returned key/value in the read set.