package discovery import ( "bytes" "errors" "fmt" "sync" "time" "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/davecgh/go-spew/spew" "github.com/lightninglabs/neutrino/cache" "github.com/lightninglabs/neutrino/cache/lru" "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/kvdb" "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/multimutex" "github.com/lightningnetwork/lnd/netann" "github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/ticker" "golang.org/x/time/rate" ) const ( // DefaultMaxChannelUpdateBurst is the default maximum number of updates // for a specific channel and direction that we'll accept over an // interval. DefaultMaxChannelUpdateBurst = 10 // DefaultChannelUpdateInterval is the default interval we'll use to // determine how often we should allow a new update for a specific // channel and direction. DefaultChannelUpdateInterval = time.Minute // maxPrematureUpdates tracks the max amount of premature channel // updates that we'll hold onto. maxPrematureUpdates = 100 // maxRejectedUpdates tracks the max amount of rejected channel updates // we'll maintain. This is the global size across all peers. We'll // allocate ~3 MB max to the cache. maxRejectedUpdates = 10_000 ) var ( // ErrGossiperShuttingDown is an error that is returned if the gossiper // is in the process of being shut down. ErrGossiperShuttingDown = errors.New("gossiper is shutting down") // ErrGossipSyncerNotFound signals that we were unable to find an active // gossip syncer corresponding to a gossip query message received from // the remote peer. ErrGossipSyncerNotFound = errors.New("gossip syncer not found") // emptyPubkey is used to compare compressed pubkeys against an empty // byte array. emptyPubkey [33]byte ) // optionalMsgFields is a set of optional message fields that external callers // can provide that serve useful when processing a specific network // announcement. type optionalMsgFields struct { capacity *btcutil.Amount channelPoint *wire.OutPoint } // apply applies the optional fields within the functional options. func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) { for _, optionalMsgField := range optionalMsgFields { optionalMsgField(f) } } // OptionalMsgField is a functional option parameter that can be used to provide // external information that is not included within a network message but serves // useful when processing it. type OptionalMsgField func(*optionalMsgFields) // ChannelCapacity is an optional field that lets the gossiper know of the // capacity of a channel. func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField { return func(f *optionalMsgFields) { f.capacity = &capacity } } // ChannelPoint is an optional field that lets the gossiper know of the outpoint // of a channel. func ChannelPoint(op wire.OutPoint) OptionalMsgField { return func(f *optionalMsgFields) { f.channelPoint = &op } } // networkMsg couples a routing related wire message with the peer that // originally sent it. type networkMsg struct { peer lnpeer.Peer source *btcec.PublicKey msg lnwire.Message optionalMsgFields *optionalMsgFields isRemote bool err chan error } // chanPolicyUpdateRequest is a request that is sent to the server when a caller // wishes to update a particular set of channels. New ChannelUpdate messages // will be crafted to be sent out during the next broadcast epoch and the fee // updates committed to the lower layer. type chanPolicyUpdateRequest struct { edgesToUpdate []EdgeWithInfo errChan chan error } // PinnedSyncers is a set of node pubkeys for which we will maintain an active // syncer at all times. type PinnedSyncers map[route.Vertex]struct{} // Config defines the configuration for the service. ALL elements within the // configuration MUST be non-nil for the service to carry out its duties. type Config struct { // ChainHash is a hash that indicates which resident chain of the // AuthenticatedGossiper. Any announcements that don't match this // chain hash will be ignored. // // TODO(roasbeef): eventually make into map so can de-multiplex // incoming announcements // * also need to do same for Notifier ChainHash chainhash.Hash // Router is the subsystem which is responsible for managing the // topology of lightning network. After incoming channel, node, channel // updates announcements are validated they are sent to the router in // order to be included in the LN graph. Router routing.ChannelGraphSource // ChanSeries is an interfaces that provides access to a time series // view of the current known channel graph. Each GossipSyncer enabled // peer will utilize this in order to create and respond to channel // graph time series queries. ChanSeries ChannelGraphTimeSeries // Notifier is used for receiving notifications of incoming blocks. // With each new incoming block found we process previously premature // announcements. // // TODO(roasbeef): could possibly just replace this with an epoch // channel. Notifier chainntnfs.ChainNotifier // Broadcast broadcasts a particular set of announcements to all peers // that the daemon is connected to. If supplied, the exclude parameter // indicates that the target peer should be excluded from the // broadcast. Broadcast func(skips map[route.Vertex]struct{}, msg ...lnwire.Message) error // NotifyWhenOnline is a function that allows the gossiper to be // notified when a certain peer comes online, allowing it to // retry sending a peer message. // // NOTE: The peerChan channel must be buffered. NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer) // NotifyWhenOffline is a function that allows the gossiper to be // notified when a certain peer disconnects, allowing it to request a // notification for when it reconnects. NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{} // SelfNodeAnnouncement is a function that fetches our own current node // announcement, for use when determining whether we should update our // peers about our presence on the network. If the refresh is true, a // new and updated announcement will be returned. SelfNodeAnnouncement func(refresh bool) (lnwire.NodeAnnouncement, error) // ProofMatureDelta the number of confirmations which is needed before // exchange the channel announcement proofs. ProofMatureDelta uint32 // TrickleDelay the period of trickle timer which flushes to the // network the pending batch of new announcements we've received since // the last trickle tick. TrickleDelay time.Duration // RetransmitTicker is a ticker that ticks with a period which // indicates that we should check if we need re-broadcast any of our // personal channels. RetransmitTicker ticker.Ticker // RebroadcastInterval is the maximum time we wait between sending out // channel updates for our active channels and our own node // announcement. We do this to ensure our active presence on the // network is known, and we are not being considered a zombie node or // having zombie channels. RebroadcastInterval time.Duration // WaitingProofStore is a persistent storage of partial channel proof // announcement messages. We use it to buffer half of the material // needed to reconstruct a full authenticated channel announcement. // Once we receive the other half the channel proof, we'll be able to // properly validate it and re-broadcast it out to the network. // // TODO(wilmer): make interface to prevent channeldb dependency. WaitingProofStore *channeldb.WaitingProofStore // MessageStore is a persistent storage of gossip messages which we will // use to determine which messages need to be resent for a given peer. MessageStore GossipMessageStore // AnnSigner is an instance of the MessageSigner interface which will // be used to manually sign any outgoing channel updates. The signer // implementation should be backed by the public key of the backing // Lightning node. // // TODO(roasbeef): extract ann crafting + sign from fundingMgr into // here? AnnSigner lnwallet.MessageSigner // NumActiveSyncers is the number of peers for which we should have // active syncers with. After reaching NumActiveSyncers, any future // gossip syncers will be passive. NumActiveSyncers int // RotateTicker is a ticker responsible for notifying the SyncManager // when it should rotate its active syncers. A single active syncer with // a chansSynced state will be exchanged for a passive syncer in order // to ensure we don't keep syncing with the same peers. RotateTicker ticker.Ticker // HistoricalSyncTicker is a ticker responsible for notifying the // syncManager when it should attempt a historical sync with a gossip // sync peer. HistoricalSyncTicker ticker.Ticker // ActiveSyncerTimeoutTicker is a ticker responsible for notifying the // syncManager when it should attempt to start the next pending // activeSyncer due to the current one not completing its state machine // within the timeout. ActiveSyncerTimeoutTicker ticker.Ticker // MinimumBatchSize is minimum size of a sub batch of announcement // messages. MinimumBatchSize int // SubBatchDelay is the delay between sending sub batches of // gossip messages. SubBatchDelay time.Duration // IgnoreHistoricalFilters will prevent syncers from replying with // historical data when the remote peer sets a gossip_timestamp_range. // This prevents ranges with old start times from causing us to dump the // graph on connect. IgnoreHistoricalFilters bool // PinnedSyncers is a set of peers that will always transition to // ActiveSync upon connection. These peers will never transition to // PassiveSync. PinnedSyncers PinnedSyncers // MaxChannelUpdateBurst specifies the maximum number of updates for a // specific channel and direction that we'll accept over an interval. MaxChannelUpdateBurst int // ChannelUpdateInterval specifies the interval we'll use to determine // how often we should allow a new update for a specific channel and // direction. ChannelUpdateInterval time.Duration } // cachedNetworkMsg is a wrapper around a network message that can be used with // *lru.Cache. type cachedNetworkMsg struct { msgs []*networkMsg } // Size returns the "size" of an entry. We return the number of items as we // just want to limit the total amount of entries rather than do accurate size // accounting. func (c *cachedNetworkMsg) Size() (uint64, error) { return uint64(len(c.msgs)), nil } // rejectCacheKey is the cache key that we'll use to track announcements we've // recently rejected. type rejectCacheKey struct { pubkey [33]byte chanID uint64 } // newRejectCacheKey returns a new cache key for the reject cache. func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey { k := rejectCacheKey{ chanID: cid, pubkey: pub, } return k } // sourceToPub returns a serialized-compressed public key for use in the reject // cache. func sourceToPub(pk *btcec.PublicKey) [33]byte { var pub [33]byte copy(pub[:], pk.SerializeCompressed()) return pub } // cachedReject is the empty value used to track the value for rejects. type cachedReject struct { } // Size returns the "size" of an entry. We return 1 as we just want to limit // the total size. func (c *cachedReject) Size() (uint64, error) { return 1, nil } // AuthenticatedGossiper is a subsystem which is responsible for receiving // announcements, validating them and applying the changes to router, syncing // lightning network with newly connected nodes, broadcasting announcements // after validation, negotiating the channel announcement proofs exchange and // handling the premature announcements. All outgoing announcements are // expected to be properly signed as dictated in BOLT#7, additionally, all // incoming message are expected to be well formed and signed. Invalid messages // will be rejected by this struct. type AuthenticatedGossiper struct { // Parameters which are needed to properly handle the start and stop of // the service. started sync.Once stopped sync.Once // bestHeight is the height of the block at the tip of the main chain // as we know it. Accesses *MUST* be done with the gossiper's lock // held. bestHeight uint32 quit chan struct{} wg sync.WaitGroup // cfg is a copy of the configuration struct that the gossiper service // was initialized with. cfg *Config // blockEpochs encapsulates a stream of block epochs that are sent at // every new block height. blockEpochs *chainntnfs.BlockEpochEvent // prematureChannelUpdates is a map of ChannelUpdates we have received // that wasn't associated with any channel we know about. We store // them temporarily, such that we can reprocess them when a // ChannelAnnouncement for the channel is received. prematureChannelUpdates *lru.Cache // networkMsgs is a channel that carries new network broadcasted // message from outside the gossiper service to be processed by the // networkHandler. networkMsgs chan *networkMsg // futureMsgs is a list of premature network messages that have a block // height specified in the future. We will save them and resend it to // the chan networkMsgs once the block height has reached. The cached // map format is, // {blockHeight: [msg1, msg2, ...], ...} futureMsgs *lru.Cache // chanPolicyUpdates is a channel that requests to update the // forwarding policy of a set of channels is sent over. chanPolicyUpdates chan *chanPolicyUpdateRequest // selfKey is the identity public key of the backing Lightning node. selfKey *btcec.PublicKey // selfKeyLoc is the locator for the identity public key of the backing // Lightning node. selfKeyLoc keychain.KeyLocator // channelMtx is used to restrict the database access to one // goroutine per channel ID. This is done to ensure that when // the gossiper is handling an announcement, the db state stays // consistent between when the DB is first read until it's written. channelMtx *multimutex.Mutex recentRejects *lru.Cache // syncMgr is a subsystem responsible for managing the gossip syncers // for peers currently connected. When a new peer is connected, the // manager will create its accompanying gossip syncer and determine // whether it should have an activeSync or passiveSync sync type based // on how many other gossip syncers are currently active. Any activeSync // gossip syncers are started in a round-robin manner to ensure we're // not syncing with multiple peers at the same time. syncMgr *SyncManager // reliableSender is a subsystem responsible for handling reliable // message send requests to peers. This should only be used for channels // that are unadvertised at the time of handling the message since if it // is advertised, then peers should be able to get the message from the // network. reliableSender *reliableSender // chanUpdateRateLimiter contains rate limiters for each direction of // a channel update we've processed. We'll use these to determine // whether we should accept a new update for a specific channel and // direction. // // NOTE: This map must be synchronized with the main // AuthenticatedGossiper lock. chanUpdateRateLimiter map[uint64][2]*rate.Limiter sync.Mutex } // New creates a new AuthenticatedGossiper instance, initialized with the // passed configuration parameters. func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper { gossiper := &AuthenticatedGossiper{ selfKey: selfKeyDesc.PubKey, selfKeyLoc: selfKeyDesc.KeyLocator, cfg: &cfg, networkMsgs: make(chan *networkMsg), futureMsgs: lru.NewCache(maxPrematureUpdates), quit: make(chan struct{}), chanPolicyUpdates: make(chan *chanPolicyUpdateRequest), prematureChannelUpdates: lru.NewCache(maxPrematureUpdates), channelMtx: multimutex.NewMutex(), recentRejects: lru.NewCache(maxRejectedUpdates), chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter), } gossiper.syncMgr = newSyncManager(&SyncManagerCfg{ ChainHash: cfg.ChainHash, ChanSeries: cfg.ChanSeries, RotateTicker: cfg.RotateTicker, HistoricalSyncTicker: cfg.HistoricalSyncTicker, NumActiveSyncers: cfg.NumActiveSyncers, IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters, BestHeight: gossiper.latestHeight, PinnedSyncers: cfg.PinnedSyncers, }) gossiper.reliableSender = newReliableSender(&reliableSenderCfg{ NotifyWhenOnline: cfg.NotifyWhenOnline, NotifyWhenOffline: cfg.NotifyWhenOffline, MessageStore: cfg.MessageStore, IsMsgStale: gossiper.isMsgStale, }) return gossiper } // EdgeWithInfo contains the information that is required to update an edge. type EdgeWithInfo struct { // Info describes the channel. Info *channeldb.ChannelEdgeInfo // Edge describes the policy in one direction of the channel. Edge *channeldb.ChannelEdgePolicy } // PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the // specified edge updates. Updates are done in two stages: first, the // AuthenticatedGossiper ensures the update has been committed by dependent // sub-systems, then it signs and broadcasts new updates to the network. A // mapping between outpoints and updated channel policies is returned, which is // used to update the forwarding policies of the underlying links. func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate( edgesToUpdate []EdgeWithInfo) error { errChan := make(chan error, 1) policyUpdate := &chanPolicyUpdateRequest{ edgesToUpdate: edgesToUpdate, errChan: errChan, } select { case d.chanPolicyUpdates <- policyUpdate: err := <-errChan return err case <-d.quit: return fmt.Errorf("AuthenticatedGossiper shutting down") } } // Start spawns network messages handler goroutine and registers on new block // notifications in order to properly handle the premature announcements. func (d *AuthenticatedGossiper) Start() error { var err error d.started.Do(func() { log.Info("Authenticated Gossiper starting") err = d.start() }) return err } func (d *AuthenticatedGossiper) start() error { // First we register for new notifications of newly discovered blocks. // We do this immediately so we'll later be able to consume any/all // blocks which were discovered. blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil) if err != nil { return err } d.blockEpochs = blockEpochs height, err := d.cfg.Router.CurrentBlockHeight() if err != nil { return err } d.bestHeight = height // Start the reliable sender. In case we had any pending messages ready // to be sent when the gossiper was last shut down, we must continue on // our quest to deliver them to their respective peers. if err := d.reliableSender.Start(); err != nil { return err } d.syncMgr.Start() // Start receiving blocks in its dedicated goroutine. d.wg.Add(2) go d.syncBlockHeight() go d.networkHandler() return nil } // syncBlockHeight syncs the best block height for the gossiper by reading // blockEpochs. // // NOTE: must be run as a goroutine. func (d *AuthenticatedGossiper) syncBlockHeight() { defer d.wg.Done() for { select { // A new block has arrived, so we can re-process the previously // premature announcements. case newBlock, ok := <-d.blockEpochs.Epochs: // If the channel has been closed, then this indicates // the daemon is shutting down, so we exit ourselves. if !ok { return } // Once a new block arrives, we update our running // track of the height of the chain tip. d.Lock() blockHeight := uint32(newBlock.Height) d.bestHeight = blockHeight d.Unlock() log.Debugf("New block: height=%d, hash=%s", blockHeight, newBlock.Hash) // Resend future messages, if any. d.resendFutureMessages(blockHeight) case <-d.quit: return } } } // resendFutureMessages takes a block height, resends all the future messages // found at that height and deletes those messages found in the gossiper's // futureMsgs. func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) { result, err := d.futureMsgs.Get(height) // Return early if no messages found. if err == cache.ErrElementNotFound { return } // The error must nil, we will log an error and exit. if err != nil { log.Errorf("Reading future messages got error: %v", err) return } msgs := result.(*cachedNetworkMsg).msgs log.Debugf("Resending %d network messages at height %d", len(msgs), height) for _, msg := range msgs { select { case d.networkMsgs <- msg: case <-d.quit: msg.err <- ErrGossiperShuttingDown } } } // Stop signals any active goroutines for a graceful closure. func (d *AuthenticatedGossiper) Stop() error { d.stopped.Do(func() { log.Info("Authenticated gossiper shutting down") d.stop() }) return nil } func (d *AuthenticatedGossiper) stop() { log.Info("Authenticated Gossiper is stopping") d.blockEpochs.Cancel() d.syncMgr.Stop() close(d.quit) d.wg.Wait() // We'll stop our reliable sender after all of the gossiper's goroutines // have exited to ensure nothing can cause it to continue executing. d.reliableSender.Stop() } // TODO(roasbeef): need method to get current gossip timestamp? // * using mtx, check time rotate forward is needed? // ProcessRemoteAnnouncement sends a new remote announcement message along with // the peer that sent the routing message. The announcement will be processed // then added to a queue for batched trickled announcement to all connected // peers. Remote channel announcements should contain the announcement proof // and be fully validated. func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, peer lnpeer.Peer) chan error { errChan := make(chan error, 1) // For messages in the known set of channel series queries, we'll // dispatch the message directly to the GossipSyncer, and skip the main // processing loop. switch m := msg.(type) { case *lnwire.QueryShortChanIDs, *lnwire.QueryChannelRange, *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd: syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey()) if !ok { log.Warnf("Gossip syncer for peer=%x not found", peer.PubKey()) errChan <- ErrGossipSyncerNotFound return errChan } // If we've found the message target, then we'll dispatch the // message directly to it. syncer.ProcessQueryMsg(m, peer.QuitSignal()) errChan <- nil return errChan // If a peer is updating its current update horizon, then we'll dispatch // that directly to the proper GossipSyncer. case *lnwire.GossipTimestampRange: syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey()) if !ok { log.Warnf("Gossip syncer for peer=%x not found", peer.PubKey()) errChan <- ErrGossipSyncerNotFound return errChan } // If we've found the message target, then we'll dispatch the // message directly to it. if err := syncer.ApplyGossipFilter(m); err != nil { log.Warnf("Unable to apply gossip filter for peer=%x: "+ "%v", peer.PubKey(), err) errChan <- err return errChan } errChan <- nil return errChan // To avoid inserting edges in the graph for our own channels that we // have already closed, we ignore such channel announcements coming // from the remote. case *lnwire.ChannelAnnouncement: ownKey := d.selfKey.SerializeCompressed() ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement " + "for own channel") if bytes.Equal(m.NodeID1[:], ownKey) || bytes.Equal(m.NodeID2[:], ownKey) { log.Warn(ownErr) errChan <- ownErr return errChan } } nMsg := &networkMsg{ msg: msg, isRemote: true, peer: peer, source: peer.IdentityKey(), err: errChan, } select { case d.networkMsgs <- nMsg: // If the peer that sent us this error is quitting, then we don't need // to send back an error and can return immediately. case <-peer.QuitSignal(): return nil case <-d.quit: nMsg.err <- ErrGossiperShuttingDown } return nMsg.err } // ProcessLocalAnnouncement sends a new remote announcement message along with // the peer that sent the routing message. The announcement will be processed // then added to a queue for batched trickled announcement to all connected // peers. Local channel announcements don't contain the announcement proof and // will not be fully validated. Once the channel proofs are received, the // entire channel announcement and update messages will be re-constructed and // broadcast to the rest of the network. func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message, optionalFields ...OptionalMsgField) chan error { optionalMsgFields := &optionalMsgFields{} optionalMsgFields.apply(optionalFields...) nMsg := &networkMsg{ msg: msg, optionalMsgFields: optionalMsgFields, isRemote: false, source: d.selfKey, err: make(chan error, 1), } select { case d.networkMsgs <- nMsg: case <-d.quit: nMsg.err <- ErrGossiperShuttingDown } return nMsg.err } // channelUpdateID is a unique identifier for ChannelUpdate messages, as // channel updates can be identified by the (ShortChannelID, ChannelFlags) // tuple. type channelUpdateID struct { // channelID represents the set of data which is needed to // retrieve all necessary data to validate the channel existence. channelID lnwire.ShortChannelID // Flags least-significant bit must be set to 0 if the creating node // corresponds to the first node in the previously sent channel // announcement and 1 otherwise. flags lnwire.ChanUpdateChanFlags } // msgWithSenders is a wrapper struct around a message, and the set of peers // that originally sent us this message. Using this struct, we can ensure that // we don't re-send a message to the peer that sent it to us in the first // place. type msgWithSenders struct { // msg is the wire message itself. msg lnwire.Message // sender is the set of peers that sent us this message. senders map[route.Vertex]struct{} } // mergeSyncerMap is used to merge the set of senders of a particular message // with peers that we have an active GossipSyncer with. We do this to ensure // that we don't broadcast messages to any peers that we have active gossip // syncers for. func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) { for peerPub := range syncers { m.senders[peerPub] = struct{}{} } } // deDupedAnnouncements de-duplicates announcements that have been added to the // batch. Internally, announcements are stored in three maps // (one each for channel announcements, channel updates, and node // announcements). These maps keep track of unique announcements and ensure no // announcements are duplicated. We keep the three message types separate, such // that we can send channel announcements first, then channel updates, and // finally node announcements when it's time to broadcast them. type deDupedAnnouncements struct { // channelAnnouncements are identified by the short channel id field. channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders // channelUpdates are identified by the channel update id field. channelUpdates map[channelUpdateID]msgWithSenders // nodeAnnouncements are identified by the Vertex field. nodeAnnouncements map[route.Vertex]msgWithSenders sync.Mutex } // Reset operates on deDupedAnnouncements to reset the storage of // announcements. func (d *deDupedAnnouncements) Reset() { d.Lock() defer d.Unlock() d.reset() } // reset is the private version of the Reset method. We have this so we can // call this method within method that are already holding the lock. func (d *deDupedAnnouncements) reset() { // Storage of each type of announcement (channel announcements, channel // updates, node announcements) is set to an empty map where the // appropriate key points to the corresponding lnwire.Message. d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders) d.channelUpdates = make(map[channelUpdateID]msgWithSenders) d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders) } // addMsg adds a new message to the current batch. If the message is already // present in the current batch, then this new instance replaces the latter, // and the set of senders is updated to reflect which node sent us this // message. func (d *deDupedAnnouncements) addMsg(message networkMsg) { log.Tracef("Adding network message: %v to batch", message.msg.MsgType()) // Depending on the message type (channel announcement, channel update, // or node announcement), the message is added to the corresponding map // in deDupedAnnouncements. Because each identifying key can have at // most one value, the announcements are de-duplicated, with newer ones // replacing older ones. switch msg := message.msg.(type) { // Channel announcements are identified by the short channel id field. case *lnwire.ChannelAnnouncement: deDupKey := msg.ShortChannelID sender := route.NewVertex(message.source) mws, ok := d.channelAnnouncements[deDupKey] if !ok { mws = msgWithSenders{ msg: msg, senders: make(map[route.Vertex]struct{}), } mws.senders[sender] = struct{}{} d.channelAnnouncements[deDupKey] = mws return } mws.msg = msg mws.senders[sender] = struct{}{} d.channelAnnouncements[deDupKey] = mws // Channel updates are identified by the (short channel id, // channelflags) tuple. case *lnwire.ChannelUpdate: sender := route.NewVertex(message.source) deDupKey := channelUpdateID{ msg.ShortChannelID, msg.ChannelFlags, } oldTimestamp := uint32(0) mws, ok := d.channelUpdates[deDupKey] if ok { // If we already have seen this message, record its // timestamp. oldTimestamp = mws.msg.(*lnwire.ChannelUpdate).Timestamp } // If we already had this message with a strictly newer // timestamp, then we'll just discard the message we got. if oldTimestamp > msg.Timestamp { log.Debugf("Ignored outdated network message: "+ "peer=%v, source=%x, msg=%s, ", message.peer, message.source.SerializeCompressed(), msg.MsgType()) return } // If the message we just got is newer than what we previously // have seen, or this is the first time we see it, then we'll // add it to our map of announcements. if oldTimestamp < msg.Timestamp { mws = msgWithSenders{ msg: msg, senders: make(map[route.Vertex]struct{}), } // We'll mark the sender of the message in the // senders map. mws.senders[sender] = struct{}{} d.channelUpdates[deDupKey] = mws return } // Lastly, if we had seen this exact message from before, with // the same timestamp, we'll add the sender to the map of // senders, such that we can skip sending this message back in // the next batch. mws.msg = msg mws.senders[sender] = struct{}{} d.channelUpdates[deDupKey] = mws // Node announcements are identified by the Vertex field. Use the // NodeID to create the corresponding Vertex. case *lnwire.NodeAnnouncement: sender := route.NewVertex(message.source) deDupKey := route.Vertex(msg.NodeID) // We do the same for node announcements as we did for channel // updates, as they also carry a timestamp. oldTimestamp := uint32(0) mws, ok := d.nodeAnnouncements[deDupKey] if ok { oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp } // Discard the message if it's old. if oldTimestamp > msg.Timestamp { return } // Replace if it's newer. if oldTimestamp < msg.Timestamp { mws = msgWithSenders{ msg: msg, senders: make(map[route.Vertex]struct{}), } mws.senders[sender] = struct{}{} d.nodeAnnouncements[deDupKey] = mws return } // Add to senders map if it's the same as we had. mws.msg = msg mws.senders[sender] = struct{}{} d.nodeAnnouncements[deDupKey] = mws } } // AddMsgs is a helper method to add multiple messages to the announcement // batch. func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) { d.Lock() defer d.Unlock() for _, msg := range msgs { d.addMsg(msg) } } // Emit returns the set of de-duplicated announcements to be sent out during // the next announcement epoch, in the order of channel announcements, channel // updates, and node announcements. Each message emitted, contains the set of // peers that sent us the message. This way, we can ensure that we don't waste // bandwidth by re-sending a message to the peer that sent it to us in the // first place. Additionally, the set of stored messages are reset. func (d *deDupedAnnouncements) Emit() []msgWithSenders { d.Lock() defer d.Unlock() // Get the total number of announcements. numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) + len(d.nodeAnnouncements) // Create an empty array of lnwire.Messages with a length equal to // the total number of announcements. msgs := make([]msgWithSenders, 0, numAnnouncements) // Add the channel announcements to the array first. for _, message := range d.channelAnnouncements { msgs = append(msgs, message) } // Then add the channel updates. for _, message := range d.channelUpdates { msgs = append(msgs, message) } // Finally add the node announcements. for _, message := range d.nodeAnnouncements { msgs = append(msgs, message) } d.reset() // Return the array of lnwire.messages. return msgs } // calculateSubBatchSize is a helper function that calculates the size to break // down the batchSize into. func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration, minimumBatchSize, batchSize int) int { if subBatchDelay > totalDelay { return batchSize } subBatchSize := (int(batchSize)*int(subBatchDelay) + int(totalDelay) - 1) / int(totalDelay) if subBatchSize < minimumBatchSize { return minimumBatchSize } return subBatchSize } // splitAnnouncementBatches takes an exiting list of announcements and // decomposes it into sub batches controlled by the `subBatchSize`. func splitAnnouncementBatches(subBatchSize int, announcementBatch []msgWithSenders) [][]msgWithSenders { var splitAnnouncementBatch [][]msgWithSenders for subBatchSize < len(announcementBatch) { // For slicing with minimal allocation // https://github.com/golang/go/wiki/SliceTricks announcementBatch, splitAnnouncementBatch = announcementBatch[subBatchSize:], append(splitAnnouncementBatch, announcementBatch[0:subBatchSize:subBatchSize]) } splitAnnouncementBatch = append(splitAnnouncementBatch, announcementBatch) return splitAnnouncementBatch } // sendBatch broadcasts a list of announcements to our peers. func (d *AuthenticatedGossiper) sendBatch(announcementBatch []msgWithSenders) { syncerPeers := d.syncMgr.GossipSyncers() // We'll first attempt to filter out this new message // for all peers that have active gossip syncers // active. for _, syncer := range syncerPeers { syncer.FilterGossipMsgs(announcementBatch...) } for _, msgChunk := range announcementBatch { // With the syncers taken care of, we'll merge // the sender map with the set of syncers, so // we don't send out duplicate messages. msgChunk.mergeSyncerMap(syncerPeers) err := d.cfg.Broadcast( msgChunk.senders, msgChunk.msg, ) if err != nil { log.Errorf("Unable to send batch "+ "announcements: %v", err) continue } } } // networkHandler is the primary goroutine that drives this service. The roles // of this goroutine includes answering queries related to the state of the // network, syncing up newly connected peers, and also periodically // broadcasting our latest topology state to all connected peers. // // NOTE: This MUST be run as a goroutine. func (d *AuthenticatedGossiper) networkHandler() { defer d.wg.Done() // Initialize empty deDupedAnnouncements to store announcement batch. announcements := deDupedAnnouncements{} announcements.Reset() d.cfg.RetransmitTicker.Resume() defer d.cfg.RetransmitTicker.Stop() trickleTimer := time.NewTicker(d.cfg.TrickleDelay) defer trickleTimer.Stop() // To start, we'll first check to see if there are any stale channel or // node announcements that we need to re-transmit. if err := d.retransmitStaleAnns(time.Now()); err != nil { log.Errorf("Unable to rebroadcast stale announcements: %v", err) } // We'll use this validation to ensure that we process jobs in their // dependency order during parallel validation. validationBarrier := routing.NewValidationBarrier(1000, d.quit) for { select { // A new policy update has arrived. We'll commit it to the // sub-systems below us, then craft, sign, and broadcast a new // ChannelUpdate for the set of affected clients. case policyUpdate := <-d.chanPolicyUpdates: log.Tracef("Received channel %d policy update requests", len(policyUpdate.edgesToUpdate)) // First, we'll now create new fully signed updates for // the affected channels and also update the underlying // graph with the new state. newChanUpdates, err := d.processChanPolicyUpdate( policyUpdate.edgesToUpdate, ) policyUpdate.errChan <- err if err != nil { log.Errorf("Unable to craft policy updates: %v", err) continue } // Finally, with the updates committed, we'll now add // them to the announcement batch to be flushed at the // start of the next epoch. announcements.AddMsgs(newChanUpdates...) case announcement := <-d.networkMsgs: log.Tracef("Received network message: "+ "peer=%v, source=%x, msg=%s, is_remote=%v", announcement.peer, announcement.source.SerializeCompressed(), announcement.msg.MsgType(), announcement.isRemote) // We should only broadcast this message forward if it // originated from us or it wasn't received as part of // our initial historical sync. shouldBroadcast := !announcement.isRemote || d.syncMgr.IsGraphSynced() switch announcement.msg.(type) { // Channel announcement signatures are amongst the only // messages that we'll process serially. case *lnwire.AnnounceSignatures: emittedAnnouncements, _ := d.processNetworkAnnouncement( announcement, ) log.Debugf("Processed network message %s, "+ "returned len(announcements)=%v", announcement.msg.MsgType(), len(emittedAnnouncements)) if emittedAnnouncements != nil { announcements.AddMsgs( emittedAnnouncements..., ) } continue } // If this message was recently rejected, then we won't // attempt to re-process it. if announcement.isRemote && d.isRecentlyRejectedMsg( announcement.msg, sourceToPub(announcement.source), ) { announcement.err <- fmt.Errorf("recently " + "rejected") continue } // We'll set up any dependent, and wait until a free // slot for this job opens up, this allow us to not // have thousands of goroutines active. validationBarrier.InitJobDependencies(announcement.msg) d.wg.Add(1) go func() { defer d.wg.Done() defer validationBarrier.CompleteJob() // If this message has an existing dependency, // then we'll wait until that has been fully // validated before we proceed. err := validationBarrier.WaitForDependants( announcement.msg, ) if err != nil { if !routing.IsError( err, routing.ErrVBarrierShuttingDown, routing.ErrParentValidationFailed, ) { log.Warnf("unexpected error "+ "during validation "+ "barrier shutdown: %v", err) } announcement.err <- err return } // Process the network announcement to // determine if this is either a new // announcement from our PoV or an edges to a // prior vertex/edge we previously proceeded. emittedAnnouncements, allowDependents := d.processNetworkAnnouncement( announcement, ) log.Tracef("Processed network message %s, "+ "returned len(announcements)=%v, "+ "allowDependents=%v", announcement.msg.MsgType(), len(emittedAnnouncements), allowDependents) // If this message had any dependencies, then // we can now signal them to continue. validationBarrier.SignalDependants( announcement.msg, allowDependents, ) // If the announcement was accepted, then add // the emitted announcements to our announce // batch to be broadcast once the trickle timer // ticks gain. if emittedAnnouncements != nil && shouldBroadcast { // TODO(roasbeef): exclude peer that // sent. announcements.AddMsgs( emittedAnnouncements..., ) } else if emittedAnnouncements != nil { log.Trace("Skipping broadcast of " + "announcements received " + "during initial graph sync") } }() // The trickle timer has ticked, which indicates we should // flush to the network the pending batch of new announcements // we've received since the last trickle tick. case <-trickleTimer.C: // Emit the current batch of announcements from // deDupedAnnouncements. announcementBatch := announcements.Emit() // If the current announcements batch is nil, then we // have no further work here. if len(announcementBatch) == 0 { continue } // Next, If we have new things to announce then // broadcast them to all our immediately connected // peers. subBatchSize := calculateSubBatchSize( d.cfg.TrickleDelay, d.cfg.SubBatchDelay, d.cfg.MinimumBatchSize, len(announcementBatch), ) splitAnnouncementBatch := splitAnnouncementBatches( subBatchSize, announcementBatch, ) d.wg.Add(1) go func() { defer d.wg.Done() log.Infof("Broadcasting %v new announcements in %d sub batches", len(announcementBatch), len(splitAnnouncementBatch)) for _, announcementBatch := range splitAnnouncementBatch { d.sendBatch(announcementBatch) select { case <-time.After(d.cfg.SubBatchDelay): case <-d.quit: return } } }() // The retransmission timer has ticked which indicates that we // should check if we need to prune or re-broadcast any of our // personal channels or node announcement. This addresses the // case of "zombie" channels and channel advertisements that // have been dropped, or not properly propagated through the // network. case tick := <-d.cfg.RetransmitTicker.Ticks(): if err := d.retransmitStaleAnns(tick); err != nil { log.Errorf("unable to rebroadcast stale "+ "announcements: %v", err) } // The gossiper has been signalled to exit, to we exit our // main loop so the wait group can be decremented. case <-d.quit: return } } } // TODO(roasbeef): d/c peers that send updates not on our chain // InitSyncState is called by outside sub-systems when a connection is // established to a new peer that understands how to perform channel range // queries. We'll allocate a new gossip syncer for it, and start any goroutines // needed to handle new queries. func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) { d.syncMgr.InitSyncState(syncPeer) } // PruneSyncState is called by outside sub-systems once a peer that we were // previously connected to has been disconnected. In this case we can stop the // existing GossipSyncer assigned to the peer and free up resources. func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) { d.syncMgr.PruneSyncState(peer) } // isRecentlyRejectedMsg returns true if we recently rejected a message, and // false otherwise, This avoids expensive reprocessing of the message. func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message, peerPub [33]byte) bool { var scid uint64 switch m := msg.(type) { case *lnwire.ChannelUpdate: scid = m.ShortChannelID.ToUint64() case *lnwire.ChannelAnnouncement: scid = m.ShortChannelID.ToUint64() default: return false } _, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub)) return err != cache.ErrElementNotFound } // retransmitStaleAnns examines all outgoing channels that the source node is // known to maintain to check to see if any of them are "stale". A channel is // stale iff, the last timestamp of its rebroadcast is older than the // RebroadcastInterval. We also check if a refreshed node announcement should // be resent. func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error { // Iterate over all of our channels and check if any of them fall // within the prune interval or re-broadcast interval. type updateTuple struct { info *channeldb.ChannelEdgeInfo edge *channeldb.ChannelEdgePolicy } var ( havePublicChannels bool edgesToUpdate []updateTuple ) err := d.cfg.Router.ForAllOutgoingChannels(func( _ kvdb.RTx, info *channeldb.ChannelEdgeInfo, edge *channeldb.ChannelEdgePolicy) error { // If there's no auth proof attached to this edge, it means // that it is a private channel not meant to be announced to // the greater network, so avoid sending channel updates for // this channel to not leak its // existence. if info.AuthProof == nil { log.Debugf("Skipping retransmission of channel "+ "without AuthProof: %v", info.ChannelID) return nil } // We make a note that we have at least one public channel. We // use this to determine whether we should send a node // announcement below. havePublicChannels = true // If this edge has a ChannelUpdate that was created before the // introduction of the MaxHTLC field, then we'll update this // edge to propagate this information in the network. if !edge.MessageFlags.HasMaxHtlc() { // We'll make sure we support the new max_htlc field if // not already present. edge.MessageFlags |= lnwire.ChanUpdateOptionMaxHtlc edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity) edgesToUpdate = append(edgesToUpdate, updateTuple{ info: info, edge: edge, }) return nil } timeElapsed := now.Sub(edge.LastUpdate) // If it's been longer than RebroadcastInterval since we've // re-broadcasted the channel, add the channel to the set of // edges we need to update. if timeElapsed >= d.cfg.RebroadcastInterval { edgesToUpdate = append(edgesToUpdate, updateTuple{ info: info, edge: edge, }) } return nil }) if err != nil && err != channeldb.ErrGraphNoEdgesFound { return fmt.Errorf("unable to retrieve outgoing channels: %v", err) } var signedUpdates []lnwire.Message for _, chanToUpdate := range edgesToUpdate { // Re-sign and update the channel on disk and retrieve our // ChannelUpdate to broadcast. chanAnn, chanUpdate, err := d.updateChannel( chanToUpdate.info, chanToUpdate.edge, ) if err != nil { return fmt.Errorf("unable to update channel: %v", err) } // If we have a valid announcement to transmit, then we'll send // that along with the update. if chanAnn != nil { signedUpdates = append(signedUpdates, chanAnn) } signedUpdates = append(signedUpdates, chanUpdate) } // If we don't have any public channels, we return as we don't want to // broadcast anything that would reveal our existence. if !havePublicChannels { return nil } // We'll also check that our NodeAnnouncement is not too old. currentNodeAnn, err := d.cfg.SelfNodeAnnouncement(false) if err != nil { return fmt.Errorf("unable to get current node announment: %v", err) } timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0) timeElapsed := now.Sub(timestamp) // If it's been a full day since we've re-broadcasted the // node announcement, refresh it and resend it. nodeAnnStr := "" if timeElapsed >= d.cfg.RebroadcastInterval { newNodeAnn, err := d.cfg.SelfNodeAnnouncement(true) if err != nil { return fmt.Errorf("unable to get refreshed node "+ "announcement: %v", err) } signedUpdates = append(signedUpdates, &newNodeAnn) nodeAnnStr = " and our refreshed node announcement" // Before broadcasting the refreshed node announcement, add it // to our own graph. if err := d.addNode(&newNodeAnn); err != nil { log.Errorf("Unable to add refreshed node announcement "+ "to graph: %v", err) } } // If we don't have any updates to re-broadcast, then we'll exit // early. if len(signedUpdates) == 0 { return nil } log.Infof("Retransmitting %v outgoing channels%v", len(edgesToUpdate), nodeAnnStr) // With all the wire announcements properly crafted, we'll broadcast // our known outgoing channels to all our immediate peers. if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil { return fmt.Errorf("unable to re-broadcast channels: %v", err) } return nil } // processChanPolicyUpdate generates a new set of channel updates for the // provided list of edges and updates the backing ChannelGraphSource. func (d *AuthenticatedGossiper) processChanPolicyUpdate( edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) { var chanUpdates []networkMsg for _, edgeInfo := range edgesToUpdate { // Now that we've collected all the channels we need to update, // we'll re-sign and update the backing ChannelGraphSource, and // retrieve our ChannelUpdate to broadcast. _, chanUpdate, err := d.updateChannel( edgeInfo.Info, edgeInfo.Edge, ) if err != nil { return nil, err } // We'll avoid broadcasting any updates for private channels to // avoid directly giving away their existence. Instead, we'll // send the update directly to the remote party. if edgeInfo.Info.AuthProof == nil { remotePubKey := remotePubFromChanInfo( edgeInfo.Info, chanUpdate.ChannelFlags, ) err := d.reliableSender.sendMessage( chanUpdate, remotePubKey, ) if err != nil { log.Errorf("Unable to reliably send %v for "+ "channel=%v to peer=%x: %v", chanUpdate.MsgType(), chanUpdate.ShortChannelID, remotePubKey, err) } continue } // We set ourselves as the source of this message to indicate // that we shouldn't skip any peers when sending this message. chanUpdates = append(chanUpdates, networkMsg{ source: d.selfKey, msg: chanUpdate, }) } return chanUpdates, nil } // remotePubFromChanInfo returns the public key of the remote peer given a // ChannelEdgeInfo that describe a channel we have with them. func remotePubFromChanInfo(chanInfo *channeldb.ChannelEdgeInfo, chanFlags lnwire.ChanUpdateChanFlags) [33]byte { var remotePubKey [33]byte switch { case chanFlags&lnwire.ChanUpdateDirection == 0: remotePubKey = chanInfo.NodeKey2Bytes case chanFlags&lnwire.ChanUpdateDirection == 1: remotePubKey = chanInfo.NodeKey1Bytes } return remotePubKey } // processRejectedEdge examines a rejected edge to see if we can extract any // new announcements from it. An edge will get rejected if we already added // the same edge without AuthProof to the graph. If the received announcement // contains a proof, we can add this proof to our edge. We can end up in this // situation in the case where we create a channel, but for some reason fail // to receive the remote peer's proof, while the remote peer is able to fully // assemble the proof and craft the ChannelAnnouncement. func (d *AuthenticatedGossiper) processRejectedEdge( chanAnnMsg *lnwire.ChannelAnnouncement, proof *channeldb.ChannelAuthProof) ([]networkMsg, error) { // First, we'll fetch the state of the channel as we know if from the // database. chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID( chanAnnMsg.ShortChannelID, ) if err != nil { return nil, err } // The edge is in the graph, and has a proof attached, then we'll just // reject it as normal. if chanInfo.AuthProof != nil { return nil, nil } // Otherwise, this means that the edge is within the graph, but it // doesn't yet have a proper proof attached. If we did not receive // the proof such that we now can add it, there's nothing more we // can do. if proof == nil { return nil, nil } // We'll then create then validate the new fully assembled // announcement. chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement( proof, chanInfo, e1, e2, ) if err != nil { return nil, err } err = routing.ValidateChannelAnn(chanAnn) if err != nil { err := fmt.Errorf("assembled channel announcement proof "+ "for shortChanID=%v isn't valid: %v", chanAnnMsg.ShortChannelID, err) log.Error(err) return nil, err } // If everything checks out, then we'll add the fully assembled proof // to the database. err = d.cfg.Router.AddProof(chanAnnMsg.ShortChannelID, proof) if err != nil { err := fmt.Errorf("unable add proof to shortChanID=%v: %v", chanAnnMsg.ShortChannelID, err) log.Error(err) return nil, err } // As we now have a complete channel announcement for this channel, // we'll construct the announcement so they can be broadcast out to all // our peers. announcements := make([]networkMsg, 0, 3) announcements = append(announcements, networkMsg{ source: d.selfKey, msg: chanAnn, }) if e1Ann != nil { announcements = append(announcements, networkMsg{ source: d.selfKey, msg: e1Ann, }) } if e2Ann != nil { announcements = append(announcements, networkMsg{ source: d.selfKey, msg: e2Ann, }) } return announcements, nil } // addNode processes the given node announcement, and adds it to our channel // graph. func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement, op ...batch.SchedulerOption) error { if err := routing.ValidateNodeAnn(msg); err != nil { return fmt.Errorf("unable to validate node announcement: %v", err) } timestamp := time.Unix(int64(msg.Timestamp), 0) features := lnwire.NewFeatureVector(msg.Features, lnwire.Features) node := &channeldb.LightningNode{ HaveNodeAnnouncement: true, LastUpdate: timestamp, Addresses: msg.Addresses, PubKeyBytes: msg.NodeID, Alias: msg.Alias.String(), AuthSigBytes: msg.Signature.ToSignatureBytes(), Features: features, Color: msg.RGBColor, ExtraOpaqueData: msg.ExtraOpaqueData, } return d.cfg.Router.AddNode(node, op...) } // isPremature decides whether a given network message has a block height+delta // value specified in the future. If so, the message will be added to the // future message map and be processed when the block height as reached. // // NOTE: must be used inside a lock. func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID, delta uint32, msg *networkMsg) bool { // TODO(roasbeef) make height delta 6 // * or configurable msgHeight := chanID.BlockHeight + delta // The message height is smaller or equal to our best known height, // thus the message is mature. if msgHeight <= d.bestHeight { return false } // Add the premature message to our future messages which will // be resent once the block height has reached. // // Init an empty cached message and overwrite it if there are cached // messages found. cachedMsgs := &cachedNetworkMsg{ msgs: make([]*networkMsg, 0), } result, err := d.futureMsgs.Get(msgHeight) // No error returned means we have old messages cached. if err == nil { cachedMsgs = result.(*cachedNetworkMsg) } // Copy the networkMsgs since the old message's err chan will // be consumed. copied := &networkMsg{ peer: msg.peer, source: msg.source, msg: msg.msg, optionalMsgFields: msg.optionalMsgFields, isRemote: msg.isRemote, err: make(chan error, 1), } // Add the network message. cachedMsgs.msgs = append(cachedMsgs.msgs, copied) _, err = d.futureMsgs.Put(msgHeight, cachedMsgs) if err != nil { log.Errorf("Adding future message got error: %v", err) } log.Debugf("Network message: %v added to future messages for "+ "msgHeight=%d, bestHeight=%d", msg.msg.MsgType(), msgHeight, d.bestHeight) return true } // processNetworkAnnouncement processes a new network relate authenticated // channel or node announcement or announcements proofs. If the announcement // didn't affect the internal state due to either being out of date, invalid, // or redundant, then nil is returned. Otherwise, the set of announcements will // be returned which should be broadcasted to the rest of the network. The // boolean returned indicates whether any dependents of the announcement should // attempt to be processed as well. func (d *AuthenticatedGossiper) processNetworkAnnouncement( nMsg *networkMsg) ([]networkMsg, bool) { log.Debugf("Processing network message: peer=%v, source=%x, msg=%s, "+ "is_remote=%v", nMsg.peer, nMsg.source.SerializeCompressed(), nMsg.msg.MsgType(), nMsg.isRemote) // If this is a remote update, we set the scheduler option to lazily // add it to the graph. var schedulerOp []batch.SchedulerOption if nMsg.isRemote { schedulerOp = append(schedulerOp, batch.LazyAdd()) } var announcements []networkMsg switch msg := nMsg.msg.(type) { // A new node announcement has arrived which either presents new // information about a node in one of the channels we know about, or a // updating previously advertised information. case *lnwire.NodeAnnouncement: return d.handleNodeAnnouncement(nMsg, msg, schedulerOp) // A new channel announcement has arrived, this indicates the // *creation* of a new channel within the network. This only advertises // the existence of a channel and not yet the routing policies in // either direction of the channel. case *lnwire.ChannelAnnouncement: return d.handleChanAnnouncement(nMsg, msg, schedulerOp) // A new authenticated channel edge update has arrived. This indicates // that the directional information for an already known channel has // been updated. case *lnwire.ChannelUpdate: // We'll ignore any channel announcements that target any chain // other than the set of chains we know of. if !bytes.Equal(msg.ChainHash[:], d.cfg.ChainHash[:]) { err := fmt.Errorf("ignoring ChannelUpdate from "+ "chain=%v, gossiper on chain=%v", msg.ChainHash, d.cfg.ChainHash) log.Errorf(err.Error()) key := newRejectCacheKey( msg.ShortChannelID.ToUint64(), sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) nMsg.err <- err return nil, false } blockHeight := msg.ShortChannelID.BlockHeight shortChanID := msg.ShortChannelID.ToUint64() // If the advertised inclusionary block is beyond our knowledge // of the chain tip, then we'll put the announcement in limbo // to be fully verified once we advance forward in the chain. d.Lock() if nMsg.isRemote && d.isPremature(msg.ShortChannelID, 0, nMsg) { log.Warnf("Update announcement for "+ "short_chan_id(%v), is premature: advertises "+ "height %v, only height %v is known", shortChanID, blockHeight, d.bestHeight) d.Unlock() nMsg.err <- nil return nil, false } d.Unlock() // Before we perform any of the expensive checks below, we'll // check whether this update is stale or is for a zombie // channel in order to quickly reject it. timestamp := time.Unix(int64(msg.Timestamp), 0) if d.cfg.Router.IsStaleEdgePolicy( msg.ShortChannelID, timestamp, msg.ChannelFlags, ) { log.Debugf("Ignored stale edge policy: peer=%v, "+ "source=%x, msg=%s, is_remote=%v", nMsg.peer, nMsg.source.SerializeCompressed(), nMsg.msg.MsgType(), nMsg.isRemote) nMsg.err <- nil return nil, true } // Get the node pub key as far as we don't have it in channel // update announcement message. We'll need this to properly // verify message signature. // // We make sure to obtain the mutex for this channel ID // before we access the database. This ensures the state // we read from the database has not changed between this // point and when we call UpdateEdge() later. d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) chanInfo, edge1, edge2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) switch err { // No error, break. case nil: break case channeldb.ErrZombieEdge: err = d.processZombieUpdate(chanInfo, msg) if err != nil { log.Debug(err) nMsg.err <- err return nil, false } // We'll fallthrough to ensure we stash the update until // we receive its corresponding ChannelAnnouncement. // This is needed to ensure the edge exists in the graph // before applying the update. fallthrough case channeldb.ErrGraphNotFound: fallthrough case channeldb.ErrGraphNoEdgesFound: fallthrough case channeldb.ErrEdgeNotFound: // If the edge corresponding to this ChannelUpdate was // not found in the graph, this might be a channel in // the process of being opened, and we haven't processed // our own ChannelAnnouncement yet, hence it is not // found in the graph. This usually gets resolved after // the channel proofs are exchanged and the channel is // broadcasted to the rest of the network, but in case // this is a private channel this won't ever happen. // This can also happen in the case of a zombie channel // with a fresh update for which we don't have a // ChannelAnnouncement for since we reject them. Because // of this, we temporarily add it to a map, and // reprocess it after our own ChannelAnnouncement has // been processed. earlyMsgs, err := d.prematureChannelUpdates.Get( shortChanID, ) switch { // Nothing in the cache yet, we can just directly // insert this element. case err == cache.ErrElementNotFound: _, _ = d.prematureChannelUpdates.Put( shortChanID, &cachedNetworkMsg{ msgs: []*networkMsg{nMsg}, }) // There's already something in the cache, so we'll // combine the set of messages into a single value. default: msgs := earlyMsgs.(*cachedNetworkMsg).msgs msgs = append(msgs, nMsg) _, _ = d.prematureChannelUpdates.Put( shortChanID, &cachedNetworkMsg{ msgs: msgs, }) } log.Debugf("Got ChannelUpdate for edge not found in "+ "graph(shortChanID=%v), saving for "+ "reprocessing later", shortChanID) // NOTE: We don't return anything on the error channel // for this message, as we expect that will be done when // this ChannelUpdate is later reprocessed. return nil, false default: err := fmt.Errorf("unable to validate channel update "+ "short_chan_id=%v: %v", shortChanID, err) log.Error(err) nMsg.err <- err key := newRejectCacheKey( msg.ShortChannelID.ToUint64(), sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) return nil, false } // The least-significant bit in the flag on the channel update // announcement tells us "which" side of the channels directed // edge is being updated. var ( pubKey *btcec.PublicKey edgeToUpdate *channeldb.ChannelEdgePolicy ) direction := msg.ChannelFlags & lnwire.ChanUpdateDirection switch direction { case 0: pubKey, _ = chanInfo.NodeKey1() edgeToUpdate = edge1 case 1: pubKey, _ = chanInfo.NodeKey2() edgeToUpdate = edge2 } // If we have a previous version of the edge being updated, // we'll want to rate limit its updates to prevent spam // throughout the network. if nMsg.isRemote && edgeToUpdate != nil { // If it's a keep-alive update, we'll only propagate one // if it's been a day since the previous. This follows // our own heuristic of sending keep-alive updates after // the same duration (see retransmitStaleAnns). timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate) if IsKeepAliveUpdate(msg, edgeToUpdate) { if timeSinceLastUpdate < d.cfg.RebroadcastInterval { log.Debugf("Ignoring keep alive update "+ "not within %v period for "+ "channel %v", d.cfg.RebroadcastInterval, shortChanID) nMsg.err <- nil return nil, false } } else { // If it's not, we'll allow an update per minute // with a maximum burst of 10. If we haven't // seen an update for this channel before, we'll // need to initialize a rate limiter for each // direction. d.Lock() rateLimiters, ok := d.chanUpdateRateLimiter[shortChanID] if !ok { r := rate.Every(d.cfg.ChannelUpdateInterval) b := d.cfg.MaxChannelUpdateBurst rateLimiters = [2]*rate.Limiter{ rate.NewLimiter(r, b), rate.NewLimiter(r, b), } d.chanUpdateRateLimiter[shortChanID] = rateLimiters } d.Unlock() if !rateLimiters[direction].Allow() { log.Debugf("Rate limiting update for "+ "channel %v from direction %x", shortChanID, pubKey.SerializeCompressed()) nMsg.err <- nil return nil, false } } } // Validate the channel announcement with the expected public key and // channel capacity. In the case of an invalid channel update, we'll // return an error to the caller and exit early. err = routing.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, msg) if err != nil { rErr := fmt.Errorf("unable to validate channel "+ "update announcement for short_chan_id=%v: %v", spew.Sdump(msg.ShortChannelID), err) log.Error(rErr) nMsg.err <- rErr return nil, false } update := &channeldb.ChannelEdgePolicy{ SigBytes: msg.Signature.ToSignatureBytes(), ChannelID: shortChanID, LastUpdate: timestamp, MessageFlags: msg.MessageFlags, ChannelFlags: msg.ChannelFlags, TimeLockDelta: msg.TimeLockDelta, MinHTLC: msg.HtlcMinimumMsat, MaxHTLC: msg.HtlcMaximumMsat, FeeBaseMSat: lnwire.MilliSatoshi(msg.BaseFee), FeeProportionalMillionths: lnwire.MilliSatoshi(msg.FeeRate), ExtraOpaqueData: msg.ExtraOpaqueData, } if err := d.cfg.Router.UpdateEdge(update, schedulerOp...); err != nil { if routing.IsError( err, routing.ErrOutdated, routing.ErrIgnored, routing.ErrVBarrierShuttingDown, ) { log.Debug(err) } else { key := newRejectCacheKey( msg.ShortChannelID.ToUint64(), sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) log.Error(err) } nMsg.err <- err return nil, false } // If this is a local ChannelUpdate without an AuthProof, it // means it is an update to a channel that is not (yet) // supposed to be announced to the greater network. However, // our channel counter party will need to be given the update, // so we'll try sending the update directly to the remote peer. if !nMsg.isRemote && chanInfo.AuthProof == nil { // Get our peer's public key. remotePubKey := remotePubFromChanInfo( chanInfo, msg.ChannelFlags, ) log.Debugf("The message %v has no AuthProof, sending "+ "the update to remote peer %x", msg.MsgType(), remotePubKey) // Now, we'll attempt to send the channel update message // reliably to the remote peer in the background, so // that we don't block if the peer happens to be offline // at the moment. err := d.reliableSender.sendMessage(msg, remotePubKey) if err != nil { err := fmt.Errorf("unable to reliably send %v "+ "for channel=%v to peer=%x: %v", msg.MsgType(), msg.ShortChannelID, remotePubKey, err) nMsg.err <- err return nil, false } } // Channel update announcement was successfully processed and // now it can be broadcast to the rest of the network. However, // we'll only broadcast the channel update announcement if it // has an attached authentication proof. if chanInfo.AuthProof != nil { announcements = append(announcements, networkMsg{ peer: nMsg.peer, source: nMsg.source, msg: msg, }) } nMsg.err <- nil return announcements, true // A new signature announcement has been received. This indicates // willingness of nodes involved in the funding of a channel to // announce this new channel to the rest of the world. case *lnwire.AnnounceSignatures: needBlockHeight := msg.ShortChannelID.BlockHeight + d.cfg.ProofMatureDelta shortChanID := msg.ShortChannelID.ToUint64() prefix := "local" if nMsg.isRemote { prefix = "remote" } log.Infof("Received new %v channel announcement for %v", prefix, msg.ShortChannelID) // By the specification, channel announcement proofs should be // sent after some number of confirmations after channel was // registered in bitcoin blockchain. Therefore, we check if the // proof is premature. d.Lock() premature := d.isPremature( msg.ShortChannelID, d.cfg.ProofMatureDelta, nMsg, ) if premature { log.Warnf("Premature proof announcement, current "+ "block height lower than needed: %v < %v", d.bestHeight, needBlockHeight) d.Unlock() nMsg.err <- nil return nil, false } d.Unlock() // Ensure that we know of a channel with the target channel ID // before proceeding further. // // We must acquire the mutex for this channel ID before getting // the channel from the database, to ensure what we read does // not change before we call AddProof() later. d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID( msg.ShortChannelID) if err != nil { // TODO(andrew.shvv) this is dangerous because remote // node might rewrite the waiting proof. proof := channeldb.NewWaitingProof(nMsg.isRemote, msg) err := d.cfg.WaitingProofStore.Add(proof) if err != nil { err := fmt.Errorf("unable to store "+ "the proof for short_chan_id=%v: %v", shortChanID, err) log.Error(err) nMsg.err <- err return nil, false } log.Infof("Orphan %v proof announcement with "+ "short_chan_id=%v, adding "+ "to waiting batch", prefix, shortChanID) nMsg.err <- nil return nil, false } nodeID := nMsg.source.SerializeCompressed() isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:]) isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:]) // Ensure that channel that was retrieved belongs to the peer // which sent the proof announcement. if !(isFirstNode || isSecondNode) { err := fmt.Errorf("channel that was received not "+ "belongs to the peer which sent the proof, "+ "short_chan_id=%v", shortChanID) log.Error(err) nMsg.err <- err return nil, false } // If proof was sent by a local sub-system, then we'll // send the announcement signature to the remote node // so they can also reconstruct the full channel // announcement. if !nMsg.isRemote { var remotePubKey [33]byte if isFirstNode { remotePubKey = chanInfo.NodeKey2Bytes } else { remotePubKey = chanInfo.NodeKey1Bytes } // Since the remote peer might not be online // we'll call a method that will attempt to // deliver the proof when it comes online. err := d.reliableSender.sendMessage(msg, remotePubKey) if err != nil { err := fmt.Errorf("unable to reliably send %v "+ "for channel=%v to peer=%x: %v", msg.MsgType(), msg.ShortChannelID, remotePubKey, err) nMsg.err <- err return nil, false } } // Check if we already have the full proof for this channel. if chanInfo.AuthProof != nil { // If we already have the fully assembled proof, then // the peer sending us their proof has probably not // received our local proof yet. So be kind and send // them the full proof. if nMsg.isRemote { peerID := nMsg.source.SerializeCompressed() log.Debugf("Got AnnounceSignatures for " + "channel with full proof.") d.wg.Add(1) go func() { defer d.wg.Done() log.Debugf("Received half proof for "+ "channel %v with existing "+ "full proof. Sending full "+ "proof to peer=%x", msg.ChannelID, peerID) chanAnn, _, _, err := netann.CreateChanAnnouncement( chanInfo.AuthProof, chanInfo, e1, e2, ) if err != nil { log.Errorf("unable to gen "+ "ann: %v", err) return } err = nMsg.peer.SendMessage( false, chanAnn, ) if err != nil { log.Errorf("Failed sending "+ "full proof to "+ "peer=%x: %v", peerID, err) return } log.Debugf("Full proof sent to peer=%x"+ " for chanID=%v", peerID, msg.ChannelID) }() } log.Debugf("Already have proof for channel "+ "with chanID=%v", msg.ChannelID) nMsg.err <- nil return nil, true } // Check that we received the opposite proof. If so, then we're // now able to construct the full proof, and create the channel // announcement. If we didn't receive the opposite half of the // proof than we should store it this one, and wait for // opposite to be received. proof := channeldb.NewWaitingProof(nMsg.isRemote, msg) oppositeProof, err := d.cfg.WaitingProofStore.Get( proof.OppositeKey(), ) if err != nil && err != channeldb.ErrWaitingProofNotFound { err := fmt.Errorf("unable to get "+ "the opposite proof for short_chan_id=%v: %v", shortChanID, err) log.Error(err) nMsg.err <- err return nil, false } if err == channeldb.ErrWaitingProofNotFound { err := d.cfg.WaitingProofStore.Add(proof) if err != nil { err := fmt.Errorf("unable to store "+ "the proof for short_chan_id=%v: %v", shortChanID, err) log.Error(err) nMsg.err <- err return nil, false } log.Infof("1/2 of channel ann proof received for "+ "short_chan_id=%v, waiting for other half", shortChanID) nMsg.err <- nil return nil, false } // We now have both halves of the channel announcement proof, // then we'll reconstruct the initial announcement so we can // validate it shortly below. var dbProof channeldb.ChannelAuthProof if isFirstNode { dbProof.NodeSig1Bytes = msg.NodeSignature.ToSignatureBytes() dbProof.NodeSig2Bytes = oppositeProof.NodeSignature.ToSignatureBytes() dbProof.BitcoinSig1Bytes = msg.BitcoinSignature.ToSignatureBytes() dbProof.BitcoinSig2Bytes = oppositeProof.BitcoinSignature.ToSignatureBytes() } else { dbProof.NodeSig1Bytes = oppositeProof.NodeSignature.ToSignatureBytes() dbProof.NodeSig2Bytes = msg.NodeSignature.ToSignatureBytes() dbProof.BitcoinSig1Bytes = oppositeProof.BitcoinSignature.ToSignatureBytes() dbProof.BitcoinSig2Bytes = msg.BitcoinSignature.ToSignatureBytes() } chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement( &dbProof, chanInfo, e1, e2, ) if err != nil { log.Error(err) nMsg.err <- err return nil, false } // With all the necessary components assembled validate the // full channel announcement proof. if err := routing.ValidateChannelAnn(chanAnn); err != nil { err := fmt.Errorf("channel announcement proof "+ "for short_chan_id=%v isn't valid: %v", shortChanID, err) log.Error(err) nMsg.err <- err return nil, false } // If the channel was returned by the router it means that // existence of funding point and inclusion of nodes bitcoin // keys in it already checked by the router. In this stage we // should check that node keys are attest to the bitcoin keys // by validating the signatures of announcement. If proof is // valid then we'll populate the channel edge with it, so we // can announce it on peer connect. err = d.cfg.Router.AddProof(msg.ShortChannelID, &dbProof) if err != nil { err := fmt.Errorf("unable add proof to the "+ "channel chanID=%v: %v", msg.ChannelID, err) log.Error(err) nMsg.err <- err return nil, false } err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey()) if err != nil { err := fmt.Errorf("unable remove opposite proof "+ "for the channel with chanID=%v: %v", msg.ChannelID, err) log.Error(err) nMsg.err <- err return nil, false } // Proof was successfully created and now can announce the // channel to the remain network. log.Infof("Fully valid channel proof for short_chan_id=%v "+ "constructed, adding to next ann batch", shortChanID) // Assemble the necessary announcements to add to the next // broadcasting batch. announcements = append(announcements, networkMsg{ peer: nMsg.peer, source: nMsg.source, msg: chanAnn, }) if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil { announcements = append(announcements, networkMsg{ peer: nMsg.peer, source: src, msg: e1Ann, }) } if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil { announcements = append(announcements, networkMsg{ peer: nMsg.peer, source: src, msg: e2Ann, }) } // We'll also send along the node announcements for each channel // participant if we know of them. To ensure our node // announcement propagates to our channel counterparty, we'll // set the source for each announcement to the node it belongs // to, otherwise we won't send it since the source gets skipped. // This isn't necessary for channel updates and announcement // signatures since we send those directly to our channel // counterparty through the gossiper's reliable sender. node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes) if err != nil { log.Debugf("Unable to fetch node announcement for "+ "%x: %v", chanInfo.NodeKey1Bytes, err) } else { if nodeKey1, err := chanInfo.NodeKey1(); err == nil { announcements = append(announcements, networkMsg{ peer: nMsg.peer, source: nodeKey1, msg: node1Ann, }) } } node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes) if err != nil { log.Debugf("Unable to fetch node announcement for "+ "%x: %v", chanInfo.NodeKey2Bytes, err) } else { if nodeKey2, err := chanInfo.NodeKey2(); err == nil { announcements = append(announcements, networkMsg{ peer: nMsg.peer, source: nodeKey2, msg: node2Ann, }) } } nMsg.err <- nil return announcements, true default: err := errors.New("wrong type of the announcement") nMsg.err <- err return nil, false } } // processZombieUpdate determines whether the provided channel update should // resurrect a given zombie edge. func (d *AuthenticatedGossiper) processZombieUpdate( chanInfo *channeldb.ChannelEdgeInfo, msg *lnwire.ChannelUpdate) error { // The least-significant bit in the flag on the channel update tells us // which edge is being updated. isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 // Since we've deemed the update as not stale above, before marking it // live, we'll make sure it has been signed by the correct party. If we // have both pubkeys, either party can resurect the channel. If we've // already marked this with the stricter, single-sided resurrection we // will only have the pubkey of the node with the oldest timestamp. var pubKey *btcec.PublicKey switch { case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey: pubKey, _ = chanInfo.NodeKey1() case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey: pubKey, _ = chanInfo.NodeKey2() } if pubKey == nil { return fmt.Errorf("incorrect pubkey to resurrect zombie "+ "with chan_id=%v", msg.ShortChannelID) } err := routing.VerifyChannelUpdateSignature(msg, pubKey) if err != nil { return fmt.Errorf("unable to verify channel "+ "update signature: %v", err) } // With the signature valid, we'll proceed to mark the // edge as live and wait for the channel announcement to // come through again. err = d.cfg.Router.MarkEdgeLive(msg.ShortChannelID) if err != nil { return fmt.Errorf("unable to remove edge with "+ "chan_id=%v from zombie index: %v", msg.ShortChannelID, err) } log.Debugf("Removed edge with chan_id=%v from zombie "+ "index", msg.ShortChannelID) return nil } // fetchNodeAnn fetches the latest signed node announcement from our point of // view for the node with the given public key. func (d *AuthenticatedGossiper) fetchNodeAnn( pubKey [33]byte) (*lnwire.NodeAnnouncement, error) { node, err := d.cfg.Router.FetchLightningNode(pubKey) if err != nil { return nil, err } return node.NodeAnnouncement(true) } // isMsgStale determines whether a message retrieved from the backing // MessageStore is seen as stale by the current graph. func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool { switch msg := msg.(type) { case *lnwire.AnnounceSignatures: chanInfo, _, _, err := d.cfg.Router.GetChannelByID( msg.ShortChannelID, ) // If the channel cannot be found, it is most likely a leftover // message for a channel that was closed, so we can consider it // stale. if err == channeldb.ErrEdgeNotFound { return true } if err != nil { log.Debugf("Unable to retrieve channel=%v from graph: "+ "%v", err) return false } // If the proof exists in the graph, then we have successfully // received the remote proof and assembled the full proof, so we // can safely delete the local proof from the database. return chanInfo.AuthProof != nil case *lnwire.ChannelUpdate: _, p1, p2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) // If the channel cannot be found, it is most likely a leftover // message for a channel that was closed, so we can consider it // stale. if err == channeldb.ErrEdgeNotFound { return true } if err != nil { log.Debugf("Unable to retrieve channel=%v from graph: "+ "%v", msg.ShortChannelID, err) return false } // Otherwise, we'll retrieve the correct policy that we // currently have stored within our graph to check if this // message is stale by comparing its timestamp. var p *channeldb.ChannelEdgePolicy if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 { p = p1 } else { p = p2 } // If the policy is still unknown, then we can consider this // policy fresh. if p == nil { return false } timestamp := time.Unix(int64(msg.Timestamp), 0) return p.LastUpdate.After(timestamp) default: // We'll make sure to not mark any unsupported messages as stale // to ensure they are not removed. return false } } // updateChannel creates a new fully signed update for the channel, and updates // the underlying graph with the new state. func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo, edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement, *lnwire.ChannelUpdate, error) { // Parse the unsigned edge into a channel update. chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge) // We'll generate a new signature over a digest of the channel // announcement itself and update the timestamp to ensure it propagate. err := netann.SignChannelUpdate( d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate, netann.ChanUpdSetTimestamp, ) if err != nil { return nil, nil, err } // Next, we'll set the new signature in place, and update the reference // in the backing slice. edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0) edge.SigBytes = chanUpdate.Signature.ToSignatureBytes() // To ensure that our signature is valid, we'll verify it ourself // before committing it to the slice returned. err = routing.ValidateChannelUpdateAnn(d.selfKey, info.Capacity, chanUpdate) if err != nil { return nil, nil, fmt.Errorf("generated invalid channel "+ "update sig: %v", err) } // Finally, we'll write the new edge policy to disk. if err := d.cfg.Router.UpdateEdge(edge); err != nil { return nil, nil, err } // We'll also create the original channel announcement so the two can // be broadcast along side each other (if necessary), but only if we // have a full channel announcement for this channel. var chanAnn *lnwire.ChannelAnnouncement if info.AuthProof != nil { chanID := lnwire.NewShortChanIDFromInt(info.ChannelID) chanAnn = &lnwire.ChannelAnnouncement{ ShortChannelID: chanID, NodeID1: info.NodeKey1Bytes, NodeID2: info.NodeKey2Bytes, ChainHash: info.ChainHash, BitcoinKey1: info.BitcoinKey1Bytes, Features: lnwire.NewRawFeatureVector(), BitcoinKey2: info.BitcoinKey2Bytes, ExtraOpaqueData: edge.ExtraOpaqueData, } chanAnn.NodeSig1, err = lnwire.NewSigFromRawSignature( info.AuthProof.NodeSig1Bytes, ) if err != nil { return nil, nil, err } chanAnn.NodeSig2, err = lnwire.NewSigFromRawSignature( info.AuthProof.NodeSig2Bytes, ) if err != nil { return nil, nil, err } chanAnn.BitcoinSig1, err = lnwire.NewSigFromRawSignature( info.AuthProof.BitcoinSig1Bytes, ) if err != nil { return nil, nil, err } chanAnn.BitcoinSig2, err = lnwire.NewSigFromRawSignature( info.AuthProof.BitcoinSig2Bytes, ) if err != nil { return nil, nil, err } } return chanAnn, chanUpdate, err } // SyncManager returns the gossiper's SyncManager instance. func (d *AuthenticatedGossiper) SyncManager() *SyncManager { return d.syncMgr } // IsKeepAliveUpdate determines whether this channel update is considered a // keep-alive update based on the previous channel update processed for the same // direction. func IsKeepAliveUpdate(update *lnwire.ChannelUpdate, prev *channeldb.ChannelEdgePolicy) bool { // Both updates should be from the same direction. if update.ChannelFlags&lnwire.ChanUpdateDirection != prev.ChannelFlags&lnwire.ChanUpdateDirection { return false } // The timestamp should always increase for a keep-alive update. timestamp := time.Unix(int64(update.Timestamp), 0) if !timestamp.After(prev.LastUpdate) { return false } // None of the remaining fields should change for a keep-alive update. if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() { return false } if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat { return false } if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths { return false } if update.TimeLockDelta != prev.TimeLockDelta { return false } if update.HtlcMinimumMsat != prev.MinHTLC { return false } if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() { return false } if update.HtlcMaximumMsat != prev.MaxHTLC { return false } if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) { return false } return true } // latestHeight returns the gossiper's latest height known of the chain. func (d *AuthenticatedGossiper) latestHeight() uint32 { d.Lock() defer d.Unlock() return d.bestHeight } // handleNodeAnnouncement processes a new node announcement. func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg, nodeAnn *lnwire.NodeAnnouncement, ops []batch.SchedulerOption) ([]networkMsg, bool) { timestamp := time.Unix(int64(nodeAnn.Timestamp), 0) // We'll quickly ask the router if it already has a newer update for // this node so we can skip validating signatures if not required. if d.cfg.Router.IsStaleNode(nodeAnn.NodeID, timestamp) { log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID) nMsg.err <- nil return nil, true } if err := d.addNode(nodeAnn, ops...); err != nil { log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID, err) if !routing.IsError( err, routing.ErrOutdated, routing.ErrIgnored, routing.ErrVBarrierShuttingDown, ) { log.Error(err) } nMsg.err <- err return nil, false } // In order to ensure we don't leak unadvertised nodes, we'll make a // quick check to ensure this node intends to publicly advertise itself // to the network. isPublic, err := d.cfg.Router.IsPublicNode(nodeAnn.NodeID) if err != nil { log.Errorf("Unable to determine if node %x is advertised: %v", nodeAnn.NodeID, err) nMsg.err <- err return nil, false } var announcements []networkMsg // If it does, we'll add their announcement to our batch so that it can // be broadcast to the rest of our peers. if isPublic { announcements = append(announcements, networkMsg{ peer: nMsg.peer, source: nMsg.source, msg: nodeAnn, }) } else { log.Tracef("Skipping broadcasting node announcement for %x "+ "due to being unadvertised", nodeAnn.NodeID) } nMsg.err <- nil // TODO(roasbeef): get rid of the above return announcements, true } // handleChanAnnouncement processes a new channel announcement. func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, ann *lnwire.ChannelAnnouncement, ops []batch.SchedulerOption) ([]networkMsg, bool) { // We'll ignore any channel announcements that target any chain other // than the set of chains we know of. if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) { err := fmt.Errorf("ignoring ChannelAnnouncement from chain=%v"+ ", gossiper on chain=%v", ann.ChainHash, d.cfg.ChainHash) log.Errorf(err.Error()) key := newRejectCacheKey( ann.ShortChannelID.ToUint64(), sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) nMsg.err <- err return nil, false } // If the advertised inclusionary block is beyond our knowledge of the // chain tip, then we'll ignore it for now. d.Lock() if nMsg.isRemote && d.isPremature(ann.ShortChannelID, 0, nMsg) { log.Warnf("Announcement for chan_id=(%v), is premature: "+ "advertises height %v, only height %v is known", ann.ShortChannelID.ToUint64(), ann.ShortChannelID.BlockHeight, d.bestHeight) d.Unlock() nMsg.err <- nil return nil, false } d.Unlock() // At this point, we'll now ask the router if this is a zombie/known // edge. If so we can skip all the processing below. if d.cfg.Router.IsKnownEdge(ann.ShortChannelID) { nMsg.err <- nil return nil, true } // If this is a remote channel announcement, then we'll validate all // the signatures within the proof as it should be well formed. var proof *channeldb.ChannelAuthProof if nMsg.isRemote { if err := routing.ValidateChannelAnn(ann); err != nil { err := fmt.Errorf("unable to validate announcement: "+ "%v", err) key := newRejectCacheKey( ann.ShortChannelID.ToUint64(), sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) log.Error(err) nMsg.err <- err return nil, false } // If the proof checks out, then we'll save the proof itself to // the database so we can fetch it later when gossiping with // other nodes. proof = &channeldb.ChannelAuthProof{ NodeSig1Bytes: ann.NodeSig1.ToSignatureBytes(), NodeSig2Bytes: ann.NodeSig2.ToSignatureBytes(), BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(), BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(), } } // With the proof validated (if necessary), we can now store it within // the database for our path finding and syncing needs. var featureBuf bytes.Buffer if err := ann.Features.Encode(&featureBuf); err != nil { log.Errorf("unable to encode features: %v", err) nMsg.err <- err return nil, false } edge := &channeldb.ChannelEdgeInfo{ ChannelID: ann.ShortChannelID.ToUint64(), ChainHash: ann.ChainHash, NodeKey1Bytes: ann.NodeID1, NodeKey2Bytes: ann.NodeID2, BitcoinKey1Bytes: ann.BitcoinKey1, BitcoinKey2Bytes: ann.BitcoinKey2, AuthProof: proof, Features: featureBuf.Bytes(), ExtraOpaqueData: ann.ExtraOpaqueData, } // If there were any optional message fields provided, we'll include // them in its serialized disk representation now. if nMsg.optionalMsgFields != nil { if nMsg.optionalMsgFields.capacity != nil { edge.Capacity = *nMsg.optionalMsgFields.capacity } if nMsg.optionalMsgFields.channelPoint != nil { cp := *nMsg.optionalMsgFields.channelPoint edge.ChannelPoint = cp } } // We will add the edge to the channel router. If the nodes present in // this channel are not present in the database, a partial node will be // added to represent each node while we wait for a node announcement. // // Before we add the edge to the database, we obtain the mutex for this // channel ID. We do this to ensure no other goroutine has read the // database and is now making decisions based on this DB state, before // it writes to the DB. d.channelMtx.Lock(ann.ShortChannelID.ToUint64()) err := d.cfg.Router.AddEdge(edge, ops...) if err != nil { defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64()) // If the edge was rejected due to already being known, then it // may be the case that this new message has a fresh channel // proof, so we'll check. if routing.IsError(err, routing.ErrIgnored) { // Attempt to process the rejected message to see if we // get any new announcements. anns, rErr := d.processRejectedEdge(ann, proof) if rErr != nil { key := newRejectCacheKey( ann.ShortChannelID.ToUint64(), sourceToPub(nMsg.source), ) cr := &cachedReject{} _, _ = d.recentRejects.Put(key, cr) nMsg.err <- rErr return nil, false } // If while processing this rejected edge, we realized // there's a set of announcements we could extract, // then we'll return those directly. if len(anns) != 0 { nMsg.err <- nil return anns, true } // Otherwise, this is just a regular rejected edge. log.Debugf("Router rejected channel edge: %v", err) } else { log.Debugf("Router rejected channel edge: %v", err) key := newRejectCacheKey( ann.ShortChannelID.ToUint64(), sourceToPub(nMsg.source), ) _, _ = d.recentRejects.Put(key, &cachedReject{}) } nMsg.err <- err return nil, false } // If err is nil, release the lock immediately. d.channelMtx.Unlock(ann.ShortChannelID.ToUint64()) // If we earlier received any ChannelUpdates for this channel, we can // now process them, as the channel is added to the graph. shortChanID := ann.ShortChannelID.ToUint64() var channelUpdates []*networkMsg earlyChanUpdates, err := d.prematureChannelUpdates.Get(shortChanID) if err == nil { // There was actually an entry in the map, so we'll accumulate // it. We don't worry about deletion, since it'll eventually // fall out anyway. chanMsgs := earlyChanUpdates.(*cachedNetworkMsg) channelUpdates = append(channelUpdates, chanMsgs.msgs...) } // Launch a new goroutine to handle each ChannelUpdate, this is to // ensure we don't block here, as we can handle only one announcement // at a time. for _, cu := range channelUpdates { d.wg.Add(1) go func(updMsg *networkMsg) { defer d.wg.Done() switch msg := updMsg.msg.(type) { // Reprocess the message, making sure we return an // error to the original caller in case the gossiper // shuts down. case *lnwire.ChannelUpdate: log.Debugf("Reprocessing ChannelUpdate for "+ "shortChanID=%v", msg.ShortChannelID.ToUint64()) select { case d.networkMsgs <- updMsg: case <-d.quit: updMsg.err <- ErrGossiperShuttingDown } // We don't expect any other message type than // ChannelUpdate to be in this cache. default: log.Errorf("Unsupported message type found "+ "among ChannelUpdates: %T", msg) } }(cu) } // Channel announcement was successfully processed and now it might be // broadcast to other connected nodes if it was an announcement with // proof (remote). var announcements []networkMsg if proof != nil { announcements = append(announcements, networkMsg{ peer: nMsg.peer, source: nMsg.source, msg: ann, }) } nMsg.err <- nil return announcements, true }