mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-01-18 21:35:24 +01:00
3343 lines
106 KiB
Go
3343 lines
106 KiB
Go
package discovery
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/btcsuite/btcd/btcec/v2"
|
|
"github.com/btcsuite/btcd/btcec/v2/ecdsa"
|
|
"github.com/btcsuite/btcd/btcutil"
|
|
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
|
"github.com/btcsuite/btcd/wire"
|
|
"github.com/davecgh/go-spew/spew"
|
|
"github.com/lightninglabs/neutrino/cache"
|
|
"github.com/lightninglabs/neutrino/cache/lru"
|
|
"github.com/lightningnetwork/lnd/batch"
|
|
"github.com/lightningnetwork/lnd/chainntnfs"
|
|
"github.com/lightningnetwork/lnd/channeldb"
|
|
"github.com/lightningnetwork/lnd/keychain"
|
|
"github.com/lightningnetwork/lnd/kvdb"
|
|
"github.com/lightningnetwork/lnd/lnpeer"
|
|
"github.com/lightningnetwork/lnd/lnutils"
|
|
"github.com/lightningnetwork/lnd/lnwallet"
|
|
"github.com/lightningnetwork/lnd/lnwire"
|
|
"github.com/lightningnetwork/lnd/multimutex"
|
|
"github.com/lightningnetwork/lnd/netann"
|
|
"github.com/lightningnetwork/lnd/routing"
|
|
"github.com/lightningnetwork/lnd/routing/route"
|
|
"github.com/lightningnetwork/lnd/ticker"
|
|
"golang.org/x/time/rate"
|
|
)
|
|
|
|
const (
|
|
// DefaultMaxChannelUpdateBurst is the default maximum number of updates
|
|
// for a specific channel and direction that we'll accept over an
|
|
// interval.
|
|
DefaultMaxChannelUpdateBurst = 10
|
|
|
|
// DefaultChannelUpdateInterval is the default interval we'll use to
|
|
// determine how often we should allow a new update for a specific
|
|
// channel and direction.
|
|
DefaultChannelUpdateInterval = time.Minute
|
|
|
|
// maxPrematureUpdates tracks the max amount of premature channel
|
|
// updates that we'll hold onto.
|
|
maxPrematureUpdates = 100
|
|
|
|
// maxFutureMessages tracks the max amount of future messages that
|
|
// we'll hold onto.
|
|
maxFutureMessages = 1000
|
|
|
|
// DefaultSubBatchDelay is the default delay we'll use when
|
|
// broadcasting the next announcement batch.
|
|
DefaultSubBatchDelay = 5 * time.Second
|
|
|
|
// maxRejectedUpdates tracks the max amount of rejected channel updates
|
|
// we'll maintain. This is the global size across all peers. We'll
|
|
// allocate ~3 MB max to the cache.
|
|
maxRejectedUpdates = 10_000
|
|
)
|
|
|
|
var (
|
|
// ErrGossiperShuttingDown is an error that is returned if the gossiper
|
|
// is in the process of being shut down.
|
|
ErrGossiperShuttingDown = errors.New("gossiper is shutting down")
|
|
|
|
// ErrGossipSyncerNotFound signals that we were unable to find an active
|
|
// gossip syncer corresponding to a gossip query message received from
|
|
// the remote peer.
|
|
ErrGossipSyncerNotFound = errors.New("gossip syncer not found")
|
|
|
|
// emptyPubkey is used to compare compressed pubkeys against an empty
|
|
// byte array.
|
|
emptyPubkey [33]byte
|
|
)
|
|
|
|
// optionalMsgFields is a set of optional message fields that external callers
|
|
// can provide that serve useful when processing a specific network
|
|
// announcement.
|
|
type optionalMsgFields struct {
|
|
capacity *btcutil.Amount
|
|
channelPoint *wire.OutPoint
|
|
remoteAlias *lnwire.ShortChannelID
|
|
}
|
|
|
|
// apply applies the optional fields within the functional options.
|
|
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
|
|
for _, optionalMsgField := range optionalMsgFields {
|
|
optionalMsgField(f)
|
|
}
|
|
}
|
|
|
|
// OptionalMsgField is a functional option parameter that can be used to provide
|
|
// external information that is not included within a network message but serves
|
|
// useful when processing it.
|
|
type OptionalMsgField func(*optionalMsgFields)
|
|
|
|
// ChannelCapacity is an optional field that lets the gossiper know of the
|
|
// capacity of a channel.
|
|
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
|
|
return func(f *optionalMsgFields) {
|
|
f.capacity = &capacity
|
|
}
|
|
}
|
|
|
|
// ChannelPoint is an optional field that lets the gossiper know of the outpoint
|
|
// of a channel.
|
|
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
|
|
return func(f *optionalMsgFields) {
|
|
f.channelPoint = &op
|
|
}
|
|
}
|
|
|
|
// RemoteAlias is an optional field that lets the gossiper know that a locally
|
|
// sent channel update is actually an update for the peer that should replace
|
|
// the ShortChannelID field with the remote's alias. This is only used for
|
|
// channels with peers where the option-scid-alias feature bit was negotiated.
|
|
// The channel update will be added to the graph under the original SCID, but
|
|
// will be modified and re-signed with this alias.
|
|
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
|
|
return func(f *optionalMsgFields) {
|
|
f.remoteAlias = alias
|
|
}
|
|
}
|
|
|
|
// networkMsg couples a routing related wire message with the peer that
|
|
// originally sent it.
|
|
type networkMsg struct {
|
|
peer lnpeer.Peer
|
|
source *btcec.PublicKey
|
|
msg lnwire.Message
|
|
optionalMsgFields *optionalMsgFields
|
|
|
|
isRemote bool
|
|
|
|
err chan error
|
|
}
|
|
|
|
// chanPolicyUpdateRequest is a request that is sent to the server when a caller
|
|
// wishes to update a particular set of channels. New ChannelUpdate messages
|
|
// will be crafted to be sent out during the next broadcast epoch and the fee
|
|
// updates committed to the lower layer.
|
|
type chanPolicyUpdateRequest struct {
|
|
edgesToUpdate []EdgeWithInfo
|
|
errChan chan error
|
|
}
|
|
|
|
// PinnedSyncers is a set of node pubkeys for which we will maintain an active
|
|
// syncer at all times.
|
|
type PinnedSyncers map[route.Vertex]struct{}
|
|
|
|
// Config defines the configuration for the service. ALL elements within the
|
|
// configuration MUST be non-nil for the service to carry out its duties.
|
|
type Config struct {
|
|
// ChainHash is a hash that indicates which resident chain of the
|
|
// AuthenticatedGossiper. Any announcements that don't match this
|
|
// chain hash will be ignored.
|
|
//
|
|
// TODO(roasbeef): eventually make into map so can de-multiplex
|
|
// incoming announcements
|
|
// * also need to do same for Notifier
|
|
ChainHash chainhash.Hash
|
|
|
|
// Router is the subsystem which is responsible for managing the
|
|
// topology of lightning network. After incoming channel, node, channel
|
|
// updates announcements are validated they are sent to the router in
|
|
// order to be included in the LN graph.
|
|
Router routing.ChannelGraphSource
|
|
|
|
// ChanSeries is an interfaces that provides access to a time series
|
|
// view of the current known channel graph. Each GossipSyncer enabled
|
|
// peer will utilize this in order to create and respond to channel
|
|
// graph time series queries.
|
|
ChanSeries ChannelGraphTimeSeries
|
|
|
|
// Notifier is used for receiving notifications of incoming blocks.
|
|
// With each new incoming block found we process previously premature
|
|
// announcements.
|
|
//
|
|
// TODO(roasbeef): could possibly just replace this with an epoch
|
|
// channel.
|
|
Notifier chainntnfs.ChainNotifier
|
|
|
|
// Broadcast broadcasts a particular set of announcements to all peers
|
|
// that the daemon is connected to. If supplied, the exclude parameter
|
|
// indicates that the target peer should be excluded from the
|
|
// broadcast.
|
|
Broadcast func(skips map[route.Vertex]struct{},
|
|
msg ...lnwire.Message) error
|
|
|
|
// NotifyWhenOnline is a function that allows the gossiper to be
|
|
// notified when a certain peer comes online, allowing it to
|
|
// retry sending a peer message.
|
|
//
|
|
// NOTE: The peerChan channel must be buffered.
|
|
NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)
|
|
|
|
// NotifyWhenOffline is a function that allows the gossiper to be
|
|
// notified when a certain peer disconnects, allowing it to request a
|
|
// notification for when it reconnects.
|
|
NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
|
|
|
|
// SelfNodeAnnouncement is a function that fetches our own current node
|
|
// announcement, for use when determining whether we should update our
|
|
// peers about our presence on the network. If the refresh is true, a
|
|
// new and updated announcement will be returned.
|
|
SelfNodeAnnouncement func(refresh bool) (lnwire.NodeAnnouncement, error)
|
|
|
|
// ProofMatureDelta the number of confirmations which is needed before
|
|
// exchange the channel announcement proofs.
|
|
ProofMatureDelta uint32
|
|
|
|
// TrickleDelay the period of trickle timer which flushes to the
|
|
// network the pending batch of new announcements we've received since
|
|
// the last trickle tick.
|
|
TrickleDelay time.Duration
|
|
|
|
// RetransmitTicker is a ticker that ticks with a period which
|
|
// indicates that we should check if we need re-broadcast any of our
|
|
// personal channels.
|
|
RetransmitTicker ticker.Ticker
|
|
|
|
// RebroadcastInterval is the maximum time we wait between sending out
|
|
// channel updates for our active channels and our own node
|
|
// announcement. We do this to ensure our active presence on the
|
|
// network is known, and we are not being considered a zombie node or
|
|
// having zombie channels.
|
|
RebroadcastInterval time.Duration
|
|
|
|
// WaitingProofStore is a persistent storage of partial channel proof
|
|
// announcement messages. We use it to buffer half of the material
|
|
// needed to reconstruct a full authenticated channel announcement.
|
|
// Once we receive the other half the channel proof, we'll be able to
|
|
// properly validate it and re-broadcast it out to the network.
|
|
//
|
|
// TODO(wilmer): make interface to prevent channeldb dependency.
|
|
WaitingProofStore *channeldb.WaitingProofStore
|
|
|
|
// MessageStore is a persistent storage of gossip messages which we will
|
|
// use to determine which messages need to be resent for a given peer.
|
|
MessageStore GossipMessageStore
|
|
|
|
// AnnSigner is an instance of the MessageSigner interface which will
|
|
// be used to manually sign any outgoing channel updates. The signer
|
|
// implementation should be backed by the public key of the backing
|
|
// Lightning node.
|
|
//
|
|
// TODO(roasbeef): extract ann crafting + sign from fundingMgr into
|
|
// here?
|
|
AnnSigner lnwallet.MessageSigner
|
|
|
|
// NumActiveSyncers is the number of peers for which we should have
|
|
// active syncers with. After reaching NumActiveSyncers, any future
|
|
// gossip syncers will be passive.
|
|
NumActiveSyncers int
|
|
|
|
// RotateTicker is a ticker responsible for notifying the SyncManager
|
|
// when it should rotate its active syncers. A single active syncer with
|
|
// a chansSynced state will be exchanged for a passive syncer in order
|
|
// to ensure we don't keep syncing with the same peers.
|
|
RotateTicker ticker.Ticker
|
|
|
|
// HistoricalSyncTicker is a ticker responsible for notifying the
|
|
// syncManager when it should attempt a historical sync with a gossip
|
|
// sync peer.
|
|
HistoricalSyncTicker ticker.Ticker
|
|
|
|
// ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
|
|
// syncManager when it should attempt to start the next pending
|
|
// activeSyncer due to the current one not completing its state machine
|
|
// within the timeout.
|
|
ActiveSyncerTimeoutTicker ticker.Ticker
|
|
|
|
// MinimumBatchSize is minimum size of a sub batch of announcement
|
|
// messages.
|
|
MinimumBatchSize int
|
|
|
|
// SubBatchDelay is the delay between sending sub batches of
|
|
// gossip messages.
|
|
SubBatchDelay time.Duration
|
|
|
|
// IgnoreHistoricalFilters will prevent syncers from replying with
|
|
// historical data when the remote peer sets a gossip_timestamp_range.
|
|
// This prevents ranges with old start times from causing us to dump the
|
|
// graph on connect.
|
|
IgnoreHistoricalFilters bool
|
|
|
|
// PinnedSyncers is a set of peers that will always transition to
|
|
// ActiveSync upon connection. These peers will never transition to
|
|
// PassiveSync.
|
|
PinnedSyncers PinnedSyncers
|
|
|
|
// MaxChannelUpdateBurst specifies the maximum number of updates for a
|
|
// specific channel and direction that we'll accept over an interval.
|
|
MaxChannelUpdateBurst int
|
|
|
|
// ChannelUpdateInterval specifies the interval we'll use to determine
|
|
// how often we should allow a new update for a specific channel and
|
|
// direction.
|
|
ChannelUpdateInterval time.Duration
|
|
|
|
// IsAlias returns true if a given ShortChannelID is an alias for
|
|
// option_scid_alias channels.
|
|
IsAlias func(scid lnwire.ShortChannelID) bool
|
|
|
|
// SignAliasUpdate is used to re-sign a channel update using the
|
|
// remote's alias if the option-scid-alias feature bit was negotiated.
|
|
SignAliasUpdate func(u *lnwire.ChannelUpdate) (*ecdsa.Signature,
|
|
error)
|
|
|
|
// FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
|
|
// This is used for channels that have negotiated the option-scid-alias
|
|
// feature bit.
|
|
FindBaseByAlias func(alias lnwire.ShortChannelID) (
|
|
lnwire.ShortChannelID, error)
|
|
|
|
// GetAlias allows the gossiper to look up the peer's alias for a given
|
|
// ChannelID. This is used to sign updates for them if the channel has
|
|
// no AuthProof and the option-scid-alias feature bit was negotiated.
|
|
GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)
|
|
|
|
// FindChannel allows the gossiper to find a channel that we're party
|
|
// to without iterating over the entire set of open channels.
|
|
FindChannel func(node *btcec.PublicKey, chanID lnwire.ChannelID) (
|
|
*channeldb.OpenChannel, error)
|
|
}
|
|
|
|
// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
|
|
// used to let the caller of the lru.Cache know if a message has already been
|
|
// processed or not.
|
|
type processedNetworkMsg struct {
|
|
processed bool
|
|
msg *networkMsg
|
|
}
|
|
|
|
// cachedNetworkMsg is a wrapper around a network message that can be used with
|
|
// *lru.Cache.
|
|
type cachedNetworkMsg struct {
|
|
msgs []*processedNetworkMsg
|
|
}
|
|
|
|
// Size returns the "size" of an entry. We return the number of items as we
|
|
// just want to limit the total amount of entries rather than do accurate size
|
|
// accounting.
|
|
func (c *cachedNetworkMsg) Size() (uint64, error) {
|
|
return uint64(len(c.msgs)), nil
|
|
}
|
|
|
|
// rejectCacheKey is the cache key that we'll use to track announcements we've
|
|
// recently rejected.
|
|
type rejectCacheKey struct {
|
|
pubkey [33]byte
|
|
chanID uint64
|
|
}
|
|
|
|
// newRejectCacheKey returns a new cache key for the reject cache.
|
|
func newRejectCacheKey(cid uint64, pub [33]byte) rejectCacheKey {
|
|
k := rejectCacheKey{
|
|
chanID: cid,
|
|
pubkey: pub,
|
|
}
|
|
|
|
return k
|
|
}
|
|
|
|
// sourceToPub returns a serialized-compressed public key for use in the reject
|
|
// cache.
|
|
func sourceToPub(pk *btcec.PublicKey) [33]byte {
|
|
var pub [33]byte
|
|
copy(pub[:], pk.SerializeCompressed())
|
|
return pub
|
|
}
|
|
|
|
// cachedReject is the empty value used to track the value for rejects.
|
|
type cachedReject struct {
|
|
}
|
|
|
|
// Size returns the "size" of an entry. We return 1 as we just want to limit
|
|
// the total size.
|
|
func (c *cachedReject) Size() (uint64, error) {
|
|
return 1, nil
|
|
}
|
|
|
|
// AuthenticatedGossiper is a subsystem which is responsible for receiving
|
|
// announcements, validating them and applying the changes to router, syncing
|
|
// lightning network with newly connected nodes, broadcasting announcements
|
|
// after validation, negotiating the channel announcement proofs exchange and
|
|
// handling the premature announcements. All outgoing announcements are
|
|
// expected to be properly signed as dictated in BOLT#7, additionally, all
|
|
// incoming message are expected to be well formed and signed. Invalid messages
|
|
// will be rejected by this struct.
|
|
type AuthenticatedGossiper struct {
|
|
// Parameters which are needed to properly handle the start and stop of
|
|
// the service.
|
|
started sync.Once
|
|
stopped sync.Once
|
|
|
|
// bestHeight is the height of the block at the tip of the main chain
|
|
// as we know it. Accesses *MUST* be done with the gossiper's lock
|
|
// held.
|
|
bestHeight uint32
|
|
|
|
quit chan struct{}
|
|
wg sync.WaitGroup
|
|
|
|
// cfg is a copy of the configuration struct that the gossiper service
|
|
// was initialized with.
|
|
cfg *Config
|
|
|
|
// blockEpochs encapsulates a stream of block epochs that are sent at
|
|
// every new block height.
|
|
blockEpochs *chainntnfs.BlockEpochEvent
|
|
|
|
// prematureChannelUpdates is a map of ChannelUpdates we have received
|
|
// that wasn't associated with any channel we know about. We store
|
|
// them temporarily, such that we can reprocess them when a
|
|
// ChannelAnnouncement for the channel is received.
|
|
prematureChannelUpdates *lru.Cache[uint64, *cachedNetworkMsg]
|
|
|
|
// networkMsgs is a channel that carries new network broadcasted
|
|
// message from outside the gossiper service to be processed by the
|
|
// networkHandler.
|
|
networkMsgs chan *networkMsg
|
|
|
|
// futureMsgs is a list of premature network messages that have a block
|
|
// height specified in the future. We will save them and resend it to
|
|
// the chan networkMsgs once the block height has reached. The cached
|
|
// map format is,
|
|
// {msgID1: msg1, msgID2: msg2, ...}
|
|
futureMsgs *futureMsgCache
|
|
|
|
// chanPolicyUpdates is a channel that requests to update the
|
|
// forwarding policy of a set of channels is sent over.
|
|
chanPolicyUpdates chan *chanPolicyUpdateRequest
|
|
|
|
// selfKey is the identity public key of the backing Lightning node.
|
|
selfKey *btcec.PublicKey
|
|
|
|
// selfKeyLoc is the locator for the identity public key of the backing
|
|
// Lightning node.
|
|
selfKeyLoc keychain.KeyLocator
|
|
|
|
// channelMtx is used to restrict the database access to one
|
|
// goroutine per channel ID. This is done to ensure that when
|
|
// the gossiper is handling an announcement, the db state stays
|
|
// consistent between when the DB is first read until it's written.
|
|
channelMtx *multimutex.Mutex
|
|
|
|
recentRejects *lru.Cache[rejectCacheKey, *cachedReject]
|
|
|
|
// syncMgr is a subsystem responsible for managing the gossip syncers
|
|
// for peers currently connected. When a new peer is connected, the
|
|
// manager will create its accompanying gossip syncer and determine
|
|
// whether it should have an activeSync or passiveSync sync type based
|
|
// on how many other gossip syncers are currently active. Any activeSync
|
|
// gossip syncers are started in a round-robin manner to ensure we're
|
|
// not syncing with multiple peers at the same time.
|
|
syncMgr *SyncManager
|
|
|
|
// reliableSender is a subsystem responsible for handling reliable
|
|
// message send requests to peers. This should only be used for channels
|
|
// that are unadvertised at the time of handling the message since if it
|
|
// is advertised, then peers should be able to get the message from the
|
|
// network.
|
|
reliableSender *reliableSender
|
|
|
|
// chanUpdateRateLimiter contains rate limiters for each direction of
|
|
// a channel update we've processed. We'll use these to determine
|
|
// whether we should accept a new update for a specific channel and
|
|
// direction.
|
|
//
|
|
// NOTE: This map must be synchronized with the main
|
|
// AuthenticatedGossiper lock.
|
|
chanUpdateRateLimiter map[uint64][2]*rate.Limiter
|
|
|
|
sync.Mutex
|
|
}
|
|
|
|
// New creates a new AuthenticatedGossiper instance, initialized with the
|
|
// passed configuration parameters.
|
|
func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper {
|
|
gossiper := &AuthenticatedGossiper{
|
|
selfKey: selfKeyDesc.PubKey,
|
|
selfKeyLoc: selfKeyDesc.KeyLocator,
|
|
cfg: &cfg,
|
|
networkMsgs: make(chan *networkMsg),
|
|
futureMsgs: newFutureMsgCache(maxFutureMessages),
|
|
quit: make(chan struct{}),
|
|
chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
|
|
prematureChannelUpdates: lru.NewCache[uint64, *cachedNetworkMsg]( //nolint: lll
|
|
maxPrematureUpdates,
|
|
),
|
|
channelMtx: multimutex.NewMutex(),
|
|
recentRejects: lru.NewCache[rejectCacheKey, *cachedReject](
|
|
maxRejectedUpdates,
|
|
),
|
|
chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
|
|
}
|
|
|
|
gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
|
|
ChainHash: cfg.ChainHash,
|
|
ChanSeries: cfg.ChanSeries,
|
|
RotateTicker: cfg.RotateTicker,
|
|
HistoricalSyncTicker: cfg.HistoricalSyncTicker,
|
|
NumActiveSyncers: cfg.NumActiveSyncers,
|
|
IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
|
|
BestHeight: gossiper.latestHeight,
|
|
PinnedSyncers: cfg.PinnedSyncers,
|
|
})
|
|
|
|
gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
|
|
NotifyWhenOnline: cfg.NotifyWhenOnline,
|
|
NotifyWhenOffline: cfg.NotifyWhenOffline,
|
|
MessageStore: cfg.MessageStore,
|
|
IsMsgStale: gossiper.isMsgStale,
|
|
})
|
|
|
|
return gossiper
|
|
}
|
|
|
|
// EdgeWithInfo contains the information that is required to update an edge.
|
|
type EdgeWithInfo struct {
|
|
// Info describes the channel.
|
|
Info *channeldb.ChannelEdgeInfo
|
|
|
|
// Edge describes the policy in one direction of the channel.
|
|
Edge *channeldb.ChannelEdgePolicy
|
|
}
|
|
|
|
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
|
|
// specified edge updates. Updates are done in two stages: first, the
|
|
// AuthenticatedGossiper ensures the update has been committed by dependent
|
|
// sub-systems, then it signs and broadcasts new updates to the network. A
|
|
// mapping between outpoints and updated channel policies is returned, which is
|
|
// used to update the forwarding policies of the underlying links.
|
|
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
|
|
edgesToUpdate []EdgeWithInfo) error {
|
|
|
|
errChan := make(chan error, 1)
|
|
policyUpdate := &chanPolicyUpdateRequest{
|
|
edgesToUpdate: edgesToUpdate,
|
|
errChan: errChan,
|
|
}
|
|
|
|
select {
|
|
case d.chanPolicyUpdates <- policyUpdate:
|
|
err := <-errChan
|
|
return err
|
|
case <-d.quit:
|
|
return fmt.Errorf("AuthenticatedGossiper shutting down")
|
|
}
|
|
}
|
|
|
|
// Start spawns network messages handler goroutine and registers on new block
|
|
// notifications in order to properly handle the premature announcements.
|
|
func (d *AuthenticatedGossiper) Start() error {
|
|
var err error
|
|
d.started.Do(func() {
|
|
log.Info("Authenticated Gossiper starting")
|
|
err = d.start()
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (d *AuthenticatedGossiper) start() error {
|
|
// First we register for new notifications of newly discovered blocks.
|
|
// We do this immediately so we'll later be able to consume any/all
|
|
// blocks which were discovered.
|
|
blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
d.blockEpochs = blockEpochs
|
|
|
|
height, err := d.cfg.Router.CurrentBlockHeight()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
d.bestHeight = height
|
|
|
|
// Start the reliable sender. In case we had any pending messages ready
|
|
// to be sent when the gossiper was last shut down, we must continue on
|
|
// our quest to deliver them to their respective peers.
|
|
if err := d.reliableSender.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
d.syncMgr.Start()
|
|
|
|
// Start receiving blocks in its dedicated goroutine.
|
|
d.wg.Add(2)
|
|
go d.syncBlockHeight()
|
|
go d.networkHandler()
|
|
|
|
return nil
|
|
}
|
|
|
|
// syncBlockHeight syncs the best block height for the gossiper by reading
|
|
// blockEpochs.
|
|
//
|
|
// NOTE: must be run as a goroutine.
|
|
func (d *AuthenticatedGossiper) syncBlockHeight() {
|
|
defer d.wg.Done()
|
|
|
|
for {
|
|
select {
|
|
// A new block has arrived, so we can re-process the previously
|
|
// premature announcements.
|
|
case newBlock, ok := <-d.blockEpochs.Epochs:
|
|
// If the channel has been closed, then this indicates
|
|
// the daemon is shutting down, so we exit ourselves.
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// Once a new block arrives, we update our running
|
|
// track of the height of the chain tip.
|
|
d.Lock()
|
|
blockHeight := uint32(newBlock.Height)
|
|
d.bestHeight = blockHeight
|
|
d.Unlock()
|
|
|
|
log.Debugf("New block: height=%d, hash=%s", blockHeight,
|
|
newBlock.Hash)
|
|
|
|
// Resend future messages, if any.
|
|
d.resendFutureMessages(blockHeight)
|
|
|
|
case <-d.quit:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// futureMsgCache embeds a `lru.Cache` with a message counter that's served as
|
|
// the unique ID when saving the message.
|
|
type futureMsgCache struct {
|
|
*lru.Cache[uint64, *cachedFutureMsg]
|
|
|
|
// msgID is a monotonically increased integer.
|
|
msgID atomic.Uint64
|
|
}
|
|
|
|
// nextMsgID returns a unique message ID.
|
|
func (f *futureMsgCache) nextMsgID() uint64 {
|
|
return f.msgID.Add(1)
|
|
}
|
|
|
|
// newFutureMsgCache creates a new future message cache with the underlying lru
|
|
// cache being initialized with the specified capacity.
|
|
func newFutureMsgCache(capacity uint64) *futureMsgCache {
|
|
// Create a new cache.
|
|
cache := lru.NewCache[uint64, *cachedFutureMsg](capacity)
|
|
|
|
return &futureMsgCache{
|
|
Cache: cache,
|
|
}
|
|
}
|
|
|
|
// cachedFutureMsg is a future message that's saved to the `futureMsgCache`.
|
|
type cachedFutureMsg struct {
|
|
// msg is the network message.
|
|
msg *networkMsg
|
|
|
|
// height is the block height.
|
|
height uint32
|
|
}
|
|
|
|
// Size returns the size of the message.
|
|
func (c *cachedFutureMsg) Size() (uint64, error) {
|
|
// Return a constant 1.
|
|
return 1, nil
|
|
}
|
|
|
|
// resendFutureMessages takes a block height, resends all the future messages
|
|
// found below and equal to that height and deletes those messages found in the
|
|
// gossiper's futureMsgs.
|
|
func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
|
|
var (
|
|
// msgs are the target messages.
|
|
msgs []*networkMsg
|
|
|
|
// keys are the target messages' caching keys.
|
|
keys []uint64
|
|
)
|
|
|
|
// filterMsgs is the visitor used when iterating the future cache.
|
|
filterMsgs := func(k uint64, cmsg *cachedFutureMsg) bool {
|
|
if cmsg.height <= height {
|
|
msgs = append(msgs, cmsg.msg)
|
|
keys = append(keys, k)
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// Filter out the target messages.
|
|
d.futureMsgs.Range(filterMsgs)
|
|
|
|
// Return early if no messages found.
|
|
if len(msgs) == 0 {
|
|
return
|
|
}
|
|
|
|
// Remove the filtered messages.
|
|
for _, key := range keys {
|
|
d.futureMsgs.Delete(key)
|
|
}
|
|
|
|
log.Debugf("Resending %d network messages at height %d",
|
|
len(msgs), height)
|
|
|
|
for _, msg := range msgs {
|
|
select {
|
|
case d.networkMsgs <- msg:
|
|
case <-d.quit:
|
|
msg.err <- ErrGossiperShuttingDown
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop signals any active goroutines for a graceful closure.
|
|
func (d *AuthenticatedGossiper) Stop() error {
|
|
d.stopped.Do(func() {
|
|
log.Info("Authenticated gossiper shutting down")
|
|
d.stop()
|
|
})
|
|
return nil
|
|
}
|
|
|
|
func (d *AuthenticatedGossiper) stop() {
|
|
log.Info("Authenticated Gossiper is stopping")
|
|
defer log.Info("Authenticated Gossiper stopped")
|
|
|
|
d.blockEpochs.Cancel()
|
|
|
|
d.syncMgr.Stop()
|
|
|
|
close(d.quit)
|
|
d.wg.Wait()
|
|
|
|
// We'll stop our reliable sender after all of the gossiper's goroutines
|
|
// have exited to ensure nothing can cause it to continue executing.
|
|
d.reliableSender.Stop()
|
|
}
|
|
|
|
// TODO(roasbeef): need method to get current gossip timestamp?
|
|
// * using mtx, check time rotate forward is needed?
|
|
|
|
// ProcessRemoteAnnouncement sends a new remote announcement message along with
|
|
// the peer that sent the routing message. The announcement will be processed
|
|
// then added to a queue for batched trickled announcement to all connected
|
|
// peers. Remote channel announcements should contain the announcement proof
|
|
// and be fully validated.
|
|
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
|
|
peer lnpeer.Peer) chan error {
|
|
|
|
errChan := make(chan error, 1)
|
|
|
|
// For messages in the known set of channel series queries, we'll
|
|
// dispatch the message directly to the GossipSyncer, and skip the main
|
|
// processing loop.
|
|
switch m := msg.(type) {
|
|
case *lnwire.QueryShortChanIDs,
|
|
*lnwire.QueryChannelRange,
|
|
*lnwire.ReplyChannelRange,
|
|
*lnwire.ReplyShortChanIDsEnd:
|
|
|
|
syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
|
|
if !ok {
|
|
log.Warnf("Gossip syncer for peer=%x not found",
|
|
peer.PubKey())
|
|
|
|
errChan <- ErrGossipSyncerNotFound
|
|
return errChan
|
|
}
|
|
|
|
// If we've found the message target, then we'll dispatch the
|
|
// message directly to it.
|
|
syncer.ProcessQueryMsg(m, peer.QuitSignal())
|
|
|
|
errChan <- nil
|
|
return errChan
|
|
|
|
// If a peer is updating its current update horizon, then we'll dispatch
|
|
// that directly to the proper GossipSyncer.
|
|
case *lnwire.GossipTimestampRange:
|
|
syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
|
|
if !ok {
|
|
log.Warnf("Gossip syncer for peer=%x not found",
|
|
peer.PubKey())
|
|
|
|
errChan <- ErrGossipSyncerNotFound
|
|
return errChan
|
|
}
|
|
|
|
// If we've found the message target, then we'll dispatch the
|
|
// message directly to it.
|
|
if err := syncer.ApplyGossipFilter(m); err != nil {
|
|
log.Warnf("Unable to apply gossip filter for peer=%x: "+
|
|
"%v", peer.PubKey(), err)
|
|
|
|
errChan <- err
|
|
return errChan
|
|
}
|
|
|
|
errChan <- nil
|
|
return errChan
|
|
|
|
// To avoid inserting edges in the graph for our own channels that we
|
|
// have already closed, we ignore such channel announcements coming
|
|
// from the remote.
|
|
case *lnwire.ChannelAnnouncement:
|
|
ownKey := d.selfKey.SerializeCompressed()
|
|
ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement " +
|
|
"for own channel")
|
|
|
|
if bytes.Equal(m.NodeID1[:], ownKey) ||
|
|
bytes.Equal(m.NodeID2[:], ownKey) {
|
|
|
|
log.Warn(ownErr)
|
|
errChan <- ownErr
|
|
return errChan
|
|
}
|
|
}
|
|
|
|
nMsg := &networkMsg{
|
|
msg: msg,
|
|
isRemote: true,
|
|
peer: peer,
|
|
source: peer.IdentityKey(),
|
|
err: errChan,
|
|
}
|
|
|
|
select {
|
|
case d.networkMsgs <- nMsg:
|
|
|
|
// If the peer that sent us this error is quitting, then we don't need
|
|
// to send back an error and can return immediately.
|
|
case <-peer.QuitSignal():
|
|
return nil
|
|
case <-d.quit:
|
|
nMsg.err <- ErrGossiperShuttingDown
|
|
}
|
|
|
|
return nMsg.err
|
|
}
|
|
|
|
// ProcessLocalAnnouncement sends a new remote announcement message along with
|
|
// the peer that sent the routing message. The announcement will be processed
|
|
// then added to a queue for batched trickled announcement to all connected
|
|
// peers. Local channel announcements don't contain the announcement proof and
|
|
// will not be fully validated. Once the channel proofs are received, the
|
|
// entire channel announcement and update messages will be re-constructed and
|
|
// broadcast to the rest of the network.
|
|
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
|
|
optionalFields ...OptionalMsgField) chan error {
|
|
|
|
optionalMsgFields := &optionalMsgFields{}
|
|
optionalMsgFields.apply(optionalFields...)
|
|
|
|
nMsg := &networkMsg{
|
|
msg: msg,
|
|
optionalMsgFields: optionalMsgFields,
|
|
isRemote: false,
|
|
source: d.selfKey,
|
|
err: make(chan error, 1),
|
|
}
|
|
|
|
select {
|
|
case d.networkMsgs <- nMsg:
|
|
case <-d.quit:
|
|
nMsg.err <- ErrGossiperShuttingDown
|
|
}
|
|
|
|
return nMsg.err
|
|
}
|
|
|
|
// channelUpdateID is a unique identifier for ChannelUpdate messages, as
|
|
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
|
|
// tuple.
|
|
type channelUpdateID struct {
|
|
// channelID represents the set of data which is needed to
|
|
// retrieve all necessary data to validate the channel existence.
|
|
channelID lnwire.ShortChannelID
|
|
|
|
// Flags least-significant bit must be set to 0 if the creating node
|
|
// corresponds to the first node in the previously sent channel
|
|
// announcement and 1 otherwise.
|
|
flags lnwire.ChanUpdateChanFlags
|
|
}
|
|
|
|
// msgWithSenders is a wrapper struct around a message, and the set of peers
|
|
// that originally sent us this message. Using this struct, we can ensure that
|
|
// we don't re-send a message to the peer that sent it to us in the first
|
|
// place.
|
|
type msgWithSenders struct {
|
|
// msg is the wire message itself.
|
|
msg lnwire.Message
|
|
|
|
// isLocal is true if this was a message that originated locally. We'll
|
|
// use this to bypass our normal checks to ensure we prioritize sending
|
|
// out our own updates.
|
|
isLocal bool
|
|
|
|
// sender is the set of peers that sent us this message.
|
|
senders map[route.Vertex]struct{}
|
|
}
|
|
|
|
// mergeSyncerMap is used to merge the set of senders of a particular message
|
|
// with peers that we have an active GossipSyncer with. We do this to ensure
|
|
// that we don't broadcast messages to any peers that we have active gossip
|
|
// syncers for.
|
|
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
|
|
for peerPub := range syncers {
|
|
m.senders[peerPub] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// deDupedAnnouncements de-duplicates announcements that have been added to the
|
|
// batch. Internally, announcements are stored in three maps
|
|
// (one each for channel announcements, channel updates, and node
|
|
// announcements). These maps keep track of unique announcements and ensure no
|
|
// announcements are duplicated. We keep the three message types separate, such
|
|
// that we can send channel announcements first, then channel updates, and
|
|
// finally node announcements when it's time to broadcast them.
|
|
type deDupedAnnouncements struct {
|
|
// channelAnnouncements are identified by the short channel id field.
|
|
channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders
|
|
|
|
// channelUpdates are identified by the channel update id field.
|
|
channelUpdates map[channelUpdateID]msgWithSenders
|
|
|
|
// nodeAnnouncements are identified by the Vertex field.
|
|
nodeAnnouncements map[route.Vertex]msgWithSenders
|
|
|
|
sync.Mutex
|
|
}
|
|
|
|
// Reset operates on deDupedAnnouncements to reset the storage of
|
|
// announcements.
|
|
func (d *deDupedAnnouncements) Reset() {
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
|
|
d.reset()
|
|
}
|
|
|
|
// reset is the private version of the Reset method. We have this so we can
|
|
// call this method within method that are already holding the lock.
|
|
func (d *deDupedAnnouncements) reset() {
|
|
// Storage of each type of announcement (channel announcements, channel
|
|
// updates, node announcements) is set to an empty map where the
|
|
// appropriate key points to the corresponding lnwire.Message.
|
|
d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
|
|
d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
|
|
d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
|
|
}
|
|
|
|
// addMsg adds a new message to the current batch. If the message is already
|
|
// present in the current batch, then this new instance replaces the latter,
|
|
// and the set of senders is updated to reflect which node sent us this
|
|
// message.
|
|
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
|
|
log.Tracef("Adding network message: %v to batch", message.msg.MsgType())
|
|
|
|
// Depending on the message type (channel announcement, channel update,
|
|
// or node announcement), the message is added to the corresponding map
|
|
// in deDupedAnnouncements. Because each identifying key can have at
|
|
// most one value, the announcements are de-duplicated, with newer ones
|
|
// replacing older ones.
|
|
switch msg := message.msg.(type) {
|
|
|
|
// Channel announcements are identified by the short channel id field.
|
|
case *lnwire.ChannelAnnouncement:
|
|
deDupKey := msg.ShortChannelID
|
|
sender := route.NewVertex(message.source)
|
|
|
|
mws, ok := d.channelAnnouncements[deDupKey]
|
|
if !ok {
|
|
mws = msgWithSenders{
|
|
msg: msg,
|
|
isLocal: !message.isRemote,
|
|
senders: make(map[route.Vertex]struct{}),
|
|
}
|
|
mws.senders[sender] = struct{}{}
|
|
|
|
d.channelAnnouncements[deDupKey] = mws
|
|
|
|
return
|
|
}
|
|
|
|
mws.msg = msg
|
|
mws.senders[sender] = struct{}{}
|
|
d.channelAnnouncements[deDupKey] = mws
|
|
|
|
// Channel updates are identified by the (short channel id,
|
|
// channelflags) tuple.
|
|
case *lnwire.ChannelUpdate:
|
|
sender := route.NewVertex(message.source)
|
|
deDupKey := channelUpdateID{
|
|
msg.ShortChannelID,
|
|
msg.ChannelFlags,
|
|
}
|
|
|
|
oldTimestamp := uint32(0)
|
|
mws, ok := d.channelUpdates[deDupKey]
|
|
if ok {
|
|
// If we already have seen this message, record its
|
|
// timestamp.
|
|
oldTimestamp = mws.msg.(*lnwire.ChannelUpdate).Timestamp
|
|
}
|
|
|
|
// If we already had this message with a strictly newer
|
|
// timestamp, then we'll just discard the message we got.
|
|
if oldTimestamp > msg.Timestamp {
|
|
log.Debugf("Ignored outdated network message: "+
|
|
"peer=%v, msg=%s", message.peer, msg.MsgType())
|
|
return
|
|
}
|
|
|
|
// If the message we just got is newer than what we previously
|
|
// have seen, or this is the first time we see it, then we'll
|
|
// add it to our map of announcements.
|
|
if oldTimestamp < msg.Timestamp {
|
|
mws = msgWithSenders{
|
|
msg: msg,
|
|
isLocal: !message.isRemote,
|
|
senders: make(map[route.Vertex]struct{}),
|
|
}
|
|
|
|
// We'll mark the sender of the message in the
|
|
// senders map.
|
|
mws.senders[sender] = struct{}{}
|
|
|
|
d.channelUpdates[deDupKey] = mws
|
|
|
|
return
|
|
}
|
|
|
|
// Lastly, if we had seen this exact message from before, with
|
|
// the same timestamp, we'll add the sender to the map of
|
|
// senders, such that we can skip sending this message back in
|
|
// the next batch.
|
|
mws.msg = msg
|
|
mws.senders[sender] = struct{}{}
|
|
d.channelUpdates[deDupKey] = mws
|
|
|
|
// Node announcements are identified by the Vertex field. Use the
|
|
// NodeID to create the corresponding Vertex.
|
|
case *lnwire.NodeAnnouncement:
|
|
sender := route.NewVertex(message.source)
|
|
deDupKey := route.Vertex(msg.NodeID)
|
|
|
|
// We do the same for node announcements as we did for channel
|
|
// updates, as they also carry a timestamp.
|
|
oldTimestamp := uint32(0)
|
|
mws, ok := d.nodeAnnouncements[deDupKey]
|
|
if ok {
|
|
oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp
|
|
}
|
|
|
|
// Discard the message if it's old.
|
|
if oldTimestamp > msg.Timestamp {
|
|
return
|
|
}
|
|
|
|
// Replace if it's newer.
|
|
if oldTimestamp < msg.Timestamp {
|
|
mws = msgWithSenders{
|
|
msg: msg,
|
|
isLocal: !message.isRemote,
|
|
senders: make(map[route.Vertex]struct{}),
|
|
}
|
|
|
|
mws.senders[sender] = struct{}{}
|
|
|
|
d.nodeAnnouncements[deDupKey] = mws
|
|
|
|
return
|
|
}
|
|
|
|
// Add to senders map if it's the same as we had.
|
|
mws.msg = msg
|
|
mws.senders[sender] = struct{}{}
|
|
d.nodeAnnouncements[deDupKey] = mws
|
|
}
|
|
}
|
|
|
|
// AddMsgs is a helper method to add multiple messages to the announcement
|
|
// batch.
|
|
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
|
|
for _, msg := range msgs {
|
|
d.addMsg(msg)
|
|
}
|
|
}
|
|
|
|
// msgsToBroadcast is returned by Emit() and partitions the messages we'd like
|
|
// to broadcast next into messages that are locally sourced and those that are
|
|
// sourced remotely.
|
|
type msgsToBroadcast struct {
|
|
// localMsgs is the set of messages we created locally.
|
|
localMsgs []msgWithSenders
|
|
|
|
// remoteMsgs is the set of messages that we received from a remote
|
|
// party.
|
|
remoteMsgs []msgWithSenders
|
|
}
|
|
|
|
// addMsg adds a new message to the appropriate sub-slice.
|
|
func (m *msgsToBroadcast) addMsg(msg msgWithSenders) {
|
|
if msg.isLocal {
|
|
m.localMsgs = append(m.localMsgs, msg)
|
|
} else {
|
|
m.remoteMsgs = append(m.remoteMsgs, msg)
|
|
}
|
|
}
|
|
|
|
// isEmpty returns true if the batch is empty.
|
|
func (m *msgsToBroadcast) isEmpty() bool {
|
|
return len(m.localMsgs) == 0 && len(m.remoteMsgs) == 0
|
|
}
|
|
|
|
// length returns the length of the combined message set.
|
|
func (m *msgsToBroadcast) length() int {
|
|
return len(m.localMsgs) + len(m.remoteMsgs)
|
|
}
|
|
|
|
// Emit returns the set of de-duplicated announcements to be sent out during
|
|
// the next announcement epoch, in the order of channel announcements, channel
|
|
// updates, and node announcements. Each message emitted, contains the set of
|
|
// peers that sent us the message. This way, we can ensure that we don't waste
|
|
// bandwidth by re-sending a message to the peer that sent it to us in the
|
|
// first place. Additionally, the set of stored messages are reset.
|
|
func (d *deDupedAnnouncements) Emit() msgsToBroadcast {
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
|
|
// Get the total number of announcements.
|
|
numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
|
|
len(d.nodeAnnouncements)
|
|
|
|
// Create an empty array of lnwire.Messages with a length equal to
|
|
// the total number of announcements.
|
|
msgs := msgsToBroadcast{
|
|
localMsgs: make([]msgWithSenders, 0, numAnnouncements),
|
|
remoteMsgs: make([]msgWithSenders, 0, numAnnouncements),
|
|
}
|
|
|
|
// Add the channel announcements to the array first.
|
|
for _, message := range d.channelAnnouncements {
|
|
msgs.addMsg(message)
|
|
}
|
|
|
|
// Then add the channel updates.
|
|
for _, message := range d.channelUpdates {
|
|
msgs.addMsg(message)
|
|
}
|
|
|
|
// Finally add the node announcements.
|
|
for _, message := range d.nodeAnnouncements {
|
|
msgs.addMsg(message)
|
|
}
|
|
|
|
d.reset()
|
|
|
|
// Return the array of lnwire.messages.
|
|
return msgs
|
|
}
|
|
|
|
// calculateSubBatchSize is a helper function that calculates the size to break
|
|
// down the batchSize into.
|
|
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
|
|
minimumBatchSize, batchSize int) int {
|
|
if subBatchDelay > totalDelay {
|
|
return batchSize
|
|
}
|
|
|
|
subBatchSize := (batchSize*int(subBatchDelay) +
|
|
int(totalDelay) - 1) / int(totalDelay)
|
|
|
|
if subBatchSize < minimumBatchSize {
|
|
return minimumBatchSize
|
|
}
|
|
|
|
return subBatchSize
|
|
}
|
|
|
|
// batchSizeCalculator maps to the function `calculateSubBatchSize`. We create
|
|
// this variable so the function can be mocked in our test.
|
|
var batchSizeCalculator = calculateSubBatchSize
|
|
|
|
// splitAnnouncementBatches takes an exiting list of announcements and
|
|
// decomposes it into sub batches controlled by the `subBatchSize`.
|
|
func (d *AuthenticatedGossiper) splitAnnouncementBatches(
|
|
announcementBatch []msgWithSenders) [][]msgWithSenders {
|
|
|
|
subBatchSize := batchSizeCalculator(
|
|
d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
|
|
d.cfg.MinimumBatchSize, len(announcementBatch),
|
|
)
|
|
|
|
var splitAnnouncementBatch [][]msgWithSenders
|
|
|
|
for subBatchSize < len(announcementBatch) {
|
|
// For slicing with minimal allocation
|
|
// https://github.com/golang/go/wiki/SliceTricks
|
|
announcementBatch, splitAnnouncementBatch =
|
|
announcementBatch[subBatchSize:],
|
|
append(splitAnnouncementBatch,
|
|
announcementBatch[0:subBatchSize:subBatchSize])
|
|
}
|
|
splitAnnouncementBatch = append(
|
|
splitAnnouncementBatch, announcementBatch,
|
|
)
|
|
|
|
return splitAnnouncementBatch
|
|
}
|
|
|
|
// splitAndSendAnnBatch takes a batch of messages, computes the proper batch
|
|
// split size, and then sends out all items to the set of target peers. Locally
|
|
// generated announcements are always sent before remotely generated
|
|
// announcements.
|
|
func (d *AuthenticatedGossiper) splitAndSendAnnBatch(
|
|
annBatch msgsToBroadcast) {
|
|
|
|
// delayNextBatch is a helper closure that blocks for `SubBatchDelay`
|
|
// duration to delay the sending of next announcement batch.
|
|
delayNextBatch := func() {
|
|
select {
|
|
case <-time.After(d.cfg.SubBatchDelay):
|
|
case <-d.quit:
|
|
return
|
|
}
|
|
}
|
|
|
|
// Fetch the local and remote announcements.
|
|
localBatches := d.splitAnnouncementBatches(annBatch.localMsgs)
|
|
remoteBatches := d.splitAnnouncementBatches(annBatch.remoteMsgs)
|
|
|
|
d.wg.Add(1)
|
|
go func() {
|
|
defer d.wg.Done()
|
|
|
|
log.Debugf("Broadcasting %v new local announcements in %d "+
|
|
"sub batches", len(annBatch.localMsgs),
|
|
len(localBatches))
|
|
|
|
// Send out the local announcements first.
|
|
for _, annBatch := range localBatches {
|
|
d.sendLocalBatch(annBatch)
|
|
delayNextBatch()
|
|
}
|
|
|
|
log.Debugf("Broadcasting %v new remote announcements in %d "+
|
|
"sub batches", len(annBatch.remoteMsgs),
|
|
len(remoteBatches))
|
|
|
|
// Now send the remote announcements.
|
|
for _, annBatch := range remoteBatches {
|
|
d.sendRemoteBatch(annBatch)
|
|
delayNextBatch()
|
|
}
|
|
}()
|
|
}
|
|
|
|
// sendLocalBatch broadcasts a list of locally generated announcements to our
|
|
// peers. For local announcements, we skip the filter and dedup logic and just
|
|
// send the announcements out to all our coonnected peers.
|
|
func (d *AuthenticatedGossiper) sendLocalBatch(annBatch []msgWithSenders) {
|
|
msgsToSend := lnutils.Map(
|
|
annBatch, func(m msgWithSenders) lnwire.Message {
|
|
return m.msg
|
|
},
|
|
)
|
|
|
|
err := d.cfg.Broadcast(nil, msgsToSend...)
|
|
if err != nil {
|
|
log.Errorf("Unable to send local batch announcements: %v", err)
|
|
}
|
|
}
|
|
|
|
// sendRemoteBatch broadcasts a list of remotely generated announcements to our
|
|
// peers.
|
|
func (d *AuthenticatedGossiper) sendRemoteBatch(annBatch []msgWithSenders) {
|
|
syncerPeers := d.syncMgr.GossipSyncers()
|
|
|
|
// We'll first attempt to filter out this new message for all peers
|
|
// that have active gossip syncers active.
|
|
for pub, syncer := range syncerPeers {
|
|
log.Tracef("Sending messages batch to GossipSyncer(%s)", pub)
|
|
syncer.FilterGossipMsgs(annBatch...)
|
|
}
|
|
|
|
for _, msgChunk := range annBatch {
|
|
msgChunk := msgChunk
|
|
|
|
// With the syncers taken care of, we'll merge the sender map
|
|
// with the set of syncers, so we don't send out duplicate
|
|
// messages.
|
|
msgChunk.mergeSyncerMap(syncerPeers)
|
|
|
|
err := d.cfg.Broadcast(msgChunk.senders, msgChunk.msg)
|
|
if err != nil {
|
|
log.Errorf("Unable to send batch "+
|
|
"announcements: %v", err)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
// networkHandler is the primary goroutine that drives this service. The roles
|
|
// of this goroutine includes answering queries related to the state of the
|
|
// network, syncing up newly connected peers, and also periodically
|
|
// broadcasting our latest topology state to all connected peers.
|
|
//
|
|
// NOTE: This MUST be run as a goroutine.
|
|
func (d *AuthenticatedGossiper) networkHandler() {
|
|
defer d.wg.Done()
|
|
|
|
// Initialize empty deDupedAnnouncements to store announcement batch.
|
|
announcements := deDupedAnnouncements{}
|
|
announcements.Reset()
|
|
|
|
d.cfg.RetransmitTicker.Resume()
|
|
defer d.cfg.RetransmitTicker.Stop()
|
|
|
|
trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
|
|
defer trickleTimer.Stop()
|
|
|
|
// To start, we'll first check to see if there are any stale channel or
|
|
// node announcements that we need to re-transmit.
|
|
if err := d.retransmitStaleAnns(time.Now()); err != nil {
|
|
log.Errorf("Unable to rebroadcast stale announcements: %v", err)
|
|
}
|
|
|
|
// We'll use this validation to ensure that we process jobs in their
|
|
// dependency order during parallel validation.
|
|
validationBarrier := routing.NewValidationBarrier(1000, d.quit)
|
|
|
|
for {
|
|
select {
|
|
// A new policy update has arrived. We'll commit it to the
|
|
// sub-systems below us, then craft, sign, and broadcast a new
|
|
// ChannelUpdate for the set of affected clients.
|
|
case policyUpdate := <-d.chanPolicyUpdates:
|
|
log.Tracef("Received channel %d policy update requests",
|
|
len(policyUpdate.edgesToUpdate))
|
|
|
|
// First, we'll now create new fully signed updates for
|
|
// the affected channels and also update the underlying
|
|
// graph with the new state.
|
|
newChanUpdates, err := d.processChanPolicyUpdate(
|
|
policyUpdate.edgesToUpdate,
|
|
)
|
|
policyUpdate.errChan <- err
|
|
if err != nil {
|
|
log.Errorf("Unable to craft policy updates: %v",
|
|
err)
|
|
continue
|
|
}
|
|
|
|
// Finally, with the updates committed, we'll now add
|
|
// them to the announcement batch to be flushed at the
|
|
// start of the next epoch.
|
|
announcements.AddMsgs(newChanUpdates...)
|
|
|
|
case announcement := <-d.networkMsgs:
|
|
log.Tracef("Received network message: "+
|
|
"peer=%v, msg=%s, is_remote=%v",
|
|
announcement.peer, announcement.msg.MsgType(),
|
|
announcement.isRemote)
|
|
|
|
switch announcement.msg.(type) {
|
|
// Channel announcement signatures are amongst the only
|
|
// messages that we'll process serially.
|
|
case *lnwire.AnnounceSignatures:
|
|
emittedAnnouncements, _ := d.processNetworkAnnouncement(
|
|
announcement,
|
|
)
|
|
log.Debugf("Processed network message %s, "+
|
|
"returned len(announcements)=%v",
|
|
announcement.msg.MsgType(),
|
|
len(emittedAnnouncements))
|
|
|
|
if emittedAnnouncements != nil {
|
|
announcements.AddMsgs(
|
|
emittedAnnouncements...,
|
|
)
|
|
}
|
|
continue
|
|
}
|
|
|
|
// If this message was recently rejected, then we won't
|
|
// attempt to re-process it.
|
|
if announcement.isRemote && d.isRecentlyRejectedMsg(
|
|
announcement.msg,
|
|
sourceToPub(announcement.source),
|
|
) {
|
|
|
|
announcement.err <- fmt.Errorf("recently " +
|
|
"rejected")
|
|
continue
|
|
}
|
|
|
|
// We'll set up any dependent, and wait until a free
|
|
// slot for this job opens up, this allow us to not
|
|
// have thousands of goroutines active.
|
|
validationBarrier.InitJobDependencies(announcement.msg)
|
|
|
|
d.wg.Add(1)
|
|
go d.handleNetworkMessages(
|
|
announcement, &announcements, validationBarrier,
|
|
)
|
|
|
|
// The trickle timer has ticked, which indicates we should
|
|
// flush to the network the pending batch of new announcements
|
|
// we've received since the last trickle tick.
|
|
case <-trickleTimer.C:
|
|
// Emit the current batch of announcements from
|
|
// deDupedAnnouncements.
|
|
announcementBatch := announcements.Emit()
|
|
|
|
// If the current announcements batch is nil, then we
|
|
// have no further work here.
|
|
if announcementBatch.isEmpty() {
|
|
continue
|
|
}
|
|
|
|
// At this point, we have the set of local and remote
|
|
// announcements we want to send out. We'll do the
|
|
// batching as normal for both, but for local
|
|
// announcements, we'll blast them out w/o regard for
|
|
// our peer's policies so we ensure they propagate
|
|
// properly.
|
|
d.splitAndSendAnnBatch(announcementBatch)
|
|
|
|
// The retransmission timer has ticked which indicates that we
|
|
// should check if we need to prune or re-broadcast any of our
|
|
// personal channels or node announcement. This addresses the
|
|
// case of "zombie" channels and channel advertisements that
|
|
// have been dropped, or not properly propagated through the
|
|
// network.
|
|
case tick := <-d.cfg.RetransmitTicker.Ticks():
|
|
if err := d.retransmitStaleAnns(tick); err != nil {
|
|
log.Errorf("unable to rebroadcast stale "+
|
|
"announcements: %v", err)
|
|
}
|
|
|
|
// The gossiper has been signalled to exit, to we exit our
|
|
// main loop so the wait group can be decremented.
|
|
case <-d.quit:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleNetworkMessages is responsible for waiting for dependencies for a
|
|
// given network message and processing the message. Once processed, it will
|
|
// signal its dependants and add the new announcements to the announce batch.
|
|
//
|
|
// NOTE: must be run as a goroutine.
|
|
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
|
|
deDuped *deDupedAnnouncements, vb *routing.ValidationBarrier) {
|
|
|
|
defer d.wg.Done()
|
|
defer vb.CompleteJob()
|
|
|
|
// We should only broadcast this message forward if it originated from
|
|
// us or it wasn't received as part of our initial historical sync.
|
|
shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()
|
|
|
|
// If this message has an existing dependency, then we'll wait until
|
|
// that has been fully validated before we proceed.
|
|
err := vb.WaitForDependants(nMsg.msg)
|
|
if err != nil {
|
|
log.Debugf("Validating network message %s got err: %v",
|
|
nMsg.msg.MsgType(), err)
|
|
|
|
if !routing.IsError(
|
|
err,
|
|
routing.ErrVBarrierShuttingDown,
|
|
routing.ErrParentValidationFailed,
|
|
) {
|
|
|
|
log.Warnf("unexpected error during validation "+
|
|
"barrier shutdown: %v", err)
|
|
}
|
|
nMsg.err <- err
|
|
|
|
return
|
|
}
|
|
|
|
// Process the network announcement to determine if this is either a
|
|
// new announcement from our PoV or an edges to a prior vertex/edge we
|
|
// previously proceeded.
|
|
newAnns, allow := d.processNetworkAnnouncement(nMsg)
|
|
|
|
log.Tracef("Processed network message %s, returned "+
|
|
"len(announcements)=%v, allowDependents=%v",
|
|
nMsg.msg.MsgType(), len(newAnns), allow)
|
|
|
|
// If this message had any dependencies, then we can now signal them to
|
|
// continue.
|
|
vb.SignalDependants(nMsg.msg, allow)
|
|
|
|
// If the announcement was accepted, then add the emitted announcements
|
|
// to our announce batch to be broadcast once the trickle timer ticks
|
|
// gain.
|
|
if newAnns != nil && shouldBroadcast {
|
|
// TODO(roasbeef): exclude peer that sent.
|
|
deDuped.AddMsgs(newAnns...)
|
|
} else if newAnns != nil {
|
|
log.Trace("Skipping broadcast of announcements received " +
|
|
"during initial graph sync")
|
|
}
|
|
}
|
|
|
|
// TODO(roasbeef): d/c peers that send updates not on our chain
|
|
|
|
// InitSyncState is called by outside sub-systems when a connection is
|
|
// established to a new peer that understands how to perform channel range
|
|
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
|
|
// needed to handle new queries.
|
|
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
|
|
d.syncMgr.InitSyncState(syncPeer)
|
|
}
|
|
|
|
// PruneSyncState is called by outside sub-systems once a peer that we were
|
|
// previously connected to has been disconnected. In this case we can stop the
|
|
// existing GossipSyncer assigned to the peer and free up resources.
|
|
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
|
|
d.syncMgr.PruneSyncState(peer)
|
|
}
|
|
|
|
// isRecentlyRejectedMsg returns true if we recently rejected a message, and
|
|
// false otherwise, This avoids expensive reprocessing of the message.
|
|
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message,
|
|
peerPub [33]byte) bool {
|
|
|
|
var scid uint64
|
|
switch m := msg.(type) {
|
|
case *lnwire.ChannelUpdate:
|
|
scid = m.ShortChannelID.ToUint64()
|
|
|
|
case *lnwire.ChannelAnnouncement:
|
|
scid = m.ShortChannelID.ToUint64()
|
|
|
|
default:
|
|
return false
|
|
}
|
|
|
|
_, err := d.recentRejects.Get(newRejectCacheKey(scid, peerPub))
|
|
return err != cache.ErrElementNotFound
|
|
}
|
|
|
|
// retransmitStaleAnns examines all outgoing channels that the source node is
|
|
// known to maintain to check to see if any of them are "stale". A channel is
|
|
// stale iff, the last timestamp of its rebroadcast is older than the
|
|
// RebroadcastInterval. We also check if a refreshed node announcement should
|
|
// be resent.
|
|
func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
|
|
// Iterate over all of our channels and check if any of them fall
|
|
// within the prune interval or re-broadcast interval.
|
|
type updateTuple struct {
|
|
info *channeldb.ChannelEdgeInfo
|
|
edge *channeldb.ChannelEdgePolicy
|
|
}
|
|
|
|
var (
|
|
havePublicChannels bool
|
|
edgesToUpdate []updateTuple
|
|
)
|
|
err := d.cfg.Router.ForAllOutgoingChannels(func(
|
|
_ kvdb.RTx,
|
|
info *channeldb.ChannelEdgeInfo,
|
|
edge *channeldb.ChannelEdgePolicy) error {
|
|
|
|
// If there's no auth proof attached to this edge, it means
|
|
// that it is a private channel not meant to be announced to
|
|
// the greater network, so avoid sending channel updates for
|
|
// this channel to not leak its
|
|
// existence.
|
|
if info.AuthProof == nil {
|
|
log.Debugf("Skipping retransmission of channel "+
|
|
"without AuthProof: %v", info.ChannelID)
|
|
return nil
|
|
}
|
|
|
|
// We make a note that we have at least one public channel. We
|
|
// use this to determine whether we should send a node
|
|
// announcement below.
|
|
havePublicChannels = true
|
|
|
|
// If this edge has a ChannelUpdate that was created before the
|
|
// introduction of the MaxHTLC field, then we'll update this
|
|
// edge to propagate this information in the network.
|
|
if !edge.MessageFlags.HasMaxHtlc() {
|
|
// We'll make sure we support the new max_htlc field if
|
|
// not already present.
|
|
edge.MessageFlags |= lnwire.ChanUpdateRequiredMaxHtlc
|
|
edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
|
|
|
|
edgesToUpdate = append(edgesToUpdate, updateTuple{
|
|
info: info,
|
|
edge: edge,
|
|
})
|
|
return nil
|
|
}
|
|
|
|
timeElapsed := now.Sub(edge.LastUpdate)
|
|
|
|
// If it's been longer than RebroadcastInterval since we've
|
|
// re-broadcasted the channel, add the channel to the set of
|
|
// edges we need to update.
|
|
if timeElapsed >= d.cfg.RebroadcastInterval {
|
|
edgesToUpdate = append(edgesToUpdate, updateTuple{
|
|
info: info,
|
|
edge: edge,
|
|
})
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil && err != channeldb.ErrGraphNoEdgesFound {
|
|
return fmt.Errorf("unable to retrieve outgoing channels: %v",
|
|
err)
|
|
}
|
|
|
|
var signedUpdates []lnwire.Message
|
|
for _, chanToUpdate := range edgesToUpdate {
|
|
// Re-sign and update the channel on disk and retrieve our
|
|
// ChannelUpdate to broadcast.
|
|
chanAnn, chanUpdate, err := d.updateChannel(
|
|
chanToUpdate.info, chanToUpdate.edge,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to update channel: %v", err)
|
|
}
|
|
|
|
// If we have a valid announcement to transmit, then we'll send
|
|
// that along with the update.
|
|
if chanAnn != nil {
|
|
signedUpdates = append(signedUpdates, chanAnn)
|
|
}
|
|
|
|
signedUpdates = append(signedUpdates, chanUpdate)
|
|
}
|
|
|
|
// If we don't have any public channels, we return as we don't want to
|
|
// broadcast anything that would reveal our existence.
|
|
if !havePublicChannels {
|
|
return nil
|
|
}
|
|
|
|
// We'll also check that our NodeAnnouncement is not too old.
|
|
currentNodeAnn, err := d.cfg.SelfNodeAnnouncement(false)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to get current node announment: %v",
|
|
err)
|
|
}
|
|
|
|
timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
|
|
timeElapsed := now.Sub(timestamp)
|
|
|
|
// If it's been a full day since we've re-broadcasted the
|
|
// node announcement, refresh it and resend it.
|
|
nodeAnnStr := ""
|
|
if timeElapsed >= d.cfg.RebroadcastInterval {
|
|
newNodeAnn, err := d.cfg.SelfNodeAnnouncement(true)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to get refreshed node "+
|
|
"announcement: %v", err)
|
|
}
|
|
|
|
signedUpdates = append(signedUpdates, &newNodeAnn)
|
|
nodeAnnStr = " and our refreshed node announcement"
|
|
|
|
// Before broadcasting the refreshed node announcement, add it
|
|
// to our own graph.
|
|
if err := d.addNode(&newNodeAnn); err != nil {
|
|
log.Errorf("Unable to add refreshed node announcement "+
|
|
"to graph: %v", err)
|
|
}
|
|
}
|
|
|
|
// If we don't have any updates to re-broadcast, then we'll exit
|
|
// early.
|
|
if len(signedUpdates) == 0 {
|
|
return nil
|
|
}
|
|
|
|
log.Infof("Retransmitting %v outgoing channels%v",
|
|
len(edgesToUpdate), nodeAnnStr)
|
|
|
|
// With all the wire announcements properly crafted, we'll broadcast
|
|
// our known outgoing channels to all our immediate peers.
|
|
if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
|
|
return fmt.Errorf("unable to re-broadcast channels: %v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// processChanPolicyUpdate generates a new set of channel updates for the
|
|
// provided list of edges and updates the backing ChannelGraphSource.
|
|
func (d *AuthenticatedGossiper) processChanPolicyUpdate(
|
|
edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
|
|
|
|
var chanUpdates []networkMsg
|
|
for _, edgeInfo := range edgesToUpdate {
|
|
// Now that we've collected all the channels we need to update,
|
|
// we'll re-sign and update the backing ChannelGraphSource, and
|
|
// retrieve our ChannelUpdate to broadcast.
|
|
_, chanUpdate, err := d.updateChannel(
|
|
edgeInfo.Info, edgeInfo.Edge,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// We'll avoid broadcasting any updates for private channels to
|
|
// avoid directly giving away their existence. Instead, we'll
|
|
// send the update directly to the remote party.
|
|
if edgeInfo.Info.AuthProof == nil {
|
|
// If AuthProof is nil and an alias was found for this
|
|
// ChannelID (meaning the option-scid-alias feature was
|
|
// negotiated), we'll replace the ShortChannelID in the
|
|
// update with the peer's alias. We do this after
|
|
// updateChannel so that the alias isn't persisted to
|
|
// the database.
|
|
op := &edgeInfo.Info.ChannelPoint
|
|
chanID := lnwire.NewChanIDFromOutPoint(op)
|
|
|
|
var defaultAlias lnwire.ShortChannelID
|
|
foundAlias, _ := d.cfg.GetAlias(chanID)
|
|
if foundAlias != defaultAlias {
|
|
chanUpdate.ShortChannelID = foundAlias
|
|
|
|
sig, err := d.cfg.SignAliasUpdate(chanUpdate)
|
|
if err != nil {
|
|
log.Errorf("Unable to sign alias "+
|
|
"update: %v", err)
|
|
continue
|
|
}
|
|
|
|
lnSig, err := lnwire.NewSigFromSignature(sig)
|
|
if err != nil {
|
|
log.Errorf("Unable to create sig: %v",
|
|
err)
|
|
continue
|
|
}
|
|
|
|
chanUpdate.Signature = lnSig
|
|
}
|
|
|
|
remotePubKey := remotePubFromChanInfo(
|
|
edgeInfo.Info, chanUpdate.ChannelFlags,
|
|
)
|
|
err := d.reliableSender.sendMessage(
|
|
chanUpdate, remotePubKey,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("Unable to reliably send %v for "+
|
|
"channel=%v to peer=%x: %v",
|
|
chanUpdate.MsgType(),
|
|
chanUpdate.ShortChannelID,
|
|
remotePubKey, err)
|
|
}
|
|
continue
|
|
}
|
|
|
|
// We set ourselves as the source of this message to indicate
|
|
// that we shouldn't skip any peers when sending this message.
|
|
chanUpdates = append(chanUpdates, networkMsg{
|
|
source: d.selfKey,
|
|
isRemote: false,
|
|
msg: chanUpdate,
|
|
})
|
|
}
|
|
|
|
return chanUpdates, nil
|
|
}
|
|
|
|
// remotePubFromChanInfo returns the public key of the remote peer given a
|
|
// ChannelEdgeInfo that describe a channel we have with them.
|
|
func remotePubFromChanInfo(chanInfo *channeldb.ChannelEdgeInfo,
|
|
chanFlags lnwire.ChanUpdateChanFlags) [33]byte {
|
|
|
|
var remotePubKey [33]byte
|
|
switch {
|
|
case chanFlags&lnwire.ChanUpdateDirection == 0:
|
|
remotePubKey = chanInfo.NodeKey2Bytes
|
|
case chanFlags&lnwire.ChanUpdateDirection == 1:
|
|
remotePubKey = chanInfo.NodeKey1Bytes
|
|
}
|
|
|
|
return remotePubKey
|
|
}
|
|
|
|
// processRejectedEdge examines a rejected edge to see if we can extract any
|
|
// new announcements from it. An edge will get rejected if we already added
|
|
// the same edge without AuthProof to the graph. If the received announcement
|
|
// contains a proof, we can add this proof to our edge. We can end up in this
|
|
// situation in the case where we create a channel, but for some reason fail
|
|
// to receive the remote peer's proof, while the remote peer is able to fully
|
|
// assemble the proof and craft the ChannelAnnouncement.
|
|
func (d *AuthenticatedGossiper) processRejectedEdge(
|
|
chanAnnMsg *lnwire.ChannelAnnouncement,
|
|
proof *channeldb.ChannelAuthProof) ([]networkMsg, error) {
|
|
|
|
// First, we'll fetch the state of the channel as we know if from the
|
|
// database.
|
|
chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(
|
|
chanAnnMsg.ShortChannelID,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// The edge is in the graph, and has a proof attached, then we'll just
|
|
// reject it as normal.
|
|
if chanInfo.AuthProof != nil {
|
|
return nil, nil
|
|
}
|
|
|
|
// Otherwise, this means that the edge is within the graph, but it
|
|
// doesn't yet have a proper proof attached. If we did not receive
|
|
// the proof such that we now can add it, there's nothing more we
|
|
// can do.
|
|
if proof == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
// We'll then create then validate the new fully assembled
|
|
// announcement.
|
|
chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
|
|
proof, chanInfo, e1, e2,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = routing.ValidateChannelAnn(chanAnn)
|
|
if err != nil {
|
|
err := fmt.Errorf("assembled channel announcement proof "+
|
|
"for shortChanID=%v isn't valid: %v",
|
|
chanAnnMsg.ShortChannelID, err)
|
|
log.Error(err)
|
|
return nil, err
|
|
}
|
|
|
|
// If everything checks out, then we'll add the fully assembled proof
|
|
// to the database.
|
|
err = d.cfg.Router.AddProof(chanAnnMsg.ShortChannelID, proof)
|
|
if err != nil {
|
|
err := fmt.Errorf("unable add proof to shortChanID=%v: %v",
|
|
chanAnnMsg.ShortChannelID, err)
|
|
log.Error(err)
|
|
return nil, err
|
|
}
|
|
|
|
// As we now have a complete channel announcement for this channel,
|
|
// we'll construct the announcement so they can be broadcast out to all
|
|
// our peers.
|
|
announcements := make([]networkMsg, 0, 3)
|
|
announcements = append(announcements, networkMsg{
|
|
source: d.selfKey,
|
|
msg: chanAnn,
|
|
})
|
|
if e1Ann != nil {
|
|
announcements = append(announcements, networkMsg{
|
|
source: d.selfKey,
|
|
msg: e1Ann,
|
|
})
|
|
}
|
|
if e2Ann != nil {
|
|
announcements = append(announcements, networkMsg{
|
|
source: d.selfKey,
|
|
msg: e2Ann,
|
|
})
|
|
|
|
}
|
|
|
|
return announcements, nil
|
|
}
|
|
|
|
// addNode processes the given node announcement, and adds it to our channel
|
|
// graph.
|
|
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
|
|
op ...batch.SchedulerOption) error {
|
|
|
|
if err := routing.ValidateNodeAnn(msg); err != nil {
|
|
return fmt.Errorf("unable to validate node announcement: %v",
|
|
err)
|
|
}
|
|
|
|
timestamp := time.Unix(int64(msg.Timestamp), 0)
|
|
features := lnwire.NewFeatureVector(msg.Features, lnwire.Features)
|
|
node := &channeldb.LightningNode{
|
|
HaveNodeAnnouncement: true,
|
|
LastUpdate: timestamp,
|
|
Addresses: msg.Addresses,
|
|
PubKeyBytes: msg.NodeID,
|
|
Alias: msg.Alias.String(),
|
|
AuthSigBytes: msg.Signature.ToSignatureBytes(),
|
|
Features: features,
|
|
Color: msg.RGBColor,
|
|
ExtraOpaqueData: msg.ExtraOpaqueData,
|
|
}
|
|
|
|
return d.cfg.Router.AddNode(node, op...)
|
|
}
|
|
|
|
// isPremature decides whether a given network message has a block height+delta
|
|
// value specified in the future. If so, the message will be added to the
|
|
// future message map and be processed when the block height as reached.
|
|
//
|
|
// NOTE: must be used inside a lock.
|
|
func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
|
|
delta uint32, msg *networkMsg) bool {
|
|
// TODO(roasbeef) make height delta 6
|
|
// * or configurable
|
|
|
|
msgHeight := chanID.BlockHeight + delta
|
|
|
|
// The message height is smaller or equal to our best known height,
|
|
// thus the message is mature.
|
|
if msgHeight <= d.bestHeight {
|
|
return false
|
|
}
|
|
|
|
// Add the premature message to our future messages which will be
|
|
// resent once the block height has reached.
|
|
//
|
|
// Copy the networkMsgs since the old message's err chan will be
|
|
// consumed.
|
|
copied := &networkMsg{
|
|
peer: msg.peer,
|
|
source: msg.source,
|
|
msg: msg.msg,
|
|
optionalMsgFields: msg.optionalMsgFields,
|
|
isRemote: msg.isRemote,
|
|
err: make(chan error, 1),
|
|
}
|
|
|
|
// Create the cached message.
|
|
cachedMsg := &cachedFutureMsg{
|
|
msg: copied,
|
|
height: msgHeight,
|
|
}
|
|
|
|
// Increment the msg ID and add it to the cache.
|
|
nextMsgID := d.futureMsgs.nextMsgID()
|
|
_, err := d.futureMsgs.Put(nextMsgID, cachedMsg)
|
|
if err != nil {
|
|
log.Errorf("Adding future message got error: %v", err)
|
|
}
|
|
|
|
log.Debugf("Network message: %v added to future messages for "+
|
|
"msgHeight=%d, bestHeight=%d", msg.msg.MsgType(),
|
|
msgHeight, d.bestHeight)
|
|
|
|
return true
|
|
}
|
|
|
|
// processNetworkAnnouncement processes a new network relate authenticated
|
|
// channel or node announcement or announcements proofs. If the announcement
|
|
// didn't affect the internal state due to either being out of date, invalid,
|
|
// or redundant, then nil is returned. Otherwise, the set of announcements will
|
|
// be returned which should be broadcasted to the rest of the network. The
|
|
// boolean returned indicates whether any dependents of the announcement should
|
|
// attempt to be processed as well.
|
|
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
|
nMsg *networkMsg) ([]networkMsg, bool) {
|
|
|
|
// If this is a remote update, we set the scheduler option to lazily
|
|
// add it to the graph.
|
|
var schedulerOp []batch.SchedulerOption
|
|
if nMsg.isRemote {
|
|
schedulerOp = append(schedulerOp, batch.LazyAdd())
|
|
}
|
|
|
|
switch msg := nMsg.msg.(type) {
|
|
// A new node announcement has arrived which either presents new
|
|
// information about a node in one of the channels we know about, or a
|
|
// updating previously advertised information.
|
|
case *lnwire.NodeAnnouncement:
|
|
return d.handleNodeAnnouncement(nMsg, msg, schedulerOp)
|
|
|
|
// A new channel announcement has arrived, this indicates the
|
|
// *creation* of a new channel within the network. This only advertises
|
|
// the existence of a channel and not yet the routing policies in
|
|
// either direction of the channel.
|
|
case *lnwire.ChannelAnnouncement:
|
|
return d.handleChanAnnouncement(nMsg, msg, schedulerOp)
|
|
|
|
// A new authenticated channel edge update has arrived. This indicates
|
|
// that the directional information for an already known channel has
|
|
// been updated.
|
|
case *lnwire.ChannelUpdate:
|
|
return d.handleChanUpdate(nMsg, msg, schedulerOp)
|
|
|
|
// A new signature announcement has been received. This indicates
|
|
// willingness of nodes involved in the funding of a channel to
|
|
// announce this new channel to the rest of the world.
|
|
case *lnwire.AnnounceSignatures:
|
|
return d.handleAnnSig(nMsg, msg)
|
|
|
|
default:
|
|
err := errors.New("wrong type of the announcement")
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
}
|
|
|
|
// processZombieUpdate determines whether the provided channel update should
|
|
// resurrect a given zombie edge.
|
|
func (d *AuthenticatedGossiper) processZombieUpdate(
|
|
chanInfo *channeldb.ChannelEdgeInfo, msg *lnwire.ChannelUpdate) error {
|
|
|
|
// The least-significant bit in the flag on the channel update tells us
|
|
// which edge is being updated.
|
|
isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0
|
|
|
|
// Since we've deemed the update as not stale above, before marking it
|
|
// live, we'll make sure it has been signed by the correct party. If we
|
|
// have both pubkeys, either party can resurect the channel. If we've
|
|
// already marked this with the stricter, single-sided resurrection we
|
|
// will only have the pubkey of the node with the oldest timestamp.
|
|
var pubKey *btcec.PublicKey
|
|
switch {
|
|
case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
|
|
pubKey, _ = chanInfo.NodeKey1()
|
|
case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
|
|
pubKey, _ = chanInfo.NodeKey2()
|
|
}
|
|
if pubKey == nil {
|
|
return fmt.Errorf("incorrect pubkey to resurrect zombie "+
|
|
"with chan_id=%v", msg.ShortChannelID)
|
|
}
|
|
|
|
err := routing.VerifyChannelUpdateSignature(msg, pubKey)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to verify channel "+
|
|
"update signature: %v", err)
|
|
}
|
|
|
|
// With the signature valid, we'll proceed to mark the
|
|
// edge as live and wait for the channel announcement to
|
|
// come through again.
|
|
baseScid := lnwire.NewShortChanIDFromInt(chanInfo.ChannelID)
|
|
err = d.cfg.Router.MarkEdgeLive(baseScid)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to remove edge with "+
|
|
"chan_id=%v from zombie index: %v",
|
|
msg.ShortChannelID, err)
|
|
}
|
|
|
|
log.Debugf("Removed edge with chan_id=%v from zombie "+
|
|
"index", msg.ShortChannelID)
|
|
|
|
return nil
|
|
}
|
|
|
|
// fetchNodeAnn fetches the latest signed node announcement from our point of
|
|
// view for the node with the given public key.
|
|
func (d *AuthenticatedGossiper) fetchNodeAnn(
|
|
pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {
|
|
|
|
node, err := d.cfg.Router.FetchLightningNode(pubKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return node.NodeAnnouncement(true)
|
|
}
|
|
|
|
// isMsgStale determines whether a message retrieved from the backing
|
|
// MessageStore is seen as stale by the current graph.
|
|
func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
|
|
switch msg := msg.(type) {
|
|
case *lnwire.AnnounceSignatures:
|
|
chanInfo, _, _, err := d.cfg.Router.GetChannelByID(
|
|
msg.ShortChannelID,
|
|
)
|
|
|
|
// If the channel cannot be found, it is most likely a leftover
|
|
// message for a channel that was closed, so we can consider it
|
|
// stale.
|
|
if err == channeldb.ErrEdgeNotFound {
|
|
return true
|
|
}
|
|
if err != nil {
|
|
log.Debugf("Unable to retrieve channel=%v from graph: "+
|
|
"%v", chanInfo.ChannelID, err)
|
|
return false
|
|
}
|
|
|
|
// If the proof exists in the graph, then we have successfully
|
|
// received the remote proof and assembled the full proof, so we
|
|
// can safely delete the local proof from the database.
|
|
return chanInfo.AuthProof != nil
|
|
|
|
case *lnwire.ChannelUpdate:
|
|
_, p1, p2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
|
|
|
|
// If the channel cannot be found, it is most likely a leftover
|
|
// message for a channel that was closed, so we can consider it
|
|
// stale.
|
|
if err == channeldb.ErrEdgeNotFound {
|
|
return true
|
|
}
|
|
if err != nil {
|
|
log.Debugf("Unable to retrieve channel=%v from graph: "+
|
|
"%v", msg.ShortChannelID, err)
|
|
return false
|
|
}
|
|
|
|
// Otherwise, we'll retrieve the correct policy that we
|
|
// currently have stored within our graph to check if this
|
|
// message is stale by comparing its timestamp.
|
|
var p *channeldb.ChannelEdgePolicy
|
|
if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
|
|
p = p1
|
|
} else {
|
|
p = p2
|
|
}
|
|
|
|
// If the policy is still unknown, then we can consider this
|
|
// policy fresh.
|
|
if p == nil {
|
|
return false
|
|
}
|
|
|
|
timestamp := time.Unix(int64(msg.Timestamp), 0)
|
|
return p.LastUpdate.After(timestamp)
|
|
|
|
default:
|
|
// We'll make sure to not mark any unsupported messages as stale
|
|
// to ensure they are not removed.
|
|
return false
|
|
}
|
|
}
|
|
|
|
// updateChannel creates a new fully signed update for the channel, and updates
|
|
// the underlying graph with the new state.
|
|
func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
|
|
edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement,
|
|
*lnwire.ChannelUpdate, error) {
|
|
|
|
// Parse the unsigned edge into a channel update.
|
|
chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)
|
|
|
|
// We'll generate a new signature over a digest of the channel
|
|
// announcement itself and update the timestamp to ensure it propagate.
|
|
err := netann.SignChannelUpdate(
|
|
d.cfg.AnnSigner, d.selfKeyLoc, chanUpdate,
|
|
netann.ChanUpdSetTimestamp,
|
|
)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Next, we'll set the new signature in place, and update the reference
|
|
// in the backing slice.
|
|
edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
|
|
edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()
|
|
|
|
// To ensure that our signature is valid, we'll verify it ourself
|
|
// before committing it to the slice returned.
|
|
err = routing.ValidateChannelUpdateAnn(d.selfKey, info.Capacity, chanUpdate)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("generated invalid channel "+
|
|
"update sig: %v", err)
|
|
}
|
|
|
|
// Finally, we'll write the new edge policy to disk.
|
|
if err := d.cfg.Router.UpdateEdge(edge); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// We'll also create the original channel announcement so the two can
|
|
// be broadcast along side each other (if necessary), but only if we
|
|
// have a full channel announcement for this channel.
|
|
var chanAnn *lnwire.ChannelAnnouncement
|
|
if info.AuthProof != nil {
|
|
chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
|
|
chanAnn = &lnwire.ChannelAnnouncement{
|
|
ShortChannelID: chanID,
|
|
NodeID1: info.NodeKey1Bytes,
|
|
NodeID2: info.NodeKey2Bytes,
|
|
ChainHash: info.ChainHash,
|
|
BitcoinKey1: info.BitcoinKey1Bytes,
|
|
Features: lnwire.NewRawFeatureVector(),
|
|
BitcoinKey2: info.BitcoinKey2Bytes,
|
|
ExtraOpaqueData: edge.ExtraOpaqueData,
|
|
}
|
|
chanAnn.NodeSig1, err = lnwire.NewSigFromRawSignature(
|
|
info.AuthProof.NodeSig1Bytes,
|
|
)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
chanAnn.NodeSig2, err = lnwire.NewSigFromRawSignature(
|
|
info.AuthProof.NodeSig2Bytes,
|
|
)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
chanAnn.BitcoinSig1, err = lnwire.NewSigFromRawSignature(
|
|
info.AuthProof.BitcoinSig1Bytes,
|
|
)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
chanAnn.BitcoinSig2, err = lnwire.NewSigFromRawSignature(
|
|
info.AuthProof.BitcoinSig2Bytes,
|
|
)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
return chanAnn, chanUpdate, err
|
|
}
|
|
|
|
// SyncManager returns the gossiper's SyncManager instance.
|
|
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
|
|
return d.syncMgr
|
|
}
|
|
|
|
// IsKeepAliveUpdate determines whether this channel update is considered a
|
|
// keep-alive update based on the previous channel update processed for the same
|
|
// direction.
|
|
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate,
|
|
prev *channeldb.ChannelEdgePolicy) bool {
|
|
|
|
// Both updates should be from the same direction.
|
|
if update.ChannelFlags&lnwire.ChanUpdateDirection !=
|
|
prev.ChannelFlags&lnwire.ChanUpdateDirection {
|
|
|
|
return false
|
|
}
|
|
|
|
// The timestamp should always increase for a keep-alive update.
|
|
timestamp := time.Unix(int64(update.Timestamp), 0)
|
|
if !timestamp.After(prev.LastUpdate) {
|
|
return false
|
|
}
|
|
|
|
// None of the remaining fields should change for a keep-alive update.
|
|
if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
|
|
return false
|
|
}
|
|
if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
|
|
return false
|
|
}
|
|
if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
|
|
return false
|
|
}
|
|
if update.TimeLockDelta != prev.TimeLockDelta {
|
|
return false
|
|
}
|
|
if update.HtlcMinimumMsat != prev.MinHTLC {
|
|
return false
|
|
}
|
|
if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
|
|
return false
|
|
}
|
|
if update.HtlcMaximumMsat != prev.MaxHTLC {
|
|
return false
|
|
}
|
|
if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
// latestHeight returns the gossiper's latest height known of the chain.
|
|
func (d *AuthenticatedGossiper) latestHeight() uint32 {
|
|
d.Lock()
|
|
defer d.Unlock()
|
|
return d.bestHeight
|
|
}
|
|
|
|
// handleNodeAnnouncement processes a new node announcement.
|
|
func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
|
|
nodeAnn *lnwire.NodeAnnouncement,
|
|
ops []batch.SchedulerOption) ([]networkMsg, bool) {
|
|
|
|
timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)
|
|
|
|
log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
|
|
"node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
|
|
|
|
// We'll quickly ask the router if it already has a newer update for
|
|
// this node so we can skip validating signatures if not required.
|
|
if d.cfg.Router.IsStaleNode(nodeAnn.NodeID, timestamp) {
|
|
log.Debugf("Skipped processing stale node: %x", nodeAnn.NodeID)
|
|
nMsg.err <- nil
|
|
return nil, true
|
|
}
|
|
|
|
if err := d.addNode(nodeAnn, ops...); err != nil {
|
|
log.Debugf("Adding node: %x got error: %v", nodeAnn.NodeID,
|
|
err)
|
|
|
|
if !routing.IsError(
|
|
err,
|
|
routing.ErrOutdated,
|
|
routing.ErrIgnored,
|
|
routing.ErrVBarrierShuttingDown,
|
|
) {
|
|
|
|
log.Error(err)
|
|
}
|
|
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
// In order to ensure we don't leak unadvertised nodes, we'll make a
|
|
// quick check to ensure this node intends to publicly advertise itself
|
|
// to the network.
|
|
isPublic, err := d.cfg.Router.IsPublicNode(nodeAnn.NodeID)
|
|
if err != nil {
|
|
log.Errorf("Unable to determine if node %x is advertised: %v",
|
|
nodeAnn.NodeID, err)
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
var announcements []networkMsg
|
|
|
|
// If it does, we'll add their announcement to our batch so that it can
|
|
// be broadcast to the rest of our peers.
|
|
if isPublic {
|
|
announcements = append(announcements, networkMsg{
|
|
peer: nMsg.peer,
|
|
isRemote: nMsg.isRemote,
|
|
source: nMsg.source,
|
|
msg: nodeAnn,
|
|
})
|
|
} else {
|
|
log.Tracef("Skipping broadcasting node announcement for %x "+
|
|
"due to being unadvertised", nodeAnn.NodeID)
|
|
}
|
|
|
|
nMsg.err <- nil
|
|
// TODO(roasbeef): get rid of the above
|
|
|
|
log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
|
|
"node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
|
|
|
|
return announcements, true
|
|
}
|
|
|
|
// handleChanAnnouncement processes a new channel announcement.
|
|
func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
|
|
ann *lnwire.ChannelAnnouncement,
|
|
ops []batch.SchedulerOption) ([]networkMsg, bool) {
|
|
|
|
log.Debugf("Processing ChannelAnnouncement: peer=%v, short_chan_id=%v",
|
|
nMsg.peer, ann.ShortChannelID.ToUint64())
|
|
|
|
// We'll ignore any channel announcements that target any chain other
|
|
// than the set of chains we know of.
|
|
if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) {
|
|
err := fmt.Errorf("ignoring ChannelAnnouncement from chain=%v"+
|
|
", gossiper on chain=%v", ann.ChainHash,
|
|
d.cfg.ChainHash)
|
|
log.Errorf(err.Error())
|
|
|
|
key := newRejectCacheKey(
|
|
ann.ShortChannelID.ToUint64(),
|
|
sourceToPub(nMsg.source),
|
|
)
|
|
_, _ = d.recentRejects.Put(key, &cachedReject{})
|
|
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
// If this is a remote ChannelAnnouncement with an alias SCID, we'll
|
|
// reject the announcement. Since the router accepts alias SCIDs,
|
|
// not erroring out would be a DoS vector.
|
|
if nMsg.isRemote && d.cfg.IsAlias(ann.ShortChannelID) {
|
|
err := fmt.Errorf("ignoring remote alias channel=%v",
|
|
ann.ShortChannelID)
|
|
log.Errorf(err.Error())
|
|
|
|
key := newRejectCacheKey(
|
|
ann.ShortChannelID.ToUint64(),
|
|
sourceToPub(nMsg.source),
|
|
)
|
|
_, _ = d.recentRejects.Put(key, &cachedReject{})
|
|
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
// If the advertised inclusionary block is beyond our knowledge of the
|
|
// chain tip, then we'll ignore it for now.
|
|
d.Lock()
|
|
if nMsg.isRemote && d.isPremature(ann.ShortChannelID, 0, nMsg) {
|
|
log.Warnf("Announcement for chan_id=(%v), is premature: "+
|
|
"advertises height %v, only height %v is known",
|
|
ann.ShortChannelID.ToUint64(),
|
|
ann.ShortChannelID.BlockHeight, d.bestHeight)
|
|
d.Unlock()
|
|
nMsg.err <- nil
|
|
return nil, false
|
|
}
|
|
d.Unlock()
|
|
|
|
// At this point, we'll now ask the router if this is a zombie/known
|
|
// edge. If so we can skip all the processing below.
|
|
if d.cfg.Router.IsKnownEdge(ann.ShortChannelID) {
|
|
nMsg.err <- nil
|
|
return nil, true
|
|
}
|
|
|
|
// If this is a remote channel announcement, then we'll validate all
|
|
// the signatures within the proof as it should be well formed.
|
|
var proof *channeldb.ChannelAuthProof
|
|
if nMsg.isRemote {
|
|
if err := routing.ValidateChannelAnn(ann); err != nil {
|
|
err := fmt.Errorf("unable to validate announcement: "+
|
|
"%v", err)
|
|
|
|
key := newRejectCacheKey(
|
|
ann.ShortChannelID.ToUint64(),
|
|
sourceToPub(nMsg.source),
|
|
)
|
|
_, _ = d.recentRejects.Put(key, &cachedReject{})
|
|
|
|
log.Error(err)
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
// If the proof checks out, then we'll save the proof itself to
|
|
// the database so we can fetch it later when gossiping with
|
|
// other nodes.
|
|
proof = &channeldb.ChannelAuthProof{
|
|
NodeSig1Bytes: ann.NodeSig1.ToSignatureBytes(),
|
|
NodeSig2Bytes: ann.NodeSig2.ToSignatureBytes(),
|
|
BitcoinSig1Bytes: ann.BitcoinSig1.ToSignatureBytes(),
|
|
BitcoinSig2Bytes: ann.BitcoinSig2.ToSignatureBytes(),
|
|
}
|
|
}
|
|
|
|
// With the proof validated (if necessary), we can now store it within
|
|
// the database for our path finding and syncing needs.
|
|
var featureBuf bytes.Buffer
|
|
if err := ann.Features.Encode(&featureBuf); err != nil {
|
|
log.Errorf("unable to encode features: %v", err)
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
edge := &channeldb.ChannelEdgeInfo{
|
|
ChannelID: ann.ShortChannelID.ToUint64(),
|
|
ChainHash: ann.ChainHash,
|
|
NodeKey1Bytes: ann.NodeID1,
|
|
NodeKey2Bytes: ann.NodeID2,
|
|
BitcoinKey1Bytes: ann.BitcoinKey1,
|
|
BitcoinKey2Bytes: ann.BitcoinKey2,
|
|
AuthProof: proof,
|
|
Features: featureBuf.Bytes(),
|
|
ExtraOpaqueData: ann.ExtraOpaqueData,
|
|
}
|
|
|
|
// If there were any optional message fields provided, we'll include
|
|
// them in its serialized disk representation now.
|
|
if nMsg.optionalMsgFields != nil {
|
|
if nMsg.optionalMsgFields.capacity != nil {
|
|
edge.Capacity = *nMsg.optionalMsgFields.capacity
|
|
}
|
|
if nMsg.optionalMsgFields.channelPoint != nil {
|
|
cp := *nMsg.optionalMsgFields.channelPoint
|
|
edge.ChannelPoint = cp
|
|
}
|
|
}
|
|
|
|
log.Debugf("Adding edge for short_chan_id: %v",
|
|
ann.ShortChannelID.ToUint64())
|
|
|
|
// We will add the edge to the channel router. If the nodes present in
|
|
// this channel are not present in the database, a partial node will be
|
|
// added to represent each node while we wait for a node announcement.
|
|
//
|
|
// Before we add the edge to the database, we obtain the mutex for this
|
|
// channel ID. We do this to ensure no other goroutine has read the
|
|
// database and is now making decisions based on this DB state, before
|
|
// it writes to the DB.
|
|
d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
|
|
err := d.cfg.Router.AddEdge(edge, ops...)
|
|
if err != nil {
|
|
log.Debugf("Router rejected edge for short_chan_id(%v): %v",
|
|
ann.ShortChannelID.ToUint64(), err)
|
|
|
|
defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
|
|
|
|
// If the edge was rejected due to already being known, then it
|
|
// may be the case that this new message has a fresh channel
|
|
// proof, so we'll check.
|
|
if routing.IsError(err, routing.ErrIgnored) {
|
|
// Attempt to process the rejected message to see if we
|
|
// get any new announcements.
|
|
anns, rErr := d.processRejectedEdge(ann, proof)
|
|
if rErr != nil {
|
|
key := newRejectCacheKey(
|
|
ann.ShortChannelID.ToUint64(),
|
|
sourceToPub(nMsg.source),
|
|
)
|
|
cr := &cachedReject{}
|
|
_, _ = d.recentRejects.Put(key, cr)
|
|
|
|
nMsg.err <- rErr
|
|
return nil, false
|
|
}
|
|
|
|
log.Debugf("Extracted %v announcements from rejected "+
|
|
"msgs", len(anns))
|
|
|
|
// If while processing this rejected edge, we realized
|
|
// there's a set of announcements we could extract,
|
|
// then we'll return those directly.
|
|
//
|
|
// NOTE: since this is an ErrIgnored, we can return
|
|
// true here to signal "allow" to its dependants.
|
|
nMsg.err <- nil
|
|
|
|
return anns, true
|
|
} else {
|
|
// Otherwise, this is just a regular rejected edge.
|
|
key := newRejectCacheKey(
|
|
ann.ShortChannelID.ToUint64(),
|
|
sourceToPub(nMsg.source),
|
|
)
|
|
_, _ = d.recentRejects.Put(key, &cachedReject{})
|
|
}
|
|
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
// If err is nil, release the lock immediately.
|
|
d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
|
|
|
|
log.Debugf("Finish adding edge for short_chan_id: %v",
|
|
ann.ShortChannelID.ToUint64())
|
|
|
|
// If we earlier received any ChannelUpdates for this channel, we can
|
|
// now process them, as the channel is added to the graph.
|
|
shortChanID := ann.ShortChannelID.ToUint64()
|
|
var channelUpdates []*processedNetworkMsg
|
|
|
|
earlyChanUpdates, err := d.prematureChannelUpdates.Get(shortChanID)
|
|
if err == nil {
|
|
// There was actually an entry in the map, so we'll accumulate
|
|
// it. We don't worry about deletion, since it'll eventually
|
|
// fall out anyway.
|
|
chanMsgs := earlyChanUpdates
|
|
channelUpdates = append(channelUpdates, chanMsgs.msgs...)
|
|
}
|
|
|
|
// Launch a new goroutine to handle each ChannelUpdate, this is to
|
|
// ensure we don't block here, as we can handle only one announcement
|
|
// at a time.
|
|
for _, cu := range channelUpdates {
|
|
// Skip if already processed.
|
|
if cu.processed {
|
|
continue
|
|
}
|
|
|
|
// Mark the ChannelUpdate as processed. This ensures that a
|
|
// subsequent announcement in the option-scid-alias case does
|
|
// not re-use an old ChannelUpdate.
|
|
cu.processed = true
|
|
|
|
d.wg.Add(1)
|
|
go func(updMsg *networkMsg) {
|
|
defer d.wg.Done()
|
|
|
|
switch msg := updMsg.msg.(type) {
|
|
// Reprocess the message, making sure we return an
|
|
// error to the original caller in case the gossiper
|
|
// shuts down.
|
|
case *lnwire.ChannelUpdate:
|
|
log.Debugf("Reprocessing ChannelUpdate for "+
|
|
"shortChanID=%v",
|
|
msg.ShortChannelID.ToUint64())
|
|
|
|
select {
|
|
case d.networkMsgs <- updMsg:
|
|
case <-d.quit:
|
|
updMsg.err <- ErrGossiperShuttingDown
|
|
}
|
|
|
|
// We don't expect any other message type than
|
|
// ChannelUpdate to be in this cache.
|
|
default:
|
|
log.Errorf("Unsupported message type found "+
|
|
"among ChannelUpdates: %T", msg)
|
|
}
|
|
}(cu.msg)
|
|
}
|
|
|
|
// Channel announcement was successfully processed and now it might be
|
|
// broadcast to other connected nodes if it was an announcement with
|
|
// proof (remote).
|
|
var announcements []networkMsg
|
|
|
|
if proof != nil {
|
|
announcements = append(announcements, networkMsg{
|
|
peer: nMsg.peer,
|
|
isRemote: nMsg.isRemote,
|
|
source: nMsg.source,
|
|
msg: ann,
|
|
})
|
|
}
|
|
|
|
nMsg.err <- nil
|
|
|
|
log.Debugf("Processed ChannelAnnouncement: peer=%v, short_chan_id=%v",
|
|
nMsg.peer, ann.ShortChannelID.ToUint64())
|
|
|
|
return announcements, true
|
|
}
|
|
|
|
// handleChanUpdate processes a new channel update.
|
|
func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
|
|
upd *lnwire.ChannelUpdate,
|
|
ops []batch.SchedulerOption) ([]networkMsg, bool) {
|
|
|
|
log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v, ",
|
|
nMsg.peer, upd.ShortChannelID.ToUint64())
|
|
|
|
// We'll ignore any channel updates that target any chain other than
|
|
// the set of chains we know of.
|
|
if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) {
|
|
err := fmt.Errorf("ignoring ChannelUpdate from chain=%v, "+
|
|
"gossiper on chain=%v", upd.ChainHash, d.cfg.ChainHash)
|
|
log.Errorf(err.Error())
|
|
|
|
key := newRejectCacheKey(
|
|
upd.ShortChannelID.ToUint64(),
|
|
sourceToPub(nMsg.source),
|
|
)
|
|
_, _ = d.recentRejects.Put(key, &cachedReject{})
|
|
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
blockHeight := upd.ShortChannelID.BlockHeight
|
|
shortChanID := upd.ShortChannelID.ToUint64()
|
|
|
|
// If the advertised inclusionary block is beyond our knowledge of the
|
|
// chain tip, then we'll put the announcement in limbo to be fully
|
|
// verified once we advance forward in the chain. If the update has an
|
|
// alias SCID, we'll skip the isPremature check. This is necessary
|
|
// since aliases start at block height 16_000_000.
|
|
d.Lock()
|
|
if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
|
|
d.isPremature(upd.ShortChannelID, 0, nMsg) {
|
|
|
|
log.Warnf("Update announcement for short_chan_id(%v), is "+
|
|
"premature: advertises height %v, only height %v is "+
|
|
"known", shortChanID, blockHeight, d.bestHeight)
|
|
d.Unlock()
|
|
nMsg.err <- nil
|
|
return nil, false
|
|
}
|
|
d.Unlock()
|
|
|
|
// Before we perform any of the expensive checks below, we'll check
|
|
// whether this update is stale or is for a zombie channel in order to
|
|
// quickly reject it.
|
|
timestamp := time.Unix(int64(upd.Timestamp), 0)
|
|
|
|
// Fetch the SCID we should be using to lock the channelMtx and make
|
|
// graph queries with.
|
|
graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
|
|
if err != nil {
|
|
// Fallback and set the graphScid to the peer-provided SCID.
|
|
// This will occur for non-option-scid-alias channels and for
|
|
// public option-scid-alias channels after 6 confirmations.
|
|
// Once public option-scid-alias channels have 6 confs, we'll
|
|
// ignore ChannelUpdates with one of their aliases.
|
|
graphScid = upd.ShortChannelID
|
|
}
|
|
|
|
if d.cfg.Router.IsStaleEdgePolicy(
|
|
graphScid, timestamp, upd.ChannelFlags,
|
|
) {
|
|
|
|
log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
|
|
"peer=%v, msg=%s, is_remote=%v", shortChanID,
|
|
nMsg.peer, nMsg.msg.MsgType(), nMsg.isRemote,
|
|
)
|
|
|
|
nMsg.err <- nil
|
|
return nil, true
|
|
}
|
|
|
|
// Get the node pub key as far since we don't have it in the channel
|
|
// update announcement message. We'll need this to properly verify the
|
|
// message's signature.
|
|
//
|
|
// We make sure to obtain the mutex for this channel ID before we
|
|
// access the database. This ensures the state we read from the
|
|
// database has not changed between this point and when we call
|
|
// UpdateEdge() later.
|
|
d.channelMtx.Lock(graphScid.ToUint64())
|
|
defer d.channelMtx.Unlock(graphScid.ToUint64())
|
|
|
|
chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(graphScid)
|
|
switch err {
|
|
// No error, break.
|
|
case nil:
|
|
break
|
|
|
|
case channeldb.ErrZombieEdge:
|
|
err = d.processZombieUpdate(chanInfo, upd)
|
|
if err != nil {
|
|
log.Debug(err)
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
// We'll fallthrough to ensure we stash the update until we
|
|
// receive its corresponding ChannelAnnouncement. This is
|
|
// needed to ensure the edge exists in the graph before
|
|
// applying the update.
|
|
fallthrough
|
|
case channeldb.ErrGraphNotFound:
|
|
fallthrough
|
|
case channeldb.ErrGraphNoEdgesFound:
|
|
fallthrough
|
|
case channeldb.ErrEdgeNotFound:
|
|
// If the edge corresponding to this ChannelUpdate was not
|
|
// found in the graph, this might be a channel in the process
|
|
// of being opened, and we haven't processed our own
|
|
// ChannelAnnouncement yet, hence it is not not found in the
|
|
// graph. This usually gets resolved after the channel proofs
|
|
// are exchanged and the channel is broadcasted to the rest of
|
|
// the network, but in case this is a private channel this
|
|
// won't ever happen. This can also happen in the case of a
|
|
// zombie channel with a fresh update for which we don't have a
|
|
// ChannelAnnouncement for since we reject them. Because of
|
|
// this, we temporarily add it to a map, and reprocess it after
|
|
// our own ChannelAnnouncement has been processed.
|
|
//
|
|
// The shortChanID may be an alias, but it is fine to use here
|
|
// since we don't have an edge in the graph and if the peer is
|
|
// not buggy, we should be able to use it once the gossiper
|
|
// receives the local announcement.
|
|
pMsg := &processedNetworkMsg{msg: nMsg}
|
|
|
|
earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
|
|
switch {
|
|
// Nothing in the cache yet, we can just directly insert this
|
|
// element.
|
|
case err == cache.ErrElementNotFound:
|
|
_, _ = d.prematureChannelUpdates.Put(
|
|
shortChanID, &cachedNetworkMsg{
|
|
msgs: []*processedNetworkMsg{pMsg},
|
|
})
|
|
|
|
// There's already something in the cache, so we'll combine the
|
|
// set of messages into a single value.
|
|
default:
|
|
msgs := earlyMsgs.msgs
|
|
msgs = append(msgs, pMsg)
|
|
_, _ = d.prematureChannelUpdates.Put(
|
|
shortChanID, &cachedNetworkMsg{
|
|
msgs: msgs,
|
|
})
|
|
}
|
|
|
|
log.Debugf("Got ChannelUpdate for edge not found in graph"+
|
|
"(shortChanID=%v), saving for reprocessing later",
|
|
shortChanID)
|
|
|
|
// NOTE: We don't return anything on the error channel for this
|
|
// message, as we expect that will be done when this
|
|
// ChannelUpdate is later reprocessed.
|
|
return nil, false
|
|
|
|
default:
|
|
err := fmt.Errorf("unable to validate channel update "+
|
|
"short_chan_id=%v: %v", shortChanID, err)
|
|
log.Error(err)
|
|
nMsg.err <- err
|
|
|
|
key := newRejectCacheKey(
|
|
upd.ShortChannelID.ToUint64(),
|
|
sourceToPub(nMsg.source),
|
|
)
|
|
_, _ = d.recentRejects.Put(key, &cachedReject{})
|
|
|
|
return nil, false
|
|
}
|
|
|
|
// The least-significant bit in the flag on the channel update
|
|
// announcement tells us "which" side of the channels directed edge is
|
|
// being updated.
|
|
var (
|
|
pubKey *btcec.PublicKey
|
|
edgeToUpdate *channeldb.ChannelEdgePolicy
|
|
)
|
|
direction := upd.ChannelFlags & lnwire.ChanUpdateDirection
|
|
switch direction {
|
|
case 0:
|
|
pubKey, _ = chanInfo.NodeKey1()
|
|
edgeToUpdate = e1
|
|
case 1:
|
|
pubKey, _ = chanInfo.NodeKey2()
|
|
edgeToUpdate = e2
|
|
}
|
|
|
|
log.Debugf("Validating ChannelUpdate: channel=%v, from node=%x, has "+
|
|
"edge=%v", chanInfo.ChannelID, pubKey.SerializeCompressed(),
|
|
edgeToUpdate != nil)
|
|
|
|
// Validate the channel announcement with the expected public key and
|
|
// channel capacity. In the case of an invalid channel update, we'll
|
|
// return an error to the caller and exit early.
|
|
err = routing.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, upd)
|
|
if err != nil {
|
|
rErr := fmt.Errorf("unable to validate channel update "+
|
|
"announcement for short_chan_id=%v: %v",
|
|
spew.Sdump(upd.ShortChannelID), err)
|
|
|
|
log.Error(rErr)
|
|
nMsg.err <- rErr
|
|
return nil, false
|
|
}
|
|
|
|
// If we have a previous version of the edge being updated, we'll want
|
|
// to rate limit its updates to prevent spam throughout the network.
|
|
if nMsg.isRemote && edgeToUpdate != nil {
|
|
// If it's a keep-alive update, we'll only propagate one if
|
|
// it's been a day since the previous. This follows our own
|
|
// heuristic of sending keep-alive updates after the same
|
|
// duration (see retransmitStaleAnns).
|
|
timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
|
|
if IsKeepAliveUpdate(upd, edgeToUpdate) {
|
|
if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
|
|
log.Debugf("Ignoring keep alive update not "+
|
|
"within %v period for channel %v",
|
|
d.cfg.RebroadcastInterval, shortChanID)
|
|
nMsg.err <- nil
|
|
return nil, false
|
|
}
|
|
} else {
|
|
// If it's not, we'll allow an update per minute with a
|
|
// maximum burst of 10. If we haven't seen an update
|
|
// for this channel before, we'll need to initialize a
|
|
// rate limiter for each direction.
|
|
//
|
|
// Since the edge exists in the graph, we'll create a
|
|
// rate limiter for chanInfo.ChannelID rather then the
|
|
// SCID the peer sent. This is because there may be
|
|
// multiple aliases for a channel and we may otherwise
|
|
// rate-limit only a single alias of the channel,
|
|
// instead of the whole channel.
|
|
baseScid := chanInfo.ChannelID
|
|
d.Lock()
|
|
rls, ok := d.chanUpdateRateLimiter[baseScid]
|
|
if !ok {
|
|
r := rate.Every(d.cfg.ChannelUpdateInterval)
|
|
b := d.cfg.MaxChannelUpdateBurst
|
|
rls = [2]*rate.Limiter{
|
|
rate.NewLimiter(r, b),
|
|
rate.NewLimiter(r, b),
|
|
}
|
|
d.chanUpdateRateLimiter[baseScid] = rls
|
|
}
|
|
d.Unlock()
|
|
|
|
if !rls[direction].Allow() {
|
|
log.Debugf("Rate limiting update for channel "+
|
|
"%v from direction %x", shortChanID,
|
|
pubKey.SerializeCompressed())
|
|
nMsg.err <- nil
|
|
return nil, false
|
|
}
|
|
}
|
|
}
|
|
|
|
// We'll use chanInfo.ChannelID rather than the peer-supplied
|
|
// ShortChannelID in the ChannelUpdate to avoid the router having to
|
|
// lookup the stored SCID. If we're sending the update, we'll always
|
|
// use the SCID stored in the database rather than a potentially
|
|
// different alias. This might mean that SigBytes is incorrect as it
|
|
// signs a different SCID than the database SCID, but since there will
|
|
// only be a difference if AuthProof == nil, this is fine.
|
|
update := &channeldb.ChannelEdgePolicy{
|
|
SigBytes: upd.Signature.ToSignatureBytes(),
|
|
ChannelID: chanInfo.ChannelID,
|
|
LastUpdate: timestamp,
|
|
MessageFlags: upd.MessageFlags,
|
|
ChannelFlags: upd.ChannelFlags,
|
|
TimeLockDelta: upd.TimeLockDelta,
|
|
MinHTLC: upd.HtlcMinimumMsat,
|
|
MaxHTLC: upd.HtlcMaximumMsat,
|
|
FeeBaseMSat: lnwire.MilliSatoshi(upd.BaseFee),
|
|
FeeProportionalMillionths: lnwire.MilliSatoshi(upd.FeeRate),
|
|
ExtraOpaqueData: upd.ExtraOpaqueData,
|
|
}
|
|
|
|
if err := d.cfg.Router.UpdateEdge(update, ops...); err != nil {
|
|
if routing.IsError(
|
|
err, routing.ErrOutdated,
|
|
routing.ErrIgnored,
|
|
routing.ErrVBarrierShuttingDown,
|
|
) {
|
|
|
|
log.Debugf("Update edge for short_chan_id(%v) got: %v",
|
|
shortChanID, err)
|
|
} else {
|
|
// Since we know the stored SCID in the graph, we'll
|
|
// cache that SCID.
|
|
key := newRejectCacheKey(
|
|
chanInfo.ChannelID,
|
|
sourceToPub(nMsg.source),
|
|
)
|
|
_, _ = d.recentRejects.Put(key, &cachedReject{})
|
|
|
|
log.Errorf("Update edge for short_chan_id(%v) got: %v",
|
|
shortChanID, err)
|
|
}
|
|
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
// If this is a local ChannelUpdate without an AuthProof, it means it
|
|
// is an update to a channel that is not (yet) supposed to be announced
|
|
// to the greater network. However, our channel counter party will need
|
|
// to be given the update, so we'll try sending the update directly to
|
|
// the remote peer.
|
|
if !nMsg.isRemote && chanInfo.AuthProof == nil {
|
|
if nMsg.optionalMsgFields != nil {
|
|
remoteAlias := nMsg.optionalMsgFields.remoteAlias
|
|
if remoteAlias != nil {
|
|
// The remoteAlias field was specified, meaning
|
|
// that we should replace the SCID in the
|
|
// update with the remote's alias. We'll also
|
|
// need to re-sign the channel update. This is
|
|
// required for option-scid-alias feature-bit
|
|
// negotiated channels.
|
|
upd.ShortChannelID = *remoteAlias
|
|
|
|
sig, err := d.cfg.SignAliasUpdate(upd)
|
|
if err != nil {
|
|
log.Error(err)
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
lnSig, err := lnwire.NewSigFromSignature(sig)
|
|
if err != nil {
|
|
log.Error(err)
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
upd.Signature = lnSig
|
|
}
|
|
}
|
|
|
|
// Get our peer's public key.
|
|
remotePubKey := remotePubFromChanInfo(
|
|
chanInfo, upd.ChannelFlags,
|
|
)
|
|
|
|
log.Debugf("The message %v has no AuthProof, sending the "+
|
|
"update to remote peer %x", upd.MsgType(), remotePubKey)
|
|
|
|
// Now we'll attempt to send the channel update message
|
|
// reliably to the remote peer in the background, so that we
|
|
// don't block if the peer happens to be offline at the moment.
|
|
err := d.reliableSender.sendMessage(upd, remotePubKey)
|
|
if err != nil {
|
|
err := fmt.Errorf("unable to reliably send %v for "+
|
|
"channel=%v to peer=%x: %v", upd.MsgType(),
|
|
upd.ShortChannelID, remotePubKey, err)
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
}
|
|
|
|
// Channel update announcement was successfully processed and now it
|
|
// can be broadcast to the rest of the network. However, we'll only
|
|
// broadcast the channel update announcement if it has an attached
|
|
// authentication proof. We also won't broadcast the update if it
|
|
// contains an alias because the network would reject this.
|
|
var announcements []networkMsg
|
|
if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
|
|
announcements = append(announcements, networkMsg{
|
|
peer: nMsg.peer,
|
|
source: nMsg.source,
|
|
isRemote: nMsg.isRemote,
|
|
msg: upd,
|
|
})
|
|
}
|
|
|
|
nMsg.err <- nil
|
|
|
|
log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+
|
|
"timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(),
|
|
timestamp)
|
|
return announcements, true
|
|
}
|
|
|
|
// handleAnnSig processes a new announcement signatures message.
|
|
func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
|
|
ann *lnwire.AnnounceSignatures) ([]networkMsg, bool) {
|
|
|
|
needBlockHeight := ann.ShortChannelID.BlockHeight +
|
|
d.cfg.ProofMatureDelta
|
|
shortChanID := ann.ShortChannelID.ToUint64()
|
|
|
|
prefix := "local"
|
|
if nMsg.isRemote {
|
|
prefix = "remote"
|
|
}
|
|
|
|
log.Infof("Received new %v announcement signature for %v", prefix,
|
|
ann.ShortChannelID)
|
|
|
|
// By the specification, channel announcement proofs should be sent
|
|
// after some number of confirmations after channel was registered in
|
|
// bitcoin blockchain. Therefore, we check if the proof is mature.
|
|
d.Lock()
|
|
premature := d.isPremature(
|
|
ann.ShortChannelID, d.cfg.ProofMatureDelta, nMsg,
|
|
)
|
|
if premature {
|
|
log.Warnf("Premature proof announcement, current block height"+
|
|
"lower than needed: %v < %v", d.bestHeight,
|
|
needBlockHeight)
|
|
d.Unlock()
|
|
nMsg.err <- nil
|
|
return nil, false
|
|
}
|
|
d.Unlock()
|
|
|
|
// Ensure that we know of a channel with the target channel ID before
|
|
// proceeding further.
|
|
//
|
|
// We must acquire the mutex for this channel ID before getting the
|
|
// channel from the database, to ensure what we read does not change
|
|
// before we call AddProof() later.
|
|
d.channelMtx.Lock(ann.ShortChannelID.ToUint64())
|
|
defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
|
|
|
|
chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(
|
|
ann.ShortChannelID,
|
|
)
|
|
if err != nil {
|
|
_, err = d.cfg.FindChannel(nMsg.source, ann.ChannelID)
|
|
if err != nil {
|
|
err := fmt.Errorf("unable to store the proof for "+
|
|
"short_chan_id=%v: %v", shortChanID, err)
|
|
log.Error(err)
|
|
nMsg.err <- err
|
|
|
|
return nil, false
|
|
}
|
|
|
|
proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
|
|
err := d.cfg.WaitingProofStore.Add(proof)
|
|
if err != nil {
|
|
err := fmt.Errorf("unable to store the proof for "+
|
|
"short_chan_id=%v: %v", shortChanID, err)
|
|
log.Error(err)
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
log.Infof("Orphan %v proof announcement with short_chan_id=%v"+
|
|
", adding to waiting batch", prefix, shortChanID)
|
|
nMsg.err <- nil
|
|
return nil, false
|
|
}
|
|
|
|
nodeID := nMsg.source.SerializeCompressed()
|
|
isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
|
|
isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
|
|
|
|
// Ensure that channel that was retrieved belongs to the peer which
|
|
// sent the proof announcement.
|
|
if !(isFirstNode || isSecondNode) {
|
|
err := fmt.Errorf("channel that was received doesn't belong "+
|
|
"to the peer which sent the proof, short_chan_id=%v",
|
|
shortChanID)
|
|
log.Error(err)
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
// If proof was sent by a local sub-system, then we'll send the
|
|
// announcement signature to the remote node so they can also
|
|
// reconstruct the full channel announcement.
|
|
if !nMsg.isRemote {
|
|
var remotePubKey [33]byte
|
|
if isFirstNode {
|
|
remotePubKey = chanInfo.NodeKey2Bytes
|
|
} else {
|
|
remotePubKey = chanInfo.NodeKey1Bytes
|
|
}
|
|
|
|
// Since the remote peer might not be online we'll call a
|
|
// method that will attempt to deliver the proof when it comes
|
|
// online.
|
|
err := d.reliableSender.sendMessage(ann, remotePubKey)
|
|
if err != nil {
|
|
err := fmt.Errorf("unable to reliably send %v for "+
|
|
"channel=%v to peer=%x: %v", ann.MsgType(),
|
|
ann.ShortChannelID, remotePubKey, err)
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
}
|
|
|
|
// Check if we already have the full proof for this channel.
|
|
if chanInfo.AuthProof != nil {
|
|
// If we already have the fully assembled proof, then the peer
|
|
// sending us their proof has probably not received our local
|
|
// proof yet. So be kind and send them the full proof.
|
|
if nMsg.isRemote {
|
|
peerID := nMsg.source.SerializeCompressed()
|
|
log.Debugf("Got AnnounceSignatures for channel with " +
|
|
"full proof.")
|
|
|
|
d.wg.Add(1)
|
|
go func() {
|
|
defer d.wg.Done()
|
|
|
|
log.Debugf("Received half proof for channel "+
|
|
"%v with existing full proof. Sending"+
|
|
" full proof to peer=%x",
|
|
ann.ChannelID, peerID)
|
|
|
|
ca, _, _, err := netann.CreateChanAnnouncement(
|
|
chanInfo.AuthProof, chanInfo, e1, e2,
|
|
)
|
|
if err != nil {
|
|
log.Errorf("unable to gen ann: %v",
|
|
err)
|
|
return
|
|
}
|
|
|
|
err = nMsg.peer.SendMessage(false, ca)
|
|
if err != nil {
|
|
log.Errorf("Failed sending full proof"+
|
|
" to peer=%x: %v", peerID, err)
|
|
return
|
|
}
|
|
|
|
log.Debugf("Full proof sent to peer=%x for "+
|
|
"chanID=%v", peerID, ann.ChannelID)
|
|
}()
|
|
}
|
|
|
|
log.Debugf("Already have proof for channel with chanID=%v",
|
|
ann.ChannelID)
|
|
nMsg.err <- nil
|
|
return nil, true
|
|
}
|
|
|
|
// Check that we received the opposite proof. If so, then we're now
|
|
// able to construct the full proof, and create the channel
|
|
// announcement. If we didn't receive the opposite half of the proof
|
|
// then we should store this one, and wait for the opposite to be
|
|
// received.
|
|
proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
|
|
oppProof, err := d.cfg.WaitingProofStore.Get(proof.OppositeKey())
|
|
if err != nil && err != channeldb.ErrWaitingProofNotFound {
|
|
err := fmt.Errorf("unable to get the opposite proof for "+
|
|
"short_chan_id=%v: %v", shortChanID, err)
|
|
log.Error(err)
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
if err == channeldb.ErrWaitingProofNotFound {
|
|
err := d.cfg.WaitingProofStore.Add(proof)
|
|
if err != nil {
|
|
err := fmt.Errorf("unable to store the proof for "+
|
|
"short_chan_id=%v: %v", shortChanID, err)
|
|
log.Error(err)
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
log.Infof("1/2 of channel ann proof received for "+
|
|
"short_chan_id=%v, waiting for other half",
|
|
shortChanID)
|
|
|
|
nMsg.err <- nil
|
|
return nil, false
|
|
}
|
|
|
|
// We now have both halves of the channel announcement proof, then
|
|
// we'll reconstruct the initial announcement so we can validate it
|
|
// shortly below.
|
|
var dbProof channeldb.ChannelAuthProof
|
|
if isFirstNode {
|
|
dbProof.NodeSig1Bytes = ann.NodeSignature.ToSignatureBytes()
|
|
dbProof.NodeSig2Bytes = oppProof.NodeSignature.ToSignatureBytes()
|
|
dbProof.BitcoinSig1Bytes = ann.BitcoinSignature.ToSignatureBytes()
|
|
dbProof.BitcoinSig2Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
|
|
} else {
|
|
dbProof.NodeSig1Bytes = oppProof.NodeSignature.ToSignatureBytes()
|
|
dbProof.NodeSig2Bytes = ann.NodeSignature.ToSignatureBytes()
|
|
dbProof.BitcoinSig1Bytes = oppProof.BitcoinSignature.ToSignatureBytes()
|
|
dbProof.BitcoinSig2Bytes = ann.BitcoinSignature.ToSignatureBytes()
|
|
}
|
|
|
|
chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
|
|
&dbProof, chanInfo, e1, e2,
|
|
)
|
|
if err != nil {
|
|
log.Error(err)
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
// With all the necessary components assembled validate the full
|
|
// channel announcement proof.
|
|
if err := routing.ValidateChannelAnn(chanAnn); err != nil {
|
|
err := fmt.Errorf("channel announcement proof for "+
|
|
"short_chan_id=%v isn't valid: %v", shortChanID, err)
|
|
|
|
log.Error(err)
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
// If the channel was returned by the router it means that existence of
|
|
// funding point and inclusion of nodes bitcoin keys in it already
|
|
// checked by the router. In this stage we should check that node keys
|
|
// attest to the bitcoin keys by validating the signatures of
|
|
// announcement. If proof is valid then we'll populate the channel edge
|
|
// with it, so we can announce it on peer connect.
|
|
err = d.cfg.Router.AddProof(ann.ShortChannelID, &dbProof)
|
|
if err != nil {
|
|
err := fmt.Errorf("unable add proof to the channel chanID=%v:"+
|
|
" %v", ann.ChannelID, err)
|
|
log.Error(err)
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
|
|
if err != nil {
|
|
err := fmt.Errorf("unable to remove opposite proof for the "+
|
|
"channel with chanID=%v: %v", ann.ChannelID, err)
|
|
log.Error(err)
|
|
nMsg.err <- err
|
|
return nil, false
|
|
}
|
|
|
|
// Proof was successfully created and now can announce the channel to
|
|
// the remain network.
|
|
log.Infof("Fully valid channel proof for short_chan_id=%v constructed"+
|
|
", adding to next ann batch", shortChanID)
|
|
|
|
// Assemble the necessary announcements to add to the next broadcasting
|
|
// batch.
|
|
var announcements []networkMsg
|
|
announcements = append(announcements, networkMsg{
|
|
peer: nMsg.peer,
|
|
source: nMsg.source,
|
|
msg: chanAnn,
|
|
})
|
|
if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
|
|
announcements = append(announcements, networkMsg{
|
|
peer: nMsg.peer,
|
|
source: src,
|
|
msg: e1Ann,
|
|
})
|
|
}
|
|
if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
|
|
announcements = append(announcements, networkMsg{
|
|
peer: nMsg.peer,
|
|
source: src,
|
|
msg: e2Ann,
|
|
})
|
|
}
|
|
|
|
// We'll also send along the node announcements for each channel
|
|
// participant if we know of them. To ensure our node announcement
|
|
// propagates to our channel counterparty, we'll set the source for
|
|
// each announcement to the node it belongs to, otherwise we won't send
|
|
// it since the source gets skipped. This isn't necessary for channel
|
|
// updates and announcement signatures since we send those directly to
|
|
// our channel counterparty through the gossiper's reliable sender.
|
|
node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes)
|
|
if err != nil {
|
|
log.Debugf("Unable to fetch node announcement for %x: %v",
|
|
chanInfo.NodeKey1Bytes, err)
|
|
} else {
|
|
if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
|
|
announcements = append(announcements, networkMsg{
|
|
peer: nMsg.peer,
|
|
source: nodeKey1,
|
|
msg: node1Ann,
|
|
})
|
|
}
|
|
}
|
|
|
|
node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes)
|
|
if err != nil {
|
|
log.Debugf("Unable to fetch node announcement for %x: %v",
|
|
chanInfo.NodeKey2Bytes, err)
|
|
} else {
|
|
if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
|
|
announcements = append(announcements, networkMsg{
|
|
peer: nMsg.peer,
|
|
source: nodeKey2,
|
|
msg: node2Ann,
|
|
})
|
|
}
|
|
}
|
|
|
|
nMsg.err <- nil
|
|
return announcements, true
|
|
}
|