lnutils+lntemp: move SyncMap to lnutils

This commit moves the `SyncMap` from `lntemp/node` into `lnutils` so it
can be used by other packages.
This commit is contained in:
yyforyongyu 2023-01-19 06:37:57 +08:00
parent 89b0e25e2c
commit 692cd4bc4f
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868
4 changed files with 286 additions and 21 deletions

View File

@ -10,6 +10,7 @@ import (
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
"github.com/lightningnetwork/lnd/lntemp/rpc"
"github.com/lightningnetwork/lnd/lnutils"
)
type (
@ -153,18 +154,18 @@ type State struct {
// openChans records each opened channel and how many times it has
// heard the announcements from its graph subscription.
openChans *SyncMap[wire.OutPoint, []*OpenChannelUpdate]
openChans *lnutils.SyncMap[wire.OutPoint, []*OpenChannelUpdate]
// closedChans records each closed channel and its close channel update
// message received from its graph subscription.
closedChans *SyncMap[wire.OutPoint, *lnrpc.ClosedChannelUpdate]
closedChans *lnutils.SyncMap[wire.OutPoint, *lnrpc.ClosedChannelUpdate]
// numChanUpdates records the number of channel updates seen by each
// channel.
numChanUpdates *SyncMap[wire.OutPoint, int]
numChanUpdates *lnutils.SyncMap[wire.OutPoint, int]
// nodeUpdates records the node announcements seen by each node.
nodeUpdates *SyncMap[string, []*lnrpc.NodeUpdate]
nodeUpdates *lnutils.SyncMap[string, []*lnrpc.NodeUpdate]
// policyUpdates defines a type to store channel policy updates. It has
// the format,
@ -179,21 +180,23 @@ type State struct {
// },
// "chanPoint2": ...
// }
policyUpdates *SyncMap[wire.OutPoint, PolicyUpdate]
policyUpdates *lnutils.SyncMap[wire.OutPoint, PolicyUpdate]
}
// newState initialize a new state with every field being set to its zero
// value.
func newState(rpc *rpc.HarnessRPC) *State {
return &State{
rpc: rpc,
openChans: &SyncMap[wire.OutPoint, []*OpenChannelUpdate]{},
closedChans: &SyncMap[
rpc: rpc,
openChans: &lnutils.SyncMap[
wire.OutPoint, []*OpenChannelUpdate,
]{},
closedChans: &lnutils.SyncMap[
wire.OutPoint, *lnrpc.ClosedChannelUpdate,
]{},
numChanUpdates: &SyncMap[wire.OutPoint, int]{},
nodeUpdates: &SyncMap[string, []*lnrpc.NodeUpdate]{},
policyUpdates: &SyncMap[wire.OutPoint, PolicyUpdate]{},
numChanUpdates: &lnutils.SyncMap[wire.OutPoint, int]{},
nodeUpdates: &lnutils.SyncMap[string, []*lnrpc.NodeUpdate]{},
policyUpdates: &lnutils.SyncMap[wire.OutPoint, PolicyUpdate]{},
}
}
@ -352,9 +355,11 @@ func (s *State) resetEphermalStates(rpc *rpc.HarnessRPC) {
// Reset ephermal states which are used to record info from finished
// tests.
s.openChans = &SyncMap[wire.OutPoint, []*OpenChannelUpdate]{}
s.closedChans = &SyncMap[wire.OutPoint, *lnrpc.ClosedChannelUpdate]{}
s.numChanUpdates = &SyncMap[wire.OutPoint, int]{}
s.nodeUpdates = &SyncMap[string, []*lnrpc.NodeUpdate]{}
s.policyUpdates = &SyncMap[wire.OutPoint, PolicyUpdate]{}
s.openChans = &lnutils.SyncMap[wire.OutPoint, []*OpenChannelUpdate]{}
s.closedChans = &lnutils.SyncMap[
wire.OutPoint, *lnrpc.ClosedChannelUpdate,
]{}
s.numChanUpdates = &lnutils.SyncMap[wire.OutPoint, int]{}
s.nodeUpdates = &lnutils.SyncMap[string, []*lnrpc.NodeUpdate]{}
s.policyUpdates = &lnutils.SyncMap[wire.OutPoint, PolicyUpdate]{}
}

View File

@ -14,6 +14,7 @@ import (
"github.com/lightningnetwork/lnd/lntemp/rpc"
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lnutils"
)
type chanWatchType uint8
@ -68,8 +69,8 @@ type nodeWatcher struct {
// of edges seen for that channel within the network. When this number
// reaches 2, then it means that both edge advertisements has
// propagated through the network.
openChanWatchers *SyncMap[wire.OutPoint, []chan struct{}]
closeChanWatchers *SyncMap[wire.OutPoint, []chan struct{}]
openChanWatchers *lnutils.SyncMap[wire.OutPoint, []chan struct{}]
closeChanWatchers *lnutils.SyncMap[wire.OutPoint, []chan struct{}]
wg sync.WaitGroup
}
@ -79,8 +80,12 @@ func newNodeWatcher(rpc *rpc.HarnessRPC, state *State) *nodeWatcher {
rpc: rpc,
state: state,
chanWatchRequests: make(chan *chanWatchRequest, 100),
openChanWatchers: &SyncMap[wire.OutPoint, []chan struct{}]{},
closeChanWatchers: &SyncMap[wire.OutPoint, []chan struct{}]{},
openChanWatchers: &lnutils.SyncMap[
wire.OutPoint, []chan struct{},
]{},
closeChanWatchers: &lnutils.SyncMap[
wire.OutPoint, []chan struct{},
]{},
}
}

View File

@ -1,4 +1,4 @@
package node
package lnutils
import "sync"

View File

@ -0,0 +1,255 @@
package lnutils_test
import (
"sync"
"sync/atomic"
"testing"
"github.com/lightningnetwork/lnd/lnutils"
)
func BenchmarkReadMutexMap(b *testing.B) {
// Create a map with a mutex.
m := make(map[int64]struct{})
// k is the unique key for each goroutine.
k := int64(0)
// Create a general mutex.
var mu sync.Mutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Perform a lock read.
mu.Lock()
_ = m[k]
mu.Unlock()
}
})
}
func BenchmarkReadRWMutexMap(b *testing.B) {
// Create a map with a mutex.
m := make(map[int64]struct{})
// k is the unique key for each goroutine.
k := int64(0)
// Create a read write mutex.
var mu sync.RWMutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Perform a lock read.
mu.RLock()
_ = m[k]
mu.RUnlock()
}
})
}
func BenchmarkReadSyncMap(b *testing.B) {
// Create a sync.Map.
syncMap := &sync.Map{}
// k is the unique key for each goroutine.
k := int64(0)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Read the value.
syncMap.Load(k)
}
})
}
func BenchmarkReadLndSyncMap(b *testing.B) {
// Create a sync.Map.
syncMap := &lnutils.SyncMap[int64, struct{}]{}
// k is the unique key for each goroutine.
k := int64(0)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Read the value.
syncMap.Load(k)
}
})
}
func BenchmarkWriteMutexMap(b *testing.B) {
// Create a map with a mutex.
m := make(map[int64]struct{})
// k is the unique key for each goroutine.
k := int64(0)
// Create a general mutex.
var mu sync.Mutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Perform a lock write.
mu.Lock()
m[k] = struct{}{}
mu.Unlock()
}
})
}
func BenchmarkWriteRWMutexMap(b *testing.B) {
// Create a map with a mutex.
m := make(map[int64]struct{})
// k is the unique key for each goroutine.
k := int64(0)
// Create a read write mutex.
var mu sync.RWMutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Perform a lock write.
mu.Lock()
m[k] = struct{}{}
mu.Unlock()
}
})
}
func BenchmarkWriteSyncMap(b *testing.B) {
// Create a sync.Map.
syncMap := &sync.Map{}
// k is the unique key for each goroutine.
k := int64(0)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Write the value.
syncMap.Store(k, struct{}{})
}
})
}
func BenchmarkWriteLndSyncMap(b *testing.B) {
// Create a sync.Map.
syncMap := &lnutils.SyncMap[int64, struct{}]{}
// k is the unique key for each goroutine.
k := int64(0)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Write the value.
syncMap.Store(k, struct{}{})
}
})
}
func BenchmarkDeleteMutexMap(b *testing.B) {
// Create a map with a mutex.
m := make(map[int64]struct{})
// k is the unique key for each goroutine.
k := int64(0)
// Create a general mutex.
var mu sync.Mutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Perform a lock delete.
mu.Lock()
delete(m, k)
mu.Unlock()
}
})
}
func BenchmarkDeleteRWMutexMap(b *testing.B) {
// Create a map with a mutex.
m := make(map[int64]struct{})
// k is the unique key for each goroutine.
k := int64(0)
// Create a read write mutex.
var mu sync.RWMutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Perform a lock delete.
mu.Lock()
delete(m, k)
mu.Unlock()
}
})
}
func BenchmarkDeleteSyncMap(b *testing.B) {
// Create a sync.Map.
syncMap := &sync.Map{}
// k is the unique key for each goroutine.
k := int64(0)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Delete the value.
syncMap.Delete(k)
}
})
}
func BenchmarkDeleteLndSyncMap(b *testing.B) {
// Create a sync.Map.
syncMap := &lnutils.SyncMap[int64, struct{}]{}
// k is the unique key for each goroutine.
k := int64(0)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
// Increment k.
atomic.AddInt64(&k, 1)
// Delete the value.
syncMap.Delete(k)
}
})
}