lnbits-legend/lnbits/core/crud.py
fiatjaf 76199c66a1 livestream extension.
(except docs)
2021-04-11 00:44:55 -03:00

382 lines
9.1 KiB
Python

import json
import datetime
from uuid import uuid4
from typing import List, Optional, Dict, Any
from lnbits import bolt11
from lnbits.db import Connection
from lnbits.settings import DEFAULT_WALLET_NAME
from . import db
from .models import User, Wallet, Payment
# accounts
# --------
async def create_account(conn: Optional[Connection] = None) -> User:
user_id = uuid4().hex
await (conn or db).execute("INSERT INTO accounts (id) VALUES (?)", (user_id,))
new_account = await get_account(user_id=user_id, conn=conn)
assert new_account, "Newly created account couldn't be retrieved"
return new_account
async def get_account(
user_id: str, conn: Optional[Connection] = None
) -> Optional[User]:
row = await (conn or db).fetchone(
"SELECT id, email, pass as password FROM accounts WHERE id = ?", (user_id,)
)
return User(**row) if row else None
async def get_user(user_id: str, conn: Optional[Connection] = None) -> Optional[User]:
user = await (conn or db).fetchone(
"SELECT id, email FROM accounts WHERE id = ?", (user_id,)
)
if user:
extensions = await (conn or db).fetchall(
"SELECT extension FROM extensions WHERE user = ? AND active = 1", (user_id,)
)
wallets = await (conn or db).fetchall(
"""
SELECT *, COALESCE((SELECT balance FROM balances WHERE wallet = wallets.id), 0) AS balance_msat
FROM wallets
WHERE user = ?
""",
(user_id,),
)
return (
User(
**{
**user,
**{
"extensions": [e[0] for e in extensions],
"wallets": [Wallet(**w) for w in wallets],
},
}
)
if user
else None
)
async def update_user_extension(
*, user_id: str, extension: str, active: int, conn: Optional[Connection] = None
) -> None:
await (conn or db).execute(
"""
INSERT OR REPLACE INTO extensions (user, extension, active)
VALUES (?, ?, ?)
""",
(user_id, extension, active),
)
# wallets
# -------
async def create_wallet(
*,
user_id: str,
wallet_name: Optional[str] = None,
conn: Optional[Connection] = None,
) -> Wallet:
wallet_id = uuid4().hex
await (conn or db).execute(
"""
INSERT INTO wallets (id, name, user, adminkey, inkey)
VALUES (?, ?, ?, ?, ?)
""",
(
wallet_id,
wallet_name or DEFAULT_WALLET_NAME,
user_id,
uuid4().hex,
uuid4().hex,
),
)
new_wallet = await get_wallet(wallet_id=wallet_id, conn=conn)
assert new_wallet, "Newly created wallet couldn't be retrieved"
return new_wallet
async def delete_wallet(
*, user_id: str, wallet_id: str, conn: Optional[Connection] = None
) -> None:
await (conn or db).execute(
"""
UPDATE wallets AS w
SET
user = 'del:' || w.user,
adminkey = 'del:' || w.adminkey,
inkey = 'del:' || w.inkey
WHERE id = ? AND user = ?
""",
(wallet_id, user_id),
)
async def get_wallet(
wallet_id: str, conn: Optional[Connection] = None
) -> Optional[Wallet]:
row = await (conn or db).fetchone(
"""
SELECT *, COALESCE((SELECT balance FROM balances WHERE wallet = wallets.id), 0) AS balance_msat
FROM wallets
WHERE id = ?
""",
(wallet_id,),
)
return Wallet(**row) if row else None
async def get_wallet_for_key(
key: str, key_type: str = "invoice", conn: Optional[Connection] = None
) -> Optional[Wallet]:
row = await (conn or db).fetchone(
"""
SELECT *, COALESCE((SELECT balance FROM balances WHERE wallet = wallets.id), 0) AS balance_msat
FROM wallets
WHERE adminkey = ? OR inkey = ?
""",
(key, key),
)
if not row:
return None
if key_type == "admin" and row["adminkey"] != key:
return None
return Wallet(**row)
# wallet payments
# ---------------
async def get_standalone_payment(
checking_id_or_hash: str, conn: Optional[Connection] = None
) -> Optional[Payment]:
row = await (conn or db).fetchone(
"""
SELECT *
FROM apipayments
WHERE checking_id = ? OR hash = ?
LIMIT 1
""",
(checking_id_or_hash, checking_id_or_hash),
)
return Payment.from_row(row) if row else None
async def get_wallet_payment(
wallet_id: str, payment_hash: str, conn: Optional[Connection] = None
) -> Optional[Payment]:
row = await (conn or db).fetchone(
"""
SELECT *
FROM apipayments
WHERE wallet = ? AND hash = ?
""",
(wallet_id, payment_hash),
)
return Payment.from_row(row) if row else None
async def get_payments(
*,
wallet_id: Optional[str] = None,
complete: bool = False,
pending: bool = False,
outgoing: bool = False,
incoming: bool = False,
since: Optional[int] = None,
exclude_uncheckable: bool = False,
conn: Optional[Connection] = None,
) -> List[Payment]:
"""
Filters payments to be returned by complete | pending | outgoing | incoming.
"""
args: List[Any] = []
clause: List[str] = []
if since != None:
clause.append("time > ?")
args.append(since)
if wallet_id:
clause.append("wallet = ?")
args.append(wallet_id)
if complete and pending:
pass
elif complete:
clause.append("((amount > 0 AND pending = 0) OR amount < 0)")
elif pending:
clause.append("pending = 1")
else:
pass
if outgoing and incoming:
pass
elif outgoing:
clause.append("amount < 0")
elif incoming:
clause.append("amount > 0")
else:
pass
if exclude_uncheckable: # checkable means it has a checking_id that isn't internal
clause.append("checking_id NOT LIKE 'temp_%'")
clause.append("checking_id NOT LIKE 'internal_%'")
where = ""
if clause:
where = f"WHERE {' AND '.join(clause)}"
rows = await (conn or db).fetchall(
f"""
SELECT *
FROM apipayments
{where}
ORDER BY time DESC
""",
tuple(args),
)
return [Payment.from_row(row) for row in rows]
async def delete_expired_invoices(
conn: Optional[Connection] = None,
) -> None:
rows = await (conn or db).fetchall(
"""
SELECT bolt11
FROM apipayments
WHERE pending = 1 AND amount > 0 AND time < strftime('%s', 'now') - 86400
"""
)
for (payment_request,) in rows:
try:
invoice = bolt11.decode(payment_request)
except:
continue
expiration_date = datetime.datetime.fromtimestamp(invoice.date + invoice.expiry)
if expiration_date > datetime.datetime.utcnow():
continue
await (conn or db).execute(
"""
DELETE FROM apipayments
WHERE pending = 1 AND hash = ?
""",
(invoice.payment_hash,),
)
# payments
# --------
async def create_payment(
*,
wallet_id: str,
checking_id: str,
payment_request: str,
payment_hash: str,
amount: int,
memo: str,
fee: int = 0,
preimage: Optional[str] = None,
pending: bool = True,
extra: Optional[Dict] = None,
webhook: Optional[str] = None,
conn: Optional[Connection] = None,
) -> Payment:
await (conn or db).execute(
"""
INSERT INTO apipayments
(wallet, checking_id, bolt11, hash, preimage,
amount, pending, memo, fee, extra, webhook)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
wallet_id,
checking_id,
payment_request,
payment_hash,
preimage,
amount,
int(pending),
memo,
fee,
json.dumps(extra)
if extra and extra != {} and type(extra) is dict
else None,
webhook,
),
)
new_payment = await get_wallet_payment(wallet_id, payment_hash, conn=conn)
assert new_payment, "Newly created payment couldn't be retrieved"
return new_payment
async def update_payment_status(
checking_id: str,
pending: bool,
conn: Optional[Connection] = None,
) -> None:
await (conn or db).execute(
"UPDATE apipayments SET pending = ? WHERE checking_id = ?",
(
int(pending),
checking_id,
),
)
async def delete_payment(
checking_id: str,
conn: Optional[Connection] = None,
) -> None:
await (conn or db).execute(
"DELETE FROM apipayments WHERE checking_id = ?", (checking_id,)
)
async def check_internal(
payment_hash: str,
conn: Optional[Connection] = None,
) -> Optional[str]:
row = await (conn or db).fetchone(
"""
SELECT checking_id FROM apipayments
WHERE hash = ? AND pending AND amount > 0
""",
(payment_hash,),
)
if not row:
return None
else:
return row["checking_id"]