mirror of
https://github.com/lightningnetwork/lnd.git
synced 2024-11-19 01:43:16 +01:00
missioncontrolstore: remove duplication of in-memory data
This removes duplication of in-memory data during the periodic flushing stage of the mission control store. The existing code entirely duplicates the in-memory cache of the store, which is very wasteful when only a few additional results are being rotated into the store. This has a significant performance penalty specially for wallets that remain online for a long time with a low volume of payments. The worst case scenario are wallets that see at most 1 new payment a second, where the entire in-memory cache is recreated every second. This commit improves the situation by determining what will be the actual changes that need to be committed before initiating the db transaction and only keeping track of these to update the in-memory cache if the db tx is successful.
This commit is contained in:
parent
9059a655db
commit
0c7a173354
@ -359,68 +359,98 @@ func (b *missionControlStore) storeResults() error {
|
||||
b.queueCond.L.Unlock()
|
||||
|
||||
var (
|
||||
keys *list.List
|
||||
keysMap map[string]struct{}
|
||||
newKeys map[string]struct{}
|
||||
delKeys []string
|
||||
storeCount int
|
||||
pruneCount int
|
||||
)
|
||||
|
||||
// Create a deduped list of new entries.
|
||||
newKeys = make(map[string]struct{}, l.Len())
|
||||
for e := l.Front(); e != nil; e = e.Next() {
|
||||
pr, ok := e.Value.(*paymentResult)
|
||||
if !ok {
|
||||
return fmt.Errorf("wrong type %T (not *paymentResult)",
|
||||
e.Value)
|
||||
}
|
||||
key := string(getResultKey(pr))
|
||||
if _, ok := b.keysMap[key]; ok {
|
||||
l.Remove(e)
|
||||
continue
|
||||
}
|
||||
if _, ok := newKeys[key]; ok {
|
||||
l.Remove(e)
|
||||
continue
|
||||
}
|
||||
newKeys[key] = struct{}{}
|
||||
}
|
||||
|
||||
// Create a list of entries to delete.
|
||||
toDelete := b.keys.Len() + len(newKeys) - b.maxRecords
|
||||
if b.maxRecords > 0 && toDelete > 0 {
|
||||
delKeys = make([]string, 0, toDelete)
|
||||
|
||||
// Delete as many as needed from old keys.
|
||||
for e := b.keys.Front(); len(delKeys) < toDelete && e != nil; {
|
||||
key, ok := e.Value.(string)
|
||||
if !ok {
|
||||
return fmt.Errorf("wrong type %T (not string)",
|
||||
e.Value)
|
||||
}
|
||||
delKeys = append(delKeys, key)
|
||||
e = e.Next()
|
||||
}
|
||||
|
||||
// If more deletions are needed, simply do not add from the
|
||||
// list of new keys.
|
||||
for e := l.Front(); len(delKeys) < toDelete && e != nil; {
|
||||
toDelete--
|
||||
pr, ok := e.Value.(*paymentResult)
|
||||
if !ok {
|
||||
return fmt.Errorf("wrong type %T (not "+
|
||||
"*paymentResult )", e.Value)
|
||||
}
|
||||
key := string(getResultKey(pr))
|
||||
delete(newKeys, key)
|
||||
l.Remove(e)
|
||||
e = l.Front()
|
||||
}
|
||||
}
|
||||
|
||||
err := kvdb.Update(b.db, func(tx kvdb.RwTx) error {
|
||||
bucket := tx.ReadWriteBucket(resultsKey)
|
||||
|
||||
for e := l.Front(); e != nil; e = e.Next() {
|
||||
pr := e.Value.(*paymentResult)
|
||||
pr, ok := e.Value.(*paymentResult)
|
||||
if !ok {
|
||||
return fmt.Errorf("wrong type %T (not "+
|
||||
"*paymentResult)", e.Value)
|
||||
}
|
||||
|
||||
// Serialize result into key and value byte slices.
|
||||
k, v, err := serializeResult(pr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// The store is assumed to be idempotent. It could be
|
||||
// that the same result is added twice and in that case
|
||||
// we don't need to put the value again.
|
||||
if _, ok := keysMap[string(k)]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Put into results bucket.
|
||||
if err := bucket.Put(k, v); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
keys.PushBack(string(k))
|
||||
keysMap[string(k)] = struct{}{}
|
||||
storeCount++
|
||||
}
|
||||
|
||||
// Prune oldest entries.
|
||||
for {
|
||||
if b.maxRecords == 0 || keys.Len() <= b.maxRecords {
|
||||
break
|
||||
}
|
||||
|
||||
front := keys.Front()
|
||||
key := front.Value.(string)
|
||||
|
||||
for _, key := range delKeys {
|
||||
if err := bucket.Delete([]byte(key)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
keys.Remove(front)
|
||||
delete(keysMap, key)
|
||||
pruneCount++
|
||||
}
|
||||
|
||||
return nil
|
||||
}, func() {
|
||||
keys = list.New()
|
||||
keys.PushBackList(b.keys)
|
||||
|
||||
keysMap = make(map[string]struct{})
|
||||
for k := range b.keysMap {
|
||||
keysMap[k] = struct{}{}
|
||||
}
|
||||
|
||||
storeCount, pruneCount = 0, 0
|
||||
})
|
||||
|
||||
@ -431,8 +461,20 @@ func (b *missionControlStore) storeResults() error {
|
||||
log.Debugf("Stored mission control results: %d added, %d deleted",
|
||||
storeCount, pruneCount)
|
||||
|
||||
b.keys = keys
|
||||
b.keysMap = keysMap
|
||||
// DB Update was successful, update the in-memory cache.
|
||||
for _, key := range delKeys {
|
||||
delete(b.keysMap, key)
|
||||
b.keys.Remove(b.keys.Front())
|
||||
}
|
||||
for e := l.Front(); e != nil; e = e.Next() {
|
||||
pr, ok := e.Value.(*paymentResult)
|
||||
if !ok {
|
||||
return fmt.Errorf("wrong type %T (not *paymentResult)",
|
||||
e.Value)
|
||||
}
|
||||
key := string(getResultKey(pr))
|
||||
b.keys.PushBack(key)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user