Index invoice in a parallel thread

This commit is contained in:
nicolas.dorier 2017-11-12 23:03:33 +09:00
parent 7a173a6692
commit f1f227b746
4 changed files with 138 additions and 26 deletions

View File

@ -274,19 +274,23 @@ namespace BTCPayServer.Tests
}, Facade.Merchant);
var repo = tester.PayTester.GetService<InvoiceRepository>();
var ctx = tester.PayTester.GetService<ApplicationDbContextFactory>().CreateContext();
var textSearchResult = tester.PayTester.Runtime.InvoiceRepository.GetInvoices(new InvoiceQuery()
{
StoreId = user.StoreId,
TextSearch = invoice.OrderId
}).GetAwaiter().GetResult();
Assert.Equal(1, textSearchResult.Length);
textSearchResult = tester.PayTester.Runtime.InvoiceRepository.GetInvoices(new InvoiceQuery()
{
StoreId = user.StoreId,
TextSearch = invoice.Id
}).GetAwaiter().GetResult();
Assert.Equal(1, textSearchResult.Length);
Eventually(() =>
{
var textSearchResult = tester.PayTester.Runtime.InvoiceRepository.GetInvoices(new InvoiceQuery()
{
StoreId = user.StoreId,
TextSearch = invoice.OrderId
}).GetAwaiter().GetResult();
Assert.Equal(1, textSearchResult.Length);
textSearchResult = tester.PayTester.Runtime.InvoiceRepository.GetInvoices(new InvoiceQuery()
{
StoreId = user.StoreId,
TextSearch = invoice.Id
}).GetAwaiter().GetResult();
Assert.Equal(1, textSearchResult.Length);
});
invoice = user.BitPay.GetInvoice(invoice.Id, Facade.Merchant);
Assert.Equal(Money.Coins(0), invoice.BtcPaid);

View File

@ -49,11 +49,6 @@ namespace BTCPayServer.Configuration
{
throw new ConfigException($"Could not connect to NBXplorer, {ex.Message}");
}
DBreezeEngine db = new DBreezeEngine(CreateDBPath(opts, "TokensDB"));
_Resources.Add(db);
db = new DBreezeEngine(CreateDBPath(opts, "InvoiceDB"));
_Resources.Add(db);
ApplicationDbContextFactory dbContext = null;
if (opts.PostgresConnectionString == null)
@ -68,7 +63,9 @@ namespace BTCPayServer.Configuration
dbContext = new ApplicationDbContextFactory(DatabaseType.Postgres, opts.PostgresConnectionString);
}
DBFactory = dbContext;
InvoiceRepository = new InvoiceRepository(dbContext, db, Network);
InvoiceRepository = new InvoiceRepository(dbContext, CreateDBPath(opts, "InvoiceDB"), Network);
_Resources.Add(InvoiceRepository);
}
private static string CreateDBPath(BTCPayServerOptions opts, string name)

View File

@ -0,0 +1,98 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace BTCPayServer
{
class CustomThreadPool : IDisposable
{
CancellationTokenSource _Cancel = new CancellationTokenSource();
TaskCompletionSource<bool> _Exited;
int _ExitedCount = 0;
Thread[] _Threads;
Exception _UnhandledException;
BlockingCollection<(Action, TaskCompletionSource<object>)> _Actions = new BlockingCollection<(Action, TaskCompletionSource<object>)>(new ConcurrentQueue<(Action, TaskCompletionSource<object>)>());
public CustomThreadPool(int threadCount, string threadName)
{
if (threadCount <= 0)
throw new ArgumentOutOfRangeException(nameof(threadCount));
_Exited = new TaskCompletionSource<bool>();
_Threads = Enumerable.Range(0, threadCount).Select(_ => new Thread(RunLoop) { Name = threadName }).ToArray();
foreach (var t in _Threads)
t.Start();
}
public void Do(Action act)
{
DoAsync(act).GetAwaiter().GetResult();
}
public T Do<T>(Func<T> act)
{
return DoAsync(act).GetAwaiter().GetResult();
}
public async Task<T> DoAsync<T>(Func<T> act)
{
TaskCompletionSource<object> done = new TaskCompletionSource<object>();
_Actions.Add((() =>
{
try
{
done.TrySetResult(act());
}
catch (Exception ex) { done.TrySetException(ex); }
}
, done));
return (T)(await done.Task.ConfigureAwait(false));
}
public Task DoAsync(Action act)
{
return DoAsync<object>(() =>
{
act();
return null;
});
}
void RunLoop()
{
try
{
foreach (var act in _Actions.GetConsumingEnumerable(_Cancel.Token))
{
act.Item1();
}
}
catch (OperationCanceledException) when (_Cancel.IsCancellationRequested) { }
catch (Exception ex)
{
_Cancel.Cancel();
_UnhandledException = ex;
}
if (Interlocked.Increment(ref _ExitedCount) == _Threads.Length)
{
foreach (var action in _Actions)
{
try
{
action.Item2.TrySetCanceled();
}
catch { }
}
_Exited.TrySetResult(true);
}
}
public void Dispose()
{
_Cancel.Cancel();
_Exited.Task.GetAwaiter().GetResult();
}
}
}

View File

@ -19,7 +19,7 @@ using BTCPayServer.Models.InvoicingModels;
namespace BTCPayServer.Services.Invoices
{
public class InvoiceRepository
public class InvoiceRepository : IDisposable
{
@ -47,9 +47,11 @@ namespace BTCPayServer.Services.Invoices
}
private ApplicationDbContextFactory _ContextFactory;
public InvoiceRepository(ApplicationDbContextFactory contextFactory, DBreezeEngine engine, Network network)
private CustomThreadPool _IndexerThread;
public InvoiceRepository(ApplicationDbContextFactory contextFactory, string dbreezePath, Network network)
{
_Engine = engine;
_Engine = new DBreezeEngine(dbreezePath);
_IndexerThread = new CustomThreadPool(1, "Invoice Indexer");
_Network = network;
_ContextFactory = contextFactory;
}
@ -231,11 +233,14 @@ namespace BTCPayServer.Services.Invoices
void AddToTextSearch(string invoiceId, params string[] terms)
{
using (var tx = _Engine.GetTransaction())
_IndexerThread.DoAsync(() =>
{
tx.TextInsert("InvoiceSearch", Encoders.Base58.DecodeData(invoiceId), string.Join(" ", terms.Where(t => !String.IsNullOrWhiteSpace(t))));
tx.Commit();
}
using (var tx = _Engine.GetTransaction())
{
tx.TextInsert("InvoiceSearch", Encoders.Base58.DecodeData(invoiceId), string.Join(" ", terms.Where(t => !String.IsNullOrWhiteSpace(t))));
tx.Commit();
}
});
}
public async Task UpdateInvoiceStatus(string invoiceId, string status, string exceptionStatus)
@ -451,6 +456,14 @@ namespace BTCPayServer.Services.Invoices
{
return NBitcoin.JsonConverters.Serializer.ToString(data, Network);
}
public void Dispose()
{
if (_Engine != null)
_Engine.Dispose();
if (_IndexerThread != null)
_IndexerThread.Dispose();
}
}
public class InvoiceQuery