mirror of https://github.com/romanz/electrs.git synced 2024-11-19 09:54:09 +01:00

Move block indexing into parse module

Roman Zeyde 2018-06-24 13:11:13 +03:00
parent 8b0b4323e0
commit 6f3424f734
2 changed files with 59 additions and 66 deletions
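
For orientation: after this change, the bulk-load path is a three-stage pipeline of reader, parser, and indexer threads connected by bounded channels, with block indexing now living next to parsing instead of in the caller. A minimal, self-contained sketch of that shape (not code from this commit; the file names, String "blocks", and "rows" below are simplified stand-ins for electrs' Block and Row types):

    use std::sync::mpsc::{sync_channel, Receiver};
    use std::thread;

    // Stage 1: read raw file blobs (stand-in for fs::read on blk*.dat files).
    fn start_reader(files: Vec<String>) -> Receiver<Vec<u8>> {
        let (tx, rx) = sync_channel(1); // bounded: at most one blob in flight
        thread::spawn(move || {
            for f in files {
                tx.send(f.into_bytes()).unwrap();
            }
        });
        rx
    }

    // Stage 2: turn each blob into blocks (stand-in for parse_blocks).
    fn start_parser(blobs: Receiver<Vec<u8>>) -> Receiver<Vec<String>> {
        let (tx, rx) = sync_channel(1);
        thread::spawn(move || {
            for blob in blobs {
                tx.send(vec![String::from_utf8_lossy(&blob).into_owned()])
                    .unwrap();
            }
        });
        rx
    }

    // Stage 3: turn each batch of blocks into rows (stand-in for index_block).
    fn start_indexer(blocks: Receiver<Vec<String>>) -> Receiver<Vec<String>> {
        let (tx, rx) = sync_channel(1);
        thread::spawn(move || {
            for batch in blocks {
                let rows: Vec<String> =
                    batch.iter().map(|b| format!("row:{}", b)).collect();
                tx.send(rows).unwrap();
            }
        });
        rx
    }

    fn main() {
        let files = vec!["blk00000.dat".to_string(), "blk00001.dat".to_string()];
        // The caller only consumes finished rows (stand-in for store.write).
        for rows in start_indexer(start_parser(start_reader(files))) {
            println!("writing {} rows", rows.len());
        }
    }

Each Receiver iterates until its sender is dropped, which is how end-of-input propagates down the pipeline.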

View File

@@ -6,47 +6,24 @@ extern crate log;
 #[macro_use]
 extern crate error_chain;
 
-use bitcoin::network::serialize::BitcoinHash;
-use bitcoin::util::hash::Sha256dHash;
-use std::collections::HashMap;
-use std::iter::FromIterator;
-
 use electrs::{config::Config,
               daemon::Daemon,
               errors::*,
-              index,
-              metrics::{HistogramOpts, Metrics},
+              metrics::Metrics,
               parse::Parser,
               signal::Waiter,
-              store::{DBStore, StoreOptions},
-              store::{ReadStore, Row, WriteStore},
-              util::{Bytes, HeaderEntry, HeaderList}};
+              store::{DBStore, StoreOptions, WriteStore},
+              util::{HeaderEntry, HeaderList}};
 use error_chain::ChainedError;
 
-struct FakeStore;
-
-impl ReadStore for FakeStore {
-    fn get(&self, _key: &[u8]) -> Option<Bytes> {
-        None
-    }
-    fn scan(&self, _prefix: &[u8]) -> Vec<Row> {
-        vec![]
-    }
-}
-
-impl WriteStore for FakeStore {
-    fn write(&self, _rows: Vec<Row>) {}
-    fn flush(&self) {}
-}
-
 fn run(config: Config) -> Result<()> {
     let signal = Waiter::new();
     let metrics = Metrics::new(config.monitoring_addr);
     metrics.start();
 
     let daemon = Daemon::new(config.network_type, &metrics)?;
-    let store = DBStore::open("./bench-db", StoreOptions { bulk_import: true });
+    let store = DBStore::open("./test-db", StoreOptions { bulk_import: true });
     let tip = daemon.getbestblockhash()?;
     let new_headers: Vec<HeaderEntry> = {
@@ -56,35 +33,13 @@ fn run(config: Config) -> Result<()> {
     new_headers.last().map(|tip| {
         info!("{:?} ({} left to index)", tip, new_headers.len());
     });
-    let height_map = HashMap::<Sha256dHash, usize>::from_iter(
-        new_headers.iter().map(|h| (*h.hash(), h.height())),
-    );
 
-    let duration = metrics.histogram_vec(
-        HistogramOpts::new("index_duration", "indexing duration (in seconds)"),
-        &["step"],
-    );
-
-    let chan = Parser::new(&daemon, &metrics)?.start();
-    for blocks in chan.iter() {
+    let chan = Parser::new(&daemon, &metrics)?.start(new_headers);
+    for rows in chan.iter() {
         if let Some(sig) = signal.poll() {
             bail!("indexing interrupted by SIG{:?}", sig);
         }
-        let blocks = blocks?;
-
-        let timer = duration.with_label_values(&["index"]).start_timer();
-        let mut rows = vec![];
-        for block in &blocks {
-            let blockhash = block.bitcoin_hash();
-            if let Some(height) = height_map.get(&blockhash) {
-                rows.extend(index::index_block(block, *height));
-            } else {
-                warn!("unknown block {}", blockhash);
-            }
-        }
-        timer.observe_duration();
-
-        let timer = duration.with_label_values(&["write"]).start_timer();
-        store.write(rows);
-        timer.observe_duration();
+        store.write(rows?);
     }
     debug!("done");
     Ok(())
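
The loop removed above called index::index_block directly; the function itself isn't shown in this diff, but from rows.extend(index::index_block(block, *height)) and store.write(rows) its signature is presumably along these lines:

    // assumed signature, inferred from the call sites in this diff
    pub fn index_block(block: &Block, height: usize) -> Vec<Row>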

View File

@@ -1,15 +1,20 @@
 use bitcoin::blockdata::block::Block;
+use bitcoin::network::serialize::BitcoinHash;
 use bitcoin::network::serialize::SimpleDecoder;
 use bitcoin::network::serialize::{deserialize, RawDecoder};
+use bitcoin::util::hash::Sha256dHash;
+use std::collections::HashMap;
 use std::fs;
 use std::io::{Cursor, Seek, SeekFrom};
+use std::iter::FromIterator;
 use std::path::PathBuf;
 use std::sync::mpsc::Receiver;
-use std::thread;
 
 use daemon::Daemon;
+use index::index_block;
 use metrics::{HistogramOpts, HistogramVec, Metrics};
-use util::SyncChannel;
+use store::Row;
+use util::{spawn_thread, HeaderEntry, SyncChannel};
 
 use errors::*;
@@ -31,36 +36,69 @@ impl Parser {
         })
     }
 
-    pub fn start(self) -> Receiver<Result<Vec<Block>>> {
+    pub fn start(self, headers: Vec<HeaderEntry>) -> Receiver<Result<Vec<Row>>> {
+        let height_map = HashMap::<Sha256dHash, usize>::from_iter(
+            headers.iter().map(|h| (*h.hash(), h.height())),
+        );
         let chan = SyncChannel::new(1);
         let tx = chan.sender();
-        let blobs = read_files(self.files.clone(), self.duration.clone());
+        let parser = parse_files(self.files.clone(), self.duration.clone());
         let duration = self.duration.clone();
-        thread::spawn(move || {
-            for msg in blobs.iter() {
+        spawn_thread("bulk_indexer", move || {
+            for msg in parser.iter() {
                 match msg {
-                    Ok(blob) => {
-                        let timer = duration.with_label_values(&["parse"]).start_timer();
-                        let blocks = parse_blocks(&blob);
-                        timer.observe_duration();
-                        tx.send(blocks).unwrap();
+                    Ok(blocks) => {
+                        let mut rows = vec![];
+                        for block in &blocks {
+                            let blockhash = block.bitcoin_hash();
+                            if let Some(height) = height_map.get(&blockhash) {
+                                let timer = duration.with_label_values(&["index"]).start_timer();
+                                rows.extend(index_block(block, *height));
+                                timer.observe_duration();
+                            } else {
+                                warn!("unknown block {}", blockhash);
+                            }
+                        }
+                        trace!("indexed {} rows from {} blocks", rows.len(), blocks.len());
+                        tx.send(Ok(rows)).unwrap();
                     }
                     Err(err) => {
                         tx.send(Err(err)).unwrap();
+                        return;
                     }
                 }
             }
-            debug!("parsed {} blk files", self.files.len());
         });
         chan.into_receiver()
     }
 }
+
+fn parse_files(files: Vec<PathBuf>, duration: HistogramVec) -> Receiver<Result<Vec<Block>>> {
+    let chan = SyncChannel::new(1);
+    let tx = chan.sender();
+    let blobs = read_files(files, duration.clone());
+    let duration = duration.clone();
+    spawn_thread("bulk_parser", move || {
+        for msg in blobs.iter() {
+            match msg {
+                Ok(blob) => {
+                    let timer = duration.with_label_values(&["parse"]).start_timer();
+                    let blocks = parse_blocks(&blob);
+                    timer.observe_duration();
+                    tx.send(blocks).unwrap();
+                }
+                Err(err) => {
+                    tx.send(Err(err)).unwrap();
+                }
+            }
+        }
+    });
+    chan.into_receiver()
+}
 
 fn read_files(files: Vec<PathBuf>, duration: HistogramVec) -> Receiver<Result<Vec<u8>>> {
     let chan = SyncChannel::new(1);
     let tx = chan.sender();
-    thread::spawn(move || {
+    spawn_thread("bulk_reader", move || {
         for f in &files {
             let timer = duration.with_label_values(&["read"]).start_timer();
             let msg = fs::read(f).chain_err(|| format!("failed to read {:?}", f));
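
The new code leans on two util helpers that the diff doesn't show: spawn_thread (which replaces the bare thread::spawn calls) and SyncChannel. Plausible implementations, inferred from their call sites here rather than taken from the electrs source:

    use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
    use std::thread;

    // spawn_thread("bulk_parser", ...) presumably names the thread,
    // which is what a debugger or panic message will show.
    pub fn spawn_thread<F, T>(name: &str, f: F) -> thread::JoinHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        thread::Builder::new()
            .name(name.to_owned())
            .spawn(f)
            .unwrap()
    }

    // SyncChannel::new(1) presumably wraps a bounded mpsc channel; sender()
    // hands out a clone for the producer thread, and into_receiver() gives
    // the consumer end back to the caller.
    pub struct SyncChannel<T> {
        tx: SyncSender<T>,
        rx: Receiver<T>,
    }

    impl<T> SyncChannel<T> {
        pub fn new(size: usize) -> SyncChannel<T> {
            let (tx, rx) = sync_channel(size);
            SyncChannel { tx, rx }
        }
        pub fn sender(&self) -> SyncSender<T> {
            self.tx.clone()
        }
        pub fn into_receiver(self) -> Receiver<T> {
            self.rx
        }
    }

The size-1 bound on each channel matters here: a stage can run at most one batch ahead of its consumer, so the fastest stage is throttled to the slowest and memory stays flat during the initial bulk index.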