Refactor the lightning listener, some users complain payments are not detected (should fix #676)

This commit is contained in:
nicolas.dorier 2019-03-27 15:53:06 +09:00
parent e3ab1f5228
commit 3cce7b8b35
3 changed files with 263 additions and 212 deletions

View file

@ -23,6 +23,7 @@ using BTCPayServer.Payments.Lightning;
using BTCPayServer.Lightning.CLightning;
using BTCPayServer.Lightning;
using BTCPayServer.Services;
using BTCPayServer.Tests.Logging;
namespace BTCPayServer.Tests
{
@ -85,9 +86,11 @@ namespace BTCPayServer.Tests
/// Connect a customer LN node to the merchant LN node
/// </summary>
/// <returns></returns>
public Task EnsureChannelsSetup()
public async Task EnsureChannelsSetup()
{
return BTCPayServer.Lightning.Tests.ConnectChannels.ConnectAll(ExplorerNode, GetLightningSenderClients(), GetLightningDestClients());
Logs.Tester.LogInformation("Connecting channels");
await BTCPayServer.Lightning.Tests.ConnectChannels.ConnectAll(ExplorerNode, GetLightningSenderClients(), GetLightningDestClients()).ConfigureAwait(false);
Logs.Tester.LogInformation("Channels connected");
}
private IEnumerable<ILightningClient> GetLightningSenderClients()

View file

@ -462,14 +462,14 @@ namespace BTCPayServer.Tests
}
}
[Fact]
[Fact(Timeout = 60 * 1000)]
[Trait("Integration", "Integration")]
public async Task CanSendLightningPaymentCLightning()
{
await ProcessLightningPayment(LightningConnectionType.CLightning);
}
[Fact]
[Fact(Timeout = 60 * 1000)]
[Trait("Integration", "Integration")]
public async Task CanSendLightningPaymentCharge()
{
@ -516,7 +516,9 @@ namespace BTCPayServer.Tests
ItemDesc = "Some description"
});
await Task.Delay(TimeSpan.FromMilliseconds(1000)); // Give time to listen the new invoices
Logs.Tester.LogInformation($"Trying to send Lightning payment to {invoice.Id}");
await tester.SendLightningPaymentAsync(invoice);
Logs.Tester.LogInformation($"Lightning payment to {invoice.Id} is sent");
await TestUtils.EventuallyAsync(async () =>
{
var localInvoice = await user.BitPay.GetInvoiceAsync(invoice.Id);

View file

@ -10,129 +10,144 @@ using BTCPayServer.Services.Invoices;
using Microsoft.Extensions.Hosting;
using NBXplorer;
using BTCPayServer.Lightning;
using System.Collections.Concurrent;
using System.Threading.Channels;
using Microsoft.Extensions.Caching.Memory;
namespace BTCPayServer.Payments.Lightning
{
public class LightningListener : IHostedService
{
class ListenedInvoice
{
public LightningLikePaymentMethodDetails PaymentMethodDetails { get; set; }
public LightningSupportedPaymentMethod SupportedPaymentMethod { get; set; }
public PaymentMethod PaymentMethod { get; set; }
public string Uri { get; internal set; }
public BTCPayNetwork Network { get; internal set; }
public string InvoiceId { get; internal set; }
}
EventAggregator _Aggregator;
InvoiceRepository _InvoiceRepository;
private readonly IMemoryCache _memoryCache;
BTCPayNetworkProvider _NetworkProvider;
Channel<string> _CheckInvoices = Channel.CreateUnbounded<string>();
Task _CheckingInvoice;
Dictionary<(string, string), LightningInstanceListener> _InstanceListeners = new Dictionary<(string, string), LightningInstanceListener>();
public LightningListener(EventAggregator aggregator,
InvoiceRepository invoiceRepository,
IMemoryCache memoryCache,
BTCPayNetworkProvider networkProvider)
{
_Aggregator = aggregator;
_InvoiceRepository = invoiceRepository;
_memoryCache = memoryCache;
_NetworkProvider = networkProvider;
}
async Task CheckingInvoice(CancellationToken cancellation)
{
while(await _CheckInvoices.Reader.WaitToReadAsync(cancellation) &&
_CheckInvoices.Reader.TryRead(out var invoiceId))
{
try
{
foreach (var listenedInvoice in (await GetListenedInvoices(invoiceId)).Where(i => !i.IsExpired()))
{
var instanceListenerKey = (listenedInvoice.Network.CryptoCode, listenedInvoice.SupportedPaymentMethod.GetLightningUrl().ToString());
if (!_InstanceListeners.TryGetValue(instanceListenerKey, out var instanceListener) ||
!instanceListener.IsListening)
{
instanceListener = instanceListener ?? new LightningInstanceListener(_InvoiceRepository, _Aggregator, listenedInvoice.SupportedPaymentMethod, listenedInvoice.Network);
var status = await instanceListener.PollPayment(listenedInvoice, cancellation);
if (status is null ||
status is LightningInvoiceStatus.Paid ||
status is LightningInvoiceStatus.Expired)
{
continue;
}
instanceListener.AddListenedInvoice(listenedInvoice);
instanceListener.EnsureListening(cancellation);
_InstanceListeners.TryAdd(instanceListenerKey, instanceListener);
}
else
{
instanceListener.AddListenedInvoice(listenedInvoice);
}
}
foreach (var kv in _InstanceListeners)
{
kv.Value.RemoveExpiredInvoices();
}
foreach (var k in _InstanceListeners
.Where(kv => !kv.Value.IsListening)
.Select(kv => kv.Key).ToArray())
{
_InstanceListeners.Remove(k);
}
}
catch when (!_Cts.Token.IsCancellationRequested)
{
}
}
}
private Task<List<ListenedInvoice>> GetListenedInvoices(string invoiceId)
{
return _memoryCache.GetOrCreateAsync(invoiceId, async (cacheEntry) =>
{
var listenedInvoices = new List<ListenedInvoice>();
var invoice = await _InvoiceRepository.GetInvoice(invoiceId);
foreach (var paymentMethod in invoice.GetPaymentMethods(_NetworkProvider)
.Where(c => c.GetId().PaymentType == PaymentTypes.LightningLike))
{
var lightningMethod = paymentMethod.GetPaymentMethodDetails() as LightningLikePaymentMethodDetails;
if (lightningMethod == null)
continue;
var lightningSupportedMethod = invoice.GetSupportedPaymentMethod<LightningSupportedPaymentMethod>(_NetworkProvider)
.FirstOrDefault(c => c.CryptoCode == paymentMethod.GetId().CryptoCode);
if (lightningSupportedMethod == null)
continue;
var network = _NetworkProvider.GetNetwork(paymentMethod.GetId().CryptoCode);
listenedInvoices.Add(new ListenedInvoice()
{
Expiration = invoice.ExpirationTime,
Uri = lightningSupportedMethod.GetLightningUrl().BaseUri.AbsoluteUri,
PaymentMethodDetails = lightningMethod,
SupportedPaymentMethod = lightningSupportedMethod,
PaymentMethod = paymentMethod,
Network = network,
InvoiceId = invoice.Id
});
}
var expiredIn = DateTimeOffset.UtcNow - invoice.ExpirationTime;
cacheEntry.AbsoluteExpiration = DateTimeOffset.UtcNow + (expiredIn >= TimeSpan.FromMinutes(5.0) ? expiredIn : TimeSpan.FromMinutes(5.0));
return listenedInvoices;
});
}
ConcurrentDictionary<string, LightningInstanceListener> _ListeningInstances = new ConcurrentDictionary<string, LightningInstanceListener>();
CompositeDisposable leases = new CompositeDisposable();
public Task StartAsync(CancellationToken cancellationToken)
{
leases.Add(_Aggregator.Subscribe<Events.InvoiceEvent>(async inv =>
leases.Add(_Aggregator.Subscribe<Events.InvoiceEvent>(inv =>
{
if (inv.Name == InvoiceEvent.Created)
{
await EnsureListening(inv.Invoice.Id, false);
_CheckInvoices.Writer.TryWrite(inv.Invoice.Id);
}
}));
_CheckingInvoice = CheckingInvoice(_Cts.Token);
_ListenPoller = new Timer(async s =>
{
try
{
await Task.WhenAll((await _InvoiceRepository.GetPendingInvoices())
.Select(async invoiceId => await EnsureListening(invoiceId, true))
.ToArray());
}
catch (AggregateException ex)
{
Logs.PayServer.LogError(ex.InnerException ?? ex.InnerExceptions.FirstOrDefault(), $"Lightning: Uncaught error");
}
catch (Exception ex)
{
Logs.PayServer.LogError(ex, $"Lightning: Uncaught error");
var invoiceIds = await _InvoiceRepository.GetPendingInvoices();
foreach (var invoiceId in invoiceIds)
_CheckInvoices.Writer.TryWrite(invoiceId);
}
catch { } // Never throw an unhandled exception on async void
}, null, 0, (int)PollInterval.TotalMilliseconds);
leases.Add(_ListenPoller);
return Task.CompletedTask;
}
private async Task EnsureListening(string invoiceId, bool poll)
{
if (Listening(invoiceId))
return;
var invoice = await _InvoiceRepository.GetInvoice(invoiceId);
foreach (var paymentMethod in invoice.GetPaymentMethods(_NetworkProvider)
.Where(c => c.GetId().PaymentType == PaymentTypes.LightningLike))
{
var lightningMethod = paymentMethod.GetPaymentMethodDetails() as LightningLikePaymentMethodDetails;
if (lightningMethod == null)
continue;
var lightningSupportedMethod = invoice.GetSupportedPaymentMethod<LightningSupportedPaymentMethod>(_NetworkProvider)
.FirstOrDefault(c => c.CryptoCode == paymentMethod.GetId().CryptoCode);
if (lightningSupportedMethod == null)
continue;
var network = _NetworkProvider.GetNetwork(paymentMethod.GetId().CryptoCode);
var listenedInvoice = new ListenedInvoice()
{
Uri = lightningSupportedMethod.GetLightningUrl().BaseUri.AbsoluteUri,
PaymentMethodDetails = lightningMethod,
SupportedPaymentMethod = lightningSupportedMethod,
PaymentMethod = paymentMethod,
Network = network,
InvoiceId = invoice.Id
};
if (poll)
{
var charge = lightningSupportedMethod.CreateClient(network);
LightningInvoice chargeInvoice = null;
string errorMessage = $"{lightningSupportedMethod.CryptoCode} (Lightning): Can't connect to the lightning server";
try
{
chargeInvoice = await charge.GetInvoice(lightningMethod.InvoiceId);
}
catch (System.Net.Sockets.SocketException socketEx) when (socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionRefused)
{
Logs.PayServer.LogError(errorMessage);
continue;
}
catch (Exception ex) when (ex.InnerException is System.Net.Sockets.SocketException socketEx
&& socketEx.SocketErrorCode == System.Net.Sockets.SocketError.ConnectionRefused)
{
Logs.PayServer.LogError(errorMessage);
continue;
}
catch (Exception ex)
{
Logs.PayServer.LogError(ex, errorMessage);
continue;
}
if (chargeInvoice == null)
continue;
if (chargeInvoice.Status == LightningInvoiceStatus.Paid)
await AddPayment(network, chargeInvoice, listenedInvoice);
if (chargeInvoice.Status == LightningInvoiceStatus.Paid || chargeInvoice.Status == LightningInvoiceStatus.Expired)
continue;
}
StartListening(listenedInvoice);
}
}
TimeSpan _PollInterval = TimeSpan.FromMinutes(1.0);
public TimeSpan PollInterval
{
@ -151,56 +166,155 @@ namespace BTCPayServer.Payments.Lightning
}
CancellationTokenSource _Cts = new CancellationTokenSource();
private async Task Listen(LightningSupportedPaymentMethod supportedPaymentMethod, BTCPayNetwork network)
HashSet<string> _InvoiceIds = new HashSet<string>();
private Timer _ListenPoller;
public async Task StopAsync(CancellationToken cancellationToken)
{
ILightningInvoiceListener session = null;
leases.Dispose();
_Cts.Cancel();
try
{
await _CheckingInvoice;
}
catch (OperationCanceledException)
{
}
try
{
await Task.WhenAll(_ListeningInstances.Select(c => c.Value.Listening).ToArray());
}
catch (OperationCanceledException)
{
}
Logs.PayServer.LogInformation("Lightning listened stopped");
}
}
public class LightningInstanceListener
{
private LightningSupportedPaymentMethod supportedPaymentMethod;
private readonly InvoiceRepository invoiceRepository;
private readonly EventAggregator _eventAggregator;
private readonly BTCPayNetwork network;
public LightningInstanceListener(InvoiceRepository invoiceRepository,
EventAggregator eventAggregator,
LightningSupportedPaymentMethod supportedPaymentMethod,
BTCPayNetwork network)
{
this.supportedPaymentMethod = supportedPaymentMethod;
this.invoiceRepository = invoiceRepository;
_eventAggregator = eventAggregator;
this.network = network;
}
internal bool AddListenedInvoice(ListenedInvoice invoice)
{
return _ListenedInvoices.TryAdd(invoice.PaymentMethodDetails.InvoiceId, invoice);
}
internal async Task<LightningInvoiceStatus?> PollPayment(ListenedInvoice listenedInvoice, CancellationToken cancellation)
{
var client = supportedPaymentMethod.CreateClient(network);
LightningInvoice lightningInvoice = await client.GetInvoice(listenedInvoice.PaymentMethodDetails.InvoiceId);
if (lightningInvoice?.Status is LightningInvoiceStatus.Paid &&
await AddPayment(lightningInvoice, listenedInvoice.InvoiceId))
{
Logs.PayServer.LogInformation($"{supportedPaymentMethod.CryptoCode} (Lightning): Payment detected via polling on {listenedInvoice.InvoiceId}");
}
return lightningInvoice?.Status;
}
public bool IsListening => Listening?.Status is TaskStatus.Running || Listening?.Status is TaskStatus.WaitingForActivation;
public Task Listening { get; set; }
public void EnsureListening(CancellationToken cancellation)
{
if (!IsListening)
{
StopListeningCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellation);
Listening = Listen(StopListeningCancellationTokenSource.Token);
}
}
public CancellationTokenSource StopListeningCancellationTokenSource;
async Task Listen(CancellationToken cancellation)
{
Logs.PayServer.LogInformation($"{supportedPaymentMethod.CryptoCode} (Lightning): Start listening {supportedPaymentMethod.GetLightningUrl().BaseUri}");
try
{
Logs.PayServer.LogInformation($"{supportedPaymentMethod.CryptoCode} (Lightning): Start listening {supportedPaymentMethod.GetLightningUrl().BaseUri}");
var lightningClient = supportedPaymentMethod.CreateClient(network);
session = await lightningClient.Listen(_Cts.Token);
while (true)
using (var session = await lightningClient.Listen(cancellation))
{
var notification = await session.WaitInvoice(_Cts.Token);
ListenedInvoice listenedInvoice = GetListenedInvoice(notification.Id);
if (listenedInvoice == null)
continue;
if (notification.Id == listenedInvoice.PaymentMethodDetails.InvoiceId &&
notification.BOLT11 == listenedInvoice.PaymentMethodDetails.BOLT11)
// Just in case the payment arrived after our last poll but before we listened.
await PollAllListenedInvoices(cancellation);
if (_ErrorAlreadyLogged)
{
if (notification.Status == LightningInvoiceStatus.Paid &&
notification.PaidAt.HasValue && notification.Amount != null)
Logs.PayServer.LogInformation($"{supportedPaymentMethod.CryptoCode} (Lightning): Could reconnect successfully to {supportedPaymentMethod.GetLightningUrl().BaseUri}");
}
_ErrorAlreadyLogged = false;
while (!_ListenedInvoices.IsEmpty)
{
var notification = await session.WaitInvoice(cancellation);
if (!_ListenedInvoices.TryGetValue(notification.Id, out var listenedInvoice))
continue;
if (notification.Id == listenedInvoice.PaymentMethodDetails.InvoiceId &&
notification.BOLT11 == listenedInvoice.PaymentMethodDetails.BOLT11)
{
await AddPayment(network, notification, listenedInvoice);
if (DoneListening(listenedInvoice))
break;
}
if (notification.Status == LightningInvoiceStatus.Expired)
{
if (DoneListening(listenedInvoice))
break;
if (notification.Status == LightningInvoiceStatus.Paid &&
notification.PaidAt.HasValue && notification.Amount != null)
{
if (await AddPayment(notification, listenedInvoice.InvoiceId))
{
Logs.PayServer.LogInformation($"{supportedPaymentMethod.CryptoCode} (Lightning): Payment detected via notification ({listenedInvoice.InvoiceId})");
}
_ListenedInvoices.TryRemove(notification.Id, out var _);
}
else if (notification.Status == LightningInvoiceStatus.Expired)
{
_ListenedInvoices.TryRemove(notification.Id, out var _);
}
}
}
}
}
catch when (_Cts.IsCancellationRequested)
{
}
catch (Exception ex)
catch (Exception ex) when (!cancellation.IsCancellationRequested && !_ErrorAlreadyLogged)
{
_ErrorAlreadyLogged = true;
Logs.PayServer.LogError(ex, $"{supportedPaymentMethod.CryptoCode} (Lightning): Error while contacting {supportedPaymentMethod.GetLightningUrl().BaseUri}");
DoneListening(supportedPaymentMethod.GetLightningUrl());
Logs.PayServer.LogInformation($"{supportedPaymentMethod.CryptoCode} (Lightning): Stop listening {supportedPaymentMethod.GetLightningUrl().BaseUri}");
}
finally
catch (OperationCanceledException) when (cancellation.IsCancellationRequested) { }
if (_ListenedInvoices.IsEmpty)
Logs.PayServer.LogInformation($"{supportedPaymentMethod.CryptoCode} (Lightning): No more invoice to listen on {supportedPaymentMethod.GetLightningUrl().BaseUri}, releasing the connection.");
}
public DateTimeOffset? LastFullPoll { get; set; }
internal async Task PollAllListenedInvoices(CancellationToken cancellation)
{
foreach (var invoice in _ListenedInvoices.Values)
{
session?.Dispose();
var status = await PollPayment(invoice, cancellation);
if (status is null ||
status is LightningInvoiceStatus.Paid ||
status is LightningInvoiceStatus.Expired)
_ListenedInvoices.TryRemove(invoice.PaymentMethodDetails.InvoiceId, out var _);
}
LastFullPoll = DateTimeOffset.UtcNow;
if (_ListenedInvoices.IsEmpty)
{
StopListeningCancellationTokenSource?.Cancel();
}
Logs.PayServer.LogInformation($"{supportedPaymentMethod.CryptoCode} (Lightning): Stop listening {supportedPaymentMethod.GetLightningUrl().BaseUri}");
}
private async Task AddPayment(BTCPayNetwork network, LightningInvoice notification, ListenedInvoice listenedInvoice)
bool _ErrorAlreadyLogged = false;
ConcurrentDictionary<string, ListenedInvoice> _ListenedInvoices = new ConcurrentDictionary<string, ListenedInvoice>();
public async Task<bool> AddPayment(LightningInvoice notification, string invoiceId)
{
var payment = await _InvoiceRepository.AddPayment(listenedInvoice.InvoiceId, notification.PaidAt.Value, new LightningLikePaymentData()
var payment = await invoiceRepository.AddPayment(invoiceId, notification.PaidAt.Value, new LightningLikePaymentData()
{
BOLT11 = notification.BOLT11,
PaymentHash = BOLT11PaymentRequest.Parse(notification.BOLT11, network.NBitcoinNetwork).PaymentHash,
@ -208,102 +322,34 @@ namespace BTCPayServer.Payments.Lightning
}, network, accounted: true);
if (payment != null)
{
var invoice = await _InvoiceRepository.GetInvoice(listenedInvoice.InvoiceId);
var invoice = await invoiceRepository.GetInvoice(invoiceId);
if (invoice != null)
_Aggregator.Publish(new InvoiceEvent(invoice, 1002, InvoiceEvent.ReceivedPayment){Payment = payment});
_eventAggregator.Publish(new InvoiceEvent(invoice, 1002, InvoiceEvent.ReceivedPayment) { Payment = payment });
}
return payment != null;
}
List<Task> _ListeningLightning = new List<Task>();
MultiValueDictionary<string, ListenedInvoice> _ListenedInvoiceByLightningUrl = new MultiValueDictionary<string, ListenedInvoice>();
Dictionary<string, ListenedInvoice> _ListenedInvoiceByChargeInvoiceId = new Dictionary<string, ListenedInvoice>();
HashSet<string> _InvoiceIds = new HashSet<string>();
private Timer _ListenPoller;
/// <summary>
/// Stop listening an invoice
/// </summary>
/// <param name="listenedInvoice">The invoice to stop listening</param>
/// <returns>true if still need to listen the lightning instance</returns>
bool DoneListening(ListenedInvoice listenedInvoice)
internal void RemoveExpiredInvoices()
{
lock (_ListenedInvoiceByLightningUrl)
foreach (var invoice in _ListenedInvoices)
{
_ListenedInvoiceByChargeInvoiceId.Remove(listenedInvoice.PaymentMethodDetails.InvoiceId);
_ListenedInvoiceByLightningUrl.Remove(listenedInvoice.Uri, listenedInvoice);
_InvoiceIds.Remove(listenedInvoice.InvoiceId);
if (!_ListenedInvoiceByLightningUrl.ContainsKey(listenedInvoice.Uri))
{
return true;
}
if (invoice.Value.IsExpired())
_ListenedInvoices.TryRemove(invoice.Key, out var _);
}
return false;
}
/// <summary>
/// Stop listening all invoices on this server
/// </summary>
/// <param name="uri"></param>
private void DoneListening(LightningConnectionString connectionString)
{
var uri = connectionString.BaseUri;
lock (_ListenedInvoiceByChargeInvoiceId)
{
foreach (var listenedInvoice in _ListenedInvoiceByLightningUrl[uri.AbsoluteUri])
{
_ListenedInvoiceByChargeInvoiceId.Remove(listenedInvoice.PaymentMethodDetails.InvoiceId);
_InvoiceIds.Remove(listenedInvoice.InvoiceId);
}
_ListenedInvoiceByLightningUrl.Remove(uri.AbsoluteUri);
}
}
bool Listening(string invoiceId)
{
lock (_ListenedInvoiceByLightningUrl)
{
return _InvoiceIds.Contains(invoiceId);
}
}
private ListenedInvoice GetListenedInvoice(string chargeInvoiceId)
{
ListenedInvoice listenedInvoice = null;
lock (_ListenedInvoiceByLightningUrl)
{
_ListenedInvoiceByChargeInvoiceId.TryGetValue(chargeInvoiceId, out listenedInvoice);
}
return listenedInvoice;
}
bool StartListening(ListenedInvoice listenedInvoice)
{
lock (_ListenedInvoiceByLightningUrl)
{
if (_InvoiceIds.Contains(listenedInvoice.InvoiceId))
return false;
if (!_ListenedInvoiceByLightningUrl.ContainsKey(listenedInvoice.Uri))
{
var listen = Listen(listenedInvoice.SupportedPaymentMethod, listenedInvoice.Network);
_ListeningLightning.Add(listen);
}
_ListenedInvoiceByLightningUrl.Add(listenedInvoice.Uri, listenedInvoice);
_ListenedInvoiceByChargeInvoiceId.Add(listenedInvoice.PaymentMethodDetails.InvoiceId, listenedInvoice);
_InvoiceIds.Add(listenedInvoice.InvoiceId);
}
return true;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
leases.Dispose();
_Cts.Cancel();
Task[] listening = null;
lock (_ListenedInvoiceByLightningUrl)
{
listening = _ListeningLightning.ToArray();
}
await Task.WhenAll(listening);
if (_ListenedInvoices.IsEmpty)
StopListeningCancellationTokenSource?.Cancel();
}
}
class ListenedInvoice
{
public bool IsExpired() { return DateTimeOffset.UtcNow > Expiration; }
public DateTimeOffset Expiration { get; set; }
public LightningLikePaymentMethodDetails PaymentMethodDetails { get; set; }
public LightningSupportedPaymentMethod SupportedPaymentMethod { get; set; }
public PaymentMethod PaymentMethod { get; set; }
public string Uri { get; internal set; }
public BTCPayNetwork Network { get; internal set; }
public string InvoiceId { get; internal set; }
}
}