Fusion44s relay_manager added to nostradmin

This commit is contained in:
Ben Arc 2022-02-17 23:17:44 +00:00
parent 769b966806
commit 14b95d3ba0
2 changed files with 121 additions and 9 deletions

View file

@ -0,0 +1,105 @@
import asyncio
import logging
from typing import Any, Dict
import websockets
# Many thanks to Fusion44 for https://gist.github.com/fusion44/9af0b054b4609012752b0c8c1dafbd4a
class Relay:
url: str
read: bool
write: bool
active: bool
_connection: Any
def __init__(self, url: str, read: bool, write: bool, active: bool) -> None:
self.url = url
self.read = read
self.write = write
self.active = active
async def connect(self):
self._connection = await websockets.connect(self.url)
async def send(self, message):
if not self.write or not self.active:
raise RuntimeError("Can't send to a relay that is inactive or not writable")
await self._connection.send(message)
async def listen(self):
if not self.read or not self.active:
raise RuntimeError(
"Can't listen to a relay that is inactive or not readable"
)
while True:
try:
msg = await self._connection.recv()
yield msg
except websockets.ConnectionClosedOK:
break
class RelayManager:
_relays: Dict[str, Relay] = {}
_tasks: Dict[str, asyncio.Task] = {}
msg_channel = asyncio.Queue()
def __init__(self, enable_ws_debugger=False) -> None:
if enable_ws_debugger:
logger = logging.getLogger("websockets")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
pass
async def add_relay(self, relay: Relay):
if relay.url in self._relays.keys():
raise RuntimeError(f"Relay {relay.url} exists")
if relay.active and relay.read:
await relay.connect()
loop = asyncio.get_event_loop()
t = loop.create_task(self._relay_listener(relay, self.msg_channel))
self._tasks[relay.url] = t
self._relays[relay.url] = relay
async def remove_relay(self, url: str):
if url in self._relays.keys():
relay = self._relays[url]
if relay.active:
t = self._tasks[url]
t.cancel()
try:
await t
except asyncio.CancelledError:
print(f"Canceled task {relay.url}")
def get_relay(self, url: str) -> Relay:
if not url in self._relays.keys():
raise RuntimeError("Relay not found")
return self._relays.get(url)
async def update_relay(self, relay: Relay):
if relay.url in self._relays.keys():
# remove the old relay
await self.remove_relay(relay.url)
# add the new relay
await self.add_relay(relay)
else:
raise RuntimeError("Unknown Relay")
async def send_to_all(self, message):
for r in self._relays.values():
if r.write:
await r.send(message=message)
async def _relay_listener(self, relay: Relay, msg_chan: asyncio.Queue):
print(f"listening for {relay.url}")
async for msg in relay.listen():
print(msg)
msg_chan.put_nowait(msg)
print(f"STOP listening for {relay.url}")

View file

@ -18,6 +18,7 @@ from lnbits.core.views.api import api_payment
from lnbits.decorators import check_user_exists from lnbits.decorators import check_user_exists
from .crud import get_nostrkeys, get_nostrrelay from .crud import get_nostrkeys, get_nostrrelay
from .relay_manager import RelayManager, Relay
templates = Jinja2Templates(directory="templates") templates = Jinja2Templates(directory="templates")
@ -38,15 +39,17 @@ async def index(request: Request, user: User = Depends(check_user_exists)):
websocket_queue = asyncio.Queue(1000) websocket_queue = asyncio.Queue(1000)
# while True:
async def nostr_subscribe():
return
# for the relays:
# async with websockets.connect("ws://localhost:8765") as websocket:
# for the public keys:
# await websocket.send("subscribe to events")
# await websocket.recv()
mgr: RelayManager = RelayManager(enable_ws_debugger=False)
# listen for events coming from relays
async def connectToNostr():
while True:
e = await mgr.msg_channel.get()
print(e)
connectToNostr
##################################################################### #####################################################################
################### LNBITS WEBSOCKET ROUTES ######################### ################### LNBITS WEBSOCKET ROUTES #########################
#### HERE IS WHERE LNBITS FRONTEND CAN RECEIVE AND SEND MESSAGES #### #### HERE IS WHERE LNBITS FRONTEND CAN RECEIVE AND SEND MESSAGES ####
@ -97,7 +100,11 @@ async def updater(nostr_id, message):
async def relay_check(relay: str): async def relay_check(relay: str):
async with websockets.connect(relay) as websocket: async with websockets.connect(relay) as websocket:
if str(websocket.state) == "State.OPEN": if str(websocket.state) == "State.OPEN":
print(str(websocket.state)) r = Relay(url=relay, read=True, write=True, active=True)
try:
await mgr.add_relay(r)
except:
None
return True return True
else: else:
return False return False