diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 000000000..e670e44ec --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,4 @@ +[workspace] +members = [ + "cln-rpc", +] diff --git a/Makefile b/Makefile index 5f88fe01c..618cc2d67 100644 --- a/Makefile +++ b/Makefile @@ -355,7 +355,7 @@ ifneq ($(FUZZING),0) include tests/fuzz/Makefile endif ifneq ($(RUST),0) -# Add Rust Makefiles here + include cln-rpc/Makefile endif # We make pretty much everything depend on these. diff --git a/cln-rpc/Cargo.toml b/cln-rpc/Cargo.toml new file mode 100644 index 000000000..4145d20a9 --- /dev/null +++ b/cln-rpc/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "cln-rpc" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1.0.51" +bytes = "1.1.0" +log = "0.4.14" +serde = { version = "1.0.131", features = ["derive"] } +serde_json = "1.0.72" +tokio-util = { version = "0.6.9", features = ["codec"] } +tokio = { version = "1", features = ["net"]} +native-tls = { version = "*", features = ["vendored"] } +futures-util = { version = "*", features = [ "sink" ] } + +[dev-dependencies] +tokio = { version = "1", features = ["net", "macros", "rt-multi-thread"]} +env_logger = "*" diff --git a/cln-rpc/Makefile b/cln-rpc/Makefile new file mode 100644 index 000000000..a1812982c --- /dev/null +++ b/cln-rpc/Makefile @@ -0,0 +1,16 @@ +cln-rpc-wrongdir: + $(MAKE) -C .. cln-rpc-all + +CLN_RPC_EXAMPLES := +CLN_RPC_GENALL = cln-rpc/src/model.rs +CLN_RPC_SOURCES = $(shell find cln-rpc -name *.rs) ${CLN_RPC_GENALL} +JSON_SCHEMA = doc/schemas/*.schema.json +DEFAULT_TARGETS += $(CLN_RPC_EXAMPLES) $(CLN_RPC_GENALL) + +$(CLN_RPC_GENALL): $(JSON_SCHEMA) + PYTHONPATH=contrib/msggen python3 contrib/msggen/msggen/__main__.py + +target/debug/examples/cln-rpc-getinfo: $(shell find cln-rpc -name *.rs) + cargo build --example cln-rpc-getinfo + +cln-rpc-all: ${CLN_RPC_GEN_ALL} ${CLN_RPC_EXAMPLES} diff --git a/cln-rpc/README.md b/cln-rpc/README.md new file mode 100644 index 000000000..d637e040c --- /dev/null +++ b/cln-rpc/README.md @@ -0,0 +1,3 @@ +# `cln-rpc`: Talk to c-lightning + + diff --git a/cln-rpc/src/codec.rs b/cln-rpc/src/codec.rs new file mode 100644 index 000000000..1e8adbbf3 --- /dev/null +++ b/cln-rpc/src/codec.rs @@ -0,0 +1,210 @@ +//! The codec is used to encode and decode messages received from and +//! sent to the main daemon. The protocol uses `stdout` and `stdin` to +//! exchange JSON formatted messages. Each message is separated by an +//! empty line and we're guaranteed that no other empty line is +//! present in the messages. +use crate::Error; +use anyhow::anyhow; +use bytes::{BufMut, BytesMut}; +use serde_json::value::Value; +use std::str::FromStr; +use std::{io, str}; +use tokio_util::codec::{Decoder, Encoder}; + +pub use crate::jsonrpc::JsonRpc; +use crate::{ + model::{Request}, + notifications::Notification, +}; + +/// A simple codec that parses messages separated by two successive +/// `\n` newlines. +#[derive(Default)] +pub struct MultiLineCodec {} + +/// Find two consecutive newlines, i.e., an empty line, signalling the +/// end of one message and the start of the next message. +fn find_separator(buf: &mut BytesMut) -> Option { + buf.iter() + .zip(buf.iter().skip(1)) + .position(|b| *b.0 == b'\n' && *b.1 == b'\n') +} + +fn utf8(buf: &[u8]) -> Result<&str, io::Error> { + str::from_utf8(buf) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Unable to decode input as UTF8")) +} + +impl Decoder for MultiLineCodec { + type Item = String; + type Error = Error; + fn decode(&mut self, buf: &mut BytesMut) -> Result, Error> { + if let Some(newline_offset) = find_separator(buf) { + let line = buf.split_to(newline_offset + 2); + let line = &line[..line.len() - 2]; + let line = utf8(line)?; + Ok(Some(line.to_string())) + } else { + Ok(None) + } + } +} + +impl Encoder for MultiLineCodec +where + T: AsRef, +{ + type Error = Error; + fn encode(&mut self, line: T, buf: &mut BytesMut) -> Result<(), Self::Error> { + let line = line.as_ref(); + buf.reserve(line.len() + 2); + buf.put(line.as_bytes()); + buf.put_u8(b'\n'); + buf.put_u8(b'\n'); + Ok(()) + } +} + +#[derive(Default)] +pub struct JsonCodec { + /// Sub-codec used to split the input into chunks that can then be + /// parsed by the JSON parser. + inner: MultiLineCodec, +} + +impl Encoder for JsonCodec +where + T: Into, +{ + type Error = Error; + fn encode(&mut self, msg: T, buf: &mut BytesMut) -> Result<(), Self::Error> { + let s = msg.into().to_string(); + self.inner.encode(s, buf) + } +} + +impl Decoder for JsonCodec { + type Item = Value; + type Error = Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result, Error> { + match self.inner.decode(buf) { + Ok(None) => Ok(None), + Err(e) => Err(e), + Ok(Some(s)) => { + if let Ok(v) = Value::from_str(&s) { + Ok(Some(v)) + } else { + Err(anyhow!("failed to parse JSON")) + } + } + } + } +} + +/// A codec that reads fully formed [crate::messages::JsonRpc] +/// messages. Internally it uses the [JsonCodec] which itself is built +/// on the [MultiLineCodec]. +#[derive(Default)] +pub(crate) struct JsonRpcCodec { + inner: JsonCodec, +} + +impl Decoder for JsonRpcCodec { + type Item = JsonRpc; + type Error = Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result, Error> { + match self.inner.decode(buf) { + Ok(None) => Ok(None), + Err(e) => Err(e), + Ok(Some(s)) => { + let req: Self::Item = serde_json::from_value(s)?; + Ok(Some(req)) + } + } + } +} + +#[cfg(test)] +mod test { + use super::{find_separator, JsonCodec, MultiLineCodec}; + use bytes::{BufMut, BytesMut}; + use serde_json::json; + use tokio_util::codec::{Decoder, Encoder}; + + #[test] + fn test_separator() { + struct Test(String, Option); + let tests = vec![ + Test("".to_string(), None), + Test("}\n\n".to_string(), Some(1)), + Test("\"hello\"},\n\"world\"}\n\n".to_string(), Some(18)), + ]; + + for t in tests.iter() { + let mut buf = BytesMut::new(); + buf.put_slice(t.0.as_bytes()); + assert_eq!(find_separator(&mut buf), t.1); + } + } + + #[test] + fn test_ml_decoder() { + struct Test(String, Option, String); + let tests = vec![ + Test("".to_string(), None, "".to_string()), + Test( + "{\"hello\":\"world\"}\n\nremainder".to_string(), + Some("{\"hello\":\"world\"}".to_string()), + "remainder".to_string(), + ), + Test( + "{\"hello\":\"world\"}\n\n{}\n\nremainder".to_string(), + Some("{\"hello\":\"world\"}".to_string()), + "{}\n\nremainder".to_string(), + ), + ]; + + for t in tests.iter() { + let mut buf = BytesMut::new(); + buf.put_slice(t.0.as_bytes()); + + let mut codec = MultiLineCodec::default(); + let mut remainder = BytesMut::new(); + remainder.put_slice(t.2.as_bytes()); + + assert_eq!(codec.decode(&mut buf).unwrap(), t.1); + assert_eq!(buf, remainder); + } + } + + #[test] + fn test_ml_encoder() { + let tests = vec!["test"]; + + for t in tests.iter() { + let mut buf = BytesMut::new(); + let mut codec = MultiLineCodec::default(); + let mut expected = BytesMut::new(); + expected.put_slice(t.as_bytes()); + expected.put_u8(b'\n'); + expected.put_u8(b'\n'); + codec.encode(t, &mut buf).unwrap(); + assert_eq!(buf, expected); + } + } + + #[test] + fn test_json_codec() { + let tests = vec![json!({"hello": "world"})]; + + for t in tests.iter() { + let mut codec = JsonCodec::default(); + let mut buf = BytesMut::new(); + codec.encode(t.clone(), &mut buf).unwrap(); + let decoded = codec.decode(&mut buf).unwrap().unwrap(); + assert_eq!(&decoded, t); + } + } +} diff --git a/cln-rpc/src/jsonrpc.rs b/cln-rpc/src/jsonrpc.rs new file mode 100644 index 000000000..99a5469b1 --- /dev/null +++ b/cln-rpc/src/jsonrpc.rs @@ -0,0 +1,88 @@ +//! Common structs to handle JSON-RPC decoding and encoding. They are +//! generic over the Notification and Request types. + +use serde::ser::{SerializeStruct, Serializer}; +use serde::de::{self, Deserializer}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::fmt::Debug; + +#[derive(Debug)] +pub enum JsonRpc { + Request(usize, R), + Notification(N), +} + +/// This function disentangles the various cases: +/// +/// 1) If we have an `id` then it is a request +/// +/// 2) Otherwise it's a notification that doesn't require a +/// response. +/// +/// Furthermore we distinguish between the built-in types and the +/// custom user notifications/methods: +/// +/// 1) We either match a built-in type above, +/// +/// 2) Or it's a custom one, so we pass it around just as a +/// `serde_json::Value` +impl<'de, N, R> Deserialize<'de> for JsonRpc +where + N: Deserialize<'de> + Debug, + R: Deserialize<'de> + Debug, +{ + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize, Debug)] + struct IdHelper { + id: Option, + } + + let v = Value::deserialize(deserializer)?; + let helper = IdHelper::deserialize(&v).map_err(de::Error::custom)?; + match helper.id { + Some(id) => { + let r = R::deserialize(v).map_err(de::Error::custom)?; + Ok(JsonRpc::Request(id, r)) + } + None => { + let n = N::deserialize(v).map_err(de::Error::custom)?; + Ok(JsonRpc::Notification(n)) + } + } + } +} + +impl Serialize for JsonRpc +where + N: Serialize + Debug, + R: Serialize + Debug, +{ + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self { + JsonRpc::Notification(r) => { + let r = serde_json::to_value(r).unwrap(); + let mut s = serializer.serialize_struct("Notification", 3)?; + s.serialize_field("jsonrpc", "2.0")?; + s.serialize_field("method", &r["method"])?; + s.serialize_field("params", &r["params"])?; + s.end() + } + JsonRpc::Request(id, r) => { + let r = serde_json::to_value(r).unwrap(); + let mut s = serializer.serialize_struct("Request", 4)?; + s.serialize_field("jsonrpc", "2.0")?; + s.serialize_field("id", id)?; + s.serialize_field("method", &r["method"])?; + s.serialize_field("params", &r["params"])?; + s.end() + } + } + } +} diff --git a/cln-rpc/src/lib.rs b/cln-rpc/src/lib.rs new file mode 100644 index 000000000..0197d4b39 --- /dev/null +++ b/cln-rpc/src/lib.rs @@ -0,0 +1,108 @@ +use crate::codec::JsonCodec; +use crate::codec::JsonRpc; +use anyhow::{Context, Error, Result}; +use futures_util::sink::SinkExt; +use futures_util::StreamExt; +use log::{debug, trace}; +use std::path::Path; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf}; +use tokio::net::UnixStream; +use tokio_util::codec::{FramedRead, FramedWrite}; + +pub mod codec; +pub mod jsonrpc; +pub mod model; +pub mod notifications; +pub mod primitives; + +pub use crate::{ + model::{Request, Response}, + notifications::Notification, +}; + +/// +pub struct ClnRpc { + next_id: AtomicUsize, + + #[allow(dead_code)] + read: FramedRead, + write: FramedWrite, +} + +impl ClnRpc { + pub async fn new

(path: P) -> Result + where + P: AsRef, + { + debug!( + "Connecting to socket at {}", + path.as_ref().to_string_lossy() + ); + ClnRpc::from_stream(UnixStream::connect(path).await?) + } + + fn from_stream(stream: UnixStream) -> Result { + let (read, write) = stream.into_split(); + + Ok(ClnRpc { + next_id: AtomicUsize::new(1), + read: FramedRead::new(read, JsonCodec::default()), + write: FramedWrite::new(write, JsonCodec::default()), + }) + } + + pub async fn call(&mut self, req: Request) -> Result { + trace!("Sending request {:?}", req); + + // Wrap the raw request in a well-formed JSON-RPC outer dict. + let id = self.next_id.fetch_add(1, Ordering::SeqCst); + let req: JsonRpc = JsonRpc::Request(id, req); + let req = serde_json::to_value(req)?; + let req2 = req.clone(); + self.write.send(req).await?; + + let mut response = self + .read + .next() + .await + .context("no response from lightningd")? + .context("reading response from socket")?; + trace!("Read response {:?}", response); + + // Annotate the response with the method from the request, so + // serde_json knows which variant of [`Request`] should be + // used. + response["method"] = req2["method"].clone(); + + serde_json::from_value(response).context("converting response into enum") + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::model::*; + use futures_util::StreamExt; + use serde_json::json; + + #[tokio::test] + async fn test_call() { + let req = Request::Getinfo(requests::GetinfoRequest {}); + let (uds1, uds2) = UnixStream::pair().unwrap(); + let mut cln = ClnRpc::from_stream(uds1).unwrap(); + + let mut read = FramedRead::new(uds2, JsonCodec::default()); + tokio::task::spawn(async move { + cln.call(req).await.unwrap(); + }); + + let read_req = dbg!(read.next().await.unwrap().unwrap()); + + assert_eq!( + json!({"id": 1, "method": "getinfo", "params": {}, "jsonrpc": "2.0"}), + read_req + ); + } +} diff --git a/cln-rpc/src/model.rs b/cln-rpc/src/model.rs new file mode 100644 index 000000000..ebc697023 Binary files /dev/null and b/cln-rpc/src/model.rs differ diff --git a/cln-rpc/src/notifications.rs b/cln-rpc/src/notifications.rs new file mode 100644 index 000000000..4256350f9 --- /dev/null +++ b/cln-rpc/src/notifications.rs @@ -0,0 +1,4 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize)] +pub enum Notification {} diff --git a/cln-rpc/src/primitives.rs b/cln-rpc/src/primitives.rs new file mode 100644 index 000000000..0c1faa31a --- /dev/null +++ b/cln-rpc/src/primitives.rs @@ -0,0 +1,142 @@ +use anyhow::{anyhow, Error, Result}; +use serde::{Deserialize, Serialize}; +use serde::{Deserializer, Serializer}; +#[derive(Copy, Clone, Serialize, Deserialize, Debug)] +#[allow(non_camel_case_types)] +pub enum ChannelState { + OPENINGD, + CHANNELD_AWAITING_LOCKIN, + CHANNELD_NORMAL, + CHANNELD_SHUTTING_DOWN, + CLOSINGD_SIGEXCHANGE, + CLOSINGD_COMPLETE, + AWAITING_UNILATERAL, + FUNDING_SPEND_SEEN, + ONCHAIN, + DUALOPEND_OPEN_INIT, + DUALOPEND_AWAITING_LOCKIN, +} + +#[derive(Copy, Clone, Serialize, Deserialize, Debug)] +#[allow(non_camel_case_types)] +pub enum ChannelStateChangeCause { + UNKNOWN, + LOCAL, + USER, + REMOTE, + PROTOCOL, + ONCHAIN, +} + +#[derive(Copy, Clone, Debug, PartialEq)] +pub struct Amount { + msat: u64, +} + +impl Amount { + pub fn from_msat(msat: u64) -> Amount { + Amount { msat: msat } + } + pub fn from_sat(sat: u64) -> Amount { + Amount { msat: 1_000 * sat } + } + pub fn from_btc(btc: u64) -> Amount { + Amount { + msat: 100_000_000_000 * btc, + } + } +} + +#[derive(Copy, Clone, Serialize, Deserialize, Debug, PartialEq)] +pub enum ChannelSide { + LOCAL, + REMOTE, +} + +impl<'de> Deserialize<'de> for Amount { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + use serde::de::Error; + let s: String = Deserialize::deserialize(deserializer)?; + let ss: &str = &s; + ss.try_into() + .map_err(|_e| Error::custom("could not parse amount")) + } +} + +impl Serialize for Amount { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&format!("{}msat", self.msat)) + } +} + +impl TryFrom<&str> for Amount { + type Error = Error; + fn try_from(s: &str) -> Result { + let number: u64 = s + .chars() + .map(|c| c.to_digit(10)) + .take_while(|opt| opt.is_some()) + .fold(0, |acc, digit| acc * 10 + (digit.unwrap() as u64)); + + let s = s.to_lowercase(); + if s.ends_with("msat") { + Ok(Amount::from_msat(number)) + } else if s.ends_with("sat") { + Ok(Amount::from_sat(number)) + } else if s.ends_with("btc") { + Ok(Amount::from_btc(number)) + } else { + Err(anyhow!("Unable to parse amount from string: {}", s)) + } + } +} + +impl From for String { + fn from(a: Amount) -> String { + format!("{}msat", a.msat) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_amount_serde() { + #[derive(Serialize, PartialEq, Debug, Deserialize)] + struct T { + amount: Amount, + } + + let tests = vec![ + ("{\"amount\": \"10msat\"}", Amount { msat: 10 }, "10msat"), + ( + "{\"amount\": \"42sat\"}", + Amount { msat: 42_000 }, + "42000msat", + ), + ( + "{\"amount\": \"31337btc\"}", + Amount { + msat: 3_133_700_000_000_000, + }, + "3133700000000000msat", + ), + ]; + + for (req, res, s) in tests.into_iter() { + println!("{:?} {:?}", req, res); + let parsed: T = serde_json::from_str(req).unwrap(); + assert_eq!(res, parsed.amount); + + let serialized: String = parsed.amount.into(); + assert_eq!(s, serialized); + } + } +}