cln-rpc: Scaffolding for the cln-rpc crate

Changelog-Added: cln-rpc: A new Rust library called `cln-rpc` can be used to interact with the JSON-RPC
This commit is contained in:
Christian Decker 2022-01-14 16:11:28 +01:00
parent 7fdad0a60c
commit faa3835177
11 changed files with 595 additions and 1 deletions

4
Cargo.toml Normal file
View File

@ -0,0 +1,4 @@
[workspace]
members = [
"cln-rpc",
]

View File

@ -355,7 +355,7 @@ ifneq ($(FUZZING),0)
include tests/fuzz/Makefile
endif
ifneq ($(RUST),0)
# Add Rust Makefiles here
include cln-rpc/Makefile
endif
# We make pretty much everything depend on these.

19
cln-rpc/Cargo.toml Normal file
View File

@ -0,0 +1,19 @@
[package]
name = "cln-rpc"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.51"
bytes = "1.1.0"
log = "0.4.14"
serde = { version = "1.0.131", features = ["derive"] }
serde_json = "1.0.72"
tokio-util = { version = "0.6.9", features = ["codec"] }
tokio = { version = "1", features = ["net"]}
native-tls = { version = "*", features = ["vendored"] }
futures-util = { version = "*", features = [ "sink" ] }
[dev-dependencies]
tokio = { version = "1", features = ["net", "macros", "rt-multi-thread"]}
env_logger = "*"

16
cln-rpc/Makefile Normal file
View File

@ -0,0 +1,16 @@
cln-rpc-wrongdir:
$(MAKE) -C .. cln-rpc-all
CLN_RPC_EXAMPLES :=
CLN_RPC_GENALL = cln-rpc/src/model.rs
CLN_RPC_SOURCES = $(shell find cln-rpc -name *.rs) ${CLN_RPC_GENALL}
JSON_SCHEMA = doc/schemas/*.schema.json
DEFAULT_TARGETS += $(CLN_RPC_EXAMPLES) $(CLN_RPC_GENALL)
$(CLN_RPC_GENALL): $(JSON_SCHEMA)
PYTHONPATH=contrib/msggen python3 contrib/msggen/msggen/__main__.py
target/debug/examples/cln-rpc-getinfo: $(shell find cln-rpc -name *.rs)
cargo build --example cln-rpc-getinfo
cln-rpc-all: ${CLN_RPC_GEN_ALL} ${CLN_RPC_EXAMPLES}

3
cln-rpc/README.md Normal file
View File

@ -0,0 +1,3 @@
# `cln-rpc`: Talk to c-lightning

210
cln-rpc/src/codec.rs Normal file
View File

@ -0,0 +1,210 @@
//! The codec is used to encode and decode messages received from and
//! sent to the main daemon. The protocol uses `stdout` and `stdin` to
//! exchange JSON formatted messages. Each message is separated by an
//! empty line and we're guaranteed that no other empty line is
//! present in the messages.
use crate::Error;
use anyhow::anyhow;
use bytes::{BufMut, BytesMut};
use serde_json::value::Value;
use std::str::FromStr;
use std::{io, str};
use tokio_util::codec::{Decoder, Encoder};
pub use crate::jsonrpc::JsonRpc;
use crate::{
model::{Request},
notifications::Notification,
};
/// A simple codec that parses messages separated by two successive
/// `\n` newlines.
#[derive(Default)]
pub struct MultiLineCodec {}
/// Find two consecutive newlines, i.e., an empty line, signalling the
/// end of one message and the start of the next message.
fn find_separator(buf: &mut BytesMut) -> Option<usize> {
buf.iter()
.zip(buf.iter().skip(1))
.position(|b| *b.0 == b'\n' && *b.1 == b'\n')
}
fn utf8(buf: &[u8]) -> Result<&str, io::Error> {
str::from_utf8(buf)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Unable to decode input as UTF8"))
}
impl Decoder for MultiLineCodec {
type Item = String;
type Error = Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Error> {
if let Some(newline_offset) = find_separator(buf) {
let line = buf.split_to(newline_offset + 2);
let line = &line[..line.len() - 2];
let line = utf8(line)?;
Ok(Some(line.to_string()))
} else {
Ok(None)
}
}
}
impl<T> Encoder<T> for MultiLineCodec
where
T: AsRef<str>,
{
type Error = Error;
fn encode(&mut self, line: T, buf: &mut BytesMut) -> Result<(), Self::Error> {
let line = line.as_ref();
buf.reserve(line.len() + 2);
buf.put(line.as_bytes());
buf.put_u8(b'\n');
buf.put_u8(b'\n');
Ok(())
}
}
#[derive(Default)]
pub struct JsonCodec {
/// Sub-codec used to split the input into chunks that can then be
/// parsed by the JSON parser.
inner: MultiLineCodec,
}
impl<T> Encoder<T> for JsonCodec
where
T: Into<Value>,
{
type Error = Error;
fn encode(&mut self, msg: T, buf: &mut BytesMut) -> Result<(), Self::Error> {
let s = msg.into().to_string();
self.inner.encode(s, buf)
}
}
impl Decoder for JsonCodec {
type Item = Value;
type Error = Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Error> {
match self.inner.decode(buf) {
Ok(None) => Ok(None),
Err(e) => Err(e),
Ok(Some(s)) => {
if let Ok(v) = Value::from_str(&s) {
Ok(Some(v))
} else {
Err(anyhow!("failed to parse JSON"))
}
}
}
}
}
/// A codec that reads fully formed [crate::messages::JsonRpc]
/// messages. Internally it uses the [JsonCodec] which itself is built
/// on the [MultiLineCodec].
#[derive(Default)]
pub(crate) struct JsonRpcCodec {
inner: JsonCodec,
}
impl Decoder for JsonRpcCodec {
type Item = JsonRpc<Notification, Request>;
type Error = Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Error> {
match self.inner.decode(buf) {
Ok(None) => Ok(None),
Err(e) => Err(e),
Ok(Some(s)) => {
let req: Self::Item = serde_json::from_value(s)?;
Ok(Some(req))
}
}
}
}
#[cfg(test)]
mod test {
use super::{find_separator, JsonCodec, MultiLineCodec};
use bytes::{BufMut, BytesMut};
use serde_json::json;
use tokio_util::codec::{Decoder, Encoder};
#[test]
fn test_separator() {
struct Test(String, Option<usize>);
let tests = vec![
Test("".to_string(), None),
Test("}\n\n".to_string(), Some(1)),
Test("\"hello\"},\n\"world\"}\n\n".to_string(), Some(18)),
];
for t in tests.iter() {
let mut buf = BytesMut::new();
buf.put_slice(t.0.as_bytes());
assert_eq!(find_separator(&mut buf), t.1);
}
}
#[test]
fn test_ml_decoder() {
struct Test(String, Option<String>, String);
let tests = vec![
Test("".to_string(), None, "".to_string()),
Test(
"{\"hello\":\"world\"}\n\nremainder".to_string(),
Some("{\"hello\":\"world\"}".to_string()),
"remainder".to_string(),
),
Test(
"{\"hello\":\"world\"}\n\n{}\n\nremainder".to_string(),
Some("{\"hello\":\"world\"}".to_string()),
"{}\n\nremainder".to_string(),
),
];
for t in tests.iter() {
let mut buf = BytesMut::new();
buf.put_slice(t.0.as_bytes());
let mut codec = MultiLineCodec::default();
let mut remainder = BytesMut::new();
remainder.put_slice(t.2.as_bytes());
assert_eq!(codec.decode(&mut buf).unwrap(), t.1);
assert_eq!(buf, remainder);
}
}
#[test]
fn test_ml_encoder() {
let tests = vec!["test"];
for t in tests.iter() {
let mut buf = BytesMut::new();
let mut codec = MultiLineCodec::default();
let mut expected = BytesMut::new();
expected.put_slice(t.as_bytes());
expected.put_u8(b'\n');
expected.put_u8(b'\n');
codec.encode(t, &mut buf).unwrap();
assert_eq!(buf, expected);
}
}
#[test]
fn test_json_codec() {
let tests = vec![json!({"hello": "world"})];
for t in tests.iter() {
let mut codec = JsonCodec::default();
let mut buf = BytesMut::new();
codec.encode(t.clone(), &mut buf).unwrap();
let decoded = codec.decode(&mut buf).unwrap().unwrap();
assert_eq!(&decoded, t);
}
}
}

88
cln-rpc/src/jsonrpc.rs Normal file
View File

@ -0,0 +1,88 @@
//! Common structs to handle JSON-RPC decoding and encoding. They are
//! generic over the Notification and Request types.
use serde::ser::{SerializeStruct, Serializer};
use serde::de::{self, Deserializer};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt::Debug;
#[derive(Debug)]
pub enum JsonRpc<N, R> {
Request(usize, R),
Notification(N),
}
/// This function disentangles the various cases:
///
/// 1) If we have an `id` then it is a request
///
/// 2) Otherwise it's a notification that doesn't require a
/// response.
///
/// Furthermore we distinguish between the built-in types and the
/// custom user notifications/methods:
///
/// 1) We either match a built-in type above,
///
/// 2) Or it's a custom one, so we pass it around just as a
/// `serde_json::Value`
impl<'de, N, R> Deserialize<'de> for JsonRpc<N, R>
where
N: Deserialize<'de> + Debug,
R: Deserialize<'de> + Debug,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize, Debug)]
struct IdHelper {
id: Option<usize>,
}
let v = Value::deserialize(deserializer)?;
let helper = IdHelper::deserialize(&v).map_err(de::Error::custom)?;
match helper.id {
Some(id) => {
let r = R::deserialize(v).map_err(de::Error::custom)?;
Ok(JsonRpc::Request(id, r))
}
None => {
let n = N::deserialize(v).map_err(de::Error::custom)?;
Ok(JsonRpc::Notification(n))
}
}
}
}
impl<N, R> Serialize for JsonRpc<N, R>
where
N: Serialize + Debug,
R: Serialize + Debug,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match self {
JsonRpc::Notification(r) => {
let r = serde_json::to_value(r).unwrap();
let mut s = serializer.serialize_struct("Notification", 3)?;
s.serialize_field("jsonrpc", "2.0")?;
s.serialize_field("method", &r["method"])?;
s.serialize_field("params", &r["params"])?;
s.end()
}
JsonRpc::Request(id, r) => {
let r = serde_json::to_value(r).unwrap();
let mut s = serializer.serialize_struct("Request", 4)?;
s.serialize_field("jsonrpc", "2.0")?;
s.serialize_field("id", id)?;
s.serialize_field("method", &r["method"])?;
s.serialize_field("params", &r["params"])?;
s.end()
}
}
}
}

108
cln-rpc/src/lib.rs Normal file
View File

@ -0,0 +1,108 @@
use crate::codec::JsonCodec;
use crate::codec::JsonRpc;
use anyhow::{Context, Error, Result};
use futures_util::sink::SinkExt;
use futures_util::StreamExt;
use log::{debug, trace};
use std::path::Path;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::UnixStream;
use tokio_util::codec::{FramedRead, FramedWrite};
pub mod codec;
pub mod jsonrpc;
pub mod model;
pub mod notifications;
pub mod primitives;
pub use crate::{
model::{Request, Response},
notifications::Notification,
};
///
pub struct ClnRpc {
next_id: AtomicUsize,
#[allow(dead_code)]
read: FramedRead<OwnedReadHalf, JsonCodec>,
write: FramedWrite<OwnedWriteHalf, JsonCodec>,
}
impl ClnRpc {
pub async fn new<P>(path: P) -> Result<ClnRpc>
where
P: AsRef<Path>,
{
debug!(
"Connecting to socket at {}",
path.as_ref().to_string_lossy()
);
ClnRpc::from_stream(UnixStream::connect(path).await?)
}
fn from_stream(stream: UnixStream) -> Result<ClnRpc> {
let (read, write) = stream.into_split();
Ok(ClnRpc {
next_id: AtomicUsize::new(1),
read: FramedRead::new(read, JsonCodec::default()),
write: FramedWrite::new(write, JsonCodec::default()),
})
}
pub async fn call(&mut self, req: Request) -> Result<Response, Error> {
trace!("Sending request {:?}", req);
// Wrap the raw request in a well-formed JSON-RPC outer dict.
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
let req: JsonRpc<Notification, Request> = JsonRpc::Request(id, req);
let req = serde_json::to_value(req)?;
let req2 = req.clone();
self.write.send(req).await?;
let mut response = self
.read
.next()
.await
.context("no response from lightningd")?
.context("reading response from socket")?;
trace!("Read response {:?}", response);
// Annotate the response with the method from the request, so
// serde_json knows which variant of [`Request`] should be
// used.
response["method"] = req2["method"].clone();
serde_json::from_value(response).context("converting response into enum")
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::model::*;
use futures_util::StreamExt;
use serde_json::json;
#[tokio::test]
async fn test_call() {
let req = Request::Getinfo(requests::GetinfoRequest {});
let (uds1, uds2) = UnixStream::pair().unwrap();
let mut cln = ClnRpc::from_stream(uds1).unwrap();
let mut read = FramedRead::new(uds2, JsonCodec::default());
tokio::task::spawn(async move {
cln.call(req).await.unwrap();
});
let read_req = dbg!(read.next().await.unwrap().unwrap());
assert_eq!(
json!({"id": 1, "method": "getinfo", "params": {}, "jsonrpc": "2.0"}),
read_req
);
}
}

BIN
cln-rpc/src/model.rs Normal file

Binary file not shown.

View File

@ -0,0 +1,4 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
pub enum Notification {}

142
cln-rpc/src/primitives.rs Normal file
View File

@ -0,0 +1,142 @@
use anyhow::{anyhow, Error, Result};
use serde::{Deserialize, Serialize};
use serde::{Deserializer, Serializer};
#[derive(Copy, Clone, Serialize, Deserialize, Debug)]
#[allow(non_camel_case_types)]
pub enum ChannelState {
OPENINGD,
CHANNELD_AWAITING_LOCKIN,
CHANNELD_NORMAL,
CHANNELD_SHUTTING_DOWN,
CLOSINGD_SIGEXCHANGE,
CLOSINGD_COMPLETE,
AWAITING_UNILATERAL,
FUNDING_SPEND_SEEN,
ONCHAIN,
DUALOPEND_OPEN_INIT,
DUALOPEND_AWAITING_LOCKIN,
}
#[derive(Copy, Clone, Serialize, Deserialize, Debug)]
#[allow(non_camel_case_types)]
pub enum ChannelStateChangeCause {
UNKNOWN,
LOCAL,
USER,
REMOTE,
PROTOCOL,
ONCHAIN,
}
#[derive(Copy, Clone, Debug, PartialEq)]
pub struct Amount {
msat: u64,
}
impl Amount {
pub fn from_msat(msat: u64) -> Amount {
Amount { msat: msat }
}
pub fn from_sat(sat: u64) -> Amount {
Amount { msat: 1_000 * sat }
}
pub fn from_btc(btc: u64) -> Amount {
Amount {
msat: 100_000_000_000 * btc,
}
}
}
#[derive(Copy, Clone, Serialize, Deserialize, Debug, PartialEq)]
pub enum ChannelSide {
LOCAL,
REMOTE,
}
impl<'de> Deserialize<'de> for Amount {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
use serde::de::Error;
let s: String = Deserialize::deserialize(deserializer)?;
let ss: &str = &s;
ss.try_into()
.map_err(|_e| Error::custom("could not parse amount"))
}
}
impl Serialize for Amount {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&format!("{}msat", self.msat))
}
}
impl TryFrom<&str> for Amount {
type Error = Error;
fn try_from(s: &str) -> Result<Amount> {
let number: u64 = s
.chars()
.map(|c| c.to_digit(10))
.take_while(|opt| opt.is_some())
.fold(0, |acc, digit| acc * 10 + (digit.unwrap() as u64));
let s = s.to_lowercase();
if s.ends_with("msat") {
Ok(Amount::from_msat(number))
} else if s.ends_with("sat") {
Ok(Amount::from_sat(number))
} else if s.ends_with("btc") {
Ok(Amount::from_btc(number))
} else {
Err(anyhow!("Unable to parse amount from string: {}", s))
}
}
}
impl From<Amount> for String {
fn from(a: Amount) -> String {
format!("{}msat", a.msat)
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_amount_serde() {
#[derive(Serialize, PartialEq, Debug, Deserialize)]
struct T {
amount: Amount,
}
let tests = vec![
("{\"amount\": \"10msat\"}", Amount { msat: 10 }, "10msat"),
(
"{\"amount\": \"42sat\"}",
Amount { msat: 42_000 },
"42000msat",
),
(
"{\"amount\": \"31337btc\"}",
Amount {
msat: 3_133_700_000_000_000,
},
"3133700000000000msat",
),
];
for (req, res, s) in tests.into_iter() {
println!("{:?} {:?}", req, res);
let parsed: T = serde_json::from_str(req).unwrap();
assert_eq!(res, parsed.amount);
let serialized: String = parsed.amount.into();
assert_eq!(s, serialized);
}
}
}