Live 2H graph is now fetched through the websocket.

Tell the web socket what to fetch with "want" request.
This commit is contained in:
Simon Lindh 2019-07-26 12:48:32 +03:00
parent 2b6426a126
commit 8dd58db42a
10 changed files with 122 additions and 105 deletions

View file

@ -41,7 +41,7 @@ class Mempool {
return this.vBytesPerSecond; return this.vBytesPerSecond;
} }
public async getMemPoolInfo() { public async updateMemPoolInfo() {
try { try {
this.mempoolInfo = await bitcoinApi.getMempoolInfo(); this.mempoolInfo = await bitcoinApi.getMempoolInfo();
} catch (err) { } catch (err) {

View file

@ -5,6 +5,11 @@ import { ITransaction, IMempoolStats } from '../interfaces';
class Statistics { class Statistics {
protected intervalTimer: NodeJS.Timer | undefined; protected intervalTimer: NodeJS.Timer | undefined;
protected newStatisticsEntryCallback: Function | undefined;
public setNewStatisticsEntryCallback(fn: Function) {
this.newStatisticsEntryCallback = fn;
}
constructor() { constructor() {
} }
@ -21,7 +26,7 @@ class Statistics {
}, difference); }, difference);
} }
private runStatistics(): void { private async runStatistics(): Promise<void> {
const currentMempool = memPool.getMempool(); const currentMempool = memPool.getMempool();
const txPerSecond = memPool.getTxPerSecond(); const txPerSecond = memPool.getTxPerSecond();
const vBytesPerSecond = memPool.getVBytesPerSecond(); const vBytesPerSecond = memPool.getVBytesPerSecond();
@ -81,7 +86,7 @@ class Statistics {
} }
}); });
this.$create({ const insertId = await this.$create({
added: 'NOW()', added: 'NOW()',
unconfirmed_transactions: memPoolArray.length, unconfirmed_transactions: memPoolArray.length,
tx_per_second: txPerSecond, tx_per_second: txPerSecond,
@ -131,9 +136,14 @@ class Statistics {
vsize_1800: weightVsizeFees['1800'] || 0, vsize_1800: weightVsizeFees['1800'] || 0,
vsize_2000: weightVsizeFees['2000'] || 0, vsize_2000: weightVsizeFees['2000'] || 0,
}); });
if (this.newStatisticsEntryCallback && insertId) {
const newStats = await this.$get(insertId);
this.newStatisticsEntryCallback(newStats);
}
} }
private async $create(statistics: IMempoolStats): Promise<void> { private async $create(statistics: IMempoolStats): Promise<number | undefined> {
try { try {
const connection = await DB.pool.getConnection(); const connection = await DB.pool.getConnection();
const query = `INSERT INTO statistics( const query = `INSERT INTO statistics(
@ -232,26 +242,14 @@ class Statistics {
statistics.vsize_1800, statistics.vsize_1800,
statistics.vsize_2000, statistics.vsize_2000,
]; ];
await connection.query(query, params); const [result]: any = await connection.query(query, params);
connection.release(); connection.release();
return result.insertId;
} catch (e) { } catch (e) {
console.log('$create() error', e); console.log('$create() error', e);
} }
} }
public async $listLatestFromId(fromId: number): Promise<IMempoolStats[]> {
try {
const connection = await DB.pool.getConnection();
const query = `SELECT * FROM statistics WHERE id > ? ORDER BY id DESC`;
const [rows] = await connection.query<any>(query, [fromId]);
connection.release();
return rows;
} catch (e) {
console.log('$listLatestFromId() error', e);
return [];
}
}
private getQueryForDays(days: number, groupBy: number) { private getQueryForDays(days: number, groupBy: number) {
return `SELECT id, added, unconfirmed_transactions, return `SELECT id, added, unconfirmed_transactions,
@ -297,6 +295,18 @@ class Statistics {
AVG(vsize_2000) AS vsize_2000 FROM statistics GROUP BY UNIX_TIMESTAMP(added) DIV ${groupBy} ORDER BY id DESC LIMIT ${days}`; AVG(vsize_2000) AS vsize_2000 FROM statistics GROUP BY UNIX_TIMESTAMP(added) DIV ${groupBy} ORDER BY id DESC LIMIT ${days}`;
} }
public async $get(id: number): Promise<IMempoolStats | undefined> {
try {
const connection = await DB.pool.getConnection();
const query = `SELECT * FROM statistics WHERE id = ?`;
const [rows] = await connection.query<any>(query, [id]);
connection.release();
return rows[0];
} catch (e) {
console.log('$list2H() error', e);
}
}
public async $list2H(): Promise<IMempoolStats[]> { public async $list2H(): Promise<IMempoolStats[]> {
try { try {
const connection = await DB.pool.getConnection(); const connection = await DB.pool.getConnection();

View file

@ -12,7 +12,7 @@ import memPool from './api/mempool';
import blocks from './api/blocks'; import blocks from './api/blocks';
import projectedBlocks from './api/projected-blocks'; import projectedBlocks from './api/projected-blocks';
import statistics from './api/statistics'; import statistics from './api/statistics';
import { IBlock, IMempool } from './interfaces'; import { IBlock, IMempool, ITransaction, IMempoolStats } from './interfaces';
import routes from './routes'; import routes from './routes';
import fiatConversion from './api/fiat-conversion'; import fiatConversion from './api/fiat-conversion';
@ -57,7 +57,7 @@ class MempoolSpace {
private async runMempoolIntervalFunctions() { private async runMempoolIntervalFunctions() {
await blocks.updateBlocks(); await blocks.updateBlocks();
await memPool.getMemPoolInfo(); await memPool.updateMemPoolInfo();
await memPool.updateMempool(); await memPool.updateMempool();
setTimeout(this.runMempoolIntervalFunctions.bind(this), config.MEMPOOL_REFRESH_RATE_MS); setTimeout(this.runMempoolIntervalFunctions.bind(this), config.MEMPOOL_REFRESH_RATE_MS);
} }
@ -79,7 +79,6 @@ class MempoolSpace {
this.wss.on('connection', (client: WebSocket) => { this.wss.on('connection', (client: WebSocket) => {
let theBlocks = blocks.getBlocks(); let theBlocks = blocks.getBlocks();
theBlocks = theBlocks.concat([]).splice(theBlocks.length - config.INITIAL_BLOCK_AMOUNT); theBlocks = theBlocks.concat([]).splice(theBlocks.length - config.INITIAL_BLOCK_AMOUNT);
const formatedBlocks = theBlocks.map((b) => blocks.formatBlock(b)); const formatedBlocks = theBlocks.map((b) => blocks.formatBlock(b));
client.send(JSON.stringify({ client.send(JSON.stringify({
@ -94,6 +93,14 @@ class MempoolSpace {
client.on('message', async (message: any) => { client.on('message', async (message: any) => {
try { try {
const parsedMessage = JSON.parse(message); const parsedMessage = JSON.parse(message);
if (parsedMessage.action === 'want') {
client['want-stats'] = parsedMessage.data.indexOf('stats') > -1;
client['want-blocks'] = parsedMessage.data.indexOf('blocks') > -1;
client['want-projected-blocks'] = parsedMessage.data.indexOf('projected-blocks') > -1;
client['want-live-2h-chart'] = parsedMessage.data.indexOf('live-2h-chart') > -1;
}
if (parsedMessage.action === 'track-tx' && parsedMessage.txId && /^[a-fA-F0-9]{64}$/.test(parsedMessage.txId)) { if (parsedMessage.action === 'track-tx' && parsedMessage.txId && /^[a-fA-F0-9]{64}$/.test(parsedMessage.txId)) {
const tx = await memPool.getRawTransaction(parsedMessage.txId); const tx = await memPool.getRawTransaction(parsedMessage.txId);
if (tx) { if (tx) {
@ -168,26 +175,29 @@ class MempoolSpace {
return; return;
} }
const response = {};
if (client['trackingTx'] === true && client['blockHeight'] === 0) { if (client['trackingTx'] === true && client['blockHeight'] === 0) {
if (block.tx.some((tx) => tx === client['txId'])) { if (block.tx.some((tx: ITransaction) => tx === client['txId'])) {
client['blockHeight'] = block.height; client['blockHeight'] = block.height;
} }
} }
client.send(JSON.stringify({ response['track-tx'] = {
'block': formattedBlocks, tracking: client['trackingTx'] || false,
'track-tx': { blockHeight: client['blockHeight'],
tracking: client['trackingTx'] || false, };
blockHeight: client['blockHeight'],
} response['block'] = formattedBlocks;
}));
client.send(JSON.stringify(response));
}); });
}); });
memPool.setMempoolChangedCallback((newMempool: IMempool) => { memPool.setMempoolChangedCallback((newMempool: IMempool) => {
projectedBlocks.updateProjectedBlocks(newMempool); projectedBlocks.updateProjectedBlocks(newMempool);
let pBlocks = projectedBlocks.getProjectedBlocks(); const pBlocks = projectedBlocks.getProjectedBlocks();
const mempoolInfo = memPool.getMempoolInfo(); const mempoolInfo = memPool.getMempoolInfo();
const txPerSecond = memPool.getTxPerSecond(); const txPerSecond = memPool.getTxPerSecond();
const vBytesPerSecond = memPool.getVBytesPerSecond(); const vBytesPerSecond = memPool.getVBytesPerSecond();
@ -197,20 +207,41 @@ class MempoolSpace {
return; return;
} }
if (client['trackingTx'] && client['blockHeight'] === 0) { const response = {};
pBlocks = projectedBlocks.getProjectedBlocks(client['txId']);
}
client.send(JSON.stringify({ if (client['want-stats']) {
'projectedBlocks': pBlocks, response['mempoolInfo'] = mempoolInfo;
'mempoolInfo': mempoolInfo, response['txPerSecond'] = txPerSecond;
'txPerSecond': txPerSecond, response['vBytesPerSecond'] = vBytesPerSecond;
'vBytesPerSecond': vBytesPerSecond, response['track-tx'] = {
'track-tx': {
tracking: client['trackingTx'] || false, tracking: client['trackingTx'] || false,
blockHeight: client['blockHeight'], blockHeight: client['blockHeight'],
} };
})); }
if (client['want-projected-blocks'] && client['trackingTx'] && client['blockHeight'] === 0) {
response['projectedBlocks'] = projectedBlocks.getProjectedBlocks(client['txId']);
} else if (client['want-projected-blocks']) {
response['projectedBlocks'] = pBlocks;
}
if (Object.keys(response).length) {
client.send(JSON.stringify(response));
}
});
});
statistics.setNewStatisticsEntryCallback((stats: IMempoolStats) => {
this.wss.clients.forEach((client: WebSocket) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
if (client['want-live-2h-chart']) {
client.send(JSON.stringify({
'live-2h-chart': stats
}));
}
}); });
}); });
} }
@ -220,7 +251,6 @@ class MempoolSpace {
.get(config.API_ENDPOINT + 'transactions/height/:id', routes.$getgetTransactionsForBlock) .get(config.API_ENDPOINT + 'transactions/height/:id', routes.$getgetTransactionsForBlock)
.get(config.API_ENDPOINT + 'transactions/projected/:id', routes.getgetTransactionsForProjectedBlock) .get(config.API_ENDPOINT + 'transactions/projected/:id', routes.getgetTransactionsForProjectedBlock)
.get(config.API_ENDPOINT + 'fees/recommended', routes.getRecommendedFees) .get(config.API_ENDPOINT + 'fees/recommended', routes.getRecommendedFees)
.get(config.API_ENDPOINT + 'statistics/live', routes.getLiveResult)
.get(config.API_ENDPOINT + 'statistics/2h', routes.get2HStatistics) .get(config.API_ENDPOINT + 'statistics/2h', routes.get2HStatistics)
.get(config.API_ENDPOINT + 'statistics/24h', routes.get24HStatistics) .get(config.API_ENDPOINT + 'statistics/24h', routes.get24HStatistics)
.get(config.API_ENDPOINT + 'statistics/1w', routes.get1WHStatistics) .get(config.API_ENDPOINT + 'statistics/1w', routes.get1WHStatistics)

View file

@ -5,11 +5,6 @@ import projectedBlocks from './api/projected-blocks';
class Routes { class Routes {
constructor() {} constructor() {}
public async getLiveResult(req, res) {
const result = await statistics.$listLatestFromId(req.query.lastId);
res.send(result);
}
public async get2HStatistics(req, res) { public async get2HStatistics(req, res) {
const result = await statistics.$list2H(); const result = await statistics.$list2H();
res.send(result); res.send(result);

View file

@ -1,4 +1,5 @@
import { Component, OnInit } from '@angular/core'; import { Component, OnInit } from '@angular/core';
import { ApiService } from '../services/api.service';
@Component({ @Component({
selector: 'app-about', selector: 'app-about',
@ -7,9 +8,12 @@ import { Component, OnInit } from '@angular/core';
}) })
export class AboutComponent implements OnInit { export class AboutComponent implements OnInit {
constructor() { } constructor(
private apiService: ApiService,
) { }
ngOnInit() { ngOnInit() {
this.apiService.sendWebSocket({'action': 'want', data: []});
} }
} }

View file

@ -26,6 +26,8 @@ export class BlockchainComponent implements OnInit, OnDestroy {
) {} ) {}
ngOnInit() { ngOnInit() {
this.apiService.sendWebSocket({'action': 'want', data: ['stats', 'blocks', 'projected-blocks']});
this.txTrackingSubscription = this.memPoolService.txTracking$ this.txTrackingSubscription = this.memPoolService.txTracking$
.subscribe((response: ITxTracking) => { .subscribe((response: ITxTracking) => {
this.txTrackingLoading = false; this.txTrackingLoading = false;

View file

@ -12,6 +12,7 @@ export interface IMempoolDefaultResponse {
blocks?: IBlock[]; blocks?: IBlock[];
block?: IBlock; block?: IBlock;
projectedBlocks?: IProjectedBlock[]; projectedBlocks?: IProjectedBlock[];
'live-2h-chart'?: IMempoolStats;
txPerSecond?: number; txPerSecond?: number;
vBytesPerSecond: number; vBytesPerSecond: number;
'track-tx'?: ITrackTx; 'track-tx'?: ITrackTx;

View file

@ -13,7 +13,7 @@ const API_BASE_URL = '/api/v1';
providedIn: 'root' providedIn: 'root'
}) })
export class ApiService { export class ApiService {
private websocketSubject: Observable<IMempoolDefaultResponse> = webSocket<IMempoolDefaultResponse | any>(WEB_SOCKET_URL) private websocketSubject: Observable<IMempoolDefaultResponse> = webSocket<IMempoolDefaultResponse | any>(WEB_SOCKET_URL);
constructor( constructor(
private httpClient: HttpClient, private httpClient: HttpClient,
@ -91,12 +91,16 @@ export class ApiService {
notFound: txShowTxNotFound, notFound: txShowTxNotFound,
}); });
} }
}),
if (response['live-2h-chart']) {
this.memPoolService.live2Chart$.next(response['live-2h-chart']);
}
},
(err: Error) => { (err: Error) => {
console.log(err); console.log(err);
console.log('Error, retrying in 10 sec'); console.log('Error, retrying in 10 sec');
setTimeout(() => this.startSubscription(), 10000); setTimeout(() => this.startSubscription(), 10000);
}; });
} }
sendWebSocket(data: any) { sendWebSocket(data: any) {
@ -112,15 +116,6 @@ export class ApiService {
return this.httpClient.get<IBlockTransaction[]>(API_BASE_URL + '/transactions/projected/' + index); return this.httpClient.get<IBlockTransaction[]>(API_BASE_URL + '/transactions/projected/' + index);
} }
listLiveStatistics$(lastId: number): Observable<IMempoolStats[]> {
const params = new HttpParams()
.set('lastId', lastId.toString());
return this.httpClient.get<IMempoolStats[]>(API_BASE_URL + '/statistics/live', {
params: params
});
}
list2HStatistics$(): Observable<IMempoolStats[]> { list2HStatistics$(): Observable<IMempoolStats[]> {
return this.httpClient.get<IMempoolStats[]>(API_BASE_URL + '/statistics/2h'); return this.httpClient.get<IMempoolStats[]>(API_BASE_URL + '/statistics/2h');
} }

View file

@ -1,6 +1,6 @@
import { Injectable } from '@angular/core'; import { Injectable } from '@angular/core';
import { ReplaySubject, BehaviorSubject } from 'rxjs'; import { ReplaySubject, BehaviorSubject, Subject } from 'rxjs';
import { IMempoolInfo, IBlock, IProjectedBlock, ITransaction } from '../blockchain/interfaces'; import { IMempoolInfo, IBlock, IProjectedBlock, ITransaction, IMempoolStats } from '../blockchain/interfaces';
export interface IMemPoolState { export interface IMemPoolState {
memPoolInfo: IMempoolInfo; memPoolInfo: IMempoolInfo;
@ -24,6 +24,7 @@ export class MemPoolService {
txIdSearch$ = new ReplaySubject<string>(); txIdSearch$ = new ReplaySubject<string>();
conversions$ = new ReplaySubject<any>(); conversions$ = new ReplaySubject<any>();
mempoolWeight$ = new ReplaySubject<number>(); mempoolWeight$ = new ReplaySubject<number>();
live2Chart$ = new Subject<IMempoolStats>();
txTracking$ = new BehaviorSubject<ITxTracking>({ txTracking$ = new BehaviorSubject<ITxTracking>({
enabled: false, enabled: false,
tx: null, tx: null,

View file

@ -9,6 +9,7 @@ import { IMempoolStats } from '../blockchain/interfaces';
import { Subject, of, merge} from 'rxjs'; import { Subject, of, merge} from 'rxjs';
import { switchMap, tap } from 'rxjs/operators'; import { switchMap, tap } from 'rxjs/operators';
import { ActivatedRoute } from '@angular/router'; import { ActivatedRoute } from '@angular/router';
import { MemPoolService } from '../services/mem-pool.service';
@Component({ @Component({
selector: 'app-statistics', selector: 'app-statistics',
@ -32,14 +33,13 @@ export class StatisticsComponent implements OnInit {
radioGroupForm: FormGroup; radioGroupForm: FormGroup;
reloadData$: Subject<any> = new Subject();
constructor( constructor(
private apiService: ApiService, private apiService: ApiService,
@Inject(LOCALE_ID) private locale: string, @Inject(LOCALE_ID) private locale: string,
private bytesPipe: BytesPipe, private bytesPipe: BytesPipe,
private formBuilder: FormBuilder, private formBuilder: FormBuilder,
private route: ActivatedRoute, private route: ActivatedRoute,
private memPoolService: MemPoolService,
) { ) {
this.radioGroupForm = this.formBuilder.group({ this.radioGroupForm = this.formBuilder.group({
'dateSpan': '2h' 'dateSpan': '2h'
@ -47,19 +47,6 @@ export class StatisticsComponent implements OnInit {
} }
ngOnInit() { ngOnInit() {
const now = new Date();
const nextInterval = new Date(now.getFullYear(), now.getMonth(), now.getDate(), now.getHours(),
Math.floor(now.getMinutes() / 1) * 1 + 1, 0, 0);
const difference = nextInterval.getTime() - now.getTime();
setTimeout(() => {
setInterval(() => {
if (this.radioGroupForm.controls['dateSpan'].value === '2h') {
this.reloadData$.next();
}
}, 60 * 1000);
}, difference + 1000); // Next whole minute + 1 second
const labelInterpolationFnc = (value: any, index: any) => { const labelInterpolationFnc = (value: any, index: any) => {
const nr = 6; const nr = 6;
@ -156,7 +143,6 @@ export class StatisticsComponent implements OnInit {
merge( merge(
of(''), of(''),
this.reloadData$,
this.radioGroupForm.controls['dateSpan'].valueChanges this.radioGroupForm.controls['dateSpan'].valueChanges
.pipe( .pipe(
tap(() => { tap(() => {
@ -167,46 +153,39 @@ export class StatisticsComponent implements OnInit {
.pipe( .pipe(
switchMap(() => { switchMap(() => {
this.spinnerLoading = true; this.spinnerLoading = true;
if (this.radioGroupForm.controls['dateSpan'].value === '6m') { if (this.radioGroupForm.controls['dateSpan'].value === '2h') {
return this.apiService.list6MStatistics$(); this.apiService.sendWebSocket({'action': 'want', data: ['live-2h-chart']});
return this.apiService.list2HStatistics$();
} }
if (this.radioGroupForm.controls['dateSpan'].value === '3m') { this.apiService.sendWebSocket({'action': 'want', data: ['']});
return this.apiService.list3MStatistics$(); if (this.radioGroupForm.controls['dateSpan'].value === '24h') {
} return this.apiService.list24HStatistics$();
if (this.radioGroupForm.controls['dateSpan'].value === '1m') {
return this.apiService.list1MStatistics$();
} }
if (this.radioGroupForm.controls['dateSpan'].value === '1w') { if (this.radioGroupForm.controls['dateSpan'].value === '1w') {
return this.apiService.list1WStatistics$(); return this.apiService.list1WStatistics$();
} }
if (this.radioGroupForm.controls['dateSpan'].value === '24h') { if (this.radioGroupForm.controls['dateSpan'].value === '1m') {
return this.apiService.list24HStatistics$(); return this.apiService.list1MStatistics$();
} }
if (this.radioGroupForm.controls['dateSpan'].value === '2h' && !this.mempoolStats.length) { if (this.radioGroupForm.controls['dateSpan'].value === '3m') {
return this.apiService.list2HStatistics$(); return this.apiService.list3MStatistics$();
} }
const lastId = this.mempoolStats[0].id; return this.apiService.list6MStatistics$();
return this.apiService.listLiveStatistics$(lastId);
}) })
) )
.subscribe((mempoolStats) => { .subscribe((mempoolStats) => {
let hasChange = false; this.mempoolStats = mempoolStats;
if (this.radioGroupForm.controls['dateSpan'].value === '2h' && this.mempoolStats.length) { this.handleNewMempoolData(this.mempoolStats.concat([]));
if (mempoolStats.length) {
this.mempoolStats = mempoolStats.concat(this.mempoolStats);
this.mempoolStats = this.mempoolStats.slice(0, this.mempoolStats.length - mempoolStats.length);
hasChange = true;
}
} else {
this.mempoolStats = mempoolStats;
hasChange = true;
}
if (hasChange) {
this.handleNewMempoolData(this.mempoolStats.concat([]));
}
this.loading = false; this.loading = false;
this.spinnerLoading = false; this.spinnerLoading = false;
}); });
this.memPoolService.live2Chart$
.subscribe((mempoolStats) => {
this.mempoolStats.unshift(mempoolStats);
this.mempoolStats = this.mempoolStats.slice(0, this.mempoolStats.length - 1);
this.handleNewMempoolData(this.mempoolStats.concat([]));
});
} }
handleNewMempoolData(mempoolStats: IMempoolStats[]) { handleNewMempoolData(mempoolStats: IMempoolStats[]) {