Merge pull request #3523 from joostjager/enable-update-max-htlc

multi: enable max htlc update
This commit is contained in:
Olaoluwa Osuntokun 2019-09-23 17:42:22 -07:00 committed by GitHub
commit 9da8951cf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1060 additions and 691 deletions

View File

@ -2735,7 +2735,8 @@ func (c *ChannelAuthProof) IsEmpty() bool {
type ChannelEdgePolicy struct {
// SigBytes is the raw bytes of the signature of the channel edge
// policy. We'll only parse these if the caller needs to access the
// signature for validation purposes.
// signature for validation purposes. Do not set SigBytes directly, but
// use SetSigBytes instead to make sure that the cache is invalidated.
SigBytes []byte
// sig is a cached fully parsed signature.
@ -2814,6 +2815,13 @@ func (c *ChannelEdgePolicy) Signature() (*btcec.Signature, error) {
return sig, nil
}
// SetSigBytes updates the signature and invalidates the cached parsed
// signature.
func (c *ChannelEdgePolicy) SetSigBytes(sig []byte) {
c.SigBytes = sig
c.sig = nil
}
// IsDisabled determines whether the edge has the disabled bit set.
func (c *ChannelEdgePolicy) IsDisabled() bool {
return c.ChannelFlags&lnwire.ChanUpdateDisabled ==

View File

@ -3337,7 +3337,8 @@ var updateChannelPolicyCommand = cli.Command{
Category: "Channels",
Usage: "Update the channel policy for all channels, or a single " +
"channel.",
ArgsUsage: "base_fee_msat fee_rate time_lock_delta [channel_point]",
ArgsUsage: "base_fee_msat fee_rate time_lock_delta " +
"[--max_htlc_msat=N] [channel_point]",
Description: `
Updates the channel policy for all channels, or just a particular channel
identified by its channel point. The update will be committed, and
@ -3362,6 +3363,12 @@ var updateChannelPolicyCommand = cli.Command{
Usage: "the CLTV delta that will be applied to all " +
"forwarded HTLCs",
},
cli.Uint64Flag{
Name: "max_htlc_msat",
Usage: "if set, the max HTLC size that will be applied " +
"to all forwarded HTLCs. If unset, the max HTLC " +
"is left unchanged.",
},
cli.StringFlag{
Name: "chan_point",
Usage: "The channel whose fee policy should be " +
@ -3475,6 +3482,7 @@ func updateChannelPolicy(ctx *cli.Context) error {
BaseFeeMsat: baseFee,
FeeRate: feeRate,
TimeLockDelta: uint32(timeLockDelta),
MaxHtlcMsat: ctx.Uint64("max_htlc_msat"),
}
if chanPoint != nil {

View File

@ -86,14 +86,12 @@ type networkMsg struct {
}
// chanPolicyUpdateRequest is a request that is sent to the server when a caller
// wishes to update the channel policy (fees e.g.) for a particular set of
// channels. New ChannelUpdate messages will be crafted to be sent out during
// the next broadcast epoch and the fee updates committed to the lower layer.
// wishes to update a particular set of channels. New ChannelUpdate messages
// will be crafted to be sent out during the next broadcast epoch and the fee
// updates committed to the lower layer.
type chanPolicyUpdateRequest struct {
targetChans []wire.OutPoint
newSchema routing.ChannelPolicy
chanPolicies chan updatedChanPolicies
edgesToUpdate []EdgeWithInfo
errChan chan error
}
// Config defines the configuration for the service. ALL elements within the
@ -361,31 +359,36 @@ type updatedChanPolicies struct {
err error
}
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to update the
// channel forwarding policies for the specified channels. If no channels are
// specified, then the update will be applied to all outgoing channels from the
// source node. Policy updates are done in two stages: first, the
// EdgeWithInfo contains the information that is required to update an edge.
type EdgeWithInfo struct {
// Info describes the channel.
Info *channeldb.ChannelEdgeInfo
// Edge describes the policy in one direction of the channel.
Edge *channeldb.ChannelEdgePolicy
}
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
// specified edge updates. Updates are done in two stages: first, the
// AuthenticatedGossiper ensures the update has been committed by dependent
// sub-systems, then it signs and broadcasts new updates to the network. A
// mapping between outpoints and updated channel policies is returned, which is
// used to update the forwarding policies of the underlying links.
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
newSchema routing.ChannelPolicy, chanPoints ...wire.OutPoint) (
map[wire.OutPoint]*channeldb.ChannelEdgePolicy, error) {
edgesToUpdate []EdgeWithInfo) error {
chanPolicyChan := make(chan updatedChanPolicies, 1)
errChan := make(chan error, 1)
policyUpdate := &chanPolicyUpdateRequest{
targetChans: chanPoints,
newSchema: newSchema,
chanPolicies: chanPolicyChan,
edgesToUpdate: edgesToUpdate,
errChan: errChan,
}
select {
case d.chanPolicyUpdates <- policyUpdate:
updatedPolicies := <-chanPolicyChan
return updatedPolicies.chanPolicies, updatedPolicies.err
err := <-errChan
return err
case <-d.quit:
return nil, fmt.Errorf("AuthenticatedGossiper shutting down")
return fmt.Errorf("AuthenticatedGossiper shutting down")
}
}
@ -922,14 +925,10 @@ func (d *AuthenticatedGossiper) networkHandler() {
// First, we'll now create new fully signed updates for
// the affected channels and also update the underlying
// graph with the new state.
chanPolicies, newChanUpdates, err := d.processChanPolicyUpdate(
policyUpdate,
newChanUpdates, err := d.processChanPolicyUpdate(
policyUpdate.edgesToUpdate,
)
update := updatedChanPolicies{
chanPolicies,
err,
}
policyUpdate.chanPolicies <- update
policyUpdate.errChan <- err
if err != nil {
log.Errorf("Unable to craft policy updates: %v",
err)
@ -1211,6 +1210,11 @@ func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
// introduction of the MaxHTLC field, then we'll update this
// edge to propagate this information in the network.
if !edge.MessageFlags.HasMaxHtlc() {
// We'll make sure we support the new max_htlc field if
// not already present.
edge.MessageFlags |= lnwire.ChanUpdateOptionMaxHtlc
edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
edgesToUpdate = append(edgesToUpdate, updateTuple{
info: info,
edge: edge,
@ -1312,98 +1316,29 @@ func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
return nil
}
// processChanPolicyUpdate generates a new set of channel updates with the new
// channel policy applied for each specified channel identified by its channel
// point. In the case that no channel points are specified, then the update
// will be applied to all channels. Finally, the backing ChannelGraphSource is
// updated with the latest information reflecting the applied updates.
//
// TODO(roasbeef): generalize into generic for any channel update
// processChanPolicyUpdate generates a new set of channel updates for the
// provided list of edges and updates the backing ChannelGraphSource.
func (d *AuthenticatedGossiper) processChanPolicyUpdate(
policyUpdate *chanPolicyUpdateRequest) (
map[wire.OutPoint]*channeldb.ChannelEdgePolicy, []networkMsg, error) {
edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {
// First, we'll construct a set of all the channels that need to be
// updated.
chansToUpdate := make(map[wire.OutPoint]struct{})
for _, chanPoint := range policyUpdate.targetChans {
chansToUpdate[chanPoint] = struct{}{}
}
// Next, we'll create a mapping from outpoint to edge policy that will
// be used by each edge's underlying link to update its policy.
chanPolicies := make(map[wire.OutPoint]*channeldb.ChannelEdgePolicy)
haveChanFilter := len(chansToUpdate) != 0
if haveChanFilter {
log.Infof("Updating routing policies for chan_points=%v",
spew.Sdump(chansToUpdate))
} else {
log.Infof("Updating routing policies for all chans")
}
type edgeWithInfo struct {
info *channeldb.ChannelEdgeInfo
edge *channeldb.ChannelEdgePolicy
}
var edgesToUpdate []edgeWithInfo
// Next, we'll loop over all the outgoing channels the router knows of.
// If we have a filter then we'll only collected those channels,
// otherwise we'll collect them all.
err := d.cfg.Router.ForAllOutgoingChannels(func(
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {
// If we have a channel filter, and this channel isn't a part
// of it, then we'll skip it.
if _, ok := chansToUpdate[info.ChannelPoint]; !ok && haveChanFilter {
return nil
}
// Now that we know we should update this channel, we'll update
// its set of policies.
edge.FeeBaseMSat = policyUpdate.newSchema.BaseFee
edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
policyUpdate.newSchema.FeeRate,
)
edge.TimeLockDelta = uint16(policyUpdate.newSchema.TimeLockDelta)
edgesToUpdate = append(edgesToUpdate, edgeWithInfo{
info: info,
edge: edge,
})
return nil
})
if err != nil {
return nil, nil, err
}
// With the set of edges we need to update retrieved, we'll now re-sign
// them, and insert them into the database.
var chanUpdates []networkMsg
for _, edgeInfo := range edgesToUpdate {
// Now that we've collected all the channels we need to update,
// we'll Re-sign and update the backing ChannelGraphSource, and
// we'll re-sign and update the backing ChannelGraphSource, and
// retrieve our ChannelUpdate to broadcast.
_, chanUpdate, err := d.updateChannel(
edgeInfo.info, edgeInfo.edge,
edgeInfo.Info, edgeInfo.Edge,
)
if err != nil {
return nil, nil, err
return nil, err
}
// Since the update succeeded, add the edge to our policy
// mapping.
chanPolicies[edgeInfo.info.ChannelPoint] = edgeInfo.edge
// We'll avoid broadcasting any updates for private channels to
// avoid directly giving away their existence. Instead, we'll
// send the update directly to the remote party.
if edgeInfo.info.AuthProof == nil {
if edgeInfo.Info.AuthProof == nil {
remotePubKey := remotePubFromChanInfo(
edgeInfo.info, chanUpdate.ChannelFlags,
edgeInfo.Info, chanUpdate.ChannelFlags,
)
err := d.reliableSender.sendMessage(
chanUpdate, remotePubKey,
@ -1426,7 +1361,7 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
})
}
return chanPolicies, chanUpdates, nil
return chanUpdates, nil
}
// processRejectedEdge examines a rejected edge to see if we can extract any
@ -2514,13 +2449,6 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement,
*lnwire.ChannelUpdate, error) {
// We'll make sure we support the new max_htlc field if not already
// present.
if !edge.MessageFlags.HasMaxHtlc() {
edge.MessageFlags |= lnwire.ChanUpdateOptionMaxHtlc
edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
}
// Make sure timestamp is always increased, such that our update gets
// propagated.
timestamp := time.Now().Unix()
@ -2543,12 +2471,6 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
ExtraOpaqueData: edge.ExtraOpaqueData,
}
var err error
chanUpdate.Signature, err = lnwire.NewSigFromRawSignature(edge.SigBytes)
if err != nil {
return nil, nil, err
}
// With the update applied, we'll generate a new signature over a
// digest of the channel announcement itself.
sig, err := SignAnnouncement(d.cfg.AnnSigner, d.selfKey, chanUpdate)
@ -2558,7 +2480,7 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
// Next, we'll set the new signature in place, and update the reference
// in the backing slice.
edge.SigBytes = sig.Serialize()
edge.SetSigBytes(sig.Serialize())
chanUpdate.Signature, err = lnwire.NewSigFromSignature(sig)
if err != nil {
return nil, nil, err

View File

@ -3602,20 +3602,26 @@ out:
// Now that all of our channels are loaded, we'll attempt to update the
// policy of all of them.
const newTimeLockDelta = 100
newPolicy := routing.ChannelPolicy{
TimeLockDelta: newTimeLockDelta,
}
newChanPolicies, err := ctx.gossiper.PropagateChanPolicyUpdate(newPolicy)
var edgesToUpdate []EdgeWithInfo
err = ctx.router.ForAllOutgoingChannels(func(
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {
edge.TimeLockDelta = uint16(newTimeLockDelta)
edgesToUpdate = append(edgesToUpdate, EdgeWithInfo{
Info: info,
Edge: edge,
})
return nil
})
if err != nil {
t.Fatalf("unable to chan policies: %v", err)
t.Fatal(err)
}
// Ensure that the updated channel policies are as expected.
for _, dbPolicy := range newChanPolicies {
if dbPolicy.TimeLockDelta != uint16(newPolicy.TimeLockDelta) {
t.Fatalf("wrong delta: expected %v, got %v",
newPolicy.TimeLockDelta, dbPolicy.TimeLockDelta)
}
err = ctx.gossiper.PropagateChanPolicyUpdate(edgesToUpdate)
if err != nil {
t.Fatalf("unable to chan policies: %v", err)
}
// Two channel updates should now be broadcast, with neither of them

View File

@ -971,6 +971,47 @@ func TestUpdateForwardingPolicy(t *testing.T) {
default:
t.Fatalf("expected FailFeeInsufficient instead got: %v", err)
}
// Reset the policy so we can then test updating the max HTLC policy.
n.secondBobChannelLink.UpdateForwardingPolicy(n.globalPolicy)
// As a sanity check, ensure the original payment now succeeds again.
_, err = makePayment(
n.aliceServer, n.carolServer, firstHop, hops, amountNoFee,
htlcAmt, htlcExpiry,
).Wait(30 * time.Second)
if err != nil {
t.Fatalf("unable to send payment: %v", err)
}
// Now we'll update Bob's policy to lower his max HTLC to an extent
// that'll cause him to reject the same HTLC that we just sent.
newPolicy = n.globalPolicy
newPolicy.MaxHTLC = amountNoFee - 1
n.secondBobChannelLink.UpdateForwardingPolicy(newPolicy)
// Next, we'll send the payment again, using the exact same per-hop
// payload for each node. This payment should fail as it won't factor
// in Bob's new max HTLC policy.
_, err = makePayment(
n.aliceServer, n.carolServer, firstHop, hops, amountNoFee,
htlcAmt, htlcExpiry,
).Wait(30 * time.Second)
if err == nil {
t.Fatalf("payment should've been rejected")
}
ferr, ok = err.(*ForwardingError)
if !ok {
t.Fatalf("expected a ForwardingError, instead got (%T): %v",
err, err)
}
switch ferr.FailureMessage.(type) {
case *lnwire.FailTemporaryChannelFailure:
default:
t.Fatalf("expected TemporaryChannelFailure, instead got: %v",
err)
}
}
// TestChannelLinkMultiHopInsufficientPayment checks that we receive error if

View File

@ -431,7 +431,7 @@ func (s *Switch) SendHTLC(firstHop lnwire.ShortChannelID, paymentID uint64,
// forwarding policies for all links have been updated, or the switch shuts
// down.
func (s *Switch) UpdateForwardingPolicies(
chanPolicies map[wire.OutPoint]*channeldb.ChannelEdgePolicy) {
chanPolicies map[wire.OutPoint]ForwardingPolicy) {
log.Tracef("Updating link policies: %v", newLogClosure(func() string {
return spew.Sdump(chanPolicies)
@ -440,7 +440,7 @@ func (s *Switch) UpdateForwardingPolicies(
s.indexMtx.RLock()
// Update each link in chanPolicies.
for targetLink := range chanPolicies {
for targetLink, policy := range chanPolicies {
cid := lnwire.NewChanIDFromOutPoint(&targetLink)
link, ok := s.linkIndex[cid]
@ -450,28 +450,12 @@ func (s *Switch) UpdateForwardingPolicies(
continue
}
newPolicy := dbPolicyToFwdingPolicy(
chanPolicies[*link.ChannelPoint()],
)
link.UpdateForwardingPolicy(newPolicy)
link.UpdateForwardingPolicy(policy)
}
s.indexMtx.RUnlock()
}
// dbPolicyToFwdingPolicy is a helper function that converts a channeldb
// ChannelEdgePolicy into a ForwardingPolicy struct for the purpose of updating
// the forwarding policy of a link.
func dbPolicyToFwdingPolicy(policy *channeldb.ChannelEdgePolicy) ForwardingPolicy {
return ForwardingPolicy{
BaseFee: policy.FeeBaseMSat,
FeeRate: policy.FeeProportionalMillionths,
TimeLockDelta: uint32(policy.TimeLockDelta),
MinHTLC: policy.MinHTLC,
MaxHTLC: policy.MaxHTLC,
}
}
// forward is used in order to find next channel link and apply htlc update.
// Also this function is used by channel links itself in order to forward the
// update after it has been included in the channel.

File diff suppressed because it is too large Load Diff

View File

@ -2395,6 +2395,9 @@ message PolicyUpdateRequest {
/// The required timelock delta for HTLCs forwarded over the channel.
uint32 time_lock_delta = 5 [json_name = "time_lock_delta"];
/// If set, the maximum HTLC size in milli-satoshis. If unset, the maximum HTLC will be unchanged.
uint64 max_htlc_msat = 6 [json_name = "max_htlc_msat"];
}
message PolicyUpdateResponse {
}

View File

@ -3150,6 +3150,11 @@
"type": "integer",
"format": "int64",
"description": "/ The required timelock delta for HTLCs forwarded over the channel."
},
"max_htlc_msat": {
"type": "string",
"format": "uint64",
"description": "/ If set, the maximum HTLC size in milli-satoshis. If unset, the maximum HTLC will be unchanged."
}
}
},

View File

@ -1122,6 +1122,15 @@ type expectedChanUpdate struct {
chanPoint *lnrpc.ChannelPoint
}
// calculateMaxHtlc re-implements the RequiredRemoteChannelReserve of the
// funding manager's config, which corresponds to the maximum MaxHTLC value we
// allow users to set when updating a channel policy.
func calculateMaxHtlc(chanCap btcutil.Amount) uint64 {
reserve := lnwire.NewMSatFromSatoshis(chanCap / 100)
max := lnwire.NewMSatFromSatoshis(chanCap) - reserve
return uint64(max)
}
// waitForChannelUpdate waits for a node to receive the expected channel
// updates.
func waitForChannelUpdate(t *harnessTest, subscription graphSubscription,
@ -1292,6 +1301,10 @@ func checkChannelPolicy(policy, expectedPolicy *lnrpc.RoutingPolicy) error {
return fmt.Errorf("expected min htlc %v, got %v",
expectedPolicy.MinHtlc, policy.MinHtlc)
}
if policy.MaxHtlcMsat != expectedPolicy.MaxHtlcMsat {
return fmt.Errorf("expected max htlc %v, got %v",
expectedPolicy.MaxHtlcMsat, policy.MaxHtlcMsat)
}
if policy.Disabled != expectedPolicy.Disabled {
return errors.New("edge should be disabled but isn't")
}
@ -1310,6 +1323,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
defaultTimeLockDelta = lnd.DefaultBitcoinTimeLockDelta
defaultMinHtlc = 1000
)
defaultMaxHtlc := calculateMaxHtlc(lnd.MaxBtcFundingAmount)
// Launch notification clients for all nodes, such that we can
// get notified when they discover new channels and updates in the
@ -1344,6 +1358,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
FeeRateMilliMsat: defaultFeeRate,
TimeLockDelta: defaultTimeLockDelta,
MinHtlc: defaultMinHtlc,
MaxHtlcMsat: defaultMaxHtlc,
}
for _, graphSub := range graphSubs {
@ -1422,6 +1437,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
FeeRateMilliMsat: defaultFeeRate,
TimeLockDelta: defaultTimeLockDelta,
MinHtlc: customMinHtlc,
MaxHtlcMsat: defaultMaxHtlc,
}
expectedPolicyCarol := &lnrpc.RoutingPolicy{
@ -1429,6 +1445,7 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
FeeRateMilliMsat: defaultFeeRate,
TimeLockDelta: defaultTimeLockDelta,
MinHtlc: defaultMinHtlc,
MaxHtlcMsat: defaultMaxHtlc,
}
for _, graphSub := range graphSubs {
@ -1589,24 +1606,27 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
sendResp.PaymentError)
}
// With our little cluster set up, we'll update the fees for the
// channel Bob side of the Alice->Bob channel, and make sure all nodes
// learn about it.
// With our little cluster set up, we'll update the fees and the max htlc
// size for the Bob side of the Alice->Bob channel, and make sure
// all nodes learn about it.
baseFee := int64(1500)
feeRate := int64(12)
timeLockDelta := uint32(66)
maxHtlc := uint64(500000)
expectedPolicy = &lnrpc.RoutingPolicy{
FeeBaseMsat: baseFee,
FeeRateMilliMsat: testFeeBase * feeRate,
TimeLockDelta: timeLockDelta,
MinHtlc: defaultMinHtlc,
MaxHtlcMsat: maxHtlc,
}
req := &lnrpc.PolicyUpdateRequest{
BaseFeeMsat: baseFee,
FeeRate: float64(feeRate),
TimeLockDelta: timeLockDelta,
MaxHtlcMsat: maxHtlc,
Scope: &lnrpc.PolicyUpdateRequest_ChanPoint{
ChanPoint: chanPoint,
},
@ -1689,22 +1709,25 @@ func testUpdateChannelPolicy(net *lntest.NetworkHarness, t *harnessTest) {
baseFee = int64(800)
feeRate = int64(123)
timeLockDelta = uint32(22)
maxHtlc = maxHtlc * 2
expectedPolicy.FeeBaseMsat = baseFee
expectedPolicy.FeeRateMilliMsat = testFeeBase * feeRate
expectedPolicy.TimeLockDelta = timeLockDelta
expectedPolicy.MaxHtlcMsat = maxHtlc
req = &lnrpc.PolicyUpdateRequest{
BaseFeeMsat: baseFee,
FeeRate: float64(feeRate),
TimeLockDelta: timeLockDelta,
MaxHtlcMsat: maxHtlc,
}
req.Scope = &lnrpc.PolicyUpdateRequest_Global{}
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)
_, err = net.Alice.UpdateChannelPolicy(ctxt, req)
if err != nil {
t.Fatalf("unable to get alice's balance: %v", err)
t.Fatalf("unable to update alice's channel policy: %v", err)
}
// Wait for all nodes to have seen the policy updates for both of
@ -3950,7 +3973,8 @@ func assertAmountPaid(t *harnessTest, channelName string,
// listenerNode has received the policy update.
func updateChannelPolicy(t *harnessTest, node *lntest.HarnessNode,
chanPoint *lnrpc.ChannelPoint, baseFee int64, feeRate int64,
timeLockDelta uint32, listenerNode *lntest.HarnessNode) {
timeLockDelta uint32, maxHtlc uint64, listenerNode *lntest.HarnessNode) {
ctxb := context.Background()
expectedPolicy := &lnrpc.RoutingPolicy{
@ -3958,6 +3982,7 @@ func updateChannelPolicy(t *harnessTest, node *lntest.HarnessNode,
FeeRateMilliMsat: feeRate,
TimeLockDelta: timeLockDelta,
MinHtlc: 1000, // default value
MaxHtlcMsat: maxHtlc,
}
updateFeeReq := &lnrpc.PolicyUpdateRequest{
@ -3967,6 +3992,7 @@ func updateChannelPolicy(t *harnessTest, node *lntest.HarnessNode,
Scope: &lnrpc.PolicyUpdateRequest_ChanPoint{
ChanPoint: chanPoint,
},
MaxHtlcMsat: maxHtlc,
}
ctxt, _ := context.WithTimeout(ctxb, defaultTimeout)
@ -4143,14 +4169,15 @@ func testMultiHopPayments(net *lntest.NetworkHarness, t *harnessTest) {
// Set the fee policies of the Alice -> Bob and the Dave -> Alice
// channel edges to relatively large non default values. This makes it
// possible to pick up more subtle fee calculation errors.
maxHtlc := uint64(calculateMaxHtlc(chanAmt))
updateChannelPolicy(
t, net.Alice, chanPointAlice, 1000, 100000,
lnd.DefaultBitcoinTimeLockDelta, carol,
lnd.DefaultBitcoinTimeLockDelta, maxHtlc, carol,
)
updateChannelPolicy(
t, dave, chanPointDave, 5000, 150000,
lnd.DefaultBitcoinTimeLockDelta, carol,
lnd.DefaultBitcoinTimeLockDelta, maxHtlc, carol,
)
// Using Carol as the source, pay to the 5 invoices from Bob created
@ -12372,18 +12399,21 @@ func testRouteFeeCutoff(net *lntest.NetworkHarness, t *harnessTest) {
baseFee := int64(10000)
feeRate := int64(5)
timeLockDelta := uint32(lnd.DefaultBitcoinTimeLockDelta)
maxHtlc := calculateMaxHtlc(chanAmt)
expectedPolicy := &lnrpc.RoutingPolicy{
FeeBaseMsat: baseFee,
FeeRateMilliMsat: testFeeBase * feeRate,
TimeLockDelta: timeLockDelta,
MinHtlc: 1000, // default value
MaxHtlcMsat: maxHtlc,
}
updateFeeReq := &lnrpc.PolicyUpdateRequest{
BaseFeeMsat: baseFee,
FeeRate: float64(feeRate),
TimeLockDelta: timeLockDelta,
MaxHtlcMsat: maxHtlc,
Scope: &lnrpc.PolicyUpdateRequest_ChanPoint{
ChanPoint: chanPointCarolDave,
},
@ -12634,6 +12664,7 @@ func testSendUpdateDisableChannel(net *lntest.NetworkHarness, t *harnessTest) {
FeeRateMilliMsat: int64(lnd.DefaultBitcoinFeeRate),
TimeLockDelta: lnd.DefaultBitcoinTimeLockDelta,
MinHtlc: 1000, // default value
MaxHtlcMsat: calculateMaxHtlc(chanAmt),
Disabled: true,
}

View File

@ -0,0 +1,204 @@
package localchans
import (
"fmt"
"sync"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
)
// Manager manages the node's local channels. The only operation that is
// currently implemented is updating forwarding policies.
type Manager struct {
// UpdateForwardingPolicies is used by the manager to update active
// links with a new policy.
UpdateForwardingPolicies func(
chanPolicies map[wire.OutPoint]htlcswitch.ForwardingPolicy)
// PropagateChanPolicyUpdate is called to persist a new policy to disk
// and broadcast it to the network.
PropagateChanPolicyUpdate func(
edgesToUpdate []discovery.EdgeWithInfo) error
// ForAllOutgoingChannels is required to iterate over all our local
// channels.
ForAllOutgoingChannels func(cb func(*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy) error) error
// FetchChannel is used to query local channel parameters.
FetchChannel func(chanPoint wire.OutPoint) (*channeldb.OpenChannel,
error)
// policyUpdateLock ensures that the database and the link do not fall
// out of sync if there are concurrent fee update calls. Without it,
// there is a chance that policy A updates the database, then policy B
// updates the database, then policy B updates the link, then policy A
// updates the link.
policyUpdateLock sync.Mutex
}
// UpdatePolicy updates the policy for the specified channels on disk and in the
// active links.
func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
chanPoints ...wire.OutPoint) error {
r.policyUpdateLock.Lock()
defer r.policyUpdateLock.Unlock()
// First, we'll construct a set of all the channels that need to be
// updated.
chansToUpdate := make(map[wire.OutPoint]struct{})
for _, chanPoint := range chanPoints {
chansToUpdate[chanPoint] = struct{}{}
}
haveChanFilter := len(chansToUpdate) != 0
var edgesToUpdate []discovery.EdgeWithInfo
policiesToUpdate := make(map[wire.OutPoint]htlcswitch.ForwardingPolicy)
// Next, we'll loop over all the outgoing channels the router knows of.
// If we have a filter then we'll only collected those channels,
// otherwise we'll collect them all.
err := r.ForAllOutgoingChannels(func(
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {
// If we have a channel filter, and this channel isn't a part
// of it, then we'll skip it.
_, ok := chansToUpdate[info.ChannelPoint]
if !ok && haveChanFilter {
return nil
}
// Apply the new policy to the edge.
err := r.updateEdge(info.ChannelPoint, edge, newSchema)
if err != nil {
return nil
}
// Add updated edge to list of edges to send to gossiper.
edgesToUpdate = append(edgesToUpdate, discovery.EdgeWithInfo{
Info: info,
Edge: edge,
})
// Add updated policy to list of policies to send to switch.
policiesToUpdate[info.ChannelPoint] = htlcswitch.ForwardingPolicy{
BaseFee: edge.FeeBaseMSat,
FeeRate: edge.FeeProportionalMillionths,
TimeLockDelta: uint32(edge.TimeLockDelta),
MinHTLC: edge.MinHTLC,
MaxHTLC: edge.MaxHTLC,
}
return nil
})
if err != nil {
return err
}
// Commit the policy updates to disk and broadcast to the network. We
// validated the new policy above, so we expect no validation errors. If
// this would happen because of a bug, the link policy will be
// desynchronized. It is currently not possible to atomically commit
// multiple edge updates.
err = r.PropagateChanPolicyUpdate(edgesToUpdate)
if err != nil {
return err
}
// Update active links.
r.UpdateForwardingPolicies(policiesToUpdate)
return nil
}
// updateEdge updates the given edge with the new schema.
func (r *Manager) updateEdge(chanPoint wire.OutPoint,
edge *channeldb.ChannelEdgePolicy,
newSchema routing.ChannelPolicy) error {
// Update forwarding fee scheme and required time lock delta.
edge.FeeBaseMSat = newSchema.BaseFee
edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
newSchema.FeeRate,
)
edge.TimeLockDelta = uint16(newSchema.TimeLockDelta)
// Retrieve negotiated channel htlc amt limits.
amtMin, amtMax, err := r.getHtlcAmtLimits(chanPoint)
if err != nil {
return nil
}
// We now update the edge max htlc value.
switch {
// If a non-zero max htlc was specified, use it to update the edge.
// Otherwise keep the value unchanged.
case newSchema.MaxHTLC != 0:
edge.MaxHTLC = newSchema.MaxHTLC
// If this edge still doesn't have a max htlc set, set it to the max.
// This is an on-the-fly migration.
case !edge.MessageFlags.HasMaxHtlc():
edge.MaxHTLC = amtMax
// If this edge has a max htlc that exceeds what the channel can
// actually carry, correct it now. This can happen, because we
// previously set the max htlc to the channel capacity.
case edge.MaxHTLC > amtMax:
edge.MaxHTLC = amtMax
}
// If the MaxHtlc flag wasn't already set, we can set it now.
edge.MessageFlags |= lnwire.ChanUpdateOptionMaxHtlc
// Validate htlc amount constraints.
switch {
case edge.MinHTLC < amtMin:
return fmt.Errorf("min htlc amount of %v msat is below "+
"min htlc parameter of %v msat for channel %v",
edge.MinHTLC, amtMin,
chanPoint)
case edge.MaxHTLC > amtMax:
return fmt.Errorf("max htlc size of %v msat is above "+
"max pending amount of %v msat for channel %v",
edge.MaxHTLC, amtMax, chanPoint)
case edge.MinHTLC > edge.MaxHTLC:
return fmt.Errorf("min_htlc %v greater than max_htlc %v",
edge.MinHTLC, edge.MaxHTLC)
}
// Clear signature to help prevent usage of the previous signature.
edge.SetSigBytes(nil)
return nil
}
// getHtlcAmtLimits retrieves the negotiated channel min and max htlc amount
// constraints.
func (r *Manager) getHtlcAmtLimits(chanPoint wire.OutPoint) (
lnwire.MilliSatoshi, lnwire.MilliSatoshi, error) {
ch, err := r.FetchChannel(chanPoint)
if err != nil {
return 0, 0, err
}
// The max htlc policy field must be less than or equal to the channel
// capacity AND less than or equal to the max in-flight HTLC value.
// Since the latter is always less than or equal to the former, just
// return the max in-flight value.
maxAmt := ch.LocalChanCfg.ChannelConstraints.MaxPendingAmount
return ch.LocalChanCfg.MinHTLC, maxAmt, nil
}

View File

@ -0,0 +1,148 @@
package localchans
import (
"testing"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcutil"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/routing"
)
// TestManager tests that the local channel manager properly propagates fee
// updates to gossiper and links.
func TestManager(t *testing.T) {
var (
chanPoint = wire.OutPoint{Hash: chainhash.Hash{1}, Index: 2}
chanCap = btcutil.Amount(1000)
maxPendingAmount = lnwire.MilliSatoshi(999000)
minHTLC = lnwire.MilliSatoshi(2000)
)
newPolicy := routing.ChannelPolicy{
FeeSchema: routing.FeeSchema{
BaseFee: 100,
FeeRate: 200,
},
TimeLockDelta: 80,
MaxHTLC: 5000,
}
currentPolicy := channeldb.ChannelEdgePolicy{
MinHTLC: minHTLC,
MessageFlags: lnwire.ChanUpdateOptionMaxHtlc,
}
updateForwardingPolicies := func(
chanPolicies map[wire.OutPoint]htlcswitch.ForwardingPolicy) {
if len(chanPolicies) != 1 {
t.Fatal("unexpected number of policies to apply")
}
policy := chanPolicies[chanPoint]
if policy.TimeLockDelta != newPolicy.TimeLockDelta {
t.Fatal("unexpected time lock delta")
}
if policy.BaseFee != newPolicy.BaseFee {
t.Fatal("unexpected base fee")
}
if uint32(policy.FeeRate) != newPolicy.FeeRate {
t.Fatal("unexpected base fee")
}
if policy.MaxHTLC != newPolicy.MaxHTLC {
t.Fatal("unexpected max htlc")
}
}
propagateChanPolicyUpdate := func(
edgesToUpdate []discovery.EdgeWithInfo) error {
if len(edgesToUpdate) != 1 {
t.Fatal("unexpected number of edges to update")
}
policy := edgesToUpdate[0].Edge
if !policy.MessageFlags.HasMaxHtlc() {
t.Fatal("expected max htlc flag")
}
if policy.TimeLockDelta != uint16(newPolicy.TimeLockDelta) {
t.Fatal("unexpected time lock delta")
}
if policy.FeeBaseMSat != newPolicy.BaseFee {
t.Fatal("unexpected base fee")
}
if uint32(policy.FeeProportionalMillionths) != newPolicy.FeeRate {
t.Fatal("unexpected base fee")
}
if policy.MaxHTLC != newPolicy.MaxHTLC {
t.Fatal("unexpected max htlc")
}
return nil
}
forAllOutgoingChannels := func(cb func(*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy) error) error {
return cb(
&channeldb.ChannelEdgeInfo{
Capacity: chanCap,
ChannelPoint: chanPoint,
},
&currentPolicy,
)
}
fetchChannel := func(chanPoint wire.OutPoint) (*channeldb.OpenChannel,
error) {
constraints := channeldb.ChannelConstraints{
MaxPendingAmount: maxPendingAmount,
MinHTLC: minHTLC,
}
return &channeldb.OpenChannel{
LocalChanCfg: channeldb.ChannelConfig{
ChannelConstraints: constraints,
},
}, nil
}
manager := Manager{
UpdateForwardingPolicies: updateForwardingPolicies,
PropagateChanPolicyUpdate: propagateChanPolicyUpdate,
ForAllOutgoingChannels: forAllOutgoingChannels,
FetchChannel: fetchChannel,
}
// Test updating a specific channels.
err := manager.UpdatePolicy(newPolicy, chanPoint)
if err != nil {
t.Fatal(err)
}
// Test updating all channels, which comes down to the same as testing a
// specific channel because there is only one channel.
err = manager.UpdatePolicy(newPolicy)
if err != nil {
t.Fatal(err)
}
// If no max htlc is specified, the max htlc value should be kept
// unchanged.
currentPolicy.MaxHTLC = newPolicy.MaxHTLC
noMaxHtlcPolicy := newPolicy
noMaxHtlcPolicy.MaxHTLC = 0
err = manager.UpdatePolicy(noMaxHtlcPolicy)
if err != nil {
t.Fatal(err)
}
}

View File

@ -219,6 +219,10 @@ type ChannelPolicy struct {
// TimeLockDelta is the required HTLC timelock delta to be used
// when forwarding payments.
TimeLockDelta uint32
// MaxHTLC is the maximum HTLC size including fees we are allowed to
// forward over this channel.
MaxHTLC lnwire.MilliSatoshi
}
// Config defines the configuration for the ChannelRouter. ALL elements within

View File

@ -4545,12 +4545,6 @@ func (r *rpcServer) FeeReport(ctx context.Context,
// 0.000001, or 0.0001%.
const minFeeRate = 1e-6
// policyUpdateLock ensures that the database and the link do not fall out of
// sync if there are concurrent fee update calls. Without it, there is a chance
// that policy A updates the database, then policy B updates the database, then
// policy B updates the link, then policy A updates the link.
var policyUpdateLock sync.Mutex
// UpdateChannelPolicy allows the caller to update the channel forwarding policy
// for all channels globally, or a particular channel.
func (r *rpcServer) UpdateChannelPolicy(ctx context.Context,
@ -4608,6 +4602,7 @@ func (r *rpcServer) UpdateChannelPolicy(ctx context.Context,
chanPolicy := routing.ChannelPolicy{
FeeSchema: feeSchema,
TimeLockDelta: req.TimeLockDelta,
MaxHTLC: lnwire.MilliSatoshi(req.MaxHtlcMsat),
}
rpcsLog.Debugf("[updatechanpolicy] updating channel policy base_fee=%v, "+
@ -4615,22 +4610,13 @@ func (r *rpcServer) UpdateChannelPolicy(ctx context.Context,
req.BaseFeeMsat, req.FeeRate, feeRateFixed, req.TimeLockDelta,
spew.Sdump(targetChans))
// With the scope resolved, we'll now send this to the
// AuthenticatedGossiper so it can propagate the new policy for our
// target channel(s).
policyUpdateLock.Lock()
defer policyUpdateLock.Unlock()
chanPolicies, err := r.server.authGossiper.PropagateChanPolicyUpdate(
chanPolicy, targetChans...,
)
// With the scope resolved, we'll now send this to the local channel
// manager so it can propagate the new policy for our target channel(s).
err := r.server.localChanMgr.UpdatePolicy(chanPolicy, targetChans...)
if err != nil {
return nil, err
}
// Finally, we'll apply the set of channel policies to the target
// channels' links.
r.server.htlcSwitch.UpdateForwardingPolicies(chanPolicies)
return &lnrpc.PolicyUpdateResponse{}, nil
}

View File

@ -48,6 +48,7 @@ import (
"github.com/lightningnetwork/lnd/peernotifier"
"github.com/lightningnetwork/lnd/pool"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/localchans"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/sweep"
"github.com/lightningnetwork/lnd/ticker"
@ -203,6 +204,8 @@ type server struct {
authGossiper *discovery.AuthenticatedGossiper
localChanMgr *localchans.Manager
utxoNursery *utxoNursery
sweeper *sweep.UtxoSweeper
@ -735,6 +738,13 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
s.identityPriv.PubKey(),
)
s.localChanMgr = &localchans.Manager{
ForAllOutgoingChannels: s.chanRouter.ForAllOutgoingChannels,
PropagateChanPolicyUpdate: s.authGossiper.PropagateChanPolicyUpdate,
UpdateForwardingPolicies: s.htlcSwitch.UpdateForwardingPolicies,
FetchChannel: s.chanDB.FetchChannel,
}
utxnStore, err := newNurseryStore(activeNetParams.GenesisHash, chanDB)
if err != nil {
srvrLog.Errorf("unable to create nursery store: %v", err)