mirror of
https://github.com/lightningnetwork/lnd.git
synced 2024-11-19 09:53:54 +01:00
1779 lines
53 KiB
Go
1779 lines
53 KiB
Go
package invoices
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/lightningnetwork/lnd/clock"
|
|
"github.com/lightningnetwork/lnd/lntypes"
|
|
"github.com/lightningnetwork/lnd/lnwire"
|
|
"github.com/lightningnetwork/lnd/queue"
|
|
"github.com/lightningnetwork/lnd/record"
|
|
)
|
|
|
|
// Sentinel errors returned by the invoice registry. Callers should compare
// against them with errors.Is.
var (
	// ErrInvoiceExpiryTooSoon is returned when an invoice is attempted to
	// be accepted or settled with not enough blocks remaining.
	ErrInvoiceExpiryTooSoon = errors.New("invoice expiry too soon")

	// ErrInvoiceAmountTooLow is returned when an invoice is attempted to
	// be accepted or settled with an amount that is too low.
	ErrInvoiceAmountTooLow = errors.New(
		"paid amount less than invoice amount",
	)

	// ErrShuttingDown is returned when an operation failed because the
	// invoice registry is shutting down.
	ErrShuttingDown = errors.New("invoice registry shutting down")
)
|
|
|
|
const (
	// DefaultHtlcHoldDuration defines the default for how long mpp htlcs
	// are held while waiting for the other set members to arrive.
	DefaultHtlcHoldDuration = 120 * time.Second
)
|
|
|
|
// RegistryConfig contains the configuration parameters for invoice registry.
|
|
type RegistryConfig struct {
|
|
// FinalCltvRejectDelta defines the number of blocks before the expiry
|
|
// of the htlc where we no longer settle it as an exit hop and instead
|
|
// cancel it back. Normally this value should be lower than the cltv
|
|
// expiry of any invoice we create and the code effectuating this should
|
|
// not be hit.
|
|
FinalCltvRejectDelta int32
|
|
|
|
// HtlcHoldDuration defines for how long mpp htlcs are held while
|
|
// waiting for the other set members to arrive.
|
|
HtlcHoldDuration time.Duration
|
|
|
|
// Clock holds the clock implementation that is used to provide
|
|
// Now() and TickAfter() and is useful to stub out the clock functions
|
|
// during testing.
|
|
Clock clock.Clock
|
|
|
|
// AcceptKeySend indicates whether we want to accept spontaneous key
|
|
// send payments.
|
|
AcceptKeySend bool
|
|
|
|
// AcceptAMP indicates whether we want to accept spontaneous AMP
|
|
// payments.
|
|
AcceptAMP bool
|
|
|
|
// GcCanceledInvoicesOnStartup if set, we'll attempt to garbage collect
|
|
// all canceled invoices upon start.
|
|
GcCanceledInvoicesOnStartup bool
|
|
|
|
// GcCanceledInvoicesOnTheFly if set, we'll garbage collect all newly
|
|
// canceled invoices on the fly.
|
|
GcCanceledInvoicesOnTheFly bool
|
|
|
|
// KeysendHoldTime indicates for how long we want to accept and hold
|
|
// spontaneous keysend payments.
|
|
KeysendHoldTime time.Duration
|
|
}
|
|
|
|
// htlcReleaseEvent describes an htlc auto-release event. It is used to release
|
|
// mpp htlcs for which the complete set didn't arrive in time.
|
|
type htlcReleaseEvent struct {
|
|
// invoiceRef identifiers the invoice this htlc belongs to.
|
|
invoiceRef InvoiceRef
|
|
|
|
// key is the circuit key of the htlc to release.
|
|
key CircuitKey
|
|
|
|
// releaseTime is the time at which to release the htlc.
|
|
releaseTime time.Time
|
|
}
|
|
|
|
// Less is used to order PriorityQueueItem's by their release time such that
|
|
// items with the older release time are at the top of the queue.
|
|
//
|
|
// NOTE: Part of the queue.PriorityQueueItem interface.
|
|
func (r *htlcReleaseEvent) Less(other queue.PriorityQueueItem) bool {
|
|
return r.releaseTime.Before(other.(*htlcReleaseEvent).releaseTime)
|
|
}
|
|
|
|
// InvoiceRegistry is a central registry of all the outstanding invoices
|
|
// created by the daemon. The registry is a thin wrapper around a map in order
|
|
// to ensure that all updates/reads are thread safe.
|
|
type InvoiceRegistry struct {
|
|
sync.RWMutex
|
|
|
|
nextClientID uint32 // must be used atomically
|
|
|
|
idb InvoiceDB
|
|
|
|
// cfg contains the registry's configuration parameters.
|
|
cfg *RegistryConfig
|
|
|
|
// notificationClientMux locks notificationClients and
|
|
// singleNotificationClients. Using a separate mutex for these maps is
|
|
// necessary to avoid deadlocks in the registry when processing invoice
|
|
// events.
|
|
notificationClientMux sync.RWMutex
|
|
|
|
notificationClients map[uint32]*InvoiceSubscription
|
|
|
|
// TODO(yy): use map[lntypes.Hash]*SingleInvoiceSubscription for better
|
|
// performance.
|
|
singleNotificationClients map[uint32]*SingleInvoiceSubscription
|
|
|
|
// invoiceEvents is a single channel over which invoice updates are
|
|
// carried.
|
|
invoiceEvents chan *invoiceEvent
|
|
|
|
// hodlSubscriptionsMux locks the hodlSubscriptions and
|
|
// hodlReverseSubscriptions. Using a separate mutex for these maps is
|
|
// necessary to avoid deadlocks in the registry when processing invoice
|
|
// events.
|
|
hodlSubscriptionsMux sync.RWMutex
|
|
|
|
// hodlSubscriptions is a map from a circuit key to a list of
|
|
// subscribers. It is used for efficient notification of links.
|
|
hodlSubscriptions map[CircuitKey]map[chan<- interface{}]struct{}
|
|
|
|
// reverseSubscriptions tracks circuit keys subscribed to per
|
|
// subscriber. This is used to unsubscribe from all hashes efficiently.
|
|
hodlReverseSubscriptions map[chan<- interface{}]map[CircuitKey]struct{}
|
|
|
|
// htlcAutoReleaseChan contains the new htlcs that need to be
|
|
// auto-released.
|
|
htlcAutoReleaseChan chan *htlcReleaseEvent
|
|
|
|
expiryWatcher *InvoiceExpiryWatcher
|
|
|
|
wg sync.WaitGroup
|
|
quit chan struct{}
|
|
}
|
|
|
|
// NewRegistry creates a new invoice registry. The invoice registry
|
|
// wraps the persistent on-disk invoice storage with an additional in-memory
|
|
// layer. The in-memory layer is in place such that debug invoices can be added
|
|
// which are volatile yet available system wide within the daemon.
|
|
func NewRegistry(idb InvoiceDB, expiryWatcher *InvoiceExpiryWatcher,
|
|
cfg *RegistryConfig) *InvoiceRegistry {
|
|
|
|
notificationClients := make(map[uint32]*InvoiceSubscription)
|
|
singleNotificationClients := make(map[uint32]*SingleInvoiceSubscription)
|
|
return &InvoiceRegistry{
|
|
idb: idb,
|
|
notificationClients: notificationClients,
|
|
singleNotificationClients: singleNotificationClients,
|
|
invoiceEvents: make(chan *invoiceEvent, 100),
|
|
hodlSubscriptions: make(
|
|
map[CircuitKey]map[chan<- interface{}]struct{},
|
|
),
|
|
hodlReverseSubscriptions: make(
|
|
map[chan<- interface{}]map[CircuitKey]struct{},
|
|
),
|
|
cfg: cfg,
|
|
htlcAutoReleaseChan: make(chan *htlcReleaseEvent),
|
|
expiryWatcher: expiryWatcher,
|
|
quit: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// scanInvoicesOnStart will scan all invoices on start and add active invoices
|
|
// to the invoice expiry watcher while also attempting to delete all canceled
|
|
// invoices.
|
|
func (i *InvoiceRegistry) scanInvoicesOnStart(ctx context.Context) error {
|
|
pendingInvoices, err := i.idb.FetchPendingInvoices(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var pending []invoiceExpiry
|
|
for paymentHash, invoice := range pendingInvoices {
|
|
invoice := invoice
|
|
expiryRef := makeInvoiceExpiry(paymentHash, &invoice)
|
|
if expiryRef != nil {
|
|
pending = append(pending, expiryRef)
|
|
}
|
|
}
|
|
|
|
log.Debugf("Adding %d pending invoices to the expiry watcher",
|
|
len(pending))
|
|
i.expiryWatcher.AddInvoices(pending...)
|
|
|
|
if i.cfg.GcCanceledInvoicesOnStartup {
|
|
log.Infof("Deleting canceled invoices")
|
|
err = i.idb.DeleteCanceledInvoices(ctx)
|
|
if err != nil {
|
|
log.Warnf("Deleting canceled invoices failed: %v", err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Start starts the registry and all goroutines it needs to carry out its task.
|
|
func (i *InvoiceRegistry) Start() error {
|
|
// Start InvoiceExpiryWatcher and prepopulate it with existing active
|
|
// invoices.
|
|
err := i.expiryWatcher.Start(func(hash lntypes.Hash, force bool) error {
|
|
return i.cancelInvoiceImpl(context.Background(), hash, force)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Info("InvoiceRegistry starting")
|
|
|
|
i.wg.Add(1)
|
|
go i.invoiceEventLoop()
|
|
|
|
// Now scan all pending and removable invoices to the expiry watcher or
|
|
// delete them.
|
|
err = i.scanInvoicesOnStart(context.Background())
|
|
if err != nil {
|
|
_ = i.Stop()
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop signals the registry for a graceful shutdown.
|
|
func (i *InvoiceRegistry) Stop() error {
|
|
log.Info("InvoiceRegistry shutting down...")
|
|
defer log.Debug("InvoiceRegistry shutdown complete")
|
|
|
|
i.expiryWatcher.Stop()
|
|
|
|
close(i.quit)
|
|
|
|
i.wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
// invoiceEvent represents a new event that has modified on invoice on disk.
|
|
// Only two event types are currently supported: newly created invoices, and
|
|
// instance where invoices are settled.
|
|
type invoiceEvent struct {
|
|
hash lntypes.Hash
|
|
invoice *Invoice
|
|
setID *[32]byte
|
|
}
|
|
|
|
// tickAt returns a channel that ticks at the specified time. If the time has
|
|
// already passed, it will tick immediately.
|
|
func (i *InvoiceRegistry) tickAt(t time.Time) <-chan time.Time {
|
|
now := i.cfg.Clock.Now()
|
|
return i.cfg.Clock.TickAfter(t.Sub(now))
|
|
}
|
|
|
|
// invoiceEventLoop is the dedicated goroutine responsible for accepting
|
|
// new notification subscriptions, cancelling old subscriptions, and
|
|
// dispatching new invoice events.
|
|
func (i *InvoiceRegistry) invoiceEventLoop() {
|
|
defer i.wg.Done()
|
|
|
|
// Set up a heap for htlc auto-releases.
|
|
autoReleaseHeap := &queue.PriorityQueue{}
|
|
|
|
for {
|
|
// If there is something to release, set up a release tick
|
|
// channel.
|
|
var nextReleaseTick <-chan time.Time
|
|
if autoReleaseHeap.Len() > 0 {
|
|
head := autoReleaseHeap.Top().(*htlcReleaseEvent)
|
|
nextReleaseTick = i.tickAt(head.releaseTime)
|
|
}
|
|
|
|
select {
|
|
// A sub-systems has just modified the invoice state, so we'll
|
|
// dispatch notifications to all registered clients.
|
|
case event := <-i.invoiceEvents:
|
|
// For backwards compatibility, do not notify all
|
|
// invoice subscribers of cancel and accept events.
|
|
state := event.invoice.State
|
|
if state != ContractCanceled &&
|
|
state != ContractAccepted {
|
|
|
|
i.dispatchToClients(event)
|
|
}
|
|
i.dispatchToSingleClients(event)
|
|
|
|
// A new htlc came in for auto-release.
|
|
case event := <-i.htlcAutoReleaseChan:
|
|
log.Debugf("Scheduling auto-release for htlc: "+
|
|
"ref=%v, key=%v at %v",
|
|
event.invoiceRef, event.key, event.releaseTime)
|
|
|
|
// We use an independent timer for every htlc rather
|
|
// than a set timer that is reset with every htlc coming
|
|
// in. Otherwise the sender could keep resetting the
|
|
// timer until the broadcast window is entered and our
|
|
// channel is force closed.
|
|
autoReleaseHeap.Push(event)
|
|
|
|
// The htlc at the top of the heap needs to be auto-released.
|
|
case <-nextReleaseTick:
|
|
event := autoReleaseHeap.Pop().(*htlcReleaseEvent)
|
|
err := i.cancelSingleHtlc(
|
|
event.invoiceRef, event.key, ResultMppTimeout,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("HTLC timer: %v", err)
|
|
}
|
|
|
|
case <-i.quit:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// dispatchToSingleClients passes the supplied event to all notification
|
|
// clients that subscribed to all the invoice this event applies to.
|
|
func (i *InvoiceRegistry) dispatchToSingleClients(event *invoiceEvent) {
|
|
// Dispatch to single invoice subscribers.
|
|
clients := i.copySingleClients()
|
|
for _, client := range clients {
|
|
payHash := client.invoiceRef.PayHash()
|
|
|
|
if payHash == nil || *payHash != event.hash {
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case <-client.backlogDelivered:
|
|
// We won't deliver any events until the backlog has
|
|
// went through first.
|
|
case <-i.quit:
|
|
return
|
|
}
|
|
|
|
client.notify(event)
|
|
}
|
|
}
|
|
|
|
// dispatchToClients passes the supplied event to all notification clients that
|
|
// subscribed to all invoices. Add and settle indices are used to make sure
|
|
// that clients don't receive duplicate or unwanted events.
|
|
func (i *InvoiceRegistry) dispatchToClients(event *invoiceEvent) {
|
|
invoice := event.invoice
|
|
|
|
clients := i.copyClients()
|
|
for clientID, client := range clients {
|
|
// Before we dispatch this event, we'll check
|
|
// to ensure that this client hasn't already
|
|
// received this notification in order to
|
|
// ensure we don't duplicate any events.
|
|
|
|
// TODO(joostjager): Refactor switches.
|
|
state := event.invoice.State
|
|
switch {
|
|
// If we've already sent this settle event to
|
|
// the client, then we can skip this.
|
|
case state == ContractSettled &&
|
|
client.settleIndex >= invoice.SettleIndex:
|
|
continue
|
|
|
|
// Similarly, if we've already sent this add to
|
|
// the client then we can skip this one, but only if this isn't
|
|
// an AMP invoice. AMP invoices always remain in the settle
|
|
// state as a base invoice.
|
|
case event.setID == nil && state == ContractOpen &&
|
|
client.addIndex >= invoice.AddIndex:
|
|
continue
|
|
|
|
// These two states should never happen, but we
|
|
// log them just in case so we can detect this
|
|
// instance.
|
|
case state == ContractOpen &&
|
|
client.addIndex+1 != invoice.AddIndex:
|
|
log.Warnf("client=%v for invoice "+
|
|
"notifications missed an update, "+
|
|
"add_index=%v, new add event index=%v",
|
|
clientID, client.addIndex,
|
|
invoice.AddIndex)
|
|
|
|
case state == ContractSettled &&
|
|
client.settleIndex+1 != invoice.SettleIndex:
|
|
log.Warnf("client=%v for invoice "+
|
|
"notifications missed an update, "+
|
|
"settle_index=%v, new settle event index=%v",
|
|
clientID, client.settleIndex,
|
|
invoice.SettleIndex)
|
|
}
|
|
|
|
select {
|
|
case <-client.backlogDelivered:
|
|
// We won't deliver any events until the backlog has
|
|
// been processed.
|
|
case <-i.quit:
|
|
return
|
|
}
|
|
|
|
err := client.notify(&invoiceEvent{
|
|
invoice: invoice,
|
|
setID: event.setID,
|
|
})
|
|
if err != nil {
|
|
log.Errorf("Failed dispatching to client: %v", err)
|
|
return
|
|
}
|
|
|
|
// Each time we send a notification to a client, we'll record
|
|
// the latest add/settle index it has. We'll use this to ensure
|
|
// we don't send a notification twice, which can happen if a new
|
|
// event is added while we're catching up a new client.
|
|
invState := event.invoice.State
|
|
switch {
|
|
case invState == ContractSettled:
|
|
client.settleIndex = invoice.SettleIndex
|
|
|
|
case invState == ContractOpen && event.setID == nil:
|
|
client.addIndex = invoice.AddIndex
|
|
|
|
// If this is an AMP invoice, then we'll need to use the set ID
|
|
// to keep track of the settle index of the client. AMP
|
|
// invoices never go to the open state, but if a setID is
|
|
// passed, then we know it was just settled and will track the
|
|
// highest settle index so far.
|
|
case invState == ContractOpen && event.setID != nil:
|
|
setID := *event.setID
|
|
client.settleIndex = invoice.AMPState[setID].SettleIndex
|
|
|
|
default:
|
|
log.Errorf("unexpected invoice state: %v",
|
|
event.invoice.State)
|
|
}
|
|
}
|
|
}
|
|
|
|
// deliverBacklogEvents will attempts to query the invoice database for any
|
|
// notifications that the client has missed since it reconnected last.
|
|
func (i *InvoiceRegistry) deliverBacklogEvents(ctx context.Context,
|
|
client *InvoiceSubscription) error {
|
|
|
|
addEvents, err := i.idb.InvoicesAddedSince(ctx, client.addIndex)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
settleEvents, err := i.idb.InvoicesSettledSince(ctx, client.settleIndex)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// If we have any to deliver, then we'll append them to the end of the
|
|
// notification queue in order to catch up the client before delivering
|
|
// any new notifications.
|
|
for _, addEvent := range addEvents {
|
|
// We re-bind the loop variable to ensure we don't hold onto
|
|
// the loop reference causing is to point to the same item.
|
|
addEvent := addEvent
|
|
|
|
select {
|
|
case client.ntfnQueue.ChanIn() <- &invoiceEvent{
|
|
invoice: &addEvent,
|
|
}:
|
|
case <-i.quit:
|
|
return ErrShuttingDown
|
|
}
|
|
}
|
|
|
|
for _, settleEvent := range settleEvents {
|
|
// We re-bind the loop variable to ensure we don't hold onto
|
|
// the loop reference causing is to point to the same item.
|
|
settleEvent := settleEvent
|
|
|
|
select {
|
|
case client.ntfnQueue.ChanIn() <- &invoiceEvent{
|
|
invoice: &settleEvent,
|
|
}:
|
|
case <-i.quit:
|
|
return ErrShuttingDown
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// deliverSingleBacklogEvents will attempt to query the invoice database to
|
|
// retrieve the current invoice state and deliver this to the subscriber. Single
|
|
// invoice subscribers will always receive the current state right after
|
|
// subscribing. Only in case the invoice does not yet exist, nothing is sent
|
|
// yet.
|
|
func (i *InvoiceRegistry) deliverSingleBacklogEvents(ctx context.Context,
|
|
client *SingleInvoiceSubscription) error {
|
|
|
|
invoice, err := i.idb.LookupInvoice(ctx, client.invoiceRef)
|
|
|
|
// It is possible that the invoice does not exist yet, but the client is
|
|
// already watching it in anticipation.
|
|
isNotFound := errors.Is(err, ErrInvoiceNotFound)
|
|
isNotCreated := errors.Is(err, ErrNoInvoicesCreated)
|
|
if isNotFound || isNotCreated {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
payHash := client.invoiceRef.PayHash()
|
|
if payHash == nil {
|
|
return nil
|
|
}
|
|
|
|
err = client.notify(&invoiceEvent{
|
|
hash: *payHash,
|
|
invoice: &invoice,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Debugf("Client(id=%v) delivered single backlog event: payHash=%v",
|
|
client.id, payHash)
|
|
|
|
return nil
|
|
}
|
|
|
|
// AddInvoice adds a regular invoice for the specified amount, identified by
|
|
// the passed preimage. Additionally, any memo or receipt data provided will
|
|
// also be stored on-disk. Once this invoice is added, subsystems within the
|
|
// daemon add/forward HTLCs are able to obtain the proper preimage required for
|
|
// redemption in the case that we're the final destination. We also return the
|
|
// addIndex of the newly created invoice which monotonically increases for each
|
|
// new invoice added. A side effect of this function is that it also sets
|
|
// AddIndex on the invoice argument.
|
|
func (i *InvoiceRegistry) AddInvoice(ctx context.Context, invoice *Invoice,
|
|
paymentHash lntypes.Hash) (uint64, error) {
|
|
|
|
i.Lock()
|
|
|
|
ref := InvoiceRefByHash(paymentHash)
|
|
log.Debugf("Invoice%v: added with terms %v", ref, invoice.Terms)
|
|
|
|
addIndex, err := i.idb.AddInvoice(ctx, invoice, paymentHash)
|
|
if err != nil {
|
|
i.Unlock()
|
|
return 0, err
|
|
}
|
|
|
|
// Now that we've added the invoice, we'll send dispatch a message to
|
|
// notify the clients of this new invoice.
|
|
i.notifyClients(paymentHash, invoice, nil)
|
|
i.Unlock()
|
|
|
|
// InvoiceExpiryWatcher.AddInvoice must not be locked by InvoiceRegistry
|
|
// to avoid deadlock when a new invoice is added while an other is being
|
|
// canceled.
|
|
invoiceExpiryRef := makeInvoiceExpiry(paymentHash, invoice)
|
|
if invoiceExpiryRef != nil {
|
|
i.expiryWatcher.AddInvoices(invoiceExpiryRef)
|
|
}
|
|
|
|
return addIndex, nil
|
|
}
|
|
|
|
// LookupInvoice looks up an invoice by its payment hash (R-Hash), if found
|
|
// then we're able to pull the funds pending within an HTLC.
|
|
//
|
|
// TODO(roasbeef): ignore if settled?
|
|
func (i *InvoiceRegistry) LookupInvoice(ctx context.Context,
|
|
rHash lntypes.Hash) (Invoice, error) {
|
|
|
|
// We'll check the database to see if there's an existing matching
|
|
// invoice.
|
|
ref := InvoiceRefByHash(rHash)
|
|
return i.idb.LookupInvoice(ctx, ref)
|
|
}
|
|
|
|
// LookupInvoiceByRef looks up an invoice by the given reference, if found
|
|
// then we're able to pull the funds pending within an HTLC.
|
|
func (i *InvoiceRegistry) LookupInvoiceByRef(ctx context.Context,
|
|
ref InvoiceRef) (Invoice, error) {
|
|
|
|
return i.idb.LookupInvoice(ctx, ref)
|
|
}
|
|
|
|
// startHtlcTimer starts a new timer via the invoice registry main loop that
|
|
// cancels a single htlc on an invoice when the htlc hold duration has passed.
|
|
func (i *InvoiceRegistry) startHtlcTimer(invoiceRef InvoiceRef,
|
|
key CircuitKey, acceptTime time.Time) error {
|
|
|
|
releaseTime := acceptTime.Add(i.cfg.HtlcHoldDuration)
|
|
event := &htlcReleaseEvent{
|
|
invoiceRef: invoiceRef,
|
|
key: key,
|
|
releaseTime: releaseTime,
|
|
}
|
|
|
|
select {
|
|
case i.htlcAutoReleaseChan <- event:
|
|
return nil
|
|
|
|
case <-i.quit:
|
|
return ErrShuttingDown
|
|
}
|
|
}
|
|
|
|
// cancelSingleHtlc cancels a single accepted htlc on an invoice. It takes
|
|
// a resolution result which will be used to notify subscribed links and
|
|
// resolvers of the details of the htlc cancellation.
|
|
func (i *InvoiceRegistry) cancelSingleHtlc(invoiceRef InvoiceRef,
|
|
key CircuitKey, result FailResolutionResult) error {
|
|
|
|
updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
|
|
// Only allow individual htlc cancellation on open invoices.
|
|
if invoice.State != ContractOpen {
|
|
log.Debugf("cancelSingleHtlc: invoice %v no longer "+
|
|
"open", invoiceRef)
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
// Lookup the current status of the htlc in the database.
|
|
var (
|
|
htlcState HtlcState
|
|
setID *SetID
|
|
)
|
|
htlc, ok := invoice.Htlcs[key]
|
|
if !ok {
|
|
// If this is an AMP invoice, then all the HTLCs won't
|
|
// be read out, so we'll consult the other mapping to
|
|
// try to find the HTLC state in question here.
|
|
var found bool
|
|
for ampSetID, htlcSet := range invoice.AMPState {
|
|
ampSetID := ampSetID
|
|
for htlcKey := range htlcSet.InvoiceKeys {
|
|
if htlcKey == key {
|
|
htlcState = htlcSet.State
|
|
setID = &SetID
|
|
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if !found {
|
|
return nil, fmt.Errorf("htlc %v not found", key)
|
|
}
|
|
} else {
|
|
htlcState = htlc.State
|
|
}
|
|
|
|
// Cancellation is only possible if the htlc wasn't already
|
|
// resolved.
|
|
if htlcState != HtlcStateAccepted {
|
|
log.Debugf("cancelSingleHtlc: htlc %v on invoice %v "+
|
|
"is already resolved", key, invoiceRef)
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
log.Debugf("cancelSingleHtlc: cancelling htlc %v on invoice %v",
|
|
key, invoiceRef)
|
|
|
|
// Return an update descriptor that cancels htlc and keeps
|
|
// invoice open.
|
|
canceledHtlcs := map[CircuitKey]struct{}{
|
|
key: {},
|
|
}
|
|
|
|
return &InvoiceUpdateDesc{
|
|
UpdateType: CancelHTLCsUpdate,
|
|
CancelHtlcs: canceledHtlcs,
|
|
SetID: setID,
|
|
}, nil
|
|
}
|
|
|
|
// Try to mark the specified htlc as canceled in the invoice database.
|
|
// Intercept the update descriptor to set the local updated variable. If
|
|
// no invoice update is performed, we can return early.
|
|
setID := (*SetID)(invoiceRef.SetID())
|
|
var updated bool
|
|
invoice, err := i.idb.UpdateInvoice(
|
|
context.Background(), invoiceRef, setID,
|
|
func(invoice *Invoice) (
|
|
*InvoiceUpdateDesc, error) {
|
|
|
|
updateDesc, err := updateInvoice(invoice)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
updated = updateDesc != nil
|
|
|
|
return updateDesc, err
|
|
},
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !updated {
|
|
return nil
|
|
}
|
|
|
|
// The invoice has been updated. Notify subscribers of the htlc
|
|
// resolution.
|
|
htlc, ok := invoice.Htlcs[key]
|
|
if !ok {
|
|
return fmt.Errorf("htlc %v not found", key)
|
|
}
|
|
if htlc.State == HtlcStateCanceled {
|
|
resolution := NewFailResolution(
|
|
key, int32(htlc.AcceptHeight), result,
|
|
)
|
|
|
|
i.notifyHodlSubscribers(resolution)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// processKeySend just-in-time inserts an invoice if this htlc is a keysend
|
|
// htlc.
|
|
func (i *InvoiceRegistry) processKeySend(ctx invoiceUpdateCtx) error {
|
|
// Retrieve keysend record if present.
|
|
preimageSlice, ok := ctx.customRecords[record.KeySendType]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
// Cancel htlc is preimage is invalid.
|
|
preimage, err := lntypes.MakePreimage(preimageSlice)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if preimage.Hash() != ctx.hash {
|
|
return fmt.Errorf("invalid keysend preimage %v for hash %v",
|
|
preimage, ctx.hash)
|
|
}
|
|
|
|
// Only allow keysend for non-mpp payments.
|
|
if ctx.mpp != nil {
|
|
return errors.New("no mpp keysend supported")
|
|
}
|
|
|
|
// Create an invoice for the htlc amount.
|
|
amt := ctx.amtPaid
|
|
|
|
// Set tlv required feature vector on the invoice. Otherwise we wouldn't
|
|
// be able to pay to it with keysend.
|
|
rawFeatures := lnwire.NewRawFeatureVector(
|
|
lnwire.TLVOnionPayloadRequired,
|
|
)
|
|
features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
|
|
|
|
// Use the minimum block delta that we require for settling htlcs.
|
|
finalCltvDelta := i.cfg.FinalCltvRejectDelta
|
|
|
|
// Pre-check expiry here to prevent inserting an invoice that will not
|
|
// be settled.
|
|
if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
|
|
return errors.New("final expiry too soon")
|
|
}
|
|
|
|
// The invoice database indexes all invoices by payment address, however
|
|
// legacy keysend payment do not have one. In order to avoid a new
|
|
// payment type on-disk wrt. to indexing, we'll continue to insert a
|
|
// blank payment address which is special cased in the insertion logic
|
|
// to not be indexed. In the future, once AMP is merged, this should be
|
|
// replaced by generating a random payment address on the behalf of the
|
|
// sender.
|
|
payAddr := BlankPayAddr
|
|
|
|
// Create placeholder invoice.
|
|
invoice := &Invoice{
|
|
CreationDate: i.cfg.Clock.Now(),
|
|
Terms: ContractTerm{
|
|
FinalCltvDelta: finalCltvDelta,
|
|
Value: amt,
|
|
PaymentPreimage: &preimage,
|
|
PaymentAddr: payAddr,
|
|
Features: features,
|
|
},
|
|
}
|
|
|
|
if i.cfg.KeysendHoldTime != 0 {
|
|
invoice.HodlInvoice = true
|
|
invoice.Terms.Expiry = i.cfg.KeysendHoldTime
|
|
}
|
|
|
|
// Insert invoice into database. Ignore duplicates, because this
|
|
// may be a replay.
|
|
_, err = i.AddInvoice(context.Background(), invoice, ctx.hash)
|
|
if err != nil && !errors.Is(err, ErrDuplicateInvoice) {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// processAMP just-in-time inserts an invoice if this htlc is a keysend
|
|
// htlc.
|
|
func (i *InvoiceRegistry) processAMP(ctx invoiceUpdateCtx) error {
|
|
// AMP payments MUST also include an MPP record.
|
|
if ctx.mpp == nil {
|
|
return errors.New("no MPP record for AMP")
|
|
}
|
|
|
|
// Create an invoice for the total amount expected, provided in the MPP
|
|
// record.
|
|
amt := ctx.mpp.TotalMsat()
|
|
|
|
// Set the TLV required and MPP optional features on the invoice. We'll
|
|
// also make the AMP features required so that it can't be paid by
|
|
// legacy or MPP htlcs.
|
|
rawFeatures := lnwire.NewRawFeatureVector(
|
|
lnwire.TLVOnionPayloadRequired,
|
|
lnwire.PaymentAddrOptional,
|
|
lnwire.AMPRequired,
|
|
)
|
|
features := lnwire.NewFeatureVector(rawFeatures, lnwire.Features)
|
|
|
|
// Use the minimum block delta that we require for settling htlcs.
|
|
finalCltvDelta := i.cfg.FinalCltvRejectDelta
|
|
|
|
// Pre-check expiry here to prevent inserting an invoice that will not
|
|
// be settled.
|
|
if ctx.expiry < uint32(ctx.currentHeight+finalCltvDelta) {
|
|
return errors.New("final expiry too soon")
|
|
}
|
|
|
|
// We'll use the sender-generated payment address provided in the HTLC
|
|
// to create our AMP invoice.
|
|
payAddr := ctx.mpp.PaymentAddr()
|
|
|
|
// Create placeholder invoice.
|
|
invoice := &Invoice{
|
|
CreationDate: i.cfg.Clock.Now(),
|
|
Terms: ContractTerm{
|
|
FinalCltvDelta: finalCltvDelta,
|
|
Value: amt,
|
|
PaymentPreimage: nil,
|
|
PaymentAddr: payAddr,
|
|
Features: features,
|
|
},
|
|
}
|
|
|
|
// Insert invoice into database. Ignore duplicates payment hashes and
|
|
// payment addrs, this may be a replay or a different HTLC for the AMP
|
|
// invoice.
|
|
_, err := i.AddInvoice(context.Background(), invoice, ctx.hash)
|
|
isDuplicatedInvoice := errors.Is(err, ErrDuplicateInvoice)
|
|
isDuplicatedPayAddr := errors.Is(err, ErrDuplicatePayAddr)
|
|
switch {
|
|
case isDuplicatedInvoice || isDuplicatedPayAddr:
|
|
return nil
|
|
default:
|
|
return err
|
|
}
|
|
}
|
|
|
|
// NotifyExitHopHtlc attempts to mark an invoice as settled. The return value
|
|
// describes how the htlc should be resolved.
|
|
//
|
|
// When the preimage of the invoice is not yet known (hodl invoice), this
|
|
// function moves the invoice to the accepted state. When SettleHoldInvoice is
|
|
// called later, a resolution message will be send back to the caller via the
|
|
// provided hodlChan. Invoice registry sends on this channel what action needs
|
|
// to be taken on the htlc (settle or cancel). The caller needs to ensure that
|
|
// the channel is either buffered or received on from another goroutine to
|
|
// prevent deadlock.
|
|
//
|
|
// In the case that the htlc is part of a larger set of htlcs that pay to the
|
|
// same invoice (multi-path payment), the htlc is held until the set is
|
|
// complete. If the set doesn't fully arrive in time, a timer will cancel the
|
|
// held htlc.
|
|
func (i *InvoiceRegistry) NotifyExitHopHtlc(rHash lntypes.Hash,
|
|
amtPaid lnwire.MilliSatoshi, expiry uint32, currentHeight int32,
|
|
circuitKey CircuitKey, hodlChan chan<- interface{},
|
|
payload Payload) (HtlcResolution, error) {
|
|
|
|
// Create the update context containing the relevant details of the
|
|
// incoming htlc.
|
|
ctx := invoiceUpdateCtx{
|
|
hash: rHash,
|
|
circuitKey: circuitKey,
|
|
amtPaid: amtPaid,
|
|
expiry: expiry,
|
|
currentHeight: currentHeight,
|
|
finalCltvRejectDelta: i.cfg.FinalCltvRejectDelta,
|
|
customRecords: payload.CustomRecords(),
|
|
mpp: payload.MultiPath(),
|
|
amp: payload.AMPRecord(),
|
|
metadata: payload.Metadata(),
|
|
}
|
|
|
|
switch {
|
|
// If we are accepting spontaneous AMP payments and this payload
|
|
// contains an AMP record, create an AMP invoice that will be settled
|
|
// below.
|
|
case i.cfg.AcceptAMP && ctx.amp != nil:
|
|
err := i.processAMP(ctx)
|
|
if err != nil {
|
|
ctx.log(fmt.Sprintf("amp error: %v", err))
|
|
|
|
return NewFailResolution(
|
|
circuitKey, currentHeight, ResultAmpError,
|
|
), nil
|
|
}
|
|
|
|
// If we are accepting spontaneous keysend payments, create a regular
|
|
// invoice that will be settled below. We also enforce that this is only
|
|
// done when no AMP payload is present since it will only be settle-able
|
|
// by regular HTLCs.
|
|
case i.cfg.AcceptKeySend && ctx.amp == nil:
|
|
err := i.processKeySend(ctx)
|
|
if err != nil {
|
|
ctx.log(fmt.Sprintf("keysend error: %v", err))
|
|
|
|
return NewFailResolution(
|
|
circuitKey, currentHeight, ResultKeySendError,
|
|
), nil
|
|
}
|
|
}
|
|
|
|
// Execute locked notify exit hop logic.
|
|
i.Lock()
|
|
resolution, invoiceToExpire, err := i.notifyExitHopHtlcLocked(
|
|
&ctx, hodlChan,
|
|
)
|
|
i.Unlock()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if invoiceToExpire != nil {
|
|
i.expiryWatcher.AddInvoices(invoiceToExpire)
|
|
}
|
|
|
|
switch r := resolution.(type) {
|
|
// The htlc is held. Start a timer outside the lock if the htlc should
|
|
// be auto-released, because otherwise a deadlock may happen with the
|
|
// main event loop.
|
|
case *htlcAcceptResolution:
|
|
if r.autoRelease {
|
|
var invRef InvoiceRef
|
|
if ctx.amp != nil {
|
|
invRef = InvoiceRefBySetID(*ctx.setID())
|
|
} else {
|
|
invRef = ctx.invoiceRef()
|
|
}
|
|
|
|
err := i.startHtlcTimer(
|
|
invRef, circuitKey, r.acceptTime,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// We return a nil resolution because htlc acceptances are
|
|
// represented as nil resolutions externally.
|
|
// TODO(carla) update calling code to handle accept resolutions.
|
|
return nil, nil
|
|
|
|
// A direct resolution was received for this htlc.
|
|
case HtlcResolution:
|
|
return r, nil
|
|
|
|
// Fail if an unknown resolution type was received.
|
|
default:
|
|
return nil, errors.New("invalid resolution type")
|
|
}
|
|
}
|
|
|
|
// notifyExitHopHtlcLocked is the internal implementation of NotifyExitHopHtlc
|
|
// that should be executed inside the registry lock. The returned invoiceExpiry
|
|
// (if not nil) needs to be added to the expiry watcher outside of the lock.
|
|
func (i *InvoiceRegistry) notifyExitHopHtlcLocked(
|
|
ctx *invoiceUpdateCtx, hodlChan chan<- interface{}) (
|
|
HtlcResolution, invoiceExpiry, error) {
|
|
|
|
// We'll attempt to settle an invoice matching this rHash on disk (if
|
|
// one exists). The callback will update the invoice state and/or htlcs.
|
|
var (
|
|
resolution HtlcResolution
|
|
updateSubscribers bool
|
|
)
|
|
|
|
callback := func(inv *Invoice) (*InvoiceUpdateDesc, error) {
|
|
updateDesc, res, err := updateInvoice(ctx, inv)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Only send an update if the invoice state was changed.
|
|
updateSubscribers = updateDesc != nil &&
|
|
updateDesc.State != nil
|
|
|
|
// Assign resolution to outer scope variable.
|
|
resolution = res
|
|
|
|
return updateDesc, nil
|
|
}
|
|
|
|
invoiceRef := ctx.invoiceRef()
|
|
setID := (*SetID)(ctx.setID())
|
|
invoice, err := i.idb.UpdateInvoice(
|
|
context.Background(), invoiceRef, setID, callback,
|
|
)
|
|
|
|
var duplicateSetIDErr ErrDuplicateSetID
|
|
if errors.As(err, &duplicateSetIDErr) {
|
|
return NewFailResolution(
|
|
ctx.circuitKey, ctx.currentHeight,
|
|
ResultInvoiceNotFound,
|
|
), nil, nil
|
|
}
|
|
|
|
switch err {
|
|
case ErrInvoiceNotFound:
|
|
// If the invoice was not found, return a failure resolution
|
|
// with an invoice not found result.
|
|
return NewFailResolution(
|
|
ctx.circuitKey, ctx.currentHeight,
|
|
ResultInvoiceNotFound,
|
|
), nil, nil
|
|
|
|
case ErrInvRefEquivocation:
|
|
return NewFailResolution(
|
|
ctx.circuitKey, ctx.currentHeight,
|
|
ResultInvoiceNotFound,
|
|
), nil, nil
|
|
|
|
case nil:
|
|
|
|
default:
|
|
ctx.log(err.Error())
|
|
return nil, nil, err
|
|
}
|
|
|
|
var invoiceToExpire invoiceExpiry
|
|
|
|
switch res := resolution.(type) {
|
|
case *HtlcFailResolution:
|
|
// Inspect latest htlc state on the invoice. If it is found,
|
|
// we will update the accept height as it was recorded in the
|
|
// invoice database (which occurs in the case where the htlc
|
|
// reached the database in a previous call). If the htlc was
|
|
// not found on the invoice, it was immediately failed so we
|
|
// send the failure resolution as is, which has the current
|
|
// height set as the accept height.
|
|
invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
|
|
if ok {
|
|
res.AcceptHeight = int32(invoiceHtlc.AcceptHeight)
|
|
}
|
|
|
|
ctx.log(fmt.Sprintf("failure resolution result "+
|
|
"outcome: %v, at accept height: %v",
|
|
res.Outcome, res.AcceptHeight))
|
|
|
|
// Some failures apply to the entire HTLC set. Break here if
|
|
// this isn't one of them.
|
|
if !res.Outcome.IsSetFailure() {
|
|
break
|
|
}
|
|
|
|
// Also cancel any HTLCs in the HTLC set that are also in the
|
|
// canceled state with the same failure result.
|
|
setID := ctx.setID()
|
|
canceledHtlcSet := invoice.HTLCSet(setID, HtlcStateCanceled)
|
|
for key, htlc := range canceledHtlcSet {
|
|
htlcFailResolution := NewFailResolution(
|
|
key, int32(htlc.AcceptHeight), res.Outcome,
|
|
)
|
|
|
|
i.notifyHodlSubscribers(htlcFailResolution)
|
|
}
|
|
|
|
// If the htlc was settled, we will settle any previously accepted
|
|
// htlcs and notify our peer to settle them.
|
|
case *HtlcSettleResolution:
|
|
ctx.log(fmt.Sprintf("settle resolution result "+
|
|
"outcome: %v, at accept height: %v",
|
|
res.Outcome, res.AcceptHeight))
|
|
|
|
// Also settle any previously accepted htlcs. If a htlc is
|
|
// marked as settled, we should follow now and settle the htlc
|
|
// with our peer.
|
|
setID := ctx.setID()
|
|
settledHtlcSet := invoice.HTLCSet(setID, HtlcStateSettled)
|
|
for key, htlc := range settledHtlcSet {
|
|
preimage := res.Preimage
|
|
if htlc.AMP != nil && htlc.AMP.Preimage != nil {
|
|
preimage = *htlc.AMP.Preimage
|
|
}
|
|
|
|
// Notify subscribers that the htlcs should be settled
|
|
// with our peer. Note that the outcome of the
|
|
// resolution is set based on the outcome of the single
|
|
// htlc that we just settled, so may not be accurate
|
|
// for all htlcs.
|
|
htlcSettleResolution := NewSettleResolution(
|
|
preimage, key,
|
|
int32(htlc.AcceptHeight), res.Outcome,
|
|
)
|
|
|
|
// Notify subscribers that the htlc should be settled
|
|
// with our peer.
|
|
i.notifyHodlSubscribers(htlcSettleResolution)
|
|
}
|
|
|
|
// If concurrent payments were attempted to this invoice before
|
|
// the current one was ultimately settled, cancel back any of
|
|
// the HTLCs immediately. As a result of the settle, the HTLCs
|
|
// in other HTLC sets are automatically converted to a canceled
|
|
// state when updating the invoice.
|
|
//
|
|
// TODO(roasbeef): can remove now??
|
|
canceledHtlcSet := invoice.HTLCSetCompliment(
|
|
setID, HtlcStateCanceled,
|
|
)
|
|
for key, htlc := range canceledHtlcSet {
|
|
htlcFailResolution := NewFailResolution(
|
|
key, int32(htlc.AcceptHeight),
|
|
ResultInvoiceAlreadySettled,
|
|
)
|
|
|
|
i.notifyHodlSubscribers(htlcFailResolution)
|
|
}
|
|
|
|
// If we accepted the htlc, subscribe to the hodl invoice and return
|
|
// an accept resolution with the htlc's accept time on it.
|
|
case *htlcAcceptResolution:
|
|
invoiceHtlc, ok := invoice.Htlcs[ctx.circuitKey]
|
|
if !ok {
|
|
return nil, nil, fmt.Errorf("accepted htlc: %v not"+
|
|
" present on invoice: %x", ctx.circuitKey,
|
|
ctx.hash[:])
|
|
}
|
|
|
|
// Determine accepted height of this htlc. If the htlc reached
|
|
// the invoice database (possibly in a previous call to the
|
|
// invoice registry), we'll take the original accepted height
|
|
// as it was recorded in the database.
|
|
acceptHeight := int32(invoiceHtlc.AcceptHeight)
|
|
|
|
ctx.log(fmt.Sprintf("accept resolution result "+
|
|
"outcome: %v, at accept height: %v",
|
|
res.outcome, acceptHeight))
|
|
|
|
// Auto-release the htlc if the invoice is still open. It can
|
|
// only happen for mpp payments that there are htlcs in state
|
|
// Accepted while the invoice is Open.
|
|
if invoice.State == ContractOpen {
|
|
res.acceptTime = invoiceHtlc.AcceptTime
|
|
res.autoRelease = true
|
|
}
|
|
|
|
// If we have fully accepted the set of htlcs for this invoice,
|
|
// we can now add it to our invoice expiry watcher. We do not
|
|
// add invoices before they are fully accepted, because it is
|
|
// possible that we MppTimeout the htlcs, and then our relevant
|
|
// expiry height could change.
|
|
if res.outcome == resultAccepted {
|
|
invoiceToExpire = makeInvoiceExpiry(ctx.hash, invoice)
|
|
}
|
|
|
|
i.hodlSubscribe(hodlChan, ctx.circuitKey)
|
|
|
|
default:
|
|
panic("unknown action")
|
|
}
|
|
|
|
// Now that the links have been notified of any state changes to their
|
|
// HTLCs, we'll go ahead and notify any clients wiaiting on the invoice
|
|
// state changes.
|
|
if updateSubscribers {
|
|
// We'll add a setID onto the notification, but only if this is
|
|
// an AMP invoice being settled.
|
|
var setID *[32]byte
|
|
if _, ok := resolution.(*HtlcSettleResolution); ok {
|
|
setID = ctx.setID()
|
|
}
|
|
|
|
i.notifyClients(ctx.hash, invoice, setID)
|
|
}
|
|
|
|
return resolution, invoiceToExpire, nil
|
|
}
|
|
|
|
// SettleHodlInvoice sets the preimage of a hodl invoice.
|
|
func (i *InvoiceRegistry) SettleHodlInvoice(ctx context.Context,
|
|
preimage lntypes.Preimage) error {
|
|
|
|
i.Lock()
|
|
defer i.Unlock()
|
|
|
|
updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
|
|
switch invoice.State {
|
|
case ContractOpen:
|
|
return nil, ErrInvoiceStillOpen
|
|
|
|
case ContractCanceled:
|
|
return nil, ErrInvoiceAlreadyCanceled
|
|
|
|
case ContractSettled:
|
|
return nil, ErrInvoiceAlreadySettled
|
|
}
|
|
|
|
return &InvoiceUpdateDesc{
|
|
UpdateType: SettleHodlInvoiceUpdate,
|
|
State: &InvoiceStateUpdateDesc{
|
|
NewState: ContractSettled,
|
|
Preimage: &preimage,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
hash := preimage.Hash()
|
|
invoiceRef := InvoiceRefByHash(hash)
|
|
invoice, err := i.idb.UpdateInvoice(ctx, invoiceRef, nil, updateInvoice)
|
|
if err != nil {
|
|
log.Errorf("SettleHodlInvoice with preimage %v: %v",
|
|
preimage, err)
|
|
|
|
return err
|
|
}
|
|
|
|
log.Debugf("Invoice%v: settled with preimage %v", invoiceRef,
|
|
invoice.Terms.PaymentPreimage)
|
|
|
|
// In the callback, we marked the invoice as settled. UpdateInvoice will
|
|
// have seen this and should have moved all htlcs that were accepted to
|
|
// the settled state. In the loop below, we go through all of these and
|
|
// notify links and resolvers that are waiting for resolution. Any htlcs
|
|
// that were already settled before, will be notified again. This isn't
|
|
// necessary but doesn't hurt either.
|
|
for key, htlc := range invoice.Htlcs {
|
|
if htlc.State != HtlcStateSettled {
|
|
continue
|
|
}
|
|
|
|
resolution := NewSettleResolution(
|
|
preimage, key, int32(htlc.AcceptHeight), ResultSettled,
|
|
)
|
|
|
|
i.notifyHodlSubscribers(resolution)
|
|
}
|
|
i.notifyClients(hash, invoice, nil)
|
|
|
|
return nil
|
|
}
|
|
|
|
// CancelInvoice attempts to cancel the invoice corresponding to the passed
|
|
// payment hash.
|
|
func (i *InvoiceRegistry) CancelInvoice(ctx context.Context,
|
|
payHash lntypes.Hash) error {
|
|
|
|
return i.cancelInvoiceImpl(ctx, payHash, true)
|
|
}
|
|
|
|
// shouldCancel examines the state of an invoice and whether we want to
|
|
// cancel already accepted invoices, taking our force cancel boolean into
|
|
// account. This is pulled out into its own function so that tests that mock
|
|
// cancelInvoiceImpl can reuse this logic.
|
|
func shouldCancel(state ContractState, cancelAccepted bool) bool {
|
|
if state != ContractAccepted {
|
|
return true
|
|
}
|
|
|
|
// If the invoice is accepted, we should only cancel if we want to
|
|
// force cancellation of accepted invoices.
|
|
return cancelAccepted
|
|
}
|
|
|
|
// cancelInvoice attempts to cancel the invoice corresponding to the passed
|
|
// payment hash. Accepted invoices will only be canceled if explicitly
|
|
// requested to do so. It notifies subscribing links and resolvers that
|
|
// the associated htlcs were canceled if they change state.
|
|
func (i *InvoiceRegistry) cancelInvoiceImpl(ctx context.Context,
|
|
payHash lntypes.Hash, cancelAccepted bool) error {
|
|
|
|
i.Lock()
|
|
defer i.Unlock()
|
|
|
|
ref := InvoiceRefByHash(payHash)
|
|
log.Debugf("Invoice%v: canceling invoice", ref)
|
|
|
|
updateInvoice := func(invoice *Invoice) (*InvoiceUpdateDesc, error) {
|
|
if !shouldCancel(invoice.State, cancelAccepted) {
|
|
return nil, nil
|
|
}
|
|
|
|
// Move invoice to the canceled state. Rely on validation in
|
|
// channeldb to return an error if the invoice is already
|
|
// settled or canceled.
|
|
return &InvoiceUpdateDesc{
|
|
UpdateType: CancelInvoiceUpdate,
|
|
State: &InvoiceStateUpdateDesc{
|
|
NewState: ContractCanceled,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
invoiceRef := InvoiceRefByHash(payHash)
|
|
invoice, err := i.idb.UpdateInvoice(ctx, invoiceRef, nil, updateInvoice)
|
|
|
|
// Implement idempotency by returning success if the invoice was already
|
|
// canceled.
|
|
if errors.Is(err, ErrInvoiceAlreadyCanceled) {
|
|
log.Debugf("Invoice%v: already canceled", ref)
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Return without cancellation if the invoice state is ContractAccepted.
|
|
if invoice.State == ContractAccepted {
|
|
log.Debugf("Invoice%v: remains accepted as cancel wasn't"+
|
|
"explicitly requested.", ref)
|
|
return nil
|
|
}
|
|
|
|
log.Debugf("Invoice%v: canceled", ref)
|
|
|
|
// In the callback, some htlcs may have been moved to the canceled
|
|
// state. We now go through all of these and notify links and resolvers
|
|
// that are waiting for resolution. Any htlcs that were already canceled
|
|
// before, will be notified again. This isn't necessary but doesn't hurt
|
|
// either.
|
|
for key, htlc := range invoice.Htlcs {
|
|
if htlc.State != HtlcStateCanceled {
|
|
continue
|
|
}
|
|
|
|
i.notifyHodlSubscribers(
|
|
NewFailResolution(
|
|
key, int32(htlc.AcceptHeight), ResultCanceled,
|
|
),
|
|
)
|
|
}
|
|
i.notifyClients(payHash, invoice, nil)
|
|
|
|
// Attempt to also delete the invoice if requested through the registry
|
|
// config.
|
|
if i.cfg.GcCanceledInvoicesOnTheFly {
|
|
// Assemble the delete reference and attempt to delete through
|
|
// the invocice from the DB.
|
|
deleteRef := InvoiceDeleteRef{
|
|
PayHash: payHash,
|
|
AddIndex: invoice.AddIndex,
|
|
SettleIndex: invoice.SettleIndex,
|
|
}
|
|
if invoice.Terms.PaymentAddr != BlankPayAddr {
|
|
deleteRef.PayAddr = &invoice.Terms.PaymentAddr
|
|
}
|
|
|
|
err = i.idb.DeleteInvoice(ctx, []InvoiceDeleteRef{deleteRef})
|
|
// If by any chance deletion failed, then log it instead of
|
|
// returning the error, as the invoice itself has already been
|
|
// canceled.
|
|
if err != nil {
|
|
log.Warnf("Invoice %v could not be deleted: %v", ref,
|
|
err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// notifyClients notifies all currently registered invoice notification clients
|
|
// of a newly added/settled invoice.
|
|
func (i *InvoiceRegistry) notifyClients(hash lntypes.Hash,
|
|
invoice *Invoice, setID *[32]byte) {
|
|
|
|
event := &invoiceEvent{
|
|
invoice: invoice,
|
|
hash: hash,
|
|
setID: setID,
|
|
}
|
|
|
|
select {
|
|
case i.invoiceEvents <- event:
|
|
case <-i.quit:
|
|
}
|
|
}
|
|
|
|
// invoiceSubscriptionKit defines that are common to both all invoice
|
|
// subscribers and single invoice subscribers.
|
|
type invoiceSubscriptionKit struct {
|
|
id uint32 // nolint:structcheck
|
|
|
|
// quit is a chan mouted to InvoiceRegistry that signals a shutdown.
|
|
quit chan struct{}
|
|
|
|
ntfnQueue *queue.ConcurrentQueue
|
|
|
|
canceled uint32 // To be used atomically.
|
|
cancelChan chan struct{}
|
|
|
|
// backlogDelivered is closed when the backlog events have been
|
|
// delivered.
|
|
backlogDelivered chan struct{}
|
|
}
|
|
|
|
// InvoiceSubscription represents an intent to receive updates for newly added
|
|
// or settled invoices. For each newly added invoice, a copy of the invoice
|
|
// will be sent over the NewInvoices channel. Similarly, for each newly settled
|
|
// invoice, a copy of the invoice will be sent over the SettledInvoices
|
|
// channel.
|
|
type InvoiceSubscription struct {
|
|
invoiceSubscriptionKit
|
|
|
|
// NewInvoices is a channel that we'll use to send all newly created
|
|
// invoices with an invoice index greater than the specified
|
|
// StartingInvoiceIndex field.
|
|
NewInvoices chan *Invoice
|
|
|
|
// SettledInvoices is a channel that we'll use to send all settled
|
|
// invoices with an invoices index greater than the specified
|
|
// StartingInvoiceIndex field.
|
|
SettledInvoices chan *Invoice
|
|
|
|
// addIndex is the highest add index the caller knows of. We'll use
|
|
// this information to send out an event backlog to the notifications
|
|
// subscriber. Any new add events with an index greater than this will
|
|
// be dispatched before any new notifications are sent out.
|
|
addIndex uint64
|
|
|
|
// settleIndex is the highest settle index the caller knows of. We'll
|
|
// use this information to send out an event backlog to the
|
|
// notifications subscriber. Any new settle events with an index
|
|
// greater than this will be dispatched before any new notifications
|
|
// are sent out.
|
|
settleIndex uint64
|
|
}
|
|
|
|
// SingleInvoiceSubscription represents an intent to receive updates for a
|
|
// specific invoice.
|
|
type SingleInvoiceSubscription struct {
|
|
invoiceSubscriptionKit
|
|
|
|
invoiceRef InvoiceRef
|
|
|
|
// Updates is a channel that we'll use to send all invoice events for
|
|
// the invoice that is subscribed to.
|
|
Updates chan *Invoice
|
|
}
|
|
|
|
// PayHash returns the optional payment hash of the target invoice.
|
|
//
|
|
// TODO(positiveblue): This method is only supposed to be used in tests. It will
|
|
// be deleted as soon as invoiceregistery_test is in the same module.
|
|
func (s *SingleInvoiceSubscription) PayHash() *lntypes.Hash {
|
|
return s.invoiceRef.PayHash()
|
|
}
|
|
|
|
// Cancel unregisters the InvoiceSubscription, freeing any previously allocated
|
|
// resources.
|
|
func (i *invoiceSubscriptionKit) Cancel() {
|
|
if !atomic.CompareAndSwapUint32(&i.canceled, 0, 1) {
|
|
return
|
|
}
|
|
|
|
i.ntfnQueue.Stop()
|
|
close(i.cancelChan)
|
|
}
|
|
|
|
func (i *invoiceSubscriptionKit) notify(event *invoiceEvent) error {
|
|
select {
|
|
case i.ntfnQueue.ChanIn() <- event:
|
|
|
|
case <-i.cancelChan:
|
|
// This can only be triggered by delivery of non-backlog
|
|
// events.
|
|
return ErrShuttingDown
|
|
case <-i.quit:
|
|
return ErrShuttingDown
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SubscribeNotifications returns an InvoiceSubscription which allows the
|
|
// caller to receive async notifications when any invoices are settled or
|
|
// added. The invoiceIndex parameter is a streaming "checkpoint". We'll start
|
|
// by first sending out all new events with an invoice index _greater_ than
|
|
// this value. Afterwards, we'll send out real-time notifications.
|
|
func (i *InvoiceRegistry) SubscribeNotifications(ctx context.Context,
|
|
addIndex, settleIndex uint64) (*InvoiceSubscription, error) {
|
|
|
|
client := &InvoiceSubscription{
|
|
NewInvoices: make(chan *Invoice),
|
|
SettledInvoices: make(chan *Invoice),
|
|
addIndex: addIndex,
|
|
settleIndex: settleIndex,
|
|
invoiceSubscriptionKit: invoiceSubscriptionKit{
|
|
quit: i.quit,
|
|
ntfnQueue: queue.NewConcurrentQueue(20),
|
|
cancelChan: make(chan struct{}),
|
|
backlogDelivered: make(chan struct{}),
|
|
},
|
|
}
|
|
client.ntfnQueue.Start()
|
|
|
|
// This notifies other goroutines that the backlog phase is over.
|
|
defer close(client.backlogDelivered)
|
|
|
|
// Always increment by 1 first, and our client ID will start with 1,
|
|
// not 0.
|
|
client.id = atomic.AddUint32(&i.nextClientID, 1)
|
|
|
|
// Before we register this new invoice subscription, we'll launch a new
|
|
// goroutine that will proxy all notifications appended to the end of
|
|
// the concurrent queue to the two client-side channels the caller will
|
|
// feed off of.
|
|
i.wg.Add(1)
|
|
go func() {
|
|
defer i.wg.Done()
|
|
defer i.deleteClient(client.id)
|
|
|
|
for {
|
|
select {
|
|
// A new invoice event has been sent by the
|
|
// invoiceRegistry! We'll figure out if this is an add
|
|
// event or a settle event, then dispatch the event to
|
|
// the client.
|
|
case ntfn := <-client.ntfnQueue.ChanOut():
|
|
invoiceEvent := ntfn.(*invoiceEvent)
|
|
|
|
var targetChan chan *Invoice
|
|
state := invoiceEvent.invoice.State
|
|
switch {
|
|
// AMP invoices never move to settled, but will
|
|
// be sent with a set ID if an HTLC set is
|
|
// being settled.
|
|
case state == ContractOpen &&
|
|
invoiceEvent.setID != nil:
|
|
fallthrough
|
|
|
|
case state == ContractSettled:
|
|
targetChan = client.SettledInvoices
|
|
|
|
case state == ContractOpen:
|
|
targetChan = client.NewInvoices
|
|
|
|
default:
|
|
log.Errorf("unknown invoice state: %v",
|
|
state)
|
|
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case targetChan <- invoiceEvent.invoice:
|
|
|
|
case <-client.cancelChan:
|
|
return
|
|
|
|
case <-i.quit:
|
|
return
|
|
}
|
|
|
|
case <-client.cancelChan:
|
|
return
|
|
|
|
case <-i.quit:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
i.notificationClientMux.Lock()
|
|
i.notificationClients[client.id] = client
|
|
i.notificationClientMux.Unlock()
|
|
|
|
// Query the database to see if based on the provided addIndex and
|
|
// settledIndex we need to deliver any backlog notifications.
|
|
err := i.deliverBacklogEvents(ctx, client)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
log.Infof("New invoice subscription client: id=%v", client.id)
|
|
|
|
return client, nil
|
|
}
|
|
|
|
// SubscribeSingleInvoice returns an SingleInvoiceSubscription which allows the
|
|
// caller to receive async notifications for a specific invoice.
|
|
func (i *InvoiceRegistry) SubscribeSingleInvoice(ctx context.Context,
|
|
hash lntypes.Hash) (*SingleInvoiceSubscription, error) {
|
|
|
|
client := &SingleInvoiceSubscription{
|
|
Updates: make(chan *Invoice),
|
|
invoiceSubscriptionKit: invoiceSubscriptionKit{
|
|
quit: i.quit,
|
|
ntfnQueue: queue.NewConcurrentQueue(20),
|
|
cancelChan: make(chan struct{}),
|
|
backlogDelivered: make(chan struct{}),
|
|
},
|
|
invoiceRef: InvoiceRefByHash(hash),
|
|
}
|
|
client.ntfnQueue.Start()
|
|
|
|
// This notifies other goroutines that the backlog phase is done.
|
|
defer close(client.backlogDelivered)
|
|
|
|
// Always increment by 1 first, and our client ID will start with 1,
|
|
// not 0.
|
|
client.id = atomic.AddUint32(&i.nextClientID, 1)
|
|
|
|
// Before we register this new invoice subscription, we'll launch a new
|
|
// goroutine that will proxy all notifications appended to the end of
|
|
// the concurrent queue to the two client-side channels the caller will
|
|
// feed off of.
|
|
i.wg.Add(1)
|
|
go func() {
|
|
defer i.wg.Done()
|
|
defer i.deleteClient(client.id)
|
|
|
|
for {
|
|
select {
|
|
// A new invoice event has been sent by the
|
|
// invoiceRegistry. We will dispatch the event to the
|
|
// client.
|
|
case ntfn := <-client.ntfnQueue.ChanOut():
|
|
invoiceEvent := ntfn.(*invoiceEvent)
|
|
|
|
select {
|
|
case client.Updates <- invoiceEvent.invoice:
|
|
|
|
case <-client.cancelChan:
|
|
return
|
|
|
|
case <-i.quit:
|
|
return
|
|
}
|
|
|
|
case <-client.cancelChan:
|
|
return
|
|
|
|
case <-i.quit:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
i.notificationClientMux.Lock()
|
|
i.singleNotificationClients[client.id] = client
|
|
i.notificationClientMux.Unlock()
|
|
|
|
err := i.deliverSingleBacklogEvents(ctx, client)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
log.Infof("New single invoice subscription client: id=%v, ref=%v",
|
|
client.id, client.invoiceRef)
|
|
|
|
return client, nil
|
|
}
|
|
|
|
// notifyHodlSubscribers sends out the htlc resolution to all current
|
|
// subscribers.
|
|
func (i *InvoiceRegistry) notifyHodlSubscribers(htlcResolution HtlcResolution) {
|
|
i.hodlSubscriptionsMux.Lock()
|
|
defer i.hodlSubscriptionsMux.Unlock()
|
|
|
|
subscribers, ok := i.hodlSubscriptions[htlcResolution.CircuitKey()]
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// Notify all interested subscribers and remove subscription from both
|
|
// maps. The subscription can be removed as there only ever will be a
|
|
// single resolution for each hash.
|
|
for subscriber := range subscribers {
|
|
select {
|
|
case subscriber <- htlcResolution:
|
|
case <-i.quit:
|
|
return
|
|
}
|
|
|
|
delete(
|
|
i.hodlReverseSubscriptions[subscriber],
|
|
htlcResolution.CircuitKey(),
|
|
)
|
|
}
|
|
|
|
delete(i.hodlSubscriptions, htlcResolution.CircuitKey())
|
|
}
|
|
|
|
// hodlSubscribe adds a new invoice subscription.
|
|
func (i *InvoiceRegistry) hodlSubscribe(subscriber chan<- interface{},
|
|
circuitKey CircuitKey) {
|
|
|
|
i.hodlSubscriptionsMux.Lock()
|
|
defer i.hodlSubscriptionsMux.Unlock()
|
|
|
|
log.Debugf("Hodl subscribe for %v", circuitKey)
|
|
|
|
subscriptions, ok := i.hodlSubscriptions[circuitKey]
|
|
if !ok {
|
|
subscriptions = make(map[chan<- interface{}]struct{})
|
|
i.hodlSubscriptions[circuitKey] = subscriptions
|
|
}
|
|
subscriptions[subscriber] = struct{}{}
|
|
|
|
reverseSubscriptions, ok := i.hodlReverseSubscriptions[subscriber]
|
|
if !ok {
|
|
reverseSubscriptions = make(map[CircuitKey]struct{})
|
|
i.hodlReverseSubscriptions[subscriber] = reverseSubscriptions
|
|
}
|
|
reverseSubscriptions[circuitKey] = struct{}{}
|
|
}
|
|
|
|
// HodlUnsubscribeAll cancels the subscription.
|
|
func (i *InvoiceRegistry) HodlUnsubscribeAll(subscriber chan<- interface{}) {
|
|
i.hodlSubscriptionsMux.Lock()
|
|
defer i.hodlSubscriptionsMux.Unlock()
|
|
|
|
hashes := i.hodlReverseSubscriptions[subscriber]
|
|
for hash := range hashes {
|
|
delete(i.hodlSubscriptions[hash], subscriber)
|
|
}
|
|
|
|
delete(i.hodlReverseSubscriptions, subscriber)
|
|
}
|
|
|
|
// copySingleClients copies i.SingleInvoiceSubscription inside a lock. This is
|
|
// useful when we need to iterate the map to send notifications.
|
|
func (i *InvoiceRegistry) copySingleClients() map[uint32]*SingleInvoiceSubscription { //nolint:lll
|
|
i.notificationClientMux.RLock()
|
|
defer i.notificationClientMux.RUnlock()
|
|
|
|
clients := make(map[uint32]*SingleInvoiceSubscription)
|
|
for k, v := range i.singleNotificationClients {
|
|
clients[k] = v
|
|
}
|
|
return clients
|
|
}
|
|
|
|
// copyClients copies i.notificationClients inside a lock. This is useful when
|
|
// we need to iterate the map to send notifications.
|
|
func (i *InvoiceRegistry) copyClients() map[uint32]*InvoiceSubscription {
|
|
i.notificationClientMux.RLock()
|
|
defer i.notificationClientMux.RUnlock()
|
|
|
|
clients := make(map[uint32]*InvoiceSubscription)
|
|
for k, v := range i.notificationClients {
|
|
clients[k] = v
|
|
}
|
|
return clients
|
|
}
|
|
|
|
// deleteClient removes a client by its ID inside a lock. Noop if the client is
|
|
// not found.
|
|
func (i *InvoiceRegistry) deleteClient(clientID uint32) {
|
|
i.notificationClientMux.Lock()
|
|
defer i.notificationClientMux.Unlock()
|
|
|
|
log.Infof("Cancelling invoice subscription for client=%v", clientID)
|
|
delete(i.notificationClients, clientID)
|
|
delete(i.singleNotificationClients, clientID)
|
|
}
|