diff --git a/backend/src/api/mempool.ts b/backend/src/api/mempool.ts index 1442b05fa..87e7f10cd 100644 --- a/backend/src/api/mempool.ts +++ b/backend/src/api/mempool.ts @@ -10,6 +10,7 @@ import bitcoinClient from './bitcoin/bitcoin-client'; import bitcoinSecondClient from './bitcoin/bitcoin-second-client'; import rbfCache from './rbf-cache'; import { Acceleration } from './services/acceleration'; +import accelerationApi from './services/acceleration'; import redisCache from './redis-cache'; import blocks from './blocks'; @@ -207,7 +208,7 @@ class Mempool { return txTimes; } - public async $updateMempool(transactions: string[], accelerations: Acceleration[] | null, minFeeMempool: string[], minFeeTip: number, pollRate: number): Promise { + public async $updateMempool(transactions: string[], accelerations: Record | null, minFeeMempool: string[], minFeeTip: number, pollRate: number): Promise { logger.debug(`Updating mempool...`); // warn if this run stalls the main loop for more than 2 minutes @@ -354,7 +355,7 @@ class Mempool { const newTransactionsStripped = newTransactions.map((tx) => Common.stripTransaction(tx)); this.latestTransactions = newTransactionsStripped.concat(this.latestTransactions).slice(0, 6); - const accelerationDelta = accelerations != null ? await this.$updateAccelerations(accelerations) : []; + const accelerationDelta = accelerations != null ? await this.updateAccelerations(accelerations) : []; if (accelerationDelta.length) { hasChange = true; } @@ -399,58 +400,11 @@ class Mempool { return this.accelerations; } - public $updateAccelerations(newAccelerations: Acceleration[]): string[] { + public updateAccelerations(newAccelerationMap: Record): string[] { try { - const changed: string[] = []; - - const newAccelerationMap: { [txid: string]: Acceleration } = {}; - for (const acceleration of newAccelerations) { - // skip transactions we don't know about - if (!this.mempoolCache[acceleration.txid]) { - continue; - } - newAccelerationMap[acceleration.txid] = acceleration; - if (this.accelerations[acceleration.txid] == null) { - // new acceleration - changed.push(acceleration.txid); - } else { - if (this.accelerations[acceleration.txid].feeDelta !== acceleration.feeDelta) { - // feeDelta changed - changed.push(acceleration.txid); - } else if (this.accelerations[acceleration.txid].pools?.length) { - let poolsChanged = false; - const pools = new Set(); - this.accelerations[acceleration.txid].pools.forEach(pool => { - pools.add(pool); - }); - acceleration.pools.forEach(pool => { - if (!pools.has(pool)) { - poolsChanged = true; - } else { - pools.delete(pool); - } - }); - if (pools.size > 0) { - poolsChanged = true; - } - if (poolsChanged) { - // pools changed - changed.push(acceleration.txid); - } - } - } - } - - for (const oldTxid of Object.keys(this.accelerations)) { - if (!newAccelerationMap[oldTxid]) { - // removed - changed.push(oldTxid); - } - } - + const accelerationDelta = accelerationApi.getAccelerationDelta(this.accelerations, newAccelerationMap); this.accelerations = newAccelerationMap; - - return changed; + return accelerationDelta; } catch (e: any) { logger.debug(`Failed to update accelerations: ` + (e instanceof Error ? e.message : e)); return []; diff --git a/backend/src/api/mining/mining-routes.ts b/backend/src/api/mining/mining-routes.ts index 69e6d95d4..80f9f8e79 100644 --- a/backend/src/api/mining/mining-routes.ts +++ b/backend/src/api/mining/mining-routes.ts @@ -459,7 +459,7 @@ class MiningRoutes { handleError(req, res, 400, 'Acceleration data is not available.'); return; } - res.status(200).send(accelerationApi.accelerations || []); + res.status(200).send(Object.values(accelerationApi.getAccelerations() || {})); } catch (e) { handleError(req, res, 500, e instanceof Error ? e.message : e); } diff --git a/backend/src/api/services/acceleration.ts b/backend/src/api/services/acceleration.ts index 88289382b..f625b7f15 100644 --- a/backend/src/api/services/acceleration.ts +++ b/backend/src/api/services/acceleration.ts @@ -1,7 +1,10 @@ +import { WebSocket } from 'ws'; import config from '../../config'; import logger from '../../logger'; import { BlockExtended } from '../../mempool.interfaces'; import axios from 'axios'; +import mempool from '../mempool'; +import websocketHandler from '../websocket-handler'; type MyAccelerationStatus = 'requested' | 'accelerating' | 'done'; @@ -37,14 +40,20 @@ export interface AccelerationHistory { }; class AccelerationApi { + private ws: WebSocket | null = null; + private useWebsocket: boolean = config.MEMPOOL.OFFICIAL && config.MEMPOOL_SERVICES.ACCELERATIONS; + private startedWebsocketLoop: boolean = false; + private websocketConnected: boolean = false; private onDemandPollingEnabled = !config.MEMPOOL_SERVICES.ACCELERATIONS; private apiPath = config.MEMPOOL.OFFICIAL ? (config.MEMPOOL_SERVICES.API + '/accelerator/accelerations') : (config.EXTERNAL_DATA_SERVER.MEMPOOL_API + '/accelerations'); - private _accelerations: Acceleration[] | null = null; + private _accelerations: Record = {}; private lastPoll = 0; private forcePoll = false; private myAccelerations: Record = {}; - public get accelerations(): Acceleration[] | null { + public constructor() {} + + public getAccelerations(): Record { return this._accelerations; } @@ -72,11 +81,18 @@ class AccelerationApi { } } - public async $updateAccelerations(): Promise { + public async $updateAccelerations(): Promise | null> { + if (this.useWebsocket && this.websocketConnected) { + return this._accelerations; + } if (!this.onDemandPollingEnabled) { const accelerations = await this.$fetchAccelerations(); if (accelerations) { - this._accelerations = accelerations; + const latestAccelerations = {}; + for (const acc of accelerations) { + latestAccelerations[acc.txid] = acc; + } + this._accelerations = latestAccelerations; return this._accelerations; } } else { @@ -85,7 +101,7 @@ class AccelerationApi { return null; } - private async $updateAccelerationsOnDemand(): Promise { + private async $updateAccelerationsOnDemand(): Promise | null> { const shouldUpdate = this.forcePoll || this.countMyAccelerationsWithStatus('requested') > 0 || (this.countMyAccelerationsWithStatus('accelerating') > 0 && this.lastPoll < (Date.now() - (10 * 60 * 1000))); @@ -120,7 +136,11 @@ class AccelerationApi { } } - this._accelerations = Object.values(this.myAccelerations).map(({ acceleration }) => acceleration).filter(acc => acc) as Acceleration[]; + const latestAccelerations = {}; + for (const acc of Object.values(this.myAccelerations).map(({ acceleration }) => acceleration).filter(acc => acc) as Acceleration[]) { + latestAccelerations[acc.txid] = acc; + } + this._accelerations = latestAccelerations; return this._accelerations; } @@ -152,6 +172,110 @@ class AccelerationApi { } return anyAccelerated; } + + // get a list of accelerations that have changed between two sets of accelerations + public getAccelerationDelta(oldAccelerationMap: Record, newAccelerationMap: Record): string[] { + const changed: string[] = []; + const mempoolCache = mempool.getMempool(); + + for (const acceleration of Object.values(newAccelerationMap)) { + // skip transactions we don't know about + if (!mempoolCache[acceleration.txid]) { + continue; + } + if (oldAccelerationMap[acceleration.txid] == null) { + // new acceleration + changed.push(acceleration.txid); + } else { + if (oldAccelerationMap[acceleration.txid].feeDelta !== acceleration.feeDelta) { + // feeDelta changed + changed.push(acceleration.txid); + } else if (oldAccelerationMap[acceleration.txid].pools?.length) { + let poolsChanged = false; + const pools = new Set(); + oldAccelerationMap[acceleration.txid].pools.forEach(pool => { + pools.add(pool); + }); + acceleration.pools.forEach(pool => { + if (!pools.has(pool)) { + poolsChanged = true; + } else { + pools.delete(pool); + } + }); + if (pools.size > 0) { + poolsChanged = true; + } + if (poolsChanged) { + // pools changed + changed.push(acceleration.txid); + } + } + } + } + + for (const oldTxid of Object.keys(oldAccelerationMap)) { + if (!newAccelerationMap[oldTxid]) { + // removed + changed.push(oldTxid); + } + } + + return changed; + } + + private handleWebsocketMessage(msg: any): void { + if (msg?.accelerations !== null) { + const latestAccelerations = {}; + for (const acc of msg?.accelerations || []) { + latestAccelerations[acc.txid] = acc; + } + this._accelerations = latestAccelerations; + websocketHandler.handleAccelerationsChanged(this._accelerations); + } + } + + public async connectWebsocket(): Promise { + if (this.startedWebsocketLoop) { + return; + } + while (this.useWebsocket) { + this.startedWebsocketLoop = true; + if (!this.ws) { + this.ws = new WebSocket(`${config.MEMPOOL_SERVICES.API.replace('https://', 'ws://').replace('http://', 'ws://')}/accelerator/ws`); + this.websocketConnected = true; + + this.ws.on('open', () => { + logger.info('Acceleration websocket opened'); + this.ws?.send(JSON.stringify({ + 'watch-accelerations': true + })); + }); + + this.ws.on('error', (error) => { + logger.err('Acceleration websocket error: ' + error); + this.ws = null; + this.websocketConnected = false; + }); + + this.ws.on('close', () => { + logger.info('Acceleration websocket closed'); + this.ws = null; + this.websocketConnected = false; + }); + + this.ws.on('message', (data, isBinary) => { + try { + const parsedMsg = JSON.parse((isBinary ? data : data.toString()) as string); + this.handleWebsocketMessage(parsedMsg); + } catch (e) { + logger.warn('Failed to parse acceleration websocket message: ' + (e instanceof Error ? e.message : e)); + } + }); + } + await new Promise(resolve => setTimeout(resolve, 5000)); + } + } } export default new AccelerationApi(); \ No newline at end of file diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index 2a047472e..634596bb9 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -21,6 +21,7 @@ import BlocksSummariesRepository from '../repositories/BlocksSummariesRepository import Audit from './audit'; import priceUpdater from '../tasks/price-updater'; import { ApiPrice } from '../repositories/PricesRepository'; +import { Acceleration } from './services/acceleration'; import accelerationApi from './services/acceleration'; import mempool from './mempool'; import statistics from './statistics/statistics'; @@ -57,6 +58,8 @@ class WebsocketHandler { private lastRbfSummary: ReplacementInfo[] | null = null; private mempoolSequence: number = 0; + private accelerations: Record = {}; + constructor() { } addWebsocketServer(wss: WebSocket.Server) { @@ -484,6 +487,42 @@ class WebsocketHandler { } } + handleAccelerationsChanged(accelerations: Record): void { + if (!this.webSocketServers.length) { + throw new Error('No WebSocket.Server has been set'); + } + + const websocketAccelerationDelta = accelerationApi.getAccelerationDelta(this.accelerations, accelerations); + this.accelerations = accelerations; + + if (!websocketAccelerationDelta.length) { + return; + } + + // pre-compute acceleration delta + const accelerationUpdate = { + added: websocketAccelerationDelta.map(txid => accelerations[txid]).filter(acc => acc != null), + removed: websocketAccelerationDelta.filter(txid => !accelerations[txid]), + }; + + try { + const response = JSON.stringify({ + accelerations: accelerationUpdate, + }); + + for (const server of this.webSocketServers) { + server.clients.forEach((client) => { + if (client.readyState !== WebSocket.OPEN) { + return; + } + client.send(response); + }); + } + } catch (e) { + logger.debug(`Error sending acceleration update to websocket clients: ${e}`); + } + } + handleReorg(): void { if (!this.webSocketServers.length) { throw new Error('No WebSocket.Server have been set'); @@ -560,7 +599,7 @@ class WebsocketHandler { const vBytesPerSecond = memPool.getVBytesPerSecond(); const rbfTransactions = Common.findRbfTransactions(newTransactions, recentlyDeletedTransactions.flat()); const da = difficultyAdjustment.getDifficultyAdjustment(); - const accelerations = memPool.getAccelerations(); + const accelerations = accelerationApi.getAccelerations(); memPool.handleRbfTransactions(rbfTransactions); const rbfChanges = rbfCache.getRbfChanges(); let rbfReplacements; @@ -668,10 +707,13 @@ class WebsocketHandler { const addressCache = this.makeAddressCache(newTransactions); const removedAddressCache = this.makeAddressCache(deletedTransactions); + const websocketAccelerationDelta = accelerationApi.getAccelerationDelta(this.accelerations, accelerations); + this.accelerations = accelerations; + // pre-compute acceleration delta const accelerationUpdate = { - added: accelerationDelta.map(txid => accelerations[txid]).filter(acc => acc != null), - removed: accelerationDelta.filter(txid => !accelerations[txid]), + added: websocketAccelerationDelta.map(txid => accelerations[txid]).filter(acc => acc != null), + removed: websocketAccelerationDelta.filter(txid => !accelerations[txid]), }; // TODO - Fix indentation after PR is merged diff --git a/backend/src/index.ts b/backend/src/index.ts index 1d83c56a3..84a7e5705 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -229,11 +229,11 @@ class Server { const newMempool = await bitcoinApi.$getRawMempool(); const minFeeMempool = memPool.limitGBT ? await bitcoinSecondClient.getRawMemPool() : null; const minFeeTip = memPool.limitGBT ? await bitcoinSecondClient.getBlockCount() : -1; - const newAccelerations = await accelerationApi.$updateAccelerations(); + const latestAccelerations = await accelerationApi.$updateAccelerations(); const numHandledBlocks = await blocks.$updateBlocks(); const pollRate = config.MEMPOOL.POLL_RATE_MS * (indexer.indexerIsRunning() ? 10 : 1); if (numHandledBlocks === 0) { - await memPool.$updateMempool(newMempool, newAccelerations, minFeeMempool, minFeeTip, pollRate); + await memPool.$updateMempool(newMempool, latestAccelerations, minFeeMempool, minFeeTip, pollRate); } indexer.$run(); if (config.FIAT_PRICE.ENABLED) { @@ -310,8 +310,10 @@ class Server { priceUpdater.setRatesChangedCallback(websocketHandler.handleNewConversionRates.bind(websocketHandler)); } loadingIndicators.setProgressChangedCallback(websocketHandler.handleLoadingChanged.bind(websocketHandler)); + + accelerationApi.connectWebsocket(); } - + setUpHttpApiRoutes(): void { bitcoinRoutes.initRoutes(this.app); bitcoinCoreRoutes.initRoutes(this.app);