lnbits-legend/lnbits/core/views/auth_api.py
dni ⚡ a4c000d7dc
feat: add password reset for usermanager (#2688)
* feat: add password reset for usermanager
- add a reset_key to account table
- add ?reset_key= GET arguments to index.html and show reset form if provided
- superuser can generate and copy reset url with key to share
future ideas:
- could add send forgot password email if user fill out email address
* feat: simplify reset key
* test: use reset key
* test: add more tests
* test: reset passwords do not match
* test: `reset_password_auth_threshold_expired`

---------

Co-authored-by: Vlad Stan <stan.v.vlad@gmail.com>
2024-10-01 10:59:57 +02:00

495 lines
17 KiB
Python

import base64
import importlib
import json
from time import time
from typing import Callable, Optional
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.responses import JSONResponse, RedirectResponse
from fastapi_sso.sso.base import OpenID, SSOBase
from loguru import logger
from starlette.status import (
HTTP_400_BAD_REQUEST,
HTTP_401_UNAUTHORIZED,
HTTP_403_FORBIDDEN,
HTTP_500_INTERNAL_SERVER_ERROR,
)
from lnbits.core.services import create_user_account
from lnbits.decorators import access_token_payload, check_user_exists
from lnbits.helpers import (
create_access_token,
decrypt_internal_message,
encrypt_internal_message,
is_valid_email_address,
is_valid_username,
)
from lnbits.settings import AuthMethods, settings
from lnbits.utils.nostr import normalize_public_key, verify_event
from ..crud import (
get_account,
get_account_by_email,
get_account_by_pubkey,
get_account_by_username_or_email,
get_user,
get_user_password,
update_account,
update_user_password,
update_user_pubkey,
verify_user_password,
)
from ..models import (
AccessTokenPayload,
CreateUser,
LoginUsernamePassword,
LoginUsr,
ResetUserPassword,
UpdateSuperuserPassword,
UpdateUser,
UpdateUserPassword,
UpdateUserPubkey,
User,
UserConfig,
)
auth_router = APIRouter(prefix="/api/v1/auth", tags=["Auth"])
@auth_router.get("", description="Get the authenticated user")
async def get_auth_user(user: User = Depends(check_user_exists)) -> User:
return user
@auth_router.post("", description="Login via the username and password")
async def login(data: LoginUsernamePassword) -> JSONResponse:
if not settings.is_auth_method_allowed(AuthMethods.username_and_password):
raise HTTPException(
HTTP_401_UNAUTHORIZED, "Login by 'Username and Password' not allowed."
)
try:
user = await get_account_by_username_or_email(data.username)
if not user:
raise HTTPException(HTTP_401_UNAUTHORIZED, "Invalid credentials.")
if not await verify_user_password(user.id, data.password):
raise HTTPException(HTTP_401_UNAUTHORIZED, "Invalid credentials.")
return _auth_success_response(user.username, user.id, user.email)
except HTTPException as exc:
raise exc
except Exception as exc:
logger.debug(exc)
raise HTTPException(HTTP_500_INTERNAL_SERVER_ERROR, "Cannot login.") from exc
@auth_router.post("/nostr", description="Login via Nostr")
async def nostr_login(request: Request) -> JSONResponse:
if not settings.is_auth_method_allowed(AuthMethods.nostr_auth_nip98):
raise HTTPException(HTTP_401_UNAUTHORIZED, "Login with Nostr Auth not allowed.")
try:
event = _nostr_nip98_event(request)
user = await get_account_by_pubkey(event["pubkey"])
if not user:
user = await create_user_account(
pubkey=event["pubkey"], user_config=UserConfig(provider="nostr")
)
return _auth_success_response(user.username or "", user.id, user.email)
except HTTPException as exc:
raise exc
except AssertionError as exc:
raise HTTPException(HTTP_401_UNAUTHORIZED, str(exc)) from exc
except Exception as exc:
logger.warning(exc)
raise HTTPException(HTTP_500_INTERNAL_SERVER_ERROR, "Cannot login.") from exc
@auth_router.post("/usr", description="Login via the User ID")
async def login_usr(data: LoginUsr) -> JSONResponse:
if not settings.is_auth_method_allowed(AuthMethods.user_id_only):
raise HTTPException(HTTP_401_UNAUTHORIZED, "Login by 'User ID' not allowed.")
try:
user = await get_user(data.usr)
if not user:
raise HTTPException(HTTP_401_UNAUTHORIZED, "User ID does not exist.")
return _auth_success_response(user.username or "", user.id, user.email)
except HTTPException as exc:
raise exc
except Exception as exc:
logger.debug(exc)
raise HTTPException(HTTP_500_INTERNAL_SERVER_ERROR, "Cannot login.") from exc
@auth_router.get("/{provider}", description="SSO Provider")
async def login_with_sso_provider(
request: Request, provider: str, user_id: Optional[str] = None
):
provider_sso = _new_sso(provider)
if not provider_sso:
raise HTTPException(
HTTP_401_UNAUTHORIZED, f"Login by '{provider}' not allowed."
)
provider_sso.redirect_uri = str(request.base_url) + f"api/v1/auth/{provider}/token"
with provider_sso:
state = encrypt_internal_message(user_id)
return await provider_sso.get_login_redirect(state=state)
@auth_router.get("/{provider}/token", description="Handle OAuth callback")
async def handle_oauth_token(request: Request, provider: str) -> RedirectResponse:
provider_sso = _new_sso(provider)
if not provider_sso:
raise HTTPException(
HTTP_401_UNAUTHORIZED, f"Login by '{provider}' not allowed."
)
try:
with provider_sso:
userinfo = await provider_sso.verify_and_process(request)
assert userinfo is not None
user_id = decrypt_internal_message(provider_sso.state)
request.session.pop("user", None)
return await _handle_sso_login(userinfo, user_id)
except HTTPException as exc:
raise exc
except ValueError as exc:
raise HTTPException(HTTP_403_FORBIDDEN, str(exc)) from exc
except Exception as exc:
logger.debug(exc)
raise HTTPException(
HTTP_500_INTERNAL_SERVER_ERROR,
f"Cannot authenticate user with {provider} Auth.",
) from exc
@auth_router.post("/logout")
async def logout() -> JSONResponse:
response = JSONResponse({"status": "success"}, status_code=status.HTTP_200_OK)
response.delete_cookie("cookie_access_token")
response.delete_cookie("is_lnbits_user_authorized")
response.delete_cookie("is_access_token_expired")
response.delete_cookie("lnbits_last_active_wallet")
return response
@auth_router.post("/register")
async def register(data: CreateUser) -> JSONResponse:
if not settings.is_auth_method_allowed(AuthMethods.username_and_password):
raise HTTPException(
HTTP_401_UNAUTHORIZED, "Register by 'Username and Password' not allowed."
)
if data.password != data.password_repeat:
raise HTTPException(HTTP_400_BAD_REQUEST, "Passwords do not match.")
if not data.username:
raise HTTPException(HTTP_400_BAD_REQUEST, "Missing username.")
if not is_valid_username(data.username):
raise HTTPException(HTTP_400_BAD_REQUEST, "Invalid username.")
if data.email and not is_valid_email_address(data.email):
raise HTTPException(HTTP_400_BAD_REQUEST, "Invalid email.")
try:
user = await create_user_account(
email=data.email, username=data.username, password=data.password
)
return _auth_success_response(user.username, user.id, user.email)
except ValueError as exc:
raise HTTPException(HTTP_403_FORBIDDEN, str(exc)) from exc
except Exception as exc:
logger.debug(exc)
raise HTTPException(
HTTP_500_INTERNAL_SERVER_ERROR, "Cannot create user."
) from exc
@auth_router.put("/password")
async def update_password(
data: UpdateUserPassword,
user: User = Depends(check_user_exists),
payload: AccessTokenPayload = Depends(access_token_payload),
) -> Optional[User]:
if data.user_id != user.id:
raise HTTPException(HTTP_400_BAD_REQUEST, "Invalid user ID.")
try:
if data.username and not user.username:
await update_account(user_id=user.id, username=data.username)
# old accounts do not have a pasword
if await get_user_password(data.user_id):
assert data.password_old, "Missing old password"
old_pwd_ok = await verify_user_password(data.user_id, data.password_old)
assert old_pwd_ok, "Invalid credentials."
return await update_user_password(data, payload.auth_time or 0)
except AssertionError as exc:
raise HTTPException(HTTP_403_FORBIDDEN, str(exc)) from exc
except Exception as exc:
logger.debug(exc)
raise HTTPException(
HTTP_500_INTERNAL_SERVER_ERROR, "Cannot update user password."
) from exc
@auth_router.put("/pubkey")
async def update_pubkey(
data: UpdateUserPubkey,
user: User = Depends(check_user_exists),
payload: AccessTokenPayload = Depends(access_token_payload),
) -> Optional[User]:
if data.user_id != user.id:
raise HTTPException(HTTP_400_BAD_REQUEST, "Invalid user ID.")
try:
data.pubkey = normalize_public_key(data.pubkey)
return await update_user_pubkey(data, payload.auth_time or 0)
except AssertionError as exc:
raise HTTPException(HTTP_403_FORBIDDEN, str(exc)) from exc
except Exception as exc:
logger.debug(exc)
raise HTTPException(
HTTP_500_INTERNAL_SERVER_ERROR, "Cannot update user pubkey."
) from exc
@auth_router.put("/reset")
async def reset_password(data: ResetUserPassword) -> JSONResponse:
if not settings.is_auth_method_allowed(AuthMethods.username_and_password):
raise HTTPException(
HTTP_401_UNAUTHORIZED, "Auth by 'Username and Password' not allowed."
)
try:
assert data.reset_key[:10] == "reset_key_", "This is not a reset key."
reset_data_json = decrypt_internal_message(
base64.b64decode(data.reset_key[10:]).decode()
)
assert reset_data_json, "Cannot process reset key."
action, user_id, request_time = json.loads(reset_data_json)
assert action == "reset", "Expected reset action."
assert user_id is not None, "Missing user ID."
assert request_time is not None, "Missing reset time."
user = await get_account(user_id)
assert user, "User not found."
update_pwd = UpdateUserPassword(
user_id=user.id,
username=user.username or "",
password=data.password,
password_repeat=data.password_repeat,
)
user = await update_user_password(update_pwd, request_time)
return _auth_success_response(
username=user.username, user_id=user_id, email=user.email
)
except AssertionError as exc:
raise HTTPException(HTTP_403_FORBIDDEN, str(exc)) from exc
except Exception as exc:
logger.warning(exc)
raise HTTPException(
HTTP_500_INTERNAL_SERVER_ERROR, "Cannot reset user password."
) from exc
@auth_router.put("/update")
async def update(
data: UpdateUser, user: User = Depends(check_user_exists)
) -> Optional[User]:
if data.user_id != user.id:
raise HTTPException(HTTP_400_BAD_REQUEST, "Invalid user ID.")
if data.username and not is_valid_username(data.username):
raise HTTPException(HTTP_400_BAD_REQUEST, "Invalid username.")
if data.email != user.email:
raise HTTPException(HTTP_400_BAD_REQUEST, "Email mismatch.")
try:
return await update_account(user.id, data.username, None, data.config)
except AssertionError as exc:
raise HTTPException(HTTP_403_FORBIDDEN, str(exc)) from exc
except Exception as exc:
logger.debug(exc)
raise HTTPException(
HTTP_500_INTERNAL_SERVER_ERROR, "Cannot update user."
) from exc
@auth_router.put("/first_install")
async def first_install(data: UpdateSuperuserPassword) -> JSONResponse:
if not settings.first_install:
raise HTTPException(HTTP_401_UNAUTHORIZED, "This is not your first install")
try:
await update_account(
user_id=settings.super_user,
username=data.username,
user_config=UserConfig(provider="lnbits"),
)
super_user = UpdateUserPassword(
user_id=settings.super_user,
password=data.password,
password_repeat=data.password_repeat,
username=data.username,
)
user = await update_user_password(super_user, int(time()))
settings.first_install = False
return _auth_success_response(user.username, user.id, user.email)
except AssertionError as exc:
raise HTTPException(HTTP_403_FORBIDDEN, str(exc)) from exc
except Exception as exc:
logger.debug(exc)
raise HTTPException(
HTTP_500_INTERNAL_SERVER_ERROR, "Cannot init user password."
) from exc
async def _handle_sso_login(userinfo: OpenID, verified_user_id: Optional[str] = None):
email = userinfo.email
if not email or not is_valid_email_address(email):
raise HTTPException(HTTP_400_BAD_REQUEST, "Invalid email.")
redirect_path = "/wallet"
user_config = UserConfig(**dict(userinfo))
user_config.email_verified = True
account = await get_account_by_email(email)
if verified_user_id:
if account:
raise HTTPException(HTTP_401_UNAUTHORIZED, "Email already used.")
account = await get_account(verified_user_id)
if not account:
raise HTTPException(HTTP_401_UNAUTHORIZED, "Cannot verify user email.")
redirect_path = "/account"
if account:
user = await update_account(account.id, email=email, user_config=user_config)
else:
if not settings.new_accounts_allowed:
raise HTTPException(HTTP_400_BAD_REQUEST, "Account creation is disabled.")
user = await create_user_account(email=email, user_config=user_config)
if not user:
raise HTTPException(HTTP_401_UNAUTHORIZED, "User not found.")
return _auth_redirect_response(redirect_path, email)
def _auth_success_response(
username: Optional[str] = None,
user_id: Optional[str] = None,
email: Optional[str] = None,
) -> JSONResponse:
payload = AccessTokenPayload(
sub=username or "", usr=user_id, email=email, auth_time=int(time())
)
access_token = create_access_token(data=payload.dict())
response = JSONResponse({"access_token": access_token, "token_type": "bearer"})
response.set_cookie("cookie_access_token", access_token, httponly=True)
response.set_cookie("is_lnbits_user_authorized", "true")
response.delete_cookie("is_access_token_expired")
return response
def _auth_redirect_response(path: str, email: str) -> RedirectResponse:
payload = AccessTokenPayload(sub="" or "", email=email, auth_time=int(time()))
access_token = create_access_token(data=payload.dict())
response = RedirectResponse(path)
response.set_cookie("cookie_access_token", access_token, httponly=True)
response.set_cookie("is_lnbits_user_authorized", "true")
response.delete_cookie("is_access_token_expired")
return response
def _new_sso(provider: str) -> Optional[SSOBase]:
try:
if not settings.is_auth_method_allowed(AuthMethods(f"{provider}-auth")):
return None
client_id = getattr(settings, f"{provider}_client_id", None)
client_secret = getattr(settings, f"{provider}_client_secret", None)
discovery_url = getattr(settings, f"{provider}_discovery_url", None)
if not client_id or not client_secret:
logger.warning(f"{provider} auth allowed but not configured.")
return None
sso_provider_class = _find_auth_provider_class(provider)
sso_provider = sso_provider_class(
client_id, client_secret, None, allow_insecure_http=True
)
if (
discovery_url
and getattr(sso_provider, "discovery_url", discovery_url) != discovery_url
):
sso_provider.discovery_url = discovery_url
return sso_provider
except Exception as e:
logger.warning(e)
return None
def _find_auth_provider_class(provider: str) -> Callable:
sso_modules = ["lnbits.core.sso", "fastapi_sso.sso"]
for module in sso_modules:
try:
provider_module = importlib.import_module(f"{module}.{provider}")
provider_class = getattr(provider_module, f"{provider.title()}SSO")
if provider_class:
return provider_class
except Exception:
pass
raise ValueError(f"No SSO provider found for '{provider}'.")
def _nostr_nip98_event(request: Request) -> dict:
auth_header = request.headers.get("Authorization")
assert auth_header, "Nostr Auth header missing."
scheme, token = auth_header.split()
assert scheme.lower() == "nostr", "Authorization header is not nostr."
event = None
try:
event_json = base64.b64decode(token.encode("ascii"))
event = json.loads(event_json)
except Exception as exc:
logger.warning(exc)
assert event, "Nostr login event cannot be parsed."
assert verify_event(event), "Nostr login event is not valid."
assert event["kind"] == 27_235, "Invalid event kind."
auth_threshold = settings.auth_credetials_update_threshold
assert (
abs(time() - event["created_at"]) < auth_threshold
), f"More than {auth_threshold} seconds have passed since the event was signed."
method: Optional[str] = next((v for k, v in event["tags"] if k == "method"), None)
assert method, "Tag 'method' is missing."
assert method.upper() == "POST", "Incorrect value for tag 'method'."
url = next((v for k, v in event["tags"] if k == "u"), None)
assert url, "Tag 'u' for URL is missing."
accepted_urls = [f"{u}/nostr" for u in settings.nostr_absolute_request_urls]
assert url in accepted_urls, f"Incorrect value for tag 'u': '{url}'."
return event