1
0
mirror of https://github.com/romanz/electrs.git synced 2024-11-19 09:54:09 +01:00

Monitor various DB statistics via Prometheus

d52b520d51/include/rocksdb/db.h (L953)

Note: `index_db_size` mertrics is removed.
This commit is contained in:
Roman Zeyde 2021-10-31 15:51:54 +02:00
parent 3731abd7ee
commit 53e16f89f7
2 changed files with 103 additions and 47 deletions

View File

@ -46,6 +46,43 @@ const COLUMN_FAMILIES: &[&str] = &[CONFIG_CF, HEADERS_CF, TXID_CF, FUNDING_CF, S
const CONFIG_KEY: &str = "C";
const TIP_KEY: &[u8] = b"T";
// Taken from https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L654-L689
const DB_PROPERIES: &[&str] = &[
"rocksdb.num-immutable-mem-table",
"rocksdb.mem-table-flush-pending",
"rocksdb.compaction-pending",
"rocksdb.background-errors",
"rocksdb.cur-size-active-mem-table",
"rocksdb.cur-size-all-mem-tables",
"rocksdb.size-all-mem-tables",
"rocksdb.num-entries-active-mem-table",
"rocksdb.num-entries-imm-mem-tables",
"rocksdb.num-deletes-active-mem-table",
"rocksdb.num-deletes-imm-mem-tables",
"rocksdb.estimate-num-keys",
"rocksdb.estimate-table-readers-mem",
"rocksdb.is-file-deletions-enabled",
"rocksdb.num-snapshots",
"rocksdb.oldest-snapshot-time",
"rocksdb.num-live-versions",
"rocksdb.current-super-version-number",
"rocksdb.estimate-live-data-size",
"rocksdb.min-log-number-to-keep",
"rocksdb.min-obsolete-sst-number-to-keep",
"rocksdb.total-sst-files-size",
"rocksdb.live-sst-files-size",
"rocksdb.base-level",
"rocksdb.estimate-pending-compaction-bytes",
"rocksdb.num-running-compactions",
"rocksdb.num-running-flushes",
"rocksdb.actual-delayed-write-rate",
"rocksdb.is-write-stopped",
"rocksdb.estimate-oldest-key-time",
"rocksdb.block-cache-capacity",
"rocksdb.block-cache-usage",
"rocksdb.block-cache-pinned-usage",
];
#[derive(Debug, Deserialize, Serialize)]
struct Config {
compacted: bool,
@ -220,21 +257,21 @@ impl DBStore {
.expect("get_tip failed")
}
pub(crate) fn write(&self, batch: WriteBatch) {
pub(crate) fn write(&self, batch: &WriteBatch) {
let mut db_batch = rocksdb::WriteBatch::default();
for key in batch.funding_rows {
for key in &batch.funding_rows {
db_batch.put_cf(self.funding_cf(), key, b"");
}
for key in batch.spending_rows {
for key in &batch.spending_rows {
db_batch.put_cf(self.spending_cf(), key, b"");
}
for key in batch.txid_rows {
for key in &batch.txid_rows {
db_batch.put_cf(self.txid_cf(), key, b"");
}
for key in batch.header_rows {
for key in &batch.header_rows {
db_batch.put_cf(self.headers_cf(), key, b"");
}
db_batch.put_cf(self.headers_cf(), TIP_KEY, batch.tip_row);
db_batch.put_cf(self.headers_cf(), TIP_KEY, &batch.tip_row);
let mut opts = rocksdb::WriteOptions::new();
let bulk_import = self.bulk_import.load(Ordering::Relaxed);
@ -272,8 +309,19 @@ impl DBStore {
}
}
pub(crate) fn get_size(&self) -> Result<u64> {
fs_extra::dir::get_size(self.db.path()).context("failed to get DB size")
pub(crate) fn get_properties(
&self,
) -> impl Iterator<Item = (&'static str, &'static str, u64)> + '_ {
COLUMN_FAMILIES.iter().flat_map(move |cf_name| {
let cf = self.db.cf_handle(cf_name).expect("missing CF");
DB_PROPERIES.iter().filter_map(move |property_name| {
let value = self
.db
.property_int_value_cf(cf, property_name)
.expect("failed to get property");
Some((*cf_name, *property_name, value?))
})
})
}
fn start_compactions(&self) {
@ -380,7 +428,7 @@ mod tests {
let mut batch = WriteBatch::default();
batch.txid_rows = to_rows(&items);
store.write(batch);
store.write(&batch);
let rows = store.iter_txid(b"abcdefgh".to_vec().into_boxed_slice());
assert_eq!(rows.collect::<Vec<_>>(), to_rows(&items[1..5]));

View File

@ -16,7 +16,7 @@ struct Stats {
update_duration: Histogram,
update_size: Histogram,
height: Gauge,
db_size: Gauge,
db_properties: Gauge,
}
impl Stats {
@ -35,7 +35,40 @@ impl Stats {
metrics::default_size_buckets(),
),
height: metrics.gauge("index_height", "Indexed block height", "type"),
db_size: metrics.gauge("index_db_size", "Index DB size (bytes)", "type"),
db_properties: metrics.gauge("index_db_properties", "Index DB properties", "name"),
}
}
fn observe_duration<T>(&self, label: &str, f: impl FnOnce() -> T) -> T {
self.update_duration.observe_duration(label, f)
}
fn observe_size(&self, label: &str, rows: &[Row]) {
self.update_size.observe(label, db_rows_size(rows));
}
fn observe_batch(&self, batch: &WriteBatch) {
self.observe_size("write_funding_rows", &batch.funding_rows);
self.observe_size("write_spending_rows", &batch.spending_rows);
self.observe_size("write_txid_rows", &batch.txid_rows);
self.observe_size("write_header_rows", &batch.header_rows);
debug!(
"writing {} funding and {} spending rows from {} transactions, {} blocks",
batch.funding_rows.len(),
batch.spending_rows.len(),
batch.txid_rows.len(),
batch.header_rows.len()
);
}
fn observe_chain(&self, chain: &Chain) {
self.height.set("tip", chain.height() as f64);
}
fn observe_db(&self, store: &DBStore) {
for (cf, name, value) in store.get_properties() {
self.db_properties
.set(&format!("{}:{}", name, cf), value as f64);
}
}
}
@ -91,11 +124,9 @@ impl Index {
chain.load(headers, tip);
chain.drop_last_headers(reindex_last_blocks);
};
let stats = Stats::new(metrics);
stats.height.set("tip", chain.height() as f64);
stats.db_size.set("total", store.get_size()? as f64);
stats.observe_chain(&chain);
stats.observe_db(&store);
Ok(Index {
store,
batch_size,
@ -148,35 +179,12 @@ impl Index {
.filter_map(move |height| self.chain.get_block_hash(height))
}
fn observe_duration<T>(&self, label: &str, f: impl FnOnce() -> T) -> T {
self.stats.update_duration.observe_duration(label, f)
}
fn observe_size(&self, label: &str, rows: &[Row]) {
self.stats.update_size.observe(label, db_rows_size(rows));
}
fn report_stats(&self, batch: &WriteBatch) {
self.observe_size("write_funding_rows", &batch.funding_rows);
self.observe_size("write_spending_rows", &batch.spending_rows);
self.observe_size("write_txid_rows", &batch.txid_rows);
self.observe_size("write_header_rows", &batch.header_rows);
debug!(
"writing {} funding and {} spending rows from {} transactions, {} blocks",
batch.funding_rows.len(),
batch.spending_rows.len(),
batch.txid_rows.len(),
batch.header_rows.len()
);
}
pub(crate) fn sync(&mut self, daemon: &Daemon, exit_flag: &ExitFlag) -> Result<()> {
self.stats.observe_db(&self.store);
loop {
self.stats
.db_size
.set("total", self.store.get_size()? as f64);
let new_headers =
self.observe_duration("headers", || daemon.get_new_headers(&self.chain))?;
let new_headers = self
.stats
.observe_duration("headers", || daemon.get_new_headers(&self.chain))?;
if new_headers.is_empty() {
break;
}
@ -199,7 +207,7 @@ impl Index {
let mut batch = WriteBatch::default();
daemon.for_blocks(blockhashes, |_blockhash, block| {
let height = heights.next().expect("unexpected block");
self.observe_duration("block", || {
self.stats.observe_duration("block", || {
index_single_block(block, height).extend(&mut batch)
});
self.stats.height.set("tip", height as f64);
@ -211,13 +219,13 @@ impl Index {
heights
);
batch.sort();
self.report_stats(&batch);
self.observe_duration("write", || self.store.write(batch));
self.stats.observe_batch(&batch);
self.stats
.db_size
.set("total", self.store.get_size()? as f64);
.observe_duration("write", || self.store.write(&batch));
self.stats.observe_db(&self.store);
}
self.chain.update(new_headers);
self.stats.observe_chain(&self.chain);
}
self.store.flush();
Ok(())