mirror of
https://github.com/mempool/mempool.git
synced 2025-02-22 14:22:44 +01:00
Make cpfp db save operations atomic
This commit is contained in:
parent
9ff5ce0d37
commit
1f442b9ea6
4 changed files with 65 additions and 62 deletions
|
@ -1050,9 +1050,13 @@ class Blocks {
|
|||
}
|
||||
|
||||
public async $saveCpfp(hash: string, height: number, cpfpSummary: CpfpSummary): Promise<void> {
|
||||
const result = await cpfpRepository.$batchSaveClusters(cpfpSummary.clusters);
|
||||
if (!result) {
|
||||
await cpfpRepository.$insertProgressMarker(height);
|
||||
try {
|
||||
const result = await cpfpRepository.$batchSaveClusters(cpfpSummary.clusters);
|
||||
if (!result) {
|
||||
await cpfpRepository.$insertProgressMarker(height);
|
||||
}
|
||||
} catch (e) {
|
||||
// not a fatal error, we'll try again next time the indexer runs
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ import { FieldPacket, OkPacket, PoolOptions, ResultSetHeader, RowDataPacket } fr
|
|||
}
|
||||
|
||||
public async query<T extends RowDataPacket[][] | RowDataPacket[] | OkPacket |
|
||||
OkPacket[] | ResultSetHeader>(query, params?): Promise<[T, FieldPacket[]]>
|
||||
OkPacket[] | ResultSetHeader>(query, params?, connection?: PoolConnection): Promise<[T, FieldPacket[]]>
|
||||
{
|
||||
this.checkDBFlag();
|
||||
let hardTimeout;
|
||||
|
@ -45,7 +45,9 @@ import { FieldPacket, OkPacket, PoolOptions, ResultSetHeader, RowDataPacket } fr
|
|||
reject(new Error(`DB query failed to return, reject or time out within ${hardTimeout / 1000}s - ${query?.sql?.slice(0, 160) || (typeof(query) === 'string' || query instanceof String ? query?.slice(0, 160) : 'unknown query')}`));
|
||||
}, hardTimeout);
|
||||
|
||||
this.getPool().then(pool => {
|
||||
// Use a specific connection if provided, otherwise delegate to the pool
|
||||
const connectionPromise = connection ? Promise.resolve(connection) : this.getPool();
|
||||
connectionPromise.then((pool: PoolConnection | Pool) => {
|
||||
return pool.query(query, params) as Promise<[T, FieldPacket[]]>;
|
||||
}).then(result => {
|
||||
resolve(result);
|
||||
|
@ -61,6 +63,33 @@ import { FieldPacket, OkPacket, PoolOptions, ResultSetHeader, RowDataPacket } fr
|
|||
}
|
||||
}
|
||||
|
||||
public async $atomicQuery<T extends RowDataPacket[][] | RowDataPacket[] | OkPacket |
|
||||
OkPacket[] | ResultSetHeader>(queries: { query, params }[]): Promise<[T, FieldPacket[]][]>
|
||||
{
|
||||
const pool = await this.getPool();
|
||||
const connection = await pool.getConnection();
|
||||
try {
|
||||
await connection.beginTransaction();
|
||||
|
||||
const results: [T, FieldPacket[]][] = [];
|
||||
for (const query of queries) {
|
||||
const result = await this.query(query.query, query.params, connection) as [T, FieldPacket[]];
|
||||
results.push(result);
|
||||
}
|
||||
|
||||
await connection.commit();
|
||||
|
||||
return results;
|
||||
} catch (e) {
|
||||
logger.err('Could not complete db transaction, rolling back: ' + (e instanceof Error ? e.message : e));
|
||||
connection.rollback();
|
||||
connection.release();
|
||||
throw e;
|
||||
} finally {
|
||||
connection.release();
|
||||
}
|
||||
}
|
||||
|
||||
public async checkDbConnection() {
|
||||
this.checkDBFlag();
|
||||
try {
|
||||
|
|
|
@ -5,52 +5,10 @@ import { Ancestor, CpfpCluster } from '../mempool.interfaces';
|
|||
import transactionRepository from '../repositories/TransactionRepository';
|
||||
|
||||
class CpfpRepository {
|
||||
public async $saveCluster(clusterRoot: string, height: number, txs: Ancestor[], effectiveFeePerVsize: number): Promise<boolean> {
|
||||
if (!txs[0]) {
|
||||
return false;
|
||||
}
|
||||
// skip clusters of transactions with the same fees
|
||||
const roundedEffectiveFee = Math.round(effectiveFeePerVsize * 100) / 100;
|
||||
const equalFee = txs.length > 1 && txs.reduce((acc, tx) => {
|
||||
return (acc && Math.round(((tx.fee || 0) / (tx.weight / 4)) * 100) / 100 === roundedEffectiveFee);
|
||||
}, true);
|
||||
if (equalFee) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
const packedTxs = Buffer.from(this.pack(txs));
|
||||
await DB.query(
|
||||
`
|
||||
INSERT INTO compact_cpfp_clusters(root, height, txs, fee_rate)
|
||||
VALUE (UNHEX(?), ?, ?, ?)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
height = ?,
|
||||
txs = ?,
|
||||
fee_rate = ?
|
||||
`,
|
||||
[clusterRoot, height, packedTxs, effectiveFeePerVsize, height, packedTxs, effectiveFeePerVsize]
|
||||
);
|
||||
const maxChunk = 10;
|
||||
let chunkIndex = 0;
|
||||
while (chunkIndex < txs.length) {
|
||||
const chunk = txs.slice(chunkIndex, chunkIndex + maxChunk).map(tx => {
|
||||
return { txid: tx.txid, cluster: clusterRoot };
|
||||
});
|
||||
await transactionRepository.$batchSetCluster(chunk);
|
||||
chunkIndex += maxChunk;
|
||||
}
|
||||
return true;
|
||||
} catch (e: any) {
|
||||
logger.err(`Cannot save cpfp cluster into db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public async $batchSaveClusters(clusters: { root: string, height: number, txs: Ancestor[], effectiveFeePerVsize: number }[]): Promise<boolean> {
|
||||
try {
|
||||
const clusterValues: any[] = [];
|
||||
const txs: any[] = [];
|
||||
const clusterValues: [string, number, Buffer, number][] = [];
|
||||
const txs: { txid: string, cluster: string }[] = [];
|
||||
|
||||
for (const cluster of clusters) {
|
||||
if (cluster.txs?.length) {
|
||||
|
@ -76,6 +34,8 @@ class CpfpRepository {
|
|||
return false;
|
||||
}
|
||||
|
||||
const queries: { query, params }[] = [];
|
||||
|
||||
const maxChunk = 100;
|
||||
let chunkIndex = 0;
|
||||
// insert clusters in batches of up to 100 rows
|
||||
|
@ -89,10 +49,10 @@ class CpfpRepository {
|
|||
return (' (UNHEX(?), ?, ?, ?)');
|
||||
}) + ';';
|
||||
const values = chunk.flat();
|
||||
await DB.query(
|
||||
queries.push({
|
||||
query,
|
||||
values
|
||||
);
|
||||
params: values,
|
||||
});
|
||||
chunkIndex += maxChunk;
|
||||
}
|
||||
|
||||
|
@ -100,10 +60,12 @@ class CpfpRepository {
|
|||
// insert transactions in batches of up to 100 rows
|
||||
while (chunkIndex < txs.length) {
|
||||
const chunk = txs.slice(chunkIndex, chunkIndex + maxChunk);
|
||||
await transactionRepository.$batchSetCluster(chunk);
|
||||
queries.push(transactionRepository.buildBatchSetQuery(chunk));
|
||||
chunkIndex += maxChunk;
|
||||
}
|
||||
|
||||
await DB.$atomicQuery(queries);
|
||||
|
||||
return true;
|
||||
} catch (e: any) {
|
||||
logger.err(`Cannot save cpfp clusters into db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||
|
|
|
@ -25,9 +25,8 @@ class TransactionRepository {
|
|||
}
|
||||
}
|
||||
|
||||
public async $batchSetCluster(txs): Promise<void> {
|
||||
try {
|
||||
let query = `
|
||||
public buildBatchSetQuery(txs: { txid: string, cluster: string }[]): { query, params } {
|
||||
let query = `
|
||||
INSERT IGNORE INTO compact_transactions
|
||||
(
|
||||
txid,
|
||||
|
@ -35,13 +34,22 @@ class TransactionRepository {
|
|||
)
|
||||
VALUES
|
||||
`;
|
||||
query += txs.map(tx => {
|
||||
return (' (UNHEX(?), UNHEX(?))');
|
||||
}) + ';';
|
||||
const values = txs.map(tx => [tx.txid, tx.cluster]).flat();
|
||||
query += txs.map(tx => {
|
||||
return (' (UNHEX(?), UNHEX(?))');
|
||||
}) + ';';
|
||||
const values = txs.map(tx => [tx.txid, tx.cluster]).flat();
|
||||
return {
|
||||
query,
|
||||
params: values,
|
||||
};
|
||||
}
|
||||
|
||||
public async $batchSetCluster(txs): Promise<void> {
|
||||
try {
|
||||
const query = this.buildBatchSetQuery(txs);
|
||||
await DB.query(
|
||||
query,
|
||||
values
|
||||
query.query,
|
||||
query.params,
|
||||
);
|
||||
} catch (e: any) {
|
||||
logger.err(`Cannot save cpfp transactions into db. Reason: ` + (e instanceof Error ? e.message : e));
|
||||
|
|
Loading…
Add table
Reference in a new issue