lnbits-legend/tests/api/test_auth.py
dni ⚡ 2940cf97c5
feat: parse nested pydantic models fetchone and fetchall + add shortcuts for insert_query and update_query into Database (#2714)
* feat: add shortcuts for insert_query and update_query into `Database`
example: await db.insert("table_name", base_model)
* remove where from argument
* chore: code clean-up
* extension manager
* lnbits-qrcode  components
* parse date from dict
* refactor: make `settings` a fixture
* chore: remove verbose key names
* fix: time column
* fix: cast balance to `int`
* extension toggle vue3
* vue3 @input migration
* fix: payment extra and payment hash
* fix dynamic fields and ext db migration
* remove shadow on cards in dark theme
* screwed up and made more css pushes to this branch
* attempt to make chip component in settings dynamic fields
* dynamic chips
* qrscanner
* clean init admin settings
* make get_user better
* add dbversion model
* remove update_payment_status/extra/details
* traces for value and assertion errors
* refactor services
* add PaymentFiatAmount
* return Payment on api endpoints
* rename to get_user_from_account
* refactor: just refactor (#2740)
* rc5
* Fix db cache (#2741)
* [refactor] split services.py (#2742)
* refactor: split `core.py` (#2743)
* refactor: make QR more customizable
* fix: print.html
* fix: qrcode options
* fix: white shadow on dark theme
* fix: datetime wasn't parsed in dict_to_model
* add timezone for conversion
* only parse timestamp for sqlite, postgres does it
* log internal payment success
* fix: export wallet to phone QR
* Adding a customisable border theme, like gradient (#2746)
* fixed mobile scan btn
* fix test websocket
* fix get_payments tests
* dict_to_model skip none values
* preimage none instead of defaulting to 0000...
* fixup test real invoice tests
* fixed phoenixd for wss
* fix nodemanager test settings
* fix lnbits funding
* only insert extensions when they don't exist

---------

Co-authored-by: Vlad Stan <stan.v.vlad@gmail.com>
Co-authored-by: Tiago Vasconcelos <talvasconcelos@gmail.com>
Co-authored-by: Arc <ben@arc.wales>
Co-authored-by: Arc <33088785+arcbtc@users.noreply.github.com>
2024-10-29 09:58:22 +01:00

1027 lines
35 KiB
Python

import asyncio
import base64
import json
import os
import time

import jwt
import pytest
import secp256k1
import shortuuid
from httpx import AsyncClient

from lnbits.core.models import AccessTokenPayload, User
from lnbits.core.views.user_api import api_users_reset_password
from lnbits.settings import AuthMethods, Settings
from lnbits.utils.nostr import hex_to_npub, sign_event
# Template for the Nostr login event used by the tests below. Tests work on a
# shallow copy and override individual fields ("created_at", "tags", "kind",
# "content", "pubkey") before signing or posting it.
# NOTE(review): the second literal of "sig" is the same hex string that is used
# as the private key below - confirm this is intentional.
nostr_event = {
    "kind": 27235,
    "tags": [["u", "http://localhost:5000/nostr"], ["method", "POST"]],
    "created_at": 1727681048,
    "content": "",
    "pubkey": "f6e80df16fa27f1f2774af0ac61b096f8f63ce9116f0a954fca1e25baee84ba9",
    "id": "0fd22355fe63043116fdfceb77be6bf22686aacd16b9e99a10fea6e55ae3f589",
    "sig": (
        "fb7eb47fa8355747f6837e55620103d73ba47b2c3164ab8319d2f164022a9f25"
        "6e00ecda7d3c8945f07b7d6ecc18cfff34c07bc99677309e2b9310d9fc1bb138"
    ),
}

# Fixed key pair shared by the tests that sign events at module scope.
_private_key_hex = "6e00ecda7d3c8945f07b7d6ecc18cfff34c07bc99677309e2b9310d9fc1bb138"
private_key = secp256k1.PrivateKey(bytes.fromhex(_private_key_hex))
# The first two hex characters of the serialized public key are dropped.
pubkey_hex = private_key.pubkey.serialize().hex()[2:]
################################ LOGIN ################################
@pytest.mark.asyncio
async def test_login_bad_user(http_client: AsyncClient):
    """Logging in with an unknown username is expected to fail with HTTP 401."""
    credentials = {"username": "non_existing_user", "password": "secret1234"}
    reply = await http_client.post("/api/v1/auth", json=credentials)

    assert reply.status_code == 401, "User does not exist"
    assert reply.json().get("detail") == "Invalid credentials."
@pytest.mark.asyncio
async def test_login_alan_usr(user_alan: User, http_client: AsyncClient):
    """Log in by user ID, then read the profile back with the issued token."""
    login = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
    assert login.status_code == 200, "Alan logs in OK."
    access_token = login.json().get("access_token")
    assert access_token is not None, "Expected access token after login."

    bearer = {"Authorization": f"Bearer {access_token}"}
    profile = await http_client.get("/api/v1/auth", headers=bearer)
    assert profile.status_code == 200, "Alan logs in OK."

    # The returned profile must describe the same user as the fixture.
    alan = profile.json()
    for field in ("id", "username", "email"):
        assert alan[field] == getattr(user_alan, field)
@pytest.mark.asyncio
async def test_login_usr_not_allowed(
    user_alan: User, http_client: AsyncClient, settings: Settings
):
    """Login by user ID is refused while that auth method is disabled.

    The allowed methods are narrowed to username-and-password only and the
    'usr' login is expected to fail with 401. After every method is enabled
    again the same request is expected to succeed and return a token.
    """
    # exclude 'user_id_only'
    settings.auth_allowed_methods = [AuthMethods.username_and_password.value]
    try:
        response = await http_client.post(
            "/api/v1/auth/usr", json={"usr": user_alan.id}
        )
        assert response.status_code == 401, "Login method not allowed."
        assert response.json().get("detail") == "Login by 'User ID' not allowed."
    finally:
        # Re-enable every method even if an assertion above fails, so the
        # restricted value is not left in place on the settings object.
        settings.auth_allowed_methods = AuthMethods.all()

    response = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
    assert response.status_code == 200, "Login with 'usr' allowed."
    assert (
        response.json().get("access_token") is not None
    ), "Expected access token after login."
@pytest.mark.asyncio
async def test_login_alan_username_password_ok(
    user_alan: User, http_client: AsyncClient, settings: Settings
):
    """Username/password login yields a valid token and the expected profile.

    The access token is decoded and its claims are checked first; the token is
    then used to fetch the user, whose flags and wallet count are verified.
    """
    credentials = {"username": user_alan.username, "password": "secret1234"}
    login = await http_client.post("/api/v1/auth", json=credentials)
    assert login.status_code == 200, "Alan logs in OK"
    access_token = login.json().get("access_token")
    assert access_token is not None

    # Decode with the configured secret so the individual claims can be checked.
    claims: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
    token_payload = AccessTokenPayload(**claims)
    assert token_payload.sub == "alan", "Subject is Alan."
    assert token_payload.email == "alan@lnbits.com"
    assert token_payload.auth_time, "Auth time should be set by server."
    assert (
        0 <= time.time() - token_payload.auth_time <= 5
    ), "Auth time should be very close to now()."

    bearer = {"Authorization": f"Bearer {access_token}"}
    profile = await http_client.get("/api/v1/auth", headers=bearer)
    assert profile.status_code == 200, "User exits."

    user = User(**profile.json())
    assert user.username == "alan", "Username check."
    assert user.email == "alan@lnbits.com", "Email check."
    assert not user.pubkey, "No pubkey."
    assert not user.admin, "Not admin."
    assert not user.super_user, "Not superuser."
    assert user.has_password, "Password configured."
    assert (
        len(user.wallets) == 1
    ), f"Expected 1 default wallet, not {len(user.wallets)}."
@pytest.mark.asyncio
async def test_login_alan_email_password_ok(user_alan: User, http_client: AsyncClient):
    """The e-mail address is accepted in the 'username' field of the login."""
    credentials = {"username": user_alan.email, "password": "secret1234"}
    login = await http_client.post("/api/v1/auth", json=credentials)

    assert login.status_code == 200, "Alan logs in OK"
    assert login.json().get("access_token") is not None
@pytest.mark.asyncio
async def test_login_alan_password_nok(user_alan: User, http_client: AsyncClient):
    """A known username with a wrong password is expected to fail with 401."""
    response = await http_client.post(
        "/api/v1/auth", json={"username": user_alan.username, "password": "bad_pasword"}
    )
    # The user exists here; it is the password that is wrong, so the failure
    # message says so instead of repeating the unknown-user wording.
    assert response.status_code == 401, "Wrong password is rejected."
    assert response.json().get("detail") == "Invalid credentials."
@pytest.mark.asyncio
async def test_login_username_password_not_allowed(
    user_alan: User, http_client: AsyncClient, settings: Settings
):
    """Username/password login is refused while that auth method is disabled.

    The allowed methods are narrowed to user-ID only and the login is expected
    to fail with 401. After every method is enabled again the same credentials
    are expected to be accepted.
    """
    credentials = {"username": user_alan.username, "password": "secret1234"}

    # exclude 'username_password'
    settings.auth_allowed_methods = [AuthMethods.user_id_only.value]
    try:
        response = await http_client.post("/api/v1/auth", json=credentials)
        assert response.status_code == 401, "Login method not allowed."
        assert (
            response.json().get("detail")
            == "Login by 'Username and Password' not allowed."
        )
    finally:
        # Re-enable every method even if an assertion above fails, so the
        # restricted value is not left in place on the settings object.
        settings.auth_allowed_methods = AuthMethods.all()

    response = await http_client.post("/api/v1/auth", json=credentials)
    assert response.status_code == 200, "Username and password is allowed."
    assert response.json().get("access_token") is not None
@pytest.mark.asyncio
async def test_login_alan_change_auth_secret_key(
    user_alan: User, http_client: AsyncClient, settings: Settings
):
    """A token stops being accepted when the auth secret key changes.

    A token is obtained, the secret key is replaced with a random value and the
    token is expected to be rejected with 401. Once the original key is put
    back, the same token is expected to be accepted again.
    """
    response = await http_client.post(
        "/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
    )
    assert response.status_code == 200, "Alan logs in OK"
    access_token = response.json().get("access_token")
    assert access_token is not None
    auth_header = {"Authorization": f"Bearer {access_token}"}

    initial_auth_secret_key = settings.auth_secret_key
    settings.auth_secret_key = shortuuid.uuid()
    try:
        response = await http_client.get("/api/v1/auth", headers=auth_header)
        assert response.status_code == 401, "Access token not valid anymore."
        assert response.json().get("detail") == "Invalid access token."
    finally:
        # Always put the original key back; otherwise a failed assertion would
        # leave the random key on the settings object.
        settings.auth_secret_key = initial_auth_secret_key

    response = await http_client.get("/api/v1/auth", headers=auth_header)
    assert response.status_code == 200, "Access token valid again."
################################ REGISTER WITH PASSWORD ################################
@pytest.mark.asyncio
async def test_register_ok(http_client: AsyncClient):
    """Registering with username, e-mail and password creates a usable account.

    The returned token is used to fetch the new user, whose identity fields,
    flags and single default wallet are then verified.
    """
    tiny_id = shortuuid.uuid()[:8]
    response = await http_client.post(
        "/api/v1/auth/register",
        json={
            "username": f"u21.{tiny_id}",
            "password": "secret1234",
            "password_repeat": "secret1234",
            "email": f"u21.{tiny_id}@lnbits.com",
        },
    )
    # Check the status before touching the body (as the sibling tests do) so a
    # failed registration is reported by this assertion, and parse it only once.
    assert response.status_code == 200, "User created."
    access_token = response.json().get("access_token")
    assert access_token is not None

    response = await http_client.get(
        "/api/v1/auth", headers={"Authorization": f"Bearer {access_token}"}
    )
    assert response.status_code == 200, "User exits."
    user = User(**response.json())
    assert user.username == f"u21.{tiny_id}", "Username check."
    assert user.email == f"u21.{tiny_id}@lnbits.com", "Email check."
    assert not user.pubkey, "No pubkey check."
    assert not user.admin, "Not admin."
    assert not user.super_user, "Not superuser."
    assert user.has_password, "Password configured."
    assert (
        len(user.wallets) == 1
    ), f"Expected 1 default wallet, not {len(user.wallets)}."
@pytest.mark.asyncio
async def test_register_email_twice(http_client: AsyncClient):
    """A second registration that reuses an e-mail is expected to fail with 400."""
    tiny_id = shortuuid.uuid()[:8]
    first_user = {
        "username": f"u21.{tiny_id}",
        "password": "secret1234",
        "password_repeat": "secret1234",
        "email": f"u21.{tiny_id}@lnbits.com",
    }
    created = await http_client.post("/api/v1/auth/register", json=first_user)
    assert created.status_code == 200, "User created."
    assert created.json().get("access_token") is not None

    # Same e-mail as before, but a fresh username.
    tiny_id_2 = shortuuid.uuid()[:8]
    same_email = {**first_user, "username": f"u21.{tiny_id_2}"}
    rejected = await http_client.post("/api/v1/auth/register", json=same_email)
    assert rejected.status_code == 400, "Not allowed."
    assert rejected.json().get("detail") == "Email already exists."
@pytest.mark.asyncio
async def test_register_username_twice(http_client: AsyncClient):
    """A second registration that reuses a username is expected to fail with 400."""
    tiny_id = shortuuid.uuid()[:8]
    first_user = {
        "username": f"u21.{tiny_id}",
        "password": "secret1234",
        "password_repeat": "secret1234",
        "email": f"u21.{tiny_id}@lnbits.com",
    }
    created = await http_client.post("/api/v1/auth/register", json=first_user)
    assert created.status_code == 200, "User created."
    assert created.json().get("access_token") is not None

    # Same username as before, but a fresh e-mail address.
    tiny_id_2 = shortuuid.uuid()[:8]
    same_username = {**first_user, "email": f"u21.{tiny_id_2}@lnbits.com"}
    rejected = await http_client.post("/api/v1/auth/register", json=same_username)
    assert rejected.status_code == 400, "Not allowed."
    assert rejected.json().get("detail") == "Username already exists."
@pytest.mark.asyncio
async def test_register_passwords_do_not_match(http_client: AsyncClient):
    """Registration with differing password fields is expected to fail with 400."""
    tiny_id = shortuuid.uuid()[:8]
    registration = {
        "username": f"u21.{tiny_id}",
        "password": "secret1234",
        "password_repeat": "secret0000",
        "email": f"u21.{tiny_id}@lnbits.com",
    }
    reply = await http_client.post("/api/v1/auth/register", json=registration)

    assert reply.status_code == 400, "Bad passwords."
    assert reply.json().get("detail") == "Passwords do not match."
@pytest.mark.asyncio
async def test_register_bad_email(http_client: AsyncClient):
    """Registration with a malformed e-mail is expected to fail with 400."""
    tiny_id = shortuuid.uuid()[:8]
    registration = {
        "username": f"u21.{tiny_id}",
        "password": "secret1234",
        "password_repeat": "secret1234",
        "email": "not_an_email_lnbits.com",
    }
    reply = await http_client.post("/api/v1/auth/register", json=registration)

    assert reply.status_code == 400, "Bad email."
    assert reply.json().get("detail") == "Invalid email."
################################ CHANGE PASSWORD ################################
@pytest.mark.asyncio
async def test_change_password_ok(http_client: AsyncClient, settings: Settings):
    """Changing the password invalidates the old one and enables the new one.

    A fresh user is registered, the password is changed with the token from the
    registration, and both the old and the new password are then tried.
    """
    tiny_id = shortuuid.uuid()[:8]
    registration = {
        "username": f"u21.{tiny_id}",
        "password": "secret1234",
        "password_repeat": "secret1234",
        "email": f"u21.{tiny_id}@lnbits.com",
    }
    created = await http_client.post("/api/v1/auth/register", json=registration)
    assert created.status_code == 200, "User created."
    access_token = created.json().get("access_token")
    assert access_token is not None

    # The user ID for the change request is taken from the token claims.
    claims: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
    token_payload = AccessTokenPayload(**claims)

    change_request = {
        "username": f"u21.{tiny_id}",
        "user_id": token_payload.usr,
        "password_old": "secret1234",
        "password": "secret0000",
        "password_repeat": "secret0000",
    }
    changed = await http_client.put(
        "/api/v1/auth/password",
        headers={"Authorization": f"Bearer {access_token}"},
        json=change_request,
    )
    assert changed.status_code == 200, "Password changed."
    user = User(**changed.json())
    assert user.username == f"u21.{tiny_id}", "Username check."
    assert user.email == f"u21.{tiny_id}@lnbits.com", "Email check."

    old_login = await http_client.post(
        "/api/v1/auth", json={"username": f"u21.{tiny_id}", "password": "secret1234"}
    )
    assert old_login.status_code == 401, "Old password does not work"
    assert old_login.json().get("detail") == "Invalid credentials."

    new_login = await http_client.post(
        "/api/v1/auth", json={"username": f"u21.{tiny_id}", "password": "secret0000"}
    )
    assert new_login.status_code == 200, "New password works."
    assert new_login.json().get("access_token") is not None, "Access token created."
@pytest.mark.asyncio
async def test_change_password_not_authenticated(http_client: AsyncClient):
    """A password change sent without a token is expected to fail with 401."""
    tiny_id = shortuuid.uuid()[:8]
    change_request = {
        "username": f"u21.{tiny_id}",
        "user_id": "0000",
        "password_old": "secret1234",
        "password": "secret0000",
        "password_repeat": "secret0000",
    }
    # No Authorization header is sent on purpose.
    reply = await http_client.put("/api/v1/auth/password", json=change_request)

    assert reply.status_code == 401, "User not authenticated."
    assert reply.json().get("detail") == "Missing user ID or access token."
@pytest.mark.asyncio
async def test_alan_change_password_old_nok(user_alan: User, http_client: AsyncClient):
    """A password change with a wrong old password is expected to fail with 400."""
    login = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
    assert login.status_code == 200, "Alan logs in OK."
    access_token = login.json().get("access_token")
    assert access_token is not None

    change_request = {
        "username": user_alan.username,
        "user_id": user_alan.id,
        "password_old": "secret0000",
        "password": "secret0001",
        "password_repeat": "secret0001",
    }
    reply = await http_client.put(
        "/api/v1/auth/password",
        headers={"Authorization": f"Bearer {access_token}"},
        json=change_request,
    )
    assert reply.status_code == 400, "Old password bad."
    assert reply.json().get("detail") == "Invalid old password."
@pytest.mark.asyncio
async def test_alan_change_password_different_user(
    user_alan: User, http_client: AsyncClient
):
    """A password change naming another user ID is expected to fail with 400."""
    login = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
    assert login.status_code == 200, "Alan logs in OK."
    access_token = login.json().get("access_token")
    assert access_token is not None

    # Alan's ID reversed serves as an ID that does not match the token.
    foreign_user_id = user_alan.id[::-1]
    change_request = {
        "username": user_alan.username,
        "user_id": foreign_user_id,
        "password_old": "secret1234",
        "password": "secret0001",
        "password_repeat": "secret0001",
    }
    reply = await http_client.put(
        "/api/v1/auth/password",
        headers={"Authorization": f"Bearer {access_token}"},
        json=change_request,
    )
    assert reply.status_code == 400, "Different user id."
    assert reply.json().get("detail") == "Invalid user ID."
@pytest.mark.asyncio
async def test_alan_change_password_auth_threshold_expired(
    user_alan: User, http_client: AsyncClient, settings: Settings
):
    """A password change after the credentials-update window is rejected.

    The window is shortened to one second, the test waits slightly longer and
    the change request is then expected to fail with 400.
    """
    response = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
    assert response.status_code == 200, "Alan logs in OK."
    access_token = response.json().get("access_token")
    assert access_token is not None

    previous_threshold = settings.auth_credetials_update_threshold
    settings.auth_credetials_update_threshold = 1
    try:
        # Non-blocking wait: time.sleep() would stall the event loop the test
        # client runs on.
        await asyncio.sleep(1.1)
        response = await http_client.put(
            "/api/v1/auth/password",
            headers={"Authorization": f"Bearer {access_token}"},
            json={
                "username": user_alan.username,
                "user_id": user_alan.id,
                "password_old": "secret1234",
                "password": "secret1234",
                "password_repeat": "secret1234",
            },
        )
        assert response.status_code == 400
        assert (
            response.json().get("detail") == "You can only update your credentials"
            " in the first 1 seconds."
            " Please login again or ask a new reset key!"
        )
    finally:
        # Put the previous window back so the one-second value is not left on
        # the settings object after this test.
        settings.auth_credetials_update_threshold = previous_threshold
################################ REGISTER PUBLIC KEY ################################
@pytest.mark.asyncio
async def test_register_nostr_ok(http_client: AsyncClient, settings: Settings):
    """A freshly signed Nostr event creates a password-less account.

    The event template is re-stamped with the current time and signed with a
    random key; the resulting account is expected to carry only the public key.
    """
    event = {**nostr_event}
    event["created_at"] = int(time.time())

    # Random 32-byte key, so every run works with a new public key.
    fresh_key = secp256k1.PrivateKey(os.urandom(32))
    fresh_pubkey_hex = fresh_key.pubkey.serialize().hex()[2:]
    event_signed = sign_event(event, fresh_pubkey_hex, fresh_key)
    base64_event = base64.b64encode(json.dumps(event_signed).encode()).decode("ascii")

    login = await http_client.post(
        "/api/v1/auth/nostr",
        headers={"Authorization": f"nostr {base64_event}"},
    )
    assert login.status_code == 200, "User created."
    access_token = login.json().get("access_token")
    assert access_token is not None

    claims: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
    token_payload = AccessTokenPayload(**claims)
    assert token_payload.auth_time, "Auth time should be set by server."
    assert (
        0 <= time.time() - token_payload.auth_time <= 5
    ), "Auth time should be very close to now()."

    profile = await http_client.get(
        "/api/v1/auth", headers={"Authorization": f"Bearer {access_token}"}
    )
    user = User(**profile.json())
    assert user.username is None, "No username."
    assert user.email is None, "No email."
    assert user.pubkey == fresh_pubkey_hex, "Pubkey check."
    assert not user.admin, "Not admin."
    assert not user.super_user, "Not superuser."
    assert not user.has_password, "Password configured."
    assert (
        len(user.wallets) == 1
    ), f"Expected 1 default wallet, not {len(user.wallets)}."
@pytest.mark.asyncio
async def test_register_nostr_not_allowed(http_client: AsyncClient, settings: Settings):
    """Nostr login is refused while only username/password auth is allowed."""
    # exclude 'nostr_auth_nip98'
    settings.auth_allowed_methods = [AuthMethods.username_and_password.value]
    try:
        response = await http_client.post(
            "/api/v1/auth/nostr",
            json={},
        )
        assert response.status_code == 401, "User not authenticated."
        assert response.json().get("detail") == "Login with Nostr Auth not allowed."
    finally:
        # Re-enable every method even if an assertion above fails, so the
        # restricted value is not left in place on the settings object.
        settings.auth_allowed_methods = AuthMethods.all()
@pytest.mark.asyncio
async def test_register_nostr_bad_header(http_client: AsyncClient):
    """Missing, non-nostr and undecodable Authorization headers are rejected."""
    # (extra request kwargs, expected status, assertion message, expected detail)
    cases = [
        ({}, 401, "Missing header.", "Nostr Auth header missing."),
        (
            {"headers": {"Authorization": "Bearer xyz"}},
            401,
            "Non nostr header.",
            "Invalid Authorization scheme.",
        ),
        (
            {"headers": {"Authorization": "nostr xyz"}},
            400,
            "Nostr not base64.",
            "Nostr login event cannot be parsed.",
        ),
    ]
    for request_kwargs, expected_status, message, expected_detail in cases:
        reply = await http_client.post("/api/v1/auth/nostr", **request_kwargs)
        assert reply.status_code == expected_status, message
        assert reply.json().get("detail") == expected_detail
@pytest.mark.asyncio
async def test_register_nostr_bad_event(http_client: AsyncClient, settings: Settings):
    """The unmodified template and a tampered copy are both rejected with 400.

    The template is posted as-is (with its fixed "created_at") and is expected
    to be reported as too old; a copy with changed "content" is expected to be
    reported as not valid.
    """
    settings.auth_allowed_methods = AuthMethods.all()

    def encode(event: dict) -> str:
        # JSON-serialize the event and base64-encode it for the auth header.
        return base64.b64encode(json.dumps(event).encode()).decode("ascii")

    base64_event = encode(nostr_event)
    stale = await http_client.post(
        "/api/v1/auth/nostr",
        headers={"Authorization": f"nostr {base64_event}"},
    )
    assert stale.status_code == 400, "Nostr event expired."
    assert (
        stale.json().get("detail")
        == f"More than {settings.auth_credetials_update_threshold}"
        " seconds have passed since the event was signed."
    )

    # Change the content without re-signing the event.
    corrupted_event = {**nostr_event, "content": "xyz"}
    base64_event = encode(corrupted_event)
    tampered = await http_client.post(
        "/api/v1/auth/nostr",
        headers={"Authorization": f"nostr {base64_event}"},
    )
    assert tampered.status_code == 400, "Nostr event signature invalid."
    assert tampered.json().get("detail") == "Nostr login event is not valid."
@pytest.mark.asyncio
async def test_register_nostr_bad_event_kind(http_client: AsyncClient):
    """A signed event whose "kind" is changed is expected to fail with 400."""
    # Copy of the template with "kind" replaced by the string "12345".
    event_bad_kind = {**nostr_event, "kind": "12345"}
    event_bad_kind_signed = sign_event(event_bad_kind, pubkey_hex, private_key)
    serialized = json.dumps(event_bad_kind_signed).encode()
    base64_event_bad_kind = base64.b64encode(serialized).decode("ascii")

    reply = await http_client.post(
        "/api/v1/auth/nostr",
        headers={"Authorization": f"nostr {base64_event_bad_kind}"},
    )
    assert reply.status_code == 400, "Nostr event kind invalid."
    assert reply.json().get("detail") == "Invalid event kind."
@pytest.mark.asyncio
async def test_register_nostr_bad_event_tag_u(http_client: AsyncClient):
    """Events with a missing or invalid 'method' tag are rejected with 400.

    NOTE(review): despite the '_tag_u' suffix in its name, both cases in this
    test keep a valid 'u' tag and vary the 'method' tag.
    """

    def encode(signed_event: dict) -> str:
        # JSON-serialize the event and base64-encode it for the auth header.
        return base64.b64encode(json.dumps(signed_event).encode()).decode("ascii")

    event = {**nostr_event}
    event["created_at"] = int(time.time())

    # Case 1: only the 'u' tag is present.
    event["tags"] = [["u", "http://localhost:5000/nostr"]]
    base64_event_tag_kind = encode(sign_event(event, pubkey_hex, private_key))
    reply = await http_client.post(
        "/api/v1/auth/nostr",
        headers={"Authorization": f"nostr {base64_event_tag_kind}"},
    )
    assert reply.status_code == 400, "Nostr event tag missing."
    assert reply.json().get("detail") == "Tag 'method' is missing."

    # Case 2: the 'method' tag is present but carries the value "XYZ".
    event["tags"] = [["u", "http://localhost:5000/nostr"], ["method", "XYZ"]]
    base64_event_tag_kind = encode(sign_event(event, pubkey_hex, private_key))
    reply = await http_client.post(
        "/api/v1/auth/nostr",
        headers={"Authorization": f"nostr {base64_event_tag_kind}"},
    )
    assert reply.status_code == 400, "Nostr event tag invalid."
    assert reply.json().get("detail") == "Invalid value for tag 'method'."
@pytest.mark.asyncio
async def test_register_nostr_bad_event_tag_menthod(http_client: AsyncClient):
    """Events with a missing or invalid 'u' tag are rejected with 400.

    NOTE(review): despite the '_tag_menthod' suffix in its name, both cases in
    this test keep a valid 'method' tag and vary the 'u' tag.
    """

    def encode(signed_event: dict) -> str:
        # JSON-serialize the event and base64-encode it for the auth header.
        return base64.b64encode(json.dumps(signed_event).encode()).decode("ascii")

    event = {**nostr_event}
    event["created_at"] = int(time.time())

    # Case 1: only the 'method' tag is present.
    event["tags"] = [["method", "POST"]]
    base64_event = encode(sign_event(event, pubkey_hex, private_key))
    reply = await http_client.post(
        "/api/v1/auth/nostr",
        headers={"Authorization": f"nostr {base64_event}"},
    )
    assert reply.status_code == 400, "Nostr event tag missing."
    assert reply.json().get("detail") == "Tag 'u' for URL is missing."

    # Case 2: the 'u' tag names a different URL than the template does.
    event["tags"] = [["u", "http://demo.lnbits.com/nostr"], ["method", "POST"]]
    base64_event = encode(sign_event(event, pubkey_hex, private_key))
    reply = await http_client.post(
        "/api/v1/auth/nostr",
        headers={"Authorization": f"nostr {base64_event}"},
    )
    assert reply.status_code == 400, "Nostr event tag invalid."
    assert (
        reply.json().get("detail") == "Invalid value for tag 'u':"
        " 'http://demo.lnbits.com/nostr'."
    )
################################ CHANGE PUBLIC KEY ################################
# Marked like every other coroutine test in this module; without the marker
# pytest-asyncio in strict mode does not run the function as an async test.
@pytest.mark.asyncio
async def test_change_pubkey_npub_ok(http_client: AsyncClient, settings: Settings):
    """A public key submitted in npub form is stored as its hex value.

    A fresh user is registered, a random key's public key is converted with
    `hex_to_npub` and submitted, and the returned user is expected to carry the
    hex form of that key.
    """
    tiny_id = shortuuid.uuid()[:8]
    response = await http_client.post(
        "/api/v1/auth/register",
        json={
            "username": f"u21.{tiny_id}",
            "password": "secret1234",
            "password_repeat": "secret1234",
            "email": f"u21.{tiny_id}@lnbits.com",
        },
    )
    assert response.status_code == 200, "User created."
    access_token = response.json().get("access_token")
    assert access_token is not None

    # The user ID for the change request is taken from the token claims.
    payload: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
    access_token_payload = AccessTokenPayload(**payload)

    # Local names differ from the module-level key pair to avoid shadowing it.
    new_private_key = secp256k1.PrivateKey(os.urandom(32))
    new_pubkey_hex = new_private_key.pubkey.serialize().hex()[2:]
    npub = hex_to_npub(new_pubkey_hex)

    response = await http_client.put(
        "/api/v1/auth/pubkey",
        headers={"Authorization": f"Bearer {access_token}"},
        json={
            "user_id": access_token_payload.usr,
            "pubkey": npub,
        },
    )
    assert response.status_code == 200, "Pubkey changed."
    user = User(**response.json())
    assert user.username == f"u21.{tiny_id}", "Username check."
    assert user.email == f"u21.{tiny_id}@lnbits.com", "Email check."
    assert user.pubkey == new_pubkey_hex
@pytest.mark.asyncio
async def test_change_pubkey_ok(
    http_client: AsyncClient, user_alan: User, settings: Settings
):
    """Full public-key flow: set a key, log in with Nostr, refuse key reuse.

    1. Register a user and attach a freshly generated public key.
    2. Log in through the Nostr endpoint with an event signed by that key and
       check the returned profile.
    3. Log in as Alan and verify he cannot claim the same public key.
    """
    # --- 1. register and set the public key ---
    tiny_id = shortuuid.uuid()[:8]
    registration = {
        "username": f"u21.{tiny_id}",
        "password": "secret1234",
        "password_repeat": "secret1234",
        "email": f"u21.{tiny_id}@lnbits.com",
    }
    created = await http_client.post("/api/v1/auth/register", json=registration)
    assert created.status_code == 200, "User created."
    access_token = created.json().get("access_token")
    assert access_token is not None

    claims: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
    token_payload = AccessTokenPayload(**claims)

    new_key = secp256k1.PrivateKey(os.urandom(32))
    new_pubkey_hex = new_key.pubkey.serialize().hex()[2:]
    changed = await http_client.put(
        "/api/v1/auth/pubkey",
        headers={"Authorization": f"Bearer {access_token}"},
        json={"user_id": token_payload.usr, "pubkey": new_pubkey_hex},
    )
    assert changed.status_code == 200, "Pubkey changed."
    user = User(**changed.json())
    assert user.username == f"u21.{tiny_id}", "Username check."
    assert user.email == f"u21.{tiny_id}@lnbits.com", "Email check."
    assert user.pubkey == new_pubkey_hex

    # --- 2. Login with nostr ---
    event = {**nostr_event}
    event["created_at"] = int(time.time())
    event["pubkey"] = new_pubkey_hex
    event_signed = sign_event(event, new_pubkey_hex, new_key)
    base64_event = base64.b64encode(json.dumps(event_signed).encode()).decode("ascii")
    nostr_login = await http_client.post(
        "/api/v1/auth/nostr",
        headers={"Authorization": f"nostr {base64_event}"},
    )
    assert nostr_login.status_code == 200, "User logged in."
    access_token = nostr_login.json().get("access_token")
    assert access_token is not None

    profile = await http_client.get(
        "/api/v1/auth", headers={"Authorization": f"Bearer {access_token}"}
    )
    user = User(**profile.json())
    assert user.username == f"u21.{tiny_id}", "Username check."
    assert user.email == f"u21.{tiny_id}@lnbits.com", "Email check."
    assert user.pubkey == new_pubkey_hex, "No pubkey."
    assert not user.admin, "Not admin."
    assert not user.super_user, "Not superuser."
    assert user.has_password, "Password configured."
    assert len(user.wallets) == 1, "One default wallet."

    # --- 3. Alan tries to take the same public key ---
    alan_login = await http_client.post(
        "/api/v1/auth", json={"username": user_alan.username, "password": "secret1234"}
    )
    assert alan_login.status_code == 200, "Alan logs in OK"
    access_token = alan_login.json().get("access_token")
    assert access_token is not None

    reused = await http_client.put(
        "/api/v1/auth/pubkey",
        headers={"Authorization": f"Bearer {access_token}"},
        json={"user_id": user_alan.id, "pubkey": new_pubkey_hex},
    )
    assert reused.status_code == 400, "Pubkey already used."
    assert reused.json().get("detail") == "Public key already in use."
@pytest.mark.asyncio
async def test_change_pubkey_not_authenticated(
    http_client: AsyncClient, user_alan: User
):
    """A public-key change sent without a token is expected to fail with 401."""
    change_request = {"user_id": user_alan.id, "pubkey": pubkey_hex}
    # No Authorization header is sent on purpose.
    reply = await http_client.put("/api/v1/auth/pubkey", json=change_request)

    assert reply.status_code == 401, "Must be authenticated to change pubkey."
    assert reply.json().get("detail") == "Missing user ID or access token."
@pytest.mark.asyncio
async def test_change_pubkey_other_user(http_client: AsyncClient, user_alan: User):
    """A public-key change naming another user ID is expected to fail with 400."""
    login = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
    assert login.status_code == 200, "Alan logs in OK."
    access_token = login.json().get("access_token")
    assert access_token is not None

    # Alan's ID reversed serves as an ID that does not match the token.
    foreign_user_id = user_alan.id[::-1]
    reply = await http_client.put(
        "/api/v1/auth/pubkey",
        headers={"Authorization": f"Bearer {access_token}"},
        json={"user_id": foreign_user_id, "pubkey": pubkey_hex},
    )
    assert reply.status_code == 400, "Not your user."
    assert reply.json().get("detail") == "Invalid user ID."
@pytest.mark.asyncio
async def test_alan_change_pubkey_auth_threshold_expired(
    user_alan: User, http_client: AsyncClient, settings: Settings
):
    """A public-key change after the credentials-update window is rejected.

    The window is shortened to one second, the test waits longer than that and
    the change request is then expected to fail with 400.
    """
    response = await http_client.post("/api/v1/auth/usr", json={"usr": user_alan.id})
    assert response.status_code == 200, "Alan logs in OK."
    access_token = response.json().get("access_token")
    assert access_token is not None

    previous_threshold = settings.auth_credetials_update_threshold
    settings.auth_credetials_update_threshold = 1
    try:
        # Non-blocking wait: time.sleep() would stall the event loop the test
        # client runs on.
        await asyncio.sleep(2.1)
        response = await http_client.put(
            "/api/v1/auth/pubkey",
            headers={"Authorization": f"Bearer {access_token}"},
            json={
                "user_id": user_alan.id,
                "pubkey": pubkey_hex,
            },
        )
        assert response.status_code == 400, "Treshold expired."
        assert (
            response.json().get("detail") == "You can only update your credentials"
            " in the first 1 seconds."
            " Please login again or ask a new reset key!"
        )
    finally:
        # Put the previous window back so the one-second value is not left on
        # the settings object after this test.
        settings.auth_credetials_update_threshold = previous_threshold
################################ RESET PASSWORD ################################
@pytest.mark.asyncio
async def test_request_reset_key_ok(http_client: AsyncClient, settings: Settings):
    """A reset key lets a user set a new password that replaces the old one.

    A fresh user is registered, a reset key is requested for the user ID from
    the token, the password is reset, and both passwords are then tried.
    """
    tiny_id = shortuuid.uuid()[:8]
    registration = {
        "username": f"u21.{tiny_id}",
        "password": "secret1234",
        "password_repeat": "secret1234",
        "email": f"u21.{tiny_id}@lnbits.com",
    }
    created = await http_client.post("/api/v1/auth/register", json=registration)
    assert created.status_code == 200, "User created."
    access_token = created.json().get("access_token")
    assert access_token is not None

    claims: dict = jwt.decode(access_token, settings.auth_secret_key, ["HS256"])
    token_payload = AccessTokenPayload(**claims)
    assert token_payload.usr, "User id set."

    # The reset key is obtained by calling the API function directly.
    reset_key = await api_users_reset_password(token_payload.usr)
    assert reset_key, "Reset key created."
    assert reset_key[:10] == "reset_key_", "This is not a reset key."

    reset_request = {
        "reset_key": reset_key,
        "password": "secret0000",
        "password_repeat": "secret0000",
    }
    reset = await http_client.put("/api/v1/auth/reset", json=reset_request)
    assert reset.status_code == 200, "Password reset."
    access_token = reset.json().get("access_token")
    assert access_token is not None

    old_login = await http_client.post(
        "/api/v1/auth", json={"username": f"u21.{tiny_id}", "password": "secret1234"}
    )
    assert old_login.status_code == 401, "Old passord not valid."
    assert old_login.json().get("detail") == "Invalid credentials."

    new_login = await http_client.post(
        "/api/v1/auth", json={"username": f"u21.{tiny_id}", "password": "secret0000"}
    )
    assert new_login.status_code == 200, "Login new password OK."
    access_token = new_login.json().get("access_token")
    assert access_token is not None
@pytest.mark.asyncio
async def test_request_reset_key_user_not_found(http_client: AsyncClient):
    """Using a reset key made for a hard-coded user ID is expected to give 404."""
    # A reset key is still expected to be created for this ID.
    user_id = "926abb2ab59a48ebb2485bcceb58d05e"
    reset_key = await api_users_reset_password(user_id)
    assert reset_key, "Reset key created."
    assert reset_key[:10] == "reset_key_", "This is not a reset key."

    reset_request = {
        "reset_key": reset_key,
        "password": "secret0000",
        "password_repeat": "secret0000",
    }
    reply = await http_client.put("/api/v1/auth/reset", json=reset_request)
    assert reply.status_code == 404, "User does not exist."
    assert reply.json().get("detail") == "User not found."
@pytest.mark.asyncio
async def test_reset_username_password_not_allowed(
    http_client: AsyncClient, settings: Settings
):
    """Password reset is refused while username/password auth is disabled."""
    # exclude 'username_password'
    settings.auth_allowed_methods = [AuthMethods.user_id_only.value]
    try:
        user_id = "926abb2ab59a48ebb2485bcceb58d05e"
        reset_key = await api_users_reset_password(user_id)
        assert reset_key, "Reset key created."
        response = await http_client.put(
            "/api/v1/auth/reset",
            json={
                "reset_key": reset_key,
                "password": "secret0000",
                "password_repeat": "secret0000",
            },
        )
    finally:
        # Re-enable every method even if creating the key or sending the
        # request fails, so the restricted value is not left in place.
        settings.auth_allowed_methods = AuthMethods.all()

    assert response.status_code == 401, "Login method not allowed."
    assert (
        response.json().get("detail") == "Auth by 'Username and Password' not allowed."
    )
@pytest.mark.asyncio
async def test_reset_username_passwords_do_not_matcj(
    http_client: AsyncClient, user_alan: User
):
    """A reset whose two password fields differ is expected to fail with 400."""
    reset_key = await api_users_reset_password(user_alan.id)
    assert reset_key, "Reset key created."

    reset_request = {
        "reset_key": reset_key,
        "password": "secret0000",
        "password_repeat": "secret-does-not-mathc",
    }
    reply = await http_client.put("/api/v1/auth/reset", json=reset_request)
    assert reply.status_code == 400, "Passwords do not match."
    assert reply.json().get("detail") == "Passwords do not match."
@pytest.mark.asyncio
async def test_reset_username_password_bad_key(http_client: AsyncClient):
    """A reset with a made-up reset key is expected to fail with 400."""
    reset_request = {
        "reset_key": "reset_key_xxxxxxxxxxx",
        "password": "secret0000",
        "password_repeat": "secret0000",
    }
    reply = await http_client.put("/api/v1/auth/reset", json=reset_request)

    assert reply.status_code == 400, "Bad reset key."
    assert reply.json().get("detail") == "Invalid reset key."
@pytest.mark.asyncio
async def test_reset_password_auth_threshold_expired(
    user_alan: User, http_client: AsyncClient, settings: Settings
):
    """A reset key used after the credentials-update window is rejected.

    The key is created first, the window is shortened to one second, the test
    waits slightly longer and the reset is then expected to fail with 400.
    """
    reset_key = await api_users_reset_password(user_alan.id)
    assert reset_key, "Reset key created."

    previous_threshold = settings.auth_credetials_update_threshold
    settings.auth_credetials_update_threshold = 1
    try:
        # Non-blocking wait: time.sleep() would stall the event loop the test
        # client runs on.
        await asyncio.sleep(1.1)
        response = await http_client.put(
            "/api/v1/auth/reset",
            json={
                "reset_key": reset_key,
                "password": "secret0000",
                "password_repeat": "secret0000",
            },
        )
        assert response.status_code == 400, "Treshold expired."
        assert (
            response.json().get("detail") == "You can only update your credentials"
            " in the first 1 seconds."
            " Please login again or ask a new reset key!"
        )
    finally:
        # Put the previous window back so the one-second value is not left on
        # the settings object after this test.
        settings.auth_credetials_update_threshold = previous_threshold