Merge pull request #3282 from NicolasDorier/wofiq

The IPN notification manager should preserve IPN ordering
This commit is contained in:
Nicolas Dorier 2022-01-11 15:22:07 +09:00 committed by GitHub
commit 3c5d809cf9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 251 additions and 81 deletions

View file

@ -0,0 +1,116 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using ProcessingAction = System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task>;
namespace BTCPayServer
{
/// <summary>
/// This class make sure that enqueued actions sharing the same queue name
/// are executed sequentially.
/// This is useful to preserve order of events.
/// </summary>
public class MultiProcessingQueue
{
Dictionary<string, ProcessingQueue> _Queues = new Dictionary<string, ProcessingQueue>();
class ProcessingQueue
{
internal Channel<ProcessingAction> Chan = Channel.CreateUnbounded<ProcessingAction>();
internal Task ProcessTask;
public async Task Process(CancellationToken cancellationToken)
{
retry:
while (Chan.Reader.TryRead(out var item))
{
await item(cancellationToken);
}
if (Chan.Writer.TryComplete())
{
goto retry;
}
}
}
public int QueueCount
{
get
{
lock (_Queues)
{
Cleanup();
return _Queues.Count;
}
}
}
CancellationTokenSource cts = new CancellationTokenSource();
bool stopped;
public void Enqueue(string queueName, ProcessingAction act)
{
lock (_Queues)
{
retry:
if (stopped)
return;
Cleanup();
bool created = false;
if (!_Queues.TryGetValue(queueName, out var queue))
{
queue = new ProcessingQueue();
_Queues.Add(queueName, queue);
created = true;
}
if (!queue.Chan.Writer.TryWrite(act))
goto retry;
if (created)
queue.ProcessTask = queue.Process(cts.Token);
}
}
private void Cleanup()
{
var removeList = new List<string>();
foreach (var q in _Queues)
{
if (q.Value.Chan.Reader.Completion.IsCompletedSuccessfully)
{
removeList.Add(q.Key);
}
}
foreach (var q in removeList)
{
_Queues.Remove(q);
}
}
public async Task Abort(CancellationToken cancellationToken)
{
stopped = true;
ProcessingQueue[] queues = null;
lock (_Queues)
{
queues = _Queues.Select(c => c.Value).ToArray();
}
cts.Cancel();
var delay = Task.Delay(-1, cancellationToken);
foreach (var q in queues)
{
try
{
await Task.WhenAny(q.ProcessTask, delay);
}
catch
{
}
}
cancellationToken.ThrowIfCancellationRequested();
lock (_Queues)
{
Cleanup();
}
}
}
}

View file

@ -1028,6 +1028,68 @@ namespace BTCPayServer.Tests
Assert.False(CurrencyValue.TryParse("1.501", out result));
}
[Fact]
public async Task MultiProcessingQueueTests()
{
MultiProcessingQueue q = new MultiProcessingQueue();
var q10 = Enqueue(q, "q1");
var q11 = Enqueue(q, "q1");
var q20 = Enqueue(q, "q2");
var q30 = Enqueue(q, "q3");
q10.AssertStarted();
q11.AssertStopped();
q20.AssertStarted();
q30.AssertStarted();
Assert.Equal(3, q.QueueCount);
q10.Done();
q10.AssertStopped();
q11.AssertStarted();
q20.AssertStarted();
Assert.Equal(3, q.QueueCount);
q30.Done();
q30.AssertStopped();
TestUtils.Eventually(() => Assert.Equal(2, q.QueueCount), 1000);
await q.Abort(default);
q11.AssertAborted();
q20.AssertAborted();
Assert.Equal(0, q.QueueCount);
}
class MultiProcessingQueueTest
{
public bool Started;
public bool Aborted;
public TaskCompletionSource Tcs;
public void Done() { Tcs.TrySetResult(); }
public void AssertStarted()
{
TestUtils.Eventually(() => Assert.True(Started), 1000);
}
public void AssertStopped()
{
TestUtils.Eventually(() => Assert.False(Started), 1000);
}
public void AssertAborted()
{
TestUtils.Eventually(() => Assert.True(Aborted), 1000);
}
}
private static MultiProcessingQueueTest Enqueue(MultiProcessingQueue q, string queueName)
{
MultiProcessingQueueTest t = new MultiProcessingQueueTest();
t.Tcs = new TaskCompletionSource();
q.Enqueue(queueName, async (cancellationToken) => {
t.Started = true;
try
{
await t.Tcs.Task.WaitAsync(cancellationToken);
}
catch { t.Aborted = true; }
t.Started = false;
});
return t;
}
[Fact]
public async Task CanScheduleBackgroundTasks()
{

View file

@ -26,14 +26,14 @@ namespace BTCPayServer.Controllers.GreenField
[EnableCors(CorsPolicies.All)]
public class StoreWebhooksController : ControllerBase
{
public StoreWebhooksController(StoreRepository storeRepository, WebhookNotificationManager webhookNotificationManager)
public StoreWebhooksController(StoreRepository storeRepository, WebhookSender webhookNotificationManager)
{
StoreRepository = storeRepository;
WebhookNotificationManager = webhookNotificationManager;
}
public StoreRepository StoreRepository { get; }
public WebhookNotificationManager WebhookNotificationManager { get; }
public WebhookSender WebhookNotificationManager { get; }
[HttpGet("~/api/v1/stores/{storeId}/webhooks/{webhookId?}")]
public async Task<IActionResult> ListWebhooks(string storeId, string webhookId)

View file

@ -43,7 +43,7 @@ namespace BTCPayServer.Controllers
private readonly PullPaymentHostedService _paymentHostedService;
private readonly LanguageService _languageService;
public WebhookNotificationManager WebhookNotificationManager { get; }
public WebhookSender WebhookNotificationManager { get; }
public InvoiceController(
InvoiceRepository invoiceRepository,
@ -57,7 +57,7 @@ namespace BTCPayServer.Controllers
PaymentMethodHandlerDictionary paymentMethodHandlerDictionary,
ApplicationDbContextFactory dbContextFactory,
PullPaymentHostedService paymentHostedService,
WebhookNotificationManager webhookNotificationManager,
WebhookSender webhookNotificationManager,
LanguageService languageService)
{
_CurrencyNameTable = currencyNameTable ?? throw new ArgumentNullException(nameof(currencyNameTable));

View file

@ -1,5 +1,6 @@
#nullable enable
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BTCPayServer.Client.Models;
using BTCPayServer.Data;
@ -132,9 +133,9 @@ namespace BTCPayServer.Controllers
}
[HttpPost("{storeId}/webhooks/{webhookId}/test")]
public async Task<IActionResult> TestWebhook(string webhookId, TestWebhookViewModel viewModel)
public async Task<IActionResult> TestWebhook(string webhookId, TestWebhookViewModel viewModel, CancellationToken cancellationToken)
{
var result = await WebhookNotificationManager.TestWebhook(CurrentStore.Id, webhookId, viewModel.Type);
var result = await WebhookNotificationManager.TestWebhook(CurrentStore.Id, webhookId, viewModel.Type, cancellationToken);
if (result.Success)
{

View file

@ -58,7 +58,7 @@ namespace BTCPayServer.Controllers
IAuthorizationService authorizationService,
EventAggregator eventAggregator,
AppService appService,
WebhookNotificationManager webhookNotificationManager,
WebhookSender webhookNotificationManager,
IDataProtectionProvider dataProtector,
NBXplorerDashboard Dashboard)
{
@ -818,7 +818,7 @@ namespace BTCPayServer.Controllers
}
public string GeneratedPairingCode { get; set; }
public WebhookNotificationManager WebhookNotificationManager { get; }
public WebhookSender WebhookNotificationManager { get; }
public IDataProtector DataProtector { get; }
[HttpGet]

View file

@ -33,7 +33,7 @@ namespace BTCPayServer.Data
public byte[] Request { get; set; }
public T ReadRequestAs<T>()
{
return JsonConvert.DeserializeObject<T>(UTF8Encoding.UTF8.GetString(Request), HostedServices.WebhookNotificationManager.DefaultSerializerSettings);
return JsonConvert.DeserializeObject<T>(UTF8Encoding.UTF8.GetString(Request), HostedServices.WebhookSender.DefaultSerializerSettings);
}
}
public class WebhookBlob
@ -56,11 +56,11 @@ namespace BTCPayServer.Data
}
public static WebhookDeliveryBlob GetBlob(this WebhookDeliveryData webhook)
{
return JsonConvert.DeserializeObject<WebhookDeliveryBlob>(ZipUtils.Unzip(webhook.Blob), HostedServices.WebhookNotificationManager.DefaultSerializerSettings);
return JsonConvert.DeserializeObject<WebhookDeliveryBlob>(ZipUtils.Unzip(webhook.Blob), HostedServices.WebhookSender.DefaultSerializerSettings);
}
public static void SetBlob(this WebhookDeliveryData webhook, WebhookDeliveryBlob blob)
{
webhook.Blob = ZipUtils.Zip(JsonConvert.SerializeObject(blob, Formatting.None, HostedServices.WebhookNotificationManager.DefaultSerializerSettings));
webhook.Blob = ZipUtils.Zip(JsonConvert.SerializeObject(blob, Formatting.None, HostedServices.WebhookSender.DefaultSerializerSettings));
}
}
}

View file

@ -22,7 +22,7 @@ using Newtonsoft.Json.Linq;
namespace BTCPayServer.HostedServices
{
public class InvoiceNotificationManager : IHostedService
public class BitpayIPNSender : IHostedService
{
readonly HttpClient _Client;
@ -39,13 +39,14 @@ namespace BTCPayServer.HostedServices
}
}
MultiProcessingQueue _Queue = new MultiProcessingQueue();
readonly IBackgroundJobClient _JobClient;
readonly EventAggregator _EventAggregator;
readonly InvoiceRepository _InvoiceRepository;
private readonly EmailSenderFactory _EmailSenderFactory;
private readonly StoreRepository _StoreRepository;
public InvoiceNotificationManager(
public BitpayIPNSender(
IHttpClientFactory httpClientFactory,
IBackgroundJobClient jobClient,
EventAggregator eventAggregator,
@ -140,14 +141,12 @@ namespace BTCPayServer.HostedServices
if (invoice.NotificationURL != null)
{
var invoiceStr = NBitcoin.JsonConverters.Serializer.ToString(new ScheduledJob() { TryCount = 0, Notification = notification });
_JobClient.Schedule((cancellation) => NotifyHttp(invoiceStr, cancellation), TimeSpan.Zero);
_Queue.Enqueue(invoice.Id, (cancellationToken) => NotifyHttp(new ScheduledJob() { TryCount = 0, Notification = notification }, cancellationToken));
}
}
public async Task NotifyHttp(string invoiceData, CancellationToken cancellationToken)
public async Task NotifyHttp(ScheduledJob job, CancellationToken cancellationToken)
{
var job = NBitcoin.JsonConverters.Serializer.ToObject<ScheduledJob>(invoiceData);
bool reschedule = false;
var aggregatorEvent = new InvoiceIPNEvent(job.Notification.Data.Id, job.Notification.Event.Code, job.Notification.Event.Name);
try
@ -159,9 +158,7 @@ namespace BTCPayServer.HostedServices
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// When the JobClient will be persistent, this will reschedule the job for after reboot
invoiceData = NBitcoin.JsonConverters.Serializer.ToString(job);
_JobClient.Schedule((cancellation) => NotifyHttp(invoiceData, cancellation), TimeSpan.FromMinutes(10.0));
_JobClient.Schedule((cancellation) => NotifyHttp(job, cancellation), TimeSpan.FromMinutes(10.0));
return;
}
catch (OperationCanceledException)
@ -190,8 +187,7 @@ namespace BTCPayServer.HostedServices
if (job.TryCount < MaxTry && reschedule)
{
invoiceData = NBitcoin.JsonConverters.Serializer.ToString(job);
_JobClient.Schedule((cancellation) => NotifyHttp(invoiceData, cancellation), TimeSpan.FromMinutes(10.0));
_JobClient.Schedule((cancellation) => NotifyHttp(job, cancellation), TimeSpan.FromMinutes(10.0));
}
}
@ -307,6 +303,7 @@ namespace BTCPayServer.HostedServices
readonly int MaxTry = 6;
readonly CompositeDisposable leases = new CompositeDisposable();
public Task StartAsync(CancellationToken cancellationToken)
{
leases.Add(_EventAggregator.SubscribeAsync<InvoiceEvent>(async e =>
@ -347,10 +344,10 @@ namespace BTCPayServer.HostedServices
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
public async Task StopAsync(CancellationToken cancellationToken)
{
leases.Dispose();
return Task.CompletedTask;
await _Queue.Abort(cancellationToken);
}
}
}

View file

@ -27,11 +27,11 @@ namespace BTCPayServer.HostedServices
/// This class send webhook notifications
/// It also make sure the events sent to a webhook are sent in order to the webhook
/// </summary>
public class WebhookNotificationManager : EventHostedServiceBase
public class WebhookSender : EventHostedServiceBase
{
readonly Encoding UTF8 = new UTF8Encoding(false);
public readonly static JsonSerializerSettings DefaultSerializerSettings;
static WebhookNotificationManager()
static WebhookSender()
{
DefaultSerializerSettings = WebhookEvent.DefaultSerializerSettings;
}
@ -55,11 +55,12 @@ namespace BTCPayServer.HostedServices
WebhookBlob = webhookBlob;
}
}
Dictionary<string, Channel<WebhookDeliveryRequest>> _InvoiceEventsByWebhookId = new Dictionary<string, Channel<WebhookDeliveryRequest>>();
MultiProcessingQueue _processingQueue = new MultiProcessingQueue();
public StoreRepository StoreRepository { get; }
public IHttpClientFactory HttpClientFactory { get; }
public WebhookNotificationManager(EventAggregator eventAggregator,
public WebhookSender(EventAggregator eventAggregator,
StoreRepository storeRepository,
IHttpClientFactory httpClientFactory,
Logs logs) : base(eventAggregator, logs)
@ -124,7 +125,7 @@ namespace BTCPayServer.HostedServices
return webhookEvent;
}
public async Task<DeliveryResult> TestWebhook(string storeId, string webhookId, WebhookEventType webhookEventType)
public async Task<DeliveryResult> TestWebhook(string storeId, string webhookId, WebhookEventType webhookEventType, CancellationToken cancellationToken)
{
var delivery = NewDelivery(webhookId);
var webhook = (await StoreRepository.GetWebhooks(storeId)).FirstOrDefault(w => w.Id == webhookId);
@ -134,8 +135,7 @@ namespace BTCPayServer.HostedServices
delivery,
webhook.GetBlob()
);
return await SendDelivery(deliveryRequest);
return await SendDelivery(deliveryRequest, cancellationToken);
}
protected override async Task ProcessEvent(object evt, CancellationToken cancellationToken)
@ -166,15 +166,7 @@ namespace BTCPayServer.HostedServices
private void EnqueueDelivery(WebhookDeliveryRequest context)
{
if (_InvoiceEventsByWebhookId.TryGetValue(context.WebhookId, out var channel))
{
if (channel.Writer.TryWrite(context))
return;
}
channel = Channel.CreateUnbounded<WebhookDeliveryRequest>();
_InvoiceEventsByWebhookId.Add(context.WebhookId, channel);
channel.Writer.TryWrite(context);
_ = Process(context.WebhookId, channel);
_processingQueue.Enqueue(context.WebhookId, (cancellationToken) => Process(context, cancellationToken));
}
private WebhookInvoiceEvent GetWebhookEvent(WebhookEventType webhookEventType)
@ -252,24 +244,21 @@ namespace BTCPayServer.HostedServices
}
}
private async Task Process(string id, Channel<WebhookDeliveryRequest> channel)
private async Task Process(WebhookDeliveryRequest ctx, CancellationToken cancellationToken)
{
await foreach (var originalCtx in channel.Reader.ReadAllAsync())
try
{
try
var wh = (await StoreRepository.GetWebhook(ctx.WebhookId))?.GetBlob();
if (wh is null || !ShouldDeliver(ctx.WebhookEvent.Type, wh))
return;
var result = await SendAndSaveDelivery(ctx, cancellationToken);
if (ctx.WebhookBlob.AutomaticRedelivery &&
!result.Success &&
result.DeliveryId is string)
{
var ctx = originalCtx;
var wh = (await StoreRepository.GetWebhook(ctx.WebhookId))?.GetBlob();
if (wh is null || !ShouldDeliver(ctx.WebhookEvent.Type, wh))
continue;
var result = await SendAndSaveDelivery(ctx);
if (ctx.WebhookBlob.AutomaticRedelivery &&
!result.Success &&
result.DeliveryId is string)
var originalDeliveryId = result.DeliveryId;
foreach (var wait in new[]
{
var originalDeliveryId = result.DeliveryId;
foreach (var wait in new[]
{
TimeSpan.FromSeconds(10),
TimeSpan.FromMinutes(1),
TimeSpan.FromMinutes(10),
@ -279,27 +268,25 @@ namespace BTCPayServer.HostedServices
TimeSpan.FromMinutes(10),
TimeSpan.FromMinutes(10),
})
{
await Task.Delay(wait, CancellationToken);
ctx = await CreateRedeliveryRequest(originalDeliveryId);
// This may have changed
if (ctx is null || !ctx.WebhookBlob.AutomaticRedelivery ||
!ShouldDeliver(ctx.WebhookEvent.Type, ctx.WebhookBlob))
break;
result = await SendAndSaveDelivery(ctx);
if (result.Success)
break;
}
{
await Task.Delay(wait, cancellationToken);
ctx = (await CreateRedeliveryRequest(originalDeliveryId))!;
// This may have changed
if (ctx is null || !ctx.WebhookBlob.AutomaticRedelivery ||
!ShouldDeliver(ctx.WebhookEvent.Type, ctx.WebhookBlob))
return;
result = await SendAndSaveDelivery(ctx, cancellationToken);
if (result.Success)
return;
}
}
catch when (CancellationToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
Logs.PayServer.LogError(ex, "Unexpected error when processing a webhook");
}
}
catch when (cancellationToken.IsCancellationRequested)
{
}
catch (Exception ex)
{
Logs.PayServer.LogError(ex, "Unexpected error when processing a webhook");
}
}
@ -315,7 +302,7 @@ namespace BTCPayServer.HostedServices
public string? ErrorMessage { get; set; }
}
private async Task<DeliveryResult> SendDelivery(WebhookDeliveryRequest ctx)
private async Task<DeliveryResult> SendDelivery(WebhookDeliveryRequest ctx, CancellationToken cancellationToken)
{
var uri = new Uri(ctx.WebhookBlob.Url, UriKind.Absolute);
var httpClient = GetClient(uri);
@ -333,7 +320,7 @@ namespace BTCPayServer.HostedServices
deliveryBlob.Request = bytes;
try
{
using var response = await httpClient.SendAsync(request, CancellationToken);
using var response = await httpClient.SendAsync(request, cancellationToken);
if (!response.IsSuccessStatusCode)
{
deliveryBlob.Status = WebhookDeliveryStatus.HttpError;
@ -361,9 +348,9 @@ namespace BTCPayServer.HostedServices
}
private async Task<DeliveryResult> SendAndSaveDelivery(WebhookDeliveryRequest ctx)
private async Task<DeliveryResult> SendAndSaveDelivery(WebhookDeliveryRequest ctx, CancellationToken cancellationToken)
{
var result = await SendDelivery(ctx);
var result = await SendDelivery(ctx, cancellationToken);
await StoreRepository.AddWebhookDelivery(ctx.Delivery);
return result;
@ -385,5 +372,12 @@ namespace BTCPayServer.HostedServices
WebhookId = webhookId
};
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
var stopping = _processingQueue.Abort(cancellationToken);
await base.StopAsync(cancellationToken);
await stopping;
}
}
}

View file

@ -307,9 +307,9 @@ namespace BTCPayServer.Hosting
services.AddSingleton<HostedServices.CheckConfigurationHostedService>();
services.AddSingleton<IHostedService, HostedServices.CheckConfigurationHostedService>(o => o.GetRequiredService<CheckConfigurationHostedService>());
services.AddSingleton<HostedServices.WebhookNotificationManager>();
services.AddSingleton<IHostedService, WebhookNotificationManager>(o => o.GetRequiredService<WebhookNotificationManager>());
services.AddHttpClient(WebhookNotificationManager.OnionNamedClient)
services.AddSingleton<HostedServices.WebhookSender>();
services.AddSingleton<IHostedService, WebhookSender>(o => o.GetRequiredService<WebhookSender>());
services.AddHttpClient(WebhookSender.OnionNamedClient)
.ConfigurePrimaryHttpMessageHandler<Socks5HttpClientHandler>();
@ -342,7 +342,7 @@ namespace BTCPayServer.Hosting
services.AddSingleton<IHostedService, NBXplorerWaiters>();
services.AddSingleton<IHostedService, InvoiceEventSaverService>();
services.AddSingleton<IHostedService, InvoiceNotificationManager>();
services.AddSingleton<IHostedService, BitpayIPNSender>();
services.AddSingleton<IHostedService, InvoiceWatcher>();
services.AddSingleton<IHostedService, RatesHostedService>();
services.AddSingleton<IHostedService, BackgroundJobSchedulerHostedService>();