brontide: add proxy subpackage for proxied brontide connections.

This commit adds a new `brontide/proxy` subpackage containing
two new structs. `proxy.Mux` is a multiplexer that allows multiple
`proxy.Conn` connections to be handled over a single brontide
connection. This can be used to allow a proxy running in a separate
process to handle connections to and from peers, freeing up the
node's process and dedicated hardware to ensuring the safety of
money entrusted to it.
This commit is contained in:
Alex Akselrod 2022-08-08 15:34:36 -07:00
parent 9f013f5058
commit 0eec1e8cbd
No known key found for this signature in database
GPG key ID: 7F09684C978E66B6
4 changed files with 1106 additions and 0 deletions

182
brontide/proxy/conn.go Normal file
View file

@ -0,0 +1,182 @@
package proxy
import (
"fmt"
"net"
"sync"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/lightningnetwork/lnd/buffer"
"github.com/lightningnetwork/lnd/pool"
)
// Conn is a mux-proxied connection that contains a remote pubkey
// and local pubkey, just like a Conn.
type Conn struct {
// We extend a connection. The two kinds we use are either a
// *Conn from this package, or a net.Conn returned by net.Pipe().
net.Conn
// remotePub and localPub support keeping track of peer keys for
// net.Conn objects returned by net.Pipe().
remotePub *btcec.PublicKey
localPub *btcec.PublicKey
// readPool is a pointer to the backing mux's readPool, which is
// shared among all of the connections backed by the mux.
readPool *pool.Read
// mtxMsg is a mutex to control access to the current message so that
// it can only be read/reset at the same time.
mtxMsg sync.Mutex
// curMsg is nil if there's no message that's been read, or a pointer
// to the byte slice representing the current message.
curMsg *[]byte
}
// ReadNextHeader reads the next header. It passes implementation details
// to a brontide.Conn if possible, otherwise it assumes that this is a pipe
// connection and it can read an entire message in a single read.
//
// TODO(aakselrod): handle large messages/fragmentation/partial reads/timeouts.
func (c *Conn) ReadNextHeader() (uint32, error) {
// If the underlying connection is able to read messages, do that.
if conn, ok := c.Conn.(MessageConnWithPubkey); ok {
return conn.ReadNextHeader()
}
c.mtxMsg.Lock()
defer c.mtxMsg.Unlock()
// If we've already read a message from a piped connection, return
// its length.
if c.curMsg != nil {
return uint32(len(*c.curMsg)), nil
}
// Use the regular API for net.Conn to read from the connection, save
// the data for returning as a body later, and return its length as
// expected for now.
var retBuf []byte
// Use a buffer from the read pool to read from the underlying conn.
err := c.readPool.Submit(func(buf *buffer.Read) error {
n, err := c.Read(buf[:])
if err != nil {
return err
}
// Make a copy of the data before we give the read buffer back
// to the pool.
retBuf = make([]byte, n)
copy(retBuf, buf[:n])
return nil
})
if err != nil {
return 0, err
}
c.curMsg = &retBuf
return uint32(len(retBuf)), nil
}
// ReadNextBody reads the next body.
//
// TODO(aakselrod): handle large messages/fragmentation/partial reads/timeouts.
func (c *Conn) ReadNextBody(buf []byte) ([]byte, error) {
// If the underlying connection is able to read messages, do that.
if conn, ok := c.Conn.(MessageConnWithPubkey); ok {
return conn.ReadNextBody(buf)
}
c.mtxMsg.Lock()
defer c.mtxMsg.Unlock()
// If we haven't read a message using ReadNextHeader, we shouldn't be
// reading a body.
if c.curMsg == nil {
return nil, fmt.Errorf("need to read header before body")
}
// If buffer and data lengths mismatch, return an error
if len(buf) != len(*c.curMsg) {
return nil, fmt.Errorf("wrong size buffer for message")
}
// Copy the message data into the buffer we've been passed
copy(buf, *c.curMsg)
// Reset current message for next read
c.curMsg = nil
return buf, nil
}
// ReadNextMessage reads the next message.
//
// TODO(aakselrod): handle large messages/fragmentation/partial reads/timeouts.
func (c *Conn) ReadNextMessage() ([]byte, error) {
// First we read the length of the message.
n, err := c.ReadNextHeader()
if err != nil {
return nil, err
}
// Create a new buffer and return the read.
buf := make([]byte, n)
return c.ReadNextBody(buf)
}
// WriteMessage implements the method defined in MessageConnWithPubkey.
// If the underlying connection is capable, the message API is used. Otherwise,
// it's converted into a regular Write.
//
// TODO(aakselrod): handle large messages/fragmentation/partial reads/timeouts.
func (c *Conn) WriteMessage(buf []byte) error {
// If the underlying connection is able to write messages, do that.
if conn, ok := c.Conn.(MessageConnWithPubkey); ok {
return conn.WriteMessage(buf)
}
// Use the regular API for net.Conn to write to the connection and
// return the result as expected.
n, err := c.Write(buf)
if err != nil {
return err
}
// Partial write.
if n < len(buf) {
return fmt.Errorf("only wrote %d of %d byte message", n,
len(buf))
}
return nil
}
// Flush implements the method defined in MessageConnWithPubkey. If the
// underlying connection is capable, its Flush method is called. Otherwise,
// this results in a NOOP.
func (c *Conn) Flush() (int, error) {
// If the underlying connection can be flushed, do that.
if conn, ok := c.Conn.(MessageConnWithPubkey); ok {
return conn.Flush()
}
// Do nothing.
return 0, nil
}
// LocalPub implements the method defined in MessageConnWithPubkey.
func (c *Conn) LocalPub() *btcec.PublicKey {
return c.localPub
}
// RemotePub implements the method defined in MessageConnWithPubkey.
func (c *Conn) RemotePub() *btcec.PublicKey {
return c.remotePub
}

View file

@ -0,0 +1,21 @@
package proxy
import (
"github.com/btcsuite/btcd/btcec/v2"
"github.com/lightningnetwork/lnd/peer"
)
// MessageConnWithPubkey matches both *brontide.Conn and *Conn to read and
// write messages as well as tell the identity of the connection's remote peer.
//
// TODO(aakselrod): Move this to peer package?
type MessageConnWithPubkey interface {
// Inherit from net.Conn
peer.MessageConn
// LocalPub returns the connection's identity for the local peer.
LocalPub() *btcec.PublicKey
// RemotePub returns the connection's identity for the remote peer.
RemotePub() *btcec.PublicKey
}

439
brontide/proxy/mux.go Normal file
View file

@ -0,0 +1,439 @@
package proxy
import (
"context"
"fmt"
"net"
"sync"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/pool"
"golang.org/x/sync/errgroup"
)
// connKey is an 8-byte key to identify connections multiplexed over a single
// connection between two muxes. In a special case, an all-zeroes connKey is
// a signal (right now, only supported to announce a new connection).
type connKey [8]byte
// pubKeyToConnKey takes the last 8 bytes of a public key and copies then to
// a connKey object.
//
// TODO(aakselrod): use a better derivation of connection key from pubkey like
// SipHash64 or just use the whole pubkey?
func pubKeyToConnKey(key *btcec.PublicKey) connKey {
var ck connKey
keyBytes := key.SerializeCompressed()
copy(ck[:], keyBytes[len(keyBytes)-8:])
return ck
}
// Mux multiplexes a set of connections over a single proxy connection. It sends
// an entire packet.
type Mux struct {
// mtx is a mutex for async access.
mtx sync.RWMutex
// eg is an rror group to collect errors from goroutines if wanted.
eg *errgroup.Group
// ctx is the context.
ctx context.Context
// stop is the cancel function for clean shutdown.
stop context.CancelFunc
// started tracks whether we've started the handler.
started bool
// conn is a brontide connection to another mux for multiplexing the
// conns below.
conn *brontide.Conn
// conns is a map of connections by substring of pubkey. The interface
// can be either a *brontide.Conn or a *Conn.
conns map[connKey]MessageConnWithPubkey
// readPool is a pool of read buffers shared among all of the mux's
// connections.
readPool *pool.Read
// accept is a channel for accepting new connections from the connected
// mux. The public key is that of a newly connected peer.
accept chan *btcec.PublicKey
}
// NewMux creates a new Mux and populates it with initial values.
func NewMux(ctx context.Context, readPool *pool.Read,
conn *brontide.Conn) *Mux {
m := &Mux{}
// Assign the mux connection and read buffer pool.
m.conn = conn
m.readPool = readPool
// Derive a context with our own cancel signal.
m.ctx, m.stop = context.WithCancel(ctx)
// Derive a new error group from the context above.
m.eg, m.ctx = errgroup.WithContext(m.ctx)
// Initialize the conns map and accept channel.
m.conns = make(map[connKey]MessageConnWithPubkey)
m.accept = make(chan *btcec.PublicKey)
return m
}
// Start launches the mux's event loop goroutine.
func (m *Mux) Start() error {
// Ensure we lock the mutex to modify the state.
m.mtx.Lock()
defer m.mtx.Unlock()
if m.started {
return fmt.Errorf("mux already running")
}
// Set to started.
m.started = true
m.eg.Go(m.eventLoop)
return nil
}
// Accept is used by the node mux to listen for p2p connections through the
// proxy mux to which it's connected. It's part of the net.Listener interface.
func (m *Mux) Accept() (net.Conn, error) {
for {
select {
// Wait for a message on the accept channel.
case pubKey, ok := <-m.accept:
// Channel closed, return error
if !ok {
return nil, net.ErrClosed
}
// Got a new pubkey, create the Conn objects we'll
// need. First, we need virtual piped connections.
ourConn, theirConn := net.Pipe()
// Create a connection for the mux to handle.
conn := &Conn{
Conn: ourConn,
localPub: m.conn.LocalPub(),
remotePub: pubKey,
readPool: m.readPool,
}
// Register the connection with the mux.
m.mtx.Lock()
err := m.add(conn)
m.mtx.Unlock()
if err != nil {
return nil, err
}
// Return a connection for the caller to use.
return &Conn{
Conn: theirConn,
localPub: conn.localPub,
remotePub: pubKey,
readPool: m.readPool,
}, nil
// Our context is cancelled, time to exit.
case <-m.ctx.Done():
return nil, net.ErrClosed
}
}
}
// Close closes the mux, stops the handler goroutine, closes any connections
// used by the mux, and stops their goroutines as well. It is part of the
// net.Listener interface.
func (m *Mux) Close() error {
m.mtx.Lock()
defer m.mtx.Unlock()
// We're not started.
if !m.started {
return fmt.Errorf("mux isn't running")
}
// Stop all of the connections' goroutines and delete them from the map.
for k := range m.conns {
m.stopConn(k)
}
// Close the mux connection and zero its pointer.
m.conn.Close()
m.conn = nil
// Let future callers know the mux isn't running.
m.started = false
return nil
}
// Addr returns the address of the mux's local connection. It is part of the
// net.Listener interface.
//
// TODO(aakselrod): should we specify this on init to return the proxy's
// listen address instead?
func (m *Mux) Addr() net.Addr {
return m.conn.LocalAddr()
}
// eventLoop handles messages coming in over the mux connection and
// de-multiplexes them to other connections. It also handles signals for the
// node mux that the proxy mux has added an incoming peer connection by
// signaling over the accept channel.
func (m *Mux) eventLoop() error {
// On event loop exit, stop all of the connections
defer func() {
m.Close()
}()
m.mtx.RLock()
muxConn := m.conn
m.mtx.RUnlock()
if muxConn == nil {
return net.ErrClosed
}
// Start the event loop.
for {
// TODO(aakselrod): handle oversized messages, fragmentation,
// timeouts, partial reads, etc.
l, err := muxConn.ReadNextHeader()
if err != nil {
return err
}
pkt := make([]byte, l)
pkt, err = muxConn.ReadNextBody(pkt)
if err != nil {
return err
}
// TODO(aakselrod): more robust signaling..
if len(pkt) < 9 {
// We expect 8 bytes for the connection key and at
// least 1 byte of data here.
return fmt.Errorf("mux requires at least 9 bytes, "+
"got %d", len(pkt))
}
// Get the connection key for this message from the first 8
// bytes.
var key connKey
copy(key[:], pkt[:8])
// If key is all zeroes, we're opening a connection to a new
// peer, so next 33 bytes are the peer pubkey.
//
// TODO(aakselrod): more robust signaling.
if (key == connKey{}) {
if len(pkt) != btcec.PubKeyBytesLenCompressed+8 {
// We expect exactly 33+8=41 bytes, otherwise
// ignore the message
continue
}
// The first 8 bytes are zeroes as a signal that the
// message is from the peer mux, not a multiplexed
// connection. The rest of the message is a pubkey for
// adding a new connection.
//
// TODO(aakselrod): handle more types of messages,
// perhaps dial-out messages or other useful things
// that proxies do.
pubKey, err := btcec.ParsePubKey(pkt[8:])
if err != nil {
return fmt.Errorf("error parsing pubkey: %v",
err)
}
// Derive the connKey.
key = pubKeyToConnKey(pubKey)
// Check if a connection already exists under this
// pubkey. If so, it could be a duplicate or a
// reconnection attempt.
m.mtx.RLock()
_, connected := m.conns[key]
m.mtx.RUnlock()
if connected {
// We already have a proxied connection for
// this key, so we wait for the next one.
//
// TODO(aakselrod): handle reconnects. Some
// handling already exists in the Add function.
continue
}
select {
// Send the public key to an Accept() running somewhere.
//
// TODO(aakselrod): use better notifications or at
// least handle the case where there isn't an Accept()
// running.
case m.accept <- pubKey:
continue
// Time to quit, our context is cancelled.
case <-m.ctx.Done():
return nil
}
}
// Got the connection key in the message, so look up the
// matching connection by key.
m.mtx.RLock()
conn, ok := m.conns[key]
m.mtx.RUnlock()
if !ok {
// Drop packet, it doesn't match a connection that we
// have..
//
// TODO(aakselrod): smarter handling. Perhaps we signal
// a new connection just by sending a message relating
// to its public key instead of an all-zeroes connKey?
continue
}
n, err := conn.Write(pkt[8:])
if err != nil || n < len(pkt)-8 {
m.mtx.Lock()
m.stopConn(key)
m.mtx.Unlock()
}
}
}
// stopConn stops a connection and must be run with the read-write lock
// acquired.
func (m *Mux) stopConn(key connKey) {
conn, ok := m.conns[key]
delete(m.conns, key)
if ok && conn != nil {
conn.Close()
}
}
// add is used to add and start handling a new connection and must be run with
// the read-write lock acquired.
func (m *Mux) add(conn MessageConnWithPubkey) error {
// Check if mux is started.
if !m.started {
return fmt.Errorf("can't add connection to mux that's " +
"not running")
}
// Derive connection key from public key.
key := pubKeyToConnKey(conn.RemotePub())
// In case we're replacing a stale connection.
//
// TODO(aakselrod): robust handling of reconnects.
m.stopConn(key)
// Add connection to map by connKey.
m.conns[key] = conn
// Start a goroutine for the mux to handle the newly added connection.
m.eg.Go(func() error { return m.handleConn(key, conn) })
return nil
}
// Add adds a new connection to be multiplexed through the multiplexer. It can
// be either a *Conn or a *brontide.Conn.
func (m *Mux) Add(conn MessageConnWithPubkey) error {
m.mtx.Lock()
defer m.mtx.Unlock()
err := m.add(conn)
if err != nil {
return err
}
// Signal the peer mux that we've got a new connection. First, get the
// key bytes.
//
// TODO(aakselrod): use a separate mutex for writing to the mux
// connection.
err = m.conn.WriteMessage(append(make([]byte, 8),
conn.RemotePub().SerializeCompressed()...))
if err != nil {
return err
}
_, err = m.conn.Flush()
if err != nil {
return err
}
return nil
}
// handleConn runs in a goroutine to multiplex a connection through the
// mux.
func (m *Mux) handleConn(key connKey, conn MessageConnWithPubkey) error {
// Stop connection on exit
defer func() {
m.mtx.Lock()
m.stopConn(key)
m.mtx.Unlock()
}()
for {
// Read a message from the connection.
//
// TODO(aakselrod): handle oversized messages, fragmentation,
// timeouts, partial reads, etc.
n, err := conn.ReadNextHeader()
if err != nil {
return err
}
buf := make([]byte, n)
buf, err = conn.ReadNextBody(buf)
if err != nil {
return err
}
// Multiplex the message through the mux connection.
//
// TODO(aakselrod): use a separate mutex for writing to the
// mux connection.
m.mtx.Lock()
err = m.conn.WriteMessage(append(key[:], buf...))
if err != nil {
m.mtx.Unlock()
return err
}
_, err = m.conn.Flush()
if err != nil {
m.mtx.Unlock()
return err
}
m.mtx.Unlock()
}
}

View file

@ -0,0 +1,464 @@
package proxy_test
import (
"context"
"encoding/hex"
"fmt"
"net"
"testing"
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/brontide/proxy"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lntest"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/pool"
"github.com/stretchr/testify/require"
)
const (
// p2pPrivStr is the hex private key of the node in a test context.
// It is also copied to the proxy so it can pretend to be the node.
p2pPrivStr = "677f456999a1ec3db53f4cb51e64db15941a37aaeb67eaed018bb6f637a933ed"
// from the node.
proxyPrivStr = "7568a50fe45d3681e66e915a1ded0e39e92a512f8278fae6540e57868533c093"
// peer1PrivStr is the hex private key of a peer that connects to the
// proxy as if it's connecting directly to the node.
peer1PrivStr = "7568a507345d3681e66e915a1ded0e39e92a512f8278fae6540e57868533c093"
// peer1PrivStr is the hex private key of a peer that connects to the
// proxy as if it's connecting directly to the node.
peer2PrivStr = "7568a50fe81d3681e66e915a1ded0e39e92a512f8278fae6540e57868533c093"
)
// proxyTestContext is a struct which holds required structs for running proxy
// tests.
type proxyTestContext struct {
// Inherit from context.Context.
context.Context
// cancel is the cancel function for this context.
cancel context.CancelFunc
// t is the test context
t *testing.T
// p2pAddrStr is the address on which the proxy listens for p2p
// connections.
p2pAddrStr string
// proxyAddrStr is the address on which the proxy listens for a
// connection from the node.
proxyAddrStr string
// p2pPrivKey is the node private key which is also exported to the
// proxy for impersonation.
p2pPrivKey *btcec.PrivateKey
// p2pPubKey is the node public key derived from the private key above.
// It is used for the node to identify itself to the proxy and for the
// proxy to impersonate the node to peers.
p2pPubKey *btcec.PublicKey
// proxyPrivKey is the proxy private key used for communication with
// the node.
proxyPrivKey *btcec.PrivateKey
// proxyPubKey is the proxy public key derived from the private key
// above. It is used for the proxy to identify itself to the node so
// the node can trust it to proxy traffic.
proxyPubKey *btcec.PublicKey
// p2pPrivECDH is used to connect the node mux to the proxy mux
// as well as start the p2pListener below.
p2pPrivECDH keychain.SingleKeyECDH
// p2pListener listens for connections from peers. It impersonates
// a listener that the node would bring up by using the node key.
p2pListener *brontide.Listener
// proxyListener listens for mux connections from the node. It uses its
// own keypair to identify itself to the node.
proxyListener *brontide.Listener
// p2pNetAddr represents the network address, including public key,
// IP address/hostname, and TCP port, for peer connections.
p2pNetAddr *lnwire.NetAddress
// proxyNetAddr represents the network address, including public key,
// IP address/hostname, and TCP port, for the node connection to the
// proxy.
proxyNetAddr *lnwire.NetAddress
// readPool is a read buffer pool.
readPool *pool.Read
// proxyMux runs on the proxy side. It starts with a connection from
// the node on the proxyListener, and then connections are added to
// it as they're accepted on p2pListener.
proxyMux *proxy.Mux
// nodeMux runs on the node side. It connects to the proxyMux using
// a single brontide connection. All of the peer connections are
// multiplexed over the single connection between the node and proxy.
nodeMux *proxy.Mux
}
// newProxyTestContext creates a new proxy test context and populates it with
// an initial node and proxy mux. It starts them running but does not create
// any peers.
func newProxyTestContext(t *testing.T) (*proxyTestContext, error) {
t.Helper()
// Create context with a cancel function.
ctx, cancel := context.WithCancel(context.Background())
// Generate random ports to listen on.
p2pAddrStr := fmt.Sprintf("localhost:%d", lntest.NextAvailablePort())
proxyAddrStr := fmt.Sprintf("localhost:%d", lntest.NextAvailablePort())
// Get keys from hex constants.
p2pPrivKey, p2pPubKey, err := hexToKey(p2pPrivStr)
require.NoError(t, err, "error parsing p2p key")
proxyPrivKey, proxyPubKey, err := hexToKey(proxyPrivStr)
require.NoError(t, err, "error parsing proxy key")
// Wrap keys for ECDH.
var p2pPrivECDH keychain.SingleKeyECDH = &keychain.PrivKeyECDH{
PrivKey: p2pPrivKey,
}
var proxyPrivECDH keychain.SingleKeyECDH = &keychain.PrivKeyECDH{
PrivKey: proxyPrivKey,
}
// Start the listeners on the required ports.
p2pListener, err := brontide.NewListener(p2pPrivECDH, p2pAddrStr)
require.NoError(t, err, "couldn't start p2p listener")
proxyListener, err := brontide.NewListener(proxyPrivECDH, proxyAddrStr)
require.NoError(t, err, "couldn't start proxy listener")
// Construct the network addresses for testing.
p2pNetAddr := &lnwire.NetAddress{
IdentityKey: p2pPubKey,
Address: p2pListener.Addr(),
ChainNet: wire.SimNet,
}
proxyNetAddr := &lnwire.NetAddress{
IdentityKey: proxyPubKey,
Address: proxyListener.Addr(),
ChainNet: wire.SimNet,
}
// Make a read buffer pool.
readPool := pool.NewRead(
pool.NewReadBuffer(
pool.DefaultReadBufferGCInterval,
pool.DefaultReadBufferExpiryInterval,
),
5,
pool.DefaultWorkerTimeout,
)
err = readPool.Start()
require.NoError(t, err, "couldn't start read pool")
// Construct the test context object.
c := &proxyTestContext{
ctx,
cancel,
t,
p2pAddrStr,
proxyAddrStr,
p2pPrivKey,
p2pPubKey,
proxyPrivKey,
proxyPubKey,
p2pPrivECDH,
p2pListener,
proxyListener,
p2pNetAddr,
proxyNetAddr,
readPool,
nil,
nil,
}
// Now we start the handler goroutines and return the test
// context.
return c, c.handleMuxes()
}
// handleMuxes connects and starts the node and proxy muxes, and starts a
// goroutine that accepts connections over the p2pListener and adds them to
// the proxyMux, thus signaling the nodeMux. Dialing the p2pListener and
// accepting nodeMux connections is left up to tests themselves.
func (c *proxyTestContext) handleMuxes() error {
// doneChan lets goroutines signal when their mux is set up
doneChan := make(chan struct{})
// Start a goroutine to handle the proxy mux.
go func() {
// First, accept a connection from the node.
proxyMuxConn, err := c.proxyListener.Accept()
require.NoError(c.t, err,
"accepting proxyMux connection failed")
// Cast as *brontide.Conn
brontideConn, ok := proxyMuxConn.(*brontide.Conn)
require.True(c.t, ok, "couldn't cast proxyMuxConn as "+
"*brontide.Conn")
// Authenticate the node connecting to the proxy.
require.Equal(c.t, brontideConn.RemotePub(), c.p2pPubKey)
// Spin up the proxy mux.
c.proxyMux = proxy.NewMux(c, c.readPool, brontideConn)
err = c.proxyMux.Start()
require.NoError(c.t, err, "starting proxyMux failed")
// Signal the proxy mux is started.
select {
case doneChan <- struct{}{}:
case <-c.Done():
return
}
// Accept connections on the p2pListener and add them to the
// proxy mux. The proxy mux will signal the node mux about
// the new connection.
for {
// Accept a connection.
p2pConn, err := c.p2pListener.Accept()
// If the listener is closed, it's time to exit. Any
// other error is a failure.
if err != nil && err.Error() ==
"brontide connection closed" {
return
}
require.NoError(c.t, err, "accepting p2p "+
"connection failed")
// Add it to the mux.
err = c.proxyMux.Add(p2pConn.(*brontide.Conn))
require.NoError(c.t, err, "adding p2p connection to "+
"proxyMux failed")
}
}()
// Dial the proxyListener and set up the node mux on the resulting
// connection. This automatically authenticates the pubkey of the
// proxyListener.
nodeMuxConn, err := brontide.Dial(c.p2pPrivECDH, c.proxyNetAddr,
time.Second, net.DialTimeout)
require.NoError(c.t, err, "connecting to proxy mux failed")
c.nodeMux = proxy.NewMux(c, c.readPool, nodeMuxConn)
err = c.nodeMux.Start()
require.NoError(c.t, err, "starting nodeMux failed")
// Ensure the proxyMux has also started.
select {
case <-doneChan:
case <-c.Done():
return fmt.Errorf("context ended early")
}
return nil
}
// stop cleanly stops all the goroutines under the proxyTestContext.
func (c *proxyTestContext) stop() {
c.cancel()
c.proxyListener.Close()
c.p2pListener.Close()
c.proxyMux.Close()
c.nodeMux.Close()
require.NoError(c.t, c.readPool.Stop())
}
// TestProxy creates a test context with a node mux and proxy mux, connects
// two peers to the proxy, and ensures the node sees them connect and can
// send data to and receive data from them.
func TestProxy(t *testing.T) {
// Create the proxy test context.
c, err := newProxyTestContext(t)
require.NoError(t, err)
require.NotNil(t, c)
defer c.stop()
// Derive keys for the peers from their hex constants.
peer1PrivKey, peer1PubKey, err := hexToKey(peer1PrivStr)
require.NoError(t, err, "error parsing peer1 key")
peer2PrivKey, peer2PubKey, err := hexToKey(peer2PrivStr)
require.NoError(t, err, "error parsing peer2 key")
// Wrap keys for ECDH.
var peer1PrivECDH keychain.SingleKeyECDH = &keychain.PrivKeyECDH{
PrivKey: peer1PrivKey,
}
var peer2PrivECDH keychain.SingleKeyECDH = &keychain.PrivKeyECDH{
PrivKey: peer2PrivKey,
}
doneChan := make(chan struct{})
var peer1Conn, peer2Conn *brontide.Conn
var nodePeer1Conn, nodePeer2Conn *proxy.Conn
// Accept two connections at the node, one per peer, in a goroutine.
go func() {
conn, err := c.nodeMux.Accept()
require.NoError(t, err, "accepting nodePeer1Conn failed")
nodePeer1Conn = conn.(*proxy.Conn)
conn, err = c.nodeMux.Accept()
require.NoError(t, err, "accepting nodePeer2Conn failed")
nodePeer2Conn = conn.(*proxy.Conn)
select {
case doneChan <- struct{}{}:
case <-c.Done():
}
}()
// Connect the first peer to the proxy, which should allow the above
// goroutine to populate nodePeer1.
peer1Conn, err = brontide.Dial(peer1PrivECDH, c.p2pNetAddr,
time.Second, net.DialTimeout)
require.NoError(t, err, "connecting peer 1 to p2p proxy failed")
defer peer1Conn.Close()
// Connect the second peer to the proxy, which should allow the above
// goroutine to populate nodePeer2.
peer2Conn, err = brontide.Dial(peer2PrivECDH, c.p2pNetAddr,
time.Second, net.DialTimeout)
require.NoError(t, err, "connecting peer 1 to p2p proxy failed")
defer peer2Conn.Close()
// Await for nodePeer1 and nodePeer2 to populate.
select {
case <-doneChan:
case <-c.Done():
t.Fatalf("context ended early")
}
defer nodePeer1Conn.Close()
defer nodePeer2Conn.Close()
// Ensure everyone's got each other's correct public keys.
require.Equal(t, *(peer1Conn.LocalPub()),
*(nodePeer1Conn.RemotePub()), *peer1PubKey)
require.Equal(t, *(peer2Conn.LocalPub()),
*(nodePeer2Conn.RemotePub()), *peer2PubKey)
// Send test messages between the node and each peer through the proxy.
//
// TODO(aakselrod): Test reconnections, large packets, etc.
testCases := []struct {
from proxy.MessageConnWithPubkey
to proxy.MessageConnWithPubkey
}{
{
// From node to peer 1.
from: nodePeer1Conn,
to: peer1Conn,
},
{
// From peer 1 to node.
from: peer1Conn,
to: nodePeer1Conn,
},
{
// From node to peer 2.
from: nodePeer2Conn,
to: peer2Conn,
},
{
// From peer 2 to node.
from: peer2Conn,
to: nodePeer2Conn,
},
}
for i, testCase := range testCases {
// Create a unique string for data to send across the wire.
buf := []byte(fmt.Sprintf("case %d", i))
// Set up a goroutine to receive a message and check that
// it's the one we expect.
go func() {
// Receive the data.
n, err := testCase.to.ReadNextHeader()
require.NoError(t, err, "couldn't read header")
buf2 := make([]byte, n)
buf2, err = testCase.to.ReadNextBody(buf2)
require.NoError(t, err, "couldn't read body")
// Make sure it's the data we expected.
require.Equal(t, buf, buf2)
// Signal that we're done and exit.
select {
case doneChan <- struct{}{}:
case <-c.Done():
}
}()
// Write a message and flush it.
err := testCase.from.WriteMessage(buf)
require.NoError(t, err, "couldn't write message")
_, err = testCase.from.Flush()
require.NoError(t, err, "couldn't flush")
// Wait for the message to be received and checked.
select {
case <-doneChan:
case <-c.Done():
return
}
}
}
// hexToKey returns a private key decoded from a hex string, a public key
// derived from the private key, and an error if unsuccessful.
func hexToKey(keyStr string) (*btcec.PrivateKey, *btcec.PublicKey, error) {
keyBytes, err := hex.DecodeString(keyStr)
if err != nil {
return nil, nil, err
}
privKey, pubKey := btcec.PrivKeyFromBytes(keyBytes)
return privKey, pubKey, nil
}