From 95e8573ff83a81ff8dc794c35a86efdee55ea372 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Tue, 6 Oct 2020 00:39:54 -0300 Subject: [PATCH] SSE endpoint for paid invoices. also move very essential stuff from core/tasks.py to tasks.py so things are more organized. --- lnbits/app.py | 11 ++-- lnbits/core/__init__.py | 6 ++- lnbits/core/tasks.py | 81 ++++------------------------ lnbits/core/views/api.py | 73 +++++++++++++++++++++---- lnbits/core/views/generic.py | 2 +- lnbits/db.py | 3 ++ lnbits/extensions/lnurlp/__init__.py | 2 +- lnbits/extensions/lnurlp/tasks.py | 1 - lnbits/tasks.py | 81 ++++++++++++++++++++++++++++ 9 files changed, 167 insertions(+), 93 deletions(-) create mode 100644 lnbits/tasks.py diff --git a/lnbits/app.py b/lnbits/app.py index 64e286c54..1ed2ce29f 100644 --- a/lnbits/app.py +++ b/lnbits/app.py @@ -12,6 +12,7 @@ from .core import core_app from .db import open_db, open_ext_db from .helpers import get_valid_extensions, get_js_vendored, get_css_vendored, url_for_vendored from .proxy_fix import ASGIProxyFix +from .tasks import invoice_listener, webhook_handler, grab_app_for_later secure_headers = SecureHeaders(hsts=False) @@ -33,6 +34,7 @@ def create_app(config_object="lnbits.settings") -> QuartTrio: register_commands(app) register_request_hooks(app) register_async_tasks(app) + grab_app_for_later(app) return app @@ -52,7 +54,7 @@ def register_blueprints(app: QuartTrio) -> None: @bp.teardown_request async def after_request(exc): - g.ext_db.__exit__(type(exc), exc, None) + g.ext_db.close() app.register_blueprint(bp, url_prefix=f"/{ext.code}") except Exception: @@ -90,6 +92,7 @@ def register_request_hooks(app: QuartTrio): @app.before_request async def before_request(): g.db = open_db() + g.nursery = app.nursery @app.after_request async def set_secure_headers(response): @@ -98,12 +101,10 @@ def register_request_hooks(app: QuartTrio): @app.teardown_request async def after_request(exc): - g.db.__exit__(type(exc), exc, None) + g.db.close() def register_async_tasks(app): - from lnbits.core.tasks import invoice_listener, webhook_handler - @app.route("/wallet/webhook", methods=["GET", "POST", "PUT", "PATCH", "DELETE"]) async def webhook_listener(): return await webhook_handler() @@ -111,7 +112,7 @@ def register_async_tasks(app): @app.before_serving async def listeners(): app.nursery.start_soon(invoice_listener) - print("started invoice_listener") + print("started global invoice_listener.") @app.after_serving async def stop_listeners(): diff --git a/lnbits/core/__init__.py b/lnbits/core/__init__.py index e34c68c5d..7ccaded9e 100644 --- a/lnbits/core/__init__.py +++ b/lnbits/core/__init__.py @@ -8,6 +8,8 @@ core_app: Blueprint = Blueprint( from .views.api import * # noqa from .views.generic import * # noqa -from .tasks import grab_app_for_later +from .tasks import on_invoice_paid -core_app.record(grab_app_for_later) +from lnbits.tasks import register_invoice_listener + +register_invoice_listener("core", on_invoice_paid) diff --git a/lnbits/core/tasks.py b/lnbits/core/tasks.py index bafc47406..577682370 100644 --- a/lnbits/core/tasks.py +++ b/lnbits/core/tasks.py @@ -1,78 +1,15 @@ import trio # type: ignore -from http import HTTPStatus -from typing import Optional, Tuple, List, Callable, Awaitable -from quart import Request, g -from quart_trio import QuartTrio -from werkzeug.datastructures import Headers - -from lnbits.db import open_db, open_ext_db -from lnbits.settings import WALLET +from typing import List from .models import Payment -from .crud import get_standalone_payment -main_app: Optional[QuartTrio] = None +sse_listeners: List[trio.MemorySendChannel] = [] -def grab_app_for_later(state): - global main_app - main_app = state.app - - -async def send_push_promise(a, b) -> None: - pass - - -async def run_on_pseudo_request(func: Callable, *args): - fk = Request( - "GET", - "http", - "/background/pseudo", - b"", - Headers([("host", "lnbits.background")]), - "", - "1.1", - send_push_promise=send_push_promise, - ) - assert main_app - - async def run(): - async with main_app.request_context(fk): - with open_db() as g.db: # type: ignore - await func(*args) - - async with trio.open_nursery() as nursery: - nursery.start_soon(run) - - -invoice_listeners: List[Tuple[str, Callable[[Payment], Awaitable[None]]]] = [] - - -def register_invoice_listener(ext_name: str, cb: Callable[[Payment], Awaitable[None]]): - """ - A method intended for extensions to call when they want to be notified about - new invoice payments incoming. - """ - print(f"registering {ext_name} invoice_listener callback: {cb}") - invoice_listeners.append((ext_name, cb)) - - -async def webhook_handler(): - handler = getattr(WALLET, "webhook_listener", None) - if handler: - return await handler() - return "", HTTPStatus.NO_CONTENT - - -async def invoice_listener(): - async for checking_id in WALLET.paid_invoices_stream(): - await run_on_pseudo_request(invoice_callback_dispatcher, checking_id) - - -async def invoice_callback_dispatcher(checking_id: str): - payment = get_standalone_payment(checking_id) - if payment and payment.is_in: - payment.set_pending(False) - for ext_name, cb in invoice_listeners: - with open_ext_db(ext_name) as g.ext_db: # type: ignore - await cb(payment) +async def on_invoice_paid(payment: Payment): + for send_channel in sse_listeners: + try: + send_channel.send_nowait(payment) + except trio.WouldBlock: + print("removing sse listener", send_channel) + sse_listeners.remove(send_channel) diff --git a/lnbits/core/views/api.py b/lnbits/core/views/api.py index 72c23c771..1aecbc490 100644 --- a/lnbits/core/views/api.py +++ b/lnbits/core/views/api.py @@ -1,25 +1,23 @@ -from quart import g, jsonify, request +import trio # type: ignore +import json +from quart import g, jsonify, request, make_response from http import HTTPStatus from binascii import unhexlify from lnbits import bolt11 -from lnbits.core import core_app -from lnbits.core.services import create_invoice, pay_invoice -from lnbits.core.crud import delete_expired_invoices from lnbits.decorators import api_check_wallet_key, api_validate_post_request +from .. import core_app +from ..services import create_invoice, pay_invoice +from ..crud import delete_expired_invoices +from ..tasks import sse_listeners + @core_app.route("/api/v1/wallet", methods=["GET"]) @api_check_wallet_key("invoice") async def api_wallet(): return ( - jsonify( - { - "id": g.wallet.id, - "name": g.wallet.name, - "balance": g.wallet.balance_msat, - } - ), + jsonify({"id": g.wallet.id, "name": g.wallet.name, "balance": g.wallet.balance_msat,}), HTTPStatus.OK, ) @@ -124,3 +122,56 @@ async def api_payment(payment_hash): return jsonify({"paid": False}), HTTPStatus.OK return jsonify({"paid": not payment.pending}), HTTPStatus.OK + + +@core_app.route("/api/v1/payments/sse", methods=["GET"]) +@api_check_wallet_key("invoice") +async def api_payments_sse(): + g.db.close() + + send_payment, receive_payment = trio.open_memory_channel(0) + + print("adding sse listener", send_payment) + sse_listeners.append(send_payment) + + send_event, receive_event = trio.open_memory_channel(0) + + async def payment_received() -> None: + async for payment in receive_payment: + await send_event.send(("payment", payment)) + + async def repeat_keepalive(): + await trio.sleep(1) + while True: + await send_event.send(("keepalive", "")) + await trio.sleep(25) + + g.nursery.start_soon(payment_received) + g.nursery.start_soon(repeat_keepalive) + + async def send_events(): + try: + async for typ, data in receive_event: + message = [f"event: {typ}".encode("utf-8")] + + if data: + jdata = json.dumps(data) + message.append(f"data: {jdata}".encode("utf-8")) + + yield b"\n".join(message) + b"\r\n\r\n" + except trio.Cancelled: + print("canceled!") + return + + response = await make_response( + send_events(), + { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + "Connection": "keep-alive", + "Transfer-Encoding": "chunked", + }, + ) + response.timeout = None + return response diff --git a/lnbits/core/views/generic.py b/lnbits/core/views/generic.py index 46fa31bc2..4ff17981c 100644 --- a/lnbits/core/views/generic.py +++ b/lnbits/core/views/generic.py @@ -8,6 +8,7 @@ from lnurl import LnurlResponse, LnurlWithdrawResponse, decode as decode_lnurl from lnbits.core import core_app from lnbits.decorators import check_user_exists, validate_uuids from lnbits.settings import LNBITS_ALLOWED_USERS, SERVICE_FEE +from lnbits.tasks import run_on_pseudo_request from ..crud import ( create_account, @@ -17,7 +18,6 @@ from ..crud import ( delete_wallet, ) from ..services import redeem_lnurl_withdraw -from ..tasks import run_on_pseudo_request @core_app.route("/favicon.ico") diff --git a/lnbits/db.py b/lnbits/db.py index 72d8660ab..b7c9e0235 100644 --- a/lnbits/db.py +++ b/lnbits/db.py @@ -12,6 +12,9 @@ class Database: self.cursor = self.connection.cursor() self.closed = False + def close(self): + self.__exit__(None, None, None) + def __enter__(self): return self diff --git a/lnbits/extensions/lnurlp/__init__.py b/lnbits/extensions/lnurlp/__init__.py index 2e3e16834..a4bc65f37 100644 --- a/lnbits/extensions/lnurlp/__init__.py +++ b/lnbits/extensions/lnurlp/__init__.py @@ -9,6 +9,6 @@ from .views import * # noqa from .lnurl import * # noqa from .tasks import on_invoice_paid -from lnbits.core.tasks import register_invoice_listener +from lnbits.tasks import register_invoice_listener register_invoice_listener("lnurlp", on_invoice_paid) diff --git a/lnbits/extensions/lnurlp/tasks.py b/lnbits/extensions/lnurlp/tasks.py index 49fb61b00..37d245597 100644 --- a/lnbits/extensions/lnurlp/tasks.py +++ b/lnbits/extensions/lnurlp/tasks.py @@ -6,7 +6,6 @@ from .crud import get_pay_link_by_invoice, mark_webhook_sent async def on_invoice_paid(payment: Payment) -> None: - print(payment) islnurlp = "lnurlp" == payment.extra.get("tag") if islnurlp: pay_link = get_pay_link_by_invoice(payment.payment_hash) diff --git a/lnbits/tasks.py b/lnbits/tasks.py new file mode 100644 index 000000000..7a5bf7184 --- /dev/null +++ b/lnbits/tasks.py @@ -0,0 +1,81 @@ +import trio # type: ignore +from http import HTTPStatus +from typing import Optional, Tuple, List, Callable, Awaitable +from quart import Request, g +from quart_trio import QuartTrio +from werkzeug.datastructures import Headers + +from lnbits.db import open_db, open_ext_db +from lnbits.settings import WALLET + +from lnbits.core.models import Payment +from lnbits.core.crud import get_standalone_payment + +main_app: Optional[QuartTrio] = None + + +def grab_app_for_later(app: QuartTrio): + global main_app + main_app = app + + +async def send_push_promise(a, b) -> None: + pass + + +async def run_on_pseudo_request(func: Callable, *args): + fk = Request( + "GET", + "http", + "/background/pseudo", + b"", + Headers([("host", "lnbits.background")]), + "", + "1.1", + send_push_promise=send_push_promise, + ) + assert main_app + + async def run(): + async with main_app.request_context(fk): + with open_db() as g.db: # type: ignore + await func(*args) + + async with trio.open_nursery() as nursery: + nursery.start_soon(run) + + +invoice_listeners: List[Tuple[str, Callable[[Payment], Awaitable[None]]]] = [] + + +def register_invoice_listener(ext_name: str, cb: Callable[[Payment], Awaitable[None]]): + """ + A method intended for extensions to call when they want to be notified about + new invoice payments incoming. + """ + print(f"registering {ext_name} invoice_listener callback: {cb}") + invoice_listeners.append((ext_name, cb)) + + +async def webhook_handler(): + handler = getattr(WALLET, "webhook_listener", None) + if handler: + return await handler() + return "", HTTPStatus.NO_CONTENT + + +async def invoice_listener(): + async for checking_id in WALLET.paid_invoices_stream(): + await run_on_pseudo_request(invoice_callback_dispatcher, checking_id) + + +async def invoice_callback_dispatcher(checking_id: str): + payment = get_standalone_payment(checking_id) + if payment and payment.is_in: + payment.set_pending(False) + for ext_name, cb in invoice_listeners: + if ext_name == "core": + await cb(payment) + else: + with open_ext_db(ext_name) as g.ext_db: # type: ignore + await cb(payment)