diff --git a/src/daemon.rs b/src/daemon.rs index f39c2ce..970580e 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -5,6 +5,7 @@ use bitcoin::{Amount, BlockHash, Transaction, Txid}; use bitcoincore_rpc::{json, jsonrpc, Auth, Client, RpcApi}; use crossbeam_channel::Receiver; use parking_lot::Mutex; +use serde::Serialize; use serde_json::{json, Value}; use std::fs::File; @@ -223,25 +224,16 @@ impl Daemon { pub(crate) fn get_mempool_entries( &self, txids: &[Txid], - ) -> Result>> { - let client = self.rpc.get_jsonrpc_client(); - debug!("getting {} mempool entries", txids.len()); - let args: Vec<_> = txids - .iter() - .map(|txid| vec![serde_json::value::to_raw_value(txid).unwrap()]) - .collect(); - let reqs: Vec<_> = args - .iter() - .map(|a| client.build_request("getmempoolentry", a)) - .collect(); - let res = client.send_batch(&reqs).context("batch request failed")?; - debug!("got {} mempool entries", res.len()); - Ok(res + ) -> Result>> { + let results = batch_request(self.rpc.get_jsonrpc_client(), "getmempoolentry", txids)?; + Ok(results .into_iter() - .map(|r| { - r.context("missing response")? - .result::() - .context("invalid response") + .map(|r| match r?.result::() { + Ok(entry) => Some(entry), + Err(err) => { + debug!("failed to get mempool entry: {}", err); // probably due to RBF + None + } }) .collect()) } @@ -249,28 +241,32 @@ impl Daemon { pub(crate) fn get_mempool_transactions( &self, txids: &[Txid], - ) -> Result>> { - let client = self.rpc.get_jsonrpc_client(); - debug!("getting {} transactions", txids.len()); - let args: Vec<_> = txids - .iter() - .map(|txid| vec![serde_json::value::to_raw_value(txid).unwrap()]) - .collect(); - let reqs: Vec<_> = args - .iter() - .map(|a| client.build_request("getrawtransaction", a)) - .collect(); - let res = client.send_batch(&reqs).context("batch request failed")?; - debug!("got {} mempool transactions", res.len()); - Ok(res + ) -> Result>> { + let results = batch_request(self.rpc.get_jsonrpc_client(), "getrawtransaction", txids)?; + Ok(results .into_iter() - .map(|r| -> Result { - let tx_hex = r - .context("missing response")? - .result::() - .context("invalid response")?; - let tx_bytes = Vec::from_hex(&tx_hex).context("non-hex transaction")?; - deserialize(&tx_bytes).context("invalid transaction") + .map(|r| -> Option { + let tx_hex = match r?.result::() { + Ok(tx_hex) => Some(tx_hex), + Err(err) => { + debug!("failed to get mempool tx: {}", err); // probably due to RBF + None + } + }?; + let tx_bytes = match Vec::from_hex(&tx_hex) { + Ok(tx_bytes) => Some(tx_bytes), + Err(err) => { + warn!("got non-hex transaction {}: {}", tx_hex, err); + None + } + }?; + match deserialize(&tx_bytes) { + Ok(tx) => Some(tx), + Err(err) => { + warn!("got invalid tx {}: {}", tx_hex, err); + None + } + } }) .collect()) } @@ -303,3 +299,29 @@ pub(crate) fn extract_bitcoind_error(err: &bitcoincore_rpc::Error) -> Option<&Rp _ => None, } } + +fn batch_request( + client: &jsonrpc::Client, + name: &str, + items: &[T], +) -> Result>> +where + T: Serialize, +{ + debug!("calling {} on {} items", name, items.len()); + let args: Vec<_> = items + .iter() + .map(|item| vec![serde_json::value::to_raw_value(item).unwrap()]) + .collect(); + let reqs: Vec<_> = args + .iter() + .map(|arg| client.build_request(name, arg)) + .collect(); + match client.send_batch(&reqs) { + Ok(values) => { + assert_eq!(items.len(), values.len()); + Ok(values) + } + Err(err) => bail!("batch {} request failed: {}", name, err), + } +} diff --git a/src/mempool.rs b/src/mempool.rs index 1108a81..b4a240f 100644 --- a/src/mempool.rs +++ b/src/mempool.rs @@ -82,8 +82,20 @@ impl MempoolSyncUpdate { .iter() .zip(entries.into_iter().zip(txs.into_iter())) .filter_map(|(txid, (entry, tx))| { - let tx = tx.ok()?; - let entry = entry.ok()?; + let entry = match entry { + Some(entry) => entry, + None => { + warn!("missing mempool entry: {}", txid); + return None; + } + }; + let tx = match tx { + Some(tx) => tx, + None => { + warn!("missing mempool tx: {}", txid); + return None; + } + }; Some(Entry { txid: *txid, tx,