lnbits-legend/lnbits/core/views/websocket_api.py
dni ⚡ 6d5ad9e229
chore: adhere to ruff's "N" rules (#2377)
* chore: adhere to ruff's "N" rules

WARN: reinstall failing extensions!

bunch of more consistent variable naming. inspired by this issue.
https://github.com/lnbits/lnbits/issues/2308

* fixup! chore: adhere to ruff's "N" rules
* rename to funding_source
* skip jmeter

---------

Co-authored-by: Pavol Rusnak <pavol@rusnak.io>
2024-04-15 09:02:21 +02:00

40 lines
1 KiB
Python

from fastapi import (
APIRouter,
WebSocket,
WebSocketDisconnect,
)
from ..services import (
websocket_manager,
websocket_updater,
)
websocket_router = APIRouter(prefix="/api/v1/ws", tags=["Websocket"])
@websocket_router.websocket("/{item_id}")
async def websocket_connect(websocket: WebSocket, item_id: str):
await websocket_manager.connect(websocket, item_id)
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
websocket_manager.disconnect(websocket)
@websocket_router.post("/{item_id}")
async def websocket_update_post(item_id: str, data: str):
try:
await websocket_updater(item_id, data)
return {"sent": True, "data": data}
except Exception:
return {"sent": False, "data": data}
@websocket_router.get("/{item_id}/{data}")
async def websocket_update_get(item_id: str, data: str):
try:
await websocket_updater(item_id, data)
return {"sent": True, "data": data}
except Exception:
return {"sent": False, "data": data}