Fix: If connection to Lightning node was interrupted, payments would be missed (#4865)

This commit is contained in:
Nicolas Dorier 2023-04-10 19:35:01 +09:00 committed by GitHub
parent 516efe56f4
commit a4aa85ebab
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 119 additions and 81 deletions

View file

@ -2,9 +2,11 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Amazon.Runtime.Internal;
using BTCPayServer.Client.Models;
using BTCPayServer.Configuration;
using BTCPayServer.Data;
@ -62,107 +64,121 @@ namespace BTCPayServer.Payments.Lightning
Options = options;
}
bool needCheckOfflinePayments = true;
async Task CheckingInvoice(CancellationToken cancellation)
{
while (await _CheckInvoices.Reader.WaitToReadAsync(cancellation) &&
_CheckInvoices.Reader.TryRead(out var invoiceId))
retry:
try
{
try
Logs.PayServer.LogInformation("Checking if any payment arrived on lightning while the server was offline...");
foreach (var invoice in await _InvoiceRepository.GetPendingInvoices(cancellationToken: cancellation))
{
foreach (var listenedInvoice in (await GetListenedInvoices(invoiceId)).Where(i => !i.IsExpired()))
if (GetListenedInvoices(invoice).Count > 0)
{
_CheckInvoices.Writer.TryWrite(invoice.Id);
_memoryCache.Set(GetCacheKey(invoice.Id), invoice, GetExpiration(invoice));
}
}
needCheckOfflinePayments = false;
Logs.PayServer.LogInformation("Processing lightning payments...");
while (await _CheckInvoices.Reader.WaitToReadAsync(cancellation) &&
_CheckInvoices.Reader.TryRead(out var invoiceId))
{
var invoice = await GetInvoice(invoiceId);
foreach (var listenedInvoice in GetListenedInvoices(invoice))
{
var instanceListenerKey = (listenedInvoice.Network.CryptoCode, GetLightningUrl(listenedInvoice.SupportedPaymentMethod).ToString());
if (!_InstanceListeners.TryGetValue(instanceListenerKey, out var instanceListener) ||
!instanceListener.IsListening)
lock (_InstanceListeners)
{
instanceListener ??= new LightningInstanceListener(_InvoiceRepository, _Aggregator, lightningClientFactory, listenedInvoice.Network, GetLightningUrl(listenedInvoice.SupportedPaymentMethod), _paymentService, Logs);
var status = await instanceListener.PollPayment(listenedInvoice, cancellation);
if (status is null ||
status is LightningInvoiceStatus.Paid ||
status is LightningInvoiceStatus.Expired)
if (!_InstanceListeners.TryGetValue(instanceListenerKey, out var instanceListener))
{
continue;
instanceListener ??= new LightningInstanceListener(_InvoiceRepository, _Aggregator, lightningClientFactory, listenedInvoice.Network, GetLightningUrl(listenedInvoice.SupportedPaymentMethod), _paymentService, Logs);
_InstanceListeners.TryAdd(instanceListenerKey, instanceListener);
}
instanceListener.AddListenedInvoice(listenedInvoice);
instanceListener.EnsureListening(cancellation);
_InstanceListeners.TryAdd(instanceListenerKey, instanceListener);
}
else
{
instanceListener.AddListenedInvoice(listenedInvoice);
_ = instanceListener.PollPayment(listenedInvoice, cancellation);
}
}
foreach (var kv in _InstanceListeners)
{
kv.Value.RemoveExpiredInvoices();
}
foreach (var k in _InstanceListeners
.Where(kv => !kv.Value.IsListening)
.Select(kv => kv.Key).ToArray())
{
_InstanceListeners.Remove(k);
}
}
catch when (!_Cts.Token.IsCancellationRequested)
{
if (_CheckInvoices.Reader.Count is 0)
this.CheckConnections();
}
}
catch when (cancellation.IsCancellationRequested)
{
}
catch (Exception ex)
{
await Task.Delay(1000, cancellation);
Logs.PayServer.LogWarning(ex, "Unhandled error in the LightningListener");
goto retry;
}
}
private string GetCacheKey(string invoiceId)
{
return $"{nameof(GetListenedInvoices)}-{invoiceId}";
}
private Task<List<ListenedInvoice>> GetListenedInvoices(string invoiceId)
private Task<InvoiceEntity> GetInvoice(string invoiceId)
{
return _memoryCache.GetOrCreateAsync(GetCacheKey(invoiceId), async (cacheEntry) =>
{
var listenedInvoices = new List<ListenedInvoice>();
var invoice = await _InvoiceRepository.GetInvoice(invoiceId);
foreach (var paymentMethod in invoice.GetPaymentMethods()
.Where(c => new[] { PaymentTypes.LightningLike, LNURLPayPaymentType.Instance }.Contains(c.GetId().PaymentType)))
{
LightningLikePaymentMethodDetails lightningMethod;
LightningSupportedPaymentMethod lightningSupportedMethod;
switch (paymentMethod.GetPaymentMethodDetails())
{
case LNURLPayPaymentMethodDetails lnurlPayPaymentMethodDetails:
{
var invoice = await _InvoiceRepository.GetInvoice(invoiceId);
cacheEntry.AbsoluteExpiration = GetExpiration(invoice);
return invoice;
});
}
lightningMethod = lnurlPayPaymentMethodDetails;
private static DateTimeOffset GetExpiration(InvoiceEntity invoice)
{
var expiredIn = DateTimeOffset.UtcNow - invoice.ExpirationTime;
return DateTimeOffset.UtcNow + (expiredIn >= TimeSpan.FromMinutes(5.0) ? expiredIn : TimeSpan.FromMinutes(5.0));
}
lightningSupportedMethod = lnurlPayPaymentMethodDetails.LightningSupportedPaymentMethod;
private List<ListenedInvoice> GetListenedInvoices(InvoiceEntity invoice)
{
var listenedInvoices = new List<ListenedInvoice>();
foreach (var paymentMethod in invoice.GetPaymentMethods()
.Where(c => new[] { PaymentTypes.LightningLike, LNURLPayPaymentType.Instance }.Contains(c.GetId().PaymentType)))
{
LightningLikePaymentMethodDetails lightningMethod;
LightningSupportedPaymentMethod lightningSupportedMethod;
switch (paymentMethod.GetPaymentMethodDetails())
{
case LNURLPayPaymentMethodDetails lnurlPayPaymentMethodDetails:
break;
case LightningLikePaymentMethodDetails { Activated: true } lightningLikePaymentMethodDetails:
lightningMethod = lightningLikePaymentMethodDetails;
lightningSupportedMethod = invoice.GetSupportedPaymentMethod<LightningSupportedPaymentMethod>()
.FirstOrDefault(c => c.CryptoCode == paymentMethod.GetId().CryptoCode);
lightningMethod = lnurlPayPaymentMethodDetails;
break;
default:
continue;
}
lightningSupportedMethod = lnurlPayPaymentMethodDetails.LightningSupportedPaymentMethod;
if (lightningSupportedMethod == null || string.IsNullOrEmpty(lightningMethod.InvoiceId))
continue;
var network = _NetworkProvider.GetNetwork<BTCPayNetwork>(paymentMethod.GetId().CryptoCode);
break;
case LightningLikePaymentMethodDetails { Activated: true } lightningLikePaymentMethodDetails:
lightningMethod = lightningLikePaymentMethodDetails;
lightningSupportedMethod = invoice.GetSupportedPaymentMethod<LightningSupportedPaymentMethod>()
.FirstOrDefault(c => c.CryptoCode == paymentMethod.GetId().CryptoCode);
listenedInvoices.Add(new ListenedInvoice()
{
Expiration = invoice.ExpirationTime,
Uri = GetLightningUrl(lightningSupportedMethod).BaseUri.AbsoluteUri,
PaymentMethodDetails = lightningMethod,
SupportedPaymentMethod = lightningSupportedMethod,
PaymentMethod = paymentMethod,
Network = network,
InvoiceId = invoice.Id
});
}
var expiredIn = DateTimeOffset.UtcNow - invoice.ExpirationTime;
cacheEntry.AbsoluteExpiration = DateTimeOffset.UtcNow + (expiredIn >= TimeSpan.FromMinutes(5.0) ? expiredIn : TimeSpan.FromMinutes(5.0));
return listenedInvoices;
});
break;
default:
continue;
}
if (lightningSupportedMethod == null || string.IsNullOrEmpty(lightningMethod.InvoiceId))
continue;
var network = _NetworkProvider.GetNetwork<BTCPayNetwork>(paymentMethod.GetId().CryptoCode);
listenedInvoices.Add(new ListenedInvoice()
{
Expiration = invoice.ExpirationTime,
Uri = GetLightningUrl(lightningSupportedMethod).BaseUri.AbsoluteUri,
PaymentMethodDetails = lightningMethod,
SupportedPaymentMethod = lightningSupportedMethod,
PaymentMethod = paymentMethod,
Network = network,
InvoiceId = invoice.Id
});
}
return listenedInvoices;
}
readonly ConcurrentDictionary<string, LightningInstanceListener> _ListeningInstances = new ConcurrentDictionary<string, LightningInstanceListener>();
@ -212,21 +228,34 @@ namespace BTCPayServer.Payments.Lightning
}
}));
_CheckingInvoice = CheckingInvoice(_Cts.Token);
_ListenPoller = new Timer(async s =>
_ListenPoller = new Timer(s =>
{
if (needCheckOfflinePayments)
return;
try
{
var invoiceIds = await _InvoiceRepository.GetPendingInvoiceIds();
foreach (var invoiceId in invoiceIds)
_CheckInvoices.Writer.TryWrite(invoiceId);
CheckConnections();
}
catch { } // Never throw an unhandled exception on async void
catch { }
}, null, 0, (int)PollInterval.TotalMilliseconds);
leases.Add(_ListenPoller);
return Task.CompletedTask;
}
private void CheckConnections()
{
lock (_InstanceListeners)
{
foreach ((var key, var instance) in _InstanceListeners.ToArray())
{
instance.RemoveExpiredInvoices();
if (!instance.Empty)
instance.EnsureListening(_Cts.Token);
}
}
}
private async Task CreateNewLNInvoiceForBTCPayInvoice(InvoiceEntity invoice)
{
var paymentMethods = invoice.GetPaymentMethods()
@ -307,7 +336,12 @@ namespace BTCPayServer.Payments.Lightning
var instanceListenerKey = (paymentMethod.Network.CryptoCode,
GetLightningUrl(supportedMethod).ToString());
if (_InstanceListeners.TryGetValue(instanceListenerKey, out var instanceListener))
LightningInstanceListener instanceListener;
lock (_InstanceListeners)
{
_InstanceListeners.TryGetValue(instanceListenerKey, out instanceListener);
}
if (instanceListener is not null)
{
await _InvoiceRepository.NewPaymentDetails(invoice.Id, newPaymentMethodDetails,
paymentMethod.Network);
@ -366,11 +400,11 @@ namespace BTCPayServer.Payments.Lightning
}
}
}
private Timer _ListenPoller;
public IOptions<LightningNetworkOptions> Options { get; }
readonly CancellationTokenSource _Cts = new CancellationTokenSource();
private Timer _ListenPoller;
public async Task StopAsync(CancellationToken cancellationToken)
{
@ -443,12 +477,15 @@ namespace BTCPayServer.Payments.Lightning
return lightningInvoice?.Status;
}
public bool Empty => _ListenedInvoices.IsEmpty;
public bool IsListening => Listening?.Status is TaskStatus.Running || Listening?.Status is TaskStatus.WaitingForActivation;
public Task Listening { get; set; }
public void EnsureListening(CancellationToken cancellation)
{
if (!IsListening)
{
if (StopListeningCancellationTokenSource != null)
StopListeningCancellationTokenSource.Dispose();
StopListeningCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellation);
Listening = Listen(StopListeningCancellationTokenSource.Token);
}

View file

@ -2,6 +2,7 @@ using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BTCPayServer.Abstractions.Extensions;
using BTCPayServer.Client.Models;
@ -88,7 +89,7 @@ namespace BTCPayServer.Services.Invoices
.ToListAsync()).Select(ToEntity);
}
public async Task<InvoiceEntity[]> GetPendingInvoices(bool includeAddressData = false, bool skipNoPaymentInvoices = false)
public async Task<InvoiceEntity[]> GetPendingInvoices(bool includeAddressData = false, bool skipNoPaymentInvoices = false, CancellationToken cancellationToken = default)
{
using var ctx = _applicationDbContextFactory.CreateContext();
var q = ctx.PendingInvoices.AsQueryable();
@ -99,7 +100,7 @@ namespace BTCPayServer.Services.Invoices
.ThenInclude(o => o.AddressInvoices);
if (skipNoPaymentInvoices)
q = q.Where(i => i.InvoiceData.Payments.Any());
return (await q.Select(o => o.InvoiceData).ToArrayAsync()).Select(ToEntity).ToArray();
return (await q.Select(o => o.InvoiceData).ToArrayAsync(cancellationToken)).Select(ToEntity).ToArray();
}
public async Task<string[]> GetPendingInvoiceIds()
{