Merge pull request #4909 from mempool/nymkappa/unix-socket

[server] express server also listens on unix socket
This commit is contained in:
softsimon 2024-04-10 13:41:14 +09:00 committed by GitHub
commit 317b1b6ac5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 101 additions and 33 deletions

View file

@ -35,7 +35,8 @@
"MAX_PUSH_TX_SIZE_WEIGHT": 4000000,
"ALLOW_UNREACHABLE": true,
"PRICE_UPDATES_PER_HOUR": 1,
"MAX_TRACKED_ADDRESSES": 100
"MAX_TRACKED_ADDRESSES": 100,
"UNIX_SOCKET_PATH": ""
},
"CORE_RPC": {
"HOST": "127.0.0.1",

View file

@ -7,6 +7,7 @@
"BLOCKS_SUMMARIES_INDEXING": true,
"GOGGLES_INDEXING": false,
"HTTP_PORT": 1,
"UNIX_SOCKET_PATH": "/mempool/socket/mempool-bitcoin-mainnet",
"SPAWN_CLUSTER_PROCS": 2,
"API_URL_PREFIX": "__MEMPOOL_API_URL_PREFIX__",
"AUTOMATIC_BLOCK_REINDEXING": false,

View file

@ -20,6 +20,7 @@ describe('Mempool Backend Config', () => {
BLOCKS_SUMMARIES_INDEXING: false,
GOGGLES_INDEXING: false,
HTTP_PORT: 8999,
UNIX_SOCKET_PATH: '',
SPAWN_CLUSTER_PROCS: 0,
API_URL_PREFIX: '/api/v1/',
AUTOMATIC_BLOCK_REINDEXING: false,

View file

@ -44,7 +44,7 @@ const wantable = [
];
class WebsocketHandler {
private wss: WebSocket.Server | undefined;
private webSocketServers: WebSocket.Server[] = [];
private extraInitProperties = {};
private numClients = 0;
@ -57,8 +57,8 @@ class WebsocketHandler {
constructor() { }
setWebsocketServer(wss: WebSocket.Server) {
this.wss = wss;
addWebsocketServer(wss: WebSocket.Server) {
this.webSocketServers.push(wss);
}
setExtraInitData(property: string, value: any) {
@ -102,11 +102,13 @@ class WebsocketHandler {
}
setupConnectionHandling() {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
}
this.wss.on('connection', (client: WebSocket, req) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.on('connection', (client: WebSocket, req) => {
this.numConnected++;
client['remoteAddress'] = req.headers['x-forwarded-for'] || req.socket?.remoteAddress || 'unknown';
client.on('error', (e) => {
@ -369,14 +371,17 @@ class WebsocketHandler {
}
});
});
}
}
handleNewDonation(id: string) {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
}
this.wss.clients.forEach((client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
@ -384,43 +389,50 @@ class WebsocketHandler {
client.send(JSON.stringify({ donationConfirmed: true }));
}
});
}
}
handleLoadingChanged(indicators: ILoadingIndicators) {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
}
this.updateSocketDataFields({ 'loadingIndicators': indicators });
const response = JSON.stringify({ loadingIndicators: indicators });
this.wss.clients.forEach((client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
client.send(response);
});
}
}
handleNewConversionRates(conversionRates: ApiPrice) {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
}
this.updateSocketDataFields({ 'conversions': conversionRates });
const response = JSON.stringify({ conversions: conversionRates });
this.wss.clients.forEach((client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
client.send(response);
});
}
}
handleNewStatistic(stats: OptimizedStatistic) {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
}
this.printLogs();
@ -429,7 +441,9 @@ class WebsocketHandler {
'live-2h-chart': stats
});
this.wss.clients.forEach((client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
@ -440,11 +454,12 @@ class WebsocketHandler {
client.send(response);
});
}
}
handleReorg(): void {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
}
const da = difficultyAdjustment.getDifficultyAdjustment();
@ -455,7 +470,9 @@ class WebsocketHandler {
'da': da?.previousTime ? da : undefined,
});
this.wss.clients.forEach((client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
@ -473,13 +490,14 @@ class WebsocketHandler {
client.send(this.serializeResponse(response));
}
});
}
}
async $handleMempoolChange(newMempool: { [txid: string]: MempoolTransactionExtended }, mempoolSize: number,
newTransactions: MempoolTransactionExtended[], deletedTransactions: MempoolTransactionExtended[], accelerationDelta: string[],
candidates?: GbtCandidates): Promise<void> {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
}
this.printLogs();
@ -552,7 +570,9 @@ class WebsocketHandler {
// pre-compute new tracked outspends
const outspendCache: { [txid: string]: { [vout: number]: { vin: number, txid: string } } } = {};
const trackedTxs = new Set<string>();
this.wss.clients.forEach((client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
if (client['track-tx']) {
trackedTxs.add(client['track-tx']);
}
@ -562,6 +582,7 @@ class WebsocketHandler {
}
}
});
}
if (trackedTxs.size > 0) {
for (const tx of newTransactions) {
for (let i = 0; i < tx.vin.length; i++) {
@ -581,7 +602,9 @@ class WebsocketHandler {
const addressCache = this.makeAddressCache(newTransactions);
const removedAddressCache = this.makeAddressCache(deletedTransactions);
this.wss.clients.forEach(async (client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach(async (client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
@ -821,11 +844,12 @@ class WebsocketHandler {
client.send(this.serializeResponse(response));
}
});
}
}
async handleNewBlock(block: BlockExtended, txIds: string[], transactions: MempoolTransactionExtended[]): Promise<void> {
if (!this.wss) {
throw new Error('WebSocket.Server is not set');
if (!this.webSocketServers.length) {
throw new Error('No WebSocket.Server have been set');
}
this.printLogs();
@ -969,7 +993,10 @@ class WebsocketHandler {
return responseCache[key];
}
this.wss.clients.forEach((client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
if (client.readyState !== WebSocket.OPEN) {
return;
}
@ -1150,6 +1177,7 @@ class WebsocketHandler {
client.send(this.serializeResponse(response));
}
});
}
await statistics.runStatistics();
}
@ -1231,13 +1259,15 @@ class WebsocketHandler {
}
private printLogs(): void {
if (this.wss) {
if (this.webSocketServers.length) {
let numTxSubs = 0;
let numTxsSubs = 0;
let numProjectedSubs = 0;
let numRbfSubs = 0;
this.wss.clients.forEach((client) => {
// TODO - Fix indentation after PR is merged
for (const server of this.webSocketServers) {
server.clients.forEach((client) => {
if (client['track-tx']) {
numTxSubs++;
}
@ -1251,8 +1281,12 @@ class WebsocketHandler {
numRbfSubs++;
}
})
}
const count = this.wss?.clients?.size || 0;
let count = 0;
for (const server of this.webSocketServers) {
count += server.clients?.size || 0;
}
const diff = count - this.numClients;
this.numClients = count;
logger.debug(`${count} websocket clients | ${this.numConnected} connected | ${this.numDisconnected} disconnected | (${diff >= 0 ? '+' : ''}${diff})`);

View file

@ -9,6 +9,7 @@ interface IConfig {
NETWORK: 'mainnet' | 'testnet' | 'signet' | 'liquid' | 'liquidtestnet';
BACKEND: 'esplora' | 'electrum' | 'none';
HTTP_PORT: number;
UNIX_SOCKET_PATH: string;
SPAWN_CLUSTER_PROCS: number;
API_URL_PREFIX: string;
POLL_RATE_MS: number;
@ -164,6 +165,7 @@ const defaults: IConfig = {
'NETWORK': 'mainnet',
'BACKEND': 'none',
'HTTP_PORT': 8999,
'UNIX_SOCKET_PATH': '',
'SPAWN_CLUSTER_PROCS': 0,
'API_URL_PREFIX': '/api/v1/',
'POLL_RATE_MS': 2000,

View file

@ -48,7 +48,9 @@ import aboutRoutes from './api/about.routes';
class Server {
private wss: WebSocket.Server | undefined;
private wssUnixSocket: WebSocket.Server | undefined;
private server: http.Server | undefined;
private serverUnixSocket: http.Server | undefined;
private app: Application;
private currentBackendRetryInterval = 1;
private backendRetryCount = 0;
@ -137,6 +139,10 @@ class Server {
this.server = http.createServer(this.app);
this.wss = new WebSocket.Server({ server: this.server });
if (config.MEMPOOL.UNIX_SOCKET_PATH) {
this.serverUnixSocket = http.createServer(this.app);
this.wssUnixSocket = new WebSocket.Server({ server: this.serverUnixSocket });
}
this.setUpWebsocketHandling();
@ -192,6 +198,16 @@ class Server {
logger.notice(`Mempool Server is running on port ${config.MEMPOOL.HTTP_PORT}`);
}
});
if (this.serverUnixSocket) {
this.serverUnixSocket.listen(config.MEMPOOL.UNIX_SOCKET_PATH, () => {
if (worker) {
logger.info(`Mempool Server worker #${process.pid} started`);
} else {
logger.notice(`Mempool Server is listening on ${config.MEMPOOL.UNIX_SOCKET_PATH}`);
}
});
}
}
async runMainUpdateLoop(): Promise<void> {
@ -265,8 +281,12 @@ class Server {
setUpWebsocketHandling(): void {
if (this.wss) {
websocketHandler.setWebsocketServer(this.wss);
websocketHandler.addWebsocketServer(this.wss);
}
if (this.wssUnixSocket) {
websocketHandler.addWebsocketServer(this.wssUnixSocket);
}
if (Common.isLiquid() && config.DATABASE.ENABLED) {
blocks.setNewBlockCallback(async () => {
try {
@ -338,6 +358,12 @@ class Server {
if (config.DATABASE.ENABLED) {
DB.releasePidLock();
}
this.server?.close();
this.serverUnixSocket?.close();
this.wss?.close();
if (this.wssUnixSocket) {
this.wssUnixSocket.close();
}
process.exit(code);
}

View file

@ -6,6 +6,7 @@
"OFFICIAL": __MEMPOOL_OFFICIAL__,
"HTTP_PORT": __MEMPOOL_HTTP_PORT__,
"SPAWN_CLUSTER_PROCS": __MEMPOOL_SPAWN_CLUSTER_PROCS__,
"UNIX_SOCKET_PATH": "__MEMPOOL_UNIX_SOCKET_PATH__",
"API_URL_PREFIX": "__MEMPOOL_API_URL_PREFIX__",
"POLL_RATE_MS": __MEMPOOL_POLL_RATE_MS__,
"CACHE_DIR": "__MEMPOOL_CACHE_DIR__",

View file

@ -7,6 +7,7 @@ __MEMPOOL_ENABLED__=${MEMPOOL_ENABLED:=true}
__MEMPOOL_OFFICIAL__=${MEMPOOL_OFFICIAL:=false}
__MEMPOOL_HTTP_PORT__=${BACKEND_HTTP_PORT:=8999}
__MEMPOOL_SPAWN_CLUSTER_PROCS__=${MEMPOOL_SPAWN_CLUSTER_PROCS:=0}
__MEMPOOL_UNIX_SOCKET_PATH__=${MEMPOOL_UNIX_SOCKET_PATH:=""}
__MEMPOOL_API_URL_PREFIX__=${MEMPOOL_API_URL_PREFIX:=/api/v1/}
__MEMPOOL_POLL_RATE_MS__=${MEMPOOL_POLL_RATE_MS:=2000}
__MEMPOOL_CACHE_DIR__=${MEMPOOL_CACHE_DIR:=./cache}
@ -160,6 +161,7 @@ sed -i "s!__MEMPOOL_ENABLED__!${__MEMPOOL_ENABLED__}!g" mempool-config.json
sed -i "s!__MEMPOOL_OFFICIAL__!${__MEMPOOL_OFFICIAL__}!g" mempool-config.json
sed -i "s!__MEMPOOL_HTTP_PORT__!${__MEMPOOL_HTTP_PORT__}!g" mempool-config.json
sed -i "s!__MEMPOOL_SPAWN_CLUSTER_PROCS__!${__MEMPOOL_SPAWN_CLUSTER_PROCS__}!g" mempool-config.json
sed -i "s!__MEMPOOL_UNIX_SOCKET_PATH__!${__MEMPOOL_UNIX_SOCKET_PATH__}!g" mempool-config.json
sed -i "s!__MEMPOOL_API_URL_PREFIX__!${__MEMPOOL_API_URL_PREFIX__}!g" mempool-config.json
sed -i "s!__MEMPOOL_POLL_RATE_MS__!${__MEMPOOL_POLL_RATE_MS__}!g" mempool-config.json
sed -i "s!__MEMPOOL_CACHE_DIR__!${__MEMPOOL_CACHE_DIR__}!g" mempool-config.json