lnbits-legend/lnbits/tasks.py
calle 2ee10e28c5
Backend: Unstuck outgoing payments (#857)
* first attempts

* lndrest works

* fix details

* optional fee update

* use base64.urlsafe_b64encode

* return paymentstatus

* CLN: return pending for pending invoices

* grpc wip

* lndgrpc works

* cln: return pending for pending invoices really this time

* retry wallet out of exception

* wip eclair

* take all routines into try except

* cliche: return error

* rename payment.check_pending() to payment.check_status()

* rename payment.check_pending() to payment.check_status()

* eclair: works

* eclair: better error check

* opennode: works

* check payment.checking_id instead of payment.ok

* payment.ok check as well

* cln: works?

* cln: works

* lntxbot: works

* lnbits/wallets/lnpay.py

* cln: error handling

* make format

* lndhub full detail update

* spark: wip

* error to False

* wallets: return clean PaymentResponse

* opennode: strict error

* cliche: works

* lnbits: works

* cln: don't throw error

* preimage not error

* fix cln

* do not add duplicate payments

* revert cln

* extra safety for cln

* undo crud changes until tests work

* tasks: better logging and 0.5s sleep for regular status check

* 0.1 s

* check if wallet exists

* lnbits unhashed description

* remove sleep

* revert app.py

* cleanup

* add check

* clean error

* readd app.py

* fix eclair
2022-08-30 13:28:58 +02:00

129 lines
3.7 KiB
Python

import asyncio
import time
import traceback
from http import HTTPStatus
from typing import Callable, List
from fastapi.exceptions import HTTPException
from loguru import logger
from lnbits.core.crud import (
delete_expired_invoices,
get_balance_checks,
get_payments,
get_standalone_payment,
)
from lnbits.core.services import redeem_lnurl_withdraw
from lnbits.settings import WALLET
# Callables queued for later execution; run_deferred_async() iterates this list.
deferred_async: List[Callable] = []


def record_async(func: Callable) -> Callable:
    """
    Build a one-argument hook for `func`.

    Nothing is queued when this function is called; `func` is appended to
    `deferred_async` each time the returned hook is invoked.
    """

    def recorder(state):
        # `state` is accepted but not used.
        deferred_async.append(func)
        return None

    return recorder
# Strong references to the spawned background tasks. The event loop only keeps
# weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_deferred_tasks: set = set()


async def run_deferred_async():
    """
    Start every callable recorded in `deferred_async` as its own background
    task, each wrapped in `catch_everything_and_restart`.

    Returns immediately after scheduling; the tasks themselves are not awaited.
    """
    for func in deferred_async:
        task = asyncio.create_task(catch_everything_and_restart(func))
        _deferred_tasks.add(task)
        # Drop the reference once the task is done so the set does not grow.
        task.add_done_callback(_deferred_tasks.discard)
async def catch_everything_and_restart(func):
    """
    Await `func()` and, if it raises, log the failure and run it again after
    5 seconds, repeating until a run completes without an exception.

    `asyncio.CancelledError` is never swallowed: it is re-raised so that task
    cancellation keeps working.
    """
    # A loop (rather than calling ourselves recursively) keeps the call stack
    # flat no matter how many times the task has to be restarted.
    while True:
        try:
            await func()
            return
        except asyncio.CancelledError:
            raise  # because we must pass this up
        except Exception as exc:
            # The exception is interpolated into the message itself: loguru
            # formats extra positional arguments with str.format(), so passing
            # `exc` alongside a message without "{}" would silently drop it.
            logger.error(f"caught exception in background task: {exc}")
            logger.error(traceback.format_exc())
            logger.error("will restart the task in 5 seconds.")
        await asyncio.sleep(5)
async def send_push_promise(a, b) -> None:
    """
    Placeholder coroutine: takes two arguments, ignores them and does nothing.
    """
    return None
# Queues that receive every payment handed out by invoice_callback_dispatcher().
invoice_listeners: List[asyncio.Queue] = []


def register_invoice_listener(send_chan: asyncio.Queue) -> None:
    """
    Subscribe `send_chan` to incoming invoice payments.

    Intended for extensions: the queue is added to `invoice_listeners` and
    will be fed the payments that are dispatched to that list.
    """
    invoice_listeners.append(send_chan)
    return None
async def webhook_handler():
    """
    Delegate to the wallet backend's `webhook_listener`, if it has one.

    Returns whatever the backend's listener returns. Raises an HTTPException
    with status 204 (NO_CONTENT) when `WALLET` has no (truthy)
    `webhook_listener` attribute.
    """
    listener = getattr(WALLET, "webhook_listener", None)
    if not listener:
        raise HTTPException(status_code=HTTPStatus.NO_CONTENT)
    return await listener()
# Queue of checking ids waiting to be dispatched; maxsize 0 means unbounded.
internal_invoice_queue: asyncio.Queue = asyncio.Queue(maxsize=0)


async def internal_invoice_listener():
    """
    Forever take checking ids off `internal_invoice_queue` and dispatch each
    one in its own task via `invoice_callback_dispatcher`.
    """
    while True:
        queued_id = await internal_invoice_queue.get()
        dispatch = invoice_callback_dispatcher(queued_id)
        asyncio.create_task(dispatch)
async def invoice_listener():
    """
    Consume the wallet backend's stream of paid invoices and dispatch each
    checking id in its own task via `invoice_callback_dispatcher`.
    """
    async for checking_id in WALLET.paid_invoices_stream():
        # Interpolate the id into the message: loguru treats extra positional
        # arguments as str.format() values, so without a "{}" placeholder the
        # id would never appear in the log line.
        logger.info(f"> got a payment notification {checking_id}")
        asyncio.create_task(invoice_callback_dispatcher(checking_id))
async def check_pending_payments():
    """
    Periodically re-check the status of pending payments of the last 15 days.

    The first pass covers incoming and outgoing payments and afterwards
    deletes expired invoices; every later pass (one every 30 minutes) only
    covers outgoing payments.
    """
    lookback_seconds = 15 * 24 * 60 * 60  # 15 days
    pause_seconds = 30 * 60  # every 30 minutes

    outgoing = True
    incoming = True

    while True:
        logger.debug(
            f"Task: checking all pending payments (incoming={incoming}, outgoing={outgoing}) of last 15 days"
        )
        pending_payments = await get_payments(
            since=int(time.time()) - lookback_seconds,
            complete=False,
            pending=True,
            outgoing=outgoing,
            incoming=incoming,
            exclude_uncheckable=True,
        )
        for payment in pending_payments:
            await payment.check_status()
        logger.debug("Task: pending payments check finished")

        # expired invoices are deleted only once, right after the first check
        if incoming:
            logger.debug("Task: deleting all expired invoices")
            await delete_expired_invoices()
            logger.debug("Task: expired invoice deletion finished")

        # from the second pass on only outgoing payments are checked; incoming
        # ones are left to the global invoice listeners
        incoming = False

        await asyncio.sleep(pause_seconds)
async def perform_balance_checks():
    """
    Every 6 hours, fetch all balance checks and call `redeem_lnurl_withdraw`
    with the wallet and url of each one. Runs forever.
    """
    import inspect  # function-scope import: only needed for the check below

    while True:
        for bc in await get_balance_checks():
            outcome = redeem_lnurl_withdraw(bc.wallet, bc.url)
            # NOTE(review): redeem_lnurl_withdraw appears to be a coroutine
            # function (confirm in lnbits.core.services). Calling it without
            # awaiting only creates a coroutine object that never runs, so the
            # result is awaited here whenever it is awaitable.
            if inspect.isawaitable(outcome):
                await outcome
        await asyncio.sleep(60 * 60 * 6)  # every 6 hours
async def invoice_callback_dispatcher(checking_id: str):
    """
    Look up the incoming payment for `checking_id`, mark it as no longer
    pending and push it to every queue in `invoice_listeners`.

    Does nothing when no payment is found or the payment is not incoming.
    """
    payment = await get_standalone_payment(checking_id, incoming=True)
    if not (payment and payment.is_in):
        return

    # Interpolate the id into the message: loguru treats extra positional
    # arguments as str.format() values, so without a "{}" placeholder the id
    # would never appear in the log line.
    logger.trace(f"sending invoice callback for payment {checking_id}")
    await payment.set_pending(False)
    for send_chan in invoice_listeners:
        await send_chan.put(payment)