2023-04-21 14:51:46 +02:00
|
|
|
import json
|
2023-12-12 12:38:19 +02:00
|
|
|
import re
|
|
|
|
from datetime import datetime, timedelta
|
2023-04-21 14:51:46 +02:00
|
|
|
from pathlib import Path
|
2023-04-03 14:55:49 +02:00
|
|
|
from typing import Any, List, Optional, Type
|
2021-08-27 20:54:42 +02:00
|
|
|
|
|
|
|
import jinja2
|
2023-04-03 14:55:49 +02:00
|
|
|
import shortuuid
|
2023-12-12 12:38:19 +02:00
|
|
|
from jose import jwt
|
2023-10-16 13:27:59 +02:00
|
|
|
from pydantic import BaseModel
|
2023-07-31 10:21:30 +02:00
|
|
|
from pydantic.schema import field_schema
|
2019-12-14 17:26:26 -03:00
|
|
|
|
2021-08-27 20:54:42 +02:00
|
|
|
from lnbits.jinja2_templating import Jinja2Templates
|
2023-09-25 15:04:44 +02:00
|
|
|
from lnbits.nodes import get_node_class
|
2021-08-27 20:54:42 +02:00
|
|
|
from lnbits.requestvars import g
|
2022-10-03 16:46:46 +02:00
|
|
|
from lnbits.settings import settings
|
2024-01-16 17:33:11 +02:00
|
|
|
from lnbits.utils.crypto import AESCipher
|
2020-02-18 20:27:04 +01:00
|
|
|
|
2023-05-09 10:18:53 +02:00
|
|
|
from .db import FilterModel
|
2023-01-20 10:06:32 +02:00
|
|
|
from .extension_manager import get_valid_extensions
|
2020-09-05 08:00:44 +02:00
|
|
|
|
2023-01-11 11:25:18 +02:00
|
|
|
|
2020-04-16 17:10:53 +02:00
|
|
|
def urlsafe_short_hash() -> str:
|
|
|
|
return shortuuid.uuid()
|
2020-09-15 15:54:05 -03:00
|
|
|
|
|
|
|
|
2021-10-17 18:33:29 +01:00
|
|
|
def url_for(endpoint: str, external: Optional[bool] = False, **params: Any) -> str:
|
2021-08-27 20:54:42 +02:00
|
|
|
base = g().base_url if external else ""
|
|
|
|
url_params = "?"
|
2023-01-21 12:15:18 +00:00
|
|
|
for key, value in params.items():
|
|
|
|
url_params += f"{key}={value}&"
|
2021-08-27 20:54:42 +02:00
|
|
|
url = f"{base}{endpoint}{url_params}"
|
|
|
|
return url
|
|
|
|
|
2022-09-22 10:46:11 +02:00
|
|
|
|
2023-10-27 13:50:49 +02:00
|
|
|
def static_url_for(static: str, path: str) -> str:
|
|
|
|
return f"/{static}/{path}?v={settings.server_startup_time}"
|
|
|
|
|
|
|
|
|
2023-03-31 12:46:24 +02:00
|
|
|
def template_renderer(additional_folders: Optional[List] = None) -> Jinja2Templates:
|
2023-01-21 12:16:31 +00:00
|
|
|
folders = ["lnbits/templates", "lnbits/core/templates"]
|
|
|
|
if additional_folders:
|
2023-09-25 13:44:29 +03:00
|
|
|
additional_folders += [
|
|
|
|
Path(settings.lnbits_extensions_path, "extensions", f)
|
|
|
|
for f in additional_folders
|
|
|
|
]
|
2023-01-21 12:16:31 +00:00
|
|
|
folders.extend(additional_folders)
|
|
|
|
t = Jinja2Templates(loader=jinja2.FileSystemLoader(folders))
|
2023-10-27 13:50:49 +02:00
|
|
|
t.env.globals["static_url_for"] = static_url_for
|
2022-06-27 01:11:35 +02:00
|
|
|
|
2022-12-02 14:36:09 +00:00
|
|
|
if settings.lnbits_ad_space_enabled:
|
|
|
|
t.env.globals["AD_SPACE"] = settings.lnbits_ad_space.split(",")
|
|
|
|
t.env.globals["AD_SPACE_TITLE"] = settings.lnbits_ad_space_title
|
2022-10-03 16:46:46 +02:00
|
|
|
|
2023-03-07 12:28:52 +01:00
|
|
|
t.env.globals["VOIDWALLET"] = settings.lnbits_backend_wallet_class == "VoidWallet"
|
2022-10-03 16:46:46 +02:00
|
|
|
t.env.globals["HIDE_API"] = settings.lnbits_hide_api
|
|
|
|
t.env.globals["SITE_TITLE"] = settings.lnbits_site_title
|
|
|
|
t.env.globals["LNBITS_DENOMINATION"] = settings.lnbits_denomination
|
|
|
|
t.env.globals["SITE_TAGLINE"] = settings.lnbits_site_tagline
|
|
|
|
t.env.globals["SITE_DESCRIPTION"] = settings.lnbits_site_description
|
|
|
|
t.env.globals["LNBITS_THEME_OPTIONS"] = settings.lnbits_theme_options
|
2023-10-09 20:11:11 +01:00
|
|
|
t.env.globals["LNBITS_QR_LOGO"] = settings.lnbits_qr_logo
|
2023-05-04 17:21:37 +02:00
|
|
|
t.env.globals["LNBITS_VERSION"] = settings.version
|
2023-10-11 13:46:27 +02:00
|
|
|
t.env.globals["LNBITS_NEW_ACCOUNTS_ALLOWED"] = settings.new_accounts_allowed
|
2023-12-12 12:38:19 +02:00
|
|
|
t.env.globals["LNBITS_AUTH_METHODS"] = settings.auth_allowed_methods
|
2023-02-22 14:30:57 +02:00
|
|
|
t.env.globals["LNBITS_ADMIN_UI"] = settings.lnbits_admin_ui
|
2023-11-21 08:11:21 -03:00
|
|
|
t.env.globals["LNBITS_SERVICE_FEE"] = settings.lnbits_service_fee
|
|
|
|
t.env.globals["LNBITS_SERVICE_FEE_MAX"] = settings.lnbits_service_fee_max
|
|
|
|
t.env.globals["LNBITS_SERVICE_FEE_WALLET"] = settings.lnbits_service_fee_wallet
|
2023-09-25 15:04:44 +02:00
|
|
|
t.env.globals["LNBITS_NODE_UI"] = (
|
|
|
|
settings.lnbits_node_ui and get_node_class() is not None
|
|
|
|
)
|
|
|
|
t.env.globals["LNBITS_NODE_UI_AVAILABLE"] = get_node_class() is not None
|
2023-01-20 09:39:27 +02:00
|
|
|
t.env.globals["EXTENSIONS"] = [
|
|
|
|
e
|
|
|
|
for e in get_valid_extensions()
|
|
|
|
if e.code not in settings.lnbits_deactivated_extensions
|
|
|
|
]
|
2022-10-03 16:46:46 +02:00
|
|
|
if settings.lnbits_custom_logo:
|
|
|
|
t.env.globals["USE_CUSTOM_LOGO"] = settings.lnbits_custom_logo
|
2021-10-17 18:33:29 +01:00
|
|
|
|
2023-04-27 16:05:49 +02:00
|
|
|
if settings.bundle_assets:
|
2023-10-27 13:50:49 +02:00
|
|
|
t.env.globals["INCLUDED_JS"] = ["bundle.min.js"]
|
|
|
|
t.env.globals["INCLUDED_CSS"] = ["bundle.min.css"]
|
2023-04-27 16:05:49 +02:00
|
|
|
else:
|
2023-04-21 14:51:46 +02:00
|
|
|
vendor_filepath = Path(settings.lnbits_path, "static", "vendor.json")
|
|
|
|
with open(vendor_filepath) as vendor_file:
|
|
|
|
vendor_files = json.loads(vendor_file.read())
|
|
|
|
t.env.globals["INCLUDED_JS"] = vendor_files["js"]
|
|
|
|
t.env.globals["INCLUDED_CSS"] = vendor_files["css"]
|
2021-08-27 20:54:42 +02:00
|
|
|
|
2023-09-11 15:48:49 +02:00
|
|
|
t.env.globals["WEBPUSH_PUBKEY"] = settings.lnbits_webpush_pubkey
|
|
|
|
|
2021-08-27 20:54:42 +02:00
|
|
|
return t
|
2022-10-04 09:51:47 +02:00
|
|
|
|
|
|
|
|
|
|
|
def get_current_extension_name() -> str:
|
|
|
|
"""
|
|
|
|
Returns the name of the extension that calls this method.
|
|
|
|
"""
|
|
|
|
import inspect
|
|
|
|
import json
|
|
|
|
import os
|
|
|
|
|
|
|
|
callee_filepath = inspect.stack()[1].filename
|
2023-01-21 12:07:19 +00:00
|
|
|
callee_dirname, _ = os.path.split(callee_filepath)
|
2022-10-04 09:51:47 +02:00
|
|
|
|
|
|
|
path = os.path.normpath(callee_dirname)
|
|
|
|
extension_director_name = path.split(os.sep)[-1]
|
|
|
|
try:
|
|
|
|
config_path = os.path.join(callee_dirname, "config.json")
|
|
|
|
with open(config_path) as json_file:
|
|
|
|
config = json.load(json_file)
|
|
|
|
ext_name = config["name"]
|
2023-08-16 12:17:54 +02:00
|
|
|
except Exception:
|
2022-10-04 09:51:47 +02:00
|
|
|
ext_name = extension_director_name
|
|
|
|
return ext_name
|
2023-04-03 14:55:49 +02:00
|
|
|
|
|
|
|
|
2023-05-09 10:18:53 +02:00
|
|
|
def generate_filter_params_openapi(model: Type[FilterModel], keep_optional=False):
|
2023-04-03 14:55:49 +02:00
|
|
|
"""
|
2023-08-24 11:26:09 +02:00
|
|
|
Generate openapi documentation for Filters. This is intended to be used along
|
|
|
|
parse_filters (see example)
|
2023-04-03 14:55:49 +02:00
|
|
|
:param model: Filter model
|
2023-08-24 11:26:09 +02:00
|
|
|
:param keep_optional: If false, all parameters will be optional,
|
|
|
|
otherwise inferred from model
|
2023-04-03 14:55:49 +02:00
|
|
|
"""
|
|
|
|
fields = list(model.__fields__.values())
|
|
|
|
params = []
|
|
|
|
for field in fields:
|
2023-07-31 10:21:30 +02:00
|
|
|
schema, _, _ = field_schema(field, model_name_map={})
|
2023-04-03 14:55:49 +02:00
|
|
|
|
|
|
|
description = "Supports Filtering"
|
2023-05-09 10:18:53 +02:00
|
|
|
if (
|
|
|
|
hasattr(model, "__search_fields__")
|
|
|
|
and field.name in model.__search_fields__
|
|
|
|
):
|
|
|
|
description += ". Supports Search"
|
2023-04-03 14:55:49 +02:00
|
|
|
|
|
|
|
parameter = {
|
|
|
|
"name": field.alias,
|
|
|
|
"in": "query",
|
|
|
|
"required": field.required if keep_optional else False,
|
|
|
|
"schema": schema,
|
|
|
|
"description": description,
|
|
|
|
}
|
|
|
|
params.append(parameter)
|
|
|
|
|
|
|
|
return {
|
|
|
|
"parameters": params,
|
|
|
|
}
|
2023-10-16 13:27:59 +02:00
|
|
|
|
|
|
|
|
|
|
|
def insert_query(table_name: str, model: BaseModel) -> str:
|
|
|
|
"""
|
|
|
|
Generate an insert query with placeholders for a given table and model
|
|
|
|
:param table_name: Name of the table
|
|
|
|
:param model: Pydantic model
|
|
|
|
"""
|
|
|
|
placeholders = ", ".join(["?"] * len(model.dict().keys()))
|
|
|
|
fields = ", ".join(model.dict().keys())
|
|
|
|
return f"INSERT INTO {table_name} ({fields}) VALUES ({placeholders})"
|
|
|
|
|
|
|
|
|
|
|
|
def update_query(table_name: str, model: BaseModel, where: str = "WHERE id = ?") -> str:
|
|
|
|
"""
|
|
|
|
Generate an update query with placeholders for a given table and model
|
|
|
|
:param table_name: Name of the table
|
|
|
|
:param model: Pydantic model
|
|
|
|
:param where: Where string, default to `WHERE id = ?`
|
|
|
|
"""
|
|
|
|
query = ", ".join([f"{field} = ?" for field in model.dict().keys()])
|
|
|
|
return f"UPDATE {table_name} SET {query} {where}"
|
2023-12-12 12:38:19 +02:00
|
|
|
|
|
|
|
|
|
|
|
def is_valid_email_address(email: str) -> bool:
|
|
|
|
email_regex = r"[A-Za-z0-9\._%+-]+@[A-Za-z0-9\.-]+\.[A-Za-z]{2,63}"
|
|
|
|
return re.fullmatch(email_regex, email) is not None
|
|
|
|
|
|
|
|
|
|
|
|
def is_valid_username(username: str) -> bool:
|
|
|
|
username_regex = r"(?=[a-zA-Z0-9._]{2,20}$)(?!.*[_.]{2})[^_.].*[^_.]"
|
|
|
|
return re.fullmatch(username_regex, username) is not None
|
|
|
|
|
|
|
|
|
|
|
|
def create_access_token(data: dict):
|
|
|
|
expire = datetime.utcnow() + timedelta(minutes=settings.auth_token_expire_minutes)
|
|
|
|
to_encode = data.copy()
|
|
|
|
to_encode.update({"exp": expire})
|
|
|
|
return jwt.encode(to_encode, settings.auth_secret_key, "HS256")
|
2024-01-16 17:33:11 +02:00
|
|
|
|
|
|
|
|
|
|
|
def encrypt_internal_message(m: Optional[str] = None) -> Optional[str]:
|
|
|
|
"""Encrypt message with the internal secret key"""
|
|
|
|
if not m:
|
|
|
|
return None
|
|
|
|
return AESCipher(key=settings.auth_secret_key).encrypt(m.encode())
|
|
|
|
|
|
|
|
|
|
|
|
def decrypt_internal_message(m: Optional[str] = None) -> Optional[str]:
|
|
|
|
"""Decrypt message with the internal secret key"""
|
|
|
|
if not m:
|
|
|
|
return None
|
|
|
|
return AESCipher(key=settings.auth_secret_key).decrypt(m)
|