lnd/discovery/gossiper_test.go
2020-03-17 16:23:01 -07:00

3933 lines
109 KiB
Go

package discovery
import (
"bytes"
"encoding/hex"
"fmt"
"io/ioutil"
"math/big"
prand "math/rand"
"net"
"os"
"reflect"
"strings"
"sync"
"testing"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/netann"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/ticker"
)
var (
testAddr = &net.TCPAddr{IP: (net.IP)([]byte{0xA, 0x0, 0x0, 0x1}),
Port: 9000}
testAddrs = []net.Addr{testAddr}
testFeatures = lnwire.NewRawFeatureVector()
testSig = &btcec.Signature{
R: new(big.Int),
S: new(big.Int),
}
_, _ = testSig.R.SetString("63724406601629180062774974542967536251589935445068131219452686511677818569431", 10)
_, _ = testSig.S.SetString("18801056069249825825291287104931333862866033135609736119018462340006816851118", 10)
bitcoinKeyPriv1, _ = btcec.NewPrivateKey(btcec.S256())
bitcoinKeyPub1 = bitcoinKeyPriv1.PubKey()
nodeKeyPriv1, _ = btcec.NewPrivateKey(btcec.S256())
nodeKeyPub1 = nodeKeyPriv1.PubKey()
bitcoinKeyPriv2, _ = btcec.NewPrivateKey(btcec.S256())
bitcoinKeyPub2 = bitcoinKeyPriv2.PubKey()
nodeKeyPriv2, _ = btcec.NewPrivateKey(btcec.S256())
nodeKeyPub2 = nodeKeyPriv2.PubKey()
trickleDelay = time.Millisecond * 100
retransmitDelay = time.Hour * 1
proofMatureDelta uint32
// The test timestamp + rebroadcast interval makes sure messages won't
// be rebroadcasted automaticallty during the tests.
testTimestamp = uint32(1234567890)
rebroadcastInterval = time.Hour * 1000000
)
// makeTestDB creates a new instance of the ChannelDB for testing purposes. A
// callback which cleans up the created temporary directories is also returned
// and intended to be executed after the test completes.
func makeTestDB() (*channeldb.DB, func(), error) {
// First, create a temporary directory to be used for the duration of
// this test.
tempDirName, err := ioutil.TempDir("", "channeldb")
if err != nil {
return nil, nil, err
}
// Next, create channeldb for the first time.
cdb, err := channeldb.Open(tempDirName)
if err != nil {
return nil, nil, err
}
cleanUp := func() {
cdb.Close()
os.RemoveAll(tempDirName)
}
return cdb, cleanUp, nil
}
type mockSigner struct {
privKey *btcec.PrivateKey
}
func (n *mockSigner) SignMessage(pubKey *btcec.PublicKey,
msg []byte) (*btcec.Signature, error) {
if !pubKey.IsEqual(n.privKey.PubKey()) {
return nil, fmt.Errorf("unknown public key")
}
digest := chainhash.DoubleHashB(msg)
sign, err := n.privKey.Sign(digest)
if err != nil {
return nil, fmt.Errorf("can't sign the message: %v", err)
}
return sign, nil
}
type mockGraphSource struct {
bestHeight uint32
mu sync.Mutex
nodes []channeldb.LightningNode
infos map[uint64]channeldb.ChannelEdgeInfo
edges map[uint64][]channeldb.ChannelEdgePolicy
zombies map[uint64][][33]byte
}
func newMockRouter(height uint32) *mockGraphSource {
return &mockGraphSource{
bestHeight: height,
infos: make(map[uint64]channeldb.ChannelEdgeInfo),
edges: make(map[uint64][]channeldb.ChannelEdgePolicy),
zombies: make(map[uint64][][33]byte),
}
}
var _ routing.ChannelGraphSource = (*mockGraphSource)(nil)
func (r *mockGraphSource) AddNode(node *channeldb.LightningNode) error {
r.mu.Lock()
defer r.mu.Unlock()
r.nodes = append(r.nodes, *node)
return nil
}
func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo) error {
r.mu.Lock()
defer r.mu.Unlock()
if _, ok := r.infos[info.ChannelID]; ok {
return errors.New("info already exist")
}
r.infos[info.ChannelID] = *info
return nil
}
func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy) error {
r.mu.Lock()
defer r.mu.Unlock()
if len(r.edges[edge.ChannelID]) == 0 {
r.edges[edge.ChannelID] = make([]channeldb.ChannelEdgePolicy, 2)
}
if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
r.edges[edge.ChannelID][0] = *edge
} else {
r.edges[edge.ChannelID][1] = *edge
}
return nil
}
func (r *mockGraphSource) CurrentBlockHeight() (uint32, error) {
return r.bestHeight, nil
}
func (r *mockGraphSource) AddProof(chanID lnwire.ShortChannelID,
proof *channeldb.ChannelAuthProof) error {
r.mu.Lock()
defer r.mu.Unlock()
chanIDInt := chanID.ToUint64()
info, ok := r.infos[chanIDInt]
if !ok {
return errors.New("channel does not exist")
}
info.AuthProof = proof
r.infos[chanIDInt] = info
return nil
}
func (r *mockGraphSource) ForEachNode(func(node *channeldb.LightningNode) error) error {
return nil
}
func (r *mockGraphSource) ForAllOutgoingChannels(cb func(i *channeldb.ChannelEdgeInfo,
c *channeldb.ChannelEdgePolicy) error) error {
r.mu.Lock()
defer r.mu.Unlock()
chans := make(map[uint64]channeldb.ChannelEdge)
for _, info := range r.infos {
info := info
edgeInfo := chans[info.ChannelID]
edgeInfo.Info = &info
chans[info.ChannelID] = edgeInfo
}
for _, edges := range r.edges {
edges := edges
edge := chans[edges[0].ChannelID]
edge.Policy1 = &edges[0]
chans[edges[0].ChannelID] = edge
}
for _, channel := range chans {
cb(channel.Info, channel.Policy1)
}
return nil
}
func (r *mockGraphSource) ForEachChannel(func(chanInfo *channeldb.ChannelEdgeInfo,
e1, e2 *channeldb.ChannelEdgePolicy) error) error {
return nil
}
func (r *mockGraphSource) GetChannelByID(chanID lnwire.ShortChannelID) (
*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy,
*channeldb.ChannelEdgePolicy, error) {
r.mu.Lock()
defer r.mu.Unlock()
chanIDInt := chanID.ToUint64()
chanInfo, ok := r.infos[chanIDInt]
if !ok {
pubKeys, isZombie := r.zombies[chanIDInt]
if !isZombie {
return nil, nil, nil, channeldb.ErrEdgeNotFound
}
return &channeldb.ChannelEdgeInfo{
NodeKey1Bytes: pubKeys[0],
NodeKey2Bytes: pubKeys[1],
}, nil, nil, channeldb.ErrZombieEdge
}
edges := r.edges[chanID.ToUint64()]
if len(edges) == 0 {
return &chanInfo, nil, nil, nil
}
var edge1 *channeldb.ChannelEdgePolicy
if !reflect.DeepEqual(edges[0], channeldb.ChannelEdgePolicy{}) {
edge1 = &edges[0]
}
var edge2 *channeldb.ChannelEdgePolicy
if !reflect.DeepEqual(edges[1], channeldb.ChannelEdgePolicy{}) {
edge2 = &edges[1]
}
return &chanInfo, edge1, edge2, nil
}
func (r *mockGraphSource) FetchLightningNode(
nodePub route.Vertex) (*channeldb.LightningNode, error) {
for _, node := range r.nodes {
if bytes.Equal(nodePub[:], node.PubKeyBytes[:]) {
return &node, nil
}
}
return nil, channeldb.ErrGraphNodeNotFound
}
// IsStaleNode returns true if the graph source has a node announcement for the
// target node with a more recent timestamp.
func (r *mockGraphSource) IsStaleNode(nodePub route.Vertex, timestamp time.Time) bool {
r.mu.Lock()
defer r.mu.Unlock()
for _, node := range r.nodes {
if node.PubKeyBytes == nodePub {
return node.LastUpdate.After(timestamp) ||
node.LastUpdate.Equal(timestamp)
}
}
// If we did not find the node among our existing graph nodes, we
// require the node to already have a channel in the graph to not be
// considered stale.
for _, info := range r.infos {
if info.NodeKey1Bytes == nodePub {
return false
}
if info.NodeKey2Bytes == nodePub {
return false
}
}
return true
}
// IsPublicNode determines whether the given vertex is seen as a public node in
// the graph from the graph's source node's point of view.
func (r *mockGraphSource) IsPublicNode(node route.Vertex) (bool, error) {
for _, info := range r.infos {
if !bytes.Equal(node[:], info.NodeKey1Bytes[:]) &&
!bytes.Equal(node[:], info.NodeKey2Bytes[:]) {
continue
}
if info.AuthProof != nil {
return true, nil
}
}
return false, nil
}
// IsKnownEdge returns true if the graph source already knows of the passed
// channel ID either as a live or zombie channel.
func (r *mockGraphSource) IsKnownEdge(chanID lnwire.ShortChannelID) bool {
r.mu.Lock()
defer r.mu.Unlock()
chanIDInt := chanID.ToUint64()
_, exists := r.infos[chanIDInt]
_, isZombie := r.zombies[chanIDInt]
return exists || isZombie
}
// IsStaleEdgePolicy returns true if the graph source has a channel edge for
// the passed channel ID (and flags) that have a more recent timestamp.
func (r *mockGraphSource) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
timestamp time.Time, flags lnwire.ChanUpdateChanFlags) bool {
r.mu.Lock()
defer r.mu.Unlock()
chanIDInt := chanID.ToUint64()
edges, ok := r.edges[chanIDInt]
if !ok {
// Since the edge doesn't exist, we'll check our zombie index as
// well.
_, isZombie := r.zombies[chanIDInt]
if !isZombie {
return false
}
// Since it exists within our zombie index, we'll check that it
// respects the router's live edge horizon to determine whether
// it is stale or not.
return time.Since(timestamp) > routing.DefaultChannelPruneExpiry
}
switch {
case flags&lnwire.ChanUpdateDirection == 0 &&
!reflect.DeepEqual(edges[0], channeldb.ChannelEdgePolicy{}):
return !timestamp.After(edges[0].LastUpdate)
case flags&lnwire.ChanUpdateDirection == 1 &&
!reflect.DeepEqual(edges[1], channeldb.ChannelEdgePolicy{}):
return !timestamp.After(edges[1].LastUpdate)
default:
return false
}
}
// MarkEdgeLive clears an edge from our zombie index, deeming it as live.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *mockGraphSource) MarkEdgeLive(chanID lnwire.ShortChannelID) error {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.zombies, chanID.ToUint64())
return nil
}
// MarkEdgeZombie marks an edge as a zombie within our zombie index.
func (r *mockGraphSource) MarkEdgeZombie(chanID lnwire.ShortChannelID, pubKey1,
pubKey2 [33]byte) error {
r.mu.Lock()
defer r.mu.Unlock()
r.zombies[chanID.ToUint64()] = [][33]byte{pubKey1, pubKey2}
return nil
}
type mockNotifier struct {
clientCounter uint32
epochClients map[uint32]chan *chainntnfs.BlockEpoch
sync.RWMutex
}
func newMockNotifier() *mockNotifier {
return &mockNotifier{
epochClients: make(map[uint32]chan *chainntnfs.BlockEpoch),
}
}
func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
_ []byte, numConfs, _ uint32) (*chainntnfs.ConfirmationEvent, error) {
return nil, nil
}
func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, _ []byte,
_ uint32) (*chainntnfs.SpendEvent, error) {
return nil, nil
}
func (m *mockNotifier) notifyBlock(hash chainhash.Hash, height uint32) {
m.RLock()
defer m.RUnlock()
for _, client := range m.epochClients {
client <- &chainntnfs.BlockEpoch{
Height: int32(height),
Hash: &hash,
}
}
}
func (m *mockNotifier) RegisterBlockEpochNtfn(
bestBlock *chainntnfs.BlockEpoch) (*chainntnfs.BlockEpochEvent, error) {
m.RLock()
defer m.RUnlock()
epochChan := make(chan *chainntnfs.BlockEpoch)
clientID := m.clientCounter
m.clientCounter++
m.epochClients[clientID] = epochChan
return &chainntnfs.BlockEpochEvent{
Epochs: epochChan,
Cancel: func() {},
}, nil
}
func (m *mockNotifier) Start() error {
return nil
}
func (m *mockNotifier) Stop() error {
return nil
}
type annBatch struct {
nodeAnn1 *lnwire.NodeAnnouncement
nodeAnn2 *lnwire.NodeAnnouncement
localChanAnn *lnwire.ChannelAnnouncement
remoteChanAnn *lnwire.ChannelAnnouncement
chanUpdAnn1 *lnwire.ChannelUpdate
chanUpdAnn2 *lnwire.ChannelUpdate
localProofAnn *lnwire.AnnounceSignatures
remoteProofAnn *lnwire.AnnounceSignatures
}
func createAnnouncements(blockHeight uint32) (*annBatch, error) {
var err error
var batch annBatch
timestamp := testTimestamp
batch.nodeAnn1, err = createNodeAnnouncement(nodeKeyPriv1, timestamp)
if err != nil {
return nil, err
}
batch.nodeAnn2, err = createNodeAnnouncement(nodeKeyPriv2, timestamp)
if err != nil {
return nil, err
}
batch.remoteChanAnn, err = createRemoteChannelAnnouncement(blockHeight)
if err != nil {
return nil, err
}
batch.remoteProofAnn = &lnwire.AnnounceSignatures{
ShortChannelID: lnwire.ShortChannelID{
BlockHeight: blockHeight,
},
NodeSignature: batch.remoteChanAnn.NodeSig2,
BitcoinSignature: batch.remoteChanAnn.BitcoinSig2,
}
batch.localChanAnn, err = createRemoteChannelAnnouncement(blockHeight)
if err != nil {
return nil, err
}
batch.localProofAnn = &lnwire.AnnounceSignatures{
ShortChannelID: lnwire.ShortChannelID{
BlockHeight: blockHeight,
},
NodeSignature: batch.localChanAnn.NodeSig1,
BitcoinSignature: batch.localChanAnn.BitcoinSig1,
}
batch.chanUpdAnn1, err = createUpdateAnnouncement(
blockHeight, 0, nodeKeyPriv1, timestamp,
)
if err != nil {
return nil, err
}
batch.chanUpdAnn2, err = createUpdateAnnouncement(
blockHeight, 1, nodeKeyPriv2, timestamp,
)
if err != nil {
return nil, err
}
return &batch, nil
}
func createNodeAnnouncement(priv *btcec.PrivateKey,
timestamp uint32, extraBytes ...[]byte) (*lnwire.NodeAnnouncement, error) {
var err error
k := hex.EncodeToString(priv.Serialize())
alias, err := lnwire.NewNodeAlias("kek" + k[:10])
if err != nil {
return nil, err
}
a := &lnwire.NodeAnnouncement{
Timestamp: timestamp,
Addresses: testAddrs,
Alias: alias,
Features: testFeatures,
}
copy(a.NodeID[:], priv.PubKey().SerializeCompressed())
if len(extraBytes) == 1 {
a.ExtraOpaqueData = extraBytes[0]
}
signer := mockSigner{priv}
sig, err := netann.SignAnnouncement(&signer, priv.PubKey(), a)
if err != nil {
return nil, err
}
a.Signature, err = lnwire.NewSigFromSignature(sig)
if err != nil {
return nil, err
}
return a, nil
}
func createUpdateAnnouncement(blockHeight uint32,
flags lnwire.ChanUpdateChanFlags,
nodeKey *btcec.PrivateKey, timestamp uint32,
extraBytes ...[]byte) (*lnwire.ChannelUpdate, error) {
var err error
htlcMinMsat := lnwire.MilliSatoshi(prand.Int63())
a := &lnwire.ChannelUpdate{
ShortChannelID: lnwire.ShortChannelID{
BlockHeight: blockHeight,
},
Timestamp: timestamp,
MessageFlags: lnwire.ChanUpdateOptionMaxHtlc,
ChannelFlags: flags,
TimeLockDelta: uint16(prand.Int63()),
HtlcMinimumMsat: htlcMinMsat,
// Since the max HTLC must be greater than the min HTLC to pass channel
// update validation, set it to double the min htlc.
HtlcMaximumMsat: 2 * htlcMinMsat,
FeeRate: uint32(prand.Int31()),
BaseFee: uint32(prand.Int31()),
}
if len(extraBytes) == 1 {
a.ExtraOpaqueData = extraBytes[0]
}
err = signUpdate(nodeKey, a)
if err != nil {
return nil, err
}
return a, nil
}
func signUpdate(nodeKey *btcec.PrivateKey, a *lnwire.ChannelUpdate) error {
pub := nodeKey.PubKey()
signer := mockSigner{nodeKey}
sig, err := netann.SignAnnouncement(&signer, pub, a)
if err != nil {
return err
}
a.Signature, err = lnwire.NewSigFromSignature(sig)
if err != nil {
return err
}
return nil
}
func createAnnouncementWithoutProof(blockHeight uint32,
extraBytes ...[]byte) *lnwire.ChannelAnnouncement {
a := &lnwire.ChannelAnnouncement{
ShortChannelID: lnwire.ShortChannelID{
BlockHeight: blockHeight,
TxIndex: 0,
TxPosition: 0,
},
Features: testFeatures,
}
copy(a.NodeID1[:], nodeKeyPub1.SerializeCompressed())
copy(a.NodeID2[:], nodeKeyPub2.SerializeCompressed())
copy(a.BitcoinKey1[:], bitcoinKeyPub1.SerializeCompressed())
copy(a.BitcoinKey2[:], bitcoinKeyPub2.SerializeCompressed())
if len(extraBytes) == 1 {
a.ExtraOpaqueData = extraBytes[0]
}
return a
}
func createRemoteChannelAnnouncement(blockHeight uint32,
extraBytes ...[]byte) (*lnwire.ChannelAnnouncement, error) {
a := createAnnouncementWithoutProof(blockHeight, extraBytes...)
pub := nodeKeyPriv1.PubKey()
signer := mockSigner{nodeKeyPriv1}
sig, err := netann.SignAnnouncement(&signer, pub, a)
if err != nil {
return nil, err
}
a.NodeSig1, err = lnwire.NewSigFromSignature(sig)
if err != nil {
return nil, err
}
pub = nodeKeyPriv2.PubKey()
signer = mockSigner{nodeKeyPriv2}
sig, err = netann.SignAnnouncement(&signer, pub, a)
if err != nil {
return nil, err
}
a.NodeSig2, err = lnwire.NewSigFromSignature(sig)
if err != nil {
return nil, err
}
pub = bitcoinKeyPriv1.PubKey()
signer = mockSigner{bitcoinKeyPriv1}
sig, err = netann.SignAnnouncement(&signer, pub, a)
if err != nil {
return nil, err
}
a.BitcoinSig1, err = lnwire.NewSigFromSignature(sig)
if err != nil {
return nil, err
}
pub = bitcoinKeyPriv2.PubKey()
signer = mockSigner{bitcoinKeyPriv2}
sig, err = netann.SignAnnouncement(&signer, pub, a)
if err != nil {
return nil, err
}
a.BitcoinSig2, err = lnwire.NewSigFromSignature(sig)
if err != nil {
return nil, err
}
return a, nil
}
type testCtx struct {
gossiper *AuthenticatedGossiper
router *mockGraphSource
notifier *mockNotifier
broadcastedMessage chan msgWithSenders
}
func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
// Next we'll initialize an instance of the channel router with mock
// versions of the chain and channel notifier. As we don't need to test
// any p2p functionality, the peer send and switch send,
// broadcast functions won't be populated.
notifier := newMockNotifier()
router := newMockRouter(startHeight)
db, cleanUpDb, err := makeTestDB()
if err != nil {
return nil, nil, err
}
waitingProofStore, err := channeldb.NewWaitingProofStore(db)
if err != nil {
cleanUpDb()
return nil, nil, err
}
broadcastedMessage := make(chan msgWithSenders, 10)
gossiper := New(Config{
Notifier: notifier,
Broadcast: func(senders map[route.Vertex]struct{},
msgs ...lnwire.Message) error {
for _, msg := range msgs {
broadcastedMessage <- msgWithSenders{
msg: msg,
senders: senders,
}
}
return nil
},
NotifyWhenOnline: func(target [33]byte,
peerChan chan<- lnpeer.Peer) {
pk, _ := btcec.ParsePubKey(target[:], btcec.S256())
peerChan <- &mockPeer{pk, nil, nil}
},
NotifyWhenOffline: func(_ [33]byte) <-chan struct{} {
c := make(chan struct{})
return c
},
SelfNodeAnnouncement: func(bool) (lnwire.NodeAnnouncement, error) {
return lnwire.NodeAnnouncement{
Timestamp: testTimestamp,
}, nil
},
Router: router,
TrickleDelay: trickleDelay,
RetransmitTicker: ticker.NewForce(retransmitDelay),
RebroadcastInterval: rebroadcastInterval,
ProofMatureDelta: proofMatureDelta,
WaitingProofStore: waitingProofStore,
MessageStore: newMockMessageStore(),
RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval),
HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
NumActiveSyncers: 3,
AnnSigner: &mockSigner{nodeKeyPriv1},
SubBatchDelay: time.Second * 5,
MinimumBatchSize: 10,
}, nodeKeyPub1)
if err := gossiper.Start(); err != nil {
cleanUpDb()
return nil, nil, fmt.Errorf("unable to start router: %v", err)
}
// Mark the graph as synced in order to allow the announcements to be
// broadcast.
gossiper.syncMgr.markGraphSynced()
cleanUp := func() {
gossiper.Stop()
cleanUpDb()
}
return &testCtx{
router: router,
notifier: notifier,
gossiper: gossiper,
broadcastedMessage: broadcastedMessage,
}, cleanUp, nil
}
// TestProcessAnnouncement checks that mature announcements are propagated to
// the router subsystem.
func TestProcessAnnouncement(t *testing.T) {
t.Parallel()
timestamp := testTimestamp
ctx, cleanup, err := createTestCtx(0)
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
assertSenderExistence := func(sender *btcec.PublicKey, msg msgWithSenders) {
if _, ok := msg.senders[route.NewVertex(sender)]; !ok {
t.Fatalf("sender=%x not present in %v",
sender.SerializeCompressed(), spew.Sdump(msg))
}
}
nodePeer := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil}
// First, we'll craft a valid remote channel announcement and send it to
// the gossiper so that it can be processed.
ca, err := createRemoteChannelAnnouncement(0)
if err != nil {
t.Fatalf("can't create channel announcement: %v", err)
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer):
case <-time.After(2 * time.Second):
t.Fatal("remote announcement not processed")
}
if err != nil {
t.Fatalf("can't process remote announcement: %v", err)
}
// The announcement should be broadcast and included in our local view
// of the graph.
select {
case msg := <-ctx.broadcastedMessage:
assertSenderExistence(nodePeer.IdentityKey(), msg)
case <-time.After(2 * trickleDelay):
t.Fatal("announcement wasn't proceeded")
}
if len(ctx.router.infos) != 1 {
t.Fatalf("edge wasn't added to router: %v", err)
}
// We'll then craft the channel policy of the remote party and also send
// it to the gossiper.
ua, err := createUpdateAnnouncement(0, 0, nodeKeyPriv1, timestamp)
if err != nil {
t.Fatalf("can't create update announcement: %v", err)
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer):
case <-time.After(2 * time.Second):
t.Fatal("remote announcement not processed")
}
if err != nil {
t.Fatalf("can't process remote announcement: %v", err)
}
// The channel policy should be broadcast to the rest of the network.
select {
case msg := <-ctx.broadcastedMessage:
assertSenderExistence(nodePeer.IdentityKey(), msg)
case <-time.After(2 * trickleDelay):
t.Fatal("announcement wasn't proceeded")
}
if len(ctx.router.edges) != 1 {
t.Fatalf("edge update wasn't added to router: %v", err)
}
// Finally, we'll craft the remote party's node announcement.
na, err := createNodeAnnouncement(nodeKeyPriv1, timestamp)
if err != nil {
t.Fatalf("can't create node announcement: %v", err)
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(na, nodePeer):
case <-time.After(2 * time.Second):
t.Fatal("remote announcement not processed")
}
if err != nil {
t.Fatalf("can't process remote announcement: %v", err)
}
// It should also be broadcast to the network and included in our local
// view of the graph.
select {
case msg := <-ctx.broadcastedMessage:
assertSenderExistence(nodePeer.IdentityKey(), msg)
case <-time.After(2 * trickleDelay):
t.Fatal("announcement wasn't proceeded")
}
if len(ctx.router.nodes) != 1 {
t.Fatalf("node wasn't added to router: %v", err)
}
}
// TestPrematureAnnouncement checks that premature announcements are
// not propagated to the router subsystem until block with according
// block height received.
func TestPrematureAnnouncement(t *testing.T) {
t.Parallel()
timestamp := testTimestamp
ctx, cleanup, err := createTestCtx(0)
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
_, err = createNodeAnnouncement(nodeKeyPriv1, timestamp)
if err != nil {
t.Fatalf("can't create node announcement: %v", err)
}
nodePeer := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil}
// Pretending that we receive the valid channel announcement from
// remote side, but block height of this announcement is greater than
// highest know to us, for that reason it should be added to the
// repeat/premature batch.
ca, err := createRemoteChannelAnnouncement(1)
if err != nil {
t.Fatalf("can't create channel announcement: %v", err)
}
select {
case <-ctx.gossiper.ProcessRemoteAnnouncement(ca, nodePeer):
t.Fatal("announcement was proceeded")
case <-time.After(100 * time.Millisecond):
}
if len(ctx.router.infos) != 0 {
t.Fatal("edge was added to router")
}
// Pretending that we receive the valid channel update announcement from
// remote side, but block height of this announcement is greater than
// highest know to us, for that reason it should be added to the
// repeat/premature batch.
ua, err := createUpdateAnnouncement(1, 0, nodeKeyPriv1, timestamp)
if err != nil {
t.Fatalf("can't create update announcement: %v", err)
}
select {
case <-ctx.gossiper.ProcessRemoteAnnouncement(ua, nodePeer):
t.Fatal("announcement was proceeded")
case <-time.After(100 * time.Millisecond):
}
if len(ctx.router.edges) != 0 {
t.Fatal("edge update was added to router")
}
// Generate new block and waiting the previously added announcements
// to be proceeded.
newBlock := &wire.MsgBlock{}
ctx.notifier.notifyBlock(newBlock.Header.BlockHash(), 1)
select {
case <-ctx.broadcastedMessage:
case <-time.After(2 * trickleDelay):
t.Fatal("announcement wasn't broadcasted")
}
if len(ctx.router.infos) != 1 {
t.Fatalf("edge wasn't added to router: %v", err)
}
select {
case <-ctx.broadcastedMessage:
case <-time.After(2 * trickleDelay):
t.Fatal("announcement wasn't broadcasted")
}
if len(ctx.router.edges) != 1 {
t.Fatalf("edge update wasn't added to router: %v", err)
}
}
// TestSignatureAnnouncementLocalFirst ensures that the AuthenticatedGossiper
// properly processes partial and fully announcement signatures message.
func TestSignatureAnnouncementLocalFirst(t *testing.T) {
t.Parallel()
ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
// Set up a channel that we can use to inspect the messages sent
// directly from the gossiper.
sentMsgs := make(chan lnwire.Message, 10)
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target [33]byte,
peerChan chan<- lnpeer.Peer) {
pk, _ := btcec.ParsePubKey(target[:], btcec.S256())
select {
case peerChan <- &mockPeer{pk, sentMsgs, ctx.gossiper.quit}:
case <-ctx.gossiper.quit:
}
}
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("can't generate announcements: %v", err)
}
localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit}
// Recreate lightning network topology. Initialize router with channel
// between two nodes.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localChanAnn, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process channel ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.chanUpdAnn1, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process channel update: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.nodeAnn1, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// The local ChannelUpdate should now be sent directly to the remote peer,
// such that the edge can be used for routing, regardless if this channel
// is announced or not (private channel).
select {
case msg := <-sentMsgs:
assertMessage(t, batch.chanUpdAnn1, msg)
case <-time.After(1 * time.Second):
t.Fatal("gossiper did not send channel update to peer")
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process channel update: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.nodeAnn2, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localProofAnn, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process local proof: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("announcements were broadcast")
case <-time.After(2 * trickleDelay):
}
number := 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
); err != nil {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 1 {
t.Fatal("wrong number of objects in storage")
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteProofAnn, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process remote proof: %v", err)
}
for i := 0; i < 5; i++ {
select {
case <-ctx.broadcastedMessage:
case <-time.After(time.Second):
t.Fatal("announcement wasn't broadcast")
}
}
number = 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
); err != nil && err != channeldb.ErrWaitingProofNotFound {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 0 {
t.Fatal("waiting proof should be removed from storage")
}
}
// TestOrphanSignatureAnnouncement ensures that the gossiper properly
// processes announcement with unknown channel ids.
func TestOrphanSignatureAnnouncement(t *testing.T) {
t.Parallel()
ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
// Set up a channel that we can use to inspect the messages sent
// directly from the gossiper.
sentMsgs := make(chan lnwire.Message, 10)
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(target [33]byte,
peerChan chan<- lnpeer.Peer) {
pk, _ := btcec.ParsePubKey(target[:], btcec.S256())
select {
case peerChan <- &mockPeer{pk, sentMsgs, ctx.gossiper.quit}:
case <-ctx.gossiper.quit:
}
}
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("can't generate announcements: %v", err)
}
localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit}
// Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process, in
// this case the announcement should be added in the orphan batch
// because we haven't announce the channel yet.
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn,
remotePeer):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to proceed announcement: %v", err)
}
number := 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
); err != nil {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 1 {
t.Fatal("wrong number of objects in storage")
}
// Recreate lightning network topology. Initialize router with channel
// between two nodes.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn,
localKey):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1,
localKey):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.nodeAnn1, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// The local ChannelUpdate should now be sent directly to the remote peer,
// such that the edge can be used for routing, regardless if this channel
// is announced or not (private channel).
select {
case msg := <-sentMsgs:
assertMessage(t, batch.chanUpdAnn1, msg)
case <-time.After(1 * time.Second):
t.Fatal("gossiper did not send channel update to peer")
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
remotePeer):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.nodeAnn2, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// After that we process local announcement, and waiting to receive
// the channel announcement.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn,
localKey):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process: %v", err)
}
// The local proof should be sent to the remote peer.
select {
case msg := <-sentMsgs:
assertMessage(t, batch.localProofAnn, msg)
case <-time.After(2 * time.Second):
t.Fatalf("local proof was not sent to peer")
}
// And since both remote and local announcements are processed, we
// should be broadcasting the final channel announcements.
for i := 0; i < 5; i++ {
select {
case <-ctx.broadcastedMessage:
case <-time.After(time.Second):
t.Fatal("announcement wasn't broadcast")
}
}
number = 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(p *channeldb.WaitingProof) error {
number++
return nil
},
); err != nil {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 0 {
t.Fatalf("wrong number of objects in storage: %v", number)
}
}
// TestSignatureAnnouncementRetryAtStartup tests that if we restart the
// gossiper, it will retry sending the AnnounceSignatures to the peer if it did
// not succeed before shutting down, and the full channel proof is not yet
// assembled.
func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
t.Parallel()
ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("can't generate announcements: %v", err)
}
localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
// Set up a channel to intercept the messages sent to the remote peer.
sentToPeer := make(chan lnwire.Message, 1)
remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit}
// Since the reliable send to the remote peer of the local channel proof
// requires a notification when the peer comes online, we'll capture the
// channel through which it gets sent to control exactly when to
// dispatch it.
notifyPeers := make(chan chan<- lnpeer.Peer, 1)
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
connectedChan chan<- lnpeer.Peer) {
notifyPeers <- connectedChan
}
// Recreate lightning network topology. Initialize router with channel
// between two nodes.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localChanAnn, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process channel ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localProofAnn, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process :%v", err)
}
// The gossiper should register for a notification for when the peer is
// online.
select {
case <-notifyPeers:
case <-time.After(2 * time.Second):
t.Fatalf("gossiper did not ask to get notified when " +
"peer is online")
}
// The proof should not be broadcast yet since we're still missing the
// remote party's.
select {
case <-ctx.broadcastedMessage:
t.Fatal("announcements were broadcast")
case <-time.After(2 * trickleDelay):
}
// And it shouldn't be sent to the peer either as they are offline.
select {
case msg := <-sentToPeer:
t.Fatalf("received unexpected message: %v", spew.Sdump(msg))
case <-time.After(time.Second):
}
number := 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
); err != nil {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 1 {
t.Fatal("wrong number of objects in storage")
}
// Restart the gossiper and restore its original NotifyWhenOnline and
// NotifyWhenOffline methods. This should trigger a new attempt to send
// the message to the peer.
ctx.gossiper.Stop()
gossiper := New(Config{
Notifier: ctx.gossiper.cfg.Notifier,
Broadcast: ctx.gossiper.cfg.Broadcast,
NotifyWhenOnline: ctx.gossiper.reliableSender.cfg.NotifyWhenOnline,
NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline,
SelfNodeAnnouncement: ctx.gossiper.cfg.SelfNodeAnnouncement,
Router: ctx.gossiper.cfg.Router,
TrickleDelay: trickleDelay,
RetransmitTicker: ticker.NewForce(retransmitDelay),
RebroadcastInterval: rebroadcastInterval,
ProofMatureDelta: proofMatureDelta,
WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore,
MessageStore: ctx.gossiper.cfg.MessageStore,
RotateTicker: ticker.NewForce(DefaultSyncerRotationInterval),
HistoricalSyncTicker: ticker.NewForce(DefaultHistoricalSyncInterval),
NumActiveSyncers: 3,
MinimumBatchSize: 10,
SubBatchDelay: time.Second * 5,
}, ctx.gossiper.selfKey)
if err != nil {
t.Fatalf("unable to recreate gossiper: %v", err)
}
if err := gossiper.Start(); err != nil {
t.Fatalf("unable to start recreated gossiper: %v", err)
}
defer gossiper.Stop()
// Mark the graph as synced in order to allow the announcements to be
// broadcast.
gossiper.syncMgr.markGraphSynced()
ctx.gossiper = gossiper
remotePeer.quit = ctx.gossiper.quit
// After starting up, the gossiper will see that it has a proof in the
// WaitingProofStore, and will retry sending its part to the remote.
// It should register for a notification for when the peer is online.
var peerChan chan<- lnpeer.Peer
select {
case peerChan = <-notifyPeers:
case <-time.After(2 * time.Second):
t.Fatalf("gossiper did not ask to get notified when " +
"peer is online")
}
// Notify that peer is now online. This should allow the proof to be
// sent.
peerChan <- remotePeer
out:
for {
select {
case msg := <-sentToPeer:
// Since the ChannelUpdate will also be resent as it is
// sent reliably, we'll need to filter it out.
if _, ok := msg.(*lnwire.AnnounceSignatures); !ok {
continue
}
assertMessage(t, batch.localProofAnn, msg)
break out
case <-time.After(2 * time.Second):
t.Fatalf("gossiper did not send message when peer " +
"came online")
}
}
// Now exchanging the remote channel proof, the channel announcement
// broadcast should continue as normal.
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteProofAnn, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
case <-time.After(time.Second):
t.Fatal("announcement wasn't broadcast")
}
number = 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
); err != nil && err != channeldb.ErrWaitingProofNotFound {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 0 {
t.Fatal("waiting proof should be removed from storage")
}
}
// TestSignatureAnnouncementFullProofWhenRemoteProof tests that if a remote
// proof is received when we already have the full proof, the gossiper will send
// the full proof (ChannelAnnouncement) to the remote peer.
func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
t.Parallel()
ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("can't generate announcements: %v", err)
}
localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
// Set up a channel we can use to inspect messages sent by the
// gossiper to the remote peer.
sentToPeer := make(chan lnwire.Message, 1)
remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit}
// Override NotifyWhenOnline to return the remote peer which we expect
// meesages to be sent to.
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
peerChan chan<- lnpeer.Peer) {
peerChan <- remotePeer
}
// Recreate lightning network topology. Initialize router with channel
// between two nodes.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localChanAnn, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process channel ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.chanUpdAnn1, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process channel update: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case msg := <-sentToPeer:
assertMessage(t, batch.chanUpdAnn1, msg)
case <-time.After(2 * time.Second):
t.Fatal("gossiper did not send channel update to remove peer")
}
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.nodeAnn1, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process node ann:%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process channel update: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.nodeAnn2, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localProofAnn, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process local proof: %v", err)
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteProofAnn, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process remote proof: %v", err)
}
// We expect the gossiper to send this message to the remote peer.
select {
case msg := <-sentToPeer:
assertMessage(t, batch.localProofAnn, msg)
case <-time.After(2 * time.Second):
t.Fatal("did not send local proof to peer")
}
// All channel and node announcements should be broadcast.
for i := 0; i < 5; i++ {
select {
case <-ctx.broadcastedMessage:
case <-time.After(time.Second):
t.Fatal("announcement wasn't broadcast")
}
}
number := 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
); err != nil && err != channeldb.ErrWaitingProofNotFound {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 0 {
t.Fatal("waiting proof should be removed from storage")
}
// Now give the gossiper the remote proof yet again. This should
// trigger a send of the full ChannelAnnouncement.
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteProofAnn, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process remote proof: %v", err)
}
// We expect the gossiper to send this message to the remote peer.
select {
case msg := <-sentToPeer:
_, ok := msg.(*lnwire.ChannelAnnouncement)
if !ok {
t.Fatalf("expected ChannelAnnouncement, instead got %T", msg)
}
case <-time.After(2 * time.Second):
t.Fatal("did not send local proof to peer")
}
}
// TestDeDuplicatedAnnouncements ensures that the deDupedAnnouncements struct
// properly stores and delivers the set of de-duplicated announcements.
func TestDeDuplicatedAnnouncements(t *testing.T) {
t.Parallel()
timestamp := testTimestamp
announcements := deDupedAnnouncements{}
announcements.Reset()
// Ensure that after new deDupedAnnouncements struct is created and
// reset that storage of each announcement type is empty.
if len(announcements.channelAnnouncements) != 0 {
t.Fatal("channel announcements map not empty after reset")
}
if len(announcements.channelUpdates) != 0 {
t.Fatal("channel updates map not empty after reset")
}
if len(announcements.nodeAnnouncements) != 0 {
t.Fatal("node announcements map not empty after reset")
}
// Ensure that remote channel announcements are properly stored
// and de-duplicated.
ca, err := createRemoteChannelAnnouncement(0)
if err != nil {
t.Fatalf("can't create remote channel announcement: %v", err)
}
nodePeer := &mockPeer{bitcoinKeyPub2, nil, nil}
announcements.AddMsgs(networkMsg{
msg: ca,
peer: nodePeer,
source: nodePeer.IdentityKey(),
})
if len(announcements.channelAnnouncements) != 1 {
t.Fatal("new channel announcement not stored in batch")
}
// We'll create a second instance of the same announcement with the
// same channel ID. Adding this shouldn't cause an increase in the
// number of items as they should be de-duplicated.
ca2, err := createRemoteChannelAnnouncement(0)
if err != nil {
t.Fatalf("can't create remote channel announcement: %v", err)
}
announcements.AddMsgs(networkMsg{
msg: ca2,
peer: nodePeer,
source: nodePeer.IdentityKey(),
})
if len(announcements.channelAnnouncements) != 1 {
t.Fatal("channel announcement not replaced in batch")
}
// Next, we'll ensure that channel update announcements are properly
// stored and de-duplicated. We do this by creating two updates
// announcements with the same short ID and flag.
ua, err := createUpdateAnnouncement(0, 0, nodeKeyPriv1, timestamp)
if err != nil {
t.Fatalf("can't create update announcement: %v", err)
}
announcements.AddMsgs(networkMsg{
msg: ua,
peer: nodePeer,
source: nodePeer.IdentityKey(),
})
if len(announcements.channelUpdates) != 1 {
t.Fatal("new channel update not stored in batch")
}
// Adding the very same announcement shouldn't cause an increase in the
// number of ChannelUpdate announcements stored.
ua2, err := createUpdateAnnouncement(0, 0, nodeKeyPriv1, timestamp)
if err != nil {
t.Fatalf("can't create update announcement: %v", err)
}
announcements.AddMsgs(networkMsg{
msg: ua2,
peer: nodePeer,
source: nodePeer.IdentityKey(),
})
if len(announcements.channelUpdates) != 1 {
t.Fatal("channel update not replaced in batch")
}
// Adding an announcement with a later timestamp should replace the
// stored one.
ua3, err := createUpdateAnnouncement(0, 0, nodeKeyPriv1, timestamp+1)
if err != nil {
t.Fatalf("can't create update announcement: %v", err)
}
announcements.AddMsgs(networkMsg{
msg: ua3,
peer: nodePeer,
source: nodePeer.IdentityKey(),
})
if len(announcements.channelUpdates) != 1 {
t.Fatal("channel update not replaced in batch")
}
assertChannelUpdate := func(channelUpdate *lnwire.ChannelUpdate) {
channelKey := channelUpdateID{
ua3.ShortChannelID,
ua3.ChannelFlags,
}
mws, ok := announcements.channelUpdates[channelKey]
if !ok {
t.Fatal("channel update not in batch")
}
if mws.msg != channelUpdate {
t.Fatalf("expected channel update %v, got %v)",
channelUpdate, mws.msg)
}
}
// Check that ua3 is the currently stored channel update.
assertChannelUpdate(ua3)
// Adding a channel update with an earlier timestamp should NOT
// replace the one stored.
ua4, err := createUpdateAnnouncement(0, 0, nodeKeyPriv1, timestamp)
if err != nil {
t.Fatalf("can't create update announcement: %v", err)
}
announcements.AddMsgs(networkMsg{
msg: ua4,
peer: nodePeer,
source: nodePeer.IdentityKey(),
})
if len(announcements.channelUpdates) != 1 {
t.Fatal("channel update not in batch")
}
assertChannelUpdate(ua3)
// Next well ensure that node announcements are properly de-duplicated.
// We'll first add a single instance with a node's private key.
na, err := createNodeAnnouncement(nodeKeyPriv1, timestamp)
if err != nil {
t.Fatalf("can't create node announcement: %v", err)
}
announcements.AddMsgs(networkMsg{
msg: na,
peer: nodePeer,
source: nodePeer.IdentityKey(),
})
if len(announcements.nodeAnnouncements) != 1 {
t.Fatal("new node announcement not stored in batch")
}
// We'll now add another node to the batch.
na2, err := createNodeAnnouncement(nodeKeyPriv2, timestamp)
if err != nil {
t.Fatalf("can't create node announcement: %v", err)
}
announcements.AddMsgs(networkMsg{
msg: na2,
peer: nodePeer,
source: nodePeer.IdentityKey(),
})
if len(announcements.nodeAnnouncements) != 2 {
t.Fatal("second node announcement not stored in batch")
}
// Adding a new instance of the _same_ node shouldn't increase the size
// of the node ann batch.
na3, err := createNodeAnnouncement(nodeKeyPriv2, timestamp)
if err != nil {
t.Fatalf("can't create node announcement: %v", err)
}
announcements.AddMsgs(networkMsg{
msg: na3,
peer: nodePeer,
source: nodePeer.IdentityKey(),
})
if len(announcements.nodeAnnouncements) != 2 {
t.Fatal("second node announcement not replaced in batch")
}
// Ensure that node announcement with different pointer to same public
// key is still de-duplicated.
newNodeKeyPointer := nodeKeyPriv2
na4, err := createNodeAnnouncement(newNodeKeyPointer, timestamp)
if err != nil {
t.Fatalf("can't create node announcement: %v", err)
}
announcements.AddMsgs(networkMsg{
msg: na4,
peer: nodePeer,
source: nodePeer.IdentityKey(),
})
if len(announcements.nodeAnnouncements) != 2 {
t.Fatal("second node announcement not replaced again in batch")
}
// Ensure that node announcement with increased timestamp replaces
// what is currently stored.
na5, err := createNodeAnnouncement(nodeKeyPriv2, timestamp+1)
if err != nil {
t.Fatalf("can't create node announcement: %v", err)
}
announcements.AddMsgs(networkMsg{
msg: na5,
peer: nodePeer,
source: nodePeer.IdentityKey(),
})
if len(announcements.nodeAnnouncements) != 2 {
t.Fatal("node announcement not replaced in batch")
}
nodeID := route.NewVertex(nodeKeyPriv2.PubKey())
stored, ok := announcements.nodeAnnouncements[nodeID]
if !ok {
t.Fatalf("node announcement not found in batch")
}
if stored.msg != na5 {
t.Fatalf("expected de-duped node announcement to be %v, got %v",
na5, stored.msg)
}
// Ensure that announcement batch delivers channel announcements,
// channel updates, and node announcements in proper order.
batch := announcements.Emit()
if len(batch) != 4 {
t.Fatal("announcement batch incorrect length")
}
if !reflect.DeepEqual(batch[0].msg, ca2) {
t.Fatalf("channel announcement not first in batch: got %v, "+
"expected %v", spew.Sdump(batch[0].msg), spew.Sdump(ca2))
}
if !reflect.DeepEqual(batch[1].msg, ua3) {
t.Fatalf("channel update not next in batch: got %v, "+
"expected %v", spew.Sdump(batch[1].msg), spew.Sdump(ua2))
}
// We'll ensure that both node announcements are present. We check both
// indexes as due to the randomized order of map iteration they may be
// in either place.
if !reflect.DeepEqual(batch[2].msg, na) && !reflect.DeepEqual(batch[3].msg, na) {
t.Fatal("first node announcement not in last part of batch: "+
"got %v, expected %v", batch[2].msg,
na)
}
if !reflect.DeepEqual(batch[2].msg, na5) && !reflect.DeepEqual(batch[3].msg, na5) {
t.Fatalf("second node announcement not in last part of batch: "+
"got %v, expected %v", batch[3].msg,
na5)
}
// Ensure that after reset, storage of each announcement type
// in deDupedAnnouncements struct is empty again.
announcements.Reset()
if len(announcements.channelAnnouncements) != 0 {
t.Fatal("channel announcements map not empty after reset")
}
if len(announcements.channelUpdates) != 0 {
t.Fatal("channel updates map not empty after reset")
}
if len(announcements.nodeAnnouncements) != 0 {
t.Fatal("node announcements map not empty after reset")
}
}
// TestForwardPrivateNodeAnnouncement ensures that we do not forward node
// announcements for nodes who do not intend to publicly advertise themselves.
func TestForwardPrivateNodeAnnouncement(t *testing.T) {
t.Parallel()
const (
startingHeight = 100
timestamp = 123456
)
ctx, cleanup, err := createTestCtx(startingHeight)
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
// We'll start off by processing a channel announcement without a proof
// (i.e., an unadvertised channel), followed by a node announcement for
// this same channel announcement.
chanAnn := createAnnouncementWithoutProof(startingHeight - 2)
pubKey := nodeKeyPriv1.PubKey()
select {
case err := <-ctx.gossiper.ProcessLocalAnnouncement(chanAnn, pubKey):
if err != nil {
t.Fatalf("unable to process local announcement: %v", err)
}
case <-time.After(2 * time.Second):
t.Fatalf("local announcement not processed")
}
// The gossiper should not broadcast the announcement due to it not
// having its announcement signatures.
select {
case <-ctx.broadcastedMessage:
t.Fatal("gossiper should not have broadcast channel announcement")
case <-time.After(2 * trickleDelay):
}
nodeAnn, err := createNodeAnnouncement(nodeKeyPriv1, timestamp)
if err != nil {
t.Fatalf("unable to create node announcement: %v", err)
}
select {
case err := <-ctx.gossiper.ProcessLocalAnnouncement(nodeAnn, pubKey):
if err != nil {
t.Fatalf("unable to process remote announcement: %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("remote announcement not processed")
}
// The gossiper should also not broadcast the node announcement due to
// it not being part of any advertised channels.
select {
case <-ctx.broadcastedMessage:
t.Fatal("gossiper should not have broadcast node announcement")
case <-time.After(2 * trickleDelay):
}
// Now, we'll attempt to forward the NodeAnnouncement for the same node
// by opening a public channel on the network. We'll create a
// ChannelAnnouncement and hand it off to the gossiper in order to
// process it.
remoteChanAnn, err := createRemoteChannelAnnouncement(startingHeight - 1)
if err != nil {
t.Fatalf("unable to create remote channel announcement: %v", err)
}
peer := &mockPeer{pubKey, nil, nil}
select {
case err := <-ctx.gossiper.ProcessRemoteAnnouncement(remoteChanAnn, peer):
if err != nil {
t.Fatalf("unable to process remote announcement: %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("remote announcement not processed")
}
select {
case <-ctx.broadcastedMessage:
case <-time.After(2 * trickleDelay):
t.Fatal("gossiper should have broadcast the channel announcement")
}
// We'll recreate the NodeAnnouncement with an updated timestamp to
// prevent a stale update. The NodeAnnouncement should now be forwarded.
nodeAnn, err = createNodeAnnouncement(nodeKeyPriv1, timestamp+1)
if err != nil {
t.Fatalf("unable to create node announcement: %v", err)
}
select {
case err := <-ctx.gossiper.ProcessRemoteAnnouncement(nodeAnn, peer):
if err != nil {
t.Fatalf("unable to process remote announcement: %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("remote announcement not processed")
}
select {
case <-ctx.broadcastedMessage:
case <-time.After(2 * trickleDelay):
t.Fatal("gossiper should have broadcast the node announcement")
}
}
// TestRejectZombieEdge ensures that we properly reject any announcements for
// zombie edges.
func TestRejectZombieEdge(t *testing.T) {
t.Parallel()
// We'll start by creating our test context with a batch of
// announcements.
ctx, cleanup, err := createTestCtx(0)
if err != nil {
t.Fatalf("unable to create test context: %v", err)
}
defer cleanup()
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("unable to create announcements: %v", err)
}
remotePeer := &mockPeer{pk: nodeKeyPriv2.PubKey()}
// processAnnouncements is a helper closure we'll use to test that we
// properly process/reject announcements based on whether they're for a
// zombie edge or not.
processAnnouncements := func(isZombie bool) {
t.Helper()
errChan := ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteChanAnn, remotePeer,
)
select {
case err := <-errChan:
if isZombie && err != nil {
t.Fatalf("expected to reject live channel "+
"announcement with nil error: %v", err)
}
if !isZombie && err != nil {
t.Fatalf("expected to process live channel "+
"announcement: %v", err)
}
case <-time.After(time.Second):
t.Fatal("expected to process channel announcement")
}
select {
case <-ctx.broadcastedMessage:
if isZombie {
t.Fatal("expected to not broadcast zombie " +
"channel announcement")
}
case <-time.After(2 * trickleDelay):
if !isZombie {
t.Fatal("expected to broadcast live channel " +
"announcement")
}
}
errChan = ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
)
select {
case err := <-errChan:
if isZombie && err != nil {
t.Fatalf("expected to reject zombie channel "+
"update with nil error: %v", err)
}
if !isZombie && err != nil {
t.Fatalf("expected to process live channel "+
"update: %v", err)
}
case <-time.After(time.Second):
t.Fatal("expected to process channel update")
}
select {
case <-ctx.broadcastedMessage:
if isZombie {
t.Fatal("expected to not broadcast zombie " +
"channel update")
}
case <-time.After(2 * trickleDelay):
if !isZombie {
t.Fatal("expected to broadcast live channel " +
"update")
}
}
}
// We'll mark the edge for which we'll process announcements for as a
// zombie within the router. This should reject any announcements for
// this edge while it remains as a zombie.
chanID := batch.remoteChanAnn.ShortChannelID
err = ctx.router.MarkEdgeZombie(
chanID, batch.remoteChanAnn.NodeID1, batch.remoteChanAnn.NodeID2,
)
if err != nil {
t.Fatalf("unable to mark channel %v as zombie: %v", chanID, err)
}
processAnnouncements(true)
// If we then mark the edge as live, the edge's zombie status should be
// overridden and the announcements should be processed.
if err := ctx.router.MarkEdgeLive(chanID); err != nil {
t.Fatalf("unable mark channel %v as zombie: %v", chanID, err)
}
processAnnouncements(false)
}
// TestProcessZombieEdgeNowLive ensures that we can detect when a zombie edge
// becomes live by receiving a fresh update.
func TestProcessZombieEdgeNowLive(t *testing.T) {
t.Parallel()
// We'll start by creating our test context with a batch of
// announcements.
ctx, cleanup, err := createTestCtx(0)
if err != nil {
t.Fatalf("unable to create test context: %v", err)
}
defer cleanup()
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("unable to create announcements: %v", err)
}
localPrivKey := nodeKeyPriv1
remotePrivKey := nodeKeyPriv2
remotePeer := &mockPeer{pk: remotePrivKey.PubKey()}
// processAnnouncement is a helper closure we'll use to ensure an
// announcement is properly processed/rejected based on whether the edge
// is a zombie or not. The expectsErr boolean can be used to determine
// whether we should expect an error when processing the message, while
// the isZombie boolean can be used to determine whether the
// announcement should be or not be broadcast.
processAnnouncement := func(ann lnwire.Message, isZombie, expectsErr bool) {
t.Helper()
errChan := ctx.gossiper.ProcessRemoteAnnouncement(
ann, remotePeer,
)
var err error
select {
case err = <-errChan:
case <-time.After(time.Second):
t.Fatal("expected to process announcement")
}
if expectsErr && err == nil {
t.Fatal("expected error when processing announcement")
}
if !expectsErr && err != nil {
t.Fatalf("received unexpected error when processing "+
"announcement: %v", err)
}
select {
case msgWithSenders := <-ctx.broadcastedMessage:
if isZombie {
t.Fatal("expected to not broadcast zombie " +
"channel message")
}
assertMessage(t, ann, msgWithSenders.msg)
case <-time.After(2 * trickleDelay):
if !isZombie {
t.Fatal("expected to broadcast live channel " +
"message")
}
}
}
// We'll generate a channel update with a timestamp far enough in the
// past to consider it a zombie.
zombieTimestamp := time.Now().Add(-routing.DefaultChannelPruneExpiry)
batch.chanUpdAnn2.Timestamp = uint32(zombieTimestamp.Unix())
if err := signUpdate(remotePrivKey, batch.chanUpdAnn2); err != nil {
t.Fatalf("unable to sign update with new timestamp: %v", err)
}
// We'll also add the edge to our zombie index.
chanID := batch.remoteChanAnn.ShortChannelID
err = ctx.router.MarkEdgeZombie(
chanID, batch.remoteChanAnn.NodeID1, batch.remoteChanAnn.NodeID2,
)
if err != nil {
t.Fatalf("unable mark channel %v as zombie: %v", chanID, err)
}
// Attempting to process the current channel update should fail due to
// its edge being considered a zombie and its timestamp not being within
// the live horizon. We should not expect an error here since it is just
// a stale update.
processAnnouncement(batch.chanUpdAnn2, true, false)
// Now we'll generate a new update with a fresh timestamp. This should
// allow the channel update to be processed even though it is still
// marked as a zombie within the index, since it is a fresh new update.
// This won't work however since we'll sign it with the wrong private
// key (local rather than remote).
batch.chanUpdAnn2.Timestamp = uint32(time.Now().Unix())
if err := signUpdate(localPrivKey, batch.chanUpdAnn2); err != nil {
t.Fatalf("unable to sign update with new timestamp: %v", err)
}
// We should expect an error due to the signature being invalid.
processAnnouncement(batch.chanUpdAnn2, true, true)
// Signing it with the correct private key should allow it to be
// processed.
if err := signUpdate(remotePrivKey, batch.chanUpdAnn2); err != nil {
t.Fatalf("unable to sign update with new timestamp: %v", err)
}
// The channel update cannot be successfully processed and broadcast
// until the channel announcement is. Since the channel update indicates
// a fresh new update, the gossiper should stash it until it sees the
// corresponding channel announcement.
updateErrChan := ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
)
select {
case <-ctx.broadcastedMessage:
t.Fatal("expected to not broadcast live channel update " +
"without announcement")
case <-time.After(2 * trickleDelay):
}
// We'll go ahead and process the channel announcement to ensure the
// channel update is processed thereafter.
processAnnouncement(batch.remoteChanAnn, false, false)
// After successfully processing the announcement, the channel update
// should have been processed and broadcast successfully as well.
select {
case err := <-updateErrChan:
if err != nil {
t.Fatalf("expected to process live channel update: %v",
err)
}
case <-time.After(time.Second):
t.Fatal("expected to process announcement")
}
select {
case msgWithSenders := <-ctx.broadcastedMessage:
assertMessage(t, batch.chanUpdAnn2, msgWithSenders.msg)
case <-time.After(2 * trickleDelay):
t.Fatal("expected to broadcast live channel update")
}
}
// TestReceiveRemoteChannelUpdateFirst tests that if we receive a ChannelUpdate
// from the remote before we have processed our own ChannelAnnouncement, it will
// be reprocessed later, after our ChannelAnnouncement.
func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
t.Parallel()
ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("can't generate announcements: %v", err)
}
localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
// Set up a channel that we can use to inspect the messages sent
// directly from the gossiper.
sentMsgs := make(chan lnwire.Message, 10)
remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit}
// Override NotifyWhenOnline to return the remote peer which we expect
// meesages to be sent to.
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(peer [33]byte,
peerChan chan<- lnpeer.Peer) {
peerChan <- remotePeer
}
// Recreate the case where the remote node is sending us its ChannelUpdate
// before we have been able to process our own ChannelAnnouncement and
// ChannelUpdate.
errRemoteAnn := ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
)
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.nodeAnn2, remotePeer)
if err != nil {
t.Fatalf("unable to process node ann: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Since the remote ChannelUpdate was added for an edge that
// we did not already know about, it should have been added
// to the map of premature ChannelUpdates. Check that nothing
// was added to the graph.
chanInfo, e1, e2, err := ctx.router.GetChannelByID(batch.chanUpdAnn1.ShortChannelID)
if err != channeldb.ErrEdgeNotFound {
t.Fatalf("Expected ErrEdgeNotFound, got: %v", err)
}
if chanInfo != nil {
t.Fatalf("chanInfo was not nil")
}
if e1 != nil {
t.Fatalf("e1 was not nil")
}
if e2 != nil {
t.Fatalf("e2 was not nil")
}
// Recreate lightning network topology. Initialize router with channel
// between two nodes.
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn1, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.nodeAnn1, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// The local ChannelUpdate should now be sent directly to the remote peer,
// such that the edge can be used for routing, regardless if this channel
// is announced or not (private channel).
select {
case msg := <-sentMsgs:
assertMessage(t, batch.chanUpdAnn1, msg)
case <-time.After(1 * time.Second):
t.Fatal("gossiper did not send channel update to peer")
}
// At this point the remote ChannelUpdate we received earlier should
// be reprocessed, as we now have the necessary edge entry in the graph.
select {
case err := <-errRemoteAnn:
if err != nil {
t.Fatalf("error re-processing remote update: %v", err)
}
case <-time.After(2 * trickleDelay):
t.Fatalf("remote update was not processed")
}
// Check that the ChannelEdgePolicy was added to the graph.
chanInfo, e1, e2, err = ctx.router.GetChannelByID(
batch.chanUpdAnn1.ShortChannelID,
)
if err != nil {
t.Fatalf("unable to get channel from router: %v", err)
}
if chanInfo == nil {
t.Fatalf("chanInfo was nil")
}
if e1 == nil {
t.Fatalf("e1 was nil")
}
if e2 == nil {
t.Fatalf("e2 was nil")
}
// Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process.
err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localProofAnn, localKey,
)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("announcements were broadcast")
case <-time.After(2 * trickleDelay):
}
number := 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
); err != nil {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 1 {
t.Fatal("wrong number of objects in storage")
}
err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteProofAnn, remotePeer,
)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
for i := 0; i < 4; i++ {
select {
case <-ctx.broadcastedMessage:
case <-time.After(time.Second):
t.Fatal("announcement wasn't broadcast")
}
}
number = 0
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
},
); err != nil && err != channeldb.ErrWaitingProofNotFound {
t.Fatalf("unable to retrieve objects from store: %v", err)
}
if number != 0 {
t.Fatal("waiting proof should be removed from storage")
}
}
// TestExtraDataChannelAnnouncementValidation tests that we're able to properly
// validate a ChannelAnnouncement that includes opaque bytes that we don't
// currently know of.
func TestExtraDataChannelAnnouncementValidation(t *testing.T) {
t.Parallel()
ctx, cleanup, err := createTestCtx(0)
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
remotePeer := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil}
// We'll now create an announcement that contains an extra set of bytes
// that we don't know of ourselves, but should still include in the
// final signature check.
extraBytes := []byte("gotta validate this stil!")
ca, err := createRemoteChannelAnnouncement(0, extraBytes)
if err != nil {
t.Fatalf("can't create channel announcement: %v", err)
}
// We'll now send the announcement to the main gossiper. We should be
// able to validate this announcement to problem.
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, remotePeer):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process :%v", err)
}
}
// TestExtraDataChannelUpdateValidation tests that we're able to properly
// validate a ChannelUpdate that includes opaque bytes that we don't currently
// know of.
func TestExtraDataChannelUpdateValidation(t *testing.T) {
t.Parallel()
timestamp := testTimestamp
ctx, cleanup, err := createTestCtx(0)
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
remotePeer := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil}
// In this scenario, we'll create two announcements, one regular
// channel announcement, and another channel update announcement, that
// has additional data that we won't be interpreting.
chanAnn, err := createRemoteChannelAnnouncement(0)
if err != nil {
t.Fatalf("unable to create chan ann: %v", err)
}
chanUpdAnn1, err := createUpdateAnnouncement(
0, 0, nodeKeyPriv1, timestamp,
[]byte("must also validate"),
)
if err != nil {
t.Fatalf("unable to create chan up: %v", err)
}
chanUpdAnn2, err := createUpdateAnnouncement(
0, 1, nodeKeyPriv2, timestamp,
[]byte("must also validate"),
)
if err != nil {
t.Fatalf("unable to create chan up: %v", err)
}
// We should be able to properly validate all three messages without
// any issue.
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(chanAnn, remotePeer):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process announcement: %v", err)
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(chanUpdAnn1, remotePeer):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process announcement: %v", err)
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(chanUpdAnn2, remotePeer):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process announcement: %v", err)
}
}
// TestExtraDataNodeAnnouncementValidation tests that we're able to properly
// validate a NodeAnnouncement that includes opaque bytes that we don't
// currently know of.
func TestExtraDataNodeAnnouncementValidation(t *testing.T) {
t.Parallel()
ctx, cleanup, err := createTestCtx(0)
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
remotePeer := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil}
timestamp := testTimestamp
// We'll create a node announcement that includes a set of opaque data
// which we don't know of, but will store anyway in order to ensure
// upgrades can flow smoothly in the future.
nodeAnn, err := createNodeAnnouncement(
nodeKeyPriv1, timestamp, []byte("gotta validate"),
)
if err != nil {
t.Fatalf("can't create node announcement: %v", err)
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(nodeAnn, remotePeer):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process announcement: %v", err)
}
}
// assertBroadcast checks that num messages are being broadcasted from the
// gossiper. The broadcasted messages are returned.
func assertBroadcast(t *testing.T, ctx *testCtx, num int) []lnwire.Message {
t.Helper()
var msgs []lnwire.Message
for i := 0; i < num; i++ {
select {
case msg := <-ctx.broadcastedMessage:
msgs = append(msgs, msg.msg)
case <-time.After(time.Second):
t.Fatalf("expected %d messages to be broadcast, only "+
"got %d", num, i)
}
}
// No more messages should be broadcast.
select {
case msg := <-ctx.broadcastedMessage:
t.Fatalf("unexpected message was broadcast: %T", msg.msg)
case <-time.After(2 * trickleDelay):
}
return msgs
}
// assertProcessAnnouncemnt is a helper method that checks that the result of
// processing an announcement is successful.
func assertProcessAnnouncement(t *testing.T, result chan error) {
t.Helper()
select {
case err := <-result:
if err != nil {
t.Fatalf("unable to process :%v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("did not process announcement")
}
}
// TestRetransmit checks that the expected announcements are retransmitted when
// the retransmit ticker ticks.
func TestRetransmit(t *testing.T) {
t.Parallel()
ctx, cleanup, err := createTestCtx(proofMatureDelta)
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("can't generate announcements: %v", err)
}
localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remotePeer := &mockPeer{remoteKey, nil, nil}
// Process a local channel annoucement, channel update and node
// announcement. No messages should be broadcasted yet, since no proof
// has been exchanged.
assertProcessAnnouncement(
t, ctx.gossiper.ProcessLocalAnnouncement(
batch.localChanAnn, localKey,
),
)
assertBroadcast(t, ctx, 0)
assertProcessAnnouncement(
t, ctx.gossiper.ProcessLocalAnnouncement(
batch.chanUpdAnn1, localKey,
),
)
assertBroadcast(t, ctx, 0)
assertProcessAnnouncement(
t, ctx.gossiper.ProcessLocalAnnouncement(
batch.nodeAnn1, localKey,
),
)
assertBroadcast(t, ctx, 0)
// Add the remote channel update to the gossiper. Similarly, nothing
// should be broadcasted.
assertProcessAnnouncement(
t, ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
),
)
assertBroadcast(t, ctx, 0)
// Now add the local and remote proof to the gossiper, which should
// trigger a broadcast of the announcements.
assertProcessAnnouncement(
t, ctx.gossiper.ProcessLocalAnnouncement(
batch.localProofAnn, localKey,
),
)
assertBroadcast(t, ctx, 0)
assertProcessAnnouncement(
t, ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteProofAnn, remotePeer,
),
)
// checkAnncouncments make sure the expected number of channel
// announcements + channel updates + node announcements are broadcast.
checkAnnouncements := func(t *testing.T, chanAnns, chanUpds,
nodeAnns int) {
t.Helper()
num := chanAnns + chanUpds + nodeAnns
anns := assertBroadcast(t, ctx, num)
// Count the received announcements.
var chanAnn, chanUpd, nodeAnn int
for _, msg := range anns {
switch msg.(type) {
case *lnwire.ChannelAnnouncement:
chanAnn++
case *lnwire.ChannelUpdate:
chanUpd++
case *lnwire.NodeAnnouncement:
nodeAnn++
}
}
if chanAnn != chanAnns || chanUpd != chanUpds ||
nodeAnn != nodeAnns {
t.Fatalf("unexpected number of announcements: "+
"chanAnn=%d, chanUpd=%d, nodeAnn=%d",
chanAnn, chanUpd, nodeAnn)
}
}
// All announcements should be broadcast, including the remote channel
// update.
checkAnnouncements(t, 1, 2, 1)
// Now let the retransmit ticker tick, which should trigger updates to
// be rebroadcast.
now := time.Unix(int64(testTimestamp), 0)
future := now.Add(rebroadcastInterval + 10*time.Second)
select {
case ctx.gossiper.cfg.RetransmitTicker.(*ticker.Force).Force <- future:
case <-time.After(2 * time.Second):
t.Fatalf("unable to force tick")
}
// The channel announcement + local channel update + node announcement
// should be re-broadcast.
checkAnnouncements(t, 1, 1, 1)
}
// TestNodeAnnouncementNoChannels tests that NodeAnnouncements for nodes with
// no existing channels in the graph do not get forwarded.
func TestNodeAnnouncementNoChannels(t *testing.T) {
t.Parallel()
ctx, cleanup, err := createTestCtx(0)
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("can't generate announcements: %v", err)
}
remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:],
btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remotePeer := &mockPeer{remoteKey, nil, nil}
// Process the remote node announcement.
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.nodeAnn2,
remotePeer):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process announcement: %v", err)
}
// Since no channels or node announcements were already in the graph,
// the node announcement should be ignored, and not forwarded.
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Now add the node's channel to the graph by processing the channel
// announement and channel update.
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteChanAnn,
remotePeer):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process announcement: %v", err)
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn2,
remotePeer):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process announcement: %v", err)
}
// Now process the node announcement again.
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.nodeAnn2, remotePeer):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process announcement: %v", err)
}
// This time the node announcement should be forwarded. The same should
// the channel announcement and update be.
for i := 0; i < 3; i++ {
select {
case <-ctx.broadcastedMessage:
case <-time.After(time.Second):
t.Fatal("announcement wasn't broadcast")
}
}
// Processing the same node announement again should be ignored, as it
// is stale.
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.nodeAnn2,
remotePeer):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process announcement: %v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("node announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
}
// TestOptionalFieldsChannelUpdateValidation tests that we're able to properly
// validate the msg flags and optional max HTLC field of a ChannelUpdate.
func TestOptionalFieldsChannelUpdateValidation(t *testing.T) {
t.Parallel()
ctx, cleanup, err := createTestCtx(0)
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
chanUpdateHeight := uint32(0)
timestamp := uint32(123456)
nodePeer := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil}
// In this scenario, we'll test whether the message flags field in a channel
// update is properly handled.
chanAnn, err := createRemoteChannelAnnouncement(chanUpdateHeight)
if err != nil {
t.Fatalf("can't create channel announcement: %v", err)
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(chanAnn, nodePeer):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process announcement: %v", err)
}
// The first update should fail from an invalid max HTLC field, which is
// less than the min HTLC.
chanUpdAnn, err := createUpdateAnnouncement(0, 0, nodeKeyPriv1, timestamp)
if err != nil {
t.Fatalf("unable to create channel update: %v", err)
}
chanUpdAnn.HtlcMinimumMsat = 5000
chanUpdAnn.HtlcMaximumMsat = 4000
if err := signUpdate(nodeKeyPriv1, chanUpdAnn); err != nil {
t.Fatalf("unable to sign channel update: %v", err)
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(chanUpdAnn, nodePeer):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err == nil || !strings.Contains(err.Error(), "invalid max htlc") {
t.Fatalf("expected chan update to error, instead got %v", err)
}
// The second update should fail because the message flag is set but
// the max HTLC field is 0.
chanUpdAnn.HtlcMinimumMsat = 0
chanUpdAnn.HtlcMaximumMsat = 0
if err := signUpdate(nodeKeyPriv1, chanUpdAnn); err != nil {
t.Fatalf("unable to sign channel update: %v", err)
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(chanUpdAnn, nodePeer):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err == nil || !strings.Contains(err.Error(), "invalid max htlc") {
t.Fatalf("expected chan update to error, instead got %v", err)
}
// The final update should succeed, since setting the flag 0 means the
// nonsense max_htlc field will just be ignored.
chanUpdAnn.MessageFlags = 0
if err := signUpdate(nodeKeyPriv1, chanUpdAnn); err != nil {
t.Fatalf("unable to sign channel update: %v", err)
}
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(chanUpdAnn, nodePeer):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote announcement")
}
if err != nil {
t.Fatalf("unable to process announcement: %v", err)
}
}
// TestSendChannelUpdateReliably ensures that the latest channel update for a
// channel is always sent upon the remote party reconnecting.
func TestSendChannelUpdateReliably(t *testing.T) {
t.Parallel()
// We'll start by creating our test context and a batch of
// announcements.
ctx, cleanup, err := createTestCtx(uint32(proofMatureDelta))
if err != nil {
t.Fatalf("unable to create test context: %v", err)
}
defer cleanup()
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("can't generate announcements: %v", err)
}
// We'll also create two keys, one for ourselves and another for the
// remote party.
localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
// Set up a channel we can use to inspect messages sent by the
// gossiper to the remote peer.
sentToPeer := make(chan lnwire.Message, 1)
remotePeer := &mockPeer{remoteKey, sentToPeer, ctx.gossiper.quit}
// Since we first wait to be notified of the peer before attempting to
// send the message, we'll overwrite NotifyWhenOnline and
// NotifyWhenOffline to instead give us access to the channel that will
// receive the notification.
notifyOnline := make(chan chan<- lnpeer.Peer, 1)
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(_ [33]byte,
peerChan chan<- lnpeer.Peer) {
notifyOnline <- peerChan
}
notifyOffline := make(chan chan struct{}, 1)
ctx.gossiper.reliableSender.cfg.NotifyWhenOffline = func(
_ [33]byte) <-chan struct{} {
c := make(chan struct{}, 1)
notifyOffline <- c
return c
}
// assertMsgSent is a helper closure we'll use to determine if the
// correct gossip message was sent.
assertMsgSent := func(msg lnwire.Message) {
t.Helper()
select {
case msgSent := <-sentToPeer:
assertMessage(t, msg, msgSent)
case <-time.After(2 * time.Second):
t.Fatalf("did not send %v message to peer",
msg.MsgType())
}
}
// Process the channel announcement for which we'll send a channel
// update for.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localChanAnn, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local channel announcement")
}
if err != nil {
t.Fatalf("unable to process local channel announcement: %v", err)
}
// It should not be broadcast due to not having an announcement proof.
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Now, we'll process the channel update.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.chanUpdAnn1, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local channel update")
}
if err != nil {
t.Fatalf("unable to process local channel update: %v", err)
}
// It should also not be broadcast due to the announcement not having an
// announcement proof.
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// It should however send it to the peer directly. In order to do so,
// it'll request a notification for when the peer is online.
var peerChan chan<- lnpeer.Peer
select {
case peerChan = <-notifyOnline:
case <-time.After(2 * time.Second):
t.Fatal("gossiper did not request notification upon peer " +
"connection")
}
// We can go ahead and notify the peer, which should trigger the message
// to be sent.
peerChan <- remotePeer
assertMsgSent(batch.chanUpdAnn1)
// The gossiper should now request a notification for when the peer
// disconnects. We'll also trigger this now.
var offlineChan chan struct{}
select {
case offlineChan = <-notifyOffline:
case <-time.After(2 * time.Second):
t.Fatal("gossiper did not request notification upon peer " +
"disconnection")
}
close(offlineChan)
// Since it's offline, the gossiper should request another notification
// for when it comes back online.
select {
case peerChan = <-notifyOnline:
case <-time.After(2 * time.Second):
t.Fatal("gossiper did not request notification upon peer " +
"connection")
}
// Now that the remote peer is offline, we'll send a new channel update.
batch.chanUpdAnn1.Timestamp++
if err := signUpdate(nodeKeyPriv1, batch.chanUpdAnn1); err != nil {
t.Fatalf("unable to sign new channel update: %v", err)
}
// With the new update created, we'll go ahead and process it.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.chanUpdAnn1, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local channel update")
}
if err != nil {
t.Fatalf("unable to process local channel update: %v", err)
}
// It should also not be broadcast due to the announcement not having an
// announcement proof.
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// The message should not be sent since the peer remains offline.
select {
case msg := <-sentToPeer:
t.Fatalf("received unexpected message: %v", spew.Sdump(msg))
case <-time.After(time.Second):
}
// Once again, we'll notify the peer is online and ensure the new
// channel update is received. This will also cause an offline
// notification to be requested again.
peerChan <- remotePeer
assertMsgSent(batch.chanUpdAnn1)
select {
case offlineChan = <-notifyOffline:
case <-time.After(2 * time.Second):
t.Fatal("gossiper did not request notification upon peer " +
"disconnection")
}
// We'll then exchange proofs with the remote peer in order to announce
// the channel.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
batch.localProofAnn, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local channel proof")
}
if err != nil {
t.Fatalf("unable to process local channel proof: %v", err)
}
// No messages should be broadcast as we don't have the full proof yet.
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
// Our proof should be sent to the remote peer however.
assertMsgSent(batch.localProofAnn)
select {
case err = <-ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteProofAnn, remotePeer,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process remote channel proof")
}
if err != nil {
t.Fatalf("unable to process remote channel proof: %v", err)
}
// Now that we've constructed our full proof, we can assert that the
// channel has been announced.
for i := 0; i < 2; i++ {
select {
case <-ctx.broadcastedMessage:
case <-time.After(2 * trickleDelay):
t.Fatal("expected channel to be announced")
}
}
// With the channel announced, we'll generate a new channel update. This
// one won't take the path of the reliable sender, as the channel has
// already been announced. We'll keep track of the old message that is
// now stale to use later on.
staleChannelUpdate := batch.chanUpdAnn1
newChannelUpdate := &lnwire.ChannelUpdate{}
*newChannelUpdate = *staleChannelUpdate
newChannelUpdate.Timestamp++
if err := signUpdate(nodeKeyPriv1, newChannelUpdate); err != nil {
t.Fatalf("unable to sign new channel update: %v", err)
}
// Process the new channel update. It should not be sent to the peer
// directly since the reliable sender only applies when the channel is
// not announced.
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
newChannelUpdate, localKey,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local channel update")
}
if err != nil {
t.Fatalf("unable to process local channel update: %v", err)
}
select {
case <-ctx.broadcastedMessage:
case <-time.After(2 * trickleDelay):
t.Fatal("channel update was not broadcast")
}
select {
case msg := <-sentToPeer:
t.Fatalf("received unexpected message: %v", spew.Sdump(msg))
case <-time.After(time.Second):
}
// Then, we'll trigger the reliable sender to send its pending messages
// by triggering an offline notification for the peer, followed by an
// online one.
close(offlineChan)
select {
case peerChan = <-notifyOnline:
case <-time.After(2 * time.Second):
t.Fatal("gossiper did not request notification upon peer " +
"connection")
}
peerChan <- remotePeer
// At this point, we should have sent both the AnnounceSignatures and
// stale ChannelUpdate.
for i := 0; i < 2; i++ {
var msg lnwire.Message
select {
case msg = <-sentToPeer:
case <-time.After(time.Second):
t.Fatal("expected to send message")
}
switch msg := msg.(type) {
case *lnwire.ChannelUpdate:
assertMessage(t, staleChannelUpdate, msg)
case *lnwire.AnnounceSignatures:
assertMessage(t, batch.localProofAnn, msg)
default:
t.Fatalf("send unexpected %v message", msg.MsgType())
}
}
// Since the messages above are now deemed as stale, they should be
// removed from the message store.
err = wait.NoError(func() error {
msgs, err := ctx.gossiper.cfg.MessageStore.Messages()
if err != nil {
return fmt.Errorf("unable to retrieve pending "+
"messages: %v", err)
}
if len(msgs) != 0 {
return fmt.Errorf("expected no messages left, found %d",
len(msgs))
}
return nil
}, time.Second)
if err != nil {
t.Fatal(err)
}
}
func sendLocalMsg(t *testing.T, ctx *testCtx, msg lnwire.Message,
localPub *btcec.PublicKey, optionalMsgFields ...OptionalMsgField) {
t.Helper()
var err error
select {
case err = <-ctx.gossiper.ProcessLocalAnnouncement(
msg, localPub, optionalMsgFields...,
):
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
if err != nil {
t.Fatalf("unable to process channel msg: %v", err)
}
}
func sendRemoteMsg(t *testing.T, ctx *testCtx, msg lnwire.Message,
remotePeer lnpeer.Peer) {
t.Helper()
select {
case err := <-ctx.gossiper.ProcessRemoteAnnouncement(msg, remotePeer):
if err != nil {
t.Fatalf("unable to process channel msg: %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("did not process local announcement")
}
}
func assertBroadcastMsg(t *testing.T, ctx *testCtx,
predicate func(lnwire.Message) error) {
t.Helper()
// We don't care about the order of the broadcast, only that our target
// predicate returns true for any of the messages, so we'll continue to
// retry until either we hit our timeout, or it returns with no error
// (message found).
err := wait.NoError(func() error {
select {
case msg := <-ctx.broadcastedMessage:
return predicate(msg.msg)
case <-time.After(2 * trickleDelay):
return fmt.Errorf("no message broadcast")
}
}, time.Second*5)
if err != nil {
t.Fatal(err)
}
}
// TestPropagateChanPolicyUpdate tests that we're able to issue requests to
// update policies for all channels and also select target channels.
// Additionally, we ensure that we don't propagate updates for any private
// channels.
func TestPropagateChanPolicyUpdate(t *testing.T) {
t.Parallel()
// First, we'll make out test context and add 3 random channels to the
// graph.
startingHeight := uint32(10)
ctx, cleanup, err := createTestCtx(startingHeight)
if err != nil {
t.Fatalf("unable to create test context: %v", err)
}
defer cleanup()
const numChannels = 3
channelsToAnnounce := make([]*annBatch, 0, numChannels)
for i := 0; i < numChannels; i++ {
newChan, err := createAnnouncements(uint32(i + 1))
if err != nil {
t.Fatalf("unable to make new channel ann: %v", err)
}
channelsToAnnounce = append(channelsToAnnounce, newChan)
}
localKey := nodeKeyPriv1.PubKey()
remoteKey := nodeKeyPriv2.PubKey()
sentMsgs := make(chan lnwire.Message, 10)
remotePeer := &mockPeer{remoteKey, sentMsgs, ctx.gossiper.quit}
// The forced code path for sending the private ChannelUpdate to the
// remote peer will be hit, forcing it to request a notification that
// the remote peer is active. We'll ensure that it targets the proper
// pubkey, and hand it our mock peer above.
notifyErr := make(chan error, 1)
ctx.gossiper.reliableSender.cfg.NotifyWhenOnline = func(
targetPub [33]byte, peerChan chan<- lnpeer.Peer) {
if !bytes.Equal(targetPub[:], remoteKey.SerializeCompressed()) {
notifyErr <- fmt.Errorf("reliableSender attempted to send the "+
"message to the wrong peer: expected %x got %x",
remoteKey.SerializeCompressed(),
targetPub)
}
peerChan <- remotePeer
}
// With our channel announcements created, we'll now send them all to
// the gossiper in order for it to process. However, we'll hold back
// the channel ann proof from the first channel in order to have it be
// marked as private channel.
firstChanID := channelsToAnnounce[0].localChanAnn.ShortChannelID
for i, batch := range channelsToAnnounce {
// channelPoint ensures that each channel policy in the map
// returned by PropagateChanPolicyUpdate has a unique key. Since
// the map is keyed by wire.OutPoint, we want to ensure that
// each channel has a unique channel point.
channelPoint := ChannelPoint(wire.OutPoint{Index: uint32(i)})
sendLocalMsg(t, ctx, batch.localChanAnn, localKey, channelPoint)
sendLocalMsg(t, ctx, batch.chanUpdAnn1, localKey)
sendLocalMsg(t, ctx, batch.nodeAnn1, localKey)
sendRemoteMsg(t, ctx, batch.chanUpdAnn2, remotePeer)
sendRemoteMsg(t, ctx, batch.nodeAnn2, remotePeer)
// We'll skip sending the auth proofs from the first channel to
// ensure that it's seen as a private channel.
if batch.localChanAnn.ShortChannelID == firstChanID {
continue
}
sendLocalMsg(t, ctx, batch.localProofAnn, localKey)
sendRemoteMsg(t, ctx, batch.remoteProofAnn, remotePeer)
}
// Drain out any broadcast or direct messages we might not have read up
// to this point. We'll also check out notifyErr to detect if the
// reliable sender had an issue sending to the remote peer.
out:
for {
select {
case <-ctx.broadcastedMessage:
case <-sentMsgs:
case err := <-notifyErr:
t.Fatal(err)
default:
break out
}
}
// Now that all of our channels are loaded, we'll attempt to update the
// policy of all of them.
const newTimeLockDelta = 100
var edgesToUpdate []EdgeWithInfo
err = ctx.router.ForAllOutgoingChannels(func(
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {
edge.TimeLockDelta = uint16(newTimeLockDelta)
edgesToUpdate = append(edgesToUpdate, EdgeWithInfo{
Info: info,
Edge: edge,
})
return nil
})
if err != nil {
t.Fatal(err)
}
err = ctx.gossiper.PropagateChanPolicyUpdate(edgesToUpdate)
if err != nil {
t.Fatalf("unable to chan policies: %v", err)
}
// Two channel updates should now be broadcast, with neither of them
// being the channel our first private channel.
for i := 0; i < numChannels-1; i++ {
assertBroadcastMsg(t, ctx, func(msg lnwire.Message) error {
upd, ok := msg.(*lnwire.ChannelUpdate)
if !ok {
return fmt.Errorf("channel update not "+
"broadcast, instead %T was", msg)
}
if upd.ShortChannelID == firstChanID {
return fmt.Errorf("private channel upd " +
"broadcast")
}
if upd.TimeLockDelta != newTimeLockDelta {
return fmt.Errorf("wrong delta: expected %v, "+
"got %v", newTimeLockDelta,
upd.TimeLockDelta)
}
return nil
})
}
// Finally the ChannelUpdate should have been sent directly to the
// remote peer via the reliable sender.
select {
case msg := <-sentMsgs:
upd, ok := msg.(*lnwire.ChannelUpdate)
if !ok {
t.Fatalf("channel update not "+
"broadcast, instead %T was", msg)
}
if upd.TimeLockDelta != newTimeLockDelta {
t.Fatalf("wrong delta: expected %v, "+
"got %v", newTimeLockDelta,
upd.TimeLockDelta)
}
if upd.ShortChannelID != firstChanID {
t.Fatalf("private channel upd " +
"broadcast")
}
case <-time.After(time.Second * 5):
t.Fatalf("message not sent directly to peer")
}
// At this point, no other ChannelUpdate messages should be broadcast
// as we sent the two public ones to the network, and the private one
// was sent directly to the peer.
for {
select {
case msg := <-ctx.broadcastedMessage:
if upd, ok := msg.msg.(*lnwire.ChannelUpdate); ok {
if upd.ShortChannelID == firstChanID {
t.Fatalf("chan update msg received: %v",
spew.Sdump(msg))
}
}
default:
return
}
}
}
// TestProcessChannelAnnouncementOptionalMsgFields ensures that the gossiper can
// properly handled optional message fields provided by the caller when
// processing a channel announcement.
func TestProcessChannelAnnouncementOptionalMsgFields(t *testing.T) {
t.Parallel()
// We'll start by creating our test context and a set of test channel
// announcements.
ctx, cleanup, err := createTestCtx(0)
if err != nil {
t.Fatalf("unable to create test context: %v", err)
}
defer cleanup()
chanAnn1 := createAnnouncementWithoutProof(100)
chanAnn2 := createAnnouncementWithoutProof(101)
localKey := nodeKeyPriv1.PubKey()
// assertOptionalMsgFields is a helper closure that ensures the optional
// message fields were set as intended.
assertOptionalMsgFields := func(chanID lnwire.ShortChannelID,
capacity btcutil.Amount, channelPoint wire.OutPoint) {
t.Helper()
edge, _, _, err := ctx.router.GetChannelByID(chanID)
if err != nil {
t.Fatalf("unable to get channel by id: %v", err)
}
if edge.Capacity != capacity {
t.Fatalf("expected capacity %v, got %v", capacity,
edge.Capacity)
}
if edge.ChannelPoint != channelPoint {
t.Fatalf("expected channel point %v, got %v",
channelPoint, edge.ChannelPoint)
}
}
// We'll process the first announcement without any optional fields. We
// should see the channel's capacity and outpoint have a zero value.
sendLocalMsg(t, ctx, chanAnn1, localKey)
assertOptionalMsgFields(chanAnn1.ShortChannelID, 0, wire.OutPoint{})
// Providing the capacity and channel point as optional fields should
// propagate them all the way down to the router.
capacity := btcutil.Amount(1000)
channelPoint := wire.OutPoint{Index: 1}
sendLocalMsg(
t, ctx, chanAnn2, localKey, ChannelCapacity(capacity),
ChannelPoint(channelPoint),
)
assertOptionalMsgFields(chanAnn2.ShortChannelID, capacity, channelPoint)
}
func assertMessage(t *testing.T, expected, got lnwire.Message) {
t.Helper()
if !reflect.DeepEqual(expected, got) {
t.Fatalf("expected: %v\ngot: %v", spew.Sdump(expected),
spew.Sdump(got))
}
}
// TestSplitAnnouncementsCorrectSubBatches checks that we split a given
// sizes of announcement list into the correct number of batches.
func TestSplitAnnouncementsCorrectSubBatches(t *testing.T) {
t.Parallel()
const subBatchSize = 10
announcementBatchSizes := []int{2, 5, 20, 45, 80, 100, 1005}
expectedNumberMiniBatches := []int{1, 1, 2, 5, 8, 10, 101}
lengthAnnouncementBatchSizes := len(announcementBatchSizes)
lengthExpectedNumberMiniBatches := len(expectedNumberMiniBatches)
if lengthAnnouncementBatchSizes != lengthExpectedNumberMiniBatches {
t.Fatal("Length of announcementBatchSizes and " +
"expectedNumberMiniBatches should be equal")
}
for testIndex := range announcementBatchSizes {
var batchSize = announcementBatchSizes[testIndex]
announcementBatch := make([]msgWithSenders, batchSize)
splitAnnouncementBatch := splitAnnouncementBatches(
subBatchSize, announcementBatch,
)
lengthMiniBatches := len(splitAnnouncementBatch)
if lengthMiniBatches != expectedNumberMiniBatches[testIndex] {
t.Fatalf("Expecting %d mini batches, actual %d",
expectedNumberMiniBatches[testIndex], lengthMiniBatches)
}
}
}
func assertCorrectSubBatchSize(t *testing.T, expectedSubBatchSize,
actualSubBatchSize int) {
t.Helper()
if actualSubBatchSize != expectedSubBatchSize {
t.Fatalf("Expecting subBatch size of %d, actual %d",
expectedSubBatchSize, actualSubBatchSize)
}
}
// TestCalculateCorrectSubBatchSize checks that we check the correct
// sub batch size for each of the input vectors of batch sizes.
func TestCalculateCorrectSubBatchSizes(t *testing.T) {
t.Parallel()
const minimumSubBatchSize = 10
const batchDelay = time.Duration(100)
const subBatchDelay = time.Duration(10)
batchSizes := []int{2, 200, 250, 305, 352, 10010, 1000001}
expectedSubBatchSize := []int{10, 20, 25, 31, 36, 1001, 100001}
for testIndex := range batchSizes {
batchSize := batchSizes[testIndex]
expectedBatchSize := expectedSubBatchSize[testIndex]
actualSubBatchSize := calculateSubBatchSize(
batchDelay, subBatchDelay, minimumSubBatchSize, batchSize,
)
assertCorrectSubBatchSize(t, expectedBatchSize, actualSubBatchSize)
}
}
// TestCalculateCorrectSubBatchSizesDifferentDelay checks that we check the
// correct sub batch size for each of different delay.
func TestCalculateCorrectSubBatchSizesDifferentDelay(t *testing.T) {
t.Parallel()
const batchSize = 100
const minimumSubBatchSize = 10
batchDelays := []time.Duration{100, 50, 20, 25, 5, 0}
const subBatchDelay = 10
expectedSubBatchSize := []int{10, 20, 50, 40, 100, 100}
for testIndex := range batchDelays {
batchDelay := batchDelays[testIndex]
expectedBatchSize := expectedSubBatchSize[testIndex]
actualSubBatchSize := calculateSubBatchSize(
batchDelay, subBatchDelay, minimumSubBatchSize, batchSize,
)
assertCorrectSubBatchSize(t, expectedBatchSize, actualSubBatchSize)
}
}
// TestBroadcastAnnsAfterGraphSynced ensures that we only broadcast
// announcements after the graph has been considered as synced, i.e., after our
// initial historical sync has completed.
func TestBroadcastAnnsAfterGraphSynced(t *testing.T) {
t.Parallel()
ctx, cleanup, err := createTestCtx(10)
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
// We'll mark the graph as not synced. This should prevent us from
// broadcasting any messages we've received as part of our initial
// historical sync.
ctx.gossiper.syncMgr.markGraphSyncing()
assertBroadcast := func(msg lnwire.Message, isRemote bool,
shouldBroadcast bool) {
t.Helper()
nodePeer := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil}
var errChan chan error
if isRemote {
errChan = ctx.gossiper.ProcessRemoteAnnouncement(
msg, nodePeer,
)
} else {
errChan = ctx.gossiper.ProcessLocalAnnouncement(
msg, nodePeer.pk,
)
}
select {
case err := <-errChan:
if err != nil {
t.Fatalf("unable to process gossip message: %v",
err)
}
case <-time.After(2 * time.Second):
t.Fatal("gossip message not processed")
}
select {
case <-ctx.broadcastedMessage:
if !shouldBroadcast {
t.Fatal("gossip message was broadcast")
}
case <-time.After(2 * trickleDelay):
if shouldBroadcast {
t.Fatal("gossip message wasn't broadcast")
}
}
}
// A remote channel announcement should not be broadcast since the graph
// has not yet been synced.
chanAnn1, err := createRemoteChannelAnnouncement(0)
if err != nil {
t.Fatalf("unable to create channel announcement: %v", err)
}
assertBroadcast(chanAnn1, true, false)
// A local channel announcement should be broadcast though, regardless
// of whether we've synced our graph or not.
chanUpd, err := createUpdateAnnouncement(0, 0, nodeKeyPriv1, 1)
if err != nil {
t.Fatalf("unable to create channel announcement: %v", err)
}
assertBroadcast(chanUpd, false, true)
// Mark the graph as synced, which should allow the channel announcement
// should to be broadcast.
ctx.gossiper.syncMgr.markGraphSynced()
chanAnn2, err := createRemoteChannelAnnouncement(1)
if err != nil {
t.Fatalf("unable to create channel announcement: %v", err)
}
assertBroadcast(chanAnn2, true, true)
}