1
0
mirror of https://github.com/romanz/electrs.git synced 2024-11-19 01:43:29 +01:00

Split reading from indexing blk*.dat files

Also, use a single Vec<Row> for write operation
This commit is contained in:
Roman Zeyde 2018-07-10 15:37:56 +03:00
parent 56229409a0
commit fb372b2915
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB
7 changed files with 33 additions and 35 deletions

View File

@ -19,6 +19,7 @@ chan = "0.1"
chan-signal = "0.3" chan-signal = "0.3"
clap = "2.31" clap = "2.31"
error-chain = "0.12" error-chain = "0.12"
glob = "0.2"
hex = "0.3" hex = "0.3"
log = "0.4" log = "0.4"
prometheus = "0.4" prometheus = "0.4"
@ -30,4 +31,3 @@ serde_json = "1.0"
stderrlog = "0.4.1" stderrlog = "0.4.1"
time = "0.1" time = "0.1"
tiny_http = "0.6" tiny_http = "0.6"
glob = "0.2"

View File

@ -1,8 +1,8 @@
extern crate bitcoin;
extern crate electrs; extern crate electrs;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate error_chain; extern crate error_chain;
use std::path::Path; use std::path::Path;
@ -25,10 +25,11 @@ fn run(config: Config) -> Result<()> {
&metrics, &metrics,
)?; )?;
let store = DBStore::open(Path::new("./test-db"), StoreOptions { bulk_import: true }); let store = DBStore::open(Path::new("./test-db"), StoreOptions { bulk_import: true });
let parser = Parser::new(&daemon, &metrics)?;
let parser = Parser::new(&daemon, &store, &metrics)?; for path in daemon.list_blk_files()? {
for rows in parser.start().iter() { let blob = parser.read_blkfile(&path)?;
store.write(rows?); let rows = parser.index_blkfile(blob)?;
store.write(rows);
} }
Ok(()) Ok(())
} }

View File

@ -1,6 +1,5 @@
extern crate electrs; extern crate electrs;
#[macro_use]
extern crate error_chain; extern crate error_chain;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
@ -20,12 +19,11 @@ fn bulk_index(store: DBStore, daemon: &Daemon, signal: &Waiter, metrics: &Metric
return Ok(()); return Ok(());
} }
let parser = Parser::new(daemon, &metrics)?; let parser = Parser::new(daemon, &metrics)?;
let blkfiles = daemon.list_blk_files()?; for path in daemon.list_blk_files()? {
for path in &blkfiles {
let rows = parser.index_blkfile(path)?;
store.write(&rows);
trace!("indexed {:?}", path);
signal.poll_err()?; signal.poll_err()?;
let blob = parser.read_blkfile(&path)?;
let rows = parser.index_blkfile(blob)?;
store.write(rows);
} }
store.flush(); store.flush();
store.compact(); store.compact();

View File

@ -2,16 +2,13 @@ use bitcoin::blockdata::block::Block;
use bitcoin::network::serialize::BitcoinHash; use bitcoin::network::serialize::BitcoinHash;
use bitcoin::network::serialize::SimpleDecoder; use bitcoin::network::serialize::SimpleDecoder;
use bitcoin::network::serialize::{deserialize, RawDecoder}; use bitcoin::network::serialize::{deserialize, RawDecoder};
// use bitcoin::util::hash::Sha256dHash;
// use std::collections::HashSet;
use std::fs; use std::fs;
use std::io::{Cursor, Seek, SeekFrom}; use std::io::{Cursor, Seek, SeekFrom};
use std::path::Path; use std::path::Path;
// use std::sync::mpsc::Receiver;
use daemon::Daemon; use daemon::Daemon;
use index::{index_block, last_indexed_block}; use index::{index_block, last_indexed_block};
use metrics::{CounterVec, HistogramOpts, HistogramVec, MetricOpts, Metrics}; use metrics::{CounterVec, Histogram, HistogramOpts, HistogramVec, MetricOpts, Metrics};
use store::Row; use store::Row;
use util::HeaderList; use util::HeaderList;
@ -25,6 +22,7 @@ pub struct Parser {
// metrics // metrics
duration: HistogramVec, duration: HistogramVec,
block_count: CounterVec, block_count: CounterVec,
bytes_read: Histogram,
} }
impl Parser { impl Parser {
@ -40,25 +38,33 @@ impl Parser {
MetricOpts::new("parse_blocks", "# of block parsed (from blk*.dat)"), MetricOpts::new("parse_blocks", "# of block parsed (from blk*.dat)"),
&["type"], &["type"],
), ),
bytes_read: metrics.histogram(HistogramOpts::new(
"parse_bytes_read",
"# of bytes read (from blk*.dat)",
)),
}) })
} }
pub fn index_blkfile(&self, path: &Path) -> Result<Vec<Vec<Row>>> { pub fn read_blkfile(&self, path: &Path) -> Result<Vec<u8>> {
let timer = self.duration.with_label_values(&["read"]).start_timer(); let timer = self.duration.with_label_values(&["read"]).start_timer();
let blob = fs::read(&path).chain_err(|| format!("failed to read {:?}", path))?; let blob = fs::read(&path).chain_err(|| format!("failed to read {:?}", path))?;
timer.observe_duration(); timer.observe_duration();
trace!("read {:.2} MB from {:?}", blob.len() as f32 / 1e6, path); self.bytes_read.observe(blob.len() as f64);
return Ok(blob);
}
pub fn index_blkfile(&self, blob: Vec<u8>) -> Result<Vec<Row>> {
let timer = self.duration.with_label_values(&["parse"]).start_timer(); let timer = self.duration.with_label_values(&["parse"]).start_timer();
let blocks = parse_blocks(blob, self.magic)?; let blocks = parse_blocks(blob, self.magic)?;
timer.observe_duration(); timer.observe_duration();
let mut rows = vec![]; let mut rows = Vec::<Row>::new();
let timer = self.duration.with_label_values(&["index"]).start_timer(); let timer = self.duration.with_label_values(&["index"]).start_timer();
for block in blocks { for block in blocks {
let blockhash = block.bitcoin_hash(); let blockhash = block.bitcoin_hash();
if let Some(header) = self.current_headers.header_by_blockhash(&blockhash) { if let Some(header) = self.current_headers.header_by_blockhash(&blockhash) {
rows.push(index_block(&block, header.height())); rows.extend(index_block(&block, header.height()));
self.block_count.with_label_values(&["indexed"]).inc(); self.block_count.with_label_values(&["indexed"]).inc();
} else { } else {
debug!("skipping block {}", blockhash); // will be indexed later (after bulk load is over) debug!("skipping block {}", blockhash); // will be indexed later (after bulk load is over)
@ -102,11 +108,6 @@ fn parse_blocks(blob: Vec<u8>, magic: u32) -> Result<Vec<Block>> {
.chain_err(|| format!("failed to parse block at {}..{}", start, end))?; .chain_err(|| format!("failed to parse block at {}..{}", start, end))?;
blocks.push(block); blocks.push(block);
} }
trace!(
"parsed {} blocks from {:.2} MB blob",
blocks.len(),
blob.len() as f32 / 1e6
);
Ok(blocks) Ok(blocks)
} }

View File

@ -13,6 +13,6 @@ impl ReadStore for FakeStore {
} }
impl WriteStore for FakeStore { impl WriteStore for FakeStore {
fn write(&self, _rows_vec: &[Vec<Row>]) {} fn write(&self, _rows: Vec<Row>) {}
fn flush(&self) {} fn flush(&self) {}
} }

View File

@ -368,7 +368,7 @@ impl Index {
break; break;
} }
let mut rows_vec = vec![]; let mut rows = vec![];
for block in &batch { for block in &batch {
let blockhash = block.bitcoin_hash(); let blockhash = block.bitcoin_hash();
let height = *height_map let height = *height_map
@ -378,12 +378,12 @@ impl Index {
let timer = self.stats.start_timer("index"); let timer = self.stats.start_timer("index");
let mut block_rows = index_block(block, height); let mut block_rows = index_block(block, height);
block_rows.push(last_indexed_block(&blockhash)); block_rows.push(last_indexed_block(&blockhash));
rows_vec.push(block_rows); rows.extend(block_rows);
timer.observe_duration(); timer.observe_duration();
self.stats.update(block, height); self.stats.update(block, height);
} }
let timer = self.stats.start_timer("write"); let timer = self.stats.start_timer("write");
store.write(&rows_vec); store.write(rows);
timer.observe_duration(); timer.observe_duration();
} }
let timer = self.stats.start_timer("flush"); let timer = self.stats.start_timer("flush");

View File

@ -44,7 +44,7 @@ pub trait ReadStore: Sync {
} }
pub trait WriteStore: Sync { pub trait WriteStore: Sync {
fn write(&self, rows_vec: &[Vec<Row>]); fn write(&self, rows: Vec<Row>);
fn flush(&self); fn flush(&self);
} }
@ -117,12 +117,10 @@ impl ReadStore for DBStore {
} }
impl WriteStore for DBStore { impl WriteStore for DBStore {
fn write(&self, rows_vec: &[Vec<Row>]) { fn write(&self, rows: Vec<Row>) {
let batch = rocksdb::WriteBatch::default(); let batch = rocksdb::WriteBatch::default();
for rows in rows_vec { for row in rows {
for row in rows { batch.put(row.key.as_slice(), row.value.as_slice()).unwrap();
batch.put(row.key.as_slice(), row.value.as_slice()).unwrap();
}
} }
let mut opts = rocksdb::WriteOptions::new(); let mut opts = rocksdb::WriteOptions::new();
opts.set_sync(!self.opts.bulk_import); opts.set_sync(!self.opts.bulk_import);