Make sure the SubscribeAsync handlers are executed in order

This commit is contained in:
nicolas.dorier 2022-01-17 12:55:35 +09:00
parent 14da3023d8
commit a42323a527
No known key found for this signature in database
GPG Key ID: 6618763EF09186FE
3 changed files with 52 additions and 14 deletions

View File

@ -2,6 +2,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using BTCPayServer.Logging;
using Microsoft.Extensions.Logging;
@ -11,7 +12,6 @@ namespace BTCPayServer
public interface IEventAggregatorSubscription : IDisposable
{
void Unsubscribe();
void Resubscribe();
}
public class EventAggregator : IDisposable
{
@ -50,11 +50,6 @@ namespace BTCPayServer
}
}
public void Resubscribe()
{
aggregator.Subscribe(t, this);
}
public void Unsubscribe()
{
Dispose();
@ -149,10 +144,53 @@ namespace BTCPayServer
{
return Subscribe(new Action<IEventAggregatorSubscription, T>((sub, t) => subscription(sub, t)));
}
class ChannelSubscription<T> : IEventAggregatorSubscription
{
private Channel<T> _evts;
private IEventAggregatorSubscription _innerSubscription;
private Func<T, Task> _act;
private Logs _logs;
public ChannelSubscription(Channel<T> evts, IEventAggregatorSubscription innerSubscription, Func<T, Task> act, Logs logs)
{
_evts = evts;
_innerSubscription = innerSubscription;
_act = act;
_logs = logs;
_ = Listen();
}
private async Task Listen()
{
await foreach (var item in _evts.Reader.ReadAllAsync())
{
try
{
await _act(item);
}
catch (Exception ex)
{
_logs.Events.LogError(ex, $"Error while calling event async handler");
}
}
}
public void Dispose()
{
Unsubscribe();
}
public void Unsubscribe()
{
_innerSubscription.Unsubscribe();
_evts.Writer.TryComplete();
}
}
public IEventAggregatorSubscription SubscribeAsync<T>(Func<T, Task> subscription)
{
return Subscribe(new Action<IEventAggregatorSubscription, T>((sub, t) => _ = subscription(t)));
Channel<T> evts = Channel.CreateUnbounded<T>();
var innerSubscription = Subscribe(new Action<IEventAggregatorSubscription, T>((sub, t) => evts.Writer.TryWrite(t)));
return new ChannelSubscription<T>(evts, innerSubscription, subscription, Logs);
}
public IEventAggregatorSubscription Subscribe<T>(Action<T> subscription)
{

View File

@ -270,20 +270,20 @@ namespace BTCPayServer.HostedServices
e.Name == InvoiceEvent.ExpiredPaidPartial
)
{
_ = Notify(invoice, e, false, sendMail);
await Notify(invoice, e, false, sendMail);
sendMail = false;
}
}
if (e.Name == InvoiceEvent.Confirmed)
{
_ = Notify(invoice, e, false, sendMail);
await Notify(invoice, e, false, sendMail);
sendMail = false;
}
if (invoice.ExtendedNotifications)
{
_ = Notify(invoice, e, true, sendMail);
await Notify(invoice, e, true, sendMail);
sendMail = false;
}
}));

View File

@ -80,25 +80,25 @@ namespace BTCPayServer.Payments.Bitcoin
{
_RunningTask = new TaskCompletionSource<bool>();
_Cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
leases.Add(_Aggregator.SubscribeAsync<Events.NBXplorerStateChangedEvent>(async nbxplorerEvent =>
leases.Add(_Aggregator.Subscribe<Events.NBXplorerStateChangedEvent>(nbxplorerEvent =>
{
if (nbxplorerEvent.NewState == NBXplorerState.Ready)
{
var wallet = _Wallets.GetWallet(nbxplorerEvent.Network);
if (_Wallets.IsAvailable(wallet.Network))
{
await Listen(wallet);
_ = Listen(wallet);
}
}
}));
_ListenPoller = new Timer(async s =>
_ListenPoller = new Timer(s =>
{
foreach (var wallet in _Wallets.GetWallets())
{
if (_Wallets.IsAvailable(wallet.Network))
{
await Listen(wallet);
_ = Listen(wallet);
}
}
}, null, 0, (int)PollInterval.TotalMilliseconds);