diff --git a/server.sh b/server.sh
index e1d2f90..a3b3614 100755
--- a/server.sh
+++ b/server.sh
@@ -8,7 +8,7 @@ cargo build --all --features "metrics_process" --release
 NETWORK=$1
 shift
 
-DB=./db
+DB=./db1
 export RUST_LOG=${RUST_LOG-electrs=INFO}
 target/release/electrs --network $NETWORK --db-dir $DB --daemon-dir $HOME/.bitcoin $*
diff --git a/src/chain.rs b/src/chain.rs
index 2267457..302c870 100644
--- a/src/chain.rs
+++ b/src/chain.rs
@@ -5,20 +5,17 @@
 use bitcoin::hashes::hex::FromHex;
 use bitcoin::network::constants;
 use bitcoin::{BlockHash, BlockHeader};
 
+use crate::types::{GlobalTxId, HeaderRow};
+
 /// A new header found, to be added to the chain at specific height
-pub(crate) struct NewHeader {
-    header: BlockHeader,
+pub(crate) struct NewBlockHash {
     hash: BlockHash,
     height: usize,
 }
 
-impl NewHeader {
-    pub(crate) fn from((header, height): (BlockHeader, usize)) -> Self {
-        Self {
-            header,
-            hash: header.block_hash(),
-            height,
-        }
+impl NewBlockHash {
+    pub(crate) fn from(hash: BlockHash, height: usize) -> Self {
+        Self { hash, height }
     }
 
     pub(crate) fn height(&self) -> usize {
@@ -30,9 +27,33 @@ impl NewHeader {
     }
 }
 
+pub(crate) struct IndexedHeader {
+    header: BlockHeader,
+    hash: BlockHash,
+    height: usize,
+    gtxid: GlobalTxId,
+}
+
+impl IndexedHeader {
+    pub(crate) fn from(row: &HeaderRow, height: usize) -> Self {
+        Self {
+            header: row.header,
+            hash: row.header.block_hash(),
+            height,
+            gtxid: row.gtxid,
+        }
+    }
+}
+
+struct HeaderEntry {
+    hash: BlockHash,
+    header: BlockHeader,
+    gtxid: GlobalTxId,
+}
+
 /// Current blockchain headers' list
 pub struct Chain {
-    headers: Vec<(BlockHash, BlockHeader)>,
+    headers: Vec<HeaderEntry>,
     heights: HashMap<BlockHash, usize>,
 }
 
@@ -49,7 +70,11 @@ impl Chain {
         let genesis: BlockHeader = deserialize(&genesis_header_bytes).unwrap();
         assert_eq!(genesis.prev_blockhash, BlockHash::default());
         Self {
-            headers: vec![(genesis.block_hash(), genesis)],
+            headers: vec![HeaderEntry {
+                hash: genesis.block_hash(),
+                header: genesis,
+                gtxid: GlobalTxId::default(),
+            }],
             heights: std::iter::once((genesis.block_hash(), 0)).collect(), // genesis header @ zero height
         }
     }
@@ -59,41 +84,60 @@ impl Chain {
             return;
         }
         let new_height = self.height().saturating_sub(n);
-        self.update(vec![NewHeader::from((
-            self.headers[new_height].1,
-            new_height,
-        ))])
+        let entry = &self.headers[new_height];
+        let header = IndexedHeader {
+            header: entry.header,
+            height: new_height,
+            hash: entry.hash,
+            gtxid: entry.gtxid,
+        };
+        self.update(vec![header])
     }
 
     /// Load the chain from a collection of headers, up to the given tip
-    pub(crate) fn load(&mut self, headers: Vec<BlockHeader>, tip: BlockHash) {
-        let genesis_hash = self.headers[0].0;
+    pub(crate) fn load(&mut self, rows: &[HeaderRow], tip: BlockHash) {
+        let genesis_hash = self.headers[0].hash;
 
-        let mut header_map: HashMap<BlockHash, BlockHeader> =
-            headers.into_iter().map(|h| (h.block_hash(), h)).collect();
+        let mut map: HashMap<BlockHash, &HeaderRow> = rows
+            .iter()
+            .map(|row| (row.header.block_hash(), row))
+            .collect();
         let mut blockhash = tip;
-        let mut new_headers = vec![];
+        let mut ordered_rows = vec![];
         while blockhash != genesis_hash {
-            let header = match header_map.remove(&blockhash) {
-                Some(header) => header,
+            let row = match map.remove(&blockhash) {
+                Some(row) => row,
                 None => panic!("missing header {} while loading from DB", blockhash),
             };
-            blockhash = header.prev_blockhash;
-            new_headers.push(header);
+            blockhash = row.header.prev_blockhash;
+            ordered_rows.push(row);
         }
-        info!("loading {} headers, tip={}", new_headers.len(), tip);
-        let new_headers = new_headers.into_iter().rev(); // order by height
-        self.update(new_headers.zip(1..).map(NewHeader::from).collect())
+        info!("loading {} headers, tip={}", ordered_rows.len(), tip);
+        let ordered_rows = ordered_rows.iter().rev(); // order by height
+        self.update(
+            ordered_rows
+                .zip(1..)
+                .map(|(row, height)| IndexedHeader::from(row, height))
+                .collect(),
+        )
    }
 
     /// Get the block hash at specified height (if exists)
     pub(crate) fn get_block_hash(&self, height: usize) -> Option<BlockHash> {
-        self.headers.get(height).map(|(hash, _header)| *hash)
+        self.headers.get(height).map(|entry| entry.hash)
+    }
+
+    pub(crate) fn get_block_hash_by_gtxid(&self, id: GlobalTxId) -> Option<BlockHash> {
+        let height = match self.headers.binary_search_by_key(&id, |entry| entry.gtxid) {
+            Ok(height) => height,
+            Err(height) => height,
+        };
+        self.get_block_hash(height)
     }
 
     /// Get the block header at specified height (if exists)
     pub(crate) fn get_block_header(&self, height: usize) -> Option<&BlockHeader> {
-        self.headers.get(height).map(|(_hash, header)| header)
+        self.headers.get(height).map(|entry| &entry.header)
     }
 
     /// Get the block height given the specified hash (if exists)
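Note on the lookup above: each `HeaderEntry.gtxid` stores the cumulative transaction count after its block, so the entries are strictly increasing and `get_block_hash_by_gtxid` can binary-search them. An exact match means the id is the block's last transaction; a miss yields the insertion point, i.e. the first block whose cumulative count exceeds the id. A minimal standalone sketch of that rule (plain `u32` values stand in for `GlobalTxId` and block hashes; not electrs code):

```rust
/// Height of the block confirming transaction `id`, given cumulative
/// per-block transaction counts (mirroring HeaderEntry.gtxid).
fn height_for_gtxid(cumulative: &[u32], id: u32) -> Option<usize> {
    let height = match cumulative.binary_search(&id) {
        Ok(h) => h,  // `id` is the last transaction of block `h`
        Err(h) => h, // first block whose cumulative count exceeds `id`
    };
    cumulative.get(height).map(|_| height)
}

fn main() {
    // genesis has 0 txs; blocks 1..=3 end at global ids 3, 5 and 9
    let cumulative = [0u32, 3, 5, 9];
    assert_eq!(height_for_gtxid(&cumulative, 3), Some(1)); // exact match
    assert_eq!(height_for_gtxid(&cumulative, 4), Some(2)); // inside block 2
    assert_eq!(height_for_gtxid(&cumulative, 10), None); // beyond the tip
}
```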
@@ -102,28 +146,46 @@ impl Chain {
     }
 
     /// Update the chain with a list of new headers (possibly a reorg)
-    pub(crate) fn update(&mut self, headers: Vec<NewHeader>) {
+    pub(crate) fn update(&mut self, headers: Vec<IndexedHeader>) {
         if let Some(first_height) = headers.first().map(|h| h.height) {
-            for (hash, _header) in self.headers.drain(first_height..) {
-                assert!(self.heights.remove(&hash).is_some());
+            for entry in self.headers.drain(first_height..) {
+                assert!(self.heights.remove(&entry.hash).is_some());
             }
             for (h, height) in headers.into_iter().zip(first_height..) {
                 assert_eq!(h.height, height);
                 assert_eq!(h.hash, h.header.block_hash());
+
+                if height > 0 {
+                    let prev = &self.headers[height - 1];
+                    assert!(prev.hash == h.header.prev_blockhash);
+                    assert!(prev.gtxid < h.gtxid);
+                } else {
+                    assert_eq!(h.header.prev_blockhash, BlockHash::default());
+                }
+
                 assert!(self.heights.insert(h.hash, h.height).is_none());
-                self.headers.push((h.hash, h.header));
+                self.headers.push(HeaderEntry {
+                    hash: h.hash,
+                    header: h.header,
+                    gtxid: h.gtxid,
+                });
             }
             info!(
                 "chain updated: tip={}, height={}",
-                self.headers.last().unwrap().0,
-                self.headers.len() - 1
+                self.tip(),
+                self.height()
             );
         }
     }
 
     /// Best block hash
     pub(crate) fn tip(&self) -> BlockHash {
-        self.headers.last().expect("empty chain").0
+        self.headers.last().expect("empty chain").hash
+    }
+
+    /// Latest global transaction id
+    pub(crate) fn gtxid(&self) -> GlobalTxId {
+        self.headers.last().expect("empty chain").gtxid
     }
 
     /// Number of blocks (excluding genesis block)
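The new assertions in `update` encode the invariant the binary search relies on: every block carries at least a coinbase transaction, so the cumulative `gtxid` strictly increases along the chain. A toy check of that invariant (assumed per-block counts; not electrs code):

```rust
// Toy check of the invariant asserted by update(): cumulative transaction
// counts are strictly increasing, since every block has >= 1 (coinbase) tx.
fn main() {
    let tx_counts = [1u32, 3, 1, 7]; // per-block transaction counts (assumed)
    let mut cumulative = vec![0u32]; // genesis entry, like GlobalTxId::default()
    for n in tx_counts {
        assert!(n >= 1, "a block always contains a coinbase transaction");
        cumulative.push(cumulative.last().unwrap() + n);
    }
    // prev.gtxid < h.gtxid holds pairwise, so the binary search is well-defined:
    assert!(cumulative.windows(2).all(|w| w[0] < w[1]));
}
```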
@@ -141,7 +203,7 @@ impl Chain {
             if result.len() >= 10 {
                 step *= 2;
             }
-            result.push(self.headers[index].0);
+            result.push(self.headers[index].hash);
             if index == 0 {
                 break;
             }
@@ -153,7 +215,7 @@
 #[cfg(test)]
 mod tests {
-    use super::{Chain, NewHeader};
+    use super::{Chain, GlobalTxId, HeaderRow, IndexedHeader};
     use bitcoin::consensus::deserialize;
     use bitcoin::hashes::hex::{FromHex, ToHex};
     use bitcoin::network::constants::Network::Regtest;
@@ -188,16 +250,34 @@
             .map(|hex_header| deserialize(&Vec::from_hex(hex_header).unwrap()).unwrap())
             .collect();
 
+        let mut header_rows = vec![];
+        let mut gtxid = GlobalTxId::default();
+        for header in &headers {
+            gtxid.next();
+            header_rows.push(HeaderRow {
+                header: *header,
+                gtxid,
+            })
+        }
+
         for chunk_size in 1..hex_headers.len() {
             let mut regtest = Chain::new(Regtest);
             let mut height = 0;
+            let mut gtxid = GlobalTxId::default();
             let mut tip = regtest.tip();
             for chunk in headers.chunks(chunk_size) {
                 let mut update = vec![];
                 for header in chunk {
                     height += 1;
+                    gtxid.next();
                     tip = header.block_hash();
-                    update.push(NewHeader::from((*header, height)))
+                    update.push(IndexedHeader::from(
+                        &HeaderRow {
+                            header: *header,
+                            gtxid,
+                        },
+                        height,
+                    ))
                 }
                 regtest.update(update);
                 assert_eq!(regtest.tip(), tip);
@@ -209,7 +289,7 @@
         // test loading from a list of headers and tip
         let mut regtest = Chain::new(Regtest);
-        regtest.load(headers.clone(), headers.last().unwrap().block_hash());
+        regtest.load(&header_rows, headers.last().unwrap().block_hash());
         assert_eq!(regtest.height(), headers.len());
 
         // test getters
@@ -242,11 +322,18 @@
         // test reorg
         let mut regtest = Chain::new(Regtest);
-        regtest.load(headers.clone(), headers.last().unwrap().block_hash());
+        regtest.load(&header_rows, headers.last().unwrap().block_hash());
         let height = regtest.height();
+        let gtxid = regtest.gtxid();
 
         let new_header: BlockHeader = deserialize(&Vec::from_hex("000000200030d7f9c11ef35b89a0eefb9a5e449909339b5e7854d99804ea8d6a49bf900a0304d2e55fe0b6415949cff9bca0f88c0717884a5e5797509f89f856af93624a7a6bcc60ffff7f2000000000").unwrap()).unwrap();
-        regtest.update(vec![NewHeader::from((new_header, height))]);
+        regtest.update(vec![IndexedHeader::from(
+            &HeaderRow {
+                header: new_header,
+                gtxid,
+            },
+            height,
+        )]);
         assert_eq!(regtest.height(), height);
         assert_eq!(
             regtest.tip().to_hex(),
diff --git a/src/daemon.rs b/src/daemon.rs
index 0903c25..9839aa7 100644
--- a/src/daemon.rs
+++ b/src/daemon.rs
@@ -1,8 +1,6 @@
 use anyhow::{Context, Result};
-use bitcoin::{
-    consensus::serialize, hashes::hex::ToHex, Amount, Block, BlockHash, Transaction, Txid,
-};
+use bitcoin::{consensus::serialize, hashes::hex::ToHex, Amount, BlockHash, Transaction, Txid};
 use bitcoincore_rpc::{json, jsonrpc, Auth, Client, RpcApi};
 use crossbeam_channel::Receiver;
 use parking_lot::Mutex;
@@ -13,10 +11,10 @@
 use std::io::Read;
 use std::path::Path;
 
 use crate::{
-    chain::{Chain, NewHeader},
+    chain::{Chain, NewBlockHash},
     config::Config,
     metrics::Metrics,
-    p2p::Connection,
+    p2p::{BlockRequest, Connection},
     signals::ExitFlag,
 };
@@ -230,16 +228,15 @@ impl Daemon {
             .context("failed to get mempool entry")
     }
 
-    pub(crate) fn get_new_headers(&self, chain: &Chain) -> Result<Vec<NewHeader>> {
+    pub(crate) fn get_new_headers(&self, chain: &Chain) -> Result<Vec<NewBlockHash>> {
         self.p2p.lock().get_new_headers(chain)
     }
 
-    pub(crate) fn for_blocks<B, F>(&self, blockhashes: B, func: F) -> Result<()>
+    pub(crate) fn for_blocks<F>(&self, requests: Vec<BlockRequest>, func: F) -> Result<()>
     where
-        B: IntoIterator<Item = BlockHash>,
-        F: FnMut(BlockHash, Block),
+        F: FnMut(BlockRequest, &[u8]),
     {
-        self.p2p.lock().for_blocks(blockhashes, func)
+        self.p2p.lock().for_blocks(requests, func)
     }
 
     pub(crate) fn new_block_notification(&self) -> Receiver<()> {
"spending"; -const COLUMN_FAMILIES: &[&str] = &[CONFIG_CF, HEADERS_CF, TXID_CF, FUNDING_CF, SPENDING_CF]; +const COLUMN_FAMILIES: &[&str] = &[ + CONFIG_CF, + HEADERS_CF, + TXID_CF, + OFFSET_CF, + FUNDING_CF, + SPENDING_CF, +]; const CONFIG_KEY: &str = "C"; const TIP_KEY: &[u8] = b"T"; @@ -84,7 +94,7 @@ struct Config { format: u64, } -const CURRENT_FORMAT: u64 = 0; +const CURRENT_FORMAT: u64 = 1; impl Default for Config { fn default() -> Self { @@ -207,6 +217,10 @@ impl DBStore { self.db.cf_handle(TXID_CF).expect("missing TXID_CF") } + fn offset_cf(&self) -> &rocksdb::ColumnFamily { + self.db.cf_handle(OFFSET_CF).expect("missing TXID_CF") + } + fn headers_cf(&self) -> &rocksdb::ColumnFamily { self.db.cf_handle(HEADERS_CF).expect("missing HEADERS_CF") } @@ -252,6 +266,12 @@ impl DBStore { .expect("get_tip failed") } + pub(crate) fn get_tx_offset(&self, key: &[u8]) -> Option> { + self.db + .get_cf(self.offset_cf(), key) + .expect("get_tx_offset failed") + } + pub(crate) fn write(&self, batch: &WriteBatch) { let mut db_batch = rocksdb::WriteBatch::default(); for key in &batch.funding_rows { @@ -263,6 +283,9 @@ impl DBStore { for key in &batch.txid_rows { db_batch.put_cf(self.txid_cf(), key, b""); } + for (key, value) in &batch.offset_rows { + db_batch.put_cf(self.offset_cf(), key, value); + } for key in &batch.header_rows { db_batch.put_cf(self.headers_cf(), key, b""); } diff --git a/src/index.rs b/src/index.rs index 323cb31..42eaab7 100644 --- a/src/index.rs +++ b/src/index.rs @@ -1,14 +1,18 @@ use anyhow::{Context, Result}; use bitcoin::consensus::{deserialize, serialize}; -use bitcoin::{Block, BlockHash, OutPoint, Txid}; +use bitcoin::{BlockHash, OutPoint, Txid}; use crate::{ - chain::{Chain, NewHeader}, + chain::{Chain, IndexedHeader, NewBlockHash}, daemon::Daemon, db::{DBStore, Row, WriteBatch}, metrics::{self, Gauge, Histogram, Metrics}, + p2p::BlockRequest, signals::ExitFlag, - types::{HashPrefixRow, HeaderRow, ScriptHash, ScriptHashRow, SpendingPrefixRow, TxidRow}, + types::{ + GlobalTxId, HashPrefixRow, HeaderRow, ScriptHash, ScriptHashRow, SpendingPrefixRow, + TxLocation, TxOffsetRow, TxidRow, + }, }; #[derive(Clone)] @@ -78,6 +82,7 @@ struct IndexResult { funding_rows: Vec, spending_rows: Vec, txid_rows: Vec, + offset_rows: Vec, } impl IndexResult { @@ -91,6 +96,9 @@ impl IndexResult { let txid_rows = self.txid_rows.iter().map(HashPrefixRow::to_db_row); batch.txid_rows.extend(txid_rows); + let offset_rows = self.offset_rows.iter().map(|row| (row.key(), row.value())); + batch.offset_rows.extend(offset_rows); + batch.header_rows.push(self.header_row.to_db_row()); batch.tip_row = serialize(&self.header_row.header.block_hash()).into_boxed_slice(); } @@ -117,12 +125,12 @@ impl Index { ) -> Result { if let Some(row) = store.get_tip() { let tip = deserialize(&row).expect("invalid tip"); - let headers = store + let rows: Vec = store .read_headers() .into_iter() - .map(|row| HeaderRow::from_db_row(&row).header) + .map(|row| HeaderRow::from_db_row(&row)) .collect(); - chain.load(headers, tip); + chain.load(&rows, tip); chain.drop_last_headers(reindex_last_blocks); }; let stats = Stats::new(metrics); @@ -154,31 +162,43 @@ impl Index { Ok(result) } + pub(crate) fn get_tx_location(&self, id: GlobalTxId) -> Option { + let blockhash = self.chain.get_block_hash_by_gtxid(id); + let offset = self + .store + .get_tx_offset(&serialize(&id)) + .map(|value| TxOffsetRow::parse_offset(&value)); + match (blockhash, offset) { + (Some(blockhash), Some(offset)) => Some(TxLocation { blockhash, offset }), + _ => 
diff --git a/src/index.rs b/src/index.rs
index 323cb31..42eaab7 100644
--- a/src/index.rs
+++ b/src/index.rs
@@ -1,14 +1,18 @@
 use anyhow::{Context, Result};
 use bitcoin::consensus::{deserialize, serialize};
-use bitcoin::{Block, BlockHash, OutPoint, Txid};
+use bitcoin::{BlockHash, OutPoint, Txid};
 
 use crate::{
-    chain::{Chain, NewHeader},
+    chain::{Chain, IndexedHeader, NewBlockHash},
     daemon::Daemon,
     db::{DBStore, Row, WriteBatch},
     metrics::{self, Gauge, Histogram, Metrics},
+    p2p::BlockRequest,
     signals::ExitFlag,
-    types::{HashPrefixRow, HeaderRow, ScriptHash, ScriptHashRow, SpendingPrefixRow, TxidRow},
+    types::{
+        GlobalTxId, HashPrefixRow, HeaderRow, ScriptHash, ScriptHashRow, SpendingPrefixRow,
+        TxLocation, TxOffsetRow, TxidRow,
+    },
 };
 
 #[derive(Clone)]
@@ -78,6 +82,7 @@ struct IndexResult {
     funding_rows: Vec<HashPrefixRow>,
     spending_rows: Vec<HashPrefixRow>,
     txid_rows: Vec<HashPrefixRow>,
+    offset_rows: Vec<TxOffsetRow>,
 }
 
 impl IndexResult {
@@ -91,6 +96,9 @@
         let txid_rows = self.txid_rows.iter().map(HashPrefixRow::to_db_row);
         batch.txid_rows.extend(txid_rows);
 
+        let offset_rows = self.offset_rows.iter().map(|row| (row.key(), row.value()));
+        batch.offset_rows.extend(offset_rows);
+
         batch.header_rows.push(self.header_row.to_db_row());
         batch.tip_row = serialize(&self.header_row.header.block_hash()).into_boxed_slice();
     }
@@ -117,12 +125,12 @@
     ) -> Result<Self> {
         if let Some(row) = store.get_tip() {
             let tip = deserialize(&row).expect("invalid tip");
-            let headers = store
+            let rows: Vec<HeaderRow> = store
                 .read_headers()
                 .into_iter()
-                .map(|row| HeaderRow::from_db_row(&row).header)
+                .map(|row| HeaderRow::from_db_row(&row))
                 .collect();
-            chain.load(headers, tip);
+            chain.load(&rows, tip);
             chain.drop_last_headers(reindex_last_blocks);
         };
         let stats = Stats::new(metrics);
@@ -154,31 +162,43 @@
         Ok(result)
     }
 
+    pub(crate) fn get_tx_location(&self, id: GlobalTxId) -> Option<TxLocation> {
+        let blockhash = self.chain.get_block_hash_by_gtxid(id);
+        let offset = self
+            .store
+            .get_tx_offset(&serialize(&id))
+            .map(|value| TxOffsetRow::parse_offset(&value));
+        match (blockhash, offset) {
+            (Some(blockhash), Some(offset)) => Some(TxLocation { blockhash, offset }),
+            _ => None,
+        }
+    }
+
     pub(crate) fn filter_by_txid(&self, txid: Txid) -> impl Iterator<Item = BlockHash> + '_ {
         self.store
             .iter_txid(TxidRow::scan_prefix(txid))
-            .map(|row| HashPrefixRow::from_db_row(&row).height())
-            .filter_map(move |height| self.chain.get_block_hash(height))
+            .map(|row| HashPrefixRow::from_db_row(&row).id())
+            .filter_map(move |id| self.chain.get_block_hash_by_gtxid(id))
     }
 
     pub(crate) fn filter_by_funding(
         &self,
         scripthash: ScriptHash,
-    ) -> impl Iterator<Item = BlockHash> + '_ {
+    ) -> impl Iterator<Item = TxLocation> + '_ {
         self.store
             .iter_funding(ScriptHashRow::scan_prefix(scripthash))
-            .map(|row| HashPrefixRow::from_db_row(&row).height())
-            .filter_map(move |height| self.chain.get_block_hash(height))
+            .map(|row| HashPrefixRow::from_db_row(&row).id())
+            .filter_map(move |id| self.get_tx_location(id))
     }
 
     pub(crate) fn filter_by_spending(
         &self,
         outpoint: OutPoint,
-    ) -> impl Iterator<Item = BlockHash> + '_ {
+    ) -> impl Iterator<Item = TxLocation> + '_ {
         self.store
             .iter_spending(SpendingPrefixRow::scan_prefix(outpoint))
-            .map(|row| HashPrefixRow::from_db_row(&row).height())
-            .filter_map(move |height| self.chain.get_block_hash(height))
+            .map(|row| HashPrefixRow::from_db_row(&row).id())
+            .filter_map(move |id| self.get_tx_location(id))
     }
 
     // Return `Ok(true)` when the chain is fully synced and the index is compacted.
@@ -202,6 +222,8 @@
                 return Ok(true); // no more blocks to index (done for now)
             }
         }
+        let mut gtxid = self.chain.gtxid();
+        let mut indexed_headers = Vec::with_capacity(new_headers.len());
         for chunk in new_headers.chunks(self.batch_size) {
             exit_flag.poll().with_context(|| {
                 format!(
@@ -209,22 +231,33 @@
                     chunk.first().unwrap().height()
                 )
             })?;
-            self.sync_blocks(daemon, chunk)?;
+            indexed_headers.extend(self.sync_blocks(daemon, chunk, &mut gtxid)?);
         }
-        self.chain.update(new_headers);
+        self.chain.update(indexed_headers);
         self.stats.observe_chain(&self.chain);
         Ok(false) // sync is not done
     }
 
-    fn sync_blocks(&mut self, daemon: &Daemon, chunk: &[NewHeader]) -> Result<()> {
-        let blockhashes: Vec<BlockHash> = chunk.iter().map(|h| h.hash()).collect();
+    fn sync_blocks(
+        &mut self,
+        daemon: &Daemon,
+        chunk: &[NewBlockHash],
+        gtxid: &mut GlobalTxId,
+    ) -> Result<Vec<IndexedHeader>> {
+        let mut indexed_headers = Vec::with_capacity(chunk.len());
+        let requests: Vec<BlockRequest> = chunk
+            .iter()
+            .map(|h| BlockRequest::get_full_block(h.hash()))
+            .collect();
         let mut heights = chunk.iter().map(|h| h.height());
 
         let mut batch = WriteBatch::default();
 
-        daemon.for_blocks(blockhashes, |_blockhash, block| {
+        daemon.for_blocks(requests, |req, raw| {
             let height = heights.next().expect("unexpected block");
             self.stats.observe_duration("block", || {
-                index_single_block(block, height).extend(&mut batch)
+                let result = index_single_block(req, raw, gtxid);
+                indexed_headers.push(IndexedHeader::from(&result.header_row, height));
+                result.extend(&mut batch);
             });
             self.stats.height.set("tip", height as f64);
         })?;
@@ -239,7 +272,7 @@
         self.stats
             .observe_duration("write", || self.store.write(&batch));
         self.stats.observe_db(&self.store);
-        Ok(())
+        Ok(indexed_headers)
     }
 
     pub(crate) fn is_ready(&self) -> bool {
@@ -251,13 +284,17 @@
 fn db_rows_size(rows: &[Row]) -> usize {
     rows.iter().map(|key| key.len()).sum()
 }
 
-fn index_single_block(block: Block, height: usize) -> IndexResult {
-    let mut funding_rows = Vec::with_capacity(block.txdata.iter().map(|tx| tx.output.len()).sum());
-    let mut spending_rows = Vec::with_capacity(block.txdata.iter().map(|tx| tx.input.len()).sum());
-    let mut txid_rows = Vec::with_capacity(block.txdata.len());
+fn index_single_block(req: BlockRequest, raw: &[u8], gtxid: &mut GlobalTxId) -> IndexResult {
+    let header = req.parse_header(raw);
+    let mut funding_rows = vec![];
+    let mut spending_rows = vec![];
+    let mut txid_rows = vec![];
+    let mut offset_rows = vec![];
 
-    for tx in &block.txdata {
-        txid_rows.push(TxidRow::row(tx.txid(), height));
+    for (tx, offset) in req.parse_transactions(raw) {
+        gtxid.next();
+        txid_rows.push(TxidRow::row(tx.txid(), *gtxid));
+        offset_rows.push(TxOffsetRow::row(*gtxid, offset));
 
         funding_rows.extend(
             tx.output
@@ -265,7 +302,7 @@
                 .filter(|txo| !txo.script_pubkey.is_provably_unspendable())
                 .map(|txo| {
                     let scripthash = ScriptHash::new(&txo.script_pubkey);
-                    ScriptHashRow::row(scripthash, height)
+                    ScriptHashRow::row(scripthash, *gtxid)
                 }),
         );
 
@@ -275,13 +312,14 @@
         spending_rows.extend(
             tx.input
                 .iter()
-                .map(|txin| SpendingPrefixRow::row(txin.previous_output, height)),
+                .map(|txin| SpendingPrefixRow::row(txin.previous_output, *gtxid)),
         );
     }
 
     IndexResult {
         funding_rows,
         spending_rows,
         txid_rows,
-        header_row: HeaderRow::new(block.header),
+        offset_rows,
+        header_row: HeaderRow::new(header, *gtxid),
     }
 }
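Every row now stores the 4-byte `GlobalTxId` where it previously stored a 4-byte height, so the index size should be unchanged while lookups gain per-transaction granularity. The `u32` choice leaves substantial headroom; rough arithmetic with assumed figures (the transaction counts are assumptions, not taken from this PR):

```rust
// Back-of-the-envelope headroom check for a u32 GlobalTxId. The figures are
// assumptions: ~700M mainnet transactions total, growth under 200M per year.
fn main() {
    let max = u32::MAX as u64; // 4_294_967_295 ids available
    let current = 700_000_000u64;
    let per_year = 200_000_000u64;
    println!("years of headroom: ~{}", (max - current) / per_year); // ~17
}
```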
diff --git a/src/p2p.rs b/src/p2p.rs
index b7ed35a..eebeda7 100644
--- a/src/p2p.rs
+++ b/src/p2p.rs
@@ -11,19 +11,20 @@ use bitcoin::{
         message_network,
     },
     secp256k1::{self, rand::Rng},
-    Block, BlockHash, BlockHeader, Network,
+    Block, BlockHash, BlockHeader, Network, Transaction,
 };
 use crossbeam_channel::{bounded, select, Receiver, Sender};
-use std::io::{self, ErrorKind, Write};
+use std::io::{self, Cursor, ErrorKind, Seek, SeekFrom, Write};
 use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
 use std::sync::Arc;
 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
 use crate::{
-    chain::{Chain, NewHeader},
+    chain::{Chain, NewBlockHash},
     config::ELECTRS_VERSION,
     metrics::{default_duration_buckets, default_size_buckets, Histogram, Metrics},
+    types::TxLocation,
 };
 
 enum Request {
@@ -43,15 +44,76 @@ impl Request {
         Request::GetBlocks(
             blockhashes
                 .iter()
-                .map(|blockhash| Inventory::WitnessBlock(*blockhash))
+                .copied()
+                .map(Inventory::WitnessBlock)
                 .collect(),
         )
     }
 }
 
+pub(crate) struct BlockRequest {
+    hash: BlockHash,
+    offsets: Option<Vec<usize>>, // if None, fetch all txs
+}
+
+impl BlockRequest {
+    pub fn get_single_transaction(loc: TxLocation) -> Self {
+        Self {
+            hash: loc.blockhash,
+            offsets: Some(vec![loc.offset as usize]),
+        }
+    }
+
+    pub fn get_full_block(hash: BlockHash) -> Self {
+        Self {
+            hash,
+            offsets: None,
+        }
+    }
+
+    pub fn blockhash(&self) -> BlockHash {
+        self.hash
+    }
+
+    pub fn parse_header<'a>(&'a self, mut block: &'a [u8]) -> BlockHeader {
+        let header = BlockHeader::consensus_decode(&mut block).expect("TODO: better handling");
+        assert_eq!(header.block_hash(), self.hash, "got wrong block");
+        header
+    }
+
+    pub fn parse_transactions<'a>(
+        &'a self,
+        block: &'a [u8],
+    ) -> Box<dyn Iterator<Item = (Transaction, usize)> + 'a> {
+        if let Some(offsets) = &self.offsets {
+            let result = offsets.iter().map(move |offset| {
+                let tx = Transaction::consensus_decode(&block[*offset..])
+                    .expect("TODO: better handling");
+                (tx, *offset)
+            });
+            return Box::new(result);
+        }
+        let mut cursor = Cursor::new(block);
+        cursor
+            .seek(SeekFrom::Start(80))
+            .expect("TODO: better handling"); // skip the 80-byte block header
+        let count = VarInt::consensus_decode(&mut cursor)
+            .expect("TODO: better handling")
+            .0;
+        let result = (0..count).map(move |_| {
+            let offset = cursor.position() as usize;
+            let tx = Transaction::consensus_decode(&mut cursor).expect("TODO: better handling");
+            (tx, offset)
+        });
+        // TODO: check that the whole block was parsed
+        Box::new(result)
+    }
+    // TODO: add tests
+}
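`parse_transactions` walks the raw block: skip the 80-byte header, decode the CompactSize transaction count, then record the cursor position before each transaction as its offset. A self-contained sketch of that scan's skeleton (a hand-rolled CompactSize reader replaces `VarInt::consensus_decode`, and a fake all-zero block stands in for real data):

```rust
use std::io::{Cursor, Read, Seek, SeekFrom};

// Minimal CompactSize ("VarInt") reader, for illustration only — electrs
// uses bitcoin::consensus::encode::VarInt::consensus_decode instead.
fn read_compact_size(r: &mut impl Read) -> std::io::Result<u64> {
    let mut first = [0u8; 1];
    r.read_exact(&mut first)?;
    Ok(match first[0] {
        n @ 0..=0xFC => n as u64,
        0xFD => { let mut b = [0u8; 2]; r.read_exact(&mut b)?; u16::from_le_bytes(b) as u64 }
        0xFE => { let mut b = [0u8; 4]; r.read_exact(&mut b)?; u32::from_le_bytes(b) as u64 }
        _ => { let mut b = [0u8; 8]; r.read_exact(&mut b)?; u64::from_le_bytes(b) }
    })
}

fn main() -> std::io::Result<()> {
    // Fake "block": 80 zero bytes of header, then a tx count of 3.
    let mut raw = vec![0u8; 80];
    raw.push(3);
    let mut cursor = Cursor::new(&raw[..]);
    cursor.seek(SeekFrom::Start(80))?; // skip the header, as parse_transactions does
    let count = read_compact_size(&mut cursor)?;
    assert_eq!(count, 3);
    assert_eq!(cursor.position(), 81); // the first tx would start at offset 81
    Ok(())
}
```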
 
 pub(crate) struct Connection {
     req_send: Sender<Request>,
-    blocks_recv: Receiver<Block>,
+    blocks_recv: Receiver<Vec<u8>>,
     headers_recv: Receiver<Vec<BlockHeader>>,
     new_block_recv: Receiver<()>,
 
@@ -62,7 +124,7 @@ impl Connection {
     /// Get new block headers (supporting reorgs).
     /// https://en.bitcoin.it/wiki/Protocol_documentation#getheaders
     /// Defined as `&mut self` to prevent concurrent invocations (https://github.com/romanz/electrs/pull/526#issuecomment-934685515).
-    pub(crate) fn get_new_headers(&mut self, chain: &Chain) -> Result<Vec<NewHeader>> {
+    pub(crate) fn get_new_headers(&mut self, chain: &Chain) -> Result<Vec<NewBlockHash>> {
         self.req_send.send(Request::get_new_headers(chain))?;
         let headers = self
             .headers_recv
             .recv()
@@ -79,41 +141,42 @@
             None => bail!("missing prev_blockhash: {}", prev_blockhash),
         };
         Ok(headers
-            .into_iter()
+            .iter()
             .zip(new_heights)
-            .map(NewHeader::from)
+            .map(|(header, height)| NewBlockHash::from(header.block_hash(), height))
             .collect())
     }
 
     /// Request and process the specified blocks (in the specified order).
     /// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks for details.
     /// Defined as `&mut self` to prevent concurrent invocations (https://github.com/romanz/electrs/pull/526#issuecomment-934685515).
-    pub(crate) fn for_blocks<B, F>(&mut self, blockhashes: B, mut func: F) -> Result<()>
+    pub(crate) fn for_blocks<F>(&mut self, requests: Vec<BlockRequest>, mut func: F) -> Result<()>
     where
-        B: IntoIterator<Item = BlockHash>,
-        F: FnMut(BlockHash, Block),
+        F: FnMut(BlockRequest, &[u8]),
     {
         self.blocks_duration.observe_duration("total", || {
-            let blockhashes: Vec<BlockHash> = blockhashes.into_iter().collect();
-            if blockhashes.is_empty() {
+            if requests.is_empty() {
                 return Ok(());
             }
+            let blockhashes: Vec<_> = requests.iter().map(|req| req.hash).collect();
             self.blocks_duration.observe_duration("request", || {
-                debug!("loading {} blocks", blockhashes.len());
+                debug!("loading {} blocks", requests.len());
                 self.req_send.send(Request::get_blocks(&blockhashes))
             })?;
 
-            for hash in blockhashes {
-                let block = self.blocks_duration.observe_duration("response", || {
-                    let block = self
-                        .blocks_recv
-                        .recv()
-                        .with_context(|| format!("failed to get block {}", hash))?;
-                    ensure!(block.block_hash() == hash, "got unexpected block");
-                    Ok(block)
-                })?;
+            for (req, blockhash) in requests.into_iter().zip(blockhashes.into_iter()) {
+                let block =
+                    self.blocks_duration
+                        .observe_duration("response", || -> Result<Vec<u8>> {
+                            let block = self
+                                .blocks_recv
+                                .recv()
+                                .with_context(|| format!("failed to get block {}", blockhash))?;
+                            // ensure!(block.block_hash() == hash, "got unexpected block"); // TODO: add it back
+                            Ok(block)
+                        })?;
                 self.blocks_duration
-                    .observe_duration("process", || func(hash, block));
+                    .observe_duration("process", || func(req, &block));
             }
             Ok(())
         })
@@ -227,7 +290,7 @@
         });
 
         let (req_send, req_recv) = bounded::<Request>(1);
-        let (blocks_send, blocks_recv) = bounded::<Block>(10);
+        let (blocks_send, blocks_recv) = bounded::<Vec<u8>>(10);
         let (headers_send, headers_recv) = bounded::<Vec<BlockHeader>>(1);
         let (new_block_send, new_block_recv) = bounded::<()>(0);
         let (init_send, init_recv) = bounded::<()>(0);
@@ -271,10 +334,10 @@
                     NetworkMessage::Verack => {
                         init_send.send(())?; // peer acknowledged our version
                     }
-                    NetworkMessage::Block(block) => blocks_send.send(block)?,
                     NetworkMessage::Headers(headers) => headers_send.send(headers)?,
                     NetworkMessage::Alert(_) => (), // https://bitcoin.org/en/alert/2016-11-01-alert-retirement
                     NetworkMessage::Addr(_) => (), // unused
+                    NetworkMessage::Unknown { command, payload } if command.as_ref() == "block" => blocks_send.send(payload)?,
                     msg => warn!("unexpected message: {:?}", msg),
                 }
             }
@@ -343,7 +406,6 @@
                 "verack" => NetworkMessage::Verack,
                 "inv" => NetworkMessage::Inv(Decodable::consensus_decode(&mut raw)?),
                 "notfound" => NetworkMessage::NotFound(Decodable::consensus_decode(&mut raw)?),
-                "block" => NetworkMessage::Block(Decodable::consensus_decode(&mut raw)?),
                 "headers" => {
                     let len = VarInt::consensus_decode(&mut raw)?.0;
                     let mut headers = Vec::with_capacity(len as usize);
diff --git a/src/status.rs b/src/status.rs
index f72fabf..0bc7e49 100644
--- a/src/status.rs
+++ b/src/status.rs
@@ -1,7 +1,7 @@
 use anyhow::Result;
 use bitcoin::{
     hashes::{sha256, Hash, HashEngine},
-    Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
+    Amount, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
 };
 use rayon::prelude::*;
 use serde::ser::{Serialize, Serializer};
@@ -15,7 +15,8 @@
 use crate::{
     daemon::Daemon,
     index::Index,
     mempool::Mempool,
-    types::{ScriptHash, StatusHash},
+    p2p::BlockRequest,
+    types::{ScriptHash, StatusHash, TxLocation},
 };
 
 /// Given a scripthash, store relevant inputs and outputs of a specific transaction
@@ -303,17 +304,20 @@ impl ScriptHashStatus {
     }
 
     /// Apply func only on the new blocks (fetched from daemon).
-    fn for_new_blocks<B, F>(&self, blockhashes: B, daemon: &Daemon, func: F) -> Result<()>
+    fn for_new_blocks<B, F>(&self, locations: B, daemon: &Daemon, mut func: F) -> Result<()>
     where
-        B: IntoIterator<Item = BlockHash>,
-        F: FnMut(BlockHash, Block),
+        B: IntoIterator<Item = TxLocation>,
+        F: FnMut(BlockRequest, &[u8]),
     {
-        daemon.for_blocks(
-            blockhashes
-                .into_iter()
-                .filter(|blockhash| !self.confirmed.contains_key(blockhash)),
-            func,
-        )
+        let requests = locations
+            .into_iter()
+            .filter(|loc| !self.confirmed.contains_key(&loc.blockhash))
+            .map(BlockRequest::get_single_transaction)
+            .collect();
+        daemon.for_blocks(requests, |req, blob| {
+            // assert_eq!(req.blockhash(), block.block_hash()); // TODO: re-add
+            func(req, blob)
+        })
     }
 
     /// Get funding and spending entries from new blocks.
@@ -328,12 +332,12 @@
         let scripthash = self.scripthash;
         let mut result = HashMap::<BlockHash, HashMap<usize, TxEntry>>::new();
 
-        let funding_blockhashes = index.limit_result(index.filter_by_funding(scripthash))?;
-        self.for_new_blocks(funding_blockhashes, daemon, |blockhash, block| {
-            let block_entries = result.entry(blockhash).or_default();
-            filter_block_txs(block, |tx| filter_outputs(tx, scripthash)).for_each(
+        let funding_locations = index.limit_result(index.filter_by_funding(scripthash))?;
+        self.for_new_blocks(funding_locations, daemon, |req, blob| {
+            let block_entries = result.entry(req.blockhash()).or_default();
+            filter_block_txs(blob, &req, |tx| filter_outputs(tx, scripthash)).for_each(
                 |FilteredTx {
-                     pos,
+                     offset,
                      tx,
                      txid,
                      result: funding_outputs,
@@ -341,28 +345,28 @@
                     cache.add_tx(txid, move || tx);
                     outpoints.extend(make_outpoints(txid, &funding_outputs));
                     block_entries
-                        .entry(pos)
+                        .entry(offset)
                         .or_insert_with(|| TxEntry::new(txid))
                         .outputs = funding_outputs;
                 },
             );
         })?;
 
-        let spending_blockhashes: HashSet<BlockHash> = outpoints
+        let spending_locations: HashSet<TxLocation> = outpoints
             .par_iter()
             .flat_map_iter(|outpoint| index.filter_by_spending(*outpoint))
             .collect();
-        self.for_new_blocks(spending_blockhashes, daemon, |blockhash, block| {
-            let block_entries = result.entry(blockhash).or_default();
-            filter_block_txs(block, |tx| filter_inputs(tx, outpoints)).for_each(
+        self.for_new_blocks(spending_locations, daemon, |req, blob| {
+            let block_entries = result.entry(req.blockhash()).or_default();
+            filter_block_txs(blob, &req, |tx| filter_inputs(tx, outpoints)).for_each(
                 |FilteredTx {
-                     pos,
+                     offset,
                      tx,
                      txid,
                      result: spent_outpoints,
                 }| {
                     cache.add_tx(txid, move || tx);
                     block_entries
-                        .entry(pos)
+                        .entry(offset)
                         .or_insert_with(|| TxEntry::new(txid))
                         .spent = spent_outpoints;
                 },
             );
@@ -377,7 +381,7 @@
                     .into_iter()
                     .collect::<BTreeMap<usize, TxEntry>>()
                     .into_iter()
-                    .map(|(_pos, entry)| entry)
+                    .map(|(_offset, entry)| entry)
                     .collect::<Vec<TxEntry>>();
                 (blockhash, sorted_entries)
             })
@@ -510,33 +514,31 @@
 struct FilteredTx<T> {
     tx: Transaction,
     txid: Txid,
-    pos: usize,
+    offset: usize,
     result: Vec<T>,
 }
 
-fn filter_block_txs<T: Send>(
-    block: Block,
-    map_fn: impl Fn(&Transaction) -> Vec<T> + Sync,
-) -> impl Iterator<Item = FilteredTx<T>> {
-    block
-        .txdata
-        .into_par_iter()
-        .enumerate()
-        .filter_map(|(pos, tx)| {
+fn filter_block_txs<'a, T: Send>(
+    blob: &'a [u8],
+    req: &'a BlockRequest,
+    map_fn: impl Fn(&Transaction) -> Vec<T> + Sync + 'a,
+) -> impl Iterator<Item = FilteredTx<T>> + 'a {
+    req.parse_transactions(blob)
+        .into_iter()
+        .filter_map(move |(tx, offset)| {
             let result = map_fn(&tx);
             if result.is_empty() {
-                return None; // skip irrelevant transaction
+                None // skip irrelevant transaction
+            } else {
+                let txid = tx.txid();
+                Some(FilteredTx {
+                    tx,
+                    txid,
+                    offset,
+                    result,
+                })
             }
-            let txid = tx.txid();
-            Some(FilteredTx {
-                tx,
-                txid,
-                pos,
-                result,
-            })
         })
-        .collect::<Vec<_>>()
-        .into_iter()
 }
 
 #[cfg(test)]
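The types.rs diff below is where the row layout actually changes: each 12-byte `HashPrefixRow` keeps its 8-byte hash prefix but now ends with a little-endian `GlobalTxId` instead of a height. A sketch of that layout, checked against the `test_scripthash_row` vector further down (the layout follows this diff; the helper itself is illustrative):

```rust
// 12-byte HashPrefixRow layout: 8-byte hash prefix ++ 4-byte LE GlobalTxId.
fn hash_prefix_row(prefix: [u8; 8], id: u32) -> [u8; 12] {
    let mut row = [0u8; 12];
    row[..8].copy_from_slice(&prefix);
    row[8..].copy_from_slice(&id.to_le_bytes());
    row
}

fn main() {
    // Matches the test vector below: prefix a384491d38929fcc, id 123456.
    let row = hash_prefix_row([0xa3, 0x84, 0x49, 0x1d, 0x38, 0x92, 0x9f, 0xcc], 123_456);
    let hex: String = row.iter().map(|b| format!("{:02x}", b)).collect();
    assert_eq!(hex, "a384491d38929fcc40e20100");
}
```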
diff --git a/src/types.rs b/src/types.rs
index c84da9a..4d3a7d3 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -5,7 +5,7 @@ use std::convert::TryFrom;
 use bitcoin::{
     consensus::encode::{deserialize, serialize, Decodable, Encodable},
     hashes::{borrow_slice_impl, hash_newtype, hex_fmt_impl, index_impl, serde_impl, sha256, Hash},
-    BlockHeader, OutPoint, Script, Txid,
+    BlockHash, BlockHeader, OutPoint, Script, Txid,
 };
 
 use crate::db;
@@ -37,15 +37,41 @@ macro_rules! impl_consensus_encoding {
     );
 }
 
+#[derive(Debug, Eq, PartialEq, Hash)]
+pub(crate) struct TxLocation {
+    pub blockhash: BlockHash,
+    pub offset: u32, // byte offset within the block
+}
+
 const HASH_PREFIX_LEN: usize = 8;
 type HashPrefix = [u8; HASH_PREFIX_LEN];
-type Height = u32;
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Copy, Clone, Default, Ord, Eq, PartialOrd)]
+pub(crate) struct GlobalTxId {
+    id: u32, // transaction index in blockchain order
+}
+
+impl From<u64> for GlobalTxId {
+    fn from(id: u64) -> Self {
+        Self {
+            id: u32::try_from(id).expect("GlobalTxId is too large"),
+        }
+    }
+}
+
+impl GlobalTxId {
+    pub fn next(&mut self) {
+        self.id += 1;
+    }
+}
+
+impl_consensus_encoding!(GlobalTxId, id);
 
 #[derive(Debug, Serialize, Deserialize, PartialEq)]
 pub(crate) struct HashPrefixRow {
     prefix: [u8; HASH_PREFIX_LEN],
-    height: Height, // transaction confirmed height
+    id: GlobalTxId,
 }
 
@@ -57,12 +83,12 @@ impl HashPrefixRow {
         deserialize(row).expect("bad HashPrefixRow")
     }
 
-    pub fn height(&self) -> usize {
-        usize::try_from(self.height).expect("invalid height")
+    pub fn id(&self) -> GlobalTxId {
+        self.id
     }
 }
 
-impl_consensus_encoding!(HashPrefixRow, prefix, height);
+impl_consensus_encoding!(HashPrefixRow, prefix, id);
 
 hash_newtype!(
     ScriptHash,
@@ -91,10 +117,10 @@ impl ScriptHashRow {
         scripthash.0[..HASH_PREFIX_LEN].to_vec().into_boxed_slice()
     }
 
-    pub(crate) fn row(scripthash: ScriptHash, height: usize) -> HashPrefixRow {
+    pub(crate) fn row(scripthash: ScriptHash, id: GlobalTxId) -> HashPrefixRow {
         HashPrefixRow {
             prefix: scripthash.prefix(),
-            height: Height::try_from(height).expect("invalid height"),
+            id,
         }
     }
 }
@@ -125,10 +151,10 @@ impl SpendingPrefixRow {
         Box::new(spending_prefix(outpoint))
     }
 
-    pub(crate) fn row(outpoint: OutPoint, height: usize) -> HashPrefixRow {
+    pub(crate) fn row(outpoint: OutPoint, id: GlobalTxId) -> HashPrefixRow {
         HashPrefixRow {
             prefix: spending_prefix(outpoint),
-            height: Height::try_from(height).expect("invalid height"),
+            id,
         }
     }
 }
@@ -148,26 +174,53 @@ impl TxidRow {
         Box::new(txid_prefix(&txid))
     }
 
-    pub(crate) fn row(txid: Txid, height: usize) -> HashPrefixRow {
+    pub(crate) fn row(txid: Txid, id: GlobalTxId) -> HashPrefixRow {
         HashPrefixRow {
             prefix: txid_prefix(&txid),
-            height: Height::try_from(height).expect("invalid height"),
+            id,
         }
     }
 }
 
+pub(crate) struct TxOffsetRow {
+    id: GlobalTxId,
+    offset: u32, // byte offset within a block
+}
+
+impl TxOffsetRow {
+    pub(crate) fn row(id: GlobalTxId, offset: usize) -> Self {
+        Self {
+            id,
+            offset: u32::try_from(offset).expect("too large offset"),
+        }
+    }
+
+    pub(crate) fn key(&self) -> Box<[u8]> {
+        serialize(&self.id).into_boxed_slice()
+    }
+
+    pub(crate) fn value(&self) -> Box<[u8]> {
+        serialize(&self.offset).into_boxed_slice()
+    }
+
+    pub(crate) fn parse_offset(bytes: &[u8]) -> u32 {
+        deserialize(bytes).expect("invalid offset")
+    }
+}
+
 // ***************************************************************************
 
 #[derive(Debug, Serialize, Deserialize)]
 pub(crate) struct HeaderRow {
     pub(crate) header: BlockHeader,
+    pub(crate) gtxid: GlobalTxId, // after indexing this block (cumulative # of transactions)
 }
 
-impl_consensus_encoding!(HeaderRow, header);
+impl_consensus_encoding!(HeaderRow, header, gtxid);
 
 impl HeaderRow {
-    pub(crate) fn new(header: BlockHeader) -> Self {
-        Self { header }
+    pub(crate) fn new(header: BlockHeader, gtxid: GlobalTxId) -> Self {
+        Self { header, gtxid }
     }
 
     pub(crate) fn to_db_row(&self) -> db::Row {
@@ -181,7 +234,9 @@
 #[cfg(test)]
 mod tests {
-    use crate::types::{spending_prefix, HashPrefixRow, ScriptHash, ScriptHashRow, TxidRow};
+    use crate::types::{
+        spending_prefix, GlobalTxId, HashPrefixRow, ScriptHash, ScriptHashRow, TxidRow,
+    };
     use bitcoin::{hashes::hex::ToHex, Address, OutPoint, Txid};
     use serde_json::{from_str, json};
 
@@ -199,7 +254,7 @@
     fn test_scripthash_row() {
         let hex = "\"4b3d912c1523ece4615e91bf0d27381ca72169dbf6b1c2ffcc9f92381d4984a3\"";
         let scripthash: ScriptHash = from_str(&hex).unwrap();
-        let row1 = ScriptHashRow::row(scripthash, 123456);
+        let row1 = ScriptHashRow::row(scripthash, GlobalTxId::from(123456));
         let db_row = row1.to_db_row();
         assert_eq!(db_row[..].to_hex(), "a384491d38929fcc40e20100");
         let row2 = HashPrefixRow::from_db_row(&db_row);
@@ -222,8 +277,8 @@
         let hex = "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599";
         let txid = Txid::from_str(hex).unwrap();
 
-        let row1 = TxidRow::row(txid, 91812);
-        let row2 = TxidRow::row(txid, 91842);
+        let row1 = TxidRow::row(txid, GlobalTxId::from(91812));
+        let row2 = TxidRow::row(txid, GlobalTxId::from(91842));
 
         assert_eq!(row1.to_db_row().to_hex(), "9985d82954e10f22a4660100");
         assert_eq!(row2.to_db_row().to_hex(), "9985d82954e10f22c2660100");
@@ -235,8 +290,8 @@
         let hex = "e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468";
         let txid = Txid::from_str(hex).unwrap();
 
-        let row1 = TxidRow::row(txid, 91722);
-        let row2 = TxidRow::row(txid, 91880);
+        let row1 = TxidRow::row(txid, GlobalTxId::from(91722));
+        let row2 = TxidRow::row(txid, GlobalTxId::from(91880));
 
         // low-endian encoding => rows should be sorted according to block height
         assert_eq!(row1.to_db_row().to_hex(), "68b45f58b674e94e4a660100");