mirror of
https://github.com/romanz/electrs.git
synced 2024-11-19 01:43:29 +01:00
Don't fail mempool sync on missing transactions (#997)
Otherwise, RBF may cause current sync implementation to fail.
This commit is contained in:
parent
c6bea47685
commit
576b5b2bb6
100
src/daemon.rs
100
src/daemon.rs
@ -5,6 +5,7 @@ use bitcoin::{Amount, BlockHash, Transaction, Txid};
|
||||
use bitcoincore_rpc::{json, jsonrpc, Auth, Client, RpcApi};
|
||||
use crossbeam_channel::Receiver;
|
||||
use parking_lot::Mutex;
|
||||
use serde::Serialize;
|
||||
use serde_json::{json, Value};
|
||||
|
||||
use std::fs::File;
|
||||
@ -223,25 +224,16 @@ impl Daemon {
|
||||
pub(crate) fn get_mempool_entries(
|
||||
&self,
|
||||
txids: &[Txid],
|
||||
) -> Result<Vec<Result<json::GetMempoolEntryResult>>> {
|
||||
let client = self.rpc.get_jsonrpc_client();
|
||||
debug!("getting {} mempool entries", txids.len());
|
||||
let args: Vec<_> = txids
|
||||
.iter()
|
||||
.map(|txid| vec![serde_json::value::to_raw_value(txid).unwrap()])
|
||||
.collect();
|
||||
let reqs: Vec<_> = args
|
||||
.iter()
|
||||
.map(|a| client.build_request("getmempoolentry", a))
|
||||
.collect();
|
||||
let res = client.send_batch(&reqs).context("batch request failed")?;
|
||||
debug!("got {} mempool entries", res.len());
|
||||
Ok(res
|
||||
) -> Result<Vec<Option<json::GetMempoolEntryResult>>> {
|
||||
let results = batch_request(self.rpc.get_jsonrpc_client(), "getmempoolentry", txids)?;
|
||||
Ok(results
|
||||
.into_iter()
|
||||
.map(|r| {
|
||||
r.context("missing response")?
|
||||
.result::<json::GetMempoolEntryResult>()
|
||||
.context("invalid response")
|
||||
.map(|r| match r?.result::<json::GetMempoolEntryResult>() {
|
||||
Ok(entry) => Some(entry),
|
||||
Err(err) => {
|
||||
debug!("failed to get mempool entry: {}", err); // probably due to RBF
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
@ -249,28 +241,32 @@ impl Daemon {
|
||||
pub(crate) fn get_mempool_transactions(
|
||||
&self,
|
||||
txids: &[Txid],
|
||||
) -> Result<Vec<Result<Transaction>>> {
|
||||
let client = self.rpc.get_jsonrpc_client();
|
||||
debug!("getting {} transactions", txids.len());
|
||||
let args: Vec<_> = txids
|
||||
.iter()
|
||||
.map(|txid| vec![serde_json::value::to_raw_value(txid).unwrap()])
|
||||
.collect();
|
||||
let reqs: Vec<_> = args
|
||||
.iter()
|
||||
.map(|a| client.build_request("getrawtransaction", a))
|
||||
.collect();
|
||||
let res = client.send_batch(&reqs).context("batch request failed")?;
|
||||
debug!("got {} mempool transactions", res.len());
|
||||
Ok(res
|
||||
) -> Result<Vec<Option<Transaction>>> {
|
||||
let results = batch_request(self.rpc.get_jsonrpc_client(), "getrawtransaction", txids)?;
|
||||
Ok(results
|
||||
.into_iter()
|
||||
.map(|r| -> Result<Transaction> {
|
||||
let tx_hex = r
|
||||
.context("missing response")?
|
||||
.result::<String>()
|
||||
.context("invalid response")?;
|
||||
let tx_bytes = Vec::from_hex(&tx_hex).context("non-hex transaction")?;
|
||||
deserialize(&tx_bytes).context("invalid transaction")
|
||||
.map(|r| -> Option<Transaction> {
|
||||
let tx_hex = match r?.result::<String>() {
|
||||
Ok(tx_hex) => Some(tx_hex),
|
||||
Err(err) => {
|
||||
debug!("failed to get mempool tx: {}", err); // probably due to RBF
|
||||
None
|
||||
}
|
||||
}?;
|
||||
let tx_bytes = match Vec::from_hex(&tx_hex) {
|
||||
Ok(tx_bytes) => Some(tx_bytes),
|
||||
Err(err) => {
|
||||
warn!("got non-hex transaction {}: {}", tx_hex, err);
|
||||
None
|
||||
}
|
||||
}?;
|
||||
match deserialize(&tx_bytes) {
|
||||
Ok(tx) => Some(tx),
|
||||
Err(err) => {
|
||||
warn!("got invalid tx {}: {}", tx_hex, err);
|
||||
None
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
@ -303,3 +299,29 @@ pub(crate) fn extract_bitcoind_error(err: &bitcoincore_rpc::Error) -> Option<&Rp
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn batch_request<T>(
|
||||
client: &jsonrpc::Client,
|
||||
name: &str,
|
||||
items: &[T],
|
||||
) -> Result<Vec<Option<jsonrpc::Response>>>
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
debug!("calling {} on {} items", name, items.len());
|
||||
let args: Vec<_> = items
|
||||
.iter()
|
||||
.map(|item| vec![serde_json::value::to_raw_value(item).unwrap()])
|
||||
.collect();
|
||||
let reqs: Vec<_> = args
|
||||
.iter()
|
||||
.map(|arg| client.build_request(name, arg))
|
||||
.collect();
|
||||
match client.send_batch(&reqs) {
|
||||
Ok(values) => {
|
||||
assert_eq!(items.len(), values.len());
|
||||
Ok(values)
|
||||
}
|
||||
Err(err) => bail!("batch {} request failed: {}", name, err),
|
||||
}
|
||||
}
|
||||
|
@ -82,8 +82,20 @@ impl MempoolSyncUpdate {
|
||||
.iter()
|
||||
.zip(entries.into_iter().zip(txs.into_iter()))
|
||||
.filter_map(|(txid, (entry, tx))| {
|
||||
let tx = tx.ok()?;
|
||||
let entry = entry.ok()?;
|
||||
let entry = match entry {
|
||||
Some(entry) => entry,
|
||||
None => {
|
||||
warn!("missing mempool entry: {}", txid);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
let tx = match tx {
|
||||
Some(tx) => tx,
|
||||
None => {
|
||||
warn!("missing mempool tx: {}", txid);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
Some(Entry {
|
||||
txid: *txid,
|
||||
tx,
|
||||
|
Loading…
Reference in New Issue
Block a user