diff --git a/plugins/examples/cln-plugin-startup.rs b/plugins/examples/cln-plugin-startup.rs index c06c83c40..822083e47 100644 --- a/plugins/examples/cln-plugin-startup.rs +++ b/plugins/examples/cln-plugin-startup.rs @@ -3,7 +3,6 @@ #[macro_use] extern crate serde_json; use cln_plugin::{options, Builder, Error, Plugin}; -use std::pin::Pin; use tokio; #[tokio::main] async fn main() -> Result<(), anyhow::Error> { @@ -14,7 +13,7 @@ async fn main() -> Result<(), anyhow::Error> { "a test-option with default 42", )) .rpcmethod("testmethod", "This is a test", testmethod) - .subscribe("connect", Box::new(connect_handler)) + .subscribe("connect", connect_handler) .hook("peer_connected", peer_connected_handler) .start() .await?; @@ -25,7 +24,7 @@ async fn testmethod(_p: Plugin<()>, _v: serde_json::Value) -> Result, v: &serde_json::Value) -> Result<(), Error> { +async fn connect_handler(_p: Plugin<()>, v: serde_json::Value) -> Result<(), Error> { log::info!("Got a connect notification: {}", v); Ok(()) } diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs index 28752ac29..1318d893e 100644 --- a/plugins/src/lib.rs +++ b/plugins/src/lib.rs @@ -71,10 +71,36 @@ where self } - /// Subscribe to notifications for the given `topic`. - pub fn subscribe(mut self, topic: &str, callback: NotificationCallback) -> Builder { - self.subscriptions - .insert(topic.to_string(), Subscription { callback }); + /// Subscribe to notifications for the given `topic`. The handler + /// is an async function that takes a `Plugin` and the + /// notification as a `serde_json::Value` as inputs. Since + /// notifications do not expect a result the handler should only + /// report errors while processing. Any error reported while + /// processing the notification will be logged in the cln logs. + /// + /// ``` + /// use cln_plugin::{options, Builder, Error, Plugin}; + /// + /// async fn connect_handler(_p: Plugin<()>, v: serde_json::Value) -> Result<(), Error> { + /// println!("Got a connect notification: {}", v); + /// Ok(()) + /// } + /// + /// let b = Builder::new((), tokio::io::stdin(), tokio::io::stdout()) + /// .subscribe("connect", connect_handler); + /// ``` + pub fn subscribe(mut self, topic: &str, callback: C) -> Builder + where + C: Send + Sync + 'static, + C: Fn(Plugin, Request) -> F + 'static, + F: Future> + Send + Sync + 'static, + { + self.subscriptions.insert( + topic.to_string(), + Subscription { + callback: Box::new(move |p, r| Box::pin(callback(p, r))), + }, + ); self } @@ -255,7 +281,11 @@ type Request = serde_json::Value; type Response = Result; type AsyncCallback = Box, Request) -> Pin + Send>> + Send + Sync>; -type NotificationCallback = Box, &Request) -> Result<(), Error>>; +type AsyncNotificationCallback = Box< + dyn Fn(Plugin, Request) -> Pin> + Send>> + + Send + + Sync, +>; /// A struct collecting the metadata required to register a custom /// rpcmethod with the main daemon upon init. It'll get deconstructed @@ -273,7 +303,7 @@ struct Subscription where S: Clone + Send, { - callback: NotificationCallback, + callback: AsyncNotificationCallback, } struct Hook @@ -312,7 +342,7 @@ where #[allow(dead_code)] // Unused until we fill in the Hook structs. hooks: HashMap>, - subscriptions: HashMap>, + subscriptions: HashMap>, } use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -464,7 +494,7 @@ where method, params ); - if let Err(e) = callback(plugin.clone(), params) { + if let Err(e) = callback(plugin.clone(), params.clone()).await { log::error!("Error in notification handler '{}': {}", method, e); } Ok(())