diff --git a/src/main.rs b/src/main.rs index 5e16881..e996ef2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,7 +20,7 @@ use byteorder::{LittleEndian, WriteBytesExt}; use crypto::digest::Digest; use crypto::sha2::Sha256; use itertools::enumerate; -use rocksdb::{DBCompactionStyle, DBCompressionType, WriteBatch, DB}; +use rocksdb::{DBCompactionStyle, DBCompressionType, WriteBatch, WriteOptions, DB}; // use serde_json::Value; use std::collections::{HashMap, VecDeque}; use std::fmt::Write; @@ -31,14 +31,24 @@ const HEADER_SIZE: usize = 80; const HASH_LEN: usize = 8; type HeaderMap = HashMap<String, BlockHeader>; +type Bytes = Vec<u8>; + +fn be_hex(data: &[u8]) -> String { + let mut ret = String::with_capacity(data.len() * 2); + for item in data.iter().rev() { + ret.push(std::char::from_digit((*item / 0x10) as u32, 16).unwrap()); + ret.push(std::char::from_digit((*item & 0x0f) as u32, 16).unwrap()); + } + ret +} fn get(resource: &str) -> reqwest::Response { let url = format!("http://localhost:8332/rest/{}", resource); reqwest::get(&url).unwrap() } -fn get_bin(resource: &str) -> Vec<u8> { - let mut buf: Vec<u8> = vec![]; +fn get_bin(resource: &str) -> Bytes { + let mut buf = Bytes::new(); let mut resp = get(resource); resp.copy_to(&mut buf).unwrap(); buf @@ -48,7 +58,6 @@ fn get_headers() -> (HeaderMap, String) { let mut headers = HashMap::new(); let mut blockhash = String::from("000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"); // genesis - info!("loading headers from {}", blockhash); loop { let data = get_bin(&format!("headers/2000/{}.bin", blockhash)); let num_of_headers = data.len() / HEADER_SIZE; @@ -79,8 +88,8 @@ fn enumerate_headers(headers: &HeaderMap, bestblockhash: &str) -> Vec<(usize, St } struct Row { - key: Vec<u8>, - value: Vec<u8>, + key: Bytes, + value: Bytes, } fn index_block(block: &Block, height: usize) -> Vec<Row> { @@ -200,24 +209,28 @@ struct Store { start: PreciseTime, } +struct StoreOptions { + auto_compact: bool, +} + impl Store { /// Opens a new RocksDB at the specified
location. - pub fn open(path: &str) -> Store { - let mut opts = rocksdb::Options::default(); - opts.create_if_missing(true); - opts.set_compaction_style(DBCompactionStyle::Level); - opts.set_max_open_files(256); - opts.set_use_fsync(false); - opts.set_compression_type(DBCompressionType::None); - opts.set_target_file_size_base(64 << 20); - opts.set_write_buffer_size(256 << 20); + pub fn open(path: &str, opts: StoreOptions) -> Store { + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + db_opts.set_compaction_style(DBCompactionStyle::Level); + db_opts.set_max_open_files(256); + db_opts.set_use_fsync(false); + db_opts.set_compression_type(DBCompressionType::None); + db_opts.set_target_file_size_base(64 << 20); + db_opts.set_write_buffer_size(256 << 20); + db_opts.set_disable_auto_compactions(!opts.auto_compact); let mut block_opts = rocksdb::BlockBasedOptions::default(); block_opts.set_block_size(256 << 10); - - // opts.set_disable_auto_compactions(true); // for initial sync? + info!("opening {}", path); Store { - db: DB::open(&opts, &path).unwrap(), + db: DB::open(&db_opts, &path).unwrap(), rows: vec![], start: PreciseTime::now(), } @@ -229,21 +242,63 @@ impl Store { if elapsed < Duration::seconds(60) && self.rows.len() < 10_000_000 { return; } + self.flush(); + } + + pub fn flush(&mut self) { let mut batch = WriteBatch::default(); for row in &self.rows { batch.put(row.key.as_slice(), row.value.as_slice()).unwrap(); } - self.db.write(batch).unwrap(); + let mut opts = WriteOptions::new(); + opts.set_sync(true); + self.db.write_opt(batch, &opts).unwrap(); self.rows.clear(); self.start = PreciseTime::now(); } + + pub fn read_headers(&mut self) -> HashMap<String, BlockHeader> { + let mut headers = HashMap::new(); + for row in self.scan(b"B") { + let blockhash = be_hex(&row.key); + let header: BlockHeader = deserialize(&row.value).unwrap(); + headers.insert(blockhash, header); + } + headers + } + + // Use generators ??? 
+ fn scan(&mut self, prefix: &[u8]) -> Vec<Row> { + let mut rows = Vec::new(); + let mut iter = self.db.raw_iterator(); + let prefix_len = prefix.len(); + iter.seek(prefix); + while iter.valid() { + let key = &iter.key().unwrap(); + if &key[..prefix_len] != prefix { + break; + } + rows.push(Row { + key: key[prefix_len..].to_vec(), + value: iter.value().unwrap().to_vec(), + }); + iter.next(); + } + rows + } } fn main() { simple_logger::init_with_level(log::Level::Info).unwrap(); + let mut store = Store::open("db/mainnet", StoreOptions { auto_compact: true }); + + let indexed_headers = store.read_headers(); + info!("indexed {} headers", indexed_headers.len()); + let (headers, blockhash) = get_headers(); - let hashes = enumerate_headers(&headers, &blockhash); - info!("loading {} blocks", hashes.len()); + let mut hashes: Vec<(usize, String)> = enumerate_headers(&headers, &blockhash); + hashes.retain(|item| !indexed_headers.contains_key(&item.1)); + info!("indexing {} blocks", hashes.len()); let mut timer = Timer::new(); @@ -251,14 +306,12 @@ fn main() { let mut rows_size = 0usize; let mut num_of_rows = 0usize; - let mut store = Store::open("db/mainnet"); - for &(height, ref blockhash) in &hashes { timer.start("get"); let buf = get_bin(&format!("block/{}.bin", &blockhash)); timer.start("parse"); - let block: Block = deserialize(buf.as_slice()).unwrap(); + let block: Block = deserialize(&buf).unwrap(); timer.start("index"); let rows = index_block(&block, height); @@ -285,4 +338,5 @@ fn main() { ); } } + store.flush(); }