Merge pull request #774 from jkczyz/2021-01-http-client

HTTP-based block source clients
This commit is contained in:
Matt Corallo 2021-02-04 09:16:14 -08:00 committed by GitHub
commit b67ec5a273
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 1789 additions and 1 deletions

View file

@ -13,7 +13,7 @@ jobs:
1.30.0,
# 1.34.2 is Debian stable
1.34.2,
# 1.45.2 is MSRV for lightning-net-tokio and generates coverage
# 1.45.2 is MSRV for lightning-net-tokio, lightning-block-sync, and coverage generation
1.45.2]
include:
- toolchain: stable
@ -48,6 +48,24 @@ jobs:
- name: Build on Rust ${{ matrix.toolchain }}
if: "! matrix.build-net-tokio"
run: cargo build --verbose --color always -p lightning
- name: Build Block Sync Clients on Rust ${{ matrix.toolchain }} with features
if: "matrix.build-net-tokio && !matrix.coverage"
run: |
cd lightning-block-sync
cargo build --verbose --color always --features rest-client
cargo build --verbose --color always --features rpc-client
cargo build --verbose --color always --features rpc-client,rest-client
cargo build --verbose --color always --features rpc-client,rest-client,tokio
cd ..
- name: Build Block Sync Clients on Rust ${{ matrix.toolchain }} with features and full code-linking for coverage generation
if: matrix.coverage
run: |
cd lightning-block-sync
RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rest-client
RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client
RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client,rest-client
RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client,rest-client,tokio
cd ..
- name: Test on Rust ${{ matrix.toolchain }} with net-tokio
if: "matrix.build-net-tokio && !matrix.coverage"
run: cargo test --verbose --color always
@ -57,6 +75,24 @@ jobs:
- name: Test on Rust ${{ matrix.toolchain }}
if: "! matrix.build-net-tokio"
run: cargo test --verbose --color always -p lightning
- name: Test Block Sync Clients on Rust ${{ matrix.toolchain }} with features
if: "matrix.build-net-tokio && !matrix.coverage"
run: |
cd lightning-block-sync
cargo test --verbose --color always --features rest-client
cargo test --verbose --color always --features rpc-client
cargo test --verbose --color always --features rpc-client,rest-client
cargo test --verbose --color always --features rpc-client,rest-client,tokio
cd ..
- name: Test Block Sync Clients on Rust ${{ matrix.toolchain }} with features and full code-linking for coverage generation
if: matrix.coverage
run: |
cd lightning-block-sync
RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rest-client
RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rpc-client
RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rpc-client,rest-client
RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rpc-client,rest-client,tokio
cd ..
- name: Install deps for kcov
if: matrix.coverage
run: |

View file

@ -2,6 +2,7 @@
members = [
"lightning",
"lightning-block-sync",
"lightning-net-tokio",
"lightning-persister",
]

View file

@ -0,0 +1,25 @@
[package]
name = "lightning-block-sync"
version = "0.0.1"
authors = ["Jeffrey Czyz", "Matt Corallo"]
license = "Apache-2.0"
edition = "2018"
description = """
Utilities to fetch the chain data from a block source and feed them into Rust Lightning.
"""
[features]
rest-client = [ "serde", "serde_json", "chunked_transfer" ]
rpc-client = [ "serde", "serde_json", "chunked_transfer" ]
[dependencies]
bitcoin = "0.24"
lightning = { version = "0.0.12", path = "../lightning" }
tokio = { version = "1.0", features = [ "io-util", "net" ], optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
serde_json = { version = "1.0", optional = true }
chunked_transfer = { version = "1.4", optional = true }
futures = { version = "0.3" }
[dev-dependencies]
tokio = { version = "1.0", features = [ "macros", "rt" ] }

View file

@ -0,0 +1,472 @@
use crate::{BlockHeaderData, BlockSourceError};
use crate::http::{BinaryResponse, JsonResponse};
use crate::utils::hex_to_uint256;
use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::consensus::encode;
use bitcoin::hash_types::{BlockHash, TxMerkleNode};
use bitcoin::hashes::hex::{ToHex, FromHex};
use serde::Deserialize;
use serde_json;
use std::convert::From;
use std::convert::TryFrom;
use std::convert::TryInto;
/// Conversion from `std::io::Error` into `BlockSourceError`.
impl From<std::io::Error> for BlockSourceError {
fn from(e: std::io::Error) -> BlockSourceError {
match e.kind() {
std::io::ErrorKind::InvalidData => BlockSourceError::persistent(e),
std::io::ErrorKind::InvalidInput => BlockSourceError::persistent(e),
_ => BlockSourceError::transient(e),
}
}
}
/// Parses binary data as a block.
impl TryInto<Block> for BinaryResponse {
type Error = std::io::Error;
fn try_into(self) -> std::io::Result<Block> {
match encode::deserialize(&self.0) {
Err(_) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid block data")),
Ok(block) => Ok(block),
}
}
}
/// Converts a JSON value into block header data. The JSON value may be an object representing a
/// block header or an array of such objects. In the latter case, the first object is converted.
impl TryInto<BlockHeaderData> for JsonResponse {
type Error = std::io::Error;
fn try_into(self) -> std::io::Result<BlockHeaderData> {
let mut header = match self.0 {
serde_json::Value::Array(mut array) if !array.is_empty() => array.drain(..).next().unwrap(),
serde_json::Value::Object(_) => self.0,
_ => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "unexpected JSON type")),
};
if !header.is_object() {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON object"));
}
// Add an empty previousblockhash for the genesis block.
if let None = header.get("previousblockhash") {
let hash: BlockHash = Default::default();
header.as_object_mut().unwrap().insert("previousblockhash".to_string(), serde_json::json!(hash.to_hex()));
}
match serde_json::from_value::<GetHeaderResponse>(header) {
Err(_) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid header response")),
Ok(response) => match response.try_into() {
Err(_) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid header data")),
Ok(header) => Ok(header),
},
}
}
}
/// Response data from `getblockheader` RPC and `headers` REST requests.
#[derive(Deserialize)]
struct GetHeaderResponse {
pub version: i32,
pub merkleroot: String,
pub time: u32,
pub nonce: u32,
pub bits: String,
pub previousblockhash: String,
pub chainwork: String,
pub height: u32,
}
/// Converts from `GetHeaderResponse` to `BlockHeaderData`.
impl TryFrom<GetHeaderResponse> for BlockHeaderData {
type Error = bitcoin::hashes::hex::Error;
fn try_from(response: GetHeaderResponse) -> Result<Self, bitcoin::hashes::hex::Error> {
Ok(BlockHeaderData {
header: BlockHeader {
version: response.version,
prev_blockhash: BlockHash::from_hex(&response.previousblockhash)?,
merkle_root: TxMerkleNode::from_hex(&response.merkleroot)?,
time: response.time,
bits: u32::from_be_bytes(<[u8; 4]>::from_hex(&response.bits)?),
nonce: response.nonce,
},
chainwork: hex_to_uint256(&response.chainwork)?,
height: response.height,
})
}
}
/// Converts a JSON value into a block. Assumes the block is hex-encoded in a JSON string.
impl TryInto<Block> for JsonResponse {
type Error = std::io::Error;
fn try_into(self) -> std::io::Result<Block> {
match self.0.as_str() {
None => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON string")),
Some(hex_data) => match Vec::<u8>::from_hex(hex_data) {
Err(_) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid hex data")),
Ok(block_data) => match encode::deserialize(&block_data) {
Err(_) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid block data")),
Ok(block) => Ok(block),
},
},
}
}
}
/// Converts a JSON value into the best block hash and optional height.
impl TryInto<(BlockHash, Option<u32>)> for JsonResponse {
type Error = std::io::Error;
fn try_into(self) -> std::io::Result<(BlockHash, Option<u32>)> {
if !self.0.is_object() {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON object"));
}
let hash = match &self.0["bestblockhash"] {
serde_json::Value::String(hex_data) => match BlockHash::from_hex(&hex_data) {
Err(_) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid hex data")),
Ok(block_hash) => block_hash,
},
_ => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON string")),
};
let height = match &self.0["blocks"] {
serde_json::Value::Null => None,
serde_json::Value::Number(height) => match height.as_u64() {
None => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid height")),
Some(height) => match height.try_into() {
Err(_) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid height")),
Ok(height) => Some(height),
}
},
_ => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON number")),
};
Ok((hash, height))
}
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use bitcoin::blockdata::constants::genesis_block;
use bitcoin::consensus::encode;
use bitcoin::network::constants::Network;
/// Converts from `BlockHeaderData` into a `GetHeaderResponse` JSON value.
impl From<BlockHeaderData> for serde_json::Value {
fn from(data: BlockHeaderData) -> Self {
let BlockHeaderData { chainwork, height, header } = data;
serde_json::json!({
"chainwork": chainwork.to_string()["0x".len()..],
"height": height,
"version": header.version,
"merkleroot": header.merkle_root.to_hex(),
"time": header.time,
"nonce": header.nonce,
"bits": header.bits.to_hex(),
"previousblockhash": header.prev_blockhash.to_hex(),
})
}
}
#[test]
fn into_block_header_from_json_response_with_unexpected_type() {
let response = JsonResponse(serde_json::json!(42));
match TryInto::<BlockHeaderData>::try_into(response) {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
assert_eq!(e.get_ref().unwrap().to_string(), "unexpected JSON type");
},
Ok(_) => panic!("Expected error"),
}
}
#[test]
fn into_block_header_from_json_response_with_unexpected_header_type() {
let response = JsonResponse(serde_json::json!([42]));
match TryInto::<BlockHeaderData>::try_into(response) {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON object");
},
Ok(_) => panic!("Expected error"),
}
}
#[test]
fn into_block_header_from_json_response_with_invalid_header_response() {
let block = genesis_block(Network::Bitcoin);
let mut response = JsonResponse(BlockHeaderData {
chainwork: block.header.work(),
height: 0,
header: block.header
}.into());
response.0["chainwork"].take();
match TryInto::<BlockHeaderData>::try_into(response) {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
assert_eq!(e.get_ref().unwrap().to_string(), "invalid header response");
},
Ok(_) => panic!("Expected error"),
}
}
#[test]
fn into_block_header_from_json_response_with_invalid_header_data() {
let block = genesis_block(Network::Bitcoin);
let mut response = JsonResponse(BlockHeaderData {
chainwork: block.header.work(),
height: 0,
header: block.header
}.into());
response.0["chainwork"] = serde_json::json!("foobar");
match TryInto::<BlockHeaderData>::try_into(response) {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
assert_eq!(e.get_ref().unwrap().to_string(), "invalid header data");
},
Ok(_) => panic!("Expected error"),
}
}
#[test]
fn into_block_header_from_json_response_with_valid_header() {
let block = genesis_block(Network::Bitcoin);
let response = JsonResponse(BlockHeaderData {
chainwork: block.header.work(),
height: 0,
header: block.header
}.into());
match TryInto::<BlockHeaderData>::try_into(response) {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(data) => {
assert_eq!(data.chainwork, block.header.work());
assert_eq!(data.height, 0);
assert_eq!(data.header, block.header);
},
}
}
#[test]
fn into_block_header_from_json_response_with_valid_header_array() {
let genesis_block = genesis_block(Network::Bitcoin);
let best_block_header = BlockHeader {
prev_blockhash: genesis_block.block_hash(),
..genesis_block.header
};
let chainwork = genesis_block.header.work() + best_block_header.work();
let response = JsonResponse(serde_json::json!([
serde_json::Value::from(BlockHeaderData {
chainwork, height: 1, header: best_block_header,
}),
serde_json::Value::from(BlockHeaderData {
chainwork: genesis_block.header.work(), height: 0, header: genesis_block.header,
}),
]));
match TryInto::<BlockHeaderData>::try_into(response) {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(data) => {
assert_eq!(data.chainwork, chainwork);
assert_eq!(data.height, 1);
assert_eq!(data.header, best_block_header);
},
}
}
#[test]
fn into_block_header_from_json_response_without_previous_block_hash() {
let block = genesis_block(Network::Bitcoin);
let mut response = JsonResponse(BlockHeaderData {
chainwork: block.header.work(),
height: 0,
header: block.header
}.into());
response.0.as_object_mut().unwrap().remove("previousblockhash");
match TryInto::<BlockHeaderData>::try_into(response) {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(BlockHeaderData { chainwork: _, height: _, header }) => {
assert_eq!(header, block.header);
},
}
}
#[test]
fn into_block_from_invalid_binary_response() {
let response = BinaryResponse(b"foo".to_vec());
match TryInto::<Block>::try_into(response) {
Err(_) => {},
Ok(_) => panic!("Expected error"),
}
}
#[test]
fn into_block_from_valid_binary_response() {
let genesis_block = genesis_block(Network::Bitcoin);
let response = BinaryResponse(encode::serialize(&genesis_block));
match TryInto::<Block>::try_into(response) {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(block) => assert_eq!(block, genesis_block),
}
}
#[test]
fn into_block_from_json_response_with_unexpected_type() {
let response = JsonResponse(serde_json::json!({ "result": "foo" }));
match TryInto::<Block>::try_into(response) {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON string");
},
Ok(_) => panic!("Expected error"),
}
}
#[test]
fn into_block_from_json_response_with_invalid_hex_data() {
let response = JsonResponse(serde_json::json!("foobar"));
match TryInto::<Block>::try_into(response) {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
assert_eq!(e.get_ref().unwrap().to_string(), "invalid hex data");
},
Ok(_) => panic!("Expected error"),
}
}
#[test]
fn into_block_from_json_response_with_invalid_block_data() {
let response = JsonResponse(serde_json::json!("abcd"));
match TryInto::<Block>::try_into(response) {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
assert_eq!(e.get_ref().unwrap().to_string(), "invalid block data");
},
Ok(_) => panic!("Expected error"),
}
}
#[test]
fn into_block_from_json_response_with_valid_block_data() {
let genesis_block = genesis_block(Network::Bitcoin);
let response = JsonResponse(serde_json::json!(encode::serialize_hex(&genesis_block)));
match TryInto::<Block>::try_into(response) {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(block) => assert_eq!(block, genesis_block),
}
}
#[test]
fn into_block_hash_from_json_response_with_unexpected_type() {
let response = JsonResponse(serde_json::json!("foo"));
match TryInto::<(BlockHash, Option<u32>)>::try_into(response) {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON object");
},
Ok(_) => panic!("Expected error"),
}
}
#[test]
fn into_block_hash_from_json_response_with_unexpected_bestblockhash_type() {
let response = JsonResponse(serde_json::json!({ "bestblockhash": 42 }));
match TryInto::<(BlockHash, Option<u32>)>::try_into(response) {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON string");
},
Ok(_) => panic!("Expected error"),
}
}
#[test]
fn into_block_hash_from_json_response_with_invalid_hex_data() {
let response = JsonResponse(serde_json::json!({ "bestblockhash": "foobar"} ));
match TryInto::<(BlockHash, Option<u32>)>::try_into(response) {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
assert_eq!(e.get_ref().unwrap().to_string(), "invalid hex data");
},
Ok(_) => panic!("Expected error"),
}
}
#[test]
fn into_block_hash_from_json_response_without_height() {
let block = genesis_block(Network::Bitcoin);
let response = JsonResponse(serde_json::json!({
"bestblockhash": block.block_hash().to_hex(),
}));
match TryInto::<(BlockHash, Option<u32>)>::try_into(response) {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((hash, height)) => {
assert_eq!(hash, block.block_hash());
assert!(height.is_none());
},
}
}
#[test]
fn into_block_hash_from_json_response_with_unexpected_blocks_type() {
let block = genesis_block(Network::Bitcoin);
let response = JsonResponse(serde_json::json!({
"bestblockhash": block.block_hash().to_hex(),
"blocks": "foo",
}));
match TryInto::<(BlockHash, Option<u32>)>::try_into(response) {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON number");
},
Ok(_) => panic!("Expected error"),
}
}
#[test]
fn into_block_hash_from_json_response_with_invalid_height() {
let block = genesis_block(Network::Bitcoin);
let response = JsonResponse(serde_json::json!({
"bestblockhash": block.block_hash().to_hex(),
"blocks": std::u64::MAX,
}));
match TryInto::<(BlockHash, Option<u32>)>::try_into(response) {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
assert_eq!(e.get_ref().unwrap().to_string(), "invalid height");
},
Ok(_) => panic!("Expected error"),
}
}
#[test]
fn into_block_hash_from_json_response_with_height() {
let block = genesis_block(Network::Bitcoin);
let response = JsonResponse(serde_json::json!({
"bestblockhash": block.block_hash().to_hex(),
"blocks": 1,
}));
match TryInto::<(BlockHash, Option<u32>)>::try_into(response) {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((hash, height)) => {
assert_eq!(hash, block.block_hash());
assert_eq!(height.unwrap(), 1);
},
}
}
}

View file

@ -0,0 +1,767 @@
use chunked_transfer;
use serde_json;
use std::convert::TryFrom;
#[cfg(not(feature = "tokio"))]
use std::io::Write;
use std::net::ToSocketAddrs;
use std::time::Duration;
#[cfg(feature = "tokio")]
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
#[cfg(feature = "tokio")]
use tokio::net::TcpStream;
#[cfg(not(feature = "tokio"))]
use std::io::BufRead;
use std::io::Read;
#[cfg(not(feature = "tokio"))]
use std::net::TcpStream;
/// Timeout for operations on TCP streams.
const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5);
/// Maximum HTTP message header size in bytes.
const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192;
/// Maximum HTTP message body size in bytes. Enough for a hex-encoded block in JSON format and any
/// overhead for HTTP chunked transfer encoding.
const MAX_HTTP_MESSAGE_BODY_SIZE: usize = 2 * 4_000_000 + 32_000;
/// Endpoint for interacting with an HTTP-based API.
#[derive(Debug)]
pub struct HttpEndpoint {
host: String,
port: Option<u16>,
path: String,
}
impl HttpEndpoint {
/// Creates an endpoint for the given host and default HTTP port.
pub fn for_host(host: String) -> Self {
Self {
host,
port: None,
path: String::from("/"),
}
}
/// Specifies a port to use with the endpoint.
pub fn with_port(mut self, port: u16) -> Self {
self.port = Some(port);
self
}
/// Specifies a path to use with the endpoint.
pub fn with_path(mut self, path: String) -> Self {
self.path = path;
self
}
/// Returns the endpoint host.
pub fn host(&self) -> &str {
&self.host
}
/// Returns the endpoint port.
pub fn port(&self) -> u16 {
match self.port {
None => 80,
Some(port) => port,
}
}
/// Returns the endpoint path.
pub fn path(&self) -> &str {
&self.path
}
}
impl<'a> std::net::ToSocketAddrs for &'a HttpEndpoint {
type Iter = <(&'a str, u16) as std::net::ToSocketAddrs>::Iter;
fn to_socket_addrs(&self) -> std::io::Result<Self::Iter> {
(self.host(), self.port()).to_socket_addrs()
}
}
/// Client for making HTTP requests.
pub(crate) struct HttpClient {
stream: TcpStream,
}
impl HttpClient {
/// Opens a connection to an HTTP endpoint.
pub fn connect<E: ToSocketAddrs>(endpoint: E) -> std::io::Result<Self> {
let address = match endpoint.to_socket_addrs()?.next() {
None => {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "could not resolve to any addresses"));
},
Some(address) => address,
};
let stream = std::net::TcpStream::connect_timeout(&address, TCP_STREAM_TIMEOUT)?;
stream.set_read_timeout(Some(TCP_STREAM_TIMEOUT))?;
stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT))?;
#[cfg(feature = "tokio")]
let stream = {
stream.set_nonblocking(true)?;
TcpStream::from_std(stream)?
};
Ok(Self { stream })
}
/// Sends a `GET` request for a resource identified by `uri` at the `host`.
///
/// Returns the response body in `F` format.
#[allow(dead_code)]
pub async fn get<F>(&mut self, uri: &str, host: &str) -> std::io::Result<F>
where F: TryFrom<Vec<u8>, Error = std::io::Error> {
let request = format!(
"GET {} HTTP/1.1\r\n\
Host: {}\r\n\
Connection: keep-alive\r\n\
\r\n", uri, host);
let response_body = self.send_request_with_retry(&request).await?;
F::try_from(response_body)
}
/// Sends a `POST` request for a resource identified by `uri` at the `host` using the given HTTP
/// authentication credentials.
///
/// The request body consists of the provided JSON `content`. Returns the response body in `F`
/// format.
#[allow(dead_code)]
pub async fn post<F>(&mut self, uri: &str, host: &str, auth: &str, content: serde_json::Value) -> std::io::Result<F>
where F: TryFrom<Vec<u8>, Error = std::io::Error> {
let content = content.to_string();
let request = format!(
"POST {} HTTP/1.1\r\n\
Host: {}\r\n\
Authorization: {}\r\n\
Connection: keep-alive\r\n\
Content-Type: application/json\r\n\
Content-Length: {}\r\n\
\r\n\
{}", uri, host, auth, content.len(), content);
let response_body = self.send_request_with_retry(&request).await?;
F::try_from(response_body)
}
/// Sends an HTTP request message and reads the response, returning its body. Attempts to
/// reconnect and retry if the connection has been closed.
async fn send_request_with_retry(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
let endpoint = self.stream.peer_addr().unwrap();
match self.send_request(request).await {
Ok(bytes) => Ok(bytes),
Err(e) => match e.kind() {
std::io::ErrorKind::ConnectionReset |
std::io::ErrorKind::ConnectionAborted |
std::io::ErrorKind::UnexpectedEof => {
// Reconnect if the connection was closed. This may happen if the server's
// keep-alive limits are reached.
*self = Self::connect(endpoint)?;
self.send_request(request).await
},
_ => Err(e),
},
}
}
/// Sends an HTTP request message and reads the response, returning its body.
async fn send_request(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
self.write_request(request).await?;
self.read_response().await
}
/// Writes an HTTP request message.
async fn write_request(&mut self, request: &str) -> std::io::Result<()> {
#[cfg(feature = "tokio")]
{
self.stream.write_all(request.as_bytes()).await?;
self.stream.flush().await
}
#[cfg(not(feature = "tokio"))]
{
self.stream.write_all(request.as_bytes())?;
self.stream.flush()
}
}
/// Reads an HTTP response message.
async fn read_response(&mut self) -> std::io::Result<Vec<u8>> {
#[cfg(feature = "tokio")]
let stream = self.stream.split().0;
#[cfg(not(feature = "tokio"))]
let stream = std::io::Read::by_ref(&mut self.stream);
let limited_stream = stream.take(MAX_HTTP_MESSAGE_HEADER_SIZE as u64);
#[cfg(feature = "tokio")]
let mut reader = tokio::io::BufReader::new(limited_stream);
#[cfg(not(feature = "tokio"))]
let mut reader = std::io::BufReader::new(limited_stream);
macro_rules! read_line { () => { {
let mut line = String::new();
#[cfg(feature = "tokio")]
let bytes_read = reader.read_line(&mut line).await?;
#[cfg(not(feature = "tokio"))]
let bytes_read = reader.read_line(&mut line)?;
match bytes_read {
0 => None,
_ => {
// Remove trailing CRLF
if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } }
Some(line)
},
}
} } }
// Read and parse status line
let status_line = read_line!()
.ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?;
let status = HttpStatus::parse(&status_line)?;
// Read and parse relevant headers
let mut message_length = HttpMessageLength::Empty;
loop {
let line = read_line!()
.ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no headers"))?;
if line.is_empty() { break; }
let header = HttpHeader::parse(&line)?;
if header.has_name("Content-Length") {
let length = header.value.parse()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
if let HttpMessageLength::Empty = message_length {
message_length = HttpMessageLength::ContentLength(length);
}
continue;
}
if header.has_name("Transfer-Encoding") {
message_length = HttpMessageLength::TransferEncoding(header.value.into());
continue;
}
}
if !status.is_ok() {
// TODO: Handle 3xx redirection responses.
return Err(std::io::Error::new(std::io::ErrorKind::NotFound, "not found"));
}
// Read message body
let read_limit = MAX_HTTP_MESSAGE_BODY_SIZE - reader.buffer().len();
reader.get_mut().set_limit(read_limit as u64);
match message_length {
HttpMessageLength::Empty => { Ok(Vec::new()) },
HttpMessageLength::ContentLength(length) => {
if length == 0 || length > MAX_HTTP_MESSAGE_BODY_SIZE {
Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "out of range"))
} else {
let mut content = vec![0; length];
#[cfg(feature = "tokio")]
reader.read_exact(&mut content[..]).await?;
#[cfg(not(feature = "tokio"))]
reader.read_exact(&mut content[..])?;
Ok(content)
}
},
HttpMessageLength::TransferEncoding(coding) => {
if !coding.eq_ignore_ascii_case("chunked") {
Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput, "unsupported transfer coding"))
} else {
let mut content = Vec::new();
#[cfg(feature = "tokio")]
{
// Since chunked_transfer doesn't have an async interface, only use it to
// determine the size of each chunk to read.
//
// TODO: Replace with an async interface when available.
// https://github.com/frewsxcv/rust-chunked-transfer/issues/7
loop {
// Read the chunk header which contains the chunk size.
let mut chunk_header = String::new();
reader.read_line(&mut chunk_header).await?;
if chunk_header == "0\r\n" {
// Read the terminator chunk since the decoder consumes the CRLF
// immediately when this chunk is encountered.
reader.read_line(&mut chunk_header).await?;
}
// Decode the chunk header to obtain the chunk size.
let mut buffer = Vec::new();
let mut decoder = chunked_transfer::Decoder::new(chunk_header.as_bytes());
decoder.read_to_end(&mut buffer)?;
// Read the chunk body.
let chunk_size = match decoder.remaining_chunks_size() {
None => break,
Some(chunk_size) => chunk_size,
};
let chunk_offset = content.len();
content.resize(chunk_offset + chunk_size + "\r\n".len(), 0);
reader.read_exact(&mut content[chunk_offset..]).await?;
content.resize(chunk_offset + chunk_size, 0);
}
Ok(content)
}
#[cfg(not(feature = "tokio"))]
{
let mut decoder = chunked_transfer::Decoder::new(reader);
decoder.read_to_end(&mut content)?;
Ok(content)
}
}
},
}
}
}
/// HTTP response status code as defined by [RFC 7231].
///
/// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-6
struct HttpStatus<'a> {
code: &'a str,
}
impl<'a> HttpStatus<'a> {
/// Parses an HTTP status line as defined by [RFC 7230].
///
/// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.1.2
fn parse(line: &'a String) -> std::io::Result<HttpStatus<'a>> {
let mut tokens = line.splitn(3, ' ');
let http_version = tokens.next()
.ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no HTTP-Version"))?;
if !http_version.eq_ignore_ascii_case("HTTP/1.1") &&
!http_version.eq_ignore_ascii_case("HTTP/1.0") {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid HTTP-Version"));
}
let code = tokens.next()
.ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Status-Code"))?;
if code.len() != 3 || !code.chars().all(|c| c.is_ascii_digit()) {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid Status-Code"));
}
let _reason = tokens.next()
.ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Reason-Phrase"))?;
Ok(Self { code })
}
/// Returns whether the status is successful (i.e., 2xx status class).
fn is_ok(&self) -> bool {
self.code.starts_with('2')
}
}
/// HTTP response header as defined by [RFC 7231].
///
/// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-7
struct HttpHeader<'a> {
name: &'a str,
value: &'a str,
}
impl<'a> HttpHeader<'a> {
/// Parses an HTTP header field as defined by [RFC 7230].
///
/// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.2
fn parse(line: &'a String) -> std::io::Result<HttpHeader<'a>> {
let mut tokens = line.splitn(2, ':');
let name = tokens.next()
.ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header name"))?;
let value = tokens.next()
.ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header value"))?
.trim_start();
Ok(Self { name, value })
}
/// Returns whether the header field has the given name.
fn has_name(&self, name: &str) -> bool {
self.name.eq_ignore_ascii_case(name)
}
}
/// HTTP message body length as defined by [RFC 7230].
///
/// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.3.3
enum HttpMessageLength {
Empty,
ContentLength(usize),
TransferEncoding(String),
}
/// An HTTP response body in binary format.
pub(crate) struct BinaryResponse(pub(crate) Vec<u8>);
/// An HTTP response body in JSON format.
pub(crate) struct JsonResponse(pub(crate) serde_json::Value);
/// Interprets bytes from an HTTP response body as binary data.
impl TryFrom<Vec<u8>> for BinaryResponse {
type Error = std::io::Error;
fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
Ok(BinaryResponse(bytes))
}
}
/// Interprets bytes from an HTTP response body as a JSON value.
impl TryFrom<Vec<u8>> for JsonResponse {
type Error = std::io::Error;
fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
Ok(JsonResponse(serde_json::from_slice(&bytes)?))
}
}
#[cfg(test)]
mod endpoint_tests {
use super::HttpEndpoint;
#[test]
fn with_default_port() {
let endpoint = HttpEndpoint::for_host("foo.com".into());
assert_eq!(endpoint.host(), "foo.com");
assert_eq!(endpoint.port(), 80);
}
#[test]
fn with_custom_port() {
let endpoint = HttpEndpoint::for_host("foo.com".into()).with_port(8080);
assert_eq!(endpoint.host(), "foo.com");
assert_eq!(endpoint.port(), 8080);
}
#[test]
fn with_uri_path() {
let endpoint = HttpEndpoint::for_host("foo.com".into()).with_path("/path".into());
assert_eq!(endpoint.host(), "foo.com");
assert_eq!(endpoint.path(), "/path");
}
#[test]
fn without_uri_path() {
let endpoint = HttpEndpoint::for_host("foo.com".into());
assert_eq!(endpoint.host(), "foo.com");
assert_eq!(endpoint.path(), "/");
}
#[test]
fn convert_to_socket_addrs() {
let endpoint = HttpEndpoint::for_host("foo.com".into());
let host = endpoint.host();
let port = endpoint.port();
use std::net::ToSocketAddrs;
match (&endpoint).to_socket_addrs() {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(mut socket_addrs) => {
match socket_addrs.next() {
None => panic!("Expected socket address"),
Some(addr) => {
assert_eq!(addr, (host, port).to_socket_addrs().unwrap().next().unwrap());
assert!(socket_addrs.next().is_none());
}
}
}
}
}
}
#[cfg(test)]
pub(crate) mod client_tests {
use super::*;
use std::io::BufRead;
use std::io::Write;
/// Server for handling HTTP client requests with a stock response.
pub struct HttpServer {
address: std::net::SocketAddr,
handler: std::thread::JoinHandle<()>,
shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
/// Body of HTTP response messages.
pub enum MessageBody<T: ToString> {
Empty,
Content(T),
ChunkedContent(T),
}
impl HttpServer {
pub fn responding_with_ok<T: ToString>(body: MessageBody<T>) -> Self {
let response = match body {
MessageBody::Empty => "HTTP/1.1 200 OK\r\n\r\n".to_string(),
MessageBody::Content(body) => {
let body = body.to_string();
format!(
"HTTP/1.1 200 OK\r\n\
Content-Length: {}\r\n\
\r\n\
{}", body.len(), body)
},
MessageBody::ChunkedContent(body) => {
let mut chuncked_body = Vec::new();
{
use chunked_transfer::Encoder;
let mut encoder = Encoder::with_chunks_size(&mut chuncked_body, 8);
encoder.write_all(body.to_string().as_bytes()).unwrap();
}
format!(
"HTTP/1.1 200 OK\r\n\
Transfer-Encoding: chunked\r\n\
\r\n\
{}", String::from_utf8(chuncked_body).unwrap())
},
};
HttpServer::responding_with(response)
}
pub fn responding_with_not_found() -> Self {
let response = "HTTP/1.1 404 Not Found\r\n\r\n".to_string();
HttpServer::responding_with(response)
}
fn responding_with(response: String) -> Self {
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let address = listener.local_addr().unwrap();
let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let shutdown_signaled = std::sync::Arc::clone(&shutdown);
let handler = std::thread::spawn(move || {
for stream in listener.incoming() {
let mut stream = stream.unwrap();
stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT)).unwrap();
let lines_read = std::io::BufReader::new(&stream)
.lines()
.take_while(|line| !line.as_ref().unwrap().is_empty())
.count();
if lines_read == 0 { continue; }
for chunk in response.as_bytes().chunks(16) {
if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) {
return;
} else {
if let Err(_) = stream.write(chunk) { break; }
if let Err(_) = stream.flush() { break; }
}
}
}
});
Self { address, handler, shutdown }
}
fn shutdown(self) {
self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst);
self.handler.join().unwrap();
}
pub fn endpoint(&self) -> HttpEndpoint {
HttpEndpoint::for_host(self.address.ip().to_string()).with_port(self.address.port())
}
}
#[test]
fn connect_to_unresolvable_host() {
match HttpClient::connect(("example.invalid", 80)) {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other),
Ok(_) => panic!("Expected error"),
}
}
#[test]
fn connect_with_no_socket_address() {
match HttpClient::connect(&vec![][..]) {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput),
Ok(_) => panic!("Expected error"),
}
}
#[test]
fn connect_with_unknown_server() {
match HttpClient::connect(("::", 80)) {
#[cfg(target_os = "windows")]
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::AddrNotAvailable),
#[cfg(not(target_os = "windows"))]
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::ConnectionRefused),
Ok(_) => panic!("Expected error"),
}
}
#[tokio::test]
async fn connect_with_valid_endpoint() {
let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
match HttpClient::connect(&server.endpoint()) {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(_) => {},
}
}
#[tokio::test]
async fn read_empty_message() {
let server = HttpServer::responding_with("".to_string());
let mut client = HttpClient::connect(&server.endpoint()).unwrap();
match client.get::<BinaryResponse>("/foo", "foo.com").await {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
assert_eq!(e.get_ref().unwrap().to_string(), "no status line");
},
Ok(_) => panic!("Expected error"),
}
}
#[tokio::test]
async fn read_incomplete_message() {
let server = HttpServer::responding_with("HTTP/1.1 200 OK".to_string());
let mut client = HttpClient::connect(&server.endpoint()).unwrap();
match client.get::<BinaryResponse>("/foo", "foo.com").await {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
},
Ok(_) => panic!("Expected error"),
}
}
#[tokio::test]
async fn read_too_large_message_headers() {
let response = format!(
"HTTP/1.1 302 Found\r\n\
Location: {}\r\n\
\r\n", "Z".repeat(MAX_HTTP_MESSAGE_HEADER_SIZE));
let server = HttpServer::responding_with(response);
let mut client = HttpClient::connect(&server.endpoint()).unwrap();
match client.get::<BinaryResponse>("/foo", "foo.com").await {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
},
Ok(_) => panic!("Expected error"),
}
}
#[tokio::test]
async fn read_too_large_message_body() {
let body = "Z".repeat(MAX_HTTP_MESSAGE_BODY_SIZE + 1);
let server = HttpServer::responding_with_ok::<String>(MessageBody::Content(body));
let mut client = HttpClient::connect(&server.endpoint()).unwrap();
match client.get::<BinaryResponse>("/foo", "foo.com").await {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
assert_eq!(e.get_ref().unwrap().to_string(), "out of range");
},
Ok(_) => panic!("Expected error"),
}
server.shutdown();
}
#[tokio::test]
async fn read_message_with_unsupported_transfer_coding() {
let response = String::from(
"HTTP/1.1 200 OK\r\n\
Transfer-Encoding: gzip\r\n\
\r\n\
foobar");
let server = HttpServer::responding_with(response);
let mut client = HttpClient::connect(&server.endpoint()).unwrap();
match client.get::<BinaryResponse>("/foo", "foo.com").await {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
assert_eq!(e.get_ref().unwrap().to_string(), "unsupported transfer coding");
},
Ok(_) => panic!("Expected error"),
}
}
#[tokio::test]
async fn read_empty_message_body() {
let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
let mut client = HttpClient::connect(&server.endpoint()).unwrap();
match client.get::<BinaryResponse>("/foo", "foo.com").await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
}
}
#[tokio::test]
async fn read_message_body_with_length() {
let body = "foo bar baz qux".repeat(32);
let content = MessageBody::Content(body.clone());
let server = HttpServer::responding_with_ok::<String>(content);
let mut client = HttpClient::connect(&server.endpoint()).unwrap();
match client.get::<BinaryResponse>("/foo", "foo.com").await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
}
}
#[tokio::test]
async fn read_chunked_message_body() {
let body = "foo bar baz qux".repeat(32);
let chunked_content = MessageBody::ChunkedContent(body.clone());
let server = HttpServer::responding_with_ok::<String>(chunked_content);
let mut client = HttpClient::connect(&server.endpoint()).unwrap();
match client.get::<BinaryResponse>("/foo", "foo.com").await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
}
}
#[tokio::test]
async fn reconnect_closed_connection() {
let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
let mut client = HttpClient::connect(&server.endpoint()).unwrap();
assert!(client.get::<BinaryResponse>("/foo", "foo.com").await.is_ok());
match client.get::<BinaryResponse>("/foo", "foo.com").await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
}
}
#[test]
fn from_bytes_into_binary_response() {
let bytes = b"foo";
match BinaryResponse::try_from(bytes.to_vec()) {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(response) => assert_eq!(&response.0, bytes),
}
}
#[test]
fn from_invalid_bytes_into_json_response() {
let json = serde_json::json!({ "result": 42 });
match JsonResponse::try_from(json.to_string().as_bytes()[..5].to_vec()) {
Err(_) => {},
Ok(_) => panic!("Expected error"),
}
}
#[test]
fn from_valid_bytes_into_json_response() {
let json = serde_json::json!({ "result": 42 });
match JsonResponse::try_from(json.to_string().as_bytes().to_vec()) {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(response) => assert_eq!(response.0, json),
}
}
}

View file

@ -0,0 +1,126 @@
//! A lightweight client for keeping in sync with chain activity.
//!
//! Defines a [`BlockSource`] trait, which is an asynchronous interface for retrieving block headers
//! and data.
//!
//! Enabling feature `rest-client` or `rpc-client` allows configuring the client to fetch blocks
//! using Bitcoin Core's REST or RPC interface, respectively.
//!
//! Both features support either blocking I/O using `std::net::TcpStream` or, with feature `tokio`,
//! non-blocking I/O using `tokio::net::TcpStream` from inside a Tokio runtime.
//!
//! [`BlockSource`]: trait.BlockSource.html
#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
pub mod http;
#[cfg(feature = "rest-client")]
pub mod rest;
#[cfg(feature = "rpc-client")]
pub mod rpc;
#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
mod convert;
#[cfg(any(feature = "rest-client", feature = "rpc-client"))]
mod utils;
use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::hash_types::BlockHash;
use bitcoin::util::uint::Uint256;
use std::future::Future;
use std::pin::Pin;
/// Abstract type for retrieving block headers and data.
pub trait BlockSource : Sync + Send {
/// Returns the header for a given hash. A height hint may be provided in case a block source
/// cannot easily find headers based on a hash. This is merely a hint and thus the returned
/// header must have the same hash as was requested. Otherwise, an error must be returned.
///
/// Implementations that cannot find headers based on the hash should return a `Transient` error
/// when `height_hint` is `None`.
fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, height_hint: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData>;
/// Returns the block for a given hash. A headers-only block source should return a `Transient`
/// error.
fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>;
// TODO: Phrase in terms of `Poll` once added.
/// Returns the hash of the best block and, optionally, its height. When polling a block source,
/// the height is passed to `get_header` to allow for a more efficient lookup.
fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<(BlockHash, Option<u32>)>;
}
/// Result type for `BlockSource` requests.
type BlockSourceResult<T> = Result<T, BlockSourceError>;
// TODO: Replace with BlockSourceResult once `async` trait functions are supported. For details,
// see: https://areweasyncyet.rs.
/// Result type for asynchronous `BlockSource` requests.
type AsyncBlockSourceResult<'a, T> = Pin<Box<dyn Future<Output = BlockSourceResult<T>> + 'a + Send>>;
/// Error type for `BlockSource` requests.
///
/// Transient errors may be resolved when re-polling, but no attempt will be made to re-poll on
/// persistent errors.
pub struct BlockSourceError {
kind: BlockSourceErrorKind,
error: Box<dyn std::error::Error + Send + Sync>,
}
/// The kind of `BlockSourceError`, either persistent or transient.
#[derive(Clone, Copy)]
pub enum BlockSourceErrorKind {
/// Indicates an error that won't resolve when retrying a request (e.g., invalid data).
Persistent,
/// Indicates an error that may resolve when retrying a request (e.g., unresponsive).
Transient,
}
impl BlockSourceError {
/// Creates a new persistent error originated from the given error.
pub fn persistent<E>(error: E) -> Self
where E: Into<Box<dyn std::error::Error + Send + Sync>> {
Self {
kind: BlockSourceErrorKind::Persistent,
error: error.into(),
}
}
/// Creates a new transient error originated from the given error.
pub fn transient<E>(error: E) -> Self
where E: Into<Box<dyn std::error::Error + Send + Sync>> {
Self {
kind: BlockSourceErrorKind::Transient,
error: error.into(),
}
}
/// Returns the kind of error.
pub fn kind(&self) -> BlockSourceErrorKind {
self.kind
}
/// Converts the error into the underlying error.
pub fn into_inner(self) -> Box<dyn std::error::Error + Send + Sync> {
self.error
}
}
/// A block header and some associated data. This information should be available from most block
/// sources (and, notably, is available in Bitcoin Core's RPC and REST interfaces).
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct BlockHeaderData {
/// The block header itself.
pub header: BlockHeader,
/// The block height where the genesis block has height 0.
pub height: u32,
/// The total chain work in expected number of double-SHA256 hashes required to build a chain
/// of equivalent weight.
pub chainwork: Uint256,
}

View file

@ -0,0 +1,110 @@
use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult};
use crate::http::{BinaryResponse, HttpEndpoint, HttpClient, JsonResponse};
use bitcoin::blockdata::block::Block;
use bitcoin::hash_types::BlockHash;
use bitcoin::hashes::hex::ToHex;
use std::convert::TryFrom;
use std::convert::TryInto;
/// A simple REST client for requesting resources using HTTP `GET`.
pub struct RestClient {
endpoint: HttpEndpoint,
client: HttpClient,
}
impl RestClient {
/// Creates a new REST client connected to the given endpoint.
///
/// The endpoint should contain the REST path component (e.g., http://127.0.0.1:8332/rest).
pub fn new(endpoint: HttpEndpoint) -> std::io::Result<Self> {
let client = HttpClient::connect(&endpoint)?;
Ok(Self { endpoint, client })
}
/// Requests a resource encoded in `F` format and interpreted as type `T`.
async fn request_resource<F, T>(&mut self, resource_path: &str) -> std::io::Result<T>
where F: TryFrom<Vec<u8>, Error = std::io::Error> + TryInto<T, Error = std::io::Error> {
let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port());
let uri = format!("{}/{}", self.endpoint.path().trim_end_matches("/"), resource_path);
self.client.get::<F>(&uri, &host).await?.try_into()
}
}
impl BlockSource for RestClient {
fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
Box::pin(async move {
let resource_path = format!("headers/1/{}.json", header_hash.to_hex());
Ok(self.request_resource::<JsonResponse, _>(&resource_path).await?)
})
}
fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
Box::pin(async move {
let resource_path = format!("block/{}.bin", header_hash.to_hex());
Ok(self.request_resource::<BinaryResponse, _>(&resource_path).await?)
})
}
fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
Box::pin(async move {
Ok(self.request_resource::<JsonResponse, _>("chaininfo.json").await?)
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::http::BinaryResponse;
use crate::http::client_tests::{HttpServer, MessageBody};
/// Parses binary data as a string-encoded `u32`.
impl TryInto<u32> for BinaryResponse {
type Error = std::io::Error;
fn try_into(self) -> std::io::Result<u32> {
match std::str::from_utf8(&self.0) {
Err(e) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
Ok(s) => match u32::from_str_radix(s, 10) {
Err(e) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
Ok(n) => Ok(n),
}
}
}
}
#[tokio::test]
async fn request_unknown_resource() {
let server = HttpServer::responding_with_not_found();
let mut client = RestClient::new(server.endpoint()).unwrap();
match client.request_resource::<BinaryResponse, u32>("/").await {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::NotFound),
Ok(_) => panic!("Expected error"),
}
}
#[tokio::test]
async fn request_malformed_resource() {
let server = HttpServer::responding_with_ok(MessageBody::Content("foo"));
let mut client = RestClient::new(server.endpoint()).unwrap();
match client.request_resource::<BinaryResponse, u32>("/").await {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidData),
Ok(_) => panic!("Expected error"),
}
}
#[tokio::test]
async fn request_valid_resource() {
let server = HttpServer::responding_with_ok(MessageBody::Content(42));
let mut client = RestClient::new(server.endpoint()).unwrap();
match client.request_resource::<BinaryResponse, u32>("/").await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(n) => assert_eq!(n, 42),
}
}
}

View file

@ -0,0 +1,197 @@
use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult};
use crate::http::{HttpClient, HttpEndpoint, JsonResponse};
use bitcoin::blockdata::block::Block;
use bitcoin::hash_types::BlockHash;
use bitcoin::hashes::hex::ToHex;
use serde_json;
use std::convert::TryFrom;
use std::convert::TryInto;
use std::sync::atomic::{AtomicUsize, Ordering};
/// A simple RPC client for calling methods using HTTP `POST`.
pub struct RpcClient {
basic_auth: String,
endpoint: HttpEndpoint,
client: HttpClient,
id: AtomicUsize,
}
impl RpcClient {
/// Creates a new RPC client connected to the given endpoint with the provided credentials. The
/// credentials should be a base64 encoding of a user name and password joined by a colon, as is
/// required for HTTP basic access authentication.
pub fn new(credentials: &str, endpoint: HttpEndpoint) -> std::io::Result<Self> {
let client = HttpClient::connect(&endpoint)?;
Ok(Self {
basic_auth: "Basic ".to_string() + credentials,
endpoint,
client,
id: AtomicUsize::new(0),
})
}
/// Calls a method with the response encoded in JSON format and interpreted as type `T`.
async fn call_method<T>(&mut self, method: &str, params: &[serde_json::Value]) -> std::io::Result<T>
where JsonResponse: TryFrom<Vec<u8>, Error = std::io::Error> + TryInto<T, Error = std::io::Error> {
let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port());
let uri = self.endpoint.path();
let content = serde_json::json!({
"method": method,
"params": params,
"id": &self.id.fetch_add(1, Ordering::AcqRel).to_string()
});
let mut response = self.client.post::<JsonResponse>(&uri, &host, &self.basic_auth, content)
.await?.0;
if !response.is_object() {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON object"));
}
let error = &response["error"];
if !error.is_null() {
// TODO: Examine error code for a more precise std::io::ErrorKind.
let message = error["message"].as_str().unwrap_or("unknown error");
return Err(std::io::Error::new(std::io::ErrorKind::Other, message));
}
let result = &mut response["result"];
if result.is_null() {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON result"));
}
JsonResponse(result.take()).try_into()
}
}
impl BlockSource for RpcClient {
fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
Box::pin(async move {
let header_hash = serde_json::json!(header_hash.to_hex());
Ok(self.call_method("getblockheader", &[header_hash]).await?)
})
}
fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
Box::pin(async move {
let header_hash = serde_json::json!(header_hash.to_hex());
let verbosity = serde_json::json!(0);
Ok(self.call_method("getblock", &[header_hash, verbosity]).await?)
})
}
fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
Box::pin(async move {
Ok(self.call_method("getblockchaininfo", &[]).await?)
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::http::client_tests::{HttpServer, MessageBody};
/// Credentials encoded in base64.
const CREDENTIALS: &'static str = "dXNlcjpwYXNzd29yZA==";
/// Converts a JSON value into `u64`.
impl TryInto<u64> for JsonResponse {
type Error = std::io::Error;
fn try_into(self) -> std::io::Result<u64> {
match self.0.as_u64() {
None => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "not a number")),
Some(n) => Ok(n),
}
}
}
#[tokio::test]
async fn call_method_returning_unknown_response() {
let server = HttpServer::responding_with_not_found();
let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::NotFound),
Ok(_) => panic!("Expected error"),
}
}
#[tokio::test]
async fn call_method_returning_malfomred_response() {
let response = serde_json::json!("foo");
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON object");
},
Ok(_) => panic!("Expected error"),
}
}
#[tokio::test]
async fn call_method_returning_error() {
let response = serde_json::json!({
"error": { "code": -8, "message": "invalid parameter" },
});
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
let invalid_block_hash = serde_json::json!("foo");
match client.call_method::<u64>("getblock", &[invalid_block_hash]).await {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::Other);
assert_eq!(e.get_ref().unwrap().to_string(), "invalid parameter");
},
Ok(_) => panic!("Expected error"),
}
}
#[tokio::test]
async fn call_method_returning_missing_result() {
let response = serde_json::json!({ "result": null });
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON result");
},
Ok(_) => panic!("Expected error"),
}
}
#[tokio::test]
async fn call_method_returning_malformed_result() {
let response = serde_json::json!({ "result": "foo" });
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
assert_eq!(e.get_ref().unwrap().to_string(), "not a number");
},
Ok(_) => panic!("Expected error"),
}
}
#[tokio::test]
async fn call_method_returning_valid_result() {
let response = serde_json::json!({ "result": 654470 });
let server = HttpServer::responding_with_ok(MessageBody::Content(response));
let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
match client.call_method::<u64>("getblockcount", &[]).await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok(count) => assert_eq!(count, 654470),
}
}
}

View file

@ -0,0 +1,54 @@
use bitcoin::hashes::hex::FromHex;
use bitcoin::util::uint::Uint256;
pub fn hex_to_uint256(hex: &str) -> Result<Uint256, bitcoin::hashes::hex::Error> {
let bytes = <[u8; 32]>::from_hex(hex)?;
Ok(Uint256::from_be_bytes(bytes))
}
#[cfg(test)]
mod tests {
use super::*;
use bitcoin::util::uint::Uint256;
#[test]
fn hex_to_uint256_empty_str() {
assert!(hex_to_uint256("").is_err());
}
#[test]
fn hex_to_uint256_too_short_str() {
let hex = String::from_utf8(vec![b'0'; 32]).unwrap();
assert_eq!(hex_to_uint256(&hex), Err(bitcoin::hashes::hex::Error::InvalidLength(64, 32)));
}
#[test]
fn hex_to_uint256_too_long_str() {
let hex = String::from_utf8(vec![b'0'; 128]).unwrap();
assert_eq!(hex_to_uint256(&hex), Err(bitcoin::hashes::hex::Error::InvalidLength(64, 128)));
}
#[test]
fn hex_to_uint256_odd_length_str() {
let hex = String::from_utf8(vec![b'0'; 65]).unwrap();
assert_eq!(hex_to_uint256(&hex), Err(bitcoin::hashes::hex::Error::OddLengthString(65)));
}
#[test]
fn hex_to_uint256_invalid_char() {
let hex = String::from_utf8(vec![b'G'; 64]).unwrap();
assert_eq!(hex_to_uint256(&hex), Err(bitcoin::hashes::hex::Error::InvalidChar(b'G')));
}
#[test]
fn hex_to_uint256_lowercase_str() {
let hex: String = std::iter::repeat("0123456789abcdef").take(4).collect();
assert_eq!(hex_to_uint256(&hex).unwrap(), Uint256([0x0123456789abcdefu64; 4]));
}
#[test]
fn hex_to_uint256_uppercase_str() {
let hex: String = std::iter::repeat("0123456789ABCDEF").take(4).collect();
assert_eq!(hex_to_uint256(&hex).unwrap(), Uint256([0x0123456789abcdefu64; 4]));
}
}