Merge pull request #7648 from guggero/initial-forwarding-policy

funding: fix flake in itest caused by persistent fee param changes
This commit is contained in:
Oliver Gugger 2023-08-22 15:15:28 +02:00 committed by GitHub
commit e49f6fcdd3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 504 additions and 450 deletions

View file

@ -25,7 +25,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs/btcdnotify"
"github.com/lightningnetwork/lnd/chainntnfs/neutrinonotify"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/kvdb"
@ -195,7 +195,7 @@ type PartialChainControl struct {
ChainSource chain.Interface
// RoutingPolicy is the routing policy we have decided to use.
RoutingPolicy htlcswitch.ForwardingPolicy
RoutingPolicy models.ForwardingPolicy
// MinHtlcIn is the minimum HTLC we will accept.
MinHtlcIn lnwire.MilliSatoshi
@ -270,7 +270,7 @@ func NewPartialChainControl(cfg *Config) (*PartialChainControl, func(), error) {
switch cfg.PrimaryChain() {
case BitcoinChain:
cc.RoutingPolicy = htlcswitch.ForwardingPolicy{
cc.RoutingPolicy = models.ForwardingPolicy{
MinHTLCOut: cfg.Bitcoin.MinHTLCOut,
BaseFee: cfg.Bitcoin.BaseFee,
FeeRate: cfg.Bitcoin.FeeRate,
@ -282,7 +282,7 @@ func NewPartialChainControl(cfg *Config) (*PartialChainControl, func(), error) {
DefaultBitcoinStaticMinRelayFeeRate,
)
case LitecoinChain:
cc.RoutingPolicy = htlcswitch.ForwardingPolicy{
cc.RoutingPolicy = models.ForwardingPolicy{
MinHTLCOut: cfg.Litecoin.MinHTLCOut,
BaseFee: cfg.Litecoin.BaseFee,
FeeRate: cfg.Litecoin.FeeRate,

View file

@ -312,11 +312,6 @@ var (
// channelOpeningState for each channel that is currently in the process
// of being opened.
channelOpeningStateBucket = []byte("channelOpeningState")
// initialChannelFwdingPolicyBucket is the database bucket used to store
// the forwarding policy for each permanent channel that is currently
// in the process of being opened.
initialChannelFwdingPolicyBucket = []byte("initialChannelFwdingPolicy")
)
// DB is the primary datastore for the lnd daemon. The database stores
@ -1433,64 +1428,6 @@ func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint,
return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator)
}
// SaveInitialFwdingPolicy saves the serialized forwarding policy for the
// provided permanent channel id to the initialChannelFwdingPolicyBucket.
func (c *ChannelStateDB) SaveInitialFwdingPolicy(chanID,
forwardingPolicy []byte) error {
return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
bucket, err := tx.CreateTopLevelBucket(
initialChannelFwdingPolicyBucket,
)
if err != nil {
return err
}
return bucket.Put(chanID, forwardingPolicy)
}, func() {})
}
// GetInitialFwdingPolicy fetches the serialized forwarding policy for the
// provided channel id from the database, or returns ErrChannelNotFound if
// a forwarding policy for this channel id is not found.
func (c *ChannelStateDB) GetInitialFwdingPolicy(chanID []byte) ([]byte, error) {
var serializedState []byte
err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
bucket := tx.ReadBucket(initialChannelFwdingPolicyBucket)
if bucket == nil {
// If the bucket does not exist, it means we
// never added a channel fees to the db, so
// return ErrChannelNotFound.
return ErrChannelNotFound
}
stateBytes := bucket.Get(chanID)
if stateBytes == nil {
return ErrChannelNotFound
}
serializedState = append(serializedState, stateBytes...)
return nil
}, func() {
serializedState = nil
})
return serializedState, err
}
// DeleteInitialFwdingPolicy removes the forwarding policy for a given channel
// from the database.
func (c *ChannelStateDB) DeleteInitialFwdingPolicy(chanID []byte) error {
return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
bucket := tx.ReadWriteBucket(initialChannelFwdingPolicyBucket)
if bucket == nil {
return ErrChannelNotFound
}
return bucket.Delete(chanID)
}, func() {})
}
// SaveChannelOpeningState saves the serialized channel state for the provided
// chanPoint to the channelOpeningStateBucket.
func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,

View file

@ -0,0 +1,111 @@
package channeldb
import (
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
)
var (
// initialChannelForwardingPolicyBucket is the database bucket used to
// store the forwarding policy for each permanent channel that is
// currently in the process of being opened.
initialChannelForwardingPolicyBucket = []byte(
"initialChannelFwdingPolicy",
)
)
// SaveInitialForwardingPolicy saves the serialized forwarding policy for the
// provided permanent channel id to the initialChannelForwardingPolicyBucket.
func (c *ChannelStateDB) SaveInitialForwardingPolicy(chanID lnwire.ChannelID,
forwardingPolicy *models.ForwardingPolicy) error {
chanIDCopy := make([]byte, 32)
copy(chanIDCopy, chanID[:])
scratch := make([]byte, 36)
byteOrder.PutUint64(scratch[:8], uint64(forwardingPolicy.MinHTLCOut))
byteOrder.PutUint64(scratch[8:16], uint64(forwardingPolicy.MaxHTLC))
byteOrder.PutUint64(scratch[16:24], uint64(forwardingPolicy.BaseFee))
byteOrder.PutUint64(scratch[24:32], uint64(forwardingPolicy.FeeRate))
byteOrder.PutUint32(scratch[32:], forwardingPolicy.TimeLockDelta)
return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
bucket, err := tx.CreateTopLevelBucket(
initialChannelForwardingPolicyBucket,
)
if err != nil {
return err
}
return bucket.Put(chanIDCopy, scratch)
}, func() {})
}
// GetInitialForwardingPolicy fetches the serialized forwarding policy for the
// provided channel id from the database, or returns ErrChannelNotFound if
// a forwarding policy for this channel id is not found.
func (c *ChannelStateDB) GetInitialForwardingPolicy(
chanID lnwire.ChannelID) (*models.ForwardingPolicy, error) {
chanIDCopy := make([]byte, 32)
copy(chanIDCopy, chanID[:])
var forwardingPolicy *models.ForwardingPolicy
err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
bucket := tx.ReadBucket(initialChannelForwardingPolicyBucket)
if bucket == nil {
// If the bucket does not exist, it means we
// never added a channel fees to the db, so
// return ErrChannelNotFound.
return ErrChannelNotFound
}
stateBytes := bucket.Get(chanIDCopy)
if stateBytes == nil {
return ErrChannelNotFound
}
forwardingPolicy = &models.ForwardingPolicy{
MinHTLCOut: lnwire.MilliSatoshi(
byteOrder.Uint64(stateBytes[:8]),
),
MaxHTLC: lnwire.MilliSatoshi(
byteOrder.Uint64(stateBytes[8:16]),
),
BaseFee: lnwire.MilliSatoshi(
byteOrder.Uint64(stateBytes[16:24]),
),
FeeRate: lnwire.MilliSatoshi(
byteOrder.Uint64(stateBytes[24:32]),
),
TimeLockDelta: byteOrder.Uint32(stateBytes[32:36]),
}
return nil
}, func() {
forwardingPolicy = nil
})
return forwardingPolicy, err
}
// DeleteInitialForwardingPolicy removes the forwarding policy for a given
// channel from the database.
func (c *ChannelStateDB) DeleteInitialForwardingPolicy(
chanID lnwire.ChannelID) error {
chanIDCopy := make([]byte, 32)
copy(chanIDCopy, chanID[:])
return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
bucket := tx.ReadWriteBucket(
initialChannelForwardingPolicyBucket,
)
if bucket == nil {
return ErrChannelNotFound
}
return bucket.Delete(chanIDCopy)
}, func() {})
}

View file

@ -90,3 +90,41 @@ func (k *CircuitKey) Decode(r io.Reader) error {
func (k CircuitKey) String() string {
return fmt.Sprintf("(Chan ID=%s, HTLC ID=%d)", k.ChanID, k.HtlcID)
}
// ForwardingPolicy describes the set of constraints that a given ChannelLink
// is to adhere to when forwarding HTLC's. For each incoming HTLC, this set of
// constraints will be consulted in order to ensure that adequate fees are
// paid, and our time-lock parameters are respected. In the event that an
// incoming HTLC violates any of these constraints, it is to be _rejected_ with
// the error possibly carrying along a ChannelUpdate message that includes the
// latest policy.
type ForwardingPolicy struct {
// MinHTLCOut is the smallest HTLC that is to be forwarded.
MinHTLCOut lnwire.MilliSatoshi
// MaxHTLC is the largest HTLC that is to be forwarded.
MaxHTLC lnwire.MilliSatoshi
// BaseFee is the base fee, expressed in milli-satoshi that must be
// paid for each incoming HTLC. This field, combined with FeeRate is
// used to compute the required fee for a given HTLC.
BaseFee lnwire.MilliSatoshi
// FeeRate is the fee rate, expressed in milli-satoshi that must be
// paid for each incoming HTLC. This field combined with BaseFee is
// used to compute the required fee for a given HTLC.
FeeRate lnwire.MilliSatoshi
// TimeLockDelta is the absolute time-lock value, expressed in blocks,
// that will be subtracted from an incoming HTLC's timelock value to
// create the time-lock value for the forwarded outgoing HTLC. The
// following constraint MUST hold for an HTLC to be forwarded:
//
// * incomingHtlc.timeLock - timeLockDelta = fwdInfo.OutgoingCTLV
//
// where fwdInfo is the forwarding information extracted from the
// per-hop payload of the incoming HTLC's onion packet.
TimeLockDelta uint32
// TODO(roasbeef): add fee module inside of switch
}

View file

@ -20,8 +20,8 @@ import (
"github.com/lightningnetwork/lnd/chainreg"
"github.com/lightningnetwork/lnd/chanacceptor"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/labels"
@ -154,7 +154,7 @@ type reservationWithCtx struct {
chanAmt btcutil.Amount
// forwardingPolicy is the policy provided by the initFundingMsg.
forwardingPolicy htlcswitch.ForwardingPolicy
forwardingPolicy models.ForwardingPolicy
// Constraints we require for the remote.
remoteCsvDelay uint16
@ -386,6 +386,9 @@ type Config struct {
// so that the channel creation process can be completed.
Notifier chainntnfs.ChainNotifier
// ChannelDB is the database that keeps track of all channel state.
ChannelDB *channeldb.ChannelStateDB
// SignMessage signs an arbitrary message with a given public key. The
// actual digest signed is the double sha-256 of the message. In the
// case that the private key corresponding to the passed public key
@ -429,7 +432,7 @@ type Config struct {
// DefaultRoutingPolicy is the default routing policy used when
// initially announcing channels.
DefaultRoutingPolicy htlcswitch.ForwardingPolicy
DefaultRoutingPolicy models.ForwardingPolicy
// DefaultMinHtlcIn is the default minimum incoming htlc value that is
// set as a channel parameter.
@ -513,11 +516,6 @@ type Config struct {
// transition from pending open to open.
NotifyOpenChannelEvent func(wire.OutPoint)
// UpdateForwardingPolicies is used by the manager to update active
// links with a new policy.
UpdateForwardingPolicies func(
chanPolicies map[wire.OutPoint]htlcswitch.ForwardingPolicy)
// OpenChannelPredicate is a predicate on the lnwire.OpenChannel message
// and on the requesting node's public key that returns a bool which
// tells the funding manager whether or not to accept the channel.
@ -641,7 +639,7 @@ func (c channelOpeningState) String() string {
case markedOpen:
return "markedOpen"
case channelReadySent:
return "channelReady"
return "channelReadySent"
case addedToRouterGraph:
return "addedToRouterGraph"
default:
@ -698,7 +696,7 @@ func (f *Manager) start() error {
// down.
// TODO(roasbeef): store height that funding finished?
// * would then replace call below
allChannels, err := f.cfg.Wallet.Cfg.Database.FetchAllChannels()
allChannels, err := f.cfg.ChannelDB.FetchAllChannels()
if err != nil {
return err
}
@ -1153,79 +1151,9 @@ func (f *Manager) stateStep(channel *channeldb.OpenChannel,
return nil
}
var peerAlias *lnwire.ShortChannelID
if channel.IsZeroConf() {
// We'll need to wait until channel_ready has been
// received and the peer lets us know the alias they
// want to use for the channel. With this information,
// we can then construct a ChannelUpdate for them.
// If an alias does not yet exist, we'll just return,
// letting the next iteration of the loop check again.
var defaultAlias lnwire.ShortChannelID
chanID := lnwire.NewChanIDFromOutPoint(
&channel.FundingOutpoint,
)
foundAlias, _ := f.cfg.AliasManager.GetPeerAlias(
chanID,
)
if foundAlias == defaultAlias {
return nil
}
peerAlias = &foundAlias
}
err = f.addToRouterGraph(channel, shortChanID, peerAlias, nil)
if err != nil {
return fmt.Errorf("failed adding to "+
"router graph: %v", err)
}
// As the channel is now added to the ChannelRouter's topology,
// the channel is moved to the next state of the state machine.
// It will be moved to the last state (actually deleted from
// the database) after the channel is finally announced.
err = f.saveChannelOpeningState(
&channel.FundingOutpoint, addedToRouterGraph,
shortChanID,
return f.handleChannelReadyReceived(
channel, shortChanID, pendingChanID, updateChan,
)
if err != nil {
return fmt.Errorf("error setting channel state to"+
" addedToRouterGraph: %v", err)
}
log.Debugf("Channel(%v) with ShortChanID %v: successfully "+
"added to router graph", chanID, shortChanID)
// Give the caller a final update notifying them that
// the channel is now open.
// TODO(roasbeef): only notify after recv of channel_ready?
fundingPoint := channel.FundingOutpoint
cp := &lnrpc.ChannelPoint{
FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{
FundingTxidBytes: fundingPoint.Hash[:],
},
OutputIndex: fundingPoint.Index,
}
if updateChan != nil {
upd := &lnrpc.OpenStatusUpdate{
Update: &lnrpc.OpenStatusUpdate_ChanOpen{
ChanOpen: &lnrpc.ChannelOpenUpdate{
ChannelPoint: cp,
},
},
PendingChanId: pendingChanID[:],
}
select {
case updateChan <- upd:
case <-f.quit:
return ErrFundingManagerShuttingDown
}
}
return nil
// The channel was added to the Router's topology, but the channel
// announcement was not sent.
@ -1263,6 +1191,16 @@ func (f *Manager) stateStep(channel *channeldb.OpenChannel,
err)
}
// After the fee parameters have been stored in the
// announcement we can delete them from the database. For
// private channels we do not announce the channel policy to
// the network but still need to delete them from the database.
err = f.deleteInitialForwardingPolicy(chanID)
if err != nil {
log.Infof("Could not delete initial policy for chanId "+
"%x", chanID)
}
log.Debugf("Channel(%v) with ShortChanID %v: successfully "+
"announced", chanID, shortChanID)
@ -1396,7 +1334,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
// Also count the channels that are already pending. There we don't know
// the underlying intent anymore, unfortunately.
channels, err := f.cfg.Wallet.Cfg.Database.FetchOpenChannels(peerPubKey)
channels, err := f.cfg.ChannelDB.FetchOpenChannels(peerPubKey)
if err != nil {
f.failFundingFlow(peer, cid, err)
return
@ -1423,7 +1361,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
}
// Ensure that the pendingChansLimit is respected.
pendingChans, err := f.cfg.Wallet.Cfg.Database.FetchPendingChannels()
pendingChans, err := f.cfg.ChannelDB.FetchPendingChannels()
if err != nil {
f.failFundingFlow(peer, cid, err)
return
@ -1744,13 +1682,13 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
minHtlc = acceptorResp.MinHtlcIn
}
// If we are handling a FundingOpen request then we need to
// specify the default channel fees since they are not provided
// by the responder interactively.
forwardingPolicy := htlcswitch.ForwardingPolicy{
BaseFee: f.cfg.DefaultRoutingPolicy.BaseFee,
FeeRate: f.cfg.DefaultRoutingPolicy.FeeRate,
}
// If we are handling a FundingOpen request then we need to specify the
// default channel fees since they are not provided by the responder
// interactively.
ourContribution := reservation.OurContribution()
forwardingPolicy := f.defaultForwardingPolicy(
ourContribution.ChannelConstraints,
)
// Once the reservation has been created successfully, we add it to
// this peer's map of pending reservations to track this particular
@ -1762,7 +1700,7 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
resCtx := &reservationWithCtx{
reservation: reservation,
chanAmt: amt,
forwardingPolicy: forwardingPolicy,
forwardingPolicy: *forwardingPolicy,
remoteCsvDelay: remoteCsvDelay,
remoteMinHtlc: minHtlc,
remoteMaxValue: remoteMaxValue,
@ -1825,7 +1763,6 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
// With the initiator's contribution recorded, respond with our
// contribution in the next message of the workflow.
ourContribution := reservation.OurContribution()
fundingAccept := lnwire.AcceptChannel{
PendingChannelID: msg.PendingChannelID,
DustLimit: ourContribution.DustLimit,
@ -2370,7 +2307,7 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer,
// With a permanent channel id established we can save the respective
// forwarding policy in the database. In the channel announcement phase
// this forwarding policy is retrieved and applied.
err = f.saveInitialFwdingPolicy(cid.chanID, &forwardingPolicy)
err = f.saveInitialForwardingPolicy(cid.chanID, &forwardingPolicy)
if err != nil {
log.Errorf("Unable to store the forwarding policy: %v", err)
}
@ -2473,7 +2410,9 @@ func (f *Manager) handleFundingSigned(peer lnpeer.Peer,
// We have to store the forwardingPolicy before the reservation context
// is deleted. The policy will then be read and applied in
// newChanAnnouncement.
err = f.saveInitialFwdingPolicy(permChanID, &resCtx.forwardingPolicy)
err = f.saveInitialForwardingPolicy(
permChanID, &resCtx.forwardingPolicy,
)
if err != nil {
log.Errorf("Unable to store the forwarding policy: %v", err)
}
@ -3131,6 +3070,8 @@ func (f *Manager) receivedChannelReady(node *btcec.PublicKey,
return false, err
}
// If we cannot find the channel, then we haven't processed the
// remote's channelReady message.
channel, err := f.cfg.FindChannel(node, chanID)
if err != nil {
log.Errorf("Unable to locate ChannelID(%v) to determine if "+
@ -3138,7 +3079,18 @@ func (f *Manager) receivedChannelReady(node *btcec.PublicKey,
return false, err
}
return channel.RemoteNextRevocation != nil, nil
// If we haven't insert the next revocation point, we haven't finished
// processing the channel ready message.
if channel.RemoteNextRevocation == nil {
return false, nil
}
// Finally, the barrier signal is removed once we finish
// `handleChannelReady`. If we can still find the signal, we haven't
// finished processing it yet.
_, loaded := f.handleChannelReadyBarriers.Load(chanID)
return !loaded, nil
}
// extractAnnounceParams extracts the various channel announcement and update
@ -3242,28 +3194,6 @@ func (f *Manager) addToRouterGraph(completeChan *channeldb.OpenChannel,
return ErrFundingManagerShuttingDown
}
// The user can define non-default channel policies when opening a
// channel. They are stored in the database to be persisted from the
// moment of funding the channel to it being confirmed. We just
// announced those policies to the network, but we also need to update
// our local policy in the switch to make sure we can forward payments
// with the correct fees. We can't do this when creating the link
// initially as that only takes the static channel parameters.
updatedPolicy := map[wire.OutPoint]htlcswitch.ForwardingPolicy{
completeChan.FundingOutpoint: {
MinHTLCOut: ann.chanUpdateAnn.HtlcMinimumMsat,
MaxHTLC: ann.chanUpdateAnn.HtlcMaximumMsat,
BaseFee: lnwire.MilliSatoshi(
ann.chanUpdateAnn.BaseFee,
),
FeeRate: lnwire.MilliSatoshi(
ann.chanUpdateAnn.FeeRate,
),
TimeLockDelta: uint32(ann.chanUpdateAnn.TimeLockDelta),
},
}
f.cfg.UpdateForwardingPolicies(updatedPolicy)
return nil
}
@ -3309,15 +3239,6 @@ func (f *Manager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
return fmt.Errorf("unable to send node announcement "+
"to peer %x: %v", pubKey, err)
}
// For private channels we do not announce the channel policy
// to the network but still need to delete them from the
// database.
err = f.deleteInitialFwdingPolicy(chanID)
if err != nil {
log.Infof("Could not delete channel fees "+
"for chanId %x.", chanID)
}
} else {
// Otherwise, we'll wait until the funding transaction has
// reached 6 confirmations before announcing it.
@ -3707,6 +3628,13 @@ func (f *Manager) handleChannelReady(peer lnpeer.Peer,
}
}()
// Before we can add the channel to the peer, we'll need to ensure that
// we have an initial forwarding policy set.
if err := f.ensureInitialForwardingPolicy(chanID, channel); err != nil {
log.Errorf("Unable to ensure initial forwarding policy: %v",
err)
}
if err := peer.AddNewChannel(channel, f.quit); err != nil {
log.Errorf("Unable to add new channel %v with peer %x: %v",
channel.FundingOutpoint,
@ -3715,6 +3643,132 @@ func (f *Manager) handleChannelReady(peer lnpeer.Peer,
}
}
// handleChannelReadyReceived is called once the remote's channelReady message
// is received and processed. At this stage, we must have sent out our
// channelReady message, once the remote's channelReady is processed, the
// channel is now active, thus we change its state to `addedToRouterGraph` to
// let the channel start handling routing.
func (f *Manager) handleChannelReadyReceived(channel *channeldb.OpenChannel,
scid *lnwire.ShortChannelID, pendingChanID [32]byte,
updateChan chan<- *lnrpc.OpenStatusUpdate) error {
chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint)
var peerAlias *lnwire.ShortChannelID
if channel.IsZeroConf() {
// We'll need to wait until channel_ready has been received and
// the peer lets us know the alias they want to use for the
// channel. With this information, we can then construct a
// ChannelUpdate for them. If an alias does not yet exist,
// we'll just return, letting the next iteration of the loop
// check again.
var defaultAlias lnwire.ShortChannelID
chanID := lnwire.NewChanIDFromOutPoint(&channel.FundingOutpoint)
foundAlias, _ := f.cfg.AliasManager.GetPeerAlias(chanID)
if foundAlias == defaultAlias {
return nil
}
peerAlias = &foundAlias
}
err := f.addToRouterGraph(channel, scid, peerAlias, nil)
if err != nil {
return fmt.Errorf("failed adding to router graph: %w", err)
}
// As the channel is now added to the ChannelRouter's topology, the
// channel is moved to the next state of the state machine. It will be
// moved to the last state (actually deleted from the database) after
// the channel is finally announced.
err = f.saveChannelOpeningState(
&channel.FundingOutpoint, addedToRouterGraph, scid,
)
if err != nil {
return fmt.Errorf("error setting channel state to"+
" addedToRouterGraph: %w", err)
}
log.Debugf("Channel(%v) with ShortChanID %v: successfully "+
"added to router graph", chanID, scid)
// Give the caller a final update notifying them that the channel is
fundingPoint := channel.FundingOutpoint
cp := &lnrpc.ChannelPoint{
FundingTxid: &lnrpc.ChannelPoint_FundingTxidBytes{
FundingTxidBytes: fundingPoint.Hash[:],
},
OutputIndex: fundingPoint.Index,
}
if updateChan != nil {
upd := &lnrpc.OpenStatusUpdate{
Update: &lnrpc.OpenStatusUpdate_ChanOpen{
ChanOpen: &lnrpc.ChannelOpenUpdate{
ChannelPoint: cp,
},
},
PendingChanId: pendingChanID[:],
}
select {
case updateChan <- upd:
case <-f.quit:
return ErrFundingManagerShuttingDown
}
}
return nil
}
// ensureInitialForwardingPolicy ensures that we have an initial forwarding
// policy set for the given channel. If we don't, we'll fall back to the default
// values.
func (f *Manager) ensureInitialForwardingPolicy(chanID lnwire.ChannelID,
channel *channeldb.OpenChannel) error {
// Before we can add the channel to the peer, we'll need to ensure that
// we have an initial forwarding policy set. This should always be the
// case except for a channel that was created with lnd <= 0.15.5 and
// is still pending while updating to this version.
var needDBUpdate bool
forwardingPolicy, err := f.getInitialForwardingPolicy(chanID)
if err != nil {
log.Errorf("Unable to fetch initial forwarding policy, "+
"falling back to default values: %v", err)
forwardingPolicy = f.defaultForwardingPolicy(
channel.LocalChanCfg.ChannelConstraints,
)
needDBUpdate = true
}
// We only started storing the actual values for MinHTLCOut and MaxHTLC
// after 0.16.x, so if a channel was opened with such a version and is
// still pending while updating to this version, we'll need to set the
// values to the default values.
if forwardingPolicy.MinHTLCOut == 0 {
forwardingPolicy.MinHTLCOut = channel.LocalChanCfg.MinHTLC
needDBUpdate = true
}
if forwardingPolicy.MaxHTLC == 0 {
forwardingPolicy.MaxHTLC = channel.LocalChanCfg.MaxPendingAmount
needDBUpdate = true
}
// And finally, if we found that the values currently stored aren't
// sufficient for the link, we'll update the database.
if needDBUpdate {
err := f.saveInitialForwardingPolicy(chanID, forwardingPolicy)
if err != nil {
return fmt.Errorf("unable to update initial "+
"forwarding policy: %v", err)
}
}
return nil
}
// chanAnnouncement encapsulates the two authenticated announcements that we
// send out to the network after a new channel has been created locally.
type chanAnnouncement struct {
@ -3816,7 +3870,7 @@ func (f *Manager) newChanAnnouncement(localPubKey,
// forwarding policy to be announced. If no persisted initial policy
// values are found, then we will use the default policy values in the
// channel announcement.
storedFwdingPolicy, err := f.getInitialFwdingPolicy(chanID)
storedFwdingPolicy, err := f.getInitialForwardingPolicy(chanID)
if err != nil && !errors.Is(err, channeldb.ErrChannelNotFound) {
return nil, errors.Errorf("unable to generate channel "+
"update announcement: %v", err)
@ -3946,14 +4000,6 @@ func (f *Manager) announceChannel(localIDKey, remoteIDKey *btcec.PublicKey,
return err
}
// After the fee parameters have been stored in the announcement
// we can delete them from the database.
err = f.deleteInitialFwdingPolicy(chanID)
if err != nil {
log.Infof("Could not delete channel fees for chanId %x.",
chanID)
}
// We only send the channel proof announcement and the node announcement
// because addToRouterGraph previously sent the ChannelAnnouncement and
// the ChannelUpdate announcement messages. The channel proof and node
@ -4279,14 +4325,16 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
maxHtlcs = f.cfg.RequiredRemoteMaxHTLCs(capacity)
}
// Prepare the optional channel fee values from the initFundingMsg.
// If useBaseFee or useFeeRate are false the client did not
// provide fee values hence we assume default fee settings from
// the config.
forwardingPolicy := htlcswitch.ForwardingPolicy{
BaseFee: f.cfg.DefaultRoutingPolicy.BaseFee,
FeeRate: f.cfg.DefaultRoutingPolicy.FeeRate,
}
// Once the reservation has been created, and indexed, queue a funding
// request to the remote peer, kicking off the funding workflow.
ourContribution := reservation.OurContribution()
// Prepare the optional channel fee values from the initFundingMsg. If
// useBaseFee or useFeeRate are false the client did not provide fee
// values hence we assume default fee settings from the config.
forwardingPolicy := f.defaultForwardingPolicy(
ourContribution.ChannelConstraints,
)
if baseFee != nil {
forwardingPolicy.BaseFee = lnwire.MilliSatoshi(*baseFee)
}
@ -4295,10 +4343,6 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
forwardingPolicy.FeeRate = lnwire.MilliSatoshi(*feeRate)
}
// Once the reservation has been created, and indexed, queue a funding
// request to the remote peer, kicking off the funding workflow.
ourContribution := reservation.OurContribution()
// Fetch our dust limit which is part of the default channel
// constraints, and log it.
ourDustLimit := ourContribution.DustLimit
@ -4324,7 +4368,7 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
resCtx := &reservationWithCtx{
chanAmt: capacity,
forwardingPolicy: forwardingPolicy,
forwardingPolicy: *forwardingPolicy,
remoteCsvDelay: remoteCsvDelay,
remoteMinHtlc: minHtlcIn,
remoteMaxValue: maxValue,
@ -4618,65 +4662,43 @@ func copyPubKey(pub *btcec.PublicKey) *btcec.PublicKey {
return btcec.NewPublicKey(&tmp.X, &tmp.Y)
}
// saveInitialFwdingPolicy saves the forwarding policy for the provided
// defaultForwardingPolicy returns the default forwarding policy based on the
// default routing policy and our local channel constraints.
func (f *Manager) defaultForwardingPolicy(
constraints channeldb.ChannelConstraints) *models.ForwardingPolicy {
return &models.ForwardingPolicy{
MinHTLCOut: constraints.MinHTLC,
MaxHTLC: constraints.MaxPendingAmount,
BaseFee: f.cfg.DefaultRoutingPolicy.BaseFee,
FeeRate: f.cfg.DefaultRoutingPolicy.FeeRate,
TimeLockDelta: f.cfg.DefaultRoutingPolicy.TimeLockDelta,
}
}
// saveInitialForwardingPolicy saves the forwarding policy for the provided
// chanPoint in the channelOpeningStateBucket.
func (f *Manager) saveInitialFwdingPolicy(permChanID lnwire.ChannelID,
forwardingPolicy *htlcswitch.ForwardingPolicy) error {
func (f *Manager) saveInitialForwardingPolicy(chanID lnwire.ChannelID,
forwardingPolicy *models.ForwardingPolicy) error {
chanID := make([]byte, 32)
copy(chanID, permChanID[:])
scratch := make([]byte, 36)
byteOrder.PutUint64(scratch[:8], uint64(forwardingPolicy.MinHTLCOut))
byteOrder.PutUint64(scratch[8:16], uint64(forwardingPolicy.MaxHTLC))
byteOrder.PutUint64(scratch[16:24], uint64(forwardingPolicy.BaseFee))
byteOrder.PutUint64(scratch[24:32], uint64(forwardingPolicy.FeeRate))
byteOrder.PutUint32(scratch[32:], forwardingPolicy.TimeLockDelta)
return f.cfg.Wallet.Cfg.Database.SaveInitialFwdingPolicy(
chanID, scratch,
return f.cfg.ChannelDB.SaveInitialForwardingPolicy(
chanID, forwardingPolicy,
)
}
// getInitialFwdingPolicy fetches the initial forwarding policy for a given
// getInitialForwardingPolicy fetches the initial forwarding policy for a given
// channel id from the database which will be applied during the channel
// announcement phase.
func (f *Manager) getInitialFwdingPolicy(permChanID lnwire.ChannelID) (
*htlcswitch.ForwardingPolicy, error) {
func (f *Manager) getInitialForwardingPolicy(
chanID lnwire.ChannelID) (*models.ForwardingPolicy, error) {
chanID := make([]byte, 32)
copy(chanID, permChanID[:])
value, err := f.cfg.Wallet.Cfg.Database.GetInitialFwdingPolicy(chanID)
if err != nil {
return nil, err
}
var fwdingPolicy htlcswitch.ForwardingPolicy
fwdingPolicy.MinHTLCOut = lnwire.MilliSatoshi(
byteOrder.Uint64(value[:8]),
)
fwdingPolicy.MaxHTLC = lnwire.MilliSatoshi(
byteOrder.Uint64(value[8:16]),
)
fwdingPolicy.BaseFee = lnwire.MilliSatoshi(
byteOrder.Uint64(value[16:24]),
)
fwdingPolicy.FeeRate = lnwire.MilliSatoshi(
byteOrder.Uint64(value[24:32]),
)
fwdingPolicy.TimeLockDelta = byteOrder.Uint32(value[32:36])
return &fwdingPolicy, nil
return f.cfg.ChannelDB.GetInitialForwardingPolicy(chanID)
}
// deleteInitialFwdingPolicy removes channel fees for this chanID from
// deleteInitialForwardingPolicy removes channel fees for this chanID from
// the database.
func (f *Manager) deleteInitialFwdingPolicy(permChanID lnwire.ChannelID) error {
chanID := make([]byte, 32)
copy(chanID, permChanID[:])
return f.cfg.Wallet.Cfg.Database.DeleteInitialFwdingPolicy(chanID)
func (f *Manager) deleteInitialForwardingPolicy(chanID lnwire.ChannelID) error {
return f.cfg.ChannelDB.DeleteInitialForwardingPolicy(chanID)
}
// saveChannelOpeningState saves the channelOpeningState for the provided
@ -4694,7 +4716,8 @@ func (f *Manager) saveChannelOpeningState(chanPoint *wire.OutPoint,
scratch := make([]byte, 10)
byteOrder.PutUint16(scratch[:2], uint16(state))
byteOrder.PutUint64(scratch[2:], shortChanID.ToUint64())
return f.cfg.Wallet.Cfg.Database.SaveChannelOpeningState(
return f.cfg.ChannelDB.SaveChannelOpeningState(
outpointBytes.Bytes(), scratch,
)
}
@ -4710,7 +4733,7 @@ func (f *Manager) getChannelOpeningState(chanPoint *wire.OutPoint) (
return 0, nil, err
}
value, err := f.cfg.Wallet.Cfg.Database.GetChannelOpeningState(
value, err := f.cfg.ChannelDB.GetChannelOpeningState(
outpointBytes.Bytes(),
)
if err != nil {
@ -4729,7 +4752,7 @@ func (f *Manager) deleteChannelOpeningState(chanPoint *wire.OutPoint) error {
return err
}
return f.cfg.Wallet.Cfg.Database.DeleteChannelOpeningState(
return f.cfg.ChannelDB.DeleteChannelOpeningState(
outpointBytes.Bytes(),
)
}

View file

@ -7,6 +7,7 @@ import (
"fmt"
"net"
"path/filepath"
"reflect"
"runtime"
"strings"
"testing"
@ -22,15 +23,16 @@ import (
"github.com/lightningnetwork/lnd/chainreg"
acpt "github.com/lightningnetwork/lnd/chanacceptor"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lncfg"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lntest/mock"
"github.com/lightningnetwork/lnd/lntest/wait"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwire"
@ -250,7 +252,6 @@ type testNode struct {
testDir string
shutdownChannel chan struct{}
reportScidChan chan struct{}
updatedPolicies chan map[wire.OutPoint]htlcswitch.ForwardingPolicy
localFeatures []lnwire.FeatureBit
remoteFeatures []lnwire.FeatureBit
@ -379,9 +380,6 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
publTxChan := make(chan *wire.MsgTx, 1)
shutdownChan := make(chan struct{})
reportScidChan := make(chan struct{})
updatedPolicies := make(
chan map[wire.OutPoint]htlcswitch.ForwardingPolicy, 1,
)
wc := &mock.WalletController{
RootKey: alicePrivKey,
@ -432,6 +430,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
IDKeyLoc: testKeyLoc,
Wallet: lnw,
Notifier: chainNotifier,
ChannelDB: cdb,
FeeEstimator: estimator,
SignMessage: func(_ keychain.KeyLocator,
_ []byte, _ bool) (*ecdsa.Signature, error) {
@ -476,7 +475,7 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
return nil, fmt.Errorf("unable to find channel")
},
DefaultRoutingPolicy: htlcswitch.ForwardingPolicy{
DefaultRoutingPolicy: models.ForwardingPolicy{
MinHTLCOut: 5,
BaseFee: 100,
FeeRate: 1000,
@ -538,11 +537,6 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
return nil, nil
},
AliasManager: aliasMgr,
UpdateForwardingPolicies: func(
p map[wire.OutPoint]htlcswitch.ForwardingPolicy) {
updatedPolicies <- p
},
}
for _, op := range options {
@ -567,7 +561,6 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
testDir: tempTestDir,
shutdownChannel: shutdownChan,
reportScidChan: reportScidChan,
updatedPolicies: updatedPolicies,
addr: addr,
}
@ -601,6 +594,7 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
IDKeyLoc: oldCfg.IDKeyLoc,
Wallet: oldCfg.Wallet,
Notifier: oldCfg.Notifier,
ChannelDB: oldCfg.ChannelDB,
FeeEstimator: oldCfg.FeeEstimator,
SignMessage: func(_ keychain.KeyLocator,
_ []byte, _ bool) (*ecdsa.Signature, error) {
@ -631,7 +625,7 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
},
TempChanIDSeed: oldCfg.TempChanIDSeed,
FindChannel: oldCfg.FindChannel,
DefaultRoutingPolicy: htlcswitch.ForwardingPolicy{
DefaultRoutingPolicy: models.ForwardingPolicy{
MinHTLCOut: 5,
BaseFee: 100,
FeeRate: 1000,
@ -647,12 +641,11 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
UpdateLabel: func(chainhash.Hash, string) error {
return nil
},
ZombieSweeperInterval: oldCfg.ZombieSweeperInterval,
ReservationTimeout: oldCfg.ReservationTimeout,
OpenChannelPredicate: chainedAcceptor,
DeleteAliasEdge: oldCfg.DeleteAliasEdge,
AliasManager: oldCfg.AliasManager,
UpdateForwardingPolicies: oldCfg.UpdateForwardingPolicies,
ZombieSweeperInterval: oldCfg.ZombieSweeperInterval,
ReservationTimeout: oldCfg.ReservationTimeout,
OpenChannelPredicate: chainedAcceptor,
DeleteAliasEdge: oldCfg.DeleteAliasEdge,
AliasManager: oldCfg.AliasManager,
})
require.NoError(t, err, "failed recreating aliceFundingManager")
@ -1141,21 +1134,6 @@ func assertChannelAnnouncements(t *testing.T, alice, bob *testNode,
}
}
// At this point we should also have gotten a policy update that
// was sent to the switch subsystem. Make sure it contains the
// same values.
var policyUpdate htlcswitch.ForwardingPolicy
select {
case policyUpdateMap := <-node.updatedPolicies:
require.Len(t, policyUpdateMap, 1)
for _, policy := range policyUpdateMap {
policyUpdate = policy
}
case <-time.After(time.Second * 5):
t.Fatalf("node didn't send policy update")
}
gotChannelAnnouncement := false
gotChannelUpdate := false
for _, msg := range announcements {
@ -1184,34 +1162,24 @@ func assertChannelAnnouncements(t *testing.T, alice, bob *testNode,
minHtlc = customMinHtlc[j]
}
require.Equal(t, minHtlc, m.HtlcMinimumMsat)
require.Equal(
t, minHtlc, policyUpdate.MinHTLCOut,
)
// We might expect a custom MaxHltc value.
if len(customMaxHtlc) > 0 {
maxHtlc = customMaxHtlc[j]
}
require.Equal(t, maxHtlc, m.HtlcMaximumMsat)
require.Equal(t, maxHtlc, policyUpdate.MaxHTLC)
// We might expect a custom baseFee value.
if len(baseFees) > 0 {
baseFee = baseFees[j]
}
require.EqualValues(t, baseFee, m.BaseFee)
require.EqualValues(
t, baseFee, policyUpdate.BaseFee,
)
// We might expect a custom feeRate value.
if len(feeRates) > 0 {
feeRate = feeRates[j]
}
require.EqualValues(t, feeRate, m.FeeRate)
require.EqualValues(
t, feeRate, policyUpdate.FeeRate,
)
gotChannelUpdate = true
}
@ -1299,13 +1267,12 @@ func assertNoChannelState(t *testing.T, alice, bob *testNode,
}
func assertNoFwdingPolicy(t *testing.T, alice, bob *testNode,
fundingOutPoint *wire.OutPoint) {
chanID lnwire.ChannelID) {
t.Helper()
chandID := lnwire.NewChanIDFromOutPoint(fundingOutPoint)
assertInitialFwdingPolicyNotFound(t, alice, &chandID)
assertInitialFwdingPolicyNotFound(t, bob, &chandID)
assertInitialFwdingPolicyNotFound(t, alice, &chanID)
assertInitialFwdingPolicyNotFound(t, bob, &chanID)
}
func assertErrChannelNotFound(t *testing.T, node *testNode,
@ -1338,42 +1305,46 @@ func assertInitialFwdingPolicyNotFound(t *testing.T, node *testNode,
t.Helper()
var fwdingPolicy *htlcswitch.ForwardingPolicy
var err error
for i := 0; i < testPollNumTries; i++ {
// If this is not the first try, sleep before retrying.
if i > 0 {
time.Sleep(testPollSleepMs * time.Millisecond)
err := wait.NoError(func() error {
_, err := node.fundingMgr.getInitialForwardingPolicy(*chanID)
if errors.Is(err, channeldb.ErrChannelNotFound) {
return nil
}
fwdingPolicy, err = node.fundingMgr.getInitialFwdingPolicy(
*chanID,
)
require.ErrorIs(t, err, channeldb.ErrChannelNotFound)
// Got expected result, return with success.
return
}
return fmt.Errorf("expected ErrChannelNotFound, got %w", err)
}, wait.DefaultTimeout)
// 10 tries without success.
t.Fatalf("expected to not find a forwarding policy, found policy %v",
fwdingPolicy)
require.NoError(t, err)
}
func assertHandleChannelReady(t *testing.T, alice, bob *testNode) {
func assertHandleChannelReady(t *testing.T, alice, bob *testNode,
checkChannel ...func(node *testNode, msg *newChannelMsg) bool) {
t.Helper()
const timeout = time.Second * 15
// They should both send the new channel state to their peer.
select {
case c := <-alice.newChannels:
for _, check := range checkChannel {
require.NoError(t, wait.Predicate(func() bool {
return check(alice, c)
}, timeout))
}
close(c.err)
case <-time.After(time.Second * 15):
case <-time.After(timeout):
t.Fatalf("alice did not send new channel to peer")
}
select {
case c := <-bob.newChannels:
for _, check := range checkChannel {
require.NoError(t, wait.Predicate(func() bool {
return check(bob, c)
}, timeout))
}
close(c.err)
case <-time.After(time.Second * 15):
case <-time.After(timeout):
t.Fatalf("bob did not send new channel to peer")
}
}
@ -1469,7 +1440,7 @@ func TestFundingManagerNormalWorkflow(t *testing.T) {
// The forwarding policy for the channel announcement should
// have been deleted from the database, as the channel is announced.
assertNoFwdingPolicy(t, alice, bob, fundingOutPoint)
assertNoFwdingPolicy(t, alice, bob, channelReadyAlice.ChanID)
}
// TestFundingManagerRejectCSV tests checking of local CSV values against our
@ -1782,7 +1753,7 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
// The forwarding policy for the channel announcement should
// have been deleted from the database, as the channel is announced.
assertNoFwdingPolicy(t, alice, bob, fundingOutPoint)
assertNoFwdingPolicy(t, alice, bob, channelReadyAlice.ChanID)
}
// TestFundingManagerOfflinePeer checks that the fundingManager waits for the
@ -1945,7 +1916,7 @@ func TestFundingManagerOfflinePeer(t *testing.T) {
// The forwarding policy for the channel announcement should
// have been deleted from the database, as the channel is announced.
assertNoFwdingPolicy(t, alice, bob, fundingOutPoint)
assertNoFwdingPolicy(t, alice, bob, channelReadyAlice.ChanID)
}
// TestFundingManagerPeerTimeoutAfterInitFunding checks that the zombie sweeper
@ -2403,7 +2374,7 @@ func TestFundingManagerReceiveChannelReadyTwice(t *testing.T) {
// The forwarding policy for the channel announcement should
// have been deleted from the database, as the channel is announced.
assertNoFwdingPolicy(t, alice, bob, fundingOutPoint)
assertNoFwdingPolicy(t, alice, bob, channelReadyAlice.ChanID)
}
// TestFundingManagerRestartAfterChanAnn checks that the fundingManager properly
@ -2502,7 +2473,7 @@ func TestFundingManagerRestartAfterChanAnn(t *testing.T) {
// The forwarding policy for the channel announcement should
// have been deleted from the database, as the channel is announced.
assertNoFwdingPolicy(t, alice, bob, fundingOutPoint)
assertNoFwdingPolicy(t, alice, bob, channelReadyAlice.ChanID)
}
// TestFundingManagerRestartAfterReceivingChannelReady checks that the
@ -2597,7 +2568,7 @@ func TestFundingManagerRestartAfterReceivingChannelReady(t *testing.T) {
// The forwarding policy for the channel announcement should
// have been deleted from the database, as the channel is announced.
assertNoFwdingPolicy(t, alice, bob, fundingOutPoint)
assertNoFwdingPolicy(t, alice, bob, channelReadyAlice.ChanID)
}
// TestFundingManagerPrivateChannel tests that if we open a private channel
@ -2721,7 +2692,7 @@ func TestFundingManagerPrivateChannel(t *testing.T) {
// The forwarding policy for the channel announcement should
// have been deleted from the database.
assertNoFwdingPolicy(t, alice, bob, fundingOutPoint)
assertNoFwdingPolicy(t, alice, bob, channelReadyAlice.ChanID)
}
// TestFundingManagerPrivateRestart tests that the privacy guarantees granted
@ -2870,7 +2841,7 @@ func TestFundingManagerPrivateRestart(t *testing.T) {
// The forwarding policy for the channel announcement should
// have been deleted from the database.
assertNoFwdingPolicy(t, alice, bob, fundingOutPoint)
assertNoFwdingPolicy(t, alice, bob, channelReadyAlice.ChanID)
}
// TestFundingManagerCustomChannelParameters checks that custom requirements we
@ -2890,12 +2861,12 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
const fundingAmt = 5000000
const chanReserve = 100000
// Use custom channel fees.
// These will show up in the channel reservation context
var baseFee uint64
var feeRate uint64
baseFee = 42
feeRate = 1337
// Use custom channel fees. These will show up in the channel
// reservation context.
var (
baseFee uint64 = 42
feeRate uint64 = 1337
)
// We will consume the channel updates as we go, so no buffering is
// needed.
@ -3074,7 +3045,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
// Helper method for checking baseFee and feeRate stored for a
// reservation.
assertFees := func(forwardingPolicy *htlcswitch.ForwardingPolicy,
assertFees := func(forwardingPolicy *models.ForwardingPolicy,
baseFee, feeRate lnwire.MilliSatoshi) error {
if forwardingPolicy.BaseFee != baseFee {
@ -3171,13 +3142,13 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
// After the funding is signed and before the channel announcement
// we expect Alice and Bob to store their respective fees in the
// database.
forwardingPolicy, err := alice.fundingMgr.getInitialFwdingPolicy(
forwardingPolicy, err := alice.fundingMgr.getInitialForwardingPolicy(
fundingSigned.ChanID,
)
require.NoError(t, err)
require.NoError(t, assertFees(forwardingPolicy, 42, 1337))
forwardingPolicy, err = bob.fundingMgr.getInitialFwdingPolicy(
forwardingPolicy, err = bob.fundingMgr.getInitialForwardingPolicy(
fundingSigned.ChanID,
)
require.NoError(t, err)
@ -3216,10 +3187,6 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
alice.fundingMgr.ProcessFundingMsg(channelReadyBob, bob)
bob.fundingMgr.ProcessFundingMsg(channelReadyAlice, alice)
// Check that they notify the breach arbiter and peer about the new
// channel.
assertHandleChannelReady(t, alice, bob)
// Make sure both fundingManagers send the expected channel
// announcements.
// Alice should advertise the default MinHTLC value of
@ -3236,15 +3203,45 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
// Alice should have custom fees set whereas Bob should see his
// configured default fees announced.
defaultDelta := bob.fundingMgr.cfg.DefaultRoutingPolicy.TimeLockDelta
defaultBaseFee := bob.fundingMgr.cfg.DefaultRoutingPolicy.BaseFee
defaultFeerate := bob.fundingMgr.cfg.DefaultRoutingPolicy.FeeRate
defaultFeeRate := bob.fundingMgr.cfg.DefaultRoutingPolicy.FeeRate
baseFees := []lnwire.MilliSatoshi{
lnwire.MilliSatoshi(baseFee), defaultBaseFee,
}
feeRates := []lnwire.MilliSatoshi{
lnwire.MilliSatoshi(feeRate), defaultFeerate,
lnwire.MilliSatoshi(feeRate), defaultFeeRate,
}
// Check that they notify the breach arbiter and peer about the new
// channel.
assertHandleChannelReady(
t, alice, bob, func(node *testNode, msg *newChannelMsg) bool {
aliceDB := alice.fundingMgr.cfg.ChannelDB
chanID := lnwire.NewChanIDFromOutPoint(
&msg.channel.FundingOutpoint,
)
if node == alice {
p, err := aliceDB.GetInitialForwardingPolicy(
chanID,
)
require.NoError(t, err)
return reflect.DeepEqual(
p, &models.ForwardingPolicy{
MaxHTLC: maxHtlcArr[0],
MinHTLCOut: minHtlcArr[0],
BaseFee: baseFees[0],
FeeRate: feeRates[0],
TimeLockDelta: defaultDelta,
},
)
}
return true
},
)
assertChannelAnnouncements(
t, alice, bob, capacity, minHtlcArr, maxHtlcArr, baseFees,
feeRates,
@ -3267,18 +3264,7 @@ func TestFundingManagerCustomChannelParameters(t *testing.T) {
// After the announcement we expect Alice and Bob to have cleared
// the fees for the channel from the database.
_, err = alice.fundingMgr.getInitialFwdingPolicy(fundingSigned.ChanID)
if err != channeldb.ErrChannelNotFound {
err = fmt.Errorf("channel fees were expected to be deleted" +
" but were not")
t.Fatal(err)
}
_, err = bob.fundingMgr.getInitialFwdingPolicy(fundingSigned.ChanID)
if err != channeldb.ErrChannelNotFound {
err = fmt.Errorf("channel fees were expected to be deleted" +
" but were not")
t.Fatal(err)
}
assertNoFwdingPolicy(t, alice, bob, channelReadyAlice.ChanID)
}
// TestFundingManagerInvalidChanReserve ensures proper validation is done on
@ -4473,7 +4459,7 @@ func TestFundingManagerZeroConf(t *testing.T) {
// The forwarding policy for the channel announcement should
// have been deleted from the database, as the channel is announced.
assertNoFwdingPolicy(t, alice, bob, fundingOp)
assertNoFwdingPolicy(t, alice, bob, aliceChannelReady.ChanID)
}
// TestCommitmentTypeFundmaxSanityCheck was introduced as a way of reminding

View file

@ -191,7 +191,7 @@ type ChannelLink interface {
// UpdateForwardingPolicy updates the forwarding policy for the target
// ChannelLink. Once updated, the link will use the new forwarding
// policy to govern if it an incoming HTLC should be forwarded or not.
UpdateForwardingPolicy(ForwardingPolicy)
UpdateForwardingPolicy(models.ForwardingPolicy)
// CheckHtlcForward should return a nil error if the passed HTLC details
// satisfy the current forwarding policy fo the target link. Otherwise,

View file

@ -64,44 +64,6 @@ const (
DefaultMaxLinkFeeAllocation float64 = 0.5
)
// ForwardingPolicy describes the set of constraints that a given ChannelLink
// is to adhere to when forwarding HTLC's. For each incoming HTLC, this set of
// constraints will be consulted in order to ensure that adequate fees are
// paid, and our time-lock parameters are respected. In the event that an
// incoming HTLC violates any of these constraints, it is to be _rejected_ with
// the error possibly carrying along a ChannelUpdate message that includes the
// latest policy.
type ForwardingPolicy struct {
// MinHTLC is the smallest HTLC that is to be forwarded.
MinHTLCOut lnwire.MilliSatoshi
// MaxHTLC is the largest HTLC that is to be forwarded.
MaxHTLC lnwire.MilliSatoshi
// BaseFee is the base fee, expressed in milli-satoshi that must be
// paid for each incoming HTLC. This field, combined with FeeRate is
// used to compute the required fee for a given HTLC.
BaseFee lnwire.MilliSatoshi
// FeeRate is the fee rate, expressed in milli-satoshi that must be
// paid for each incoming HTLC. This field combined with BaseFee is
// used to compute the required fee for a given HTLC.
FeeRate lnwire.MilliSatoshi
// TimeLockDelta is the absolute time-lock value, expressed in blocks,
// that will be subtracted from an incoming HTLC's timelock value to
// create the time-lock value for the forwarded outgoing HTLC. The
// following constraint MUST hold for an HTLC to be forwarded:
//
// * incomingHtlc.timeLock - timeLockDelta = fwdInfo.OutgoingCTLV
//
// where fwdInfo is the forwarding information extracted from the
// per-hop payload of the incoming HTLC's onion packet.
TimeLockDelta uint32
// TODO(roasbeef): add fee module inside of switch
}
// ExpectedFee computes the expected fee for a given htlc amount. The value
// returned from this function is to be used as a sanity check when forwarding
// HTLC's to ensure that an incoming HTLC properly adheres to our propagated
@ -109,7 +71,7 @@ type ForwardingPolicy struct {
//
// TODO(roasbeef): also add in current available channel bandwidth, inverse
// func
func ExpectedFee(f ForwardingPolicy,
func ExpectedFee(f models.ForwardingPolicy,
htlcAmt lnwire.MilliSatoshi) lnwire.MilliSatoshi {
return f.BaseFee + (htlcAmt*f.FeeRate)/1000000
@ -123,7 +85,7 @@ type ChannelLinkConfig struct {
// deciding whether to forwarding incoming HTLC's or not. This value
// can be updated with subsequent calls to UpdateForwardingPolicy
// targeted at a given ChannelLink concrete interface implementation.
FwrdingPolicy ForwardingPolicy
FwrdingPolicy models.ForwardingPolicy
// Circuits provides restricted access to the switch's circuit map,
// allowing the link to open and close circuits.
@ -2522,7 +2484,9 @@ func (l *channelLink) AttachMailBox(mailbox MailBox) {
// update all of the link's FwrdingPolicy's values.
//
// NOTE: Part of the ChannelLink interface.
func (l *channelLink) UpdateForwardingPolicy(newPolicy ForwardingPolicy) {
func (l *channelLink) UpdateForwardingPolicy(
newPolicy models.ForwardingPolicy) {
l.Lock()
defer l.Unlock()
@ -2627,7 +2591,7 @@ func (l *channelLink) CheckHtlcTransit(payHash [32]byte,
// canSendHtlc checks whether the given htlc parameters satisfy
// the channel's amount and time lock constraints.
func (l *channelLink) canSendHtlc(policy ForwardingPolicy,
func (l *channelLink) canSendHtlc(policy models.ForwardingPolicy,
payHash [32]byte, amt lnwire.MilliSatoshi, timeout uint32,
heightNow uint32, originalScid lnwire.ShortChannelID) *LinkError {

View file

@ -23,6 +23,7 @@ import (
sphinx "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch/hodl"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
@ -1925,7 +1926,7 @@ func newSingleLinkTestHarness(t *testing.T, chanAmt, chanReserve btcutil.Amount)
sentMsgs: make(chan lnwire.Message, 2000),
quit: make(chan struct{}),
}
globalPolicy = ForwardingPolicy{
globalPolicy = models.ForwardingPolicy{
MinHTLCOut: lnwire.NewMSatFromSatoshis(5),
MaxHTLC: lnwire.NewMSatFromSatoshis(chanAmt),
BaseFee: lnwire.NewMSatFromSatoshis(1),
@ -4374,7 +4375,7 @@ func (h *persistentLinkHarness) restartLink(
quit: make(chan struct{}),
}
globalPolicy = ForwardingPolicy{
globalPolicy = models.ForwardingPolicy{
MinHTLCOut: lnwire.NewMSatFromSatoshis(5),
BaseFee: lnwire.NewMSatFromSatoshis(1),
TimeLockDelta: 6,
@ -5630,7 +5631,7 @@ func TestExpectedFee(t *testing.T) {
}
for _, test := range testCases {
f := ForwardingPolicy{
f := models.ForwardingPolicy{
BaseFee: test.baseFee,
FeeRate: test.feeRate,
}
@ -5716,7 +5717,7 @@ func TestCheckHtlcForward(t *testing.T) {
link := channelLink{
cfg: ChannelLinkConfig{
FwrdingPolicy: ForwardingPolicy{
FwrdingPolicy: models.ForwardingPolicy{
TimeLockDelta: 20,
MinHTLCOut: 500,
MaxHTLC: 1000,

View file

@ -840,7 +840,7 @@ func (f *mockChannelLink) getDustClosure() dustClosure {
func (f *mockChannelLink) HandleChannelUpdate(lnwire.Message) {
}
func (f *mockChannelLink) UpdateForwardingPolicy(_ ForwardingPolicy) {
func (f *mockChannelLink) UpdateForwardingPolicy(_ models.ForwardingPolicy) {
}
func (f *mockChannelLink) CheckHtlcForward([32]byte, lnwire.MilliSatoshi,
lnwire.MilliSatoshi, uint32, uint32, uint32,

View file

@ -611,7 +611,7 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, attemptID uint64,
// forwarding policies for all links have been updated, or the switch shuts
// down.
func (s *Switch) UpdateForwardingPolicies(
chanPolicies map[wire.OutPoint]ForwardingPolicy) {
chanPolicies map[wire.OutPoint]models.ForwardingPolicy) {
log.Tracef("Updating link policies: %v", newLogClosure(func() string {
return spew.Sdump(chanPolicies)

View file

@ -23,6 +23,7 @@ import (
"github.com/go-errors/errors"
sphinx "github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/htlcswitch/hop"
"github.com/lightningnetwork/lnd/input"
@ -1069,7 +1070,7 @@ func createTwoClusterChannels(t *testing.T, aliceToBob,
// hopNetwork is the base struct for two and three hop networks
type hopNetwork struct {
feeEstimator *mockFeeEstimator
globalPolicy ForwardingPolicy
globalPolicy models.ForwardingPolicy
obfuscator hop.ErrorEncrypter
defaultDelta uint32
@ -1078,7 +1079,7 @@ type hopNetwork struct {
func newHopNetwork() *hopNetwork {
defaultDelta := uint32(6)
globalPolicy := ForwardingPolicy{
globalPolicy := models.ForwardingPolicy{
MinHTLCOut: lnwire.NewMSatFromSatoshis(5),
BaseFee: lnwire.NewMSatFromSatoshis(1),
TimeLockDelta: defaultDelta,

View file

@ -21,6 +21,7 @@ import (
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/channelnotifier"
"github.com/lightningnetwork/lnd/contractcourt"
"github.com/lightningnetwork/lnd/discovery"
@ -234,7 +235,7 @@ type Config struct {
// RoutingPolicy is used to set the forwarding policy for links created by
// the Brontide.
RoutingPolicy htlcswitch.ForwardingPolicy
RoutingPolicy models.ForwardingPolicy
// Sphinx is used when setting up ChannelLinks so they can decode sphinx
// onion blobs.
@ -877,9 +878,9 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
// If we don't yet have an advertised routing policy, then
// we'll use the current default, otherwise we'll translate the
// routing policy into a forwarding policy.
var forwardingPolicy *htlcswitch.ForwardingPolicy
var forwardingPolicy *models.ForwardingPolicy
if selfPolicy != nil {
forwardingPolicy = &htlcswitch.ForwardingPolicy{
forwardingPolicy = &models.ForwardingPolicy{
MinHTLCOut: selfPolicy.MinHTLC,
MaxHTLC: selfPolicy.MaxHTLC,
BaseFee: selfPolicy.FeeBaseMSat,
@ -934,7 +935,7 @@ func (p *Brontide) loadActiveChannels(chans []*channeldb.OpenChannel) (
// addLink creates and adds a new ChannelLink from the specified channel.
func (p *Brontide) addLink(chanPoint *wire.OutPoint,
lnChan *lnwallet.LightningChannel,
forwardingPolicy *htlcswitch.ForwardingPolicy,
forwardingPolicy *models.ForwardingPolicy,
chainEvents *contractcourt.ChainEventSubscription,
syncStates bool) error {
@ -3831,26 +3832,18 @@ func (p *Brontide) addActiveChannel(c *channeldb.OpenChannel) error {
err)
}
// We'll query the localChanCfg of the new channel to determine the
// minimum HTLC value that can be forwarded. For the maximum HTLC value
// that can be forwarded and fees we'll use the default values, as they
// currently are always set to the default values at initial channel
// creation. Note that the maximum HTLC value defaults to the cap on
// the total value of outstanding HTLCs.
fwdMinHtlc := lnChan.FwdMinHtlc()
defaultPolicy := p.cfg.RoutingPolicy
forwardingPolicy := &htlcswitch.ForwardingPolicy{
MinHTLCOut: fwdMinHtlc,
MaxHTLC: c.LocalChanCfg.MaxPendingAmount,
BaseFee: defaultPolicy.BaseFee,
FeeRate: defaultPolicy.FeeRate,
TimeLockDelta: defaultPolicy.TimeLockDelta,
// We'll query the channel DB for the new channel's initial forwarding
// policies to determine the policy we start out with.
initialPolicy, err := p.cfg.ChannelDB.GetInitialForwardingPolicy(chanID)
if err != nil {
return fmt.Errorf("unable to query for initial forwarding "+
"policy: %v", err)
}
// Create the link and add it to the switch.
err = p.addLink(
chanPoint, lnChan, forwardingPolicy,
chainEvents, shouldReestablish,
chanPoint, lnChan, initialPolicy, chainEvents,
shouldReestablish,
)
if err != nil {
return fmt.Errorf("can't register new channel link(%v) with "+

View file

@ -7,8 +7,8 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwire"
@ -21,7 +21,7 @@ type Manager struct {
// UpdateForwardingPolicies is used by the manager to update active
// links with a new policy.
UpdateForwardingPolicies func(
chanPolicies map[wire.OutPoint]htlcswitch.ForwardingPolicy)
chanPolicies map[wire.OutPoint]models.ForwardingPolicy)
// PropagateChanPolicyUpdate is called to persist a new policy to disk
// and broadcast it to the network.
@ -66,7 +66,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
var failedUpdates []*lnrpc.FailedUpdate
var edgesToUpdate []discovery.EdgeWithInfo
policiesToUpdate := make(map[wire.OutPoint]htlcswitch.ForwardingPolicy)
policiesToUpdate := make(map[wire.OutPoint]models.ForwardingPolicy)
// Next, we'll loop over all the outgoing channels the router knows of.
// If we have a filter then we'll only collected those channels,
@ -106,7 +106,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
})
// Add updated policy to list of policies to send to switch.
policiesToUpdate[info.ChannelPoint] = htlcswitch.ForwardingPolicy{
policiesToUpdate[info.ChannelPoint] = models.ForwardingPolicy{
BaseFee: edge.FeeBaseMSat,
FeeRate: edge.FeeProportionalMillionths,
TimeLockDelta: uint32(edge.TimeLockDelta),

View file

@ -7,8 +7,8 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwire"
@ -50,7 +50,7 @@ func TestManager(t *testing.T) {
}
updateForwardingPolicies := func(
chanPolicies map[wire.OutPoint]htlcswitch.ForwardingPolicy) {
chanPolicies map[wire.OutPoint]models.ForwardingPolicy) {
if len(chanPolicies) == 0 {
return

View file

@ -1297,6 +1297,7 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
return cc.Wallet.LabelTransaction(hash, label, true)
},
Notifier: cc.ChainNotifier,
ChannelDB: s.chanStateDB,
FeeEstimator: cc.FeeEstimator,
SignMessage: cc.MsgSigner.SignMessage,
CurrentNodeAnnouncement: func() (lnwire.NodeAnnouncement,
@ -1458,9 +1459,8 @@ func newServer(cfg *Config, listenAddrs []net.Addr,
RegisteredChains: cfg.registeredChains,
MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte(
s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(),
DeleteAliasEdge: deleteAliasEdge,
AliasManager: s.aliasMgr,
UpdateForwardingPolicies: s.htlcSwitch.UpdateForwardingPolicies,
DeleteAliasEdge: deleteAliasEdge,
AliasManager: s.aliasMgr,
})
if err != nil {
return nil, err