cln_rpc: Split low- from high-level API calls.

The `cln::ClnRpc` plugin has a `call` and a `call`-typed method
which worked only on structs that are mentioned in
`src::primitives::Request`.

The consequence is that any rpc-method that is not (yet) defined in this
crate could not be used.

I've adapted the `ClnRpc`-method and create a low-level binding named
`call_raw`. All changes in this commit should be backward compatible.
This commit is contained in:
Erik De Smedt 2023-12-18 16:45:09 +01:00 committed by Christian Decker
parent a59dbbdae5
commit ce41aa4ccc
3 changed files with 210 additions and 33 deletions

14
Cargo.lock generated
View File

@ -318,6 +318,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-test",
"tokio-util", "tokio-util",
] ]
@ -1439,6 +1440,19 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-test"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e89b3cbabd3ae862100094ae433e1def582cf86451b4e9bf83aa7ac1d8a7d719"
dependencies = [
"async-stream",
"bytes",
"futures-core",
"tokio",
"tokio-stream",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.9" version = "0.7.9"

View File

@ -27,3 +27,4 @@ tokio-util = { version = "0.7", features = ["codec"] }
[dev-dependencies] [dev-dependencies]
env_logger = "0.10" env_logger = "0.10"
tokio = { version = "1", features = ["net", "macros", "rt-multi-thread"]} tokio = { version = "1", features = ["net", "macros", "rt-multi-thread"]}
tokio-test = "0.4.3"

View File

@ -1,11 +1,11 @@
use crate::codec::JsonCodec; use crate::codec::JsonCodec;
use crate::codec::JsonRpc;
pub use anyhow::Error; pub use anyhow::Error;
use anyhow::Result; use anyhow::Result;
use core::fmt::Debug;
use futures_util::sink::SinkExt; use futures_util::sink::SinkExt;
use futures_util::StreamExt; use futures_util::StreamExt;
use log::{debug, trace}; use log::{debug, trace};
use serde_json::json; use serde::{de::DeserializeOwned, Serialize};
use std::path::Path; use std::path::Path;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
@ -57,44 +57,60 @@ impl ClnRpc {
}) })
} }
pub async fn call(&mut self, req: Request) -> Result<Response, RpcError> { /// Low-level API to call the rpc
trace!("Sending request {:?}", req); ///
/// It is the resposnbility of the caller to pick valid types `R` and `P`.
// Wrap the raw request in a well-formed JSON-RPC outer dict. ///
/// ```no_run
/// use cln_rpc::ClnRpc;
/// use cln_rpc::model::{requests::GetinfoRequest, responses::GetinfoResponse, responses::ListfundsResponse};
/// use std::path::Path;
/// use tokio_test;
/// tokio_test::block_on( async {
///
/// // Call using json-values
/// let mut cln = ClnRpc::new(Path::new("./lightningd/rpc")).await.unwrap();
/// let request = serde_json::json!({});
/// let response : serde_json::Value = cln.call_raw("getinfo", request).await.unwrap();
///
/// // Using a model
/// // Prefer to use call_typed instead
/// let request = GetinfoRequest {};
/// let response : GetinfoResponse = cln.call_raw("getinfo", request.clone()).await.unwrap();
/// // `call_typed` is more ergonomic because you don't have to specify the method name and return type
/// let response = cln.call_typed(request).await.unwrap();
///
/// // `call_typed` can catch issues at compile_time
/// let request = GetinfoRequest {};
/// let response : ListfundsResponse = cln.call_raw("get_info", request).await.unwrap(); // Runtime error
/// // The next line would not compile
/// // let response : ListfundsResponse = cln.call_typed(request).await.unwrap();
/// })
/// ```
pub async fn call_raw<R, P>(&mut self, method: &str, params: P) -> Result<R, RpcError>
where
P: Serialize + Debug,
R: DeserializeOwned + Debug,
{
trace!("Sending request {} with params {:?}", method, &params);
let id = self.next_id.fetch_add(1, Ordering::SeqCst); let id = self.next_id.fetch_add(1, Ordering::SeqCst);
let req: JsonRpc<Notification, Request> = JsonRpc::Request(json!(id), req);
let req = serde_json::to_value(req).map_err(|e| RpcError {
code: None,
message: format!("Error parsing request: {}", e),
data: None,
})?;
let req2 = req.clone();
self.write.send(req).await.map_err(|e| RpcError {
code: None,
message: format!("Error passing request to lightningd: {}", e),
data: None,
})?;
let mut response = self // TODO: Can we make this nicer
.read // I don't want to have this json_rpc : 2.0 floating everywhere
.next() let req = serde_json::json!({
.await "jsonrpc" : "2.0",
.ok_or_else(|| RpcError { "id" : id,
code: None, "method" : method,
message: "no response from lightningd".to_string(), "params" : params,
data: None, });
})?
.map_err(|_| RpcError { let mut response: serde_json::Value = self.call_raw_request(&req).await?;
code: None,
message: "reading response from socket".to_string(),
data: None,
})?;
trace!("Read response {:?}", response); trace!("Read response {:?}", response);
// Annotate the response with the method from the request, so // Annotate the response with the method from the request, so
// serde_json knows which variant of [`Request`] should be // serde_json knows which variant of [`Request`] should be
// used. // used.
response["method"] = req2["method"].clone(); response["method"] = serde_json::Value::String(method.into());
if let Some(_) = response.get("result") { if let Some(_) = response.get("result") {
serde_json::from_value(response).map_err(|e| RpcError { serde_json::from_value(response).map_err(|e| RpcError {
code: None, code: None,
@ -113,6 +129,92 @@ impl ClnRpc {
} }
} }
/// A low level method to call raw reqeusts
///
/// This method is private by intention.
/// The caller is (implicitly) providing the `id` of the JsonRpcRequest.
/// This is dangerous because the caller might pick a non-unique id.
///
/// The request should serialize to a valid JsonRpcMessage and the response
/// should be able to deserialize any successful JsonRpcResponse.
/// ```no_run
/// use std::path::Path;
/// use cln_rpc::ClnRpc;
/// use tokio_test;
/// tokio_test::block_on( async {
/// let request = serde_json::json!({
/// "id" : 1,
/// "jsonrpc" : "2.0",
/// "method" : "some_method",
/// "params" : {}
/// }
/// );
/// let rpc = ClnRpc::new(Path::new("my_path_to_rpc_file"));
/// // let resp : serde_json::Value = rpc.call_raw_request(request).await.unwrap();
/// })
/// ```
///
async fn call_raw_request<Req, Resp>(&mut self, request: &Req) -> Result<Resp, RpcError>
where
Req: Serialize + Debug,
Resp: DeserializeOwned,
{
trace!("Sending request {:?}", request);
let request = serde_json::to_value(request).unwrap();
self.write.send(request).await.map_err(|e| RpcError {
code: None,
message: format!("Error passing request to lightningd: {}", e),
data: None,
})?;
let response = self
.read
.next()
.await
.ok_or_else(|| RpcError {
code: None,
message: "no response from lightningd".to_string(),
data: None,
})?
.map_err(|_| RpcError {
code: None,
message: "reading response from socket".to_string(),
data: None,
})?;
serde_json::from_value(response).map_err(|_| RpcError {
code: None,
message: "Failed to parse response".to_string(),
data: None,
})
}
pub async fn call(&mut self, req: Request) -> Result<Response, RpcError> {
trace!("call : Serialize and deserialize request {:?}", req);
// Construct the full JsonRpcRequest
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
let mut value = serde_json::to_value(req).map_err(|e| RpcError {
code: None,
message: format!("Failed to serialize request: {}", e),
data: None,
})?;
value["jsonrpc"] = "2.0".into();
value["id"] = id.into();
let method = value["method"].clone();
//
let mut response: serde_json::Value = self.call_raw_request(&value).await?;
// Parse the response
// We add the `method` here because the Response-enum uses it to determine the type
response["method"] = method;
serde_json::from_value(response).map_err(|e| RpcError {
code: None,
message: format!("Failed to deserializer response : {}", e),
data: None,
})
}
pub async fn call_typed<R: IntoRequest>( pub async fn call_typed<R: IntoRequest>(
&mut self, &mut self,
request: R, request: R,
@ -139,6 +241,66 @@ mod test {
use crate::model::*; use crate::model::*;
use futures_util::StreamExt; use futures_util::StreamExt;
use serde_json::json; use serde_json::json;
use tokio_util::codec::{Framed, FramedRead};
#[tokio::test]
async fn call_raw_request() {
// Define the request and response send in the RPC-message
let rpc_request = serde_json::json!({
"id" : 1,
"jsonrpc" : "2.0",
"params" : {},
"method" : "some_method"
});
let rpc_request2 = rpc_request.clone();
let rpc_response = serde_json::json!({
"jsonrpc" : "2.0",
"id" : "1",
"result" : {"field_6" : 6}
});
let rpc_response2 = rpc_response.clone();
// Set up a pair of unix-streams
// The ClnRpc will read and write from usd1
// Im our test will read and write from usd2 and emulate Core Lightning behavior
let (uds1, uds2) = UnixStream::pair().unwrap();
let mut cln = ClnRpc::from_stream(uds1).unwrap();
// Open the test dummy reader
let mut frame = Framed::new(uds2, JsonCodec::default());
// Spawn the task that performs the RPC-call
tokio::task::spawn(async move {
let returned: serde_json::Value = cln.call_raw_request(&rpc_request2).await.unwrap();
assert_eq!(&returned, &rpc_response2)
});
// Verify that our emulated server received a request
let read_req = dbg!(frame.next().await.unwrap().unwrap());
assert_eq!(&rpc_request, &read_req);
frame.send(rpc_response).await.unwrap();
}
#[tokio::test]
async fn call_raw() {
let req = serde_json::json!({});
let (uds1, uds2) = UnixStream::pair().unwrap();
let mut cln = ClnRpc::from_stream(uds1).unwrap();
let mut read = FramedRead::new(uds2, JsonCodec::default());
tokio::task::spawn(async move {
let _: serde_json::Value = cln.call_raw("getinfo", req).await.unwrap();
});
let read_req = dbg!(read.next().await.unwrap().unwrap());
assert_eq!(
json!({"id": 1, "method": "getinfo", "params": {}, "jsonrpc": "2.0"}),
read_req
);
}
#[tokio::test] #[tokio::test]
async fn test_call() { async fn test_call() {