Add polling for connection through websocket

This commit is contained in:
nicolas.dorier 2018-01-08 16:50:56 +09:00
parent 8753dd15de
commit 6003aa4236
2 changed files with 97 additions and 59 deletions

View File

@ -32,11 +32,6 @@ namespace BTCPayServer
return null;
}
internal object GetExplorerClient(object network)
{
throw new NotImplementedException();
}
public ExplorerClient GetExplorerClient(BTCPayNetwork network)
{
return GetExplorerClient(network.CryptoCode);

View File

@ -23,11 +23,15 @@ namespace BTCPayServer.HostedServices
InvoiceRepository _InvoiceRepository;
private TaskCompletionSource<bool> _RunningTask;
private CancellationTokenSource _Cts;
NBXplorerDashboard _Dashboards;
public NBXplorerListener(ExplorerClientProvider explorerClients,
NBXplorerDashboard dashboard,
InvoiceRepository invoiceRepository,
EventAggregator aggregator, IApplicationLifetime lifetime)
{
PollInterval = TimeSpan.FromMinutes(1.0);
_Dashboards = dashboard;
_InvoiceRepository = invoiceRepository;
_ExplorerClients = explorerClients;
_Aggregator = aggregator;
@ -36,6 +40,24 @@ namespace BTCPayServer.HostedServices
CompositeDisposable leases = new CompositeDisposable();
ConcurrentDictionary<string, NotificationSession> _Sessions = new ConcurrentDictionary<string, NotificationSession>();
private Timer _ListenPoller;
TimeSpan _PollInterval;
public TimeSpan PollInterval
{
get
{
return _PollInterval;
}
set
{
_PollInterval = value;
if (_ListenPoller != null)
{
_ListenPoller.Change(0, (int)value.TotalMilliseconds);
}
}
}
public Task StartAsync(CancellationToken cancellationToken)
{
@ -45,61 +67,22 @@ namespace BTCPayServer.HostedServices
{
if (nbxplorerEvent.NewState == NBXplorerState.Ready)
{
if (_Sessions.ContainsKey(nbxplorerEvent.Network.CryptoCode))
return;
var client = _ExplorerClients.GetExplorerClient(nbxplorerEvent.Network);
var session = await client.CreateNotificationSessionAsync(_Cts.Token);
if (!_Sessions.TryAdd(nbxplorerEvent.Network.CryptoCode, session))
{
await session.DisposeAsync();
return;
}
try
{
using (session)
{
await session.ListenNewBlockAsync(_Cts.Token);
await session.ListenDerivationSchemesAsync((await GetStrategies(nbxplorerEvent)).ToArray(), _Cts.Token);
Logs.PayServer.LogInformation($"Start Listening {nbxplorerEvent.Network.CryptoCode} explorer events");
while (true)
{
var newEvent = await session.NextEventAsync(_Cts.Token);
switch (newEvent)
{
case NBXplorer.Models.NewBlockEvent evt:
_Aggregator.Publish(new Events.NewBlockEvent());
break;
case NBXplorer.Models.NewTransactionEvent evt:
foreach (var txout in evt.Match.Outputs)
{
_Aggregator.Publish(new Events.TxOutReceivedEvent()
{
Network = nbxplorerEvent.Network,
ScriptPubKey = txout.ScriptPubKey
});
}
break;
default:
Logs.PayServer.LogWarning("Received unknown message from NBXplorer");
break;
}
}
}
}
catch when (_Cts.IsCancellationRequested) { }
finally
{
Logs.PayServer.LogInformation($"Stop listening {nbxplorerEvent.Network.CryptoCode} explorer events");
_Sessions.TryRemove(nbxplorerEvent.Network.CryptoCode, out NotificationSession unused);
if(_Sessions.Count == 0 && _Cts.IsCancellationRequested)
{
_RunningTask.TrySetResult(true);
}
}
await Listen(nbxplorerEvent.Network);
}
}));
_ListenPoller = new Timer(async s =>
{
foreach (var nbxplorerState in _Dashboards.GetAll())
{
if(nbxplorerState.Status != null && nbxplorerState.Status.IsFullySynched)
{
await Listen(nbxplorerState.Network);
}
}
}, null, 0, (int)PollInterval.TotalMilliseconds);
leases.Add(_ListenPoller);
leases.Add(_Aggregator.Subscribe<Events.InvoiceCreatedEvent>(async inv =>
{
var invoice = await _InvoiceRepository.GetInvoice(null, inv.InvoiceId);
@ -117,13 +100,73 @@ namespace BTCPayServer.HostedServices
return Task.CompletedTask;
}
private async Task<List<DerivationStrategyBase>> GetStrategies(NBXplorerStateChangedEvent nbxplorerEvent)
private async Task Listen(BTCPayNetwork network)
{
if (_Sessions.ContainsKey(network.CryptoCode))
return;
var client = _ExplorerClients.GetExplorerClient(network);
if (client == null)
return;
if (_Cts.IsCancellationRequested)
return;
var session = await client.CreateNotificationSessionAsync(_Cts.Token).ConfigureAwait(false);
if (!_Sessions.TryAdd(network.CryptoCode, session))
{
await session.DisposeAsync();
return;
}
try
{
using (session)
{
await session.ListenNewBlockAsync(_Cts.Token).ConfigureAwait(false);
await session.ListenDerivationSchemesAsync((await GetStrategies(network)).ToArray(), _Cts.Token).ConfigureAwait(false);
Logs.PayServer.LogInformation($"Connected to WebSocket of NBXplorer ({network.CryptoCode})");
while (!_Cts.IsCancellationRequested)
{
var newEvent = await session.NextEventAsync(_Cts.Token).ConfigureAwait(false);
switch (newEvent)
{
case NBXplorer.Models.NewBlockEvent evt:
_Aggregator.Publish(new Events.NewBlockEvent());
break;
case NBXplorer.Models.NewTransactionEvent evt:
foreach (var txout in evt.Match.Outputs)
{
_Aggregator.Publish(new Events.TxOutReceivedEvent()
{
Network = network,
ScriptPubKey = txout.ScriptPubKey
});
}
break;
default:
Logs.PayServer.LogWarning("Received unknown message from NBXplorer");
break;
}
}
}
}
catch when (_Cts.IsCancellationRequested) { }
finally
{
Logs.PayServer.LogInformation($"Disconnected from WebSocket of NBXplorer ({network.CryptoCode})");
_Sessions.TryRemove(network.CryptoCode, out NotificationSession unused);
if (_Sessions.Count == 0 && _Cts.IsCancellationRequested)
{
_RunningTask.TrySetResult(true);
}
}
}
private async Task<List<DerivationStrategyBase>> GetStrategies(BTCPayNetwork network)
{
List<DerivationStrategyBase> strategies = new List<DerivationStrategyBase>();
foreach (var invoiceId in await _InvoiceRepository.GetPendingInvoices())
{
var invoice = await _InvoiceRepository.GetInvoice(null, invoiceId);
var strategy = GetStrategy(nbxplorerEvent.Network.CryptoCode, invoice);
var strategy = GetStrategy(network.CryptoCode, invoice);
if (strategy != null)
strategies.Add(strategy);
}