// btcpayserver/BTCPayServer/HostedServices/BackgroundJobSchedulerHostedService.cs
using System;
2019-01-16 19:14:45 +09:00
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using BTCPayServer.Logging;
using BTCPayServer.Services;
using Microsoft.Extensions.Hosting;
2020-06-28 17:55:27 +09:00
using Microsoft.Extensions.Logging;
using NBitcoin;
2019-01-16 19:14:45 +09:00
namespace BTCPayServer.HostedServices
{
/// <summary>
/// Hosted service that starts the job-processing loop of a <see cref="BackgroundJobClient"/>
/// when the host starts, and cancels that loop and waits for in-flight jobs when the host stops.
/// </summary>
public class BackgroundJobSchedulerHostedService : IHostedService
{
    // Created by StartAsync; stays null when the service was never started.
    CancellationTokenSource _Stop;

    // Task returned by BackgroundJobClient.ProcessJobs for the current run.
    Task _Loop;

    /// <summary>
    /// Creates the service around the given client.
    /// </summary>
    /// <param name="backgroundJobClient">
    /// The job client; it is cast to the concrete <see cref="BackgroundJobClient"/> type.
    /// </param>
    public BackgroundJobSchedulerHostedService(IBackgroundJobClient backgroundJobClient)
    {
        BackgroundJobClient = (BackgroundJobClient)backgroundJobClient;
    }

    /// <summary>The concrete job client whose queue this service processes.</summary>
    public BackgroundJobClient BackgroundJobClient { get; }

    /// <summary>
    /// Starts the processing loop with a fresh stop token and returns immediately.
    /// </summary>
    /// <param name="cancellationToken">Not used by this implementation.</param>
    /// <returns>An already completed task.</returns>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        _Stop = new CancellationTokenSource();
        _Loop = BackgroundJobClient.ProcessJobs(_Stop.Token);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Cancels the stop token, awaits the processing loop, then awaits the jobs that are
    /// still tracked by the client. Does nothing when the service was never started.
    /// </summary>
    /// <param name="cancellationToken">Passed to <c>WaitAllRunning</c> to bound the final wait.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if (_Stop is null)
        {
            return;
        }

        _Stop.Cancel();

        // The loop is awaited first; the wait on remaining jobs only begins once it has ended.
        await IgnoreCancellation(_Loop);
        await IgnoreCancellation(BackgroundJobClient.WaitAllRunning(cancellationToken));
    }

    // Awaits the task and swallows OperationCanceledException; any other exception propagates.
    private static async Task IgnoreCancellation(Task pending)
    {
        try
        {
            await pending;
        }
        catch (OperationCanceledException)
        {
        }
    }
}
/// <summary>
/// Queue of delayed background jobs. Jobs are written to an unbounded channel by
/// <see cref="Schedule"/> and started by <see cref="ProcessJobs"/>; every started job is
/// tracked until its task completes.
/// </summary>
public class BackgroundJobClient : IBackgroundJobClient
{
    /// <summary>A scheduled action together with the delay to wait before running it.</summary>
    class BackgroundJob
    {
        public Func<CancellationToken, Task> Action;
        public TimeSpan Delay;
        public IDelay DelayImplementation;

        public BackgroundJob(Func<CancellationToken, Task> action, TimeSpan delay, IDelay delayImplementation)
        {
            Action = action;
            Delay = delay;
            DelayImplementation = delayImplementation;
        }

        /// <summary>Waits for the configured delay, then invokes the action with the same token.</summary>
        public async Task Run(CancellationToken cancellationToken)
        {
            await DelayImplementation.Wait(Delay, cancellationToken);
            await Action(cancellationToken);
        }
    }

    // Pending jobs, in the order they were scheduled.
    private readonly Channel<BackgroundJob> _Jobs = Channel.CreateUnbounded<BackgroundJob>();

    // Tasks of jobs that have been started and not yet completed; also serves as the lock object.
    readonly HashSet<Task> _Processing = new HashSet<Task>();

    /// <summary>
    /// Delay implementation captured by each job at the time it is scheduled.
    /// Defaults to <c>TaskDelay.Instance</c>.
    /// </summary>
    public IDelay Delay { get; set; } = TaskDelay.Instance;

    /// <summary>Returns the number of job tasks currently tracked as started and not completed.</summary>
    public int GetExecutingCount()
    {
        int tracked;
        lock (_Processing)
        {
            tracked = _Processing.Count;
        }
        return tracked;
    }

    /// <summary>Enqueues an action to be run after the given delay.</summary>
    /// <param name="act">The action to run; it receives the token given to <see cref="ProcessJobs"/>.</param>
    /// <param name="scheduledIn">How long the job waits before invoking <paramref name="act"/>.</param>
    public void Schedule(Func<CancellationToken, Task> act, TimeSpan scheduledIn)
    {
        var job = new BackgroundJob(act, scheduledIn, Delay);
        _Jobs.Writer.TryWrite(job);
    }

    /// <summary>
    /// Waits for the jobs tracked at the moment of the call; jobs started later are not awaited.
    /// Exceptions raised while waiting are swallowed unless <paramref name="cancellationToken"/>
    /// has been cancelled, in which case they propagate.
    /// </summary>
    /// <param name="cancellationToken">Token used to abandon the wait.</param>
    public async Task WaitAllRunning(CancellationToken cancellationToken)
    {
        Task[] snapshot;
        lock (_Processing)
        {
            snapshot = _Processing.ToArray();
        }

        if (snapshot.Length == 0)
        {
            return;
        }

        try
        {
            await Task.WhenAll(snapshot).WithCancellation(cancellationToken);
        }
        catch (Exception) when (!cancellationToken.IsCancellationRequested)
        {
        }
    }

    /// <summary>
    /// Reads jobs from the queue and starts each one without awaiting it, so several jobs
    /// can be in flight at once. The loop ends when waiting on the queue reports no more
    /// data or is cancelled.
    /// </summary>
    /// <param name="cancellationToken">Token passed to the queue wait and to every started job.</param>
    public async Task ProcessJobs(CancellationToken cancellationToken)
    {
        while (await _Jobs.Reader.WaitToReadAsync(cancellationToken))
        {
            if (!_Jobs.Reader.TryRead(out var job))
            {
                continue;
            }

            var running = job.Run(cancellationToken);

            // Track the task before attaching the continuation, so the removal in the
            // continuation can never run ahead of this insertion.
            lock (_Processing)
            {
                _Processing.Add(running);
            }

            _ = running.ContinueWith(OnJobCompleted, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        }
    }

    // Continuation of a job task: logs a warning when the job faulted, then stops tracking it.
    private void OnJobCompleted(Task finished)
    {
        if (finished.IsFaulted)
        {
            Logs.PayServer.LogWarning(finished.Exception, "Unhandled exception while job running");
        }

        lock (_Processing)
        {
            _Processing.Remove(finished);
        }
    }
}
}