btcpayserver/BTCPayServer/HostedServices/EventHostedServiceBase.cs
Andrew Camilleri b31fb1a269
Auto label utxos based on invoice and payjoin (#1499)
* Auto label utxos based on invoice and payjoin

This PR introduces automatic labelling to transactions.
* If it is an incoming tx to an invoice, it will tag it as such.
* If it was a payjoin tx , it will tag it as such.
* If a transaction's inputs were exposed to a payjoin sender, we tag it as such.

* wip

* wip

* support in coinselection

* remove ugly hack

* support wallet transactions page

* remove messy loop

* better label template helpers

* add tests and susbcribe to event

* simplify data

* fix test

* fix label  + color

* fix remove label

* renove useless call

* add toString

* fix potential crash by txid

* trim json keyword in manual label

* format file
2020-04-28 15:06:28 +09:00

85 lines
2.6 KiB
C#

using System;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using BTCPayServer.Logging;
using Microsoft.Extensions.Hosting;
namespace BTCPayServer.HostedServices
{
public class EventHostedServiceBase : IHostedService
{
private readonly EventAggregator _EventAggregator;
private List<IEventAggregatorSubscription> _Subscriptions;
private CancellationTokenSource _Cts;
public EventHostedServiceBase(EventAggregator eventAggregator)
{
_EventAggregator = eventAggregator;
}
Channel<object> _Events = Channel.CreateUnbounded<object>();
public async Task ProcessEvents(CancellationToken cancellationToken)
{
while (await _Events.Reader.WaitToReadAsync(cancellationToken))
{
if (_Events.Reader.TryRead(out var evt))
{
try
{
await ProcessEvent(evt, cancellationToken);
}
catch when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (Exception ex)
{
Logs.PayServer.LogWarning(ex, $"Unhandled exception in {this.GetType().Name}");
}
}
}
}
protected virtual Task ProcessEvent(object evt, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
protected virtual void SubscribeToEvents()
{
}
protected void Subscribe<T>()
{
_Subscriptions.Add(_EventAggregator.Subscribe<T>(e => _Events.Writer.TryWrite(e)));
}
public virtual Task StartAsync(CancellationToken cancellationToken)
{
_Subscriptions = new List<IEventAggregatorSubscription>();
SubscribeToEvents();
_Cts = new CancellationTokenSource();
_ProcessingEvents = ProcessEvents(_Cts.Token);
return Task.CompletedTask;
}
Task _ProcessingEvents = Task.CompletedTask;
public virtual async Task StopAsync(CancellationToken cancellationToken)
{
_Subscriptions?.ForEach(subscription => subscription.Dispose());
_Cts?.Cancel();
try
{
await _ProcessingEvents;
}
catch (OperationCanceledException)
{ }
}
}
}