Clean the LND listener, and make sure it correctly ends.

This commit is contained in:
nicolas.dorier 2018-07-08 22:20:59 +09:00
parent 6307aa8665
commit 9eb36a8b40
5 changed files with 124 additions and 72 deletions

View File

@ -64,6 +64,7 @@ namespace BTCPayServer.Tests.Lnd
public async Task TestWaitListenInvoice()
{
var merchantInvoice = await InvoiceClient.CreateInvoice(10000, "Hello world", TimeSpan.FromSeconds(3600));
var merchantInvoice2 = await InvoiceClient.CreateInvoice(10000, "Hello world", TimeSpan.FromSeconds(3600));
var waitToken = default(CancellationToken);
var listener = await InvoiceClient.Listen(waitToken);
@ -76,8 +77,22 @@ namespace BTCPayServer.Tests.Lnd
});
var invoice = await waitTask;
Assert.True(invoice.PaidAt.HasValue);
var waitTask2 = listener.WaitInvoice(waitToken);
payResponse = await CustomerLnd.SendPaymentSyncAsync(new LnrpcSendRequest
{
Payment_request = merchantInvoice2.BOLT11
});
invoice = await waitTask2;
Assert.True(invoice.PaidAt.HasValue);
var waitTask3 = listener.WaitInvoice(waitToken);
await Task.Delay(100);
listener.Dispose();
Assert.Throws<TaskCanceledException>(()=> waitTask3.GetAwaiter().GetResult());
}
[Fact]

View File

@ -109,7 +109,7 @@ namespace BTCPayServer
public static string GetAbsoluteUri(this HttpRequest request, string redirectUrl)
{
bool isRelative =
bool isRelative =
(redirectUrl.Length > 0 && redirectUrl[0] == '/')
|| !new Uri(redirectUrl, UriKind.RelativeOrAbsolute).IsAbsoluteUri;
return isRelative ? request.GetAbsoluteRoot() + redirectUrl : redirectUrl;
@ -141,7 +141,7 @@ namespace BTCPayServer
public static void AddRange<T>(this HashSet<T> hashSet, IEnumerable<T> items)
{
foreach(var item in items)
foreach (var item in items)
{
hashSet.Add(item);
}
@ -157,6 +157,15 @@ namespace BTCPayServer
NBitcoin.Extensions.TryAdd(ctx.Items, "BitpayAuth", value);
}
public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken)
{
var waiting = Task.Delay(-1, cancellationToken);
var doing = task;
await Task.WhenAny(waiting, doing);
cancellationToken.ThrowIfCancellationRequested();
return await doing;
}
public static (string Signature, String Id, String Authorization) GetBitpayAuth(this HttpContext ctx)
{
ctx.Items.TryGetValue("BitpayAuth", out object obj);

View File

@ -26,7 +26,6 @@ namespace BTCPayServer.Payments.Lightning
else if (connString.ConnectionType == LightningConnectionType.CLightning)
{
return new CLightningRPCClient(connString.ToUri(false), network);
}
else if (connString.ConnectionType == LightningConnectionType.LndREST)
{

View File

@ -10,13 +10,91 @@ using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using NBitcoin;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace BTCPayServer.Payments.Lightning.Lnd
{
public class LndInvoiceClient : ILightningInvoiceClient, ILightningListenInvoiceSession
public class LndInvoiceClient : ILightningInvoiceClient
{
class LndInvoiceClientSession : ILightningListenInvoiceSession
{
private LndSwaggerClient _Parent;
Channel<LightningInvoice> _Invoices = Channel.CreateBounded<LightningInvoice>(50);
CancellationTokenSource _Cts = new CancellationTokenSource();
ManualResetEventSlim _Stopped = new ManualResetEventSlim(false);
public LndInvoiceClientSession(LndSwaggerClient parent)
{
_Parent = parent;
}
public async void StartListening()
{
var urlBuilder = new StringBuilder();
urlBuilder.Append(_Parent.BaseUrl).Append("/v1/invoices/subscribe");
try
{
while (!_Cts.IsCancellationRequested)
{
using (var client = _Parent.CreateHttpClient())
{
client.Timeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
var request = new HttpRequestMessage(HttpMethod.Get, urlBuilder.ToString());
using (var response = await client.SendAsync(
request, HttpCompletionOption.ResponseHeadersRead, _Cts.Token))
{
using (var body = await response.Content.ReadAsStreamAsync())
using (var reader = new StreamReader(body))
{
string line = await reader.ReadLineAsync().WithCancellation(_Cts.Token);
if (line != null && line.StartsWith("{\"result\":", StringComparison.OrdinalIgnoreCase))
{
var invoiceString = JObject.Parse(line)["result"].ToString();
LnrpcInvoice parsedInvoice = _Parent.Deserialize<LnrpcInvoice>(invoiceString);
await _Invoices.Writer.WriteAsync(ConvertLndInvoice(parsedInvoice));
}
}
}
}
}
}
catch when (_Cts.IsCancellationRequested)
{
}
finally
{
_Stopped.Set();
}
}
public async Task<LightningInvoice> WaitInvoice(CancellationToken cancellation)
{
try
{
return await _Invoices.Reader.ReadAsync(cancellation);
}
catch (ChannelClosedException)
{
throw new TaskCanceledException();
}
}
public void Dispose()
{
_Cts.Cancel();
_Stopped.Wait();
_Invoices.Writer.Complete();
}
}
public LndSwaggerClient _rpcClient;
public LndInvoiceClient(LndSwaggerClient swaggerClient)
@ -78,31 +156,15 @@ namespace BTCPayServer.Payments.Lightning.Lnd
var resp = await _rpcClient.LookupInvoiceAsync(invoiceId, null, cancellation);
return ConvertLndInvoice(resp);
}
public Task<ILightningListenInvoiceSession> Listen(CancellationToken cancellation = default(CancellationToken))
{
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
_rpcClient.StartSubscribeInvoiceThread(cancellation);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
return Task.FromResult<ILightningListenInvoiceSession>(this);
var session = new LndInvoiceClientSession(this._rpcClient);
session.StartListening();
return Task.FromResult<ILightningListenInvoiceSession>(session);
}
async Task<LightningInvoice> ILightningListenInvoiceSession.WaitInvoice(CancellationToken cancellation)
{
var resp = await _rpcClient.InvoiceResponse.Task;
return ConvertLndInvoice(resp);
}
// utility static methods... maybe move to separate class
private static string BitString(byte[] bytes)
{
return BitConverter.ToString(bytes)
.Replace("-", "", StringComparison.InvariantCulture)
.ToLower(CultureInfo.InvariantCulture);
}
private static LightningInvoice ConvertLndInvoice(LnrpcInvoice resp)
internal static LightningInvoice ConvertLndInvoice(LnrpcInvoice resp)
{
var invoice = new LightningInvoice
{
@ -129,9 +191,13 @@ namespace BTCPayServer.Payments.Lightning.Lnd
return invoice;
}
public void Dispose()
// utility static methods... maybe move to separate class
private static string BitString(byte[] bytes)
{
//
return BitConverter.ToString(bytes)
.Replace("-", "", StringComparison.InvariantCulture)
.ToLower(CultureInfo.InvariantCulture);
}
// Invariant culture conversion

View File

@ -42,7 +42,7 @@ namespace BTCPayServer.Payments.Lightning.Lnd
_Settings = settings;
}
LndRestSettings _Settings;
private static HttpClient CreateHttpClient(LndRestSettings settings)
internal static HttpClient CreateHttpClient(LndRestSettings settings)
{
var handler = new HttpClientHandler
{
@ -79,51 +79,14 @@ namespace BTCPayServer.Payments.Lightning.Lnd
return httpClient;
}
public TaskCompletionSource<LnrpcInvoice> InvoiceResponse = new TaskCompletionSource<LnrpcInvoice>();
public TaskCompletionSource<LndSwaggerClient> SubscribeLost = new TaskCompletionSource<LndSwaggerClient>();
// TODO: Refactor swagger generated wrapper to include this method directly
public async Task StartSubscribeInvoiceThread(CancellationToken token)
internal HttpClient CreateHttpClient()
{
var urlBuilder = new StringBuilder();
urlBuilder.Append(BaseUrl).Append("/v1/invoices/subscribe");
return LndSwaggerClient.CreateHttpClient(_Settings);
}
using (var client = CreateHttpClient(_Settings))
{
client.Timeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
var request = new HttpRequestMessage(HttpMethod.Get, urlBuilder.ToString());
using (var response = await client.SendAsync(
request, HttpCompletionOption.ResponseHeadersRead, token))
{
using (var body = await response.Content.ReadAsStreamAsync())
using (var reader = new StreamReader(body))
{
try
{
while (!reader.EndOfStream)
{
string line = reader.ReadLine();
if (line != null && line.Contains("\"result\":", StringComparison.OrdinalIgnoreCase))
{
dynamic parsedJson = JObject.Parse(line);
var result = parsedJson.result;
var invoiceString = result.ToString();
LnrpcInvoice parsedInvoice = JsonConvert.DeserializeObject<LnrpcInvoice>(invoiceString, _settings.Value);
InvoiceResponse.SetResult(parsedInvoice);
}
}
}
catch (Exception e)
{
// TODO: check that the exception type is actually from a closed stream.
Debug.WriteLine(e.Message);
SubscribeLost.SetResult(this);
}
}
}
}
internal T Deserialize<T>(string str)
{
return JsonConvert.DeserializeObject<T>(str, _settings.Value);
}
}
}