From 8898511cf6b6612bb15127bc7c8e54aa551517ce Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Wed, 20 Jul 2022 16:56:27 +0200 Subject: [PATCH] cln-plugin: Defer binding the plugin state until after configuring We had a bit of a chicken-and-egg problem, where we instantiated the `state` to be managed by the `Plugin` during the very first step when creating the `Builder`, but then the state might depend on the configuration we only get later. This would force developers to add placeholders in the form of `Option` into the state, when really they'd never be none after configuring. This defers the binding until after we get the configuration and cleans up the semantics: - `Builder`: declare options, hooks, etc - `ConfiguredPlugin`: we have exchanged the handshake with `lightningd`, now we can construct the `state` accordingly - `Plugin`: Running instance of the plugin Changelog-Changed: cln-plugin: Moved the state binding to the plugin until after the configuration step --- cln-grpc/src/test.rs | 3 +- plugins/examples/cln-plugin-startup.rs | 6 +- plugins/grpc-plugin/src/main.rs | 16 +- plugins/src/lib.rs | 200 ++++++++++++------------- 4 files changed, 114 insertions(+), 111 deletions(-) diff --git a/cln-grpc/src/test.rs b/cln-grpc/src/test.rs index 4ed4d71b9..78f49cdc5 100644 --- a/cln-grpc/src/test.rs +++ b/cln-grpc/src/test.rs @@ -248,7 +248,8 @@ fn test_keysend() { "035d2b1192dfba134e10e540875d366ebc8bc353d5aa766b80c090b39c3a5d885d", ) .unwrap(), - msatoshi: Some(Amount { msat: 10000 }), + amount_msat: Some(Amount { msat: 10000 }), + label: Some("hello".to_string()), exemptfee: None, maxdelay: None, diff --git a/plugins/examples/cln-plugin-startup.rs b/plugins/examples/cln-plugin-startup.rs index ff152235f..3fdd6bdcf 100644 --- a/plugins/examples/cln-plugin-startup.rs +++ b/plugins/examples/cln-plugin-startup.rs @@ -6,7 +6,9 @@ use cln_plugin::{options, Builder, Error, Plugin}; use tokio; #[tokio::main] async fn main() -> Result<(), anyhow::Error> { - if let Some(plugin) = Builder::new((), tokio::io::stdin(), tokio::io::stdout()) + let state = (); + + if let Some(plugin) = Builder::new(tokio::io::stdin(), tokio::io::stdout()) .option(options::ConfigOption::new( "test-option", options::Value::Integer(42), @@ -15,7 +17,7 @@ async fn main() -> Result<(), anyhow::Error> { .rpcmethod("testmethod", "This is a test", testmethod) .subscribe("connect", connect_handler) .hook("peer_connected", peer_connected_handler) - .start() + .start(state) .await? { plugin.join().await diff --git a/plugins/grpc-plugin/src/main.rs b/plugins/grpc-plugin/src/main.rs index 70dffa5d3..926f7cefb 100644 --- a/plugins/grpc-plugin/src/main.rs +++ b/plugins/grpc-plugin/src/main.rs @@ -22,13 +22,7 @@ async fn main() -> Result<()> { let directory = std::env::current_dir()?; let (identity, ca_cert) = tls::init(&directory)?; - let state = PluginState { - rpc_path: path.into(), - identity, - ca_cert, - }; - - let plugin = match Builder::new(state.clone(), tokio::io::stdin(), tokio::io::stdout()) + let plugin = match Builder::new(tokio::io::stdin(), tokio::io::stdout()) .option(options::ConfigOption::new( "grpc-port", options::Value::Integer(-1), @@ -54,7 +48,13 @@ async fn main() -> Result<()> { Some(o) => return Err(anyhow!("grpc-port is not a valid integer: {:?}", o)), }; - let plugin = plugin.start().await?; + let state = PluginState { + rpc_path: path.into(), + identity, + ca_cert, + }; + + let plugin = plugin.start(state.clone()).await?; let bind_addr: SocketAddr = format!("0.0.0.0:{}", bind_port).parse().unwrap(); diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs index 56237d5c0..8c034a4a6 100644 --- a/plugins/src/lib.rs +++ b/plugins/src/lib.rs @@ -1,6 +1,7 @@ use crate::codec::{JsonCodec, JsonRpcCodec}; pub use anyhow::{anyhow, Context}; use futures::sink::SinkExt; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; extern crate log; use log::trace; use messages::Configuration; @@ -13,6 +14,7 @@ use tokio::sync::Mutex; use tokio_stream::StreamExt; use tokio_util::codec::FramedRead; use tokio_util::codec::FramedWrite; +use options::ConfigOption; pub mod codec; pub mod logging; @@ -23,7 +25,6 @@ extern crate serde_json; pub mod options; -use options::ConfigOption; /// Need to tell us about something that went wrong? Use this error /// type to do that. Use this alias to be safe from future changes in @@ -38,34 +39,81 @@ where O: Send + AsyncWrite + Unpin, S: Clone + Send, { - state: S, - input: Option, output: Option, hooks: HashMap>, options: Vec, - configuration: Option, rpcmethods: HashMap>, subscriptions: HashMap>, dynamic: bool, } +/// A plugin that has registered with the lightning daemon, and gotten +/// its options filled, however has not yet acknowledged the `init` +/// message. This is a mid-state allowing a plugin to disable itself, +/// based on the options. +pub struct ConfiguredPlugin +where + S: Clone + Send, +{ + init_id: serde_json::Value, + input: FramedRead, + output: Arc>>, + options: Vec, + configuration: Configuration, + rpcmethods: HashMap>, + hooks: HashMap>, + subscriptions: HashMap>, +} + +/// The [PluginDriver] is used to run the IO loop, reading messages +/// from the Lightning daemon, dispatching calls and notifications to +/// the plugin, and returning responses to the the daemon. We also use +/// it to handle spontaneous messages like Notifications and logging +/// events. +struct PluginDriver +where + S: Send + Clone, +{ + plugin: Plugin, + rpcmethods: HashMap>, + + #[allow(dead_code)] // Unused until we fill in the Hook structs. + hooks: HashMap>, + subscriptions: HashMap>, +} + +#[derive(Clone)] +pub struct Plugin +where + S: Clone + Send, +{ + /// The state gets cloned for each request + state: S, + /// "options" field of "init" message sent by cln + options: Vec, + /// "configuration" field of "init" message sent by cln + configuration: Configuration, + /// A signal that allows us to wait on the plugin's shutdown. + wait_handle: tokio::sync::broadcast::Sender<()>, + + sender: tokio::sync::mpsc::Sender, +} + impl Builder where O: Send + AsyncWrite + Unpin + 'static, S: Clone + Sync + Send + Clone + 'static, I: AsyncRead + Send + Unpin + 'static, { - pub fn new(state: S, input: I, output: O) -> Self { + pub fn new(input: I, output: O) -> Self { Self { - state, input: Some(input), output: Some(output), hooks: HashMap::new(), subscriptions: HashMap::new(), options: vec![], - configuration: None, rpcmethods: HashMap::new(), dynamic: false, } @@ -91,7 +139,7 @@ where /// Ok(()) /// } /// - /// let b = Builder::new((), tokio::io::stdin(), tokio::io::stdout()) + /// let b = Builder::new(tokio::io::stdin(), tokio::io::stdout()) /// .subscribe("connect", connect_handler); /// ``` pub fn subscribe(mut self, topic: &str, callback: C) -> Builder @@ -195,10 +243,9 @@ where )) } }; - let init_id = match input.next().await { + let (init_id, configuration) = match input.next().await { Some(Ok(messages::JsonRpc::Request(id, messages::Request::Init(m)))) => { - self.handle_init(m)?; - id + (id, self.handle_init(m)?) } Some(o) => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)), @@ -210,27 +257,15 @@ where } }; - let (wait_handle, _) = tokio::sync::broadcast::channel(1); - - // An MPSC pair used by anything that needs to send messages - // to the main daemon. - let (sender, receiver) = tokio::sync::mpsc::channel(4); - let plugin = Plugin { - state: self.state, - options: self.options, - configuration: self - .configuration - .ok_or(anyhow!("Plugin configuration missing"))?, - wait_handle, - sender, - }; - // TODO Split the two hashmaps once we fill in the hook // payload structs in messages.rs let mut rpcmethods: HashMap> = HashMap::from_iter(self.rpcmethods.drain().map(|(k, v)| (k, v.callback))); rpcmethods.extend(self.hooks.drain().map(|(k, v)| (k, v.callback))); + let subscriptions = + HashMap::from_iter(self.subscriptions.drain().map(|(k, v)| (k, v.callback))); + // Leave the `init` reply pending, so we can disable based on // the options if required. Ok(Some(ConfiguredPlugin { @@ -238,16 +273,11 @@ where init_id, input, output, - receiver, - driver: PluginDriver { - plugin: plugin.clone(), - rpcmethods, - hooks: HashMap::new(), - subscriptions: HashMap::from_iter( - self.subscriptions.drain().map(|(k, v)| (k, v.callback)), - ), - }, - plugin, + rpcmethods, + subscriptions, + options: self.options, + configuration, + hooks: HashMap::new(), })) } @@ -261,9 +291,9 @@ where /// `Plugin` instance and return `None` instead. This signals that /// we should exit, and not continue running. `start()` returns in /// order to allow user code to perform cleanup if necessary. - pub async fn start(self) -> Result>, anyhow::Error> { + pub async fn start(self, state: S) -> Result>, anyhow::Error> { if let Some(cp) = self.configure().await? { - Ok(Some(cp.start().await?)) + Ok(Some(cp.start(state).await?)) } else { Ok(None) } @@ -292,7 +322,7 @@ where } } - fn handle_init(&mut self, call: messages::InitCall) -> Result { + fn handle_init(&mut self, call: messages::InitCall) -> Result { use options::Value as OValue; use serde_json::Value as JValue; @@ -312,9 +342,7 @@ where } } - self.configuration = Some(call.configuration); - - Ok(messages::InitResponse::default()) + Ok(call.configuration) } } @@ -356,39 +384,6 @@ where callback: AsyncCallback, } -/// A plugin that has registered with the lightning daemon, and gotten -/// its options filled, however has not yet acknowledged the `init` -/// message. This is a mid-state allowing a plugin to disable itself, -/// based on the options. -pub struct ConfiguredPlugin -where - S: Clone + Send, -{ - init_id: serde_json::Value, - input: FramedRead, - output: Arc>>, - plugin: Plugin, - driver: PluginDriver, - receiver: tokio::sync::mpsc::Receiver, -} - -#[derive(Clone)] -pub struct Plugin -where - S: Clone + Send, -{ - /// The state gets cloned for each request - state: S, - /// "options" field of "init" message sent by cln - options: Vec, - /// "configuration" field of "init" message sent by cln - configuration: Configuration, - /// A signal that allows us to wait on the plugin's shutdown. - wait_handle: tokio::sync::broadcast::Sender<()>, - - sender: tokio::sync::mpsc::Sender, -} - impl Plugin where S: Clone + Send, @@ -409,12 +404,30 @@ where O: Send + AsyncWrite + Unpin + 'static, { #[allow(unused_mut)] - pub async fn start(mut self) -> Result, anyhow::Error> { - let driver = self.driver; - let plugin = self.plugin; + pub async fn start(mut self, state: S) -> Result, anyhow::Error> { let output = self.output; let input = self.input; - let receiver = self.receiver; // Now reply to the `init` message that `configure` left pending. + let (wait_handle, _) = tokio::sync::broadcast::channel(1); + + // An MPSC pair used by anything that needs to send messages + // to the main daemon. + let (sender, receiver) = tokio::sync::mpsc::channel(4); + + let plugin = Plugin { + state, + options: self.options, + configuration: self.configuration, + wait_handle, + sender, + }; + + let driver = PluginDriver { + plugin: plugin.clone(), + rpcmethods: self.rpcmethods, + hooks: self.hooks, + subscriptions: self.subscriptions, + }; + output .lock() .await @@ -465,28 +478,14 @@ where } pub fn option(&self, name: &str) -> Option { - self.plugin.option(name) + self.options + .iter() + .filter(|o| o.name() == name) + .next() + .map(|co| co.value.clone().unwrap_or(co.default().clone())) } } -/// The [PluginDriver] is used to run the IO loop, reading messages -/// from the Lightning daemon, dispatching calls and notifications to -/// the plugin, and returning responses to the the daemon. We also use -/// it to handle spontaneous messages like Notifications and logging -/// events. -struct PluginDriver -where - S: Send + Clone, -{ - plugin: Plugin, - rpcmethods: HashMap>, - - #[allow(dead_code)] // Unused until we fill in the Hook structs. - hooks: HashMap>, - subscriptions: HashMap>, -} - -use tokio::io::{AsyncReadExt, AsyncWriteExt}; impl PluginDriver where S: Send + Clone, @@ -688,7 +687,8 @@ mod test { #[tokio::test] async fn init() { - let builder = Builder::new((), tokio::io::stdin(), tokio::io::stdout()); - let _ = builder.start(); + let state = (); + let builder = Builder::new(tokio::io::stdin(), tokio::io::stdout()); + let _ = builder.start(state); } }