Create EventHostedServiceBase and make AppHubStreamer use this

This commit is contained in:
nicolas.dorier 2019-02-20 12:27:10 +09:00
parent 119f82fd4e
commit 010c653995
2 changed files with 111 additions and 69 deletions

View file

@ -0,0 +1,85 @@
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using BTCPayServer.Logging;
using Microsoft.Extensions.Hosting;
namespace BTCPayServer.HostedServices
{
public class EventHostedServiceBase : IHostedService
{
private readonly EventAggregator _EventAggregator;
private List<IEventAggregatorSubscription> _Subscriptions;
private CancellationTokenSource _Cts;
public EventHostedServiceBase(EventAggregator eventAggregator)
{
_EventAggregator = eventAggregator;
}
Channel<object> _Events = Channel.CreateUnbounded<object>();
public async Task ProcessEvents(CancellationToken cancellationToken)
{
while (await _Events.Reader.WaitToReadAsync(cancellationToken))
{
if (_Events.Reader.TryRead(out var evt))
{
try
{
await ProcessEvent(evt, cancellationToken);
}
catch when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (Exception ex)
{
Logs.PayServer.LogWarning(ex, $"Unhandled exception in {this.GetType().Name}");
}
}
}
}
protected virtual Task ProcessEvent(object evt, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
protected virtual void SubscibeToEvents()
{
}
protected void Subscribe<T>()
{
_Subscriptions.Add(_EventAggregator.Subscribe<T>(e => _Events.Writer.TryWrite(e)));
}
public Task StartAsync(CancellationToken cancellationToken)
{
_Subscriptions = new List<IEventAggregatorSubscription>();
SubscibeToEvents();
_Cts = new CancellationTokenSource();
_ProcessingEvents = ProcessEvents(_Cts.Token);
return Task.CompletedTask;
}
Task _ProcessingEvents = Task.CompletedTask;
public async Task StopAsync(CancellationToken cancellationToken)
{
_Subscriptions?.ForEach(subscription => subscription.Dispose());
_Cts?.Cancel();
try
{
await _ProcessingEvents;
}
catch (OperationCanceledException)
{ }
}
}
}

View file

@ -8,6 +8,7 @@ using System.Threading.Tasks;
using BTCPayServer.Controllers; using BTCPayServer.Controllers;
using BTCPayServer.Data; using BTCPayServer.Data;
using BTCPayServer.Events; using BTCPayServer.Events;
using BTCPayServer.HostedServices;
using BTCPayServer.Logging; using BTCPayServer.Logging;
using BTCPayServer.Models.AppViewModels; using BTCPayServer.Models.AppViewModels;
using BTCPayServer.Payments; using BTCPayServer.Payments;
@ -23,68 +24,49 @@ using NBitcoin;
namespace BTCPayServer.Services.Apps namespace BTCPayServer.Services.Apps
{ {
public class AppHubStreamer : IHostedService public class AppHubStreamer : EventHostedServiceBase
{ {
private readonly EventAggregator _EventAggregator;
private readonly IHubContext<AppHub> _HubContext;
private readonly AppService _appService; private readonly AppService _appService;
private List<IEventAggregatorSubscription> _Subscriptions; private IHubContext<AppHub> _HubContext;
private CancellationTokenSource _Cts;
public AppHubStreamer(EventAggregator eventAggregator, public AppHubStreamer(EventAggregator eventAggregator,
IHubContext<AppHub> hubContext, IHubContext<AppHub> hubContext,
AppService appService) AppService appService) : base(eventAggregator)
{ {
_EventAggregator = eventAggregator;
_HubContext = hubContext;
_appService = appService; _appService = appService;
_HubContext = hubContext;
} }
private async Task NotifyClients(string appId, InvoiceEvent invoiceEvent, CancellationToken cancellationToken) protected override void SubscibeToEvents()
{ {
if (invoiceEvent.Name == InvoiceEvent.ReceivedPayment) Subscribe<InvoiceEvent>();
Subscribe<AppsController.AppUpdated>();
}
protected override async Task ProcessEvent(object evt, CancellationToken cancellationToken)
{
if (evt is InvoiceEvent invoiceEvent)
{ {
var data = invoiceEvent.Payment.GetCryptoPaymentData(); foreach (var appId in AppService.GetAppInternalTags(invoiceEvent.Invoice.InternalTags))
await _HubContext.Clients.Group(appId).SendCoreAsync(AppHub.PaymentReceived, new object[] {
if (invoiceEvent.Name == InvoiceEvent.ReceivedPayment)
{ {
var data = invoiceEvent.Payment.GetCryptoPaymentData();
await _HubContext.Clients.Group(appId).SendCoreAsync(AppHub.PaymentReceived, new object[]
{
data.GetValue(), data.GetValue(),
invoiceEvent.Payment.GetCryptoCode(), invoiceEvent.Payment.GetCryptoCode(),
Enum.GetName(typeof(PaymentTypes), Enum.GetName(typeof(PaymentTypes),
invoiceEvent.Payment.GetPaymentMethodId().PaymentType) invoiceEvent.Payment.GetPaymentMethodId().PaymentType)
}, cancellationToken); }, cancellationToken);
}
await InfoUpdated(appId);
}
Channel<object> _Events = Channel.CreateUnbounded<object>();
public async Task ProcessEvents(CancellationToken cancellationToken)
{
while (await _Events.Reader.WaitToReadAsync(cancellationToken))
{
if (_Events.Reader.TryRead(out var evt))
{
try
{
if (evt is InvoiceEvent invoiceEvent)
{
foreach (var appId in AppService.GetAppInternalTags(invoiceEvent.Invoice.InternalTags))
await NotifyClients(appId, invoiceEvent, cancellationToken);
}
else if (evt is AppsController.AppUpdated app)
{
await InfoUpdated(app.AppId);
}
}
catch when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (Exception ex)
{
Logs.PayServer.LogWarning(ex, "Unhandled exception in CrowdfundHubStream");
} }
await InfoUpdated(appId);
} }
} }
else if (evt is AppsController.AppUpdated app)
{
await InfoUpdated(app.AppId);
}
} }
private async Task InfoUpdated(string appId) private async Task InfoUpdated(string appId)
@ -92,30 +74,5 @@ namespace BTCPayServer.Services.Apps
var info = await _appService.GetAppInfo(appId); var info = await _appService.GetAppInfo(appId);
await _HubContext.Clients.Group(appId).SendCoreAsync(AppHub.InfoUpdated, new object[] { info }); await _HubContext.Clients.Group(appId).SendCoreAsync(AppHub.InfoUpdated, new object[] { info });
} }
public Task StartAsync(CancellationToken cancellationToken)
{
_Subscriptions = new List<IEventAggregatorSubscription>()
{
_EventAggregator.Subscribe<InvoiceEvent>(e => _Events.Writer.TryWrite(e)),
_EventAggregator.Subscribe<AppsController.AppUpdated>(e => _Events.Writer.TryWrite(e))
};
_Cts = new CancellationTokenSource();
_ProcessingEvents = ProcessEvents(_Cts.Token);
return Task.CompletedTask;
}
Task _ProcessingEvents = Task.CompletedTask;
public async Task StopAsync(CancellationToken cancellationToken)
{
_Subscriptions?.ForEach(subscription => subscription.Dispose());
_Cts?.Cancel();
try
{
await _ProcessingEvents;
}
catch (OperationCanceledException)
{ }
}
} }
} }