Merge pull request #6108 from bottlepay/pg-fix

kvdb/postgres: fix context cancellation
This commit is contained in:
Oliver Gugger 2021-12-22 15:44:51 +01:00 committed by GitHub
commit 5d9b59ac5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 74 additions and 48 deletions

View File

@ -45,6 +45,8 @@
* [Clarify log message about not running within
systemd](https://github.com/lightningnetwork/lnd/pull/6096)
* [Fix Postgres context cancellation](https://github.com/lightningnetwork/lnd/pull/6108)
## RPC Server
* [ChanStatusFlags is now

2
go.mod
View File

@ -46,7 +46,7 @@ require (
github.com/lightningnetwork/lnd/cert v1.1.0
github.com/lightningnetwork/lnd/clock v1.1.0
github.com/lightningnetwork/lnd/healthcheck v1.2.0
github.com/lightningnetwork/lnd/kvdb v1.2.3
github.com/lightningnetwork/lnd/kvdb v1.2.4
github.com/lightningnetwork/lnd/queue v1.1.0
github.com/lightningnetwork/lnd/ticker v1.1.0
github.com/ltcsuite/ltcd v0.0.0-20190101042124-f37f8bf35796

View File

@ -10,6 +10,7 @@ import (
"encoding/hex"
"fmt"
"strings"
"time"
"github.com/btcsuite/btcwallet/walletdb"
embeddedpostgres "github.com/fergusstrange/embedded-postgres"
@ -26,10 +27,14 @@ func getTestDsn(dbName string) string {
var testPostgres *embeddedpostgres.EmbeddedPostgres
const testMaxConnections = 50
// StartEmbeddedPostgres starts an embedded postgres instance. This only needs
// to be done once, because NewFixture will create random new databases on every
// call. It returns a stop closure that stops the database if called.
func StartEmbeddedPostgres() (func() error, error) {
Init(testMaxConnections)
postgres := embeddedpostgres.NewDatabase(
embeddedpostgres.DefaultConfig().
Port(9876))
@ -77,7 +82,8 @@ func NewFixture(dbName string) (*fixture, error) {
db, err := newPostgresBackend(
context.Background(),
&Config{
Dsn: dsn,
Dsn: dsn,
Timeout: time.Minute,
},
prefix,
)

View File

@ -75,10 +75,12 @@ func (b *readWriteBucket) Get(key []byte) []byte {
}
var value *[]byte
err := b.tx.QueryRow(
row, cancel := b.tx.QueryRow(
"SELECT value FROM "+b.table+" WHERE "+parentSelector(b.id)+
" AND key=$1", key,
).Scan(&value)
)
defer cancel()
err := row.Scan(&value)
switch {
case err == sql.ErrNoRows:
@ -106,10 +108,12 @@ func (b *readWriteBucket) NestedReadWriteBucket(
}
var id int64
err := b.tx.QueryRow(
row, cancel := b.tx.QueryRow(
"SELECT id FROM "+b.table+" WHERE "+parentSelector(b.id)+
" AND key=$1 AND value IS NULL", key,
).Scan(&id)
)
defer cancel()
err := row.Scan(&id)
switch {
case err == sql.ErrNoRows:
@ -139,10 +143,12 @@ func (b *readWriteBucket) CreateBucket(key []byte) (
value *[]byte
id int64
)
err := b.tx.QueryRow(
row, cancel := b.tx.QueryRow(
"SELECT id,value FROM "+b.table+" WHERE "+parentSelector(b.id)+
" AND key=$1", key,
).Scan(&id, &value)
)
defer cancel()
err := row.Scan(&id, &value)
switch {
case err == sql.ErrNoRows:
@ -159,10 +165,12 @@ func (b *readWriteBucket) CreateBucket(key []byte) (
// Bucket does not yet exist, so create it. Postgres will generate a
// bucket id for the new bucket.
err = b.tx.QueryRow(
row, cancel = b.tx.QueryRow(
"INSERT INTO "+b.table+" (parent_id, key) "+
"VALUES($1, $2) RETURNING id", b.id, key,
).Scan(&id)
)
defer cancel()
err = row.Scan(&id)
if err != nil {
return nil, err
}
@ -187,19 +195,23 @@ func (b *readWriteBucket) CreateBucketIfNotExists(key []byte) (
value *[]byte
id int64
)
err := b.tx.QueryRow(
row, cancel := b.tx.QueryRow(
"SELECT id,value FROM "+b.table+" WHERE "+parentSelector(b.id)+
" AND key=$1", key,
).Scan(&id, &value)
)
defer cancel()
err := row.Scan(&id, &value)
switch {
// Bucket does not yet exist, so create it now. Postgres will generate a
// bucket id for the new bucket.
case err == sql.ErrNoRows:
err = b.tx.QueryRow(
row, cancel := b.tx.QueryRow(
"INSERT INTO "+b.table+" (parent_id, key) "+
"VALUES($1, $2) RETURNING id", b.id, key).
Scan(&id)
"VALUES($1, $2) RETURNING id", b.id, key,
)
defer cancel()
err := row.Scan(&id)
if err != nil {
return nil, err
}
@ -315,10 +327,12 @@ func (b *readWriteBucket) Delete(key []byte) error {
// Check to see if a bucket with this key exists.
var dummy int
err := b.tx.QueryRow(
row, cancel := b.tx.QueryRow(
"SELECT 1 FROM "+b.table+" WHERE "+parentSelector(b.id)+
" AND key=$1 AND value IS NULL", key,
).Scan(&dummy)
)
defer cancel()
err := row.Scan(&dummy)
switch {
// No bucket exists, proceed to deletion of the key.
case err == sql.ErrNoRows:
@ -395,11 +409,13 @@ func (b *readWriteBucket) Sequence() uint64 {
}
var seq int64
err := b.tx.QueryRow(
row, cancel := b.tx.QueryRow(
"SELECT sequence FROM "+b.table+" WHERE id=$1 "+
"AND sequence IS NOT NULL",
b.id,
).Scan(&seq)
)
defer cancel()
err := row.Scan(&seq)
switch {
case err == sql.ErrNoRows:

View File

@ -31,11 +31,13 @@ func (c *readWriteCursor) First() ([]byte, []byte) {
key []byte
value []byte
)
err := c.bucket.tx.QueryRow(
"SELECT key, value FROM "+c.bucket.table+" WHERE "+
parentSelector(c.bucket.id)+
row, cancel := c.bucket.tx.QueryRow(
"SELECT key, value FROM " + c.bucket.table + " WHERE " +
parentSelector(c.bucket.id) +
" ORDER BY key LIMIT 1",
).Scan(&key, &value)
)
defer cancel()
err := row.Scan(&key, &value)
switch {
case err == sql.ErrNoRows:
@ -59,11 +61,13 @@ func (c *readWriteCursor) Last() ([]byte, []byte) {
key []byte
value []byte
)
err := c.bucket.tx.QueryRow(
"SELECT key, value FROM "+c.bucket.table+" WHERE "+
parentSelector(c.bucket.id)+
row, cancel := c.bucket.tx.QueryRow(
"SELECT key, value FROM " + c.bucket.table + " WHERE " +
parentSelector(c.bucket.id) +
" ORDER BY key DESC LIMIT 1",
).Scan(&key, &value)
)
defer cancel()
err := row.Scan(&key, &value)
switch {
case err == sql.ErrNoRows:
@ -87,12 +91,14 @@ func (c *readWriteCursor) Next() ([]byte, []byte) {
key []byte
value []byte
)
err := c.bucket.tx.QueryRow(
row, cancel := c.bucket.tx.QueryRow(
"SELECT key, value FROM "+c.bucket.table+" WHERE "+
parentSelector(c.bucket.id)+
" AND key>$1 ORDER BY key LIMIT 1",
c.currKey,
).Scan(&key, &value)
)
defer cancel()
err := row.Scan(&key, &value)
switch {
case err == sql.ErrNoRows:
@ -116,12 +122,14 @@ func (c *readWriteCursor) Prev() ([]byte, []byte) {
key []byte
value []byte
)
err := c.bucket.tx.QueryRow(
row, cancel := c.bucket.tx.QueryRow(
"SELECT key, value FROM "+c.bucket.table+" WHERE "+
parentSelector(c.bucket.id)+
" AND key<$1 ORDER BY key DESC LIMIT 1",
c.currKey,
).Scan(&key, &value)
)
defer cancel()
err := row.Scan(&key, &value)
switch {
case err == sql.ErrNoRows:
@ -152,12 +160,14 @@ func (c *readWriteCursor) Seek(seek []byte) ([]byte, []byte) {
key []byte
value []byte
)
err := c.bucket.tx.QueryRow(
row, cancel := c.bucket.tx.QueryRow(
"SELECT key, value FROM "+c.bucket.table+" WHERE "+
parentSelector(c.bucket.id)+
" AND key>=$1 ORDER BY key LIMIT 1",
seek,
).Scan(&key, &value)
)
defer cancel()
err := row.Scan(&key, &value)
switch {
case err == sql.ErrNoRows:
@ -180,12 +190,14 @@ func (c *readWriteCursor) Seek(seek []byte) ([]byte, []byte) {
func (c *readWriteCursor) Delete() error {
// Get first record at or after cursor.
var key []byte
err := c.bucket.tx.QueryRow(
row, cancel := c.bucket.tx.QueryRow(
"SELECT key FROM "+c.bucket.table+" WHERE "+
parentSelector(c.bucket.id)+
" AND key>=$1 ORDER BY key LIMIT 1",
c.currKey,
).Scan(&key)
)
defer cancel()
err := row.Scan(&key)
switch {
case err == sql.ErrNoRows:

View File

@ -158,21 +158,11 @@ func (tx *readWriteTx) OnCommit(cb func()) {
}
// QueryRow executes a QueryRow call with a timeout context.
func (tx *readWriteTx) QueryRow(query string, args ...interface{}) *sql.Row {
ctx, cancel := tx.db.getTimeoutCtx()
defer cancel()
return tx.tx.QueryRowContext(ctx, query, args...)
}
// Query executes a Query call with a timeout context.
func (tx *readWriteTx) Query(query string, args ...interface{}) (*sql.Rows,
error) {
func (tx *readWriteTx) QueryRow(query string, args ...interface{}) (*sql.Row,
func()) {
ctx, cancel := tx.db.getTimeoutCtx()
defer cancel()
return tx.tx.QueryContext(ctx, query, args...)
return tx.tx.QueryRowContext(ctx, query, args...), cancel
}
// Exec executes a Exec call with a timeout context.