lnbits-legend/lnbits/extensions/subdomains/crud.py

193 lines
6.1 KiB
Python
Raw Normal View History

2020-12-28 22:32:04 +01:00
from typing import List, Optional, Union
from lnbits.helpers import urlsafe_short_hash
from . import db
from .models import Domains, Subdomains
import httpx
from lnbits.extensions import subdomains
2020-12-31 18:50:16 +01:00
2020-12-28 22:32:04 +01:00
async def create_subdomain(
    payment_hash: str,
    wallet: str,
    domain: str,
    subdomain: str,
    email: str,
    ip: str,
    sats: int,
    duration: int,
    record_type: str,
) -> Subdomains:
    """Insert a new, unpaid subdomain row and return it.

    The payment hash doubles as the row id, and ``paid`` is stored as
    ``False``.

    Raises:
        AssertionError: if the row cannot be read back after the insert.
    """
    # Order must match the column list in the INSERT statement below.
    values = (
        payment_hash,
        domain,
        email,
        subdomain,
        ip,
        wallet,
        sats,
        duration,
        False,  # paid
        record_type,
    )
    await db.execute(
        """
        INSERT INTO subdomain (id, domain, email, subdomain, ip, wallet, sats, duration, paid, record_type)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        values,
    )

    created = await get_subdomain(payment_hash)
    assert created, "Newly created subdomain couldn't be retrieved"
    return created
async def set_subdomain_paid(payment_hash: str) -> Subdomains:
    """Mark a subdomain as paid and, on the first payment, register it.

    When the row identified by ``payment_hash`` is not yet paid, this:

    1. sets ``paid`` on the subdomain row,
    2. adds the subdomain's ``sats`` to the owning domain's ``amountmade``,
    3. POSTs a DNS record for the subdomain to the Cloudflare API, and
    4. if the domain has a webhook configured, POSTs a summary (including
       the raw Cloudflare response text) to it.

    Steps 3 and 4 are best-effort: HTTP transport failures are swallowed
    rather than raised. Rows that are already paid are not modified and
    trigger no HTTP requests.

    Returns:
        The subdomain as read back from the database.

    Raises:
        AssertionError: if the subdomain or its domain cannot be found.
    """
    row = await db.fetchone(
        "SELECT s.*, d.domain as domain_name FROM subdomain s INNER JOIN domain d ON (s.domain = d.id) WHERE s.id = ?",
        (payment_hash,),
    )
    assert row, "Subdomain to mark as paid couldn't be retrieved"

    if row["paid"]:
        # Already settled: do not credit the domain or register DNS twice.
        subdomain = await get_subdomain(payment_hash)
        assert subdomain, "Paid subdomain couldn't be retrieved"
        return subdomain

    await db.execute(
        """
        UPDATE subdomain
        SET paid = true
        WHERE id = ?
        """,
        (payment_hash,),
    )

    domain_id = row["domain"]
    domaindata = await get_domain(domain_id)
    assert domaindata, "Couldn't get domain from paid subdomain"

    # Credit the price of this subdomain (not the ``paid`` flag) to the
    # domain's running total.
    amount = domaindata.amountmade + row["sats"]
    await db.execute(
        """
        UPDATE domain
        SET amountmade = ?
        WHERE id = ?
        """,
        (amount, domain_id),
    )

    subdomain = await get_subdomain(payment_hash)
    assert subdomain, "Paid subdomain couldn't be retrieved"

    ### SEND REQUEST TO CLOUDFLARE
    url = f"https://api.cloudflare.com/client/v4/zones/{domaindata.cf_zone_id}/dns_records"
    headers = {
        "Authorization": "Bearer " + domaindata.cf_token,
        "Content-Type": "application/json",
    }
    record_name = subdomain.subdomain + "." + subdomain.domain_name
    cf_response = ""

    async with httpx.AsyncClient() as client:
        try:
            r = await client.post(
                url,
                headers=headers,
                json={
                    "type": subdomain.record_type,
                    "name": record_name,
                    "content": subdomain.ip,
                    # NOTE(review): Cloudflare documents 1 as "automatic"
                    # TTL; confirm that 0 is accepted by the API.
                    "ttl": 0,
                    "proxied": False,
                },
                timeout=40,
            )
            cf_response = r.text
        except httpx.HTTPError:
            # Transport-level failure (timeout, connection error, ...):
            # report it to the webhook instead of aborting the payment flow.
            cf_response = "Error occured"

        ### Use webhook to notify about cloudflare registration
        if domaindata.webhook:
            try:
                await client.post(
                    domaindata.webhook,
                    json={
                        "domain": subdomain.domain_name,
                        "subdomain": subdomain.subdomain,
                        "record_type": subdomain.record_type,
                        "email": subdomain.email,
                        "ip": subdomain.ip,
                        # NOTE(review): the trailing colon in this key looks
                        # like a typo, but it is kept because webhook
                        # consumers may already depend on it.
                        "cost:": str(subdomain.sats) + " sats",
                        "duration": str(subdomain.duration) + " days",
                        "cf_response": cf_response,
                    },
                    timeout=40,
                )
            except httpx.HTTPError:
                # The notification is best-effort; an unreachable webhook
                # must not fail an already-recorded payment.
                pass

    return subdomain
async def get_subdomain(subdomain_id: str) -> Optional[Subdomains]:
    """Fetch a single subdomain by id, joined with its domain's name.

    The owning domain's ``domain`` column is exposed on the result as
    ``domain_name``.

    Returns:
        The matching ``Subdomains`` object, or ``None`` when no row matches.
    """
    row = await db.fetchone(
        "SELECT s.*, d.domain as domain_name FROM subdomain s INNER JOIN domain d ON (s.domain = d.id) WHERE s.id = ?",
        (subdomain_id,),
    )
    # The row is intentionally not printed: it contains customer data
    # (email, IP address).
    if not row:
        return None
    return Subdomains(**row)
async def get_subdomains(wallet_ids: Union[str, List[str]]) -> List[Subdomains]:
    """Return every subdomain belonging to the given wallet id(s).

    Accepts either a single wallet id or a list of them. Each result carries
    the owning domain's name as ``domain_name``.
    """
    ids = [wallet_ids] if isinstance(wallet_ids, str) else wallet_ids

    # One "?" placeholder per wallet id, e.g. "?,?,?".
    placeholders = ",".join("?" * len(ids))
    rows = await db.fetchall(
        f"SELECT s.*, d.domain as domain_name FROM subdomain s INNER JOIN domain d ON (s.domain = d.id) WHERE s.wallet IN ({placeholders})",
        tuple(ids),
    )

    return [Subdomains(**row) for row in rows]
async def delete_subdomain(subdomain_id: str) -> None:
    """Delete the subdomain row whose id is ``subdomain_id``."""
    params = (subdomain_id,)
    await db.execute("DELETE FROM subdomain WHERE id = ?", params)
# Domains
2020-12-31 18:50:16 +01:00
async def create_domain(
    *,
    wallet: str,
    domain: str,
    cf_token: str,
    cf_zone_id: str,
    webhook: Optional[str] = None,
    description: str,
    cost: int,
    allowed_record_types: str,
) -> Domains:
    """Insert a new domain row under a freshly generated id and return it.

    All arguments are keyword-only. ``amountmade`` starts at 0.

    Raises:
        AssertionError: if the row cannot be read back after the insert.
    """
    domain_id = urlsafe_short_hash()

    # Order must match the column list in the INSERT statement below.
    values = (
        domain_id,
        wallet,
        domain,
        webhook,
        cf_token,
        cf_zone_id,
        description,
        cost,
        0,  # amountmade
        allowed_record_types,
    )
    await db.execute(
        """
        INSERT INTO domain (id, wallet, domain, webhook, cf_token, cf_zone_id, description, cost, amountmade, allowed_record_types)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        values,
    )

    created = await get_domain(domain_id)
    assert created, "Newly created domain couldn't be retrieved"
    return created
async def update_domain(domain_id: str, **kwargs) -> Domains:
    """Update the given columns of a domain row and return the fresh row.

    Each keyword argument names a column of the ``domain`` table and gives
    its new value. With no keyword arguments the row is returned unchanged
    (no UPDATE is issued, since ``SET`` with no assignments is invalid SQL).

    Raises:
        ValueError: if a keyword is not a plain identifier. Column names are
            interpolated into the SQL text, so anything else is rejected.
        AssertionError: if the domain cannot be read back.
    """
    if kwargs:
        invalid = [name for name in kwargs if not name.isidentifier()]
        if invalid:
            raise ValueError(f"Invalid domain field name(s): {invalid}")

        assignments = ", ".join(f"{name} = ?" for name in kwargs)
        await db.execute(
            f"UPDATE domain SET {assignments} WHERE id = ?",
            (*kwargs.values(), domain_id),
        )

    row = await db.fetchone("SELECT * FROM domain WHERE id = ?", (domain_id,))
    assert row, "Newly updated domain couldn't be retrieved"
    return Domains(**row)
async def get_domain(domain_id: str) -> Optional[Domains]:
    """Fetch a single domain by id, or ``None`` when no row matches."""
    row = await db.fetchone("SELECT * FROM domain WHERE id = ?", (domain_id,))
    if row:
        return Domains(**row)
    return None
async def get_domains(wallet_ids: Union[str, List[str]]) -> List[Domains]:
    """Return every domain belonging to the given wallet id(s).

    Accepts either a single wallet id or a list of them.
    """
    ids = [wallet_ids] if isinstance(wallet_ids, str) else wallet_ids

    # One "?" placeholder per wallet id, e.g. "?,?,?".
    placeholders = ",".join("?" * len(ids))
    rows = await db.fetchall(
        f"SELECT * FROM domain WHERE wallet IN ({placeholders})",
        tuple(ids),
    )

    return [Domains(**row) for row in rows]
async def delete_domain(domain_id: str) -> None:
    """Delete the domain row whose id is ``domain_id``."""
    params = (domain_id,)
    await db.execute("DELETE FROM domain WHERE id = ?", params)