2022-07-28 15:04:13 +03:00

282 lines
8.9 KiB

from http import HTTPStatus

from fastapi import Query, Request
from fastapi.params import Depends
from starlette.exceptions import HTTPException

from embit.descriptor import Descriptor, Key
from embit.psbt import PSBT, DerivationPath
from embit.ec import PublicKey
from embit.transaction import Transaction, TransactionInput, TransactionOutput
from embit import script

from lnbits.decorators import WalletTypeInfo, get_key_type, require_admin_key
from lnbits.extensions.watchonly import watchonly_ext

# NOTE(review): the name list of this import was missing in this view; it is
# reconstructed from the otherwise-unresolved names the handlers below call.
# Confirm against the .crud module.
from .crud import (
    create_config,
    create_fresh_addresses,
    create_mempool,
    create_watch_wallet,
    delete_addresses_for_wallet,
    delete_watch_wallet,
    get_addresses,
    get_config,
    get_fresh_address,
    get_mempool,
    get_watch_wallet,
    get_watch_wallets,
    update_address,
    update_config,
    update_mempool,
    update_watch_wallet,
)
from .models import CreateWallet, CreatePsbt, Config
from .helpers import parse_key
# NOTE(review): no route decorator is visible in this view; confirm this
# handler is registered on watchonly_ext.
async def api_wallets_retrieve(wallet: WalletTypeInfo = Depends(get_key_type)):
    """Return all watch-only wallets of the calling user as a list of dicts.

    Falls back to an empty string when the lookup or serialisation fails.
    """
    try:
        watch_wallets = await get_watch_wallets(wallet.wallet.user)
        # A distinct loop name avoids shadowing the `wallet` parameter.
        return [watch_wallet.dict() for watch_wallet in watch_wallets]
    except Exception:
        # NOTE(review): as written, the original had an unreachable second
        # `return ""` directly after the first return. It is kept here as the
        # best-effort fallback it appears to be; confirm the intended contract.
        return ""
# NOTE(review): no route decorator is visible in this view; confirm this
# handler is registered on watchonly_ext.
async def api_wallet_retrieve(
    wallet_id, wallet: WalletTypeInfo = Depends(get_key_type)
):
    """Return a single watch-only wallet as a dict.

    Raises:
        HTTPException: 404 when ``get_watch_wallet`` finds nothing for
            ``wallet_id``.
    """
    w_wallet = await get_watch_wallet(wallet_id)
    if not w_wallet:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND, detail="Wallet does not exist."
        )
    return w_wallet.dict()
# NOTE(review): no route decorator is visible in this view; confirm this
# handler is registered on watchonly_ext.
async def api_wallet_create_or_update(
    data: CreateWallet, w: WalletTypeInfo = Depends(require_admin_key)
):
    """Create a watch-only wallet for the calling user and return it as a dict.

    After creation the wallet's addresses are populated through
    ``api_get_addresses`` and a config row is created for the user if none
    exists yet.

    Raises:
        HTTPException: 400 carrying the message of any error raised while
            creating the wallet or populating its addresses.
    """
    # NOTE(review): the opening `try:` was missing in this view; it is placed
    # before wallet creation so that both awaited calls are covered by the
    # visible `except` — confirm against the original.
    try:
        wallet = await create_watch_wallet(
            user=w.wallet.user, masterpub=data.masterpub, title=data.title
        )
        await api_get_addresses(wallet.id, w)
    except Exception as e:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST, detail=str(e)
        ) from e

    config = await get_config(w.wallet.user)
    if not config:
        await create_config(user=w.wallet.user)
    return wallet.dict()
# NOTE(review): no route decorator is visible in this view; confirm this
# handler is registered on watchonly_ext.
async def api_wallet_delete(wallet_id, w: WalletTypeInfo = Depends(require_admin_key)):
    """Delete a watch-only wallet together with its stored addresses.

    Raises:
        HTTPException: 404 when the wallet does not exist; otherwise 204,
            which this handler uses to signal a successful deletion.
    """
    wallet = await get_watch_wallet(wallet_id)
    if not wallet:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND, detail="Wallet does not exist."
        )

    await delete_watch_wallet(wallet_id)
    await delete_addresses_for_wallet(wallet_id)

    # Success is reported by raising, not returning, a 204 response.
    raise HTTPException(status_code=HTTPStatus.NO_CONTENT)
# NOTE(review): no route decorator is visible in this view; confirm this
# handler is registered on watchonly_ext.
async def api_fresh_address(wallet_id, w: WalletTypeInfo = Depends(get_key_type)):
    """Return the address produced by ``get_fresh_address`` for the wallet, as a dict."""
    address = await get_fresh_address(wallet_id)
    return address.dict()
# NOTE(review): no route decorator is visible in this view; confirm this
# handler is registered on watchonly_ext.
async def api_update_address(
    id: str, req: Request, w: WalletTypeInfo = Depends(require_admin_key)
):
    """Update the amount and/or note of an address from the JSON request body.

    Only the ``amount`` and ``note`` keys of the body are read. When the
    updated address is on branch 0 with a non-zero amount and its index is
    beyond the wallet's ``address_no``, the wallet's ``address_no`` is advanced
    to that index.

    Returns:
        The updated address object as returned by ``update_address``.
    """
    body = await req.json()
    params = {}

    # amount is only updated if the address has history
    if "amount" in body:
        params["amount"] = int(body["amount"])
        params["has_activity"] = True

    if "note" in body:
        params["note"] = str(body["note"])

    address = await update_address(**params, id=id)

    # The wallet is only needed for funded branch-0 addresses.
    wallet = (
        await get_watch_wallet(address.wallet)
        if address.branch_index == 0 and address.amount != 0
        else None
    )

    if wallet and wallet.address_no < address.address_index:
        await update_watch_wallet(address.wallet, address_no=address.address_index)
    return address
# NOTE(review): no route decorator is visible in this view; confirm this
# handler is registered on watchonly_ext.
async def api_get_addresses(wallet_id, w: WalletTypeInfo = Depends(get_key_type)):
    """Return all addresses of a watch-only wallet, topping up the gap first.

    A wallet without addresses gets an initial batch for branch 0 (receive)
    and branch 1 (change), sized by the configured gap limits. For each branch
    that has an address with activity, fresh addresses are then created past
    the current end so that ``gap_limit`` addresses follow the last active one.

    Raises:
        HTTPException: 404 when the wallet does not exist.
    """
    wallet = await get_watch_wallet(wallet_id)
    if not wallet:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND, detail="Wallet does not exist."
        )

    addresses = await get_addresses(wallet_id)

    # The config may not exist yet (this handler is called during wallet
    # creation, before the config is ensured there); create it on demand the
    # same way api_get_config does instead of failing on `None`.
    config = await get_config(w.wallet.user)
    if not config:
        config = await create_config(user=w.wallet.user)

    if not addresses:
        await create_fresh_addresses(wallet_id, 0, config.receive_gap_limit)
        await create_fresh_addresses(wallet_id, 0, config.change_gap_limit, True)
        addresses = await get_addresses(wallet_id)

    receive_addresses = [addr for addr in addresses if addr.branch_index == 0]
    change_addresses = [addr for addr in addresses if addr.branch_index == 1]
    active_receive = [addr for addr in receive_addresses if addr.has_activity]
    active_change = [addr for addr in change_addresses if addr.has_activity]

    # The gap is measured from the LAST active address. The code as shown
    # indexed the active list with [0], i.e. the first active address, which
    # contradicts the `last_*` naming.
    # Assumes get_addresses returns rows ordered by address_index — TODO confirm.
    if active_receive:
        current_index = receive_addresses[-1].address_index
        last_active_index = active_receive[-1].address_index
        await create_fresh_addresses(
            wallet_id,
            current_index + 1,
            last_active_index + config.receive_gap_limit + 1,
        )

    if active_change:
        current_index = change_addresses[-1].address_index
        last_active_index = active_change[-1].address_index
        # wallet_id and the trailing True (change branch) were absent from this
        # call in the view; they mirror the other create_fresh_addresses calls.
        await create_fresh_addresses(
            wallet_id,
            current_index + 1,
            last_active_index + config.change_gap_limit + 1,
            True,
        )

    addresses = await get_addresses(wallet_id)
    return [address.dict() for address in addresses]
def _bip32_derivations_for(descriptors, item):
    """Build the BIP32 derivation map for one PSBT input or output.

    ``item`` must expose ``masterpub_fingerprint``, ``address_index`` and
    ``branch_index``; ``descriptors`` maps a master fingerprint to the result
    of ``parse_key`` (whose first element is the descriptor).
    """
    descriptor = descriptors[item.masterpub_fingerprint][0]
    derived = descriptor.derive(item.address_index, item.branch_index)
    return {
        PublicKey.parse(key.sec()): DerivationPath(
            key.origin.fingerprint, key.origin.derivation
        )
        for key in derived.keys
    }


# NOTE(review): no route decorator is visible in this view; confirm this
# handler is registered on watchonly_ext.
async def api_psbt_create(
    data: CreatePsbt, w: WalletTypeInfo = Depends(require_admin_key)
):
    """Build an unsigned PSBT from the requested inputs and outputs.

    Every input gets its own BIP32 derivation map and its previous
    transaction (``non_witness_utxo``). Outputs on branch 1 (change) get a
    derivation map as well.

    Returns:
        The PSBT serialised with ``PSBT.to_string()``.

    Raises:
        HTTPException: 400 carrying the message of any error raised while
            building the PSBT.
    """
    # NOTE(review): the opening `try:` was missing in this view; the whole
    # construction is wrapped so the visible `except` covers it.
    try:
        vin = [
            TransactionInput(bytes.fromhex(inp.tx_id), inp.vout)
            for inp in data.inputs
        ]
        vout = [
            TransactionOutput(out.amount, script.address_to_scriptpubkey(out.address))
            for out in data.outputs
        ]

        descriptors = {
            masterpub.fingerprint: parse_key(masterpub.public_key)
            for masterpub in data.masterpubs
        }

        # One derivation map per input. Previously a single dict was shared by
        # all inputs, so each input also carried every other input's keys.
        inputs_extra = [
            {
                "bip32_derivations": _bip32_derivations_for(descriptors, inp),
                "non_witness_utxo": Transaction.from_string(inp.tx_hex),
            }
            for inp in data.inputs
        ]

        psbt = PSBT(Transaction(vin=vin, vout=vout))

        for i, extra in enumerate(inputs_extra):
            psbt.inputs[i].bip32_derivations = extra["bip32_derivations"]
            psbt.inputs[i].non_witness_utxo = extra["non_witness_utxo"]

        # Annotate change outputs at their real position in the transaction.
        # Previously the derivations were collected into a separate list and
        # applied by that list's index, which lands on the wrong output as soon
        # as a non-change output precedes a change output.
        for i, out in enumerate(data.outputs):
            if out.branch_index == 1:
                psbt.outputs[i].bip32_derivations = _bip32_derivations_for(
                    descriptors, out
                )

        return psbt.to_string()
    except Exception as e:
        raise HTTPException(
            status_code=HTTPStatus.BAD_REQUEST, detail=str(e)
        ) from e
# NOTE(review): no route decorator is visible in this view; confirm this
# handler is registered on watchonly_ext.
async def api_update_config(
    data: Config, w: WalletTypeInfo = Depends(require_admin_key)
):
    """Store ``data`` as the calling user's config and return the result as a dict."""
    config = await update_config(data, user=w.wallet.user)
    return config.dict()
# NOTE(review): no route decorator is visible in this view; confirm this
# handler is registered on watchonly_ext.
async def api_get_config(w: WalletTypeInfo = Depends(get_key_type)):
    """Return the calling user's config as a dict, creating it if none exists."""
    config = await get_config(w.wallet.user)
    if not config:
        config = await create_config(user=w.wallet.user)
    return config.dict()
### TODO: fix statspay dependency and remove
# NOTE(review): no route decorator is visible in this view; confirm this
# handler is registered on watchonly_ext.
async def api_update_mempool(
    endpoint: str = Query(...), w: WalletTypeInfo = Depends(require_admin_key)
):
    """Set the mempool endpoint for the calling user and return the record as a dict."""
    mempool = await update_mempool(endpoint=endpoint, user=w.wallet.user)
    return mempool.dict()
### TODO: fix statspay dependency and remove
# NOTE(review): no route decorator is visible in this view; confirm this
# handler is registered on watchonly_ext.
async def api_get_mempool(w: WalletTypeInfo = Depends(require_admin_key)):
    """Return the calling user's mempool record as a dict, creating it if none exists."""
    mempool = await get_mempool(w.wallet.user)
    if not mempool:
        mempool = await create_mempool(user=w.wallet.user)
    return mempool.dict()