using NBXplorer; using Microsoft.Extensions.Logging; using NBXplorer.DerivationStrategy; using NBXplorer.Models; using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using NBitcoin; using BTCPayServer.Logging; using System.Threading; using Microsoft.Extensions.Hosting; using System.Collections.Concurrent; using Hangfire; using BTCPayServer.Services.Wallets; namespace BTCPayServer.Services.Invoices { public class InvoiceWatcher : IHostedService { InvoiceRepository _InvoiceRepository; ExplorerClient _ExplorerClient; DerivationStrategyFactory _DerivationFactory; InvoiceNotificationManager _NotificationManager; BTCPayWallet _Wallet; public InvoiceWatcher(ExplorerClient explorerClient, InvoiceRepository invoiceRepository, BTCPayWallet wallet, InvoiceNotificationManager notificationManager) { LongPollingMode = explorerClient.Network == Network.RegTest; PollInterval = explorerClient.Network == Network.RegTest ? TimeSpan.FromSeconds(10.0) : TimeSpan.FromMinutes(1.0); _Wallet = wallet ?? throw new ArgumentNullException(nameof(wallet)); _ExplorerClient = explorerClient ?? throw new ArgumentNullException(nameof(explorerClient)); _DerivationFactory = new DerivationStrategyFactory(_ExplorerClient.Network); _InvoiceRepository = invoiceRepository ?? throw new ArgumentNullException(nameof(invoiceRepository)); _NotificationManager = notificationManager ?? throw new ArgumentNullException(nameof(notificationManager)); } public bool LongPollingMode { get; set; } public async Task NotifyReceived(Script scriptPubKey) { var invoice = await _InvoiceRepository.GetInvoiceIdFromScriptPubKey(scriptPubKey); if (invoice != null) _WatchRequests.Add(invoice); } public async Task NotifyBlock() { foreach (var invoice in await _InvoiceRepository.GetPendingInvoices()) { _WatchRequests.Add(invoice); } } private async Task UpdateInvoice(string invoiceId) { UTXOChanges changes = null; while (true) { try { var invoice = await _InvoiceRepository.GetInvoice(null, invoiceId).ConfigureAwait(false); if (invoice == null) break; var stateBefore = invoice.Status; var result = await UpdateInvoice(changes, invoice).ConfigureAwait(false); changes = result.Changes; if (result.NeedSave) await _InvoiceRepository.UpdateInvoiceStatus(invoice.Id, invoice.Status, invoice.ExceptionStatus).ConfigureAwait(false); var changed = stateBefore != invoice.Status; if (changed) { Logs.PayServer.LogInformation($"Invoice {invoice.Id}: {stateBefore} => {invoice.Status}"); } var expirationMonitoring = invoice.MonitoringExpiration.HasValue ? invoice.MonitoringExpiration.Value : invoice.InvoiceTime + TimeSpan.FromMinutes(60); if (invoice.Status == "complete" || ((invoice.Status == "invalid" || invoice.Status == "expired") && expirationMonitoring < DateTimeOffset.UtcNow)) { if (await _InvoiceRepository.RemovePendingInvoice(invoice.Id).ConfigureAwait(false)) Logs.PayServer.LogInformation("Stopped watching invoice " + invoiceId); break; } if (!changed || _Cts.Token.IsCancellationRequested) break; } catch (OperationCanceledException) when (_Cts.Token.IsCancellationRequested) { break; } catch (Exception ex) { Logs.PayServer.LogError(ex, "Unhandled error on watching invoice " + invoiceId); await Task.Delay(10000, _Cts.Token).ConfigureAwait(false); } } } private async Task<(bool NeedSave, UTXOChanges Changes)> UpdateInvoice(UTXOChanges changes, InvoiceEntity invoice) { bool needSave = false; //Fetch unknown payments var strategy = _DerivationFactory.Parse(invoice.DerivationStrategy); changes = await _ExplorerClient.SyncAsync(strategy, changes, !LongPollingMode, _Cts.Token).ConfigureAwait(false); var utxos = changes.Confirmed.UTXOs.Concat(changes.Unconfirmed.UTXOs).ToArray(); var invoiceIds = utxos.Select(u => _InvoiceRepository.GetInvoiceIdFromScriptPubKey(u.Output.ScriptPubKey)).ToArray(); utxos = utxos .Where((u, i) => invoiceIds[i].GetAwaiter().GetResult() == invoice.Id) .ToArray(); List receivedCoins = new List(); foreach (var received in utxos) if (received.Output.ScriptPubKey == invoice.DepositAddress.ScriptPubKey) receivedCoins.Add(new Coin(received.Outpoint, received.Output)); var alreadyAccounted = new HashSet(invoice.Payments.Select(p => p.Outpoint)); BitcoinAddress generatedAddress = null; bool dirtyAddress = false; foreach (var coin in receivedCoins.Where(c => !alreadyAccounted.Contains(c.Outpoint))) { var payment = await _InvoiceRepository.AddPayment(invoice.Id, coin).ConfigureAwait(false); invoice.Payments.Add(payment); if (coin.ScriptPubKey == invoice.DepositAddress.ScriptPubKey && generatedAddress == null) { dirtyAddress = true; } } ////// if (invoice.Status == "new" && invoice.ExpirationTime < DateTimeOffset.UtcNow) { needSave = true; await _InvoiceRepository.UnaffectAddress(invoice.Id); invoice.Status = "expired"; } if (invoice.Status == "new" || invoice.Status == "expired") { var totalPaid = invoice.Payments.Select(p => p.Output.Value).Sum(); if (totalPaid >= invoice.GetTotalCryptoDue()) { if (invoice.Status == "new") { invoice.Status = "paid"; if (invoice.FullNotifications) { _NotificationManager.Notify(invoice); } invoice.ExceptionStatus = null; await _InvoiceRepository.UnaffectAddress(invoice.Id); needSave = true; } else if (invoice.Status == "expired") { invoice.ExceptionStatus = "paidLate"; needSave = true; } } if (totalPaid > invoice.GetTotalCryptoDue() && invoice.ExceptionStatus != "paidOver") { invoice.ExceptionStatus = "paidOver"; await _InvoiceRepository.UnaffectAddress(invoice.Id); needSave = true; } if (totalPaid < invoice.GetTotalCryptoDue() && invoice.Payments.Count != 0 && invoice.ExceptionStatus != "paidPartial") { Logs.PayServer.LogInformation("Paid to " + invoice.DepositAddress); invoice.ExceptionStatus = "paidPartial"; needSave = true; if (dirtyAddress) { var address = await _Wallet.ReserveAddressAsync(_DerivationFactory.Parse(invoice.DerivationStrategy)); Logs.PayServer.LogInformation("Generate new " + address); await _InvoiceRepository.NewAddress(invoice.Id, address); } } } if (invoice.Status == "paid") { if (!invoice.MonitoringExpiration.HasValue || invoice.MonitoringExpiration > DateTimeOffset.UtcNow) { var transactions = await GetPaymentsWithTransaction(invoice); if (invoice.SpeedPolicy == SpeedPolicy.HighSpeed) { transactions = transactions.Where(t => !t.Transaction.Transaction.RBF); } else if (invoice.SpeedPolicy == SpeedPolicy.MediumSpeed) { transactions = transactions.Where(t => t.Transaction.Confirmations >= 1); } else if (invoice.SpeedPolicy == SpeedPolicy.LowSpeed) { transactions = transactions.Where(t => t.Transaction.Confirmations >= 6); } var totalConfirmed = transactions.Select(t => t.Payment.Output.Value).Sum(); if (totalConfirmed >= invoice.GetTotalCryptoDue()) { await _InvoiceRepository.UnaffectAddress(invoice.Id); invoice.Status = "confirmed"; _NotificationManager.Notify(invoice); needSave = true; } } else { await _InvoiceRepository.UnaffectAddress(invoice.Id); invoice.Status = "invalid"; needSave = true; } } if (invoice.Status == "confirmed") { var transactions = await GetPaymentsWithTransaction(invoice); transactions = transactions.Where(t => t.Transaction.Confirmations >= 6); var totalConfirmed = transactions.Select(t => t.Payment.Output.Value).Sum(); if (totalConfirmed >= invoice.GetTotalCryptoDue()) { invoice.Status = "complete"; if (invoice.FullNotifications) _NotificationManager.Notify(invoice); needSave = true; } } return (needSave, changes); } private async Task> GetPaymentsWithTransaction(InvoiceEntity invoice) { var getPayments = invoice.Payments .Select(async o => (Payment: o, Transaction: await _ExplorerClient.GetTransactionAsync(o.Outpoint.Hash, _Cts.Token))) .ToArray(); await Task.WhenAll(getPayments).ConfigureAwait(false); var transactions = getPayments.Select(c => (Payment: c.Result.Payment, Transaction: c.Result.Transaction)); return transactions; } TimeSpan _PollInterval; public TimeSpan PollInterval { get { return _PollInterval; } set { _PollInterval = value; if (_UpdatePendingInvoices != null) { _UpdatePendingInvoices.Change(0, (int)value.TotalMilliseconds); } } } public async Task WatchAsync(string invoiceId, bool singleShot = false) { if (invoiceId == null) throw new ArgumentNullException(nameof(invoiceId)); if (!singleShot) await _InvoiceRepository.AddPendingInvoice(invoiceId).ConfigureAwait(false); _WatchRequests.Add(invoiceId); } BlockingCollection _WatchRequests = new BlockingCollection(new ConcurrentQueue()); public void Dispose() { _Cts.Cancel(); } Thread _Thread; TaskCompletionSource _RunningTask; CancellationTokenSource _Cts; Timer _UpdatePendingInvoices; public Task StartAsync(CancellationToken cancellationToken) { _RunningTask = new TaskCompletionSource(); _Cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); _Thread = new Thread(Run) { Name = "InvoiceWatcher" }; _Thread.Start(); _UpdatePendingInvoices = new Timer(async s => { foreach (var pending in await _InvoiceRepository.GetPendingInvoices()) { _WatchRequests.Add(pending); } }, null, 0, (int)PollInterval.TotalMilliseconds); return Task.CompletedTask; } void Run() { Logs.PayServer.LogInformation("Start watching invoices"); ConcurrentDictionary> updating = new ConcurrentDictionary>(); try { foreach (var item in _WatchRequests.GetConsumingEnumerable(_Cts.Token)) { try { _Cts.Token.ThrowIfCancellationRequested(); var localItem = item; // If the invoice is already updating, ignore Lazy updateInvoice = new Lazy(() => UpdateInvoice(localItem), false); if (updating.TryAdd(item, updateInvoice)) { updateInvoice.Value.ContinueWith(i => updating.TryRemove(item, out updateInvoice)); } } catch (Exception ex) when (!_Cts.Token.IsCancellationRequested) { Logs.PayServer.LogCritical(ex, $"Error in the InvoiceWatcher loop (Invoice {item})"); _Cts.Token.WaitHandle.WaitOne(2000); } } } catch (OperationCanceledException) { try { Task.WaitAll(updating.Select(c => c.Value.Value).ToArray()); } catch (AggregateException) { } _RunningTask.TrySetResult(true); } finally { Logs.PayServer.LogInformation("Stop watching invoices"); } } public Task StopAsync(CancellationToken cancellationToken) { _UpdatePendingInvoices.Dispose(); _Cts.Cancel(); return Task.WhenAny(_RunningTask.Task, Task.Delay(-1, cancellationToken)); } } }