From f78184ce46bce0fba7ddb014646c68290fac2231 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Fri, 15 Oct 2021 16:19:05 +1030 Subject: [PATCH] connectd: listen on ports for which we should spawn a proxy. If the port is set, we spawn it (lightning_websocketd) on any connection to that port. That means websocketd is a per-peer daemon, but it means every other daemon uses the connection normally (it's just actually talking to websocketd instead of the client directly). Signed-off-by: Rusty Russell --- connectd/connectd.c | 238 +++++++++++++++++++++++--- connectd/connectd_wire.csv | 2 + connectd/handshake.c | 1 - connectd/test/run-initiator-success.c | 3 + connectd/test/run-responder-success.c | 3 + lightningd/connect_control.c | 5 +- 6 files changed, 225 insertions(+), 27 deletions(-) diff --git a/connectd/connectd.c b/connectd/connectd.c index 892a531fc..68c767813 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -33,9 +33,13 @@ #include #include #include +#include #include #include #include +#include +#include +#include #include /*~ We are passed two file descriptors when exec'ed from `lightningd`: the @@ -127,6 +131,12 @@ struct daemon { /* Our features, as lightningd told us */ struct feature_set *our_features; + + /* Subdaemon to proxy websocket requests. */ + char *websocket_helper; + + /* If non-zero, port to listen for websocket connections. */ + u16 websocket_port; }; /* Peers we're trying to reach: we iterate through addrs until we succeed @@ -529,11 +539,10 @@ static void conn_timeout(struct io_conn *conn) io_close(conn); } -/*~ When we get a connection in we set up its network address then call - * handshake.c to set up the crypto state. */ -static struct io_plan *connection_in(struct io_conn *conn, struct daemon *daemon) +/*~ So, where are you from? */ +static bool get_remote_address(struct io_conn *conn, + struct wireaddr_internal *addr) { - struct wireaddr_internal addr; struct sockaddr_storage s = {}; socklen_t len = sizeof(s); @@ -541,28 +550,44 @@ static struct io_plan *connection_in(struct io_conn *conn, struct daemon *daemon if (getpeername(io_conn_fd(conn), (struct sockaddr *)&s, &len) != 0) { status_debug("Failed to get peername for incoming conn: %s", strerror(errno)); - return io_close(conn); + return false; } if (s.ss_family == AF_INET6) { struct sockaddr_in6 *s6 = (void *)&s; - addr.itype = ADDR_INTERNAL_WIREADDR; - wireaddr_from_ipv6(&addr.u.wireaddr, + addr->itype = ADDR_INTERNAL_WIREADDR; + wireaddr_from_ipv6(&addr->u.wireaddr, &s6->sin6_addr, ntohs(s6->sin6_port)); } else if (s.ss_family == AF_INET) { struct sockaddr_in *s4 = (void *)&s; - addr.itype = ADDR_INTERNAL_WIREADDR; - wireaddr_from_ipv4(&addr.u.wireaddr, + addr->itype = ADDR_INTERNAL_WIREADDR; + wireaddr_from_ipv4(&addr->u.wireaddr, &s4->sin_addr, ntohs(s4->sin_port)); } else if (s.ss_family == AF_UNIX) { struct sockaddr_un *sun = (void *)&s; - addr.itype = ADDR_INTERNAL_SOCKNAME; - memcpy(addr.u.sockname, sun->sun_path, sizeof(sun->sun_path)); + addr->itype = ADDR_INTERNAL_SOCKNAME; + memcpy(addr->u.sockname, sun->sun_path, sizeof(sun->sun_path)); } else { status_broken("Unknown socket type %i for incoming conn", s.ss_family); - return io_close(conn); + return false; } + return true; +} + +/*~ As so common in C, we need to bundle two args into a callback, so we + * allocate a temporary structure to hold them: */ +struct conn_in { + struct wireaddr_internal addr; + struct daemon *daemon; +}; + +/*~ Once we've got a connection in, we set it up here (whether it's via the + * websocket proxy, or direct). */ +static struct io_plan *conn_in(struct io_conn *conn, + struct conn_in *conn_in_arg) +{ + struct daemon *daemon = conn_in_arg->daemon; /* If they don't complete handshake in reasonable time, hang up */ notleak(new_reltimer(&daemon->timers, conn, @@ -574,10 +599,122 @@ static struct io_plan *connection_in(struct io_conn *conn, struct daemon *daemon * Note, again, the notleak() to avoid our simplistic leak detection * code from thinking `conn` (which we don't keep a pointer to) is * leaked */ - return responder_handshake(notleak(conn), &daemon->mykey, &addr, + return responder_handshake(notleak(conn), &daemon->mykey, + &conn_in_arg->addr, handshake_in_success, daemon); } +/*~ When we get a direct connection in we set up its network address + * then call handshake.c to set up the crypto state. */ +static struct io_plan *connection_in(struct io_conn *conn, + struct daemon *daemon) +{ + struct conn_in conn_in_arg; + + if (!get_remote_address(conn, &conn_in_arg.addr)) + return io_close(conn); + + conn_in_arg.daemon = daemon; + return conn_in(conn, &conn_in_arg); +} + +/*~ I speak web socket. + * + * Actually that's dumb, websocket (aka rfc6455) looks nothing like that. */ +static struct io_plan *websocket_connection_in(struct io_conn *conn, + struct daemon *daemon) +{ + int childmsg[2], execfail[2]; + pid_t childpid; + int err; + struct conn_in conn_in_arg; + + if (!get_remote_address(conn, &conn_in_arg.addr)) + return io_close(conn); + + status_debug("Websocket connection in from %s", + type_to_string(tmpctx, struct wireaddr_internal, + &conn_in_arg.addr)); + + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, childmsg) != 0) + goto fail; + + if (pipe(execfail) != 0) + goto close_msgfd_fail; + + if (fcntl(execfail[1], F_SETFD, fcntl(execfail[1], F_GETFD) + | FD_CLOEXEC) < 0) + goto close_execfail_fail; + + childpid = fork(); + if (childpid < 0) + goto close_execfail_fail; + + if (childpid == 0) { + size_t max; + close(childmsg[0]); + close(execfail[0]); + + /* Attach remote socket to stdin. */ + if (dup2(io_conn_fd(conn), STDIN_FILENO) == -1) + goto child_errno_fail; + + /* Attach our socket to stdout. */ + if (dup2(childmsg[1], STDOUT_FILENO) == -1) + goto child_errno_fail; + + /* Make (fairly!) sure all other fds are closed. */ + max = sysconf(_SC_OPEN_MAX); + for (size_t i = STDERR_FILENO + 1; i < max; i++) + close(i); + + /* Tell websocket helper what we read so far. */ + execlp(daemon->websocket_helper, daemon->websocket_helper, + NULL); + + child_errno_fail: + err = errno; + /* Gcc's warn-unused-result fail. */ + if (write(execfail[1], &err, sizeof(err))) { + ; + } + exit(127); + } + + close(childmsg[1]); + close(execfail[1]); + + /* Child will close this without writing on successful exec. */ + if (read(execfail[0], &err, sizeof(err)) == sizeof(err)) { + close(execfail[0]); + waitpid(childpid, NULL, 0); + status_broken("Exec of helper %s failed: %s", + daemon->websocket_helper, strerror(err)); + errno = err; + return io_close(conn); + } + + close(execfail[0]); + + /* New connection actually talks to proxy process. */ + conn_in_arg.daemon = daemon; + io_new_conn(tal_parent(conn), childmsg[0], conn_in, &conn_in_arg); + + /* Abandon original (doesn't close since child has dup'd fd) */ + return io_close(conn); + +close_execfail_fail: + close_noerr(execfail[0]); + close_noerr(execfail[1]); +close_msgfd_fail: + close_noerr(childmsg[0]); + close_noerr(childmsg[1]); +fail: + status_broken("Preparation of helper failed: %s", + strerror(errno)); + return io_close(conn); +} + /*~ These are the mirror functions for the connecting-out case. */ static struct io_plan *handshake_out_success(struct io_conn *conn, const struct pubkey *key, @@ -906,9 +1043,14 @@ struct listen_fd { * covers IPv4 too. Normally we'd consider failing to listen on a * port to be fatal, so we note this when setting up addresses. */ bool mayfail; + /* Callback to use for the listening: either connection_in, or for + * our much-derided WebSocket ability, websocket_connection_in! */ + struct io_plan *(*in_cb)(struct io_conn *conn, struct daemon *daemon); }; -static void add_listen_fd(struct daemon *daemon, int fd, bool mayfail) +static void add_listen_fd(struct daemon *daemon, int fd, bool mayfail, + struct io_plan *(*in_cb)(struct io_conn *, + struct daemon *)) { /*~ utils.h contains a convenience macro tal_arr_expand which * reallocates a tal_arr to make it one longer, then returns a pointer @@ -916,6 +1058,7 @@ static void add_listen_fd(struct daemon *daemon, int fd, bool mayfail) struct listen_fd l; l.fd = fd; l.mayfail = mayfail; + l.in_cb = in_cb; tal_arr_expand(&daemon->listen_fds, l); } @@ -970,11 +1113,18 @@ fail: /* Return true if it created socket successfully. */ static bool handle_wireaddr_listen(struct daemon *daemon, const struct wireaddr *wireaddr, - bool mayfail) + bool mayfail, + bool websocket) { int fd; struct sockaddr_in addr; struct sockaddr_in6 addr6; + struct io_plan *(*in_cb)(struct io_conn *, struct daemon *); + + if (websocket) + in_cb = websocket_connection_in; + else + in_cb = connection_in; /* Note the use of a switch() over enum here, even though it must be * IPv4 or IPv6 here; that will catch future changes. */ @@ -984,9 +1134,10 @@ static bool handle_wireaddr_listen(struct daemon *daemon, /* We might fail if IPv6 bound to port first */ fd = make_listen_fd(AF_INET, &addr, sizeof(addr), mayfail); if (fd >= 0) { - status_debug("Created IPv4 listener on port %u", + status_debug("Created IPv4 %slistener on port %u", + websocket ? "websocket ": "", wireaddr->port); - add_listen_fd(daemon, fd, mayfail); + add_listen_fd(daemon, fd, mayfail, in_cb); return true; } return false; @@ -994,9 +1145,10 @@ static bool handle_wireaddr_listen(struct daemon *daemon, wireaddr_to_ipv6(wireaddr, &addr6); fd = make_listen_fd(AF_INET6, &addr6, sizeof(addr6), mayfail); if (fd >= 0) { - status_debug("Created IPv6 listener on port %u", + status_debug("Created IPv6 %slistener on port %u", + websocket ? "websocket ": "", wireaddr->port); - add_listen_fd(daemon, fd, mayfail); + add_listen_fd(daemon, fd, mayfail, in_cb); return true; } return false; @@ -1122,7 +1274,7 @@ static struct wireaddr_internal *setup_listeners(const tal_t *ctx, false); status_debug("Created socket listener on file %s", addrun.sun_path); - add_listen_fd(daemon, fd, false); + add_listen_fd(daemon, fd, false, connection_in); /* We don't announce socket names, though we allow * them to lazily specify --addr=/socket. */ add_binding(&binding, &wa); @@ -1147,7 +1299,7 @@ static struct wireaddr_internal *setup_listeners(const tal_t *ctx, sizeof(wa.u.wireaddr.addr)); ipv6_ok = handle_wireaddr_listen(daemon, &wa.u.wireaddr, - true); + true, false); if (ipv6_ok) { add_binding(&binding, &wa); if (announce @@ -1163,7 +1315,7 @@ static struct wireaddr_internal *setup_listeners(const tal_t *ctx, sizeof(wa.u.wireaddr.addr)); /* OK if this fails, as long as one succeeds! */ if (handle_wireaddr_listen(daemon, &wa.u.wireaddr, - ipv6_ok)) { + ipv6_ok, false)) { add_binding(&binding, &wa); if (announce && public_address(daemon, &wa.u.wireaddr)) @@ -1174,7 +1326,8 @@ static struct wireaddr_internal *setup_listeners(const tal_t *ctx, } /* This is a vanilla wireaddr as per BOLT #7 */ case ADDR_INTERNAL_WIREADDR: - handle_wireaddr_listen(daemon, &wa.u.wireaddr, false); + handle_wireaddr_listen(daemon, &wa.u.wireaddr, + false, false); add_binding(&binding, &wa); if (announce && public_address(daemon, &wa.u.wireaddr)) add_announcable(announcable, &wa.u.wireaddr); @@ -1188,6 +1341,38 @@ static struct wireaddr_internal *setup_listeners(const tal_t *ctx, proposed_wireaddr[i].itype); } + /* If we want websockets to match IPv4/v6, set it up now. */ + if (daemon->websocket_port) { + bool announced_some = false; + struct wireaddr addr; + + for (size_t i = 0; i < tal_count(binding); i++) { + /* Ignore UNIX sockets */ + if (binding[i].itype != ADDR_INTERNAL_WIREADDR) + continue; + + /* Override with websocket port */ + addr = binding[i].u.wireaddr; + addr.port = daemon->websocket_port; + handle_wireaddr_listen(daemon, &addr, false, true); + announced_some = true; + /* FIXME: We don't report these bindings to + * lightningd, so they don't appear in + * getinfo. */ + } + + + /* We add the websocket port to the announcement if it + * applies to any */ + if (announced_some) { + wireaddr_from_websocket(&addr, daemon->websocket_port); + add_announcable(announcable, &addr); + } + } + + /* FIXME: Websocket over Tor (difficult for autotor, since we need + * to use the same onion addr!) */ + /* Now we have bindings, set up any Tor auto addresses: we will point * it at the first bound IPv4 or IPv6 address we have. */ for (size_t i = 0; i < tal_count(proposed_wireaddr); i++) { @@ -1294,7 +1479,9 @@ static struct io_plan *connect_init(struct io_conn *conn, &daemon->dev_allow_localhost, &daemon->use_dns, &tor_password, &daemon->use_v3_autotor, - &daemon->timeout_secs)) { + &daemon->timeout_secs, + &daemon->websocket_helper, + &daemon->websocket_port)) { /* This is a helper which prints the type expected and the actual * message, then exits (it should never be called!). */ master_badmsg(WIRE_CONNECTD_INIT, msg); @@ -1367,7 +1554,8 @@ static struct io_plan *connect_activate(struct io_conn *conn, } notleak(io_new_listener(daemon, daemon->listen_fds[i].fd, - connection_in, daemon)); + daemon->listen_fds[i].in_cb, + daemon)); } } /* Free, with NULL assignment just as an extra sanity check. */ diff --git a/connectd/connectd_wire.csv b/connectd/connectd_wire.csv index 093f7d33f..0719838d3 100644 --- a/connectd/connectd_wire.csv +++ b/connectd/connectd_wire.csv @@ -18,6 +18,8 @@ msgdata,connectd_init,use_dns,bool, msgdata,connectd_init,tor_password,wirestring, msgdata,connectd_init,use_v3_autotor,bool, msgdata,connectd_init,timeout_secs,u32, +msgdata,connectd_init,websocket_helper,wirestring, +msgdata,connectd_init,websocket_port,u16, # Connectd->master, here are the addresses I bound, can announce. msgtype,connectd_init_reply,2100 diff --git a/connectd/handshake.c b/connectd/handshake.c index cc1fb92b4..768b4a2a6 100644 --- a/connectd/handshake.c +++ b/connectd/handshake.c @@ -857,7 +857,6 @@ static struct io_plan *act_two_responder(struct io_conn *conn, return io_write(conn, &h->act2, ACT_TWO_SIZE, act_three_responder, h); } - static struct io_plan *act_one_responder2(struct io_conn *conn, struct handshake *h) { diff --git a/connectd/test/run-initiator-success.c b/connectd/test/run-initiator-success.c index 627e0e241..926fa7c8a 100644 --- a/connectd/test/run-initiator-success.c +++ b/connectd/test/run-initiator-success.c @@ -83,6 +83,9 @@ u8 fromwire_u8(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) /* Generated stub for fromwire_u8_array */ void fromwire_u8_array(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, u8 *arr UNNEEDED, size_t num UNNEEDED) { fprintf(stderr, "fromwire_u8_array called!\n"); abort(); } +/* Generated stub for notleak_ */ +void *notleak_(const void *ptr UNNEEDED, bool plus_children UNNEEDED) +{ fprintf(stderr, "notleak_ called!\n"); abort(); } /* Generated stub for towire */ void towire(u8 **pptr UNNEEDED, const void *data UNNEEDED, size_t len UNNEEDED) { fprintf(stderr, "towire called!\n"); abort(); } diff --git a/connectd/test/run-responder-success.c b/connectd/test/run-responder-success.c index a531f19ca..0a24a1009 100644 --- a/connectd/test/run-responder-success.c +++ b/connectd/test/run-responder-success.c @@ -83,6 +83,9 @@ u8 fromwire_u8(const u8 **cursor UNNEEDED, size_t *max UNNEEDED) /* Generated stub for fromwire_u8_array */ void fromwire_u8_array(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, u8 *arr UNNEEDED, size_t num UNNEEDED) { fprintf(stderr, "fromwire_u8_array called!\n"); abort(); } +/* Generated stub for notleak_ */ +void *notleak_(const void *ptr UNNEEDED, bool plus_children UNNEEDED) +{ fprintf(stderr, "notleak_ called!\n"); abort(); } /* Generated stub for towire */ void towire(u8 **pptr UNNEEDED, const void *data UNNEEDED, size_t len UNNEEDED) { fprintf(stderr, "towire called!\n"); abort(); } diff --git a/lightningd/connect_control.c b/lightningd/connect_control.c index f26d39247..314efccb4 100644 --- a/lightningd/connect_control.c +++ b/lightningd/connect_control.c @@ -350,6 +350,7 @@ int connectd_init(struct lightningd *ld) int hsmfd; struct wireaddr_internal *wireaddrs = ld->proposed_wireaddr; enum addr_listen_announce *listen_announce = ld->proposed_listen_announce; + const char *websocket_helper_path = ""; if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) fatal("Could not socketpair for connectd<->gossipd"); @@ -381,7 +382,9 @@ int connectd_init(struct lightningd *ld) IFDEV(ld->dev_allow_localhost, false), ld->config.use_dns, ld->tor_service_password ? ld->tor_service_password : "", ld->config.use_v3_autotor, - ld->config.connection_timeout_secs); + ld->config.connection_timeout_secs, + websocket_helper_path, + 0); subd_req(ld->connectd, ld->connectd, take(msg), -1, 0, connect_init_done, NULL);