Use p-limit to limit concurrent requests

junderw 2023-08-18 23:42:13 -07:00
parent e4fcadf39b
commit ebf43bd074
GPG Key ID: B256185D3A971908
2 changed files with 186 additions and 13 deletions


@@ -5,6 +5,7 @@ import bitcoinApi, { bitcoinCoreApi } from './bitcoin/bitcoin-api-factory';
import * as bitcoinjs from 'bitcoinjs-lib';
import logger from '../logger';
import config from '../config';
import pLimit from '../utils/p-limit';
class TransactionUtils {
  constructor() { }
@@ -74,19 +75,12 @@ class TransactionUtils {
  public async $getMempoolTransactionsExtended(txids: string[], addPrevouts = false, lazyPrevouts = false, forceCore = false): Promise<MempoolTransactionExtended[]> {
    if (forceCore || config.MEMPOOL.BACKEND !== 'esplora') {
      const results: MempoolTransactionExtended[] = [];
      for (const txid of txids) {
        try {
          const result = await this.$getMempoolTransactionExtended(txid, addPrevouts, lazyPrevouts, forceCore);
          if (result) {
            results.push(result);
          }
        } catch {
          // we don't always expect to find a transaction for every txid
          // so it's fine to silently skip failures
        }
      }
      return results;
      const limiter = pLimit(32); // Run 32 requests at a time
      const results = await Promise.allSettled(txids.map(
        txid => limiter(() => this.$getMempoolTransactionExtended(txid, addPrevouts, lazyPrevouts, forceCore))
      ));
      return results.filter(reply => reply.status === 'fulfilled')
        .map(r => (r as PromiseFulfilledResult<MempoolTransactionExtended>).value);
    } else {
      const transactions = await bitcoinApi.$getMempoolTransactions(txids);
      return transactions.map(transaction => {
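
Not part of the diff: the rewritten branch above keeps the old error-handling semantics. Promise.allSettled never rejects, and filtering for fulfilled results drops any txid whose lookup threw, just as the removed empty catch block did, while pLimit(32) caps the number of lookups in flight instead of awaiting them one at a time. A minimal standalone sketch of the same pattern, with a hypothetical fetchTx standing in for $getMempoolTransactionExtended and an illustrative import path:

import pLimit from '../utils/p-limit'; // illustrative path

// Hypothetical lookup that rejects for unknown txids.
async function fetchTx(txid: string): Promise<{ txid: string }> {
  if (txid.startsWith('missing')) {
    throw new Error(`not found: ${txid}`);
  }
  return { txid };
}

async function fetchKnownTxs(txids: string[]): Promise<{ txid: string }[]> {
  const limiter = pLimit(32); // at most 32 lookups in flight at once
  const settled = await Promise.allSettled(
    txids.map(txid => limiter(() => fetchTx(txid)))
  );
  // Rejections are silently dropped, matching the behavior of the removed for loop.
  return settled
    .filter((r): r is PromiseFulfilledResult<{ txid: string }> => r.status === 'fulfilled')
    .map(r => r.value);
}

For example, fetchKnownTxs(['a', 'missing-b', 'c']) resolves to the entries for 'a' and 'c' only.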


@@ -0,0 +1,179 @@
/*
MIT License
Copyright (c) Sindre Sorhus <sindresorhus@gmail.com> (https://sindresorhus.com)
Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the "Software"), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice shall be included in all copies
or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/*
How it works:
`this._head` is an instance of `Node` which keeps track of its current value and nests
another instance of `Node` that keeps the value that comes after it. When a value is
provided to `.enqueue()`, the code needs to iterate through `this._head`, going deeper
and deeper to find the last value. However, iterating through every single item is slow.
This problem is solved by saving a reference to the last value as `this._tail` so that
it can reference it to add a new value.
*/
class Node {
  value;
  next;
  constructor(value) {
    this.value = value;
  }
}
class Queue {
  private _head;
  private _tail;
  private _size;
  constructor() {
    this.clear();
  }
  enqueue(value) {
    const node = new Node(value);
    if (this._head) {
      this._tail.next = node;
      this._tail = node;
    } else {
      this._head = node;
      this._tail = node;
    }
    this._size++;
  }
  dequeue() {
    const current = this._head;
    if (!current) {
      return;
    }
    this._head = this._head.next;
    this._size--;
    return current.value;
  }
  clear() {
    this._head = undefined;
    this._tail = undefined;
    this._size = 0;
  }
  get size() {
    return this._size;
  }
  *[Symbol.iterator]() {
    let current = this._head;
    while (current) {
      yield current.value;
      current = current.next;
    }
  }
}
interface LimitFunction {
  readonly activeCount: number;
  readonly pendingCount: number;
  clearQueue: () => void;
  <Arguments extends unknown[], ReturnType>(
    fn: (...args: Arguments) => PromiseLike<ReturnType> | ReturnType,
    ...args: Arguments
  ): Promise<ReturnType>;
}
export default function pLimit(concurrency: number): LimitFunction {
  if (
    !(
      (Number.isInteger(concurrency) ||
        concurrency === Number.POSITIVE_INFINITY) &&
      concurrency > 0
    )
  ) {
    throw new TypeError('Expected `concurrency` to be a number from 1 and up');
  }
  const queue = new Queue();
  let activeCount = 0;
  const next = () => {
    activeCount--;
    if (queue.size > 0) {
      queue.dequeue()();
    }
  };
  const run = async (fn, resolve, args) => {
    activeCount++;
    const result = (async () => fn(...args))();
    resolve(result);
    try {
      await result;
    } catch {}
    next();
  };
  const enqueue = (fn, resolve, args) => {
    queue.enqueue(run.bind(undefined, fn, resolve, args));
    (async () => {
      // This function needs to wait until the next microtask before comparing
      // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
      // when the run function is dequeued and called. The comparison in the if-statement
      // needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
      await Promise.resolve();
      if (activeCount < concurrency && queue.size > 0) {
        queue.dequeue()();
      }
    })();
  };
  const generator = (fn, ...args) =>
    new Promise((resolve) => {
      enqueue(fn, resolve, args);
    });
  Object.defineProperties(generator, {
    activeCount: {
      get: () => activeCount,
    },
    pendingCount: {
      get: () => queue.size,
    },
    clearQueue: {
      value: () => {
        queue.clear();
      },
    },
  });
  return generator as any;
}
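
Not part of the diff: a small usage sketch of the exported function (import path illustrative). With pLimit(2), at most two wrapped tasks run at once; the rest wait in the linked-list queue described in the "How it works" comment above, which is what the activeCount and pendingCount getters report.

import pLimit from '../utils/p-limit'; // illustrative path

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

async function demo(): Promise<void> {
  const limit = pLimit(2); // at most 2 tasks run concurrently

  const tasks = [1, 2, 3, 4, 5].map(i =>
    limit(async () => {
      await sleep(100);
      return i;
    })
  );

  // The limiter starts queued work on the next microtask, so yield once before checking.
  await Promise.resolve();
  console.log(limit.activeCount, limit.pendingCount); // expected: 2 3

  console.log(await Promise.all(tasks)); // [1, 2, 3, 4, 5]
}

demo().catch(console.error);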