mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-01-18 13:27:56 +01:00
9acbbf0ec7
This commit fixes `InitPayment` method to make sure the subscribers get notified when new payments are created.
440 lines
14 KiB
Go
440 lines
14 KiB
Go
package routing
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/lightningnetwork/lnd/channeldb"
|
|
"github.com/lightningnetwork/lnd/lntypes"
|
|
"github.com/lightningnetwork/lnd/multimutex"
|
|
"github.com/lightningnetwork/lnd/queue"
|
|
)
|
|
|
|
// dbMPPayment is an interface derived from channeldb.MPPayment that is used by
|
|
// the payment lifecycle.
|
|
type dbMPPayment interface {
|
|
// GetState returns the current state of the payment.
|
|
GetState() *channeldb.MPPaymentState
|
|
|
|
// Terminated returns true if the payment is in a final state.
|
|
Terminated() bool
|
|
|
|
// GetStatus returns the current status of the payment.
|
|
GetStatus() channeldb.PaymentStatus
|
|
|
|
// NeedWaitAttempts specifies whether the payment needs to wait for the
|
|
// outcome of an attempt.
|
|
NeedWaitAttempts() (bool, error)
|
|
|
|
// GetHTLCs returns all HTLCs of this payment.
|
|
GetHTLCs() []channeldb.HTLCAttempt
|
|
|
|
// InFlightHTLCs returns all HTLCs that are in flight.
|
|
InFlightHTLCs() []channeldb.HTLCAttempt
|
|
|
|
// AllowMoreAttempts is used to decide whether we can safely attempt
|
|
// more HTLCs for a given payment state. Return an error if the payment
|
|
// is in an unexpected state.
|
|
AllowMoreAttempts() (bool, error)
|
|
|
|
// TerminalInfo returns the settled HTLC attempt or the payment's
|
|
// failure reason.
|
|
TerminalInfo() (*channeldb.HTLCAttempt, *channeldb.FailureReason)
|
|
}
|
|
|
|
// ControlTower tracks all outgoing payments made, whose primary purpose is to
|
|
// prevent duplicate payments to the same payment hash. In production, a
|
|
// persistent implementation is preferred so that tracking can survive across
|
|
// restarts. Payments are transitioned through various payment states, and the
|
|
// ControlTower interface provides access to driving the state transitions.
|
|
type ControlTower interface {
|
|
// This method checks that no succeeded payment exist for this payment
|
|
// hash.
|
|
InitPayment(lntypes.Hash, *channeldb.PaymentCreationInfo) error
|
|
|
|
// DeleteFailedAttempts removes all failed HTLCs from the db. It should
|
|
// be called for a given payment whenever all inflight htlcs are
|
|
// completed, and the payment has reached a final settled state.
|
|
DeleteFailedAttempts(lntypes.Hash) error
|
|
|
|
// RegisterAttempt atomically records the provided HTLCAttemptInfo.
|
|
RegisterAttempt(lntypes.Hash, *channeldb.HTLCAttemptInfo) error
|
|
|
|
// SettleAttempt marks the given attempt settled with the preimage. If
|
|
// this is a multi shard payment, this might implicitly mean the the
|
|
// full payment succeeded.
|
|
//
|
|
// After invoking this method, InitPayment should always return an
|
|
// error to prevent us from making duplicate payments to the same
|
|
// payment hash. The provided preimage is atomically saved to the DB
|
|
// for record keeping.
|
|
SettleAttempt(lntypes.Hash, uint64, *channeldb.HTLCSettleInfo) (
|
|
*channeldb.HTLCAttempt, error)
|
|
|
|
// FailAttempt marks the given payment attempt failed.
|
|
FailAttempt(lntypes.Hash, uint64, *channeldb.HTLCFailInfo) (
|
|
*channeldb.HTLCAttempt, error)
|
|
|
|
// FetchPayment fetches the payment corresponding to the given payment
|
|
// hash.
|
|
FetchPayment(paymentHash lntypes.Hash) (dbMPPayment, error)
|
|
|
|
// FailPayment transitions a payment into the Failed state, and records
|
|
// the ultimate reason the payment failed. Note that this should only
|
|
// be called when all active attempts are already failed. After
|
|
// invoking this method, InitPayment should return nil on its next call
|
|
// for this payment hash, allowing the user to make a subsequent
|
|
// payment.
|
|
FailPayment(lntypes.Hash, channeldb.FailureReason) error
|
|
|
|
// FetchInFlightPayments returns all payments with status InFlight.
|
|
FetchInFlightPayments() ([]*channeldb.MPPayment, error)
|
|
|
|
// SubscribePayment subscribes to updates for the payment with the given
|
|
// hash. A first update with the current state of the payment is always
|
|
// sent out immediately.
|
|
SubscribePayment(paymentHash lntypes.Hash) (ControlTowerSubscriber,
|
|
error)
|
|
|
|
// SubscribeAllPayments subscribes to updates for all payments. A first
|
|
// update with the current state of every inflight payment is always
|
|
// sent out immediately.
|
|
SubscribeAllPayments() (ControlTowerSubscriber, error)
|
|
}
|
|
|
|
// ControlTowerSubscriber contains the state for a payment update subscriber.
|
|
type ControlTowerSubscriber interface {
|
|
// Updates is the channel over which *channeldb.MPPayment updates can be
|
|
// received.
|
|
Updates() <-chan interface{}
|
|
|
|
// Close signals that the subscriber is no longer interested in updates.
|
|
Close()
|
|
}
|
|
|
|
// ControlTowerSubscriberImpl contains the state for a payment update
|
|
// subscriber.
|
|
type controlTowerSubscriberImpl struct {
|
|
updates <-chan interface{}
|
|
queue *queue.ConcurrentQueue
|
|
quit chan struct{}
|
|
}
|
|
|
|
// newControlTowerSubscriber instantiates a new subscriber state object.
|
|
func newControlTowerSubscriber() *controlTowerSubscriberImpl {
|
|
// Create a queue for payment updates.
|
|
queue := queue.NewConcurrentQueue(20)
|
|
queue.Start()
|
|
|
|
return &controlTowerSubscriberImpl{
|
|
updates: queue.ChanOut(),
|
|
queue: queue,
|
|
quit: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Close signals that the subscriber is no longer interested in updates.
|
|
func (s *controlTowerSubscriberImpl) Close() {
|
|
// Close quit channel so that any pending writes to the queue are
|
|
// cancelled.
|
|
close(s.quit)
|
|
|
|
// Stop the queue goroutine so that it won't leak.
|
|
s.queue.Stop()
|
|
}
|
|
|
|
// Updates is the channel over which *channeldb.MPPayment updates can be
|
|
// received.
|
|
func (s *controlTowerSubscriberImpl) Updates() <-chan interface{} {
|
|
return s.updates
|
|
}
|
|
|
|
// controlTower is persistent implementation of ControlTower to restrict
|
|
// double payment sending.
|
|
type controlTower struct {
|
|
db *channeldb.PaymentControl
|
|
|
|
// subscriberIndex is used to provide a unique id for each subscriber
|
|
// to all payments. This is used to easily remove the subscriber when
|
|
// necessary.
|
|
subscriberIndex uint64
|
|
subscribersAllPayments map[uint64]*controlTowerSubscriberImpl
|
|
subscribers map[lntypes.Hash][]*controlTowerSubscriberImpl
|
|
subscribersMtx sync.Mutex
|
|
|
|
// paymentsMtx provides synchronization on the payment level to ensure
|
|
// that no race conditions occur in between updating the database and
|
|
// sending a notification.
|
|
paymentsMtx *multimutex.Mutex[lntypes.Hash]
|
|
}
|
|
|
|
// NewControlTower creates a new instance of the controlTower.
|
|
func NewControlTower(db *channeldb.PaymentControl) ControlTower {
|
|
return &controlTower{
|
|
db: db,
|
|
subscribersAllPayments: make(
|
|
map[uint64]*controlTowerSubscriberImpl,
|
|
),
|
|
subscribers: make(map[lntypes.Hash][]*controlTowerSubscriberImpl),
|
|
paymentsMtx: multimutex.NewMutex[lntypes.Hash](),
|
|
}
|
|
}
|
|
|
|
// InitPayment checks or records the given PaymentCreationInfo with the DB,
|
|
// making sure it does not already exist as an in-flight payment. Then this
|
|
// method returns successfully, the payment is guaranteed to be in the
|
|
// Initiated state.
|
|
func (p *controlTower) InitPayment(paymentHash lntypes.Hash,
|
|
info *channeldb.PaymentCreationInfo) error {
|
|
|
|
err := p.db.InitPayment(paymentHash, info)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Take lock before querying the db to prevent missing or duplicating
|
|
// an update.
|
|
p.paymentsMtx.Lock(paymentHash)
|
|
defer p.paymentsMtx.Unlock(paymentHash)
|
|
|
|
payment, err := p.db.FetchPayment(paymentHash)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
p.notifySubscribers(paymentHash, payment)
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeleteFailedAttempts deletes all failed htlcs if the payment was
|
|
// successfully settled.
|
|
func (p *controlTower) DeleteFailedAttempts(paymentHash lntypes.Hash) error {
|
|
return p.db.DeleteFailedAttempts(paymentHash)
|
|
}
|
|
|
|
// RegisterAttempt atomically records the provided HTLCAttemptInfo to the
|
|
// DB.
|
|
func (p *controlTower) RegisterAttempt(paymentHash lntypes.Hash,
|
|
attempt *channeldb.HTLCAttemptInfo) error {
|
|
|
|
p.paymentsMtx.Lock(paymentHash)
|
|
defer p.paymentsMtx.Unlock(paymentHash)
|
|
|
|
payment, err := p.db.RegisterAttempt(paymentHash, attempt)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Notify subscribers of the attempt registration.
|
|
p.notifySubscribers(paymentHash, payment)
|
|
|
|
return nil
|
|
}
|
|
|
|
// SettleAttempt marks the given attempt settled with the preimage. If
|
|
// this is a multi shard payment, this might implicitly mean the the
|
|
// full payment succeeded.
|
|
func (p *controlTower) SettleAttempt(paymentHash lntypes.Hash,
|
|
attemptID uint64, settleInfo *channeldb.HTLCSettleInfo) (
|
|
*channeldb.HTLCAttempt, error) {
|
|
|
|
p.paymentsMtx.Lock(paymentHash)
|
|
defer p.paymentsMtx.Unlock(paymentHash)
|
|
|
|
payment, err := p.db.SettleAttempt(paymentHash, attemptID, settleInfo)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Notify subscribers of success event.
|
|
p.notifySubscribers(paymentHash, payment)
|
|
|
|
return payment.GetAttempt(attemptID)
|
|
}
|
|
|
|
// FailAttempt marks the given payment attempt failed.
|
|
func (p *controlTower) FailAttempt(paymentHash lntypes.Hash,
|
|
attemptID uint64, failInfo *channeldb.HTLCFailInfo) (
|
|
*channeldb.HTLCAttempt, error) {
|
|
|
|
p.paymentsMtx.Lock(paymentHash)
|
|
defer p.paymentsMtx.Unlock(paymentHash)
|
|
|
|
payment, err := p.db.FailAttempt(paymentHash, attemptID, failInfo)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Notify subscribers of failed attempt.
|
|
p.notifySubscribers(paymentHash, payment)
|
|
|
|
return payment.GetAttempt(attemptID)
|
|
}
|
|
|
|
// FetchPayment fetches the payment corresponding to the given payment hash.
|
|
func (p *controlTower) FetchPayment(paymentHash lntypes.Hash) (
|
|
dbMPPayment, error) {
|
|
|
|
return p.db.FetchPayment(paymentHash)
|
|
}
|
|
|
|
// FailPayment transitions a payment into the Failed state, and records the
|
|
// reason the payment failed. After invoking this method, InitPayment should
|
|
// return nil on its next call for this payment hash, allowing the switch to
|
|
// make a subsequent payment.
|
|
func (p *controlTower) FailPayment(paymentHash lntypes.Hash,
|
|
reason channeldb.FailureReason) error {
|
|
|
|
p.paymentsMtx.Lock(paymentHash)
|
|
defer p.paymentsMtx.Unlock(paymentHash)
|
|
|
|
payment, err := p.db.Fail(paymentHash, reason)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Notify subscribers of fail event.
|
|
p.notifySubscribers(paymentHash, payment)
|
|
|
|
return nil
|
|
}
|
|
|
|
// FetchInFlightPayments returns all payments with status InFlight.
|
|
func (p *controlTower) FetchInFlightPayments() ([]*channeldb.MPPayment, error) {
|
|
return p.db.FetchInFlightPayments()
|
|
}
|
|
|
|
// SubscribePayment subscribes to updates for the payment with the given hash. A
|
|
// first update with the current state of the payment is always sent out
|
|
// immediately.
|
|
func (p *controlTower) SubscribePayment(paymentHash lntypes.Hash) (
|
|
ControlTowerSubscriber, error) {
|
|
|
|
// Take lock before querying the db to prevent missing or duplicating an
|
|
// update.
|
|
p.paymentsMtx.Lock(paymentHash)
|
|
defer p.paymentsMtx.Unlock(paymentHash)
|
|
|
|
payment, err := p.db.FetchPayment(paymentHash)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
subscriber := newControlTowerSubscriber()
|
|
|
|
// Always write current payment state to the channel.
|
|
subscriber.queue.ChanIn() <- payment
|
|
|
|
// Payment is currently in flight. Register this subscriber for further
|
|
// updates. Otherwise this update is the final update and the incoming
|
|
// channel can be closed. This will close the queue's outgoing channel
|
|
// when all updates have been written.
|
|
if !payment.Terminated() {
|
|
p.subscribersMtx.Lock()
|
|
p.subscribers[paymentHash] = append(
|
|
p.subscribers[paymentHash], subscriber,
|
|
)
|
|
p.subscribersMtx.Unlock()
|
|
} else {
|
|
close(subscriber.queue.ChanIn())
|
|
}
|
|
|
|
return subscriber, nil
|
|
}
|
|
|
|
// SubscribeAllPayments subscribes to updates for all inflight payments. A first
|
|
// update with the current state of every inflight payment is always sent out
|
|
// immediately.
|
|
// Note: If payments are in-flight while starting a new subscription, the start
|
|
// of the payment stream could produce out-of-order and/or duplicate events. In
|
|
// order to get updates for every in-flight payment attempt make sure to
|
|
// subscribe to this method before initiating any payments.
|
|
func (p *controlTower) SubscribeAllPayments() (ControlTowerSubscriber, error) {
|
|
subscriber := newControlTowerSubscriber()
|
|
|
|
// Add the subscriber to the list before fetching in-flight payments, so
|
|
// no events are missed. If a payment attempt update occurs after
|
|
// appending and before fetching in-flight payments, an out-of-order
|
|
// duplicate may be produced, because it is then fetched in below call
|
|
// and notified through the subscription.
|
|
p.subscribersMtx.Lock()
|
|
p.subscribersAllPayments[p.subscriberIndex] = subscriber
|
|
p.subscriberIndex++
|
|
p.subscribersMtx.Unlock()
|
|
|
|
inflightPayments, err := p.db.FetchInFlightPayments()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for index := range inflightPayments {
|
|
// Always write current payment state to the channel.
|
|
subscriber.queue.ChanIn() <- inflightPayments[index]
|
|
}
|
|
|
|
return subscriber, nil
|
|
}
|
|
|
|
// notifySubscribers sends a final payment event to all subscribers of this
|
|
// payment. The channel will be closed after this. Note that this function must
|
|
// be executed atomically (by means of a lock) with the database update to
|
|
// guarantee consistency of the notifications.
|
|
func (p *controlTower) notifySubscribers(paymentHash lntypes.Hash,
|
|
event *channeldb.MPPayment) {
|
|
|
|
// Get all subscribers for this payment.
|
|
p.subscribersMtx.Lock()
|
|
|
|
subscribersPaymentHash, ok := p.subscribers[paymentHash]
|
|
if !ok && len(p.subscribersAllPayments) == 0 {
|
|
p.subscribersMtx.Unlock()
|
|
return
|
|
}
|
|
|
|
// If the payment reached a terminal state, the subscriber list can be
|
|
// cleared. There won't be any more updates.
|
|
terminal := event.Terminated()
|
|
if terminal {
|
|
delete(p.subscribers, paymentHash)
|
|
}
|
|
|
|
// Copy subscribers to all payments locally while holding the lock in
|
|
// order to avoid concurrency issues while reading/writing the map.
|
|
subscribersAllPayments := make(map[uint64]*controlTowerSubscriberImpl)
|
|
for k, v := range p.subscribersAllPayments {
|
|
subscribersAllPayments[k] = v
|
|
}
|
|
p.subscribersMtx.Unlock()
|
|
|
|
// Notify all subscribers that subscribed to the current payment hash.
|
|
for _, subscriber := range subscribersPaymentHash {
|
|
select {
|
|
case subscriber.queue.ChanIn() <- event:
|
|
// If this event is the last, close the incoming channel
|
|
// of the queue. This will signal the subscriber that
|
|
// there won't be any more updates.
|
|
if terminal {
|
|
close(subscriber.queue.ChanIn())
|
|
}
|
|
|
|
// If subscriber disappeared, skip notification. For further
|
|
// notifications, we'll keep skipping over this subscriber.
|
|
case <-subscriber.quit:
|
|
}
|
|
}
|
|
|
|
// Notify all subscribers that subscribed to all payments.
|
|
for key, subscriber := range subscribersAllPayments {
|
|
select {
|
|
case subscriber.queue.ChanIn() <- event:
|
|
|
|
// If subscriber disappeared, remove it from the subscribers
|
|
// list.
|
|
case <-subscriber.quit:
|
|
p.subscribersMtx.Lock()
|
|
delete(p.subscribersAllPayments, key)
|
|
p.subscribersMtx.Unlock()
|
|
}
|
|
}
|
|
}
|