From 1b2122cd353a45cd5d8eb77077bc01058a859f69 Mon Sep 17 00:00:00 2001
From: Mononaut
Date: Fri, 18 Aug 2023 23:38:45 +0900
Subject: [PATCH 1/4] Don't overload core with mempool tx requests

---
 backend/src/api/transaction-utils.ts | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/backend/src/api/transaction-utils.ts b/backend/src/api/transaction-utils.ts
index 02ee7c055..9fb633312 100644
--- a/backend/src/api/transaction-utils.ts
+++ b/backend/src/api/transaction-utils.ts
@@ -74,8 +74,18 @@ class TransactionUtils {
 
   public async $getMempoolTransactionsExtended(txids: string[], addPrevouts = false, lazyPrevouts = false, forceCore = false): Promise<MempoolTransactionExtended[]> {
     if (forceCore || config.MEMPOOL.BACKEND !== 'esplora') {
-      const results = await Promise.allSettled(txids.map(txid => this.$getTransactionExtended(txid, addPrevouts, lazyPrevouts, forceCore, true)));
-      return (results.filter(r => r.status === 'fulfilled') as PromiseFulfilledResult<MempoolTransactionExtended>[]).map(r => r.value);
+      const results: MempoolTransactionExtended[] = [];
+      for (const txid of txids) {
+        try {
+          const result = await this.$getMempoolTransactionExtended(txid, addPrevouts, lazyPrevouts, forceCore);
+          if (result) {
+            results.push(result);
+          }
+        } catch {
+          // skip failures
+        }
+      }
+      return results;
     } else {
       const transactions = await bitcoinApi.$getMempoolTransactions(txids);
       return transactions.map(transaction => {

From e4fcadf39b4bdfc8f0cba15b404f02cb58cb7ce5 Mon Sep 17 00:00:00 2001
From: Mononaut
Date: Sat, 19 Aug 2023 02:06:57 +0900
Subject: [PATCH 2/4] More verbose comments on $getMempoolTransactionsExtended

---
 backend/src/api/transaction-utils.ts | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/backend/src/api/transaction-utils.ts b/backend/src/api/transaction-utils.ts
index 9fb633312..9b29784ce 100644
--- a/backend/src/api/transaction-utils.ts
+++ b/backend/src/api/transaction-utils.ts
@@ -82,7 +82,8 @@ class TransactionUtils {
             results.push(result);
           }
         } catch {
-          // skip failures
+          // we don't always expect to find a transaction for every txid
+          // so it's fine to silently skip failures
         }
       }
       return results;

From 2a8a403da7078372ed9fcbc177df1cf6dac7ef36 Mon Sep 17 00:00:00 2001
From: junderw
Date: Fri, 18 Aug 2023 23:42:13 -0700
Subject: [PATCH 3/4] Use p-limit to limit concurrent requests

---
 backend/src/api/transaction-utils.ts |  20 ++-
 backend/src/utils/p-limit.ts         | 179 +++++++++++++++++++++++++++
 2 files changed, 186 insertions(+), 13 deletions(-)
 create mode 100644 backend/src/utils/p-limit.ts

diff --git a/backend/src/api/transaction-utils.ts b/backend/src/api/transaction-utils.ts
index 9b29784ce..00d8c9af3 100644
--- a/backend/src/api/transaction-utils.ts
+++ b/backend/src/api/transaction-utils.ts
@@ -5,6 +5,7 @@ import bitcoinApi, { bitcoinCoreApi } from './bitcoin/bitcoin-api-factory';
 import * as bitcoinjs from 'bitcoinjs-lib';
 import logger from '../logger';
 import config from '../config';
+import pLimit from '../utils/p-limit';
 
 class TransactionUtils {
   constructor() { }
@@ -74,19 +75,12 @@ class TransactionUtils {
 
   public async $getMempoolTransactionsExtended(txids: string[], addPrevouts = false, lazyPrevouts = false, forceCore = false): Promise<MempoolTransactionExtended[]> {
     if (forceCore || config.MEMPOOL.BACKEND !== 'esplora') {
-      const results: MempoolTransactionExtended[] = [];
-      for (const txid of txids) {
-        try {
-          const result = await this.$getMempoolTransactionExtended(txid, addPrevouts, lazyPrevouts, forceCore);
-          if (result) {
-            results.push(result);
-          }
-        } catch {
-          // we don't always expect to find a transaction for every txid
-          // so it's fine to silently skip failures
-        }
-      }
-      return results;
+      const limiter = pLimit(32); // Run 32 requests at a time
+      const results = await Promise.allSettled(txids.map(
+        txid => limiter(() => this.$getMempoolTransactionExtended(txid, addPrevouts, lazyPrevouts, forceCore))
+      ));
+      return results.filter(reply => reply.status === 'fulfilled')
+        .map(r => (r as PromiseFulfilledResult<MempoolTransactionExtended>).value);
     } else {
       const transactions = await bitcoinApi.$getMempoolTransactions(txids);
       return transactions.map(transaction => {
diff --git a/backend/src/utils/p-limit.ts b/backend/src/utils/p-limit.ts
new file mode 100644
index 000000000..20cead411
--- /dev/null
+++ b/backend/src/utils/p-limit.ts
@@ -0,0 +1,179 @@
+/*
+MIT License
+
+Copyright (c) Sindre Sorhus (https://sindresorhus.com)
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this
+software and associated documentation files (the "Software"), to deal in the Software
+without restriction, including without limitation the rights to use, copy, modify,
+merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice shall be included in all copies
+or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
+INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
+CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
+OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+*/
+
+/*
+How it works:
+`this._head` is an instance of `Node` which keeps track of its current value and nests
+another instance of `Node` that keeps the value that comes after it. When a value is
+provided to `.enqueue()`, the code needs to iterate through `this._head`, going deeper
+and deeper to find the last value. However, iterating through every single item is slow.
+This problem is solved by saving a reference to the last value as `this._tail` so that
+it can reference it to add a new value.
+*/
+
+class Node {
+  value;
+  next;
+
+  constructor(value) {
+    this.value = value;
+  }
+}
+
+class Queue {
+  private _head;
+  private _tail;
+  private _size;
+
+  constructor() {
+    this.clear();
+  }
+
+  enqueue(value) {
+    const node = new Node(value);
+
+    if (this._head) {
+      this._tail.next = node;
+      this._tail = node;
+    } else {
+      this._head = node;
+      this._tail = node;
+    }
+
+    this._size++;
+  }
+
+  dequeue() {
+    const current = this._head;
+    if (!current) {
+      return;
+    }
+
+    this._head = this._head.next;
+    this._size--;
+    return current.value;
+  }
+
+  clear() {
+    this._head = undefined;
+    this._tail = undefined;
+    this._size = 0;
+  }
+
+  get size() {
+    return this._size;
+  }
+
+  *[Symbol.iterator]() {
+    let current = this._head;
+
+    while (current) {
+      yield current.value;
+      current = current.next;
+    }
+  }
+}
+
+interface LimitFunction {
+  readonly activeCount: number;
+  readonly pendingCount: number;
+  clearQueue: () => void;
+  <Arguments extends unknown[], ReturnType>(
+    fn: (...args: Arguments) => PromiseLike<ReturnType> | ReturnType,
+    ...args: Arguments
+  ): Promise<ReturnType>;
+}
+
+export default function pLimit(concurrency: number): LimitFunction {
+  if (
+    !(
+      (Number.isInteger(concurrency) ||
+        concurrency === Number.POSITIVE_INFINITY) &&
+      concurrency > 0
+    )
+  ) {
+    throw new TypeError('Expected `concurrency` to be a number from 1 and up');
+  }
+
+  const queue = new Queue();
+  let activeCount = 0;
+
+  const next = () => {
+    activeCount--;
+
+    if (queue.size > 0) {
+      queue.dequeue()();
+    }
+  };
+
+  const run = async (fn, resolve, args) => {
+    activeCount++;
+
+    const result = (async () => fn(...args))();
+
+    resolve(result);
+
+    try {
+      await result;
+    } catch {}
+
+    next();
+  };
+
+  const enqueue = (fn, resolve, args) => {
+    queue.enqueue(run.bind(undefined, fn, resolve, args));
+
+    (async () => {
+      // This function needs to wait until the next microtask before comparing
+      // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
+      // when the run function is dequeued and called. The comparison in the if-statement
+      // needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
+      await Promise.resolve();
+
+      if (activeCount < concurrency && queue.size > 0) {
+        queue.dequeue()();
+      }
+    })();
+  };
+
+  const generator = (fn, ...args) =>
+    new Promise((resolve) => {
+      enqueue(fn, resolve, args);
+    });
+
+  Object.defineProperties(generator, {
+    activeCount: {
+      get: () => activeCount,
+    },
+    pendingCount: {
+      get: () => queue.size,
+    },
+    clearQueue: {
+      value: () => {
+        queue.clear();
+      },
+    },
+  });
+
+  return generator as any;
+}

From 2819cea50943c079846cb2fdfd225314444b1d07 Mon Sep 17 00:00:00 2001
From: Mononaut
Date: Sat, 19 Aug 2023 19:02:30 +0900
Subject: [PATCH 4/4] Reduce core mempool tx sync to 8 concurrent requests

---
 backend/src/api/transaction-utils.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/backend/src/api/transaction-utils.ts b/backend/src/api/transaction-utils.ts
index 00d8c9af3..ef4a34012 100644
--- a/backend/src/api/transaction-utils.ts
+++ b/backend/src/api/transaction-utils.ts
@@ -75,7 +75,7 @@ class TransactionUtils {
 
   public async $getMempoolTransactionsExtended(txids: string[], addPrevouts = false, lazyPrevouts = false, forceCore = false): Promise<MempoolTransactionExtended[]> {
     if (forceCore || config.MEMPOOL.BACKEND !== 'esplora') {
-      const limiter = pLimit(32); // Run 32 requests at a time
+      const limiter = pLimit(8); // Run 8 requests at a time
       const results = await Promise.allSettled(txids.map(
         txid => limiter(() => this.$getMempoolTransactionExtended(txid, addPrevouts, lazyPrevouts, forceCore))
       ));
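
For reference, the sketch below shows how the limiter introduced in PATCH 3/4 is meant to be used, assuming the backend/src/utils/p-limit.ts module added above: each request is wrapped in the function returned by pLimit(), so only a fixed number run concurrently while the rest wait in the limiter's queue. fetchMempoolEntry and syncMempool are hypothetical names used for illustration only; the real backend calls $getMempoolTransactionExtended as shown in the patches.

import pLimit from './utils/p-limit';

// Hypothetical stand-in for one request to Bitcoin Core (not part of the patches).
async function fetchMempoolEntry(txid: string): Promise<{ txid: string }> {
  return { txid };
}

async function syncMempool(txids: string[]): Promise<{ txid: string }[]> {
  // At most 8 requests in flight at once (the value PATCH 4/4 settles on);
  // the remaining calls wait in the limiter's internal queue.
  const limiter = pLimit(8);

  const settled = await Promise.allSettled(txids.map(
    txid => limiter(() => fetchMempoolEntry(txid))
  ));

  // Keep only the fulfilled results, mirroring the pattern used in the patches.
  return settled
    .filter(r => r.status === 'fulfilled')
    .map(r => (r as PromiseFulfilledResult<{ txid: string }>).value);
}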