grpc-plugin: Run cargo fmt for minor style fixes

This commit is contained in:
michael1011 2024-07-29 15:07:40 +02:00 committed by Christian Decker
parent 9b1f331748
commit 46c3920585

View File

@ -14,19 +14,19 @@ struct PluginState {
rpc_path: PathBuf,
identity: tls::Identity,
ca_cert: Vec<u8>,
events : broadcast::Sender<cln_rpc::notifications::Notification>,
events: broadcast::Sender<cln_rpc::notifications::Notification>,
}
const OPTION_GRPC_PORT : options::IntegerConfigOption = options::ConfigOption::new_i64_no_default(
const OPTION_GRPC_PORT: options::IntegerConfigOption = options::ConfigOption::new_i64_no_default(
"grpc-port",
"Which port should the grpc plugin listen for incoming connections?");
"Which port should the grpc plugin listen for incoming connections?",
);
const OPTION_GRPC_MSG_BUFFER_SIZE : options::DefaultIntegerConfigOption = options::ConfigOption::new_i64_with_default(
"grpc-msg-buffer-size",
1024,
"Number of notifications which can be stored in the grpc message buffer. Notifications can be skipped if this buffer is full");
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
debug!("Starting grpc plugin");
@ -57,21 +57,19 @@ async fn main() -> Result<()> {
Some(port) => port,
None => {
log::info!("'grpc-port' options i not configured. exiting.");
plugin
.disable("Missing 'grpc-port' option")
.await?;
return Ok(())
plugin.disable("Missing 'grpc-port' option").await?;
return Ok(());
}
};
let buffer_size : i64 = plugin.option(&OPTION_GRPC_MSG_BUFFER_SIZE).unwrap();
let buffer_size: i64 = plugin.option(&OPTION_GRPC_MSG_BUFFER_SIZE).unwrap();
let buffer_size = match usize::try_from(buffer_size) {
Ok(b) => b,
Err(_) => {
plugin
.disable("'grpc-msg-buffer-size' should be strictly positive")
.await?;
return Ok(())
return Ok(());
}
};
@ -83,7 +81,7 @@ async fn main() -> Result<()> {
rpc_path: PathBuf::from(plugin.configuration().rpc_file.as_str()),
identity,
ca_cert,
events : sender
events: sender,
};
let plugin = plugin.start(state.clone()).await?;
@ -92,9 +90,9 @@ async fn main() -> Result<()> {
tokio::select! {
_ = plugin.join() => {
// This will likely never be shown, if we got here our
// parent process is exiting and not processing out log
// messages anymore.
// This will likely never be shown, if we got here our
// parent process is exiting and not processing out log
// messages anymore.
debug!("Plugin loop terminated")
}
e = run_interface(bind_addr, state) => {
@ -112,15 +110,13 @@ async fn run_interface(bind_addr: SocketAddr, state: PluginState) -> Result<()>
.identity(identity)
.client_ca_root(ca_cert);
let server = tonic::transport::Server::builder()
.tls_config(tls)
.context("configuring tls")?
.add_service(
NodeServer::new(
cln_grpc::Server::new(&state.rpc_path, state.events.clone())
.await
.context("creating NodeServer instance")?,
.add_service(NodeServer::new(
cln_grpc::Server::new(&state.rpc_path, state.events.clone())
.await
.context("creating NodeServer instance")?,
))
.serve(bind_addr);
@ -134,16 +130,15 @@ async fn run_interface(bind_addr: SocketAddr, state: PluginState) -> Result<()>
Ok(())
}
async fn handle_notification(plugin : Plugin<PluginState>, value : serde_json::Value) -> Result<()> {
let notification : Result<Notification, _> = serde_json::from_value(value);
async fn handle_notification(plugin: Plugin<PluginState>, value: serde_json::Value) -> Result<()> {
let notification: Result<Notification, _> = serde_json::from_value(value);
match notification {
Err(err) => {
log::debug!("Failed to parse notification from lightningd {:?}", err);
},
}
Ok(notification) => {
match plugin.state().events.send(notification) {
Err(err) => log::warn!("Failed to broadcast notification {:?}", err),
Ok(_) => {},
if let Err(err) = plugin.state().events.send(notification) {
log::warn!("Failed to broadcast notification {:?}", err)
}
}
};