mirror of
https://github.com/lnbits/lnbits-legend.git
synced 2024-11-20 10:39:59 +01:00
fusion's fix (not working)
This commit is contained in:
parent
36d55db4fc
commit
8e396beb44
@ -84,22 +84,27 @@ class WalletTypeInfo():
|
|||||||
self.wallet_type = wallet_type
|
self.wallet_type = wallet_type
|
||||||
self.wallet = wallet
|
self.wallet = wallet
|
||||||
|
|
||||||
|
api_key_header_xapi = APIKeyHeader(name="X-API-KEY", auto_error=False, description="Admin or Invoice key for wallet API's")
|
||||||
api_key_header = APIKeyHeader(name="X-API-KEY", auto_error=False, description="Admin or Invoice key for wallet API's")
|
api_key_header_auth = APIKeyHeader(name="AUTHORIZATION", auto_error=False, description="Admin or Invoice key for wallet API's")
|
||||||
api_key_query = APIKeyQuery(name="api-key", auto_error=False, description="Admin or Invoice key for wallet API's")
|
api_key_query = APIKeyQuery(name="api-key", auto_error=False, description="Admin or Invoice key for wallet API's")
|
||||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
|
||||||
async def get_key_type(r: Request,
|
async def get_key_type(r: Request,
|
||||||
token: str = Security(oauth2_scheme),
|
api_key_header_auth: str = Security(api_key_header_auth),
|
||||||
api_key_header: str = Security(api_key_header),
|
api_key_header: str = Security(api_key_header_auth),
|
||||||
api_key_query: str = Security(api_key_query)) -> WalletTypeInfo:
|
api_key_query: str = Security(api_key_query)) -> WalletTypeInfo:
|
||||||
# 0: admin
|
# 0: admin
|
||||||
# 1: invoice
|
# 1: invoice
|
||||||
# 2: invalid
|
# 2: invalid
|
||||||
# print("TOKEN", b64decode(token).decode("utf-8").split(":"))
|
# print("TOKEN", b64decode(token).decode("utf-8").split(":"))
|
||||||
|
|
||||||
key_type, key = b64decode(token).decode("utf-8").split(":")
|
if api_key_header_xapi:
|
||||||
|
token = api_key_header_xapi
|
||||||
|
elif api_key_header_auth:
|
||||||
|
_, token = b64decode(api_key_header_auth).decode("utf-8").split(":")
|
||||||
|
elif api_key_query:
|
||||||
|
token = api_key_query
|
||||||
|
|
||||||
try:
|
try:
|
||||||
checker = WalletAdminKeyChecker(api_key=key if token else api_key_query)
|
checker = WalletAdminKeyChecker(api_key=token)
|
||||||
await checker.__call__(r)
|
await checker.__call__(r)
|
||||||
return WalletTypeInfo(0, checker.wallet)
|
return WalletTypeInfo(0, checker.wallet)
|
||||||
except HTTPException as e:
|
except HTTPException as e:
|
||||||
@ -111,7 +116,7 @@ async def get_key_type(r: Request,
|
|||||||
raise
|
raise
|
||||||
|
|
||||||
try:
|
try:
|
||||||
checker = WalletInvoiceKeyChecker(api_key=key if token else None)
|
checker = WalletInvoiceKeyChecker(api_key=token)
|
||||||
await checker.__call__(r)
|
await checker.__call__(r)
|
||||||
return WalletTypeInfo(1, checker.wallet)
|
return WalletTypeInfo(1, checker.wallet)
|
||||||
except HTTPException as e:
|
except HTTPException as e:
|
||||||
@ -121,6 +126,42 @@ async def get_key_type(r: Request,
|
|||||||
return WalletTypeInfo(2, None)
|
return WalletTypeInfo(2, None)
|
||||||
except:
|
except:
|
||||||
raise
|
raise
|
||||||
|
# api_key_header = APIKeyHeader(name="X-API-KEY", auto_error=False, description="Admin or Invoice key for wallet API's")
|
||||||
|
# api_key_query = APIKeyQuery(name="api-key", auto_error=False, description="Admin or Invoice key for wallet API's")
|
||||||
|
# oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
||||||
|
# async def get_key_type(r: Request,
|
||||||
|
# token: str = Security(oauth2_scheme),
|
||||||
|
# api_key_header: str = Security(api_key_header),
|
||||||
|
# api_key_query: str = Security(api_key_query)) -> WalletTypeInfo:
|
||||||
|
# # 0: admin
|
||||||
|
# # 1: invoice
|
||||||
|
# # 2: invalid
|
||||||
|
# # print("TOKEN", b64decode(token).decode("utf-8").split(":"))
|
||||||
|
#
|
||||||
|
# key_type, key = b64decode(token).decode("utf-8").split(":")
|
||||||
|
# try:
|
||||||
|
# checker = WalletAdminKeyChecker(api_key=key if token else api_key_query)
|
||||||
|
# await checker.__call__(r)
|
||||||
|
# return WalletTypeInfo(0, checker.wallet)
|
||||||
|
# except HTTPException as e:
|
||||||
|
# if e.status_code == HTTPStatus.BAD_REQUEST:
|
||||||
|
# raise
|
||||||
|
# if e.status_code == HTTPStatus.UNAUTHORIZED:
|
||||||
|
# pass
|
||||||
|
# except:
|
||||||
|
# raise
|
||||||
|
#
|
||||||
|
# try:
|
||||||
|
# checker = WalletInvoiceKeyChecker(api_key=key if token else None)
|
||||||
|
# await checker.__call__(r)
|
||||||
|
# return WalletTypeInfo(1, checker.wallet)
|
||||||
|
# except HTTPException as e:
|
||||||
|
# if e.status_code == HTTPStatus.BAD_REQUEST:
|
||||||
|
# raise
|
||||||
|
# if e.status_code == HTTPStatus.UNAUTHORIZED:
|
||||||
|
# return WalletTypeInfo(2, None)
|
||||||
|
# except:
|
||||||
|
# raise
|
||||||
|
|
||||||
def api_validate_post_request(*, schema: dict):
|
def api_validate_post_request(*, schema: dict):
|
||||||
def wrap(view):
|
def wrap(view):
|
||||||
|
Loading…
Reference in New Issue
Block a user