msggen: Implement notification-server

This commit is contained in:
Erik De Smedt 2024-02-14 13:00:19 +01:00 committed by Christian Decker
parent fa43d3ecfc
commit 521ac6a12a
6 changed files with 3421 additions and 3106 deletions

35
Cargo.lock generated
View file

@ -264,11 +264,15 @@ dependencies = [
"anyhow", "anyhow",
"bitcoin", "bitcoin",
"cln-rpc", "cln-rpc",
"futures-core",
"hex", "hex",
"log", "log",
"prost", "prost",
"serde", "serde",
"serde_json", "serde_json",
"tokio",
"tokio-stream",
"tokio-util",
"tonic", "tonic",
"tonic-build", "tonic-build",
] ]
@ -452,9 +456,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-core" name = "futures-core"
version = "0.3.28" version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]] [[package]]
name = "futures-executor" name = "futures-executor"
@ -734,9 +738,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.148" version = "0.2.153"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cdc71e17332e86d2e1d38c1f99edcb6288ee11b815fb1a4b049eaa2114d369b" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd"
[[package]] [[package]]
name = "linux-raw-sys" name = "linux-raw-sys"
@ -794,9 +798,9 @@ dependencies = [
[[package]] [[package]]
name = "mio" name = "mio"
version = "0.8.8" version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09"
dependencies = [ dependencies = [
"libc", "libc",
"wasi", "wasi",
@ -1313,9 +1317,9 @@ dependencies = [
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.5.4" version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys", "windows-sys",
@ -1449,9 +1453,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.32.0" version = "1.36.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
dependencies = [ dependencies = [
"backtrace", "backtrace",
"bytes", "bytes",
@ -1459,7 +1463,7 @@ dependencies = [
"mio", "mio",
"num_cpus", "num_cpus",
"pin-project-lite", "pin-project-lite",
"socket2 0.5.4", "socket2 0.5.5",
"tokio-macros", "tokio-macros",
"windows-sys", "windows-sys",
] ]
@ -1476,9 +1480,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "2.1.0" version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -1505,6 +1509,7 @@ dependencies = [
"futures-core", "futures-core",
"pin-project-lite", "pin-project-lite",
"tokio", "tokio",
"tokio-util",
] ]
[[package]] [[package]]
@ -1522,9 +1527,9 @@ dependencies = [
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.9" version = "0.7.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-core", "futures-core",

View file

@ -21,6 +21,10 @@ prost = "0.11"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
hex = "0.4.3" hex = "0.4.3"
bitcoin = { version = "0.30", features = [ "serde" ] } bitcoin = { version = "0.30", features = [ "serde" ] }
tokio-stream = { version = "0.1.14", features = ["sync"] }
tokio = { version = "1.36.0", features = ["sync"] }
futures-core = "0.3.30"
tokio-util = "0.7.10"
[dev-dependencies] [dev-dependencies]
serde_json = "1.0.72" serde_json = "1.0.72"

File diff suppressed because it is too large Load diff

View file

@ -152,8 +152,7 @@ class GrpcGenerator(IGenerator):
name = str(notification.typename) name = str(notification.typename)
self.write( self.write(
f" rpc Subscribe{name}({notification.request.typename}) returns (stream {notification.response.typename}) {{}}\n", f" rpc Subscribe{name}({notification.request.typename}) returns (stream {notification.response.typename}) {{}}\n",
cleanup=False, cleanup=False)
)
self.write( self.write(
f"""}} f"""}}

View file

@ -1,44 +1,116 @@
# A grpc model # A grpc model
import re import re
import logging
from typing import TextIO, Optional
from msggen.model import Service from msggen.model import Service
from msggen.gen.grpc.convert import GrpcConverterGenerator from msggen.gen.generator import IGenerator
from msggen.gen.grpc.util import method_name_overrides from msggen.gen.grpc.util import method_name_overrides, camel_to_snake, snake_to_camel
from textwrap import indent, dedent
class GrpcServerGenerator(GrpcConverterGenerator): class GrpcServerGenerator(IGenerator):
def __init__(self, dest: TextIO):
self.dest = dest
self.logger = logging.getLogger(__name__)
def write(self, text: str, numindent: Optional[int] = None):
if numindent is None:
self.dest.write(text)
else:
text = dedent(text)
text = indent(text, " " * numindent)
self.dest.write(text)
def generate(self, service: Service) -> None: def generate(self, service: Service) -> None:
self.write( self.write(
f"""\ f"""\
use crate::pb::node_server::Node; use crate::pb::node_server::Node;
use crate::pb; use crate::pb;
use cln_rpc::{{Request, Response, ClnRpc}}; use cln_rpc::{{Request, Response, ClnRpc}};
use cln_rpc::notifications::Notification;
use anyhow::Result; use anyhow::Result;
use std::path::{{Path, PathBuf}}; use std::path::{{Path, PathBuf}};
use std::pin::Pin;
use std::task::{{Context, Poll}};
use cln_rpc::model::requests; use cln_rpc::model::requests;
use log::{{debug, trace}}; use log::{{debug, trace}};
use tonic::{{Code, Status}}; use tonic::{{Code, Status}};
use tokio::sync::broadcast;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tokio_stream::wrappers::BroadcastStream;
#[derive(Clone)] #[derive(Clone)]
pub struct Server pub struct Server
{{ {{
rpc_path: PathBuf, rpc_path: PathBuf,
events : broadcast::Sender<Notification>
}} }}
impl Server impl Server
{{ {{
pub async fn new(path: &Path) -> Result<Self> pub async fn new(
path: &Path,
events : broadcast::Sender<Notification>
) -> Result<Self>
{{ {{
Ok(Self {{ Ok(Self {{
rpc_path: path.to_path_buf(), rpc_path: path.to_path_buf(),
events : events
}}) }})
}} }}
}} }}
pub struct NotificationStream<T> {{
inner : Pin<Box<BroadcastStream<Notification>>>,
fn_filter_map : fn(Notification) -> Option<T>
}}
impl<T : 'static + Send + Clone> tokio_stream::Stream for NotificationStream<T> {{
type Item = Result<T, tonic::Status>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {{
while let Poll::Ready(result) = self.inner.as_mut().poll_next(cx) {{
// None is used here to signal that we have reached the end of stream
// If inner ends the stream by returning None we do the same
if result.is_none() {{
return Poll::Ready(None)
}}
let result: Result<cln_rpc::Notification, BroadcastStreamRecvError> = result.unwrap();
match result {{
Err(BroadcastStreamRecvError::Lagged(lag)) => {{
// In this error case we've missed some notifications
// We log the error to core lightning and forward
// this information to the client
log::warn!("Due to lag the grpc-server skipped {{}} notifications", lag);
return Poll::Ready(Some(Err(
Status::data_loss(
format!("Skipped up to {{}} notifications", lag)))))
}}
Ok(notification) => {{
let filtered = (self.fn_filter_map)(notification);
match filtered {{
Some(n) => return Poll::Ready(Some(Ok(n))),
None => {{
// We ignore the message if it isn't a match.
// e.g: A `ChannelOpenedStream` will ignore `CustomMsgNotifications`
}}
}}
}}
}}
}}
Poll::Pending
}}
}}
#[tonic::async_trait] #[tonic::async_trait]
impl Node for Server impl Node for Server
{{ {{
""" """,
numindent=0,
) )
for method in service.methods: for method in service.methods:
@ -47,8 +119,8 @@ class GrpcServerGenerator(GrpcConverterGenerator):
name = re.sub(r"(?<!_)(?<!^)(?=[A-Z])", "_", mname).lower() name = re.sub(r"(?<!_)(?<!^)(?=[A-Z])", "_", mname).lower()
name = name.replace("-", "") name = name.replace("-", "")
method.name = method.name.replace("-", "") method.name = method.name.replace("-", "")
pbname_request = self.to_camel_case(str(method.request.typename)) pbname_request = snake_to_camel(str(method.request.typename))
pbname_response = self.to_camel_case(str(method.response.typename)) pbname_response = snake_to_camel(str(method.response.typename))
self.write( self.write(
f"""\ f"""\
async fn {name}( async fn {name}(
@ -82,12 +154,43 @@ class GrpcServerGenerator(GrpcConverterGenerator):
}} }}
}}\n\n""", }}\n\n""",
numindent=0, numindent=1,
) )
for notification in service.notifications:
typename = str(notification.typename)
snake_name = camel_to_snake(typename)
response_name = str(notification.response.typename)
stream_request = f"Stream{typename}Request"
stream_name = f"Subscribe{notification.typename}Stream"
self.write( self.write(
f"""\ f"""
type Subscribe{typename}Stream = NotificationStream<pb::{response_name}>;
async fn subscribe_{snake_name}(
&self,
_request : tonic::Request<pb::{stream_request}>
) -> Result<tonic::Response<Self::{stream_name}>, tonic::Status> {{
let receiver = self.events.subscribe();
let stream = BroadcastStream::new(receiver);
let boxed = Box::pin(stream);
let result = NotificationStream {{
inner : boxed,
fn_filter_map : |x| {{
match x {{
Notification::{typename}(x) => {{
Some(x.into())
}}
_ => None
}}
}}
}};
Ok(tonic::Response::new(result))
}} }}
""", """,
numindent=0, numindent=1,
) )
self.write("""}""", numindent=0)

View file

@ -1,3 +1,6 @@
# A grpc model
import re
typemap = { typemap = {
"boolean": "bool", "boolean": "bool",
"hex": "bytes", "hex": "bytes",
@ -36,3 +39,23 @@ typemap = {
method_name_overrides = { method_name_overrides = {
"Connect": "ConnectPeer", "Connect": "ConnectPeer",
} }
def snake_to_camel(snake_str: str):
components = snake_str.split("_")
# We capitalize the first letter of each component except the first one
# with the 'capitalize' method and join them together, while preserving
# existing camel cases.
camel_case = components[0]
for word in components[1:]:
if not word.isupper():
camel_case += word[0].upper() + word[1:]
else:
camel_case += word.capitalize()
return camel_case
def camel_to_snake(camel_case: str):
snake = re.sub(r"(?<!^)(?=[A-Z])", "_", camel_case).lower()
snake = snake.replace("-", "")
return snake