From fb372b2915e988d8b1266df58a4ea3833eb801ef Mon Sep 17 00:00:00 2001
From: Roman Zeyde
Date: Tue, 10 Jul 2018 15:37:56 +0300
Subject: [PATCH] Split reading from indexing blk*.dat files

Also, use a single Vec for write operation
---
 Cargo.toml              |  2 +-
 examples/bench_parse.rs | 11 ++++++-----
 src/bin/electrs.rs      | 10 ++++------
 src/bulk.rs             | 27 ++++++++++++++-------------
 src/fake.rs             |  2 +-
 src/index.rs            |  6 +++---
 src/store.rs            | 10 ++++------
 7 files changed, 33 insertions(+), 35 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 306edf9..3c80df7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,6 +19,7 @@ chan = "0.1"
 chan-signal = "0.3"
 clap = "2.31"
 error-chain = "0.12"
+glob = "0.2"
 hex = "0.3"
 log = "0.4"
 prometheus = "0.4"
@@ -30,4 +31,3 @@ serde_json = "1.0"
 stderrlog = "0.4.1"
 time = "0.1"
 tiny_http = "0.6"
-glob = "0.2"
diff --git a/examples/bench_parse.rs b/examples/bench_parse.rs
index f38abb3..a760d25 100644
--- a/examples/bench_parse.rs
+++ b/examples/bench_parse.rs
@@ -1,8 +1,8 @@
-extern crate bitcoin;
 extern crate electrs;
 
 #[macro_use]
 extern crate log;
+
 extern crate error_chain;
 
 use std::path::Path;
@@ -25,10 +25,11 @@ fn run(config: Config) -> Result<()> {
         &metrics,
     )?;
     let store = DBStore::open(Path::new("./test-db"), StoreOptions { bulk_import: true });
-
-    let parser = Parser::new(&daemon, &store, &metrics)?;
-    for rows in parser.start().iter() {
-        store.write(rows?);
+    let parser = Parser::new(&daemon, &metrics)?;
+    for path in daemon.list_blk_files()? {
+        let blob = parser.read_blkfile(&path)?;
+        let rows = parser.index_blkfile(blob)?;
+        store.write(rows);
     }
     Ok(())
 }
diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs
index 69382dc..0abfbfc 100644
--- a/src/bin/electrs.rs
+++ b/src/bin/electrs.rs
@@ -1,6 +1,5 @@
 extern crate electrs;
 
-#[macro_use]
 extern crate error_chain;
 #[macro_use]
 extern crate log;
@@ -20,12 +19,11 @@ fn bulk_index(store: DBStore, daemon: &Daemon, signal: &Waiter, metrics: &Metric
         return Ok(());
     }
     let parser = Parser::new(daemon, &metrics)?;
-    let blkfiles = daemon.list_blk_files()?;
-    for path in &blkfiles {
-        let rows = parser.index_blkfile(path)?;
-        store.write(&rows);
-        trace!("indexed {:?}", path);
+    for path in daemon.list_blk_files()? {
         signal.poll_err()?;
+        let blob = parser.read_blkfile(&path)?;
+        let rows = parser.index_blkfile(blob)?;
+        store.write(rows);
     }
     store.flush();
     store.compact();
diff --git a/src/bulk.rs b/src/bulk.rs
index 7762330..5bdbefd 100644
--- a/src/bulk.rs
+++ b/src/bulk.rs
@@ -2,16 +2,13 @@ use bitcoin::blockdata::block::Block;
 use bitcoin::network::serialize::BitcoinHash;
 use bitcoin::network::serialize::SimpleDecoder;
 use bitcoin::network::serialize::{deserialize, RawDecoder};
-// use bitcoin::util::hash::Sha256dHash;
-// use std::collections::HashSet;
 use std::fs;
 use std::io::{Cursor, Seek, SeekFrom};
 use std::path::Path;
-// use std::sync::mpsc::Receiver;
 
 use daemon::Daemon;
 use index::{index_block, last_indexed_block};
-use metrics::{CounterVec, HistogramOpts, HistogramVec, MetricOpts, Metrics};
+use metrics::{CounterVec, Histogram, HistogramOpts, HistogramVec, MetricOpts, Metrics};
 use store::Row;
 use util::HeaderList;
 
@@ -25,6 +22,7 @@ pub struct Parser {
     // metrics
     duration: HistogramVec,
     block_count: CounterVec,
+    bytes_read: Histogram,
 }
 
 impl Parser {
@@ -40,25 +38,33 @@ impl Parser {
                 MetricOpts::new("parse_blocks", "# of block parsed (from blk*.dat)"),
                 &["type"],
             ),
+
+            bytes_read: metrics.histogram(HistogramOpts::new(
+                "parse_bytes_read",
+                "# of bytes read (from blk*.dat)",
+            )),
         })
     }
 
-    pub fn index_blkfile(&self, path: &Path) -> Result<Vec<Vec<Row>>> {
+    pub fn read_blkfile(&self, path: &Path) -> Result<Vec<u8>> {
         let timer = self.duration.with_label_values(&["read"]).start_timer();
         let blob = fs::read(&path).chain_err(|| format!("failed to read {:?}", path))?;
         timer.observe_duration();
-        trace!("read {:.2} MB from {:?}", blob.len() as f32 / 1e6, path);
+        self.bytes_read.observe(blob.len() as f64);
+        return Ok(blob);
+    }
 
+    pub fn index_blkfile(&self, blob: Vec<u8>) -> Result<Vec<Row>> {
         let timer = self.duration.with_label_values(&["parse"]).start_timer();
        let blocks = parse_blocks(blob, self.magic)?;
         timer.observe_duration();
 
-        let mut rows = vec![];
+        let mut rows = Vec::<Row>::new();
         let timer = self.duration.with_label_values(&["index"]).start_timer();
         for block in blocks {
             let blockhash = block.bitcoin_hash();
             if let Some(header) = self.current_headers.header_by_blockhash(&blockhash) {
-                rows.push(index_block(&block, header.height()));
+                rows.extend(index_block(&block, header.height()));
                 self.block_count.with_label_values(&["indexed"]).inc();
             } else {
                 debug!("skipping block {}", blockhash); // will be indexed later (after bulk load is over)
@@ -102,11 +108,6 @@ fn parse_blocks(blob: Vec<u8>, magic: u32) -> Result<Vec<Block>> {
             .chain_err(|| format!("failed to parse block at {}..{}", start, end))?;
         blocks.push(block);
     }
-    trace!(
-        "parsed {} blocks from {:.2} MB blob",
-        blocks.len(),
-        blob.len() as f32 / 1e6
-    );
     Ok(blocks)
 }
 
diff --git a/src/fake.rs b/src/fake.rs
index f7b6658..44e8424 100644
--- a/src/fake.rs
+++ b/src/fake.rs
@@ -13,6 +13,6 @@ impl ReadStore for FakeStore {
 }
 
 impl WriteStore for FakeStore {
-    fn write(&self, _rows_vec: &[Vec<Row>]) {}
+    fn write(&self, _rows: Vec<Row>) {}
     fn flush(&self) {}
 }
diff --git a/src/index.rs b/src/index.rs
index dc71236..32e7894 100644
--- a/src/index.rs
+++ b/src/index.rs
@@ -368,7 +368,7 @@ impl Index {
                 break;
             }
 
-            let mut rows_vec = vec![];
+            let mut rows = vec![];
             for block in &batch {
                 let blockhash = block.bitcoin_hash();
                 let height = *height_map
@@ -378,12 +378,12 @@ impl Index {
                 let timer = self.stats.start_timer("index");
                 let mut block_rows = index_block(block, height);
                 block_rows.push(last_indexed_block(&blockhash));
-                rows_vec.push(block_rows);
+                rows.extend(block_rows);
                 timer.observe_duration();
                 self.stats.update(block, height);
             }
             let timer = self.stats.start_timer("write");
-            store.write(&rows_vec);
+            store.write(rows);
             timer.observe_duration();
         }
         let timer = self.stats.start_timer("flush");
diff --git a/src/store.rs b/src/store.rs
index 74365ad..d9c168c 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -44,7 +44,7 @@ pub trait ReadStore: Sync {
 }
 
 pub trait WriteStore: Sync {
-    fn write(&self, rows_vec: &[Vec<Row>]);
+    fn write(&self, rows: Vec<Row>);
     fn flush(&self);
 }
 
@@ -117,12 +117,10 @@ impl ReadStore for DBStore {
 }
 
 impl WriteStore for DBStore {
-    fn write(&self, rows_vec: &[Vec<Row>]) {
+    fn write(&self, rows: Vec<Row>) {
         let batch = rocksdb::WriteBatch::default();
-        for rows in rows_vec {
-            for row in rows {
-                batch.put(row.key.as_slice(), row.value.as_slice()).unwrap();
-            }
+        for row in rows {
+            batch.put(row.key.as_slice(), row.value.as_slice()).unwrap();
         }
         let mut opts = rocksdb::WriteOptions::new();
         opts.set_sync(!self.opts.bulk_import);
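
--
Notes (appended commentary; none of this is part of the commit):

Splitting read_blkfile() from index_blkfile() separates the I/O-bound phase
from the CPU-bound one, so a caller can overlap them. The patch itself keeps
the loop in bulk_index() sequential; the sketch below only illustrates the
shape a pipelined driver could take on top of the new API, using a bounded
std::sync::mpsc channel. It is hypothetical: error handling is unwrap() for
brevity, and the read-duration/bytes_read metrics are skipped because the
raw std::fs::read() bypasses Parser.

    use std::sync::mpsc::sync_channel;
    use std::thread;

    // `daemon`, `parser` and `store` as constructed in bulk_index() above.
    let paths = daemon.list_blk_files()?;
    let (tx, rx) = sync_channel(1); // bounded: at most one blob in flight
    let reader = thread::spawn(move || {
        for path in paths {
            // I/O-bound phase: read the raw blk*.dat contents
            tx.send(std::fs::read(&path).unwrap()).unwrap();
        }
        // dropping `tx` closes the channel, ending the loop below
    });
    for blob in rx {
        // CPU-bound phase: parse + index, then write one batch per file
        let rows = parser.index_blkfile(blob).unwrap();
        store.write(rows);
    }
    reader.join().unwrap();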
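
parse_blocks() walks the standard Bitcoin Core blk*.dat framing: each record
is a 4-byte network magic, a 4-byte little-endian payload length, and then
the serialized block bytes; files may be zero-padded at the end. A minimal
standalone scanner over that framing could look like the hypothetical helper
below (a sketch only; the real parse_blocks() uses RawDecoder/deserialize()
and fails hard on an unexpected magic):

    /// Yield (offset, length) of each block payload in a blk*.dat blob,
    /// assuming the framing [magic: u32 LE][size: u32 LE][block bytes].
    fn block_spans(blob: &[u8], magic: u32) -> Vec<(usize, usize)> {
        let mut spans = Vec::new();
        let mut pos = 0usize;
        while pos + 8 <= blob.len() {
            let word = u32::from_le_bytes([blob[pos], blob[pos + 1], blob[pos + 2], blob[pos + 3]]);
            if word == 0 {
                pos += 4; // zero padding; skip it and keep scanning
                continue;
            }
            if word != magic {
                break; // unexpected bytes; stop
            }
            let size = u32::from_le_bytes([blob[pos + 4], blob[pos + 5], blob[pos + 6], blob[pos + 7]]) as usize;
            spans.push((pos + 8, size)); // payload starts right after the header
            pos += 8 + size;             // jump to the next record
        }
        spans
    }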
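
The new bytes_read histogram replaces the trace!() logging of MB read, moving
that information into Prometheus. Outside electrs's Metrics wrapper, the
equivalent direct use of the prometheus crate would be roughly as follows (a
sketch against the 0.4-era API used in Cargo.toml):

    use prometheus::{Histogram, HistogramOpts, Registry};

    let registry = Registry::new();
    let bytes_read = Histogram::with_opts(HistogramOpts::new(
        "parse_bytes_read",
        "# of bytes read (from blk*.dat)",
    )).unwrap();
    registry.register(Box::new(bytes_read.clone())).unwrap();

    // one observation per blk*.dat file, in bytes
    bytes_read.observe(blob.len() as f64);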
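
Finally, the WriteStore change: write() now takes one flat, owned Vec<Row>
instead of a borrowed slice of per-block Vecs. In index.rs this is the switch
from rows_vec.push(block_rows) to rows.extend(block_rows), which moves each
Row into a single accumulator, and DBStore then turns that one Vec into a
single rocksdb::WriteBatch. A caller-side view of the new trait shape, as a
hypothetical helper (write_blocks is not in the patch; Row and WriteStore as
in src/store.rs):

    fn write_blocks(store: &impl WriteStore, row_groups: Vec<Vec<Row>>) {
        let mut rows = Vec::<Row>::new();
        for group in row_groups {
            rows.extend(group); // moves Rows; no per-group Vec survives
        }
        store.write(rows); // one owned, flat batch -> one WriteBatch
        store.flush();
    }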