Add RBF data to Redis cache

This commit is contained in:
Mononaut 2023-05-12 16:31:01 -06:00
parent 5138f9a254
commit aea2b1ec6b
No known key found for this signature in database
GPG Key ID: A3F058E41374C04E
3 changed files with 138 additions and 15 deletions

View File

@ -326,6 +326,7 @@ class Mempool {
if (config.REDIS.ENABLED) {
await redisCache.$addTransactions(newTransactions);
await redisCache.$removeTransactions(deletedTransactions.map(tx => tx.txid));
await rbfCache.updateCache();
}
const end = new Date().getTime();

View File

@ -1,15 +1,17 @@
import config from "../config";
import logger from "../logger";
import { MempoolTransactionExtended, TransactionStripped } from "../mempool.interfaces";
import bitcoinApi from './bitcoin/bitcoin-api-factory';
import { Common } from "./common";
import redisCache from "./redis-cache";
interface RbfTransaction extends TransactionStripped {
export interface RbfTransaction extends TransactionStripped {
rbf?: boolean;
mined?: boolean;
fullRbf?: boolean;
}
interface RbfTree {
export interface RbfTree {
tx: RbfTransaction;
time: number;
interval?: number;
@ -28,6 +30,19 @@ export interface ReplacementInfo {
newVsize: number;
}
enum CacheOp {
Remove = 0,
Add = 1,
Change = 2,
}
interface CacheEvent {
op: CacheOp;
type: 'tx' | 'tree' | 'exp';
txid: string,
value?: any,
}
class RbfCache {
private replacedBy: Map<string, string> = new Map();
private replaces: Map<string, string[]> = new Map();
@ -36,11 +51,43 @@ class RbfCache {
private treeMap: Map<string, string> = new Map(); // map of txids to sequence ids
private txs: Map<string, MempoolTransactionExtended> = new Map();
private expiring: Map<string, number> = new Map();
private cacheQueue: CacheEvent[] = [];
constructor() {
setInterval(this.cleanup.bind(this), 1000 * 60 * 10);
}
private addTx(txid: string, tx: MempoolTransactionExtended): void {
this.txs.set(txid, tx);
this.cacheQueue.push({ op: CacheOp.Add, type: 'tx', txid });
}
private addTree(txid: string, tree: RbfTree): void {
this.rbfTrees.set(txid, tree);
this.dirtyTrees.add(txid);
this.cacheQueue.push({ op: CacheOp.Add, type: 'tree', txid });
}
private addExpiration(txid: string, expiry: number): void {
this.expiring.set(txid, expiry);
this.cacheQueue.push({ op: CacheOp.Add, type: 'exp', txid, value: expiry });
}
private removeTx(txid: string): void {
this.txs.delete(txid);
this.cacheQueue.push({ op: CacheOp.Remove, type: 'tx', txid });
}
private removeTree(txid: string): void {
this.rbfTrees.delete(txid);
this.cacheQueue.push({ op: CacheOp.Remove, type: 'tree', txid });
}
private removeExpiration(txid: string): void {
this.expiring.delete(txid);
this.cacheQueue.push({ op: CacheOp.Remove, type: 'exp', txid });
}
public add(replaced: MempoolTransactionExtended[], newTxExtended: MempoolTransactionExtended): void {
if (!newTxExtended || !replaced?.length || this.txs.has(newTxExtended.txid)) {
return;
@ -49,7 +96,7 @@ class RbfCache {
const newTx = Common.stripTransaction(newTxExtended) as RbfTransaction;
const newTime = newTxExtended.firstSeen || (Date.now() / 1000);
newTx.rbf = newTxExtended.vin.some((v) => v.sequence < 0xfffffffe);
this.txs.set(newTx.txid, newTxExtended);
this.addTx(newTx.txid, newTxExtended);
// maintain rbf trees
let txFullRbf = false;
@ -66,7 +113,7 @@ class RbfCache {
const treeId = this.treeMap.get(replacedTx.txid);
if (treeId) {
const tree = this.rbfTrees.get(treeId);
this.rbfTrees.delete(treeId);
this.removeTree(treeId);
if (tree) {
tree.interval = newTime - tree?.time;
replacedTrees.push(tree);
@ -83,7 +130,7 @@ class RbfCache {
replaces: [],
});
treeFullRbf = treeFullRbf || !replacedTx.rbf;
this.txs.set(replacedTx.txid, replacedTxExtended);
this.addTx(replacedTx.txid, replacedTxExtended);
}
}
newTx.fullRbf = txFullRbf;
@ -94,10 +141,9 @@ class RbfCache {
fullRbf: treeFullRbf,
replaces: replacedTrees
};
this.rbfTrees.set(treeId, newTree);
this.addTree(treeId, newTree);
this.updateTreeMap(treeId, newTree);
this.replaces.set(newTx.txid, replacedTrees.map(tree => tree.tx.txid));
this.dirtyTrees.add(treeId);
}
public has(txId: string): boolean {
@ -191,6 +237,7 @@ class RbfCache {
this.setTreeMined(tree, txid);
tree.mined = true;
this.dirtyTrees.add(treeId);
this.cacheQueue.push({ op: CacheOp.Change, type: 'tree', txid: treeId });
}
}
this.evict(txid);
@ -199,7 +246,8 @@ class RbfCache {
// flag a transaction as removed from the mempool
public evict(txid: string, fast: boolean = false): void {
if (this.txs.has(txid) && (fast || !this.expiring.has(txid))) {
this.expiring.set(txid, fast ? Date.now() + (1000 * 60 * 10) : Date.now() + (1000 * 86400)); // 24 hours
const expiryTime = fast ? Date.now() + (1000 * 60 * 10) : Date.now() + (1000 * 86400); // 24 hours
this.addExpiration(txid, expiryTime);
}
}
@ -220,11 +268,11 @@ class RbfCache {
const now = Date.now();
for (const txid of this.expiring.keys()) {
if ((this.expiring.get(txid) || 0) < now) {
this.expiring.delete(txid);
this.removeExpiration(txid);
this.remove(txid);
}
}
logger.debug(`rbf cache contains ${this.txs.size} txs, ${this.expiring.size} due to expire`);
logger.debug(`rbf cache contains ${this.txs.size} txs, ${this.rbfTrees.size} trees, ${this.expiring.size} due to expire`);
}
// remove a transaction & all previous versions from the cache
@ -234,14 +282,14 @@ class RbfCache {
const replaces = this.replaces.get(txid);
this.replaces.delete(txid);
this.treeMap.delete(txid);
this.txs.delete(txid);
this.expiring.delete(txid);
this.removeTx(txid);
this.removeExpiration(txid);
for (const tx of (replaces || [])) {
// recursively remove prior versions from the cache
this.replacedBy.delete(tx);
// if this is the id of a tree, remove that too
if (this.treeMap.get(tx) === tx) {
this.rbfTrees.delete(tx);
this.removeTree(tx);
}
this.remove(tx);
}
@ -273,6 +321,33 @@ class RbfCache {
}
}
public async updateCache(): Promise<void> {
if (!config.REDIS.ENABLED) {
return;
}
// Update the Redis cache by replaying queued events
for (const e of this.cacheQueue) {
if (e.op === CacheOp.Add || e.op === CacheOp.Change) {
let value = e.value;
switch(e.type) {
case 'tx': {
value = this.txs.get(e.txid);
} break;
case 'tree': {
const tree = this.rbfTrees.get(e.txid);
value = tree ? this.exportTree(tree) : null;
} break;
}
if (value != null) {
await redisCache.$setRbfEntry(e.type, e.txid, value);
}
} else if (e.op === CacheOp.Remove) {
await redisCache.$removeRbfEntry(e.type, e.txid);
}
}
this.cacheQueue = [];
}
public dump(): any {
const trees = Array.from(this.rbfTrees.values()).map((tree: RbfTree) => { return this.exportTree(tree); });
@ -378,8 +453,7 @@ class RbfCache {
};
this.treeMap.set(txid, root);
if (root === txid) {
this.rbfTrees.set(root, tree);
this.dirtyTrees.add(root);
this.addTree(root, tree);
}
return tree;
}

View File

@ -4,6 +4,7 @@ import blocks from './blocks';
import logger from '../logger';
import config from '../config';
import { BlockExtended, BlockSummary, TransactionExtended } from '../mempool.interfaces';
import rbfCache from './rbf-cache';
class RedisCache {
private client;
@ -73,6 +74,24 @@ class RedisCache {
}
}
async $setRbfEntry(type: string, txid: string, value: any): Promise<void> {
try {
await this.$ensureConnected();
await this.client.json.set(`rbf:${type}:${txid}`, '$', value);
} catch (e) {
logger.warn(`Failed to set RBF ${type} in Redis cache: ${e instanceof Error ? e.message : e}`);
}
}
async $removeRbfEntry(type: string, txid: string): Promise<void> {
try {
await this.$ensureConnected();
await this.client.del(`rbf:${type}:${txid}`);
} catch (e) {
logger.warn(`Failed to remove RBF ${type} from Redis cache: ${e instanceof Error ? e.message : e}`);
}
}
async $getBlocks(): Promise<BlockExtended[]> {
try {
await this.$ensureConnected();
@ -121,6 +140,26 @@ class RedisCache {
return mempool;
}
async $getRbfEntries(type: string): Promise<any[]> {
try {
await this.$ensureConnected();
const keys = await this.client.keys(`rbf:${type}:*`);
const promises: Promise<TransactionExtended[]>[] = [];
for (let i = 0; i < keys.length; i += 10000) {
const keySlice = keys.slice(i, i + 10000);
if (!keySlice.length) {
continue;
}
promises.push(this.client.json.mGet(keySlice, '$').then(chunk => chunk?.length ? chunk.flat().map((v, i) => [keySlice[i].slice(`rbf:${type}:`.length), v]) : [] ));
}
const entries = await Promise.all(promises);
return entries.flat();
} catch (e) {
logger.warn(`Failed to retrieve Rbf ${type}s from Redis cache: ${e instanceof Error ? e.message : e}`);
return [];
}
}
async $loadCache() {
logger.info('Restoring mempool and blocks data from Redis cache');
// Load block data
@ -128,11 +167,20 @@ class RedisCache {
const loadedBlockSummaries = await this.$getBlockSummaries();
// Load mempool
const loadedMempool = await this.$getMempool();
// Load rbf data
const rbfTxs = await this.$getRbfEntries('tx');
const rbfTrees = await this.$getRbfEntries('tree');
const rbfExpirations = await this.$getRbfEntries('exp');
// Set loaded data
blocks.setBlocks(loadedBlocks || []);
blocks.setBlockSummaries(loadedBlockSummaries || []);
await memPool.$setMempool(loadedMempool);
await rbfCache.load({
txs: rbfTxs,
trees: rbfTrees.map(loadedTree => loadedTree[1]),
expiring: rbfExpirations,
});
}
}