lnbits-legend/lnbits/extensions/lnurldevice/lnurl.py

269 lines
8.7 KiB
Python
Raw Normal View History

2022-01-21 21:23:32 +00:00
import base64
import hashlib
2022-07-16 14:23:03 +02:00
import hmac
2022-01-21 21:23:32 +00:00
from http import HTTPStatus
from io import BytesIO
2022-07-16 14:23:03 +02:00
from typing import Optional
2022-01-21 21:23:32 +00:00
2022-07-16 14:23:03 +02:00
from embit import bech32, compact
2022-01-21 21:23:32 +00:00
from fastapi import Request
from fastapi.param_functions import Query
from starlette.exceptions import HTTPException
from lnbits.core.services import create_invoice
from lnbits.core.views.api import pay_invoice
2022-07-16 14:23:03 +02:00
from lnbits.utils.exchange_rates import fiat_amount_as_satoshis
2022-01-21 21:23:32 +00:00
from . import lnurldevice_ext
from .crud import (
create_lnurldevicepayment,
get_lnurldevice,
get_lnurldevicepayment,
get_lnurlpayload,
2022-07-16 14:23:03 +02:00
update_lnurldevicepayment,
2022-01-21 21:23:32 +00:00
)
def bech32_decode(bech):
    """Decode a bech32 string to bytes without enforcing a length limit.

    Returns the 8-bit payload (checksum stripped) as ``bytes``, or ``None``
    when the input contains characters outside the printable ASCII range
    33..126, mixes upper and lower case, has no usable ``"1"`` separator,
    contains characters outside the bech32 charset, or fails the checksum.
    """
    if any(not 33 <= ord(ch) <= 126 for ch in bech):
        return None
    if bech != bech.lower() and bech != bech.upper():
        return None

    lowered = bech.lower()
    # The last "1" separates the human-readable part from the data part;
    # the data part must be at least as long as the 6-symbol checksum.
    sep = lowered.rfind("1")
    if sep < 1 or sep + 7 > len(lowered):
        return None

    hrp, tail = lowered[:sep], lowered[sep + 1 :]
    values = [bech32.CHARSET.find(ch) for ch in tail]
    if -1 in values:
        return None
    if bech32.bech32_verify_checksum(hrp, values) is None:
        return None
    # Drop the trailing checksum symbols and regroup 5-bit values into bytes.
    return bytes(bech32.convertbits(values[:-6], 5, 8, False))
def xor_decrypt(key, blob):
    """Authenticate and decrypt a device payload.

    ``blob`` is parsed as: one variant byte (must be ``1``), one nonce-length
    byte followed by the nonce (at least 8 bytes), one payload-length byte
    followed by the payload (at most 32 bytes), and finally every remaining
    byte as a (possibly truncated, at least 8 bytes) HMAC-SHA256 tag computed
    with ``key`` over ``b"Data:"`` plus everything preceding the tag.

    The payload is XORed with ``HMAC-SHA256(key, b"Round secret:" + nonce)``
    and two compact-encoded integers are read from the result.

    Args:
        key: HMAC key as bytes.
        blob: Raw message bytes received from the device.

    Returns:
        A ``(pin, amount_in_cent)`` tuple.

    Raises:
        RuntimeError: If the blob is truncated, uses an unsupported variant,
            violates a length constraint, or carries an invalid HMAC.
    """
    stream = BytesIO(blob)

    def read_byte(what):
        # A truncated blob must fail like every other parse error instead of
        # leaking an IndexError from indexing an empty read.
        chunk = stream.read(1)
        if not chunk:
            raise RuntimeError(f"Missing {what}")
        return chunk[0]

    variant = read_byte("variant byte")
    if variant != 1:
        raise RuntimeError("Not implemented")

    # reading nonce
    nonce_len = read_byte("nonce length")
    nonce = stream.read(nonce_len)
    if len(nonce) != nonce_len:
        raise RuntimeError("Missing nonce bytes")
    if nonce_len < 8:
        raise RuntimeError("Nonce is too short")

    # reading payload
    payload_len = read_byte("payload length")
    payload = stream.read(payload_len)
    if len(payload) > 32:
        # The key stream is a single SHA-256 digest, i.e. 32 bytes.
        raise RuntimeError("Payload is too long for this encryption method")
    if len(payload) != payload_len:
        raise RuntimeError("Missing payload bytes")

    # Whatever is left is the tag; it may be a truncated digest.
    tag = stream.read()
    if len(tag) < 8:
        raise RuntimeError("HMAC is too short")
    expected = hmac.new(
        key, b"Data:" + blob[: -len(tag)], digestmod="sha256"
    ).digest()
    # Constant-time comparison: the tag comes from untrusted input.
    if not hmac.compare_digest(tag, expected[: len(tag)]):
        raise RuntimeError("HMAC is invalid")

    secret = hmac.new(key, b"Round secret:" + nonce, digestmod="sha256").digest()
    decrypted = bytes(p ^ k for p, k in zip(payload, secret))

    plain = BytesIO(decrypted)
    pin = compact.read_from(plain)
    amount_in_cent = compact.read_from(plain)
    return pin, amount_in_cent
@lnurldevice_ext.get(
    "/api/v1/lnurl/{device_id}",
    status_code=HTTPStatus.OK,
    name="lnurldevice.lnurl_v1_params",
)
async def lnurl_v1_params(
    request: Request,
    device_id: str = Query(None),
    p: str = Query(None),
    atm: str = Query(None),
):
    """Answer the first LNURL request for a device.

    Depending on the device type and the ``atm`` flag this returns either a
    ``payRequest`` or a ``withdrawRequest`` description whose callback points
    at ``lnurldevice.lnurl_callback`` for a freshly stored device payment.

    Args:
        request: Incoming request, used to build the callback URL.
        device_id: Id of the device taken from the URL path.
        p: URL-safe base64 payload produced by the device (not used by
            "switch" devices).
        atm: When truthy, the payload is treated as a withdrawal.

    Returns:
        An LNURL response dict, or ``{"status": "ERROR", "reason": ...}``.
    """
    device = await get_lnurldevice(device_id)
    if not device:
        return {
            "status": "ERROR",
            "reason": f"lnurldevice {device_id} not found on this server",
        }

    # An ATM payload may only be used once.
    paymentcheck = await get_lnurlpayload(p)
    if device.device == "atm" and paymentcheck:
        return {"status": "ERROR", "reason": "Payment already claimed"}

    if device.device == "switch":
        # Switch devices carry no encrypted payload, so there is no decoded
        # amount at this point; the price comes from the device settings.
        # NOTE(review): the "sat" case assumes device.profit holds the price
        # in sats, mirroring how the fiat case reads it - confirm.
        price_sat = (
            await fiat_amount_as_satoshis(float(device.profit), device.currency)
            if device.currency != "sat"
            else float(device.profit)
        )
        price_msat = int(price_sat * 1000)
        lnurldevicepayment = await create_lnurldevicepayment(
            deviceid=device.id,
            payload="bla",
            sats=price_msat,
            pin=1,
            payhash="bla",
        )
        if not lnurldevicepayment:
            return {"status": "ERROR", "reason": "Could not create payment."}
        return {
            "tag": "payRequest",
            "callback": request.url_for(
                "lnurldevice.lnurl_callback", paymentid=lnurldevicepayment.id
            ),
            "minSendable": price_msat,
            "maxSendable": price_msat,
            "metadata": await device.lnurlpay_metadata(),
        }

    if not p:
        return {"status": "ERROR", "reason": "Missing payload."}

    # Devices may strip base64 padding; restore it before decoding.
    p += "=" * (-len(p) % 4)
    try:
        # Decoding sits inside the try so malformed input yields an LNURL
        # error response instead of an unhandled exception.
        data = base64.urlsafe_b64decode(p)
        pin, amount_in_cent = xor_decrypt(device.key.encode(), data)
    except Exception as exc:
        return {"status": "ERROR", "reason": str(exc)}

    price_msat = (
        await fiat_amount_as_satoshis(float(amount_in_cent) / 100, device.currency)
        if device.currency != "sat"
        else amount_in_cent
    ) * 1000

    if atm:
        if device.device != "atm":
            return {"status": "ERROR", "reason": "Not ATM device."}
        # Withdrawals are reduced by the profit percentage; the result is
        # truncated to whole sats and then expressed in msat again.
        withdraw_sat = int(price_msat * (1 - (device.profit / 100)) / 1000)
        withdraw_msat = withdraw_sat * 1000
        lnurldevicepayment = await create_lnurldevicepayment(
            deviceid=device.id,
            payload=p,
            sats=withdraw_msat,
            pin=pin,
            payhash="payment_hash",
        )
        if not lnurldevicepayment:
            return {"status": "ERROR", "reason": "Could not create payment."}
        return {
            "tag": "withdrawRequest",
            "callback": request.url_for(
                "lnurldevice.lnurl_callback", paymentid=lnurldevicepayment.id
            ),
            "k1": lnurldevicepayment.id,
            "minWithdrawable": withdraw_msat,
            "maxWithdrawable": withdraw_msat,
            "defaultDescription": device.title,
        }

    # Payments are increased by the profit percentage, truncated to whole
    # sats and then expressed in msat again.
    pay_sat = int(price_msat * ((device.profit / 100) + 1) / 1000)
    pay_msat = pay_sat * 1000
    lnurldevicepayment = await create_lnurldevicepayment(
        deviceid=device.id,
        payload=p,
        sats=pay_msat,
        pin=pin,
        payhash="payment_hash",
    )
    if not lnurldevicepayment:
        return {"status": "ERROR", "reason": "Could not create payment."}
    return {
        "tag": "payRequest",
        "callback": request.url_for(
            "lnurldevice.lnurl_callback", paymentid=lnurldevicepayment.id
        ),
        "minSendable": pay_msat,
        "maxSendable": pay_msat,
        "metadata": await device.lnurlpay_metadata(),
    }
@lnurldevice_ext.get(
    "/api/v1/lnurl/cb/{paymentid}",
    status_code=HTTPStatus.OK,
    name="lnurldevice.lnurl_callback",
)
async def lnurl_callback(
    request: Request,
    paymentid: str = Query(None),
    pr: str = Query(None),
    k1: str = Query(None),
):
    """Handle the second LNURL step for a stored device payment.

    For "atm" devices the supplied invoice ``pr`` is paid from the device
    wallet after ``k1`` and the unclaimed state are verified. For "switch"
    and all other devices an invoice for the stored amount is created and
    returned to the payer.

    Args:
        request: Incoming request, used to build the success-action URL.
        paymentid: Id of the stored device payment, taken from the URL path.
        pr: Invoice to pay (ATM withdrawals only).
        k1: Withdrawal secret; must equal the payment id (ATM only).

    Returns:
        An LNURL response dict.

    Raises:
        HTTPException: FORBIDDEN when the payment or its device is unknown,
            or when an ATM callback carries no payment request.
    """
    lnurldevicepayment = await get_lnurldevicepayment(paymentid)
    if not lnurldevicepayment:
        # Without this guard an unknown id fails on attribute access below.
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN,
            detail="lnurldevicepayment not found.",
        )
    device = await get_lnurldevice(lnurldevicepayment.deviceid)
    if not device:
        raise HTTPException(
            status_code=HTTPStatus.FORBIDDEN, detail="lnurldevice not found."
        )

    if device.device == "atm":
        if not pr:
            raise HTTPException(
                status_code=HTTPStatus.FORBIDDEN, detail="No payment request"
            )
        if lnurldevicepayment.id != k1:
            return {"status": "ERROR", "reason": "Bad K1"}
        # "payment_hash" is the placeholder written at creation time; any
        # other value means this withdrawal has already been processed.
        if lnurldevicepayment.payhash != "payment_hash":
            return {"status": "ERROR", "reason": "Payment already claimed"}
        # Mark the payment as claimed before paying out.
        lnurldevicepayment = await update_lnurldevicepayment(
            lnurldevicepayment_id=paymentid, payhash=lnurldevicepayment.payload
        )
        await pay_invoice(
            wallet_id=device.wallet,
            payment_request=pr,
            max_sat=lnurldevicepayment.sats / 1000,
            extra={"tag": "withdraw"},
        )
        return {"status": "OK"}

    if device.device == "switch":
        payment_hash, payment_request = await create_invoice(
            wallet_id=device.wallet,
            amount=lnurldevicepayment.sats / 1000,
            memo=device.title + "-" + lnurldevicepayment.id,
            unhashed_description=(await device.lnurlpay_metadata()).encode("utf-8"),
            extra={"tag": "Switch", "id": paymentid, "time": device.amount},
        )
        lnurldevicepayment = await update_lnurldevicepayment(
            lnurldevicepayment_id=paymentid, payhash=payment_hash
        )
        return {
            "pr": payment_request,
            "routes": [],
        }

    payment_hash, payment_request = await create_invoice(
        wallet_id=device.wallet,
        amount=lnurldevicepayment.sats / 1000,
        memo=device.title,
        unhashed_description=(await device.lnurlpay_metadata()).encode("utf-8"),
        extra={"tag": "PoS"},
    )
    lnurldevicepayment = await update_lnurldevicepayment(
        lnurldevicepayment_id=paymentid, payhash=payment_hash
    )
    return {
        "pr": payment_request,
        "successAction": {
            "tag": "url",
            "description": "Check the attached link",
            "url": request.url_for("lnurldevice.displaypin", paymentid=paymentid),
        },
        "routes": [],
    }