From 8717c4e5a234f222f521cdd677c5f9700eeb99de Mon Sep 17 00:00:00 2001
From: Christian Decker <decker.christian@gmail.com>
Date: Fri, 8 Apr 2022 05:41:12 +0200
Subject: [PATCH] cln-grpc: Add midstate between configuration and replying to
 init

This is a bit special, in that it allows us to configure the plugin,
but then still abort startup by sending `init` with the `disable` flag
set.
---
 plugins/grpc-plugin/src/main.rs |  23 +++--
 plugins/src/lib.rs              | 171 +++++++++++++++++++++++---------
 plugins/src/messages.rs         |   5 +-
 tests/test_cln_rs.py            |   4 +-
 4 files changed, 143 insertions(+), 60 deletions(-)

diff --git a/plugins/grpc-plugin/src/main.rs b/plugins/grpc-plugin/src/main.rs
index f73463761..58bd2751c 100644
--- a/plugins/grpc-plugin/src/main.rs
+++ b/plugins/grpc-plugin/src/main.rs
@@ -34,28 +34,29 @@ async fn main() -> Result<()> {
             options::Value::Integer(-1),
             "Which port should the grpc plugin listen for incoming connections?",
         ))
-        .start()
+        .configure()
         .await?;
 
     let bind_port = match plugin.option("grpc-port") {
         Some(options::Value::Integer(-1)) => {
             log::info!("`grpc-port` option is not configured, exiting.");
-            None
+            plugin.disable("`grpc-port` option is not configured.").await?;
+            return Ok(());
         }
-        Some(options::Value::Integer(i)) => Some(i),
+        Some(options::Value::Integer(i)) => i,
         None => return Err(anyhow!("Missing 'grpc-port' option")),
         Some(o) => return Err(anyhow!("grpc-port is not a valid integer: {:?}", o)),
     };
 
-    if let Some(bind_port) = bind_port {
-        let bind_addr: SocketAddr = format!("0.0.0.0:{}", bind_port).parse().unwrap();
+    let plugin = plugin.start().await?;
 
-        tokio::spawn(async move {
-            if let Err(e) = run_interface(bind_addr, state).await {
-                warn!("Error running the grpc interface: {}", e);
-            }
-        });
-    }
+    let bind_addr: SocketAddr = format!("0.0.0.0:{}", bind_port).parse().unwrap();
+
+    tokio::spawn(async move {
+        if let Err(e) = run_interface(bind_addr, state).await {
+            warn!("Error running the grpc interface: {}", e);
+        }
+    });
 
     plugin.join().await
 }
diff --git a/plugins/src/lib.rs b/plugins/src/lib.rs
index 0179db860..a3798adf7 100644
--- a/plugins/src/lib.rs
+++ b/plugins/src/lib.rs
@@ -139,12 +139,7 @@ where
         self
     }
 
-    /// Build and start the plugin loop. This performs the handshake
-    /// and spawns a new task that accepts incoming messages from
-    /// Core Lightning and dispatches them to the handlers. It only
-    /// returns after completing the handshake to ensure that the
-    /// configuration and initialization was successfull.
-    pub async fn start(mut self) -> Result<Plugin<S>, anyhow::Error> {
+    pub async fn configure(mut self) -> Result<ConfiguredPlugin<S, I, O>, anyhow::Error> {
         let mut input = FramedRead::new(self.input.take().unwrap(), JsonRpcCodec::default());
 
         // Sadly we need to wrap the output in a mutex in order to
@@ -177,28 +172,25 @@ where
                     .await?
             }
             Some(o) => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
-            None => return Err(anyhow!("Lost connection to lightning expecting getmanifest")),
+            None => {
+                return Err(anyhow!(
+                    "Lost connection to lightning expecting getmanifest"
+                ))
+            }
         };
-
-        match input.next().await {
+        let init_id = match input.next().await {
             Some(Ok(messages::JsonRpc::Request(id, messages::Request::Init(m)))) => {
-                output
-                    .lock()
-                    .await
-                    .send(json!({
-                        "jsonrpc": "2.0",
-                        "result": self.handle_init(m)?,
-                        "id": id,
-                    }))
-                    .await?
+                self.handle_init(m)?;
+                id
             }
 
             Some(o) => return Err(anyhow!("Got unexpected message {:?} from lightningd", o)),
             None => {
-		// If we are being called with --help we will get
-		// disconnected here. That's expected, so don't
-		// complain about it.
-	    }
+                // If we are being called with --help we will get
+                // disconnected here. That's expected, so don't
+                // complain about it.
+                0
+            }
         };
 
         let (wait_handle, _) = tokio::sync::broadcast::channel(1);
@@ -219,23 +211,33 @@ where
             HashMap::from_iter(self.rpcmethods.drain().map(|(k, v)| (k, v.callback)));
         rpcmethods.extend(self.hooks.drain().map(|(k, v)| (k, v.callback)));
 
-        // Start the PluginDriver to handle plugin IO
-        tokio::spawn(
-            PluginDriver {
+        // Leave the `init` reply pending, so we can disable based on
+        // the options if required.
+        Ok(ConfiguredPlugin {
+            // The JSON-RPC `id` field so we can reply correctly.
+            init_id,
+            input,
+            output,
+            receiver,
+            driver: PluginDriver {
                 plugin: plugin.clone(),
                 rpcmethods,
                 hooks: HashMap::new(),
                 subscriptions: HashMap::from_iter(
                     self.subscriptions.drain().map(|(k, v)| (k, v.callback)),
                 ),
-            }
-            .run(receiver, input, output),
-	    // TODO Use the broadcast to distribute any error that we
-	    // might receive here to anyone listening. (Shutdown
-	    // signal)
-        );
+            },
+            plugin,
+        })
+    }
 
-        Ok(plugin)
+    /// Build and start the plugin loop. This performs the handshake
+    /// and spawns a new task that accepts incoming messages from
+    /// Core Lightning and dispatches them to the handlers. It only
+    /// returns after completing the handshake to ensure that the
+    /// configuration and initialization was successfull.
+    pub async fn start(self) -> Result<Plugin<S>, anyhow::Error> {
+        self.configure().await?.start().await
     }
 
     fn handle_get_manifest(
@@ -322,6 +324,22 @@ where
     callback: AsyncCallback<S>,
 }
 
+/// A plugin that has registered with the lightning daemon, and gotten
+/// its options filled, however has not yet acknowledged the `init`
+/// message. This is a mid-state allowing a plugin to disable itself,
+/// based on the options.
+pub struct ConfiguredPlugin<S, I, O>
+where
+    S: Clone + Send,
+{
+    init_id: usize,
+    input: FramedRead<I, JsonRpcCodec>,
+    output: Arc<Mutex<FramedWrite<O, JsonCodec>>>,
+    plugin: Plugin<S>,
+    driver: PluginDriver<S>,
+    receiver: tokio::sync::mpsc::Receiver<serde_json::Value>,
+}
+
 #[derive(Clone)]
 pub struct Plugin<S>
 where
@@ -350,6 +368,67 @@ where
     }
 }
 
+impl<S, I, O> ConfiguredPlugin<S, I, O>
+where
+    S: Send + Clone + Sync + 'static,
+    I: AsyncRead + Send + Unpin + 'static,
+    O: Send + AsyncWrite + Unpin + 'static,
+{
+    #[allow(unused_mut)]
+    pub async fn start(mut self) -> Result<Plugin<S>, anyhow::Error> {
+        let driver = self.driver;
+        let plugin = self.plugin;
+        let output = self.output;
+        let input = self.input;
+        let receiver = self.receiver; // Now reply to the `init` message that `configure` left pending.
+        output
+            .lock()
+            .await
+            .send(json!(
+                {
+                    "jsonrpc": "2.0",
+                    "id": self.init_id,
+            "result": crate::messages::InitResponse{disable: None}
+                }
+            ))
+            .await
+            .context("sending init response")?;
+        // Start the PluginDriver to handle plugin IO
+        tokio::spawn(
+            driver.run(receiver, input, output),
+            // TODO Use the broadcast to distribute any error that we
+            // might receive here to anyone listening. (Shutdown
+            // signal)
+        );
+        Ok(plugin)
+    }
+
+    /// Abort the plugin startup. Communicate that we're about to exit
+    /// voluntarily, and this is not an error.
+    #[allow(unused_mut)]
+    pub async fn disable(mut self, reason: &str) -> Result<(), anyhow::Error> {
+        self.output
+            .lock()
+            .await
+            .send(json!(
+                {
+                    "jsonrpc": "2.0",
+                    "id": self.init_id,
+            "result": crate::messages::InitResponse{
+            disable: Some(reason.to_string())
+            }
+                }
+            ))
+            .await
+            .context("sending init response")?;
+        Ok(())
+    }
+
+    pub fn option(&self, name: &str) -> Option<options::Value> {
+        self.plugin.option(name)
+    }
+}
+
 /// The [PluginDriver] is used to run the IO loop, reading messages
 /// from the Lightning daemon, dispatching calls and notifications to
 /// the plugin, and returning responses to the the daemon. We also use
@@ -384,21 +463,21 @@ where
         O: Send + AsyncWriteExt + Unpin,
     {
         loop {
-	    // If we encounter any error reading or writing from/to
-	    // the master we hand them up, so we can return control to
-	    // the user-code, which may require some cleanups or
-	    // similar.
+            // If we encounter any error reading or writing from/to
+            // the master we hand them up, so we can return control to
+            // the user-code, which may require some cleanups or
+            // similar.
             tokio::select! {
-                e = self.dispatch_one(&mut input, &self.plugin) => {
-		    //Hand any error up.
-		    e?;
-		},
-		v = receiver.recv() => {
-		    output.lock().await.send(
-			v.context("internal communication error")?
-		    ).await?;
-		},
-            }
+                    e = self.dispatch_one(&mut input, &self.plugin) => {
+                //Hand any error up.
+                e?;
+            },
+            v = receiver.recv() => {
+                output.lock().await.send(
+                v.context("internal communication error")?
+                ).await?;
+            },
+                }
         }
     }
 
diff --git a/plugins/src/messages.rs b/plugins/src/messages.rs
index ec11cb2eb..4f19aaea9 100644
--- a/plugins/src/messages.rs
+++ b/plugins/src/messages.rs
@@ -132,6 +132,9 @@ pub(crate) struct GetManifestResponse {
 }
 
 #[derive(Serialize, Default, Debug)]
-pub struct InitResponse {}
+pub struct InitResponse {
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub disable: Option<String>,
+}
 
 pub trait Response: Serialize + Debug {}
diff --git a/tests/test_cln_rs.py b/tests/test_cln_rs.py
index 996d7b19d..8fd3b8336 100644
--- a/tests/test_cln_rs.py
+++ b/tests/test_cln_rs.py
@@ -176,8 +176,8 @@ def test_grpc_no_auto_start(node_factory):
         "plugin": str(bin_path),
     })
 
-    l1.daemon.logsearch_start = 0
-    assert l1.daemon.is_in_log(r'plugin-cln-grpc: Killing plugin: exited during normal operation')
+    wait_for(lambda: [p for p in l1.rpc.plugin('list')['plugins'] if 'cln-grpc' in p['name']] == [])
+    assert l1.daemon.is_in_log(r'plugin-cln-grpc: Killing plugin: disabled itself at init')
 
 
 def test_grpc_wrong_auth(node_factory):