Extract MultiProcessingQueue from WebhookNotificationManager

This commit is contained in:
nicolas.dorier 2022-01-11 11:48:03 +09:00
parent 81cec36b68
commit c6df43363f
No known key found for this signature in database
GPG key ID: 6618763EF09186FE
4 changed files with 225 additions and 52 deletions

View file

@ -0,0 +1,116 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using ProcessingAction = System.Func<System.Threading.CancellationToken, System.Threading.Tasks.Task>;
namespace BTCPayServer
{
/// <summary>
/// This class make sure that enqueued actions sharing the same queue name
/// are executed sequentially.
/// This is useful to preserve order of events.
/// </summary>
public class MultiProcessingQueue
{
Dictionary<string, ProcessingQueue> _Queues = new Dictionary<string, ProcessingQueue>();
class ProcessingQueue
{
internal Channel<ProcessingAction> Chan = Channel.CreateUnbounded<ProcessingAction>();
internal Task ProcessTask;
public async Task Process(CancellationToken cancellationToken)
{
retry:
while (Chan.Reader.TryRead(out var item))
{
await item(cancellationToken);
}
if (Chan.Writer.TryComplete())
{
goto retry;
}
}
}
public int QueueCount
{
get
{
lock (_Queues)
{
Cleanup();
return _Queues.Count;
}
}
}
CancellationTokenSource cts = new CancellationTokenSource();
bool stopped;
public void Enqueue(string queueName, ProcessingAction act)
{
lock (_Queues)
{
retry:
if (stopped)
return;
Cleanup();
bool created = false;
if (!_Queues.TryGetValue(queueName, out var queue))
{
queue = new ProcessingQueue();
_Queues.Add(queueName, queue);
created = true;
}
if (!queue.Chan.Writer.TryWrite(act))
goto retry;
if (created)
queue.ProcessTask = queue.Process(cts.Token);
}
}
private void Cleanup()
{
var removeList = new List<string>();
foreach (var q in _Queues)
{
if (q.Value.Chan.Reader.Completion.IsCompletedSuccessfully)
{
removeList.Add(q.Key);
}
}
foreach (var q in removeList)
{
_Queues.Remove(q);
}
}
public async Task Abort(CancellationToken cancellationToken)
{
stopped = true;
ProcessingQueue[] queues = null;
lock (_Queues)
{
queues = _Queues.Select(c => c.Value).ToArray();
}
cts.Cancel();
var delay = Task.Delay(-1, cancellationToken);
foreach (var q in queues)
{
try
{
await Task.WhenAny(q.ProcessTask, delay);
}
catch
{
}
}
cancellationToken.ThrowIfCancellationRequested();
lock (_Queues)
{
Cleanup();
}
}
}
}

View file

@ -1028,6 +1028,68 @@ namespace BTCPayServer.Tests
Assert.False(CurrencyValue.TryParse("1.501", out result));
}
[Fact]
public async Task MultiProcessingQueueTests()
{
MultiProcessingQueue q = new MultiProcessingQueue();
var q10 = Enqueue(q, "q1");
var q11 = Enqueue(q, "q1");
var q20 = Enqueue(q, "q2");
var q30 = Enqueue(q, "q3");
q10.AssertStarted();
q11.AssertStopped();
q20.AssertStarted();
q30.AssertStarted();
Assert.Equal(3, q.QueueCount);
q10.Done();
q10.AssertStopped();
q11.AssertStarted();
q20.AssertStarted();
Assert.Equal(3, q.QueueCount);
q30.Done();
q30.AssertStopped();
TestUtils.Eventually(() => Assert.Equal(2, q.QueueCount), 1000);
await q.Abort(default);
q11.AssertAborted();
q20.AssertAborted();
Assert.Equal(0, q.QueueCount);
}
class MultiProcessingQueueTest
{
public bool Started;
public bool Aborted;
public TaskCompletionSource Tcs;
public void Done() { Tcs.TrySetResult(); }
public void AssertStarted()
{
TestUtils.Eventually(() => Assert.True(Started), 1000);
}
public void AssertStopped()
{
TestUtils.Eventually(() => Assert.False(Started), 1000);
}
public void AssertAborted()
{
TestUtils.Eventually(() => Assert.True(Aborted), 1000);
}
}
private static MultiProcessingQueueTest Enqueue(MultiProcessingQueue q, string queueName)
{
MultiProcessingQueueTest t = new MultiProcessingQueueTest();
t.Tcs = new TaskCompletionSource();
q.Enqueue(queueName, async (cancellationToken) => {
t.Started = true;
try
{
await t.Tcs.Task.WaitAsync(cancellationToken);
}
catch { t.Aborted = true; }
t.Started = false;
});
return t;
}
[Fact]
public async Task CanScheduleBackgroundTasks()
{

View file

@ -1,5 +1,6 @@
#nullable enable
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BTCPayServer.Client.Models;
using BTCPayServer.Data;
@ -132,9 +133,9 @@ namespace BTCPayServer.Controllers
}
[HttpPost("{storeId}/webhooks/{webhookId}/test")]
public async Task<IActionResult> TestWebhook(string webhookId, TestWebhookViewModel viewModel)
public async Task<IActionResult> TestWebhook(string webhookId, TestWebhookViewModel viewModel, CancellationToken cancellationToken)
{
var result = await WebhookNotificationManager.TestWebhook(CurrentStore.Id, webhookId, viewModel.Type);
var result = await WebhookNotificationManager.TestWebhook(CurrentStore.Id, webhookId, viewModel.Type, cancellationToken);
if (result.Success)
{

View file

@ -55,7 +55,8 @@ namespace BTCPayServer.HostedServices
WebhookBlob = webhookBlob;
}
}
Dictionary<string, Channel<WebhookDeliveryRequest>> _InvoiceEventsByWebhookId = new Dictionary<string, Channel<WebhookDeliveryRequest>>();
MultiProcessingQueue _processingQueue = new MultiProcessingQueue();
public StoreRepository StoreRepository { get; }
public IHttpClientFactory HttpClientFactory { get; }
@ -124,7 +125,7 @@ namespace BTCPayServer.HostedServices
return webhookEvent;
}
public async Task<DeliveryResult> TestWebhook(string storeId, string webhookId, WebhookEventType webhookEventType)
public async Task<DeliveryResult> TestWebhook(string storeId, string webhookId, WebhookEventType webhookEventType, CancellationToken cancellationToken)
{
var delivery = NewDelivery(webhookId);
var webhook = (await StoreRepository.GetWebhooks(storeId)).FirstOrDefault(w => w.Id == webhookId);
@ -134,8 +135,7 @@ namespace BTCPayServer.HostedServices
delivery,
webhook.GetBlob()
);
return await SendDelivery(deliveryRequest);
return await SendDelivery(deliveryRequest, cancellationToken);
}
protected override async Task ProcessEvent(object evt, CancellationToken cancellationToken)
@ -166,15 +166,7 @@ namespace BTCPayServer.HostedServices
private void EnqueueDelivery(WebhookDeliveryRequest context)
{
if (_InvoiceEventsByWebhookId.TryGetValue(context.WebhookId, out var channel))
{
if (channel.Writer.TryWrite(context))
return;
}
channel = Channel.CreateUnbounded<WebhookDeliveryRequest>();
_InvoiceEventsByWebhookId.Add(context.WebhookId, channel);
channel.Writer.TryWrite(context);
_ = Process(context.WebhookId, channel);
_processingQueue.Enqueue(context.WebhookId, (cancellationToken) => Process(context, cancellationToken));
}
private WebhookInvoiceEvent GetWebhookEvent(WebhookEventType webhookEventType)
@ -252,24 +244,21 @@ namespace BTCPayServer.HostedServices
}
}
private async Task Process(string id, Channel<WebhookDeliveryRequest> channel)
private async Task Process(WebhookDeliveryRequest ctx, CancellationToken cancellationToken)
{
await foreach (var originalCtx in channel.Reader.ReadAllAsync())
try
{
try
var wh = (await StoreRepository.GetWebhook(ctx.WebhookId))?.GetBlob();
if (wh is null || !ShouldDeliver(ctx.WebhookEvent.Type, wh))
return;
var result = await SendAndSaveDelivery(ctx, cancellationToken);
if (ctx.WebhookBlob.AutomaticRedelivery &&
!result.Success &&
result.DeliveryId is string)
{
var ctx = originalCtx;
var wh = (await StoreRepository.GetWebhook(ctx.WebhookId))?.GetBlob();
if (wh is null || !ShouldDeliver(ctx.WebhookEvent.Type, wh))
continue;
var result = await SendAndSaveDelivery(ctx);
if (ctx.WebhookBlob.AutomaticRedelivery &&
!result.Success &&
result.DeliveryId is string)
var originalDeliveryId = result.DeliveryId;
foreach (var wait in new[]
{
var originalDeliveryId = result.DeliveryId;
foreach (var wait in new[]
{
TimeSpan.FromSeconds(10),
TimeSpan.FromMinutes(1),
TimeSpan.FromMinutes(10),
@ -279,27 +268,25 @@ namespace BTCPayServer.HostedServices
TimeSpan.FromMinutes(10),
TimeSpan.FromMinutes(10),
})
{
await Task.Delay(wait, CancellationToken);
ctx = await CreateRedeliveryRequest(originalDeliveryId);
// This may have changed
if (ctx is null || !ctx.WebhookBlob.AutomaticRedelivery ||
!ShouldDeliver(ctx.WebhookEvent.Type, ctx.WebhookBlob))
break;
result = await SendAndSaveDelivery(ctx);
if (result.Success)
break;
}
{
await Task.Delay(wait, cancellationToken);
ctx = (await CreateRedeliveryRequest(originalDeliveryId))!;
// This may have changed
if (ctx is null || !ctx.WebhookBlob.AutomaticRedelivery ||
!ShouldDeliver(ctx.WebhookEvent.Type, ctx.WebhookBlob))
return;
result = await SendAndSaveDelivery(ctx, cancellationToken);
if (result.Success)
return;
}
}
catch when (CancellationToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
Logs.PayServer.LogError(ex, "Unexpected error when processing a webhook");
}
}
catch when (cancellationToken.IsCancellationRequested)
{
}
catch (Exception ex)
{
Logs.PayServer.LogError(ex, "Unexpected error when processing a webhook");
}
}
@ -315,7 +302,7 @@ namespace BTCPayServer.HostedServices
public string? ErrorMessage { get; set; }
}
private async Task<DeliveryResult> SendDelivery(WebhookDeliveryRequest ctx)
private async Task<DeliveryResult> SendDelivery(WebhookDeliveryRequest ctx, CancellationToken cancellationToken)
{
var uri = new Uri(ctx.WebhookBlob.Url, UriKind.Absolute);
var httpClient = GetClient(uri);
@ -333,7 +320,7 @@ namespace BTCPayServer.HostedServices
deliveryBlob.Request = bytes;
try
{
using var response = await httpClient.SendAsync(request, CancellationToken);
using var response = await httpClient.SendAsync(request, cancellationToken);
if (!response.IsSuccessStatusCode)
{
deliveryBlob.Status = WebhookDeliveryStatus.HttpError;
@ -361,9 +348,9 @@ namespace BTCPayServer.HostedServices
}
private async Task<DeliveryResult> SendAndSaveDelivery(WebhookDeliveryRequest ctx)
private async Task<DeliveryResult> SendAndSaveDelivery(WebhookDeliveryRequest ctx, CancellationToken cancellationToken)
{
var result = await SendDelivery(ctx);
var result = await SendDelivery(ctx, cancellationToken);
await StoreRepository.AddWebhookDelivery(ctx.Delivery);
return result;
@ -385,5 +372,12 @@ namespace BTCPayServer.HostedServices
WebhookId = webhookId
};
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
var stopping = _processingQueue.Abort(cancellationToken);
await base.StopAsync(cancellationToken);
await stopping;
}
}
}