1
0
mirror of https://github.com/romanz/electrs.git synced 2024-11-19 09:54:09 +01:00

Split main.rs into a few modules

This commit is contained in:
Roman Zeyde 2018-04-11 10:59:22 +03:00
parent 5605ff7451
commit 0f743be785
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB
6 changed files with 278 additions and 253 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
target target
Cargo.lock Cargo.lock
db/ db/
*.log

62
src/daemon.rs Normal file
View File

@@ -0,0 +1,62 @@
extern crate itertools;
extern crate reqwest;
use bitcoin::blockdata::block::BlockHeader;
use bitcoin::network::encodable::ConsensusDecodable;
use bitcoin::network::serialize::BitcoinHash;
use bitcoin::network::serialize::RawDecoder;
use bitcoin::util::hash::Sha256dHash;
use self::itertools::enumerate;
use std::collections::{HashMap, VecDeque};
use std::io::Cursor;
use Bytes;
const HEADER_SIZE: usize = 80; // a serialized Bitcoin block header is always 80 bytes
type HeaderMap = HashMap<String, BlockHeader>; // headers keyed by hex block hash
/// Issues a blocking GET against the local bitcoind REST interface.
/// Panics on connection failure — bitcoind is treated as a hard dependency.
fn get(resource: &str) -> reqwest::Response {
    let endpoint = format!("http://localhost:8332/rest/{}", resource);
    reqwest::get(&endpoint).unwrap()
}
/// Fetches `resource` from bitcoind's REST API and returns the raw body bytes.
pub fn get_bin(resource: &str) -> Bytes {
    let mut response = get(resource);
    let mut body = Bytes::new();
    response.copy_to(&mut body).unwrap();
    body
}
/// Downloads the whole header chain from bitcoind's REST interface, in
/// batches of up to 2000 headers, starting at the genesis block.
/// Returns the header map (hex hash -> header) and the hex hash of the last
/// header decoded (the chain tip).
pub fn get_headers() -> (HeaderMap, String) {
    let mut headers = HashMap::new();
    let mut blockhash =
        String::from("000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"); // genesis
    loop {
        let data = get_bin(&format!("headers/2000/{}.bin", blockhash));
        // Reply is a flat concatenation of fixed-size (80-byte) headers.
        let num_of_headers = data.len() / HEADER_SIZE;
        let mut decoder = RawDecoder::new(Cursor::new(data));
        for _ in 0..num_of_headers {
            let header: BlockHeader = ConsensusDecodable::consensus_decode(&mut decoder).unwrap();
            blockhash = header.bitcoin_hash().be_hex_string();
            headers.insert(blockhash.to_string(), header);
        }
        // NOTE(review): each batch starts at `blockhash` inclusive, so a reply
        // holding only that single header is taken to mean the tip was reached
        // — confirm against bitcoind's /rest/headers semantics.
        if num_of_headers == 1 {
            break;
        }
    }
    info!("loaded {} headers till {}", headers.len(), blockhash);
    (headers, blockhash)
}
/// Orders the header chain from oldest to `bestblockhash` by walking the
/// `prev_blockhash` links backwards, stopping at the all-zero parent of the
/// genesis block. Returns (height, hex hash) pairs.
pub fn enumerate_headers(headers: &HeaderMap, bestblockhash: &str) -> Vec<(usize, String)> {
    let genesis_parent = Sha256dHash::default().be_hex_string();
    let mut ordered = VecDeque::<String>::new();
    let mut cursor = bestblockhash.to_string();
    while cursor != genesis_parent {
        let header: &BlockHeader = headers.get(&cursor).unwrap();
        ordered.push_front(cursor);
        cursor = header.prev_blockhash.be_hex_string();
    }
    ordered.into_iter().enumerate().collect()
}

View File

@@ -4,95 +4,27 @@ extern crate log;
extern crate bitcoin; extern crate bitcoin;
extern crate byteorder; extern crate byteorder;
extern crate crypto; extern crate crypto;
extern crate itertools;
extern crate reqwest;
extern crate rocksdb;
extern crate serde_json;
extern crate simple_logger; extern crate simple_logger;
extern crate time;
extern crate zmq;
use bitcoin::blockdata::block::{Block, BlockHeader}; mod daemon;
use bitcoin::network::encodable::ConsensusDecodable; mod store;
mod timer;
mod waiter;
use bitcoin::blockdata::block::Block;
use bitcoin::network::serialize::BitcoinHash; use bitcoin::network::serialize::BitcoinHash;
use bitcoin::network::serialize::{deserialize, serialize, RawDecoder}; use bitcoin::network::serialize::{deserialize, serialize};
use bitcoin::util::hash::Sha256dHash; use bitcoin::util::hash::Sha256dHash;
use byteorder::{LittleEndian, WriteBytesExt}; use byteorder::{LittleEndian, WriteBytesExt};
use crypto::digest::Digest; use crypto::digest::Digest;
use crypto::sha2::Sha256; use crypto::sha2::Sha256;
use itertools::enumerate; use store::{Row, Store, StoreOptions};
use rocksdb::{DBCompactionStyle, DBCompressionType, WriteBatch, WriteOptions, DB}; use timer::Timer;
// use serde_json::Value;
use std::collections::{HashMap, VecDeque};
use std::fmt::Write;
use std::io::Cursor;
use time::{Duration, PreciseTime};
const HEADER_SIZE: usize = 80;
const HASH_LEN: usize = 8; const HASH_LEN: usize = 8;
type HeaderMap = HashMap<String, BlockHeader>;
type Bytes = Vec<u8>; type Bytes = Vec<u8>;
fn revhex(data: &[u8]) -> String {
let mut ret = String::with_capacity(data.len() * 2);
for item in data.iter().rev() {
ret.push(std::char::from_digit((*item / 0x10) as u32, 16).unwrap());
ret.push(std::char::from_digit((*item & 0x0f) as u32, 16).unwrap());
}
ret
}
fn get(resource: &str) -> reqwest::Response {
let url = format!("http://localhost:8332/rest/{}", resource);
reqwest::get(&url).unwrap()
}
fn get_bin(resource: &str) -> Bytes {
let mut buf = Bytes::new();
let mut resp = get(resource);
resp.copy_to(&mut buf).unwrap();
buf
}
fn get_headers() -> (HeaderMap, String) {
let mut headers = HashMap::new();
let mut blockhash =
String::from("000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"); // genesis
loop {
let data = get_bin(&format!("headers/2000/{}.bin", blockhash));
let num_of_headers = data.len() / HEADER_SIZE;
let mut decoder = RawDecoder::new(Cursor::new(data));
for _ in 0..num_of_headers {
let header: BlockHeader = ConsensusDecodable::consensus_decode(&mut decoder).unwrap();
blockhash = header.bitcoin_hash().be_hex_string();
headers.insert(blockhash.to_string(), header);
}
if num_of_headers == 1 {
break;
}
}
info!("loaded {} headers till {}", headers.len(), blockhash);
(headers, blockhash)
}
fn enumerate_headers(headers: &HeaderMap, bestblockhash: &str) -> Vec<(usize, String)> {
let null_hash = Sha256dHash::default().be_hex_string();
let mut hashes = VecDeque::<String>::new();
let mut blockhash = bestblockhash.to_string();
while blockhash != null_hash {
let header: &BlockHeader = headers.get(&blockhash).unwrap();
hashes.push_front(blockhash);
blockhash = header.prev_blockhash.be_hex_string();
}
enumerate(hashes).collect()
}
struct Row {
key: Bytes,
value: Bytes,
}
fn index_block(block: &Block, height: usize) -> Vec<Row> { fn index_block(block: &Block, height: usize) -> Vec<Row> {
let null_hash = Sha256dHash::default(); let null_hash = Sha256dHash::default();
let mut rows = Vec::new(); let mut rows = Vec::new();
@@ -153,164 +85,12 @@ fn index_block(block: &Block, height: usize) -> Vec<Row> {
rows rows
} }
// fn get_bestblockhash() -> String {
// let data = get("chaininfo.json").text().unwrap();
// let val: Value = serde_json::from_str(&data).unwrap();
// val["bestblockhash"].as_str().unwrap().to_string()
// }
struct Timer {
durations: HashMap<String, Duration>,
start: Option<PreciseTime>,
name: String,
}
impl Timer {
pub fn new() -> Timer {
Timer {
durations: HashMap::new(),
start: None,
name: String::from(""),
}
}
pub fn start(&mut self, name: &str) {
self.start = Some(self.stop());
self.name = name.to_string();
}
pub fn stop(&mut self) -> PreciseTime {
let now = PreciseTime::now();
if let Some(start) = self.start {
let duration = self.durations
.entry(self.name.to_string()) // ???
.or_insert(Duration::zero());
*duration = *duration + start.to(now);
}
self.start = None;
now
}
pub fn stats(&self) -> String {
let mut s = String::new();
let mut total = 0f64;
for (k, v) in self.durations.iter() {
let t = v.num_milliseconds() as f64 / 1e3;
total += t;
write!(&mut s, "{}: {:.2}s ", k, t).unwrap();
}
write!(&mut s, "total: {:.2}s", total).unwrap();
return s;
}
}
struct Store {
db: DB,
rows: Vec<Row>,
start: PreciseTime,
}
struct StoreOptions {
auto_compact: bool,
}
impl Store {
/// Opens a new RocksDB at the specified location.
pub fn open(path: &str, opts: StoreOptions) -> Store {
let mut db_opts = rocksdb::Options::default();
db_opts.create_if_missing(true);
db_opts.set_compaction_style(DBCompactionStyle::Level);
db_opts.set_max_open_files(256);
db_opts.set_use_fsync(false);
db_opts.set_compression_type(DBCompressionType::Snappy);
db_opts.set_target_file_size_base(128 << 20);
db_opts.set_write_buffer_size(256 << 20);
// db_opts.set_compaction_readahead_size(2 << 20);
db_opts.set_disable_auto_compactions(!opts.auto_compact);
let mut block_opts = rocksdb::BlockBasedOptions::default();
block_opts.set_block_size(256 << 10);
info!("opening {}", path);
Store {
db: DB::open(&db_opts, &path).unwrap(),
rows: vec![],
start: PreciseTime::now(),
}
}
pub fn persist(&mut self, mut rows: Vec<Row>) {
self.rows.append(&mut rows);
let elapsed: Duration = self.start.to(PreciseTime::now());
if elapsed < Duration::seconds(60) && self.rows.len() < 10_000_000 {
return;
}
self.flush();
}
pub fn flush(&mut self) {
let mut batch = WriteBatch::default();
for row in &self.rows {
batch.put(row.key.as_slice(), row.value.as_slice()).unwrap();
}
let mut opts = WriteOptions::new();
opts.set_sync(true);
self.db.write_opt(batch, &opts).unwrap();
self.rows.clear();
self.start = PreciseTime::now();
}
pub fn read_headers(&mut self) -> HashMap<String, BlockHeader> {
let mut headers = HashMap::new();
for row in self.scan(b"B") {
let blockhash = revhex(&row.key);
let header: BlockHeader = deserialize(&row.value).unwrap();
headers.insert(blockhash, header);
}
headers
}
pub fn has_block(&mut self, blockhash: &[u8]) -> bool {
let key: &[u8] = &[b"B", blockhash].concat();
self.db.get(key).unwrap().is_some()
}
pub fn compact_if_needed(&mut self) {
let key = b"F"; // full compaction
if self.db.get(key).unwrap().is_some() {
return;
}
info!("full compaction");
self.db.compact_range(None, None); // should take a while
self.db.put(key, b"").unwrap();
}
// Use generators ???
fn scan(&mut self, prefix: &[u8]) -> Vec<Row> {
let mut rows = Vec::new();
let mut iter = self.db.raw_iterator();
let prefix_len = prefix.len();
iter.seek(prefix);
while iter.valid() {
let key = &iter.key().unwrap();
if &key[..prefix_len] != prefix {
break;
}
rows.push(Row {
key: key[prefix_len..].to_vec(),
value: iter.value().unwrap().to_vec(),
});
iter.next();
}
rows
}
}
fn index_blocks(store: &mut Store) { fn index_blocks(store: &mut Store) {
let indexed_headers = store.read_headers(); let indexed_headers = store.read_headers();
info!("indexed {} headers", indexed_headers.len()); info!("indexed {} headers", indexed_headers.len());
let (headers, blockhash) = get_headers(); let (headers, blockhash) = daemon::get_headers();
let mut hashes: Vec<(usize, String)> = enumerate_headers(&headers, &blockhash); let mut hashes: Vec<(usize, String)> = daemon::enumerate_headers(&headers, &blockhash);
hashes.retain(|item| !indexed_headers.contains_key(&item.1)); hashes.retain(|item| !indexed_headers.contains_key(&item.1));
info!("indexing {} blocks", hashes.len()); info!("indexing {} blocks", hashes.len());
@@ -322,7 +102,7 @@ fn index_blocks(store: &mut Store) {
for (height, blockhash) in hashes { for (height, blockhash) in hashes {
timer.start("get"); timer.start("get");
let buf = get_bin(&format!("block/{}.bin", &blockhash)); let buf: Bytes = daemon::get_bin(&format!("block/{}.bin", &blockhash));
timer.start("parse"); timer.start("parse");
let block: Block = deserialize(&buf).unwrap(); let block: Block = deserialize(&buf).unwrap();
@@ -354,29 +134,9 @@ fn index_blocks(store: &mut Store) {
store.flush(); store.flush();
} }
struct Waiter {
sock: zmq::Socket,
}
impl Waiter {
pub fn new(endpoint: &str) -> Waiter {
let ctx = zmq::Context::new();
let sock = ctx.socket(zmq::SocketType::SUB).unwrap();
sock.set_subscribe(b"hashblock").unwrap();
sock.connect(endpoint).unwrap();
Waiter { sock }
}
pub fn wait(&self) -> Bytes {
let mut blockhash = self.sock.recv_multipart(0).unwrap().remove(1);
blockhash.reverse();
blockhash
}
}
fn main() { fn main() {
simple_logger::init_with_level(log::Level::Info).unwrap(); simple_logger::init_with_level(log::Level::Info).unwrap();
let waiter = Waiter::new("tcp://localhost:28332"); let waiter = waiter::Waiter::new("tcp://localhost:28332");
{ {
let mut store = Store::open( let mut store = Store::open(
"db/mainnet", "db/mainnet",

129
src/store.rs Normal file
View File

@@ -0,0 +1,129 @@
extern crate bitcoin;
extern crate byteorder;
extern crate crypto;
extern crate rocksdb;
extern crate time;
use self::rocksdb::{DBCompactionStyle, DBCompressionType, WriteBatch, WriteOptions, DB};
use bitcoin::blockdata::block::BlockHeader;
use bitcoin::network::serialize::deserialize;
use std::char::from_digit;
use std::collections::HashMap;
use self::time::{Duration, PreciseTime};
use Bytes;
/// Buffered writer on top of RocksDB: rows accumulate in memory and are
/// written out in a single synced batch by `flush`.
pub struct Store {
    db: DB,             // underlying RocksDB handle
    rows: Vec<Row>,     // rows buffered since the last flush
    start: PreciseTime, // time of the last flush, used to bound buffering
}
/// A single key/value pair destined for the database.
pub struct Row {
    pub key: Bytes,
    pub value: Bytes,
}
/// Tuning knobs for `Store::open`.
pub struct StoreOptions {
    pub auto_compact: bool, // false disables RocksDB auto-compaction (bulk-load mode)
}
/// Hex-encodes `data` in reverse byte order (lowercase), i.e. the last byte
/// of the slice produces the first two hex digits of the output.
fn revhex(data: &[u8]) -> String {
    let mut out = String::with_capacity(data.len() * 2);
    data.iter().rev().for_each(|&byte| {
        out.push(from_digit(u32::from(byte >> 4), 16).unwrap());
        out.push(from_digit(u32::from(byte & 0x0f), 16).unwrap());
    });
    out
}
impl Store {
    /// Opens (creating if needed) a RocksDB database at `path`, tuned for bulk
    /// writes: level compaction, Snappy compression, 256 MiB write buffer.
    pub fn open(path: &str, opts: StoreOptions) -> Store {
        let mut db_opts = rocksdb::Options::default();
        db_opts.create_if_missing(true);
        db_opts.set_compaction_style(DBCompactionStyle::Level);
        db_opts.set_max_open_files(256);
        db_opts.set_use_fsync(false);
        db_opts.set_compression_type(DBCompressionType::Snappy);
        db_opts.set_target_file_size_base(128 << 20);
        db_opts.set_write_buffer_size(256 << 20);
        // db_opts.set_compaction_readahead_size(2 << 20);
        db_opts.set_disable_auto_compactions(!opts.auto_compact);

        let mut block_opts = rocksdb::BlockBasedOptions::default();
        block_opts.set_block_size(256 << 10);
        // Fix: `block_opts` was configured but never registered with `db_opts`,
        // so the 256 KiB block size was silently ignored. Attach it explicitly.
        db_opts.set_block_based_table_factory(&block_opts);

        info!("opening {}", path);
        Store {
            db: DB::open(&db_opts, &path).unwrap(),
            rows: vec![],
            start: PreciseTime::now(),
        }
    }

    /// Buffers `rows` in memory; performs an actual write only once at least
    /// 60 seconds have elapsed since the last flush or 10M rows are pending.
    pub fn persist(&mut self, mut rows: Vec<Row>) {
        self.rows.append(&mut rows);
        let elapsed: Duration = self.start.to(PreciseTime::now());
        if elapsed < Duration::seconds(60) && self.rows.len() < 10_000_000 {
            return;
        }
        self.flush();
    }

    /// Writes all buffered rows in one synced WriteBatch, then resets the
    /// buffer and the flush timer.
    pub fn flush(&mut self) {
        let mut batch = WriteBatch::default();
        for row in &self.rows {
            batch.put(row.key.as_slice(), row.value.as_slice()).unwrap();
        }
        let mut opts = WriteOptions::new();
        opts.set_sync(true); // fsync the batch so flushed rows survive a crash
        self.db.write_opt(batch, &opts).unwrap();
        self.rows.clear();
        self.start = PreciseTime::now();
    }

    /// Loads every stored block header (rows under the b"B" key prefix),
    /// keyed by the reversed-hex encoding of the stored key bytes.
    pub fn read_headers(&mut self) -> HashMap<String, BlockHeader> {
        let mut headers = HashMap::new();
        for row in self.scan(b"B") {
            let blockhash = revhex(&row.key);
            let header: BlockHeader = deserialize(&row.value).unwrap();
            headers.insert(blockhash, header);
        }
        headers
    }

    /// Returns true if a header row exists for `blockhash` (raw key bytes).
    pub fn has_block(&mut self, blockhash: &[u8]) -> bool {
        let key: &[u8] = &[b"B", blockhash].concat();
        self.db.get(key).unwrap().is_some()
    }

    /// Runs a one-time full compaction; the b"F" marker row records that it
    /// already happened so later calls are no-ops.
    pub fn compact_if_needed(&mut self) {
        let key = b"F"; // full compaction
        if self.db.get(key).unwrap().is_some() {
            return;
        }
        info!("full compaction");
        self.db.compact_range(None, None); // should take a while
        self.db.put(key, b"").unwrap();
    }

    /// Collects all rows whose key starts with `prefix`; the prefix is
    /// stripped from the keys of the returned rows.
    // Use generators ???
    fn scan(&mut self, prefix: &[u8]) -> Vec<Row> {
        let mut rows = Vec::new();
        let mut iter = self.db.raw_iterator();
        let prefix_len = prefix.len();
        iter.seek(prefix);
        while iter.valid() {
            let key = &iter.key().unwrap();
            if &key[..prefix_len] != prefix {
                break;
            }
            rows.push(Row {
                key: key[prefix_len..].to_vec(),
                value: iter.value().unwrap().to_vec(),
            });
            iter.next();
        }
        rows
    }
}

50
src/timer.rs Normal file
View File

@@ -0,0 +1,50 @@
extern crate time;
use std::fmt::Write;
use std::collections::HashMap;
use self::time::{Duration, PreciseTime};
/// Accumulates wall-clock time per named phase (e.g. "get", "parse").
pub struct Timer {
    durations: HashMap<String, Duration>, // total elapsed time per phase name
    start: Option<PreciseTime>,           // when the current phase began; None if idle
    name: String,                         // name of the currently running phase
}
impl Timer {
    /// Creates an idle timer with no phase running and no recorded durations.
    pub fn new() -> Timer {
        Timer {
            durations: HashMap::new(),
            start: None,
            name: String::new(),
        }
    }

    /// Stops the currently running phase (if any) and starts timing `name`.
    pub fn start(&mut self, name: &str) {
        self.start = Some(self.stop());
        self.name = name.to_string();
    }

    /// Stops the current phase, folding its elapsed time into its named total.
    /// Returns the current instant so `start` can reuse it without re-reading
    /// the clock.
    pub fn stop(&mut self) -> PreciseTime {
        let now = PreciseTime::now();
        if let Some(start) = self.start {
            let duration = self.durations
                .entry(self.name.to_string())
                .or_insert_with(Duration::zero); // lazily construct the zero default
            *duration = *duration + start.to(now);
        }
        self.start = None;
        now
    }

    /// Formats per-phase and total elapsed seconds, e.g. "get: 1.20s total: 1.20s".
    pub fn stats(&self) -> String {
        let mut s = String::new();
        let mut total = 0f64;
        for (k, v) in self.durations.iter() {
            let t = v.num_milliseconds() as f64 / 1e3;
            total += t;
            write!(&mut s, "{}: {:.2}s ", k, t).unwrap();
        }
        write!(&mut s, "total: {:.2}s", total).unwrap();
        s
    }
}

23
src/waiter.rs Normal file
View File

@@ -0,0 +1,23 @@
extern crate zmq;
use Bytes;
/// Blocks on a ZeroMQ SUB socket until bitcoind publishes a "hashblock"
/// notification for a newly connected block.
pub struct Waiter {
    sock: zmq::Socket, // SUB socket subscribed to the "hashblock" topic
}
impl Waiter {
    /// Connects a SUB socket to bitcoind's ZeroMQ `endpoint` and subscribes
    /// to "hashblock" notifications. Panics if the socket cannot be set up.
    pub fn new(endpoint: &str) -> Waiter {
        let context = zmq::Context::new();
        let socket = context.socket(zmq::SocketType::SUB).unwrap();
        socket.set_subscribe(b"hashblock").unwrap();
        socket.connect(endpoint).unwrap();
        Waiter { sock: socket }
    }

    /// Blocks until the next notification arrives and returns the announced
    /// block hash with its byte order reversed.
    pub fn wait(&self) -> Bytes {
        let mut parts = self.sock.recv_multipart(0).unwrap();
        // parts[0] is the "hashblock" topic frame; parts[1] carries the hash
        // bytes, which are reversed here — presumably to match the hex display
        // order used elsewhere (TODO confirm against consumers).
        let mut announced = parts.remove(1);
        announced.reverse();
        announced
    }
}