mirror of
https://github.com/romanz/electrs.git
synced 2024-11-19 09:54:09 +01:00
Use JSONRPC error codes
See https://www.jsonrpc.org/specification#error_object
This commit is contained in:
parent
0d44ec67e2
commit
f03c1d0938
@ -91,10 +91,6 @@ enum PollResult {
|
||||
}
|
||||
|
||||
fn rpc_poll(client: &mut bitcoincore_rpc::Client) -> PollResult {
|
||||
use bitcoincore_rpc::{
|
||||
jsonrpc::error::Error::Rpc as ServerError, Error::JsonRpc as JsonRpcError,
|
||||
};
|
||||
|
||||
match client.get_blockchain_info() {
|
||||
Ok(info) => {
|
||||
let left_blocks = info.headers - info.blocks;
|
||||
@ -113,7 +109,7 @@ fn rpc_poll(client: &mut bitcoincore_rpc::Client) -> PollResult {
|
||||
PollResult::Done(Ok(()))
|
||||
}
|
||||
Err(err) => {
|
||||
if let JsonRpcError(ServerError(ref e)) = err {
|
||||
if let Some(e) = extract_bitcoind_error(&err) {
|
||||
if e.code == -28 {
|
||||
info!("waiting for RPC warmup: {}", e.message);
|
||||
return PollResult::Retry;
|
||||
@ -317,3 +313,15 @@ fn build_version_message() -> NetworkMessage {
|
||||
relay: false,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) type RpcError = bitcoincore_rpc::jsonrpc::error::RpcError;
|
||||
|
||||
pub(crate) fn extract_bitcoind_error(err: &bitcoincore_rpc::Error) -> Option<&RpcError> {
|
||||
use bitcoincore_rpc::{
|
||||
jsonrpc::error::Error::Rpc as ServerError, Error::JsonRpc as JsonRpcError,
|
||||
};
|
||||
match err {
|
||||
JsonRpcError(ServerError(e)) => Some(e),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
259
src/electrum.rs
259
src/electrum.rs
@ -5,15 +5,21 @@ use bitcoin::{
|
||||
BlockHash, Txid,
|
||||
};
|
||||
use rayon::prelude::*;
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
use serde_json::{from_value, json, Value};
|
||||
use serde_derive::Deserialize;
|
||||
use serde_json::{self, json, Value};
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::iter::FromIterator;
|
||||
|
||||
use crate::{
|
||||
cache::Cache, config::Config, daemon::Daemon, merkle::Proof, metrics::Histogram,
|
||||
status::Status, tracker::Tracker, types::ScriptHash,
|
||||
cache::Cache,
|
||||
config::Config,
|
||||
daemon::{self, extract_bitcoind_error, Daemon},
|
||||
merkle::Proof,
|
||||
metrics::Histogram,
|
||||
status::Status,
|
||||
tracker::Tracker,
|
||||
types::ScriptHash,
|
||||
};
|
||||
|
||||
const ELECTRS_VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
@ -28,10 +34,9 @@ pub struct Client {
|
||||
status: HashMap<ScriptHash, Status>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
#[derive(Deserialize)]
|
||||
struct Request {
|
||||
id: Value,
|
||||
jsonrpc: String,
|
||||
method: String,
|
||||
|
||||
#[serde(default)]
|
||||
@ -68,6 +73,42 @@ impl From<TxGetArgs> for (Txid, bool) {
|
||||
}
|
||||
}
|
||||
|
||||
enum StandardError {
|
||||
ParseError,
|
||||
InvalidRequest,
|
||||
MethodNotFound,
|
||||
InvalidParams,
|
||||
}
|
||||
|
||||
enum RpcError {
|
||||
// JSON-RPC spec errors
|
||||
Standard(StandardError),
|
||||
// Electrum-specific errors
|
||||
BadRequest(anyhow::Error),
|
||||
DaemonError(daemon::RpcError),
|
||||
}
|
||||
|
||||
impl RpcError {
|
||||
fn to_value(&self) -> Value {
|
||||
match self {
|
||||
RpcError::Standard(err) => match err {
|
||||
StandardError::ParseError => json!({"code": -32700, "message": "parse error"}),
|
||||
StandardError::InvalidRequest => {
|
||||
json!({"code": -32600, "message": "invalid request"})
|
||||
}
|
||||
StandardError::MethodNotFound => {
|
||||
json!({"code": -32601, "message": "method not found"})
|
||||
}
|
||||
StandardError::InvalidParams => {
|
||||
json!({"code": -32602, "message": "invalid params"})
|
||||
}
|
||||
},
|
||||
RpcError::BadRequest(err) => json!({"code": 1, "message": err.to_string()}),
|
||||
RpcError::DaemonError(err) => json!({"code": 2, "message": err.message}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Electrum RPC handler
|
||||
pub struct Rpc {
|
||||
tracker: Tracker,
|
||||
@ -96,7 +137,7 @@ impl Rpc {
|
||||
self.tracker.sync(&self.daemon)
|
||||
}
|
||||
|
||||
pub fn update_client(&self, client: &mut Client) -> Result<Vec<Value>> {
|
||||
pub fn update_client(&self, client: &mut Client) -> Result<Vec<String>> {
|
||||
let chain = self.tracker.chain();
|
||||
let mut notifications = client
|
||||
.status
|
||||
@ -129,67 +170,7 @@ impl Rpc {
|
||||
));
|
||||
}
|
||||
}
|
||||
Ok(notifications)
|
||||
}
|
||||
|
||||
pub fn handle_request(&self, client: &mut Client, value: Value) -> Result<Value> {
|
||||
let requests: Requests = from_value(value).context("invalid request")?;
|
||||
match requests {
|
||||
Requests::Single(request) => self.handle_single_request(client, request),
|
||||
Requests::Batch(requests) => requests
|
||||
.into_iter()
|
||||
.map(|request| self.handle_single_request(client, request))
|
||||
.collect::<Result<Vec<_>>>()
|
||||
.map(|results| json!(results)),
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_single_request(&self, client: &mut Client, request: Request) -> Result<Value> {
|
||||
let Request {
|
||||
id,
|
||||
jsonrpc,
|
||||
method,
|
||||
params,
|
||||
} = request;
|
||||
self.rpc_duration.observe_duration(&method, || {
|
||||
let result = match method.as_str() {
|
||||
"blockchain.scripthash.get_history" => {
|
||||
self.scripthash_get_history(client, from_value(params)?)
|
||||
}
|
||||
"blockchain.scripthash.subscribe" => {
|
||||
self.scripthash_subscribe(client, from_value(params)?)
|
||||
}
|
||||
"blockchain.transaction.broadcast" => {
|
||||
self.transaction_broadcast(from_value(params)?)
|
||||
}
|
||||
"blockchain.transaction.get" => self.transaction_get(from_value(params)?),
|
||||
"blockchain.transaction.get_merkle" => {
|
||||
self.transaction_get_merkle(from_value(params)?)
|
||||
}
|
||||
"server.banner" => Ok(json!(self.banner)),
|
||||
"server.donation_address" => Ok(Value::Null),
|
||||
"server.peers.subscribe" => Ok(json!([])),
|
||||
"blockchain.block.header" => self.block_header(from_value(params)?),
|
||||
"blockchain.block.headers" => self.block_headers(from_value(params)?),
|
||||
"blockchain.estimatefee" => self.estimate_fee(from_value(params)?),
|
||||
"blockchain.headers.subscribe" => self.headers_subscribe(client),
|
||||
"blockchain.relayfee" => self.relayfee(),
|
||||
"mempool.get_fee_histogram" => self.get_fee_histogram(),
|
||||
"server.ping" => Ok(Value::Null),
|
||||
"server.version" => self.version(from_value(params)?),
|
||||
&_ => bail!("unknown method '{}' with {}", method, params,),
|
||||
};
|
||||
|
||||
Ok(match result {
|
||||
Ok(value) => json!({"jsonrpc": jsonrpc, "id": id, "result": value}),
|
||||
Err(err) => {
|
||||
let msg = format!("RPC failed: {:#}", err);
|
||||
warn!("{}", msg);
|
||||
let error = json!({"code": 1, "message": msg});
|
||||
json!({"jsonrpc": jsonrpc, "id": id, "error": error})
|
||||
}
|
||||
})
|
||||
})
|
||||
Ok(notifications.into_iter().map(|v| v.to_string()).collect())
|
||||
}
|
||||
|
||||
fn headers_subscribe(&self, client: &mut Client) -> Result<Value> {
|
||||
@ -319,7 +300,10 @@ impl Rpc {
|
||||
|
||||
fn version(&self, (client_id, client_version): (String, Version)) -> Result<Value> {
|
||||
match client_version {
|
||||
Version::Single(v) if v == PROTOCOL_VERSION => (),
|
||||
Version::Single(v) if v == PROTOCOL_VERSION => {
|
||||
let server_id = format!("electrs/{}", ELECTRS_VERSION);
|
||||
Ok(json!([server_id, PROTOCOL_VERSION]))
|
||||
}
|
||||
_ => {
|
||||
bail!(
|
||||
"{} requested {:?}, server supports {}",
|
||||
@ -328,12 +312,143 @@ impl Rpc {
|
||||
PROTOCOL_VERSION
|
||||
);
|
||||
}
|
||||
};
|
||||
let server_id = format!("electrs/{}", ELECTRS_VERSION);
|
||||
Ok(json!([server_id, PROTOCOL_VERSION]))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_request(&self, client: &mut Client, line: &str) -> String {
|
||||
let error_msg_no_id = |err| error_msg(Value::Null, RpcError::Standard(err));
|
||||
let response: Value = match serde_json::from_str(line) {
|
||||
// parse JSON from str
|
||||
Ok(value) => match serde_json::from_value(value) {
|
||||
// parse RPC from JSON
|
||||
Ok(requests) => match requests {
|
||||
Requests::Single(request) => self.call(client, request),
|
||||
Requests::Batch(requests) => json!(requests
|
||||
.into_iter()
|
||||
.map(|request| self.call(client, request))
|
||||
.collect::<Vec<Value>>()),
|
||||
},
|
||||
Err(err) => {
|
||||
warn!("invalid RPC request ({:?}): {}", line, err);
|
||||
error_msg_no_id(StandardError::InvalidRequest)
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
warn!("invalid JSON ({:?}): {}", line, err);
|
||||
error_msg_no_id(StandardError::ParseError)
|
||||
}
|
||||
};
|
||||
response.to_string()
|
||||
}
|
||||
|
||||
fn call(&self, client: &mut Client, request: Request) -> Value {
|
||||
let Request { id, method, params } = request;
|
||||
let call = match Call::parse(&method, params) {
|
||||
Ok(call) => call,
|
||||
Err(err) => return error_msg(id, RpcError::Standard(err)),
|
||||
};
|
||||
self.rpc_duration.observe_duration(&method, || {
|
||||
let result = match call {
|
||||
Call::Banner => Ok(json!(self.banner)),
|
||||
Call::BlockHeader(args) => self.block_header(args),
|
||||
Call::BlockHeaders(args) => self.block_headers(args),
|
||||
Call::Donation => Ok(Value::Null),
|
||||
Call::EstimateFee(args) => self.estimate_fee(args),
|
||||
Call::HeadersSubscribe => self.headers_subscribe(client),
|
||||
Call::MempoolFeeHistogram => self.get_fee_histogram(),
|
||||
Call::PeersSubscribe => Ok(json!([])),
|
||||
Call::Ping => Ok(Value::Null),
|
||||
Call::RelayFee => self.relayfee(),
|
||||
Call::ScriptHashGetHistory(args) => self.scripthash_get_history(client, args),
|
||||
Call::ScriptHashSubscribe(args) => self.scripthash_subscribe(client, args),
|
||||
Call::TransactionBroadcast(args) => self.transaction_broadcast(args),
|
||||
Call::TransactionGet(args) => self.transaction_get(args),
|
||||
Call::TransactionGetMerkle(args) => self.transaction_get_merkle(args),
|
||||
Call::Version(args) => self.version(args),
|
||||
};
|
||||
match result {
|
||||
Ok(value) => result_msg(id, value),
|
||||
Err(err) => {
|
||||
warn!("RPC {} failed: {:#}", method, err);
|
||||
match err
|
||||
.downcast_ref::<bitcoincore_rpc::Error>()
|
||||
.and_then(extract_bitcoind_error)
|
||||
{
|
||||
Some(e) => error_msg(id, RpcError::DaemonError(e.clone())),
|
||||
None => error_msg(id, RpcError::BadRequest(err)),
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
enum Call {
|
||||
Banner,
|
||||
BlockHeader((usize,)),
|
||||
BlockHeaders((usize, usize)),
|
||||
TransactionBroadcast((String,)),
|
||||
Donation,
|
||||
EstimateFee((u16,)),
|
||||
HeadersSubscribe,
|
||||
MempoolFeeHistogram,
|
||||
PeersSubscribe,
|
||||
Ping,
|
||||
RelayFee,
|
||||
ScriptHashGetHistory((ScriptHash,)),
|
||||
ScriptHashSubscribe((ScriptHash,)),
|
||||
TransactionGet(TxGetArgs),
|
||||
TransactionGetMerkle((Txid, usize)),
|
||||
Version((String, Version)),
|
||||
}
|
||||
|
||||
impl Call {
|
||||
fn parse(method: &str, params: Value) -> std::result::Result<Call, StandardError> {
|
||||
Ok(match method {
|
||||
"blockchain.block.header" => Call::BlockHeader(convert(params)?),
|
||||
"blockchain.block.headers" => Call::BlockHeaders(convert(params)?),
|
||||
"blockchain.estimatefee" => Call::EstimateFee(convert(params)?),
|
||||
"blockchain.headers.subscribe" => Call::HeadersSubscribe,
|
||||
"blockchain.relayfee" => Call::RelayFee,
|
||||
"blockchain.scripthash.get_history" => Call::ScriptHashGetHistory(convert(params)?),
|
||||
"blockchain.scripthash.subscribe" => Call::ScriptHashSubscribe(convert(params)?),
|
||||
"blockchain.transaction.broadcast" => Call::TransactionBroadcast(convert(params)?),
|
||||
"blockchain.transaction.get" => Call::TransactionGet(convert(params)?),
|
||||
"blockchain.transaction.get_merkle" => Call::TransactionGetMerkle(convert(params)?),
|
||||
"mempool.get_fee_histogram" => Call::MempoolFeeHistogram,
|
||||
"server.banner" => Call::Banner,
|
||||
"server.donation_address" => Call::Donation,
|
||||
"server.peers.subscribe" => Call::PeersSubscribe,
|
||||
"server.ping" => Call::Ping,
|
||||
"server.version" => Call::Version(convert(params)?),
|
||||
_ => {
|
||||
warn!("unknown method {}", method);
|
||||
return Err(StandardError::MethodNotFound);
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn convert<T>(params: Value) -> std::result::Result<T, StandardError>
|
||||
where
|
||||
T: serde::de::DeserializeOwned,
|
||||
{
|
||||
let params_str = params.to_string();
|
||||
serde_json::from_value(params).map_err(|err| {
|
||||
warn!("invalid params {}: {}", params_str, err);
|
||||
StandardError::InvalidParams
|
||||
})
|
||||
}
|
||||
|
||||
fn notification(method: &str, params: &[Value]) -> Value {
|
||||
json!({"jsonrpc": "2.0", "method": method, "params": params})
|
||||
}
|
||||
|
||||
fn result_msg(id: Value, result: Value) -> Value {
|
||||
json!({"jsonrpc": "2.0", "id": id, "result": result})
|
||||
}
|
||||
|
||||
fn error_msg(id: Value, error: RpcError) -> Value {
|
||||
json!({"jsonrpc": "2.0", "id": id, "error": error.to_value()})
|
||||
}
|
||||
|
@ -3,7 +3,6 @@ use bitcoin::BlockHash;
|
||||
use bitcoincore_rpc::RpcApi;
|
||||
use crossbeam_channel::{bounded, select, unbounded, Receiver, Sender};
|
||||
use rayon::prelude::*;
|
||||
use serde_json::{de::from_str, Value};
|
||||
|
||||
use std::{
|
||||
collections::hash_map::HashMap,
|
||||
@ -46,6 +45,17 @@ impl Peer {
|
||||
Self { id, client, stream }
|
||||
}
|
||||
|
||||
fn send(&mut self, values: Vec<String>) -> Result<()> {
|
||||
for mut value in values {
|
||||
debug!("{}: send {}", self.id, value);
|
||||
value += "\n";
|
||||
self.stream
|
||||
.write_all(value.as_bytes())
|
||||
.with_context(|| format!("failed to send response: {:?}", value))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn disconnect(self) {
|
||||
let _ = self.stream.shutdown(Shutdown::Both);
|
||||
}
|
||||
@ -123,7 +133,8 @@ fn notify_peer(rpc: &Rpc, peer: &mut Peer) -> Result<()> {
|
||||
let notifications = rpc
|
||||
.update_client(&mut peer.client)
|
||||
.context("failed to generate notifications")?;
|
||||
send_to_peer(peer, ¬ifications).context("failed to send notifications")
|
||||
peer.send(notifications)
|
||||
.context("failed to send notifications")
|
||||
}
|
||||
|
||||
struct Event {
|
||||
@ -146,7 +157,7 @@ fn handle_event(rpc: &Rpc, peers: &mut HashMap<usize, Peer>, event: Event) {
|
||||
}
|
||||
Message::Request(line) => {
|
||||
let result = match peers.get_mut(&peer_id) {
|
||||
Some(peer) => handle_request(rpc, peer, line),
|
||||
Some(peer) => handle_request(rpc, peer, &line),
|
||||
None => return, // unknown peer
|
||||
};
|
||||
if let Err(e) = result {
|
||||
@ -161,24 +172,9 @@ fn handle_event(rpc: &Rpc, peers: &mut HashMap<usize, Peer>, event: Event) {
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_request(rpc: &Rpc, peer: &mut Peer, line: String) -> Result<()> {
|
||||
let request: Value = from_str(&line).with_context(|| format!("invalid request: {}", line))?;
|
||||
let response: Value = rpc
|
||||
.handle_request(&mut peer.client, request)
|
||||
.with_context(|| format!("failed to handle request: {}", line))?;
|
||||
send_to_peer(peer, &[response])
|
||||
}
|
||||
|
||||
fn send_to_peer(peer: &mut Peer, values: &[Value]) -> Result<()> {
|
||||
for value in values {
|
||||
let mut response = value.to_string();
|
||||
debug!("{}: send {}", peer.id, response);
|
||||
response += "\n";
|
||||
peer.stream
|
||||
.write_all(response.as_bytes())
|
||||
.with_context(|| format!("failed to send response: {}", response))?;
|
||||
}
|
||||
Ok(())
|
||||
fn handle_request(rpc: &Rpc, peer: &mut Peer, line: &str) -> Result<()> {
|
||||
let response = rpc.handle_request(&mut peer.client, &line);
|
||||
peer.send(vec![response])
|
||||
}
|
||||
|
||||
fn accept_loop(listener: TcpListener, server_tx: Sender<Event>) -> Result<()> {
|
||||
|
Loading…
Reference in New Issue
Block a user