lnbits-legend/lnbits/tasks.py

229 lines
7.5 KiB
Python
Raw Normal View History

import asyncio
[FEAT] Push notification integration into core (#1393) * push notification integration into core added missing component fixed bell working on all pages - made pubkey global template env var - had to move `get_push_notification_pubkey` to `helpers.py` because of circular reference with `tasks.py` formay trying to fix mypy added py-vapid to requirements Trying to fix stub mypy issue * removed key files * webpush key pair is saved in db `webpush_settings` * removed lnaddress extension changes * support for multi user account subscriptions, subscriptions are stored user based fixed syntax error fixed syntax error removed unused line * fixed subscribed user storage with local storage, no get request required * method is singular now * cleanup unsubscribed or expired push subscriptions fixed flake8 errors fixed poetry errors * updating to latest lnbits formatting, rebase error fix * remove unused? * revert * relock * remove * do not create settings table use adminsettings mypy fix * cleanup old code * catch case when client tries to recreate existing webpush subscription e.g. on cleared local storage * show notification bell on user related pages only * use local storage with one key like array, some refactoring * fixed crud import * fixed too long line * removed unused imports * ruff * make webpush editable * fixed privkey encoding * fix ruff * fix migration --------- Co-authored-by: schneimi <admin@schneimi.de> Co-authored-by: schneimi <dev@schneimi.de> Co-authored-by: dni ⚡ <office@dnilabs.com>
2023-09-11 15:48:49 +02:00
import json
2022-07-16 14:23:03 +02:00
import time
import traceback
import uuid
from http import HTTPStatus
from typing import Dict, List, Optional
2021-09-11 11:02:48 +02:00
from fastapi.exceptions import HTTPException
2022-07-16 14:23:03 +02:00
from loguru import logger
[FEAT] Push notification integration into core (#1393) * push notification integration into core added missing component fixed bell working on all pages - made pubkey global template env var - had to move `get_push_notification_pubkey` to `helpers.py` because of circular reference with `tasks.py` formay trying to fix mypy added py-vapid to requirements Trying to fix stub mypy issue * removed key files * webpush key pair is saved in db `webpush_settings` * removed lnaddress extension changes * support for multi user account subscriptions, subscriptions are stored user based fixed syntax error fixed syntax error removed unused line * fixed subscribed user storage with local storage, no get request required * method is singular now * cleanup unsubscribed or expired push subscriptions fixed flake8 errors fixed poetry errors * updating to latest lnbits formatting, rebase error fix * remove unused? * revert * relock * remove * do not create settings table use adminsettings mypy fix * cleanup old code * catch case when client tries to recreate existing webpush subscription e.g. on cleared local storage * show notification bell on user related pages only * use local storage with one key like array, some refactoring * fixed crud import * fixed too long line * removed unused imports * ruff * make webpush editable * fixed privkey encoding * fix ruff * fix migration --------- Co-authored-by: schneimi <admin@schneimi.de> Co-authored-by: schneimi <dev@schneimi.de> Co-authored-by: dni ⚡ <office@dnilabs.com>
2023-09-11 15:48:49 +02:00
from py_vapid import Vapid
from pywebpush import WebPushException, webpush
2021-09-11 11:02:48 +02:00
from lnbits.core.crud import (
delete_expired_invoices,
[FEAT] Push notification integration into core (#1393) * push notification integration into core added missing component fixed bell working on all pages - made pubkey global template env var - had to move `get_push_notification_pubkey` to `helpers.py` because of circular reference with `tasks.py` formay trying to fix mypy added py-vapid to requirements Trying to fix stub mypy issue * removed key files * webpush key pair is saved in db `webpush_settings` * removed lnaddress extension changes * support for multi user account subscriptions, subscriptions are stored user based fixed syntax error fixed syntax error removed unused line * fixed subscribed user storage with local storage, no get request required * method is singular now * cleanup unsubscribed or expired push subscriptions fixed flake8 errors fixed poetry errors * updating to latest lnbits formatting, rebase error fix * remove unused? * revert * relock * remove * do not create settings table use adminsettings mypy fix * cleanup old code * catch case when client tries to recreate existing webpush subscription e.g. on cleared local storage * show notification bell on user related pages only * use local storage with one key like array, some refactoring * fixed crud import * fixed too long line * removed unused imports * ruff * make webpush editable * fixed privkey encoding * fix ruff * fix migration --------- Co-authored-by: schneimi <admin@schneimi.de> Co-authored-by: schneimi <dev@schneimi.de> Co-authored-by: dni ⚡ <office@dnilabs.com>
2023-09-11 15:48:49 +02:00
delete_webpush_subscriptions,
2021-04-17 23:27:15 +02:00
get_balance_checks,
2022-07-16 14:23:03 +02:00
get_payments,
get_standalone_payment,
)
from lnbits.core.db import db
2021-04-17 23:27:15 +02:00
from lnbits.core.services import redeem_lnurl_withdraw
[FEAT] Push notification integration into core (#1393) * push notification integration into core added missing component fixed bell working on all pages - made pubkey global template env var - had to move `get_push_notification_pubkey` to `helpers.py` because of circular reference with `tasks.py` formay trying to fix mypy added py-vapid to requirements Trying to fix stub mypy issue * removed key files * webpush key pair is saved in db `webpush_settings` * removed lnaddress extension changes * support for multi user account subscriptions, subscriptions are stored user based fixed syntax error fixed syntax error removed unused line * fixed subscribed user storage with local storage, no get request required * method is singular now * cleanup unsubscribed or expired push subscriptions fixed flake8 errors fixed poetry errors * updating to latest lnbits formatting, rebase error fix * remove unused? * revert * relock * remove * do not create settings table use adminsettings mypy fix * cleanup old code * catch case when client tries to recreate existing webpush subscription e.g. on cleared local storage * show notification bell on user related pages only * use local storage with one key like array, some refactoring * fixed crud import * fixed too long line * removed unused imports * ruff * make webpush editable * fixed privkey encoding * fix ruff * fix migration --------- Co-authored-by: schneimi <admin@schneimi.de> Co-authored-by: schneimi <dev@schneimi.de> Co-authored-by: dni ⚡ <office@dnilabs.com>
2023-09-11 15:48:49 +02:00
from lnbits.settings import settings
from lnbits.wallets import get_wallet_class
tasks: List[asyncio.Task] = []
def create_task(coro):
task = asyncio.create_task(coro)
tasks.append(task)
return task
def create_permanent_task(func):
return create_task(catch_everything_and_restart(func))
def cancel_all_tasks():
for task in tasks:
try:
task.cancel()
except Exception as exc:
logger.warning(f"error while cancelling task: {str(exc)}")
async def catch_everything_and_restart(func):
try:
await func()
except asyncio.CancelledError:
raise # because we must pass this up
except Exception as exc:
logger.error("caught exception in background task:", exc)
logger.error(traceback.format_exc())
logger.error("will restart the task in 5 seconds.")
await asyncio.sleep(5)
await catch_everything_and_restart(func)
async def send_push_promise(a, b) -> None:
pass
class SseListenersDict(dict):
"""
A dict of sse listeners.
"""
def __init__(self, name: Optional[str] = None):
self.name = name or f"sse_listener_{str(uuid.uuid4())[:8]}"
def __setitem__(self, key, value):
assert isinstance(key, str), f"{key} is not a string"
assert isinstance(value, asyncio.Queue), f"{value} is not an asyncio.Queue"
logger.trace(f"sse: adding listener {key} to {self.name}. len = {len(self)+1}")
return super().__setitem__(key, value)
def __delitem__(self, key):
logger.trace(f"sse: removing listener from {self.name}. len = {len(self)-1}")
return super().__delitem__(key)
_RaiseKeyError = object() # singleton for no-default behavior
def pop(self, key, v=_RaiseKeyError) -> None:
logger.trace(f"sse: removing listener from {self.name}. len = {len(self)-1}")
return super().pop(key)
invoice_listeners: Dict[str, asyncio.Queue] = SseListenersDict("invoice_listeners")
def register_invoice_listener(send_chan: asyncio.Queue, name: Optional[str] = None):
"""
A method intended for extensions (and core/tasks.py) to call when they want to be
notified about new invoice payments incoming. Will emit all incoming payments.
"""
name_unique = f"{name or 'no_name'}_{str(uuid.uuid4())[:8]}"
logger.trace(f"sse: registering invoice listener {name_unique}")
invoice_listeners[name_unique] = send_chan
async def webhook_handler():
"""
Returns the webhook_handler for the selected wallet if present. Used by API.
"""
2022-10-05 09:46:59 +02:00
WALLET = get_wallet_class()
handler = getattr(WALLET, "webhook_listener", None)
if handler:
return await handler()
2021-09-11 11:02:48 +02:00
raise HTTPException(status_code=HTTPStatus.NO_CONTENT)
2022-07-19 18:51:35 +02:00
internal_invoice_queue: asyncio.Queue = asyncio.Queue(0)
async def internal_invoice_listener():
"""
internal_invoice_queue will be filled directly in core/services.py
after the payment was deemed to be settled internally.
Called by the app startup sequence.
"""
while True:
checking_id = await internal_invoice_queue.get()
logger.info("> got internal payment notification", checking_id)
asyncio.create_task(invoice_callback_dispatcher(checking_id))
async def invoice_listener():
"""
invoice_listener will collect all invoices that come directly
from the backend wallet.
Called by the app startup sequence.
"""
2022-10-05 09:46:59 +02:00
WALLET = get_wallet_class()
async for checking_id in WALLET.paid_invoices_stream():
logger.info("> got a payment notification", checking_id)
asyncio.create_task(invoice_callback_dispatcher(checking_id))
async def check_pending_payments():
"""
check_pending_payments is called during startup to check for pending payments with
the backend and also to delete expired invoices. Incoming payments will be
checked only once, outgoing pending payments will be checked regularly.
"""
outgoing = True
incoming = True
while True:
async with db.connect() as conn:
logger.info(
f"Task: checking all pending payments (incoming={incoming},"
f" outgoing={outgoing}) of last 15 days"
)
start_time = time.time()
pending_payments = await get_payments(
since=(int(time.time()) - 60 * 60 * 24 * 15), # 15 days ago
complete=False,
pending=True,
outgoing=outgoing,
incoming=incoming,
exclude_uncheckable=True,
conn=conn,
)
for payment in pending_payments:
await payment.check_status(conn=conn)
logger.info(
f"Task: pending check finished for {len(pending_payments)} payments"
f" (took {time.time() - start_time:0.3f} s)"
)
# we delete expired invoices once upon the first pending check
if incoming:
logger.debug("Task: deleting all expired invoices")
start_time = time.time()
await delete_expired_invoices(conn=conn)
logger.info(
"Task: expired invoice deletion finished (took"
f" {time.time() - start_time:0.3f} s)"
)
# after the first check we will only check outgoing, not incoming
# that will be handled by the global invoice listeners, hopefully
incoming = False
await asyncio.sleep(60 * 30) # every 30 minutes
2021-04-17 23:27:15 +02:00
async def perform_balance_checks():
while True:
for bc in await get_balance_checks():
await redeem_lnurl_withdraw(bc.wallet, bc.url)
2021-04-17 23:27:15 +02:00
await asyncio.sleep(60 * 60 * 6) # every 6 hours
2021-04-17 23:27:15 +02:00
async def invoice_callback_dispatcher(checking_id: str):
"""
Takes incoming payments, sets pending=False, and dispatches them to
invoice_listeners from core and extensions.
"""
payment = await get_standalone_payment(checking_id, incoming=True)
if payment and payment.is_in:
logger.trace(f"sse sending invoice callback for payment {checking_id}")
await payment.set_pending(False)
for chan_name, send_chan in invoice_listeners.items():
logger.trace(f"sse sending to chan: {chan_name}")
await send_chan.put(payment)
[FEAT] Push notification integration into core (#1393) * push notification integration into core added missing component fixed bell working on all pages - made pubkey global template env var - had to move `get_push_notification_pubkey` to `helpers.py` because of circular reference with `tasks.py` formay trying to fix mypy added py-vapid to requirements Trying to fix stub mypy issue * removed key files * webpush key pair is saved in db `webpush_settings` * removed lnaddress extension changes * support for multi user account subscriptions, subscriptions are stored user based fixed syntax error fixed syntax error removed unused line * fixed subscribed user storage with local storage, no get request required * method is singular now * cleanup unsubscribed or expired push subscriptions fixed flake8 errors fixed poetry errors * updating to latest lnbits formatting, rebase error fix * remove unused? * revert * relock * remove * do not create settings table use adminsettings mypy fix * cleanup old code * catch case when client tries to recreate existing webpush subscription e.g. on cleared local storage * show notification bell on user related pages only * use local storage with one key like array, some refactoring * fixed crud import * fixed too long line * removed unused imports * ruff * make webpush editable * fixed privkey encoding * fix ruff * fix migration --------- Co-authored-by: schneimi <admin@schneimi.de> Co-authored-by: schneimi <dev@schneimi.de> Co-authored-by: dni ⚡ <office@dnilabs.com>
2023-09-11 15:48:49 +02:00
async def send_push_notification(subscription, title, body, url=""):
vapid = Vapid()
try:
logger.debug("sending push notification")
webpush(
json.loads(subscription.data),
json.dumps({"title": title, "body": body, "url": url}),
vapid.from_pem(bytes(settings.lnbits_webpush_privkey, "utf-8")),
{"aud": "", "sub": "mailto:alan@lnbits.com"},
)
except WebPushException as e:
if e.response.status_code == HTTPStatus.GONE:
# cleanup unsubscribed or expired push subscriptions
await delete_webpush_subscriptions(subscription.endpoint)
else:
logger.error(f"failed sending push notification: {e.response.text}")