Merge pull request #2240 from mempool/nymkappa/feature/clightning

Add clightning support to the lightning backend
This commit is contained in:
wiz 2022-08-02 23:52:53 +00:00 committed by GitHub
commit a43a65df2c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 430 additions and 23 deletions

View file

@ -0,0 +1,263 @@
// Imported from https://github.com/shesek/lightning-client-js
'use strict';
const methods = [
'addgossip',
'autocleaninvoice',
'check',
'checkmessage',
'close',
'connect',
'createinvoice',
'createinvoicerequest',
'createoffer',
'createonion',
'decode',
'decodepay',
'delexpiredinvoice',
'delinvoice',
'delpay',
'dev-listaddrs',
'dev-rescan-outputs',
'disableoffer',
'disconnect',
'estimatefees',
'feerates',
'fetchinvoice',
'fundchannel',
'fundchannel_cancel',
'fundchannel_complete',
'fundchannel_start',
'fundpsbt',
'getchaininfo',
'getinfo',
'getlog',
'getrawblockbyheight',
'getroute',
'getsharedsecret',
'getutxout',
'help',
'invoice',
'keysend',
'legacypay',
'listchannels',
'listconfigs',
'listforwards',
'listfunds',
'listinvoices',
'listnodes',
'listoffers',
'listpays',
'listpeers',
'listsendpays',
'listtransactions',
'multifundchannel',
'multiwithdraw',
'newaddr',
'notifications',
'offer',
'offerout',
'openchannel_abort',
'openchannel_bump',
'openchannel_init',
'openchannel_signed',
'openchannel_update',
'pay',
'payersign',
'paystatus',
'ping',
'plugin',
'reserveinputs',
'sendinvoice',
'sendonion',
'sendonionmessage',
'sendpay',
'sendpsbt',
'sendrawtransaction',
'setchannelfee',
'signmessage',
'signpsbt',
'stop',
'txdiscard',
'txprepare',
'txsend',
'unreserveinputs',
'utxopsbt',
'waitanyinvoice',
'waitblockheight',
'waitinvoice',
'waitsendpay',
'withdraw'
];
import EventEmitter from 'events';
import { existsSync, statSync } from 'fs';
import { createConnection, Socket } from 'net';
import { homedir } from 'os';
import path from 'path';
import { createInterface, Interface } from 'readline';
import logger from '../../../logger';
import { AbstractLightningApi } from '../lightning-api-abstract-factory';
import { ILightningApi } from '../lightning-api.interface';
import { convertAndmergeBidirectionalChannels, convertNode } from './clightning-convert';
class LightningError extends Error {
type: string = 'lightning';
message: string = 'lightning-client error';
constructor(error) {
super();
this.type = error.type;
this.message = error.message;
}
}
const defaultRpcPath = path.join(homedir(), '.lightning')
, fStat = (...p) => statSync(path.join(...p))
, fExists = (...p) => existsSync(path.join(...p))
export default class CLightningClient extends EventEmitter implements AbstractLightningApi {
private rpcPath: string;
private reconnectWait: number;
private reconnectTimeout;
private reqcount: number;
private client: Socket;
private rl: Interface;
private clientConnectionPromise: Promise<unknown>;
constructor(rpcPath = defaultRpcPath) {
if (!path.isAbsolute(rpcPath)) {
throw new Error('The rpcPath must be an absolute path');
}
if (!fExists(rpcPath) || !fStat(rpcPath).isSocket()) {
// network directory provided, use the lightning-rpc within in
if (fExists(rpcPath, 'lightning-rpc')) {
rpcPath = path.join(rpcPath, 'lightning-rpc');
}
// main data directory provided, default to using the bitcoin mainnet subdirectory
// to be removed in v0.2.0
else if (fExists(rpcPath, 'bitcoin', 'lightning-rpc')) {
logger.warn(`[CLightningClient] ${rpcPath}/lightning-rpc is missing, using the bitcoin mainnet subdirectory at ${rpcPath}/bitcoin instead.`)
logger.warn(`[CLightningClient] specifying the main lightning data directory is deprecated, please specify the network directory explicitly.\n`)
rpcPath = path.join(rpcPath, 'bitcoin', 'lightning-rpc')
}
}
logger.debug(`[CLightningClient] Connecting to ${rpcPath}`);
super();
this.rpcPath = rpcPath;
this.reconnectWait = 0.5;
this.reconnectTimeout = null;
this.reqcount = 0;
const _self = this;
this.client = createConnection(rpcPath);
this.rl = createInterface({ input: this.client })
this.clientConnectionPromise = new Promise<void>(resolve => {
_self.client.on('connect', () => {
logger.info(`[CLightningClient] Lightning client connected`);
_self.reconnectWait = 1;
resolve();
});
_self.client.on('end', () => {
logger.err('[CLightningClient] Lightning client connection closed, reconnecting');
_self.increaseWaitTime();
_self.reconnect();
});
_self.client.on('error', error => {
logger.err(`[CLightningClient] Lightning client connection error: ${error}`);
_self.emit('error', error);
_self.increaseWaitTime();
_self.reconnect();
});
});
this.rl.on('line', line => {
line = line.trim();
if (!line) {
return;
}
const data = JSON.parse(line);
// logger.debug(`[CLightningClient] #${data.id} <-- ${JSON.stringify(data.error || data.result)}`);
_self.emit('res:' + data.id, data);
});
}
increaseWaitTime(): void {
if (this.reconnectWait >= 16) {
this.reconnectWait = 16;
} else {
this.reconnectWait *= 2;
}
}
reconnect(): void {
const _self = this;
if (this.reconnectTimeout) {
return;
}
this.reconnectTimeout = setTimeout(() => {
logger.debug('[CLightningClient] Trying to reconnect...');
_self.client.connect(_self.rpcPath);
_self.reconnectTimeout = null;
}, this.reconnectWait * 1000);
}
call(method, args = []): Promise<any> {
const _self = this;
const callInt = ++this.reqcount;
const sendObj = {
jsonrpc: '2.0',
method,
params: args,
id: '' + callInt
};
// logger.debug(`[CLightningClient] #${callInt} --> ${method} ${args}`);
// Wait for the client to connect
return this.clientConnectionPromise
.then(() => new Promise((resolve, reject) => {
// Wait for a response
this.once('res:' + callInt, res => res.error == null
? resolve(res.result)
: reject(new LightningError(res.error))
);
// Send the command
_self.client.write(JSON.stringify(sendObj));
}));
}
async $getNetworkGraph(): Promise<ILightningApi.NetworkGraph> {
const listnodes: any[] = await this.call('listnodes');
const listchannels: any[] = await this.call('listchannels');
const channelsList = await convertAndmergeBidirectionalChannels(listchannels['channels']);
return {
nodes: listnodes['nodes'].map(node => convertNode(node)),
edges: channelsList,
};
}
}
const protify = s => s.replace(/-([a-z])/g, m => m[1].toUpperCase());
methods.forEach(k => {
CLightningClient.prototype[protify(k)] = function (...args: any) {
return this.call(k, args);
};
});

View file

@ -0,0 +1,112 @@
import { ILightningApi } from '../lightning-api.interface';
import FundingTxFetcher from '../../../tasks/lightning/sync-tasks/funding-tx-fetcher';
/**
* Convert a clightning "listnode" entry to a lnd node entry
*/
export function convertNode(clNode: any): ILightningApi.Node {
return {
alias: clNode.alias ?? '',
color: `#${clNode.color ?? ''}`,
features: [], // TODO parse and return clNode.feature
pub_key: clNode.nodeid,
addresses: clNode.addresses?.map((addr) => {
return {
network: addr.type,
addr: `${addr.address}:${addr.port}`
};
}),
last_update: clNode?.last_timestamp ?? 0,
};
}
/**
* Convert clightning "listchannels" response to lnd "describegraph.edges" format
*/
export async function convertAndmergeBidirectionalChannels(clChannels: any[]): Promise<ILightningApi.Channel[]> {
const consolidatedChannelList: ILightningApi.Channel[] = [];
const clChannelsDict = {};
const clChannelsDictCount = {};
for (const clChannel of clChannels) {
if (!clChannelsDict[clChannel.short_channel_id]) {
clChannelsDict[clChannel.short_channel_id] = clChannel;
clChannelsDictCount[clChannel.short_channel_id] = 1;
} else {
consolidatedChannelList.push(
await buildFullChannel(clChannel, clChannelsDict[clChannel.short_channel_id])
);
delete clChannelsDict[clChannel.short_channel_id];
clChannelsDictCount[clChannel.short_channel_id]++;
}
}
for (const short_channel_id of Object.keys(clChannelsDict)) {
consolidatedChannelList.push(await buildIncompleteChannel(clChannelsDict[short_channel_id]));
}
return consolidatedChannelList;
}
export function convertChannelId(channelId): string {
const s = channelId.split('x').map(part => parseInt(part));
return BigInt((s[0] << 40) | (s[1] << 16) | s[2]).toString();
}
/**
* Convert two clightning "getchannels" entries into a full a lnd "describegraph.edges" format
* In this case, clightning knows the channel policy for both nodes
*/
async function buildFullChannel(clChannelA: any, clChannelB: any): Promise<ILightningApi.Channel> {
const lastUpdate = Math.max(clChannelA.last_update ?? 0, clChannelB.last_update ?? 0);
const tx = await FundingTxFetcher.$fetchChannelOpenTx(clChannelA.short_channel_id);
const parts = clChannelA.short_channel_id.split('x');
const outputIdx = parts[2];
return {
channel_id: clChannelA.short_channel_id,
capacity: clChannelA.satoshis,
last_update: lastUpdate,
node1_policy: convertPolicy(clChannelA),
node2_policy: convertPolicy(clChannelB),
chan_point: `${tx.txid}:${outputIdx}`,
node1_pub: clChannelA.source,
node2_pub: clChannelB.source,
};
}
/**
* Convert one clightning "getchannels" entry into a full a lnd "describegraph.edges" format
* In this case, clightning knows the channel policy of only one node
*/
async function buildIncompleteChannel(clChannel: any): Promise<ILightningApi.Channel> {
const tx = await FundingTxFetcher.$fetchChannelOpenTx(clChannel.short_channel_id);
const parts = clChannel.short_channel_id.split('x');
const outputIdx = parts[2];
return {
channel_id: clChannel.short_channel_id,
capacity: clChannel.satoshis,
last_update: clChannel.last_update ?? 0,
node1_policy: convertPolicy(clChannel),
node2_policy: null,
chan_point: `${tx.txid}:${outputIdx}`,
node1_pub: clChannel.source,
node2_pub: clChannel.destination,
};
}
/**
* Convert a clightning "listnode" response to a lnd channel policy format
*/
function convertPolicy(clChannel: any): ILightningApi.RoutingPolicy {
return {
time_lock_delta: 0, // TODO
min_htlc: clChannel.htlc_minimum_msat.slice(0, -4),
max_htlc_msat: clChannel.htlc_maximum_msat.slice(0, -4),
fee_base_msat: clChannel.base_fee_millisatoshi,
fee_rate_milli_msat: clChannel.fee_per_millionth,
disabled: !clChannel.active,
last_update: clChannel.last_update ?? 0,
};
}

View file

@ -1,7 +1,5 @@
import { ILightningApi } from './lightning-api.interface';
export interface AbstractLightningApi {
$getNetworkInfo(): Promise<ILightningApi.NetworkInfo>;
$getNetworkGraph(): Promise<ILightningApi.NetworkGraph>;
$getInfo(): Promise<ILightningApi.Info>;
}

View file

@ -1,9 +1,12 @@
import config from '../../config';
import CLightningClient from './clightning/clightning-client';
import { AbstractLightningApi } from './lightning-api-abstract-factory';
import LndApi from './lnd/lnd-api';
function lightningApiFactory(): AbstractLightningApi {
switch (config.LIGHTNING.BACKEND) {
switch (config.LIGHTNING.ENABLED === true && config.LIGHTNING.BACKEND) {
case 'cln':
return new CLightningClient(config.CLIGHTNING.SOCKET);
case 'lnd':
default:
return new LndApi();

View file

@ -38,6 +38,9 @@ interface IConfig {
MACAROON_PATH: string;
REST_API_URL: string;
};
CLIGHTNING: {
SOCKET: string;
};
ELECTRUM: {
HOST: string;
PORT: number;
@ -186,6 +189,9 @@ const defaults: IConfig = {
'MACAROON_PATH': '',
'REST_API_URL': 'https://localhost:8080',
},
'CLIGHTNING': {
'SOCKET': '',
},
'SOCKS5PROXY': {
'ENABLED': false,
'USE_ONION': true,
@ -226,6 +232,7 @@ class Config implements IConfig {
BISQ: IConfig['BISQ'];
LIGHTNING: IConfig['LIGHTNING'];
LND: IConfig['LND'];
CLIGHTNING: IConfig['CLIGHTNING'];
SOCKS5PROXY: IConfig['SOCKS5PROXY'];
PRICE_DATA_SERVER: IConfig['PRICE_DATA_SERVER'];
EXTERNAL_DATA_SERVER: IConfig['EXTERNAL_DATA_SERVER'];
@ -244,6 +251,7 @@ class Config implements IConfig {
this.BISQ = configs.BISQ;
this.LIGHTNING = configs.LIGHTNING;
this.LND = configs.LND;
this.CLIGHTNING = configs.CLIGHTNING;
this.SOCKS5PROXY = configs.SOCKS5PROXY;
this.PRICE_DATA_SERVER = configs.PRICE_DATA_SERVER;
this.EXTERNAL_DATA_SERVER = configs.EXTERNAL_DATA_SERVER;

View file

@ -28,12 +28,13 @@ import nodesRoutes from './api/explorer/nodes.routes';
import channelsRoutes from './api/explorer/channels.routes';
import generalLightningRoutes from './api/explorer/general.routes';
import lightningStatsUpdater from './tasks/lightning/stats-updater.service';
import nodeSyncService from './tasks/lightning/node-sync.service';
import networkSyncService from './tasks/lightning/network-sync.service';
import statisticsRoutes from './api/statistics/statistics.routes';
import miningRoutes from './api/mining/mining-routes';
import bisqRoutes from './api/bisq/bisq.routes';
import liquidRoutes from './api/liquid/liquid.routes';
import bitcoinRoutes from './api/bitcoin/bitcoin.routes';
import fundingTxFetcher from "./tasks/lightning/sync-tasks/funding-tx-fetcher";
class Server {
private wss: WebSocket.Server | undefined;
@ -136,8 +137,9 @@ class Server {
}
if (config.LIGHTNING.ENABLED) {
nodeSyncService.$startService()
.then(() => lightningStatsUpdater.$startService());
fundingTxFetcher.$init()
.then(() => networkSyncService.$startService())
.then(() => lightningStatsUpdater.$startService());
}
this.server.listen(config.MEMPOOL.HTTP_PORT, () => {

View file

@ -5,11 +5,12 @@ import bitcoinClient from '../../api/bitcoin/bitcoin-client';
import bitcoinApi from '../../api/bitcoin/bitcoin-api-factory';
import config from '../../config';
import { IEsploraApi } from '../../api/bitcoin/esplora-api.interface';
import lightningApi from '../../api/lightning/lightning-api-factory';
import { ILightningApi } from '../../api/lightning/lightning-api.interface';
import { $lookupNodeLocation } from './sync-tasks/node-locations';
import lightningApi from '../../api/lightning/lightning-api-factory';
import { convertChannelId } from '../../api/lightning/clightning/clightning-convert';
class NodeSyncService {
class NetworkSyncService {
constructor() {}
public async $startService() {
@ -27,6 +28,11 @@ class NodeSyncService {
logger.info(`Updating nodes and channels...`);
const networkGraph = await lightningApi.$getNetworkGraph();
if (networkGraph.nodes.length === 0 || networkGraph.edges.length === 0) {
logger.info(`LN Network graph is empty, retrying in 10 seconds`);
setTimeout(this.$runUpdater, 10000);
return;
}
for (const node of networkGraph.nodes) {
await this.$saveNode(node);
@ -320,7 +326,7 @@ class NodeSyncService {
;`;
await DB.query(query, [
channel.channel_id,
this.toIntegerId(channel.channel_id),
this.toShortId(channel.channel_id),
channel.capacity,
txid,
@ -375,6 +381,10 @@ class NodeSyncService {
}
private async $setChannelsInactive(graphChannelsIds: string[]): Promise<void> {
if (graphChannelsIds.length === 0) {
return;
}
try {
await DB.query(`
UPDATE channels
@ -391,8 +401,7 @@ class NodeSyncService {
private async $saveNode(node: ILightningApi.Node): Promise<void> {
try {
const updatedAt = this.utcDateToMysql(node.last_update);
const sockets = node.addresses.map(a => a.addr).join(',');
const sockets = (node.addresses?.map(a => a.addr).join(',')) ?? '';
const query = `INSERT INTO nodes(
public_key,
first_seen,
@ -401,15 +410,16 @@ class NodeSyncService {
color,
sockets
)
VALUES (?, NOW(), ?, ?, ?, ?) ON DUPLICATE KEY UPDATE updated_at = ?, alias = ?, color = ?, sockets = ?;`;
VALUES (?, NOW(), FROM_UNIXTIME(?), ?, ?, ?)
ON DUPLICATE KEY UPDATE updated_at = FROM_UNIXTIME(?), alias = ?, color = ?, sockets = ?`;
await DB.query(query, [
node.pub_key,
updatedAt,
node.last_update,
node.alias,
node.color,
sockets,
updatedAt,
node.last_update,
node.alias,
node.color,
sockets,
@ -419,8 +429,19 @@ class NodeSyncService {
}
}
private toIntegerId(id: string): string {
if (config.LIGHTNING.BACKEND === 'lnd') {
return id;
}
return convertChannelId(id);
}
/** Decodes a channel id returned by lnd as uint64 to a short channel id */
private toShortId(id: string): string {
if (config.LIGHTNING.BACKEND === 'cln') {
return id;
}
const n = BigInt(id);
return [
n >> 40n, // nth block
@ -435,4 +456,4 @@ class NodeSyncService {
}
}
export default new NodeSyncService();
export default new NetworkSyncService();

View file

@ -1,8 +1,6 @@
import { existsSync, promises } from 'fs';
import bitcoinApiFactory from '../../../api/bitcoin/bitcoin-api-factory';
import bitcoinClient from '../../../api/bitcoin/bitcoin-client';
import config from '../../../config';
import DB from '../../../database';
import logger from '../../../logger';
const fsPromises = promises;
@ -16,12 +14,7 @@ class FundingTxFetcher {
private channelNewlyProcessed = 0;
public fundingTxCache = {};
async $fetchChannelsFundingTxs(channelIds: string[]): Promise<void> {
if (this.running) {
return;
}
this.running = true;
async $init(): Promise<void> {
// Load funding tx disk cache
if (Object.keys(this.fundingTxCache).length === 0 && existsSync(CACHE_FILE_NAME)) {
try {
@ -32,6 +25,13 @@ class FundingTxFetcher {
}
logger.debug(`Imported ${Object.keys(this.fundingTxCache).length} funding tx amount from the disk cache`);
}
}
async $fetchChannelsFundingTxs(channelIds: string[]): Promise<void> {
if (this.running) {
return;
}
this.running = true;
const globalTimer = new Date().getTime() / 1000;
let cacheTimer = new Date().getTime() / 1000;