From 92ac75e50d4ad3341d6d4d8f0cca06240be2d5cc Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Tue, 1 May 2018 15:00:45 +0300 Subject: [PATCH] Support get_chunk API call via concurrent Index::header_list() --- src/bin/index_server.rs | 4 ++-- src/index.rs | 23 ++++++++++++-------- src/query.rs | 29 +++++++++++++++++++------ src/rpc.rs | 47 +++++++++++++++++++++++------------------ 4 files changed, 65 insertions(+), 38 deletions(-) diff --git a/src/bin/index_server.rs b/src/bin/index_server.rs index ee54a6b..d997a80 100644 --- a/src/bin/index_server.rs +++ b/src/bin/index_server.rs @@ -33,7 +33,7 @@ struct Config { } fn run_server(config: Config) { - let mut index = index::Index::new(); + let index = index::Index::new(); let waiter = waiter::Waiter::new("tcp://localhost:28332"); let daemon = daemon::Daemon::new("http://localhost:8332"); { @@ -51,7 +51,7 @@ fn run_server(config: Config) { } let store = store::Store::open(DB_PATH, store::StoreOptions { auto_compact: true }); - let query = query::Query::new(&store, &daemon); + let query = query::Query::new(&store, &daemon, &index); crossbeam::scope(|scope| { scope.spawn(|| rpc::serve("localhost:50001", &query)); diff --git a/src/index.rs b/src/index.rs index 250a973..7510be3 100644 --- a/src/index.rs +++ b/src/index.rs @@ -8,6 +8,7 @@ use crypto::digest::Digest; use crypto::sha2::Sha256; use pbr; use std::io::{stderr, Stderr}; +use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use time; @@ -268,25 +269,25 @@ impl<'a> Iterator for BatchIter<'a> { pub struct Index { // TODO: store also a &HeaderMap. // TODO: store also latest snapshot. - headers: HeaderList, + headers: RwLock>, } impl Index { pub fn new() -> Index { Index { - headers: HeaderList::empty(), + headers: RwLock::new(Arc::new(HeaderList::empty())), } } + pub fn headers_list(&self) -> Arc { + self.headers.read().unwrap().clone() + } + fn get_missing_headers<'a>( &self, store: &Store, current_headers: &'a HeaderList, ) -> Vec<&'a HeaderEntry> { - if current_headers.equals(&self.headers) { - return Vec::new(); // everything was indexed already. - } - let indexed_headers: HeaderMap = read_indexed_headers(&store); { let best_block_header: &BlockHeader = @@ -306,15 +307,19 @@ impl Index { .collect() } - pub fn update(&mut self, store: &Store, daemon: &Daemon) { - let current_headers = daemon.enumerate_headers(&self.headers); + pub fn update(&self, store: &Store, daemon: &Daemon) { + let indexed_headers: Arc = self.headers_list(); + let current_headers = daemon.enumerate_headers(&*indexed_headers); { + if indexed_headers.equals(¤t_headers) { + return; // everything was indexed already. + } let missing_headers = self.get_missing_headers(&store, ¤t_headers); for rows in BatchIter::new(Indexer::new(missing_headers, &daemon)) { // TODO: add timing store.persist(&rows); } } - self.headers = current_headers + *self.headers.write().unwrap() = Arc::new(current_headers); } } diff --git a/src/query.rs b/src/query.rs index 5356e9e..8506536 100644 --- a/src/query.rs +++ b/src/query.rs @@ -1,23 +1,29 @@ use bincode; +use bitcoin::blockdata::block::BlockHeader; use bitcoin::blockdata::transaction::Transaction; -use bitcoin::network::serialize::deserialize; +use bitcoin::network::serialize::{deserialize, serialize}; use bitcoin::util::hash::Sha256dHash; use itertools::enumerate; use daemon::Daemon; -use index::{compute_script_hash, hash_prefix, HashPrefix, TxInKey, TxInRow, TxKey, TxOutRow, - HASH_PREFIX_LEN}; +use index::{compute_script_hash, hash_prefix, HashPrefix, Index, TxInKey, TxInRow, TxKey, + TxOutRow, HASH_PREFIX_LEN}; use store::Store; use types::Bytes; pub struct Query<'a> { store: &'a Store, daemon: &'a Daemon, + index: &'a Index, } impl<'a> Query<'a> { - pub fn new(store: &'a Store, daemon: &'a Daemon) -> Query<'a> { - Query { store, daemon } + pub fn new(store: &'a Store, daemon: &'a Daemon, index: &'a Index) -> Query<'a> { + Query { + store, + daemon, + index, + } } fn load_txns(&self, prefixes: Vec) -> Vec { @@ -97,8 +103,19 @@ impl<'a> Query<'a> { balance as f64 / 100_000_000f64 } - pub fn get_tx(&self, tx_hash: Sha256dHash) -> Bytes { + pub fn get_tx(&self, tx_hash: &Sha256dHash) -> Bytes { self.daemon .get(&format!("tx/{}.bin", tx_hash.be_hex_string())) } + + pub fn get_headers(&self, heights: &[usize]) -> Vec { + let headers_list = self.index.headers_list(); + let headers = headers_list.headers(); + let mut result = Vec::new(); + for h in heights { + let header: &BlockHeader = headers[*h].header(); + result.push(serialize(header).unwrap()); + } + result + } } diff --git a/src/rpc.rs b/src/rpc.rs index ea7662b..a043fe8 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -1,4 +1,5 @@ use bitcoin::util::hash::Sha256dHash; +use itertools; use serde_json::{from_str, Number, Value}; use std::io::{BufRead, BufReader, Write}; use std::net::{SocketAddr, TcpListener, TcpStream}; @@ -37,47 +38,50 @@ impl<'a> Handler<'a> { Ok(json!([])) // TODO: consult with actual mempool } - fn blockchain_estimatefee(&self, _params: &[&str]) -> Result { + fn blockchain_block_get_chunk(&self, params: &[Value]) -> Result { + const CHUNK_SIZE: usize = 2016; + let index = params.get(0).chain_err(|| "missing index")?; + let index = index.as_u64().chain_err(|| "non-number index")? as usize; + let heights: Vec = (0..CHUNK_SIZE).map(|h| index * CHUNK_SIZE + h).collect(); + let headers = self.query.get_headers(&heights); + let result = itertools::join(headers.into_iter().map(|x| util::hexlify(&x)), ""); + Ok(json!(result)) + } + + fn blockchain_estimatefee(&self, _params: &[Value]) -> Result { Ok(json!(1e-5)) // TODO: consult with actual mempool } - fn blockchain_scripthash_subscribe(&self, _params: &[&str]) -> Result { + fn blockchain_scripthash_subscribe(&self, _params: &[Value]) -> Result { Ok(json!("HEX_STATUS")) } - fn blockchain_scripthash_get_balance(&self, params: &[&str]) -> Result { - let script_hash_hex = params.get(0).chain_err(|| "missing scripthash")?; - let script_hash = - Sha256dHash::from_hex(script_hash_hex).chain_err(|| "invalid scripthash")?; + fn blockchain_scripthash_get_balance(&self, params: &[Value]) -> Result { + let script_hash = params.get(0).chain_err(|| "missing scripthash")?; + let script_hash = script_hash.as_str().chain_err(|| "non-string scripthash")?; + let script_hash = Sha256dHash::from_hex(script_hash).chain_err(|| "non-hex scripthash")?; let confirmed = self.query.balance(&script_hash[..]); Ok(json!({ "confirmed": confirmed })) // TODO: "unconfirmed" } - fn blockchain_scripthash_get_history(&self, _params: &[&str]) -> Result { + fn blockchain_scripthash_get_history(&self, _params: &[Value]) -> Result { Ok(json!([])) // TODO: list of {tx_hash: "ABC", height: 123} } - fn blockchain_transaction_get(&self, params: &[&str]) -> Result { + fn blockchain_transaction_get(&self, params: &[Value]) -> Result { // TODO: handle 'verbose' param - let tx_hash_hex = params.get(0).chain_err(|| "missing tx_hash")?; - let tx_hash = Sha256dHash::from_hex(tx_hash_hex).chain_err(|| "invalid tx_hash")?; - let tx_hex = util::hexlify(&self.query.get_tx(tx_hash)); + let tx_hash = params.get(0).chain_err(|| "missing tx_hash")?; + let tx_hash = tx_hash.as_str().chain_err(|| "non-string tx_hash")?; + let tx_hash = Sha256dHash::from_hex(tx_hash).chain_err(|| "non-hex tx_hash")?; + let tx_hex = util::hexlify(&self.query.get_tx(&tx_hash)); Ok(json!(tx_hex)) } - fn blockchain_transaction_get_merkle(&self, _params: &[&str]) -> Result { + fn blockchain_transaction_get_merkle(&self, _params: &[Value]) -> Result { Ok(json!({"block_height": 123, "merkle": ["A", "B", "C"], "pos": 45})) } - fn handle_command(&self, method: &str, params_values: &[Value], id: &Number) -> Result { - let mut params = Vec::<&str>::new(); - for value in params_values { - if let Some(s) = value.as_str() { - params.push(s); - } else { - bail!("invalid param: {:?}", value); - } - } + fn handle_command(&self, method: &str, params: &[Value], id: &Number) -> Result { let result = match method { "blockchain.headers.subscribe" => self.blockchain_headers_subscribe(), "server.version" => self.server_version(), @@ -85,6 +89,7 @@ impl<'a> Handler<'a> { "server.donation_address" => self.server_donation_address(), "server.peers.subscribe" => self.server_peers_subscribe(), "mempool.get_fee_histogram" => self.mempool_get_fee_histogram(), + "blockchain.block.get_chunk" => self.blockchain_block_get_chunk(¶ms), "blockchain.estimatefee" => self.blockchain_estimatefee(¶ms), "blockchain.scripthash.subscribe" => self.blockchain_scripthash_subscribe(¶ms), "blockchain.scripthash.get_balance" => self.blockchain_scripthash_get_balance(¶ms),