
Skip already indexed blocks

Roman Zeyde 2018-04-10 20:06:25 +03:00
parent d875abcf22
commit 2a773d78b0
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB


@@ -20,7 +20,7 @@ use byteorder::{LittleEndian, WriteBytesExt};
use crypto::digest::Digest;
use crypto::sha2::Sha256;
use itertools::enumerate;
use rocksdb::{DBCompactionStyle, DBCompressionType, WriteBatch, DB};
use rocksdb::{DBCompactionStyle, DBCompressionType, WriteBatch, WriteOptions, DB};
// use serde_json::Value;
use std::collections::{HashMap, VecDeque};
use std::fmt::Write;
@@ -31,14 +31,24 @@ const HEADER_SIZE: usize = 80;
const HASH_LEN: usize = 8;
type HeaderMap = HashMap<String, BlockHeader>;
type Bytes = Vec<u8>;
fn be_hex(data: &[u8]) -> String {
let mut ret = String::with_capacity(data.len() * 2);
for item in data.iter().rev() {
ret.push(std::char::from_digit((*item / 0x10) as u32, 16).unwrap());
ret.push(std::char::from_digit((*item & 0x0f) as u32, 16).unwrap());
}
ret
}
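Bitcoin serializes hashes in little-endian byte order but displays them big-endian, so be_hex hex-encodes the bytes in reverse. A minimal equivalent sketch (not part of the commit), fed the first four serialized bytes of the genesis hash:
fn be_hex_sketch(data: &[u8]) -> String {
    // reverse the little-endian bytes so the hex string reads big-endian
    data.iter().rev().map(|b| format!("{:02x}", b)).collect()
}
fn main() {
    // 6f e2 8c 0a are the first serialized bytes of the genesis block hash;
    // reversed, they form the tail of the display form ...0a8ce26f
    assert_eq!(be_hex_sketch(&[0x6f, 0xe2, 0x8c, 0x0a]), "0a8ce26f");
}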
fn get(resource: &str) -> reqwest::Response {
let url = format!("http://localhost:8332/rest/{}", resource);
reqwest::get(&url).unwrap()
}
fn get_bin(resource: &str) -> Vec<u8> {
let mut buf: Vec<u8> = vec![];
fn get_bin(resource: &str) -> Bytes {
let mut buf = Bytes::new();
let mut resp = get(resource);
resp.copy_to(&mut buf).unwrap();
buf
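Both helpers talk to bitcoind's REST interface; the code assumes bitcoind is running with -rest enabled on localhost:8332. A hedged usage sketch, reusing the genesis hash and the endpoint shapes that appear later in this file:
// hypothetical calls; each header is HEADER_SIZE (80) bytes
let genesis = "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f";
let headers_bin = get_bin(&format!("headers/2000/{}.bin", genesis)); // up to 2000 * 80 bytes
let block_bin = get_bin(&format!("block/{}.bin", genesis));          // one raw block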
@@ -48,7 +58,6 @@ fn get_headers() -> (HeaderMap, String) {
let mut headers = HashMap::new();
let mut blockhash =
String::from("000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"); // genesis
info!("loading headers from {}", blockhash);
loop {
let data = get_bin(&format!("headers/2000/{}.bin", blockhash));
let num_of_headers = data.len() / HEADER_SIZE;
@@ -79,8 +88,8 @@ fn enumerate_headers(headers: &HeaderMap, bestblockhash: &str) -> Vec<(usize, String)> {
}
struct Row {
key: Vec<u8>,
value: Vec<u8>,
key: Bytes,
value: Bytes,
}
fn index_block(block: &Block, height: usize) -> Vec<Row> {
@@ -200,24 +209,28 @@ struct Store {
start: PreciseTime,
}
struct StoreOptions {
auto_compact: bool,
}
impl Store {
/// Opens a new RocksDB at the specified location.
pub fn open(path: &str) -> Store {
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
opts.set_compaction_style(DBCompactionStyle::Level);
opts.set_max_open_files(256);
opts.set_use_fsync(false);
opts.set_compression_type(DBCompressionType::None);
opts.set_target_file_size_base(64 << 20);
opts.set_write_buffer_size(256 << 20);
pub fn open(path: &str, opts: StoreOptions) -> Store {
let mut db_opts = rocksdb::Options::default();
db_opts.create_if_missing(true);
db_opts.set_compaction_style(DBCompactionStyle::Level);
db_opts.set_max_open_files(256);
db_opts.set_use_fsync(false);
db_opts.set_compression_type(DBCompressionType::None);
db_opts.set_target_file_size_base(64 << 20);
db_opts.set_write_buffer_size(256 << 20);
db_opts.set_disable_auto_compactions(!opts.auto_compact);
let mut block_opts = rocksdb::BlockBasedOptions::default();
block_opts.set_block_size(256 << 10);
// opts.set_disable_auto_compactions(true); // for initial sync?
info!("opening {}", path);
Store {
db: DB::open(&opts, &path).unwrap(),
db: DB::open(&db_opts, &path).unwrap(),
rows: vec![],
start: PreciseTime::now(),
}
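Store::open now takes StoreOptions and wires auto_compact into set_disable_auto_compactions, replacing the commented-out line removed below. A hedged reading of the intent: background compactions can be switched off for the initial bulk index and run once at the end, roughly as follows (compact_range is assumed to be available on the rocksdb DB handle):
// hypothetical caller, not part of this commit
let mut store = Store::open("db/mainnet", StoreOptions { auto_compact: false });
// ... index the chain ...
store.flush();
store.db.compact_range::<&[u8], &[u8]>(None, None); // single manual compaction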
@@ -229,21 +242,63 @@ impl Store {
if elapsed < Duration::seconds(60) && self.rows.len() < 10_000_000 {
return;
}
self.flush();
}
pub fn flush(&mut self) {
let mut batch = WriteBatch::default();
for row in &self.rows {
batch.put(row.key.as_slice(), row.value.as_slice()).unwrap();
}
self.db.write(batch).unwrap();
let mut opts = WriteOptions::new();
opts.set_sync(true);
self.db.write_opt(batch, &opts).unwrap();
self.rows.clear();
self.start = PreciseTime::now();
}
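A hedged note on the new write path: set_sync(true) makes RocksDB fsync the write-ahead log before write_opt returns, so every batch that flush() has written survives a crash. That per-batch fsync is also presumably why persist() keeps buffering until 60 seconds or ~10 million rows have accumulated, and why main() gains a final store.flush() for the tail of the index:
let mut opts = WriteOptions::new();
opts.set_sync(true); // fsync the WAL; the whole batch becomes durable at once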
pub fn read_headers(&mut self) -> HashMap<String, BlockHeader> {
let mut headers = HashMap::new();
for row in self.scan(b"B") {
let blockhash = be_hex(&row.key);
let header: BlockHeader = deserialize(&row.value).unwrap();
headers.insert(blockhash, header);
}
headers
}
// Use generators ???
fn scan(&mut self, prefix: &[u8]) -> Vec<Row> {
let mut rows = Vec::new();
let mut iter = self.db.raw_iterator();
let prefix_len = prefix.len();
iter.seek(prefix);
while iter.valid() {
let key = &iter.key().unwrap();
if &key[..prefix_len] != prefix {
break;
}
rows.push(Row {
key: key[prefix_len..].to_vec(),
value: iter.value().unwrap().to_vec(),
});
iter.next();
}
rows
}
}
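read_headers is built on scan, a straightforward prefix scan over RocksDB's default bytewise key order: seek to the prefix, take rows while keys still start with it, and strip the prefix from each returned key. A standalone sketch of the same pattern (assuming only an open DB handle):
fn scan_prefix(db: &DB, prefix: &[u8]) -> Vec<(Bytes, Bytes)> {
    let mut rows = Vec::new();
    let mut iter = db.raw_iterator();
    iter.seek(prefix); // jump to the first key >= prefix
    while iter.valid() {
        let key = iter.key().unwrap();
        if !key.starts_with(prefix) {
            break; // left the prefixed key range
        }
        rows.push((key[prefix.len()..].to_vec(), iter.value().unwrap().to_vec()));
        iter.next();
    }
    rows
}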
fn main() {
simple_logger::init_with_level(log::Level::Info).unwrap();
let mut store = Store::open("db/mainnet", StoreOptions { auto_compact: true });
let indexed_headers = store.read_headers();
info!("indexed {} headers", indexed_headers.len());
let (headers, blockhash) = get_headers();
let hashes = enumerate_headers(&headers, &blockhash);
info!("loading {} blocks", hashes.len());
let mut hashes: Vec<(usize, String)> = enumerate_headers(&headers, &blockhash);
hashes.retain(|item| !indexed_headers.contains_key(&item.1));
info!("indexing {} blocks", hashes.len());
let mut timer = Timer::new();
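Taken together with the hunk below, main() now resumes instead of reindexing from scratch: on a fresh database read_headers returns nothing and every block is indexed, while on restart only blocks whose headers are not yet stored under the "B" prefix are fetched. A hedged summary using the names from this diff:
let mut store = Store::open("db/mainnet", StoreOptions { auto_compact: true });
let indexed_headers = store.read_headers();                   // headers already on disk
let (headers, blockhash) = get_headers();                     // full chain via bitcoind REST
let mut hashes = enumerate_headers(&headers, &blockhash);
hashes.retain(|item| !indexed_headers.contains_key(&item.1)); // skip already indexed blocks
// the remaining (height, blockhash) pairs are fetched, parsed and indexed below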
@@ -251,14 +306,12 @@ fn main() {
let mut rows_size = 0usize;
let mut num_of_rows = 0usize;
let mut store = Store::open("db/mainnet");
for &(height, ref blockhash) in &hashes {
timer.start("get");
let buf = get_bin(&format!("block/{}.bin", &blockhash));
timer.start("parse");
let block: Block = deserialize(buf.as_slice()).unwrap();
let block: Block = deserialize(&buf).unwrap();
timer.start("index");
let rows = index_block(&block, height);
@@ -285,4 +338,5 @@ fn main() {
);
}
}
store.flush();
}