cln-plugin: Rework the plugin library using a Builder

This commit is contained in:
Christian Decker 2022-02-15 16:24:15 +01:00 committed by Rusty Russell
parent 4aba119733
commit 22618a2f94
3 changed files with 201 additions and 139 deletions

View file

@ -6,17 +6,13 @@ use tokio;
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let (plugin, stdin) = Builder::new((), tokio::io::stdin(), tokio::io::stdout())
let plugin = Builder::new((), tokio::io::stdin(), tokio::io::stdout())
.option(options::ConfigOption::new(
"test-option",
options::Value::Integer(42),
"a test-option with default 42",
))
.build();
tokio::spawn(async {
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
log::info!("Hello world");
});
plugin.run(stdin).await
.start()
.await?;
plugin.join().await
}

View file

@ -1,9 +1,8 @@
use crate::codec::{JsonCodec, JsonRpcCodec};
pub use anyhow::Error;
pub use anyhow::{anyhow, Context, Error};
use futures::sink::SinkExt;
extern crate log;
use log::{trace, warn};
use std::marker::PhantomData;
use log::trace;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::Mutex;
@ -25,14 +24,13 @@ use options::ConfigOption;
/// Builder for a new plugin.
pub struct Builder<S, I, O>
where
S: Clone + Send,
I: AsyncRead + Unpin,
O: Send + AsyncWrite + Unpin,
{
state: S,
input: I,
output: O,
input: Option<I>,
output: Option<O>,
#[allow(dead_code)]
hooks: Hooks,
@ -46,14 +44,14 @@ where
impl<S, I, O> Builder<S, I, O>
where
O: Send + AsyncWrite + Unpin + 'static,
S: Clone + Send + 'static,
S: Clone + Sync + Send + Clone + 'static,
I: AsyncRead + Send + Unpin + 'static,
{
pub fn new(state: S, input: I, output: O) -> Self {
Self {
state,
input,
output,
input: Some(input),
output: Some(output),
hooks: Hooks::default(),
subscriptions: Subscriptions::default(),
options: vec![],
@ -65,147 +63,96 @@ where
self
}
pub fn build(self) -> (Plugin<S, I, O>, I) {
/// Build and start the plugin loop. This performs the handshake
/// and spawns a new task that accepts incoming messages from
/// c-lightning and dispatches them to the handlers. It only
/// returns after completing the handshake to ensure that the
/// configuration and initialization was successfull.
pub async fn start(mut self) -> Result<Plugin<S>, anyhow::Error> {
let mut input = FramedRead::new(self.input.take().unwrap(), JsonRpcCodec::default());
// Sadly we need to wrap the output in a mutex in order to
// enable early logging, i.e., logging that is done before the
// PluginDriver is processing events during the
// handshake. Otherwise we could just write the log events to
// the event queue and have the PluginDriver be the sole owner
// of `Stdout`.
let output = Arc::new(Mutex::new(FramedWrite::new(
self.output,
self.output.take().unwrap(),
JsonCodec::default(),
)));
// Now configure the logging, so any `log` call is wrapped
// in a JSON-RPC notification and sent to c-lightning
tokio::spawn(async move {});
(
Plugin {
state: Arc::new(Mutex::new(self.state)),
output,
input_type: PhantomData,
options: self.options,
},
self.input,
)
}
}
pub struct Plugin<S, I, O>
where
S: Clone + Send,
I: AsyncRead,
O: Send + AsyncWrite + 'static,
{
//input: FramedRead<Stdin, JsonCodec>,
output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
/// The state gets cloned for each request
state: Arc<Mutex<S>>,
input_type: PhantomData<I>,
options: Vec<ConfigOption>,
}
impl<S, I, O> Plugin<S, I, O>
where
S: Clone + Send,
I: AsyncRead + Send + Unpin,
O: Send + AsyncWrite + Unpin,
{
pub fn options(&self) -> Vec<ConfigOption> {
self.options.clone()
}
}
impl<S, I, O> Plugin<S, I, O>
where
S: Clone + Send,
I: AsyncRead + Send + Unpin,
O: Send + AsyncWrite + Unpin + 'static,
{
/// Read incoming requests from `c-lightning and dispatch their handling.
#[allow(unused_mut)]
pub async fn run(mut self, input: I) -> Result<(), Error> {
crate::logging::init(self.output.clone()).await?;
crate::logging::init(output.clone()).await?;
trace!("Plugin logging initialized");
let mut input = FramedRead::new(input, JsonRpcCodec::default());
loop {
match input.next().await {
Some(Ok(msg)) => {
trace!("Received a message: {:?}", msg);
match msg {
messages::JsonRpc::Request(id, p) => {
self.dispatch_request(id, p).await?
// Use a match to detect Ok / Error and return an error if we failed.
}
messages::JsonRpc::Notification(n) => self.dispatch_notification(n).await?,
}
}
Some(Err(e)) => {
warn!("Error reading command: {}", e);
break;
}
None => break,
// Read the `getmanifest` message:
match input.next().await {
Some(Ok(messages::JsonRpc::Request(id, messages::Request::Getmanifest(m)))) => {
output
.lock()
.await
.send(json!({
"jsonrpc": "2.0",
"result": self.handle_get_manifest(m),
"id": id,
}))
.await?
}
}
Ok(())
}
async fn dispatch_request(
&mut self,
id: usize,
request: messages::Request,
) -> Result<(), Error> {
trace!("Dispatching request {:?}", request);
let state = self.state.clone();
let res: serde_json::Value = match request {
messages::Request::Getmanifest(c) => {
serde_json::to_value(self.handle_get_manifest(c, state).await?).unwrap()
}
messages::Request::Init(c) => {
serde_json::to_value(self.handle_init(c, state).await?).unwrap()
}
o => panic!("Request {:?} is currently unhandled", o),
o => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
};
trace!("Sending respone {:?}", res);
let mut out = self.output.lock().await;
out.send(json!({
"jsonrpc": "2.0",
"result": res,
"id": id,
}))
.await
.unwrap();
Ok(())
match input.next().await {
Some(Ok(messages::JsonRpc::Request(id, messages::Request::Init(m)))) => {
output
.lock()
.await
.send(json!({
"jsonrpc": "2.0",
"result": self.handle_init(m)?,
"id": id,
}))
.await?
}
o => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
};
let (tx, _) = tokio::sync::broadcast::channel(1);
let plugin = Plugin {
state: self.state,
options: self.options,
wait_handle: tx,
};
// Start the PluginDriver to handle plugin IO
tokio::spawn(
PluginDriver {
plugin: plugin.clone(),
}
.run(input, output),
);
Ok(plugin)
}
async fn dispatch_notification(
&mut self,
notification: messages::Notification,
) -> Result<(), Error> {
trace!("Dispatching notification {:?}", notification);
unimplemented!()
}
async fn handle_get_manifest(
fn handle_get_manifest(
&mut self,
_call: messages::GetManifestCall,
_state: Arc<Mutex<S>>,
) -> Result<messages::GetManifestResponse, Error> {
Ok(messages::GetManifestResponse {
) -> messages::GetManifestResponse {
messages::GetManifestResponse {
options: self.options.clone(),
rpcmethods: vec![],
})
}
}
async fn handle_init(
&mut self,
call: messages::InitCall,
_state: Arc<Mutex<S>>,
) -> Result<messages::InitResponse, Error> {
fn handle_init(&mut self, call: messages::InitCall) -> Result<messages::InitResponse, Error> {
use options::Value as OValue;
use serde_json::Value as JValue;
// Match up the ConfigOptions and fill in their values if we
// have a matching entry.
for opt in self.options.iter_mut() {
if let Some(val) = call.options.get(opt.name()) {
opt.value = Some(match (opt.default(), &val) {
@ -224,6 +171,125 @@ where
}
}
#[derive(Clone)]
pub struct Plugin<S>
where
S: Clone + Send,
{
/// The state gets cloned for each request
state: S,
options: Vec<ConfigOption>,
/// A signal that allows us to wait on the plugin's shutdown.
wait_handle: tokio::sync::broadcast::Sender<()>,
}
/// The [PluginDriver] is used to run the IO loop, reading messages
/// from the Lightning daemon, dispatching calls and notifications to
/// the plugin, and returning responses to the the daemon. We also use
/// it to handle spontaneous messages like Notifications and logging
/// events.
struct PluginDriver<S>
where
S: Send + Clone,
{
#[allow(dead_code)]
plugin: Plugin<S>,
}
use tokio::io::AsyncReadExt;
impl<S> PluginDriver<S>
where
S: Send + Clone,
{
/// Run the plugin until we get a shutdown command.
async fn run<I, O>(
self,
mut input: FramedRead<I, JsonRpcCodec>,
_output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
) -> Result<(), Error>
where
I: Send + AsyncReadExt + Unpin,
O: Send,
{
loop {
tokio::select! {
_ = PluginDriver::dispatch_one(&mut input, &self.plugin) => {},
}
}
}
/// Dispatch one server-side event and then return. Just so we
/// have a nicer looking `select` statement in `run` :-)
async fn dispatch_one<I>(
input: &mut FramedRead<I, JsonRpcCodec>,
plugin: &Plugin<S>,
) -> Result<(), Error>
where
I: Send + AsyncReadExt + Unpin,
{
match input.next().await {
Some(Ok(msg)) => {
trace!("Received a message: {:?}", msg);
match msg {
messages::JsonRpc::Request(id, p) => {
PluginDriver::<S>::dispatch_request(id, p, plugin).await
}
messages::JsonRpc::Notification(n) => {
PluginDriver::<S>::dispatch_notification(n, plugin).await
}
}
}
Some(Err(e)) => Err(anyhow!("Error reading command: {}", e)),
None => Ok(()),
}
}
async fn dispatch_request(
id: usize,
request: messages::Request,
_plugin: &Plugin<S>,
) -> Result<(), Error> {
panic!("Unexpected request {:?} with id {}", request, id);
}
async fn dispatch_notification(
notification: messages::Notification,
_plugin: &Plugin<S>,
) -> Result<(), Error>
where
S: Send + Clone,
{
trace!("Dispatching notification {:?}", notification);
unimplemented!()
}
}
impl<S> Plugin<S>
where
S: Clone + Send,
{
pub fn options(&self) -> Vec<ConfigOption> {
self.options.clone()
}
pub fn state(&self) -> &S {
&self.state
}
}
impl<S> Plugin<S>
where
S: Send + Clone,
{
pub async fn join(&self) -> Result<(), Error> {
self.wait_handle
.subscribe()
.recv()
.await
.context("error waiting for shutdown")
}
}
/// A container for all the configure hooks. It is just a collection
/// of callbacks that can be registered by the users of the
/// library. Based on this configuration we can then generate the
@ -239,9 +305,9 @@ struct Subscriptions {}
mod test {
use super::*;
#[test]
fn init() {
#[tokio::test]
async fn init() {
let builder = Builder::new((), tokio::io::stdin(), tokio::io::stdout());
builder.build();
let _ = builder.start();
}
}

View file

@ -104,14 +104,14 @@ mod test {
}),
),
(
ConfigOption::new("name", Value::Boolean(true), "description"
ConfigOption::new("name", Value::Boolean(true), "description"),
json!({
"name": "name",
"description":"description",
"default": true,
"type": "booltes",
}),
)),
),
];
for (input, expected) in tests.iter() {