1
0
mirror of https://github.com/romanz/electrs.git synced 2024-11-19 09:54:09 +01:00

Support get_chunk API call via concurrent Index::header_list()

This commit is contained in:
Roman Zeyde 2018-05-01 15:00:45 +03:00
parent e3490568cf
commit 92ac75e50d
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB
4 changed files with 65 additions and 38 deletions

View File

@ -33,7 +33,7 @@ struct Config {
}
fn run_server(config: Config) {
let mut index = index::Index::new();
let index = index::Index::new();
let waiter = waiter::Waiter::new("tcp://localhost:28332");
let daemon = daemon::Daemon::new("http://localhost:8332");
{
@ -51,7 +51,7 @@ fn run_server(config: Config) {
}
let store = store::Store::open(DB_PATH, store::StoreOptions { auto_compact: true });
let query = query::Query::new(&store, &daemon);
let query = query::Query::new(&store, &daemon, &index);
crossbeam::scope(|scope| {
scope.spawn(|| rpc::serve("localhost:50001", &query));

View File

@ -8,6 +8,7 @@ use crypto::digest::Digest;
use crypto::sha2::Sha256;
use pbr;
use std::io::{stderr, Stderr};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use time;
@ -268,25 +269,25 @@ impl<'a> Iterator for BatchIter<'a> {
pub struct Index {
// TODO: store also a &HeaderMap.
// TODO: store also latest snapshot.
headers: HeaderList,
headers: RwLock<Arc<HeaderList>>,
}
impl Index {
pub fn new() -> Index {
Index {
headers: HeaderList::empty(),
headers: RwLock::new(Arc::new(HeaderList::empty())),
}
}
pub fn headers_list(&self) -> Arc<HeaderList> {
self.headers.read().unwrap().clone()
}
fn get_missing_headers<'a>(
&self,
store: &Store,
current_headers: &'a HeaderList,
) -> Vec<&'a HeaderEntry> {
if current_headers.equals(&self.headers) {
return Vec::new(); // everything was indexed already.
}
let indexed_headers: HeaderMap = read_indexed_headers(&store);
{
let best_block_header: &BlockHeader =
@ -306,15 +307,19 @@ impl Index {
.collect()
}
pub fn update(&mut self, store: &Store, daemon: &Daemon) {
let current_headers = daemon.enumerate_headers(&self.headers);
pub fn update(&self, store: &Store, daemon: &Daemon) {
let indexed_headers: Arc<HeaderList> = self.headers_list();
let current_headers = daemon.enumerate_headers(&*indexed_headers);
{
if indexed_headers.equals(&current_headers) {
return; // everything was indexed already.
}
let missing_headers = self.get_missing_headers(&store, &current_headers);
for rows in BatchIter::new(Indexer::new(missing_headers, &daemon)) {
// TODO: add timing
store.persist(&rows);
}
}
self.headers = current_headers
*self.headers.write().unwrap() = Arc::new(current_headers);
}
}

View File

@ -1,23 +1,29 @@
use bincode;
use bitcoin::blockdata::block::BlockHeader;
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::network::serialize::deserialize;
use bitcoin::network::serialize::{deserialize, serialize};
use bitcoin::util::hash::Sha256dHash;
use itertools::enumerate;
use daemon::Daemon;
use index::{compute_script_hash, hash_prefix, HashPrefix, TxInKey, TxInRow, TxKey, TxOutRow,
HASH_PREFIX_LEN};
use index::{compute_script_hash, hash_prefix, HashPrefix, Index, TxInKey, TxInRow, TxKey,
TxOutRow, HASH_PREFIX_LEN};
use store::Store;
use types::Bytes;
pub struct Query<'a> {
store: &'a Store,
daemon: &'a Daemon,
index: &'a Index,
}
impl<'a> Query<'a> {
pub fn new(store: &'a Store, daemon: &'a Daemon) -> Query<'a> {
Query { store, daemon }
pub fn new(store: &'a Store, daemon: &'a Daemon, index: &'a Index) -> Query<'a> {
Query {
store,
daemon,
index,
}
}
fn load_txns(&self, prefixes: Vec<HashPrefix>) -> Vec<Transaction> {
@ -97,8 +103,19 @@ impl<'a> Query<'a> {
balance as f64 / 100_000_000f64
}
pub fn get_tx(&self, tx_hash: Sha256dHash) -> Bytes {
pub fn get_tx(&self, tx_hash: &Sha256dHash) -> Bytes {
self.daemon
.get(&format!("tx/{}.bin", tx_hash.be_hex_string()))
}
pub fn get_headers(&self, heights: &[usize]) -> Vec<Bytes> {
let headers_list = self.index.headers_list();
let headers = headers_list.headers();
let mut result = Vec::new();
for h in heights {
let header: &BlockHeader = headers[*h].header();
result.push(serialize(header).unwrap());
}
result
}
}

View File

@ -1,4 +1,5 @@
use bitcoin::util::hash::Sha256dHash;
use itertools;
use serde_json::{from_str, Number, Value};
use std::io::{BufRead, BufReader, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
@ -37,47 +38,50 @@ impl<'a> Handler<'a> {
Ok(json!([])) // TODO: consult with actual mempool
}
fn blockchain_estimatefee(&self, _params: &[&str]) -> Result<Value> {
fn blockchain_block_get_chunk(&self, params: &[Value]) -> Result<Value> {
const CHUNK_SIZE: usize = 2016;
let index = params.get(0).chain_err(|| "missing index")?;
let index = index.as_u64().chain_err(|| "non-number index")? as usize;
let heights: Vec<usize> = (0..CHUNK_SIZE).map(|h| index * CHUNK_SIZE + h).collect();
let headers = self.query.get_headers(&heights);
let result = itertools::join(headers.into_iter().map(|x| util::hexlify(&x)), "");
Ok(json!(result))
}
fn blockchain_estimatefee(&self, _params: &[Value]) -> Result<Value> {
Ok(json!(1e-5)) // TODO: consult with actual mempool
}
fn blockchain_scripthash_subscribe(&self, _params: &[&str]) -> Result<Value> {
fn blockchain_scripthash_subscribe(&self, _params: &[Value]) -> Result<Value> {
Ok(json!("HEX_STATUS"))
}
fn blockchain_scripthash_get_balance(&self, params: &[&str]) -> Result<Value> {
let script_hash_hex = params.get(0).chain_err(|| "missing scripthash")?;
let script_hash =
Sha256dHash::from_hex(script_hash_hex).chain_err(|| "invalid scripthash")?;
fn blockchain_scripthash_get_balance(&self, params: &[Value]) -> Result<Value> {
let script_hash = params.get(0).chain_err(|| "missing scripthash")?;
let script_hash = script_hash.as_str().chain_err(|| "non-string scripthash")?;
let script_hash = Sha256dHash::from_hex(script_hash).chain_err(|| "non-hex scripthash")?;
let confirmed = self.query.balance(&script_hash[..]);
Ok(json!({ "confirmed": confirmed })) // TODO: "unconfirmed"
}
fn blockchain_scripthash_get_history(&self, _params: &[&str]) -> Result<Value> {
fn blockchain_scripthash_get_history(&self, _params: &[Value]) -> Result<Value> {
Ok(json!([])) // TODO: list of {tx_hash: "ABC", height: 123}
}
fn blockchain_transaction_get(&self, params: &[&str]) -> Result<Value> {
fn blockchain_transaction_get(&self, params: &[Value]) -> Result<Value> {
// TODO: handle 'verbose' param
let tx_hash_hex = params.get(0).chain_err(|| "missing tx_hash")?;
let tx_hash = Sha256dHash::from_hex(tx_hash_hex).chain_err(|| "invalid tx_hash")?;
let tx_hex = util::hexlify(&self.query.get_tx(tx_hash));
let tx_hash = params.get(0).chain_err(|| "missing tx_hash")?;
let tx_hash = tx_hash.as_str().chain_err(|| "non-string tx_hash")?;
let tx_hash = Sha256dHash::from_hex(tx_hash).chain_err(|| "non-hex tx_hash")?;
let tx_hex = util::hexlify(&self.query.get_tx(&tx_hash));
Ok(json!(tx_hex))
}
fn blockchain_transaction_get_merkle(&self, _params: &[&str]) -> Result<Value> {
fn blockchain_transaction_get_merkle(&self, _params: &[Value]) -> Result<Value> {
Ok(json!({"block_height": 123, "merkle": ["A", "B", "C"], "pos": 45}))
}
fn handle_command(&self, method: &str, params_values: &[Value], id: &Number) -> Result<Value> {
let mut params = Vec::<&str>::new();
for value in params_values {
if let Some(s) = value.as_str() {
params.push(s);
} else {
bail!("invalid param: {:?}", value);
}
}
fn handle_command(&self, method: &str, params: &[Value], id: &Number) -> Result<Value> {
let result = match method {
"blockchain.headers.subscribe" => self.blockchain_headers_subscribe(),
"server.version" => self.server_version(),
@ -85,6 +89,7 @@ impl<'a> Handler<'a> {
"server.donation_address" => self.server_donation_address(),
"server.peers.subscribe" => self.server_peers_subscribe(),
"mempool.get_fee_histogram" => self.mempool_get_fee_histogram(),
"blockchain.block.get_chunk" => self.blockchain_block_get_chunk(&params),
"blockchain.estimatefee" => self.blockchain_estimatefee(&params),
"blockchain.scripthash.subscribe" => self.blockchain_scripthash_subscribe(&params),
"blockchain.scripthash.get_balance" => self.blockchain_scripthash_get_balance(&params),