From 0eec1e8cbd46072a560891cf7b64f0e7b3d0184e Mon Sep 17 00:00:00 2001 From: Alex Akselrod Date: Mon, 8 Aug 2022 15:34:36 -0700 Subject: [PATCH] brontide: add proxy subpackage for proxied brontide connections. This commit adds a new `brontide/proxy` subpackage containing two new structs. `proxy.Mux` is a multiplexer that allows multiple `proxy.Conn` connections to be handled over a single brontide connection. This can be used to allow a proxy running in a separate process to handle connections to and from peers, freeing up the node's process and dedicated hardware to ensuring the safety of money entrusted to it. --- brontide/proxy/conn.go | 182 ++++++++++++++ brontide/proxy/interface.go | 21 ++ brontide/proxy/mux.go | 439 +++++++++++++++++++++++++++++++++ brontide/proxy/proxy_test.go | 464 +++++++++++++++++++++++++++++++++++ 4 files changed, 1106 insertions(+) create mode 100644 brontide/proxy/conn.go create mode 100644 brontide/proxy/interface.go create mode 100644 brontide/proxy/mux.go create mode 100644 brontide/proxy/proxy_test.go diff --git a/brontide/proxy/conn.go b/brontide/proxy/conn.go new file mode 100644 index 000000000..e854405f8 --- /dev/null +++ b/brontide/proxy/conn.go @@ -0,0 +1,182 @@ +package proxy + +import ( + "fmt" + "net" + "sync" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/lightningnetwork/lnd/buffer" + "github.com/lightningnetwork/lnd/pool" +) + +// Conn is a mux-proxied connection that contains a remote pubkey +// and local pubkey, just like a Conn. +type Conn struct { + // We extend a connection. The two kinds we use are either a + // *Conn from this package, or a net.Conn returned by net.Pipe(). + net.Conn + + // remotePub and localPub support keeping track of peer keys for + // net.Conn objects returned by net.Pipe(). + remotePub *btcec.PublicKey + localPub *btcec.PublicKey + + // readPool is a pointer to the backing mux's readPool, which is + // shared among all of the connections backed by the mux. + readPool *pool.Read + + // mtxMsg is a mutex to control access to the current message so that + // it can only be read/reset at the same time. + mtxMsg sync.Mutex + + // curMsg is nil if there's no message that's been read, or a pointer + // to the byte slice representing the current message. + curMsg *[]byte +} + +// ReadNextHeader reads the next header. It passes implementation details +// to a brontide.Conn if possible, otherwise it assumes that this is a pipe +// connection and it can read an entire message in a single read. +// +// TODO(aakselrod): handle large messages/fragmentation/partial reads/timeouts. +func (c *Conn) ReadNextHeader() (uint32, error) { + // If the underlying connection is able to read messages, do that. + if conn, ok := c.Conn.(MessageConnWithPubkey); ok { + return conn.ReadNextHeader() + } + + c.mtxMsg.Lock() + defer c.mtxMsg.Unlock() + + // If we've already read a message from a piped connection, return + // its length. + if c.curMsg != nil { + return uint32(len(*c.curMsg)), nil + } + + // Use the regular API for net.Conn to read from the connection, save + // the data for returning as a body later, and return its length as + // expected for now. + var retBuf []byte + + // Use a buffer from the read pool to read from the underlying conn. + err := c.readPool.Submit(func(buf *buffer.Read) error { + n, err := c.Read(buf[:]) + if err != nil { + return err + } + + // Make a copy of the data before we give the read buffer back + // to the pool. + retBuf = make([]byte, n) + + copy(retBuf, buf[:n]) + + return nil + }) + if err != nil { + return 0, err + } + + c.curMsg = &retBuf + + return uint32(len(retBuf)), nil +} + +// ReadNextBody reads the next body. +// +// TODO(aakselrod): handle large messages/fragmentation/partial reads/timeouts. +func (c *Conn) ReadNextBody(buf []byte) ([]byte, error) { + // If the underlying connection is able to read messages, do that. + if conn, ok := c.Conn.(MessageConnWithPubkey); ok { + return conn.ReadNextBody(buf) + } + + c.mtxMsg.Lock() + defer c.mtxMsg.Unlock() + + // If we haven't read a message using ReadNextHeader, we shouldn't be + // reading a body. + if c.curMsg == nil { + return nil, fmt.Errorf("need to read header before body") + } + + // If buffer and data lengths mismatch, return an error + if len(buf) != len(*c.curMsg) { + return nil, fmt.Errorf("wrong size buffer for message") + } + + // Copy the message data into the buffer we've been passed + copy(buf, *c.curMsg) + + // Reset current message for next read + c.curMsg = nil + + return buf, nil +} + +// ReadNextMessage reads the next message. +// +// TODO(aakselrod): handle large messages/fragmentation/partial reads/timeouts. +func (c *Conn) ReadNextMessage() ([]byte, error) { + // First we read the length of the message. + n, err := c.ReadNextHeader() + if err != nil { + return nil, err + } + + // Create a new buffer and return the read. + buf := make([]byte, n) + return c.ReadNextBody(buf) +} + +// WriteMessage implements the method defined in MessageConnWithPubkey. +// If the underlying connection is capable, the message API is used. Otherwise, +// it's converted into a regular Write. +// +// TODO(aakselrod): handle large messages/fragmentation/partial reads/timeouts. +func (c *Conn) WriteMessage(buf []byte) error { + // If the underlying connection is able to write messages, do that. + if conn, ok := c.Conn.(MessageConnWithPubkey); ok { + return conn.WriteMessage(buf) + } + + // Use the regular API for net.Conn to write to the connection and + // return the result as expected. + n, err := c.Write(buf) + if err != nil { + return err + } + + // Partial write. + if n < len(buf) { + return fmt.Errorf("only wrote %d of %d byte message", n, + len(buf)) + } + + return nil +} + +// Flush implements the method defined in MessageConnWithPubkey. If the +// underlying connection is capable, its Flush method is called. Otherwise, +// this results in a NOOP. +func (c *Conn) Flush() (int, error) { + // If the underlying connection can be flushed, do that. + if conn, ok := c.Conn.(MessageConnWithPubkey); ok { + return conn.Flush() + } + + // Do nothing. + return 0, nil +} + +// LocalPub implements the method defined in MessageConnWithPubkey. +func (c *Conn) LocalPub() *btcec.PublicKey { + return c.localPub +} + +// RemotePub implements the method defined in MessageConnWithPubkey. +func (c *Conn) RemotePub() *btcec.PublicKey { + return c.remotePub +} diff --git a/brontide/proxy/interface.go b/brontide/proxy/interface.go new file mode 100644 index 000000000..8b152881b --- /dev/null +++ b/brontide/proxy/interface.go @@ -0,0 +1,21 @@ +package proxy + +import ( + "github.com/btcsuite/btcd/btcec/v2" + "github.com/lightningnetwork/lnd/peer" +) + +// MessageConnWithPubkey matches both *brontide.Conn and *Conn to read and +// write messages as well as tell the identity of the connection's remote peer. +// +// TODO(aakselrod): Move this to peer package? +type MessageConnWithPubkey interface { + // Inherit from net.Conn + peer.MessageConn + + // LocalPub returns the connection's identity for the local peer. + LocalPub() *btcec.PublicKey + + // RemotePub returns the connection's identity for the remote peer. + RemotePub() *btcec.PublicKey +} diff --git a/brontide/proxy/mux.go b/brontide/proxy/mux.go new file mode 100644 index 000000000..ea154c222 --- /dev/null +++ b/brontide/proxy/mux.go @@ -0,0 +1,439 @@ +package proxy + +import ( + "context" + "fmt" + "net" + "sync" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/lightningnetwork/lnd/brontide" + "github.com/lightningnetwork/lnd/pool" + "golang.org/x/sync/errgroup" +) + +// connKey is an 8-byte key to identify connections multiplexed over a single +// connection between two muxes. In a special case, an all-zeroes connKey is +// a signal (right now, only supported to announce a new connection). +type connKey [8]byte + +// pubKeyToConnKey takes the last 8 bytes of a public key and copies then to +// a connKey object. +// +// TODO(aakselrod): use a better derivation of connection key from pubkey like +// SipHash64 or just use the whole pubkey? +func pubKeyToConnKey(key *btcec.PublicKey) connKey { + var ck connKey + + keyBytes := key.SerializeCompressed() + + copy(ck[:], keyBytes[len(keyBytes)-8:]) + + return ck +} + +// Mux multiplexes a set of connections over a single proxy connection. It sends +// an entire packet. +type Mux struct { + + // mtx is a mutex for async access. + mtx sync.RWMutex + + // eg is an rror group to collect errors from goroutines if wanted. + eg *errgroup.Group + + // ctx is the context. + ctx context.Context + + // stop is the cancel function for clean shutdown. + stop context.CancelFunc + + // started tracks whether we've started the handler. + started bool + + // conn is a brontide connection to another mux for multiplexing the + // conns below. + conn *brontide.Conn + + // conns is a map of connections by substring of pubkey. The interface + // can be either a *brontide.Conn or a *Conn. + conns map[connKey]MessageConnWithPubkey + + // readPool is a pool of read buffers shared among all of the mux's + // connections. + readPool *pool.Read + + // accept is a channel for accepting new connections from the connected + // mux. The public key is that of a newly connected peer. + accept chan *btcec.PublicKey +} + +// NewMux creates a new Mux and populates it with initial values. +func NewMux(ctx context.Context, readPool *pool.Read, + conn *brontide.Conn) *Mux { + + m := &Mux{} + + // Assign the mux connection and read buffer pool. + m.conn = conn + m.readPool = readPool + + // Derive a context with our own cancel signal. + m.ctx, m.stop = context.WithCancel(ctx) + + // Derive a new error group from the context above. + m.eg, m.ctx = errgroup.WithContext(m.ctx) + + // Initialize the conns map and accept channel. + m.conns = make(map[connKey]MessageConnWithPubkey) + m.accept = make(chan *btcec.PublicKey) + + return m +} + +// Start launches the mux's event loop goroutine. +func (m *Mux) Start() error { + // Ensure we lock the mutex to modify the state. + m.mtx.Lock() + defer m.mtx.Unlock() + + if m.started { + return fmt.Errorf("mux already running") + } + + // Set to started. + m.started = true + + m.eg.Go(m.eventLoop) + + return nil +} + +// Accept is used by the node mux to listen for p2p connections through the +// proxy mux to which it's connected. It's part of the net.Listener interface. +func (m *Mux) Accept() (net.Conn, error) { + for { + select { + // Wait for a message on the accept channel. + case pubKey, ok := <-m.accept: + + // Channel closed, return error + if !ok { + return nil, net.ErrClosed + } + + // Got a new pubkey, create the Conn objects we'll + // need. First, we need virtual piped connections. + ourConn, theirConn := net.Pipe() + + // Create a connection for the mux to handle. + conn := &Conn{ + Conn: ourConn, + localPub: m.conn.LocalPub(), + remotePub: pubKey, + readPool: m.readPool, + } + + // Register the connection with the mux. + m.mtx.Lock() + err := m.add(conn) + m.mtx.Unlock() + if err != nil { + return nil, err + } + + // Return a connection for the caller to use. + return &Conn{ + Conn: theirConn, + localPub: conn.localPub, + remotePub: pubKey, + readPool: m.readPool, + }, nil + + // Our context is cancelled, time to exit. + case <-m.ctx.Done(): + return nil, net.ErrClosed + } + } +} + +// Close closes the mux, stops the handler goroutine, closes any connections +// used by the mux, and stops their goroutines as well. It is part of the +// net.Listener interface. +func (m *Mux) Close() error { + m.mtx.Lock() + defer m.mtx.Unlock() + + // We're not started. + if !m.started { + return fmt.Errorf("mux isn't running") + } + + // Stop all of the connections' goroutines and delete them from the map. + for k := range m.conns { + m.stopConn(k) + } + + // Close the mux connection and zero its pointer. + m.conn.Close() + m.conn = nil + + // Let future callers know the mux isn't running. + m.started = false + + return nil +} + +// Addr returns the address of the mux's local connection. It is part of the +// net.Listener interface. +// +// TODO(aakselrod): should we specify this on init to return the proxy's +// listen address instead? +func (m *Mux) Addr() net.Addr { + return m.conn.LocalAddr() +} + +// eventLoop handles messages coming in over the mux connection and +// de-multiplexes them to other connections. It also handles signals for the +// node mux that the proxy mux has added an incoming peer connection by +// signaling over the accept channel. +func (m *Mux) eventLoop() error { + // On event loop exit, stop all of the connections + defer func() { + m.Close() + }() + + m.mtx.RLock() + muxConn := m.conn + m.mtx.RUnlock() + + if muxConn == nil { + return net.ErrClosed + } + + // Start the event loop. + for { + // TODO(aakselrod): handle oversized messages, fragmentation, + // timeouts, partial reads, etc. + l, err := muxConn.ReadNextHeader() + if err != nil { + return err + } + + pkt := make([]byte, l) + pkt, err = muxConn.ReadNextBody(pkt) + if err != nil { + return err + } + + // TODO(aakselrod): more robust signaling.. + if len(pkt) < 9 { + // We expect 8 bytes for the connection key and at + // least 1 byte of data here. + return fmt.Errorf("mux requires at least 9 bytes, "+ + "got %d", len(pkt)) + } + + // Get the connection key for this message from the first 8 + // bytes. + var key connKey + copy(key[:], pkt[:8]) + + // If key is all zeroes, we're opening a connection to a new + // peer, so next 33 bytes are the peer pubkey. + // + // TODO(aakselrod): more robust signaling. + if (key == connKey{}) { + if len(pkt) != btcec.PubKeyBytesLenCompressed+8 { + // We expect exactly 33+8=41 bytes, otherwise + // ignore the message + continue + } + + // The first 8 bytes are zeroes as a signal that the + // message is from the peer mux, not a multiplexed + // connection. The rest of the message is a pubkey for + // adding a new connection. + // + // TODO(aakselrod): handle more types of messages, + // perhaps dial-out messages or other useful things + // that proxies do. + pubKey, err := btcec.ParsePubKey(pkt[8:]) + if err != nil { + return fmt.Errorf("error parsing pubkey: %v", + err) + } + + // Derive the connKey. + key = pubKeyToConnKey(pubKey) + + // Check if a connection already exists under this + // pubkey. If so, it could be a duplicate or a + // reconnection attempt. + m.mtx.RLock() + _, connected := m.conns[key] + m.mtx.RUnlock() + + if connected { + // We already have a proxied connection for + // this key, so we wait for the next one. + // + // TODO(aakselrod): handle reconnects. Some + // handling already exists in the Add function. + continue + } + + select { + // Send the public key to an Accept() running somewhere. + // + // TODO(aakselrod): use better notifications or at + // least handle the case where there isn't an Accept() + // running. + case m.accept <- pubKey: + continue + + // Time to quit, our context is cancelled. + case <-m.ctx.Done(): + return nil + } + } + + // Got the connection key in the message, so look up the + // matching connection by key. + m.mtx.RLock() + conn, ok := m.conns[key] + m.mtx.RUnlock() + + if !ok { + // Drop packet, it doesn't match a connection that we + // have.. + // + // TODO(aakselrod): smarter handling. Perhaps we signal + // a new connection just by sending a message relating + // to its public key instead of an all-zeroes connKey? + continue + } + + n, err := conn.Write(pkt[8:]) + if err != nil || n < len(pkt)-8 { + m.mtx.Lock() + m.stopConn(key) + m.mtx.Unlock() + } + } +} + +// stopConn stops a connection and must be run with the read-write lock +// acquired. +func (m *Mux) stopConn(key connKey) { + conn, ok := m.conns[key] + delete(m.conns, key) + + if ok && conn != nil { + conn.Close() + } +} + +// add is used to add and start handling a new connection and must be run with +// the read-write lock acquired. +func (m *Mux) add(conn MessageConnWithPubkey) error { + // Check if mux is started. + if !m.started { + return fmt.Errorf("can't add connection to mux that's " + + "not running") + } + + // Derive connection key from public key. + key := pubKeyToConnKey(conn.RemotePub()) + + // In case we're replacing a stale connection. + // + // TODO(aakselrod): robust handling of reconnects. + m.stopConn(key) + + // Add connection to map by connKey. + m.conns[key] = conn + + // Start a goroutine for the mux to handle the newly added connection. + m.eg.Go(func() error { return m.handleConn(key, conn) }) + + return nil +} + +// Add adds a new connection to be multiplexed through the multiplexer. It can +// be either a *Conn or a *brontide.Conn. +func (m *Mux) Add(conn MessageConnWithPubkey) error { + m.mtx.Lock() + defer m.mtx.Unlock() + + err := m.add(conn) + if err != nil { + return err + } + + // Signal the peer mux that we've got a new connection. First, get the + // key bytes. + // + // TODO(aakselrod): use a separate mutex for writing to the mux + // connection. + err = m.conn.WriteMessage(append(make([]byte, 8), + conn.RemotePub().SerializeCompressed()...)) + if err != nil { + return err + } + + _, err = m.conn.Flush() + if err != nil { + return err + } + + return nil +} + +// handleConn runs in a goroutine to multiplex a connection through the +// mux. +func (m *Mux) handleConn(key connKey, conn MessageConnWithPubkey) error { + // Stop connection on exit + defer func() { + m.mtx.Lock() + m.stopConn(key) + m.mtx.Unlock() + }() + + for { + // Read a message from the connection. + // + // TODO(aakselrod): handle oversized messages, fragmentation, + // timeouts, partial reads, etc. + n, err := conn.ReadNextHeader() + if err != nil { + return err + } + + buf := make([]byte, n) + buf, err = conn.ReadNextBody(buf) + if err != nil { + return err + } + + // Multiplex the message through the mux connection. + // + // TODO(aakselrod): use a separate mutex for writing to the + // mux connection. + m.mtx.Lock() + + err = m.conn.WriteMessage(append(key[:], buf...)) + if err != nil { + m.mtx.Unlock() + return err + } + + _, err = m.conn.Flush() + if err != nil { + m.mtx.Unlock() + return err + } + + m.mtx.Unlock() + } +} diff --git a/brontide/proxy/proxy_test.go b/brontide/proxy/proxy_test.go new file mode 100644 index 000000000..6c6d730e7 --- /dev/null +++ b/brontide/proxy/proxy_test.go @@ -0,0 +1,464 @@ +package proxy_test + +import ( + "context" + "encoding/hex" + "fmt" + "net" + "testing" + "time" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/brontide" + "github.com/lightningnetwork/lnd/brontide/proxy" + "github.com/lightningnetwork/lnd/keychain" + "github.com/lightningnetwork/lnd/lntest" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/pool" + "github.com/stretchr/testify/require" +) + +const ( + // p2pPrivStr is the hex private key of the node in a test context. + // It is also copied to the proxy so it can pretend to be the node. + p2pPrivStr = "677f456999a1ec3db53f4cb51e64db15941a37aaeb67eaed018bb6f637a933ed" + + // from the node. + proxyPrivStr = "7568a50fe45d3681e66e915a1ded0e39e92a512f8278fae6540e57868533c093" + + // peer1PrivStr is the hex private key of a peer that connects to the + // proxy as if it's connecting directly to the node. + peer1PrivStr = "7568a507345d3681e66e915a1ded0e39e92a512f8278fae6540e57868533c093" + + // peer1PrivStr is the hex private key of a peer that connects to the + // proxy as if it's connecting directly to the node. + peer2PrivStr = "7568a50fe81d3681e66e915a1ded0e39e92a512f8278fae6540e57868533c093" +) + +// proxyTestContext is a struct which holds required structs for running proxy +// tests. +type proxyTestContext struct { + // Inherit from context.Context. + context.Context + + // cancel is the cancel function for this context. + cancel context.CancelFunc + + // t is the test context + t *testing.T + + // p2pAddrStr is the address on which the proxy listens for p2p + // connections. + p2pAddrStr string + + // proxyAddrStr is the address on which the proxy listens for a + // connection from the node. + proxyAddrStr string + + // p2pPrivKey is the node private key which is also exported to the + // proxy for impersonation. + p2pPrivKey *btcec.PrivateKey + + // p2pPubKey is the node public key derived from the private key above. + // It is used for the node to identify itself to the proxy and for the + // proxy to impersonate the node to peers. + p2pPubKey *btcec.PublicKey + + // proxyPrivKey is the proxy private key used for communication with + // the node. + proxyPrivKey *btcec.PrivateKey + + // proxyPubKey is the proxy public key derived from the private key + // above. It is used for the proxy to identify itself to the node so + // the node can trust it to proxy traffic. + proxyPubKey *btcec.PublicKey + + // p2pPrivECDH is used to connect the node mux to the proxy mux + // as well as start the p2pListener below. + p2pPrivECDH keychain.SingleKeyECDH + + // p2pListener listens for connections from peers. It impersonates + // a listener that the node would bring up by using the node key. + p2pListener *brontide.Listener + + // proxyListener listens for mux connections from the node. It uses its + // own keypair to identify itself to the node. + proxyListener *brontide.Listener + + // p2pNetAddr represents the network address, including public key, + // IP address/hostname, and TCP port, for peer connections. + p2pNetAddr *lnwire.NetAddress + + // proxyNetAddr represents the network address, including public key, + // IP address/hostname, and TCP port, for the node connection to the + // proxy. + proxyNetAddr *lnwire.NetAddress + + // readPool is a read buffer pool. + readPool *pool.Read + + // proxyMux runs on the proxy side. It starts with a connection from + // the node on the proxyListener, and then connections are added to + // it as they're accepted on p2pListener. + proxyMux *proxy.Mux + + // nodeMux runs on the node side. It connects to the proxyMux using + // a single brontide connection. All of the peer connections are + // multiplexed over the single connection between the node and proxy. + nodeMux *proxy.Mux +} + +// newProxyTestContext creates a new proxy test context and populates it with +// an initial node and proxy mux. It starts them running but does not create +// any peers. +func newProxyTestContext(t *testing.T) (*proxyTestContext, error) { + t.Helper() + + // Create context with a cancel function. + ctx, cancel := context.WithCancel(context.Background()) + + // Generate random ports to listen on. + p2pAddrStr := fmt.Sprintf("localhost:%d", lntest.NextAvailablePort()) + proxyAddrStr := fmt.Sprintf("localhost:%d", lntest.NextAvailablePort()) + + // Get keys from hex constants. + p2pPrivKey, p2pPubKey, err := hexToKey(p2pPrivStr) + require.NoError(t, err, "error parsing p2p key") + + proxyPrivKey, proxyPubKey, err := hexToKey(proxyPrivStr) + require.NoError(t, err, "error parsing proxy key") + + // Wrap keys for ECDH. + var p2pPrivECDH keychain.SingleKeyECDH = &keychain.PrivKeyECDH{ + PrivKey: p2pPrivKey, + } + + var proxyPrivECDH keychain.SingleKeyECDH = &keychain.PrivKeyECDH{ + PrivKey: proxyPrivKey, + } + + // Start the listeners on the required ports. + p2pListener, err := brontide.NewListener(p2pPrivECDH, p2pAddrStr) + require.NoError(t, err, "couldn't start p2p listener") + + proxyListener, err := brontide.NewListener(proxyPrivECDH, proxyAddrStr) + require.NoError(t, err, "couldn't start proxy listener") + + // Construct the network addresses for testing. + p2pNetAddr := &lnwire.NetAddress{ + IdentityKey: p2pPubKey, + Address: p2pListener.Addr(), + ChainNet: wire.SimNet, + } + + proxyNetAddr := &lnwire.NetAddress{ + IdentityKey: proxyPubKey, + Address: proxyListener.Addr(), + ChainNet: wire.SimNet, + } + + // Make a read buffer pool. + readPool := pool.NewRead( + pool.NewReadBuffer( + pool.DefaultReadBufferGCInterval, + pool.DefaultReadBufferExpiryInterval, + ), + 5, + pool.DefaultWorkerTimeout, + ) + + err = readPool.Start() + require.NoError(t, err, "couldn't start read pool") + + // Construct the test context object. + c := &proxyTestContext{ + ctx, + cancel, + t, + p2pAddrStr, + proxyAddrStr, + p2pPrivKey, + p2pPubKey, + proxyPrivKey, + proxyPubKey, + p2pPrivECDH, + p2pListener, + proxyListener, + p2pNetAddr, + proxyNetAddr, + readPool, + nil, + nil, + } + + // Now we start the handler goroutines and return the test + // context. + return c, c.handleMuxes() +} + +// handleMuxes connects and starts the node and proxy muxes, and starts a +// goroutine that accepts connections over the p2pListener and adds them to +// the proxyMux, thus signaling the nodeMux. Dialing the p2pListener and +// accepting nodeMux connections is left up to tests themselves. +func (c *proxyTestContext) handleMuxes() error { + // doneChan lets goroutines signal when their mux is set up + doneChan := make(chan struct{}) + + // Start a goroutine to handle the proxy mux. + go func() { + // First, accept a connection from the node. + proxyMuxConn, err := c.proxyListener.Accept() + require.NoError(c.t, err, + "accepting proxyMux connection failed") + + // Cast as *brontide.Conn + brontideConn, ok := proxyMuxConn.(*brontide.Conn) + require.True(c.t, ok, "couldn't cast proxyMuxConn as "+ + "*brontide.Conn") + + // Authenticate the node connecting to the proxy. + require.Equal(c.t, brontideConn.RemotePub(), c.p2pPubKey) + + // Spin up the proxy mux. + c.proxyMux = proxy.NewMux(c, c.readPool, brontideConn) + + err = c.proxyMux.Start() + require.NoError(c.t, err, "starting proxyMux failed") + + // Signal the proxy mux is started. + select { + case doneChan <- struct{}{}: + case <-c.Done(): + return + } + + // Accept connections on the p2pListener and add them to the + // proxy mux. The proxy mux will signal the node mux about + // the new connection. + for { + // Accept a connection. + p2pConn, err := c.p2pListener.Accept() + + // If the listener is closed, it's time to exit. Any + // other error is a failure. + if err != nil && err.Error() == + "brontide connection closed" { + + return + } + + require.NoError(c.t, err, "accepting p2p "+ + "connection failed") + + // Add it to the mux. + err = c.proxyMux.Add(p2pConn.(*brontide.Conn)) + require.NoError(c.t, err, "adding p2p connection to "+ + "proxyMux failed") + } + }() + + // Dial the proxyListener and set up the node mux on the resulting + // connection. This automatically authenticates the pubkey of the + // proxyListener. + nodeMuxConn, err := brontide.Dial(c.p2pPrivECDH, c.proxyNetAddr, + time.Second, net.DialTimeout) + require.NoError(c.t, err, "connecting to proxy mux failed") + + c.nodeMux = proxy.NewMux(c, c.readPool, nodeMuxConn) + + err = c.nodeMux.Start() + require.NoError(c.t, err, "starting nodeMux failed") + + // Ensure the proxyMux has also started. + select { + case <-doneChan: + + case <-c.Done(): + return fmt.Errorf("context ended early") + } + + return nil +} + +// stop cleanly stops all the goroutines under the proxyTestContext. +func (c *proxyTestContext) stop() { + c.cancel() + + c.proxyListener.Close() + + c.p2pListener.Close() + + c.proxyMux.Close() + + c.nodeMux.Close() + + require.NoError(c.t, c.readPool.Stop()) +} + +// TestProxy creates a test context with a node mux and proxy mux, connects +// two peers to the proxy, and ensures the node sees them connect and can +// send data to and receive data from them. +func TestProxy(t *testing.T) { + // Create the proxy test context. + c, err := newProxyTestContext(t) + + require.NoError(t, err) + require.NotNil(t, c) + + defer c.stop() + + // Derive keys for the peers from their hex constants. + peer1PrivKey, peer1PubKey, err := hexToKey(peer1PrivStr) + require.NoError(t, err, "error parsing peer1 key") + + peer2PrivKey, peer2PubKey, err := hexToKey(peer2PrivStr) + require.NoError(t, err, "error parsing peer2 key") + + // Wrap keys for ECDH. + var peer1PrivECDH keychain.SingleKeyECDH = &keychain.PrivKeyECDH{ + PrivKey: peer1PrivKey, + } + + var peer2PrivECDH keychain.SingleKeyECDH = &keychain.PrivKeyECDH{ + PrivKey: peer2PrivKey, + } + + doneChan := make(chan struct{}) + + var peer1Conn, peer2Conn *brontide.Conn + var nodePeer1Conn, nodePeer2Conn *proxy.Conn + + // Accept two connections at the node, one per peer, in a goroutine. + go func() { + conn, err := c.nodeMux.Accept() + require.NoError(t, err, "accepting nodePeer1Conn failed") + + nodePeer1Conn = conn.(*proxy.Conn) + + conn, err = c.nodeMux.Accept() + require.NoError(t, err, "accepting nodePeer2Conn failed") + + nodePeer2Conn = conn.(*proxy.Conn) + + select { + case doneChan <- struct{}{}: + + case <-c.Done(): + } + }() + + // Connect the first peer to the proxy, which should allow the above + // goroutine to populate nodePeer1. + peer1Conn, err = brontide.Dial(peer1PrivECDH, c.p2pNetAddr, + time.Second, net.DialTimeout) + require.NoError(t, err, "connecting peer 1 to p2p proxy failed") + defer peer1Conn.Close() + + // Connect the second peer to the proxy, which should allow the above + // goroutine to populate nodePeer2. + peer2Conn, err = brontide.Dial(peer2PrivECDH, c.p2pNetAddr, + time.Second, net.DialTimeout) + require.NoError(t, err, "connecting peer 1 to p2p proxy failed") + defer peer2Conn.Close() + + // Await for nodePeer1 and nodePeer2 to populate. + select { + case <-doneChan: + + case <-c.Done(): + t.Fatalf("context ended early") + } + + defer nodePeer1Conn.Close() + defer nodePeer2Conn.Close() + + // Ensure everyone's got each other's correct public keys. + require.Equal(t, *(peer1Conn.LocalPub()), + *(nodePeer1Conn.RemotePub()), *peer1PubKey) + require.Equal(t, *(peer2Conn.LocalPub()), + *(nodePeer2Conn.RemotePub()), *peer2PubKey) + + // Send test messages between the node and each peer through the proxy. + // + // TODO(aakselrod): Test reconnections, large packets, etc. + testCases := []struct { + from proxy.MessageConnWithPubkey + to proxy.MessageConnWithPubkey + }{ + { + // From node to peer 1. + from: nodePeer1Conn, + to: peer1Conn, + }, + { + // From peer 1 to node. + from: peer1Conn, + to: nodePeer1Conn, + }, + { + // From node to peer 2. + from: nodePeer2Conn, + to: peer2Conn, + }, + { + // From peer 2 to node. + from: peer2Conn, + to: nodePeer2Conn, + }, + } + + for i, testCase := range testCases { + // Create a unique string for data to send across the wire. + buf := []byte(fmt.Sprintf("case %d", i)) + + // Set up a goroutine to receive a message and check that + // it's the one we expect. + go func() { + // Receive the data. + n, err := testCase.to.ReadNextHeader() + require.NoError(t, err, "couldn't read header") + + buf2 := make([]byte, n) + buf2, err = testCase.to.ReadNextBody(buf2) + require.NoError(t, err, "couldn't read body") + + // Make sure it's the data we expected. + require.Equal(t, buf, buf2) + + // Signal that we're done and exit. + select { + case doneChan <- struct{}{}: + + case <-c.Done(): + } + }() + + // Write a message and flush it. + err := testCase.from.WriteMessage(buf) + require.NoError(t, err, "couldn't write message") + + _, err = testCase.from.Flush() + require.NoError(t, err, "couldn't flush") + + // Wait for the message to be received and checked. + select { + case <-doneChan: + + case <-c.Done(): + return + } + } +} + +// hexToKey returns a private key decoded from a hex string, a public key +// derived from the private key, and an error if unsuccessful. +func hexToKey(keyStr string) (*btcec.PrivateKey, *btcec.PublicKey, error) { + keyBytes, err := hex.DecodeString(keyStr) + if err != nil { + return nil, nil, err + } + + privKey, pubKey := btcec.PrivKeyFromBytes(keyBytes) + return privKey, pubKey, nil +}