From 5138f9a25407da53652b1775c7cc9b37e6b82cb2 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Fri, 12 May 2023 16:29:45 -0600 Subject: [PATCH 01/10] Implement Redis cache for block and mempool data --- backend/package-lock.json | 158 ++++++++++++++++++ backend/package.json | 3 +- .../__fixtures__/mempool-config.template.json | 5 + backend/src/__tests__/config.test.ts | 8 + backend/src/api/blocks.ts | 12 +- backend/src/api/disk-cache.ts | 6 +- backend/src/api/mempool.ts | 12 +- backend/src/api/redis-cache.ts | 140 ++++++++++++++++ backend/src/config.ts | 16 +- backend/src/index.ts | 7 +- 10 files changed, 358 insertions(+), 9 deletions(-) create mode 100644 backend/src/api/redis-cache.ts diff --git a/backend/package-lock.json b/backend/package-lock.json index 0a0b8b9d1..1a92552cb 100644 --- a/backend/package-lock.json +++ b/backend/package-lock.json @@ -19,6 +19,7 @@ "maxmind": "~4.3.11", "mysql2": "~3.5.2", "rust-gbt": "file:./rust-gbt", + "redis": "^4.6.6", "socks-proxy-agent": "~7.0.0", "typescript": "~4.9.3", "ws": "~8.13.0" @@ -1555,6 +1556,64 @@ "node": ">= 8" } }, + "node_modules/@redis/bloom": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@redis/bloom/-/bloom-1.2.0.tgz", + "integrity": "sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/client": { + "version": "1.5.7", + "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.7.tgz", + "integrity": "sha512-gaOBOuJPjK5fGtxSseaKgSvjiZXQCdLlGg9WYQst+/GRUjmXaiB5kVkeQMRtPc7Q2t93XZcJfBMSwzs/XS9UZw==", + "dependencies": { + "cluster-key-slot": "1.1.2", + "generic-pool": "3.9.0", + "yallist": "4.0.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/@redis/client/node_modules/yallist": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", + "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==" + }, + "node_modules/@redis/graph": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@redis/graph/-/graph-1.1.0.tgz", + "integrity": "sha512-16yZWngxyXPd+MJxeSr0dqh2AIOi8j9yXKcKCwVaKDbH3HTuETpDVPcLujhFYVPtYrngSco31BUcSa9TH31Gqg==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/json": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@redis/json/-/json-1.0.4.tgz", + "integrity": "sha512-LUZE2Gdrhg0Rx7AN+cZkb1e6HjoSKaeeW8rYnt89Tly13GBI5eP4CwDVr+MY8BAYfCg4/N15OUrtLoona9uSgw==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/search": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/@redis/search/-/search-1.1.2.tgz", + "integrity": "sha512-/cMfstG/fOh/SsE+4/BQGeuH/JJloeWuH+qJzM8dbxuWvdWibWAOAHHCZTMPhV3xIlH4/cUEIA8OV5QnYpaVoA==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/time-series": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-1.0.4.tgz", + "integrity": "sha512-ThUIgo2U/g7cCuZavucQTQzA9g9JbDDY2f64u3AbAoz/8vE2lt2U37LamDUVChhaDA3IRT9R6VvJwqnUfTJzng==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, "node_modules/@sinclair/typebox": { "version": "0.25.24", "resolved": "https://registry.npmjs.org/@sinclair/typebox/-/typebox-0.25.24.tgz", @@ -2718,6 +2777,14 @@ "node": ">=12" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/co": { "version": "4.6.0", "resolved": "https://registry.npmjs.org/co/-/co-4.6.0.tgz", @@ -3678,6 +3745,14 @@ "is-property": "^1.0.2" } }, + "node_modules/generic-pool": { + "version": "3.9.0", + "resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz", + "integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==", + "engines": { + "node": ">= 4" + } + }, "node_modules/gensync": { "version": "1.0.0-beta.2", "resolved": "https://registry.npmjs.org/gensync/-/gensync-1.0.0-beta.2.tgz", @@ -6577,6 +6652,19 @@ "integrity": "sha512-xWGDIW6x921xtzPkhiULtthJHoJvBbF3q26fzloPCK0hsvxtPVelvftw3zjbHWSkR2km9Z+4uxbDDK/6Zw9B8w==", "dev": true }, + "node_modules/redis": { + "version": "4.6.6", + "resolved": "https://registry.npmjs.org/redis/-/redis-4.6.6.tgz", + "integrity": "sha512-aLs2fuBFV/VJ28oLBqYykfnhGGkFxvx0HdCEBYdJ99FFbSEMZ7c1nVKwR6ZRv+7bb7JnC0mmCzaqu8frgOYhpA==", + "dependencies": { + "@redis/bloom": "1.2.0", + "@redis/client": "1.5.7", + "@redis/graph": "1.1.0", + "@redis/json": "1.0.4", + "@redis/search": "1.1.2", + "@redis/time-series": "1.0.4" + } + }, "node_modules/require-directory": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/require-directory/-/require-directory-2.1.1.tgz", @@ -8704,6 +8792,53 @@ "fastq": "^1.6.0" } }, + "@redis/bloom": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@redis/bloom/-/bloom-1.2.0.tgz", + "integrity": "sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==", + "requires": {} + }, + "@redis/client": { + "version": "1.5.7", + "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.5.7.tgz", + "integrity": "sha512-gaOBOuJPjK5fGtxSseaKgSvjiZXQCdLlGg9WYQst+/GRUjmXaiB5kVkeQMRtPc7Q2t93XZcJfBMSwzs/XS9UZw==", + "requires": { + "cluster-key-slot": "1.1.2", + "generic-pool": "3.9.0", + "yallist": "4.0.0" + }, + "dependencies": { + "yallist": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/yallist/-/yallist-4.0.0.tgz", + "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==" + } + } + }, + "@redis/graph": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@redis/graph/-/graph-1.1.0.tgz", + "integrity": "sha512-16yZWngxyXPd+MJxeSr0dqh2AIOi8j9yXKcKCwVaKDbH3HTuETpDVPcLujhFYVPtYrngSco31BUcSa9TH31Gqg==", + "requires": {} + }, + "@redis/json": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@redis/json/-/json-1.0.4.tgz", + "integrity": "sha512-LUZE2Gdrhg0Rx7AN+cZkb1e6HjoSKaeeW8rYnt89Tly13GBI5eP4CwDVr+MY8BAYfCg4/N15OUrtLoona9uSgw==", + "requires": {} + }, + "@redis/search": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/@redis/search/-/search-1.1.2.tgz", + "integrity": "sha512-/cMfstG/fOh/SsE+4/BQGeuH/JJloeWuH+qJzM8dbxuWvdWibWAOAHHCZTMPhV3xIlH4/cUEIA8OV5QnYpaVoA==", + "requires": {} + }, + "@redis/time-series": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-1.0.4.tgz", + "integrity": "sha512-ThUIgo2U/g7cCuZavucQTQzA9g9JbDDY2f64u3AbAoz/8vE2lt2U37LamDUVChhaDA3IRT9R6VvJwqnUfTJzng==", + "requires": {} + }, "@sinclair/typebox": { "version": "0.25.24", "resolved": "https://registry.npmjs.org/@sinclair/typebox/-/typebox-0.25.24.tgz", @@ -9604,6 +9739,11 @@ "wrap-ansi": "^7.0.0" } }, + "cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==" + }, "co": { "version": "4.6.0", "resolved": "https://registry.npmjs.org/co/-/co-4.6.0.tgz", @@ -10332,6 +10472,11 @@ "is-property": "^1.0.2" } }, + "generic-pool": { + "version": "3.9.0", + "resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz", + "integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==" + }, "gensync": { "version": "1.0.0-beta.2", "resolved": "https://registry.npmjs.org/gensync/-/gensync-1.0.0-beta.2.tgz", @@ -12454,6 +12599,19 @@ "integrity": "sha512-xWGDIW6x921xtzPkhiULtthJHoJvBbF3q26fzloPCK0hsvxtPVelvftw3zjbHWSkR2km9Z+4uxbDDK/6Zw9B8w==", "dev": true }, + "redis": { + "version": "4.6.6", + "resolved": "https://registry.npmjs.org/redis/-/redis-4.6.6.tgz", + "integrity": "sha512-aLs2fuBFV/VJ28oLBqYykfnhGGkFxvx0HdCEBYdJ99FFbSEMZ7c1nVKwR6ZRv+7bb7JnC0mmCzaqu8frgOYhpA==", + "requires": { + "@redis/bloom": "1.2.0", + "@redis/client": "1.5.7", + "@redis/graph": "1.1.0", + "@redis/json": "1.0.4", + "@redis/search": "1.1.2", + "@redis/time-series": "1.0.4" + } + }, "require-directory": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/require-directory/-/require-directory-2.1.1.tgz", diff --git a/backend/package.json b/backend/package.json index 7ebc2e970..24da55e17 100644 --- a/backend/package.json +++ b/backend/package.json @@ -47,13 +47,14 @@ "maxmind": "~4.3.11", "mysql2": "~3.5.2", "rust-gbt": "file:./rust-gbt", + "redis": "^4.6.6", "socks-proxy-agent": "~7.0.0", "typescript": "~4.9.3", "ws": "~8.13.0" }, "devDependencies": { - "@babel/core": "^7.21.3", "@babel/code-frame": "^7.18.6", + "@babel/core": "^7.21.3", "@types/compression": "^1.7.2", "@types/crypto-js": "^4.1.1", "@types/express": "^4.17.17", diff --git a/backend/src/__fixtures__/mempool-config.template.json b/backend/src/__fixtures__/mempool-config.template.json index 4213f0ffb..ab700c466 100644 --- a/backend/src/__fixtures__/mempool-config.template.json +++ b/backend/src/__fixtures__/mempool-config.template.json @@ -10,6 +10,7 @@ "AUTOMATIC_BLOCK_REINDEXING": false, "POLL_RATE_MS": 3, "CACHE_DIR": "__MEMPOOL_CACHE_DIR__", + "CACHE_ENABLED": true, "CLEAR_PROTECTION_MINUTES": 4, "RECOMMENDED_FEE_PERCENTILE": 5, "BLOCK_WEIGHT_UNITS": 6, @@ -127,5 +128,9 @@ "AUDIT": false, "AUDIT_START_HEIGHT": 774000, "SERVERS": [] + }, + "REDIS": { + "ENABLED": false, + "UNIX_SOCKET_PATH": "/tmp/redis.sock" } } diff --git a/backend/src/__tests__/config.test.ts b/backend/src/__tests__/config.test.ts index dc1beaa46..0c06b03e1 100644 --- a/backend/src/__tests__/config.test.ts +++ b/backend/src/__tests__/config.test.ts @@ -23,6 +23,7 @@ describe('Mempool Backend Config', () => { AUTOMATIC_BLOCK_REINDEXING: false, POLL_RATE_MS: 2000, CACHE_DIR: './cache', + CACHE_ENABLED: true, CLEAR_PROTECTION_MINUTES: 20, RECOMMENDED_FEE_PERCENTILE: 50, BLOCK_WEIGHT_UNITS: 4000000, @@ -127,6 +128,11 @@ describe('Mempool Backend Config', () => { AUDIT_START_HEIGHT: 774000, SERVERS: [] }); + + expect(config.REDIS).toStrictEqual({ + ENABLED: false, + UNIX_SOCKET_PATH: '' + }); }); }); @@ -160,6 +166,8 @@ describe('Mempool Backend Config', () => { expect(config.PRICE_DATA_SERVER).toStrictEqual(fixture.PRICE_DATA_SERVER); expect(config.EXTERNAL_DATA_SERVER).toStrictEqual(fixture.EXTERNAL_DATA_SERVER); + + expect(config.REDIS).toStrictEqual(fixture.REDIS); }); }); diff --git a/backend/src/api/blocks.ts b/backend/src/api/blocks.ts index ba7a55149..f86bc53e9 100644 --- a/backend/src/api/blocks.ts +++ b/backend/src/api/blocks.ts @@ -26,6 +26,8 @@ import PricesRepository from '../repositories/PricesRepository'; import priceUpdater from '../tasks/price-updater'; import chainTips from './chain-tips'; import websocketHandler from './websocket-handler'; +import redisCache from './redis-cache'; +import rbfCache from './rbf-cache'; class Blocks { private blocks: BlockExtended[] = []; @@ -804,10 +806,18 @@ class Blocks { if (this.newBlockCallbacks.length) { this.newBlockCallbacks.forEach((cb) => cb(blockExtended, txIds, transactions)); } - if (!memPool.hasPriority() && (block.height % config.MEMPOOL.DISK_CACHE_BLOCK_INTERVAL === 0)) { + if (config.MEMPOOL.CACHE_ENABLED && !memPool.hasPriority() && (block.height % config.MEMPOOL.DISK_CACHE_BLOCK_INTERVAL === 0)) { diskCache.$saveCacheToDisk(); } + // Update Redis cache + if (config.REDIS.ENABLED) { + await redisCache.$updateBlocks(this.blocks); + await redisCache.$updateBlockSummaries(this.blockSummaries); + await redisCache.$removeTransactions(txIds); + await rbfCache.updateCache(); + } + handledBlocks++; } diff --git a/backend/src/api/disk-cache.ts b/backend/src/api/disk-cache.ts index 1e428d8b6..04328a72a 100644 --- a/backend/src/api/disk-cache.ts +++ b/backend/src/api/disk-cache.ts @@ -29,7 +29,7 @@ class DiskCache { }; constructor() { - if (!cluster.isPrimary) { + if (!cluster.isPrimary || !config.MEMPOOL.CACHE_ENABLED) { return; } process.on('SIGINT', (e) => { @@ -39,7 +39,7 @@ class DiskCache { } async $saveCacheToDisk(sync: boolean = false): Promise { - if (!cluster.isPrimary) { + if (!cluster.isPrimary || !config.MEMPOOL.CACHE_ENABLED) { return; } if (this.isWritingCache) { @@ -175,7 +175,7 @@ class DiskCache { } async $loadMempoolCache(): Promise { - if (!fs.existsSync(DiskCache.FILE_NAME)) { + if (!config.MEMPOOL.CACHE_ENABLED || !fs.existsSync(DiskCache.FILE_NAME)) { return; } try { diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index e822ba329..fff7ad20e 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -9,7 +9,7 @@ import loadingIndicators from './loading-indicators'; import bitcoinClient from './bitcoin/bitcoin-client'; import bitcoinSecondClient from './bitcoin/bitcoin-second-client'; import rbfCache from './rbf-cache'; -import { IEsploraApi } from './bitcoin/esplora-api.interface'; +import redisCache from './redis-cache'; class Mempool { private inSync: boolean = false; @@ -102,6 +102,10 @@ class Mempool { await this.$asyncMempoolChangedCallback(this.mempoolCache, count, [], []); } this.addToSpendMap(Object.values(this.mempoolCache)); + if (config.MEMPOOL.CACHE_ENABLED && config.REDIS.ENABLED) { + logger.debug('copying mempool from disk cache into Redis'); + await redisCache.$addTransactions(Object.values(mempoolData)); + } } public async $reloadMempool(expectedCount: number): Promise { @@ -318,6 +322,12 @@ class Mempool { loadingIndicators.setProgress('mempool', 100); } + // Update Redis cache + if (config.REDIS.ENABLED) { + await redisCache.$addTransactions(newTransactions); + await redisCache.$removeTransactions(deletedTransactions.map(tx => tx.txid)); + } + const end = new Date().getTime(); const time = end - start; logger.debug(`Mempool updated in ${time / 1000} seconds. New size: ${Object.keys(this.mempoolCache).length} (${diff > 0 ? '+' + diff : diff})`); diff --git a/backend/src/api/redis-cache.ts b/backend/src/api/redis-cache.ts new file mode 100644 index 000000000..c3cdc9e28 --- /dev/null +++ b/backend/src/api/redis-cache.ts @@ -0,0 +1,140 @@ +import { createClient } from 'redis'; +import memPool from './mempool'; +import blocks from './blocks'; +import logger from '../logger'; +import config from '../config'; +import { BlockExtended, BlockSummary, TransactionExtended } from '../mempool.interfaces'; + +class RedisCache { + private client; + private connected = false; + + constructor() { + if (config.REDIS.ENABLED) { + const redisConfig = { + socket: { + path: config.REDIS.UNIX_SOCKET_PATH + } + }; + this.client = createClient(redisConfig); + this.client.on('error', (e) => { + logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`); + }); + this.$ensureConnected(); + } + } + + private async $ensureConnected(): Promise { + if (!this.connected && config.REDIS.ENABLED) { + return this.client.connect().then(() => { + this.connected = true; + logger.info(`Redis client connected`); + }); + } + } + + async $updateBlocks(blocks: BlockExtended[]) { + try { + await this.$ensureConnected(); + await this.client.json.set('blocks', '$', blocks); + } catch (e) { + logger.warn(`Failed to update blocks in Redis cache: ${e instanceof Error ? e.message : e}`); + } + } + + async $updateBlockSummaries(summaries: BlockSummary[]) { + try { + await this.$ensureConnected(); + await this.client.json.set('block-summaries', '$', summaries); + } catch (e) { + logger.warn(`Failed to update blocks in Redis cache: ${e instanceof Error ? e.message : e}`); + } + } + + async $addTransactions(newTransactions: TransactionExtended[]) { + try { + await this.$ensureConnected(); + await Promise.all(newTransactions.map(tx => { + return this.client.json.set('tx:' + tx.txid, '$', tx); + })); + } catch (e) { + logger.warn(`Failed to add ${newTransactions.length} transactions to Redis cache: ${e instanceof Error ? e.message : e}`); + } + } + + async $removeTransactions(transactions: string[]) { + try { + await this.$ensureConnected(); + await Promise.all(transactions.map(txid => { + return this.client.del('tx:' + txid); + })); + } catch (e) { + logger.warn(`Failed to remove ${transactions.length} transactions from Redis cache: ${e instanceof Error ? e.message : e}`); + } + } + + async $getBlocks(): Promise { + try { + await this.$ensureConnected(); + return this.client.json.get('blocks'); + } catch (e) { + logger.warn(`Failed to retrieve blocks from Redis cache: ${e instanceof Error ? e.message : e}`); + return []; + } + } + + async $getBlockSummaries(): Promise { + try { + await this.$ensureConnected(); + return this.client.json.get('block-summaries'); + } catch (e) { + logger.warn(`Failed to retrieve blocks from Redis cache: ${e instanceof Error ? e.message : e}`); + return []; + } + } + + async $getMempool(): Promise<{ [txid: string]: TransactionExtended }> { + const mempool = {}; + try { + await this.$ensureConnected(); + const keys = await this.client.keys('tx:*'); + const promises: Promise[] = []; + for (let i = 0; i < keys.length; i += 10000) { + const keySlice = keys.slice(i, i + 10000); + if (!keySlice.length) { + continue; + } + promises.push(this.client.json.mGet(keySlice, '$').then(chunk => { + for (const txs of chunk) { + for (const tx of txs) { + if (tx) { + mempool[tx.txid] = tx; + } + } + } + })); + } + await Promise.all(promises); + } catch (e) { + logger.warn(`Failed to retrieve mempool from Redis cache: ${e instanceof Error ? e.message : e}`); + } + return mempool; + } + + async $loadCache() { + logger.info('Restoring mempool and blocks data from Redis cache'); + // Load block data + const loadedBlocks = await this.$getBlocks(); + const loadedBlockSummaries = await this.$getBlockSummaries(); + // Load mempool + const loadedMempool = await this.$getMempool(); + + // Set loaded data + blocks.setBlocks(loadedBlocks || []); + blocks.setBlockSummaries(loadedBlockSummaries || []); + await memPool.$setMempool(loadedMempool); + } + +} + +export default new RedisCache(); diff --git a/backend/src/config.ts b/backend/src/config.ts index 09d279537..3a028d0cd 100644 --- a/backend/src/config.ts +++ b/backend/src/config.ts @@ -12,6 +12,7 @@ interface IConfig { API_URL_PREFIX: string; POLL_RATE_MS: number; CACHE_DIR: string; + CACHE_ENABLED: boolean; CLEAR_PROTECTION_MINUTES: number; RECOMMENDED_FEE_PERCENTILE: number; BLOCK_WEIGHT_UNITS: number; @@ -137,7 +138,11 @@ interface IConfig { AUDIT: boolean; AUDIT_START_HEIGHT: number; SERVERS: string[]; - } + }, + REDIS: { + ENABLED: boolean; + UNIX_SOCKET_PATH: string; + }, } const defaults: IConfig = { @@ -150,6 +155,7 @@ const defaults: IConfig = { 'API_URL_PREFIX': '/api/v1/', 'POLL_RATE_MS': 2000, 'CACHE_DIR': './cache', + 'CACHE_ENABLED': true, 'CLEAR_PROTECTION_MINUTES': 20, 'RECOMMENDED_FEE_PERCENTILE': 50, 'BLOCK_WEIGHT_UNITS': 4000000, @@ -275,7 +281,11 @@ const defaults: IConfig = { 'AUDIT': false, 'AUDIT_START_HEIGHT': 774000, 'SERVERS': [], - } + }, + 'REDIS': { + 'ENABLED': false, + 'UNIX_SOCKET_PATH': '', + }, }; class Config implements IConfig { @@ -296,6 +306,7 @@ class Config implements IConfig { EXTERNAL_DATA_SERVER: IConfig['EXTERNAL_DATA_SERVER']; MAXMIND: IConfig['MAXMIND']; REPLICATION: IConfig['REPLICATION']; + REDIS: IConfig['REDIS']; constructor() { const configs = this.merge(configFromFile, defaults); @@ -316,6 +327,7 @@ class Config implements IConfig { this.EXTERNAL_DATA_SERVER = configs.EXTERNAL_DATA_SERVER; this.MAXMIND = configs.MAXMIND; this.REPLICATION = configs.REPLICATION; + this.REDIS = configs.REDIS; } merge = (...objects: object[]): IConfig => { diff --git a/backend/src/index.ts b/backend/src/index.ts index bbfaa9ff3..51d407f6f 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -41,6 +41,7 @@ import chainTips from './api/chain-tips'; import { AxiosError } from 'axios'; import v8 from 'v8'; import { formatBytes, getBytesUnit } from './utils/format'; +import redisCache from './api/redis-cache'; class Server { private wss: WebSocket.Server | undefined; @@ -122,7 +123,11 @@ class Server { await poolsUpdater.updatePoolsJson(); // Needs to be done before loading the disk cache because we sometimes wipe it await syncAssets.syncAssets$(); if (config.MEMPOOL.ENABLED) { - await diskCache.$loadMempoolCache(); + if (config.MEMPOOL.CACHE_ENABLED) { + await diskCache.$loadMempoolCache(); + } else if (config.REDIS.ENABLED) { + await redisCache.$loadCache(); + } } if (config.STATISTICS.ENABLED && config.DATABASE.ENABLED && cluster.isPrimary) { From aea2b1ec6b46678d4072999741751b2fddbf27d2 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Fri, 12 May 2023 16:31:01 -0600 Subject: [PATCH 02/10] Add RBF data to Redis cache --- backend/src/api/mempool.ts | 1 + backend/src/api/rbf-cache.ts | 104 ++++++++++++++++++++++++++++----- backend/src/api/redis-cache.ts | 48 +++++++++++++++ 3 files changed, 138 insertions(+), 15 deletions(-) diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index fff7ad20e..dc3634a6f 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -326,6 +326,7 @@ class Mempool { if (config.REDIS.ENABLED) { await redisCache.$addTransactions(newTransactions); await redisCache.$removeTransactions(deletedTransactions.map(tx => tx.txid)); + await rbfCache.updateCache(); } const end = new Date().getTime(); diff --git a/backend/src/api/rbf-cache.ts b/backend/src/api/rbf-cache.ts index f28dd0de3..b5ae74072 100644 --- a/backend/src/api/rbf-cache.ts +++ b/backend/src/api/rbf-cache.ts @@ -1,15 +1,17 @@ +import config from "../config"; import logger from "../logger"; import { MempoolTransactionExtended, TransactionStripped } from "../mempool.interfaces"; import bitcoinApi from './bitcoin/bitcoin-api-factory'; import { Common } from "./common"; +import redisCache from "./redis-cache"; -interface RbfTransaction extends TransactionStripped { +export interface RbfTransaction extends TransactionStripped { rbf?: boolean; mined?: boolean; fullRbf?: boolean; } -interface RbfTree { +export interface RbfTree { tx: RbfTransaction; time: number; interval?: number; @@ -28,6 +30,19 @@ export interface ReplacementInfo { newVsize: number; } +enum CacheOp { + Remove = 0, + Add = 1, + Change = 2, +} + +interface CacheEvent { + op: CacheOp; + type: 'tx' | 'tree' | 'exp'; + txid: string, + value?: any, +} + class RbfCache { private replacedBy: Map = new Map(); private replaces: Map = new Map(); @@ -36,11 +51,43 @@ class RbfCache { private treeMap: Map = new Map(); // map of txids to sequence ids private txs: Map = new Map(); private expiring: Map = new Map(); + private cacheQueue: CacheEvent[] = []; constructor() { setInterval(this.cleanup.bind(this), 1000 * 60 * 10); } + private addTx(txid: string, tx: MempoolTransactionExtended): void { + this.txs.set(txid, tx); + this.cacheQueue.push({ op: CacheOp.Add, type: 'tx', txid }); + } + + private addTree(txid: string, tree: RbfTree): void { + this.rbfTrees.set(txid, tree); + this.dirtyTrees.add(txid); + this.cacheQueue.push({ op: CacheOp.Add, type: 'tree', txid }); + } + + private addExpiration(txid: string, expiry: number): void { + this.expiring.set(txid, expiry); + this.cacheQueue.push({ op: CacheOp.Add, type: 'exp', txid, value: expiry }); + } + + private removeTx(txid: string): void { + this.txs.delete(txid); + this.cacheQueue.push({ op: CacheOp.Remove, type: 'tx', txid }); + } + + private removeTree(txid: string): void { + this.rbfTrees.delete(txid); + this.cacheQueue.push({ op: CacheOp.Remove, type: 'tree', txid }); + } + + private removeExpiration(txid: string): void { + this.expiring.delete(txid); + this.cacheQueue.push({ op: CacheOp.Remove, type: 'exp', txid }); + } + public add(replaced: MempoolTransactionExtended[], newTxExtended: MempoolTransactionExtended): void { if (!newTxExtended || !replaced?.length || this.txs.has(newTxExtended.txid)) { return; @@ -49,7 +96,7 @@ class RbfCache { const newTx = Common.stripTransaction(newTxExtended) as RbfTransaction; const newTime = newTxExtended.firstSeen || (Date.now() / 1000); newTx.rbf = newTxExtended.vin.some((v) => v.sequence < 0xfffffffe); - this.txs.set(newTx.txid, newTxExtended); + this.addTx(newTx.txid, newTxExtended); // maintain rbf trees let txFullRbf = false; @@ -66,7 +113,7 @@ class RbfCache { const treeId = this.treeMap.get(replacedTx.txid); if (treeId) { const tree = this.rbfTrees.get(treeId); - this.rbfTrees.delete(treeId); + this.removeTree(treeId); if (tree) { tree.interval = newTime - tree?.time; replacedTrees.push(tree); @@ -83,7 +130,7 @@ class RbfCache { replaces: [], }); treeFullRbf = treeFullRbf || !replacedTx.rbf; - this.txs.set(replacedTx.txid, replacedTxExtended); + this.addTx(replacedTx.txid, replacedTxExtended); } } newTx.fullRbf = txFullRbf; @@ -94,10 +141,9 @@ class RbfCache { fullRbf: treeFullRbf, replaces: replacedTrees }; - this.rbfTrees.set(treeId, newTree); + this.addTree(treeId, newTree); this.updateTreeMap(treeId, newTree); this.replaces.set(newTx.txid, replacedTrees.map(tree => tree.tx.txid)); - this.dirtyTrees.add(treeId); } public has(txId: string): boolean { @@ -191,6 +237,7 @@ class RbfCache { this.setTreeMined(tree, txid); tree.mined = true; this.dirtyTrees.add(treeId); + this.cacheQueue.push({ op: CacheOp.Change, type: 'tree', txid: treeId }); } } this.evict(txid); @@ -199,7 +246,8 @@ class RbfCache { // flag a transaction as removed from the mempool public evict(txid: string, fast: boolean = false): void { if (this.txs.has(txid) && (fast || !this.expiring.has(txid))) { - this.expiring.set(txid, fast ? Date.now() + (1000 * 60 * 10) : Date.now() + (1000 * 86400)); // 24 hours + const expiryTime = fast ? Date.now() + (1000 * 60 * 10) : Date.now() + (1000 * 86400); // 24 hours + this.addExpiration(txid, expiryTime); } } @@ -220,11 +268,11 @@ class RbfCache { const now = Date.now(); for (const txid of this.expiring.keys()) { if ((this.expiring.get(txid) || 0) < now) { - this.expiring.delete(txid); + this.removeExpiration(txid); this.remove(txid); } } - logger.debug(`rbf cache contains ${this.txs.size} txs, ${this.expiring.size} due to expire`); + logger.debug(`rbf cache contains ${this.txs.size} txs, ${this.rbfTrees.size} trees, ${this.expiring.size} due to expire`); } // remove a transaction & all previous versions from the cache @@ -234,14 +282,14 @@ class RbfCache { const replaces = this.replaces.get(txid); this.replaces.delete(txid); this.treeMap.delete(txid); - this.txs.delete(txid); - this.expiring.delete(txid); + this.removeTx(txid); + this.removeExpiration(txid); for (const tx of (replaces || [])) { // recursively remove prior versions from the cache this.replacedBy.delete(tx); // if this is the id of a tree, remove that too if (this.treeMap.get(tx) === tx) { - this.rbfTrees.delete(tx); + this.removeTree(tx); } this.remove(tx); } @@ -273,6 +321,33 @@ class RbfCache { } } + public async updateCache(): Promise { + if (!config.REDIS.ENABLED) { + return; + } + // Update the Redis cache by replaying queued events + for (const e of this.cacheQueue) { + if (e.op === CacheOp.Add || e.op === CacheOp.Change) { + let value = e.value; + switch(e.type) { + case 'tx': { + value = this.txs.get(e.txid); + } break; + case 'tree': { + const tree = this.rbfTrees.get(e.txid); + value = tree ? this.exportTree(tree) : null; + } break; + } + if (value != null) { + await redisCache.$setRbfEntry(e.type, e.txid, value); + } + } else if (e.op === CacheOp.Remove) { + await redisCache.$removeRbfEntry(e.type, e.txid); + } + } + this.cacheQueue = []; + } + public dump(): any { const trees = Array.from(this.rbfTrees.values()).map((tree: RbfTree) => { return this.exportTree(tree); }); @@ -378,8 +453,7 @@ class RbfCache { }; this.treeMap.set(txid, root); if (root === txid) { - this.rbfTrees.set(root, tree); - this.dirtyTrees.add(root); + this.addTree(root, tree); } return tree; } diff --git a/backend/src/api/redis-cache.ts b/backend/src/api/redis-cache.ts index c3cdc9e28..2598c406c 100644 --- a/backend/src/api/redis-cache.ts +++ b/backend/src/api/redis-cache.ts @@ -4,6 +4,7 @@ import blocks from './blocks'; import logger from '../logger'; import config from '../config'; import { BlockExtended, BlockSummary, TransactionExtended } from '../mempool.interfaces'; +import rbfCache from './rbf-cache'; class RedisCache { private client; @@ -73,6 +74,24 @@ class RedisCache { } } + async $setRbfEntry(type: string, txid: string, value: any): Promise { + try { + await this.$ensureConnected(); + await this.client.json.set(`rbf:${type}:${txid}`, '$', value); + } catch (e) { + logger.warn(`Failed to set RBF ${type} in Redis cache: ${e instanceof Error ? e.message : e}`); + } + } + + async $removeRbfEntry(type: string, txid: string): Promise { + try { + await this.$ensureConnected(); + await this.client.del(`rbf:${type}:${txid}`); + } catch (e) { + logger.warn(`Failed to remove RBF ${type} from Redis cache: ${e instanceof Error ? e.message : e}`); + } + } + async $getBlocks(): Promise { try { await this.$ensureConnected(); @@ -121,6 +140,26 @@ class RedisCache { return mempool; } + async $getRbfEntries(type: string): Promise { + try { + await this.$ensureConnected(); + const keys = await this.client.keys(`rbf:${type}:*`); + const promises: Promise[] = []; + for (let i = 0; i < keys.length; i += 10000) { + const keySlice = keys.slice(i, i + 10000); + if (!keySlice.length) { + continue; + } + promises.push(this.client.json.mGet(keySlice, '$').then(chunk => chunk?.length ? chunk.flat().map((v, i) => [keySlice[i].slice(`rbf:${type}:`.length), v]) : [] )); + } + const entries = await Promise.all(promises); + return entries.flat(); + } catch (e) { + logger.warn(`Failed to retrieve Rbf ${type}s from Redis cache: ${e instanceof Error ? e.message : e}`); + return []; + } + } + async $loadCache() { logger.info('Restoring mempool and blocks data from Redis cache'); // Load block data @@ -128,11 +167,20 @@ class RedisCache { const loadedBlockSummaries = await this.$getBlockSummaries(); // Load mempool const loadedMempool = await this.$getMempool(); + // Load rbf data + const rbfTxs = await this.$getRbfEntries('tx'); + const rbfTrees = await this.$getRbfEntries('tree'); + const rbfExpirations = await this.$getRbfEntries('exp'); // Set loaded data blocks.setBlocks(loadedBlocks || []); blocks.setBlockSummaries(loadedBlockSummaries || []); await memPool.$setMempool(loadedMempool); + await rbfCache.load({ + txs: rbfTxs, + trees: rbfTrees.map(loadedTree => loadedTree[1]), + expiring: rbfExpirations, + }); } } From b6cb5394705fae6e8be61d4becbaa8c579c16489 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Fri, 16 Jun 2023 18:55:22 -0400 Subject: [PATCH 03/10] Fix redis feature merge conflicts --- backend/src/api/redis-cache.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/backend/src/api/redis-cache.ts b/backend/src/api/redis-cache.ts index 2598c406c..e89b2302d 100644 --- a/backend/src/api/redis-cache.ts +++ b/backend/src/api/redis-cache.ts @@ -3,7 +3,7 @@ import memPool from './mempool'; import blocks from './blocks'; import logger from '../logger'; import config from '../config'; -import { BlockExtended, BlockSummary, TransactionExtended } from '../mempool.interfaces'; +import { BlockExtended, BlockSummary, MempoolTransactionExtended } from '../mempool.interfaces'; import rbfCache from './rbf-cache'; class RedisCache { @@ -52,7 +52,7 @@ class RedisCache { } } - async $addTransactions(newTransactions: TransactionExtended[]) { + async $addTransactions(newTransactions: MempoolTransactionExtended[]) { try { await this.$ensureConnected(); await Promise.all(newTransactions.map(tx => { @@ -112,12 +112,12 @@ class RedisCache { } } - async $getMempool(): Promise<{ [txid: string]: TransactionExtended }> { + async $getMempool(): Promise<{ [txid: string]: MempoolTransactionExtended }> { const mempool = {}; try { await this.$ensureConnected(); const keys = await this.client.keys('tx:*'); - const promises: Promise[] = []; + const promises: Promise[] = []; for (let i = 0; i < keys.length; i += 10000) { const keySlice = keys.slice(i, i + 10000); if (!keySlice.length) { @@ -144,7 +144,7 @@ class RedisCache { try { await this.$ensureConnected(); const keys = await this.client.keys(`rbf:${type}:*`); - const promises: Promise[] = []; + const promises: Promise[] = []; for (let i = 0; i < keys.length; i += 10000) { const keySlice = keys.slice(i, i + 10000); if (!keySlice.length) { From d65bddd30b2833872c27741d98772fb881c35562 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Fri, 16 Jun 2023 18:56:34 -0400 Subject: [PATCH 04/10] Add transactions to Redis cache in manageable batches --- backend/src/api/mempool.ts | 22 +++++++++++++++++----- backend/src/api/redis-cache.ts | 18 ++++++++++++++++++ 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index dc3634a6f..73a6fdfeb 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -86,6 +86,10 @@ class Mempool { public async $setMempool(mempoolData: { [txId: string]: MempoolTransactionExtended }) { this.mempoolCache = mempoolData; let count = 0; + const redisTimer = Date.now(); + if (config.MEMPOOL.CACHE_ENABLED && config.REDIS.ENABLED) { + logger.debug(`Migrating ${Object.keys(this.mempoolCache).length} transactions from disk cache to Redis cache`); + } for (const txid of Object.keys(this.mempoolCache)) { if (!this.mempoolCache[txid].sigops || this.mempoolCache[txid].effectiveFeePerVsize == null) { this.mempoolCache[txid] = transactionUtils.extendMempoolTransaction(this.mempoolCache[txid]); @@ -94,6 +98,13 @@ class Mempool { this.mempoolCache[txid].order = transactionUtils.txidToOrdering(txid); } count++; + if (config.MEMPOOL.CACHE_ENABLED && config.REDIS.ENABLED) { + await redisCache.$addTransaction(this.mempoolCache[txid]); + } + } + if (config.MEMPOOL.CACHE_ENABLED && config.REDIS.ENABLED) { + await redisCache.$flushTransactions(); + logger.debug(`Finished migrating cache transactions in ${((Date.now() - redisTimer) / 1000).toFixed(2)} seconds`); } if (this.mempoolChangedCallback) { this.mempoolChangedCallback(this.mempoolCache, [], []); @@ -102,10 +113,6 @@ class Mempool { await this.$asyncMempoolChangedCallback(this.mempoolCache, count, [], []); } this.addToSpendMap(Object.values(this.mempoolCache)); - if (config.MEMPOOL.CACHE_ENABLED && config.REDIS.ENABLED) { - logger.debug('copying mempool from disk cache into Redis'); - await redisCache.$addTransactions(Object.values(mempoolData)); - } } public async $reloadMempool(expectedCount: number): Promise { @@ -212,6 +219,7 @@ class Mempool { logger.info(`Missing ${transactions.length - currentMempoolSize} mempool transactions, attempting to reload in bulk from esplora`); try { newTransactions = await this.$reloadMempool(transactions.length); + redisCache.$addTransactions(newTransactions); loaded = true; } catch (e) { logger.err('failed to load mempool in bulk from esplora, falling back to fetching individual transactions'); @@ -234,6 +242,10 @@ class Mempool { } hasChange = true; newTransactions.push(transaction); + + if (config.REDIS.ENABLED) { + await redisCache.$addTransaction(transaction); + } } catch (e: any) { if (config.MEMPOOL.BACKEND === 'esplora' && e.response?.status === 404) { this.missingTxCount++; @@ -324,7 +336,7 @@ class Mempool { // Update Redis cache if (config.REDIS.ENABLED) { - await redisCache.$addTransactions(newTransactions); + await redisCache.$flushTransactions(); await redisCache.$removeTransactions(deletedTransactions.map(tx => tx.txid)); await rbfCache.updateCache(); } diff --git a/backend/src/api/redis-cache.ts b/backend/src/api/redis-cache.ts index e89b2302d..4b3c956c0 100644 --- a/backend/src/api/redis-cache.ts +++ b/backend/src/api/redis-cache.ts @@ -10,6 +10,9 @@ class RedisCache { private client; private connected = false; + private cacheQueue: MempoolTransactionExtended[] = []; + private txFlushLimit: number = 1000; + constructor() { if (config.REDIS.ENABLED) { const redisConfig = { @@ -52,6 +55,18 @@ class RedisCache { } } + async $addTransaction(tx: MempoolTransactionExtended) { + this.cacheQueue.push(tx); + if (this.cacheQueue.length > this.txFlushLimit) { + await this.$flushTransactions(); + } + } + + async $flushTransactions() { + await this.$addTransactions(this.cacheQueue); + this.cacheQueue = []; + } + async $addTransactions(newTransactions: MempoolTransactionExtended[]) { try { await this.$ensureConnected(); @@ -118,6 +133,7 @@ class RedisCache { await this.$ensureConnected(); const keys = await this.client.keys('tx:*'); const promises: Promise[] = []; + let returned = 0; for (let i = 0; i < keys.length; i += 10000) { const keySlice = keys.slice(i, i + 10000); if (!keySlice.length) { @@ -131,6 +147,8 @@ class RedisCache { } } } + logger.info(`Loaded ${(returned * 10000) + (chunk.length)}/${keys.length} transactions from Redis cache`); + returned++; })); } await Promise.all(promises); From a9f8bbbcce197dfb62b4a1ad25930942ab861f6b Mon Sep 17 00:00:00 2001 From: Mononaut Date: Fri, 16 Jun 2023 19:00:52 -0400 Subject: [PATCH 05/10] Add network and schema versioning to redis cache --- backend/src/api/redis-cache.ts | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/backend/src/api/redis-cache.ts b/backend/src/api/redis-cache.ts index 4b3c956c0..facbafd34 100644 --- a/backend/src/api/redis-cache.ts +++ b/backend/src/api/redis-cache.ts @@ -6,9 +6,18 @@ import config from '../config'; import { BlockExtended, BlockSummary, MempoolTransactionExtended } from '../mempool.interfaces'; import rbfCache from './rbf-cache'; +enum NetworkDB { + mainnet = 0, + testnet, + signet, + liquid, + liquidtestnet, +} + class RedisCache { private client; private connected = false; + private schemaVersion = 1; private cacheQueue: MempoolTransactionExtended[] = []; private txFlushLimit: number = 1000; @@ -18,7 +27,8 @@ class RedisCache { const redisConfig = { socket: { path: config.REDIS.UNIX_SOCKET_PATH - } + }, + database: NetworkDB[config.MEMPOOL.NETWORK], }; this.client = createClient(redisConfig); this.client.on('error', (e) => { @@ -30,9 +40,16 @@ class RedisCache { private async $ensureConnected(): Promise { if (!this.connected && config.REDIS.ENABLED) { - return this.client.connect().then(() => { + return this.client.connect().then(async () => { this.connected = true; logger.info(`Redis client connected`); + const version = await this.client.get('schema_version'); + if (version !== this.schemaVersion) { + // schema changed + // perform migrations or flush DB if necessary + logger.info(`Redis schema version changed from ${version} to ${this.schemaVersion}`); + await this.client.set('schema_version', this.schemaVersion); + } }); } } @@ -63,18 +80,22 @@ class RedisCache { } async $flushTransactions() { - await this.$addTransactions(this.cacheQueue); - this.cacheQueue = []; + const success = await this.$addTransactions(this.cacheQueue); + if (success) { + this.cacheQueue = []; + } } - async $addTransactions(newTransactions: MempoolTransactionExtended[]) { + async $addTransactions(newTransactions: MempoolTransactionExtended[]): Promise { try { await this.$ensureConnected(); await Promise.all(newTransactions.map(tx => { return this.client.json.set('tx:' + tx.txid, '$', tx); })); + return true; } catch (e) { logger.warn(`Failed to add ${newTransactions.length} transactions to Redis cache: ${e instanceof Error ? e.message : e}`); + return false; } } From 6ac58f2da791fe478e42f484c9ef1cd06b157cf3 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Thu, 13 Jul 2023 09:25:05 +0900 Subject: [PATCH 06/10] store redis mempool in sharded json object --- backend/src/api/redis-cache.ts | 43 +++++++++++++++------------------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/backend/src/api/redis-cache.ts b/backend/src/api/redis-cache.ts index facbafd34..8f7d54606 100644 --- a/backend/src/api/redis-cache.ts +++ b/backend/src/api/redis-cache.ts @@ -35,6 +35,13 @@ class RedisCache { logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`); }); this.$ensureConnected(); + this.client.exists('mempool:0').then((mempoolExists) => { + if (!mempoolExists) { + for (let i = 0; i < 16; i++) { + this.client.json.set(`mempool:${i.toString(16)}`, '$', {}); + } + } + }); } } @@ -90,7 +97,7 @@ class RedisCache { try { await this.$ensureConnected(); await Promise.all(newTransactions.map(tx => { - return this.client.json.set('tx:' + tx.txid, '$', tx); + return this.client.json.set(`mempool:${tx.txid.slice(0,1)}`, tx.txid, tx); })); return true; } catch (e) { @@ -103,7 +110,7 @@ class RedisCache { try { await this.$ensureConnected(); await Promise.all(transactions.map(txid => { - return this.client.del('tx:' + txid); + return this.client.json.del(`mempool:${txid.slice(0,1)}`, txid); })); } catch (e) { logger.warn(`Failed to remove ${transactions.length} transactions from Redis cache: ${e instanceof Error ? e.message : e}`); @@ -149,34 +156,22 @@ class RedisCache { } async $getMempool(): Promise<{ [txid: string]: MempoolTransactionExtended }> { - const mempool = {}; + const start = Date.now(); + let mempool = {}; try { await this.$ensureConnected(); - const keys = await this.client.keys('tx:*'); - const promises: Promise[] = []; - let returned = 0; - for (let i = 0; i < keys.length; i += 10000) { - const keySlice = keys.slice(i, i + 10000); - if (!keySlice.length) { - continue; - } - promises.push(this.client.json.mGet(keySlice, '$').then(chunk => { - for (const txs of chunk) { - for (const tx of txs) { - if (tx) { - mempool[tx.txid] = tx; - } - } - } - logger.info(`Loaded ${(returned * 10000) + (chunk.length)}/${keys.length} transactions from Redis cache`); - returned++; - })); + for (let i = 0; i < 16; i++) { + const shard = await this.client.json.get(`mempool:${i.toString(16)}`); + logger.info(`Loaded ${Object.keys(shard).length} transactions from redis cache ${i.toString(16)}`); + mempool = Object.assign(mempool, shard); } - await Promise.all(promises); + logger.info(`Total ${Object.keys(mempool).length} transactions loaded from redis cache `); + logger.info(`Loaded redis cache in ${Date.now() - start} ms`); + return mempool || {}; } catch (e) { logger.warn(`Failed to retrieve mempool from Redis cache: ${e instanceof Error ? e.message : e}`); } - return mempool; + return {}; } async $getRbfEntries(type: string): Promise { From a393f42b5eada9cf7cd7e01e79d9d9c13e8887a0 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Tue, 25 Jul 2023 16:35:21 +0900 Subject: [PATCH 07/10] strip non-essential data from redis cache txs --- backend/src/api/bitcoin/bitcoin-api.ts | 138 +++---------------------- backend/src/api/mempool.ts | 8 +- backend/src/api/redis-cache.ts | 39 ++++++- backend/src/api/transaction-utils.ts | 116 +++++++++++++++++++++ 4 files changed, 172 insertions(+), 129 deletions(-) diff --git a/backend/src/api/bitcoin/bitcoin-api.ts b/backend/src/api/bitcoin/bitcoin-api.ts index a1cf767d9..132cda91a 100644 --- a/backend/src/api/bitcoin/bitcoin-api.ts +++ b/backend/src/api/bitcoin/bitcoin-api.ts @@ -5,6 +5,7 @@ import { IEsploraApi } from './esplora-api.interface'; import blocks from '../blocks'; import mempool from '../mempool'; import { TransactionExtended } from '../../mempool.interfaces'; +import transactionUtils from '../transaction-utils'; class BitcoinApi implements AbstractBitcoinApi { private rawMempoolCache: IBitcoinApi.RawMempool | null = null; @@ -63,9 +64,16 @@ class BitcoinApi implements AbstractBitcoinApi { return Promise.resolve([]); } - $getTransactionHex(txId: string): Promise { - return this.$getRawTransaction(txId, true) - .then((tx) => tx.hex || ''); + async $getTransactionHex(txId: string): Promise { + const txInMempool = mempool.getMempool()[txId]; + if (txInMempool && txInMempool.hex) { + return txInMempool.hex; + } + + return this.bitcoindClient.getRawTransaction(txId, true) + .then((transaction: IBitcoinApi.Transaction) => { + return transaction.hex; + }); } $getBlockHeightTip(): Promise { @@ -209,7 +217,7 @@ class BitcoinApi implements AbstractBitcoinApi { scriptpubkey: vout.scriptPubKey.hex, scriptpubkey_address: vout.scriptPubKey && vout.scriptPubKey.address ? vout.scriptPubKey.address : vout.scriptPubKey.addresses ? vout.scriptPubKey.addresses[0] : '', - scriptpubkey_asm: vout.scriptPubKey.asm ? this.convertScriptSigAsm(vout.scriptPubKey.hex) : '', + scriptpubkey_asm: vout.scriptPubKey.asm ? transactionUtils.convertScriptSigAsm(vout.scriptPubKey.hex) : '', scriptpubkey_type: this.translateScriptPubKeyType(vout.scriptPubKey.type), }; }); @@ -219,7 +227,7 @@ class BitcoinApi implements AbstractBitcoinApi { is_coinbase: !!vin.coinbase, prevout: null, scriptsig: vin.scriptSig && vin.scriptSig.hex || vin.coinbase || '', - scriptsig_asm: vin.scriptSig && this.convertScriptSigAsm(vin.scriptSig.hex) || '', + scriptsig_asm: vin.scriptSig && transactionUtils.convertScriptSigAsm(vin.scriptSig.hex) || '', sequence: vin.sequence, txid: vin.txid || '', vout: vin.vout || 0, @@ -291,7 +299,7 @@ class BitcoinApi implements AbstractBitcoinApi { } const innerTx = await this.$getRawTransaction(vin.txid, false, false); vin.prevout = innerTx.vout[vin.vout]; - this.addInnerScriptsToVin(vin); + transactionUtils.addInnerScriptsToVin(vin); } return transaction; } @@ -330,7 +338,7 @@ class BitcoinApi implements AbstractBitcoinApi { } const innerTx = await this.$getRawTransaction(transaction.vin[i].txid, false, false); transaction.vin[i].prevout = innerTx.vout[transaction.vin[i].vout]; - this.addInnerScriptsToVin(transaction.vin[i]); + transactionUtils.addInnerScriptsToVin(transaction.vin[i]); totalIn += innerTx.vout[transaction.vin[i].vout].value; } if (lazyPrevouts && transaction.vin.length > 12) { @@ -342,122 +350,6 @@ class BitcoinApi implements AbstractBitcoinApi { return transaction; } - private convertScriptSigAsm(hex: string): string { - const buf = Buffer.from(hex, 'hex'); - - const b: string[] = []; - - let i = 0; - while (i < buf.length) { - const op = buf[i]; - if (op >= 0x01 && op <= 0x4e) { - i++; - let push: number; - if (op === 0x4c) { - push = buf.readUInt8(i); - b.push('OP_PUSHDATA1'); - i += 1; - } else if (op === 0x4d) { - push = buf.readUInt16LE(i); - b.push('OP_PUSHDATA2'); - i += 2; - } else if (op === 0x4e) { - push = buf.readUInt32LE(i); - b.push('OP_PUSHDATA4'); - i += 4; - } else { - push = op; - b.push('OP_PUSHBYTES_' + push); - } - - const data = buf.slice(i, i + push); - if (data.length !== push) { - break; - } - - b.push(data.toString('hex')); - i += data.length; - } else { - if (op === 0x00) { - b.push('OP_0'); - } else if (op === 0x4f) { - b.push('OP_PUSHNUM_NEG1'); - } else if (op === 0xb1) { - b.push('OP_CLTV'); - } else if (op === 0xb2) { - b.push('OP_CSV'); - } else if (op === 0xba) { - b.push('OP_CHECKSIGADD'); - } else { - const opcode = bitcoinjs.script.toASM([ op ]); - if (opcode && op < 0xfd) { - if (/^OP_(\d+)$/.test(opcode)) { - b.push(opcode.replace(/^OP_(\d+)$/, 'OP_PUSHNUM_$1')); - } else { - b.push(opcode); - } - } else { - b.push('OP_RETURN_' + op); - } - } - i += 1; - } - } - - return b.join(' '); - } - - private addInnerScriptsToVin(vin: IEsploraApi.Vin): void { - if (!vin.prevout) { - return; - } - - if (vin.prevout.scriptpubkey_type === 'p2sh') { - const redeemScript = vin.scriptsig_asm.split(' ').reverse()[0]; - vin.inner_redeemscript_asm = this.convertScriptSigAsm(redeemScript); - if (vin.witness && vin.witness.length > 2) { - const witnessScript = vin.witness[vin.witness.length - 1]; - vin.inner_witnessscript_asm = this.convertScriptSigAsm(witnessScript); - } - } - - if (vin.prevout.scriptpubkey_type === 'v0_p2wsh' && vin.witness) { - const witnessScript = vin.witness[vin.witness.length - 1]; - vin.inner_witnessscript_asm = this.convertScriptSigAsm(witnessScript); - } - - if (vin.prevout.scriptpubkey_type === 'v1_p2tr' && vin.witness) { - const witnessScript = this.witnessToP2TRScript(vin.witness); - if (witnessScript !== null) { - vin.inner_witnessscript_asm = this.convertScriptSigAsm(witnessScript); - } - } - } - - /** - * This function must only be called when we know the witness we are parsing - * is a taproot witness. - * @param witness An array of hex strings that represents the witness stack of - * the input. - * @returns null if the witness is not a script spend, and the hex string of - * the script item if it is a script spend. - */ - private witnessToP2TRScript(witness: string[]): string | null { - if (witness.length < 2) return null; - // Note: see BIP341 for parsing details of witness stack - - // If there are at least two witness elements, and the first byte of the - // last element is 0x50, this last element is called annex a and - // is removed from the witness stack. - const hasAnnex = witness[witness.length - 1].substring(0, 2) === '50'; - // If there are at least two witness elements left, script path spending is used. - // Call the second-to-last stack element s, the script. - // (Note: this phrasing from BIP341 assumes we've *removed* the annex from the stack) - if (hasAnnex && witness.length < 3) return null; - const positionOfScript = hasAnnex ? witness.length - 3 : witness.length - 2; - return witness[positionOfScript]; - } - } export default BitcoinApi; diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index 73a6fdfeb..d5214de5d 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -149,8 +149,8 @@ class Mempool { logger.err('failed to fetch bulk mempool transactions from esplora'); } } - return newTransactions; logger.info(`Done inserting loaded mempool transactions into local cache`); + return newTransactions; } public async $updateMemPoolInfo() { @@ -219,7 +219,11 @@ class Mempool { logger.info(`Missing ${transactions.length - currentMempoolSize} mempool transactions, attempting to reload in bulk from esplora`); try { newTransactions = await this.$reloadMempool(transactions.length); - redisCache.$addTransactions(newTransactions); + if (config.REDIS.ENABLED) { + for (const tx of newTransactions) { + await redisCache.$addTransaction(tx); + } + } loaded = true; } catch (e) { logger.err('failed to load mempool in bulk from esplora, falling back to fetching individual transactions'); diff --git a/backend/src/api/redis-cache.ts b/backend/src/api/redis-cache.ts index 8f7d54606..540467caf 100644 --- a/backend/src/api/redis-cache.ts +++ b/backend/src/api/redis-cache.ts @@ -5,6 +5,7 @@ import logger from '../logger'; import config from '../config'; import { BlockExtended, BlockSummary, MempoolTransactionExtended } from '../mempool.interfaces'; import rbfCache from './rbf-cache'; +import transactionUtils from './transaction-utils'; enum NetworkDB { mainnet = 0, @@ -20,7 +21,7 @@ class RedisCache { private schemaVersion = 1; private cacheQueue: MempoolTransactionExtended[] = []; - private txFlushLimit: number = 1000; + private txFlushLimit: number = 10000; constructor() { if (config.REDIS.ENABLED) { @@ -81,7 +82,7 @@ class RedisCache { async $addTransaction(tx: MempoolTransactionExtended) { this.cacheQueue.push(tx); - if (this.cacheQueue.length > this.txFlushLimit) { + if (this.cacheQueue.length >= this.txFlushLimit) { await this.$flushTransactions(); } } @@ -89,15 +90,28 @@ class RedisCache { async $flushTransactions() { const success = await this.$addTransactions(this.cacheQueue); if (success) { + logger.info(`Flushed ${this.cacheQueue.length} transactions to Redis cache`); this.cacheQueue = []; + } else { + logger.err(`Failed to flush ${this.cacheQueue.length} transactions to Redis cache`); } } - async $addTransactions(newTransactions: MempoolTransactionExtended[]): Promise { + private async $addTransactions(newTransactions: MempoolTransactionExtended[]): Promise { try { await this.$ensureConnected(); await Promise.all(newTransactions.map(tx => { - return this.client.json.set(`mempool:${tx.txid.slice(0,1)}`, tx.txid, tx); + const minified: any = { ...tx }; + delete minified.hex; + for (const vin of minified.vin) { + delete vin.inner_redeemscript_asm; + delete vin.inner_witnessscript_asm; + delete vin.scriptsig_asm; + } + for (const vout of minified.vout) { + delete vout.scriptpubkey_asm; + } + return this.client.json.set(`mempool:${tx.txid.slice(0,1)}`, tx.txid, minified); })); return true; } catch (e) { @@ -201,6 +215,7 @@ class RedisCache { const loadedBlockSummaries = await this.$getBlockSummaries(); // Load mempool const loadedMempool = await this.$getMempool(); + this.inflateLoadedTxs(loadedMempool); // Load rbf data const rbfTxs = await this.$getRbfEntries('tx'); const rbfTrees = await this.$getRbfEntries('tree'); @@ -217,6 +232,22 @@ class RedisCache { }); } + private inflateLoadedTxs(mempool: { [txid: string]: MempoolTransactionExtended }) { + for (const tx of Object.values(mempool)) { + for (const vin of tx.vin) { + if (vin.scriptsig) { + vin.scriptsig_asm = transactionUtils.convertScriptSigAsm(vin.scriptsig); + transactionUtils.addInnerScriptsToVin(vin); + } + } + for (const vout of tx.vout) { + if (vout.scriptpubkey) { + vout.scriptpubkey_asm = transactionUtils.convertScriptSigAsm(vout.scriptpubkey); + } + } + } + } + } export default new RedisCache(); diff --git a/backend/src/api/transaction-utils.ts b/backend/src/api/transaction-utils.ts index 009fe1dde..e141a6076 100644 --- a/backend/src/api/transaction-utils.ts +++ b/backend/src/api/transaction-utils.ts @@ -188,6 +188,122 @@ class TransactionUtils { 16 ); } + + public addInnerScriptsToVin(vin: IEsploraApi.Vin): void { + if (!vin.prevout) { + return; + } + + if (vin.prevout.scriptpubkey_type === 'p2sh') { + const redeemScript = vin.scriptsig_asm.split(' ').reverse()[0]; + vin.inner_redeemscript_asm = this.convertScriptSigAsm(redeemScript); + if (vin.witness && vin.witness.length > 2) { + const witnessScript = vin.witness[vin.witness.length - 1]; + vin.inner_witnessscript_asm = this.convertScriptSigAsm(witnessScript); + } + } + + if (vin.prevout.scriptpubkey_type === 'v0_p2wsh' && vin.witness) { + const witnessScript = vin.witness[vin.witness.length - 1]; + vin.inner_witnessscript_asm = this.convertScriptSigAsm(witnessScript); + } + + if (vin.prevout.scriptpubkey_type === 'v1_p2tr' && vin.witness) { + const witnessScript = this.witnessToP2TRScript(vin.witness); + if (witnessScript !== null) { + vin.inner_witnessscript_asm = this.convertScriptSigAsm(witnessScript); + } + } + } + + public convertScriptSigAsm(hex: string): string { + const buf = Buffer.from(hex, 'hex'); + + const b: string[] = []; + + let i = 0; + while (i < buf.length) { + const op = buf[i]; + if (op >= 0x01 && op <= 0x4e) { + i++; + let push: number; + if (op === 0x4c) { + push = buf.readUInt8(i); + b.push('OP_PUSHDATA1'); + i += 1; + } else if (op === 0x4d) { + push = buf.readUInt16LE(i); + b.push('OP_PUSHDATA2'); + i += 2; + } else if (op === 0x4e) { + push = buf.readUInt32LE(i); + b.push('OP_PUSHDATA4'); + i += 4; + } else { + push = op; + b.push('OP_PUSHBYTES_' + push); + } + + const data = buf.slice(i, i + push); + if (data.length !== push) { + break; + } + + b.push(data.toString('hex')); + i += data.length; + } else { + if (op === 0x00) { + b.push('OP_0'); + } else if (op === 0x4f) { + b.push('OP_PUSHNUM_NEG1'); + } else if (op === 0xb1) { + b.push('OP_CLTV'); + } else if (op === 0xb2) { + b.push('OP_CSV'); + } else if (op === 0xba) { + b.push('OP_CHECKSIGADD'); + } else { + const opcode = bitcoinjs.script.toASM([ op ]); + if (opcode && op < 0xfd) { + if (/^OP_(\d+)$/.test(opcode)) { + b.push(opcode.replace(/^OP_(\d+)$/, 'OP_PUSHNUM_$1')); + } else { + b.push(opcode); + } + } else { + b.push('OP_RETURN_' + op); + } + } + i += 1; + } + } + + return b.join(' '); + } + + /** + * This function must only be called when we know the witness we are parsing + * is a taproot witness. + * @param witness An array of hex strings that represents the witness stack of + * the input. + * @returns null if the witness is not a script spend, and the hex string of + * the script item if it is a script spend. + */ + public witnessToP2TRScript(witness: string[]): string | null { + if (witness.length < 2) return null; + // Note: see BIP341 for parsing details of witness stack + + // If there are at least two witness elements, and the first byte of the + // last element is 0x50, this last element is called annex a and + // is removed from the witness stack. + const hasAnnex = witness[witness.length - 1].substring(0, 2) === '50'; + // If there are at least two witness elements left, script path spending is used. + // Call the second-to-last stack element s, the script. + // (Note: this phrasing from BIP341 assumes we've *removed* the annex from the stack) + if (hasAnnex && witness.length < 3) return null; + const positionOfScript = hasAnnex ? witness.length - 3 : witness.length - 2; + return witness[positionOfScript]; + } } export default new TransactionUtils(); From c79a597c96a4048d5710ec545561ac87448263c7 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Sun, 30 Jul 2023 16:01:03 +0900 Subject: [PATCH 08/10] switch from redis-json to simple key-value redis entries --- backend/src/api/disk-cache.ts | 3 ++ backend/src/api/rbf-cache.ts | 6 +-- backend/src/api/redis-cache.ts | 93 ++++++++++++++++++++-------------- 3 files changed, 62 insertions(+), 40 deletions(-) diff --git a/backend/src/api/disk-cache.ts b/backend/src/api/disk-cache.ts index 04328a72a..6f603489a 100644 --- a/backend/src/api/disk-cache.ts +++ b/backend/src/api/disk-cache.ts @@ -179,6 +179,7 @@ class DiskCache { return; } try { + const start = Date.now(); let data: any = {}; const cacheData = fs.readFileSync(DiskCache.FILE_NAME, 'utf8'); if (cacheData) { @@ -220,6 +221,8 @@ class DiskCache { } } + logger.info(`Loaded mempool from disk cache in ${Date.now() - start} ms`); + await memPool.$setMempool(data.mempool); if (!this.ignoreBlocksCache) { blocks.setBlocks(data.blocks); diff --git a/backend/src/api/rbf-cache.ts b/backend/src/api/rbf-cache.ts index b5ae74072..b5592252c 100644 --- a/backend/src/api/rbf-cache.ts +++ b/backend/src/api/rbf-cache.ts @@ -360,14 +360,14 @@ class RbfCache { public async load({ txs, trees, expiring }): Promise { txs.forEach(txEntry => { - this.txs.set(txEntry[0], txEntry[1]); + this.txs.set(txEntry.key, txEntry.value); }); for (const deflatedTree of trees) { await this.importTree(deflatedTree.root, deflatedTree.root, deflatedTree, this.txs); } expiring.forEach(expiringEntry => { - if (this.txs.has(expiringEntry[0])) { - this.expiring.set(expiringEntry[0], new Date(expiringEntry[1]).getTime()); + if (this.txs.has(expiringEntry.key)) { + this.expiring.set(expiringEntry.key, new Date(expiringEntry.value).getTime()); } }); this.cleanup(); diff --git a/backend/src/api/redis-cache.ts b/backend/src/api/redis-cache.ts index 540467caf..4a1375419 100644 --- a/backend/src/api/redis-cache.ts +++ b/backend/src/api/redis-cache.ts @@ -36,13 +36,6 @@ class RedisCache { logger.err(`Error in Redis client: ${e instanceof Error ? e.message : e}`); }); this.$ensureConnected(); - this.client.exists('mempool:0').then((mempoolExists) => { - if (!mempoolExists) { - for (let i = 0; i < 16; i++) { - this.client.json.set(`mempool:${i.toString(16)}`, '$', {}); - } - } - }); } } @@ -65,7 +58,7 @@ class RedisCache { async $updateBlocks(blocks: BlockExtended[]) { try { await this.$ensureConnected(); - await this.client.json.set('blocks', '$', blocks); + await this.client.set('blocks', JSON.stringify(blocks)); } catch (e) { logger.warn(`Failed to update blocks in Redis cache: ${e instanceof Error ? e.message : e}`); } @@ -74,7 +67,7 @@ class RedisCache { async $updateBlockSummaries(summaries: BlockSummary[]) { try { await this.$ensureConnected(); - await this.client.json.set('block-summaries', '$', summaries); + await this.client.set('block-summaries', JSON.stringify(summaries)); } catch (e) { logger.warn(`Failed to update blocks in Redis cache: ${e instanceof Error ? e.message : e}`); } @@ -98,9 +91,12 @@ class RedisCache { } private async $addTransactions(newTransactions: MempoolTransactionExtended[]): Promise { + if (!newTransactions.length) { + return true; + } try { await this.$ensureConnected(); - await Promise.all(newTransactions.map(tx => { + const msetData = newTransactions.map(tx => { const minified: any = { ...tx }; delete minified.hex; for (const vin of minified.vin) { @@ -111,8 +107,9 @@ class RedisCache { for (const vout of minified.vout) { delete vout.scriptpubkey_asm; } - return this.client.json.set(`mempool:${tx.txid.slice(0,1)}`, tx.txid, minified); - })); + return [`mempool:tx:${tx.txid}`, JSON.stringify(minified)]; + }); + await this.client.MSET(msetData); return true; } catch (e) { logger.warn(`Failed to add ${newTransactions.length} transactions to Redis cache: ${e instanceof Error ? e.message : e}`); @@ -123,9 +120,9 @@ class RedisCache { async $removeTransactions(transactions: string[]) { try { await this.$ensureConnected(); - await Promise.all(transactions.map(txid => { - return this.client.json.del(`mempool:${txid.slice(0,1)}`, txid); - })); + for (let i = 0; i < Math.ceil(transactions.length / 1000); i++) { + await this.client.del(transactions.slice(i * 1000, (i + 1) * 1000).map(txid => `mempool:tx:${txid}`)); + } } catch (e) { logger.warn(`Failed to remove ${transactions.length} transactions from Redis cache: ${e instanceof Error ? e.message : e}`); } @@ -134,7 +131,7 @@ class RedisCache { async $setRbfEntry(type: string, txid: string, value: any): Promise { try { await this.$ensureConnected(); - await this.client.json.set(`rbf:${type}:${txid}`, '$', value); + await this.client.set(`rbf:${type}:${txid}`, JSON.stringify(value)); } catch (e) { logger.warn(`Failed to set RBF ${type} in Redis cache: ${e instanceof Error ? e.message : e}`); } @@ -152,7 +149,8 @@ class RedisCache { async $getBlocks(): Promise { try { await this.$ensureConnected(); - return this.client.json.get('blocks'); + const json = await this.client.get('blocks'); + return JSON.parse(json); } catch (e) { logger.warn(`Failed to retrieve blocks from Redis cache: ${e instanceof Error ? e.message : e}`); return []; @@ -162,7 +160,8 @@ class RedisCache { async $getBlockSummaries(): Promise { try { await this.$ensureConnected(); - return this.client.json.get('block-summaries'); + const json = await this.client.get('block-summaries'); + return JSON.parse(json); } catch (e) { logger.warn(`Failed to retrieve blocks from Redis cache: ${e instanceof Error ? e.message : e}`); return []; @@ -171,16 +170,14 @@ class RedisCache { async $getMempool(): Promise<{ [txid: string]: MempoolTransactionExtended }> { const start = Date.now(); - let mempool = {}; + const mempool = {}; try { await this.$ensureConnected(); - for (let i = 0; i < 16; i++) { - const shard = await this.client.json.get(`mempool:${i.toString(16)}`); - logger.info(`Loaded ${Object.keys(shard).length} transactions from redis cache ${i.toString(16)}`); - mempool = Object.assign(mempool, shard); + const mempoolList = await this.scanKeys('mempool:tx:*'); + for (const tx of mempoolList) { + mempool[tx.key] = tx.value; } - logger.info(`Total ${Object.keys(mempool).length} transactions loaded from redis cache `); - logger.info(`Loaded redis cache in ${Date.now() - start} ms`); + logger.info(`Loaded mempool from Redis cache in ${Date.now() - start} ms`); return mempool || {}; } catch (e) { logger.warn(`Failed to retrieve mempool from Redis cache: ${e instanceof Error ? e.message : e}`); @@ -191,17 +188,8 @@ class RedisCache { async $getRbfEntries(type: string): Promise { try { await this.$ensureConnected(); - const keys = await this.client.keys(`rbf:${type}:*`); - const promises: Promise[] = []; - for (let i = 0; i < keys.length; i += 10000) { - const keySlice = keys.slice(i, i + 10000); - if (!keySlice.length) { - continue; - } - promises.push(this.client.json.mGet(keySlice, '$').then(chunk => chunk?.length ? chunk.flat().map((v, i) => [keySlice[i].slice(`rbf:${type}:`.length), v]) : [] )); - } - const entries = await Promise.all(promises); - return entries.flat(); + const rbfEntries = await this.scanKeys(`rbf:${type}:*`); + return rbfEntries; } catch (e) { logger.warn(`Failed to retrieve Rbf ${type}s from Redis cache: ${e instanceof Error ? e.message : e}`); return []; @@ -227,7 +215,7 @@ class RedisCache { await memPool.$setMempool(loadedMempool); await rbfCache.load({ txs: rbfTxs, - trees: rbfTrees.map(loadedTree => loadedTree[1]), + trees: rbfTrees.map(loadedTree => loadedTree.value), expiring: rbfExpirations, }); } @@ -248,6 +236,37 @@ class RedisCache { } } + private async scanKeys(pattern): Promise<{ key: string, value: T }[]> { + logger.info(`loading Redis entries for ${pattern}`); + let keys: string[] = []; + const result: { key: string, value: T }[] = []; + const patternLength = pattern.length - 1; + let count = 0; + const processValues = async (keys): Promise => { + const values = await this.client.MGET(keys); + for (let i = 0; i < values.length; i++) { + if (values[i]) { + result.push({ key: keys[i].slice(patternLength), value: JSON.parse(values[i]) }); + count++; + } + } + logger.info(`loaded ${count} entries from Redis cache`); + }; + for await (const key of this.client.scanIterator({ + MATCH: pattern, + COUNT: 100 + })) { + keys.push(key); + if (keys.length >= 10000) { + await processValues(keys); + keys = []; + } + } + if (keys.length) { + await processValues(keys); + } + return result; + } } export default new RedisCache(); From dcfab218fbee5affecb4ffedb2b9827dada6232c Mon Sep 17 00:00:00 2001 From: Mononaut Date: Mon, 31 Jul 2023 12:19:28 +0900 Subject: [PATCH 09/10] Improve Redis logging --- backend/src/api/redis-cache.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/backend/src/api/redis-cache.ts b/backend/src/api/redis-cache.ts index 4a1375419..0c256a698 100644 --- a/backend/src/api/redis-cache.ts +++ b/backend/src/api/redis-cache.ts @@ -59,6 +59,7 @@ class RedisCache { try { await this.$ensureConnected(); await this.client.set('blocks', JSON.stringify(blocks)); + logger.debug(`Saved latest blocks to Redis cache`); } catch (e) { logger.warn(`Failed to update blocks in Redis cache: ${e instanceof Error ? e.message : e}`); } @@ -68,8 +69,9 @@ class RedisCache { try { await this.$ensureConnected(); await this.client.set('block-summaries', JSON.stringify(summaries)); + logger.debug(`Saved latest block summaries to Redis cache`); } catch (e) { - logger.warn(`Failed to update blocks in Redis cache: ${e instanceof Error ? e.message : e}`); + logger.warn(`Failed to update block summaries in Redis cache: ${e instanceof Error ? e.message : e}`); } } @@ -83,10 +85,10 @@ class RedisCache { async $flushTransactions() { const success = await this.$addTransactions(this.cacheQueue); if (success) { - logger.info(`Flushed ${this.cacheQueue.length} transactions to Redis cache`); + logger.debug(`Saved ${this.cacheQueue.length} transactions to Redis cache`); this.cacheQueue = []; } else { - logger.err(`Failed to flush ${this.cacheQueue.length} transactions to Redis cache`); + logger.err(`Failed to save ${this.cacheQueue.length} transactions to Redis cache`); } } @@ -122,6 +124,7 @@ class RedisCache { await this.$ensureConnected(); for (let i = 0; i < Math.ceil(transactions.length / 1000); i++) { await this.client.del(transactions.slice(i * 1000, (i + 1) * 1000).map(txid => `mempool:tx:${txid}`)); + logger.info(`Deleted ${transactions.length} transactions from the Redis cache`); } } catch (e) { logger.warn(`Failed to remove ${transactions.length} transactions from Redis cache: ${e instanceof Error ? e.message : e}`); From 73b71c491411e3ab2a09019e1418ac88e0b294d9 Mon Sep 17 00:00:00 2001 From: softsimon Date: Mon, 31 Jul 2023 14:28:56 +0900 Subject: [PATCH 10/10] Fixing docker config and tests --- backend/mempool-config.sample.json | 1 + docker/backend/mempool-config.json | 5 +++++ docker/backend/start.sh | 9 +++++++++ 3 files changed, 15 insertions(+) diff --git a/backend/mempool-config.sample.json b/backend/mempool-config.sample.json index e3df7d2fe..7948049fc 100644 --- a/backend/mempool-config.sample.json +++ b/backend/mempool-config.sample.json @@ -8,6 +8,7 @@ "API_URL_PREFIX": "/api/v1/", "POLL_RATE_MS": 2000, "CACHE_DIR": "./cache", + "CACHE_ENABLED": true, "CLEAR_PROTECTION_MINUTES": 20, "RECOMMENDED_FEE_PERCENTILE": 50, "BLOCK_WEIGHT_UNITS": 4000000, diff --git a/docker/backend/mempool-config.json b/docker/backend/mempool-config.json index 2ff76d5dd..8b47d53b8 100644 --- a/docker/backend/mempool-config.json +++ b/docker/backend/mempool-config.json @@ -8,6 +8,7 @@ "API_URL_PREFIX": "__MEMPOOL_API_URL_PREFIX__", "POLL_RATE_MS": __MEMPOOL_POLL_RATE_MS__, "CACHE_DIR": "__MEMPOOL_CACHE_DIR__", + "CACHE_ENABLED": __MEMPOOL_CACHE_ENABLED__, "CLEAR_PROTECTION_MINUTES": __MEMPOOL_CLEAR_PROTECTION_MINUTES__, "RECOMMENDED_FEE_PERCENTILE": __MEMPOOL_RECOMMENDED_FEE_PERCENTILE__, "BLOCK_WEIGHT_UNITS": __MEMPOOL_BLOCK_WEIGHT_UNITS__, @@ -133,5 +134,9 @@ "AUDIT": __REPLICATION_AUDIT__, "AUDIT_START_HEIGHT": __REPLICATION_AUDIT_START_HEIGHT__, "SERVERS": __REPLICATION_SERVERS__ + }, + "REDIS": { + "ENABLED": __REDIS_ENABLED__, + "UNIX_SOCKET_PATH": "__REDIS_UNIX_SOCKET_PATH__" } } diff --git a/docker/backend/start.sh b/docker/backend/start.sh index c34d804b4..7071493fa 100755 --- a/docker/backend/start.sh +++ b/docker/backend/start.sh @@ -9,6 +9,7 @@ __MEMPOOL_SPAWN_CLUSTER_PROCS__=${MEMPOOL_SPAWN_CLUSTER_PROCS:=0} __MEMPOOL_API_URL_PREFIX__=${MEMPOOL_API_URL_PREFIX:=/api/v1/} __MEMPOOL_POLL_RATE_MS__=${MEMPOOL_POLL_RATE_MS:=2000} __MEMPOOL_CACHE_DIR__=${MEMPOOL_CACHE_DIR:=./cache} +__MEMPOOL_CACHE_ENABLED__=${MEMPOOL_CACHE_ENABLED:=true} __MEMPOOL_CLEAR_PROTECTION_MINUTES__=${MEMPOOL_CLEAR_PROTECTION_MINUTES:=20} __MEMPOOL_RECOMMENDED_FEE_PERCENTILE__=${MEMPOOL_RECOMMENDED_FEE_PERCENTILE:=50} __MEMPOOL_BLOCK_WEIGHT_UNITS__=${MEMPOOL_BLOCK_WEIGHT_UNITS:=4000000} @@ -136,6 +137,9 @@ __REPLICATION_AUDIT__=${REPLICATION_AUDIT:=true} __REPLICATION_AUDIT_START_HEIGHT__=${REPLICATION_AUDIT_START_HEIGHT:=774000} __REPLICATION_SERVERS__=${REPLICATION_SERVERS:=[]} +# REDIS +__REDIS_ENABLED__=${REDIS_ENABLED:=true} +__REDIS_UNIX_SOCKET_PATH__=${REDIS_UNIX_SOCKET_PATH:=true} mkdir -p "${__MEMPOOL_CACHE_DIR__}" @@ -147,6 +151,7 @@ sed -i "s!__MEMPOOL_SPAWN_CLUSTER_PROCS__!${__MEMPOOL_SPAWN_CLUSTER_PROCS__}!g" sed -i "s!__MEMPOOL_API_URL_PREFIX__!${__MEMPOOL_API_URL_PREFIX__}!g" mempool-config.json sed -i "s!__MEMPOOL_POLL_RATE_MS__!${__MEMPOOL_POLL_RATE_MS__}!g" mempool-config.json sed -i "s!__MEMPOOL_CACHE_DIR__!${__MEMPOOL_CACHE_DIR__}!g" mempool-config.json +sed -i "s!__MEMPOOL_CACHE_ENABLED__!${__MEMPOOL_CACHE_ENABLED__}!g" mempool-config.json sed -i "s!__MEMPOOL_CLEAR_PROTECTION_MINUTES__!${__MEMPOOL_CLEAR_PROTECTION_MINUTES__}!g" mempool-config.json sed -i "s!__MEMPOOL_RECOMMENDED_FEE_PERCENTILE__!${__MEMPOOL_RECOMMENDED_FEE_PERCENTILE__}!g" mempool-config.json sed -i "s!__MEMPOOL_BLOCK_WEIGHT_UNITS__!${__MEMPOOL_BLOCK_WEIGHT_UNITS__}!g" mempool-config.json @@ -262,4 +267,8 @@ sed -i "s!__REPLICATION_AUDIT__!${__REPLICATION_AUDIT__}!g" mempool-config.json sed -i "s!__REPLICATION_AUDIT_START_HEIGHT__!${__REPLICATION_AUDIT_START_HEIGHT__}!g" mempool-config.json sed -i "s!__REPLICATION_SERVERS__!${__REPLICATION_SERVERS__}!g" mempool-config.json +# REDIS +sed -i "s!__REDIS_ENABLED__!${__REDIS_ENABLED__}!g" mempool-config.json +sed -i "s!__REDIS_UNIX_SOCKET_PATH__!${__REDIS_UNIX_SOCKET_PATH__}!g" mempool-config.json + node /backend/package/index.js