mirror of
https://github.com/lightningnetwork/lnd.git
synced 2024-11-19 09:53:54 +01:00
3aa008ab04
The current implementation of subscribe is difficult to mock because the queue that we send updates on in unexported, so you cannot create a subscribe.Client object and then add your own updates. While it is possible to run a subscribe server in tests, subscribe servers will shutdown before dispatching their udpates to all clients, which can be flakey (and is difficult to workaround). In this commit, we add a subscription interface so that these testing struggles can be addressed with a mock.
223 lines
5.5 KiB
Go
223 lines
5.5 KiB
Go
package subscribe
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/lightningnetwork/lnd/queue"
|
|
)
|
|
|
|
// ErrServerShuttingDown is an error returned in case the server is in the
|
|
// process of shutting down.
|
|
var ErrServerShuttingDown = errors.New("subscription server shutting down")
|
|
|
|
// Client is used to get notified about updates the caller has subscribed to,
|
|
type Client struct {
|
|
// cancel should be called in case the client no longer wants to
|
|
// subscribe for updates from the server.
|
|
cancel func()
|
|
|
|
updates *queue.ConcurrentQueue
|
|
quit chan struct{}
|
|
}
|
|
|
|
// Updates returns a read-only channel where the updates the client has
|
|
// subscribed to will be delivered.
|
|
func (c *Client) Updates() <-chan interface{} {
|
|
return c.updates.ChanOut()
|
|
}
|
|
|
|
// Quit is a channel that will be closed in case the server decides to no
|
|
// longer deliver updates to this client.
|
|
func (c *Client) Quit() <-chan struct{} {
|
|
return c.quit
|
|
}
|
|
|
|
// Cancel should be called in case the client no longer wants to
|
|
// subscribe for updates from the server.
|
|
func (c *Client) Cancel() {
|
|
c.cancel()
|
|
}
|
|
|
|
// Server is a struct that manages a set of subscriptions and their
|
|
// corresponding clients. Any update will be delivered to all active clients.
|
|
type Server struct {
|
|
clientCounter uint64 // To be used atomically.
|
|
|
|
started uint32 // To be used atomically.
|
|
stopped uint32 // To be used atomically.
|
|
|
|
clients map[uint64]*Client
|
|
clientUpdates chan *clientUpdate
|
|
|
|
updates chan interface{}
|
|
|
|
quit chan struct{}
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
// clientUpdate is an internal message sent to the subscriptionHandler to
|
|
// either register a new client for subscription or cancel an existing
|
|
// subscription.
|
|
type clientUpdate struct {
|
|
// cancel indicates if the update to the client is cancelling an
|
|
// existing client's subscription. If not then this update will be to
|
|
// subscribe a new client.
|
|
cancel bool
|
|
|
|
// clientID is the unique identifier for this client. Any further
|
|
// updates (deleting or adding) to this notification client will be
|
|
// dispatched according to the target clientID.
|
|
clientID uint64
|
|
|
|
// client is the new client that will receive updates. Will be nil in
|
|
// case this is a cancallation update.
|
|
client *Client
|
|
}
|
|
|
|
// NewServer returns a new Server.
|
|
func NewServer() *Server {
|
|
return &Server{
|
|
clients: make(map[uint64]*Client),
|
|
clientUpdates: make(chan *clientUpdate),
|
|
updates: make(chan interface{}),
|
|
quit: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Start starts the Server, making it ready to accept subscriptions and
|
|
// updates.
|
|
func (s *Server) Start() error {
|
|
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
|
|
return nil
|
|
}
|
|
|
|
s.wg.Add(1)
|
|
go s.subscriptionHandler()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop stops the server.
|
|
func (s *Server) Stop() error {
|
|
if !atomic.CompareAndSwapUint32(&s.stopped, 0, 1) {
|
|
return nil
|
|
}
|
|
|
|
close(s.quit)
|
|
s.wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Subscribe returns a Client that will receive updates any time the Server is
|
|
// made aware of a new event.
|
|
func (s *Server) Subscribe() (*Client, error) {
|
|
// We'll first atomically obtain the next ID for this client from the
|
|
// incrementing client ID counter.
|
|
clientID := atomic.AddUint64(&s.clientCounter, 1)
|
|
|
|
// Create the client that will be returned. The Cancel method is
|
|
// populated to send the cancellation intent to the
|
|
// subscriptionHandler.
|
|
client := &Client{
|
|
updates: queue.NewConcurrentQueue(20),
|
|
quit: make(chan struct{}),
|
|
cancel: func() {
|
|
select {
|
|
case s.clientUpdates <- &clientUpdate{
|
|
cancel: true,
|
|
clientID: clientID,
|
|
}:
|
|
case <-s.quit:
|
|
return
|
|
}
|
|
},
|
|
}
|
|
|
|
select {
|
|
case s.clientUpdates <- &clientUpdate{
|
|
cancel: false,
|
|
clientID: clientID,
|
|
client: client,
|
|
}:
|
|
case <-s.quit:
|
|
return nil, ErrServerShuttingDown
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
|
|
// SendUpdate is called to send the passed update to all currently active
|
|
// subscription clients.
|
|
func (s *Server) SendUpdate(update interface{}) error {
|
|
|
|
select {
|
|
case s.updates <- update:
|
|
return nil
|
|
case <-s.quit:
|
|
return ErrServerShuttingDown
|
|
}
|
|
}
|
|
|
|
// subscriptionHandler is the main handler for the Server. It will handle
|
|
// incoming updates and subscriptions, and forward the incoming updates to the
|
|
// registered clients.
|
|
//
|
|
// NOTE: MUST be run as a goroutine.
|
|
func (s *Server) subscriptionHandler() {
|
|
defer s.wg.Done()
|
|
|
|
for {
|
|
select {
|
|
|
|
// If a client update is received, the either a new
|
|
// subscription becomes active, or we cancel and existing one.
|
|
case update := <-s.clientUpdates:
|
|
clientID := update.clientID
|
|
|
|
// In case this is a cancellation, stop the client's
|
|
// underlying queue, and remove the client from the set
|
|
// of active subscription clients.
|
|
if update.cancel {
|
|
client, ok := s.clients[update.clientID]
|
|
if ok {
|
|
client.updates.Stop()
|
|
close(client.quit)
|
|
delete(s.clients, clientID)
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
// If this was not a cancellation, start the underlying
|
|
// queue and add the client to our set of subscription
|
|
// clients. It will be notified about any new updates
|
|
// the server receives.
|
|
update.client.updates.Start()
|
|
s.clients[update.clientID] = update.client
|
|
|
|
// A new update was received, forward it to all active clients.
|
|
case upd := <-s.updates:
|
|
for _, client := range s.clients {
|
|
select {
|
|
case client.updates.ChanIn() <- upd:
|
|
case <-client.quit:
|
|
case <-s.quit:
|
|
return
|
|
}
|
|
}
|
|
|
|
// In case the server is shutting down, stop the clients and
|
|
// close the quit channels to notify them.
|
|
case <-s.quit:
|
|
for _, client := range s.clients {
|
|
client.updates.Stop()
|
|
close(client.quit)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|