From 590f1d2b040b2c01765516374540626680278d96 Mon Sep 17 00:00:00 2001 From: softsimon Date: Mon, 13 Apr 2020 02:06:10 +0700 Subject: [PATCH] Improvments to the mempool transaction subscription. --- backend/src/api/websocket-handler.ts | 25 +++++++++++++++---- backend/src/interfaces.ts | 8 +++++- .../transaction/transaction.component.ts | 6 ++--- .../src/app/interfaces/websocket.interface.ts | 1 + .../src/app/services/websocket.service.ts | 5 ++++ 5 files changed, 36 insertions(+), 9 deletions(-) diff --git a/backend/src/api/websocket-handler.ts b/backend/src/api/websocket-handler.ts index 1e742ae4a..6d6b2cd56 100644 --- a/backend/src/api/websocket-handler.ts +++ b/backend/src/api/websocket-handler.ts @@ -2,7 +2,7 @@ const config = require('../../mempool-config.json'); import * as WebSocket from 'ws'; import * as fs from 'fs'; -import { Block, TransactionExtended, Statistic } from '../interfaces'; +import { Block, TransactionExtended, Statistic, WebsocketResponse } from '../interfaces'; import blocks from './blocks'; import memPool from './mempool'; import mempoolBlocks from './mempool-blocks'; @@ -36,7 +36,8 @@ class WebsocketHandler { this.wss.on('connection', (client: WebSocket) => { client.on('message', (message: any) => { try { - const parsedMessage = JSON.parse(message); + const parsedMessage: WebsocketResponse = JSON.parse(message); + const response = {}; if (parsedMessage.action === 'want') { client['want-blocks'] = parsedMessage.data.indexOf('blocks') > -1; @@ -48,6 +49,15 @@ class WebsocketHandler { if (parsedMessage && parsedMessage['track-tx']) { if (/^[a-fA-F0-9]{64}$/.test(parsedMessage['track-tx'])) { client['track-tx'] = parsedMessage['track-tx']; + // Client is telling the transaction wasn't found but it might have appeared before we had the time to start watching for it + if (parsedMessage['watch-mempool']) { + const tx = memPool.getMempool()[client['track-tx']]; + if (tx) { + response['tx'] = tx; + } else { + client['track-mempool-tx'] = parsedMessage['track-tx']; + } + } } else { client['track-tx'] = null; } @@ -78,7 +88,11 @@ class WebsocketHandler { } if (parsedMessage.action === 'ping') { - client.send(JSON.stringify({'pong': true})); + response['pong'] = true; + } + + if (Object.keys(response).length) { + client.send(JSON.stringify(response)); } } catch (e) { console.log(e); @@ -133,10 +147,11 @@ class WebsocketHandler { response['mempool-blocks'] = mBlocks; } - if (client['track-tx']) { - const tx = newTransactions.find((t) => t.txid === client['track-tx']); + if (client['track-mempool-tx']) { + const tx = newTransactions.find((t) => t.txid === client['track-mempool-tx']); if (tx) { response['tx'] = tx; + client['track-mempool-tx'] = null; } } diff --git a/backend/src/interfaces.ts b/backend/src/interfaces.ts index 7396efa6a..b67c2b094 100644 --- a/backend/src/interfaces.ts +++ b/backend/src/interfaces.ts @@ -180,4 +180,10 @@ export interface Outspend { vin: number; status: Status; } - +export interface WebsocketResponse { + action: string; + data: string[]; + 'track-tx': string; + 'track-address': string; + 'watch-mempool': boolean; +} diff --git a/frontend/src/app/components/transaction/transaction.component.ts b/frontend/src/app/components/transaction/transaction.component.ts index a85b93fc4..412a865d4 100644 --- a/frontend/src/app/components/transaction/transaction.component.ts +++ b/frontend/src/app/components/transaction/transaction.component.ts @@ -1,9 +1,9 @@ import { Component, OnInit, OnDestroy } from '@angular/core'; import { ElectrsApiService } from '../../services/electrs-api.service'; import { ActivatedRoute, ParamMap } from '@angular/router'; -import { switchMap, filter, take, catchError, mergeMap, flatMap, mergeAll, tap, map } from 'rxjs/operators'; +import { switchMap, filter, take, catchError, flatMap } from 'rxjs/operators'; import { Transaction, Block } from '../../interfaces/electrs.interface'; -import { of, merge, Subscription, Observable, scheduled } from 'rxjs'; +import { of, merge, Subscription, Observable } from 'rxjs'; import { StateService } from '../../services/state.service'; import { WebsocketService } from '../../services/websocket.service'; import { AudioService } from 'src/app/services/audio.service'; @@ -122,7 +122,7 @@ export class TransactionComponent implements OnInit, OnDestroy { handleLoadElectrsTransactionError(error: any): Observable { if (error.status === 404 && /^[a-fA-F0-9]{64}$/.test(this.txId)) { - this.websocketService.startTrackTransaction(this.txId); + this.websocketService.startMultiTrackTransaction(this.txId); this.waitingForTransaction = true; } this.error = error; diff --git a/frontend/src/app/interfaces/websocket.interface.ts b/frontend/src/app/interfaces/websocket.interface.ts index 2ae01e064..15b557746 100644 --- a/frontend/src/app/interfaces/websocket.interface.ts +++ b/frontend/src/app/interfaces/websocket.interface.ts @@ -13,6 +13,7 @@ export interface WebsocketResponse { tx?: Transaction; 'track-tx'?: string; 'track-address'?: string; + 'watch-mempool'?: boolean; } export interface MempoolBlock { diff --git a/frontend/src/app/services/websocket.service.ts b/frontend/src/app/services/websocket.service.ts index e549c7c13..a64a35bc0 100644 --- a/frontend/src/app/services/websocket.service.ts +++ b/frontend/src/app/services/websocket.service.ts @@ -133,6 +133,11 @@ export class WebsocketService { this.isTrackingTx = true; } + startMultiTrackTransaction(txId: string) { + this.websocketSubject.next({ 'track-tx': txId, 'watch-mempool': true }); + this.isTrackingTx = true; + } + stopTrackingTransaction() { if (this.isTrackingTx === false) { return;