lnbits-legend/lnbits/decorators.py

260 lines
8.0 KiB
Python
Raw Normal View History

from http import HTTPStatus
2022-07-20 12:21:39 +02:00
from typing import Union
2021-08-29 19:38:42 +02:00
from cerberus import Validator # type: ignore
2021-10-18 17:06:06 +02:00
from fastapi import status
2021-08-29 19:38:42 +02:00
from fastapi.exceptions import HTTPException
from fastapi.openapi.models import APIKey, APIKeyIn
from fastapi.params import Security
from fastapi.security.api_key import APIKeyHeader, APIKeyQuery
from fastapi.security.base import SecurityBase
2021-10-18 17:06:06 +02:00
from pydantic.types import UUID4
2021-08-29 19:38:42 +02:00
from starlette.requests import Request
from lnbits.core.crud import get_user, get_wallet_for_key
2021-10-18 17:06:06 +02:00
from lnbits.core.models import User, Wallet
from lnbits.requestvars import g
from lnbits.settings import settings
2021-08-29 19:38:42 +02:00
class KeyChecker(SecurityBase):
2021-10-17 19:33:29 +02:00
def __init__(
self, scheme_name: str = None, auto_error: bool = True, api_key: str = None
):
2021-08-29 19:38:42 +02:00
self.scheme_name = scheme_name or self.__class__.__name__
self.auto_error = auto_error
self._key_type = "invoice"
self._api_key = api_key
if api_key:
key = APIKey(
2021-10-17 19:33:29 +02:00
**{"in": APIKeyIn.query},
name="X-API-KEY",
description="Wallet API Key - QUERY",
2021-08-29 19:38:42 +02:00
)
else:
key = APIKey(
2021-10-17 19:33:29 +02:00
**{"in": APIKeyIn.header},
name="X-API-KEY",
description="Wallet API Key - HEADER",
2021-08-29 19:38:42 +02:00
)
2022-07-26 12:21:21 +02:00
self.wallet = None # type: ignore
self.model: APIKey = key
2021-08-29 19:38:42 +02:00
async def __call__(self, request: Request):
2021-08-29 19:38:42 +02:00
try:
2021-10-17 19:33:29 +02:00
key_value = (
self._api_key
if self._api_key
else request.headers.get("X-API-KEY") or request.query_params["api-key"]
)
2021-08-29 19:38:42 +02:00
# FIXME: Find another way to validate the key. A fetch from DB should be avoided here.
# Also, we should not return the wallet here - thats silly.
# Possibly store it in a Redis DB
2022-07-26 12:21:21 +02:00
self.wallet = await get_wallet_for_key(key_value, self._key_type) # type: ignore
2022-07-26 12:07:16 +02:00
if not self.wallet:
2021-10-17 19:33:29 +02:00
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail="Invalid key or expired key.",
)
2021-08-29 19:38:42 +02:00
except KeyError:
2021-10-17 19:33:29 +02:00
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST, detail="`X-API-KEY` header missing."
)
2021-08-29 19:38:42 +02:00
class WalletInvoiceKeyChecker(KeyChecker):
"""
WalletInvoiceKeyChecker will ensure that the provided invoice
wallet key is correct and populate g().wallet with the wallet
2021-08-29 19:38:42 +02:00
for the key in `X-API-key`.
The checker will raise an HTTPException when the key is wrong in some ways.
"""
2021-10-17 19:33:29 +02:00
def __init__(
self, scheme_name: str = None, auto_error: bool = True, api_key: str = None
):
2021-08-29 19:38:42 +02:00
super().__init__(scheme_name, auto_error, api_key)
self._key_type = "invoice"
2021-10-17 19:33:29 +02:00
2021-08-29 19:38:42 +02:00
class WalletAdminKeyChecker(KeyChecker):
"""
WalletAdminKeyChecker will ensure that the provided admin
wallet key is correct and populate g().wallet with the wallet
2021-08-29 19:38:42 +02:00
for the key in `X-API-key`.
The checker will raise an HTTPException when the key is wrong in some ways.
"""
2021-10-17 19:33:29 +02:00
def __init__(
self, scheme_name: str = None, auto_error: bool = True, api_key: str = None
):
2021-08-29 19:38:42 +02:00
super().__init__(scheme_name, auto_error, api_key)
self._key_type = "admin"
2021-10-17 19:33:29 +02:00
class WalletTypeInfo:
2021-08-29 19:38:42 +02:00
wallet_type: int
2022-07-26 12:21:21 +02:00
wallet: Wallet
2022-07-26 12:21:21 +02:00
def __init__(self, wallet_type: int, wallet: Wallet) -> None:
2021-08-29 19:38:42 +02:00
self.wallet_type = wallet_type
self.wallet = wallet
2021-10-17 19:33:29 +02:00
api_key_header = APIKeyHeader(
name="X-API-KEY",
auto_error=False,
description="Admin or Invoice key for wallet API's",
)
api_key_query = APIKeyQuery(
name="api-key",
auto_error=False,
description="Admin or Invoice key for wallet API's",
)
async def get_key_type(
r: Request,
2022-07-25 13:30:45 +02:00
api_key_header: str = Security(api_key_header), # type: ignore
api_key_query: str = Security(api_key_query), # type: ignore
2021-10-17 19:33:29 +02:00
) -> WalletTypeInfo:
2021-08-29 19:38:42 +02:00
# 0: admin
# 1: invoice
# 2: invalid
2022-06-01 14:53:05 +02:00
pathname = r["path"].split("/")[1]
2021-10-17 19:33:29 +02:00
token = api_key_header or api_key_query
2021-10-15 18:05:38 +02:00
if not token:
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail="Invoice (or Admin) key required.",
)
2021-10-17 19:33:29 +02:00
for typenr, WalletChecker in zip(
[0, 1], [WalletAdminKeyChecker, WalletInvoiceKeyChecker]
):
try:
checker = WalletChecker(api_key=token)
await checker.__call__(r)
wallet = WalletTypeInfo(typenr, checker.wallet) # type: ignore
if wallet is None or wallet.wallet is None:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="Wallet does not exist."
)
if (
settings.lnbits_admin_users
and wallet.wallet.user not in settings.lnbits_admin_users
) and (
settings.lnbits_admin_extensions
and pathname in settings.lnbits_admin_extensions
):
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN,
detail="User not authorized for this extension.",
)
return wallet
except HTTPException as e:
if e.status_code == HTTPStatus.BAD_REQUEST:
raise
elif e.status_code == HTTPStatus.UNAUTHORIZED:
# we pass this in case it is not an invoice key, nor an admin key, and then return NOT_FOUND at the end of this block
pass
else:
raise
except:
raise
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="Wallet does not exist."
)
2021-10-17 19:33:29 +02:00
2021-10-18 17:06:06 +02:00
async def require_admin_key(
r: Request,
2022-07-25 13:30:45 +02:00
api_key_header: str = Security(api_key_header), # type: ignore
api_key_query: str = Security(api_key_query), # type: ignore
2021-10-18 17:06:06 +02:00
):
token = api_key_header or api_key_query
if not token:
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail="Admin key required.",
)
2021-10-18 17:06:06 +02:00
wallet = await get_key_type(r, token)
2021-10-18 17:06:06 +02:00
if wallet.wallet_type != 0:
# If wallet type is not admin then return the unauthorized status
# This also covers when the user passes an invalid key type
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Admin key required."
)
else:
return wallet
2022-01-30 20:43:30 +01:00
2021-12-28 16:22:45 +01:00
async def require_invoice_key(
r: Request,
2022-07-25 13:30:45 +02:00
api_key_header: str = Security(api_key_header), # type: ignore
api_key_query: str = Security(api_key_query), # type: ignore
2021-12-28 16:22:45 +01:00
):
token = api_key_header or api_key_query
if not token:
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail="Invoice (or Admin) key required.",
)
2021-12-28 16:22:45 +01:00
wallet = await get_key_type(r, token)
if wallet.wallet_type > 1:
# If wallet type is not invoice then return the unauthorized status
# This also covers when the user passes an invalid key type
raise HTTPException(
2022-01-30 20:43:30 +01:00
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invoice (or Admin) key required.",
2021-12-28 16:22:45 +01:00
)
else:
return wallet
async def check_user_exists(usr: UUID4) -> User:
g().user = await get_user(usr.hex)
if not g().user:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="User does not exist."
)
2022-09-22 10:46:11 +02:00
if (
len(settings.lnbits_allowed_users) > 0
and g().user.id not in settings.lnbits_allowed_users
):
2022-09-21 16:35:06 +02:00
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED, detail="User not authorized."
)
if g().user.id in settings.lnbits_admin_users:
2022-09-21 16:35:06 +02:00
g().user.admin = True
return g().user
async def check_admin(usr: UUID4) -> User:
user = await check_user_exists(usr)
if not user.id in settings.lnbits_admin_users:
raise HTTPException(
status_code=HTTPStatus.UNAUTHORIZED,
detail="User not authorized. No admin privileges.",
)
return user