funding+channeldb: handle and persist channel fees

This commit is contained in:
Slyghtning 2022-09-26 11:10:39 -04:00
parent 021cb07b39
commit e87412bd63
2 changed files with 285 additions and 41 deletions

View File

@ -276,6 +276,11 @@ var (
// channelOpeningState for each channel that is currently in the process
// of being opened.
channelOpeningStateBucket = []byte("channelOpeningState")
// initialChannelFwdingPolicyBucket is the database bucket used to store
// the forwarding policy for each permanent channel that is currently
// in the process of being opened.
initialChannelFwdingPolicyBucket = []byte("initialChannelFwdingPolicy")
)
// DB is the primary datastore for the lnd daemon. The database stores
@ -324,7 +329,9 @@ func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) {
// CreateWithBackend creates channeldb instance using the passed kvdb.Backend.
// Any necessary schemas migrations due to updates will take place as necessary.
func CreateWithBackend(backend kvdb.Backend, modifiers ...OptionModifier) (*DB, error) {
func CreateWithBackend(backend kvdb.Backend,
modifiers ...OptionModifier) (*DB, error) {
opts := DefaultOptions()
for _, modifier := range modifiers {
modifier(&opts)
@ -656,7 +663,9 @@ func (c *ChannelStateDB) FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) (
// The next layer down is all the chains that this node
// has channels on with us.
return nodeChanBucket.ForEach(func(chainHash, v []byte) error {
return nodeChanBucket.ForEach(func(chainHash,
v []byte) error {
// If there's a value, it's not a bucket so
// ignore it.
if v != nil {
@ -1108,8 +1117,8 @@ func (c *ChannelStateDB) pruneLinkNode(openChannels []*OpenChannel,
return c.linkNodeDB.DeleteLinkNode(remotePub)
}
// PruneLinkNodes attempts to prune all link nodes found within the database with
// whom we no longer have any open channels with.
// PruneLinkNodes attempts to prune all link nodes found within the database
// with whom we no longer have any open channels with.
func (c *ChannelStateDB) PruneLinkNodes() error {
allLinkNodes, err := c.linkNodeDB.FetchAllLinkNodes()
if err != nil {
@ -1290,6 +1299,64 @@ func (c *ChannelStateDB) AbandonChannel(chanPoint *wire.OutPoint,
return dbChan.CloseChannel(summary, ChanStatusLocalCloseInitiator)
}
// SaveInitialFwdingPolicy saves the serialized forwarding policy for the
// provided permanent channel id to the initialChannelFwdingPolicyBucket.
func (c *ChannelStateDB) SaveInitialFwdingPolicy(chanID,
forwardingPolicy []byte) error {
return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
bucket, err := tx.CreateTopLevelBucket(
initialChannelFwdingPolicyBucket,
)
if err != nil {
return err
}
return bucket.Put(chanID, forwardingPolicy)
}, func() {})
}
// GetInitialFwdingPolicy fetches the serialized forwarding policy for the
// provided channel id from the database, or returns ErrChannelNotFound if
// a forwarding policy for this channel id is not found.
func (c *ChannelStateDB) GetInitialFwdingPolicy(chanID []byte) ([]byte, error) {
var serializedState []byte
err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
bucket := tx.ReadBucket(initialChannelFwdingPolicyBucket)
if bucket == nil {
// If the bucket does not exist, it means we
// never added a channel fees to the db, so
// return ErrChannelNotFound.
return ErrChannelNotFound
}
stateBytes := bucket.Get(chanID)
if stateBytes == nil {
return ErrChannelNotFound
}
serializedState = append(serializedState, stateBytes...)
return nil
}, func() {
serializedState = nil
})
return serializedState, err
}
// DeleteInitialFwdingPolicy removes the forwarding policy for a given channel
// from the database.
func (c *ChannelStateDB) DeleteInitialFwdingPolicy(chanID []byte) error {
return kvdb.Update(c.backend, func(tx kvdb.RwTx) error {
bucket := tx.ReadWriteBucket(initialChannelFwdingPolicyBucket)
if bucket == nil {
return ErrChannelNotFound
}
return bucket.Delete(chanID)
}, func() {})
}
// SaveChannelOpeningState saves the serialized channel state for the provided
// chanPoint to the channelOpeningStateBucket.
func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,
@ -1308,7 +1375,9 @@ func (c *ChannelStateDB) SaveChannelOpeningState(outPoint,
// GetChannelOpeningState fetches the serialized channel state for the provided
// outPoint from the database, or returns ErrChannelNotFound if the channel
// is not found.
func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte, error) {
func (c *ChannelStateDB) GetChannelOpeningState(outPoint []byte) ([]byte,
error) {
var serializedState []byte
err := kvdb.View(c.backend, func(tx kvdb.RTx) error {
bucket := tx.ReadBucket(channelOpeningStateBucket)
@ -1392,7 +1461,8 @@ func (d *DB) syncVersions(versions []mandatoryVersion) error {
continue
}
log.Infof("Applying migration #%v", migrationVersions[i])
log.Infof("Applying migration #%v",
migrationVersions[i])
if err := migration(tx); err != nil {
log.Infof("Unable to apply migration #%v",
@ -1532,7 +1602,9 @@ func fetchHistoricalChanBucket(tx kvdb.RTx,
if err := writeOutpoint(&chanPointBuf, outPoint); err != nil {
return nil, err
}
chanBucket := historicalChanBucket.NestedReadBucket(chanPointBuf.Bytes())
chanBucket := historicalChanBucket.NestedReadBucket(
chanPointBuf.Bytes(),
)
if chanBucket == nil {
return nil, ErrChannelNotFound
}

View File

@ -138,6 +138,9 @@ type reservationWithCtx struct {
chanAmt btcutil.Amount
// forwardingPolicy is the policy provided by the initFundingMsg.
forwardingPolicy htlcswitch.ForwardingPolicy
// Constraints we require for the remote.
remoteCsvDelay uint16
remoteMinHtlc lnwire.MilliSatoshi
@ -197,6 +200,15 @@ type InitFundingMsg struct {
// LocalFundingAmt is the size of the channel.
LocalFundingAmt btcutil.Amount
// BaseFee is the base fee charged for routing payments regardless of the
// number of milli-satoshis sent.
BaseFee *uint64
// FeeRate is the fee rate in ppm (parts per million) that will be charged
// proportionally based on the value of each forwarded HTLC, the lowest
// possible rate is 0 with a granularity of 0.000001 (millionths).
FeeRate *uint64
// PushAmt is the amount pushed to the counterparty.
PushAmt lnwire.MilliSatoshi
@ -1592,6 +1604,14 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
minHtlc = acceptorResp.MinHtlcIn
}
// If we are handling a FundingOpen request then we need to
// specify the default channel fees since they are not provided
// by the responder interactively.
forwardingPolicy := htlcswitch.ForwardingPolicy{
BaseFee: f.cfg.DefaultRoutingPolicy.BaseFee,
FeeRate: f.cfg.DefaultRoutingPolicy.FeeRate,
}
// Once the reservation has been created successfully, we add it to
// this peer's map of pending reservations to track this particular
// reservation until either abort or completion.
@ -1600,16 +1620,17 @@ func (f *Manager) handleFundingOpen(peer lnpeer.Peer,
f.activeReservations[peerIDKey] = make(pendingChannels)
}
resCtx := &reservationWithCtx{
reservation: reservation,
chanAmt: amt,
remoteCsvDelay: remoteCsvDelay,
remoteMinHtlc: minHtlc,
remoteMaxValue: remoteMaxValue,
remoteMaxHtlcs: maxHtlcs,
maxLocalCsv: f.cfg.MaxLocalCSVDelay,
channelType: msg.ChannelType,
err: make(chan error, 1),
peer: peer,
reservation: reservation,
chanAmt: amt,
forwardingPolicy: forwardingPolicy,
remoteCsvDelay: remoteCsvDelay,
remoteMinHtlc: minHtlc,
remoteMaxValue: remoteMaxValue,
remoteMaxHtlcs: maxHtlcs,
maxLocalCsv: f.cfg.MaxLocalCSVDelay,
channelType: msg.ChannelType,
err: make(chan error, 1),
peer: peer,
}
f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx
f.resMtx.Unlock()
@ -2110,6 +2131,9 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer,
return
}
// Get forwarding policy before deleting the reservation context.
forwardingPolicy := resCtx.forwardingPolicy
// The channel is marked IsPending in the database, and can be removed
// from the set of active reservations.
f.deleteReservationCtx(peerKey, msg.PendingChannelID)
@ -2176,6 +2200,14 @@ func (f *Manager) handleFundingCreated(peer lnpeer.Peer,
return
}
// With a permanent channel id established we can save the respective
// forwarding policy in the database. In the channel announcement phase
// this forwarding policy is retrieved and applied.
err = f.saveInitialFwdingPolicy(channelID, &forwardingPolicy)
if err != nil {
log.Errorf("Unable to store the forwarding policy: %v", err)
}
// Now that we've sent over our final signature for this channel, we'll
// send it to the ChainArbitrator so it can watch for any on-chain
// actions during this final confirmation stage.
@ -2257,6 +2289,14 @@ func (f *Manager) handleFundingSigned(peer lnpeer.Peer,
f.localDiscoverySignals[permChanID] = make(chan struct{})
f.localDiscoveryMtx.Unlock()
// We have to store the forwardingPolicy before the reservation context
// is deleted. The policy will then be read and applied in
// newChanAnnouncement.
err = f.saveInitialFwdingPolicy(permChanID, &resCtx.forwardingPolicy)
if err != nil {
log.Errorf("Unable to store the forwarding policy: %v", err)
}
// The remote peer has responded with a signature for our commitment
// transaction. We'll verify the signature for validity, then commit
// the state to disk as we can now open the channel.
@ -3082,6 +3122,15 @@ func (f *Manager) annAfterSixConfs(completeChan *channeldb.OpenChannel,
return fmt.Errorf("unable to send node announcement "+
"to peer %x: %v", pubKey, err)
}
// For private channels we do not announce the channel policy
// to the network but still need to delete them from the
// database.
err = f.deleteInitialFwdingPolicy(chanID)
if err != nil {
log.Infof("Could not delete channel fees "+
"for chanId %x.", chanID)
}
} else {
// Otherwise, we'll wait until the funding transaction has
// reached 6 confirmations before announcing it.
@ -3523,8 +3572,14 @@ func (f *Manager) newChanAnnouncement(localPubKey,
if bytes.Compare(selfBytes, remoteBytes) == -1 {
copy(chanAnn.NodeID1[:], localPubKey.SerializeCompressed())
copy(chanAnn.NodeID2[:], remotePubKey.SerializeCompressed())
copy(chanAnn.BitcoinKey1[:], localFundingKey.PubKey.SerializeCompressed())
copy(chanAnn.BitcoinKey2[:], remoteFundingKey.SerializeCompressed())
copy(
chanAnn.BitcoinKey1[:],
localFundingKey.PubKey.SerializeCompressed(),
)
copy(
chanAnn.BitcoinKey2[:],
remoteFundingKey.SerializeCompressed(),
)
// If we're the first node then update the chanFlags to
// indicate the "direction" of the update.
@ -3532,8 +3587,14 @@ func (f *Manager) newChanAnnouncement(localPubKey,
} else {
copy(chanAnn.NodeID1[:], remotePubKey.SerializeCompressed())
copy(chanAnn.NodeID2[:], localPubKey.SerializeCompressed())
copy(chanAnn.BitcoinKey1[:], remoteFundingKey.SerializeCompressed())
copy(chanAnn.BitcoinKey2[:], localFundingKey.PubKey.SerializeCompressed())
copy(
chanAnn.BitcoinKey1[:],
remoteFundingKey.SerializeCompressed(),
)
copy(
chanAnn.BitcoinKey2[:],
localFundingKey.PubKey.SerializeCompressed(),
)
// If we're the second node then update the chanFlags to
// indicate the "direction" of the update.
@ -3552,19 +3613,24 @@ func (f *Manager) newChanAnnouncement(localPubKey,
Timestamp: uint32(time.Now().Unix()),
MessageFlags: msgFlags,
ChannelFlags: chanFlags,
TimeLockDelta: uint16(f.cfg.DefaultRoutingPolicy.TimeLockDelta),
// We use the HtlcMinimumMsat that the remote party required us
// to use, as our ChannelUpdate will be used to carry HTLCs
// towards them.
TimeLockDelta: uint16(
f.cfg.DefaultRoutingPolicy.TimeLockDelta,
),
HtlcMinimumMsat: fwdMinHTLC,
HtlcMaximumMsat: fwdMaxHTLC,
BaseFee: uint32(f.cfg.DefaultRoutingPolicy.BaseFee),
FeeRate: uint32(f.cfg.DefaultRoutingPolicy.FeeRate),
}
if ourPolicy != nil {
// The caller of newChanAnnouncement is expected to provide the initial
// forwarding policy to be announced. We abort the channel announcement
// if they are not provided.
storedFwdingPolicy, err := f.getInitialFwdingPolicy(chanID)
if err != nil {
return nil, errors.Errorf("unable to generate channel "+
"update announcement: %v", err)
}
switch {
case ourPolicy != nil:
// If ourPolicy is non-nil, modify the default parameters of the
// ChannelUpdate.
chanUpdateAnn.MessageFlags = ourPolicy.MessageFlags
@ -3576,6 +3642,21 @@ func (f *Manager) newChanAnnouncement(localPubKey,
chanUpdateAnn.FeeRate = uint32(
ourPolicy.FeeProportionalMillionths,
)
case storedFwdingPolicy != nil:
chanUpdateAnn.BaseFee = uint32(storedFwdingPolicy.BaseFee)
chanUpdateAnn.FeeRate = uint32(storedFwdingPolicy.FeeRate)
default:
log.Infof("No channel forwaring policy specified for channel "+
"announcement of ChannelID(%v). "+
"Assuming default fee parameters.", chanID)
chanUpdateAnn.BaseFee = uint32(
f.cfg.DefaultRoutingPolicy.BaseFee,
)
chanUpdateAnn.FeeRate = uint32(
f.cfg.DefaultRoutingPolicy.FeeRate,
)
}
// With the channel update announcement constructed, we'll generate a
@ -3672,6 +3753,14 @@ func (f *Manager) announceChannel(localIDKey, remoteIDKey *btcec.PublicKey,
return err
}
// After the fee parameters have been stored in the announcement
// we can delete them from the database.
err = f.deleteInitialFwdingPolicy(chanID)
if err != nil {
log.Infof("Could not delete channel fees for chanId %x.",
chanID)
}
// We only send the channel proof announcement and the node announcement
// because addToRouterGraph previously sent the ChannelAnnouncement and
// the ChannelUpdate announcement messages. The channel proof and node
@ -3792,6 +3881,8 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
var (
peerKey = msg.Peer.IdentityKey()
localAmt = msg.LocalFundingAmt
baseFee = msg.BaseFee
feeRate = msg.FeeRate
minHtlcIn = msg.MinHtlcIn
remoteCsvDelay = msg.RemoteCsvDelay
maxValue = msg.MaxValueInFlight
@ -3985,6 +4076,22 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
maxHtlcs = f.cfg.RequiredRemoteMaxHTLCs(capacity)
}
// Prepare the optional channel fee values from the initFundingMsg.
// If useBaseFee or useFeeRate are false the client did not
// provide fee values hence we assume default fee settings from
// the config.
forwardingPolicy := htlcswitch.ForwardingPolicy{
BaseFee: f.cfg.DefaultRoutingPolicy.BaseFee,
FeeRate: f.cfg.DefaultRoutingPolicy.FeeRate,
}
if baseFee != nil {
forwardingPolicy.BaseFee = lnwire.MilliSatoshi(*baseFee)
}
if feeRate != nil {
forwardingPolicy.FeeRate = lnwire.MilliSatoshi(*feeRate)
}
// If a pending channel map for this peer isn't already created, then
// we create one, ultimately allowing us to track this pending
// reservation within the target peer.
@ -3995,17 +4102,18 @@ func (f *Manager) handleInitFundingMsg(msg *InitFundingMsg) {
}
resCtx := &reservationWithCtx{
chanAmt: capacity,
remoteCsvDelay: remoteCsvDelay,
remoteMinHtlc: minHtlcIn,
remoteMaxValue: maxValue,
remoteMaxHtlcs: maxHtlcs,
maxLocalCsv: maxCSV,
channelType: msg.ChannelType,
reservation: reservation,
peer: msg.Peer,
updates: msg.Updates,
err: msg.Err,
chanAmt: capacity,
forwardingPolicy: forwardingPolicy,
remoteCsvDelay: remoteCsvDelay,
remoteMinHtlc: minHtlcIn,
remoteMaxValue: maxValue,
remoteMaxHtlcs: maxHtlcs,
maxLocalCsv: maxCSV,
channelType: msg.ChannelType,
reservation: reservation,
peer: msg.Peer,
updates: msg.Updates,
err: msg.Err,
}
f.activeReservations[peerIDKey][chanID] = resCtx
f.resMtx.Unlock()
@ -4267,6 +4375,70 @@ func copyPubKey(pub *btcec.PublicKey) *btcec.PublicKey {
return btcec.NewPublicKey(&tmp.X, &tmp.Y)
}
// saveInitialFwdingPolicy saves the forwarding policy for the provided
// chanPoint in the channelOpeningStateBucket.
func (f *Manager) saveInitialFwdingPolicy(permChanID lnwire.ChannelID,
forwardingPolicy *htlcswitch.ForwardingPolicy) error {
chanID := make([]byte, 32)
copy(chanID, permChanID[:])
scratch := make([]byte, 36)
byteOrder.PutUint64(scratch[:8], uint64(forwardingPolicy.MinHTLCOut))
byteOrder.PutUint64(scratch[8:16], uint64(forwardingPolicy.MaxHTLC))
byteOrder.PutUint64(scratch[16:24], uint64(forwardingPolicy.BaseFee))
byteOrder.PutUint64(scratch[24:32], uint64(forwardingPolicy.FeeRate))
byteOrder.PutUint32(scratch[32:], forwardingPolicy.TimeLockDelta)
return f.cfg.Wallet.Cfg.Database.SaveInitialFwdingPolicy(
chanID, scratch,
)
}
// getInitialFwdingPolicy fetches the initial forwarding policy for a given
// channel id from the database which will be applied during the channel
// announcement phase.
func (f *Manager) getInitialFwdingPolicy(permChanID lnwire.ChannelID) (
*htlcswitch.ForwardingPolicy, error) {
chanID := make([]byte, 32)
copy(chanID, permChanID[:])
value, err := f.cfg.Wallet.Cfg.Database.GetInitialFwdingPolicy(
chanID,
)
if err != nil {
return nil, err
}
var fwdingPolicy htlcswitch.ForwardingPolicy
fwdingPolicy.MinHTLCOut = lnwire.MilliSatoshi(
byteOrder.Uint64(value[:8]),
)
fwdingPolicy.MaxHTLC = lnwire.MilliSatoshi(
byteOrder.Uint64(value[8:16]),
)
fwdingPolicy.BaseFee = lnwire.MilliSatoshi(
byteOrder.Uint64(value[16:24]),
)
fwdingPolicy.FeeRate = lnwire.MilliSatoshi(
byteOrder.Uint64(value[24:32]),
)
fwdingPolicy.TimeLockDelta = byteOrder.Uint32(value[32:36])
return &fwdingPolicy, nil
}
// deleteInitialFwdingPolicy removes channel fees for this chanID from
// the database.
func (f *Manager) deleteInitialFwdingPolicy(permChanID lnwire.ChannelID) error {
chanID := make([]byte, 32)
copy(chanID, permChanID[:])
return f.cfg.Wallet.Cfg.Database.DeleteInitialFwdingPolicy(chanID)
}
// saveChannelOpeningState saves the channelOpeningState for the provided
// chanPoint to the channelOpeningStateBucket.
func (f *Manager) saveChannelOpeningState(chanPoint *wire.OutPoint,