From 911d2c117e3595b53a49b5014486a5d4a6b8cf21 Mon Sep 17 00:00:00 2001 From: Shahana Farooqui Date: Mon, 24 Jul 2023 10:50:53 -0700 Subject: [PATCH] plugins/clnrest: Websocket Server - Added Dependencies - Updated WS client instructions - WS Server --- plugins/clnrest/README.md | 59 ++++++++++++++++++++++++++--- plugins/clnrest/clnrest.py | 65 +++++++++++++++++++++++++++----- plugins/clnrest/requirements.txt | 30 ++++++++------- 3 files changed, 126 insertions(+), 28 deletions(-) diff --git a/plugins/clnrest/README.md b/plugins/clnrest/README.md index a09e7523a..1779561e5 100644 --- a/plugins/clnrest/README.md +++ b/plugins/clnrest/README.md @@ -4,7 +4,7 @@ CLNRest is a lightweight Python-based core lightning plugin that transforms RPC ## Installation -Install required packages with `pip install json5 flask flask_restx gunicorn pyln-client` or `pip install -r requirements.txt`. +Install required packages with `pip install json5 flask flask_restx gunicorn pyln-client flask-socketio gevent gevent-websocket` or `pip install -r requirements.txt`. Note: if you have the older c-lightning-rest plugin, you can use `disable-plugin clnrest.py` to avoid any conflict with this one. Of course, you could use this one instead! @@ -17,10 +17,6 @@ If `rest-port` is not specified, the plugin will disable itself. - --rest-host: Defines the REST server host. Default is 127.0.0.1. - --rest-certs: Defines the path for HTTPS cert & key. Default path is same as RPC file path to utilize gRPC's client certificate. If it is missing at the configured location, new identity (`client.pem` and `client-key.pem`) will be generated. -## Plugin - -- It can be configured by adding `plugin=//clnrest/clnrest.py` to the Core Lightning's config file. - ## Server With the default configurations, the Swagger user interface will be available at https://127.0.0.1:3010/. The POST method requires `rune` and `nodeid` headers for authorization. @@ -46,3 +42,56 @@ This option should be used only when testing with self signed certificate. Postman with JSON body Postman bkpr plugin RPC

+ +## Websocket Server +Websocket server is available at `/ws` endpoint. clnrest queues up notifications received for a second then broadcasts them to listeners. + +### Websocket client examples + +#### Python + +```python +import socketio +import requests + +http_session = requests.Session() +http_session.verify = False +sio = socketio.Client(http_session=http_session) + +@sio.event +def message(data): + print(f'I received a message: {data}') + +@sio.event +def connect(): + print("I'm connected!") + +@sio.event +def disconnect(): + print("I'm disconnected!") + +sio.connect('https://127.0.0.1:3010/ws') +sio.wait() + +``` + +#### NodeJS + +```javascript +const io = require('socket.io-client'); + +const socket = io.connect('https://127.0.0.1:3010', {rejectUnauthorized: false}); + +socket.on('connect', function() { + console.log("I'm connected!"); +}); + +socket.on('message', function(data) { + console.log('I received a message: ', data); +}); + +socket.on('disconnect', function() { + console.log("I'm disconnected!"); +}); + +``` diff --git a/plugins/clnrest/clnrest.py b/plugins/clnrest/clnrest.py index 04159f2ff..8b7d73b09 100755 --- a/plugins/clnrest/clnrest.py +++ b/plugins/clnrest/clnrest.py @@ -4,11 +4,14 @@ try: from gunicorn import glogging # noqa: F401 from gunicorn.workers import sync # noqa: F401 + import os + import time from pathlib import Path from flask import Flask from flask_restx import Api from gunicorn.app.base import BaseApplication - from multiprocessing import Process, cpu_count + from multiprocessing import Process, Queue + from flask_socketio import SocketIO from utilities.generate_certs import generate_certs from utilities.shared import set_config from utilities.rpc_routes import rpcns @@ -17,24 +20,54 @@ except ModuleNotFoundError as err: # OK, something is not installed? import json import sys - getmanfest = json.loads(sys.stdin.readline()) + getmanifest = json.loads(sys.stdin.readline()) print(json.dumps({'jsonrpc': "2.0", - 'id': getmanfest['id'], + 'id': getmanifest['id'], 'result': {'disable': str(err)}})) sys.exit(1) + jobs = {} +app = Flask(__name__) +socketio = SocketIO(app, async_mode='gevent', cors_allowed_origins='*') +msgq = Queue() + + +def broadcast_from_message_queue(): + while True: + while not msgq.empty(): + msg = msgq.get() + if msg is None: + return + plugin.log(f"Emitting message: {msg}", "debug") + socketio.emit("message", msg) + time.sleep(1) # Wait for a second after processing all items in the queue + + +socketio.start_background_task(broadcast_from_message_queue) + + +@socketio.on("connect", namespace="/ws") +def ws_connect(): + plugin.log("Client Connected", "debug") + msgq.put("Client Connected") + + +@socketio.on("disconnect", namespace="/ws") +def ws_disconnect(): + plugin.log("Client Disconnected", "debug") + msgq.put("Client Disconnected") def create_app(): - app = Flask(__name__) + global app + app.config['SECRET_KEY'] = os.urandom(24).hex() authorizations = { "rune": {"type": "apiKey", "in": "header", "name": "Rune"}, "nodeid": {"type": "apiKey", "in": "header", "name": "Nodeid"} } api = Api(app, version="1.0", title="Core Lightning Rest", description="Core Lightning REST API Swagger", authorizations=authorizations, security=["rune", "nodeid"]) api.add_namespace(rpcns, path="/v1") - return app def set_application_options(plugin): @@ -43,7 +76,8 @@ def set_application_options(plugin): if REST_PROTOCOL == "http": options = { "bind": f"{REST_HOST}:{REST_PORT}", - "workers": cpu_count(), + "workers": 1, + "worker_class": "geventwebsocket.gunicorn.workers.GeventWebSocketWorker", "timeout": 60, "loglevel": "warning", } @@ -59,7 +93,8 @@ def set_application_options(plugin): raise Exception(f"{err}: Certificates do not exist at {CERTS_PATH}") options = { "bind": f"{REST_HOST}:{REST_PORT}", - "workers": cpu_count(), + "workers": 1, + "worker_class": "geventwebsocket.gunicorn.workers.GeventWebSocketWorker", "timeout": 60, "loglevel": "warning", "certfile": f"{CERTS_PATH}/client.pem", @@ -87,16 +122,17 @@ class CLNRestApplication(BaseApplication): def worker(): + global app options = set_application_options(plugin) - app = create_app() + create_app() CLNRestApplication(app, options).run() def start_server(): + global jobs from utilities.shared import REST_PORT if REST_PORT in jobs: return False, "server already running" - p = Process( target=worker, args=[], @@ -117,4 +153,13 @@ def init(options, configuration, plugin): start_server() -plugin.run() +@plugin.subscribe("*") +def on_any_notification(request, **kwargs): + plugin.log("Notification: {}".format(kwargs), "debug") + msgq.put(str(kwargs)) + + +try: + plugin.run() +except (KeyboardInterrupt, SystemExit): + pass diff --git a/plugins/clnrest/requirements.txt b/plugins/clnrest/requirements.txt index f308c9778..8c1ef9e4f 100644 --- a/plugins/clnrest/requirements.txt +++ b/plugins/clnrest/requirements.txt @@ -2,33 +2,37 @@ aniso8601==9.0.1 asn1crypto==1.5.1 attrs==23.1.0 base58==2.1.1 +bidict==0.22.1 bitstring==3.1.9 blinker==1.6.2 -cachelib==0.10.2 cffi==1.15.1 -click==8.1.3 -coincurve==17.0.0 -cryptography==36.0.2 +click==8.1.6 +coincurve==18.0.0 +cryptography==41.0.2 Flask==2.3.2 -Flask-Cors==4.0.0 flask-restx==1.1.0 -Flask-WTF==1.1.1 -gevent==22.10.2 +Flask-SocketIO==5.3.4 +gevent==23.7.0 +gevent-websocket==0.10.1 greenlet==2.0.2 -gunicorn==20.1.0 +gunicorn==21.2.0 itsdangerous==2.1.2 Jinja2==3.1.2 json5==0.9.14 -jsonschema==4.17.3 +jsonschema==4.18.4 +jsonschema-specifications==2023.7.1 MarkupSafe==2.1.3 +packaging==23.1 pycparser==2.21 pyln-bolt7==1.0.246 -pyln-client==23.5 -pyln-proto==23.5 -pyrsistent==0.19.3 +pyln-client==23.5.2 +pyln-proto==23.5.2 PySocks==1.7.1 +python-engineio==4.5.1 +python-socketio==5.8.0 pytz==2023.3 +referencing==0.30.0 +rpds-py==0.9.2 Werkzeug==2.3.6 -WTForms==3.0.1 zope.event==5.0 zope.interface==6.0