From ca6ddd609d13fdccf31b1ba36593a25412baf0dc Mon Sep 17 00:00:00 2001 From: Mononaut Date: Mon, 12 Jun 2023 15:31:47 -0400 Subject: [PATCH 1/2] clean up backend websocket logic --- backend/src/api/blocks.ts | 10 +- backend/src/api/websocket-handler.ts | 164 ++++++++++++++++++--------- backend/src/index.ts | 2 +- 3 files changed, 117 insertions(+), 59 deletions(-) diff --git a/backend/src/api/blocks.ts b/backend/src/api/blocks.ts index fae1d453b..bc6c9638e 100644 --- a/backend/src/api/blocks.ts +++ b/backend/src/api/blocks.ts @@ -730,6 +730,11 @@ class Blocks { this.currentDifficulty = block.difficulty; } + // wait for pending async callbacks to finish + this.updateTimerProgress(timer, `waiting for async callbacks to complete for ${this.currentBlockHeight}`); + await Promise.all(callbackPromises); + this.updateTimerProgress(timer, `async callbacks completed for ${this.currentBlockHeight}`); + this.blocks.push(blockExtended); if (this.blocks.length > config.MEMPOOL.INITIAL_BLOCKS_AMOUNT * 4) { this.blocks = this.blocks.slice(-config.MEMPOOL.INITIAL_BLOCKS_AMOUNT * 4); @@ -746,11 +751,6 @@ class Blocks { diskCache.$saveCacheToDisk(); } - // wait for pending async callbacks to finish - this.updateTimerProgress(timer, `waiting for async callbacks to complete for ${this.currentBlockHeight}`); - await Promise.all(callbackPromises); - this.updateTimerProgress(timer, `async callbacks completed for ${this.currentBlockHeight}`); - handledBlocks++; } diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index 8aaab5ab5..dd7cdc8f2 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -22,6 +22,14 @@ import { deepClone } from '../utils/clone'; import priceUpdater from '../tasks/price-updater'; import { ApiPrice } from '../repositories/PricesRepository'; +// valid 'want' subscriptions +const wantable = [ + 'blocks', + 'mempool-blocks', + 'live-2h-chart', + 'stats', +]; + class WebsocketHandler { private wss: WebSocket.Server | undefined; private extraInitProperties = {}; @@ -30,7 +38,7 @@ class WebsocketHandler { private numConnected = 0; private numDisconnected = 0; - private initData: { [key: string]: string } = {}; + private socketData: { [key: string]: string } = {}; private serializedInitData: string = '{}'; constructor() { } @@ -39,28 +47,28 @@ class WebsocketHandler { this.wss = wss; } - setExtraInitProperties(property: string, value: any) { + setExtraInitData(property: string, value: any) { this.extraInitProperties[property] = value; - this.setInitDataFields(this.extraInitProperties); + this.updateSocketDataFields(this.extraInitProperties); } - private setInitDataFields(data: { [property: string]: any }): void { + private updateSocketDataFields(data: { [property: string]: any }): void { for (const property of Object.keys(data)) { if (data[property] != null) { - this.initData[property] = JSON.stringify(data[property]); + this.socketData[property] = JSON.stringify(data[property]); } else { - delete this.initData[property]; + delete this.socketData[property]; } } this.serializedInitData = '{' - + Object.keys(this.initData).map(key => `"${key}": ${this.initData[key]}`).join(', ') - + '}'; + + Object.keys(this.socketData).map(key => `"${key}": ${this.socketData[key]}`).join(', ') + + '}'; } - private updateInitData(): void { + private updateSocketData(): void { const _blocks = blocks.getBlocks().slice(-config.MEMPOOL.INITIAL_BLOCKS_AMOUNT); const da = difficultyAdjustment.getDifficultyAdjustment(); - this.setInitDataFields({ + this.updateSocketDataFields({ 'mempoolInfo': memPool.getMempoolInfo(), 'vBytesPerSecond': memPool.getVBytesPerSecond(), 'blocks': _blocks, @@ -94,11 +102,33 @@ class WebsocketHandler { const parsedMessage: WebsocketResponse = JSON.parse(message); const response = {}; - if (parsedMessage.action === 'want') { - client['want-blocks'] = parsedMessage.data.indexOf('blocks') > -1; - client['want-mempool-blocks'] = parsedMessage.data.indexOf('mempool-blocks') > -1; - client['want-live-2h-chart'] = parsedMessage.data.indexOf('live-2h-chart') > -1; - client['want-stats'] = parsedMessage.data.indexOf('stats') > -1; + const wantNow = {}; + if (parsedMessage && parsedMessage.action === 'want' && Array.isArray(parsedMessage.data)) { + for (const sub of wantable) { + const key = `want-${sub}`; + const wants = parsedMessage.data.includes(sub); + if (wants && client['wants'] && !client[key]) { + wantNow[key] = true; + } + client[key] = wants; + } + client['wants'] = true; + } + + // send initial data when a client first starts a subscription + if (wantNow['want-blocks']) { + response['blocks'] = this.socketData['blocks']; + } + + if (wantNow['want-mempool-blocks']) { + response['mempool-blocks'] = this.socketData['mempool-blocks']; + } + + if (wantNow['want-stats']) { + response['mempoolInfo'] = this.socketData['mempoolInfo']; + response['vBytesPerSecond'] = this.socketData['vBytesPerSecond']; + response['fees'] = this.socketData['fees']; + response['da'] = this.socketData['da']; } if (parsedMessage && parsedMessage['track-tx']) { @@ -109,21 +139,21 @@ class WebsocketHandler { if (parsedMessage['watch-mempool']) { const rbfCacheTxid = rbfCache.getReplacedBy(trackTxid); if (rbfCacheTxid) { - response['txReplaced'] = { + response['txReplaced'] = JSON.stringify({ txid: rbfCacheTxid, - }; + }); client['track-tx'] = null; } else { // It might have appeared before we had the time to start watching for it const tx = memPool.getMempool()[trackTxid]; if (tx) { if (config.MEMPOOL.BACKEND === 'esplora') { - response['tx'] = tx; + response['tx'] = JSON.stringify(tx); } else { // tx.prevout is missing from transactions when in bitcoind mode try { const fullTx = await transactionUtils.$getMempoolTransactionExtended(tx.txid, true); - response['tx'] = fullTx; + response['tx'] = JSON.stringify(fullTx); } catch (e) { logger.debug('Error finding transaction: ' + (e instanceof Error ? e.message : e)); } @@ -131,7 +161,7 @@ class WebsocketHandler { } else { try { const fullTx = await transactionUtils.$getMempoolTransactionExtended(client['track-tx'], true); - response['tx'] = fullTx; + response['tx'] = JSON.stringify(fullTx); } catch (e) { logger.debug('Error finding transaction. ' + (e instanceof Error ? e.message : e)); client['track-mempool-tx'] = parsedMessage['track-tx']; @@ -141,10 +171,10 @@ class WebsocketHandler { } const tx = memPool.getMempool()[trackTxid]; if (tx && tx.position) { - response['txPosition'] = { + response['txPosition'] = JSON.stringify({ txid: trackTxid, position: tx.position, - }; + }); } } else { client['track-tx'] = null; @@ -177,10 +207,10 @@ class WebsocketHandler { const index = parsedMessage['track-mempool-block']; client['track-mempool-block'] = index; const mBlocksWithTransactions = mempoolBlocks.getMempoolBlocksWithTransactions(); - response['projected-block-transactions'] = { + response['projected-block-transactions'] = JSON.stringify({ index: index, blockTransactions: mBlocksWithTransactions[index]?.transactions || [], - }; + }); } else { client['track-mempool-block'] = null; } @@ -189,23 +219,24 @@ class WebsocketHandler { if (parsedMessage && parsedMessage['track-rbf'] !== undefined) { if (['all', 'fullRbf'].includes(parsedMessage['track-rbf'])) { client['track-rbf'] = parsedMessage['track-rbf']; + response['rbfLatest'] = JSON.stringify(rbfCache.getRbfTrees(parsedMessage['track-rbf'] === 'fullRbf')); } else { client['track-rbf'] = false; } } if (parsedMessage.action === 'init') { - if (!this.initData['blocks']?.length || !this.initData['da']) { - this.updateInitData(); + if (!this.socketData['blocks']?.length || !this.socketData['da']) { + this.updateSocketData(); } - if (!this.initData['blocks']?.length) { + if (!this.socketData['blocks']?.length) { return; } client.send(this.serializedInitData); } if (parsedMessage.action === 'ping') { - response['pong'] = true; + response['pong'] = JSON.stringify(true); } if (parsedMessage['track-donation'] && parsedMessage['track-donation'].length === 22) { @@ -221,7 +252,8 @@ class WebsocketHandler { } if (Object.keys(response).length) { - client.send(JSON.stringify(response)); + const serializedResponse = this.serializeResponse(response); + client.send(serializedResponse); } } catch (e) { logger.debug('Error parsing websocket message: ' + (e instanceof Error ? e.message : e)); @@ -250,7 +282,7 @@ class WebsocketHandler { throw new Error('WebSocket.Server is not set'); } - this.setInitDataFields({ 'loadingIndicators': indicators }); + this.updateSocketDataFields({ 'loadingIndicators': indicators }); const response = JSON.stringify({ loadingIndicators: indicators }); this.wss.clients.forEach((client) => { @@ -266,7 +298,7 @@ class WebsocketHandler { throw new Error('WebSocket.Server is not set'); } - this.setInitDataFields({ 'conversions': conversionRates }); + this.updateSocketDataFields({ 'conversions': conversionRates }); const response = JSON.stringify({ conversions: conversionRates }); this.wss.clients.forEach((client) => { @@ -336,11 +368,21 @@ class WebsocketHandler { memPool.addToSpendMap(newTransactions); const recommendedFees = feeApi.getRecommendedFee(); + const latestTransactions = newTransactions.slice(0, 6).map((tx) => Common.stripTransaction(tx)); + // update init data - this.updateInitData(); + this.updateSocketDataFields({ + 'mempoolInfo': mempoolInfo, + 'vBytesPerSecond': vBytesPerSecond, + 'mempool-blocks': mBlocks, + 'transactions': latestTransactions, + 'loadingIndicators': loadingIndicators.getLoadingIndicators(), + 'da': da?.previousTime ? da : undefined, + 'fees': recommendedFees, + }); // cache serialized objects to avoid stringify-ing the same thing for every client - const responseCache = { ...this.initData }; + const responseCache = { ...this.socketData }; function getCachedResponse(key: string, data): string { if (!responseCache[key]) { responseCache[key] = JSON.stringify(data); @@ -371,8 +413,6 @@ class WebsocketHandler { } } - const latestTransactions = newTransactions.slice(0, 6).map((tx) => Common.stripTransaction(tx)); - this.wss.clients.forEach(async (client) => { if (client.readyState !== WebSocket.OPEN) { return; @@ -490,7 +530,7 @@ class WebsocketHandler { if (rbfReplacedBy) { response['rbfTransaction'] = JSON.stringify({ txid: rbfReplacedBy, - }) + }); } const rbfChange = rbfChanges.map[client['track-tx']]; @@ -524,9 +564,7 @@ class WebsocketHandler { } if (Object.keys(response).length) { - const serializedResponse = '{' - + Object.keys(response).map(key => `"${key}": ${response[key]}`).join(', ') - + '}'; + const serializedResponse = this.serializeResponse(response); client.send(serializedResponse); } }); @@ -633,11 +671,19 @@ class WebsocketHandler { const da = difficultyAdjustment.getDifficultyAdjustment(); const fees = feeApi.getRecommendedFee(); + const mempoolInfo = memPool.getMempoolInfo(); // update init data - this.updateInitData(); + this.updateSocketDataFields({ + 'mempoolInfo': mempoolInfo, + 'blocks': [...blocks.getBlocks(), block].slice(-config.MEMPOOL.INITIAL_BLOCKS_AMOUNT), + 'mempool-blocks': mBlocks, + 'loadingIndicators': loadingIndicators.getLoadingIndicators(), + 'da': da?.previousTime ? da : undefined, + 'fees': fees, + }); - const responseCache = { ...this.initData }; + const responseCache = { ...this.socketData }; function getCachedResponse(key, data): string { if (!responseCache[key]) { responseCache[key] = JSON.stringify(data); @@ -645,22 +691,26 @@ class WebsocketHandler { return responseCache[key]; } - const mempoolInfo = memPool.getMempoolInfo(); - this.wss.clients.forEach((client) => { if (client.readyState !== WebSocket.OPEN) { return; } - if (!client['want-blocks']) { - return; + const response = {}; + + if (client['want-blocks']) { + response['block'] = getCachedResponse('block', block); } - const response = {}; - response['block'] = getCachedResponse('block', block); - response['mempoolInfo'] = getCachedResponse('mempoolInfo', mempoolInfo); - response['da'] = getCachedResponse('da', da?.previousTime ? da : undefined); - response['fees'] = getCachedResponse('fees', fees); + if (client['want-stats']) { + response['mempoolInfo'] = getCachedResponse('mempoolInfo', mempoolInfo); + response['vBytesPerSecond'] = getCachedResponse('vBytesPerSecond', memPool.getVBytesPerSecond()); + response['fees'] = getCachedResponse('fees', fees); + + if (da?.previousTime) { + response['da'] = getCachedResponse('da', da); + } + } if (mBlocks && client['want-mempool-blocks']) { response['mempool-blocks'] = getCachedResponse('mempool-blocks', mBlocks); @@ -755,11 +805,19 @@ class WebsocketHandler { } } - const serializedResponse = '{' + if (Object.keys(response).length) { + const serializedResponse = this.serializeResponse(response); + client.send(serializedResponse); + } + }); + } + + // takes a dictionary of JSON serialized values + // and zips it together into a valid JSON object + private serializeResponse(response): string { + return '{' + Object.keys(response).map(key => `"${key}": ${response[key]}`).join(', ') + '}'; - client.send(serializedResponse); - }); } private printLogs(): void { diff --git a/backend/src/index.ts b/backend/src/index.ts index 9f543d644..81863a208 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -150,7 +150,7 @@ class Server { if (config.BISQ.ENABLED) { bisq.startBisqService(); - bisq.setPriceCallbackFunction((price) => websocketHandler.setExtraInitProperties('bsq-price', price)); + bisq.setPriceCallbackFunction((price) => websocketHandler.setExtraInitData('bsq-price', price)); blocks.setNewBlockCallback(bisq.handleNewBitcoinBlock.bind(bisq)); bisqMarkets.startBisqService(); } From eaad63a082cab89ada5e9eabcabad6fb0cf6f281 Mon Sep 17 00:00:00 2001 From: Mononaut Date: Mon, 12 Jun 2023 16:32:04 -0400 Subject: [PATCH 2/2] frontend resync recent blocks when necessary --- backend/src/api/websocket-handler.ts | 2 +- frontend/src/app/interfaces/websocket.interface.ts | 1 + frontend/src/app/services/websocket.service.ts | 10 +++++++++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index dd7cdc8f2..1e2e381c2 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -116,7 +116,7 @@ class WebsocketHandler { } // send initial data when a client first starts a subscription - if (wantNow['want-blocks']) { + if (wantNow['want-blocks'] || (parsedMessage && parsedMessage['refresh-blocks'])) { response['blocks'] = this.socketData['blocks']; } diff --git a/frontend/src/app/interfaces/websocket.interface.ts b/frontend/src/app/interfaces/websocket.interface.ts index 83a0c636e..41643fb73 100644 --- a/frontend/src/app/interfaces/websocket.interface.ts +++ b/frontend/src/app/interfaces/websocket.interface.ts @@ -31,6 +31,7 @@ export interface WebsocketResponse { 'track-rbf'?: string; 'watch-mempool'?: boolean; 'track-bisq-market'?: string; + 'refresh-blocks'?: boolean; } export interface ReplacedTransaction extends Transaction { diff --git a/frontend/src/app/services/websocket.service.ts b/frontend/src/app/services/websocket.service.ts index af7a465f8..d22717b2a 100644 --- a/frontend/src/app/services/websocket.service.ts +++ b/frontend/src/app/services/websocket.service.ts @@ -235,6 +235,8 @@ export class WebsocketService { } handleResponse(response: WebsocketResponse) { + let reinitBlocks = false; + if (response.blocks && response.blocks.length) { const blocks = response.blocks; let maxHeight = 0; @@ -256,9 +258,11 @@ export class WebsocketService { } if (response.block) { - if (response.block.height > this.stateService.latestBlockHeight) { + if (response.block.height === this.stateService.latestBlockHeight + 1) { this.stateService.updateChainTip(response.block.height); this.stateService.blocks$.next([response.block, response.txConfirmed || '']); + } else if (response.block.height > this.stateService.latestBlockHeight + 1) { + reinitBlocks = true; } if (response.txConfirmed) { @@ -369,5 +373,9 @@ export class WebsocketService { if (response['git-commit']) { this.stateService.backendInfo$.next(response['git-commit']); } + + if (reinitBlocks) { + this.websocketSubject.next({'refresh-blocks': true}); + } } }