Merge pull request #5643 from bottlepay/updatepolicy-nested-tx

routing/localchans: fix nested db tx
This commit is contained in:
Oliver Gugger 2021-08-24 17:42:39 +02:00 committed by GitHub
commit 93d12cd9fc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 68 additions and 36 deletions

View file

@ -7,6 +7,7 @@ import (
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
)
// LiveChannelSource is an interface that allows us to query for the set of
@ -17,8 +18,9 @@ type LiveChannelSource interface {
FetchAllChannels() ([]*channeldb.OpenChannel, error)
// FetchChannel attempts to locate a live channel identified by the
// passed chanPoint.
FetchChannel(chanPoint wire.OutPoint) (*channeldb.OpenChannel, error)
// passed chanPoint. Optionally an existing db tx can be supplied.
FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) (
*channeldb.OpenChannel, error)
// AddrsForNode returns all known addresses for the target node public
// key.
@ -55,7 +57,7 @@ func FetchBackupForChan(chanPoint wire.OutPoint,
// First, we'll query the channel source to see if the channel is known
// and open within the database.
targetChan, err := chanSource.FetchChannel(chanPoint)
targetChan, err := chanSource.FetchChannel(nil, chanPoint)
if err != nil {
// If we can't find the channel, then we return with an error,
// as we have nothing to backup.

View file

@ -8,6 +8,7 @@ import (
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
)
type mockChannelSource struct {
@ -38,7 +39,9 @@ func (m *mockChannelSource) FetchAllChannels() ([]*channeldb.OpenChannel, error)
return chans, nil
}
func (m *mockChannelSource) FetchChannel(chanPoint wire.OutPoint) (*channeldb.OpenChannel, error) {
func (m *mockChannelSource) FetchChannel(_ kvdb.RTx, chanPoint wire.OutPoint) (
*channeldb.OpenChannel, error) {
if m.failQuery {
return nil, fmt.Errorf("fail")
}

View file

@ -501,7 +501,11 @@ func (d *DB) fetchNodeChannels(chainBucket kvdb.RBucket) ([]*OpenChannel, error)
// FetchChannel attempts to locate a channel specified by the passed channel
// point. If the channel cannot be found, then an error will be returned.
func (d *DB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel, error) {
// Optionally an existing db tx can be supplied. Optionally an existing db tx
// can be supplied.
func (d *DB) FetchChannel(tx kvdb.RTx, chanPoint wire.OutPoint) (*OpenChannel,
error) {
var (
targetChan *OpenChannel
targetChanPoint bytes.Buffer
@ -583,7 +587,12 @@ func (d *DB) FetchChannel(chanPoint wire.OutPoint) (*OpenChannel, error) {
})
}
err := kvdb.View(d, chanScan, func() {})
var err error
if tx == nil {
err = kvdb.View(d, chanScan, func() {})
} else {
err = chanScan(tx)
}
if err != nil {
return nil, err
}
@ -1102,7 +1111,7 @@ func (d *DB) AbandonChannel(chanPoint *wire.OutPoint, bestHeight uint32) error {
// With the chanPoint constructed, we'll attempt to find the target
// channel in the database. If we can't find the channel, then we'll
// return the error back to the caller.
dbChan, err := d.FetchChannel(*chanPoint)
dbChan, err := d.FetchChannel(nil, *chanPoint)
switch {
// If the channel wasn't found, then it's possible that it was already
// abandoned from the database.

View file

@ -255,7 +255,7 @@ func TestFetchChannel(t *testing.T) {
channelState := createTestChannel(t, cdb, openChannelOption())
// Next, attempt to fetch the channel by its chan point.
dbChannel, err := cdb.FetchChannel(channelState.FundingOutpoint)
dbChannel, err := cdb.FetchChannel(nil, channelState.FundingOutpoint)
if err != nil {
t.Fatalf("unable to fetch channel: %v", err)
}
@ -275,7 +275,7 @@ func TestFetchChannel(t *testing.T) {
}
channelState2.FundingOutpoint.Index ^= 1
_, err = cdb.FetchChannel(channelState2.FundingOutpoint)
_, err = cdb.FetchChannel(nil, channelState2.FundingOutpoint)
if err == nil {
t.Fatalf("expected query to fail")
}
@ -416,7 +416,7 @@ func TestRestoreChannelShells(t *testing.T) {
// We should also be able to find the channel if we query for it
// directly.
_, err = cdb.FetchChannel(channelShell.Chan.FundingOutpoint)
_, err = cdb.FetchChannel(nil, channelShell.Chan.FundingOutpoint)
if err != nil {
t.Fatalf("unable to fetch channel: %v", err)
}
@ -470,7 +470,7 @@ func TestAbandonChannel(t *testing.T) {
// At this point, the channel should no longer be found in the set of
// open channels.
_, err = cdb.FetchChannel(chanState.FundingOutpoint)
_, err = cdb.FetchChannel(nil, chanState.FundingOutpoint)
if err != ErrChannelNotFound {
t.Fatalf("channel should not have been found: %v", err)
}

View file

@ -134,7 +134,7 @@ func (c *ChannelNotifier) NotifyPendingOpenChannelEvent(chanPoint wire.OutPoint,
// channel has gone from pending open to open.
func (c *ChannelNotifier) NotifyOpenChannelEvent(chanPoint wire.OutPoint) {
// Fetch the relevant channel from the database.
channel, err := c.chanDB.FetchChannel(chanPoint)
channel, err := c.chanDB.FetchChannel(nil, chanPoint)
if err != nil {
log.Warnf("Unable to fetch open channel from the db: %v", err)
}

View file

@ -258,7 +258,7 @@ func (a *arbChannel) NewAnchorResolutions() (*lnwallet.AnchorResolutions,
// same instance that is used by the link.
chanPoint := a.channel.FundingOutpoint
channel, err := a.c.chanSource.FetchChannel(chanPoint)
channel, err := a.c.chanSource.FetchChannel(nil, chanPoint)
if err != nil {
return nil, err
}
@ -301,7 +301,7 @@ func (a *arbChannel) ForceCloseChan() (*lnwallet.LocalForceCloseSummary, error)
// Now that we know the link can't mutate the channel
// state, we'll read the channel from disk the target
// channel according to its channel point.
channel, err := a.c.chanSource.FetchChannel(chanPoint)
channel, err := a.c.chanSource.FetchChannel(nil, chanPoint)
if err != nil {
return nil, err
}

View file

@ -15,6 +15,7 @@ import (
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
@ -1210,6 +1211,7 @@ func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
edgesToUpdate []updateTuple
)
err := d.cfg.Router.ForAllOutgoingChannels(func(
_ kvdb.RTx,
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {

View file

@ -25,6 +25,7 @@ import (
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lntest/mock"
"github.com/lightningnetwork/lnd/lntest/wait"
@ -200,7 +201,8 @@ func (r *mockGraphSource) ForEachNode(func(node *channeldb.LightningNode) error)
return nil
}
func (r *mockGraphSource) ForAllOutgoingChannels(cb func(i *channeldb.ChannelEdgeInfo,
func (r *mockGraphSource) ForAllOutgoingChannels(cb func(tx kvdb.RTx,
i *channeldb.ChannelEdgeInfo,
c *channeldb.ChannelEdgePolicy) error) error {
r.mu.Lock()
@ -223,7 +225,9 @@ func (r *mockGraphSource) ForAllOutgoingChannels(cb func(i *channeldb.ChannelEdg
}
for _, channel := range chans {
cb(channel.Info, channel.Policy1)
if err := cb(nil, channel.Info, channel.Policy1); err != nil {
return err
}
}
return nil
@ -3568,6 +3572,7 @@ out:
const newTimeLockDelta = 100
var edgesToUpdate []EdgeWithInfo
err = ctx.router.ForAllOutgoingChannels(func(
_ kvdb.RTx,
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {

View file

@ -153,6 +153,8 @@ you.
* [Fixed context leak in integration tests, and properly handled context
timeout](https://github.com/lightningnetwork/lnd/pull/5646).
* [Removed nested db tx](https://github.com/lightningnetwork/lnd/pull/5643)
## Database
* [Ensure single writer for legacy

View file

@ -282,7 +282,7 @@ func initAutoPilot(svr *server, cfg *lncfg.AutoPilot,
ChannelInfo: func(chanPoint wire.OutPoint) (
*autopilot.LocalChannel, error) {
channel, err := svr.chanStateDB.FetchChannel(chanPoint)
channel, err := svr.chanStateDB.FetchChannel(nil, chanPoint)
if err != nil {
return nil, err
}

View file

@ -8,6 +8,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/discovery"
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
)
@ -27,12 +28,14 @@ type Manager struct {
// ForAllOutgoingChannels is required to iterate over all our local
// channels.
ForAllOutgoingChannels func(cb func(*channeldb.ChannelEdgeInfo,
ForAllOutgoingChannels func(cb func(kvdb.RTx,
*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy) error) error
// FetchChannel is used to query local channel parameters.
FetchChannel func(chanPoint wire.OutPoint) (*channeldb.OpenChannel,
error)
// FetchChannel is used to query local channel parameters. Optionally an
// existing db tx can be supplied.
FetchChannel func(tx kvdb.RTx, chanPoint wire.OutPoint) (
*channeldb.OpenChannel, error)
// policyUpdateLock ensures that the database and the link do not fall
// out of sync if there are concurrent fee update calls. Without it,
@ -66,6 +69,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
// If we have a filter then we'll only collected those channels,
// otherwise we'll collect them all.
err := r.ForAllOutgoingChannels(func(
tx kvdb.RTx,
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {
@ -77,7 +81,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
}
// Apply the new policy to the edge.
err := r.updateEdge(info.ChannelPoint, edge, newSchema)
err := r.updateEdge(tx, info.ChannelPoint, edge, newSchema)
if err != nil {
log.Warnf("Cannot update policy for %v: %v\n",
info.ChannelPoint, err,
@ -123,7 +127,7 @@ func (r *Manager) UpdatePolicy(newSchema routing.ChannelPolicy,
}
// updateEdge updates the given edge with the new schema.
func (r *Manager) updateEdge(chanPoint wire.OutPoint,
func (r *Manager) updateEdge(tx kvdb.RTx, chanPoint wire.OutPoint,
edge *channeldb.ChannelEdgePolicy,
newSchema routing.ChannelPolicy) error {
@ -135,7 +139,7 @@ func (r *Manager) updateEdge(chanPoint wire.OutPoint,
edge.TimeLockDelta = uint16(newSchema.TimeLockDelta)
// Retrieve negotiated channel htlc amt limits.
amtMin, amtMax, err := r.getHtlcAmtLimits(chanPoint)
amtMin, amtMax, err := r.getHtlcAmtLimits(tx, chanPoint)
if err != nil {
return nil
}
@ -194,10 +198,10 @@ func (r *Manager) updateEdge(chanPoint wire.OutPoint,
// getHtlcAmtLimits retrieves the negotiated channel min and max htlc amount
// constraints.
func (r *Manager) getHtlcAmtLimits(chanPoint wire.OutPoint) (
func (r *Manager) getHtlcAmtLimits(tx kvdb.RTx, chanPoint wire.OutPoint) (
lnwire.MilliSatoshi, lnwire.MilliSatoshi, error) {
ch, err := r.FetchChannel(chanPoint)
ch, err := r.FetchChannel(tx, chanPoint)
if err != nil {
return 0, 0, err
}

View file

@ -3,6 +3,7 @@ package localchans
import (
"testing"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/btcsuite/btcd/chaincfg/chainhash"
@ -88,10 +89,12 @@ func TestManager(t *testing.T) {
return nil
}
forAllOutgoingChannels := func(cb func(*channeldb.ChannelEdgeInfo,
forAllOutgoingChannels := func(cb func(kvdb.RTx,
*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy) error) error {
return cb(
nil,
&channeldb.ChannelEdgeInfo{
Capacity: chanCap,
ChannelPoint: chanPoint,
@ -100,8 +103,8 @@ func TestManager(t *testing.T) {
)
}
fetchChannel := func(chanPoint wire.OutPoint) (*channeldb.OpenChannel,
error) {
fetchChannel := func(tx kvdb.RTx, chanPoint wire.OutPoint) (
*channeldb.OpenChannel, error) {
constraints := channeldb.ChannelConstraints{
MaxPendingAmount: maxPendingAmount,

View file

@ -137,7 +137,8 @@ type ChannelGraphSource interface {
// ForAllOutgoingChannels is used to iterate over all channels
// emanating from the "source" node which is the center of the
// star-graph.
ForAllOutgoingChannels(cb func(c *channeldb.ChannelEdgeInfo,
ForAllOutgoingChannels(cb func(tx kvdb.RTx,
c *channeldb.ChannelEdgeInfo,
e *channeldb.ChannelEdgePolicy) error) error
// CurrentBlockHeight returns the block height from POV of the router
@ -2426,17 +2427,18 @@ func (r *ChannelRouter) ForEachNode(cb func(*channeldb.LightningNode) error) err
// the router.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) ForAllOutgoingChannels(cb func(*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy) error) error {
func (r *ChannelRouter) ForAllOutgoingChannels(cb func(kvdb.RTx,
*channeldb.ChannelEdgeInfo, *channeldb.ChannelEdgePolicy) error) error {
return r.selfNode.ForEachChannel(nil, func(_ kvdb.RTx, c *channeldb.ChannelEdgeInfo,
return r.selfNode.ForEachChannel(nil, func(tx kvdb.RTx,
c *channeldb.ChannelEdgeInfo,
e, _ *channeldb.ChannelEdgePolicy) error {
if e == nil {
return fmt.Errorf("channel from self node has no policy")
}
return cb(c, e)
return cb(tx, c, e)
})
}

View file

@ -2124,7 +2124,7 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest,
// First, we'll fetch the channel as is, as we'll need to examine it
// regardless of if this is a force close or not.
channel, err := r.server.chanStateDB.FetchChannel(*chanPoint)
channel, err := r.server.chanStateDB.FetchChannel(nil, *chanPoint)
if err != nil {
return err
}
@ -2402,7 +2402,7 @@ func (r *rpcServer) AbandonChannel(_ context.Context,
return nil, err
}
dbChan, err := r.server.chanStateDB.FetchChannel(*chanPoint)
dbChan, err := r.server.chanStateDB.FetchChannel(nil, *chanPoint)
switch {
// If the channel isn't found in the set of open channels, then we can
// continue on as it can't be loaded into the link/peer.