lnbits-legend/lnbits/core/views/admin_api.py
2022-12-08 15:41:52 +02:00

74 lines
1.9 KiB
Python

from http import HTTPStatus
from typing import Optional
from fastapi import Body
from fastapi.params import Depends
from starlette.exceptions import HTTPException
from lnbits.core.crud import get_wallet
from lnbits.core.models import AdminSettings, UpdateSettings, User
from lnbits.core.services import update_wallet_balance
from lnbits.decorators import check_admin
from lnbits.server import server_restart
from .. import core_app
from ..crud import delete_admin_settings, get_admin_settings, update_admin_settings
@core_app.get(
"/admin/api/v1/restart/",
status_code=HTTPStatus.OK,
dependencies=[Depends(check_admin)],
)
async def api_restart_server() -> dict[str, str]:
server_restart.set()
return {"status": "Success"}
@core_app.get("/admin/api/v1/settings/")
async def api_get_settings(
user: User = Depends(check_admin), # type: ignore
) -> Optional[AdminSettings]:
admin_settings = await get_admin_settings(user.super_user)
return admin_settings
@core_app.put(
"/admin/api/v1/topup/",
status_code=HTTPStatus.OK,
dependencies=[Depends(check_admin)],
)
async def api_topup_balance(
id: str = Body(...), amount: int = Body(...)
) -> dict[str, str]:
try:
await get_wallet(id)
except:
raise HTTPException(
status_code=HTTPStatus.FORBIDDEN, detail="wallet does not exist."
)
await update_wallet_balance(wallet_id=id, amount=int(amount))
return {"status": "Success"}
@core_app.put(
"/admin/api/v1/settings/",
status_code=HTTPStatus.OK,
dependencies=[Depends(check_admin)],
)
async def api_update_settings(data: UpdateSettings):
await update_admin_settings(data)
return {"status": "Success"}
@core_app.delete(
"/admin/api/v1/settings/",
status_code=HTTPStatus.OK,
dependencies=[Depends(check_admin)],
)
async def api_delete_settings() -> dict[str, str]:
await delete_admin_settings()
return {"status": "Success"}