From 010c6539959ed8a3c844eb2c226377de4544698d Mon Sep 17 00:00:00 2001 From: "nicolas.dorier" Date: Wed, 20 Feb 2019 12:27:10 +0900 Subject: [PATCH] Create EventHostedServiceBase and make AppHubStreamer use this --- .../HostedServices/EventHostedServiceBase.cs | 85 +++++++++++++++++ BTCPayServer/Services/Apps/AppHubStreamer.cs | 95 +++++-------------- 2 files changed, 111 insertions(+), 69 deletions(-) create mode 100644 BTCPayServer/HostedServices/EventHostedServiceBase.cs diff --git a/BTCPayServer/HostedServices/EventHostedServiceBase.cs b/BTCPayServer/HostedServices/EventHostedServiceBase.cs new file mode 100644 index 000000000..a74b1e3a0 --- /dev/null +++ b/BTCPayServer/HostedServices/EventHostedServiceBase.cs @@ -0,0 +1,85 @@ +using System; +using System.Collections.Generic; +using Microsoft.Extensions.Logging; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using BTCPayServer.Logging; +using Microsoft.Extensions.Hosting; + +namespace BTCPayServer.HostedServices +{ + public class EventHostedServiceBase : IHostedService + { + private readonly EventAggregator _EventAggregator; + + private List _Subscriptions; + private CancellationTokenSource _Cts; + + public EventHostedServiceBase(EventAggregator eventAggregator) + { + _EventAggregator = eventAggregator; + } + + Channel _Events = Channel.CreateUnbounded(); + public async Task ProcessEvents(CancellationToken cancellationToken) + { + while (await _Events.Reader.WaitToReadAsync(cancellationToken)) + { + if (_Events.Reader.TryRead(out var evt)) + { + try + { + await ProcessEvent(evt, cancellationToken); + } + catch when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch (Exception ex) + { + Logs.PayServer.LogWarning(ex, $"Unhandled exception in {this.GetType().Name}"); + } + } + } + } + + protected virtual Task ProcessEvent(object evt, CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + + protected virtual void SubscibeToEvents() + { + + } + + protected void Subscribe() + { + _Subscriptions.Add(_EventAggregator.Subscribe(e => _Events.Writer.TryWrite(e))); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + _Subscriptions = new List(); + SubscibeToEvents(); + _Cts = new CancellationTokenSource(); + _ProcessingEvents = ProcessEvents(_Cts.Token); + return Task.CompletedTask; + } + Task _ProcessingEvents = Task.CompletedTask; + + public async Task StopAsync(CancellationToken cancellationToken) + { + _Subscriptions?.ForEach(subscription => subscription.Dispose()); + _Cts?.Cancel(); + try + { + await _ProcessingEvents; + } + catch (OperationCanceledException) + { } + } + } +} diff --git a/BTCPayServer/Services/Apps/AppHubStreamer.cs b/BTCPayServer/Services/Apps/AppHubStreamer.cs index 01b9c1222..ce3c7186d 100644 --- a/BTCPayServer/Services/Apps/AppHubStreamer.cs +++ b/BTCPayServer/Services/Apps/AppHubStreamer.cs @@ -8,6 +8,7 @@ using System.Threading.Tasks; using BTCPayServer.Controllers; using BTCPayServer.Data; using BTCPayServer.Events; +using BTCPayServer.HostedServices; using BTCPayServer.Logging; using BTCPayServer.Models.AppViewModels; using BTCPayServer.Payments; @@ -23,68 +24,49 @@ using NBitcoin; namespace BTCPayServer.Services.Apps { - public class AppHubStreamer : IHostedService + public class AppHubStreamer : EventHostedServiceBase { - private readonly EventAggregator _EventAggregator; - private readonly IHubContext _HubContext; private readonly AppService _appService; - private List _Subscriptions; - private CancellationTokenSource _Cts; + private IHubContext _HubContext; public AppHubStreamer(EventAggregator eventAggregator, - IHubContext hubContext, - AppService appService) + IHubContext hubContext, + AppService appService) : base(eventAggregator) { - _EventAggregator = eventAggregator; - _HubContext = hubContext; _appService = appService; + _HubContext = hubContext; } - private async Task NotifyClients(string appId, InvoiceEvent invoiceEvent, CancellationToken cancellationToken) + protected override void SubscibeToEvents() { - if (invoiceEvent.Name == InvoiceEvent.ReceivedPayment) + Subscribe(); + Subscribe(); + } + + protected override async Task ProcessEvent(object evt, CancellationToken cancellationToken) + { + if (evt is InvoiceEvent invoiceEvent) { - var data = invoiceEvent.Payment.GetCryptoPaymentData(); - await _HubContext.Clients.Group(appId).SendCoreAsync(AppHub.PaymentReceived, new object[] + foreach (var appId in AppService.GetAppInternalTags(invoiceEvent.Invoice.InternalTags)) + { + if (invoiceEvent.Name == InvoiceEvent.ReceivedPayment) { + var data = invoiceEvent.Payment.GetCryptoPaymentData(); + await _HubContext.Clients.Group(appId).SendCoreAsync(AppHub.PaymentReceived, new object[] + { data.GetValue(), invoiceEvent.Payment.GetCryptoCode(), Enum.GetName(typeof(PaymentTypes), invoiceEvent.Payment.GetPaymentMethodId().PaymentType) - }, cancellationToken); - } - await InfoUpdated(appId); - } - - Channel _Events = Channel.CreateUnbounded(); - public async Task ProcessEvents(CancellationToken cancellationToken) - { - while (await _Events.Reader.WaitToReadAsync(cancellationToken)) - { - if (_Events.Reader.TryRead(out var evt)) - { - try - { - if (evt is InvoiceEvent invoiceEvent) - { - foreach (var appId in AppService.GetAppInternalTags(invoiceEvent.Invoice.InternalTags)) - await NotifyClients(appId, invoiceEvent, cancellationToken); - } - else if (evt is AppsController.AppUpdated app) - { - await InfoUpdated(app.AppId); - } - } - catch when (cancellationToken.IsCancellationRequested) - { - throw; - } - catch (Exception ex) - { - Logs.PayServer.LogWarning(ex, "Unhandled exception in CrowdfundHubStream"); + }, cancellationToken); } + await InfoUpdated(appId); } } + else if (evt is AppsController.AppUpdated app) + { + await InfoUpdated(app.AppId); + } } private async Task InfoUpdated(string appId) @@ -92,30 +74,5 @@ namespace BTCPayServer.Services.Apps var info = await _appService.GetAppInfo(appId); await _HubContext.Clients.Group(appId).SendCoreAsync(AppHub.InfoUpdated, new object[] { info }); } - - public Task StartAsync(CancellationToken cancellationToken) - { - _Subscriptions = new List() - { - _EventAggregator.Subscribe(e => _Events.Writer.TryWrite(e)), - _EventAggregator.Subscribe(e => _Events.Writer.TryWrite(e)) - }; - _Cts = new CancellationTokenSource(); - _ProcessingEvents = ProcessEvents(_Cts.Token); - return Task.CompletedTask; - } - Task _ProcessingEvents = Task.CompletedTask; - - public async Task StopAsync(CancellationToken cancellationToken) - { - _Subscriptions?.ForEach(subscription => subscription.Dispose()); - _Cts?.Cancel(); - try - { - await _ProcessingEvents; - } - catch (OperationCanceledException) - { } - } } }