Remove HangFire dependency

This commit is contained in:
nicolas.dorier 2019-01-16 19:14:45 +09:00
parent cfb51a6be4
commit d85f03ba20
12 changed files with 321 additions and 49 deletions

View File

@ -0,0 +1,64 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace BTCPayServer.Tests
{
public class MockDelay : IDelay
{
class WaitObj
{
public DateTimeOffset Expiration;
public TaskCompletionSource<bool> CTS;
}
List<WaitObj> waits = new List<WaitObj>();
DateTimeOffset _Now = new DateTimeOffset(1970, 1, 1, 0, 0, 0, TimeSpan.Zero);
public async Task Wait(TimeSpan delay, CancellationToken cancellation)
{
WaitObj w = new WaitObj();
w.Expiration = _Now + delay;
w.CTS = new TaskCompletionSource<bool>();
using (cancellation.Register(() =>
{
w.CTS.TrySetCanceled();
}))
{
lock (waits)
{
waits.Add(w);
}
await w.CTS.Task;
}
}
public void Advance(TimeSpan time)
{
_Now += time;
lock (waits)
{
foreach (var wait in waits.ToArray())
{
if (_Now >= wait.Expiration)
{
wait.CTS.TrySetResult(true);
waits.Remove(wait);
}
}
}
}
public void AdvanceMilliseconds(long milli)
{
Advance(TimeSpan.FromMilliseconds(milli));
}
public override string ToString()
{
return _Now.Millisecond.ToString(CultureInfo.InvariantCulture);
}
}
}

View File

@ -1549,6 +1549,81 @@ donation:
}
}
[Fact]
[Trait("Fast", "Fast")]
public void CanScheduleBackgroundTasks()
{
BackgroundJobClient client = new BackgroundJobClient();
MockDelay mockDelay = new MockDelay();
client.Delay = mockDelay;
bool[] jobs = new bool[4];
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
Logs.Tester.LogInformation("Start Job[0] in 5 sec");
client.Schedule(async () => { Logs.Tester.LogInformation("Job[0]"); jobs[0] = true; }, TimeSpan.FromSeconds(5.0));
Logs.Tester.LogInformation("Start Job[1] in 2 sec");
client.Schedule(async () => { Logs.Tester.LogInformation("Job[1]"); jobs[1] = true; }, TimeSpan.FromSeconds(2.0));
Logs.Tester.LogInformation("Start Job[2] fails in 6 sec");
client.Schedule(async () => { jobs[2] = true; throw new Exception("Job[2]"); }, TimeSpan.FromSeconds(6.0));
Logs.Tester.LogInformation("Start Job[3] starts in in 7 sec");
client.Schedule(async () => { Logs.Tester.LogInformation("Job[3]"); jobs[3] = true; }, TimeSpan.FromSeconds(7.0));
Assert.True(new[] { false, false, false, false }.SequenceEqual(jobs));
CancellationTokenSource cts = new CancellationTokenSource();
var processing = client.ProcessJobs(cts.Token);
Assert.Equal(4, client.GetExecutingCount());
var waitJobsFinish = client.WaitAllRunning(default);
mockDelay.Advance(TimeSpan.FromSeconds(2.0));
Assert.True(new[] { false, true, false, false }.SequenceEqual(jobs));
mockDelay.Advance(TimeSpan.FromSeconds(3.0));
Assert.True(new[] { true, true, false, false }.SequenceEqual(jobs));
mockDelay.Advance(TimeSpan.FromSeconds(1.0));
Assert.True(new[] { true, true, true, false }.SequenceEqual(jobs));
Assert.Equal(1, client.GetExecutingCount());
Assert.False(waitJobsFinish.Wait(100));
Assert.False(waitJobsFinish.IsCompletedSuccessfully);
mockDelay.Advance(TimeSpan.FromSeconds(1.0));
Assert.True(new[] { true, true, true, true }.SequenceEqual(jobs));
Assert.True(waitJobsFinish.Wait(100));
Assert.True(waitJobsFinish.IsCompletedSuccessfully);
Assert.True(!waitJobsFinish.IsFaulted);
Assert.Equal(0, client.GetExecutingCount());
bool jobExecuted = false;
Logs.Tester.LogInformation("This job will be cancelled");
client.Schedule(async () => { jobExecuted = true; }, TimeSpan.FromSeconds(1.0));
mockDelay.Advance(TimeSpan.FromSeconds(0.5));
Assert.False(jobExecuted);
Thread.Sleep(100);
Assert.Equal(1, client.GetExecutingCount());
waitJobsFinish = client.WaitAllRunning(default);
Assert.False(waitJobsFinish.Wait(100));
cts.Cancel();
Assert.True(waitJobsFinish.Wait(100));
Assert.True(waitJobsFinish.IsCompletedSuccessfully);
Assert.True(!waitJobsFinish.IsFaulted);
Assert.False(jobExecuted);
mockDelay.Advance(TimeSpan.FromSeconds(1.0));
Thread.Sleep(100); // Make sure it get cancelled
Assert.False(jobExecuted);
Assert.Equal(0, client.GetExecutingCount());
Assert.True(processing.IsCanceled);
Assert.True(client.WaitAllRunning(default).Wait(100));
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
}
[Fact]
[Trait("Fast", "Fast")]
public void PosDataParser_ParsesCorrectly()

View File

@ -36,8 +36,6 @@
<PackageReference Include="BTCPayServer.Lightning.All" Version="1.1.0.4" />
<PackageReference Include="BuildBundlerMinifier" Version="2.7.385" />
<PackageReference Include="DigitalRuby.ExchangeSharp" Version="0.5.3" />
<PackageReference Include="Hangfire" Version="1.6.20" />
<PackageReference Include="Hangfire.MemoryStorage" Version="1.5.2" />
<PackageReference Include="HtmlSanitizer" Version="4.0.199" />
<PackageReference Include="LedgerWallet" Version="2.0.0.3" />
<PackageReference Include="Meziantou.AspNetCore.BundleTagHelpers" Version="2.0.0" />

View File

@ -3,8 +3,6 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Hangfire;
using Hangfire.MemoryStorage;
using Microsoft.EntityFrameworkCore.Migrations;
using Npgsql.EntityFrameworkCore.PostgreSQL.Migrations;
using JetBrains.Annotations;
@ -98,15 +96,5 @@ namespace BTCPayServer.Data
else if (_Type == DatabaseType.MySQL)
builder.UseMySql(_ConnectionString);
}
public void ConfigureHangfireBuilder(IGlobalConfiguration builder)
{
builder.UseMemoryStorage();
//We always use memory storage because of incompatibilities with the latest postgres in 2.1
//if (_Type == DatabaseType.Sqlite)
// builder.UseMemoryStorage(); //Sqlite provider does not support multiple workers
//else if (_Type == DatabaseType.Postgres)
// builder.UsePostgreSqlStorage(_ConnectionString);
}
}
}

View File

@ -0,0 +1,133 @@
using System;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using BTCPayServer.Logging;
using BTCPayServer.Services;
using Microsoft.Extensions.Hosting;
using NicolasDorier.RateLimits;
namespace BTCPayServer.HostedServices
{
public class BackgroundJobSchedulerHostedService : IHostedService
{
public BackgroundJobSchedulerHostedService(IBackgroundJobClient backgroundJobClient)
{
BackgroundJobClient = (BackgroundJobClient)backgroundJobClient;
}
public BackgroundJobClient BackgroundJobClient { get; }
Task _Loop;
public Task StartAsync(CancellationToken cancellationToken)
{
_Stop = new CancellationTokenSource();
_Loop = BackgroundJobClient.ProcessJobs(_Stop.Token);
return Task.CompletedTask;
}
CancellationTokenSource _Stop;
public async Task StopAsync(CancellationToken cancellationToken)
{
_Stop.Cancel();
try
{
await _Loop;
}
catch (OperationCanceledException)
{
}
await BackgroundJobClient.WaitAllRunning(cancellationToken);
}
}
public class BackgroundJobClient : IBackgroundJobClient
{
class BackgroundJob
{
public Func<Task> Action;
public TimeSpan Delay;
public IDelay DelayImplementation;
public BackgroundJob(Func<Task> action, TimeSpan delay, IDelay delayImplementation)
{
this.Action = action;
this.Delay = delay;
this.DelayImplementation = delayImplementation;
}
public async Task Run(CancellationToken cancellationToken)
{
await DelayImplementation.Wait(Delay, cancellationToken);
await Action();
}
}
public IDelay Delay { get; set; } = TaskDelay.Instance;
public int GetExecutingCount()
{
lock (_Processing)
{
return _Processing.Count();
}
}
private Channel<BackgroundJob> _Jobs = Channel.CreateUnbounded<BackgroundJob>();
HashSet<Task> _Processing = new HashSet<Task>();
public void Schedule(Func<Task> action, TimeSpan delay)
{
_Jobs.Writer.TryWrite(new BackgroundJob(action, delay, Delay));
}
public async Task WaitAllRunning(CancellationToken cancellationToken)
{
Task[] processing = null;
lock (_Processing)
{
processing = _Processing.ToArray();
}
try
{
await Task.WhenAll(processing).WithCancellation(cancellationToken);
}
catch (Exception) when (cancellationToken.IsCancellationRequested)
{
throw;
}
}
public async Task ProcessJobs(CancellationToken cancellationToken)
{
while (await _Jobs.Reader.WaitToReadAsync(cancellationToken))
{
if (_Jobs.Reader.TryRead(out var job))
{
var processing = job.Run(cancellationToken);
lock (_Processing)
{
_Processing.Add(processing);
}
#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
processing.ContinueWith(t =>
{
if (t.IsFaulted)
{
Logs.PayServer.LogWarning(t.Exception, "Unhandled exception while job running");
}
lock (_Processing)
{
_Processing.Remove(processing);
}
}, default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
}
}
}
}
}

View File

@ -1,10 +1,7 @@
using Hangfire;
using Hangfire.Common;
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Hangfire.Annotations;
using System.Reflection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
@ -21,6 +18,7 @@ using NBXplorer;
using BTCPayServer.Services.Invoices;
using BTCPayServer.Payments;
using BTCPayServer.Services.Mails;
using BTCPayServer.Services;
namespace BTCPayServer.HostedServices
{

View File

@ -11,7 +11,6 @@ using BTCPayServer.Logging;
using System.Threading;
using Microsoft.Extensions.Hosting;
using System.Collections.Concurrent;
using Hangfire;
using BTCPayServer.Services.Wallets;
using BTCPayServer.Controllers;
using BTCPayServer.Events;

View File

@ -145,6 +145,8 @@ namespace BTCPayServer.Hosting
services.AddSingleton<IHostedService, InvoiceNotificationManager>();
services.AddSingleton<IHostedService, InvoiceWatcher>();
services.AddSingleton<IHostedService, RatesHostedService>();
services.AddSingleton<IHostedService, BackgroundJobSchedulerHostedService>();
services.AddSingleton<IBackgroundJobClient, BackgroundJobClient>();
services.AddTransient<IConfigureOptions<MvcOptions>, BTCPayClaimsFilter>();
services.TryAddSingleton<ExplorerClientProvider>();

View File

@ -19,7 +19,6 @@ using BTCPayServer.Models;
using Microsoft.AspNetCore.Identity;
using BTCPayServer.Data;
using Microsoft.Extensions.Logging;
using Hangfire;
using BTCPayServer.Logging;
using Microsoft.AspNetCore.Authorization;
using System.Threading.Tasks;
@ -27,11 +26,8 @@ using BTCPayServer.Controllers;
using BTCPayServer.Services.Stores;
using BTCPayServer.Services.Mails;
using Microsoft.Extensions.Configuration;
using Hangfire.AspNetCore;
using BTCPayServer.Configuration;
using System.IO;
using Hangfire.Dashboard;
using Hangfire.Annotations;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System.Threading;
using Microsoft.Extensions.Options;
@ -46,18 +42,6 @@ namespace BTCPayServer.Hosting
{
public class Startup
{
class NeedRole : IDashboardAuthorizationFilter
{
string _Role;
public NeedRole(string role)
{
_Role = role;
}
public bool Authorize([NotNull] DashboardContext context)
{
return context.GetHttpContext().User.IsInRole(_Role);
}
}
public Startup(IConfiguration conf, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
Configuration = conf;
@ -108,13 +92,6 @@ namespace BTCPayServer.Hosting
options.Lockout.MaxFailedAccessAttempts = 5;
options.Lockout.AllowedForNewUsers = true;
});
services.AddHangfire((o) =>
{
var scope = AspNetCoreJobActivator.Current.BeginScope(null);
var options = (ApplicationDbContextFactory)scope.Resolve(typeof(ApplicationDbContextFactory));
options.ConfigureHangfireBuilder(o);
});
services.AddCors(o =>
{
o.AddPolicy("BitpayAPI", b =>
@ -193,12 +170,6 @@ namespace BTCPayServer.Hosting
app.UsePayServer();
app.UseStaticFiles();
app.UseAuthentication();
app.UseHangfireServer();
app.UseHangfireDashboard("/hangfire", new DashboardOptions()
{
AppPath = options.GetRootUri(),
Authorization = new[] { new NeedRole(Roles.ServerAdmin) }
});
app.UseSignalR(route =>
{
route.MapHub<CrowdfundHub>("/apps/crowdfund/hub");

33
BTCPayServer/IDelay.cs Normal file
View File

@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace BTCPayServer
{
public interface IDelay
{
Task Wait(TimeSpan delay, CancellationToken cancellationToken);
}
public class TaskDelay : IDelay
{
TaskDelay()
{
}
private static readonly TaskDelay _Instance = new TaskDelay();
public static TaskDelay Instance
{
get
{
return _Instance;
}
}
public Task Wait(TimeSpan delay, CancellationToken cancellationToken)
{
return Task.Delay(delay, cancellationToken);
}
}
}

View File

@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace BTCPayServer.Services
{
public interface IBackgroundJobClient
{
void Schedule(Func<Task> act, TimeSpan zero);
}
}

View File

@ -1,6 +1,5 @@
using BTCPayServer.Logging;
using Microsoft.Extensions.Logging;
using Hangfire;
using System;
using System.Collections.Generic;
using System.Linq;