Refactor indexer scheduling to avoid accumulating identical tasks

This commit is contained in:
Mononaut 2023-09-21 17:50:53 +01:00
parent ab8b557e73
commit 00887bc24b
No known key found for this signature in database
GPG Key ID: A3F058E41374C04E
3 changed files with 71 additions and 35 deletions

View File

@ -776,9 +776,7 @@ class Blocks {
this.updateTimerProgress(timer, `saved prices for ${this.currentBlockHeight}`);
} else {
logger.debug(`Cannot save block price for ${blockExtended.height} because the price updater hasnt completed yet. Trying again in 10 seconds.`, logger.tags.mining);
setTimeout(() => {
indexer.runSingleTask('blocksPrices');
}, 10000);
indexer.scheduleSingleTask('blocksPrices', 10000);
}
// Save blocks summary for visualization if it's enabled

View File

@ -206,7 +206,7 @@ class Server {
}
const newMempool = await bitcoinApi.$getRawMempool();
const numHandledBlocks = await blocks.$updateBlocks();
const pollRate = config.MEMPOOL.POLL_RATE_MS * (indexer.indexerRunning ? 10 : 1);
const pollRate = config.MEMPOOL.POLL_RATE_MS * (indexer.indexerIsRunning() ? 10 : 1);
if (numHandledBlocks === 0) {
await memPool.$updateMempool(newMempool, pollRate);
}

View File

@ -15,11 +15,18 @@ export interface CoreIndex {
best_block_height: number;
}
type TaskName = 'blocksPrices' | 'coinStatsIndex';
class Indexer {
runIndexer = true;
indexerRunning = false;
tasksRunning: string[] = [];
coreIndexes: CoreIndex[] = [];
private runIndexer = true;
private indexerRunning = false;
private tasksRunning: { [key in TaskName]?: boolean; } = {};
private tasksScheduled: { [key in TaskName]?: NodeJS.Timeout; } = {};
private coreIndexes: CoreIndex[] = [];
public indexerIsRunning(): boolean {
return this.indexerRunning;
}
/**
* Check which core index is available for indexing
@ -69,38 +76,69 @@ class Indexer {
}
}
public async runSingleTask(task: 'blocksPrices' | 'coinStatsIndex'): Promise<void> {
if (!Common.indexingEnabled()) {
/**
* schedules a single task to run in `timeout` ms
* only one task of each type may be scheduled
*
* @param {TaskName} task - the type of task
* @param {number} timeout - delay in ms
* @param {boolean} replace - `true` replaces any already scheduled task (works like a debounce), `false` ignores subsequent requests (works like a throttle)
*/
public scheduleSingleTask(task: TaskName, timeout: number = 10000, replace = false): void {
if (this.tasksScheduled[task]) {
if (!replace) { //throttle
return;
} else { // debounce
clearTimeout(this.tasksScheduled[task]);
}
}
this.tasksScheduled[task] = setTimeout(async () => {
try {
await this.runSingleTask(task);
} catch (e) {
logger.err(`Unexpected error in scheduled task ${task}: ` + (e instanceof Error ? e.message : e));
} finally {
clearTimeout(this.tasksScheduled[task]);
}
}, timeout);
}
/**
* Runs a single task immediately
*
* (use `scheduleSingleTask` instead to queue a task to run after some timeout)
*/
public async runSingleTask(task: TaskName): Promise<void> {
if (!Common.indexingEnabled() || this.tasksRunning[task]) {
return;
}
this.tasksRunning[task] = true;
if (task === 'blocksPrices' && !this.tasksRunning.includes(task) && !['testnet', 'signet'].includes(config.MEMPOOL.NETWORK)) {
this.tasksRunning.push(task);
let lastestPriceId;
try {
lastestPriceId = await PricesRepository.$getLatestPriceId();
} catch (e) {
logger.debug('failed to fetch latest price id from db: ' + (e instanceof Error ? e.message : e));
}
if (priceUpdater.historyInserted === false || lastestPriceId === null) {
logger.debug(`Blocks prices indexer is waiting for the price updater to complete`, logger.tags.mining);
setTimeout(() => {
this.tasksRunning = this.tasksRunning.filter(runningTask => runningTask !== task);
this.runSingleTask('blocksPrices');
}, 10000);
} else {
logger.debug(`Blocks prices indexer will run now`, logger.tags.mining);
await mining.$indexBlockPrices();
this.tasksRunning = this.tasksRunning.filter(runningTask => runningTask !== task);
}
switch (task) {
case 'blocksPrices': {
if (!['testnet', 'signet'].includes(config.MEMPOOL.NETWORK)) {
let lastestPriceId;
try {
lastestPriceId = await PricesRepository.$getLatestPriceId();
} catch (e) {
logger.debug('failed to fetch latest price id from db: ' + (e instanceof Error ? e.message : e));
} if (priceUpdater.historyInserted === false || lastestPriceId === null) {
logger.debug(`Blocks prices indexer is waiting for the price updater to complete`, logger.tags.mining);
this.scheduleSingleTask(task, 10000);
} else {
logger.debug(`Blocks prices indexer will run now`, logger.tags.mining);
await mining.$indexBlockPrices();
}
}
} break;
case 'coinStatsIndex': {
logger.debug(`Indexing coinStatsIndex now`);
await mining.$indexCoinStatsIndex();
} break;
}
if (task === 'coinStatsIndex' && !this.tasksRunning.includes(task)) {
this.tasksRunning.push(task);
logger.debug(`Indexing coinStatsIndex now`);
await mining.$indexCoinStatsIndex();
this.tasksRunning = this.tasksRunning.filter(runningTask => runningTask !== task);
}
this.tasksRunning[task] = false;
}
public async $run(): Promise<void> {