Merge pull request #4598 from mempool/mononaut/mempool-delta-subscription

Feature: track-mempool websocket subscriptions
This commit is contained in:
softsimon 2024-05-04 13:47:29 +07:00 committed by GitHub
commit 24ea3199dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 97 additions and 8 deletions

View File

@ -3,6 +3,7 @@ import * as WebSocket from 'ws';
import {
BlockExtended, TransactionExtended, MempoolTransactionExtended, WebsocketResponse,
OptimizedStatistic, ILoadingIndicators, GbtCandidates, TxTrackingInfo,
MempoolBlockDelta, MempoolDelta, MempoolDeltaTxids
} from '../mempool.interfaces';
import blocks from './blocks';
import memPool from './mempool';
@ -364,6 +365,18 @@ class WebsocketHandler {
client['track-donation'] = parsedMessage['track-donation'];
}
if (parsedMessage['track-mempool-txids'] === true) {
client['track-mempool-txids'] = true;
} else if (parsedMessage['track-mempool-txids'] === false) {
delete client['track-mempool-txids'];
}
if (parsedMessage['track-mempool'] === true) {
client['track-mempool'] = true;
} else if (parsedMessage['track-mempool'] === false) {
delete client['track-mempool'];
}
if (Object.keys(response).length) {
client.send(this.serializeResponse(response));
}
@ -545,6 +558,33 @@ class WebsocketHandler {
const latestTransactions = memPool.getLatestTransactions();
if (memPool.isInSync()) {
this.mempoolSequence++;
}
const replacedTransactions: { replaced: string, by: TransactionExtended }[] = [];
for (const tx of newTransactions) {
if (rbfTransactions[tx.txid]) {
for (const replaced of rbfTransactions[tx.txid]) {
replacedTransactions.push({ replaced: replaced.txid, by: tx });
}
}
}
const mempoolDeltaTxids: MempoolDeltaTxids = {
sequence: this.mempoolSequence,
added: newTransactions.map(tx => tx.txid),
removed: deletedTransactions.map(tx => tx.txid),
mined: [],
replaced: replacedTransactions.map(replacement => ({ replaced: replacement.replaced, by: replacement.by.txid })),
};
const mempoolDelta: MempoolDelta = {
sequence: this.mempoolSequence,
added: newTransactions,
removed: deletedTransactions.map(tx => tx.txid),
mined: [],
replaced: replacedTransactions,
};
// update init data
const socketDataFields = {
'mempoolInfo': mempoolInfo,
@ -604,10 +644,6 @@ class WebsocketHandler {
const addressCache = this.makeAddressCache(newTransactions);
const removedAddressCache = this.makeAddressCache(deletedTransactions);
if (memPool.isInSync()) {
this.mempoolSequence++;
}
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach(async (client) => {
@ -847,6 +883,14 @@ class WebsocketHandler {
response['rbfLatestSummary'] = getCachedResponse('rbfLatestSummary', rbfSummary);
}
if (client['track-mempool-txids']) {
response['mempool-txids'] = getCachedResponse('mempool-txids', mempoolDeltaTxids);
}
if (client['track-mempool']) {
response['mempool-transactions'] = getCachedResponse('mempool-transactions', mempoolDelta);
}
if (Object.keys(response).length) {
client.send(this.serializeResponse(response));
}
@ -992,6 +1036,31 @@ class WebsocketHandler {
const mBlocksWithTransactions = mempoolBlocks.getMempoolBlocksWithTransactions();
if (memPool.isInSync()) {
this.mempoolSequence++;
}
const replacedTransactions: { replaced: string, by: TransactionExtended }[] = [];
for (const txid of Object.keys(rbfTransactions)) {
for (const replaced of rbfTransactions[txid].replaced) {
replacedTransactions.push({ replaced: replaced.txid, by: rbfTransactions[txid].replacedBy });
}
}
const mempoolDeltaTxids: MempoolDeltaTxids = {
sequence: this.mempoolSequence,
added: [],
removed: [],
mined: transactions.map(tx => tx.txid),
replaced: replacedTransactions.map(replacement => ({ replaced: replacement.replaced, by: replacement.by.txid })),
};
const mempoolDelta: MempoolDelta = {
sequence: this.mempoolSequence,
added: [],
removed: [],
mined: transactions.map(tx => tx.txid),
replaced: replacedTransactions,
};
const responseCache = { ...this.socketData };
function getCachedResponse(key, data): string {
if (!responseCache[key]) {
@ -1000,10 +1069,6 @@ class WebsocketHandler {
return responseCache[key];
}
if (memPool.isInSync()) {
this.mempoolSequence++;
}
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
@ -1185,6 +1250,14 @@ class WebsocketHandler {
}
}
if (client['track-mempool-txids']) {
response['mempool-txids'] = getCachedResponse('mempool-txids', mempoolDeltaTxids);
}
if (client['track-mempool']) {
response['mempool-transactions'] = getCachedResponse('mempool-transactions', mempoolDelta);
}
if (Object.keys(response).length) {
client.send(this.serializeResponse(response));
}

View File

@ -71,6 +71,22 @@ export interface MempoolBlockDelta {
changed: MempoolDeltaChange[];
}
export interface MempoolDeltaTxids {
sequence: number,
added: string[];
removed: string[];
mined: string[];
replaced: { replaced: string, by: string }[];
}
export interface MempoolDelta {
sequence: number,
added: MempoolTransactionExtended[];
removed: string[];
mined: string[];
replaced: { replaced: string, by: TransactionExtended }[];
}
interface VinStrippedToScriptsig {
scriptsig: string;
}