From ac87c4960361f244c726597d60957262c222dc8c Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Fri, 16 Dec 2022 17:18:02 +0100 Subject: [PATCH] fix: disable compression --- lnbits/core/views/api.py | 2 +- lnbits/wallets/lnbits.py | 30 +++++++++++++++++++----------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/lnbits/core/views/api.py b/lnbits/core/views/api.py index 21342d686..3aa0ec00e 100644 --- a/lnbits/core/views/api.py +++ b/lnbits/core/views/api.py @@ -416,7 +416,7 @@ async def subscribe_wallet_invoices(request: Request, wallet: Wallet): yield dict(data=jdata, event=typ) except asyncio.CancelledError as e: - logger.debug(f"CancelledError on listener {uid}: {e}") + logger.debug(f"removing listener for wallet {uid}") api_invoice_listeners.pop(uid) task.cancel() return diff --git a/lnbits/wallets/lnbits.py b/lnbits/wallets/lnbits.py index ddd80e770..3a3cfb2ea 100644 --- a/lnbits/wallets/lnbits.py +++ b/lnbits/wallets/lnbits.py @@ -147,18 +147,26 @@ class LNbitsWallet(Wallet): while True: try: async with httpx.AsyncClient(timeout=None, headers=self.key) as client: - async with client.stream("GET", url) as r: + del client.headers[ + "accept-encoding" + ] # we have to disable compression for SSEs + async with client.stream( + "GET", url, content="text/event-stream" + ) as r: + sse_event = False async for line in r.aiter_lines(): - if line.startswith("data:"): - try: - data = json.loads(line[5:]) - except json.decoder.JSONDecodeError: - continue - - if type(data) is not dict: - continue - - yield data["payment_hash"] # payment_hash + # The data we want to listen to is of this shape: + # event: payment-received + # data: {.., "payment_hash" : "asd"} + if line.startswith("event: payment-received"): + sse_event = True + continue + elif sse_event and line.startswith("data:"): + data = json.loads(line[len("data:") :]) + sse_event = False + yield data["payment_hash"] + else: + sse_event = False except (OSError, httpx.ReadError, httpx.ConnectError, httpx.ReadTimeout): pass