server+discovery: alias-handling in gossiper

An OptionalMsgField has been added that allows outside subsystems
to provide a short channel id we should insert into a ChannelUpdate
that we then sign and send to our peer.

When the gossiper receives a ChannelUpdate, it will query the
alias manager by the passed-in FindBaseByAlias function to determine
if the short channel id in the ChannelUpdate points to a known
channel. If this lookup returns an error, we'll fallback to using
the original id in the ChannelUpdate when querying the router.
The lookup and potential fallback must occur in order to properly
lock the multimutex, query the correct router channels, and rate
limit the correct short channel id. An unfortunate side effect of
receiving ChannelUpdates from our peer that reference on of our
aliases rather than the real SCID is that we must store this policy.
Yet it is not broadcast-able. Care has been taken to ensure the
gossiper does not broadcast *any* ChannelUpdate with an alias SCID.

The cachedNetworkMsg uses the new processedNetworkMsg struct. This
is necessary so that delete-and-reinsert in the funding manager
doesn't process a ChannelUpdate twice and end up in a deadlock since
the err chan is no longer being used.
This commit is contained in:
eugene 2022-04-04 16:26:04 -04:00
parent 15b871de36
commit 01f28ba540
No known key found for this signature in database
GPG Key ID: 118759E83439A9B1
4 changed files with 282 additions and 28 deletions

View File

@ -2167,8 +2167,18 @@ func (c *ChannelGraph) FilterChannelRange(startHeight,
// We'll now iterate through the database, and find each
// channel ID that resides within the specified range.
for k, _ := cursor.Seek(chanIDStart[:]); k != nil &&
bytes.Compare(k, chanIDEnd[:]) <= 0; k, _ = cursor.Next() {
for k, v := cursor.Seek(chanIDStart[:]); k != nil &&
bytes.Compare(k, chanIDEnd[:]) <= 0; k, v = cursor.Next() {
// Don't send alias SCIDs during gossip sync.
edgeReader := bytes.NewReader(v)
edgeInfo, err := deserializeChanEdgeInfo(edgeReader)
if err != nil {
return err
}
if edgeInfo.AuthProof == nil {
continue
}
// This channel ID rests within the target range, so
// we'll add it to our returned set.

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcec/v2/ecdsa"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
@ -72,6 +73,7 @@ var (
type optionalMsgFields struct {
capacity *btcutil.Amount
channelPoint *wire.OutPoint
remoteAlias *lnwire.ShortChannelID
}
// apply applies the optional fields within the functional options.
@ -102,6 +104,18 @@ func ChannelPoint(op wire.OutPoint) OptionalMsgField {
}
}
// RemoteAlias is an optional field that lets the gossiper know that a locally
// sent channel update is actually an update for the peer that should replace
// the ShortChannelID field with the remote's alias. This is only used for
// channels with peers where the option-scid-alias feature bit was negotiated.
// The channel update will be added to the graph under the original SCID, but
// will be modified and re-signed with this alias.
func RemoteAlias(alias *lnwire.ShortChannelID) OptionalMsgField {
return func(f *optionalMsgFields) {
f.remoteAlias = alias
}
}
// networkMsg couples a routing related wire message with the peer that
// originally sent it.
type networkMsg struct {
@ -277,12 +291,40 @@ type Config struct {
// how often we should allow a new update for a specific channel and
// direction.
ChannelUpdateInterval time.Duration
// IsAlias returns true if a given ShortChannelID is an alias for
// option_scid_alias channels.
IsAlias func(scid lnwire.ShortChannelID) bool
// SignAliasUpdate is used to re-sign a channel update using the
// remote's alias if the option-scid-alias feature bit was negotiated.
SignAliasUpdate func(u *lnwire.ChannelUpdate) (*ecdsa.Signature,
error)
// FindBaseByAlias finds the SCID stored in the graph by an alias SCID.
// This is used for channels that have negotiated the option-scid-alias
// feature bit.
FindBaseByAlias func(alias lnwire.ShortChannelID) (
lnwire.ShortChannelID, error)
// GetAlias allows the gossiper to look up the peer's alias for a given
// ChannelID. This is used to sign updates for them if the channel has
// no AuthProof and the option-scid-alias feature bit was negotiated.
GetAlias func(lnwire.ChannelID) (lnwire.ShortChannelID, error)
}
// processedNetworkMsg is a wrapper around networkMsg and a boolean. It is
// used to let the caller of the lru.Cache know if a message has already been
// processed or not.
type processedNetworkMsg struct {
processed bool
msg *networkMsg
}
// cachedNetworkMsg is a wrapper around a network message that can be used with
// *lru.Cache.
type cachedNetworkMsg struct {
msgs []*networkMsg
msgs []*processedNetworkMsg
}
// Size returns the "size" of an entry. We return the number of items as we
@ -596,11 +638,11 @@ func (d *AuthenticatedGossiper) resendFutureMessages(height uint32) {
log.Debugf("Resending %d network messages at height %d",
len(msgs), height)
for _, msg := range msgs {
for _, pMsg := range msgs {
select {
case d.networkMsgs <- msg:
case d.networkMsgs <- pMsg.msg:
case <-d.quit:
msg.err <- ErrGossiperShuttingDown
pMsg.msg.err <- ErrGossiperShuttingDown
}
}
}
@ -1523,6 +1565,37 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
// avoid directly giving away their existence. Instead, we'll
// send the update directly to the remote party.
if edgeInfo.Info.AuthProof == nil {
// If AuthProof is nil and an alias was found for this
// ChannelID (meaning the option-scid-alias feature was
// negotiated), we'll replace the ShortChannelID in the
// update with the peer's alias. We do this after
// updateChannel so that the alias isn't persisted to
// the database.
op := &edgeInfo.Info.ChannelPoint
chanID := lnwire.NewChanIDFromOutPoint(op)
var defaultAlias lnwire.ShortChannelID
foundAlias, _ := d.cfg.GetAlias(chanID)
if foundAlias != defaultAlias {
chanUpdate.ShortChannelID = foundAlias
sig, err := d.cfg.SignAliasUpdate(chanUpdate)
if err != nil {
log.Errorf("Unable to sign alias "+
"update: %v", err)
continue
}
lnSig, err := lnwire.NewSigFromSignature(sig)
if err != nil {
log.Errorf("Unable to create sig: %v",
err)
continue
}
chanUpdate.Signature = lnSig
}
remotePubKey := remotePubFromChanInfo(
edgeInfo.Info, chanUpdate.ChannelFlags,
)
@ -1703,7 +1776,7 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
// Init an empty cached message and overwrite it if there are cached
// messages found.
cachedMsgs := &cachedNetworkMsg{
msgs: make([]*networkMsg, 0),
msgs: make([]*processedNetworkMsg, 0),
}
result, err := d.futureMsgs.Get(msgHeight)
@ -1723,8 +1796,11 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID,
err: make(chan error, 1),
}
// The processed boolean is unused in the futureMsgs case.
pMsg := &processedNetworkMsg{msg: copied}
// Add the network message.
cachedMsgs.msgs = append(cachedMsgs.msgs, copied)
cachedMsgs.msgs = append(cachedMsgs.msgs, pMsg)
_, err = d.futureMsgs.Put(msgHeight, cachedMsgs)
if err != nil {
log.Errorf("Adding future message got error: %v", err)
@ -1826,7 +1902,8 @@ func (d *AuthenticatedGossiper) processZombieUpdate(
// With the signature valid, we'll proceed to mark the
// edge as live and wait for the channel announcement to
// come through again.
err = d.cfg.Router.MarkEdgeLive(msg.ShortChannelID)
baseScid := lnwire.NewShortChanIDFromInt(chanInfo.ChannelID)
err = d.cfg.Router.MarkEdgeLive(baseScid)
if err != nil {
return fmt.Errorf("unable to remove edge with "+
"chan_id=%v from zombie index: %v",
@ -2147,6 +2224,24 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
return nil, false
}
// If this is a remote ChannelAnnouncement with an alias SCID, we'll
// reject the announcement. Since the router accepts alias SCIDs,
// not erroring out would be a DoS vector.
if nMsg.isRemote && d.cfg.IsAlias(ann.ShortChannelID) {
err := fmt.Errorf("ignoring remote alias channel=%v",
ann.ShortChannelID)
log.Errorf(err.Error())
key := newRejectCacheKey(
ann.ShortChannelID.ToUint64(),
sourceToPub(nMsg.source),
)
_, _ = d.recentRejects.Put(key, &cachedReject{})
nMsg.err <- err
return nil, false
}
// If the advertised inclusionary block is beyond our knowledge of the
// chain tip, then we'll ignore it for now.
d.Lock()
@ -2293,7 +2388,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
// If we earlier received any ChannelUpdates for this channel, we can
// now process them, as the channel is added to the graph.
shortChanID := ann.ShortChannelID.ToUint64()
var channelUpdates []*networkMsg
var channelUpdates []*processedNetworkMsg
earlyChanUpdates, err := d.prematureChannelUpdates.Get(shortChanID)
if err == nil {
@ -2308,6 +2403,16 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
// ensure we don't block here, as we can handle only one announcement
// at a time.
for _, cu := range channelUpdates {
// Skip if already processed.
if cu.processed {
continue
}
// Mark the ChannelUpdate as processed. This ensures that a
// subsequent announcement in the option-scid-alias case does
// not re-use an old ChannelUpdate.
cu.processed = true
d.wg.Add(1)
go func(updMsg *networkMsg) {
defer d.wg.Done()
@ -2333,7 +2438,7 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
log.Errorf("Unsupported message type found "+
"among ChannelUpdates: %T", msg)
}
}(cu)
}(cu.msg)
}
// Channel announcement was successfully processed and now it might be
@ -2380,9 +2485,13 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
// If the advertised inclusionary block is beyond our knowledge of the
// chain tip, then we'll put the announcement in limbo to be fully
// verified once we advance forward in the chain.
// verified once we advance forward in the chain. If the update has an
// alias SCID, we'll skip the isPremature check. This is necessary
// since aliases start at block height 16_000_000.
d.Lock()
if nMsg.isRemote && d.isPremature(upd.ShortChannelID, 0, nMsg) {
if nMsg.isRemote && !d.cfg.IsAlias(upd.ShortChannelID) &&
d.isPremature(upd.ShortChannelID, 0, nMsg) {
log.Warnf("Update announcement for short_chan_id(%v), is "+
"premature: advertises height %v, only height %v is "+
"known", shortChanID, blockHeight, d.bestHeight)
@ -2396,8 +2505,21 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
// whether this update is stale or is for a zombie channel in order to
// quickly reject it.
timestamp := time.Unix(int64(upd.Timestamp), 0)
// Fetch the SCID we should be using to lock the channelMtx and make
// graph queries with.
graphScid, err := d.cfg.FindBaseByAlias(upd.ShortChannelID)
if err != nil {
// Fallback and set the graphScid to the peer-provided SCID.
// This will occur for non-option-scid-alias channels and for
// public option-scid-alias channels after 6 confirmations.
// Once public option-scid-alias channels have 6 confs, we'll
// ignore ChannelUpdates with one of their aliases.
graphScid = upd.ShortChannelID
}
if d.cfg.Router.IsStaleEdgePolicy(
upd.ShortChannelID, timestamp, upd.ChannelFlags,
graphScid, timestamp, upd.ChannelFlags,
) {
log.Debugf("Ignored stale edge policy: peer=%v, source=%x, "+
@ -2418,11 +2540,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
// access the database. This ensures the state we read from the
// database has not changed between this point and when we call
// UpdateEdge() later.
d.channelMtx.Lock(upd.ShortChannelID.ToUint64())
defer d.channelMtx.Unlock(upd.ShortChannelID.ToUint64())
chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(
upd.ShortChannelID,
)
d.channelMtx.Lock(graphScid.ToUint64())
defer d.channelMtx.Unlock(graphScid.ToUint64())
chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(graphScid)
switch err {
// No error, break.
case nil:
@ -2458,6 +2579,13 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
// ChannelAnnouncement for since we reject them. Because of
// this, we temporarily add it to a map, and reprocess it after
// our own ChannelAnnouncement has been processed.
//
// The shortChanID may be an alias, but it is fine to use here
// since we don't have an edge in the graph and if the peer is
// not buggy, we should be able to use it once the gossiper
// receives the local announcement.
pMsg := &processedNetworkMsg{msg: nMsg}
earlyMsgs, err := d.prematureChannelUpdates.Get(shortChanID)
switch {
// Nothing in the cache yet, we can just directly insert this
@ -2465,14 +2593,14 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
case err == cache.ErrElementNotFound:
_, _ = d.prematureChannelUpdates.Put(
shortChanID, &cachedNetworkMsg{
msgs: []*networkMsg{nMsg},
msgs: []*processedNetworkMsg{pMsg},
})
// There's already something in the cache, so we'll combine the
// set of messages into a single value.
default:
msgs := earlyMsgs.(*cachedNetworkMsg).msgs
msgs = append(msgs, nMsg)
msgs = append(msgs, pMsg)
_, _ = d.prematureChannelUpdates.Put(
shortChanID, &cachedNetworkMsg{
msgs: msgs,
@ -2555,8 +2683,16 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
// maximum burst of 10. If we haven't seen an update
// for this channel before, we'll need to initialize a
// rate limiter for each direction.
//
// Since the edge exists in the graph, we'll create a
// rate limiter for chanInfo.ChannelID rather then the
// SCID the peer sent. This is because there may be
// multiple aliases for a channel and we may otherwise
// rate-limit only a single alias of the channel,
// instead of the whole channel.
baseScid := chanInfo.ChannelID
d.Lock()
rls, ok := d.chanUpdateRateLimiter[shortChanID]
rls, ok := d.chanUpdateRateLimiter[baseScid]
if !ok {
r := rate.Every(d.cfg.ChannelUpdateInterval)
b := d.cfg.MaxChannelUpdateBurst
@ -2564,7 +2700,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
rate.NewLimiter(r, b),
rate.NewLimiter(r, b),
}
d.chanUpdateRateLimiter[shortChanID] = rls
d.chanUpdateRateLimiter[baseScid] = rls
}
d.Unlock()
@ -2578,9 +2714,16 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
}
}
// We'll use chanInfo.ChannelID rather than the peer-supplied
// ShortChannelID in the ChannelUpdate to avoid the router having to
// lookup the stored SCID. If we're sending the update, we'll always
// use the SCID stored in the database rather than a potentially
// different alias. This might mean that SigBytes is incorrect as it
// signs a different SCID than the database SCID, but since there will
// only be a difference if AuthProof == nil, this is fine.
update := &channeldb.ChannelEdgePolicy{
SigBytes: upd.Signature.ToSignatureBytes(),
ChannelID: shortChanID,
ChannelID: chanInfo.ChannelID,
LastUpdate: timestamp,
MessageFlags: upd.MessageFlags,
ChannelFlags: upd.ChannelFlags,
@ -2601,8 +2744,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
log.Debug(err)
} else {
// Since we know the stored SCID in the graph, we'll
// cache that SCID.
key := newRejectCacheKey(
upd.ShortChannelID.ToUint64(),
chanInfo.ChannelID,
sourceToPub(nMsg.source),
)
_, _ = d.recentRejects.Put(key, &cachedReject{})
@ -2620,6 +2765,35 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
// to be given the update, so we'll try sending the update directly to
// the remote peer.
if !nMsg.isRemote && chanInfo.AuthProof == nil {
if nMsg.optionalMsgFields != nil {
remoteAlias := nMsg.optionalMsgFields.remoteAlias
if remoteAlias != nil {
// The remoteAlias field was specified, meaning
// that we should replace the SCID in the
// update with the remote's alias. We'll also
// need to re-sign the channel update. This is
// required for option-scid-alias feature-bit
// negotiated channels.
upd.ShortChannelID = *remoteAlias
sig, err := d.cfg.SignAliasUpdate(upd)
if err != nil {
log.Error(err)
nMsg.err <- err
return nil, false
}
lnSig, err := lnwire.NewSigFromSignature(sig)
if err != nil {
log.Error(err)
nMsg.err <- err
return nil, false
}
upd.Signature = lnSig
}
}
// Get our peer's public key.
remotePubKey := remotePubFromChanInfo(
chanInfo, upd.ChannelFlags,
@ -2645,9 +2819,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
// Channel update announcement was successfully processed and now it
// can be broadcast to the rest of the network. However, we'll only
// broadcast the channel update announcement if it has an attached
// authentication proof.
// authentication proof. We also won't broadcast the update if it
// contains an alias because the network would reject this.
var announcements []networkMsg
if chanInfo.AuthProof != nil {
if chanInfo.AuthProof != nil && !d.cfg.IsAlias(upd.ShortChannelID) {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
@ -2704,7 +2879,6 @@ func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg,
chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(
ann.ShortChannelID,
)
if err != nil {
proof := channeldb.NewWaitingProof(nMsg.isRemote, ann)
err := d.cfg.WaitingProofStore.Add(proof)

View File

@ -16,6 +16,7 @@ import (
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcec/v2/ecdsa"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
@ -734,6 +735,27 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
}
broadcastedMessage := make(chan msgWithSenders, 10)
isAlias := func(lnwire.ShortChannelID) bool {
return false
}
signAliasUpdate := func(*lnwire.ChannelUpdate) (*ecdsa.Signature,
error) {
return nil, nil
}
findBaseByAlias := func(lnwire.ShortChannelID) (lnwire.ShortChannelID,
error) {
return lnwire.ShortChannelID{}, fmt.Errorf("no base scid")
}
getAlias := func(lnwire.ChannelID) (lnwire.ShortChannelID, error) {
return lnwire.ShortChannelID{}, fmt.Errorf("no peer alias")
}
gossiper := New(Config{
Notifier: notifier,
Broadcast: func(senders map[route.Vertex]struct{},
@ -778,6 +800,10 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
MinimumBatchSize: 10,
MaxChannelUpdateBurst: DefaultMaxChannelUpdateBurst,
ChannelUpdateInterval: DefaultChannelUpdateInterval,
IsAlias: isAlias,
SignAliasUpdate: signAliasUpdate,
FindBaseByAlias: findBaseByAlias,
GetAlias: getAlias,
}, selfKeyDesc)
if err := gossiper.Start(); err != nil {
@ -1387,6 +1413,27 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
// NotifyWhenOffline methods. This should trigger a new attempt to send
// the message to the peer.
ctx.gossiper.Stop()
isAlias := func(lnwire.ShortChannelID) bool {
return false
}
signAliasUpdate := func(*lnwire.ChannelUpdate) (*ecdsa.Signature,
error) {
return nil, nil
}
findBaseByAlias := func(lnwire.ShortChannelID) (lnwire.ShortChannelID,
error) {
return lnwire.ShortChannelID{}, fmt.Errorf("no base scid")
}
getAlias := func(lnwire.ChannelID) (lnwire.ShortChannelID, error) {
return lnwire.ShortChannelID{}, fmt.Errorf("no peer alias")
}
gossiper := New(Config{
Notifier: ctx.gossiper.cfg.Notifier,
Broadcast: ctx.gossiper.cfg.Broadcast,
@ -1405,6 +1452,10 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
NumActiveSyncers: 3,
MinimumBatchSize: 10,
SubBatchDelay: time.Second * 5,
IsAlias: isAlias,
SignAliasUpdate: signAliasUpdate,
FindBaseByAlias: findBaseByAlias,
GetAlias: getAlias,
}, &keychain.KeyDescriptor{
PubKey: ctx.gossiper.selfKey,
KeyLocator: ctx.gossiper.selfKeyLoc,

View File

@ -16,6 +16,7 @@ import (
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcec/v2/ecdsa"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/connmgr"
@ -961,6 +962,10 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
PinnedSyncers: cfg.Gossip.PinnedSyncers,
MaxChannelUpdateBurst: cfg.Gossip.MaxChannelUpdateBurst,
ChannelUpdateInterval: cfg.Gossip.ChannelUpdateInterval,
IsAlias: aliasmgr.IsAlias,
SignAliasUpdate: s.signAliasUpdate,
FindBaseByAlias: s.aliasMgr.FindBaseSCID,
GetAlias: s.aliasMgr.GetPeerAlias,
}, nodeKeyDesc)
s.localChanMgr = &localchans.Manager{
@ -1505,6 +1510,20 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
return s, nil
}
// signAliasUpdate takes a ChannelUpdate and returns the signature. This is
// used for option_scid_alias channels where the ChannelUpdate to be sent back
// may differ from what is on disk.
func (s *server) signAliasUpdate(u *lnwire.ChannelUpdate) (*ecdsa.Signature,
error) {
data, err := u.DataToSign()
if err != nil {
return nil, err
}
return s.cc.MsgSigner.SignMessage(s.identityKeyLoc, data, true)
}
// createLivenessMonitor creates a set of health checks using our configured
// values and uses these checks to create a liveliness monitor. Available
// health checks,