lnbits-legend/lnbits/extensions/diagonalley/notifier.py
2022-09-28 17:02:15 +01:00

83 lines
2.7 KiB
Python

## adapted from https://github.com/Sentymental/chat-fastapi-websocket
"""
Create a class Notifier that will handle messages
and delivery to the specific person
"""
from collections import defaultdict
from fastapi import WebSocket
from loguru import logger
class Notifier:
"""
Manages chatrooms, sessions and members.
Methods:
- get_notification_generator(self): async generator with notification messages
- get_members(self, room_name: str): get members in room
- push(message: str, room_name: str): push message
- connect(websocket: WebSocket, room_name: str): connect to room
- remove(websocket: WebSocket, room_name: str): remove
- _notify(message: str, room_name: str): notifier
"""
def __init__(self):
# Create sessions as a dict:
self.sessions: dict = defaultdict(dict)
# Create notification generator:
self.generator = self.get_notification_generator()
async def get_notification_generator(self):
"""Notification Generator"""
while True:
message = yield
msg = message["message"]
room_name = message["room_name"]
await self._notify(msg, room_name)
def get_members(self, room_name: str):
"""Get all members in a room"""
try:
logger.info(f"Looking for members in room: {room_name}")
return self.sessions[room_name]
except Exception:
logger.exception(f"There is no member in room: {room_name}")
return None
async def push(self, message: str, room_name: str = None):
"""Push a message"""
message_body = {"message": message, "room_name": room_name}
await self.generator.asend(message_body)
async def connect(self, websocket: WebSocket, room_name: str):
"""Connect to room"""
await websocket.accept()
if self.sessions[room_name] == {} or len(self.sessions[room_name]) == 0:
self.sessions[room_name] = []
self.sessions[room_name].append(websocket)
print(f"Connections ...: {self.sessions[room_name]}")
def remove(self, websocket: WebSocket, room_name: str):
"""Remove websocket from room"""
self.sessions[room_name].remove(websocket)
print(f"Connection removed...\nOpen connections...: {self.sessions[room_name]}")
async def _notify(self, message: str, room_name: str):
"""Notifier"""
remaining_sessions = []
while len(self.sessions[room_name]) > 0:
websocket = self.sessions[room_name].pop()
await websocket.send_text(message)
remaining_sessions.append(websocket)
self.sessions[room_name] = remaining_sessions