Refactor the LndInvoiceClient which might solve memory leak

This commit is contained in:
nicolas.dorier 2018-07-13 19:45:50 +09:00
parent 22e700a53e
commit 94a6f20a05
6 changed files with 69 additions and 43 deletions

View file

@ -101,6 +101,7 @@ namespace BTCPayServer.Tests
/// <returns></returns>
private async Task PrepareLightningAsync(ILightningInvoiceClient client)
{
bool awaitingLocking = false;
while (true)
{
var merchantInfo = await WaitLNSynched(client, CustomerLightningD, MerchantLightningD);
@ -126,8 +127,9 @@ namespace BTCPayServer.Tests
await CustomerLightningD.FundChannelAsync(merchantNodeInfo, Money.Satoshis(16777215));
break;
case "CHANNELD_AWAITING_LOCKIN":
ExplorerNode.Generate(1);
ExplorerNode.Generate(awaitingLocking ? 1 : 10);
await WaitLNSynched(client, CustomerLightningD, MerchantLightningD);
awaitingLocking = true;
break;
case "CHANNELD_NORMAL":
return;

View file

@ -1662,7 +1662,7 @@ namespace BTCPayServer.Tests
private async Task EventuallyAsync(Func<Task> act)
{
CancellationTokenSource cts = new CancellationTokenSource(20000);
CancellationTokenSource cts = new CancellationTokenSource(20000000);
while (true)
{
try

View file

@ -185,6 +185,14 @@ namespace BTCPayServer
cancellationToken.ThrowIfCancellationRequested();
return await doing;
}
public static async Task WithCancellation(this Task task, CancellationToken cancellationToken)
{
var waiting = Task.Delay(-1, cancellationToken);
var doing = task;
await Task.WhenAny(waiting, doing);
cancellationToken.ThrowIfCancellationRequested();
await doing;
}
public static (string Signature, String Id, String Authorization) GetBitpayAuth(this HttpContext ctx)
{

View file

@ -123,7 +123,7 @@ namespace BTCPayServer.Payments.Lightning
using (var tcp = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp))
{
await WithTimeout(tcp.ConnectAsync(new IPEndPoint(address, nodeInfo.Port)), cancellation);
await tcp.ConnectAsync(new IPEndPoint(address, nodeInfo.Port)).WithCancellation(cancellation);
}
}
catch (Exception ex)
@ -131,15 +131,5 @@ namespace BTCPayServer.Payments.Lightning
throw new PaymentMethodUnavailableException($"Error while connecting to the lightning node via {nodeInfo.Host}:{nodeInfo.Port} ({ex.Message})");
}
}
static Task WithTimeout(Task task, CancellationToken token)
{
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
var registration = token.Register(() => { try { tcs.TrySetResult(true); } catch { } });
#pragma warning disable CA2008 // Do not create tasks without passing a TaskScheduler
var timeoutTask = tcs.Task;
#pragma warning restore CA2008 // Do not create tasks without passing a TaskScheduler
return Task.WhenAny(task, timeoutTask).Unwrap().ContinueWith(t => registration.Dispose(), TaskScheduler.Default);
}
}
}

View file

@ -146,8 +146,8 @@ namespace BTCPayServer.Payments.Lightning
try
{
Logs.PayServer.LogInformation($"{supportedPaymentMethod.CryptoCode} (Lightning): Start listening {supportedPaymentMethod.GetLightningUrl().BaseUri}");
var charge = _LightningClientFactory.CreateClient(supportedPaymentMethod, network);
var session = await charge.Listen(_Cts.Token);
var lightningClient = _LightningClientFactory.CreateClient(supportedPaymentMethod, network);
var session = await lightningClient.Listen(_Cts.Token);
while (true)
{
var notification = await session.WaitInvoice(_Cts.Token);

View file

@ -6,6 +6,7 @@ using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Net.Security;
using System.Runtime.ExceptionServices;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
@ -27,59 +28,72 @@ namespace BTCPayServer.Payments.Lightning.Lnd
CancellationTokenSource _Cts = new CancellationTokenSource();
ManualResetEventSlim _Stopped = new ManualResetEventSlim(false);
HttpClient _Client;
HttpResponseMessage _Response;
Stream _Body;
StreamReader _Reader;
public LndInvoiceClientSession(LndSwaggerClient parent)
{
_Parent = parent;
}
public async void StartListening()
public async Task StartListening()
{
_Client = _Parent.CreateHttpClient();
_Client.Timeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
var request = new HttpRequestMessage(HttpMethod.Get, _Parent.BaseUrl.WithTrailingSlash() + "v1/invoices/subscribe");
_Response = await _Client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, _Cts.Token);
_Body = await _Response.Content.ReadAsStreamAsync();
_Reader = new StreamReader(_Body);
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
ListenLoop();
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}
private async Task ListenLoop()
{
var urlBuilder = new StringBuilder();
urlBuilder.Append(_Parent.BaseUrl).Append("/v1/invoices/subscribe");
try
{
using (var client = _Parent.CreateHttpClient())
while (!_Cts.IsCancellationRequested)
{
client.Timeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
var request = new HttpRequestMessage(HttpMethod.Get, urlBuilder.ToString());
using (var response = await client.SendAsync(
request, HttpCompletionOption.ResponseHeadersRead, _Cts.Token))
string line = await _Reader.ReadLineAsync().WithCancellation(_Cts.Token);
if (line != null && line.StartsWith("{\"result\":", StringComparison.OrdinalIgnoreCase))
{
using (var body = await response.Content.ReadAsStreamAsync())
using (var reader = new StreamReader(body))
{
while (!_Cts.IsCancellationRequested)
{
string line = await reader.ReadLineAsync().WithCancellation(_Cts.Token);
if (line != null && line.StartsWith("{\"result\":", StringComparison.OrdinalIgnoreCase))
{
var invoiceString = JObject.Parse(line)["result"].ToString();
LnrpcInvoice parsedInvoice = _Parent.Deserialize<LnrpcInvoice>(invoiceString);
await _Invoices.Writer.WriteAsync(ConvertLndInvoice(parsedInvoice));
}
}
}
var invoiceString = JObject.Parse(line)["result"].ToString();
LnrpcInvoice parsedInvoice = _Parent.Deserialize<LnrpcInvoice>(invoiceString);
await _Invoices.Writer.WriteAsync(ConvertLndInvoice(parsedInvoice), _Cts.Token);
}
}
}
catch when (_Cts.IsCancellationRequested)
{
}
catch (Exception ex)
{
_Ex = ex;
}
finally
{
_Stopped.Set();
Dispose();
}
}
Exception _Ex;
public async Task<LightningInvoice> WaitInvoice(CancellationToken cancellation)
{
try
{
return await _Invoices.Reader.ReadAsync(cancellation);
}
catch when (!cancellation.IsCancellationRequested && _Ex != null)
{
ExceptionDispatchInfo.Capture(_Ex).Throw();
throw;
}
catch (ChannelClosedException)
{
throw new TaskCanceledException();
@ -88,6 +102,18 @@ namespace BTCPayServer.Payments.Lightning.Lnd
public void Dispose()
{
if (_Cts.IsCancellationRequested)
return;
_Reader?.Dispose();
_Reader = null;
_Body?.Dispose();
_Body = null;
_Response?.Dispose();
_Response = null;
_Client?.Dispose();
_Client = null;
_Cts.Cancel();
_Stopped.Wait();
_Invoices.Writer.Complete();
@ -157,11 +183,11 @@ namespace BTCPayServer.Payments.Lightning.Lnd
return ConvertLndInvoice(resp);
}
public Task<ILightningListenInvoiceSession> Listen(CancellationToken cancellation = default(CancellationToken))
public async Task<ILightningListenInvoiceSession> Listen(CancellationToken cancellation = default(CancellationToken))
{
var session = new LndInvoiceClientSession(this._rpcClient);
session.StartListening();
return Task.FromResult<ILightningListenInvoiceSession>(session);
await session.StartListening();
return session;
}
internal static LightningInvoice ConvertLndInvoice(LnrpcInvoice resp)