1
0
mirror of https://github.com/romanz/electrs.git synced 2024-11-19 01:43:29 +01:00

Add blockchain.outpoint.subscribe RPC

This commit is contained in:
Roman Zeyde 2021-08-13 20:34:40 +03:00
parent 41763a44b3
commit 834bbd70f8
3 changed files with 196 additions and 6 deletions

View File

@ -2,7 +2,7 @@ use anyhow::{bail, Context, Result};
use bitcoin::{
consensus::{deserialize, encode::serialize_hex},
hashes::hex::FromHex,
BlockHash, Txid,
BlockHash, OutPoint, Txid,
};
use crossbeam_channel::Receiver;
use rayon::prelude::*;
@ -21,7 +21,7 @@ use crate::{
merkle::Proof,
metrics::{self, Histogram, Metrics},
signals::Signal,
status::ScriptHashStatus,
status::{OutPointStatus, ScriptHashStatus},
tracker::Tracker,
types::ScriptHash,
};
@ -36,6 +36,7 @@ const UNSUBSCRIBED_QUERY_MESSAGE: &str = "your wallet uses less efficient method
pub struct Client {
tip: Option<BlockHash>,
scripthashes: HashMap<ScriptHash, ScriptHashStatus>,
outpoints: HashMap<OutPoint, OutPointStatus>,
}
#[derive(Deserialize)]
@ -185,7 +186,25 @@ impl Rpc {
}
})
.collect::<Result<Vec<Value>>>()
.context("failed to update status")?;
.context("failed to update scripthash status")?;
notifications.extend(
client
.outpoints
.par_iter_mut()
.filter_map(|(outpoint, status)| -> Option<Result<Value>> {
match self.tracker.update_outpoint_status(status, &self.daemon) {
Ok(true) => Some(Ok(notification(
"blockchain.outpoint.subscribe",
&[json!([outpoint.txid, outpoint.vout]), json!(status)],
))),
Ok(false) => None, // outpoint status is the same
Err(e) => Some(Err(e)),
}
})
.collect::<Result<Vec<Value>>>()
.context("failed to update scripthash status")?,
);
if let Some(old_tip) = client.tip {
let new_tip = self.tracker.chain().tip();
@ -350,6 +369,28 @@ impl Rpc {
})
}
fn outpoint_subscribe(&self, client: &mut Client, (txid, vout): (Txid, u32)) -> Result<Value> {
let outpoint = OutPoint::new(txid, vout);
Ok(match client.outpoints.entry(outpoint) {
Entry::Occupied(e) => json!(e.get()),
Entry::Vacant(e) => {
let outpoint = OutPoint::new(txid, vout);
let mut status = OutPointStatus::new(outpoint);
self.tracker
.update_outpoint_status(&mut status, &self.daemon)?;
json!(e.insert(status))
}
})
}
fn outpoint_unsubscribe(
&self,
client: &mut Client,
(txid, vout): (Txid, u32),
) -> Result<Value> {
Ok(json!(client.outpoints.remove(&OutPoint::new(txid, vout))))
}
fn new_status(&self, scripthash: ScriptHash) -> Result<ScriptHashStatus> {
let mut status = ScriptHashStatus::new(scripthash);
self.tracker
@ -548,6 +589,8 @@ impl Rpc {
Params::Features => self.features(),
Params::HeadersSubscribe => self.headers_subscribe(client),
Params::MempoolFeeHistogram => self.get_fee_histogram(),
Params::OutPointSubscribe(args) => self.outpoint_subscribe(client, *args),
Params::OutPointUnsubscribe(args) => self.outpoint_unsubscribe(client, *args),
Params::PeersSubscribe => Ok(json!([])),
Params::Ping => Ok(Value::Null),
Params::RelayFee => self.relayfee(),
@ -572,12 +615,13 @@ enum Params {
Banner,
BlockHeader((usize,)),
BlockHeaders((usize, usize)),
TransactionBroadcast((String,)),
Donation,
EstimateFee((u16,)),
Features,
HeadersSubscribe,
MempoolFeeHistogram,
OutPointSubscribe((Txid, u32)), // TODO: support spk_hint
OutPointUnsubscribe((Txid, u32)),
PeersSubscribe,
Ping,
RelayFee,
@ -586,6 +630,7 @@ enum Params {
ScriptHashListUnspent((ScriptHash,)),
ScriptHashSubscribe((ScriptHash,)),
ScriptHashUnsubscribe((ScriptHash,)),
TransactionBroadcast((String,)),
TransactionGet(TxGetArgs),
TransactionGetMerkle((Txid, usize)),
TransactionFromPosition((usize, usize, bool)),
@ -599,6 +644,8 @@ impl Params {
"blockchain.block.headers" => Params::BlockHeaders(convert(params)?),
"blockchain.estimatefee" => Params::EstimateFee(convert(params)?),
"blockchain.headers.subscribe" => Params::HeadersSubscribe,
"blockchain.outpoint.subscribe" => Params::OutPointSubscribe(convert(params)?),
"blockchain.outpoint.unsubscribe" => Params::OutPointUnsubscribe(convert(params)?),
"blockchain.relayfee" => Params::RelayFee,
"blockchain.scripthash.get_balance" => Params::ScriptHashGetBalance(convert(params)?),
"blockchain.scripthash.get_history" => Params::ScriptHashGetHistory(convert(params)?),

View File

@ -4,7 +4,7 @@ use bitcoin::{
Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
};
use rayon::prelude::*;
use serde::ser::{Serialize, Serializer};
use serde::ser::{Serialize, SerializeMap, Serializer};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::TryFrom;
@ -48,12 +48,26 @@ impl TxEntry {
// Confirmation height of a transaction or its mempool state:
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-history
// https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-mempool
#[derive(Copy, Clone, Eq, PartialEq)]
enum Height {
Confirmed { height: usize },
Unconfirmed { has_unconfirmed_inputs: bool },
}
impl Height {
fn from_blockhash(blockhash: BlockHash, chain: &Chain) -> Self {
let height = chain
.get_block_height(&blockhash)
.expect("missing block in chain");
Self::Confirmed { height }
}
fn unconfirmed(e: &crate::mempool::Entry) -> Self {
Self::Unconfirmed {
has_unconfirmed_inputs: e.has_unconfirmed_inputs,
}
}
fn as_i64(&self) -> i64 {
match self {
Self::Confirmed { height } => i64::try_from(*height).unwrap(),
@ -538,6 +552,127 @@ fn filter_block_txs<T: Send>(
.into_iter()
}
pub(crate) struct OutPointStatus {
outpoint: OutPoint,
funding: Option<Height>,
spending: Option<(Txid, Height)>,
tip: BlockHash,
}
impl Serialize for OutPointStatus {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut map = serializer.serialize_map(None)?;
if let Some(funding) = &self.funding {
map.serialize_entry("height", &funding)?;
}
if let Some((txid, height)) = &self.spending {
map.serialize_entry("spender_txhash", &txid)?;
map.serialize_entry("spender_height", &height)?;
}
map.end()
}
}
impl OutPointStatus {
pub(crate) fn new(outpoint: OutPoint) -> Self {
Self {
outpoint,
funding: None,
spending: None,
tip: BlockHash::all_zeros(),
}
}
pub(crate) fn sync(
&mut self,
index: &Index,
mempool: &Mempool,
daemon: &Daemon,
) -> Result<bool> {
let funding = self.sync_funding(index, daemon, mempool)?;
let spending = self.sync_spending(index, daemon, mempool)?;
let same_status = (self.funding == funding) && (self.spending == spending);
self.funding = funding;
self.spending = spending;
self.tip = index.chain().tip();
Ok(!same_status)
}
/// Return true iff current tip became unconfirmed
fn is_reorg(&self, chain: &Chain) -> bool {
chain.get_block_height(&self.tip).is_none()
}
fn sync_funding(
&self,
index: &Index,
daemon: &Daemon,
mempool: &Mempool,
) -> Result<Option<Height>> {
let chain = index.chain();
if !self.is_reorg(chain) {
if let Some(Height::Confirmed { .. }) = &self.funding {
return Ok(self.funding);
}
}
let mut confirmed = None;
daemon.for_blocks(
index.filter_by_txid(self.outpoint.txid),
|blockhash, block| {
if confirmed.is_none() {
for tx in block.txdata {
let txid = tx.txid();
let output_len = u32::try_from(tx.output.len()).unwrap();
if self.outpoint.txid == txid && self.outpoint.vout < output_len {
confirmed = Some(Height::from_blockhash(blockhash, chain));
return;
}
}
}
},
)?;
Ok(confirmed.or_else(|| mempool.get(&self.outpoint.txid).map(Height::unconfirmed)))
}
fn sync_spending(
&self,
index: &Index,
daemon: &Daemon,
mempool: &Mempool,
) -> Result<Option<(Txid, Height)>> {
let chain = index.chain();
if !self.is_reorg(chain) {
if let Some((_, Height::Confirmed { .. })) = &self.spending {
return Ok(self.spending);
}
}
let spending_blockhashes = index.filter_by_spending(self.outpoint);
let mut confirmed = None;
daemon.for_blocks(spending_blockhashes, |blockhash, block| {
for tx in block.txdata {
for txi in &tx.input {
if txi.previous_output == self.outpoint {
// TODO: there should be only one spending input
assert!(confirmed.is_none(), "double spend of {}", self.outpoint);
confirmed = Some((tx.txid(), Height::from_blockhash(blockhash, chain)));
return;
}
}
}
})?;
Ok(confirmed.or_else(|| {
let entries = mempool.filter_by_spending(&self.outpoint);
assert!(entries.len() <= 1, "double spend of {}", self.outpoint);
entries
.first()
.map(|entry| (entry.txid, Height::unconfirmed(entry)))
}))
}
}
#[cfg(test)]
mod tests {
use super::HistoryEntry;

View File

@ -11,7 +11,7 @@ use crate::{
mempool::{FeeHistogram, Mempool},
metrics::Metrics,
signals::ExitFlag,
status::{Balance, ScriptHashStatus, UnspentEntry},
status::{Balance, OutPointStatus, ScriptHashStatus, UnspentEntry},
};
/// Electrum protocol subscriptions' tracker
@ -114,4 +114,12 @@ impl Tracker {
})?;
Ok(result)
}
pub(crate) fn update_outpoint_status(
&self,
status: &mut OutPointStatus,
daemon: &Daemon,
) -> Result<bool> {
status.sync(&self.index, &self.mempool, daemon)
}
}