connectd: listen on ports for which we should spawn a proxy.

If the port is set, we spawn it (lightning_websocketd) on any
connection to that port.  That means websocketd is a per-peer daemon,
but it means every other daemon uses the connection normally (it's
just actually talking to websocketd instead of the client directly).

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2021-10-15 16:19:05 +10:30 committed by Christian Decker
parent b013b3ab0c
commit f78184ce46
6 changed files with 225 additions and 27 deletions

View File

@ -33,9 +33,13 @@
#include <connectd/tor.h>
#include <connectd/tor_autoservice.h>
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sodium.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <wire/wire_sync.h>
/*~ We are passed two file descriptors when exec'ed from `lightningd`: the
@ -127,6 +131,12 @@ struct daemon {
/* Our features, as lightningd told us */
struct feature_set *our_features;
/* Subdaemon to proxy websocket requests. */
char *websocket_helper;
/* If non-zero, port to listen for websocket connections. */
u16 websocket_port;
};
/* Peers we're trying to reach: we iterate through addrs until we succeed
@ -529,11 +539,10 @@ static void conn_timeout(struct io_conn *conn)
io_close(conn);
}
/*~ When we get a connection in we set up its network address then call
* handshake.c to set up the crypto state. */
static struct io_plan *connection_in(struct io_conn *conn, struct daemon *daemon)
/*~ So, where are you from? */
static bool get_remote_address(struct io_conn *conn,
struct wireaddr_internal *addr)
{
struct wireaddr_internal addr;
struct sockaddr_storage s = {};
socklen_t len = sizeof(s);
@ -541,28 +550,44 @@ static struct io_plan *connection_in(struct io_conn *conn, struct daemon *daemon
if (getpeername(io_conn_fd(conn), (struct sockaddr *)&s, &len) != 0) {
status_debug("Failed to get peername for incoming conn: %s",
strerror(errno));
return io_close(conn);
return false;
}
if (s.ss_family == AF_INET6) {
struct sockaddr_in6 *s6 = (void *)&s;
addr.itype = ADDR_INTERNAL_WIREADDR;
wireaddr_from_ipv6(&addr.u.wireaddr,
addr->itype = ADDR_INTERNAL_WIREADDR;
wireaddr_from_ipv6(&addr->u.wireaddr,
&s6->sin6_addr, ntohs(s6->sin6_port));
} else if (s.ss_family == AF_INET) {
struct sockaddr_in *s4 = (void *)&s;
addr.itype = ADDR_INTERNAL_WIREADDR;
wireaddr_from_ipv4(&addr.u.wireaddr,
addr->itype = ADDR_INTERNAL_WIREADDR;
wireaddr_from_ipv4(&addr->u.wireaddr,
&s4->sin_addr, ntohs(s4->sin_port));
} else if (s.ss_family == AF_UNIX) {
struct sockaddr_un *sun = (void *)&s;
addr.itype = ADDR_INTERNAL_SOCKNAME;
memcpy(addr.u.sockname, sun->sun_path, sizeof(sun->sun_path));
addr->itype = ADDR_INTERNAL_SOCKNAME;
memcpy(addr->u.sockname, sun->sun_path, sizeof(sun->sun_path));
} else {
status_broken("Unknown socket type %i for incoming conn",
s.ss_family);
return io_close(conn);
return false;
}
return true;
}
/*~ As so common in C, we need to bundle two args into a callback, so we
* allocate a temporary structure to hold them: */
struct conn_in {
struct wireaddr_internal addr;
struct daemon *daemon;
};
/*~ Once we've got a connection in, we set it up here (whether it's via the
* websocket proxy, or direct). */
static struct io_plan *conn_in(struct io_conn *conn,
struct conn_in *conn_in_arg)
{
struct daemon *daemon = conn_in_arg->daemon;
/* If they don't complete handshake in reasonable time, hang up */
notleak(new_reltimer(&daemon->timers, conn,
@ -574,10 +599,122 @@ static struct io_plan *connection_in(struct io_conn *conn, struct daemon *daemon
* Note, again, the notleak() to avoid our simplistic leak detection
* code from thinking `conn` (which we don't keep a pointer to) is
* leaked */
return responder_handshake(notleak(conn), &daemon->mykey, &addr,
return responder_handshake(notleak(conn), &daemon->mykey,
&conn_in_arg->addr,
handshake_in_success, daemon);
}
/*~ When we get a direct connection in we set up its network address
* then call handshake.c to set up the crypto state. */
static struct io_plan *connection_in(struct io_conn *conn,
struct daemon *daemon)
{
struct conn_in conn_in_arg;
if (!get_remote_address(conn, &conn_in_arg.addr))
return io_close(conn);
conn_in_arg.daemon = daemon;
return conn_in(conn, &conn_in_arg);
}
/*~ <hello>I speak web socket</hello>.
*
* Actually that's dumb, websocket (aka rfc6455) looks nothing like that. */
static struct io_plan *websocket_connection_in(struct io_conn *conn,
struct daemon *daemon)
{
int childmsg[2], execfail[2];
pid_t childpid;
int err;
struct conn_in conn_in_arg;
if (!get_remote_address(conn, &conn_in_arg.addr))
return io_close(conn);
status_debug("Websocket connection in from %s",
type_to_string(tmpctx, struct wireaddr_internal,
&conn_in_arg.addr));
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, childmsg) != 0)
goto fail;
if (pipe(execfail) != 0)
goto close_msgfd_fail;
if (fcntl(execfail[1], F_SETFD, fcntl(execfail[1], F_GETFD)
| FD_CLOEXEC) < 0)
goto close_execfail_fail;
childpid = fork();
if (childpid < 0)
goto close_execfail_fail;
if (childpid == 0) {
size_t max;
close(childmsg[0]);
close(execfail[0]);
/* Attach remote socket to stdin. */
if (dup2(io_conn_fd(conn), STDIN_FILENO) == -1)
goto child_errno_fail;
/* Attach our socket to stdout. */
if (dup2(childmsg[1], STDOUT_FILENO) == -1)
goto child_errno_fail;
/* Make (fairly!) sure all other fds are closed. */
max = sysconf(_SC_OPEN_MAX);
for (size_t i = STDERR_FILENO + 1; i < max; i++)
close(i);
/* Tell websocket helper what we read so far. */
execlp(daemon->websocket_helper, daemon->websocket_helper,
NULL);
child_errno_fail:
err = errno;
/* Gcc's warn-unused-result fail. */
if (write(execfail[1], &err, sizeof(err))) {
;
}
exit(127);
}
close(childmsg[1]);
close(execfail[1]);
/* Child will close this without writing on successful exec. */
if (read(execfail[0], &err, sizeof(err)) == sizeof(err)) {
close(execfail[0]);
waitpid(childpid, NULL, 0);
status_broken("Exec of helper %s failed: %s",
daemon->websocket_helper, strerror(err));
errno = err;
return io_close(conn);
}
close(execfail[0]);
/* New connection actually talks to proxy process. */
conn_in_arg.daemon = daemon;
io_new_conn(tal_parent(conn), childmsg[0], conn_in, &conn_in_arg);
/* Abandon original (doesn't close since child has dup'd fd) */
return io_close(conn);
close_execfail_fail:
close_noerr(execfail[0]);
close_noerr(execfail[1]);
close_msgfd_fail:
close_noerr(childmsg[0]);
close_noerr(childmsg[1]);
fail:
status_broken("Preparation of helper failed: %s",
strerror(errno));
return io_close(conn);
}
/*~ These are the mirror functions for the connecting-out case. */
static struct io_plan *handshake_out_success(struct io_conn *conn,
const struct pubkey *key,
@ -906,9 +1043,14 @@ struct listen_fd {
* covers IPv4 too. Normally we'd consider failing to listen on a
* port to be fatal, so we note this when setting up addresses. */
bool mayfail;
/* Callback to use for the listening: either connection_in, or for
* our much-derided WebSocket ability, websocket_connection_in! */
struct io_plan *(*in_cb)(struct io_conn *conn, struct daemon *daemon);
};
static void add_listen_fd(struct daemon *daemon, int fd, bool mayfail)
static void add_listen_fd(struct daemon *daemon, int fd, bool mayfail,
struct io_plan *(*in_cb)(struct io_conn *,
struct daemon *))
{
/*~ utils.h contains a convenience macro tal_arr_expand which
* reallocates a tal_arr to make it one longer, then returns a pointer
@ -916,6 +1058,7 @@ static void add_listen_fd(struct daemon *daemon, int fd, bool mayfail)
struct listen_fd l;
l.fd = fd;
l.mayfail = mayfail;
l.in_cb = in_cb;
tal_arr_expand(&daemon->listen_fds, l);
}
@ -970,11 +1113,18 @@ fail:
/* Return true if it created socket successfully. */
static bool handle_wireaddr_listen(struct daemon *daemon,
const struct wireaddr *wireaddr,
bool mayfail)
bool mayfail,
bool websocket)
{
int fd;
struct sockaddr_in addr;
struct sockaddr_in6 addr6;
struct io_plan *(*in_cb)(struct io_conn *, struct daemon *);
if (websocket)
in_cb = websocket_connection_in;
else
in_cb = connection_in;
/* Note the use of a switch() over enum here, even though it must be
* IPv4 or IPv6 here; that will catch future changes. */
@ -984,9 +1134,10 @@ static bool handle_wireaddr_listen(struct daemon *daemon,
/* We might fail if IPv6 bound to port first */
fd = make_listen_fd(AF_INET, &addr, sizeof(addr), mayfail);
if (fd >= 0) {
status_debug("Created IPv4 listener on port %u",
status_debug("Created IPv4 %slistener on port %u",
websocket ? "websocket ": "",
wireaddr->port);
add_listen_fd(daemon, fd, mayfail);
add_listen_fd(daemon, fd, mayfail, in_cb);
return true;
}
return false;
@ -994,9 +1145,10 @@ static bool handle_wireaddr_listen(struct daemon *daemon,
wireaddr_to_ipv6(wireaddr, &addr6);
fd = make_listen_fd(AF_INET6, &addr6, sizeof(addr6), mayfail);
if (fd >= 0) {
status_debug("Created IPv6 listener on port %u",
status_debug("Created IPv6 %slistener on port %u",
websocket ? "websocket ": "",
wireaddr->port);
add_listen_fd(daemon, fd, mayfail);
add_listen_fd(daemon, fd, mayfail, in_cb);
return true;
}
return false;
@ -1122,7 +1274,7 @@ static struct wireaddr_internal *setup_listeners(const tal_t *ctx,
false);
status_debug("Created socket listener on file %s",
addrun.sun_path);
add_listen_fd(daemon, fd, false);
add_listen_fd(daemon, fd, false, connection_in);
/* We don't announce socket names, though we allow
* them to lazily specify --addr=/socket. */
add_binding(&binding, &wa);
@ -1147,7 +1299,7 @@ static struct wireaddr_internal *setup_listeners(const tal_t *ctx,
sizeof(wa.u.wireaddr.addr));
ipv6_ok = handle_wireaddr_listen(daemon, &wa.u.wireaddr,
true);
true, false);
if (ipv6_ok) {
add_binding(&binding, &wa);
if (announce
@ -1163,7 +1315,7 @@ static struct wireaddr_internal *setup_listeners(const tal_t *ctx,
sizeof(wa.u.wireaddr.addr));
/* OK if this fails, as long as one succeeds! */
if (handle_wireaddr_listen(daemon, &wa.u.wireaddr,
ipv6_ok)) {
ipv6_ok, false)) {
add_binding(&binding, &wa);
if (announce
&& public_address(daemon, &wa.u.wireaddr))
@ -1174,7 +1326,8 @@ static struct wireaddr_internal *setup_listeners(const tal_t *ctx,
}
/* This is a vanilla wireaddr as per BOLT #7 */
case ADDR_INTERNAL_WIREADDR:
handle_wireaddr_listen(daemon, &wa.u.wireaddr, false);
handle_wireaddr_listen(daemon, &wa.u.wireaddr,
false, false);
add_binding(&binding, &wa);
if (announce && public_address(daemon, &wa.u.wireaddr))
add_announcable(announcable, &wa.u.wireaddr);
@ -1188,6 +1341,38 @@ static struct wireaddr_internal *setup_listeners(const tal_t *ctx,
proposed_wireaddr[i].itype);
}
/* If we want websockets to match IPv4/v6, set it up now. */
if (daemon->websocket_port) {
bool announced_some = false;
struct wireaddr addr;
for (size_t i = 0; i < tal_count(binding); i++) {
/* Ignore UNIX sockets */
if (binding[i].itype != ADDR_INTERNAL_WIREADDR)
continue;
/* Override with websocket port */
addr = binding[i].u.wireaddr;
addr.port = daemon->websocket_port;
handle_wireaddr_listen(daemon, &addr, false, true);
announced_some = true;
/* FIXME: We don't report these bindings to
* lightningd, so they don't appear in
* getinfo. */
}
/* We add the websocket port to the announcement if it
* applies to any */
if (announced_some) {
wireaddr_from_websocket(&addr, daemon->websocket_port);
add_announcable(announcable, &addr);
}
}
/* FIXME: Websocket over Tor (difficult for autotor, since we need
* to use the same onion addr!) */
/* Now we have bindings, set up any Tor auto addresses: we will point
* it at the first bound IPv4 or IPv6 address we have. */
for (size_t i = 0; i < tal_count(proposed_wireaddr); i++) {
@ -1294,7 +1479,9 @@ static struct io_plan *connect_init(struct io_conn *conn,
&daemon->dev_allow_localhost, &daemon->use_dns,
&tor_password,
&daemon->use_v3_autotor,
&daemon->timeout_secs)) {
&daemon->timeout_secs,
&daemon->websocket_helper,
&daemon->websocket_port)) {
/* This is a helper which prints the type expected and the actual
* message, then exits (it should never be called!). */
master_badmsg(WIRE_CONNECTD_INIT, msg);
@ -1367,7 +1554,8 @@ static struct io_plan *connect_activate(struct io_conn *conn,
}
notleak(io_new_listener(daemon,
daemon->listen_fds[i].fd,
connection_in, daemon));
daemon->listen_fds[i].in_cb,
daemon));
}
}
/* Free, with NULL assignment just as an extra sanity check. */

View File

@ -18,6 +18,8 @@ msgdata,connectd_init,use_dns,bool,
msgdata,connectd_init,tor_password,wirestring,
msgdata,connectd_init,use_v3_autotor,bool,
msgdata,connectd_init,timeout_secs,u32,
msgdata,connectd_init,websocket_helper,wirestring,
msgdata,connectd_init,websocket_port,u16,
# Connectd->master, here are the addresses I bound, can announce.
msgtype,connectd_init_reply,2100

1 #include <common/cryptomsg.h>
18 msgdata,connectd_init,use_v3_autotor,bool,
19 msgdata,connectd_init,timeout_secs,u32,
20 # Connectd->master, here are the addresses I bound, can announce. msgdata,connectd_init,websocket_helper,wirestring,
21 msgdata,connectd_init,websocket_port,u16,
22 # Connectd->master, here are the addresses I bound, can announce.
23 msgtype,connectd_init_reply,2100
24 msgdata,connectd_init_reply,num_bindings,u16,
25 msgdata,connectd_init_reply,bindings,wireaddr_internal,num_bindings

View File

@ -857,7 +857,6 @@ static struct io_plan *act_two_responder(struct io_conn *conn,
return io_write(conn, &h->act2, ACT_TWO_SIZE, act_three_responder, h);
}
static struct io_plan *act_one_responder2(struct io_conn *conn,
struct handshake *h)
{

View File

@ -83,6 +83,9 @@ u8 fromwire_u8(const u8 **cursor UNNEEDED, size_t *max UNNEEDED)
/* Generated stub for fromwire_u8_array */
void fromwire_u8_array(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, u8 *arr UNNEEDED, size_t num UNNEEDED)
{ fprintf(stderr, "fromwire_u8_array called!\n"); abort(); }
/* Generated stub for notleak_ */
void *notleak_(const void *ptr UNNEEDED, bool plus_children UNNEEDED)
{ fprintf(stderr, "notleak_ called!\n"); abort(); }
/* Generated stub for towire */
void towire(u8 **pptr UNNEEDED, const void *data UNNEEDED, size_t len UNNEEDED)
{ fprintf(stderr, "towire called!\n"); abort(); }

View File

@ -83,6 +83,9 @@ u8 fromwire_u8(const u8 **cursor UNNEEDED, size_t *max UNNEEDED)
/* Generated stub for fromwire_u8_array */
void fromwire_u8_array(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, u8 *arr UNNEEDED, size_t num UNNEEDED)
{ fprintf(stderr, "fromwire_u8_array called!\n"); abort(); }
/* Generated stub for notleak_ */
void *notleak_(const void *ptr UNNEEDED, bool plus_children UNNEEDED)
{ fprintf(stderr, "notleak_ called!\n"); abort(); }
/* Generated stub for towire */
void towire(u8 **pptr UNNEEDED, const void *data UNNEEDED, size_t len UNNEEDED)
{ fprintf(stderr, "towire called!\n"); abort(); }

View File

@ -350,6 +350,7 @@ int connectd_init(struct lightningd *ld)
int hsmfd;
struct wireaddr_internal *wireaddrs = ld->proposed_wireaddr;
enum addr_listen_announce *listen_announce = ld->proposed_listen_announce;
const char *websocket_helper_path = "";
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0)
fatal("Could not socketpair for connectd<->gossipd");
@ -381,7 +382,9 @@ int connectd_init(struct lightningd *ld)
IFDEV(ld->dev_allow_localhost, false), ld->config.use_dns,
ld->tor_service_password ? ld->tor_service_password : "",
ld->config.use_v3_autotor,
ld->config.connection_timeout_secs);
ld->config.connection_timeout_secs,
websocket_helper_path,
0);
subd_req(ld->connectd, ld->connectd, take(msg), -1, 0,
connect_init_done, NULL);