core-lightning/plugins/clnrest/clnrest.py
Peter Neuroth eb3b1b8134 clnrest: Import sys in except clause explicitly
This fixes a crash on startup of core-lightning where gevent could not
be imported. This happens before sys is imported and throws us into the
except clause which calls sys.
By importing it explicitly in the except clause we are not dependend of
the order of imports in the try bracket.

Signed-off-by: Peter Neuroth <pet.v.ne@gmail.com>

Changelog-Fixes: Plugin: clnrest crashed on startup when gevent was
missing.
2023-11-21 08:24:09 +01:00

230 lines
7.7 KiB
Python
Executable file

#!/usr/bin/env python3
# For --hidden-import gunicorn.glogging gunicorn.workers.sync
try:
from gevent import monkey
monkey.patch_ssl()
import sys
import os
import re
import ssl
import time
import multiprocessing
from gunicorn import glogging # noqa: F401
from gunicorn.workers import sync # noqa: F401
from pathlib import Path
from flask import Flask, request, Blueprint
from flask_restx import Api
from flask_cors import CORS
from gunicorn.app.base import BaseApplication
from multiprocessing import Process, Queue
from flask_socketio import SocketIO, disconnect
from utilities.generate_certs import generate_certs
from utilities.shared import set_config, verify_rune
from utilities.rpc_routes import rpcns
from utilities.rpc_plugin import plugin
except ModuleNotFoundError as err:
# OK, something is not installed?
import json
import sys
getmanifest = json.loads(sys.stdin.readline())
print(json.dumps({'jsonrpc': "2.0",
'id': getmanifest['id'],
'result': {'disable': str(err)}}))
sys.exit(1)
multiprocessing.set_start_method('fork')
def check_origin(origin):
from utilities.shared import REST_CORS_ORIGINS
is_whitelisted = False
if REST_CORS_ORIGINS[0] == "*":
is_whitelisted = True
else:
for whitelisted_origin in REST_CORS_ORIGINS:
try:
does_match = bool(re.compile(whitelisted_origin).match(origin))
is_whitelisted = is_whitelisted or does_match
except Exception as err:
plugin.log(f"Error from rest-cors-origin {whitelisted_origin} match with {origin}: {err}", "info")
return is_whitelisted
jobs = {}
app = Flask(__name__)
socketio = SocketIO(app, async_mode="gevent", cors_allowed_origins=check_origin)
msgq = Queue()
def broadcast_from_message_queue():
while True:
while not msgq.empty():
msg = msgq.get()
if msg is None:
return
socketio.emit("message", msg)
# Wait for a second after processing all items in the queue
time.sleep(1)
# Starts a background task which pulls notifications from the message queue
# and broadcasts them to all connected ws clients at one-second intervals.
socketio.start_background_task(broadcast_from_message_queue)
@socketio.on("message")
def handle_message(message):
plugin.log(f"Received message from client: {message}", "debug")
socketio.emit('message', {"client_message": message, "session": request.sid})
@socketio.on("connect")
def ws_connect():
try:
plugin.log("Client Connecting...", "debug")
rune = request.headers.get("rune", None)
is_valid_rune = verify_rune(plugin, rune, "listclnrest-notifications", None)
if "error" in is_valid_rune:
# Logging as error/warn emits the event for all clients
plugin.log(f"Error: {is_valid_rune}", "info")
raise Exception(is_valid_rune)
plugin.log("Client Connected", "debug")
return True
except Exception as err:
# Logging as error/warn emits the event for all clients
plugin.log(f"{err}", "info")
disconnect()
def create_app():
from utilities.shared import REST_CORS_ORIGINS
global app
app.config["SECRET_KEY"] = os.urandom(24).hex()
authorizations = {
"rune": {"type": "apiKey", "in": "header", "name": "Rune"}
}
CORS(app, resources={r"/*": {"origins": REST_CORS_ORIGINS}})
blueprint = Blueprint("api", __name__)
api = Api(blueprint, version="1.0", title="Core Lightning Rest", description="Core Lightning REST API Swagger", authorizations=authorizations, security=["rune"])
app.register_blueprint(blueprint)
api.add_namespace(rpcns, path="/v1")
@app.after_request
def add_csp_headers(response):
try:
from utilities.shared import REST_CSP
response.headers['Content-Security-Policy'] = REST_CSP.replace('\\', '').replace("[\"", '').replace("\"]", '')
return response
except Exception as err:
plugin.log(f"Error from clnrest-csp config: {err}", "info")
def set_application_options(plugin):
from utilities.shared import CERTS_PATH, REST_PROTOCOL, REST_HOST, REST_PORT
plugin.log(f"REST Server is starting at {REST_PROTOCOL}://{REST_HOST}:{REST_PORT}", "debug")
if REST_PROTOCOL == "http":
# Assigning only one worker due to added complexity between gunicorn's multiple worker process forks
# and websocket connection's persistance with a single worker.
options = {
"bind": f"{REST_HOST}:{REST_PORT}",
"workers": 1,
"worker_class": "geventwebsocket.gunicorn.workers.GeventWebSocketWorker",
"timeout": 60,
"loglevel": "warning",
}
else:
cert_file = Path(f"{CERTS_PATH}/client.pem")
key_file = Path(f"{CERTS_PATH}/client-key.pem")
try:
if not cert_file.is_file() or not key_file.is_file():
plugin.log(f"Certificate not found at {CERTS_PATH}. Generating a new certificate!", "debug")
generate_certs(plugin, REST_HOST, CERTS_PATH)
plugin.log(f"Certs Path: {CERTS_PATH}", "debug")
except Exception as err:
raise Exception(f"{err}: Certificates do not exist at {CERTS_PATH}")
# Assigning only one worker due to added complexity between gunicorn's multiple worker process forks
# and websocket connection's persistance with a single worker.
options = {
"bind": f"{REST_HOST}:{REST_PORT}",
"workers": 1,
"worker_class": "geventwebsocket.gunicorn.workers.GeventWebSocketWorker",
"timeout": 60,
"loglevel": "warning",
"certfile": f"{CERTS_PATH}/client.pem",
"keyfile": f"{CERTS_PATH}/client-key.pem",
"ssl_version": ssl.PROTOCOL_TLSv1_2
}
return options
class CLNRestApplication(BaseApplication):
def __init__(self, app, options=None):
from utilities.shared import REST_PROTOCOL, REST_HOST, REST_PORT
self.application = app
self.options = options or {}
super().__init__()
plugin.log(f"REST server running at {REST_PROTOCOL}://{REST_HOST}:{REST_PORT}", "info")
def load_config(self):
config = {key: value for key, value in self.options.items()
if key in self.cfg.settings and value is not None}
for key, value in config.items():
self.cfg.set(key.lower(), value)
def load(self):
return self.application
def worker():
global app
options = set_application_options(plugin)
create_app()
CLNRestApplication(app, options).run()
def start_server():
global jobs
from utilities.shared import REST_PORT
if REST_PORT in jobs:
return False, "server already running"
p = Process(
target=worker,
args=[],
name="server on port {}".format(REST_PORT),
)
p.daemon = True
jobs[REST_PORT] = p
p.start()
return True
@plugin.init()
def init(options, configuration, plugin):
# We require options before we open a port.
err = set_config(options)
if err:
return {'disable': err}
start_server()
@plugin.subscribe("*")
def on_any_notification(request, **kwargs):
plugin.log("Notification: {}".format(kwargs), "debug")
if request.method == 'shutdown':
# A plugin which subscribes to shutdown is expected to exit itself.
sys.exit(0)
else:
msgq.put(kwargs)
try:
plugin.run()
except ValueError as err:
plugin.log("Unable to subscribe to all events. Feature available with CLN v23.08 and above: {}".format(err), "warn")
except (KeyboardInterrupt, SystemExit):
pass