From 0f743be7852e68c695470bce069a31d3cb861b6c Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Wed, 11 Apr 2018 10:59:22 +0300 Subject: [PATCH] Split main.rs into a few modules --- .gitignore | 1 + src/daemon.rs | 62 ++++++++++++ src/main.rs | 266 +++----------------------------------------------- src/store.rs | 129 ++++++++++++++++++++++++ src/timer.rs | 50 ++++++++++ src/waiter.rs | 23 +++++ 6 files changed, 278 insertions(+), 253 deletions(-) create mode 100644 src/daemon.rs create mode 100644 src/store.rs create mode 100644 src/timer.rs create mode 100644 src/waiter.rs diff --git a/.gitignore b/.gitignore index c7f2cd9..8e1347e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ target Cargo.lock db/ +*.log diff --git a/src/daemon.rs b/src/daemon.rs new file mode 100644 index 0000000..0682ac0 --- /dev/null +++ b/src/daemon.rs @@ -0,0 +1,62 @@ +extern crate itertools; +extern crate reqwest; + +use bitcoin::blockdata::block::BlockHeader; +use bitcoin::network::encodable::ConsensusDecodable; +use bitcoin::network::serialize::BitcoinHash; +use bitcoin::network::serialize::RawDecoder; +use bitcoin::util::hash::Sha256dHash; +use self::itertools::enumerate; +use std::collections::{HashMap, VecDeque}; +use std::io::Cursor; + +use Bytes; + +const HEADER_SIZE: usize = 80; + +type HeaderMap = HashMap; + +fn get(resource: &str) -> reqwest::Response { + let url = format!("http://localhost:8332/rest/{}", resource); + reqwest::get(&url).unwrap() +} + +pub fn get_bin(resource: &str) -> Bytes { + let mut buf = Bytes::new(); + let mut resp = get(resource); + resp.copy_to(&mut buf).unwrap(); + buf +} + +pub fn get_headers() -> (HeaderMap, String) { + let mut headers = HashMap::new(); + let mut blockhash = + String::from("000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"); // genesis + loop { + let data = get_bin(&format!("headers/2000/{}.bin", blockhash)); + let num_of_headers = data.len() / HEADER_SIZE; + let mut decoder = RawDecoder::new(Cursor::new(data)); + for _ in 0..num_of_headers { + let header: BlockHeader = ConsensusDecodable::consensus_decode(&mut decoder).unwrap(); + blockhash = header.bitcoin_hash().be_hex_string(); + headers.insert(blockhash.to_string(), header); + } + if num_of_headers == 1 { + break; + } + } + info!("loaded {} headers till {}", headers.len(), blockhash); + (headers, blockhash) +} + +pub fn enumerate_headers(headers: &HeaderMap, bestblockhash: &str) -> Vec<(usize, String)> { + let null_hash = Sha256dHash::default().be_hex_string(); + let mut hashes = VecDeque::::new(); + let mut blockhash = bestblockhash.to_string(); + while blockhash != null_hash { + let header: &BlockHeader = headers.get(&blockhash).unwrap(); + hashes.push_front(blockhash); + blockhash = header.prev_blockhash.be_hex_string(); + } + enumerate(hashes).collect() +} diff --git a/src/main.rs b/src/main.rs index 46841cd..a2341cb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,95 +4,27 @@ extern crate log; extern crate bitcoin; extern crate byteorder; extern crate crypto; -extern crate itertools; -extern crate reqwest; -extern crate rocksdb; -extern crate serde_json; extern crate simple_logger; -extern crate time; -extern crate zmq; -use bitcoin::blockdata::block::{Block, BlockHeader}; -use bitcoin::network::encodable::ConsensusDecodable; +mod daemon; +mod store; +mod timer; +mod waiter; + +use bitcoin::blockdata::block::Block; use bitcoin::network::serialize::BitcoinHash; -use bitcoin::network::serialize::{deserialize, serialize, RawDecoder}; +use bitcoin::network::serialize::{deserialize, serialize}; use bitcoin::util::hash::Sha256dHash; use byteorder::{LittleEndian, WriteBytesExt}; use crypto::digest::Digest; use crypto::sha2::Sha256; -use itertools::enumerate; -use rocksdb::{DBCompactionStyle, DBCompressionType, WriteBatch, WriteOptions, DB}; -// use serde_json::Value; -use std::collections::{HashMap, VecDeque}; -use std::fmt::Write; -use std::io::Cursor; -use time::{Duration, PreciseTime}; +use store::{Row, Store, StoreOptions}; +use timer::Timer; -const HEADER_SIZE: usize = 80; const HASH_LEN: usize = 8; -type HeaderMap = HashMap; type Bytes = Vec; -fn revhex(data: &[u8]) -> String { - let mut ret = String::with_capacity(data.len() * 2); - for item in data.iter().rev() { - ret.push(std::char::from_digit((*item / 0x10) as u32, 16).unwrap()); - ret.push(std::char::from_digit((*item & 0x0f) as u32, 16).unwrap()); - } - ret -} - -fn get(resource: &str) -> reqwest::Response { - let url = format!("http://localhost:8332/rest/{}", resource); - reqwest::get(&url).unwrap() -} - -fn get_bin(resource: &str) -> Bytes { - let mut buf = Bytes::new(); - let mut resp = get(resource); - resp.copy_to(&mut buf).unwrap(); - buf -} - -fn get_headers() -> (HeaderMap, String) { - let mut headers = HashMap::new(); - let mut blockhash = - String::from("000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"); // genesis - loop { - let data = get_bin(&format!("headers/2000/{}.bin", blockhash)); - let num_of_headers = data.len() / HEADER_SIZE; - let mut decoder = RawDecoder::new(Cursor::new(data)); - for _ in 0..num_of_headers { - let header: BlockHeader = ConsensusDecodable::consensus_decode(&mut decoder).unwrap(); - blockhash = header.bitcoin_hash().be_hex_string(); - headers.insert(blockhash.to_string(), header); - } - if num_of_headers == 1 { - break; - } - } - info!("loaded {} headers till {}", headers.len(), blockhash); - (headers, blockhash) -} - -fn enumerate_headers(headers: &HeaderMap, bestblockhash: &str) -> Vec<(usize, String)> { - let null_hash = Sha256dHash::default().be_hex_string(); - let mut hashes = VecDeque::::new(); - let mut blockhash = bestblockhash.to_string(); - while blockhash != null_hash { - let header: &BlockHeader = headers.get(&blockhash).unwrap(); - hashes.push_front(blockhash); - blockhash = header.prev_blockhash.be_hex_string(); - } - enumerate(hashes).collect() -} - -struct Row { - key: Bytes, - value: Bytes, -} - fn index_block(block: &Block, height: usize) -> Vec { let null_hash = Sha256dHash::default(); let mut rows = Vec::new(); @@ -153,164 +85,12 @@ fn index_block(block: &Block, height: usize) -> Vec { rows } -// fn get_bestblockhash() -> String { -// let data = get("chaininfo.json").text().unwrap(); -// let val: Value = serde_json::from_str(&data).unwrap(); -// val["bestblockhash"].as_str().unwrap().to_string() -// } - -struct Timer { - durations: HashMap, - start: Option, - name: String, -} - -impl Timer { - pub fn new() -> Timer { - Timer { - durations: HashMap::new(), - start: None, - name: String::from(""), - } - } - - pub fn start(&mut self, name: &str) { - self.start = Some(self.stop()); - self.name = name.to_string(); - } - - pub fn stop(&mut self) -> PreciseTime { - let now = PreciseTime::now(); - if let Some(start) = self.start { - let duration = self.durations - .entry(self.name.to_string()) // ??? - .or_insert(Duration::zero()); - *duration = *duration + start.to(now); - } - self.start = None; - now - } - - pub fn stats(&self) -> String { - let mut s = String::new(); - let mut total = 0f64; - for (k, v) in self.durations.iter() { - let t = v.num_milliseconds() as f64 / 1e3; - total += t; - write!(&mut s, "{}: {:.2}s ", k, t).unwrap(); - } - write!(&mut s, "total: {:.2}s", total).unwrap(); - return s; - } -} - -struct Store { - db: DB, - rows: Vec, - start: PreciseTime, -} - -struct StoreOptions { - auto_compact: bool, -} - -impl Store { - /// Opens a new RocksDB at the specified location. - pub fn open(path: &str, opts: StoreOptions) -> Store { - let mut db_opts = rocksdb::Options::default(); - db_opts.create_if_missing(true); - db_opts.set_compaction_style(DBCompactionStyle::Level); - db_opts.set_max_open_files(256); - db_opts.set_use_fsync(false); - db_opts.set_compression_type(DBCompressionType::Snappy); - db_opts.set_target_file_size_base(128 << 20); - db_opts.set_write_buffer_size(256 << 20); - // db_opts.set_compaction_readahead_size(2 << 20); - db_opts.set_disable_auto_compactions(!opts.auto_compact); - - let mut block_opts = rocksdb::BlockBasedOptions::default(); - block_opts.set_block_size(256 << 10); - info!("opening {}", path); - Store { - db: DB::open(&db_opts, &path).unwrap(), - rows: vec![], - start: PreciseTime::now(), - } - } - - pub fn persist(&mut self, mut rows: Vec) { - self.rows.append(&mut rows); - let elapsed: Duration = self.start.to(PreciseTime::now()); - if elapsed < Duration::seconds(60) && self.rows.len() < 10_000_000 { - return; - } - self.flush(); - } - - pub fn flush(&mut self) { - let mut batch = WriteBatch::default(); - for row in &self.rows { - batch.put(row.key.as_slice(), row.value.as_slice()).unwrap(); - } - let mut opts = WriteOptions::new(); - opts.set_sync(true); - self.db.write_opt(batch, &opts).unwrap(); - self.rows.clear(); - self.start = PreciseTime::now(); - } - - pub fn read_headers(&mut self) -> HashMap { - let mut headers = HashMap::new(); - for row in self.scan(b"B") { - let blockhash = revhex(&row.key); - let header: BlockHeader = deserialize(&row.value).unwrap(); - headers.insert(blockhash, header); - } - headers - } - - pub fn has_block(&mut self, blockhash: &[u8]) -> bool { - let key: &[u8] = &[b"B", blockhash].concat(); - self.db.get(key).unwrap().is_some() - } - - pub fn compact_if_needed(&mut self) { - let key = b"F"; // full compaction - if self.db.get(key).unwrap().is_some() { - return; - } - info!("full compaction"); - self.db.compact_range(None, None); // should take a while - self.db.put(key, b"").unwrap(); - } - - // Use generators ??? - fn scan(&mut self, prefix: &[u8]) -> Vec { - let mut rows = Vec::new(); - let mut iter = self.db.raw_iterator(); - let prefix_len = prefix.len(); - iter.seek(prefix); - while iter.valid() { - let key = &iter.key().unwrap(); - if &key[..prefix_len] != prefix { - break; - } - rows.push(Row { - key: key[prefix_len..].to_vec(), - value: iter.value().unwrap().to_vec(), - }); - iter.next(); - } - rows - } -} - fn index_blocks(store: &mut Store) { let indexed_headers = store.read_headers(); info!("indexed {} headers", indexed_headers.len()); - let (headers, blockhash) = get_headers(); - let mut hashes: Vec<(usize, String)> = enumerate_headers(&headers, &blockhash); + let (headers, blockhash) = daemon::get_headers(); + let mut hashes: Vec<(usize, String)> = daemon::enumerate_headers(&headers, &blockhash); hashes.retain(|item| !indexed_headers.contains_key(&item.1)); info!("indexing {} blocks", hashes.len()); @@ -322,7 +102,7 @@ fn index_blocks(store: &mut Store) { for (height, blockhash) in hashes { timer.start("get"); - let buf = get_bin(&format!("block/{}.bin", &blockhash)); + let buf: Bytes = daemon::get_bin(&format!("block/{}.bin", &blockhash)); timer.start("parse"); let block: Block = deserialize(&buf).unwrap(); @@ -354,29 +134,9 @@ fn index_blocks(store: &mut Store) { store.flush(); } -struct Waiter { - sock: zmq::Socket, -} - -impl Waiter { - pub fn new(endpoint: &str) -> Waiter { - let ctx = zmq::Context::new(); - let sock = ctx.socket(zmq::SocketType::SUB).unwrap(); - sock.set_subscribe(b"hashblock").unwrap(); - sock.connect(endpoint).unwrap(); - Waiter { sock } - } - - pub fn wait(&self) -> Bytes { - let mut blockhash = self.sock.recv_multipart(0).unwrap().remove(1); - blockhash.reverse(); - blockhash - } -} - fn main() { simple_logger::init_with_level(log::Level::Info).unwrap(); - let waiter = Waiter::new("tcp://localhost:28332"); + let waiter = waiter::Waiter::new("tcp://localhost:28332"); { let mut store = Store::open( "db/mainnet", diff --git a/src/store.rs b/src/store.rs new file mode 100644 index 0000000..4ac936a --- /dev/null +++ b/src/store.rs @@ -0,0 +1,129 @@ +extern crate bitcoin; +extern crate byteorder; +extern crate crypto; +extern crate rocksdb; +extern crate time; + +use self::rocksdb::{DBCompactionStyle, DBCompressionType, WriteBatch, WriteOptions, DB}; +use bitcoin::blockdata::block::BlockHeader; +use bitcoin::network::serialize::deserialize; +use std::char::from_digit; +use std::collections::HashMap; +use self::time::{Duration, PreciseTime}; + +use Bytes; + +pub struct Store { + db: DB, + rows: Vec, + start: PreciseTime, +} + +pub struct Row { + pub key: Bytes, + pub value: Bytes, +} + +pub struct StoreOptions { + pub auto_compact: bool, +} + +fn revhex(data: &[u8]) -> String { + let mut ret = String::with_capacity(data.len() * 2); + for item in data.iter().rev() { + ret.push(from_digit((*item / 0x10) as u32, 16).unwrap()); + ret.push(from_digit((*item & 0x0f) as u32, 16).unwrap()); + } + ret +} + +impl Store { + /// Opens a new RocksDB at the specified location. + pub fn open(path: &str, opts: StoreOptions) -> Store { + let mut db_opts = rocksdb::Options::default(); + db_opts.create_if_missing(true); + db_opts.set_compaction_style(DBCompactionStyle::Level); + db_opts.set_max_open_files(256); + db_opts.set_use_fsync(false); + db_opts.set_compression_type(DBCompressionType::Snappy); + db_opts.set_target_file_size_base(128 << 20); + db_opts.set_write_buffer_size(256 << 20); + // db_opts.set_compaction_readahead_size(2 << 20); + db_opts.set_disable_auto_compactions(!opts.auto_compact); + + let mut block_opts = rocksdb::BlockBasedOptions::default(); + block_opts.set_block_size(256 << 10); + info!("opening {}", path); + Store { + db: DB::open(&db_opts, &path).unwrap(), + rows: vec![], + start: PreciseTime::now(), + } + } + + pub fn persist(&mut self, mut rows: Vec) { + self.rows.append(&mut rows); + let elapsed: Duration = self.start.to(PreciseTime::now()); + if elapsed < Duration::seconds(60) && self.rows.len() < 10_000_000 { + return; + } + self.flush(); + } + + pub fn flush(&mut self) { + let mut batch = WriteBatch::default(); + for row in &self.rows { + batch.put(row.key.as_slice(), row.value.as_slice()).unwrap(); + } + let mut opts = WriteOptions::new(); + opts.set_sync(true); + self.db.write_opt(batch, &opts).unwrap(); + self.rows.clear(); + self.start = PreciseTime::now(); + } + + pub fn read_headers(&mut self) -> HashMap { + let mut headers = HashMap::new(); + for row in self.scan(b"B") { + let blockhash = revhex(&row.key); + let header: BlockHeader = deserialize(&row.value).unwrap(); + headers.insert(blockhash, header); + } + headers + } + + pub fn has_block(&mut self, blockhash: &[u8]) -> bool { + let key: &[u8] = &[b"B", blockhash].concat(); + self.db.get(key).unwrap().is_some() + } + + pub fn compact_if_needed(&mut self) { + let key = b"F"; // full compaction + if self.db.get(key).unwrap().is_some() { + return; + } + info!("full compaction"); + self.db.compact_range(None, None); // should take a while + self.db.put(key, b"").unwrap(); + } + + // Use generators ??? + fn scan(&mut self, prefix: &[u8]) -> Vec { + let mut rows = Vec::new(); + let mut iter = self.db.raw_iterator(); + let prefix_len = prefix.len(); + iter.seek(prefix); + while iter.valid() { + let key = &iter.key().unwrap(); + if &key[..prefix_len] != prefix { + break; + } + rows.push(Row { + key: key[prefix_len..].to_vec(), + value: iter.value().unwrap().to_vec(), + }); + iter.next(); + } + rows + } +} diff --git a/src/timer.rs b/src/timer.rs new file mode 100644 index 0000000..b5443ac --- /dev/null +++ b/src/timer.rs @@ -0,0 +1,50 @@ +extern crate time; + +use std::fmt::Write; +use std::collections::HashMap; +use self::time::{Duration, PreciseTime}; + +pub struct Timer { + durations: HashMap, + start: Option, + name: String, +} + +impl Timer { + pub fn new() -> Timer { + Timer { + durations: HashMap::new(), + start: None, + name: String::from(""), + } + } + + pub fn start(&mut self, name: &str) { + self.start = Some(self.stop()); + self.name = name.to_string(); + } + + pub fn stop(&mut self) -> PreciseTime { + let now = PreciseTime::now(); + if let Some(start) = self.start { + let duration = self.durations + .entry(self.name.to_string()) // ??? + .or_insert(Duration::zero()); + *duration = *duration + start.to(now); + } + self.start = None; + now + } + + pub fn stats(&self) -> String { + let mut s = String::new(); + let mut total = 0f64; + for (k, v) in self.durations.iter() { + let t = v.num_milliseconds() as f64 / 1e3; + total += t; + write!(&mut s, "{}: {:.2}s ", k, t).unwrap(); + } + write!(&mut s, "total: {:.2}s", total).unwrap(); + return s; + } +} diff --git a/src/waiter.rs b/src/waiter.rs new file mode 100644 index 0000000..2d81f9c --- /dev/null +++ b/src/waiter.rs @@ -0,0 +1,23 @@ +extern crate zmq; + +use Bytes; + +pub struct Waiter { + sock: zmq::Socket, +} + +impl Waiter { + pub fn new(endpoint: &str) -> Waiter { + let ctx = zmq::Context::new(); + let sock = ctx.socket(zmq::SocketType::SUB).unwrap(); + sock.set_subscribe(b"hashblock").unwrap(); + sock.connect(endpoint).unwrap(); + Waiter { sock } + } + + pub fn wait(&self) -> Bytes { + let mut blockhash = self.sock.recv_multipart(0).unwrap().remove(1); + blockhash.reverse(); + blockhash + } +}