cln-plugin: Notify waiting tasks if the lightningd connection closes

This is usually a signal that lightningd is shutting down, so notify
any instance that is waiting on `plugin.join()`.

Changelog-Fixed: cln-plugin: Fixed an issue where plugins would hang indefinitely despite `lightningd` closing the connection
This commit is contained in:
Christian Decker 2022-06-30 17:24:35 +02:00 committed by Rusty Russell
parent 12275d0bfe
commit 1efa5c37be
2 changed files with 35 additions and 23 deletions

View File

@ -14,7 +14,7 @@ struct PluginState {
ca_cert: Vec<u8>,
}
#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
debug!("Starting grpc plugin");
let path = Path::new("lightning-rpc");
@ -58,13 +58,18 @@ async fn main() -> Result<()> {
let bind_addr: SocketAddr = format!("0.0.0.0:{}", bind_port).parse().unwrap();
tokio::spawn(async move {
if let Err(e) = run_interface(bind_addr, state).await {
warn!("Error running the grpc interface: {}", e);
tokio::select! {
_ = plugin.join() => {
// This will likely never be shown, if we got here our
// parent process is exiting and not processing out log
// messages anymore.
debug!("Plugin loop terminated")
}
});
plugin.join().await
e = run_interface(bind_addr, state) => {
warn!("Error running grpc interface: {:?}", e)
}
}
Ok(())
}
async fn run_interface(bind_addr: SocketAddr, state: PluginState) -> Result<()> {

View File

@ -427,13 +427,19 @@ where
))
.await
.context("sending init response")?;
let joiner = plugin.wait_handle.clone();
// Start the PluginDriver to handle plugin IO
tokio::spawn(
driver.run(receiver, input, output),
// TODO Use the broadcast to distribute any error that we
// might receive here to anyone listening. (Shutdown
// signal)
);
tokio::spawn(async move {
if let Err(e) = driver.run(receiver, input, output).await {
log::warn!("Plugin loop returned error {:?}", e);
}
// Now that we have left the reader loop its time to
// notify any waiting tasks. This most likely will cause
// the main task to exit and the plugin to terminate.
joiner.send(())
});
Ok(plugin)
}
@ -502,16 +508,17 @@ where
// the user-code, which may require some cleanups or
// similar.
tokio::select! {
e = self.dispatch_one(&mut input, &self.plugin) => {
//Hand any error up.
e?;
},
v = receiver.recv() => {
output.lock().await.send(
v.context("internal communication error")?
).await?;
},
}
e = self.dispatch_one(&mut input, &self.plugin) => {
if let Err(e) = e {
return Err(e)
}
},
v = receiver.recv() => {
output.lock().await.send(
v.context("internal communication error")?
).await?;
},
}
}
}