From c6df43363f572ce3839d52378780cbe87052a134 Mon Sep 17 00:00:00 2001 From: "nicolas.dorier" Date: Tue, 11 Jan 2022 11:48:03 +0900 Subject: [PATCH 1/4] Extract MultiProcessingQueue from WebhookNotificationManager --- BTCPayServer.Common/MultiProcessingQueue.cs | 116 ++++++++++++++++++ BTCPayServer.Tests/FastTests.cs | 62 ++++++++++ .../StoresController.Integrations.cs | 5 +- .../WebhookNotificationManager.cs | 94 +++++++------- 4 files changed, 225 insertions(+), 52 deletions(-) create mode 100644 BTCPayServer.Common/MultiProcessingQueue.cs diff --git a/BTCPayServer.Common/MultiProcessingQueue.cs b/BTCPayServer.Common/MultiProcessingQueue.cs new file mode 100644 index 000000000..9d3565d6f --- /dev/null +++ b/BTCPayServer.Common/MultiProcessingQueue.cs @@ -0,0 +1,116 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using ProcessingAction = System.Func; + +namespace BTCPayServer +{ + /// + /// This class make sure that enqueued actions sharing the same queue name + /// are executed sequentially. + /// This is useful to preserve order of events. + /// + public class MultiProcessingQueue + { + Dictionary _Queues = new Dictionary(); + class ProcessingQueue + { + internal Channel Chan = Channel.CreateUnbounded(); + internal Task ProcessTask; + public async Task Process(CancellationToken cancellationToken) + { + retry: + while (Chan.Reader.TryRead(out var item)) + { + await item(cancellationToken); + } + if (Chan.Writer.TryComplete()) + { + goto retry; + } + } + } + + public int QueueCount + { + get + { + lock (_Queues) + { + Cleanup(); + return _Queues.Count; + } + } + } + CancellationTokenSource cts = new CancellationTokenSource(); + bool stopped; + public void Enqueue(string queueName, ProcessingAction act) + { + lock (_Queues) + { + retry: + if (stopped) + return; + Cleanup(); + bool created = false; + if (!_Queues.TryGetValue(queueName, out var queue)) + { + queue = new ProcessingQueue(); + _Queues.Add(queueName, queue); + created = true; + } + if (!queue.Chan.Writer.TryWrite(act)) + goto retry; + if (created) + queue.ProcessTask = queue.Process(cts.Token); + } + } + + private void Cleanup() + { + var removeList = new List(); + foreach (var q in _Queues) + { + if (q.Value.Chan.Reader.Completion.IsCompletedSuccessfully) + { + removeList.Add(q.Key); + } + } + foreach (var q in removeList) + { + _Queues.Remove(q); + } + } + + public async Task Abort(CancellationToken cancellationToken) + { + stopped = true; + ProcessingQueue[] queues = null; + lock (_Queues) + { + queues = _Queues.Select(c => c.Value).ToArray(); + } + cts.Cancel(); + var delay = Task.Delay(-1, cancellationToken); + foreach (var q in queues) + { + try + { + await Task.WhenAny(q.ProcessTask, delay); + } + catch + { + } + } + cancellationToken.ThrowIfCancellationRequested(); + lock (_Queues) + { + Cleanup(); + } + } + } +} diff --git a/BTCPayServer.Tests/FastTests.cs b/BTCPayServer.Tests/FastTests.cs index db541466d..cbda65097 100644 --- a/BTCPayServer.Tests/FastTests.cs +++ b/BTCPayServer.Tests/FastTests.cs @@ -1028,6 +1028,68 @@ namespace BTCPayServer.Tests Assert.False(CurrencyValue.TryParse("1.501", out result)); } + [Fact] + public async Task MultiProcessingQueueTests() + { + MultiProcessingQueue q = new MultiProcessingQueue(); + var q10 = Enqueue(q, "q1"); + var q11 = Enqueue(q, "q1"); + var q20 = Enqueue(q, "q2"); + var q30 = Enqueue(q, "q3"); + q10.AssertStarted(); + q11.AssertStopped(); + q20.AssertStarted(); + q30.AssertStarted(); + Assert.Equal(3, q.QueueCount); + q10.Done(); + q10.AssertStopped(); + q11.AssertStarted(); + q20.AssertStarted(); + Assert.Equal(3, q.QueueCount); + q30.Done(); + q30.AssertStopped(); + TestUtils.Eventually(() => Assert.Equal(2, q.QueueCount), 1000); + await q.Abort(default); + q11.AssertAborted(); + q20.AssertAborted(); + Assert.Equal(0, q.QueueCount); + } + class MultiProcessingQueueTest + { + public bool Started; + public bool Aborted; + public TaskCompletionSource Tcs; + public void Done() { Tcs.TrySetResult(); } + + public void AssertStarted() + { + TestUtils.Eventually(() => Assert.True(Started), 1000); + } + public void AssertStopped() + { + TestUtils.Eventually(() => Assert.False(Started), 1000); + } + public void AssertAborted() + { + TestUtils.Eventually(() => Assert.True(Aborted), 1000); + } + } + private static MultiProcessingQueueTest Enqueue(MultiProcessingQueue q, string queueName) + { + MultiProcessingQueueTest t = new MultiProcessingQueueTest(); + t.Tcs = new TaskCompletionSource(); + q.Enqueue(queueName, async (cancellationToken) => { + t.Started = true; + try + { + await t.Tcs.Task.WaitAsync(cancellationToken); + } + catch { t.Aborted = true; } + t.Started = false; + }); + return t; + } + [Fact] public async Task CanScheduleBackgroundTasks() { diff --git a/BTCPayServer/Controllers/StoresController.Integrations.cs b/BTCPayServer/Controllers/StoresController.Integrations.cs index a775526b9..ed5d2cf19 100644 --- a/BTCPayServer/Controllers/StoresController.Integrations.cs +++ b/BTCPayServer/Controllers/StoresController.Integrations.cs @@ -1,5 +1,6 @@ #nullable enable using System.Linq; +using System.Threading; using System.Threading.Tasks; using BTCPayServer.Client.Models; using BTCPayServer.Data; @@ -132,9 +133,9 @@ namespace BTCPayServer.Controllers } [HttpPost("{storeId}/webhooks/{webhookId}/test")] - public async Task TestWebhook(string webhookId, TestWebhookViewModel viewModel) + public async Task TestWebhook(string webhookId, TestWebhookViewModel viewModel, CancellationToken cancellationToken) { - var result = await WebhookNotificationManager.TestWebhook(CurrentStore.Id, webhookId, viewModel.Type); + var result = await WebhookNotificationManager.TestWebhook(CurrentStore.Id, webhookId, viewModel.Type, cancellationToken); if (result.Success) { diff --git a/BTCPayServer/HostedServices/WebhookNotificationManager.cs b/BTCPayServer/HostedServices/WebhookNotificationManager.cs index f4d5a8b36..cb1287732 100644 --- a/BTCPayServer/HostedServices/WebhookNotificationManager.cs +++ b/BTCPayServer/HostedServices/WebhookNotificationManager.cs @@ -55,7 +55,8 @@ namespace BTCPayServer.HostedServices WebhookBlob = webhookBlob; } } - Dictionary> _InvoiceEventsByWebhookId = new Dictionary>(); + + MultiProcessingQueue _processingQueue = new MultiProcessingQueue(); public StoreRepository StoreRepository { get; } public IHttpClientFactory HttpClientFactory { get; } @@ -124,7 +125,7 @@ namespace BTCPayServer.HostedServices return webhookEvent; } - public async Task TestWebhook(string storeId, string webhookId, WebhookEventType webhookEventType) + public async Task TestWebhook(string storeId, string webhookId, WebhookEventType webhookEventType, CancellationToken cancellationToken) { var delivery = NewDelivery(webhookId); var webhook = (await StoreRepository.GetWebhooks(storeId)).FirstOrDefault(w => w.Id == webhookId); @@ -134,8 +135,7 @@ namespace BTCPayServer.HostedServices delivery, webhook.GetBlob() ); - - return await SendDelivery(deliveryRequest); + return await SendDelivery(deliveryRequest, cancellationToken); } protected override async Task ProcessEvent(object evt, CancellationToken cancellationToken) @@ -166,15 +166,7 @@ namespace BTCPayServer.HostedServices private void EnqueueDelivery(WebhookDeliveryRequest context) { - if (_InvoiceEventsByWebhookId.TryGetValue(context.WebhookId, out var channel)) - { - if (channel.Writer.TryWrite(context)) - return; - } - channel = Channel.CreateUnbounded(); - _InvoiceEventsByWebhookId.Add(context.WebhookId, channel); - channel.Writer.TryWrite(context); - _ = Process(context.WebhookId, channel); + _processingQueue.Enqueue(context.WebhookId, (cancellationToken) => Process(context, cancellationToken)); } private WebhookInvoiceEvent GetWebhookEvent(WebhookEventType webhookEventType) @@ -252,24 +244,21 @@ namespace BTCPayServer.HostedServices } } - private async Task Process(string id, Channel channel) + private async Task Process(WebhookDeliveryRequest ctx, CancellationToken cancellationToken) { - await foreach (var originalCtx in channel.Reader.ReadAllAsync()) + try { - try + var wh = (await StoreRepository.GetWebhook(ctx.WebhookId))?.GetBlob(); + if (wh is null || !ShouldDeliver(ctx.WebhookEvent.Type, wh)) + return; + var result = await SendAndSaveDelivery(ctx, cancellationToken); + if (ctx.WebhookBlob.AutomaticRedelivery && + !result.Success && + result.DeliveryId is string) { - var ctx = originalCtx; - var wh = (await StoreRepository.GetWebhook(ctx.WebhookId))?.GetBlob(); - if (wh is null || !ShouldDeliver(ctx.WebhookEvent.Type, wh)) - continue; - var result = await SendAndSaveDelivery(ctx); - if (ctx.WebhookBlob.AutomaticRedelivery && - !result.Success && - result.DeliveryId is string) + var originalDeliveryId = result.DeliveryId; + foreach (var wait in new[] { - var originalDeliveryId = result.DeliveryId; - foreach (var wait in new[] - { TimeSpan.FromSeconds(10), TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(10), @@ -279,27 +268,25 @@ namespace BTCPayServer.HostedServices TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(10), }) - { - await Task.Delay(wait, CancellationToken); - ctx = await CreateRedeliveryRequest(originalDeliveryId); - // This may have changed - if (ctx is null || !ctx.WebhookBlob.AutomaticRedelivery || - !ShouldDeliver(ctx.WebhookEvent.Type, ctx.WebhookBlob)) - break; - result = await SendAndSaveDelivery(ctx); - if (result.Success) - break; - } + { + await Task.Delay(wait, cancellationToken); + ctx = (await CreateRedeliveryRequest(originalDeliveryId))!; + // This may have changed + if (ctx is null || !ctx.WebhookBlob.AutomaticRedelivery || + !ShouldDeliver(ctx.WebhookEvent.Type, ctx.WebhookBlob)) + return; + result = await SendAndSaveDelivery(ctx, cancellationToken); + if (result.Success) + return; } } - catch when (CancellationToken.IsCancellationRequested) - { - break; - } - catch (Exception ex) - { - Logs.PayServer.LogError(ex, "Unexpected error when processing a webhook"); - } + } + catch when (cancellationToken.IsCancellationRequested) + { + } + catch (Exception ex) + { + Logs.PayServer.LogError(ex, "Unexpected error when processing a webhook"); } } @@ -315,7 +302,7 @@ namespace BTCPayServer.HostedServices public string? ErrorMessage { get; set; } } - private async Task SendDelivery(WebhookDeliveryRequest ctx) + private async Task SendDelivery(WebhookDeliveryRequest ctx, CancellationToken cancellationToken) { var uri = new Uri(ctx.WebhookBlob.Url, UriKind.Absolute); var httpClient = GetClient(uri); @@ -333,7 +320,7 @@ namespace BTCPayServer.HostedServices deliveryBlob.Request = bytes; try { - using var response = await httpClient.SendAsync(request, CancellationToken); + using var response = await httpClient.SendAsync(request, cancellationToken); if (!response.IsSuccessStatusCode) { deliveryBlob.Status = WebhookDeliveryStatus.HttpError; @@ -361,9 +348,9 @@ namespace BTCPayServer.HostedServices } - private async Task SendAndSaveDelivery(WebhookDeliveryRequest ctx) + private async Task SendAndSaveDelivery(WebhookDeliveryRequest ctx, CancellationToken cancellationToken) { - var result = await SendDelivery(ctx); + var result = await SendDelivery(ctx, cancellationToken); await StoreRepository.AddWebhookDelivery(ctx.Delivery); return result; @@ -385,5 +372,12 @@ namespace BTCPayServer.HostedServices WebhookId = webhookId }; } + + public override async Task StopAsync(CancellationToken cancellationToken) + { + var stopping = _processingQueue.Abort(cancellationToken); + await base.StopAsync(cancellationToken); + await stopping; + } } } From cb295e20d4dfb341f41938c2b87a623f1249d80b Mon Sep 17 00:00:00 2001 From: "nicolas.dorier" Date: Tue, 11 Jan 2022 13:14:10 +0900 Subject: [PATCH 2/4] Rename InvoiceNotificationManager -> BitpayIPNSender --- .../{InvoiceNotificationManager.cs => BitpayIPNSender.cs} | 4 ++-- BTCPayServer/Hosting/BTCPayServerServices.cs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) rename BTCPayServer/HostedServices/{InvoiceNotificationManager.cs => BitpayIPNSender.cs} (99%) diff --git a/BTCPayServer/HostedServices/InvoiceNotificationManager.cs b/BTCPayServer/HostedServices/BitpayIPNSender.cs similarity index 99% rename from BTCPayServer/HostedServices/InvoiceNotificationManager.cs rename to BTCPayServer/HostedServices/BitpayIPNSender.cs index ca1e2ba08..67b232af6 100644 --- a/BTCPayServer/HostedServices/InvoiceNotificationManager.cs +++ b/BTCPayServer/HostedServices/BitpayIPNSender.cs @@ -22,7 +22,7 @@ using Newtonsoft.Json.Linq; namespace BTCPayServer.HostedServices { - public class InvoiceNotificationManager : IHostedService + public class BitpayIPNSender : IHostedService { readonly HttpClient _Client; @@ -45,7 +45,7 @@ namespace BTCPayServer.HostedServices private readonly EmailSenderFactory _EmailSenderFactory; private readonly StoreRepository _StoreRepository; - public InvoiceNotificationManager( + public BitpayIPNSender( IHttpClientFactory httpClientFactory, IBackgroundJobClient jobClient, EventAggregator eventAggregator, diff --git a/BTCPayServer/Hosting/BTCPayServerServices.cs b/BTCPayServer/Hosting/BTCPayServerServices.cs index d63330704..0d98dec3b 100644 --- a/BTCPayServer/Hosting/BTCPayServerServices.cs +++ b/BTCPayServer/Hosting/BTCPayServerServices.cs @@ -342,7 +342,7 @@ namespace BTCPayServer.Hosting services.AddSingleton(); services.AddSingleton(); - services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); From 6999abe1ca7c6362e0f24ddcc738ae84d6f7c234 Mon Sep 17 00:00:00 2001 From: "nicolas.dorier" Date: Tue, 11 Jan 2022 13:15:48 +0900 Subject: [PATCH 3/4] Rename WebhookNotificationManager -> WebhookSender --- .../Controllers/GreenField/StoreWebhooksController.cs | 4 ++-- BTCPayServer/Controllers/InvoiceController.cs | 4 ++-- BTCPayServer/Controllers/StoresController.cs | 4 ++-- BTCPayServer/Data/WebhookDataExtensions.cs | 6 +++--- .../{WebhookNotificationManager.cs => WebhookSender.cs} | 6 +++--- BTCPayServer/Hosting/BTCPayServerServices.cs | 6 +++--- 6 files changed, 15 insertions(+), 15 deletions(-) rename BTCPayServer/HostedServices/{WebhookNotificationManager.cs => WebhookSender.cs} (98%) diff --git a/BTCPayServer/Controllers/GreenField/StoreWebhooksController.cs b/BTCPayServer/Controllers/GreenField/StoreWebhooksController.cs index 00ac5c257..9f0409cec 100644 --- a/BTCPayServer/Controllers/GreenField/StoreWebhooksController.cs +++ b/BTCPayServer/Controllers/GreenField/StoreWebhooksController.cs @@ -26,14 +26,14 @@ namespace BTCPayServer.Controllers.GreenField [EnableCors(CorsPolicies.All)] public class StoreWebhooksController : ControllerBase { - public StoreWebhooksController(StoreRepository storeRepository, WebhookNotificationManager webhookNotificationManager) + public StoreWebhooksController(StoreRepository storeRepository, WebhookSender webhookNotificationManager) { StoreRepository = storeRepository; WebhookNotificationManager = webhookNotificationManager; } public StoreRepository StoreRepository { get; } - public WebhookNotificationManager WebhookNotificationManager { get; } + public WebhookSender WebhookNotificationManager { get; } [HttpGet("~/api/v1/stores/{storeId}/webhooks/{webhookId?}")] public async Task ListWebhooks(string storeId, string webhookId) diff --git a/BTCPayServer/Controllers/InvoiceController.cs b/BTCPayServer/Controllers/InvoiceController.cs index 415cc4ff0..e9bfd5d58 100644 --- a/BTCPayServer/Controllers/InvoiceController.cs +++ b/BTCPayServer/Controllers/InvoiceController.cs @@ -43,7 +43,7 @@ namespace BTCPayServer.Controllers private readonly PullPaymentHostedService _paymentHostedService; private readonly LanguageService _languageService; - public WebhookNotificationManager WebhookNotificationManager { get; } + public WebhookSender WebhookNotificationManager { get; } public InvoiceController( InvoiceRepository invoiceRepository, @@ -57,7 +57,7 @@ namespace BTCPayServer.Controllers PaymentMethodHandlerDictionary paymentMethodHandlerDictionary, ApplicationDbContextFactory dbContextFactory, PullPaymentHostedService paymentHostedService, - WebhookNotificationManager webhookNotificationManager, + WebhookSender webhookNotificationManager, LanguageService languageService) { _CurrencyNameTable = currencyNameTable ?? throw new ArgumentNullException(nameof(currencyNameTable)); diff --git a/BTCPayServer/Controllers/StoresController.cs b/BTCPayServer/Controllers/StoresController.cs index 27895ce25..294ed5850 100644 --- a/BTCPayServer/Controllers/StoresController.cs +++ b/BTCPayServer/Controllers/StoresController.cs @@ -58,7 +58,7 @@ namespace BTCPayServer.Controllers IAuthorizationService authorizationService, EventAggregator eventAggregator, AppService appService, - WebhookNotificationManager webhookNotificationManager, + WebhookSender webhookNotificationManager, IDataProtectionProvider dataProtector, NBXplorerDashboard Dashboard) { @@ -818,7 +818,7 @@ namespace BTCPayServer.Controllers } public string GeneratedPairingCode { get; set; } - public WebhookNotificationManager WebhookNotificationManager { get; } + public WebhookSender WebhookNotificationManager { get; } public IDataProtector DataProtector { get; } [HttpGet] diff --git a/BTCPayServer/Data/WebhookDataExtensions.cs b/BTCPayServer/Data/WebhookDataExtensions.cs index ffc03c3b1..f5010faf6 100644 --- a/BTCPayServer/Data/WebhookDataExtensions.cs +++ b/BTCPayServer/Data/WebhookDataExtensions.cs @@ -33,7 +33,7 @@ namespace BTCPayServer.Data public byte[] Request { get; set; } public T ReadRequestAs() { - return JsonConvert.DeserializeObject(UTF8Encoding.UTF8.GetString(Request), HostedServices.WebhookNotificationManager.DefaultSerializerSettings); + return JsonConvert.DeserializeObject(UTF8Encoding.UTF8.GetString(Request), HostedServices.WebhookSender.DefaultSerializerSettings); } } public class WebhookBlob @@ -56,11 +56,11 @@ namespace BTCPayServer.Data } public static WebhookDeliveryBlob GetBlob(this WebhookDeliveryData webhook) { - return JsonConvert.DeserializeObject(ZipUtils.Unzip(webhook.Blob), HostedServices.WebhookNotificationManager.DefaultSerializerSettings); + return JsonConvert.DeserializeObject(ZipUtils.Unzip(webhook.Blob), HostedServices.WebhookSender.DefaultSerializerSettings); } public static void SetBlob(this WebhookDeliveryData webhook, WebhookDeliveryBlob blob) { - webhook.Blob = ZipUtils.Zip(JsonConvert.SerializeObject(blob, Formatting.None, HostedServices.WebhookNotificationManager.DefaultSerializerSettings)); + webhook.Blob = ZipUtils.Zip(JsonConvert.SerializeObject(blob, Formatting.None, HostedServices.WebhookSender.DefaultSerializerSettings)); } } } diff --git a/BTCPayServer/HostedServices/WebhookNotificationManager.cs b/BTCPayServer/HostedServices/WebhookSender.cs similarity index 98% rename from BTCPayServer/HostedServices/WebhookNotificationManager.cs rename to BTCPayServer/HostedServices/WebhookSender.cs index cb1287732..446ee521e 100644 --- a/BTCPayServer/HostedServices/WebhookNotificationManager.cs +++ b/BTCPayServer/HostedServices/WebhookSender.cs @@ -27,11 +27,11 @@ namespace BTCPayServer.HostedServices /// This class send webhook notifications /// It also make sure the events sent to a webhook are sent in order to the webhook /// - public class WebhookNotificationManager : EventHostedServiceBase + public class WebhookSender : EventHostedServiceBase { readonly Encoding UTF8 = new UTF8Encoding(false); public readonly static JsonSerializerSettings DefaultSerializerSettings; - static WebhookNotificationManager() + static WebhookSender() { DefaultSerializerSettings = WebhookEvent.DefaultSerializerSettings; } @@ -60,7 +60,7 @@ namespace BTCPayServer.HostedServices public StoreRepository StoreRepository { get; } public IHttpClientFactory HttpClientFactory { get; } - public WebhookNotificationManager(EventAggregator eventAggregator, + public WebhookSender(EventAggregator eventAggregator, StoreRepository storeRepository, IHttpClientFactory httpClientFactory, Logs logs) : base(eventAggregator, logs) diff --git a/BTCPayServer/Hosting/BTCPayServerServices.cs b/BTCPayServer/Hosting/BTCPayServerServices.cs index 0d98dec3b..65a4de617 100644 --- a/BTCPayServer/Hosting/BTCPayServerServices.cs +++ b/BTCPayServer/Hosting/BTCPayServerServices.cs @@ -307,9 +307,9 @@ namespace BTCPayServer.Hosting services.AddSingleton(); services.AddSingleton(o => o.GetRequiredService()); - services.AddSingleton(); - services.AddSingleton(o => o.GetRequiredService()); - services.AddHttpClient(WebhookNotificationManager.OnionNamedClient) + services.AddSingleton(); + services.AddSingleton(o => o.GetRequiredService()); + services.AddHttpClient(WebhookSender.OnionNamedClient) .ConfigurePrimaryHttpMessageHandler(); From ba101015f620a8d5b498fb6b60f4bfcf78c6e82d Mon Sep 17 00:00:00 2001 From: "nicolas.dorier" Date: Tue, 11 Jan 2022 14:48:45 +0900 Subject: [PATCH 4/4] Use the MultiProcessingQueue in IPNSender --- .../HostedServices/BitpayIPNSender.cs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/BTCPayServer/HostedServices/BitpayIPNSender.cs b/BTCPayServer/HostedServices/BitpayIPNSender.cs index 67b232af6..38e8f3c6b 100644 --- a/BTCPayServer/HostedServices/BitpayIPNSender.cs +++ b/BTCPayServer/HostedServices/BitpayIPNSender.cs @@ -39,6 +39,7 @@ namespace BTCPayServer.HostedServices } } + MultiProcessingQueue _Queue = new MultiProcessingQueue(); readonly IBackgroundJobClient _JobClient; readonly EventAggregator _EventAggregator; readonly InvoiceRepository _InvoiceRepository; @@ -140,14 +141,12 @@ namespace BTCPayServer.HostedServices if (invoice.NotificationURL != null) { - var invoiceStr = NBitcoin.JsonConverters.Serializer.ToString(new ScheduledJob() { TryCount = 0, Notification = notification }); - _JobClient.Schedule((cancellation) => NotifyHttp(invoiceStr, cancellation), TimeSpan.Zero); + _Queue.Enqueue(invoice.Id, (cancellationToken) => NotifyHttp(new ScheduledJob() { TryCount = 0, Notification = notification }, cancellationToken)); } } - public async Task NotifyHttp(string invoiceData, CancellationToken cancellationToken) + public async Task NotifyHttp(ScheduledJob job, CancellationToken cancellationToken) { - var job = NBitcoin.JsonConverters.Serializer.ToObject(invoiceData); bool reschedule = false; var aggregatorEvent = new InvoiceIPNEvent(job.Notification.Data.Id, job.Notification.Event.Code, job.Notification.Event.Name); try @@ -159,9 +158,7 @@ namespace BTCPayServer.HostedServices } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { - // When the JobClient will be persistent, this will reschedule the job for after reboot - invoiceData = NBitcoin.JsonConverters.Serializer.ToString(job); - _JobClient.Schedule((cancellation) => NotifyHttp(invoiceData, cancellation), TimeSpan.FromMinutes(10.0)); + _JobClient.Schedule((cancellation) => NotifyHttp(job, cancellation), TimeSpan.FromMinutes(10.0)); return; } catch (OperationCanceledException) @@ -190,8 +187,7 @@ namespace BTCPayServer.HostedServices if (job.TryCount < MaxTry && reschedule) { - invoiceData = NBitcoin.JsonConverters.Serializer.ToString(job); - _JobClient.Schedule((cancellation) => NotifyHttp(invoiceData, cancellation), TimeSpan.FromMinutes(10.0)); + _JobClient.Schedule((cancellation) => NotifyHttp(job, cancellation), TimeSpan.FromMinutes(10.0)); } } @@ -307,6 +303,7 @@ namespace BTCPayServer.HostedServices readonly int MaxTry = 6; readonly CompositeDisposable leases = new CompositeDisposable(); + public Task StartAsync(CancellationToken cancellationToken) { leases.Add(_EventAggregator.SubscribeAsync(async e => @@ -347,10 +344,10 @@ namespace BTCPayServer.HostedServices return Task.CompletedTask; } - public Task StopAsync(CancellationToken cancellationToken) + public async Task StopAsync(CancellationToken cancellationToken) { leases.Dispose(); - return Task.CompletedTask; + await _Queue.Abort(cancellationToken); } } }