1
0
mirror of https://github.com/romanz/electrs.git synced 2024-11-19 01:43:29 +01:00

WIP: gtxid

This commit is contained in:
Roman Zeyde 2021-11-23 12:22:53 +02:00
parent 4f6d5a4d4a
commit 5d7655efc1
8 changed files with 441 additions and 177 deletions

View File

@ -8,7 +8,7 @@ cargo build --all --features "metrics_process" --release
NETWORK=$1
shift
DB=./db
DB=./db1
export RUST_LOG=${RUST_LOG-electrs=INFO}
target/release/electrs --network $NETWORK --db-dir $DB --daemon-dir $HOME/.bitcoin $*

View File

@ -5,20 +5,17 @@ use bitcoin::hashes::hex::FromHex;
use bitcoin::network::constants;
use bitcoin::{BlockHash, BlockHeader};
use crate::types::{GlobalTxId, HeaderRow};
/// A new header found, to be added to the chain at specific height
pub(crate) struct NewHeader {
header: BlockHeader,
pub(crate) struct NewBlockHash {
hash: BlockHash,
height: usize,
}
impl NewHeader {
pub(crate) fn from((header, height): (BlockHeader, usize)) -> Self {
Self {
header,
hash: header.block_hash(),
height,
}
impl NewBlockHash {
pub(crate) fn from(hash: BlockHash, height: usize) -> Self {
Self { hash, height }
}
pub(crate) fn height(&self) -> usize {
@ -30,9 +27,33 @@ impl NewHeader {
}
}
pub(crate) struct IndexedHeader {
header: BlockHeader,
hash: BlockHash,
height: usize,
gtxid: GlobalTxId,
}
impl IndexedHeader {
pub(crate) fn from(row: &HeaderRow, height: usize) -> Self {
Self {
header: row.header,
hash: row.header.block_hash(),
height,
gtxid: row.gtxid,
}
}
}
struct HeaderEntry {
hash: BlockHash,
header: BlockHeader,
gtxid: GlobalTxId,
}
/// Current blockchain headers' list
pub struct Chain {
headers: Vec<(BlockHash, BlockHeader)>,
headers: Vec<HeaderEntry>,
heights: HashMap<BlockHash, usize>,
}
@ -49,7 +70,11 @@ impl Chain {
let genesis: BlockHeader = deserialize(&genesis_header_bytes).unwrap();
assert_eq!(genesis.prev_blockhash, BlockHash::default());
Self {
headers: vec![(genesis.block_hash(), genesis)],
headers: vec![HeaderEntry {
hash: genesis.block_hash(),
header: genesis,
gtxid: GlobalTxId::default(),
}],
heights: std::iter::once((genesis.block_hash(), 0)).collect(), // genesis header @ zero height
}
}
@ -59,41 +84,60 @@ impl Chain {
return;
}
let new_height = self.height().saturating_sub(n);
self.update(vec![NewHeader::from((
self.headers[new_height].1,
new_height,
))])
let entry = &self.headers[new_height];
let header = IndexedHeader {
header: entry.header,
height: new_height,
hash: entry.hash,
gtxid: entry.gtxid,
};
self.update(vec![header])
}
/// Load the chain from a collecion of headers, up to the given tip
pub(crate) fn load(&mut self, headers: Vec<BlockHeader>, tip: BlockHash) {
let genesis_hash = self.headers[0].0;
pub(crate) fn load(&mut self, rows: &[HeaderRow], tip: BlockHash) {
let genesis_hash = self.headers[0].hash;
let mut header_map: HashMap<BlockHash, BlockHeader> =
headers.into_iter().map(|h| (h.block_hash(), h)).collect();
let mut map: HashMap<BlockHash, &HeaderRow> = rows
.iter()
.map(|row| (row.header.block_hash(), row))
.collect();
let mut blockhash = tip;
let mut new_headers = vec![];
let mut ordered_rows = vec![];
while blockhash != genesis_hash {
let header = match header_map.remove(&blockhash) {
Some(header) => header,
let row = match map.remove(&blockhash) {
Some(row) => row,
None => panic!("missing header {} while loading from DB", blockhash),
};
blockhash = header.prev_blockhash;
new_headers.push(header);
blockhash = row.header.prev_blockhash;
ordered_rows.push(row);
}
info!("loading {} headers, tip={}", new_headers.len(), tip);
let new_headers = new_headers.into_iter().rev(); // order by height
self.update(new_headers.zip(1..).map(NewHeader::from).collect())
info!("loading {} headers, tip={}", ordered_rows.len(), tip);
let ordered_rows = ordered_rows.iter().rev(); // order by height
self.update(
ordered_rows
.zip(1..)
.map(|(row, height)| IndexedHeader::from(row, height))
.collect(),
)
}
/// Get the block hash at specified height (if exists)
pub(crate) fn get_block_hash(&self, height: usize) -> Option<BlockHash> {
self.headers.get(height).map(|(hash, _header)| *hash)
self.headers.get(height).map(|entry| entry.hash)
}
pub(crate) fn get_block_hash_by_gtxid(&self, id: GlobalTxId) -> Option<BlockHash> {
let height = match self.headers.binary_search_by_key(&id, |entry| entry.gtxid) {
Ok(height) => height,
Err(height) => height,
};
self.get_block_hash(height)
}
/// Get the block header at specified height (if exists)
pub(crate) fn get_block_header(&self, height: usize) -> Option<&BlockHeader> {
self.headers.get(height).map(|(_hash, header)| header)
self.headers.get(height).map(|entry| &entry.header)
}
/// Get the block height given the specified hash (if exists)
@ -102,28 +146,46 @@ impl Chain {
}
/// Update the chain with a list of new headers (possibly a reorg)
pub(crate) fn update(&mut self, headers: Vec<NewHeader>) {
pub(crate) fn update(&mut self, headers: Vec<IndexedHeader>) {
if let Some(first_height) = headers.first().map(|h| h.height) {
for (hash, _header) in self.headers.drain(first_height..) {
assert!(self.heights.remove(&hash).is_some());
for entry in self.headers.drain(first_height..) {
assert!(self.heights.remove(&entry.hash).is_some());
}
for (h, height) in headers.into_iter().zip(first_height..) {
assert_eq!(h.height, height);
assert_eq!(h.hash, h.header.block_hash());
if height > 0 {
let prev = &self.headers[height - 1];
assert!(prev.hash == h.header.prev_blockhash);
assert!(prev.gtxid < h.gtxid);
} else {
assert_eq!(h.header.prev_blockhash, BlockHash::default());
}
assert!(self.heights.insert(h.hash, h.height).is_none());
self.headers.push((h.hash, h.header));
self.headers.push(HeaderEntry {
hash: h.hash,
header: h.header,
gtxid: h.gtxid,
});
}
info!(
"chain updated: tip={}, height={}",
self.headers.last().unwrap().0,
self.headers.len() - 1
self.tip(),
self.height()
);
}
}
/// Best block hash
pub(crate) fn tip(&self) -> BlockHash {
self.headers.last().expect("empty chain").0
self.headers.last().expect("empty chain").hash
}
/// Latest global transaction id
pub(crate) fn gtxid(&self) -> GlobalTxId {
self.headers.last().expect("empty chain").gtxid
}
/// Number of blocks (excluding genesis block)
@ -141,7 +203,7 @@ impl Chain {
if result.len() >= 10 {
step *= 2;
}
result.push(self.headers[index].0);
result.push(self.headers[index].hash);
if index == 0 {
break;
}
@ -153,7 +215,7 @@ impl Chain {
#[cfg(test)]
mod tests {
use super::{Chain, NewHeader};
use super::{Chain, GlobalTxId, HeaderRow, IndexedHeader};
use bitcoin::consensus::deserialize;
use bitcoin::hashes::hex::{FromHex, ToHex};
use bitcoin::network::constants::Network::Regtest;
@ -188,16 +250,34 @@ mod tests {
.map(|hex_header| deserialize(&Vec::from_hex(hex_header).unwrap()).unwrap())
.collect();
let mut header_rows = vec![];
let mut gtxid = GlobalTxId::default();
for header in &headers {
gtxid.next();
header_rows.push(HeaderRow {
header: *header,
gtxid,
})
}
for chunk_size in 1..hex_headers.len() {
let mut regtest = Chain::new(Regtest);
let mut height = 0;
let mut gtxid = GlobalTxId::default();
let mut tip = regtest.tip();
for chunk in headers.chunks(chunk_size) {
let mut update = vec![];
for header in chunk {
height += 1;
gtxid.next();
tip = header.block_hash();
update.push(NewHeader::from((*header, height)))
update.push(IndexedHeader::from(
&HeaderRow {
header: *header,
gtxid,
},
height,
))
}
regtest.update(update);
assert_eq!(regtest.tip(), tip);
@ -209,7 +289,7 @@ mod tests {
// test loading from a list of headers and tip
let mut regtest = Chain::new(Regtest);
regtest.load(headers.clone(), headers.last().unwrap().block_hash());
regtest.load(&header_rows, headers.last().unwrap().block_hash());
assert_eq!(regtest.height(), headers.len());
// test getters
@ -242,11 +322,18 @@ mod tests {
// test reorg
let mut regtest = Chain::new(Regtest);
regtest.load(headers.clone(), headers.last().unwrap().block_hash());
regtest.load(&header_rows, headers.last().unwrap().block_hash());
let height = regtest.height();
let gtxid = regtest.gtxid();
let new_header: BlockHeader = deserialize(&Vec::from_hex("000000200030d7f9c11ef35b89a0eefb9a5e449909339b5e7854d99804ea8d6a49bf900a0304d2e55fe0b6415949cff9bca0f88c0717884a5e5797509f89f856af93624a7a6bcc60ffff7f2000000000").unwrap()).unwrap();
regtest.update(vec![NewHeader::from((new_header, height))]);
regtest.update(vec![IndexedHeader::from(
&HeaderRow {
header: new_header,
gtxid,
},
height,
)]);
assert_eq!(regtest.height(), height);
assert_eq!(
regtest.tip().to_hex(),

View File

@ -1,8 +1,6 @@
use anyhow::{Context, Result};
use bitcoin::{
consensus::serialize, hashes::hex::ToHex, Amount, Block, BlockHash, Transaction, Txid,
};
use bitcoin::{consensus::serialize, hashes::hex::ToHex, Amount, BlockHash, Transaction, Txid};
use bitcoincore_rpc::{json, jsonrpc, Auth, Client, RpcApi};
use crossbeam_channel::Receiver;
use parking_lot::Mutex;
@ -13,10 +11,10 @@ use std::io::Read;
use std::path::Path;
use crate::{
chain::{Chain, NewHeader},
chain::{Chain, NewBlockHash},
config::Config,
metrics::Metrics,
p2p::Connection,
p2p::{BlockRequest, Connection},
signals::ExitFlag,
};
@ -230,16 +228,15 @@ impl Daemon {
.context("failed to get mempool entry")
}
pub(crate) fn get_new_headers(&self, chain: &Chain) -> Result<Vec<NewHeader>> {
pub(crate) fn get_new_headers(&self, chain: &Chain) -> Result<Vec<NewBlockHash>> {
self.p2p.lock().get_new_headers(chain)
}
pub(crate) fn for_blocks<B, F>(&self, blockhashes: B, func: F) -> Result<()>
pub(crate) fn for_blocks<F>(&self, requests: Vec<BlockRequest>, func: F) -> Result<()>
where
B: IntoIterator<Item = BlockHash>,
F: FnMut(BlockHash, Block),
F: FnMut(BlockRequest, &[u8]),
{
self.p2p.lock().for_blocks(blockhashes, func)
self.p2p.lock().for_blocks(requests, func)
}
pub(crate) fn new_block_notification(&self) -> Receiver<()> {

View File

@ -13,6 +13,7 @@ pub(crate) struct WriteBatch {
pub(crate) funding_rows: Vec<Row>,
pub(crate) spending_rows: Vec<Row>,
pub(crate) txid_rows: Vec<Row>,
pub(crate) offset_rows: Vec<(Row, Row)>,
}
impl WriteBatch {
@ -21,6 +22,7 @@ impl WriteBatch {
self.funding_rows.sort_unstable();
self.spending_rows.sort_unstable();
self.txid_rows.sort_unstable();
self.offset_rows.sort_unstable();
}
}
@ -33,10 +35,18 @@ pub struct DBStore {
const CONFIG_CF: &str = "config";
const HEADERS_CF: &str = "headers";
const TXID_CF: &str = "txid";
const OFFSET_CF: &str = "offset";
const FUNDING_CF: &str = "funding";
const SPENDING_CF: &str = "spending";
const COLUMN_FAMILIES: &[&str] = &[CONFIG_CF, HEADERS_CF, TXID_CF, FUNDING_CF, SPENDING_CF];
const COLUMN_FAMILIES: &[&str] = &[
CONFIG_CF,
HEADERS_CF,
TXID_CF,
OFFSET_CF,
FUNDING_CF,
SPENDING_CF,
];
const CONFIG_KEY: &str = "C";
const TIP_KEY: &[u8] = b"T";
@ -84,7 +94,7 @@ struct Config {
format: u64,
}
const CURRENT_FORMAT: u64 = 0;
const CURRENT_FORMAT: u64 = 1;
impl Default for Config {
fn default() -> Self {
@ -207,6 +217,10 @@ impl DBStore {
self.db.cf_handle(TXID_CF).expect("missing TXID_CF")
}
fn offset_cf(&self) -> &rocksdb::ColumnFamily {
self.db.cf_handle(OFFSET_CF).expect("missing TXID_CF")
}
fn headers_cf(&self) -> &rocksdb::ColumnFamily {
self.db.cf_handle(HEADERS_CF).expect("missing HEADERS_CF")
}
@ -252,6 +266,12 @@ impl DBStore {
.expect("get_tip failed")
}
pub(crate) fn get_tx_offset(&self, key: &[u8]) -> Option<Vec<u8>> {
self.db
.get_cf(self.offset_cf(), key)
.expect("get_tx_offset failed")
}
pub(crate) fn write(&self, batch: &WriteBatch) {
let mut db_batch = rocksdb::WriteBatch::default();
for key in &batch.funding_rows {
@ -263,6 +283,9 @@ impl DBStore {
for key in &batch.txid_rows {
db_batch.put_cf(self.txid_cf(), key, b"");
}
for (key, value) in &batch.offset_rows {
db_batch.put_cf(self.offset_cf(), key, value);
}
for key in &batch.header_rows {
db_batch.put_cf(self.headers_cf(), key, b"");
}

View File

@ -1,14 +1,18 @@
use anyhow::{Context, Result};
use bitcoin::consensus::{deserialize, serialize};
use bitcoin::{Block, BlockHash, OutPoint, Txid};
use bitcoin::{BlockHash, OutPoint, Txid};
use crate::{
chain::{Chain, NewHeader},
chain::{Chain, IndexedHeader, NewBlockHash},
daemon::Daemon,
db::{DBStore, Row, WriteBatch},
metrics::{self, Gauge, Histogram, Metrics},
p2p::BlockRequest,
signals::ExitFlag,
types::{HashPrefixRow, HeaderRow, ScriptHash, ScriptHashRow, SpendingPrefixRow, TxidRow},
types::{
GlobalTxId, HashPrefixRow, HeaderRow, ScriptHash, ScriptHashRow, SpendingPrefixRow,
TxLocation, TxOffsetRow, TxidRow,
},
};
#[derive(Clone)]
@ -78,6 +82,7 @@ struct IndexResult {
funding_rows: Vec<HashPrefixRow>,
spending_rows: Vec<HashPrefixRow>,
txid_rows: Vec<HashPrefixRow>,
offset_rows: Vec<TxOffsetRow>,
}
impl IndexResult {
@ -91,6 +96,9 @@ impl IndexResult {
let txid_rows = self.txid_rows.iter().map(HashPrefixRow::to_db_row);
batch.txid_rows.extend(txid_rows);
let offset_rows = self.offset_rows.iter().map(|row| (row.key(), row.value()));
batch.offset_rows.extend(offset_rows);
batch.header_rows.push(self.header_row.to_db_row());
batch.tip_row = serialize(&self.header_row.header.block_hash()).into_boxed_slice();
}
@ -117,12 +125,12 @@ impl Index {
) -> Result<Self> {
if let Some(row) = store.get_tip() {
let tip = deserialize(&row).expect("invalid tip");
let headers = store
let rows: Vec<HeaderRow> = store
.read_headers()
.into_iter()
.map(|row| HeaderRow::from_db_row(&row).header)
.map(|row| HeaderRow::from_db_row(&row))
.collect();
chain.load(headers, tip);
chain.load(&rows, tip);
chain.drop_last_headers(reindex_last_blocks);
};
let stats = Stats::new(metrics);
@ -154,31 +162,43 @@ impl Index {
Ok(result)
}
pub(crate) fn get_tx_location(&self, id: GlobalTxId) -> Option<TxLocation> {
let blockhash = self.chain.get_block_hash_by_gtxid(id);
let offset = self
.store
.get_tx_offset(&serialize(&id))
.map(|value| TxOffsetRow::parse_offset(&value));
match (blockhash, offset) {
(Some(blockhash), Some(offset)) => Some(TxLocation { blockhash, offset }),
_ => None,
}
}
pub(crate) fn filter_by_txid(&self, txid: Txid) -> impl Iterator<Item = BlockHash> + '_ {
self.store
.iter_txid(TxidRow::scan_prefix(txid))
.map(|row| HashPrefixRow::from_db_row(&row).height())
.filter_map(move |height| self.chain.get_block_hash(height))
.map(|row| HashPrefixRow::from_db_row(&row).id())
.filter_map(move |id| self.chain.get_block_hash_by_gtxid(id))
}
pub(crate) fn filter_by_funding(
&self,
scripthash: ScriptHash,
) -> impl Iterator<Item = BlockHash> + '_ {
) -> impl Iterator<Item = TxLocation> + '_ {
self.store
.iter_funding(ScriptHashRow::scan_prefix(scripthash))
.map(|row| HashPrefixRow::from_db_row(&row).height())
.filter_map(move |height| self.chain.get_block_hash(height))
.map(|row| HashPrefixRow::from_db_row(&row).id())
.filter_map(move |id| self.get_tx_location(id))
}
pub(crate) fn filter_by_spending(
&self,
outpoint: OutPoint,
) -> impl Iterator<Item = BlockHash> + '_ {
) -> impl Iterator<Item = TxLocation> + '_ {
self.store
.iter_spending(SpendingPrefixRow::scan_prefix(outpoint))
.map(|row| HashPrefixRow::from_db_row(&row).height())
.filter_map(move |height| self.chain.get_block_hash(height))
.map(|row| HashPrefixRow::from_db_row(&row).id())
.filter_map(move |id| self.get_tx_location(id))
}
// Return `Ok(true)` when the chain is fully synced and the index is compacted.
@ -202,6 +222,8 @@ impl Index {
return Ok(true); // no more blocks to index (done for now)
}
}
let mut gtxid = self.chain.gtxid();
let mut indexed_headers = Vec::with_capacity(new_headers.len());
for chunk in new_headers.chunks(self.batch_size) {
exit_flag.poll().with_context(|| {
format!(
@ -209,22 +231,33 @@ impl Index {
chunk.first().unwrap().height()
)
})?;
self.sync_blocks(daemon, chunk)?;
indexed_headers.extend(self.sync_blocks(daemon, chunk, &mut gtxid)?);
}
self.chain.update(new_headers);
self.chain.update(indexed_headers);
self.stats.observe_chain(&self.chain);
Ok(false) // sync is not done
}
fn sync_blocks(&mut self, daemon: &Daemon, chunk: &[NewHeader]) -> Result<()> {
let blockhashes: Vec<BlockHash> = chunk.iter().map(|h| h.hash()).collect();
fn sync_blocks(
&mut self,
daemon: &Daemon,
chunk: &[NewBlockHash],
gtxid: &mut GlobalTxId,
) -> Result<Vec<IndexedHeader>> {
let mut indexed_headers = Vec::with_capacity(chunk.len());
let requests: Vec<BlockRequest> = chunk
.iter()
.map(|h| BlockRequest::get_full_block(h.hash()))
.collect();
let mut heights = chunk.iter().map(|h| h.height());
let mut batch = WriteBatch::default();
daemon.for_blocks(blockhashes, |_blockhash, block| {
daemon.for_blocks(requests, |req, raw| {
let height = heights.next().expect("unexpected block");
self.stats.observe_duration("block", || {
index_single_block(block, height).extend(&mut batch)
let result = index_single_block(req, raw, gtxid);
indexed_headers.push(IndexedHeader::from(&result.header_row, height));
result.extend(&mut batch);
});
self.stats.height.set("tip", height as f64);
})?;
@ -239,7 +272,7 @@ impl Index {
self.stats
.observe_duration("write", || self.store.write(&batch));
self.stats.observe_db(&self.store);
Ok(())
Ok(indexed_headers)
}
pub(crate) fn is_ready(&self) -> bool {
@ -251,13 +284,17 @@ fn db_rows_size(rows: &[Row]) -> usize {
rows.iter().map(|key| key.len()).sum()
}
fn index_single_block(block: Block, height: usize) -> IndexResult {
let mut funding_rows = Vec::with_capacity(block.txdata.iter().map(|tx| tx.output.len()).sum());
let mut spending_rows = Vec::with_capacity(block.txdata.iter().map(|tx| tx.input.len()).sum());
let mut txid_rows = Vec::with_capacity(block.txdata.len());
fn index_single_block(req: BlockRequest, raw: &[u8], gtxid: &mut GlobalTxId) -> IndexResult {
let header = req.parse_header(raw);
let mut funding_rows = vec![];
let mut spending_rows = vec![];
let mut txid_rows = vec![];
let mut offset_rows = vec![];
for tx in &block.txdata {
txid_rows.push(TxidRow::row(tx.txid(), height));
for (tx, offset) in req.parse_transactions(raw) {
gtxid.next();
txid_rows.push(TxidRow::row(tx.txid(), *gtxid));
offset_rows.push(TxOffsetRow::row(*gtxid, offset));
funding_rows.extend(
tx.output
@ -265,7 +302,7 @@ fn index_single_block(block: Block, height: usize) -> IndexResult {
.filter(|txo| !txo.script_pubkey.is_provably_unspendable())
.map(|txo| {
let scripthash = ScriptHash::new(&txo.script_pubkey);
ScriptHashRow::row(scripthash, height)
ScriptHashRow::row(scripthash, *gtxid)
}),
);
@ -275,13 +312,14 @@ fn index_single_block(block: Block, height: usize) -> IndexResult {
spending_rows.extend(
tx.input
.iter()
.map(|txin| SpendingPrefixRow::row(txin.previous_output, height)),
.map(|txin| SpendingPrefixRow::row(txin.previous_output, *gtxid)),
);
}
IndexResult {
funding_rows,
spending_rows,
txid_rows,
header_row: HeaderRow::new(block.header),
offset_rows,
header_row: HeaderRow::new(header, *gtxid),
}
}

View File

@ -11,19 +11,20 @@ use bitcoin::{
message_network,
},
secp256k1::{self, rand::Rng},
Block, BlockHash, BlockHeader, Network,
Block, BlockHash, BlockHeader, Network, Transaction,
};
use crossbeam_channel::{bounded, select, Receiver, Sender};
use std::io::{self, ErrorKind, Write};
use std::io::{self, Cursor, ErrorKind, Seek, SeekFrom, Write};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use crate::{
chain::{Chain, NewHeader},
chain::{Chain, NewBlockHash},
config::ELECTRS_VERSION,
metrics::{default_duration_buckets, default_size_buckets, Histogram, Metrics},
types::TxLocation,
};
enum Request {
@ -43,15 +44,76 @@ impl Request {
Request::GetBlocks(
blockhashes
.iter()
.map(|blockhash| Inventory::WitnessBlock(*blockhash))
.copied()
.map(Inventory::WitnessBlock)
.collect(),
)
}
}
pub(crate) struct BlockRequest {
hash: BlockHash,
offsets: Option<Vec<usize>>, // if None, fetch all txs
}
impl BlockRequest {
pub fn get_single_transaction(loc: TxLocation) -> Self {
Self {
hash: loc.blockhash,
offsets: Some(vec![loc.offset as usize]),
}
}
pub fn get_full_block(hash: BlockHash) -> Self {
Self {
hash,
offsets: None,
}
}
pub fn blockhash(&self) -> BlockHash {
self.hash
}
pub fn parse_header<'a>(&'a self, mut block: &'a [u8]) -> BlockHeader {
let header = BlockHeader::consensus_decode(&mut block).expect("TODO: better handling");
assert_eq!(header.block_hash(), self.hash, "got wrong block");
header
}
pub fn parse_transactions<'a>(
&'a self,
block: &'a [u8],
) -> Box<dyn Iterator<Item = (Transaction, usize)> + 'a> {
if let Some(offsets) = &self.offsets {
let result = offsets.iter().map(move |offset| {
let tx = Transaction::consensus_decode(&block[*offset..])
.expect("TODO: better handling");
(tx, *offset)
});
return Box::new(result);
}
let mut cursor = Cursor::new(block);
cursor
.seek(SeekFrom::Start(80))
.expect("TODO: better handling"); // skip block header
let count = VarInt::consensus_decode(&mut cursor)
.expect("TODO: better handling")
.0;
let result = (0..count).map(move |_| {
let offset = cursor.position() as usize;
let tx = Transaction::consensus_decode(&mut cursor).expect("TODO: better handling");
(tx, offset)
});
// TODO: check all block is parsed
Box::new(result)
}
// TODO: add tests
}
pub(crate) struct Connection {
req_send: Sender<Request>,
blocks_recv: Receiver<Block>,
blocks_recv: Receiver<Vec<u8>>,
headers_recv: Receiver<Vec<BlockHeader>>,
new_block_recv: Receiver<()>,
@ -62,7 +124,7 @@ impl Connection {
/// Get new block headers (supporting reorgs).
/// https://en.bitcoin.it/wiki/Protocol_documentation#getheaders
/// Defined as `&mut self` to prevent concurrent invocations (https://github.com/romanz/electrs/pull/526#issuecomment-934685515).
pub(crate) fn get_new_headers(&mut self, chain: &Chain) -> Result<Vec<NewHeader>> {
pub(crate) fn get_new_headers(&mut self, chain: &Chain) -> Result<Vec<NewBlockHash>> {
self.req_send.send(Request::get_new_headers(chain))?;
let headers = self
.headers_recv
@ -79,41 +141,42 @@ impl Connection {
None => bail!("missing prev_blockhash: {}", prev_blockhash),
};
Ok(headers
.into_iter()
.iter()
.zip(new_heights)
.map(NewHeader::from)
.map(|(header, height)| NewBlockHash::from(header.block_hash(), height))
.collect())
}
/// Request and process the specified blocks (in the specified order).
/// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks for details.
/// Defined as `&mut self` to prevent concurrent invocations (https://github.com/romanz/electrs/pull/526#issuecomment-934685515).
pub(crate) fn for_blocks<B, F>(&mut self, blockhashes: B, mut func: F) -> Result<()>
pub(crate) fn for_blocks<F>(&mut self, requests: Vec<BlockRequest>, mut func: F) -> Result<()>
where
B: IntoIterator<Item = BlockHash>,
F: FnMut(BlockHash, Block),
F: FnMut(BlockRequest, &[u8]),
{
self.blocks_duration.observe_duration("total", || {
let blockhashes: Vec<BlockHash> = blockhashes.into_iter().collect();
if blockhashes.is_empty() {
if requests.is_empty() {
return Ok(());
}
let blockhashes: Vec<_> = requests.iter().map(|req| req.hash).collect();
self.blocks_duration.observe_duration("request", || {
debug!("loading {} blocks", blockhashes.len());
debug!("loading {} blocks", requests.len());
self.req_send.send(Request::get_blocks(&blockhashes))
})?;
for hash in blockhashes {
let block = self.blocks_duration.observe_duration("response", || {
let block = self
.blocks_recv
.recv()
.with_context(|| format!("failed to get block {}", hash))?;
ensure!(block.block_hash() == hash, "got unexpected block");
Ok(block)
})?;
for (req, blockhash) in requests.into_iter().zip(blockhashes.into_iter()) {
let block =
self.blocks_duration
.observe_duration("response", || -> Result<Vec<u8>> {
let block = self
.blocks_recv
.recv()
.with_context(|| format!("failed to get block {}", blockhash))?;
// ensure!(block.block_hash() == hash, "got unexpected block"); // TODO: add it back
Ok(block)
})?;
self.blocks_duration
.observe_duration("process", || func(hash, block));
.observe_duration("process", || func(req, &block));
}
Ok(())
})
@ -227,7 +290,7 @@ impl Connection {
});
let (req_send, req_recv) = bounded::<Request>(1);
let (blocks_send, blocks_recv) = bounded::<Block>(10);
let (blocks_send, blocks_recv) = bounded::<Vec<u8>>(10);
let (headers_send, headers_recv) = bounded::<Vec<BlockHeader>>(1);
let (new_block_send, new_block_recv) = bounded::<()>(0);
let (init_send, init_recv) = bounded::<()>(0);
@ -271,10 +334,10 @@ impl Connection {
NetworkMessage::Verack => {
init_send.send(())?; // peer acknowledged our version
}
NetworkMessage::Block(block) => blocks_send.send(block)?,
NetworkMessage::Headers(headers) => headers_send.send(headers)?,
NetworkMessage::Alert(_) => (), // https://bitcoin.org/en/alert/2016-11-01-alert-retirement
NetworkMessage::Addr(_) => (), // unused
NetworkMessage::Unknown { command, payload } if command.as_ref() == "block" => blocks_send.send(payload)?,
msg => warn!("unexpected message: {:?}", msg),
}
}
@ -343,7 +406,6 @@ impl RawNetworkMessage {
"verack" => NetworkMessage::Verack,
"inv" => NetworkMessage::Inv(Decodable::consensus_decode(&mut raw)?),
"notfound" => NetworkMessage::NotFound(Decodable::consensus_decode(&mut raw)?),
"block" => NetworkMessage::Block(Decodable::consensus_decode(&mut raw)?),
"headers" => {
let len = VarInt::consensus_decode(&mut raw)?.0;
let mut headers = Vec::with_capacity(len as usize);

View File

@ -1,7 +1,7 @@
use anyhow::Result;
use bitcoin::{
hashes::{sha256, Hash, HashEngine},
Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
Amount, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
};
use rayon::prelude::*;
use serde::ser::{Serialize, Serializer};
@ -15,7 +15,8 @@ use crate::{
daemon::Daemon,
index::Index,
mempool::Mempool,
types::{ScriptHash, StatusHash},
p2p::BlockRequest,
types::{ScriptHash, StatusHash, TxLocation},
};
/// Given a scripthash, store relevant inputs and outputs of a specific transaction
@ -303,17 +304,20 @@ impl ScriptHashStatus {
}
/// Apply func only on the new blocks (fetched from daemon).
fn for_new_blocks<B, F>(&self, blockhashes: B, daemon: &Daemon, func: F) -> Result<()>
fn for_new_blocks<B, F>(&self, locations: B, daemon: &Daemon, mut func: F) -> Result<()>
where
B: IntoIterator<Item = BlockHash>,
F: FnMut(BlockHash, Block),
B: IntoIterator<Item = TxLocation>,
F: FnMut(BlockRequest, &[u8]),
{
daemon.for_blocks(
blockhashes
.into_iter()
.filter(|blockhash| !self.confirmed.contains_key(blockhash)),
func,
)
let requests = locations
.into_iter()
.filter(|loc| !self.confirmed.contains_key(&loc.blockhash))
.map(BlockRequest::get_single_transaction)
.collect();
daemon.for_blocks(requests, |loc, blob| {
// assert_eq!(loc.blockhash, block.block_hash()); // TODO: re-add
func(loc, blob)
})
}
/// Get funding and spending entries from new blocks.
@ -328,12 +332,12 @@ impl ScriptHashStatus {
let scripthash = self.scripthash;
let mut result = HashMap::<BlockHash, HashMap<usize, TxEntry>>::new();
let funding_blockhashes = index.limit_result(index.filter_by_funding(scripthash))?;
self.for_new_blocks(funding_blockhashes, daemon, |blockhash, block| {
let block_entries = result.entry(blockhash).or_default();
filter_block_txs(block, |tx| filter_outputs(tx, scripthash)).for_each(
let funding_locations = index.limit_result(index.filter_by_funding(scripthash))?;
self.for_new_blocks(funding_locations, daemon, |req, blob| {
let block_entries = result.entry(req.blockhash()).or_default();
filter_block_txs(blob, &req, |tx| filter_outputs(tx, scripthash)).for_each(
|FilteredTx {
pos,
offset,
tx,
txid,
result: funding_outputs,
@ -341,28 +345,28 @@ impl ScriptHashStatus {
cache.add_tx(txid, move || tx);
outpoints.extend(make_outpoints(txid, &funding_outputs));
block_entries
.entry(pos)
.entry(offset)
.or_insert_with(|| TxEntry::new(txid))
.outputs = funding_outputs;
},
);
})?;
let spending_blockhashes: HashSet<BlockHash> = outpoints
let spending_locations: HashSet<TxLocation> = outpoints
.par_iter()
.flat_map_iter(|outpoint| index.filter_by_spending(*outpoint))
.collect();
self.for_new_blocks(spending_blockhashes, daemon, |blockhash, block| {
let block_entries = result.entry(blockhash).or_default();
filter_block_txs(block, |tx| filter_inputs(tx, outpoints)).for_each(
self.for_new_blocks(spending_locations, daemon, |req, blob| {
let block_entries = result.entry(req.blockhash()).or_default();
filter_block_txs(blob, &req, |tx| filter_inputs(tx, outpoints)).for_each(
|FilteredTx {
pos,
offset,
tx,
txid,
result: spent_outpoints,
}| {
cache.add_tx(txid, move || tx);
block_entries
.entry(pos)
.entry(offset)
.or_insert_with(|| TxEntry::new(txid))
.spent = spent_outpoints;
},
@ -377,7 +381,7 @@ impl ScriptHashStatus {
.into_iter()
.collect::<BTreeMap<usize, TxEntry>>()
.into_iter()
.map(|(_pos, entry)| entry)
.map(|(_offset, entry)| entry)
.collect::<Vec<TxEntry>>();
(blockhash, sorted_entries)
})
@ -510,33 +514,31 @@ fn compute_status_hash(history: &[HistoryEntry]) -> Option<StatusHash> {
struct FilteredTx<T> {
tx: Transaction,
txid: Txid,
pos: usize,
offset: usize,
result: Vec<T>,
}
fn filter_block_txs<T: Send>(
block: Block,
map_fn: impl Fn(&Transaction) -> Vec<T> + Sync,
) -> impl Iterator<Item = FilteredTx<T>> {
block
.txdata
.into_par_iter()
.enumerate()
.filter_map(|(pos, tx)| {
fn filter_block_txs<'a, T: Send>(
blob: &'a [u8],
req: &'a BlockRequest,
map_fn: impl Fn(&Transaction) -> Vec<T> + Sync + 'a,
) -> impl Iterator<Item = FilteredTx<T>> + 'a {
req.parse_transactions(blob)
.into_iter()
.filter_map(move |(tx, offset)| {
let result = map_fn(&tx);
if result.is_empty() {
return None; // skip irrelevant transaction
None // skip irrelevant transaction
} else {
let txid = tx.txid();
Some(FilteredTx {
tx,
txid,
offset,
result,
})
}
let txid = tx.txid();
Some(FilteredTx {
tx,
txid,
pos,
result,
})
})
.collect::<Vec<_>>()
.into_iter()
}
#[cfg(test)]

View File

@ -5,7 +5,7 @@ use std::convert::TryFrom;
use bitcoin::{
consensus::encode::{deserialize, serialize, Decodable, Encodable},
hashes::{borrow_slice_impl, hash_newtype, hex_fmt_impl, index_impl, serde_impl, sha256, Hash},
BlockHeader, OutPoint, Script, Txid,
BlockHash, BlockHeader, OutPoint, Script, Txid,
};
use crate::db;
@ -37,15 +37,41 @@ macro_rules! impl_consensus_encoding {
);
}
#[derive(Debug, Eq, PartialEq, Hash)]
pub(crate) struct TxLocation {
pub blockhash: BlockHash,
pub offset: u32, // byte offset within the block
}
const HASH_PREFIX_LEN: usize = 8;
type HashPrefix = [u8; HASH_PREFIX_LEN];
type Height = u32;
#[derive(Debug, Serialize, Deserialize, PartialEq, Copy, Clone, Default, Ord, Eq, PartialOrd)]
pub(crate) struct GlobalTxId {
id: u32, // transaction index in blockchain order
}
impl From<u64> for GlobalTxId {
fn from(id: u64) -> Self {
Self {
id: u32::try_from(id).expect("GlobalTxId is too large"),
}
}
}
impl GlobalTxId {
pub fn next(&mut self) {
self.id += 1;
}
}
impl_consensus_encoding!(GlobalTxId, id);
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub(crate) struct HashPrefixRow {
prefix: [u8; HASH_PREFIX_LEN],
height: Height, // transaction confirmed height
id: GlobalTxId,
}
impl HashPrefixRow {
@ -57,12 +83,12 @@ impl HashPrefixRow {
deserialize(row).expect("bad HashPrefixRow")
}
pub fn height(&self) -> usize {
usize::try_from(self.height).expect("invalid height")
pub fn id(&self) -> GlobalTxId {
self.id
}
}
impl_consensus_encoding!(HashPrefixRow, prefix, height);
impl_consensus_encoding!(HashPrefixRow, prefix, id);
hash_newtype!(
ScriptHash,
@ -91,10 +117,10 @@ impl ScriptHashRow {
scripthash.0[..HASH_PREFIX_LEN].to_vec().into_boxed_slice()
}
pub(crate) fn row(scripthash: ScriptHash, height: usize) -> HashPrefixRow {
pub(crate) fn row(scripthash: ScriptHash, id: GlobalTxId) -> HashPrefixRow {
HashPrefixRow {
prefix: scripthash.prefix(),
height: Height::try_from(height).expect("invalid height"),
id,
}
}
}
@ -125,10 +151,10 @@ impl SpendingPrefixRow {
Box::new(spending_prefix(outpoint))
}
pub(crate) fn row(outpoint: OutPoint, height: usize) -> HashPrefixRow {
pub(crate) fn row(outpoint: OutPoint, id: GlobalTxId) -> HashPrefixRow {
HashPrefixRow {
prefix: spending_prefix(outpoint),
height: Height::try_from(height).expect("invalid height"),
id,
}
}
}
@ -148,26 +174,53 @@ impl TxidRow {
Box::new(txid_prefix(&txid))
}
pub(crate) fn row(txid: Txid, height: usize) -> HashPrefixRow {
pub(crate) fn row(txid: Txid, id: GlobalTxId) -> HashPrefixRow {
HashPrefixRow {
prefix: txid_prefix(&txid),
height: Height::try_from(height).expect("invalid height"),
id,
}
}
}
pub(crate) struct TxOffsetRow {
id: GlobalTxId,
offset: u32, // byte offset within a block
}
impl TxOffsetRow {
pub(crate) fn row(id: GlobalTxId, offset: usize) -> Self {
Self {
id,
offset: u32::try_from(offset).expect("too large offset"),
}
}
pub(crate) fn key(&self) -> Box<[u8]> {
serialize(&self.id).into_boxed_slice()
}
pub(crate) fn value(&self) -> Box<[u8]> {
serialize(&self.offset).into_boxed_slice()
}
pub(crate) fn parse_offset(bytes: &[u8]) -> u32 {
deserialize(bytes).expect("invalid offset")
}
}
// ***************************************************************************
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct HeaderRow {
pub(crate) header: BlockHeader,
pub(crate) gtxid: GlobalTxId, // after indexing this block (cumulative # of transactions)
}
impl_consensus_encoding!(HeaderRow, header);
impl_consensus_encoding!(HeaderRow, header, gtxid);
impl HeaderRow {
pub(crate) fn new(header: BlockHeader) -> Self {
Self { header }
pub(crate) fn new(header: BlockHeader, gtxid: GlobalTxId) -> Self {
Self { header, gtxid }
}
pub(crate) fn to_db_row(&self) -> db::Row {
@ -181,7 +234,9 @@ impl HeaderRow {
#[cfg(test)]
mod tests {
use crate::types::{spending_prefix, HashPrefixRow, ScriptHash, ScriptHashRow, TxidRow};
use crate::types::{
spending_prefix, GlobalTxId, HashPrefixRow, ScriptHash, ScriptHashRow, TxidRow,
};
use bitcoin::{hashes::hex::ToHex, Address, OutPoint, Txid};
use serde_json::{from_str, json};
@ -199,7 +254,7 @@ mod tests {
fn test_scripthash_row() {
let hex = "\"4b3d912c1523ece4615e91bf0d27381ca72169dbf6b1c2ffcc9f92381d4984a3\"";
let scripthash: ScriptHash = from_str(&hex).unwrap();
let row1 = ScriptHashRow::row(scripthash, 123456);
let row1 = ScriptHashRow::row(scripthash, GlobalTxId::from(123456));
let db_row = row1.to_db_row();
assert_eq!(db_row[..].to_hex(), "a384491d38929fcc40e20100");
let row2 = HashPrefixRow::from_db_row(&db_row);
@ -222,8 +277,8 @@ mod tests {
let hex = "d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599";
let txid = Txid::from_str(hex).unwrap();
let row1 = TxidRow::row(txid, 91812);
let row2 = TxidRow::row(txid, 91842);
let row1 = TxidRow::row(txid, GlobalTxId::from(91812));
let row2 = TxidRow::row(txid, GlobalTxId::from(91842));
assert_eq!(row1.to_db_row().to_hex(), "9985d82954e10f22a4660100");
assert_eq!(row2.to_db_row().to_hex(), "9985d82954e10f22c2660100");
@ -235,8 +290,8 @@ mod tests {
let hex = "e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468";
let txid = Txid::from_str(hex).unwrap();
let row1 = TxidRow::row(txid, 91722);
let row2 = TxidRow::row(txid, 91880);
let row1 = TxidRow::row(txid, GlobalTxId::from(91722));
let row2 = TxidRow::row(txid, GlobalTxId::from(91880));
// low-endian encoding => rows should be sorted according to block height
assert_eq!(row1.to_db_row().to_hex(), "68b45f58b674e94e4a660100");