Use EventAggregator to decouple several classes

This commit is contained in:
nicolas.dorier 2017-12-17 14:17:42 +09:00
parent dfed2daa8e
commit 84bb6056d3
17 changed files with 461 additions and 135 deletions

View File

@ -230,7 +230,7 @@ namespace BTCPayServer.Tests
var match = new TransactionMatch();
match.Outputs.Add(new KeyPathInformation() { ScriptPubKey = address.ScriptPubKey });
var content = new StringContent(new NBXplorer.Serializer(Network).ToString(match), new UTF8Encoding(false), "application/json");
var uri = controller.GetCallbackUriAsync(req).GetAwaiter().GetResult();
var uri = controller.GetCallbackUriAsync().GetAwaiter().GetResult();
HttpRequestMessage message = new HttpRequestMessage();
message.Method = HttpMethod.Post;
@ -242,7 +242,7 @@ namespace BTCPayServer.Tests
else
{
var uri = controller.GetCallbackBlockUriAsync(req).GetAwaiter().GetResult();
var uri = controller.GetCallbackBlockUriAsync().GetAwaiter().GetResult();
HttpRequestMessage message = new HttpRequestMessage();
message.Method = HttpMethod.Post;
message.RequestUri = uri;

View File

@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace BTCPayServer
{
public class CompositeDisposable : IDisposable
{
List<IDisposable> _Disposables = new List<IDisposable>();
public void Add(IDisposable disposable) { _Disposables.Add(disposable); }
public void Dispose()
{
foreach (var d in _Disposables)
d.Dispose();
_Disposables.Clear();
}
}
}

View File

@ -16,6 +16,10 @@ using System.Linq;
using System.Text;
using System.Threading.Tasks;
using BTCPayServer.Configuration;
using BTCPayServer.Events;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Hosting.Server.Features;
using Microsoft.AspNetCore.Hosting.Server;
namespace BTCPayServer.Controllers
{
@ -30,21 +34,23 @@ namespace BTCPayServer.Controllers
}
SettingsRepository _Settings;
Network _Network;
InvoiceWatcher _Watcher;
ExplorerClient _Explorer;
BTCPayServerOptions _Options;
EventAggregator _EventAggregator;
IServer _Server;
public CallbackController(SettingsRepository repo,
ExplorerClient explorer,
InvoiceWatcherAccessor watcher,
EventAggregator eventAggregator,
BTCPayServerOptions options,
IServer server,
Network network)
{
_Settings = repo;
_Network = network;
_Watcher = watcher.Instance;
_Explorer = explorer;
_Options = options;
_EventAggregator = eventAggregator;
_Server = server;
}
[Route("callbacks/transactions")]
@ -52,7 +58,6 @@ namespace BTCPayServer.Controllers
public async Task NewTransaction(string token)
{
await AssertToken(token);
Logs.PayServer.LogInformation("New transaction callback");
//We don't want to register all the json converter at MVC level, so we parse here
var serializer = new NBXplorer.Serializer(_Network);
var content = await new StreamReader(Request.Body, new UTF8Encoding(false), false, 1024, true).ReadToEndAsync();
@ -60,7 +65,10 @@ namespace BTCPayServer.Controllers
foreach (var output in match.Outputs)
{
await _Watcher.NotifyReceived(output.ScriptPubKey);
var evt = new TxOutReceivedEvent();
evt.ScriptPubKey = output.ScriptPubKey;
evt.Address = output.ScriptPubKey.GetDestinationAddress(_Network);
_EventAggregator.Publish(evt);
}
}
@ -69,8 +77,7 @@ namespace BTCPayServer.Controllers
public async Task NewBlock(string token)
{
await AssertToken(token);
Logs.PayServer.LogInformation("New block callback");
await _Watcher.NotifyBlock();
_EventAggregator.Publish(new NewBlockEvent());
}
private async Task AssertToken(string token)
@ -80,15 +87,15 @@ namespace BTCPayServer.Controllers
throw new BTCPayServer.BitpayHttpException(400, "invalid-callback-token");
}
public async Task<Uri> GetCallbackUriAsync(HttpRequest request)
public async Task<Uri> GetCallbackUriAsync()
{
string token = await GetToken();
return BuildCallbackUri(request, "callbacks/transactions?token=" + token);
return BuildCallbackUri("callbacks/transactions?token=" + token);
}
public async Task RegisterCallbackUriAsync(DerivationStrategyBase derivationScheme, HttpRequest request)
public async Task RegisterCallbackUriAsync(DerivationStrategyBase derivationScheme)
{
var uri = await GetCallbackUriAsync(request);
var uri = await GetCallbackUriAsync();
await _Explorer.SubscribeToWalletAsync(uri, derivationScheme);
}
@ -104,19 +111,29 @@ namespace BTCPayServer.Controllers
return token;
}
public async Task<Uri> GetCallbackBlockUriAsync(HttpRequest request)
public async Task<Uri> GetCallbackBlockUriAsync()
{
string token = await GetToken();
return BuildCallbackUri(request, "callbacks/blocks?token=" + token);
return BuildCallbackUri("callbacks/blocks?token=" + token);
}
private Uri BuildCallbackUri(HttpRequest request, string callbackPath)
private Uri BuildCallbackUri(string callbackPath)
{
string baseUrl = _Options.InternalUrl == null ? request.GetAbsoluteRoot() : _Options.InternalUrl.AbsolutePath;
var address = _Server.Features.Get<IServerAddressesFeature>().Addresses
.Select(c => new Uri(TransformToRoutable(c)))
.First();
var baseUrl = _Options.InternalUrl == null ? address.AbsoluteUri : _Options.InternalUrl.AbsoluteUri;
baseUrl = baseUrl.WithTrailingSlash();
return new Uri(baseUrl + callbackPath);
}
private string TransformToRoutable(string host)
{
if (host.StartsWith("http://0.0.0.0"))
host = host.Replace("http://0.0.0.0", "http://127.0.0.1");
return host;
}
public async Task<Uri> RegisterCallbackBlockUriAsync(Uri uri)
{
await _Explorer.SubscribeToBlocksAsync(uri);

View File

@ -198,7 +198,7 @@ namespace BTCPayServer.Controllers
{
var strategy = ParseDerivationStrategy(model.DerivationScheme, model.DerivationSchemeFormat);
await _Wallet.TrackAsync(strategy);
await _CallbackController.RegisterCallbackUriAsync(strategy, Request);
await _CallbackController.RegisterCallbackUriAsync(strategy);
}
store.DerivationStrategy = model.DerivationScheme;
}

View File

@ -0,0 +1,130 @@
using System;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using BTCPayServer.Logging;
namespace BTCPayServer
{
public interface IEventAggregatorSubscription : IDisposable
{
void Unsubscribe();
void Resubscribe();
}
public class EventAggregator : IDisposable
{
class Subscription : IEventAggregatorSubscription
{
private EventAggregator aggregator;
Type t;
Action<object> act;
public Subscription(EventAggregator aggregator, Type t)
{
this.aggregator = aggregator;
this.t = t;
}
public Action<Object> Act { get; set; }
public void Dispose()
{
lock (this.aggregator._Subscriptions)
{
if (this.aggregator._Subscriptions.TryGetValue(t, out Dictionary<Subscription, Action<object>> actions))
{
if (actions.Remove(this))
{
if (actions.Count == 0)
this.aggregator._Subscriptions.Remove(t);
}
}
}
}
public void Resubscribe()
{
aggregator.Subscribe(t, this);
}
public void Unsubscribe()
{
Dispose();
}
}
public void Publish<T>(T evt) where T : class
{
if (evt == null)
throw new ArgumentNullException(nameof(evt));
List<Action<object>> actionList = new List<Action<object>>();
lock (_Subscriptions)
{
if (_Subscriptions.TryGetValue(typeof(T), out Dictionary<Subscription, Action<object>> actions))
{
actionList = actions.Values.ToList();
}
}
Logs.Events.LogInformation($"New event: {evt.ToString()}");
foreach (var sub in actionList)
{
try
{
sub(evt);
}
catch (Exception ex)
{
Logs.Events.LogError(ex, $"Error while calling event handler");
}
}
}
public IEventAggregatorSubscription Subscribe<T>(Action<IEventAggregatorSubscription, T> subscription)
{
var eventType = typeof(T);
var s = new Subscription(this, eventType);
s.Act = (o) => subscription(s, (T)o);
return Subscribe(eventType, s);
}
private IEventAggregatorSubscription Subscribe(Type eventType, Subscription subscription)
{
lock (_Subscriptions)
{
if (!_Subscriptions.TryGetValue(eventType, out Dictionary<Subscription, Action<object>> actions))
{
actions = new Dictionary<Subscription, Action<object>>();
_Subscriptions.Add(eventType, actions);
}
actions.Add(subscription, subscription.Act);
}
return subscription;
}
Dictionary<Type, Dictionary<Subscription, Action<object>>> _Subscriptions = new Dictionary<Type, Dictionary<Subscription, Action<object>>>();
public IEventAggregatorSubscription Subscribe<T, TReturn>(Func<T, TReturn> subscription)
{
return Subscribe(new Action<T>((t) => subscription(t)));
}
public IEventAggregatorSubscription Subscribe<T, TReturn>(Func<IEventAggregatorSubscription, T, TReturn> subscription)
{
return Subscribe(new Action<IEventAggregatorSubscription, T>((sub, t) => subscription(sub, t)));
}
public IEventAggregatorSubscription Subscribe<T>(Action<T> subscription)
{
return Subscribe(new Action<IEventAggregatorSubscription, T>((sub, t) => subscription(t)));
}
public void Dispose()
{
lock (_Subscriptions)
{
_Subscriptions.Clear();
}
}
}
}

View File

@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace BTCPayServer.Events
{
public class InvoiceDataChangedEvent
{
public string InvoiceId { get; set; }
public override string ToString()
{
return $"Invoice {InvoiceId} data changed";
}
}
}

View File

@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace BTCPayServer.Events
{
public class InvoiceStatusChangedEvent
{
public string InvoiceId { get; set; }
public string OldState { get; set; }
public string NewState { get; set; }
public override string ToString()
{
return $"Invoice {InvoiceId} changed status: {OldState} => {NewState}";
}
}
}

View File

@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace BTCPayServer.Events
{
public class NBXplorerStateChangedEvent
{
public NBXplorerStateChangedEvent(NBXplorerState old, NBXplorerState newState)
{
NewState = newState;
OldState = old;
}
public NBXplorerState NewState { get; set; }
public NBXplorerState OldState { get; set; }
public override string ToString()
{
return $"NBXplorer: {OldState} => {NewState}";
}
}
}

View File

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace BTCPayServer.Events
{
public class NewBlockEvent
{
public override string ToString()
{
return "New block";
}
}
}

View File

@ -0,0 +1,20 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using NBitcoin;
namespace BTCPayServer.Events
{
public class TxOutReceivedEvent
{
public Script ScriptPubKey { get; set; }
public BitcoinAddress Address { get; set; }
public override string ToString()
{
String address = Address?.ToString() ?? ScriptPubKey.ToHex();
return $"{address} received a transaction";
}
}
}

View File

@ -106,6 +106,7 @@ namespace BTCPayServer.Hosting
});
services.AddSingleton<BTCPayServerEnvironment>();
services.TryAddSingleton<TokenRepository>();
services.TryAddSingleton<EventAggregator>();
services.TryAddSingleton<Network>(o => o.GetRequiredService<BTCPayServerOptions>().Network);
services.TryAddSingleton<ApplicationDbContextFactory>(o =>
{
@ -158,11 +159,11 @@ namespace BTCPayServer.Hosting
return new CachedRateProvider(new FallbackRateProvider(new IRateProvider[] { coinaverage, bitpay }), o.GetRequiredService<IMemoryCache>()) { CacheSpan = TimeSpan.FromMinutes(1.0) };
});
services.TryAddSingleton<InvoiceNotificationManager>();
services.AddSingleton<IHostedService, InvoiceNotificationManager>();
services.TryAddSingleton<InvoiceWatcherAccessor>();
services.AddSingleton<IHostedService, InvoiceWatcher>();
services.TryAddSingleton<Initializer>();
services.TryAddScoped<IHttpContextAccessor, HttpContextAccessor>();
services.TryAddSingleton<IAuthorizationHandler, OwnStoreHandler>();
services.AddTransient<AccessTokenController>();
@ -197,6 +198,11 @@ namespace BTCPayServer.Hosting
scope.ServiceProvider.GetRequiredService<ApplicationDbContext>().Database.Migrate();
});
}
var initialize = app.ApplicationServices.GetService<Initializer>();
initialize.Init();
app.UseMiddleware<BTCPayMiddleware>();
return app;
}

View File

@ -31,19 +31,15 @@ namespace BTCPayServer.Hosting
RequestDelegate _Next;
CallbackController _CallbackController;
BTCPayServerOptions _Options;
private NBXplorerWaiterAccessor _NbxplorerAwaiter;
public BTCPayMiddleware(RequestDelegate next,
TokenRepository tokenRepo,
BTCPayServerOptions options,
NBXplorerWaiterAccessor nbxplorerAwaiter,
CallbackController callbackController)
{
_TokenRepository = tokenRepo ?? throw new ArgumentNullException(nameof(tokenRepo));
_Next = next ?? throw new ArgumentNullException(nameof(next));
_CallbackController = callbackController;
_Options = options ?? throw new ArgumentNullException(nameof(options));
_NbxplorerAwaiter = (nbxplorerAwaiter ?? throw new ArgumentNullException(nameof(nbxplorerAwaiter)));
}
@ -52,7 +48,6 @@ namespace BTCPayServer.Hosting
public async Task Invoke(HttpContext httpContext)
{
RewriteHostIfNeeded(httpContext);
await EnsureBlockCallbackRegistered(httpContext);
httpContext.Request.Headers.TryGetValue("x-signature", out StringValues values);
var sig = values.FirstOrDefault();
@ -154,28 +149,6 @@ namespace BTCPayServer.Hosting
}
}
private async Task EnsureBlockCallbackRegistered(HttpContext httpContext)
{
if (!_Registered)
{
var callback = await _CallbackController.GetCallbackBlockUriAsync(httpContext.Request);
var unused = _NbxplorerAwaiter.Instance.WhenReady(async c =>
{
try
{
await _CallbackController.RegisterCallbackBlockUriAsync(callback);
Logs.PayServer.LogInformation($"Registering block callback to " + callback);
}
catch (Exception ex)
{
Logs.PayServer.LogError(ex, "Could not register block callback");
}
return true;
});
_Registered = true;
}
}
private static async Task HandleBitpayHttpException(HttpContext httpContext, BitpayHttpException ex)
{
httpContext.Response.StatusCode = ex.StatusCode;

View File

@ -0,0 +1,45 @@
using System;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using BTCPayServer.Controllers;
using BTCPayServer.Logging;
using BTCPayServer.Events;
namespace BTCPayServer
{
public class Initializer
{
EventAggregator _Aggregator;
CallbackController _CallbackController;
public Initializer(EventAggregator aggregator,
CallbackController callbackController
)
{
_Aggregator = aggregator;
_CallbackController = callbackController;
}
public void Init()
{
_Aggregator.Subscribe<NBXplorerStateChangedEvent>(async (s, evt) =>
{
if (evt.NewState == NBXplorerState.Ready)
{
s.Unsubscribe();
try
{
var callback = await _CallbackController.GetCallbackBlockUriAsync();
await _CallbackController.RegisterCallbackBlockUriAsync(callback);
Logs.PayServer.LogInformation($"Registering block callback to " + callback);
}
catch (Exception ex)
{
Logs.PayServer.LogError(ex, "Could not register block callback");
s.Resubscribe();
}
}
});
}
}
}

View File

@ -17,6 +17,7 @@ namespace BTCPayServer.Logging
{
Configuration = factory.CreateLogger("Configuration");
PayServer = factory.CreateLogger("PayServer");
Events = factory.CreateLogger("PayServer");
}
public static ILogger Configuration
{
@ -26,6 +27,12 @@ namespace BTCPayServer.Logging
{
get; set;
}
public static ILogger Events
{
get; set;
}
public const int ColumnLength = 16;
}

View File

@ -9,6 +9,7 @@ using Microsoft.Extensions.Hosting;
using NBXplorer;
using NBXplorer.Models;
using System.Collections.Concurrent;
using BTCPayServer.Events;
namespace BTCPayServer
{
@ -22,14 +23,17 @@ namespace BTCPayServer
Synching,
Ready
}
public class NBXplorerWaiter : IHostedService
{
public NBXplorerWaiter(ExplorerClient client, NBXplorerWaiterAccessor accessor)
public NBXplorerWaiter(ExplorerClient client, EventAggregator aggregator, NBXplorerWaiterAccessor accessor)
{
_Client = client;
_Aggregator = aggregator;
accessor.Instance = this;
}
EventAggregator _Aggregator;
ExplorerClient _Client;
Timer _Timer;
ManualResetEventSlim _Idle = new ManualResetEventSlim(true);
@ -64,15 +68,6 @@ namespace BTCPayServer
{
}
List<Task> tasks = new List<Task>();
if (State == NBXplorerState.Ready)
{
while (_WhenReady.TryDequeue(out Func<ExplorerClient, Task> act))
{
tasks.Add(act(_Client));
}
}
await Task.WhenAll(tasks);
}
private async Task<bool> StepAsync()
@ -123,7 +118,6 @@ namespace BTCPayServer
LastStatus = status;
if (oldState != State)
{
Logs.PayServer.LogInformation($"NBXplorerWaiter status changed: {oldState} => {State}");
if (State == NBXplorerState.Synching)
{
SetInterval(TimeSpan.FromSeconds(10));
@ -132,6 +126,7 @@ namespace BTCPayServer
{
SetInterval(TimeSpan.FromMinutes(1));
}
_Aggregator.Publish(new NBXplorerStateChangedEvent(oldState, State));
}
return oldState != State;
}
@ -145,28 +140,6 @@ namespace BTCPayServer
catch { }
}
public Task<T> WhenReady<T>(Func<ExplorerClient, Task<T>> act)
{
if (State == NBXplorerState.Ready)
return act(_Client);
TaskCompletionSource<T> completion = new TaskCompletionSource<T>();
_WhenReady.Enqueue(async client =>
{
try
{
var result = await act(client);
completion.SetResult(result);
}
catch (Exception ex)
{
completion.SetException(ex);
}
});
return completion.Task;
}
ConcurrentQueue<Func<ExplorerClient, Task>> _WhenReady = new ConcurrentQueue<Func<ExplorerClient, Task>>();
private async Task<StatusResult> GetStatusWithTimeout()
{
CancellationTokenSource cts = new CancellationTokenSource();

View File

@ -15,10 +15,12 @@ using NBitpayClient;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json;
using System.Collections.Concurrent;
using Microsoft.Extensions.Hosting;
using BTCPayServer.Events;
namespace BTCPayServer.Services.Invoices
{
public class InvoiceNotificationManager
public class InvoiceNotificationManager : IHostedService
{
public static HttpClient _Client = new HttpClient();
@ -41,16 +43,32 @@ namespace BTCPayServer.Services.Invoices
}
IBackgroundJobClient _JobClient;
EventAggregator _EventAggregator;
InvoiceRepository _InvoiceRepository;
public InvoiceNotificationManager(
IBackgroundJobClient jobClient,
EventAggregator eventAggregator,
InvoiceRepository invoiceRepository,
ILogger<InvoiceNotificationManager> logger)
{
Logger = logger as ILogger ?? NullLogger.Instance;
_JobClient = jobClient;
_EventAggregator = eventAggregator;
_InvoiceRepository = invoiceRepository;
}
public void Notify(InvoiceEntity invoice)
async Task Notify(InvoiceEntity invoice)
{
CancellationTokenSource cts = new CancellationTokenSource(10000);
try
{
await SendNotification(invoice, cts.Token);
return;
}
catch // It fails, it is OK because we try with hangfire after
{
}
var invoiceStr = NBitcoin.JsonConverters.Serializer.ToString(new ScheduledJob() { TryCount = 0, Invoice = invoice });
if (!string.IsNullOrEmpty(invoice.NotificationURL))
_JobClient.Schedule(() => NotifyHttp(invoiceStr), TimeSpan.Zero);
@ -70,31 +88,7 @@ namespace BTCPayServer.Services.Invoices
CancellationTokenSource cts = new CancellationTokenSource(10000);
try
{
var request = new HttpRequestMessage();
request.Method = HttpMethod.Post;
var dto = job.Invoice.EntityToDTO();
InvoicePaymentNotification notification = new InvoicePaymentNotification()
{
Id = dto.Id,
Url = dto.Url,
BTCDue = dto.BTCDue,
BTCPaid = dto.BTCPaid,
BTCPrice = dto.BTCPrice,
Currency = dto.Currency,
CurrentTime = dto.CurrentTime,
ExceptionStatus = dto.ExceptionStatus,
ExpirationTime = dto.ExpirationTime,
InvoiceTime = dto.InvoiceTime,
PosData = dto.PosData,
Price = dto.Price,
Rate = dto.Rate,
Status = dto.Status,
BuyerFields = job.Invoice.RefundMail == null ? null : new Newtonsoft.Json.Linq.JObject() { new JProperty("buyerEmail", job.Invoice.RefundMail) }
};
request.RequestUri = new Uri(job.Invoice.NotificationURL, UriKind.Absolute);
request.Content = new StringContent(JsonConvert.SerializeObject(notification), Encoding.UTF8, "application/json");
var response = await _Client.SendAsync(request, cts.Token);
HttpResponseMessage response = await SendNotification(job.Invoice, cts.Token);
reschedule = response.StatusCode != System.Net.HttpStatusCode.OK;
Logger.LogInformation("Job " + jobId + " returned " + response.StatusCode);
}
@ -116,11 +110,73 @@ namespace BTCPayServer.Services.Invoices
}
}
private static async Task<HttpResponseMessage> SendNotification(InvoiceEntity invoice, CancellationToken cancellation)
{
var request = new HttpRequestMessage();
request.Method = HttpMethod.Post;
var dto = invoice.EntityToDTO();
InvoicePaymentNotification notification = new InvoicePaymentNotification()
{
Id = dto.Id,
Url = dto.Url,
BTCDue = dto.BTCDue,
BTCPaid = dto.BTCPaid,
BTCPrice = dto.BTCPrice,
Currency = dto.Currency,
CurrentTime = dto.CurrentTime,
ExceptionStatus = dto.ExceptionStatus,
ExpirationTime = dto.ExpirationTime,
InvoiceTime = dto.InvoiceTime,
PosData = dto.PosData,
Price = dto.Price,
Rate = dto.Rate,
Status = dto.Status,
BuyerFields = invoice.RefundMail == null ? null : new Newtonsoft.Json.Linq.JObject() { new JProperty("buyerEmail", invoice.RefundMail) }
};
request.RequestUri = new Uri(invoice.NotificationURL, UriKind.Absolute);
request.Content = new StringContent(JsonConvert.SerializeObject(notification), Encoding.UTF8, "application/json");
var response = await _Client.SendAsync(request, cancellation);
return response;
}
int MaxTry = 6;
private static string GetHttpJobId(InvoiceEntity invoice)
{
return $"{invoice.Id}-{invoice.Status}-HTTP";
}
CompositeDisposable leases = new CompositeDisposable();
public Task StartAsync(CancellationToken cancellationToken)
{
leases.Add(_EventAggregator.Subscribe<InvoiceStatusChangedEvent>(async e =>
{
var invoice = await _InvoiceRepository.GetInvoice(null, e.InvoiceId);
// we need to use the status in the event and not in the invoice. The invoice might now be in another status.
if (invoice.FullNotifications)
{
if (e.NewState == "expired" ||
e.NewState == "paid" ||
e.NewState == "invalid" ||
e.NewState == "complete"
)
await Notify(invoice);
}
if(e.NewState == "confirmed")
{
await Notify(invoice);
}
}));
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
leases.Dispose();
return Task.CompletedTask;
}
}
}

View File

@ -13,6 +13,8 @@ using Microsoft.Extensions.Hosting;
using System.Collections.Concurrent;
using Hangfire;
using BTCPayServer.Services.Wallets;
using BTCPayServer.Controllers;
using BTCPayServer.Events;
namespace BTCPayServer.Services.Invoices
{
@ -25,14 +27,14 @@ namespace BTCPayServer.Services.Invoices
InvoiceRepository _InvoiceRepository;
ExplorerClient _ExplorerClient;
DerivationStrategyFactory _DerivationFactory;
InvoiceNotificationManager _NotificationManager;
EventAggregator _EventAggregator;
BTCPayWallet _Wallet;
public InvoiceWatcher(ExplorerClient explorerClient,
InvoiceRepository invoiceRepository,
EventAggregator eventAggregator,
BTCPayWallet wallet,
InvoiceNotificationManager notificationManager,
InvoiceWatcherAccessor accessor)
{
LongPollingMode = explorerClient.Network == Network.RegTest;
@ -41,23 +43,24 @@ namespace BTCPayServer.Services.Invoices
_ExplorerClient = explorerClient ?? throw new ArgumentNullException(nameof(explorerClient));
_DerivationFactory = new DerivationStrategyFactory(_ExplorerClient.Network);
_InvoiceRepository = invoiceRepository ?? throw new ArgumentNullException(nameof(invoiceRepository));
_NotificationManager = notificationManager ?? throw new ArgumentNullException(nameof(notificationManager));
_EventAggregator = eventAggregator ?? throw new ArgumentNullException(nameof(eventAggregator));
accessor.Instance = this;
}
CompositeDisposable leases = new CompositeDisposable();
public bool LongPollingMode
{
get; set;
}
public async Task NotifyReceived(Script scriptPubKey)
async Task NotifyReceived(Script scriptPubKey)
{
var invoice = await _InvoiceRepository.GetInvoiceIdFromScriptPubKey(scriptPubKey);
if (invoice != null)
_WatchRequests.Add(invoice);
}
public async Task NotifyBlock()
async Task NotifyBlock()
{
foreach (var invoice in await _InvoiceRepository.GetPendingInvoices())
{
@ -76,17 +79,24 @@ namespace BTCPayServer.Services.Invoices
if (invoice == null)
break;
var stateBefore = invoice.Status;
var result = await UpdateInvoice(changes, invoice).ConfigureAwait(false);
var stateChanges = new List<string>();
var result = await UpdateInvoice(changes, invoice, stateChanges).ConfigureAwait(false);
changes = result.Changes;
if (result.NeedSave)
{
await _InvoiceRepository.UpdateInvoiceStatus(invoice.Id, invoice.Status, invoice.ExceptionStatus).ConfigureAwait(false);
var changed = stateBefore != invoice.Status;
if (changed)
{
Logs.PayServer.LogInformation($"Invoice {invoice.Id}: {stateBefore} => {invoice.Status}");
_EventAggregator.Publish(new InvoiceDataChangedEvent() { InvoiceId = invoice.Id });
}
var changed = stateBefore != invoice.Status;
foreach(var stateChange in stateChanges)
{
_EventAggregator.Publish(new InvoiceStatusChangedEvent() { InvoiceId = invoice.Id, NewState = stateChange, OldState = stateBefore });
stateBefore = stateChange;
}
if (invoice.Status == "complete" ||
((invoice.Status == "invalid" || invoice.Status == "expired") && invoice.MonitoringExpiration < DateTimeOffset.UtcNow))
{
@ -111,7 +121,7 @@ namespace BTCPayServer.Services.Invoices
}
private async Task<(bool NeedSave, UTXOChanges Changes)> UpdateInvoice(UTXOChanges changes, InvoiceEntity invoice)
private async Task<(bool NeedSave, UTXOChanges Changes)> UpdateInvoice(UTXOChanges changes, InvoiceEntity invoice, List<string> stateChanges)
{
bool needSave = false;
//Fetch unknown payments
@ -139,10 +149,7 @@ namespace BTCPayServer.Services.Invoices
needSave = true;
await _InvoiceRepository.UnaffectAddress(invoice.Id);
invoice.Status = "expired";
if (invoice.FullNotifications)
{
_NotificationManager.Notify(invoice);
}
stateChanges.Add(invoice.Status);
}
if (invoice.Status == "new" || invoice.Status == "expired")
@ -153,10 +160,7 @@ namespace BTCPayServer.Services.Invoices
if (invoice.Status == "new")
{
invoice.Status = "paid";
if (invoice.FullNotifications)
{
_NotificationManager.Notify(invoice);
}
stateChanges.Add(invoice.Status);
invoice.ExceptionStatus = null;
await _InvoiceRepository.UnaffectAddress(invoice.Id);
needSave = true;
@ -216,11 +220,8 @@ namespace BTCPayServer.Services.Invoices
{
await _InvoiceRepository.UnaffectAddress(invoice.Id);
invoice.Status = "invalid";
stateChanges.Add(invoice.Status);
needSave = true;
if (invoice.FullNotifications)
{
_NotificationManager.Notify(invoice);
}
}
else
{
@ -229,7 +230,7 @@ namespace BTCPayServer.Services.Invoices
{
await _InvoiceRepository.UnaffectAddress(invoice.Id);
invoice.Status = "confirmed";
_NotificationManager.Notify(invoice);
stateChanges.Add(invoice.Status);
needSave = true;
}
}
@ -243,8 +244,7 @@ namespace BTCPayServer.Services.Invoices
if (totalConfirmed >= invoice.GetTotalCryptoDue())
{
invoice.Status = "complete";
if (invoice.FullNotifications)
_NotificationManager.Notify(invoice);
stateChanges.Add(invoice.Status);
needSave = true;
}
}
@ -356,6 +356,10 @@ namespace BTCPayServer.Services.Invoices
_WatchRequests.Add(pending);
}
}, null, 0, (int)PollInterval.TotalMilliseconds);
leases.Add(_EventAggregator.Subscribe<NewBlockEvent>(async b => { await NotifyBlock(); }));
leases.Add(_EventAggregator.Subscribe<TxOutReceivedEvent>(async b => { await NotifyReceived(b.ScriptPubKey); }));
return Task.CompletedTask;
}
@ -402,6 +406,7 @@ namespace BTCPayServer.Services.Invoices
public Task StopAsync(CancellationToken cancellationToken)
{
leases.Dispose();
_UpdatePendingInvoices.Dispose();
_Cts.Cancel();
return Task.WhenAny(_RunningTask.Task, Task.Delay(-1, cancellationToken));