core-lightning/plugins/clnrest/utilities/shared.py

87 lines
3.1 KiB
Python
Raw Normal View History

2023-07-14 11:36:24 +09:30
import json5
import re
import json
CERTS_PATH, REST_PROTOCOL, REST_HOST, REST_PORT, REST_CORS_ORIGINS = "", "", "", "", []
2023-07-14 11:36:24 +09:30
def set_config(options):
if 'rest-port' not in options:
return "`rest-port` option is not configured"
global CERTS_PATH, REST_PROTOCOL, REST_HOST, REST_PORT, REST_CORS_ORIGINS
2023-07-14 11:36:24 +09:30
CERTS_PATH = str(options["rest-certs"])
REST_PROTOCOL = str(options["rest-protocol"])
REST_HOST = str(options["rest-host"])
REST_PORT = int(options["rest-port"])
cors_origins = options["rest-cors-origins"]
REST_CORS_ORIGINS.clear()
for origin in cors_origins:
REST_CORS_ORIGINS.append(str(origin))
return None
2023-07-14 11:36:24 +09:30
def call_rpc_method(plugin, rpc_method, payload):
try:
response = plugin.rpc.call(rpc_method, payload)
if '"error":' in str(response).lower():
raise Exception(response)
else:
plugin.log(f"{response}", "debug")
if '"result":' in str(response).lower():
# Use json5.loads ONLY when necessary, as it increases processing time significantly
return json.loads(response)["result"]
else:
return response
except Exception as err:
plugin.log(f"Error: {err}", "error")
if "error" in str(err).lower():
match_err_obj = re.search(r'"error":\{.*?\}', str(err))
if match_err_obj is not None:
err = "{" + match_err_obj.group() + "}"
else:
match_err_str = re.search(r"error: \{.*?\}", str(err))
if match_err_str is not None:
err = "{" + match_err_str.group() + "}"
raise Exception(err)
def verify_rune(plugin, request):
rune = request.headers.get("rune", None)
if rune is None:
raise Exception('{ "error": {"code": 403, "message": "Not authorized: Missing rune"} }')
if request.is_json:
if len(request.data) != 0:
rpc_params = request.get_json()
else:
rpc_params = {}
2023-07-14 11:36:24 +09:30
else:
rpc_params = request.form.to_dict()
try:
rpc_method = request.view_args["rpc_method"]
except Exception:
rpc_method = ""
return call_rpc_method(plugin, "checkrune",
{"rune": rune,
"method": rpc_method,
"params": rpc_params})
2023-07-14 11:36:24 +09:30
def process_help_response(help_response):
# Use json5.loads due to single quotes in response
processed_res = json5.loads(str(help_response))["help"]
line = "\n---------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n\n"
processed_html_res = ""
for row in processed_res:
processed_html_res += f"Command: {row['command']}\n"
processed_html_res += f"Category: {row['category']}\n"
processed_html_res += f"Description: {row['description']}\n"
processed_html_res += f"Verbose: {row['verbose']}\n"
processed_html_res += line
return processed_html_res