From af4eed3787835eab90d9500b0dac5e39516f9e9e Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Fri, 25 Feb 2022 14:36:39 +0100 Subject: [PATCH] cln-plugin: Make hooks asynchronous --- plugins/examples/cln-plugin-startup.rs | 4 ++-- plugins/src/lib.rs | 25 ++++++++++++++++--------- tests/test_cln_rs.py | 2 +- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/plugins/examples/cln-plugin-startup.rs b/plugins/examples/cln-plugin-startup.rs index a1b2f2a4f..c06c83c40 100644 --- a/plugins/examples/cln-plugin-startup.rs +++ b/plugins/examples/cln-plugin-startup.rs @@ -15,7 +15,7 @@ async fn main() -> Result<(), anyhow::Error> { )) .rpcmethod("testmethod", "This is a test", testmethod) .subscribe("connect", Box::new(connect_handler)) - //.hook("peer_connected", Box::new(peer_connected_handler)) + .hook("peer_connected", peer_connected_handler) .start() .await?; plugin.join().await @@ -30,7 +30,7 @@ fn connect_handler(_p: Plugin<()>, v: &serde_json::Value) -> Result<(), Error> { Ok(()) } -fn peer_connected_handler(_p: Plugin<()>, v: &serde_json::Value) -> Result { +async fn peer_connected_handler(_p: Plugin<()>, v: serde_json::Value) -> Result { log::info!("Got a connect hook call: {}", v); Ok(json!({"result": "continue"})) } diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs index 9f8bb777a..28752ac29 100644 --- a/plugins/src/lib.rs +++ b/plugins/src/lib.rs @@ -79,8 +79,18 @@ where } /// Add a subscription to a given `hookname` - pub fn hook(mut self, hookname: &str, callback: Callback) -> Self { - self.hooks.insert(hookname.to_string(), Hook { callback }); + pub fn hook(mut self, hookname: &str, callback: C) -> Self + where + C: Send + Sync + 'static, + C: Fn(Plugin, Request) -> F + 'static, + F: Future + Send + Sync + 'static, + { + self.hooks.insert( + hookname.to_string(), + Hook { + callback: Box::new(move |p, r| Box::pin(callback(p, r))), + }, + ); self } @@ -175,15 +185,14 @@ where // payload structs in messages.rs let mut rpcmethods: HashMap> = HashMap::from_iter(self.rpcmethods.drain().map(|(k, v)| (k, v.callback))); - // TODO re-enable once hooks are async again - //rpcmethods.extend(self.hooks.clone().drain().map(|(k, v)| (k, v.callback))); + rpcmethods.extend(self.hooks.drain().map(|(k, v)| (k, v.callback))); // Start the PluginDriver to handle plugin IO tokio::spawn( PluginDriver { plugin: plugin.clone(), rpcmethods, - hooks: HashMap::from_iter(self.hooks.drain().map(|(k, v)| (k, v.callback))), + hooks: HashMap::new(), subscriptions: HashMap::from_iter( self.subscriptions.drain().map(|(k, v)| (k, v.callback)), ), @@ -244,7 +253,6 @@ where // of parentheses. type Request = serde_json::Value; type Response = Result; -type Callback = Box, &Request) -> Response>; type AsyncCallback = Box, Request) -> Pin + Send>> + Send + Sync>; type NotificationCallback = Box, &Request) -> Result<(), Error>>; @@ -268,12 +276,11 @@ where callback: NotificationCallback, } -#[derive(Clone)] struct Hook where S: Clone + Send, { - callback: Callback, + callback: AsyncCallback, } #[derive(Clone)] @@ -304,7 +311,7 @@ where rpcmethods: HashMap>, #[allow(dead_code)] // Unused until we fill in the Hook structs. - hooks: HashMap>, + hooks: HashMap>, subscriptions: HashMap>, } diff --git a/tests/test_cln_rs.py b/tests/test_cln_rs.py index fa8840f1f..5c2d89108 100644 --- a/tests/test_cln_rs.py +++ b/tests/test_cln_rs.py @@ -55,5 +55,5 @@ def test_plugin_start(node_factory): assert l1.rpc.testmethod() == "Hello" l1.connect(l2) - #l1.daemon.wait_for_log(r'Got a connect hook call') + l1.daemon.wait_for_log(r'Got a connect hook call') l1.daemon.wait_for_log(r'Got a connect notification')