core-lightning/tools/reckless

911 lines
34 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env python3
from subprocess import Popen, PIPE, TimeoutExpired, run
import sys
import json
import os
import argparse
from pathlib import Path, PosixPath
import shutil
import tempfile
2022-10-21 20:51:13 +02:00
from typing import Union
from urllib.parse import urlparse
from urllib.request import urlopen
import logging
import copy
# Send all log records to stdout. The root logger starts at DEBUG here; the
# script entry point later resets the level from the -v/--verbose flag.
logging.basicConfig(
    level=logging.DEBUG,
    format='[%(asctime)s] %(levelname)s: %(message)s',
    handlers=[logging.StreamHandler(stream=sys.stdout)],
)

# Default plugin repository list.
# NOTE(review): no reader of `repos` is visible in this file - confirm it is
# still needed.
repos = ['https://github.com/lightningd/plugins']
2023-04-07 00:04:44 +02:00
def py_entry_guesses(name) -> list:
    """Return the candidate entrypoint filenames for a python plugin.

    The candidates are, in order: the bare plugin name, the name with a
    `.py` suffix, and `__init__.py`.
    """
    candidates = [name]
    candidates.append(f'{name}.py')
    candidates.append('__init__.py')
    return candidates
2023-04-07 00:04:44 +02:00
def unsupported_entry(name) -> list:
    """Return entrypoint filenames that are recognised but not installable.

    These are the plugin name with a `.go` or `.sh` suffix, in that order.
    """
    return [f'{name}{suffix}' for suffix in ('.go', '.sh')]
2023-04-07 00:04:44 +02:00
def entry_guesses(name: str) -> list:
    """Return every entrypoint filename any known installer would accept.

    Each installer's entry patterns are expanded by substituting `name` for
    the `{name}` placeholder. Results are grouped per installer, in the
    iteration order of INSTALLERS.
    """
    return [pattern.format(name=name)
            for installer in INSTALLERS
            for pattern in installer.entries]
class Installer:
    '''
    Describes one way of installing and running a plugin: the language it
    targets, the interpreter/compiler/package-manager binaries it relies on,
    the entrypoint filename patterns it recognises and the commands used to
    install dependencies.
    '''

    def __init__(self, name: str, mimetype: str,
                 exe: Union[str, None] = None,
                 compiler: Union[str, None] = None,
                 manager: Union[str, None] = None,
                 entry: Union[str, None] = None):
        self.name = name
        self.mimetype = mimetype
        # Entrypoint patterns; '{name}' is substituted with the plugin name.
        self.entries = [entry] if entry else []
        # Interpreter, compiler and dependency manager binaries, each
        # optional (None when not required).
        self.exe = exe
        self.compiler = compiler
        self.manager = manager
        # Populated later through add_dependency_file/add_dependency_call.
        self.dependency_file = None
        self.dependency_call = None

    def __repr__(self):
        return ('<Installer {}: mimetype: {}, exe: {}, manager: {}>'
                .format(self.name, self.mimetype, self.exe, self.manager))

    def executable(self) -> bool:
        '''Validate the necessary bins are available to execute the plugin.'''
        if not self.exe:
            # Nothing is needed to run the plugin.
            return True
        if not shutil.which(self.exe):
            return False
        # This should arguably not be checked here.
        if self.manager:
            return bool(shutil.which(self.manager))
        return True

    def installable(self) -> bool:
        '''Report whether the compiler and package manager (when defined)
        can be found on PATH. A defined tool is treated as mandatory even
        though the user may already have the required packages installed.'''
        for required in (self.compiler, self.manager):
            if required and not shutil.which(required):
                return False
        return True

    def add_entrypoint(self, entry: str):
        '''Register an additional entrypoint filename pattern.'''
        self.entries.append(entry)

    def add_dependency_file(self, dep: str):
        '''Set the filename whose presence marks a plugin for this
        installer.'''
        self.dependency_file = dep

    def add_dependency_call(self, call: list):
        '''Append a command (argv list) to run when installing
        dependencies.'''
        if self.dependency_call is None:
            self.dependency_call = [call]
        else:
            self.dependency_call.append(call)

    def copy(self):
        '''Return an independent deep copy of this installer.'''
        return copy.deepcopy(self)
class InstInfo:
    """Where a plugin lives and what is needed to install it."""

    def __init__(self, name, url, git_url):
        self.name = name
        self.repo = url     # Used for 'git clone'
        self.git_url = git_url  # API access for github repos
        self.entry = None   # entrypoint path, set by get_inst_details()
        self.deps = None    # dependency file, set by get_inst_details()
        self.subdir = None
        self.commit = None

    def __repr__(self):
        return (f'InstInfo({self.name}, {self.repo}, {self.git_url}, '
                f'{self.entry}, {self.deps})')

    def get_inst_details(self) -> bool:
        """
        Populate installation details from a github repo url.

        Fetches the directory listing at `self.git_url`, records the first
        recognised entrypoint in `self.entry` and the first recognised
        dependency file in `self.deps`.

        Return True only if both an entrypoint and a dependency file were
        found.
        """
        if "api.github.com" in self.git_url:
            # This lets us redirect to handle blackbox testing
            target = (API_GITHUB_COM +
                      self.git_url.split("api.github.com")[-1])
        else:
            target = self.git_url
        # Close the HTTP response deterministically.
        with urlopen(target, timeout=5) as r:
            if r.status != 200:
                return False
            listing = json.loads(r.read().decode())
        # The git tree API nests the file list under 'tree'; the contents
        # API returns the list directly.
        tree = listing['tree'] if 'git/tree' in self.git_url else listing
        paths = [f['path'] for f in tree]

        # Entrypoint names are matched case-insensitively, but the path is
        # stored exactly as the repository spells it.
        for guess in entry_guesses(self.name):
            found = next((p for p in paths if p.lower() == guess.lower()),
                         None)
            if found is not None:
                self.entry = found
                break

        if self.entry is None:
            for guess in unsupported_entry(self.name):
                if guess in paths:
                    # FIXME: This should be easier to implement
                    print(f'entrypoint {guess} is not yet supported')
                    break
            # Without an entrypoint there is nothing to run, whether or not
            # a dependency file exists.
            return False

        # FIXME: Allow multiple dependencies
        for inst in INSTALLERS:
            if inst.dependency_file in paths:
                self.deps = inst.dependency_file
                break
        return bool(self.deps)
def create_dir(r: int, directory: PosixPath) -> bool:
    """Make sure `directory` exists, creating at most `r` new levels.

    Walks up from `directory` collecting the missing path components. If
    more than `r` of them are missing nothing is created and False is
    returned. Otherwise the missing directories are created from the
    outermost inwards and True is returned.
    """
    missing = []
    current = directory
    while not current.exists():
        if len(missing) >= r:
            # Depth budget exhausted before reaching an existing ancestor.
            return False
        missing.append(current)
        current = current.parent
    # Parents must exist before their children.
    for path in reversed(missing):
        os.mkdir(path, 0o777)
        print(f'created directory {path}')
        assert path.exists()
    return True
2022-10-21 20:51:13 +02:00
def remove_dir(target: str) -> bool:
    """Recursively delete the directory `target`.

    Returns True when the tree was removed. Returns False, after printing
    the reason, when `target` does not exist, is not a directory, or cannot
    be removed for lack of permission.
    """
    try:
        shutil.rmtree(target)
    except FileNotFoundError:
        # rmtree raises this (not NotADirectoryError) for a missing path.
        print(f"Tried to remove directory {target} that does not exist.")
    except NotADirectoryError:
        print(f"Tried to remove {target}, which is not a directory.")
    except PermissionError:
        print(f"Permission denied removing dir: {target}")
    else:
        return True
    return False
class Config():
    """A generic class for procuring, reading and editing config files"""

    def obtain_config(self,
                      config_path: str,
                      default_text: str,
                      warn: bool = False) -> Union[list, str]:
        """Return a config file from the desired location. Create one with
        default_text if it cannot be found.

        Returns the list of lines when the file already exists, or
        `default_text` when the file had to be created. When `warn` is set
        the user is asked to confirm creation; declining exits with
        status 1.

        Raises FileNotFoundError if the parent directory is missing and
        cannot be created (at most one new directory level is created).
        """
        if config_path is None:
            raise Exception("Generic config must be passed a config_path.")
        # Accept path-like objects as well as plain strings.
        config_path = str(config_path)
        # FIXME: warn if reckless dir exists, but conf not found
        if Path(config_path).exists():
            # Reading must not require write permission.
            with open(config_path, 'r') as f:
                return f.readlines()
        print(f'config file not found: {config_path}')
        if warn:
            confirm = input('press [Y] to create one now.\n').upper() == 'Y'
        else:
            confirm = True
        if not confirm:
            sys.exit(1)
        parent_path = Path(config_path).parent
        # Create up to one parent in the directory tree.
        if not create_dir(1, parent_path):
            logging.debug(
                f'could not create the parent directory {parent_path}')
            raise FileNotFoundError('invalid parent directory')
        with open(config_path, 'w') as f:
            f.write(default_text)
            # FIXME: Handle write failure
        return default_text

    def editConfigFile(self, addline: Union[str, None],
                       removeline: Union[str, None]):
        """Idempotent function to add and/or remove a single line each.

        Every line whose stripped text equals `removeline` is dropped and
        `addline` is appended unless an equal line is already present.
        Either argument may be None to skip that half of the operation.
        The file is rewritten only when something actually changes. On
        rewrite, lines are stripped of surrounding whitespace and runs of
        blank lines are collapsed to a single blank line.
        """
        add_text = addline.strip() if addline else None
        remove_text = removeline.strip() if removeline else None
        with open(self.conf_fp, 'r') as conf_read:
            original = conf_read.readlines()

        kept = []
        removed_any = False
        for raw in original:
            text = raw.strip()
            if remove_text is not None and text == remove_text:
                removed_any = True
                continue
            if text == '' and kept and kept[-1] == '':
                # The white space is getting excessive.
                continue
            kept.append(text)

        needs_add = add_text is not None and add_text not in kept
        if not removed_any and not needs_add:
            # Nothing to remove and nothing new to add: leave file untouched.
            return
        if needs_add:
            kept.append(addline)
        with open(self.conf_fp, 'w') as conf_write:
            conf_write.write('\n'.join(kept))

    def __init__(self, path: Union[str, None] = None,
                 default_text: Union[str, None] = None,
                 warn: bool = False):
        """Load the config at `path`, creating it from `default_text` if it
        does not exist yet. Both arguments are mandatory despite their
        None defaults."""
        assert path is not None
        assert default_text is not None
        self.conf_fp = str(path)
        self.content = self.obtain_config(self.conf_fp, default_text,
                                          warn=warn)
class RecklessConfig(Config):
    """Config file listing the plugins reckless maintains (by default the
    bitcoin network's file). The lightningd config includes this file."""

    def enable_plugin(self, plugin_path: str):
        """Persist a plugin as loaded: add its `plugin=` line and drop any
        matching `disable-plugin=` line."""
        wanted = f'plugin={plugin_path}'
        unwanted = f'disable-plugin={plugin_path}'
        self.editConfigFile(wanted, unwanted)

    def disable_plugin(self, plugin_path: str):
        """Persist a plugin as disabled: add its `disable-plugin=` line and
        drop any matching `plugin=` line."""
        wanted = f'disable-plugin={plugin_path}'
        unwanted = f'plugin={plugin_path}'
        self.editConfigFile(wanted, unwanted)

    def __init__(self, path: Union[str, None] = None,
                 default_text: Union[str, None] = None):
        """Open (or create) the reckless config.

        `path` defaults to `<LIGHTNING_DIR>/reckless/bitcoin-reckless.conf`.
        The directory containing the file is kept as `self.reckless_dir`.
        """
        if path is None:
            path = Path(LIGHTNING_DIR) / 'reckless' / 'bitcoin-reckless.conf'
        if default_text is None:
            default_text = (
                '# This configuration file is managed by reckless to activate '
                'and disable\n# reckless-installed plugins\n\n'
            )
        super().__init__(path=str(path), default_text=default_text)
        self.reckless_dir = Path(path).parent
class LightningBitcoinConfig(Config):
    """Network-specific lightningd config (bitcoin by default). The main
    lightningd config includes it and it, in turn, includes
    bitcoin-reckless.conf."""

    def __init__(self, path: Union[str, None] = None,
                 default_text: Union[str, None] = None,
                 warn: bool = True):
        """Open (or create) the network config.

        `path` defaults to `<LIGHTNING_DIR>/bitcoin/config`. With `warn`
        set (the default) the user is asked before a missing file is
        created.
        """
        if path is None:
            path = Path(LIGHTNING_DIR).joinpath('bitcoin', 'config')
        if default_text is None:
            default_text = "# This config was autopopulated by reckless\n\n"
        super().__init__(path=str(path), default_text=default_text,
                         warn=warn)
class InferInstall():
    """Once a plugin is installed, we may need its directory and entrypoint"""

    @staticmethod
    def _match_name(name: str, installers) -> str:
        """Reduce an entrypoint filename (e.g. `foo.py`) to a plugin name.

        Entry patterns are tried tier by tier, so each installer's
        preferred entrypoint format is considered before any installer's
        fallback formats. Returns `name` unchanged when nothing matches.
        """
        tiers = max((len(inst.entries) for inst in installers), default=0)
        for tier in range(tiers):
            for inst in installers:
                if tier >= len(inst.entries):
                    # This installer has fewer patterns than others.
                    continue
                fmt = inst.entries[tier]
                if '{name}' in fmt:
                    pre = fmt.split('{name}')[0]
                    post = fmt.split('{name}')[-1]
                    if (len(name) > len(pre) + len(post)
                            and name.startswith(pre)
                            and name.endswith(post)):
                        # Slice off the literal prefix/suffix. str.lstrip
                        # and str.rstrip remove character *sets* and would
                        # turn 'happy.py' into 'ha'.
                        return name[len(pre):len(name) - len(post)]
                elif fmt == name:
                    return name
        return name

    def __init__(self, name: str):
        """Locate the installed plugin `name` in the reckless directory.

        `name` may be the plugin name or its entrypoint filename and is
        matched case-insensitively against the directory contents. Sets
        `self.dir`, `self.entry` (entrypoint path as str) and `self.name`
        (directory name as spelled on disk).

        Raises Exception if no matching directory or entrypoint is found.
        """
        reckless_dir = Path(RECKLESS_CONFIG.reckless_dir)
        by_lower = {f.lower(): f for f in os.listdir(reckless_dir)}

        name = self._match_name(name, INSTALLERS)
        actual_name = by_lower.get(name.lower())
        if actual_name is None:
            raise Exception(f"Could not find a reckless directory for {name}")
        self.dir = reckless_dir.joinpath(actual_name)

        for guess in entry_guesses(actual_name):
            for content in self.dir.iterdir():
                if content.name == guess:
                    self.entry = str(content)
                    self.name = actual_name
                    return
        raise Exception(f'plugin entrypoint not found in {self.dir}')
# Python plugin installer (dependencies via `pip`)
python3pip = Installer('python3pip', 'text/x-python', exe='python3',
                       manager='pip', entry='{name}.py')
python3pip.add_entrypoint('{name}')
python3pip.add_entrypoint('__init__.py')
python3pip.add_dependency_file('requirements.txt')
python3pip.add_dependency_call(['pip', 'install', '-r', 'requirements.txt'])

# Same as above for systems that only provide a `pip3` binary
python3pip3 = python3pip.copy()
python3pip3.manager = 'pip3'
python3pip3.dependency_call = [['pip3', 'install', '-r', 'requirements.txt']]

# Nodejs plugin installer
nodejs = Installer('nodejs', 'application/javascript', exe='node',
                   manager='npm', entry='{name}.js')
nodejs.add_entrypoint('{name}')
nodejs.add_dependency_call(['npm', 'install', '--omit=dev'])
nodejs.add_dependency_file('package.json')

# A list, not a set: Installer objects hash by identity, so a set would be
# iterated in an order that changes from run to run. The list fixes the
# priority in which installers and their entrypoint patterns are tried.
INSTALLERS = [python3pip, python3pip3, nodejs]
2022-10-21 20:51:13 +02:00
def help_alias(targets: list):
    """Handle `reckless help [cmd ...]`.

    With targets, point the user at the per-command `-h` flag and exit with
    status 1. Without targets, print the top-level help to stdout.
    """
    if len(targets) > 0:
        hint = ' '.join(targets)
        print('try "reckless {} -h"'.format(hint))
        sys.exit(1)
    parser.print_help(sys.stdout)
2023-04-07 00:04:44 +02:00
def _search_repo(name: str, url: str) -> Union[InstInfo, None]:
"""look in given repo and, if found, populate InstInfo"""
# Remove api subdomain, subdirectories, etc.
repo = url.split('/')
while '' in repo:
repo.remove('')
repo_name = None
parsed_url = urlparse(url)
if 'github.com' not in parsed_url.netloc:
# FIXME: Handle non-github repos.
2023-04-07 00:04:44 +02:00
return None
if len(parsed_url.path.split('/')) < 2:
2023-04-07 00:04:44 +02:00
return None
start = 1
# Maybe we were passed an api.github.com/repo/<user> url
if 'api' in parsed_url.netloc:
start += 1
repo_user = parsed_url.path.split('/')[start]
repo_name = parsed_url.path.split('/')[start + 1]
# Get details from the github API.
api_url = f'{API_GITHUB_COM}/repos/{repo_user}/{repo_name}/contents/'
plugins_cont = api_url
r = urlopen(plugins_cont, timeout=5)
if r.status != 200:
print(f"Plugin repository {api_url} unavailable")
2023-04-07 00:04:44 +02:00
return None
# Repo is for this plugin
if repo_name.lower() == name.lower():
MyPlugin = InstInfo(repo_name,
f'https://github.com/{repo_user}/{repo_name}',
api_url)
if not MyPlugin.get_inst_details():
2023-04-07 00:04:44 +02:00
return None
return MyPlugin
# Repo contains multiple plugins?
for x in json.loads(r.read().decode()):
if x["name"].lower() == name.lower():
# Look for the rest of the install details
# These are in lightningd/plugins directly
if 'lightningd/plugins/' in x['html_url']:
MyPlugin = InstInfo(x['name'],
'https://github.com/lightningd/plugins',
x['git_url'])
MyPlugin.subdir = x['name']
# submodules from another github repo
else:
MyPlugin = InstInfo(x['name'], x['html_url'], x['git_url'])
# Submodule URLs are appended with /tree/<commit hash>
if MyPlugin.repo.split('/')[-2] == 'tree':
MyPlugin.commit = MyPlugin.repo.split('/')[-1]
MyPlugin.repo = MyPlugin.repo.split('/tree/')[0]
logging.debug(f'repo using commit: {MyPlugin.commit}')
if not MyPlugin.get_inst_details():
logging.debug((f'Found plugin in {url}, but missing install '
'details'))
2023-04-07 00:04:44 +02:00
return None
return MyPlugin
2023-04-07 00:04:44 +02:00
return None
2022-10-21 20:51:13 +02:00
def _clone_and_install(src, clone_path: Path, inst_path: Path) -> bool:
    """Clone, resolve dependencies, test-run and copy one plugin into place.

    Helper for _install_plugin(), which owns creation and cleanup of
    `clone_path`. Returns True when the plugin was copied to `inst_path`.
    """
    # clone git repository to /tmp/reckless-...
    if ('http' in src.repo[:4]) or ('github.com' in src.repo):
        if 'github.com' in src.repo:
            # GITHUB_COM can be redirected for blackbox testing.
            url = f"{GITHUB_COM}" + src.repo.split("github.com")[-1]
        else:
            url = src.repo
        # run() drains both pipes; Popen(...).wait() with PIPE can deadlock
        # once the child fills a pipe buffer.
        git = run(['git', 'clone', url, str(clone_path)],
                  stdout=PIPE, stderr=PIPE)
        if git.returncode != 0:
            logging.debug(git.stderr.decode())
            print('Error: Failed to clone repo')
            return False

    plugin_path = clone_path
    if src.subdir is not None:
        plugin_path = clone_path / src.subdir
    if src.commit:
        logging.debug(f"Checking out commit {src.commit}")
        checkout = run(['git', 'checkout', src.commit],
                       cwd=str(plugin_path), stdout=PIPE, stderr=PIPE)
        if checkout.returncode != 0:
            print(f'failed to checkout referenced commit {src.commit}')
            return False

    # Find a suitable installer
    installer = None
    plugin_files = os.listdir(plugin_path)
    for inst_method in INSTALLERS:
        if not (inst_method.installable() and inst_method.executable()):
            continue
        if (inst_method.dependency_file is not None
                and inst_method.dependency_file not in plugin_files):
            continue
        logging.debug(f"using installer {inst_method.name}")
        installer = inst_method
        break
    if installer is None:
        logging.debug('no suitable installer found; skipping dependencies')

    # try it out
    if installer and installer.dependency_call:
        # Only capture installer output when it would not be shown anyway.
        quiet = logging.root.level >= logging.WARNING
        for call in installer.dependency_call:
            logging.debug(f"Install: invoking '{call}'")
            if quiet:
                dep = run(call, cwd=plugin_path, stdout=PIPE, stderr=PIPE,
                          text=True)
            else:
                dep = run(call, cwd=plugin_path, text=True)
            # Stop at the first failing call instead of only checking the
            # last one.
            if dep.returncode != 0:
                print('error encountered installing dependencies')
                if dep.stdout:
                    logging.debug(dep.stdout)
                return False
        print('dependencies installed successfully')

    # Start the plugin briefly to see whether it crashes on launch.
    test_log = []
    try:
        test = run([str(Path(plugin_path).joinpath(src.entry))],
                   cwd=str(plugin_path), stdout=PIPE, stderr=PIPE,
                   text=True, timeout=3)
        # stderr is one string here; split it into lines.
        test_log = test.stderr.splitlines()
        returncode = test.returncode
    except TimeoutExpired:
        # If the plugin is still running, it's assumed to be okay.
        returncode = 0
    if returncode != 0:
        logging.debug("plugin testing error:")
        for line in test_log:
            logging.debug(f'  {line}')
        print('plugin testing failed')
        return False

    # Find this cute little plugin a forever home
    shutil.copytree(str(plugin_path), inst_path)
    print(f'plugin installed: {inst_path}')
    return True


def _install_plugin(src: InstInfo) -> bool:
    """make sure the repo exists and clone it.

    Clones `src.repo` into a temporary directory, installs dependencies,
    test-runs the entrypoint and copies the plugin into the reckless
    directory. The temporary clone is removed whether or not installation
    succeeds. Returns True on success, False on any failure. Exits with
    status 2 if no reckless configuration is loaded.
    """
    logging.debug(f'Install requested from {src}.')
    if RECKLESS_CONFIG is None:
        print('error: reckless install directory unavailable')
        sys.exit(2)

    # Use a unique directory for each cloned repo.
    clone_name = 'reckless-{}'.format(str(hash(os.times()))[-9:])
    clone_path = Path(tempfile.gettempdir()) / clone_name
    inst_path = Path(RECKLESS_CONFIG.reckless_dir) / src.name
    if clone_path.exists():
        logging.debug(f'{clone_path} already exists - deleting')
        shutil.rmtree(clone_path)
    try:
        return _clone_and_install(src, clone_path, inst_path)
    finally:
        # Never leave the temporary clone behind, even on failure.
        if clone_path.exists():
            shutil.rmtree(clone_path, ignore_errors=True)
def install(plugin_name: str):
    """Find a plugin in the source repos, install it and activate it.

    Does nothing further when the plugin cannot be located. Exits with
    status 1 if installation fails or the installed plugin's directory
    cannot be found afterwards.
    """
    assert isinstance(plugin_name, str)
    src = search(plugin_name)
    if not src:
        return
    logging.debug(f'Retrieving {src.name} from {src.repo}')
    if not _install_plugin(src):
        print('installation aborted')
        sys.exit(1)

    # Match case of the containing directory
    wanted = src.name.lower()
    for dirname in os.listdir(RECKLESS_CONFIG.reckless_dir):
        if dirname.lower() != wanted:
            continue
        entry_path = Path(RECKLESS_CONFIG.reckless_dir) / dirname / src.entry
        RECKLESS_CONFIG.enable_plugin(entry_path)
        enable(src.name)
        return
    print(('dynamic activation failed: '
           f'{src.name} not found in reckless directory'))
    sys.exit(1)
def uninstall(plugin_name: str):
    """Disable a plugin, then delete the directory holding its entrypoint.

    Exits with status 1 if the plugin's entrypoint cannot be found.
    """
    assert isinstance(plugin_name, str)
    logging.debug(f'Uninstalling plugin {plugin_name}')
    disable(plugin_name)
    inst = InferInstall(plugin_name)
    entrypoint = Path(inst.entry)
    if not entrypoint.exists():
        print(f'cannot find installed plugin at expected path {inst.entry}')
        sys.exit(1)
    plugin_dir = str(entrypoint.parent)
    logging.debug(f'looking for {plugin_dir}')
    removed = remove_dir(plugin_dir)
    if removed:
        print(f"{inst.name} uninstalled successfully.")
def search(plugin_name: str) -> Union[InstInfo, None]:
    """searches plugin index for plugin

    Repositories named after the plugin are searched first; the remaining
    sources keep their configured order. Returns the first InstInfo found,
    or None (after printing a message) when no source provides the plugin.
    """
    wanted = plugin_name.lower()

    def _not_named_after_plugin(repo_url: str) -> bool:
        return repo_url.rstrip('/').split('/')[-1].lower() != wanted

    # sorted() is stable and returns a new list, so the global
    # RECKLESS_SOURCES is neither reordered nor mutated while iterating.
    ordered_repos = sorted(RECKLESS_SOURCES, key=_not_named_after_plugin)
    for r in ordered_repos:
        p = _search_repo(plugin_name, r)
        if p:
            print(f"found {p.name} in repo: {p.repo}")
            logging.debug(f"entry: {p.entry}")
            if p.subdir:
                logging.debug(f'sub-directory: {p.subdir}')
            return p
    print(f'Unable to locate source for plugin {plugin_name}')
    return None
class RPCError(Exception):
    """lightning-cli fails to connect to lightningd RPC"""

    def __init__(self, err):
        # Error text reported by lightning-cli.
        self.err = err

    def __str__(self):
        # An f-string is required here; a plain string would print the
        # placeholder literally.
        return f'RPCError({self.err})'
class CLIError(Exception):
    """Error response returned by lightningd, carrying the numeric `code`
    and the `message` text."""

    def __init__(self, code, message):
        self.code, self.message = code, message

    def __str__(self):
        return 'CLIError({} {})'.format(self.code, self.message)
def lightning_cli(*args, timeout=15) -> dict:
    """Run lightning-cli with `args` and return its parsed output.

    JSON object output is returned as a dict; any other output (help, -V,
    etc.) is returned as `{'content': <text>}`.

    Raises:
        RPCError: exit status 2 - lightningd is not reachable.
        CLIError: any other non-zero exit status. For status 1 this
            carries the 'code' and 'message' reported by lightningd.
        TimeoutExpired: the command did not finish within `timeout`
            seconds; the child process is killed first.
    """
    # CLI commands will be added to any necessary options
    cmd = LIGHTNING_CLI_CALL.copy()
    cmd.extend(args)
    clncli = Popen(cmd, stdout=PIPE, stderr=PIPE)
    try:
        # communicate() drains both pipes while waiting; wait() alone can
        # deadlock once the child fills a pipe buffer.
        stdout, stderr = clncli.communicate(timeout=timeout)
    except TimeoutExpired:
        # Do not leave the child running behind us.
        clncli.kill()
        clncli.communicate()
        raise
    out = stdout.decode()
    if len(out) > 0 and out[0] == '{':
        # If all goes well, a json object is typically returned
        out = json.loads(out.replace('\n', ''))
    else:
        # help, -V, etc. may not return json, so stash it here.
        out = {'content': out}
    if clncli.returncode == 0:
        return out
    if clncli.returncode == 2:
        # RPC not available - lightningd not running or using alternate config
        raise RPCError(stderr.decode())
    # RPC doesn't like our input; output normally contains 'code' and
    # 'message'. Fall back to the exit status and raw text so that a
    # non-JSON response or an unexpected status is still reported.
    fallback = out.get('content') or stderr.decode()
    raise CLIError(out.get('code', clncli.returncode),
                   out.get('message', fallback))
def enable(plugin_name: str):
    """Start a plugin now and persist it in the reckless config.

    The dynamic start is skipped when the lightningd RPC is unavailable
    and tolerated when the plugin is already registered. Any other
    lightningd error is re-raised. Exits with status 1 if the plugin's
    entrypoint cannot be found.
    """
    assert isinstance(plugin_name, str)
    inst = InferInstall(plugin_name)
    path = inst.entry
    if not Path(path).exists():
        print(f'cannot find installed plugin at expected path {path}')
        sys.exit(1)
    logging.debug(f'activating {plugin_name}')
    try:
        lightning_cli('plugin', 'start', path)
    except RPCError:
        logging.debug(('lightningd rpc unavailable. '
                       'Skipping dynamic activation.'))
    except CLIError as err:
        if 'already registered' not in err.message:
            print(f'reckless: {inst.name} failed to start!')
            raise err
        logging.debug(f'{inst.name} is already running')
    RECKLESS_CONFIG.enable_plugin(path)
    print(f'{inst.name} enabled')
def disable(plugin_name: str):
    """reckless disable <plugin>
    Stop an installed plugin and persist it as disabled in the config.

    The dynamic stop is skipped when the lightningd RPC is unavailable and
    tolerated when lightningd answers with code -32602 (plugin not
    running). Any other lightningd error is re-raised. Exits with status 1
    if the plugin's entrypoint cannot be found."""
    assert isinstance(plugin_name, str)
    inst = InferInstall(plugin_name)
    path = inst.entry
    if not Path(path).exists():
        sys.stderr.write(f'Could not find plugin at {path}\n')
        sys.exit(1)
    logging.debug(f'deactivating {plugin_name}')
    try:
        lightning_cli('plugin', 'stop', path)
    except RPCError:
        logging.debug(('lightningd rpc unavailable. '
                       'Skipping dynamic deactivation.'))
    except CLIError as err:
        if err.code != -32602:
            print('lightning-cli plugin stop failed')
            raise err
        logging.debug('plugin not currently running')
    RECKLESS_CONFIG.disable_plugin(path)
    print(f'{inst.name} disabled')
2022-10-21 20:51:13 +02:00
def load_config(reckless_dir: Union[str, None] = None,
                network: str = 'bitcoin') -> Config:
    """Discover the relevant directories and open or create config files.

    Asks a running lightningd for its active config, opens (or creates)
    the network config and the reckless config, and makes the network
    config include the reckless config. Exits with status 1 when the
    config lightningd is using differs from the one reckless was pointed
    at, or when the reckless config cannot be written.

    Returns the reckless config object.
    """
    net_conf = None
    # Does the lightning-cli already reference an explicit config?
    try:
        active_config = lightning_cli('listconfigs', timeout=3)['configs']
    except RPCError:
        pass
    else:
        if 'conf' in active_config:
            active_path = active_config['conf']['value_str']
            net_conf = LightningBitcoinConfig(path=active_path)

    if reckless_dir is None:
        reckless_dir = Path(LIGHTNING_DIR) / 'reckless'
    elif not os.path.isabs(reckless_dir):
        reckless_dir = Path.cwd() / reckless_dir

    network_path = LIGHTNING_CONFIG or Path(LIGHTNING_DIR) / network / 'config'
    reck_conf_path = Path(reckless_dir) / f'{network}-reckless.conf'

    if not net_conf:
        # The network-specific config file (bitcoin by default)
        net_conf = LightningBitcoinConfig(path=network_path)
    elif str(network_path) != net_conf.conf_fp:
        print('error: reckless configuration does not match lightningd:\n'
              f'reckless network config path: {network_path}\n'
              f'lightningd active config: {net_conf.conf_fp}')
        sys.exit(1)

    # Reckless manages plugins here.
    try:
        reckless_conf = RecklessConfig(path=reck_conf_path)
    except FileNotFoundError:
        print('Error: reckless config file could not be written: ',
              str(reck_conf_path))
        sys.exit(1)
    if not net_conf:
        print('Error: could not load or create the network specific lightningd'
              ' config (default .lightning/bitcoin)')
        sys.exit(1)
    include_line = f'include {reckless_conf.conf_fp}'
    net_conf.editConfigFile(include_line, None)
    return reckless_conf
2022-10-21 20:51:13 +02:00
def get_sources_file() -> Path:
    """Return the path of the `.sources` file inside RECKLESS_DIR."""
    return Path(RECKLESS_DIR) / '.sources'
2022-10-21 20:51:13 +02:00
def sources_from_file() -> list:
    """Read the sources file and return its non-blank lines, stripped of
    surrounding whitespace, in file order."""
    with open(get_sources_file(), 'r') as f:
        stripped = [line.strip() for line in f.readlines()]
    return [entry for entry in stripped if len(entry) > 0]
2022-10-21 20:51:13 +02:00
def loadSources() -> list:
    """Return the configured plugin source repositories.

    When the sources file does not exist yet it is created with the
    default lightningd/plugins repository, and that default is returned.
    """
    sources_file = get_sources_file()
    if Path(sources_file).exists():
        return sources_from_file()
    # This would have been created if possible
    logging.debug('Warning: Reckless requires write access')
    Config(path=str(sources_file),
           default_text='https://github.com/lightningd/plugins')
    return ['https://github.com/lightningd/plugins']
def add_source(src: str):
    """Register an additional plugin source.

    Only github.com urls are stored. A local directory is reported as not
    yet supported, any other existing local path is ignored silently, and
    anything else is reported as a failure.
    """
    assert isinstance(src, str)
    # Is it a file?
    local_path = Path(os.path.realpath(src))
    if local_path.exists():
        # FIXME: This should handle either a directory or a git repo
        if local_path.is_dir():
            print(f'local sources not yet supported: {src}')
        return
    if 'github.com' not in src:
        print(f'failed to add source {src}')
        return
    sources = Config(path=str(get_sources_file()),
                     default_text='https://github.com/lightningd/plugins')
    sources.editConfigFile(src, None)
def remove_source(src: str):
    """Remove a source from the sources file.

    Prints whether the source was removed or was not found.
    """
    assert isinstance(src, str)
    if src in sources_from_file():
        # Config expects the path as a string, matching add_source().
        my_file = Config(path=str(get_sources_file()),
                         default_text='https://github.com/lightningd/plugins')
        my_file.editConfigFile(None, src)
        print('plugin source removed')
    else:
        print(f'source not found: {src}')
def list_source():
    """Print every stored source repository, one per line."""
    stored = sources_from_file()
    for repository in stored:
        print(repository)
if __name__ == '__main__':
    # Command line definition. `parser` stays a module-level name because
    # help_alias() reads it.
    parser = argparse.ArgumentParser()
    # This default depends on the .lightning directory
    parser.add_argument('-d', '--reckless-dir',
                        help='specify a data directory for reckless to use',
                        type=str, default=None)
    parser.add_argument('-l', '--lightning',
                        help='lightning data directory (default:~/.lightning)',
                        type=str,
                        default=Path.home().joinpath('.lightning'))
    parser.add_argument('-c', '--conf',
                        help=' config file used by lightningd',
                        type=str,
                        default=None)
    parser.add_argument('-r', '--regtest', action='store_true')
    parser.add_argument('--network',
                        help="specify a network to use (default: bitcoin)",
                        type=str)
    parser.add_argument('-v', '--verbose', action="store_const",
                        dest="loglevel", const=logging.DEBUG,
                        default=logging.WARNING)
    cmd1 = parser.add_subparsers(dest='cmd1', help='command',
                                 required=True)

    install_cmd = cmd1.add_parser('install', help='search for and install a '
                                  'plugin, then test and activate')
    install_cmd.add_argument('targets', type=str, nargs='*')
    install_cmd.set_defaults(func=install)

    uninstall_cmd = cmd1.add_parser('uninstall', help='deactivate a plugin '
                                    'and remove it from the directory')
    uninstall_cmd.add_argument('targets', type=str, nargs='*')
    uninstall_cmd.set_defaults(func=uninstall)

    search_cmd = cmd1.add_parser('search', help='search for a plugin from '
                                 'the available source repositories')
    search_cmd.add_argument('targets', type=str, nargs='*')
    search_cmd.set_defaults(func=search)

    enable_cmd = cmd1.add_parser('enable', help='dynamically enable a plugin '
                                 'and update config')
    enable_cmd.add_argument('targets', type=str, nargs='*')
    enable_cmd.set_defaults(func=enable)

    disable_cmd = cmd1.add_parser('disable', help='disable a plugin')
    disable_cmd.add_argument('targets', type=str, nargs='*')
    disable_cmd.set_defaults(func=disable)

    source_parser = cmd1.add_parser('source', help='manage plugin search '
                                    'sources')
    source_subs = source_parser.add_subparsers(dest='source_subs',
                                               required=True)
    list_parse = source_subs.add_parser('list', help='list available plugin '
                                        'sources (repositories)')
    list_parse.set_defaults(func=list_source)
    source_add = source_subs.add_parser('add', help='add a source repository')
    source_add.add_argument('targets', type=str, nargs='*')
    source_add.set_defaults(func=add_source)
    source_rem = source_subs.add_parser('remove', aliases=['rem', 'rm'],
                                        help='remove a plugin source '
                                        'repository')
    source_rem.add_argument('targets', type=str, nargs='*')
    source_rem.set_defaults(func=remove_source)

    help_cmd = cmd1.add_parser('help', help='for contextual help, use '
                               '"reckless <cmd> -h"')
    help_cmd.add_argument('targets', type=str, nargs='*')
    help_cmd.set_defaults(func=help_alias)

    args = parser.parse_args()

    # Apply the requested verbosity before any work is done so that config
    # loading honours -v as well.
    logging.root.setLevel(args.loglevel)

    NETWORK = 'regtest' if args.regtest else 'bitcoin'
    SUPPORTED_NETWORKS = ['bitcoin', 'regtest', 'liquid', 'liquid-regtest',
                          'litecoin', 'signet', 'testnet']
    if args.network:
        if args.network in SUPPORTED_NETWORKS:
            NETWORK = args.network
        else:
            print(f"Error: {args.network} network not supported")
            # Do not silently carry on against the default network.
            sys.exit(1)

    LIGHTNING_DIR = Path(args.lightning)
    # This env variable is set under CI testing
    LIGHTNING_CLI_CALL = [os.environ.get('LIGHTNING_CLI')]
    if LIGHTNING_CLI_CALL == [None]:
        LIGHTNING_CLI_CALL = ['lightning-cli']
    if NETWORK != 'bitcoin':
        LIGHTNING_CLI_CALL.append(f'--network={NETWORK}')
    if LIGHTNING_DIR != Path.home().joinpath('.lightning'):
        LIGHTNING_CLI_CALL.append(f'--lightning-dir={LIGHTNING_DIR}')

    if args.reckless_dir:
        RECKLESS_DIR = args.reckless_dir
    else:
        RECKLESS_DIR = Path(LIGHTNING_DIR) / 'reckless'
    LIGHTNING_CONFIG = args.conf
    RECKLESS_CONFIG = load_config(reckless_dir=RECKLESS_DIR,
                                  network=NETWORK)
    RECKLESS_SOURCES = loadSources()

    API_GITHUB_COM = 'https://api.github.com'
    GITHUB_COM = 'https://github.com'
    # Used for blackbox testing to avoid hitting github servers
    if 'REDIR_GITHUB_API' in os.environ:
        API_GITHUB_COM = os.environ['REDIR_GITHUB_API']
    if 'REDIR_GITHUB' in os.environ:
        GITHUB_COM = os.environ['REDIR_GITHUB']

    if 'targets' in args:
        # FIXME: Catch missing argument
        if args.func.__name__ == 'help_alias':
            # help_alias takes the whole target list at once.
            args.func(args.targets)
            sys.exit(0)
        for target in args.targets:
            args.func(target)
    else:
        args.func()