Parallel payout ln (#5781)

This commit is contained in:
Andrew Camilleri 2024-03-14 10:29:14 +01:00 committed by GitHub
parent e497903bf4
commit 0e64df3bbf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 88 additions and 69 deletions

View file

@ -68,6 +68,7 @@ public class LightningPendingPayoutListener : BaseAsyncService
foreach (IGrouping<string, PayoutData> payoutByStore in payouts.GroupBy(data => data.StoreDataId)) foreach (IGrouping<string, PayoutData> payoutByStore in payouts.GroupBy(data => data.StoreDataId))
{ {
//this should never happen
if (!stores.TryGetValue(payoutByStore.Key, out var store)) if (!stores.TryGetValue(payoutByStore.Key, out var store))
{ {
foreach (PayoutData payoutData in payoutByStore) foreach (PayoutData payoutData in payoutByStore)

View file

@ -87,7 +87,15 @@ public abstract class BaseAutomatedPayoutProcessor<T> : BaseAsyncService where T
return Task.CompletedTask; return Task.CompletedTask;
} }
protected abstract Task Process(ISupportedPaymentMethod paymentMethod, List<PayoutData> payouts); protected virtual Task Process(ISupportedPaymentMethod paymentMethod, List<PayoutData> payouts) =>
throw new NotImplementedException();
protected virtual async Task<bool> ProcessShouldSave(ISupportedPaymentMethod paymentMethod,
List<PayoutData> payouts)
{
await Process(paymentMethod, payouts);
return true;
}
private async Task Act() private async Task Act()
{ {
@ -114,8 +122,8 @@ public abstract class BaseAutomatedPayoutProcessor<T> : BaseAsyncService where T
{ {
Logs.PayServer.LogInformation( Logs.PayServer.LogInformation(
$"{payouts.Count} found to process. Starting (and after will sleep for {blob.Interval})"); $"{payouts.Count} found to process. Starting (and after will sleep for {blob.Interval})");
await Process(paymentMethod, payouts); if (await ProcessShouldSave(paymentMethod, payouts))
{
await context.SaveChangesAsync(); await context.SaveChangesAsync();
foreach (var payoutData in payouts.Where(payoutData => payoutData.State != PayoutState.AwaitingPayment)) foreach (var payoutData in payouts.Where(payoutData => payoutData.State != PayoutState.AwaitingPayment))
@ -124,6 +132,8 @@ public abstract class BaseAutomatedPayoutProcessor<T> : BaseAsyncService where T
} }
} }
}
// Allow plugins do to something after automatic payout processing // Allow plugins do to something after automatic payout processing
await _pluginHookService.ApplyAction("after-automated-payout-processing", await _pluginHookService.ApplyAction("after-automated-payout-processing",
new AfterPayoutActionData(store, PayoutProcessorSettings, payouts)); new AfterPayoutActionData(store, PayoutProcessorSettings, payouts));

View file

@ -15,9 +15,11 @@ using BTCPayServer.Payments;
using BTCPayServer.Payments.Lightning; using BTCPayServer.Payments.Lightning;
using BTCPayServer.Services; using BTCPayServer.Services;
using BTCPayServer.Services.Stores; using BTCPayServer.Services.Stores;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using NBitcoin; using NBitcoin;
using MarkPayoutRequest = BTCPayServer.HostedServices.MarkPayoutRequest;
using PayoutData = BTCPayServer.Data.PayoutData; using PayoutData = BTCPayServer.Data.PayoutData;
using PayoutProcessorData = BTCPayServer.Data.PayoutProcessorData; using PayoutProcessorData = BTCPayServer.Data.PayoutProcessorData;
@ -29,9 +31,9 @@ public class LightningAutomatedPayoutProcessor : BaseAutomatedPayoutProcessor<Li
private readonly LightningClientFactoryService _lightningClientFactoryService; private readonly LightningClientFactoryService _lightningClientFactoryService;
private readonly UserService _userService; private readonly UserService _userService;
private readonly IOptions<LightningNetworkOptions> _options; private readonly IOptions<LightningNetworkOptions> _options;
private readonly PullPaymentHostedService _pullPaymentHostedService;
private readonly LightningLikePayoutHandler _payoutHandler; private readonly LightningLikePayoutHandler _payoutHandler;
private readonly BTCPayNetwork _network; private readonly BTCPayNetwork _network;
private readonly ConcurrentDictionary<string, int> _failedPayoutCounter = new();
public LightningAutomatedPayoutProcessor( public LightningAutomatedPayoutProcessor(
BTCPayNetworkJsonSerializerSettings btcPayNetworkJsonSerializerSettings, BTCPayNetworkJsonSerializerSettings btcPayNetworkJsonSerializerSettings,
@ -43,7 +45,8 @@ public class LightningAutomatedPayoutProcessor : BaseAutomatedPayoutProcessor<Li
ApplicationDbContextFactory applicationDbContextFactory, ApplicationDbContextFactory applicationDbContextFactory,
BTCPayNetworkProvider btcPayNetworkProvider, BTCPayNetworkProvider btcPayNetworkProvider,
IPluginHookService pluginHookService, IPluginHookService pluginHookService,
EventAggregator eventAggregator) : EventAggregator eventAggregator,
PullPaymentHostedService pullPaymentHostedService) :
base(logger, storeRepository, payoutProcessorSettings, applicationDbContextFactory, base(logger, storeRepository, payoutProcessorSettings, applicationDbContextFactory,
btcPayNetworkProvider, pluginHookService, eventAggregator) btcPayNetworkProvider, pluginHookService, eventAggregator)
{ {
@ -51,30 +54,27 @@ public class LightningAutomatedPayoutProcessor : BaseAutomatedPayoutProcessor<Li
_lightningClientFactoryService = lightningClientFactoryService; _lightningClientFactoryService = lightningClientFactoryService;
_userService = userService; _userService = userService;
_options = options; _options = options;
_pullPaymentHostedService = pullPaymentHostedService;
_payoutHandler = (LightningLikePayoutHandler)payoutHandlers.FindPayoutHandler(PaymentMethodId); _payoutHandler = (LightningLikePayoutHandler)payoutHandlers.FindPayoutHandler(PaymentMethodId);
_network = _btcPayNetworkProvider.GetNetwork<BTCPayNetwork>(PayoutProcessorSettings.GetPaymentMethodId().CryptoCode); _network = _btcPayNetworkProvider.GetNetwork<BTCPayNetwork>(PayoutProcessorSettings.GetPaymentMethodId()
.CryptoCode);
} }
protected override async Task Process(ISupportedPaymentMethod paymentMethod, List<PayoutData> payouts) private async Task HandlePayout(PayoutData payoutData, ILightningClient lightningClient)
{ {
var processorBlob = GetBlob(PayoutProcessorSettings); if (payoutData.State != PayoutState.AwaitingPayment)
var lightningSupportedPaymentMethod = (LightningSupportedPaymentMethod)paymentMethod; return;
if (lightningSupportedPaymentMethod.IsInternalNode && var res = await _pullPaymentHostedService.MarkPaid(new MarkPayoutRequest()
!(await Task.WhenAll((await _storeRepository.GetStoreUsers(PayoutProcessorSettings.StoreId)) {
.Where(user => user.StoreRole.ToPermissionSet( PayoutProcessorSettings.StoreId).Contains(Policies.CanModifyStoreSettings, PayoutProcessorSettings.StoreId)).Select(user => user.Id) State = PayoutState.InProgress, PayoutId = payoutData.Id, Proof = null
.Select(s => _userService.IsAdminUser(s)))).Any(b => b)) });
if (res != MarkPayoutRequest.PayoutPaidResult.Ok)
{ {
return; return;
} }
var client =
lightningSupportedPaymentMethod.CreateLightningClient(_network, _options.Value,
_lightningClientFactoryService);
foreach (var payoutData in payouts)
{
var blob = payoutData.GetBlob(_btcPayNetworkJsonSerializerSettings); var blob = payoutData.GetBlob(_btcPayNetworkJsonSerializerSettings);
var failed = false;
var claim = await _payoutHandler.ParseClaimDestination(PaymentMethodId, blob.Destination, CancellationToken); var claim = await _payoutHandler.ParseClaimDestination(PaymentMethodId, blob.Destination, CancellationToken);
try try
{ {
@ -84,53 +84,61 @@ public class LightningAutomatedPayoutProcessor : BaseAutomatedPayoutProcessor<Li
var lnurlResult = await UILightningLikePayoutController.GetInvoiceFromLNURL(payoutData, var lnurlResult = await UILightningLikePayoutController.GetInvoiceFromLNURL(payoutData,
_payoutHandler, blob, _payoutHandler, blob,
lnurlPayClaimDestinaton, _network.NBitcoinNetwork, CancellationToken); lnurlPayClaimDestinaton, _network.NBitcoinNetwork, CancellationToken);
if (lnurlResult.Item2 is not null) if (lnurlResult.Item2 is null)
{ {
continue; await TrypayBolt(lightningClient, blob, payoutData,
}
failed = !await TrypayBolt(client, blob, payoutData,
lnurlResult.Item1); lnurlResult.Item1);
}
break; break;
case BoltInvoiceClaimDestination item1: case BoltInvoiceClaimDestination item1:
failed = !await TrypayBolt(client, blob, payoutData, item1.PaymentRequest); await TrypayBolt(lightningClient, blob, payoutData, item1.PaymentRequest);
break; break;
} }
} }
catch (Exception e) catch (Exception e)
{ {
Logs.PayServer.LogError(e, $"Could not process payout {payoutData.Id}"); Logs.PayServer.LogError(e, $"Could not process payout {payoutData.Id}");
failed = true;
} }
if (failed && processorBlob.CancelPayoutAfterFailures is not null) if (payoutData.State != PayoutState.InProgress || payoutData.Proof is not null)
{ {
if (!_failedPayoutCounter.TryGetValue(payoutData.Id, out int counter)) await _pullPaymentHostedService.MarkPaid(new MarkPayoutRequest()
{ {
counter = 0; State = payoutData.State, PayoutId = payoutData.Id, Proof = payoutData.GetProofBlobJson()
});
} }
counter++; }
if(counter >= processorBlob.CancelPayoutAfterFailures)
protected override async Task<bool>ProcessShouldSave(ISupportedPaymentMethod paymentMethod, List<PayoutData> payouts)
{ {
payoutData.State = PayoutState.Cancelled; var processorBlob = GetBlob(PayoutProcessorSettings);
Logs.PayServer.LogError($"Payout {payoutData.Id} has failed {counter} times, cancelling it"); var lightningSupportedPaymentMethod = (LightningSupportedPaymentMethod)paymentMethod;
} if (lightningSupportedPaymentMethod.IsInternalNode &&
else !(await Task.WhenAll((await _storeRepository.GetStoreUsers(PayoutProcessorSettings.StoreId))
.Where(user =>
user.StoreRole.ToPermissionSet(PayoutProcessorSettings.StoreId)
.Contains(Policies.CanModifyStoreSettings, PayoutProcessorSettings.StoreId))
.Select(user => user.Id)
.Select(s => _userService.IsAdminUser(s)))).Any(b => b))
{ {
_failedPayoutCounter.AddOrReplace(payoutData.Id, counter); return false;
}
}
if (payoutData.State == PayoutState.Cancelled)
{
_failedPayoutCounter.TryRemove(payoutData.Id, out _);
}
} }
var client =
lightningSupportedPaymentMethod.CreateLightningClient(_network, _options.Value,
_lightningClientFactoryService);
await Task.WhenAll(payouts.Select(data => HandlePayout(data, client)));
//we return false because this processor handles db updates on its own
return false;
} }
//we group per store and init the transfers by each //we group per store and init the transfers by each
async Task<bool> TrypayBolt(ILightningClient lightningClient, PayoutBlob payoutBlob, PayoutData payoutData, async Task<bool> TrypayBolt(ILightningClient lightningClient, PayoutBlob payoutBlob, PayoutData payoutData,
BOLT11PaymentRequest bolt11PaymentRequest) BOLT11PaymentRequest bolt11PaymentRequest)
{ {
return (await UILightningLikePayoutController.TrypayBolt(lightningClient, payoutBlob, payoutData, bolt11PaymentRequest, return (await UILightningLikePayoutController.TrypayBolt(lightningClient, payoutBlob, payoutData,
payoutData.GetPaymentMethodId(), CancellationToken)).Result == PayResult.Ok; bolt11PaymentRequest,
payoutData.GetPaymentMethodId(), CancellationToken)).Result is PayResult.Ok ;
} }
} }