diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index fff7ad20e..dc3634a6f 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -326,6 +326,7 @@ class Mempool { if (config.REDIS.ENABLED) { await redisCache.$addTransactions(newTransactions); await redisCache.$removeTransactions(deletedTransactions.map(tx => tx.txid)); + await rbfCache.updateCache(); } const end = new Date().getTime(); diff --git a/backend/src/api/rbf-cache.ts b/backend/src/api/rbf-cache.ts index f28dd0de3..b5ae74072 100644 --- a/backend/src/api/rbf-cache.ts +++ b/backend/src/api/rbf-cache.ts @@ -1,15 +1,17 @@ +import config from "../config"; import logger from "../logger"; import { MempoolTransactionExtended, TransactionStripped } from "../mempool.interfaces"; import bitcoinApi from './bitcoin/bitcoin-api-factory'; import { Common } from "./common"; +import redisCache from "./redis-cache"; -interface RbfTransaction extends TransactionStripped { +export interface RbfTransaction extends TransactionStripped { rbf?: boolean; mined?: boolean; fullRbf?: boolean; } -interface RbfTree { +export interface RbfTree { tx: RbfTransaction; time: number; interval?: number; @@ -28,6 +30,19 @@ export interface ReplacementInfo { newVsize: number; } +enum CacheOp { + Remove = 0, + Add = 1, + Change = 2, +} + +interface CacheEvent { + op: CacheOp; + type: 'tx' | 'tree' | 'exp'; + txid: string, + value?: any, +} + class RbfCache { private replacedBy: Map = new Map(); private replaces: Map = new Map(); @@ -36,11 +51,43 @@ class RbfCache { private treeMap: Map = new Map(); // map of txids to sequence ids private txs: Map = new Map(); private expiring: Map = new Map(); + private cacheQueue: CacheEvent[] = []; constructor() { setInterval(this.cleanup.bind(this), 1000 * 60 * 10); } + private addTx(txid: string, tx: MempoolTransactionExtended): void { + this.txs.set(txid, tx); + this.cacheQueue.push({ op: CacheOp.Add, type: 'tx', txid }); + } + + private addTree(txid: string, tree: RbfTree): void { + this.rbfTrees.set(txid, tree); + this.dirtyTrees.add(txid); + this.cacheQueue.push({ op: CacheOp.Add, type: 'tree', txid }); + } + + private addExpiration(txid: string, expiry: number): void { + this.expiring.set(txid, expiry); + this.cacheQueue.push({ op: CacheOp.Add, type: 'exp', txid, value: expiry }); + } + + private removeTx(txid: string): void { + this.txs.delete(txid); + this.cacheQueue.push({ op: CacheOp.Remove, type: 'tx', txid }); + } + + private removeTree(txid: string): void { + this.rbfTrees.delete(txid); + this.cacheQueue.push({ op: CacheOp.Remove, type: 'tree', txid }); + } + + private removeExpiration(txid: string): void { + this.expiring.delete(txid); + this.cacheQueue.push({ op: CacheOp.Remove, type: 'exp', txid }); + } + public add(replaced: MempoolTransactionExtended[], newTxExtended: MempoolTransactionExtended): void { if (!newTxExtended || !replaced?.length || this.txs.has(newTxExtended.txid)) { return; @@ -49,7 +96,7 @@ class RbfCache { const newTx = Common.stripTransaction(newTxExtended) as RbfTransaction; const newTime = newTxExtended.firstSeen || (Date.now() / 1000); newTx.rbf = newTxExtended.vin.some((v) => v.sequence < 0xfffffffe); - this.txs.set(newTx.txid, newTxExtended); + this.addTx(newTx.txid, newTxExtended); // maintain rbf trees let txFullRbf = false; @@ -66,7 +113,7 @@ class RbfCache { const treeId = this.treeMap.get(replacedTx.txid); if (treeId) { const tree = this.rbfTrees.get(treeId); - this.rbfTrees.delete(treeId); + this.removeTree(treeId); if (tree) { tree.interval = newTime - tree?.time; replacedTrees.push(tree); @@ -83,7 +130,7 @@ class RbfCache { replaces: [], }); treeFullRbf = treeFullRbf || !replacedTx.rbf; - this.txs.set(replacedTx.txid, replacedTxExtended); + this.addTx(replacedTx.txid, replacedTxExtended); } } newTx.fullRbf = txFullRbf; @@ -94,10 +141,9 @@ class RbfCache { fullRbf: treeFullRbf, replaces: replacedTrees }; - this.rbfTrees.set(treeId, newTree); + this.addTree(treeId, newTree); this.updateTreeMap(treeId, newTree); this.replaces.set(newTx.txid, replacedTrees.map(tree => tree.tx.txid)); - this.dirtyTrees.add(treeId); } public has(txId: string): boolean { @@ -191,6 +237,7 @@ class RbfCache { this.setTreeMined(tree, txid); tree.mined = true; this.dirtyTrees.add(treeId); + this.cacheQueue.push({ op: CacheOp.Change, type: 'tree', txid: treeId }); } } this.evict(txid); @@ -199,7 +246,8 @@ class RbfCache { // flag a transaction as removed from the mempool public evict(txid: string, fast: boolean = false): void { if (this.txs.has(txid) && (fast || !this.expiring.has(txid))) { - this.expiring.set(txid, fast ? Date.now() + (1000 * 60 * 10) : Date.now() + (1000 * 86400)); // 24 hours + const expiryTime = fast ? Date.now() + (1000 * 60 * 10) : Date.now() + (1000 * 86400); // 24 hours + this.addExpiration(txid, expiryTime); } } @@ -220,11 +268,11 @@ class RbfCache { const now = Date.now(); for (const txid of this.expiring.keys()) { if ((this.expiring.get(txid) || 0) < now) { - this.expiring.delete(txid); + this.removeExpiration(txid); this.remove(txid); } } - logger.debug(`rbf cache contains ${this.txs.size} txs, ${this.expiring.size} due to expire`); + logger.debug(`rbf cache contains ${this.txs.size} txs, ${this.rbfTrees.size} trees, ${this.expiring.size} due to expire`); } // remove a transaction & all previous versions from the cache @@ -234,14 +282,14 @@ class RbfCache { const replaces = this.replaces.get(txid); this.replaces.delete(txid); this.treeMap.delete(txid); - this.txs.delete(txid); - this.expiring.delete(txid); + this.removeTx(txid); + this.removeExpiration(txid); for (const tx of (replaces || [])) { // recursively remove prior versions from the cache this.replacedBy.delete(tx); // if this is the id of a tree, remove that too if (this.treeMap.get(tx) === tx) { - this.rbfTrees.delete(tx); + this.removeTree(tx); } this.remove(tx); } @@ -273,6 +321,33 @@ class RbfCache { } } + public async updateCache(): Promise { + if (!config.REDIS.ENABLED) { + return; + } + // Update the Redis cache by replaying queued events + for (const e of this.cacheQueue) { + if (e.op === CacheOp.Add || e.op === CacheOp.Change) { + let value = e.value; + switch(e.type) { + case 'tx': { + value = this.txs.get(e.txid); + } break; + case 'tree': { + const tree = this.rbfTrees.get(e.txid); + value = tree ? this.exportTree(tree) : null; + } break; + } + if (value != null) { + await redisCache.$setRbfEntry(e.type, e.txid, value); + } + } else if (e.op === CacheOp.Remove) { + await redisCache.$removeRbfEntry(e.type, e.txid); + } + } + this.cacheQueue = []; + } + public dump(): any { const trees = Array.from(this.rbfTrees.values()).map((tree: RbfTree) => { return this.exportTree(tree); }); @@ -378,8 +453,7 @@ class RbfCache { }; this.treeMap.set(txid, root); if (root === txid) { - this.rbfTrees.set(root, tree); - this.dirtyTrees.add(root); + this.addTree(root, tree); } return tree; } diff --git a/backend/src/api/redis-cache.ts b/backend/src/api/redis-cache.ts index c3cdc9e28..2598c406c 100644 --- a/backend/src/api/redis-cache.ts +++ b/backend/src/api/redis-cache.ts @@ -4,6 +4,7 @@ import blocks from './blocks'; import logger from '../logger'; import config from '../config'; import { BlockExtended, BlockSummary, TransactionExtended } from '../mempool.interfaces'; +import rbfCache from './rbf-cache'; class RedisCache { private client; @@ -73,6 +74,24 @@ class RedisCache { } } + async $setRbfEntry(type: string, txid: string, value: any): Promise { + try { + await this.$ensureConnected(); + await this.client.json.set(`rbf:${type}:${txid}`, '$', value); + } catch (e) { + logger.warn(`Failed to set RBF ${type} in Redis cache: ${e instanceof Error ? e.message : e}`); + } + } + + async $removeRbfEntry(type: string, txid: string): Promise { + try { + await this.$ensureConnected(); + await this.client.del(`rbf:${type}:${txid}`); + } catch (e) { + logger.warn(`Failed to remove RBF ${type} from Redis cache: ${e instanceof Error ? e.message : e}`); + } + } + async $getBlocks(): Promise { try { await this.$ensureConnected(); @@ -121,6 +140,26 @@ class RedisCache { return mempool; } + async $getRbfEntries(type: string): Promise { + try { + await this.$ensureConnected(); + const keys = await this.client.keys(`rbf:${type}:*`); + const promises: Promise[] = []; + for (let i = 0; i < keys.length; i += 10000) { + const keySlice = keys.slice(i, i + 10000); + if (!keySlice.length) { + continue; + } + promises.push(this.client.json.mGet(keySlice, '$').then(chunk => chunk?.length ? chunk.flat().map((v, i) => [keySlice[i].slice(`rbf:${type}:`.length), v]) : [] )); + } + const entries = await Promise.all(promises); + return entries.flat(); + } catch (e) { + logger.warn(`Failed to retrieve Rbf ${type}s from Redis cache: ${e instanceof Error ? e.message : e}`); + return []; + } + } + async $loadCache() { logger.info('Restoring mempool and blocks data from Redis cache'); // Load block data @@ -128,11 +167,20 @@ class RedisCache { const loadedBlockSummaries = await this.$getBlockSummaries(); // Load mempool const loadedMempool = await this.$getMempool(); + // Load rbf data + const rbfTxs = await this.$getRbfEntries('tx'); + const rbfTrees = await this.$getRbfEntries('tree'); + const rbfExpirations = await this.$getRbfEntries('exp'); // Set loaded data blocks.setBlocks(loadedBlocks || []); blocks.setBlockSummaries(loadedBlockSummaries || []); await memPool.$setMempool(loadedMempool); + await rbfCache.load({ + txs: rbfTxs, + trees: rbfTrees.map(loadedTree => loadedTree[1]), + expiring: rbfExpirations, + }); } }