From 84bb6056d33d3a433660db483b2d899814a99599 Mon Sep 17 00:00:00 2001 From: "nicolas.dorier" Date: Sun, 17 Dec 2017 14:17:42 +0900 Subject: [PATCH] Use EventAggregator to decouple several classes --- BTCPayServer.Tests/ServerTester.cs | 4 +- BTCPayServer/CompositeDisposable.cs | 19 +++ .../Controllers/CallbackController.cs | 49 ++++--- BTCPayServer/Controllers/StoresController.cs | 2 +- BTCPayServer/EventAggregator.cs | 130 ++++++++++++++++++ .../Events/InvoiceDataChangedEvent.cs | 17 +++ .../Events/InvoiceStatusChangedEvent.cs | 19 +++ .../Events/NBXplorerStateChangedEvent.cs | 24 ++++ BTCPayServer/Events/NewBlockEvent.cs | 15 ++ BTCPayServer/Events/TxOutReceivedEvent.cs | 20 +++ BTCPayServer/Hosting/BTCPayServerServices.cs | 10 +- BTCPayServer/Hosting/BTCpayMiddleware.cs | 27 ---- BTCPayServer/Initializer.cs | 45 ++++++ BTCPayServer/Logging/Logs.cs | 7 + BTCPayServer/NBXplorerWaiter.cs | 39 +----- .../Invoices/InvoiceNotificationManager.cs | 110 +++++++++++---- .../Services/Invoices/InvoiceWatcher.cs | 59 ++++---- 17 files changed, 461 insertions(+), 135 deletions(-) create mode 100644 BTCPayServer/CompositeDisposable.cs create mode 100644 BTCPayServer/EventAggregator.cs create mode 100644 BTCPayServer/Events/InvoiceDataChangedEvent.cs create mode 100644 BTCPayServer/Events/InvoiceStatusChangedEvent.cs create mode 100644 BTCPayServer/Events/NBXplorerStateChangedEvent.cs create mode 100644 BTCPayServer/Events/NewBlockEvent.cs create mode 100644 BTCPayServer/Events/TxOutReceivedEvent.cs create mode 100644 BTCPayServer/Initializer.cs diff --git a/BTCPayServer.Tests/ServerTester.cs b/BTCPayServer.Tests/ServerTester.cs index 2956275ee..fd961584d 100644 --- a/BTCPayServer.Tests/ServerTester.cs +++ b/BTCPayServer.Tests/ServerTester.cs @@ -230,7 +230,7 @@ namespace BTCPayServer.Tests var match = new TransactionMatch(); match.Outputs.Add(new KeyPathInformation() { ScriptPubKey = address.ScriptPubKey }); var content = new StringContent(new NBXplorer.Serializer(Network).ToString(match), new UTF8Encoding(false), "application/json"); - var uri = controller.GetCallbackUriAsync(req).GetAwaiter().GetResult(); + var uri = controller.GetCallbackUriAsync().GetAwaiter().GetResult(); HttpRequestMessage message = new HttpRequestMessage(); message.Method = HttpMethod.Post; @@ -242,7 +242,7 @@ namespace BTCPayServer.Tests else { - var uri = controller.GetCallbackBlockUriAsync(req).GetAwaiter().GetResult(); + var uri = controller.GetCallbackBlockUriAsync().GetAwaiter().GetResult(); HttpRequestMessage message = new HttpRequestMessage(); message.Method = HttpMethod.Post; message.RequestUri = uri; diff --git a/BTCPayServer/CompositeDisposable.cs b/BTCPayServer/CompositeDisposable.cs new file mode 100644 index 000000000..0fd71df21 --- /dev/null +++ b/BTCPayServer/CompositeDisposable.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace BTCPayServer +{ + public class CompositeDisposable : IDisposable + { + List _Disposables = new List(); + public void Add(IDisposable disposable) { _Disposables.Add(disposable); } + public void Dispose() + { + foreach (var d in _Disposables) + d.Dispose(); + _Disposables.Clear(); + } + } +} diff --git a/BTCPayServer/Controllers/CallbackController.cs b/BTCPayServer/Controllers/CallbackController.cs index c8805459e..0c57d1795 100644 --- a/BTCPayServer/Controllers/CallbackController.cs +++ b/BTCPayServer/Controllers/CallbackController.cs @@ -16,6 +16,10 @@ using System.Linq; using System.Text; using System.Threading.Tasks; using BTCPayServer.Configuration; +using BTCPayServer.Events; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Hosting.Server.Features; +using Microsoft.AspNetCore.Hosting.Server; namespace BTCPayServer.Controllers { @@ -30,21 +34,23 @@ namespace BTCPayServer.Controllers } SettingsRepository _Settings; Network _Network; - InvoiceWatcher _Watcher; ExplorerClient _Explorer; BTCPayServerOptions _Options; - + EventAggregator _EventAggregator; + IServer _Server; public CallbackController(SettingsRepository repo, ExplorerClient explorer, - InvoiceWatcherAccessor watcher, + EventAggregator eventAggregator, BTCPayServerOptions options, + IServer server, Network network) { _Settings = repo; _Network = network; - _Watcher = watcher.Instance; _Explorer = explorer; _Options = options; + _EventAggregator = eventAggregator; + _Server = server; } [Route("callbacks/transactions")] @@ -52,7 +58,6 @@ namespace BTCPayServer.Controllers public async Task NewTransaction(string token) { await AssertToken(token); - Logs.PayServer.LogInformation("New transaction callback"); //We don't want to register all the json converter at MVC level, so we parse here var serializer = new NBXplorer.Serializer(_Network); var content = await new StreamReader(Request.Body, new UTF8Encoding(false), false, 1024, true).ReadToEndAsync(); @@ -60,7 +65,10 @@ namespace BTCPayServer.Controllers foreach (var output in match.Outputs) { - await _Watcher.NotifyReceived(output.ScriptPubKey); + var evt = new TxOutReceivedEvent(); + evt.ScriptPubKey = output.ScriptPubKey; + evt.Address = output.ScriptPubKey.GetDestinationAddress(_Network); + _EventAggregator.Publish(evt); } } @@ -69,8 +77,7 @@ namespace BTCPayServer.Controllers public async Task NewBlock(string token) { await AssertToken(token); - Logs.PayServer.LogInformation("New block callback"); - await _Watcher.NotifyBlock(); + _EventAggregator.Publish(new NewBlockEvent()); } private async Task AssertToken(string token) @@ -80,15 +87,15 @@ namespace BTCPayServer.Controllers throw new BTCPayServer.BitpayHttpException(400, "invalid-callback-token"); } - public async Task GetCallbackUriAsync(HttpRequest request) + public async Task GetCallbackUriAsync() { string token = await GetToken(); - return BuildCallbackUri(request, "callbacks/transactions?token=" + token); + return BuildCallbackUri("callbacks/transactions?token=" + token); } - public async Task RegisterCallbackUriAsync(DerivationStrategyBase derivationScheme, HttpRequest request) + public async Task RegisterCallbackUriAsync(DerivationStrategyBase derivationScheme) { - var uri = await GetCallbackUriAsync(request); + var uri = await GetCallbackUriAsync(); await _Explorer.SubscribeToWalletAsync(uri, derivationScheme); } @@ -104,19 +111,29 @@ namespace BTCPayServer.Controllers return token; } - public async Task GetCallbackBlockUriAsync(HttpRequest request) + public async Task GetCallbackBlockUriAsync() { string token = await GetToken(); - return BuildCallbackUri(request, "callbacks/blocks?token=" + token); + return BuildCallbackUri("callbacks/blocks?token=" + token); } - private Uri BuildCallbackUri(HttpRequest request, string callbackPath) + private Uri BuildCallbackUri(string callbackPath) { - string baseUrl = _Options.InternalUrl == null ? request.GetAbsoluteRoot() : _Options.InternalUrl.AbsolutePath; + var address = _Server.Features.Get().Addresses + .Select(c => new Uri(TransformToRoutable(c))) + .First(); + var baseUrl = _Options.InternalUrl == null ? address.AbsoluteUri : _Options.InternalUrl.AbsoluteUri; baseUrl = baseUrl.WithTrailingSlash(); return new Uri(baseUrl + callbackPath); } + private string TransformToRoutable(string host) + { + if (host.StartsWith("http://0.0.0.0")) + host = host.Replace("http://0.0.0.0", "http://127.0.0.1"); + return host; + } + public async Task RegisterCallbackBlockUriAsync(Uri uri) { await _Explorer.SubscribeToBlocksAsync(uri); diff --git a/BTCPayServer/Controllers/StoresController.cs b/BTCPayServer/Controllers/StoresController.cs index a9918d39a..e770f1882 100644 --- a/BTCPayServer/Controllers/StoresController.cs +++ b/BTCPayServer/Controllers/StoresController.cs @@ -198,7 +198,7 @@ namespace BTCPayServer.Controllers { var strategy = ParseDerivationStrategy(model.DerivationScheme, model.DerivationSchemeFormat); await _Wallet.TrackAsync(strategy); - await _CallbackController.RegisterCallbackUriAsync(strategy, Request); + await _CallbackController.RegisterCallbackUriAsync(strategy); } store.DerivationStrategy = model.DerivationScheme; } diff --git a/BTCPayServer/EventAggregator.cs b/BTCPayServer/EventAggregator.cs new file mode 100644 index 000000000..e9ff509ad --- /dev/null +++ b/BTCPayServer/EventAggregator.cs @@ -0,0 +1,130 @@ +using System; +using Microsoft.Extensions.Logging; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using BTCPayServer.Logging; + +namespace BTCPayServer +{ + public interface IEventAggregatorSubscription : IDisposable + { + void Unsubscribe(); + void Resubscribe(); + } + public class EventAggregator : IDisposable + { + class Subscription : IEventAggregatorSubscription + { + private EventAggregator aggregator; + Type t; + Action act; + public Subscription(EventAggregator aggregator, Type t) + { + this.aggregator = aggregator; + this.t = t; + } + + public Action Act { get; set; } + + public void Dispose() + { + lock (this.aggregator._Subscriptions) + { + if (this.aggregator._Subscriptions.TryGetValue(t, out Dictionary> actions)) + { + if (actions.Remove(this)) + { + if (actions.Count == 0) + this.aggregator._Subscriptions.Remove(t); + } + } + } + } + + public void Resubscribe() + { + aggregator.Subscribe(t, this); + } + + public void Unsubscribe() + { + Dispose(); + } + } + + public void Publish(T evt) where T : class + { + if (evt == null) + throw new ArgumentNullException(nameof(evt)); + List> actionList = new List>(); + lock (_Subscriptions) + { + if (_Subscriptions.TryGetValue(typeof(T), out Dictionary> actions)) + { + actionList = actions.Values.ToList(); + } + } + + Logs.Events.LogInformation($"New event: {evt.ToString()}"); + foreach (var sub in actionList) + { + try + { + sub(evt); + } + catch (Exception ex) + { + Logs.Events.LogError(ex, $"Error while calling event handler"); + } + } + } + + public IEventAggregatorSubscription Subscribe(Action subscription) + { + var eventType = typeof(T); + var s = new Subscription(this, eventType); + s.Act = (o) => subscription(s, (T)o); + return Subscribe(eventType, s); + } + + private IEventAggregatorSubscription Subscribe(Type eventType, Subscription subscription) + { + lock (_Subscriptions) + { + if (!_Subscriptions.TryGetValue(eventType, out Dictionary> actions)) + { + actions = new Dictionary>(); + _Subscriptions.Add(eventType, actions); + } + actions.Add(subscription, subscription.Act); + } + return subscription; + } + + Dictionary>> _Subscriptions = new Dictionary>>(); + + public IEventAggregatorSubscription Subscribe(Func subscription) + { + return Subscribe(new Action((t) => subscription(t))); + } + + public IEventAggregatorSubscription Subscribe(Func subscription) + { + return Subscribe(new Action((sub, t) => subscription(sub, t))); + } + + public IEventAggregatorSubscription Subscribe(Action subscription) + { + return Subscribe(new Action((sub, t) => subscription(t))); + } + + public void Dispose() + { + lock (_Subscriptions) + { + _Subscriptions.Clear(); + } + } + } +} diff --git a/BTCPayServer/Events/InvoiceDataChangedEvent.cs b/BTCPayServer/Events/InvoiceDataChangedEvent.cs new file mode 100644 index 000000000..38d0f08fd --- /dev/null +++ b/BTCPayServer/Events/InvoiceDataChangedEvent.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace BTCPayServer.Events +{ + public class InvoiceDataChangedEvent + { + public string InvoiceId { get; set; } + + public override string ToString() + { + return $"Invoice {InvoiceId} data changed"; + } + } +} diff --git a/BTCPayServer/Events/InvoiceStatusChangedEvent.cs b/BTCPayServer/Events/InvoiceStatusChangedEvent.cs new file mode 100644 index 000000000..f3a31d051 --- /dev/null +++ b/BTCPayServer/Events/InvoiceStatusChangedEvent.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace BTCPayServer.Events +{ + public class InvoiceStatusChangedEvent + { + public string InvoiceId { get; set; } + public string OldState { get; set; } + public string NewState { get; set; } + + public override string ToString() + { + return $"Invoice {InvoiceId} changed status: {OldState} => {NewState}"; + } + } +} diff --git a/BTCPayServer/Events/NBXplorerStateChangedEvent.cs b/BTCPayServer/Events/NBXplorerStateChangedEvent.cs new file mode 100644 index 000000000..32eb440fd --- /dev/null +++ b/BTCPayServer/Events/NBXplorerStateChangedEvent.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace BTCPayServer.Events +{ + public class NBXplorerStateChangedEvent + { + public NBXplorerStateChangedEvent(NBXplorerState old, NBXplorerState newState) + { + NewState = newState; + OldState = old; + } + + public NBXplorerState NewState { get; set; } + public NBXplorerState OldState { get; set; } + + public override string ToString() + { + return $"NBXplorer: {OldState} => {NewState}"; + } + } +} diff --git a/BTCPayServer/Events/NewBlockEvent.cs b/BTCPayServer/Events/NewBlockEvent.cs new file mode 100644 index 000000000..f0349afc5 --- /dev/null +++ b/BTCPayServer/Events/NewBlockEvent.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace BTCPayServer.Events +{ + public class NewBlockEvent + { + public override string ToString() + { + return "New block"; + } + } +} diff --git a/BTCPayServer/Events/TxOutReceivedEvent.cs b/BTCPayServer/Events/TxOutReceivedEvent.cs new file mode 100644 index 000000000..df368e7d6 --- /dev/null +++ b/BTCPayServer/Events/TxOutReceivedEvent.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using NBitcoin; + +namespace BTCPayServer.Events +{ + public class TxOutReceivedEvent + { + public Script ScriptPubKey { get; set; } + public BitcoinAddress Address { get; set; } + + public override string ToString() + { + String address = Address?.ToString() ?? ScriptPubKey.ToHex(); + return $"{address} received a transaction"; + } + } +} diff --git a/BTCPayServer/Hosting/BTCPayServerServices.cs b/BTCPayServer/Hosting/BTCPayServerServices.cs index 338ccfb80..89234f1dd 100644 --- a/BTCPayServer/Hosting/BTCPayServerServices.cs +++ b/BTCPayServer/Hosting/BTCPayServerServices.cs @@ -106,6 +106,7 @@ namespace BTCPayServer.Hosting }); services.AddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton(o => o.GetRequiredService().Network); services.TryAddSingleton(o => { @@ -158,11 +159,11 @@ namespace BTCPayServer.Hosting return new CachedRateProvider(new FallbackRateProvider(new IRateProvider[] { coinaverage, bitpay }), o.GetRequiredService()) { CacheSpan = TimeSpan.FromMinutes(1.0) }; }); - services.TryAddSingleton(); + services.AddSingleton(); services.TryAddSingleton(); services.AddSingleton(); - + services.TryAddSingleton(); services.TryAddScoped(); services.TryAddSingleton(); services.AddTransient(); @@ -197,6 +198,11 @@ namespace BTCPayServer.Hosting scope.ServiceProvider.GetRequiredService().Database.Migrate(); }); } + + + + var initialize = app.ApplicationServices.GetService(); + initialize.Init(); app.UseMiddleware(); return app; } diff --git a/BTCPayServer/Hosting/BTCpayMiddleware.cs b/BTCPayServer/Hosting/BTCpayMiddleware.cs index ac707355a..e93d27198 100644 --- a/BTCPayServer/Hosting/BTCpayMiddleware.cs +++ b/BTCPayServer/Hosting/BTCpayMiddleware.cs @@ -31,19 +31,15 @@ namespace BTCPayServer.Hosting RequestDelegate _Next; CallbackController _CallbackController; BTCPayServerOptions _Options; - private NBXplorerWaiterAccessor _NbxplorerAwaiter; public BTCPayMiddleware(RequestDelegate next, TokenRepository tokenRepo, BTCPayServerOptions options, - NBXplorerWaiterAccessor nbxplorerAwaiter, CallbackController callbackController) { _TokenRepository = tokenRepo ?? throw new ArgumentNullException(nameof(tokenRepo)); _Next = next ?? throw new ArgumentNullException(nameof(next)); - _CallbackController = callbackController; _Options = options ?? throw new ArgumentNullException(nameof(options)); - _NbxplorerAwaiter = (nbxplorerAwaiter ?? throw new ArgumentNullException(nameof(nbxplorerAwaiter))); } @@ -52,7 +48,6 @@ namespace BTCPayServer.Hosting public async Task Invoke(HttpContext httpContext) { RewriteHostIfNeeded(httpContext); - await EnsureBlockCallbackRegistered(httpContext); httpContext.Request.Headers.TryGetValue("x-signature", out StringValues values); var sig = values.FirstOrDefault(); @@ -154,28 +149,6 @@ namespace BTCPayServer.Hosting } } - private async Task EnsureBlockCallbackRegistered(HttpContext httpContext) - { - if (!_Registered) - { - var callback = await _CallbackController.GetCallbackBlockUriAsync(httpContext.Request); - var unused = _NbxplorerAwaiter.Instance.WhenReady(async c => - { - try - { - await _CallbackController.RegisterCallbackBlockUriAsync(callback); - Logs.PayServer.LogInformation($"Registering block callback to " + callback); - } - catch (Exception ex) - { - Logs.PayServer.LogError(ex, "Could not register block callback"); - } - return true; - }); - _Registered = true; - } - } - private static async Task HandleBitpayHttpException(HttpContext httpContext, BitpayHttpException ex) { httpContext.Response.StatusCode = ex.StatusCode; diff --git a/BTCPayServer/Initializer.cs b/BTCPayServer/Initializer.cs new file mode 100644 index 000000000..862289fb0 --- /dev/null +++ b/BTCPayServer/Initializer.cs @@ -0,0 +1,45 @@ +using System; +using Microsoft.Extensions.Logging; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using BTCPayServer.Controllers; +using BTCPayServer.Logging; +using BTCPayServer.Events; + +namespace BTCPayServer +{ + public class Initializer + { + EventAggregator _Aggregator; + CallbackController _CallbackController; + public Initializer(EventAggregator aggregator, + CallbackController callbackController + ) + { + _Aggregator = aggregator; + _CallbackController = callbackController; + } + public void Init() + { + _Aggregator.Subscribe(async (s, evt) => + { + if (evt.NewState == NBXplorerState.Ready) + { + s.Unsubscribe(); + try + { + var callback = await _CallbackController.GetCallbackBlockUriAsync(); + await _CallbackController.RegisterCallbackBlockUriAsync(callback); + Logs.PayServer.LogInformation($"Registering block callback to " + callback); + } + catch (Exception ex) + { + Logs.PayServer.LogError(ex, "Could not register block callback"); + s.Resubscribe(); + } + } + }); + } + } +} diff --git a/BTCPayServer/Logging/Logs.cs b/BTCPayServer/Logging/Logs.cs index 463911995..96a0ee8c1 100644 --- a/BTCPayServer/Logging/Logs.cs +++ b/BTCPayServer/Logging/Logs.cs @@ -17,6 +17,7 @@ namespace BTCPayServer.Logging { Configuration = factory.CreateLogger("Configuration"); PayServer = factory.CreateLogger("PayServer"); + Events = factory.CreateLogger("PayServer"); } public static ILogger Configuration { @@ -26,6 +27,12 @@ namespace BTCPayServer.Logging { get; set; } + + public static ILogger Events + { + get; set; + } + public const int ColumnLength = 16; } diff --git a/BTCPayServer/NBXplorerWaiter.cs b/BTCPayServer/NBXplorerWaiter.cs index 7b1bf5ff8..6f518fde5 100644 --- a/BTCPayServer/NBXplorerWaiter.cs +++ b/BTCPayServer/NBXplorerWaiter.cs @@ -9,6 +9,7 @@ using Microsoft.Extensions.Hosting; using NBXplorer; using NBXplorer.Models; using System.Collections.Concurrent; +using BTCPayServer.Events; namespace BTCPayServer { @@ -22,14 +23,17 @@ namespace BTCPayServer Synching, Ready } + public class NBXplorerWaiter : IHostedService { - public NBXplorerWaiter(ExplorerClient client, NBXplorerWaiterAccessor accessor) + public NBXplorerWaiter(ExplorerClient client, EventAggregator aggregator, NBXplorerWaiterAccessor accessor) { _Client = client; + _Aggregator = aggregator; accessor.Instance = this; } + EventAggregator _Aggregator; ExplorerClient _Client; Timer _Timer; ManualResetEventSlim _Idle = new ManualResetEventSlim(true); @@ -64,15 +68,6 @@ namespace BTCPayServer { } - List tasks = new List(); - if (State == NBXplorerState.Ready) - { - while (_WhenReady.TryDequeue(out Func act)) - { - tasks.Add(act(_Client)); - } - } - await Task.WhenAll(tasks); } private async Task StepAsync() @@ -123,7 +118,6 @@ namespace BTCPayServer LastStatus = status; if (oldState != State) { - Logs.PayServer.LogInformation($"NBXplorerWaiter status changed: {oldState} => {State}"); if (State == NBXplorerState.Synching) { SetInterval(TimeSpan.FromSeconds(10)); @@ -132,6 +126,7 @@ namespace BTCPayServer { SetInterval(TimeSpan.FromMinutes(1)); } + _Aggregator.Publish(new NBXplorerStateChangedEvent(oldState, State)); } return oldState != State; } @@ -145,28 +140,6 @@ namespace BTCPayServer catch { } } - public Task WhenReady(Func> act) - { - if (State == NBXplorerState.Ready) - return act(_Client); - TaskCompletionSource completion = new TaskCompletionSource(); - _WhenReady.Enqueue(async client => - { - try - { - var result = await act(client); - completion.SetResult(result); - } - catch (Exception ex) - { - completion.SetException(ex); - } - }); - return completion.Task; - } - - ConcurrentQueue> _WhenReady = new ConcurrentQueue>(); - private async Task GetStatusWithTimeout() { CancellationTokenSource cts = new CancellationTokenSource(); diff --git a/BTCPayServer/Services/Invoices/InvoiceNotificationManager.cs b/BTCPayServer/Services/Invoices/InvoiceNotificationManager.cs index 76e3040e3..bdb930b77 100644 --- a/BTCPayServer/Services/Invoices/InvoiceNotificationManager.cs +++ b/BTCPayServer/Services/Invoices/InvoiceNotificationManager.cs @@ -15,10 +15,12 @@ using NBitpayClient; using Newtonsoft.Json.Linq; using Newtonsoft.Json; using System.Collections.Concurrent; +using Microsoft.Extensions.Hosting; +using BTCPayServer.Events; namespace BTCPayServer.Services.Invoices { - public class InvoiceNotificationManager + public class InvoiceNotificationManager : IHostedService { public static HttpClient _Client = new HttpClient(); @@ -41,16 +43,32 @@ namespace BTCPayServer.Services.Invoices } IBackgroundJobClient _JobClient; + EventAggregator _EventAggregator; + InvoiceRepository _InvoiceRepository; + public InvoiceNotificationManager( IBackgroundJobClient jobClient, + EventAggregator eventAggregator, + InvoiceRepository invoiceRepository, ILogger logger) { Logger = logger as ILogger ?? NullLogger.Instance; _JobClient = jobClient; + _EventAggregator = eventAggregator; + _InvoiceRepository = invoiceRepository; } - public void Notify(InvoiceEntity invoice) + async Task Notify(InvoiceEntity invoice) { + CancellationTokenSource cts = new CancellationTokenSource(10000); + try + { + await SendNotification(invoice, cts.Token); + return; + } + catch // It fails, it is OK because we try with hangfire after + { + } var invoiceStr = NBitcoin.JsonConverters.Serializer.ToString(new ScheduledJob() { TryCount = 0, Invoice = invoice }); if (!string.IsNullOrEmpty(invoice.NotificationURL)) _JobClient.Schedule(() => NotifyHttp(invoiceStr), TimeSpan.Zero); @@ -70,31 +88,7 @@ namespace BTCPayServer.Services.Invoices CancellationTokenSource cts = new CancellationTokenSource(10000); try { - var request = new HttpRequestMessage(); - request.Method = HttpMethod.Post; - - var dto = job.Invoice.EntityToDTO(); - InvoicePaymentNotification notification = new InvoicePaymentNotification() - { - Id = dto.Id, - Url = dto.Url, - BTCDue = dto.BTCDue, - BTCPaid = dto.BTCPaid, - BTCPrice = dto.BTCPrice, - Currency = dto.Currency, - CurrentTime = dto.CurrentTime, - ExceptionStatus = dto.ExceptionStatus, - ExpirationTime = dto.ExpirationTime, - InvoiceTime = dto.InvoiceTime, - PosData = dto.PosData, - Price = dto.Price, - Rate = dto.Rate, - Status = dto.Status, - BuyerFields = job.Invoice.RefundMail == null ? null : new Newtonsoft.Json.Linq.JObject() { new JProperty("buyerEmail", job.Invoice.RefundMail) } - }; - request.RequestUri = new Uri(job.Invoice.NotificationURL, UriKind.Absolute); - request.Content = new StringContent(JsonConvert.SerializeObject(notification), Encoding.UTF8, "application/json"); - var response = await _Client.SendAsync(request, cts.Token); + HttpResponseMessage response = await SendNotification(job.Invoice, cts.Token); reschedule = response.StatusCode != System.Net.HttpStatusCode.OK; Logger.LogInformation("Job " + jobId + " returned " + response.StatusCode); } @@ -116,11 +110,73 @@ namespace BTCPayServer.Services.Invoices } } + private static async Task SendNotification(InvoiceEntity invoice, CancellationToken cancellation) + { + var request = new HttpRequestMessage(); + request.Method = HttpMethod.Post; + + var dto = invoice.EntityToDTO(); + InvoicePaymentNotification notification = new InvoicePaymentNotification() + { + Id = dto.Id, + Url = dto.Url, + BTCDue = dto.BTCDue, + BTCPaid = dto.BTCPaid, + BTCPrice = dto.BTCPrice, + Currency = dto.Currency, + CurrentTime = dto.CurrentTime, + ExceptionStatus = dto.ExceptionStatus, + ExpirationTime = dto.ExpirationTime, + InvoiceTime = dto.InvoiceTime, + PosData = dto.PosData, + Price = dto.Price, + Rate = dto.Rate, + Status = dto.Status, + BuyerFields = invoice.RefundMail == null ? null : new Newtonsoft.Json.Linq.JObject() { new JProperty("buyerEmail", invoice.RefundMail) } + }; + request.RequestUri = new Uri(invoice.NotificationURL, UriKind.Absolute); + request.Content = new StringContent(JsonConvert.SerializeObject(notification), Encoding.UTF8, "application/json"); + var response = await _Client.SendAsync(request, cancellation); + return response; + } + int MaxTry = 6; private static string GetHttpJobId(InvoiceEntity invoice) { return $"{invoice.Id}-{invoice.Status}-HTTP"; } + + CompositeDisposable leases = new CompositeDisposable(); + public Task StartAsync(CancellationToken cancellationToken) + { + leases.Add(_EventAggregator.Subscribe(async e => + { + var invoice = await _InvoiceRepository.GetInvoice(null, e.InvoiceId); + + // we need to use the status in the event and not in the invoice. The invoice might now be in another status. + if (invoice.FullNotifications) + { + if (e.NewState == "expired" || + e.NewState == "paid" || + e.NewState == "invalid" || + e.NewState == "complete" + ) + await Notify(invoice); + } + + if(e.NewState == "confirmed") + { + await Notify(invoice); + } + })); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + leases.Dispose(); + return Task.CompletedTask; + } } } diff --git a/BTCPayServer/Services/Invoices/InvoiceWatcher.cs b/BTCPayServer/Services/Invoices/InvoiceWatcher.cs index 5b620c93c..b29c0b0dd 100644 --- a/BTCPayServer/Services/Invoices/InvoiceWatcher.cs +++ b/BTCPayServer/Services/Invoices/InvoiceWatcher.cs @@ -13,6 +13,8 @@ using Microsoft.Extensions.Hosting; using System.Collections.Concurrent; using Hangfire; using BTCPayServer.Services.Wallets; +using BTCPayServer.Controllers; +using BTCPayServer.Events; namespace BTCPayServer.Services.Invoices { @@ -25,14 +27,14 @@ namespace BTCPayServer.Services.Invoices InvoiceRepository _InvoiceRepository; ExplorerClient _ExplorerClient; DerivationStrategyFactory _DerivationFactory; - InvoiceNotificationManager _NotificationManager; + EventAggregator _EventAggregator; BTCPayWallet _Wallet; public InvoiceWatcher(ExplorerClient explorerClient, InvoiceRepository invoiceRepository, + EventAggregator eventAggregator, BTCPayWallet wallet, - InvoiceNotificationManager notificationManager, InvoiceWatcherAccessor accessor) { LongPollingMode = explorerClient.Network == Network.RegTest; @@ -41,23 +43,24 @@ namespace BTCPayServer.Services.Invoices _ExplorerClient = explorerClient ?? throw new ArgumentNullException(nameof(explorerClient)); _DerivationFactory = new DerivationStrategyFactory(_ExplorerClient.Network); _InvoiceRepository = invoiceRepository ?? throw new ArgumentNullException(nameof(invoiceRepository)); - _NotificationManager = notificationManager ?? throw new ArgumentNullException(nameof(notificationManager)); + _EventAggregator = eventAggregator ?? throw new ArgumentNullException(nameof(eventAggregator)); accessor.Instance = this; } + CompositeDisposable leases = new CompositeDisposable(); public bool LongPollingMode { get; set; } - public async Task NotifyReceived(Script scriptPubKey) + async Task NotifyReceived(Script scriptPubKey) { var invoice = await _InvoiceRepository.GetInvoiceIdFromScriptPubKey(scriptPubKey); if (invoice != null) _WatchRequests.Add(invoice); } - public async Task NotifyBlock() + async Task NotifyBlock() { foreach (var invoice in await _InvoiceRepository.GetPendingInvoices()) { @@ -76,17 +79,24 @@ namespace BTCPayServer.Services.Invoices if (invoice == null) break; var stateBefore = invoice.Status; - var result = await UpdateInvoice(changes, invoice).ConfigureAwait(false); + var stateChanges = new List(); + var result = await UpdateInvoice(changes, invoice, stateChanges).ConfigureAwait(false); changes = result.Changes; if (result.NeedSave) + { await _InvoiceRepository.UpdateInvoiceStatus(invoice.Id, invoice.Status, invoice.ExceptionStatus).ConfigureAwait(false); - - var changed = stateBefore != invoice.Status; - if (changed) - { - Logs.PayServer.LogInformation($"Invoice {invoice.Id}: {stateBefore} => {invoice.Status}"); + _EventAggregator.Publish(new InvoiceDataChangedEvent() { InvoiceId = invoice.Id }); } + var changed = stateBefore != invoice.Status; + + foreach(var stateChange in stateChanges) + { + _EventAggregator.Publish(new InvoiceStatusChangedEvent() { InvoiceId = invoice.Id, NewState = stateChange, OldState = stateBefore }); + stateBefore = stateChange; + } + + if (invoice.Status == "complete" || ((invoice.Status == "invalid" || invoice.Status == "expired") && invoice.MonitoringExpiration < DateTimeOffset.UtcNow)) { @@ -111,7 +121,7 @@ namespace BTCPayServer.Services.Invoices } - private async Task<(bool NeedSave, UTXOChanges Changes)> UpdateInvoice(UTXOChanges changes, InvoiceEntity invoice) + private async Task<(bool NeedSave, UTXOChanges Changes)> UpdateInvoice(UTXOChanges changes, InvoiceEntity invoice, List stateChanges) { bool needSave = false; //Fetch unknown payments @@ -139,10 +149,7 @@ namespace BTCPayServer.Services.Invoices needSave = true; await _InvoiceRepository.UnaffectAddress(invoice.Id); invoice.Status = "expired"; - if (invoice.FullNotifications) - { - _NotificationManager.Notify(invoice); - } + stateChanges.Add(invoice.Status); } if (invoice.Status == "new" || invoice.Status == "expired") @@ -153,10 +160,7 @@ namespace BTCPayServer.Services.Invoices if (invoice.Status == "new") { invoice.Status = "paid"; - if (invoice.FullNotifications) - { - _NotificationManager.Notify(invoice); - } + stateChanges.Add(invoice.Status); invoice.ExceptionStatus = null; await _InvoiceRepository.UnaffectAddress(invoice.Id); needSave = true; @@ -216,11 +220,8 @@ namespace BTCPayServer.Services.Invoices { await _InvoiceRepository.UnaffectAddress(invoice.Id); invoice.Status = "invalid"; + stateChanges.Add(invoice.Status); needSave = true; - if (invoice.FullNotifications) - { - _NotificationManager.Notify(invoice); - } } else { @@ -229,7 +230,7 @@ namespace BTCPayServer.Services.Invoices { await _InvoiceRepository.UnaffectAddress(invoice.Id); invoice.Status = "confirmed"; - _NotificationManager.Notify(invoice); + stateChanges.Add(invoice.Status); needSave = true; } } @@ -243,8 +244,7 @@ namespace BTCPayServer.Services.Invoices if (totalConfirmed >= invoice.GetTotalCryptoDue()) { invoice.Status = "complete"; - if (invoice.FullNotifications) - _NotificationManager.Notify(invoice); + stateChanges.Add(invoice.Status); needSave = true; } } @@ -356,6 +356,10 @@ namespace BTCPayServer.Services.Invoices _WatchRequests.Add(pending); } }, null, 0, (int)PollInterval.TotalMilliseconds); + + leases.Add(_EventAggregator.Subscribe(async b => { await NotifyBlock(); })); + leases.Add(_EventAggregator.Subscribe(async b => { await NotifyReceived(b.ScriptPubKey); })); + return Task.CompletedTask; } @@ -402,6 +406,7 @@ namespace BTCPayServer.Services.Invoices public Task StopAsync(CancellationToken cancellationToken) { + leases.Dispose(); _UpdatePendingInvoices.Dispose(); _Cts.Cancel(); return Task.WhenAny(_RunningTask.Task, Task.Delay(-1, cancellationToken));