lnbits-legend/lnbits/extensions/boltz/views_api.py
2022-10-25 09:20:16 +02:00

338 lines
10 KiB
Python

from datetime import datetime
from http import HTTPStatus
from typing import List
import httpx
from fastapi import status
from fastapi.encoders import jsonable_encoder
from fastapi.param_functions import Body
from fastapi.params import Depends, Query
from loguru import logger
from pydantic import BaseModel
from starlette.exceptions import HTTPException
from starlette.requests import Request
from lnbits.core.crud import get_user
from lnbits.decorators import WalletTypeInfo, get_key_type, require_admin_key
from lnbits.settings import settings
from . import boltz_ext
from .boltz import (
create_refund_tx,
create_reverse_swap,
create_swap,
get_boltz_pairs,
get_swap_status,
)
from .crud import (
create_reverse_submarine_swap,
create_submarine_swap,
get_pending_reverse_submarine_swaps,
get_pending_submarine_swaps,
get_reverse_submarine_swap,
get_reverse_submarine_swaps,
get_submarine_swap,
get_submarine_swaps,
update_swap_status,
)
from .models import (
CreateReverseSubmarineSwap,
CreateSubmarineSwap,
ReverseSubmarineSwap,
SubmarineSwap,
)
from .utils import check_balance
@boltz_ext.get(
"/api/v1/swap/mempool",
name=f"boltz.get /swap/mempool",
summary="get a the mempool url",
description="""
This endpoint gets the URL from mempool.space
""",
response_description="mempool.space url",
response_model=str,
)
async def api_mempool_url():
return settings.boltz_mempool_space_url
# NORMAL SWAP
@boltz_ext.get(
"/api/v1/swap",
name=f"boltz.get /swap",
summary="get a list of swaps a swap",
description="""
This endpoint gets a list of normal swaps.
""",
response_description="list of normal swaps",
dependencies=[Depends(get_key_type)],
response_model=List[SubmarineSwap],
)
async def api_submarineswap(
g: WalletTypeInfo = Depends(get_key_type),
all_wallets: bool = Query(False),
):
wallet_ids = [g.wallet.id]
if all_wallets:
wallet_ids = (await get_user(g.wallet.user)).wallet_ids
for swap in await get_pending_submarine_swaps(wallet_ids):
swap_status = get_swap_status(swap)
if swap_status.hit_timeout:
if not swap_status.has_lockup:
logger.warning(
f"Boltz - swap: {swap.id} hit timeout, but no lockup tx..."
)
await update_swap_status(swap.id, "timeout")
return [swap.dict() for swap in await get_submarine_swaps(wallet_ids)]
@boltz_ext.post(
"/api/v1/swap/refund",
name=f"boltz.swap_refund",
summary="refund of a swap",
description="""
This endpoint attempts to refund a normal swaps, creates onchain tx and sets swap status ro refunded.
""",
response_description="refunded swap with status set to refunded",
dependencies=[Depends(require_admin_key)],
response_model=SubmarineSwap,
responses={
400: {"description": "when swap_id is missing"},
404: {"description": "when swap is not found"},
405: {"description": "when swap is not pending"},
500: {
"description": "when something goes wrong creating the refund onchain tx"
},
},
)
async def api_submarineswap_refund(
swap_id: str,
g: WalletTypeInfo = Depends(require_admin_key), # type: ignore
):
if swap_id == None:
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST, detail="swap_id missing"
)
swap = await get_submarine_swap(swap_id)
if swap == None:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="swap does not exist."
)
if swap.status != "pending":
raise HTTPException(
status_code=HTTPStatus.METHOD_NOT_ALLOWED, detail="swap is not pending."
)
try:
await create_refund_tx(swap)
except httpx.RequestError as exc:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Unreachable: {exc.request.url!r}.",
)
except Exception as exc:
raise HTTPException(status_code=HTTPStatus.METHOD_NOT_ALLOWED, detail=str(exc))
await update_swap_status(swap.id, "refunded")
return swap
@boltz_ext.post(
"/api/v1/swap",
status_code=status.HTTP_201_CREATED,
name=f"boltz.post /swap",
summary="create a submarine swap",
description="""
This endpoint creates a submarine swap
""",
response_description="create swap",
response_model=SubmarineSwap,
responses={
405: {"description": "not allowed method, insufficient balance"},
500: {"description": "boltz error"},
},
)
async def api_submarineswap_create(
data: CreateSubmarineSwap,
wallet: WalletTypeInfo = Depends(require_admin_key), # type: ignore
):
try:
swap_data = await create_swap(data)
except httpx.RequestError as exc:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Unreachable: {exc.request.url!r}.",
)
except Exception as exc:
raise HTTPException(status_code=HTTPStatus.METHOD_NOT_ALLOWED, detail=str(exc))
except httpx.HTTPStatusError as exc:
raise HTTPException(
status_code=exc.response.status_code, detail=exc.response.json()["error"]
)
swap = await create_submarine_swap(swap_data)
return swap.dict()
# REVERSE SWAP
@boltz_ext.get(
"/api/v1/swap/reverse",
name=f"boltz.get /swap/reverse",
summary="get a list of reverse swaps a swap",
description="""
This endpoint gets a list of reverse swaps.
""",
response_description="list of reverse swaps",
dependencies=[Depends(get_key_type)],
response_model=List[ReverseSubmarineSwap],
)
async def api_reverse_submarineswap(
g: WalletTypeInfo = Depends(get_key_type), # type:ignore
all_wallets: bool = Query(False),
):
wallet_ids = [g.wallet.id]
if all_wallets:
wallet_ids = (await get_user(g.wallet.user)).wallet_ids
return [swap.dict() for swap in await get_reverse_submarine_swaps(wallet_ids)]
@boltz_ext.post(
"/api/v1/swap/reverse",
status_code=status.HTTP_201_CREATED,
name=f"boltz.post /swap/reverse",
summary="create a reverse submarine swap",
description="""
This endpoint creates a reverse submarine swap
""",
response_description="create reverse swap",
response_model=ReverseSubmarineSwap,
responses={
405: {"description": "not allowed method, insufficient balance"},
500: {"description": "boltz error"},
},
)
async def api_reverse_submarineswap_create(
data: CreateReverseSubmarineSwap,
wallet: WalletTypeInfo = Depends(require_admin_key),
):
if not await check_balance(data):
raise HTTPException(
status_code=HTTPStatus.METHOD_NOT_ALLOWED, detail="Insufficient balance."
)
try:
swap_data, task = await create_reverse_swap(data)
except httpx.RequestError as exc:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Unreachable: {exc.request.url!r}.",
)
except httpx.HTTPStatusError as exc:
raise HTTPException(
status_code=exc.response.status_code, detail=exc.response.json()["error"]
)
except Exception as exc:
raise HTTPException(status_code=HTTPStatus.METHOD_NOT_ALLOWED, detail=str(exc))
swap = await create_reverse_submarine_swap(swap_data)
return swap.dict()
@boltz_ext.post(
"/api/v1/swap/status",
name=f"boltz.swap_status",
summary="shows the status of a swap",
description="""
This endpoint attempts to get the status of the swap.
""",
response_description="status of swap json",
responses={
404: {"description": "when swap_id is not found"},
},
)
async def api_swap_status(
swap_id: str, wallet: WalletTypeInfo = Depends(require_admin_key) # type: ignore
):
swap = await get_submarine_swap(swap_id) or await get_reverse_submarine_swap(
swap_id
)
if swap == None:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND, detail="swap does not exist."
)
try:
status = get_swap_status(swap)
except httpx.RequestError as exc:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Unreachable: {exc.request.url!r}.",
)
except Exception as exc:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(exc)
)
return status
@boltz_ext.post(
"/api/v1/swap/check",
name=f"boltz.swap_check",
summary="list all pending swaps",
description="""
This endpoint gives you 2 lists of pending swaps and reverse swaps.
""",
response_description="list of pending swaps",
)
async def api_check_swaps(
g: WalletTypeInfo = Depends(require_admin_key), # type: ignore
all_wallets: bool = Query(False),
):
wallet_ids = [g.wallet.id]
if all_wallets:
wallet_ids = (await get_user(g.wallet.user)).wallet_ids
status = []
try:
for swap in await get_pending_submarine_swaps(wallet_ids):
status.append(get_swap_status(swap))
for reverseswap in await get_pending_reverse_submarine_swaps(wallet_ids):
status.append(get_swap_status(reverseswap))
except httpx.RequestError as exc:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Unreachable: {exc.request.url!r}.",
)
except Exception as exc:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(exc)
)
return status
@boltz_ext.get(
"/api/v1/swap/boltz",
name=f"boltz.get /swap/boltz",
summary="get a boltz configuration",
description="""
This endpoint gets configuration for boltz. (limits, fees...)
""",
response_description="dict of boltz config",
response_model=dict,
)
async def api_boltz_config():
try:
res = get_boltz_pairs()
except httpx.RequestError as exc:
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail=f"Unreachable: {exc.request.url!r}.",
)
except Exception as e:
raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=str(e))
return res["pairs"]["BTC/BTC"]