diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 64302de94..56bd79294 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -14,6 +14,7 @@ import ( "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/multimutex" @@ -35,8 +36,9 @@ var ( // networkMsg couples a routing related wire message with the peer that // originally sent it. type networkMsg struct { - peer *btcec.PublicKey - msg lnwire.Message + peer lnpeer.Peer + source *btcec.PublicKey + msg lnwire.Message isRemote bool @@ -97,6 +99,11 @@ type Config struct { // messages to a particular peer identified by the target public key. SendToPeer func(target *btcec.PublicKey, msg ...lnwire.Message) error + // FindPeer returns the actively registered peer for a given remote + // public key. An error is returned if the peer was not found or a + // shutdown has been requested. + FindPeer func(identityKey *btcec.PublicKey) (lnpeer.Peer, error) + // NotifyWhenOnline is a function that allows the gossiper to be // notified when a certain peer comes online, allowing it to // retry sending a peer message. @@ -147,8 +154,8 @@ type AuthenticatedGossiper struct { // as we know it. To be used atomically. bestHeight uint32 - quit chan struct{} - wg sync.WaitGroup + quit chan struct{} + wg sync.WaitGroup // cfg is a copy of the configuration struct that the gossiper service // was initialized with. @@ -244,7 +251,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) { // the entire network graph is read from disk, then serialized to the format // defined within the current wire protocol. This cache of graph data is then // sent directly to the target node. -func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) error { +func (d *AuthenticatedGossiper) SynchronizeNode(syncPeer lnpeer.Peer) error { // TODO(roasbeef): need to also store sig data in db // * will be nice when we switch to pairing sigs would only need one ^_^ @@ -355,12 +362,12 @@ func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) error { } log.Infof("Syncing channel graph state with %x, sending %v "+ - "vertexes and %v edges", pub.SerializeCompressed(), + "vertexes and %v edges", syncPeer.PubKey(), numNodes, numEdges) // With all the announcement messages gathered, send them all in a // single batch to the target peer. - return d.cfg.SendToPeer(pub, announceMessages...) + return syncPeer.SendMessage(false, announceMessages...) } // PropagateChanPolicyUpdate signals the AuthenticatedGossiper to update the @@ -452,12 +459,13 @@ func (d *AuthenticatedGossiper) Stop() { // peers. Remote channel announcements should contain the announcement proof // and be fully validated. func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, - src *btcec.PublicKey) chan error { + peer lnpeer.Peer) chan error { nMsg := &networkMsg{ msg: msg, isRemote: true, - peer: src, + peer: peer, + source: peer.IdentityKey(), err: make(chan error, 1), } @@ -478,12 +486,12 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, // entire channel announcement and update messages will be re-constructed and // broadcast to the rest of the network. func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message, - src *btcec.PublicKey) chan error { + source *btcec.PublicKey) chan error { nMsg := &networkMsg{ msg: msg, isRemote: false, - peer: src, + source: source, err: make(chan error, 1), } @@ -587,7 +595,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) { // Channel announcements are identified by the short channel id field. case *lnwire.ChannelAnnouncement: deDupKey := msg.ShortChannelID - sender := routing.NewVertex(message.peer) + sender := routing.NewVertex(message.source) mws, ok := d.channelAnnouncements[deDupKey] if !ok { @@ -609,7 +617,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) { // Channel updates are identified by the (short channel id, flags) // tuple. case *lnwire.ChannelUpdate: - sender := routing.NewVertex(message.peer) + sender := routing.NewVertex(message.source) deDupKey := channelUpdateID{ msg.ShortChannelID, msg.Flags, @@ -658,7 +666,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) { // Node announcements are identified by the Vertex field. Use the // NodeID to create the corresponding Vertex. case *lnwire.NodeAnnouncement: - sender := routing.NewVertex(message.peer) + sender := routing.NewVertex(message.source) deDupKey := routing.Vertex(msg.NodeID) // We do the same for node announcements as we did for channel @@ -875,7 +883,7 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { // gossip syncer for an inbound message so we can properly dispatch the // incoming message. If a gossip syncer isn't found, then one will be created // for the target peer. -func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSyncer { +func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (*gossipSyncer, error) { target := routing.NewVertex(pub) // First, we'll try to find an existing gossiper for this peer. @@ -885,16 +893,26 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSy // If one exists, then we'll return it directly. if ok { - return syncer + return syncer, nil } - // Otherwise, we'll obtain the mutex, then check again if a gossiper - // was added after we dropped the read mutex. + // A known gossip syncer doesn't exist, so we may have to create one + // from scratch. To do so, we'll query for a reference directly to the + // active peer. + syncPeer, err := d.cfg.FindPeer(pub) + if err != nil { + log.Debugf("unable to find gossip peer %v: %v", + pub.SerializeCompressed(), err) + return nil, err + } + + // Finally, we'll obtain the exclusive mutex, then check again if a + // gossiper was added after we dropped the read mutex. d.syncerMtx.Lock() syncer, ok = d.peerSyncers[target] if ok { d.syncerMtx.Unlock() - return syncer + return syncer, nil } // At this point, a syncer doesn't yet exist, so we'll create a new one @@ -905,7 +923,7 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSy channelSeries: d.cfg.ChanSeries, encodingType: lnwire.EncodingSortedPlain, sendToPeer: func(msgs ...lnwire.Message) error { - return d.cfg.SendToPeer(pub, msgs...) + return syncPeer.SendMessage(false, msgs...) }, }) copy(syncer.peerPub[:], pub.SerializeCompressed()) @@ -914,7 +932,7 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSy d.syncerMtx.Unlock() - return syncer + return syncer, nil } // networkHandler is the primary goroutine that drives this service. The roles @@ -992,15 +1010,20 @@ func (d *AuthenticatedGossiper) networkHandler() { // then we'll dispatch that directly to the proper // gossipSyncer. case *lnwire.GossipTimestampRange: - syncer := d.findGossipSyncer(announcement.peer) + syncer, err := d.findGossipSyncer( + announcement.source, + ) + if err != nil { + continue + } // If we've found the message target, then // we'll dispatch the message directly to it. - err := syncer.ApplyGossipFilter(msg) + err = syncer.ApplyGossipFilter(msg) if err != nil { log.Warnf("unable to apply gossip "+ "filter for peer=%x: %v", - announcement.peer.SerializeCompressed(), err) + announcement.peer.PubKey(), err) } continue @@ -1012,7 +1035,12 @@ func (d *AuthenticatedGossiper) networkHandler() { *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd: - syncer := d.findGossipSyncer(announcement.peer) + syncer, err := d.findGossipSyncer( + announcement.source, + ) + if err != nil { + continue + } syncer.ProcessQueryMsg(announcement.msg) continue @@ -1193,19 +1221,19 @@ func (d *AuthenticatedGossiper) networkHandler() { // needed to handle new queries. The recvUpdates bool indicates if we should // continue to receive real-time updates from the remote peer once we've synced // channel state. -func (d *AuthenticatedGossiper) InitSyncState(peer *btcec.PublicKey, recvUpdates bool) { +func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, recvUpdates bool) { d.syncerMtx.Lock() defer d.syncerMtx.Unlock() // If we already have a syncer, then we'll exit early as we don't want // to override it. - nodeID := routing.NewVertex(peer) + nodeID := routing.Vertex(syncPeer.PubKey()) if _, ok := d.peerSyncers[nodeID]; ok { return } log.Infof("Creating new gossipSyncer for peer=%x", - peer.SerializeCompressed()) + nodeID) syncer := newGossiperSyncer(gossipSyncerCfg{ chainHash: d.cfg.ChainHash, @@ -1213,10 +1241,10 @@ func (d *AuthenticatedGossiper) InitSyncState(peer *btcec.PublicKey, recvUpdates channelSeries: d.cfg.ChanSeries, encodingType: lnwire.EncodingSortedPlain, sendToPeer: func(msgs ...lnwire.Message) error { - return d.cfg.SendToPeer(peer, msgs...) + return syncPeer.SendMessage(false, msgs...) }, }) - copy(syncer.peerPub[:], peer.SerializeCompressed()) + copy(syncer.peerPub[:], nodeID[:]) d.peerSyncers[nodeID] = syncer syncer.Start() @@ -1429,8 +1457,8 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate( // We set ourselves as the source of this message to indicate // that we shouldn't skip any peers when sending this message. chanUpdates = append(chanUpdates, networkMsg{ - peer: d.selfKey, - msg: chanUpdate, + source: d.selfKey, + msg: chanUpdate, }) } @@ -1502,19 +1530,19 @@ func (d *AuthenticatedGossiper) processRejectedEdge(chanAnnMsg *lnwire.ChannelAn // our peers. announcements := make([]networkMsg, 0, 3) announcements = append(announcements, networkMsg{ - msg: chanAnn, - peer: d.selfKey, + source: d.selfKey, + msg: chanAnn, }) if e1Ann != nil { announcements = append(announcements, networkMsg{ - msg: e1Ann, - peer: d.selfKey, + source: d.selfKey, + msg: e1Ann, }) } if e2Ann != nil { announcements = append(announcements, networkMsg{ - msg: e2Ann, - peer: d.selfKey, + source: d.selfKey, + msg: e2Ann, }) } @@ -1591,8 +1619,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // Node announcement was successfully proceeded and know it // might be broadcast to other connected nodes. announcements = append(announcements, networkMsg{ - msg: msg, - peer: nMsg.peer, + peer: nMsg.peer, + source: nMsg.source, + msg: msg, }) nMsg.err <- nil @@ -1776,7 +1805,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n msg, nMsg.peer) } else { err = <-d.ProcessLocalAnnouncement( - msg, nMsg.peer) + msg, nMsg.source) } if err != nil { log.Errorf("Failed reprocessing"+ @@ -1801,8 +1830,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // announcement with proof (remote). if proof != nil { announcements = append(announcements, networkMsg{ - msg: msg, - peer: nMsg.peer, + peer: nMsg.peer, + source: nMsg.source, + msg: msg, }) } @@ -1987,20 +2017,29 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // so we'll try sending the update directly to the remote peer. if !nMsg.isRemote && chanInfo.AuthProof == nil { // Get our peer's public key. - var remotePeer *btcec.PublicKey + var remotePub *btcec.PublicKey switch { case msg.Flags&lnwire.ChanUpdateDirection == 0: - remotePeer, _ = chanInfo.NodeKey2() + remotePub, _ = chanInfo.NodeKey2() case msg.Flags&lnwire.ChanUpdateDirection == 1: - remotePeer, _ = chanInfo.NodeKey1() + remotePub, _ = chanInfo.NodeKey1() } - // Send ChannelUpdate directly to remotePeer. - // TODO(halseth): make reliable send? - if err = d.cfg.SendToPeer(remotePeer, msg); err != nil { - log.Errorf("unable to send channel update "+ - "message to peer %x: %v", - remotePeer.SerializeCompressed(), err) + sPeer, err := d.cfg.FindPeer(remotePub) + if err != nil { + log.Errorf("unable to send channel update -- "+ + "could not find peer %x: %v", + remotePub, err) + } else { + // Send ChannelUpdate directly to remotePeer. + // TODO(halseth): make reliable send? + err = sPeer.SendMessage(false, msg) + if err != nil { + log.Errorf("unable to send channel "+ + "update message to peer %x: %v", + remotePub.SerializeCompressed(), + err) + } } } @@ -2010,8 +2049,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // has an attached authentication proof. if chanInfo.AuthProof != nil { announcements = append(announcements, networkMsg{ - msg: msg, - peer: nMsg.peer, + peer: nMsg.peer, + source: nMsg.source, + msg: msg, }) } @@ -2083,10 +2123,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n return nil } - isFirstNode := bytes.Equal(nMsg.peer.SerializeCompressed(), - chanInfo.NodeKey1Bytes[:]) - isSecondNode := bytes.Equal(nMsg.peer.SerializeCompressed(), - chanInfo.NodeKey2Bytes[:]) + nodeID := nMsg.source.SerializeCompressed() + isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:]) + isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:]) // Ensure that channel that was retrieved belongs to the peer // which sent the proof announcement. @@ -2130,7 +2169,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // received our local proof yet. So be kind and send // them the full proof. if nMsg.isRemote { - peerID := nMsg.peer.SerializeCompressed() + peerID := nMsg.source.SerializeCompressed() log.Debugf("Got AnnounceSignatures for " + "channel with full proof.") @@ -2151,7 +2190,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n log.Errorf("unable to gen ann: %v", err) return } - err = d.cfg.SendToPeer(nMsg.peer, chanAnn) + err = nMsg.peer.SendMessage(false, chanAnn) if err != nil { log.Errorf("Failed sending "+ "full proof to "+ @@ -2271,19 +2310,22 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // Assemble the necessary announcements to add to the next // broadcasting batch. announcements = append(announcements, networkMsg{ - msg: chanAnn, - peer: nMsg.peer, + peer: nMsg.peer, + source: nMsg.source, + msg: chanAnn, }) if e1Ann != nil { announcements = append(announcements, networkMsg{ - msg: e1Ann, - peer: nMsg.peer, + peer: nMsg.peer, + source: nMsg.source, + msg: e1Ann, }) } if e2Ann != nil { announcements = append(announcements, networkMsg{ - msg: e2Ann, - peer: nMsg.peer, + peer: nMsg.peer, + source: nMsg.source, + msg: e2Ann, }) } @@ -2493,15 +2535,10 @@ func (d *AuthenticatedGossiper) maybeRequestChanAnn(cid lnwire.ShortChannelID) e // If this syncer is already at the terminal state, then we'll // chose it to request the fully channel update. if syncer.SyncState() == chansSynced { - pub, err := btcec.ParsePubKey(nodeID[:], btcec.S256()) - if err != nil { - return err - } - log.Debugf("attempting to request chan ann for "+ "chan_id=%v from node=%x", cid, nodeID[:]) - return d.cfg.SendToPeer(pub, &lnwire.QueryShortChanIDs{ + return syncer.cfg.sendToPeer(&lnwire.QueryShortChanIDs{ ChainHash: d.cfg.ChainHash, EncodingType: lnwire.EncodingSortedPlain, ShortChanIDs: []lnwire.ShortChannelID{cid},