diff --git a/plugins/examples/cln-plugin-startup.rs b/plugins/examples/cln-plugin-startup.rs index df71ae39c..fc3b20876 100644 --- a/plugins/examples/cln-plugin-startup.rs +++ b/plugins/examples/cln-plugin-startup.rs @@ -6,17 +6,13 @@ use tokio; #[tokio::main] async fn main() -> Result<(), anyhow::Error> { - let (plugin, stdin) = Builder::new((), tokio::io::stdin(), tokio::io::stdout()) + let plugin = Builder::new((), tokio::io::stdin(), tokio::io::stdout()) .option(options::ConfigOption::new( "test-option", options::Value::Integer(42), "a test-option with default 42", )) - .build(); - - tokio::spawn(async { - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; - log::info!("Hello world"); - }); - plugin.run(stdin).await + .start() + .await?; + plugin.join().await } diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs index 2d6382454..f53ff5b02 100644 --- a/plugins/src/lib.rs +++ b/plugins/src/lib.rs @@ -1,9 +1,8 @@ use crate::codec::{JsonCodec, JsonRpcCodec}; -pub use anyhow::Error; +pub use anyhow::{anyhow, Context, Error}; use futures::sink::SinkExt; extern crate log; -use log::{trace, warn}; -use std::marker::PhantomData; +use log::trace; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::Mutex; @@ -25,14 +24,13 @@ use options::ConfigOption; /// Builder for a new plugin. pub struct Builder where - S: Clone + Send, I: AsyncRead + Unpin, O: Send + AsyncWrite + Unpin, { state: S, - input: I, - output: O, + input: Option, + output: Option, #[allow(dead_code)] hooks: Hooks, @@ -46,14 +44,14 @@ where impl Builder where O: Send + AsyncWrite + Unpin + 'static, - S: Clone + Send + 'static, + S: Clone + Sync + Send + Clone + 'static, I: AsyncRead + Send + Unpin + 'static, { pub fn new(state: S, input: I, output: O) -> Self { Self { state, - input, - output, + input: Some(input), + output: Some(output), hooks: Hooks::default(), subscriptions: Subscriptions::default(), options: vec![], @@ -65,147 +63,96 @@ where self } - pub fn build(self) -> (Plugin, I) { + /// Build and start the plugin loop. This performs the handshake + /// and spawns a new task that accepts incoming messages from + /// c-lightning and dispatches them to the handlers. It only + /// returns after completing the handshake to ensure that the + /// configuration and initialization was successfull. + pub async fn start(mut self) -> Result, anyhow::Error> { + let mut input = FramedRead::new(self.input.take().unwrap(), JsonRpcCodec::default()); + + // Sadly we need to wrap the output in a mutex in order to + // enable early logging, i.e., logging that is done before the + // PluginDriver is processing events during the + // handshake. Otherwise we could just write the log events to + // the event queue and have the PluginDriver be the sole owner + // of `Stdout`. let output = Arc::new(Mutex::new(FramedWrite::new( - self.output, + self.output.take().unwrap(), JsonCodec::default(), ))); // Now configure the logging, so any `log` call is wrapped // in a JSON-RPC notification and sent to c-lightning - tokio::spawn(async move {}); - ( - Plugin { - state: Arc::new(Mutex::new(self.state)), - output, - input_type: PhantomData, - options: self.options, - }, - self.input, - ) - } -} - -pub struct Plugin -where - S: Clone + Send, - I: AsyncRead, - O: Send + AsyncWrite + 'static, -{ - //input: FramedRead, - output: Arc>>, - - /// The state gets cloned for each request - state: Arc>, - input_type: PhantomData, - options: Vec, -} - -impl Plugin -where - S: Clone + Send, - I: AsyncRead + Send + Unpin, - O: Send + AsyncWrite + Unpin, -{ - pub fn options(&self) -> Vec { - self.options.clone() - } -} - -impl Plugin -where - S: Clone + Send, - I: AsyncRead + Send + Unpin, - O: Send + AsyncWrite + Unpin + 'static, -{ - /// Read incoming requests from `c-lightning and dispatch their handling. - #[allow(unused_mut)] - pub async fn run(mut self, input: I) -> Result<(), Error> { - crate::logging::init(self.output.clone()).await?; + crate::logging::init(output.clone()).await?; trace!("Plugin logging initialized"); - let mut input = FramedRead::new(input, JsonRpcCodec::default()); - loop { - match input.next().await { - Some(Ok(msg)) => { - trace!("Received a message: {:?}", msg); - match msg { - messages::JsonRpc::Request(id, p) => { - self.dispatch_request(id, p).await? - // Use a match to detect Ok / Error and return an error if we failed. - } - messages::JsonRpc::Notification(n) => self.dispatch_notification(n).await?, - } - } - Some(Err(e)) => { - warn!("Error reading command: {}", e); - break; - } - None => break, + // Read the `getmanifest` message: + match input.next().await { + Some(Ok(messages::JsonRpc::Request(id, messages::Request::Getmanifest(m)))) => { + output + .lock() + .await + .send(json!({ + "jsonrpc": "2.0", + "result": self.handle_get_manifest(m), + "id": id, + })) + .await? } - } - Ok(()) - } - - async fn dispatch_request( - &mut self, - id: usize, - request: messages::Request, - ) -> Result<(), Error> { - trace!("Dispatching request {:?}", request); - let state = self.state.clone(); - let res: serde_json::Value = match request { - messages::Request::Getmanifest(c) => { - serde_json::to_value(self.handle_get_manifest(c, state).await?).unwrap() - } - messages::Request::Init(c) => { - serde_json::to_value(self.handle_init(c, state).await?).unwrap() - } - o => panic!("Request {:?} is currently unhandled", o), + o => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)), }; - trace!("Sending respone {:?}", res); - let mut out = self.output.lock().await; - out.send(json!({ - "jsonrpc": "2.0", - "result": res, - "id": id, - })) - .await - .unwrap(); - Ok(()) + match input.next().await { + Some(Ok(messages::JsonRpc::Request(id, messages::Request::Init(m)))) => { + output + .lock() + .await + .send(json!({ + "jsonrpc": "2.0", + "result": self.handle_init(m)?, + "id": id, + })) + .await? + } + + o => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)), + }; + + let (tx, _) = tokio::sync::broadcast::channel(1); + let plugin = Plugin { + state: self.state, + options: self.options, + wait_handle: tx, + }; + + // Start the PluginDriver to handle plugin IO + tokio::spawn( + PluginDriver { + plugin: plugin.clone(), + } + .run(input, output), + ); + + Ok(plugin) } - async fn dispatch_notification( - &mut self, - notification: messages::Notification, - ) -> Result<(), Error> { - trace!("Dispatching notification {:?}", notification); - unimplemented!() - } - - async fn handle_get_manifest( + fn handle_get_manifest( &mut self, _call: messages::GetManifestCall, - _state: Arc>, - ) -> Result { - Ok(messages::GetManifestResponse { + ) -> messages::GetManifestResponse { + messages::GetManifestResponse { options: self.options.clone(), rpcmethods: vec![], - }) + } } - async fn handle_init( - &mut self, - call: messages::InitCall, - _state: Arc>, - ) -> Result { + fn handle_init(&mut self, call: messages::InitCall) -> Result { use options::Value as OValue; use serde_json::Value as JValue; // Match up the ConfigOptions and fill in their values if we // have a matching entry. - for opt in self.options.iter_mut() { if let Some(val) = call.options.get(opt.name()) { opt.value = Some(match (opt.default(), &val) { @@ -224,6 +171,125 @@ where } } +#[derive(Clone)] +pub struct Plugin +where + S: Clone + Send, +{ + /// The state gets cloned for each request + state: S, + options: Vec, + + /// A signal that allows us to wait on the plugin's shutdown. + wait_handle: tokio::sync::broadcast::Sender<()>, +} + +/// The [PluginDriver] is used to run the IO loop, reading messages +/// from the Lightning daemon, dispatching calls and notifications to +/// the plugin, and returning responses to the the daemon. We also use +/// it to handle spontaneous messages like Notifications and logging +/// events. +struct PluginDriver +where + S: Send + Clone, +{ + #[allow(dead_code)] + plugin: Plugin, +} + +use tokio::io::AsyncReadExt; +impl PluginDriver +where + S: Send + Clone, +{ + /// Run the plugin until we get a shutdown command. + async fn run( + self, + mut input: FramedRead, + _output: Arc>>, + ) -> Result<(), Error> + where + I: Send + AsyncReadExt + Unpin, + O: Send, + { + loop { + tokio::select! { + _ = PluginDriver::dispatch_one(&mut input, &self.plugin) => {}, + } + } + } + + /// Dispatch one server-side event and then return. Just so we + /// have a nicer looking `select` statement in `run` :-) + async fn dispatch_one( + input: &mut FramedRead, + plugin: &Plugin, + ) -> Result<(), Error> + where + I: Send + AsyncReadExt + Unpin, + { + match input.next().await { + Some(Ok(msg)) => { + trace!("Received a message: {:?}", msg); + match msg { + messages::JsonRpc::Request(id, p) => { + PluginDriver::::dispatch_request(id, p, plugin).await + } + messages::JsonRpc::Notification(n) => { + PluginDriver::::dispatch_notification(n, plugin).await + } + } + } + Some(Err(e)) => Err(anyhow!("Error reading command: {}", e)), + None => Ok(()), + } + } + + async fn dispatch_request( + id: usize, + request: messages::Request, + _plugin: &Plugin, + ) -> Result<(), Error> { + panic!("Unexpected request {:?} with id {}", request, id); + } + + async fn dispatch_notification( + notification: messages::Notification, + _plugin: &Plugin, + ) -> Result<(), Error> + where + S: Send + Clone, + { + trace!("Dispatching notification {:?}", notification); + unimplemented!() + } +} + +impl Plugin +where + S: Clone + Send, +{ + pub fn options(&self) -> Vec { + self.options.clone() + } + pub fn state(&self) -> &S { + &self.state + } +} + +impl Plugin +where + S: Send + Clone, +{ + pub async fn join(&self) -> Result<(), Error> { + self.wait_handle + .subscribe() + .recv() + .await + .context("error waiting for shutdown") + } +} + /// A container for all the configure hooks. It is just a collection /// of callbacks that can be registered by the users of the /// library. Based on this configuration we can then generate the @@ -239,9 +305,9 @@ struct Subscriptions {} mod test { use super::*; - #[test] - fn init() { + #[tokio::test] + async fn init() { let builder = Builder::new((), tokio::io::stdin(), tokio::io::stdout()); - builder.build(); + let _ = builder.start(); } } diff --git a/plugins/src/options.rs b/plugins/src/options.rs index 04e11509f..7ac6f3d7f 100644 --- a/plugins/src/options.rs +++ b/plugins/src/options.rs @@ -104,14 +104,14 @@ mod test { }), ), ( - ConfigOption::new("name", Value::Boolean(true), "description" + ConfigOption::new("name", Value::Boolean(true), "description"), json!({ "name": "name", "description":"description", "default": true, "type": "booltes", }), - )), + ), ]; for (input, expected) in tests.iter() {