1
0
Fork 0
mirror of https://github.com/romanz/electrs.git synced 2025-02-23 22:56:55 +01:00

Refactor blk*.dat parser

This commit is contained in:
Roman Zeyde 2018-06-25 10:46:30 +03:00
parent 2625070cef
commit 4200648c8d
No known key found for this signature in database
GPG key ID: 87CAE5FA46917CBB
2 changed files with 62 additions and 65 deletions

View file

@ -10,10 +10,9 @@ use electrs::{config::Config,
daemon::Daemon,
errors::*,
metrics::Metrics,
parse::Parser,
parse::parser,
signal::Waiter,
store::{DBStore, StoreOptions, WriteStore},
util::{HeaderEntry, HeaderList}};
store::{DBStore, StoreOptions, WriteStore}};
use error_chain::ChainedError;
@ -25,16 +24,7 @@ fn run(config: Config) -> Result<()> {
let daemon = Daemon::new(config.network_type, &metrics)?;
let store = DBStore::open("./test-db", StoreOptions { bulk_import: true });
let tip = daemon.getbestblockhash()?;
let new_headers: Vec<HeaderEntry> = {
let indexed_headers = HeaderList::empty();
indexed_headers.order(daemon.get_new_headers(&indexed_headers, &tip)?)
};
new_headers.last().map(|tip| {
info!("{:?} ({} left to index)", tip, new_headers.len());
});
let chan = Parser::new(&daemon, &metrics)?.start(new_headers);
let chan = parser(&daemon, &store, &metrics)?;
for rows in chan.iter() {
if let Some(sig) = signal.poll() {
bail!("indexing interrupted by SIG{:?}", sig);

View file

@ -2,81 +2,88 @@ use bitcoin::blockdata::block::Block;
use bitcoin::network::serialize::BitcoinHash;
use bitcoin::network::serialize::SimpleDecoder;
use bitcoin::network::serialize::{deserialize, RawDecoder};
use bitcoin::util::hash::Sha256dHash;
use std::collections::HashMap;
use std::fs;
use std::io::{Cursor, Seek, SeekFrom};
use std::iter::FromIterator;
use std::path::PathBuf;
use std::sync::mpsc::Receiver;
use daemon::Daemon;
use index::index_block;
use index::{index_block, last_indexed_block, read_indexed_blockhashes};
use metrics::{HistogramOpts, HistogramVec, Metrics};
use store::Row;
use util::{spawn_thread, HeaderEntry, SyncChannel};
use store::{ReadStore, Row};
use util::{spawn_thread, HeaderList, SyncChannel};
use errors::*;
/// An efficient parser for Bitcoin blk*.dat files.
pub struct Parser {
files: Vec<PathBuf>,
// metrics
duration: HistogramVec,
/// Fetch the daemon's current best-chain headers as an ordered `HeaderList`.
///
/// Starts from an empty list, asks the daemon for all headers up to its
/// best block hash, orders them into chain sequence, and applies them.
fn load_headers(daemon: &Daemon) -> Result<HeaderList> {
    let best_hash = daemon.getbestblockhash()?;
    let mut header_list = HeaderList::empty();
    let fetched = daemon.get_new_headers(&header_list, &best_hash)?;
    let ordered = header_list.order(fetched);
    header_list.apply(ordered);
    Ok(header_list)
}
impl Parser {
pub fn new(daemon: &Daemon, metrics: &Metrics) -> Result<Parser> {
Ok(Parser {
files: daemon.list_blk_files()?,
duration: metrics.histogram_vec(
HistogramOpts::new("parse_duration", "Block parsing duration (in seconds)"),
&["step"],
),
})
}
pub fn start(self, headers: Vec<HeaderEntry>) -> Receiver<Result<Vec<Vec<Row>>>> {
let height_map = HashMap::<Sha256dHash, usize>::from_iter(
headers.iter().map(|h| (*h.hash(), h.height())),
);
let chan = SyncChannel::new(1);
let tx = chan.sender();
let parser = parse_files(self.files.clone(), self.duration.clone());
let duration = self.duration.clone();
spawn_thread("bulk_indexer", move || {
for msg in parser.iter() {
match msg {
Ok(blocks) => {
let mut rows = vec![];
for block in &blocks {
let blockhash = block.bitcoin_hash();
if let Some(height) = height_map.get(&blockhash) {
let timer = duration.with_label_values(&["index"]).start_timer();
rows.push(index_block(block, *height));
timer.observe_duration();
} else {
warn!("unknown block {}", blockhash);
}
pub fn parser(
daemon: &Daemon,
store: &ReadStore,
metrics: &Metrics,
) -> Result<Receiver<Result<Vec<Vec<Row>>>>> {
let duration = metrics.histogram_vec(
HistogramOpts::new("parse_duration", "Block parsing duration (in seconds)"),
&["step"],
);
let chan = SyncChannel::new(1);
let tx = chan.sender();
let current_headers = load_headers(daemon)?;
let mut indexed_blockhashes = read_indexed_blockhashes(store);
info!("loaded {} blockhashes", indexed_blockhashes.len());
let parser = parse_files(daemon.list_blk_files()?, duration.clone());
spawn_thread("bulk_indexer", move || {
for msg in parser.iter() {
match msg {
Ok(blocks) => {
let mut rows = vec![];
for block in &blocks {
let blockhash = block.bitcoin_hash();
if indexed_blockhashes.contains(&blockhash) {
continue;
}
if let Some(header) = current_headers.header_by_blockhash(&blockhash) {
let timer = duration.with_label_values(&["index"]).start_timer();
rows.push(index_block(block, header.height()));
timer.observe_duration();
indexed_blockhashes.insert(blockhash);
} else {
warn!("unknown block {}", blockhash);
}
trace!("indexed {} rows from {} blocks", rows.len(), blocks.len());
tx.send(Ok(rows)).unwrap();
}
Err(err) => {
tx.send(Err(err)).unwrap();
}
trace!("indexed {} rows from {} blocks", rows.len(), blocks.len());
tx.send(Ok(rows)).unwrap();
}
Err(err) => {
tx.send(Err(err)).unwrap();
}
}
});
chan.into_receiver()
}
}
for header in current_headers.iter().rev() {
if indexed_blockhashes.contains(header.hash()) {
info!("{:?}", header);
let rows = vec![last_indexed_block(header.hash())];
tx.send(Ok(vec![rows])).unwrap();
return;
}
}
warn!("could not found tip")
});
Ok(chan.into_receiver())
}
fn parse_files(files: Vec<PathBuf>, duration: HistogramVec) -> Receiver<Result<Vec<Block>>> {
let chan = SyncChannel::new(1);
let tx = chan.sender();
let blobs = read_files(files, duration.clone());
let duration = duration.clone();
spawn_thread("bulk_parser", move || {
for msg in blobs.iter() {
match msg {