diff --git a/plugins/examples/cln-plugin-startup.rs b/plugins/examples/cln-plugin-startup.rs index fc3b20876..506f8fbef 100644 --- a/plugins/examples/cln-plugin-startup.rs +++ b/plugins/examples/cln-plugin-startup.rs @@ -1,9 +1,10 @@ //! This is a test plugin used to verify that we can compile and run //! plugins using the Rust API against c-lightning. - -use cln_plugin::{options, Builder}; +#[macro_use] +extern crate serde_json; +use cln_plugin::{options, Builder, Error, Plugin}; +use std::pin::Pin; use tokio; - #[tokio::main] async fn main() -> Result<(), anyhow::Error> { let plugin = Builder::new((), tokio::io::stdin(), tokio::io::stdout()) @@ -12,7 +13,12 @@ async fn main() -> Result<(), anyhow::Error> { options::Value::Integer(42), "a test-option with default 42", )) + .rpcmethod("testmethod", "This is a test", Box::new(testmethod)) .start() .await?; plugin.join().await } + +fn testmethod(_p: Plugin<()>, _v: &serde_json::Value) -> Result { + Ok(json!("Hello")) +} diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs index f53ff5b02..faf929b25 100644 --- a/plugins/src/lib.rs +++ b/plugins/src/lib.rs @@ -1,8 +1,9 @@ use crate::codec::{JsonCodec, JsonRpcCodec}; -pub use anyhow::{anyhow, Context, Error}; +pub use anyhow::{anyhow, Context}; use futures::sink::SinkExt; extern crate log; use log::trace; +use std::collections::HashMap; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::Mutex; @@ -21,11 +22,18 @@ pub mod options; use options::ConfigOption; +/// Need to tell us about something that went wrong? Use this error +/// type to do that. Use this alias to be safe from future changes in +/// our internal error handling, since we'll implement any necessary +/// conversions for you :-) +pub type Error = anyhow::Error; + /// Builder for a new plugin. pub struct Builder where I: AsyncRead + Unpin, O: Send + AsyncWrite + Unpin, + S: Clone + Send, { state: S, @@ -39,6 +47,7 @@ where subscriptions: Subscriptions, options: Vec, + rpcmethods: HashMap>, } impl Builder @@ -55,6 +64,7 @@ where hooks: Hooks::default(), subscriptions: Subscriptions::default(), options: vec![], + rpcmethods: HashMap::new(), } } @@ -63,6 +73,23 @@ where self } + pub fn rpcmethod( + mut self, + name: &str, + description: &str, + callback: Callback, + ) -> Builder { + self.rpcmethods.insert( + name.to_string(), + RpcMethod { + name: name.to_string(), + description: description.to_string(), + callback, + }, + ); + self + } + /// Build and start the plugin loop. This performs the handshake /// and spawns a new task that accepts incoming messages from /// c-lightning and dispatches them to the handlers. It only @@ -119,19 +146,31 @@ where o => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)), }; - let (tx, _) = tokio::sync::broadcast::channel(1); + let (wait_handle, _) = tokio::sync::broadcast::channel(1); + + // Collect the callbacks and create the hashmap for the dispatcher. + let mut rpcmethods = HashMap::new(); + for (name, callback) in self.rpcmethods.drain().map(|(k, v)| (k, v.callback)) { + rpcmethods.insert(name, callback); + } + + // An MPSC pair used by anything that needs to send messages + // to the main daemon. + let (sender, receiver) = tokio::sync::mpsc::channel(4); let plugin = Plugin { state: self.state, options: self.options, - wait_handle: tx, + wait_handle, + sender, }; // Start the PluginDriver to handle plugin IO tokio::spawn( PluginDriver { plugin: plugin.clone(), + rpcmethods, } - .run(input, output), + .run(receiver, input, output), ); Ok(plugin) @@ -141,9 +180,19 @@ where &mut self, _call: messages::GetManifestCall, ) -> messages::GetManifestResponse { + let rpcmethods: Vec<_> = self + .rpcmethods + .values() + .map(|v| messages::RpcMethod { + name: v.name.clone(), + description: v.description.clone(), + usage: String::new(), + }) + .collect(); + messages::GetManifestResponse { options: self.options.clone(), - rpcmethods: vec![], + rpcmethods, } } @@ -171,6 +220,20 @@ where } } +type Callback = Box, &serde_json::Value) -> Result>; + +/// A struct collecting the metadata required to register a custom +/// rpcmethod with the main daemon upon init. It'll get deconstructed +/// into just the callback after the init. +struct RpcMethod +where + S: Clone + Send, +{ + callback: Callback, + description: String, + name: String, +} + #[derive(Clone)] pub struct Plugin where @@ -182,6 +245,8 @@ where /// A signal that allows us to wait on the plugin's shutdown. wait_handle: tokio::sync::broadcast::Sender<()>, + + sender: tokio::sync::mpsc::Sender, } /// The [PluginDriver] is used to run the IO loop, reading messages @@ -195,9 +260,10 @@ where { #[allow(dead_code)] plugin: Plugin, + rpcmethods: HashMap>, } -use tokio::io::AsyncReadExt; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; impl PluginDriver where S: Send + Clone, @@ -205,16 +271,18 @@ where /// Run the plugin until we get a shutdown command. async fn run( self, + mut receiver: tokio::sync::mpsc::Receiver, mut input: FramedRead, - _output: Arc>>, + output: Arc>>, ) -> Result<(), Error> where I: Send + AsyncReadExt + Unpin, - O: Send, + O: Send + AsyncWriteExt + Unpin, { loop { tokio::select! { - _ = PluginDriver::dispatch_one(&mut input, &self.plugin) => {}, + _ = self.dispatch_one(&mut input, &self.plugin) => {}, + v = receiver.recv() => {output.lock().await.send(v.unwrap()).await?}, } } } @@ -222,6 +290,7 @@ where /// Dispatch one server-side event and then return. Just so we /// have a nicer looking `select` statement in `run` :-) async fn dispatch_one( + &self, input: &mut FramedRead, plugin: &Plugin, ) -> Result<(), Error> @@ -238,6 +307,31 @@ where messages::JsonRpc::Notification(n) => { PluginDriver::::dispatch_notification(n, plugin).await } + messages::JsonRpc::CustomRequest(id, p) => { + match self.dispatch_custom_request(id, p, plugin).await { + Ok(v) => plugin + .sender + .send(json!({ + "jsonrpc": "2.0", + "id": id, + "result": v + })) + .await + .context("returning custom result"), + Err(e) => plugin + .sender + .send(json!({ + "jsonrpc": "2.0", + "id": id, + "error": e.to_string(), + })) + .await + .context("returning custom error"), + } + } + messages::JsonRpc::CustomNotification(n) => { + PluginDriver::::dispatch_custom_notification(n, plugin).await + } } } Some(Err(e)) => Err(anyhow!("Error reading command: {}", e)), @@ -263,6 +357,44 @@ where trace!("Dispatching notification {:?}", notification); unimplemented!() } + async fn dispatch_custom_request( + &self, + _id: usize, + request: serde_json::Value, + plugin: &Plugin, + ) -> Result { + let method = request + .get("method") + .context("Missing 'method' in request")? + .as_str() + .context("'method' is not a string")?; + + let params = request + .get("params") + .context("Missing 'params' field in request")?; + let callback = self + .rpcmethods + .get(method) + .with_context(|| anyhow!("No handler for method '{}' registered", method))?; + + trace!( + "Dispatching custom request: method={}, params={}", + method, + params + ); + callback(plugin.clone(), params) + } + + async fn dispatch_custom_notification( + notification: serde_json::Value, + _plugin: &Plugin, + ) -> Result<(), Error> + where + S: Send + Clone, + { + trace!("Dispatching notification {:?}", notification); + unimplemented!() + } } impl Plugin diff --git a/plugins/src/messages.rs b/plugins/src/messages.rs index b966446dc..f32cb2086 100644 --- a/plugins/src/messages.rs +++ b/plugins/src/messages.rs @@ -70,6 +70,8 @@ pub(crate) struct InitCall { pub enum JsonRpc { Request(usize, R), Notification(N), + CustomRequest(usize, Value), + CustomNotification(Value), } /// This function disentangles the various cases: @@ -103,55 +105,29 @@ where let v = Value::deserialize(deserializer)?; let helper = IdHelper::deserialize(&v).map_err(de::Error::custom)?; match helper.id { - Some(id) => { - let r = R::deserialize(v).map_err(de::Error::custom)?; - Ok(JsonRpc::Request(id, r)) - } - None => { - let n = N::deserialize(v).map_err(de::Error::custom)?; - Ok(JsonRpc::Notification(n)) - } - } - } -} - -use serde::ser::{SerializeStruct, Serializer}; - -impl Serialize for JsonRpc -where - N: Serialize + Debug, - R: Serialize + Debug, -{ - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - match self { - JsonRpc::Notification(r) => { - let r = serde_json::to_value(r).unwrap(); - let mut s = serializer.serialize_struct("Notification", 3)?; - s.serialize_field("jsonrpc", "2.0")?; - s.serialize_field("method", &r["method"])?; - s.serialize_field("params", &r["params"])?; - s.end() - } - JsonRpc::Request(id, r) => { - let r = serde_json::to_value(r).unwrap(); - let mut s = serializer.serialize_struct("Request", 4)?; - s.serialize_field("jsonrpc", "2.0")?; - s.serialize_field("id", id)?; - s.serialize_field("method", &r["method"])?; - s.serialize_field("params", &r["params"])?; - s.end() - } + Some(id) => match R::deserialize(v.clone()) { + Ok(r) => Ok(JsonRpc::Request(id, r)), + Err(_) => Ok(JsonRpc::CustomRequest(id, v)), + }, + None => match N::deserialize(v.clone()) { + Ok(n) => Ok(JsonRpc::Notification(n)), + Err(_) => Ok(JsonRpc::CustomNotification(v)), + }, } } } +#[derive(Serialize, Default, Debug)] +pub(crate) struct RpcMethod { + pub(crate) name: String, + pub(crate) description: String, + pub(crate) usage: String, +} + #[derive(Serialize, Default, Debug)] pub(crate) struct GetManifestResponse { pub(crate) options: Vec, - pub(crate) rpcmethods: Vec<()>, + pub(crate) rpcmethods: Vec, } #[derive(Serialize, Default, Debug)] diff --git a/plugins/src/options.rs b/plugins/src/options.rs index 7ac6f3d7f..587686c24 100644 --- a/plugins/src/options.rs +++ b/plugins/src/options.rs @@ -109,7 +109,7 @@ mod test { "name": "name", "description":"description", "default": true, - "type": "booltes", + "type": "bool", }), ), ]; diff --git a/tests/test_cln_rs.py b/tests/test_cln_rs.py index 08dee44c9..c336c294d 100644 --- a/tests/test_cln_rs.py +++ b/tests/test_cln_rs.py @@ -37,3 +37,18 @@ def test_plugin_start(node_factory): 'path': None } assert expected == p + + # Now check that the `testmethod was registered ok + l1.rpc.help("testmethod") == { + 'help': [ + { + 'command': 'testmethod ', + 'category': 'plugin', + 'description': 'This is a test', + 'verbose': 'This is a test' + } + ], + 'format-hint': 'simple' + } + + assert l1.rpc.testmethod() == "Hello"