Merge pull request #8631 from Chinwendu20/brontidetest

[code-health]: reduce duplication in brontide test.
This commit is contained in:
Oliver Gugger 2024-05-21 12:46:16 +02:00 committed by GitHub
commit 01e3ac61d8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 405 additions and 430 deletions

View file

@ -6,22 +6,19 @@ import (
"testing"
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lntest/mock"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chancloser"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/pool"
"github.com/stretchr/testify/require"
)
@ -39,20 +36,14 @@ var (
func TestPeerChannelClosureShutdownResponseLinkRemoved(t *testing.T) {
t.Parallel()
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
broadcastTxChan := make(chan *wire.MsgTx)
mockSwitch := &mockMessageSwitch{}
alicePeer, bobChan, err := createTestPeer(
t, notifier, broadcastTxChan, noUpdate, mockSwitch,
)
harness, err := createTestPeerWithChannel(t, noUpdate)
require.NoError(t, err, "unable to create test channels")
var (
alicePeer = harness.peer
bobChan = harness.channel
)
chanPoint := bobChan.ChannelPoint()
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
@ -87,20 +78,17 @@ func TestPeerChannelClosureShutdownResponseLinkRemoved(t *testing.T) {
func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) {
t.Parallel()
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
broadcastTxChan := make(chan *wire.MsgTx)
mockSwitch := &mockMessageSwitch{}
alicePeer, bobChan, err := createTestPeer(
t, notifier, broadcastTxChan, noUpdate, mockSwitch,
)
harness, err := createTestPeerWithChannel(t, noUpdate)
require.NoError(t, err, "unable to create test channels")
var (
alicePeer = harness.peer
bobChan = harness.channel
mockSwitch = harness.mockSwitch
broadcastTxChan = harness.publishTx
notifier = harness.notifier
)
chanPoint := bobChan.ChannelPoint()
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
@ -192,20 +180,17 @@ func TestPeerChannelClosureAcceptFeeResponder(t *testing.T) {
func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) {
t.Parallel()
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
broadcastTxChan := make(chan *wire.MsgTx)
mockSwitch := &mockMessageSwitch{}
alicePeer, bobChan, err := createTestPeer(
t, notifier, broadcastTxChan, noUpdate, mockSwitch,
)
harness, err := createTestPeerWithChannel(t, noUpdate)
require.NoError(t, err, "unable to create test channels")
var (
bobChan = harness.channel
alicePeer = harness.peer
mockSwitch = harness.mockSwitch
broadcastTxChan = harness.publishTx
notifier = harness.notifier
)
chanPoint := bobChan.ChannelPoint()
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
mockLink := newMockUpdateHandler(chanID)
@ -316,20 +301,17 @@ func TestPeerChannelClosureAcceptFeeInitiator(t *testing.T) {
func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) {
t.Parallel()
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
broadcastTxChan := make(chan *wire.MsgTx)
mockSwitch := &mockMessageSwitch{}
alicePeer, bobChan, err := createTestPeer(
t, notifier, broadcastTxChan, noUpdate, mockSwitch,
)
harness, err := createTestPeerWithChannel(t, noUpdate)
require.NoError(t, err, "unable to create test channels")
var (
bobChan = harness.channel
alicePeer = harness.peer
mockSwitch = harness.mockSwitch
broadcastTxChan = harness.publishTx
notifier = harness.notifier
)
chanPoint := bobChan.ChannelPoint()
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
@ -503,20 +485,17 @@ func TestPeerChannelClosureFeeNegotiationsResponder(t *testing.T) {
func TestPeerChannelClosureFeeNegotiationsInitiator(t *testing.T) {
t.Parallel()
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
broadcastTxChan := make(chan *wire.MsgTx)
mockSwitch := &mockMessageSwitch{}
alicePeer, bobChan, err := createTestPeer(
t, notifier, broadcastTxChan, noUpdate, mockSwitch,
)
harness, err := createTestPeerWithChannel(t, noUpdate)
require.NoError(t, err, "unable to create test channels")
var (
alicePeer = harness.peer
bobChan = harness.channel
mockSwitch = harness.mockSwitch
broadcastTxChan = harness.publishTx
notifier = harness.notifier
)
chanPoint := bobChan.ChannelPoint()
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
mockLink := newMockUpdateHandler(chanID)
@ -830,31 +809,27 @@ func TestCustomShutdownScript(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
broadcastTxChan := make(chan *wire.MsgTx)
mockSwitch := &mockMessageSwitch{}
// Open a channel.
alicePeer, bobChan, err := createTestPeer(
t, notifier, broadcastTxChan, test.update,
mockSwitch,
harness, err := createTestPeerWithChannel(
t, test.update,
)
if err != nil {
t.Fatalf("unable to create test channels: %v", err)
}
var (
alicePeer = harness.peer
bobChan = harness.channel
mockSwitch = harness.mockSwitch
)
chanPoint := bobChan.ChannelPoint()
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
mockLink := newMockUpdateHandler(chanID)
mockSwitch.links = append(mockSwitch.links, mockLink)
// Request initiator to cooperatively close the channel, with
// a specified delivery address.
// Request initiator to cooperatively close the channel,
// with a specified delivery address.
updateChan := make(chan interface{}, 1)
errChan := make(chan error, 1)
closeCommand := htlcswitch.ChanClose{
@ -1000,35 +975,23 @@ func TestStaticRemoteDowngrade(t *testing.T) {
test := test
t.Run(test.name, func(t *testing.T) {
writeBufferPool := pool.NewWriteBuffer(
pool.DefaultWriteBufferGCInterval,
pool.DefaultWriteBufferExpiryInterval,
params := createTestPeer(t)
var (
p = params.peer
mockConn = params.mockConn
writePool = p.cfg.WritePool
)
writePool := pool.NewWrite(
writeBufferPool, 1, timeout,
)
require.NoError(t, writePool.Start())
mockConn := newMockConn(t, 1)
p := Brontide{
cfg: Config{
LegacyFeatures: legacy,
Features: test.features,
Conn: mockConn,
WritePool: writePool,
PongBuf: make([]byte, lnwire.MaxPongBytes),
},
log: peerLog,
}
// Set feature bits.
p.cfg.LegacyFeatures = legacy
p.cfg.Features = test.features
var b bytes.Buffer
_, err := lnwire.WriteMessage(&b, test.expectedInit, 0)
require.NoError(t, err)
// Send our init message, assert that we write our expected message
// and shutdown our write pool.
// Send our init message, assert that we write our
// expected message and shutdown our write pool.
require.NoError(t, p.sendInitMsg(test.legacy))
mockConn.assertWrite(b.Bytes())
require.NoError(t, writePool.Stop())
@ -1056,101 +1019,20 @@ func genScript(t *testing.T, address string) lnwire.DeliveryAddress {
func TestPeerCustomMessage(t *testing.T) {
t.Parallel()
// Set up node Alice.
dbAlice, err := channeldb.Open(t.TempDir())
params := createTestPeer(t)
var (
mockConn = params.mockConn
alicePeer = params.peer
receivedCustomChan = params.customChan
remoteKey = alicePeer.PubKey()
)
// Start peer.
startPeerDone := startPeer(t, mockConn, alicePeer)
_, err := fn.RecvOrTimeout(startPeerDone, 2*timeout)
require.NoError(t, err)
aliceKey, err := btcec.NewPrivateKey()
require.NoError(t, err)
writeBufferPool := pool.NewWriteBuffer(
pool.DefaultWriteBufferGCInterval,
pool.DefaultWriteBufferExpiryInterval,
)
writePool := pool.NewWrite(
writeBufferPool, 1, timeout,
)
require.NoError(t, writePool.Start())
readBufferPool := pool.NewReadBuffer(
pool.DefaultReadBufferGCInterval,
pool.DefaultReadBufferExpiryInterval,
)
readPool := pool.NewRead(
readBufferPool, 1, timeout,
)
require.NoError(t, readPool.Start())
mockConn := newMockConn(t, 1)
receivedCustomChan := make(chan *customMsg)
remoteKey := [33]byte{8}
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
// TODO(yy): change ChannelNotifier to be an interface.
channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
require.NoError(t, channelNotifier.Start())
t.Cleanup(func() {
require.NoError(t, channelNotifier.Stop(),
"stop channel notifier failed")
})
alicePeer := NewBrontide(Config{
PubKeyBytes: remoteKey,
ChannelDB: dbAlice.ChannelStateDB(),
Addr: &lnwire.NetAddress{
IdentityKey: aliceKey.PubKey(),
},
PrunePersistentPeerConnection: func([33]byte) {},
Features: lnwire.EmptyFeatureVector(),
LegacyFeatures: lnwire.EmptyFeatureVector(),
WritePool: writePool,
ReadPool: readPool,
Conn: mockConn,
ChainNotifier: notifier,
HandleCustomMessage: func(
peer [33]byte, msg *lnwire.Custom) error {
receivedCustomChan <- &customMsg{
peer: peer,
msg: *msg,
}
return nil
},
PongBuf: make([]byte, lnwire.MaxPongBytes),
ChannelNotifier: channelNotifier,
})
// Set up the init sequence.
go func() {
// Read init message.
<-mockConn.writtenMessages
// Write the init reply message.
initReplyMsg := lnwire.NewInitMessage(
lnwire.NewRawFeatureVector(
lnwire.DataLossProtectRequired,
),
lnwire.NewRawFeatureVector(),
)
var b bytes.Buffer
_, err = lnwire.WriteMessage(&b, initReplyMsg, 0)
require.NoError(t, err)
mockConn.readMessages <- b.Bytes()
}()
// Start the peer.
require.NoError(t, alicePeer.Start())
// Send a custom message.
customMsg, err := lnwire.NewCustom(
lnwire.MessageType(40000), []byte{1, 2, 3},
@ -1185,21 +1067,12 @@ func TestUpdateNextRevocation(t *testing.T) {
require := require.New(t)
// TODO(yy): create interface for lnwallet.LightningChannel so we can
// easily mock it without the following setups.
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
broadcastTxChan := make(chan *wire.MsgTx)
mockSwitch := &mockMessageSwitch{}
alicePeer, bobChan, err := createTestPeer(
t, notifier, broadcastTxChan, noUpdate, mockSwitch,
)
harness, err := createTestPeerWithChannel(t, noUpdate)
require.NoError(err, "unable to create test channels")
bobChan := harness.channel
alicePeer := harness.peer
// testChannel is used to test the updateNextRevocation function.
testChannel := bobChan.State()
@ -1412,30 +1285,22 @@ func TestHandleRemovePendingChannel(t *testing.T) {
func TestStartupWriteMessageRace(t *testing.T) {
t.Parallel()
// Set up parameters for createTestPeer.
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
broadcastTxChan := make(chan *wire.MsgTx)
mockSwitch := &mockMessageSwitch{}
// Use a callback to extract the channel created by createTestPeer, so
// we can mark it borked below. We can't mark it borked within the
// callback, since the channel hasn't been saved to the DB yet when the
// callback executes.
// Use a callback to extract the channel created by
// createTestPeerWithChannel, so we can mark it borked below.
// We can't mark it borked within the callback, since the channel hasn't
// been saved to the DB yet when the callback executes.
var channel *channeldb.OpenChannel
getChannels := func(a, b *channeldb.OpenChannel) {
channel = a
}
// createTestPeer creates a peer and a channel with that peer.
peer, _, err := createTestPeer(
t, notifier, broadcastTxChan, getChannels, mockSwitch,
)
// createTestPeerWithChannel creates a peer and a channel with that
// peer.
harness, err := createTestPeerWithChannel(t, getChannels)
require.NoError(t, err, "unable to create test channel")
peer := harness.peer
// Avoid the need to mock the channel graph by marking the channel
// borked. Borked channels still get a reestablish message sent on
// reconnect, while skipping channel graph checks and link creation.
@ -1445,58 +1310,21 @@ func TestStartupWriteMessageRace(t *testing.T) {
mockConn := newMockConn(t, 2)
peer.cfg.Conn = mockConn
// Set up other configuration necessary to successfully execute
// peer.Start().
peer.cfg.LegacyFeatures = lnwire.EmptyFeatureVector()
writeBufferPool := pool.NewWriteBuffer(
pool.DefaultWriteBufferGCInterval,
pool.DefaultWriteBufferExpiryInterval,
)
writePool := pool.NewWrite(
writeBufferPool, 1, timeout,
)
require.NoError(t, writePool.Start())
peer.cfg.WritePool = writePool
readBufferPool := pool.NewReadBuffer(
pool.DefaultReadBufferGCInterval,
pool.DefaultReadBufferExpiryInterval,
)
readPool := pool.NewRead(
readBufferPool, 1, timeout,
)
require.NoError(t, readPool.Start())
peer.cfg.ReadPool = readPool
// Send a message while starting the peer. As the peer starts up, it
// should not trigger a data race between the sending of this message
// and the sending of the channel reestablish message.
sendPingDone := make(chan struct{})
var sendPingDone = make(chan struct{})
go func() {
require.NoError(t, peer.SendMessage(true, lnwire.NewPing(0)))
close(sendPingDone)
}()
// Handle init messages.
go func() {
// Read init message.
<-mockConn.writtenMessages
// Write the init reply message.
initReplyMsg := lnwire.NewInitMessage(
lnwire.NewRawFeatureVector(
lnwire.DataLossProtectRequired,
),
lnwire.NewRawFeatureVector(),
)
var b bytes.Buffer
_, err = lnwire.WriteMessage(&b, initReplyMsg, 0)
require.NoError(t, err)
mockConn.readMessages <- b.Bytes()
}()
// Start the peer. No data race should occur.
require.NoError(t, peer.Start())
startPeerDone := startPeer(t, mockConn, peer)
// Ensure startup is complete.
_, err = fn.RecvOrTimeout(startPeerDone, 2*timeout)
require.NoError(t, err)
// Ensure messages were sent during startup.
<-sendPingDone
@ -1516,21 +1344,12 @@ func TestStartupWriteMessageRace(t *testing.T) {
func TestRemovePendingChannel(t *testing.T) {
t.Parallel()
// Set up parameters for createTestPeer.
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
broadcastTxChan := make(chan *wire.MsgTx)
mockSwitch := &mockMessageSwitch{}
// createTestPeer creates a peer and a channel with that peer.
peer, _, err := createTestPeer(
t, notifier, broadcastTxChan, noUpdate, mockSwitch,
)
// createTestPeerWithChannel creates a peer and a channel.
harness, err := createTestPeerWithChannel(t, noUpdate)
require.NoError(t, err, "unable to create test channel")
peer := harness.peer
// Add a pending channel to the peer Alice.
errChan := make(chan error, 1)
pendingChanID := lnwire.ChannelID{1}

View file

@ -18,6 +18,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/fn"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
@ -27,6 +28,7 @@ import (
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/shachain"
"github.com/stretchr/testify/require"
@ -48,32 +50,52 @@ var (
testKeyLoc = keychain.KeyLocator{Family: keychain.KeyFamilyNodeKey}
)
// noUpdate is a function which can be used as a parameter in createTestPeer to
// call the setup code with no custom values on the channels set up.
// noUpdate is a function which can be used as a parameter in
// createTestPeerWithChannel to call the setup code with no custom values on
// the channels set up.
var noUpdate = func(a, b *channeldb.OpenChannel) {}
// createTestPeer creates a channel between two nodes, and returns a peer for
// one of the nodes, together with the channel seen from both nodes. It takes
// an updateChan function which can be used to modify the default values on
// the channel states for each peer.
func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
publTx chan *wire.MsgTx, updateChan func(a, b *channeldb.OpenChannel),
mockSwitch *mockMessageSwitch) (
*Brontide, *lnwallet.LightningChannel, error) {
type peerTestCtx struct {
peer *Brontide
channel *lnwallet.LightningChannel
notifier *mock.ChainNotifier
publishTx <-chan *wire.MsgTx
mockSwitch *mockMessageSwitch
db *channeldb.DB
privKey *btcec.PrivateKey
mockConn *mockMessageConn
customChan chan *customMsg
chanStatusMgr *netann.ChanStatusManager
}
nodeKeyLocator := keychain.KeyLocator{
Family: keychain.KeyFamilyNodeKey,
}
aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes(
channels.AlicesPrivKey,
)
aliceKeySigner := keychain.NewPrivKeyMessageSigner(
aliceKeyPriv, nodeKeyLocator,
)
bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes(
channels.BobsPrivKey,
// createTestPeerWithChannel creates a channel between two nodes, and returns a
// peer for one of the nodes, together with the channel seen from both nodes.
// It takes an updateChan function which can be used to modify the default
// values on the channel states for each peer.
func createTestPeerWithChannel(t *testing.T, updateChan func(a,
b *channeldb.OpenChannel)) (*peerTestCtx, error) {
params := createTestPeer(t)
var (
publishTx = params.publishTx
mockSwitch = params.mockSwitch
alicePeer = params.peer
notifier = params.notifier
aliceKeyPriv = params.privKey
dbAlice = params.db
chanStatusMgr = params.chanStatusMgr
)
err := chanStatusMgr.Start()
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, chanStatusMgr.Stop())
})
aliceKeyPub := alicePeer.IdentityKey()
estimator := alicePeer.cfg.FeeEstimator
channelCapacity := btcutil.Amount(10 * 1e8)
channelBal := channelCapacity / 2
aliceDustLimit := btcutil.Amount(200)
@ -88,6 +110,10 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
}
fundingTxIn := wire.NewTxIn(prevOut, nil, nil)
bobKeyPriv, bobKeyPub := btcec.PrivKeyFromBytes(
channels.BobsPrivKey,
)
aliceCfg := channeldb.ChannelConfig{
ChannelConstraints: channeldb.ChannelConstraints{
DustLimit: aliceDustLimit,
@ -141,23 +167,23 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
bobRoot, err := chainhash.NewHash(bobKeyPriv.Serialize())
if err != nil {
return nil, nil, err
return nil, err
}
bobPreimageProducer := shachain.NewRevocationProducer(*bobRoot)
bobFirstRevoke, err := bobPreimageProducer.AtIndex(0)
if err != nil {
return nil, nil, err
return nil, err
}
bobCommitPoint := input.ComputeCommitmentPoint(bobFirstRevoke[:])
aliceRoot, err := chainhash.NewHash(aliceKeyPriv.Serialize())
if err != nil {
return nil, nil, err
return nil, err
}
alicePreimageProducer := shachain.NewRevocationProducer(*aliceRoot)
aliceFirstRevoke, err := alicePreimageProducer.AtIndex(0)
if err != nil {
return nil, nil, err
return nil, err
}
aliceCommitPoint := input.ComputeCommitmentPoint(aliceFirstRevoke[:])
@ -167,29 +193,20 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
isAliceInitiator, 0,
)
if err != nil {
return nil, nil, err
return nil, err
}
dbAlice, err := channeldb.Open(t.TempDir())
if err != nil {
return nil, nil, err
}
t.Cleanup(func() {
require.NoError(t, dbAlice.Close())
})
dbBob, err := channeldb.Open(t.TempDir())
if err != nil {
return nil, nil, err
return nil, err
}
t.Cleanup(func() {
require.NoError(t, dbBob.Close())
})
estimator := chainfee.NewStaticEstimator(12500, 0)
feePerKw, err := estimator.EstimateFeePerKW(1)
if err != nil {
return nil, nil, err
return nil, err
}
// TODO(roasbeef): need to factor in commit fee?
@ -214,7 +231,7 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
var chanIDBytes [8]byte
if _, err := io.ReadFull(crand.Reader, chanIDBytes[:]); err != nil {
return nil, nil, err
return nil, err
}
shortChanID := lnwire.NewShortChanIDFromInt(
@ -259,13 +276,9 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
// Set custom values on the channel states.
updateChan(aliceChannelState, bobChannelState)
aliceAddr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18555,
}
aliceAddr := alicePeer.cfg.Addr.Address
if err := aliceChannelState.SyncPending(aliceAddr, 0); err != nil {
return nil, nil, err
return nil, err
}
bobAddr := &net.TCPAddr{
@ -274,7 +287,7 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
}
if err := bobChannelState.SyncPending(bobAddr, 0); err != nil {
return nil, nil, err
return nil, err
}
aliceSigner := input.NewMockSigner(
@ -289,7 +302,7 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
aliceSigner, aliceChannelState, alicePool,
)
if err != nil {
return nil, nil, err
return nil, err
}
_ = alicePool.Start()
t.Cleanup(func() {
@ -301,116 +314,16 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
bobSigner, bobChannelState, bobPool,
)
if err != nil {
return nil, nil, err
return nil, err
}
_ = bobPool.Start()
t.Cleanup(func() {
require.NoError(t, bobPool.Stop())
})
chainIO := &mock.ChainIO{
BestHeight: broadcastHeight,
}
wallet := &lnwallet.LightningWallet{
WalletController: &mock.WalletController{
RootKey: aliceKeyPriv,
PublishedTransactions: publTx,
},
}
// If mockSwitch is not set by the caller, set it to the default as the
// caller does not need to control it.
if mockSwitch == nil {
mockSwitch = &mockMessageSwitch{}
}
nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner)
const chanActiveTimeout = time.Minute
chanStatusMgr, err := netann.NewChanStatusManager(&netann.ChanStatusConfig{
ChanStatusSampleInterval: 30 * time.Second,
ChanEnableTimeout: chanActiveTimeout,
ChanDisableTimeout: 2 * time.Minute,
DB: dbAlice.ChannelStateDB(),
Graph: dbAlice.ChannelGraph(),
MessageSigner: nodeSignerAlice,
OurPubKey: aliceKeyPub,
OurKeyLoc: testKeyLoc,
IsChannelActive: func(lnwire.ChannelID) bool { return true },
ApplyChannelUpdate: func(*lnwire.ChannelUpdate,
*wire.OutPoint, bool) error {
return nil
},
})
if err != nil {
return nil, nil, err
}
if err = chanStatusMgr.Start(); err != nil {
return nil, nil, err
}
errBuffer, err := queue.NewCircularBuffer(ErrorBufferSize)
if err != nil {
return nil, nil, err
}
var pubKey [33]byte
copy(pubKey[:], aliceKeyPub.SerializeCompressed())
cfgAddr := &lnwire.NetAddress{
IdentityKey: aliceKeyPub,
Address: aliceAddr,
ChainNet: wire.SimNet,
}
interceptableSwitchNotifier := &mock.ChainNotifier{
EpochChan: make(chan *chainntnfs.BlockEpoch, 1),
}
interceptableSwitchNotifier.EpochChan <- &chainntnfs.BlockEpoch{
Height: 1,
}
interceptableSwitch, err := htlcswitch.NewInterceptableSwitch(
&htlcswitch.InterceptableSwitchConfig{
CltvRejectDelta: testCltvRejectDelta,
CltvInterceptDelta: testCltvRejectDelta + 3,
Notifier: interceptableSwitchNotifier,
},
alicePeer.remoteFeatures = lnwire.NewFeatureVector(
nil, lnwire.Features,
)
if err != nil {
return nil, nil, err
}
// TODO(yy): change ChannelNotifier to be an interface.
channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
require.NoError(t, channelNotifier.Start())
t.Cleanup(func() {
require.NoError(t, channelNotifier.Stop(),
"stop channel notifier failed")
})
cfg := &Config{
Addr: cfgAddr,
PubKeyBytes: pubKey,
ErrorBuffer: errBuffer,
ChainIO: chainIO,
Switch: mockSwitch,
ChanActiveTimeout: chanActiveTimeout,
InterceptSwitch: interceptableSwitch,
ChannelDB: dbAlice.ChannelStateDB(),
FeeEstimator: estimator,
Wallet: wallet,
ChainNotifier: notifier,
ChanStatusMgr: chanStatusMgr,
Features: lnwire.NewFeatureVector(nil, lnwire.Features),
DisconnectPeer: func(b *btcec.PublicKey) error { return nil },
ChannelNotifier: channelNotifier,
}
alicePeer := NewBrontide(*cfg)
alicePeer.remoteFeatures = lnwire.NewFeatureVector(nil, lnwire.Features)
chanID := lnwire.NewChanIDFromOutPoint(channelAlice.ChannelPoint())
alicePeer.activeChannels.Store(chanID, channelAlice)
@ -418,7 +331,13 @@ func createTestPeer(t *testing.T, notifier chainntnfs.ChainNotifier,
alicePeer.wg.Add(1)
go alicePeer.channelManager()
return alicePeer, channelBob, nil
return &peerTestCtx{
peer: alicePeer,
channel: channelBob,
notifier: notifier,
publishTx: publishTx,
mockSwitch: mockSwitch,
}, nil
}
// mockMessageSwitch is a mock implementation of the messageSwitch interface
@ -618,3 +537,240 @@ func (m *mockMessageConn) LocalAddr() net.Addr {
func (m *mockMessageConn) Close() error {
return nil
}
// createTestPeer creates a new peer for testing and returns a context struct
// containing necessary handles and mock objects for conducting tests on peer
// functionalities.
func createTestPeer(t *testing.T) *peerTestCtx {
nodeKeyLocator := keychain.KeyLocator{
Family: keychain.KeyFamilyNodeKey,
}
aliceKeyPriv, aliceKeyPub := btcec.PrivKeyFromBytes(
channels.AlicesPrivKey,
)
aliceKeySigner := keychain.NewPrivKeyMessageSigner(
aliceKeyPriv, nodeKeyLocator,
)
aliceAddr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: 18555,
}
cfgAddr := &lnwire.NetAddress{
IdentityKey: aliceKeyPub,
Address: aliceAddr,
ChainNet: wire.SimNet,
}
errBuffer, err := queue.NewCircularBuffer(ErrorBufferSize)
require.NoError(t, err)
chainIO := &mock.ChainIO{
BestHeight: broadcastHeight,
}
publishTx := make(chan *wire.MsgTx)
wallet := &lnwallet.LightningWallet{
WalletController: &mock.WalletController{
RootKey: aliceKeyPriv,
PublishedTransactions: publishTx,
},
}
const chanActiveTimeout = time.Minute
dbAlice, err := channeldb.Open(t.TempDir())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, dbAlice.Close())
})
nodeSignerAlice := netann.NewNodeSigner(aliceKeySigner)
chanStatusMgr, err := netann.NewChanStatusManager(&netann.
ChanStatusConfig{
ChanStatusSampleInterval: 30 * time.Second,
ChanEnableTimeout: chanActiveTimeout,
ChanDisableTimeout: 2 * time.Minute,
DB: dbAlice.ChannelStateDB(),
Graph: dbAlice.ChannelGraph(),
MessageSigner: nodeSignerAlice,
OurPubKey: aliceKeyPub,
OurKeyLoc: testKeyLoc,
IsChannelActive: func(lnwire.ChannelID) bool {
return true
},
ApplyChannelUpdate: func(*lnwire.ChannelUpdate,
*wire.OutPoint, bool) error {
return nil
},
})
require.NoError(t, err)
interceptableSwitchNotifier := &mock.ChainNotifier{
EpochChan: make(chan *chainntnfs.BlockEpoch, 1),
}
interceptableSwitchNotifier.EpochChan <- &chainntnfs.BlockEpoch{
Height: 1,
}
interceptableSwitch, err := htlcswitch.NewInterceptableSwitch(
&htlcswitch.InterceptableSwitchConfig{
CltvRejectDelta: testCltvRejectDelta,
CltvInterceptDelta: testCltvRejectDelta + 3,
Notifier: interceptableSwitchNotifier,
},
)
require.NoError(t, err)
// TODO(yy): create interface for lnwallet.LightningChannel so we can
// easily mock it without the following setups.
notifier := &mock.ChainNotifier{
SpendChan: make(chan *chainntnfs.SpendDetail),
EpochChan: make(chan *chainntnfs.BlockEpoch),
ConfChan: make(chan *chainntnfs.TxConfirmation),
}
mockSwitch := &mockMessageSwitch{}
// TODO(yy): change ChannelNotifier to be an interface.
channelNotifier := channelnotifier.New(dbAlice.ChannelStateDB())
require.NoError(t, channelNotifier.Start())
t.Cleanup(func() {
require.NoError(t, channelNotifier.Stop(),
"stop channel notifier failed")
})
writeBufferPool := pool.NewWriteBuffer(
pool.DefaultWriteBufferGCInterval,
pool.DefaultWriteBufferExpiryInterval,
)
writePool := pool.NewWrite(
writeBufferPool, 1, timeout,
)
require.NoError(t, writePool.Start())
readBufferPool := pool.NewReadBuffer(
pool.DefaultReadBufferGCInterval,
pool.DefaultReadBufferExpiryInterval,
)
readPool := pool.NewRead(
readBufferPool, 1, timeout,
)
require.NoError(t, readPool.Start())
mockConn := newMockConn(t, 1)
receivedCustomChan := make(chan *customMsg)
var pubKey [33]byte
copy(pubKey[:], aliceKeyPub.SerializeCompressed())
estimator := chainfee.NewStaticEstimator(12500, 0)
cfg := &Config{
Addr: cfgAddr,
PubKeyBytes: pubKey,
ErrorBuffer: errBuffer,
ChainIO: chainIO,
Switch: mockSwitch,
ChanActiveTimeout: chanActiveTimeout,
InterceptSwitch: interceptableSwitch,
ChannelDB: dbAlice.ChannelStateDB(),
FeeEstimator: estimator,
Wallet: wallet,
ChainNotifier: notifier,
ChanStatusMgr: chanStatusMgr,
Features: lnwire.NewFeatureVector(
nil, lnwire.Features,
),
DisconnectPeer: func(b *btcec.PublicKey) error {
return nil
},
ChannelNotifier: channelNotifier,
PrunePersistentPeerConnection: func([33]byte) {},
LegacyFeatures: lnwire.EmptyFeatureVector(),
WritePool: writePool,
ReadPool: readPool,
Conn: mockConn,
HandleCustomMessage: func(
peer [33]byte, msg *lnwire.Custom) error {
receivedCustomChan <- &customMsg{
peer: peer,
msg: *msg,
}
return nil
},
PongBuf: make([]byte, lnwire.MaxPongBytes),
}
alicePeer := NewBrontide(*cfg)
return &peerTestCtx{
publishTx: publishTx,
mockSwitch: mockSwitch,
peer: alicePeer,
notifier: notifier,
db: dbAlice,
privKey: aliceKeyPriv,
mockConn: mockConn,
customChan: receivedCustomChan,
chanStatusMgr: chanStatusMgr,
}
}
// startPeer invokes the `Start` method on the specified peer and handles any
// initial startup messages for testing.
func startPeer(t *testing.T, mockConn *mockMessageConn,
peer *Brontide) <-chan struct{} {
// Start the peer in a goroutine so that we can handle and test for
// startup messages. Successfully sending and receiving init message,
// indicates a successful startup.
done := make(chan struct{})
go func() {
require.NoError(t, peer.Start())
close(done)
}()
// Receive the init message that should be the first message received on
// startup.
rawMsg, err := fn.RecvOrTimeout[[]byte](
mockConn.writtenMessages, timeout,
)
require.NoError(t, err)
msgReader := bytes.NewReader(rawMsg)
nextMsg, err := lnwire.ReadMessage(msgReader, 0)
require.NoError(t, err)
_, ok := nextMsg.(*lnwire.Init)
require.True(t, ok)
// Write the reply for the init message to complete the startup.
initReplyMsg := lnwire.NewInitMessage(
lnwire.NewRawFeatureVector(
lnwire.DataLossProtectRequired,
lnwire.GossipQueriesOptional,
),
lnwire.NewRawFeatureVector(),
)
var b bytes.Buffer
_, err = lnwire.WriteMessage(&b, initReplyMsg, 0)
require.NoError(t, err)
ok = fn.SendOrQuit[[]byte, struct{}](
mockConn.readMessages, b.Bytes(), make(chan struct{}),
)
require.True(t, ok)
return done
}