From 256b62e0d548ad3157cf49c53a2662fa17917ecb Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Wed, 16 Jun 2021 16:51:46 +0200 Subject: [PATCH] etcd: add kvdb.Prefetch This commit extends the kvdb interface in a backwards compatible way such that we'll be able to prefetch all keys in a bucket in one go reducing the number of roundtrips. --- kvdb/etcd/readwrite_bucket.go | 34 +++++++++++++++++++++++++++++++++ kvdb/etcd/readwrite_tx.go | 7 +++++++ kvdb/interface.go | 36 +++++++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+) diff --git a/kvdb/etcd/readwrite_bucket.go b/kvdb/etcd/readwrite_bucket.go index 7591d7f92..23162f852 100644 --- a/kvdb/etcd/readwrite_bucket.go +++ b/kvdb/etcd/readwrite_bucket.go @@ -371,3 +371,37 @@ func (b *readWriteBucket) Sequence() uint64 { return num } + +func flattenMap(m map[string]struct{}) []string { + result := make([]string, len(m)) + i := 0 + + for key := range m { + result[i] = key + i++ + } + + return result +} + +// Prefetch will prefetch all keys in the passed paths as well as all bucket +// keys along the paths. +func (b *readWriteBucket) Prefetch(paths ...[]string) { + keys := make(map[string]struct{}) + ranges := make(map[string]struct{}) + + for _, path := range paths { + parent := b.id + for _, bucket := range path { + bucketKey := makeBucketKey(parent, []byte(bucket)) + keys[string(bucketKey[:])] = struct{}{} + + id := makeBucketID(bucketKey) + parent = id[:] + } + + ranges[string(parent)] = struct{}{} + } + + b.tx.stm.Prefetch(flattenMap(keys), flattenMap(ranges)) +} diff --git a/kvdb/etcd/readwrite_tx.go b/kvdb/etcd/readwrite_tx.go index 12bc2779b..8e580d5af 100644 --- a/kvdb/etcd/readwrite_tx.go +++ b/kvdb/etcd/readwrite_tx.go @@ -41,6 +41,13 @@ func rootBucket(tx *readWriteTx) *readWriteBucket { return newReadWriteBucket(tx, tx.rootBucketID[:], tx.rootBucketID[:]) } +// RootBucket will return a handle to the root bucket. This is not a real handle +// but just a wrapper around the root bucket ID to allow derivation of child +// keys. +func (tx *readWriteTx) RootBucket() walletdb.ReadBucket { + return rootBucket(tx) +} + // ReadBucket opens the root bucket for read only access. If the bucket // described by the key does not exist, nil is returned. func (tx *readWriteTx) ReadBucket(key []byte) walletdb.ReadBucket { diff --git a/kvdb/interface.go b/kvdb/interface.go index 87593b678..6fe944394 100644 --- a/kvdb/interface.go +++ b/kvdb/interface.go @@ -93,6 +93,42 @@ type RwCursor = walletdb.ReadWriteCursor // writes. When only reads are necessary, consider using a RTx instead. type RwTx = walletdb.ReadWriteTx +// ExtendedRTx is an extension to walletdb.ReadTx to allow prefetching of keys. +type ExtendedRTx interface { + RTx + + // RootBucket returns the "root bucket" which is pseudo bucket used + // when prefetching (keys from) top level buckets. + RootBucket() RBucket +} + +// ExtendedRBucket is an extension to walletdb.ReadBucket to allow prefetching +// of all values inside buckets. +type ExtendedRBucket interface { + RBucket + + // Prefetch will attempt to prefetch all values under a path. + Prefetch(paths ...[]string) +} + +// Prefetch will attempt to prefetch all values under a path from the passed +// bucket. +func Prefetch(b RBucket, paths ...[]string) { + if bucket, ok := b.(ExtendedRBucket); ok { + bucket.Prefetch(paths...) + } +} + +// RootBucket is a wrapper to ExtendedRTx.RootBucket which does nothing if +// the implementation doesn't have ExtendedRTx. +func RootBucket(t RTx) RBucket { + if tx, ok := t.(ExtendedRTx); ok { + return tx.RootBucket() + } + + return nil +} + var ( // ErrBucketNotFound is returned when trying to access a bucket that // has not been created yet.