cln-plugin: Make notification handlers asynchronous

This commit is contained in:
Christian Decker 2022-02-25 14:48:32 +01:00 committed by Rusty Russell
parent af4eed3787
commit 1bd2b8c9f7
2 changed files with 40 additions and 11 deletions

View file

@ -3,7 +3,6 @@
#[macro_use]
extern crate serde_json;
use cln_plugin::{options, Builder, Error, Plugin};
use std::pin::Pin;
use tokio;
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
@ -14,7 +13,7 @@ async fn main() -> Result<(), anyhow::Error> {
"a test-option with default 42",
))
.rpcmethod("testmethod", "This is a test", testmethod)
.subscribe("connect", Box::new(connect_handler))
.subscribe("connect", connect_handler)
.hook("peer_connected", peer_connected_handler)
.start()
.await?;
@ -25,7 +24,7 @@ async fn testmethod(_p: Plugin<()>, _v: serde_json::Value) -> Result<serde_json:
Ok(json!("Hello"))
}
fn connect_handler(_p: Plugin<()>, v: &serde_json::Value) -> Result<(), Error> {
async fn connect_handler(_p: Plugin<()>, v: serde_json::Value) -> Result<(), Error> {
log::info!("Got a connect notification: {}", v);
Ok(())
}

View file

@ -71,10 +71,36 @@ where
self
}
/// Subscribe to notifications for the given `topic`.
pub fn subscribe(mut self, topic: &str, callback: NotificationCallback<S>) -> Builder<S, I, O> {
self.subscriptions
.insert(topic.to_string(), Subscription { callback });
/// Subscribe to notifications for the given `topic`. The handler
/// is an async function that takes a `Plugin<S>` and the
/// notification as a `serde_json::Value` as inputs. Since
/// notifications do not expect a result the handler should only
/// report errors while processing. Any error reported while
/// processing the notification will be logged in the cln logs.
///
/// ```
/// use cln_plugin::{options, Builder, Error, Plugin};
///
/// async fn connect_handler(_p: Plugin<()>, v: serde_json::Value) -> Result<(), Error> {
/// println!("Got a connect notification: {}", v);
/// Ok(())
/// }
///
/// let b = Builder::new((), tokio::io::stdin(), tokio::io::stdout())
/// .subscribe("connect", connect_handler);
/// ```
pub fn subscribe<C, F>(mut self, topic: &str, callback: C) -> Builder<S, I, O>
where
C: Send + Sync + 'static,
C: Fn(Plugin<S>, Request) -> F + 'static,
F: Future<Output = Result<(), Error>> + Send + Sync + 'static,
{
self.subscriptions.insert(
topic.to_string(),
Subscription {
callback: Box::new(move |p, r| Box::pin(callback(p, r))),
},
);
self
}
@ -255,7 +281,11 @@ type Request = serde_json::Value;
type Response = Result<serde_json::Value, Error>;
type AsyncCallback<S> =
Box<dyn Fn(Plugin<S>, Request) -> Pin<Box<dyn Future<Output = Response> + Send>> + Send + Sync>;
type NotificationCallback<S> = Box<fn(Plugin<S>, &Request) -> Result<(), Error>>;
type AsyncNotificationCallback<S> = Box<
dyn Fn(Plugin<S>, Request) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
+ Send
+ Sync,
>;
/// A struct collecting the metadata required to register a custom
/// rpcmethod with the main daemon upon init. It'll get deconstructed
@ -273,7 +303,7 @@ struct Subscription<S>
where
S: Clone + Send,
{
callback: NotificationCallback<S>,
callback: AsyncNotificationCallback<S>,
}
struct Hook<S>
@ -312,7 +342,7 @@ where
#[allow(dead_code)] // Unused until we fill in the Hook structs.
hooks: HashMap<String, AsyncCallback<S>>,
subscriptions: HashMap<String, NotificationCallback<S>>,
subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
}
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@ -464,7 +494,7 @@ where
method,
params
);
if let Err(e) = callback(plugin.clone(), params) {
if let Err(e) = callback(plugin.clone(), params.clone()).await {
log::error!("Error in notification handler '{}': {}", method, e);
}
Ok(())