lnbits-legend/lnbits/extensions/subdomains/tasks.py
dni ⚡ 09871bbabc
fix mypy for extensions (#873)
* explicitly exclude all extensions from mypy

* fix example extension mypy

* fix subdomains extension mypy + 1 type error fixed

* fix mypy discordbot

* mypy check copilot extensnion

* copilot black

* add invoices ext to ignore

* add boltz and boltcard

* copilit id is necessary

* was discordbot is ok

Co-authored-by: dni <dni.khr@gmail.com>
2022-10-24 16:29:30 +02:00

58 lines
1.9 KiB
Python

import asyncio
import httpx
from lnbits.core.models import Payment
from lnbits.helpers import get_current_extension_name
from lnbits.tasks import register_invoice_listener
from .cloudflare import cloudflare_create_subdomain
from .crud import get_domain, set_subdomain_paid
async def wait_for_paid_invoices():
invoice_queue = asyncio.Queue()
register_invoice_listener(invoice_queue, get_current_extension_name())
while True:
payment = await invoice_queue.get()
await on_invoice_paid(payment)
async def on_invoice_paid(payment: Payment) -> None:
if not payment.extra or payment.extra.get("tag") != "lnsubdomain":
# not an lnurlp invoice
return
await payment.set_pending(False)
subdomain = await set_subdomain_paid(payment_hash=payment.payment_hash)
domain = await get_domain(subdomain.domain)
### Create subdomain
cf_response = await cloudflare_create_subdomain(
domain=domain,
subdomain=subdomain.subdomain,
record_type=subdomain.record_type,
ip=subdomain.ip,
)
### Use webhook to notify about cloudflare registration
if domain and domain.webhook:
async with httpx.AsyncClient() as client:
try:
r = await client.post(
domain.webhook,
json={
"domain": subdomain.domain_name,
"subdomain": subdomain.subdomain,
"record_type": subdomain.record_type,
"email": subdomain.email,
"ip": subdomain.ip,
"cost:": str(subdomain.sats) + " sats",
"duration": str(subdomain.duration) + " days",
"cf_response": cf_response,
},
timeout=40,
)
except AssertionError:
webhook = None