mirror of
https://github.com/lnbits/lnbits-legend.git
synced 2024-11-20 10:39:59 +01:00
52 lines
2.0 KiB
Python
52 lines
2.0 KiB
Python
from fastapi import Request, HTTPException
|
|
from fastapi.security.api_key import APIKeyQuery, APIKeyCookie, APIKeyHeader, APIKey
|
|
|
|
# https://medium.com/data-rebels/fastapi-authentication-revisited-enabling-api-key-authentication-122dc5975680
|
|
|
|
from fastapi import Security, Depends, FastAPI, HTTPException
|
|
from fastapi.security.api_key import APIKeyQuery, APIKeyCookie, APIKeyHeader, APIKey
|
|
from fastapi.security.base import SecurityBase
|
|
|
|
|
|
API_KEY = "usr"
|
|
API_KEY_NAME = "X-API-key"
|
|
|
|
api_key_query = APIKeyQuery(name=API_KEY_NAME, auto_error=False)
|
|
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
|
|
|
|
|
|
class AuthBearer(SecurityBase):
|
|
def __init__(self, scheme_name: str = None, auto_error: bool = True):
|
|
self.scheme_name = scheme_name or self.__class__.__name__
|
|
self.auto_error = auto_error
|
|
|
|
async def __call__(self, request: Request):
|
|
key = await self.get_api_key()
|
|
print(key)
|
|
# credentials: HTTPAuthorizationCredentials = await super(AuthBearer, self).__call__(request)
|
|
# if credentials:
|
|
# if not credentials.scheme == "Bearer":
|
|
# raise HTTPException(
|
|
# status_code=403, detail="Invalid authentication scheme.")
|
|
# if not self.verify_jwt(credentials.credentials):
|
|
# raise HTTPException(
|
|
# status_code=403, detail="Invalid token or expired token.")
|
|
# return credentials.credentials
|
|
# else:
|
|
# raise HTTPException(
|
|
# status_code=403, detail="Invalid authorization code.")
|
|
|
|
async def get_api_key(
|
|
self,
|
|
api_key_query: str = Security(api_key_query),
|
|
api_key_header: str = Security(api_key_header),
|
|
):
|
|
if api_key_query == API_KEY:
|
|
return api_key_query
|
|
elif api_key_header == API_KEY:
|
|
return api_key_header
|
|
else:
|
|
raise HTTPException(
|
|
status_code=403, detail="Could not validate credentials"
|
|
)
|