pyln: Add notifications support to LightningRpc

Changelog-Added: pyln-client: Added support for command notifications to LightningRpc via the `notify` context-manager.
This commit is contained in:
Christian Decker 2021-01-02 14:28:31 +01:00
parent 84b3653606
commit b6650425b9

View File

@ -1,12 +1,13 @@
from decimal import Decimal
from math import floor, log10
from typing import Optional, Union
import json import json
import logging import logging
import os import os
import socket import socket
import warnings import warnings
from contextlib import contextmanager
from decimal import Decimal
from json import JSONEncoder from json import JSONEncoder
from math import floor, log10
from typing import Optional, Union
def _patched_default(self, obj): def _patched_default(self, obj):
@ -283,8 +284,9 @@ class UnixDomainSocketRpc(object):
self.decoder = decoder self.decoder = decoder
self.executor = executor self.executor = executor
self.logger = logger self.logger = logger
self._notify = None
self.next_id = 0 self.next_id = 1
def _writeobj(self, sock, obj): def _writeobj(self, sock, obj):
s = json.dumps(obj, ensure_ascii=False, cls=self.encoder_cls) s = json.dumps(obj, ensure_ascii=False, cls=self.encoder_cls)
@ -334,18 +336,44 @@ class UnixDomainSocketRpc(object):
# FIXME: we open a new socket for every readobj call... # FIXME: we open a new socket for every readobj call...
sock = UnixSocket(self.socket_path) sock = UnixSocket(self.socket_path)
this_id = self.next_id this_id = self.next_id
self._writeobj(sock, { self.next_id += 0
buf = b''
if self._notify is not None:
# Opt into the notifications support
self._writeobj(sock, {
"jsonrpc": "2.0",
"method": "notifications",
"id": 0,
"params": {
"enable": True
},
})
_, buf = self._readobj(sock, buf)
request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": method, "method": method,
"params": payload, "params": payload,
"id": this_id, "id": this_id,
}) }
self.next_id += 1
buf = b'' self._writeobj(sock, request)
while True: while True:
resp, buf = self._readobj(sock, buf) resp, buf = self._readobj(sock, buf)
# FIXME: We should offer a callback for notifications. id = resp.get("id", None)
if 'method' not in resp or 'id' in resp: meth = resp.get("method", None)
if meth == 'message' and self._notify is not None:
n = resp['params']
self._notify(
message=n.get('message', None),
progress=n.get('progress', None),
request=request
)
continue
if meth is None or id is None:
break break
self.logger.debug("Received response for %s call: %r", method, resp) self.logger.debug("Received response for %s call: %r", method, resp)
@ -361,6 +389,31 @@ class UnixDomainSocketRpc(object):
raise ValueError("Malformed response, \"result\" missing.") raise ValueError("Malformed response, \"result\" missing.")
return resp["result"] return resp["result"]
@contextmanager
def notify(self, fn):
"""Register a notification callback to use for a set of RPC calls.
This is a context manager and should be used like this:
```python
def fn(message, progress, request, **kwargs):
print(message)
with rpc.notify(fn):
rpc.somemethod()
```
The `fn` function will be called once for each notification
the is sent by `somemethod`. This is a context manager,
meaning that multiple commands can share the same context, and
the same notification function.
"""
old = self._notify
self._notify = fn
yield
self._notify = old
class LightningRpc(UnixDomainSocketRpc): class LightningRpc(UnixDomainSocketRpc):
""" """