mirror of
https://github.com/lnbits/lnbits-legend.git
synced 2025-02-24 22:58:46 +01:00
134 lines
5.2 KiB
Python
134 lines
5.2 KiB
Python
import json
|
|
import math
|
|
from quart import jsonify, request
|
|
from http import HTTPStatus
|
|
import traceback
|
|
|
|
from . import bleskomat_ext
|
|
from .crud import (
|
|
create_bleskomat_lnurl,
|
|
get_bleskomat_by_api_key_id,
|
|
get_bleskomat_lnurl,
|
|
)
|
|
|
|
from .exchange_rates import (
|
|
fetch_fiat_exchange_rate,
|
|
)
|
|
|
|
from .helpers import (
|
|
generate_bleskomat_lnurl_signature,
|
|
generate_bleskomat_lnurl_secret,
|
|
LnurlHttpError,
|
|
LnurlValidationError,
|
|
prepare_lnurl_params,
|
|
query_to_signing_payload,
|
|
unshorten_lnurl_query,
|
|
)
|
|
|
|
|
|
# Handles signed URL from Bleskomat ATMs and "action" callback of auto-generated LNURLs.
|
|
@bleskomat_ext.route("/u", methods=["GET"])
|
|
async def api_bleskomat_lnurl():
|
|
try:
|
|
query = request.args.to_dict()
|
|
|
|
# Unshorten query if "s" is used instead of "signature".
|
|
if "s" in query:
|
|
query = unshorten_lnurl_query(query)
|
|
|
|
if "signature" in query:
|
|
|
|
# Signature provided.
|
|
# Use signature to verify that the URL was generated by an authorized device.
|
|
# Later validate parameters, auto-generate LNURL, reply with LNURL response object.
|
|
signature = query["signature"]
|
|
|
|
# The API key ID, nonce, and tag should be present in the query string.
|
|
for field in ["id", "nonce", "tag"]:
|
|
if not field in query:
|
|
raise LnurlHttpError(
|
|
f'Failed API key signature check: Missing "{field}"',
|
|
HTTPStatus.BAD_REQUEST,
|
|
)
|
|
|
|
# URL signing scheme is described here:
|
|
# https://github.com/chill117/lnurl-node#how-to-implement-url-signing-scheme
|
|
payload = query_to_signing_payload(query)
|
|
api_key_id = query["id"]
|
|
bleskomat = await get_bleskomat_by_api_key_id(api_key_id)
|
|
if not bleskomat:
|
|
raise LnurlHttpError("Unknown API key", HTTPStatus.BAD_REQUEST)
|
|
api_key_secret = bleskomat.api_key_secret
|
|
api_key_encoding = bleskomat.api_key_encoding
|
|
expected_signature = generate_bleskomat_lnurl_signature(
|
|
payload, api_key_secret, api_key_encoding
|
|
)
|
|
if signature != expected_signature:
|
|
raise LnurlHttpError("Invalid API key signature", HTTPStatus.FORBIDDEN)
|
|
|
|
# Signature is valid.
|
|
# In the case of signed URLs, the secret is deterministic based on the API key ID and signature.
|
|
secret = generate_bleskomat_lnurl_secret(api_key_id, signature)
|
|
lnurl = await get_bleskomat_lnurl(secret)
|
|
if not lnurl:
|
|
try:
|
|
tag = query["tag"]
|
|
params = prepare_lnurl_params(tag, query)
|
|
if "f" in query:
|
|
rate = await fetch_fiat_exchange_rate(
|
|
currency=query["f"],
|
|
provider=bleskomat.exchange_rate_provider,
|
|
)
|
|
# Convert fee (%) to decimal:
|
|
fee = float(bleskomat.fee) / 100
|
|
if tag == "withdrawRequest":
|
|
for key in ["minWithdrawable", "maxWithdrawable"]:
|
|
amount_sats = int(
|
|
math.floor((params[key] / rate) * 1e8)
|
|
)
|
|
fee_sats = int(math.floor(amount_sats * fee))
|
|
amount_sats_less_fee = amount_sats - fee_sats
|
|
# Convert to msats:
|
|
params[key] = int(amount_sats_less_fee * 1e3)
|
|
except LnurlValidationError as e:
|
|
raise LnurlHttpError(e.message, HTTPStatus.BAD_REQUEST)
|
|
# Create a new LNURL using the query parameters provided in the signed URL.
|
|
params = json.JSONEncoder().encode(params)
|
|
lnurl = await create_bleskomat_lnurl(
|
|
bleskomat=bleskomat, secret=secret, tag=tag, params=params, uses=1
|
|
)
|
|
|
|
# Reply with LNURL response object.
|
|
return jsonify(lnurl.get_info_response_object(secret)), HTTPStatus.OK
|
|
|
|
# No signature provided.
|
|
# Treat as "action" callback.
|
|
|
|
if not "k1" in query:
|
|
raise LnurlHttpError("Missing secret", HTTPStatus.BAD_REQUEST)
|
|
|
|
secret = query["k1"]
|
|
lnurl = await get_bleskomat_lnurl(secret)
|
|
if not lnurl:
|
|
raise LnurlHttpError("Invalid secret", HTTPStatus.BAD_REQUEST)
|
|
|
|
if not lnurl.has_uses_remaining():
|
|
raise LnurlHttpError(
|
|
"Maximum number of uses already reached", HTTPStatus.BAD_REQUEST
|
|
)
|
|
|
|
try:
|
|
await lnurl.execute_action(query)
|
|
except LnurlValidationError as e:
|
|
raise LnurlHttpError(str(e), HTTPStatus.BAD_REQUEST)
|
|
|
|
except LnurlHttpError as e:
|
|
return jsonify({"status": "ERROR", "reason": str(e)}), e.http_status
|
|
except Exception:
|
|
traceback.print_exc()
|
|
return (
|
|
jsonify({"status": "ERROR", "reason": "Unexpected error"}),
|
|
HTTPStatus.INTERNAL_SERVER_ERROR,
|
|
)
|
|
|
|
return jsonify({"status": "OK"}), HTTPStatus.OK
|