using System; using System.Collections.Concurrent; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace BTCPayServer { public class CustomThreadPool : IDisposable { readonly CancellationTokenSource _Cancel = new CancellationTokenSource(); readonly TaskCompletionSource _Exited; int _ExitedCount = 0; readonly Thread[] _Threads; Exception _UnhandledException; readonly BlockingCollection<(Action, TaskCompletionSource)> _Actions = new BlockingCollection<(Action, TaskCompletionSource)>(new ConcurrentQueue<(Action, TaskCompletionSource)>()); public CustomThreadPool(int threadCount, string threadName) { if (threadCount <= 0) throw new ArgumentOutOfRangeException(nameof(threadCount)); _Exited = new TaskCompletionSource(); _Threads = Enumerable.Range(0, threadCount).Select(_ => new Thread(RunLoop) { Name = threadName }).ToArray(); foreach (var t in _Threads) t.Start(); } public void Do(Action act) { DoAsync(act).GetAwaiter().GetResult(); } public T Do(Func act) { return DoAsync(act).GetAwaiter().GetResult(); } public async Task DoAsync(Func act) { TaskCompletionSource done = new TaskCompletionSource(); _Actions.Add((() => { try { done.TrySetResult(act()); } catch (Exception ex) { done.TrySetException(ex); } } , done)); return (T)(await done.Task.ConfigureAwait(false)); } public Task DoAsync(Action act) { return DoAsync(() => { act(); return null; }); } void RunLoop() { try { foreach (var act in _Actions.GetConsumingEnumerable(_Cancel.Token)) { act.Item1(); } } catch (OperationCanceledException) when (_Cancel.IsCancellationRequested) { } catch (Exception ex) { _Cancel.Cancel(); _UnhandledException = ex; } if (Interlocked.Increment(ref _ExitedCount) == _Threads.Length) { foreach (var action in _Actions) { try { action.Item2.TrySetCanceled(); } catch { } } _Exited.TrySetResult(true); } } public void Dispose() { _Cancel.Cancel(); _Exited.Task.GetAwaiter().GetResult(); } } }