plugins/clnrest: Websocket Server

- Added Dependencies
- Updated WS client instructions
- WS Server
This commit is contained in:
Shahana Farooqui 2023-07-24 10:50:53 -07:00 committed by Rusty Russell
parent 94205c094f
commit 911d2c117e
3 changed files with 126 additions and 28 deletions

View file

@ -4,7 +4,7 @@ CLNRest is a lightweight Python-based core lightning plugin that transforms RPC
## Installation
Install required packages with `pip install json5 flask flask_restx gunicorn pyln-client` or `pip install -r requirements.txt`.
Install required packages with `pip install json5 flask flask_restx gunicorn pyln-client flask-socketio gevent gevent-websocket` or `pip install -r requirements.txt`.
Note: if you have the older c-lightning-rest plugin, you can use `disable-plugin clnrest.py` to avoid any conflict with this one. Of course, you could use this one instead!
@ -17,10 +17,6 @@ If `rest-port` is not specified, the plugin will disable itself.
- --rest-host: Defines the REST server host. Default is 127.0.0.1.
- --rest-certs: Defines the path for HTTPS cert & key. Default path is same as RPC file path to utilize gRPC's client certificate. If it is missing at the configured location, new identity (`client.pem` and `client-key.pem`) will be generated.
## Plugin
- It can be configured by adding `plugin=/<path>/clnrest/clnrest.py` to the Core Lightning's config file.
## Server
With the default configurations, the Swagger user interface will be available at https://127.0.0.1:3010/. The POST method requires `rune` and `nodeid` headers for authorization.
@ -46,3 +42,56 @@ This option should be used only when testing with self signed certificate.
<img src="./.github/screenshots/Postman-with-body.png" width="200" alt="Postman with JSON body">
<img src="./.github/screenshots/Postman-bkpr-plugin.png" width="200" alt="Postman bkpr plugin RPC">
</p>
## Websocket Server
Websocket server is available at `/ws` endpoint. clnrest queues up notifications received for a second then broadcasts them to listeners.
### Websocket client examples
#### Python
```python
import socketio
import requests
http_session = requests.Session()
http_session.verify = False
sio = socketio.Client(http_session=http_session)
@sio.event
def message(data):
print(f'I received a message: {data}')
@sio.event
def connect():
print("I'm connected!")
@sio.event
def disconnect():
print("I'm disconnected!")
sio.connect('https://127.0.0.1:3010/ws')
sio.wait()
```
#### NodeJS
```javascript
const io = require('socket.io-client');
const socket = io.connect('https://127.0.0.1:3010', {rejectUnauthorized: false});
socket.on('connect', function() {
console.log("I'm connected!");
});
socket.on('message', function(data) {
console.log('I received a message: ', data);
});
socket.on('disconnect', function() {
console.log("I'm disconnected!");
});
```

View file

@ -4,11 +4,14 @@ try:
from gunicorn import glogging # noqa: F401
from gunicorn.workers import sync # noqa: F401
import os
import time
from pathlib import Path
from flask import Flask
from flask_restx import Api
from gunicorn.app.base import BaseApplication
from multiprocessing import Process, cpu_count
from multiprocessing import Process, Queue
from flask_socketio import SocketIO
from utilities.generate_certs import generate_certs
from utilities.shared import set_config
from utilities.rpc_routes import rpcns
@ -17,24 +20,54 @@ except ModuleNotFoundError as err:
# OK, something is not installed?
import json
import sys
getmanfest = json.loads(sys.stdin.readline())
getmanifest = json.loads(sys.stdin.readline())
print(json.dumps({'jsonrpc': "2.0",
'id': getmanfest['id'],
'id': getmanifest['id'],
'result': {'disable': str(err)}}))
sys.exit(1)
jobs = {}
app = Flask(__name__)
socketio = SocketIO(app, async_mode='gevent', cors_allowed_origins='*')
msgq = Queue()
def broadcast_from_message_queue():
while True:
while not msgq.empty():
msg = msgq.get()
if msg is None:
return
plugin.log(f"Emitting message: {msg}", "debug")
socketio.emit("message", msg)
time.sleep(1) # Wait for a second after processing all items in the queue
socketio.start_background_task(broadcast_from_message_queue)
@socketio.on("connect", namespace="/ws")
def ws_connect():
plugin.log("Client Connected", "debug")
msgq.put("Client Connected")
@socketio.on("disconnect", namespace="/ws")
def ws_disconnect():
plugin.log("Client Disconnected", "debug")
msgq.put("Client Disconnected")
def create_app():
app = Flask(__name__)
global app
app.config['SECRET_KEY'] = os.urandom(24).hex()
authorizations = {
"rune": {"type": "apiKey", "in": "header", "name": "Rune"},
"nodeid": {"type": "apiKey", "in": "header", "name": "Nodeid"}
}
api = Api(app, version="1.0", title="Core Lightning Rest", description="Core Lightning REST API Swagger", authorizations=authorizations, security=["rune", "nodeid"])
api.add_namespace(rpcns, path="/v1")
return app
def set_application_options(plugin):
@ -43,7 +76,8 @@ def set_application_options(plugin):
if REST_PROTOCOL == "http":
options = {
"bind": f"{REST_HOST}:{REST_PORT}",
"workers": cpu_count(),
"workers": 1,
"worker_class": "geventwebsocket.gunicorn.workers.GeventWebSocketWorker",
"timeout": 60,
"loglevel": "warning",
}
@ -59,7 +93,8 @@ def set_application_options(plugin):
raise Exception(f"{err}: Certificates do not exist at {CERTS_PATH}")
options = {
"bind": f"{REST_HOST}:{REST_PORT}",
"workers": cpu_count(),
"workers": 1,
"worker_class": "geventwebsocket.gunicorn.workers.GeventWebSocketWorker",
"timeout": 60,
"loglevel": "warning",
"certfile": f"{CERTS_PATH}/client.pem",
@ -87,16 +122,17 @@ class CLNRestApplication(BaseApplication):
def worker():
global app
options = set_application_options(plugin)
app = create_app()
create_app()
CLNRestApplication(app, options).run()
def start_server():
global jobs
from utilities.shared import REST_PORT
if REST_PORT in jobs:
return False, "server already running"
p = Process(
target=worker,
args=[],
@ -117,4 +153,13 @@ def init(options, configuration, plugin):
start_server()
plugin.run()
@plugin.subscribe("*")
def on_any_notification(request, **kwargs):
plugin.log("Notification: {}".format(kwargs), "debug")
msgq.put(str(kwargs))
try:
plugin.run()
except (KeyboardInterrupt, SystemExit):
pass

View file

@ -2,33 +2,37 @@ aniso8601==9.0.1
asn1crypto==1.5.1
attrs==23.1.0
base58==2.1.1
bidict==0.22.1
bitstring==3.1.9
blinker==1.6.2
cachelib==0.10.2
cffi==1.15.1
click==8.1.3
coincurve==17.0.0
cryptography==36.0.2
click==8.1.6
coincurve==18.0.0
cryptography==41.0.2
Flask==2.3.2
Flask-Cors==4.0.0
flask-restx==1.1.0
Flask-WTF==1.1.1
gevent==22.10.2
Flask-SocketIO==5.3.4
gevent==23.7.0
gevent-websocket==0.10.1
greenlet==2.0.2
gunicorn==20.1.0
gunicorn==21.2.0
itsdangerous==2.1.2
Jinja2==3.1.2
json5==0.9.14
jsonschema==4.17.3
jsonschema==4.18.4
jsonschema-specifications==2023.7.1
MarkupSafe==2.1.3
packaging==23.1
pycparser==2.21
pyln-bolt7==1.0.246
pyln-client==23.5
pyln-proto==23.5
pyrsistent==0.19.3
pyln-client==23.5.2
pyln-proto==23.5.2
PySocks==1.7.1
python-engineio==4.5.1
python-socketio==5.8.0
pytz==2023.3
referencing==0.30.0
rpds-py==0.9.2
Werkzeug==2.3.6
WTForms==3.0.1
zope.event==5.0
zope.interface==6.0