Merge pull request #8336 from lightningnetwork/fn-module-goodies

fn: add some new goodies to sub-module to be used in future PRs
This commit is contained in:
Olaoluwa Osuntokun 2024-01-23 19:18:34 -08:00 committed by GitHub
commit 758ae6fbec
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 574 additions and 0 deletions

128
fn/conc_queue.go Normal file
View file

@ -0,0 +1,128 @@
package fn
import (
"sync"
"github.com/lightninglabs/neutrino/cache/lru"
)
// ConcurrentQueue is a typed concurrent-safe FIFO queue with unbounded
// capacity. Clients interact with the queue by pushing items into the in
// channel and popping items from the out channel. There is a goroutine that
// manages moving items from the in channel to the out channel in the correct
// order that must be started by calling Start().
type ConcurrentQueue[T any] struct {
started sync.Once
stopped sync.Once
chanIn chan T
chanOut chan T
overflow *lru.List[T]
wg sync.WaitGroup
quit chan struct{}
}
// NewConcurrentQueue constructs a ConcurrentQueue. The bufferSize parameter is
// the capacity of the output channel. When the size of the queue is below this
// threshold, pushes do n[?12;4$yot incur the overhead of the less efficient overflow
// structure.
func NewConcurrentQueue[T any](bufferSize int) *ConcurrentQueue[T] {
return &ConcurrentQueue[T]{
chanIn: make(chan T),
chanOut: make(chan T, bufferSize),
overflow: lru.NewList[T](),
quit: make(chan struct{}),
}
}
// ChanIn returns a channel that can be used to push new items into the queue.
func (cq *ConcurrentQueue[T]) ChanIn() chan<- T {
return cq.chanIn
}
// ChanOut returns a channel that can be used to pop items from the queue.
func (cq *ConcurrentQueue[T]) ChanOut() <-chan T {
return cq.chanOut
}
// Start begins a goroutine that manages moving items from the in channel to the
// out channel. The queue tries to move items directly to the out channel
// minimize overhead, but if the out channel is full it pushes items to an
// overflow queue. This must be called before using the queue.
func (cq *ConcurrentQueue[T]) Start() {
cq.started.Do(cq.start)
}
func (cq *ConcurrentQueue[T]) start() {
cq.wg.Add(1)
go func() {
defer cq.wg.Done()
readLoop:
for {
nextElement := cq.overflow.Front()
if nextElement == nil {
// Overflow queue is empty so incoming items can
// be pushed directly to the output channel. If
// output channel is full though, push to
// overflow.
select {
case item, ok := <-cq.chanIn:
if !ok {
break readLoop
}
select {
case cq.chanOut <- item:
// Optimistically push directly
// to chanOut.
default:
cq.overflow.PushBack(item)
}
case <-cq.quit:
return
}
} else {
// Overflow queue is not empty, so any new items
// get pushed to the back to preserve order.
select {
case item, ok := <-cq.chanIn:
if !ok {
break readLoop
}
cq.overflow.PushBack(item)
case cq.chanOut <- nextElement.Value:
cq.overflow.Remove(nextElement)
case <-cq.quit:
return
}
}
}
// Incoming channel has been closed. Empty overflow queue into
// the outgoing channel.
nextElement := cq.overflow.Front()
for nextElement != nil {
select {
case cq.chanOut <- nextElement.Value:
cq.overflow.Remove(nextElement)
case <-cq.quit:
return
}
nextElement = cq.overflow.Front()
}
// Close outgoing channel.
close(cq.chanOut)
}()
}
// Stop ends the goroutine that moves items from the in channel to the out
// channel. This does not clear the queue state, so the queue can be restarted
// without dropping items.
func (cq *ConcurrentQueue[T]) Stop() {
cq.stopped.Do(func() {
close(cq.quit)
cq.wg.Wait()
})
}

48
fn/either.go Normal file
View file

@ -0,0 +1,48 @@
package fn
// Either is a type that can be either left or right.
type Either[L any, R any] struct {
left Option[L]
right Option[R]
}
// NewLeft returns an Either with a left value.
func NewLeft[L any, R any](l L) Either[L, R] {
return Either[L, R]{left: Some(l), right: None[R]()}
}
// NewRight returns an Either with a right value.
func NewRight[L any, R any](r R) Either[L, R] {
return Either[L, R]{left: None[L](), right: Some(r)}
}
// WhenLeft executes the given function if the Either is left.
func (e Either[L, R]) WhenLeft(f func(L)) {
e.left.WhenSome(f)
}
// WhenRight executes the given function if the Either is right.
func (e Either[L, R]) WhenRight(f func(R)) {
e.right.WhenSome(f)
}
// IsLeft returns true if the Either is left.
func (e Either[L, R]) IsLeft() bool {
return e.left.IsSome()
}
// IsRight returns true if the Either is right.
func (e Either[L, R]) IsRight() bool {
return e.right.IsSome()
}
// MapLeft maps the left value of the Either to a new value.
func MapLeft[L any, R any, O any](f func(L) O) func(Either[L, R]) Option[O] {
return func(e Either[L, R]) Option[O] {
if e.IsLeft() {
return MapOption(f)(e.left)
}
return None[O]()
}
}

142
fn/events.go Normal file
View file

@ -0,0 +1,142 @@
package fn
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
const (
// DefaultQueueSize is the default size to use for concurrent queues.
DefaultQueueSize = 10
)
var (
// nextID is the next subscription ID that will be used for a new event
// receiver. This MUST be used atomically.
nextID uint64
)
// EventReceiver is a struct type that holds two queues for new and removed
// items respectively.
type EventReceiver[T any] struct {
// id is the internal process-unique ID of the subscription.
id uint64
// NewItemCreated is sent to when a new item was created successfully.
NewItemCreated *ConcurrentQueue[T]
// ItemRemoved is sent to when an existing item was removed.
ItemRemoved *ConcurrentQueue[T]
}
// ID returns the internal process-unique ID of the subscription.
func (e *EventReceiver[T]) ID() uint64 {
return e.id
}
// Stop stops the receiver from processing events.
func (e *EventReceiver[T]) Stop() {
e.NewItemCreated.Stop()
e.ItemRemoved.Stop()
}
// NewEventReceiver creates a new event receiver with concurrent queues of the
// given size.
func NewEventReceiver[T any](queueSize int) *EventReceiver[T] {
created := NewConcurrentQueue[T](queueSize)
created.Start()
removed := NewConcurrentQueue[T](queueSize)
removed.Start()
id := atomic.AddUint64(&nextID, 1)
return &EventReceiver[T]{
id: id,
NewItemCreated: created,
ItemRemoved: removed,
}
}
// EventPublisher is an interface type for a component that offers event based
// subscriptions for publishing events.
type EventPublisher[T any, Q any] interface {
// RegisterSubscriber adds a new subscriber for receiving events. The
// deliverExisting boolean indicates whether already existing items
// should be sent to the NewItemCreated channel when the subscription is
// started. An optional deliverFrom can be specified to indicate from
// which timestamp/index/marker onward existing items should be
// delivered on startup. If deliverFrom is nil/zero/empty then all
// existing items will be delivered.
RegisterSubscriber(receiver *EventReceiver[T], deliverExisting bool,
deliverFrom Q) error
// RemoveSubscriber removes the given subscriber and also stops it from
// processing events.
RemoveSubscriber(subscriber *EventReceiver[T]) error
}
// Event is a generic event that can be sent to a subscriber.
type Event interface {
Timestamp() time.Time
}
// EventDistributor is a struct type that helps to distribute events to multiple
// subscribers.
type EventDistributor[T any] struct {
// subscribers is a map of components that want to be notified on new
// events, keyed by their subscription ID.
subscribers map[uint64]*EventReceiver[T]
// subscriberMtx guards the subscribers map and access to the
// subscriptionID.
subscriberMtx sync.Mutex
}
// NewEventDistributor creates a new event distributor of the declared type.
func NewEventDistributor[T any]() *EventDistributor[T] {
return &EventDistributor[T]{
subscribers: make(map[uint64]*EventReceiver[T]),
}
}
// RegisterSubscriber adds a new subscriber for receiving events.
func (d *EventDistributor[T]) RegisterSubscriber(subscriber *EventReceiver[T]) {
d.subscriberMtx.Lock()
defer d.subscriberMtx.Unlock()
d.subscribers[subscriber.ID()] = subscriber
}
// RemoveSubscriber removes the given subscriber and also stops it from
// processing events.
func (d *EventDistributor[T]) RemoveSubscriber(
subscriber *EventReceiver[T]) error {
d.subscriberMtx.Lock()
defer d.subscriberMtx.Unlock()
_, ok := d.subscribers[subscriber.ID()]
if !ok {
return fmt.Errorf("subscriber with ID %d not found",
subscriber.ID())
}
subscriber.Stop()
delete(d.subscribers, subscriber.ID())
return nil
}
// NotifySubscribers sends the given events to all subscribers.
func (d *EventDistributor[T]) NotifySubscribers(events ...T) {
d.subscriberMtx.Lock()
for i := range events {
event := events[i]
for id := range d.subscribers {
d.subscribers[id].NewItemCreated.ChanIn() <- event
}
}
d.subscriberMtx.Unlock()
}

17
fn/func.go Normal file
View file

@ -0,0 +1,17 @@
package fn
// Reducer represents a function that takes an accumulator and the value, then
// returns a new accumulator.
type Reducer[T, V any] func(accum T, value V) T
// Reduce takes a slice of something, and a reducer, and produces a final
// accumulated value.
func Reduce[T any, V any, S []V](s S, f Reducer[T, V]) T {
var accum T
for _, x := range s {
accum = f(accum, x)
}
return accum
}

View file

@ -1,3 +1,8 @@
module github.com/lightningnetwork/lnd/fn
go 1.19
require (
github.com/lightninglabs/neutrino/cache v1.1.2
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b
)

8
fn/go.sum Normal file
View file

@ -0,0 +1,8 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/lightninglabs/neutrino/cache v1.1.2 h1:C9DY/DAPaPxbFC+xNNEI/z1SJY9GS3shmlu5hIQ798g=
github.com/lightninglabs/neutrino/cache v1.1.2/go.mod h1:XJNcgdOw1LQnanGjw8Vj44CvguYA25IMKjWFZczwZuo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b h1:kLiC65FbiHWFAOu+lxwNPujcsl8VYyTYYEZnsOO1WK4=
golang.org/x/exp v0.0.0-20231226003508-02704c960a9b/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

View file

@ -48,6 +48,22 @@ func (o Option[A]) UnwrapOr(a A) A {
return a
}
// UnwrapOrFunc is used to extract a value from an option, and we supply a
// thunk to be evaluated in the case when the Option is empty.
func (o Option[A]) UnwrapOrFunc(f func() A) A {
return ElimOption(o, f, func(a A) A { return a })
}
// UnwrapOrFuncErr is used to extract a value from an option, and we supply a
// thunk to be evaluated in the case when the Option is empty.
func (o Option[A]) UnwrapOrFuncErr(f func() (A, error)) (A, error) {
if o.isSome {
return o.some, nil
}
return f()
}
// WhenSome is used to conditionally perform a side-effecting function that
// accepts a value of the type that parameterizes the option. If this function
// performs no side effects, WhenSome is useless.
@ -117,6 +133,19 @@ func MapOption[A, B any](f func(A) B) func(Option[A]) Option[B] {
}
}
// MapOptionZ transforms a pure function A -> B into one that will operate
// inside the Option context. Unlike MapOption, this function will return the
// default/zero argument of the return type if the Option is empty.
func MapOptionZ[A, B any](o Option[A], f func(A) B) B {
var zero B
if o.IsNone() {
return zero
}
return f(o.some)
}
// LiftA2Option transforms a pure function (A, B) -> C into one that will
// operate in an Option context. For the returned function, if either of its
// arguments are None, then the result will be None.

51
fn/queue.go Normal file
View file

@ -0,0 +1,51 @@
package fn
// Queue is a generic queue implementation.
type Queue[T any] struct {
items []T
}
// NewQueue creates a new Queue.
func NewQueue[T any](startingItems ...T) Queue[T] {
return Queue[T]{
items: startingItems,
}
}
// Enqueue adds one or more an items to the end of the Queue.
func (q *Queue[T]) Enqueue(value ...T) {
q.items = append(q.items, value...)
}
// Dequeue removes an element from the front of the Queue. If there're no items
// in the queue, then None is returned.
func (q *Queue[T]) Dequeue() Option[T] {
if len(q.items) == 0 {
return None[T]()
}
value := q.items[0]
q.items = q.items[1:]
return Some(value)
}
// Peek returns the first item in the queue without removing it. If the queue
// is empty, then None is returned.
func (q *Queue[T]) Peek() Option[T] {
if q.IsEmpty() {
return None[T]()
}
return Some(q.items[0])
}
// IsEmpty returns true if the Queue is empty
func (q *Queue[T]) IsEmpty() bool {
return len(q.items) == 0
}
// Size returns the number of items in the Queue
func (q *Queue[T]) Size() int {
return len(q.items)
}

38
fn/recv.go Normal file
View file

@ -0,0 +1,38 @@
package fn
import (
"fmt"
"time"
)
// RecvOrTimeout attempts to recv over chan c, returning the value. If the
// timeout passes before the recv succeeds, an error is returned
func RecvOrTimeout[T any](c <-chan T, timeout time.Duration) (T, error) {
select {
case m := <-c:
return m, nil
case <-time.After(timeout):
var zero T
return zero, fmt.Errorf("timeout hit")
}
}
// RecvResp takes three channels: a response channel, an error channel and a
// quit channel. If either of these channels are sent on, then the function
// will exit with that response. This can be used to wait for a response,
// error, or a quit signal.
func RecvResp[T any](r <-chan T, e <-chan error, q <-chan struct{}) (T, error) {
var noResp T
select {
case resp := <-r:
return resp, nil
case err := <-e:
return noResp, err
case <-q:
return noResp, fmt.Errorf("quitting")
}
}

13
fn/send.go Normal file
View file

@ -0,0 +1,13 @@
package fn
// SendOrQuit attempts to and a message through channel c. If this succeeds,
// then bool is returned. Otherwise if a quit signal is received first, then
// false is returned.
func SendOrQuit[T any, Q any](c chan<- T, msg T, quit chan Q) bool {
select {
case c <- msg:
return true
case <-quit:
return false
}
}

95
fn/set.go Normal file
View file

@ -0,0 +1,95 @@
package fn
import "golang.org/x/exp/maps"
// Set is a generic set using type params that supports the following
// operations: diff, union, intersection, and subset.
type Set[T comparable] map[T]struct{}
// NewSet returns a new set with the given elements.
func NewSet[T comparable](elems ...T) Set[T] {
s := make(Set[T])
for _, e := range elems {
s.Add(e)
}
return s
}
// Add adds an element to the set.
func (s Set[T]) Add(e T) {
s[e] = struct{}{}
}
// Remove removes an element from the set.
func (s Set[T]) Remove(e T) {
delete(s, e)
}
// Contains returns true if the set contains the element.
func (s Set[T]) Contains(e T) bool {
_, ok := s[e]
return ok
}
// Diff returns the difference between two sets.
func (s Set[T]) Diff(other Set[T]) Set[T] {
diff := make(Set[T])
for e := range s {
if !other.Contains(e) {
diff.Add(e)
}
}
return diff
}
// Union returns the union of two sets.
func (s Set[T]) Union(other Set[T]) Set[T] {
union := make(Set[T])
for e := range s {
union.Add(e)
}
for e := range other {
union.Add(e)
}
return union
}
// Intersect returns the intersection of two sets.
func (s Set[T]) Intersect(other Set[T]) Set[T] {
intersect := make(Set[T])
for e := range s {
if other.Contains(e) {
intersect.Add(e)
}
}
return intersect
}
// Subset returns true if the set is a subset of the other set.
func (s Set[T]) Subset(other Set[T]) bool {
for e := range s {
if !other.Contains(e) {
return false
}
}
return true
}
// Equal returns true if the set is equal to the other set.
func (s Set[T]) Equal(other Set[T]) bool {
return s.Subset(other) && other.Subset(s)
}
// ToSlice returns the set as a slice.
func (s Set[T]) ToSlice() []T {
return maps.Keys(s)
}
// SetDiff returns all the items that are in the first set but not in the
// second.
func SetDiff[T comparable](a, b []T) []T {
setA := NewSet(a...)
setB := NewSet(b...)
return setA.Diff(setB).ToSlice()
}