Try to add server events

This commit is contained in:
Dennis Reimann 2024-06-27 16:02:06 +02:00
parent 5196bc4548
commit 61f37368f4
No known key found for this signature in database
GPG key ID: 5009E1797F03F8D0
11 changed files with 188 additions and 42 deletions

View file

@ -22,8 +22,7 @@ public class TransactionDetectedRequest
//methods available on the hub in the client
public interface IBTCPayAppHubClient
{
Task NotifyServerEvent(string eventName);
Task NotifyServerEvent(IServerEvent ev);
Task NotifyNetwork(string network);
Task NotifyServerNode(string nodeInfo);
Task TransactionDetected(TransactionDetectedRequest request);
@ -37,6 +36,25 @@ public interface IBTCPayAppHubClient
Task<PayResponse> PayInvoice(string bolt11, long? amountMilliSatoshi);
}
public interface IServerEvent
{
public string Type { get; }
}
public interface IServerEvent<T> : IServerEvent
{
public T? Event { get; }
}
public class ServerEvent(string type) : IServerEvent
{
public string Type { get; } = type;
}
public class ServerEvent<T>(string type, T? evt = default) : ServerEvent(type), IServerEvent<T>
{
public T? Event { get; } = evt;
}
public record TxResp(long Confirmations, long? Height, decimal BalanceChange, DateTimeOffset Timestamp, string TransactionId)
{
public override string ToString()

View file

@ -8,14 +8,17 @@ using BTCPayApp.CommonServer;
using BTCPayApp.CommonServer.Models;
using BTCPayServer.Abstractions.Constants;
using BTCPayServer.Data;
using BTCPayServer.Events;
using BTCPayServer.HostedServices;
using BTCPayServer.Services;
using BTCPayServer.Services.Stores;
using BTCPayServer.Services.Wallets;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Identity;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using NBitcoin;
using NBXplorer;
using NBXplorer.DerivationStrategy;
using NBXplorer.Models;
using Newtonsoft.Json;
@ -97,6 +100,9 @@ public class BTCPayAppHub : Hub<IBTCPayAppHubClient>, IBTCPayAppHubServer
private readonly IFeeProviderFactory _feeProviderFactory;
private readonly ILogger<BTCPayAppHub> _logger;
private readonly UserManager<ApplicationUser> _userManager;
private readonly StoreRepository _storeRepository;
private readonly EventAggregator _eventAggregator;
private CompositeDisposable? _compositeDisposable;
public BTCPayAppHub(BTCPayNetworkProvider btcPayNetworkProvider,
NBXplorerDashboard nbXplorerDashboard,
@ -104,6 +110,8 @@ public class BTCPayAppHub : Hub<IBTCPayAppHubClient>, IBTCPayAppHubServer
ExplorerClientProvider explorerClientProvider,
IFeeProviderFactory feeProviderFactory,
ILogger<BTCPayAppHub> logger,
StoreRepository storeRepository,
EventAggregator eventAggregator,
UserManager<ApplicationUser> userManager)
{
_btcPayNetworkProvider = btcPayNetworkProvider;
@ -113,35 +121,53 @@ public class BTCPayAppHub : Hub<IBTCPayAppHubClient>, IBTCPayAppHubServer
_feeProviderFactory = feeProviderFactory;
_logger = logger;
_userManager = userManager;
_storeRepository = storeRepository;
_eventAggregator = eventAggregator;
}
public override async Task OnConnectedAsync()
{
await _appState.Connected(Context.ConnectionId);
//TODO: this needs to happen BEFORE connection is established
// TODO: this needs to happen BEFORE connection is established
if (!_nbXplorerDashboard.IsFullySynched(_btcPayNetworkProvider.BTC.CryptoCode, out _))
{
Context.Abort();
return;
}
await Groups.AddToGroupAsync(Context.ConnectionId,_userManager.GetUserId(Context.User));
var userId = _userManager.GetUserId(Context.User!)!;
var userStores = await _storeRepository.GetStoresByUserId(userId);
await Clients.Client(Context.ConnectionId).NotifyNetwork(_btcPayNetworkProvider.BTC.NBitcoinNetwork.ToString());
await Groups.AddToGroupAsync(Context.ConnectionId, userId);
foreach (var userStore in userStores)
{
await Groups.AddToGroupAsync(Context.ConnectionId, userStore.Id);
}
_compositeDisposable = new CompositeDisposable();
_compositeDisposable.Add(_eventAggregator.SubscribeAsync<UserStoreAddedEvent>(StoreUserAddedEvent));
_compositeDisposable.Add(_eventAggregator.SubscribeAsync<UserStoreRemovedEvent>(StoreUserRemovedEvent));
}
public override async Task OnDisconnectedAsync(Exception? exception)
{
_compositeDisposable?.Dispose();
await _appState.Disconnected(Context.ConnectionId);
await base.OnDisconnectedAsync(exception);
}
private async Task StoreUserAddedEvent(UserStoreAddedEvent arg)
{
// TODO: Groups and COntext are not accessible here
// await Groups.AddToGroupAsync(Context.ConnectionId, arg.StoreId);
}
private async Task StoreUserRemovedEvent(UserStoreRemovedEvent arg)
{
// TODO: Groups and COntext are not accessible here
// await Groups.RemoveFromGroupAsync(Context.ConnectionId, arg.UserId);
}
public async Task<bool> BroadcastTransaction(string tx)
{
@ -158,15 +184,12 @@ public class BTCPayAppHub : Hub<IBTCPayAppHubClient>, IBTCPayAppHubServer
var feeProvider = _feeProviderFactory.CreateFeeProvider( _btcPayNetworkProvider.BTC);
try
{
return (await feeProvider.GetFeeRateAsync(blockTarget)).SatoshiPerByte;
}
finally
{
_logger.LogInformation($"Getting fee rate for {blockTarget} done");
}
}
public async Task<BestBlockResponse> GetBestBlock()
@ -318,22 +341,16 @@ var resultPsbt = PSBT.Parse(psbt, explorerClient.Network.NBitcoinNetwork);
await _appState.PaymentUpdate(identifier, lightningPayment);
}
public async Task<bool> IdentifierActive(string group, bool active)
{
return await _appState.IdentifierActive(group, Context.ConnectionId, active);
}
public async Task<Dictionary<string, string>> Pair(PairRequest request)
{
return await _appState.Pair(Context.ConnectionId, request);
}
public async Task<AppHandshakeResponse> Handshake(AppHandshake request)
{

View file

@ -3,10 +3,8 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using AngleSharp.Dom.Events;
using BTCPayApp.CommonServer;
using BTCPayServer.Events;
using BTCPayServer.Lightning;
@ -37,6 +35,8 @@ public class BTCPayAppState : IHostedService
public ExplorerClient ExplorerClient { get; private set; }
private DerivationSchemeParser _derivationSchemeParser;
public readonly ConcurrentDictionary<string, string> GroupToConnectionId = new(StringComparer.InvariantCultureIgnoreCase);
private CancellationTokenSource? _cts;
// private readonly ConcurrentDictionary<string, TrackedSource> _connectionScheme = new();
public event EventHandler<(string,LightningPayment)>? OnPaymentUpdate;
@ -60,20 +60,66 @@ public class BTCPayAppState : IHostedService
_cts ??= new CancellationTokenSource();
ExplorerClient = _explorerClientProvider.GetExplorerClient("BTC");
_derivationSchemeParser = new DerivationSchemeParser(_networkProvider.BTC);
_compositeDisposable = new();
_compositeDisposable.Add(
_eventAggregator.Subscribe<NewBlockEvent>(OnNewBlock));
_compositeDisposable.Add(
_eventAggregator.SubscribeAsync<NewOnChainTransactionEvent>(OnNewTransaction));
_compositeDisposable.Add(
_eventAggregator.SubscribeAsync<UserNotificationsUpdatedEvent>(UserNotificationsUpdatedEvent));
_compositeDisposable = new CompositeDisposable();
_compositeDisposable.Add(_eventAggregator.Subscribe<NewBlockEvent>(OnNewBlock));
_compositeDisposable.Add(_eventAggregator.SubscribeAsync<NewOnChainTransactionEvent>(OnNewTransaction));
_compositeDisposable.Add(_eventAggregator.SubscribeAsync<UserNotificationsUpdatedEvent>(UserNotificationsUpdatedEvent));
_compositeDisposable.Add(_eventAggregator.SubscribeAsync<InvoiceEvent>(InvoiceChangedEvent));
// Store events
_compositeDisposable.Add(_eventAggregator.SubscribeAsync<StoreCreatedEvent>(StoreCreatedEvent));
_compositeDisposable.Add(_eventAggregator.SubscribeAsync<StoreUpdatedEvent>(StoreUpdatedEvent));
_compositeDisposable.Add(_eventAggregator.SubscribeAsync<StoreRemovedEvent>(StoreRemovedEvent));
_compositeDisposable.Add(_eventAggregator.SubscribeAsync<UserStoreAddedEvent>(StoreUserAddedEvent));
_compositeDisposable.Add(_eventAggregator.SubscribeAsync<UserStoreUpdatedEvent>(StoreUserUpdatedEvent));
_compositeDisposable.Add(_eventAggregator.SubscribeAsync<UserStoreRemovedEvent>(StoreUserRemovedEvent));
_ = UpdateNodeInfo();
return Task.CompletedTask;
}
private async Task InvoiceChangedEvent(InvoiceEvent arg)
{
await _hubContext.Clients.Group(arg.Invoice.StoreId).NotifyServerEvent(new ServerEvent<InvoiceEvent>("invoice-updated", arg));
}
private async Task UserNotificationsUpdatedEvent(UserNotificationsUpdatedEvent arg)
{
await _hubContext.Clients.Group(arg.UserId).NotifyServerEvent("notifications-updated");
await _hubContext.Clients.Group(arg.UserId).NotifyServerEvent(new ServerEvent<UserNotificationsUpdatedEvent>("notifications-updated", arg));
}
private async Task StoreCreatedEvent(StoreCreatedEvent arg)
{
await _hubContext.Clients.Group(arg.Store.Id).NotifyServerEvent(new ServerEvent<StoreCreatedEvent>("store-created", arg));
}
private async Task StoreUpdatedEvent(StoreUpdatedEvent arg)
{
await _hubContext.Clients.Group(arg.Store.Id).NotifyServerEvent(new ServerEvent<StoreUpdatedEvent>("store-updated", arg));
}
private async Task StoreRemovedEvent(StoreRemovedEvent arg)
{
await _hubContext.Clients.Group(arg.StoreId).NotifyServerEvent(new ServerEvent<StoreRemovedEvent>("store-removed", arg));
}
private async Task StoreUserAddedEvent(UserStoreAddedEvent arg)
{
var ev = new ServerEvent<UserStoreAddedEvent>("user-store-added", arg);
await _hubContext.Clients.Group(arg.StoreId).NotifyServerEvent(ev);
await _hubContext.Clients.Group(arg.UserId).NotifyServerEvent(ev);
}
private async Task StoreUserUpdatedEvent(UserStoreUpdatedEvent arg)
{
var ev = new ServerEvent<UserStoreUpdatedEvent>("user-store-updated", arg);
await _hubContext.Clients.Group(arg.StoreId).NotifyServerEvent(ev);
await _hubContext.Clients.Group(arg.UserId).NotifyServerEvent(ev);
}
private async Task StoreUserRemovedEvent(UserStoreRemovedEvent arg)
{
var ev = new ServerEvent<UserStoreRemovedEvent>("user-store-removed", arg);
await _hubContext.Clients.Group(arg.StoreId).NotifyServerEvent(ev);
await _hubContext.Clients.Group(arg.UserId).NotifyServerEvent(ev);
}
private string _nodeInfo = string.Empty;
@ -81,8 +127,6 @@ public class BTCPayAppState : IHostedService
{
while (!_cts.Token.IsCancellationRequested)
{
try
{
var res = await _serviceProvider.GetRequiredService<PaymentMethodHandlerDictionary>().GetLightningHandler(ExplorerClient.CryptoCode).GetNodeInfo(
@ -112,7 +156,6 @@ public class BTCPayAppState : IHostedService
}
await Task.Delay(TimeSpan.FromMinutes(string.IsNullOrEmpty(_nodeInfo)? 1:5), _cts.Token);
}
}
private async Task OnNewTransaction(NewOnChainTransactionEvent obj)
@ -202,9 +245,6 @@ public class BTCPayAppState : IHostedService
return result;
}
public readonly ConcurrentDictionary<string, string> GroupToConnectionId = new(StringComparer.InvariantCultureIgnoreCase);
private CancellationTokenSource _cts;
public async Task<bool> IdentifierActive(string group, string contextConnectionId, bool active)
{
@ -234,7 +274,6 @@ public class BTCPayAppState : IHostedService
{
GroupRemoved?.Invoke(this, group);
}
}
}

View file

@ -0,0 +1,13 @@
using BTCPayServer.Data;
namespace BTCPayServer.Events;
public class StoreCreatedEvent(StoreData store)
{
public StoreData Store { get; } = store;
public override string ToString()
{
return $"Store \"{Store.StoreName}\" has been created";
}
}

View file

@ -1,12 +1,9 @@
namespace BTCPayServer.Events;
public class StoreRemovedEvent
public class StoreRemovedEvent(string storeId)
{
public StoreRemovedEvent(string storeId)
{
StoreId = storeId;
}
public string StoreId { get; set; }
public string StoreId { get; } = storeId;
public override string ToString()
{
return $"Store {StoreId} has been removed";

View file

@ -0,0 +1,13 @@
using BTCPayServer.Data;
namespace BTCPayServer.Events;
public class StoreUpdatedEvent(StoreData store)
{
public StoreData Store { get; } = store;
public override string ToString()
{
return $"Store \"{Store.StoreName}\" has been updated";
}
}

View file

@ -0,0 +1,9 @@
namespace BTCPayServer.Events;
public class UserStoreAddedEvent(string storeId, string userId) : UserStoreEvent(storeId, userId)
{
public override string ToString()
{
return $"User {UserId} has been added to store {StoreId}";
}
}

View file

@ -0,0 +1,11 @@
namespace BTCPayServer.Events;
public abstract class UserStoreEvent(string storeId, string userId)
{
public string StoreId { get; } = storeId;
public string UserId { get; } = userId;
public new virtual string ToString()
{
return $"StoreUserEvent: User {UserId}, Store {StoreId}";
}
}

View file

@ -0,0 +1,9 @@
namespace BTCPayServer.Events;
public class UserStoreRemovedEvent(string storeId, string userId) : UserStoreEvent(storeId, userId)
{
public override string ToString()
{
return $"User {UserId} has been removed from store {StoreId}";
}
}

View file

@ -0,0 +1,9 @@
namespace BTCPayServer.Events;
public class UserStoreUpdatedEvent(string storeId, string userId) : UserStoreEvent(storeId, userId)
{
public override string ToString()
{
return $"User {UserId} and store {StoreId} relation has been updated";
}
}

View file

@ -295,6 +295,7 @@ namespace BTCPayServer.Services.Stores
try
{
await ctx.SaveChangesAsync();
_eventAggregator.Publish(new UserStoreAddedEvent(storeId, userId));
return true;
}
catch (DbUpdateException)
@ -310,10 +311,12 @@ namespace BTCPayServer.Services.Stores
roleId ??= await GetDefaultRole();
await using var ctx = _ContextFactory.CreateContext();
var userStore = await ctx.UserStore.FindAsync(userId, storeId);
var added = false;
if (userStore is null)
{
userStore = new UserStore { StoreDataId = storeId, ApplicationUserId = userId };
ctx.UserStore.Add(userStore);
added = true;
}
if (userStore.StoreRoleId == roleId.Id)
@ -323,6 +326,10 @@ namespace BTCPayServer.Services.Stores
try
{
await ctx.SaveChangesAsync();
UserStoreEvent evt = added
? new UserStoreAddedEvent(storeId, userId)
: new UserStoreUpdatedEvent(storeId, userId);
_eventAggregator.Publish(evt);
return true;
}
catch (DbUpdateException)
@ -364,8 +371,8 @@ namespace BTCPayServer.Services.Stores
ctx.UserStore.Add(userStore);
ctx.Entry(userStore).State = EntityState.Deleted;
await ctx.SaveChangesAsync();
_eventAggregator.Publish(new UserStoreRemovedEvent(storeId, userId));
return true;
}
private async Task DeleteStoreIfOrphan(string storeId)
@ -404,6 +411,8 @@ namespace BTCPayServer.Services.Stores
ctx.Add(storeData);
ctx.Add(userStore);
await ctx.SaveChangesAsync();
_eventAggregator.Publish(new StoreCreatedEvent(storeData));
_eventAggregator.Publish(new UserStoreAddedEvent(storeData.Id, userStore.ApplicationUserId));
}
public async Task<WebhookData[]> GetWebhooks(string storeId)
@ -552,6 +561,7 @@ namespace BTCPayServer.Services.Stores
{
ctx.Entry(existing).CurrentValues.SetValues(store);
await ctx.SaveChangesAsync().ConfigureAwait(false);
_eventAggregator.Publish(new StoreUpdatedEvent(store));
}
}
@ -573,6 +583,7 @@ retry:
try
{
await ctx.SaveChangesAsync();
_eventAggregator.Publish(new StoreRemovedEvent(store.Id));
}
catch (DbUpdateException ex) when (IsDeadlock(ex) && retry < 5)
{