mirror of
https://github.com/lightningnetwork/lnd.git
synced 2025-03-04 01:36:24 +01:00
kvdb/postgres: fix context cancellation
This commit is contained in:
parent
62a2f3c809
commit
0cae55c162
5 changed files with 69 additions and 47 deletions
|
@ -45,6 +45,8 @@
|
||||||
* [Clarify log message about not running within
|
* [Clarify log message about not running within
|
||||||
systemd](https://github.com/lightningnetwork/lnd/pull/6096)
|
systemd](https://github.com/lightningnetwork/lnd/pull/6096)
|
||||||
|
|
||||||
|
* [Fix Postgres context cancellation](https://github.com/lightningnetwork/lnd/pull/6108)
|
||||||
|
|
||||||
## RPC Server
|
## RPC Server
|
||||||
|
|
||||||
* [ChanStatusFlags is now
|
* [ChanStatusFlags is now
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/btcsuite/btcwallet/walletdb"
|
"github.com/btcsuite/btcwallet/walletdb"
|
||||||
embeddedpostgres "github.com/fergusstrange/embedded-postgres"
|
embeddedpostgres "github.com/fergusstrange/embedded-postgres"
|
||||||
|
@ -82,6 +83,7 @@ func NewFixture(dbName string) (*fixture, error) {
|
||||||
context.Background(),
|
context.Background(),
|
||||||
&Config{
|
&Config{
|
||||||
Dsn: dsn,
|
Dsn: dsn,
|
||||||
|
Timeout: time.Minute,
|
||||||
},
|
},
|
||||||
prefix,
|
prefix,
|
||||||
)
|
)
|
||||||
|
|
|
@ -75,10 +75,12 @@ func (b *readWriteBucket) Get(key []byte) []byte {
|
||||||
}
|
}
|
||||||
|
|
||||||
var value *[]byte
|
var value *[]byte
|
||||||
err := b.tx.QueryRow(
|
row, cancel := b.tx.QueryRow(
|
||||||
"SELECT value FROM "+b.table+" WHERE "+parentSelector(b.id)+
|
"SELECT value FROM "+b.table+" WHERE "+parentSelector(b.id)+
|
||||||
" AND key=$1", key,
|
" AND key=$1", key,
|
||||||
).Scan(&value)
|
)
|
||||||
|
defer cancel()
|
||||||
|
err := row.Scan(&value)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case err == sql.ErrNoRows:
|
case err == sql.ErrNoRows:
|
||||||
|
@ -106,10 +108,12 @@ func (b *readWriteBucket) NestedReadWriteBucket(
|
||||||
}
|
}
|
||||||
|
|
||||||
var id int64
|
var id int64
|
||||||
err := b.tx.QueryRow(
|
row, cancel := b.tx.QueryRow(
|
||||||
"SELECT id FROM "+b.table+" WHERE "+parentSelector(b.id)+
|
"SELECT id FROM "+b.table+" WHERE "+parentSelector(b.id)+
|
||||||
" AND key=$1 AND value IS NULL", key,
|
" AND key=$1 AND value IS NULL", key,
|
||||||
).Scan(&id)
|
)
|
||||||
|
defer cancel()
|
||||||
|
err := row.Scan(&id)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case err == sql.ErrNoRows:
|
case err == sql.ErrNoRows:
|
||||||
|
@ -139,10 +143,12 @@ func (b *readWriteBucket) CreateBucket(key []byte) (
|
||||||
value *[]byte
|
value *[]byte
|
||||||
id int64
|
id int64
|
||||||
)
|
)
|
||||||
err := b.tx.QueryRow(
|
row, cancel := b.tx.QueryRow(
|
||||||
"SELECT id,value FROM "+b.table+" WHERE "+parentSelector(b.id)+
|
"SELECT id,value FROM "+b.table+" WHERE "+parentSelector(b.id)+
|
||||||
" AND key=$1", key,
|
" AND key=$1", key,
|
||||||
).Scan(&id, &value)
|
)
|
||||||
|
defer cancel()
|
||||||
|
err := row.Scan(&id, &value)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case err == sql.ErrNoRows:
|
case err == sql.ErrNoRows:
|
||||||
|
@ -159,10 +165,12 @@ func (b *readWriteBucket) CreateBucket(key []byte) (
|
||||||
|
|
||||||
// Bucket does not yet exist, so create it. Postgres will generate a
|
// Bucket does not yet exist, so create it. Postgres will generate a
|
||||||
// bucket id for the new bucket.
|
// bucket id for the new bucket.
|
||||||
err = b.tx.QueryRow(
|
row, cancel = b.tx.QueryRow(
|
||||||
"INSERT INTO "+b.table+" (parent_id, key) "+
|
"INSERT INTO "+b.table+" (parent_id, key) "+
|
||||||
"VALUES($1, $2) RETURNING id", b.id, key,
|
"VALUES($1, $2) RETURNING id", b.id, key,
|
||||||
).Scan(&id)
|
)
|
||||||
|
defer cancel()
|
||||||
|
err = row.Scan(&id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -187,19 +195,23 @@ func (b *readWriteBucket) CreateBucketIfNotExists(key []byte) (
|
||||||
value *[]byte
|
value *[]byte
|
||||||
id int64
|
id int64
|
||||||
)
|
)
|
||||||
err := b.tx.QueryRow(
|
row, cancel := b.tx.QueryRow(
|
||||||
"SELECT id,value FROM "+b.table+" WHERE "+parentSelector(b.id)+
|
"SELECT id,value FROM "+b.table+" WHERE "+parentSelector(b.id)+
|
||||||
" AND key=$1", key,
|
" AND key=$1", key,
|
||||||
).Scan(&id, &value)
|
)
|
||||||
|
defer cancel()
|
||||||
|
err := row.Scan(&id, &value)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
// Bucket does not yet exist, so create it now. Postgres will generate a
|
// Bucket does not yet exist, so create it now. Postgres will generate a
|
||||||
// bucket id for the new bucket.
|
// bucket id for the new bucket.
|
||||||
case err == sql.ErrNoRows:
|
case err == sql.ErrNoRows:
|
||||||
err = b.tx.QueryRow(
|
row, cancel := b.tx.QueryRow(
|
||||||
"INSERT INTO "+b.table+" (parent_id, key) "+
|
"INSERT INTO "+b.table+" (parent_id, key) "+
|
||||||
"VALUES($1, $2) RETURNING id", b.id, key).
|
"VALUES($1, $2) RETURNING id", b.id, key,
|
||||||
Scan(&id)
|
)
|
||||||
|
defer cancel()
|
||||||
|
err := row.Scan(&id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -315,10 +327,12 @@ func (b *readWriteBucket) Delete(key []byte) error {
|
||||||
|
|
||||||
// Check to see if a bucket with this key exists.
|
// Check to see if a bucket with this key exists.
|
||||||
var dummy int
|
var dummy int
|
||||||
err := b.tx.QueryRow(
|
row, cancel := b.tx.QueryRow(
|
||||||
"SELECT 1 FROM "+b.table+" WHERE "+parentSelector(b.id)+
|
"SELECT 1 FROM "+b.table+" WHERE "+parentSelector(b.id)+
|
||||||
" AND key=$1 AND value IS NULL", key,
|
" AND key=$1 AND value IS NULL", key,
|
||||||
).Scan(&dummy)
|
)
|
||||||
|
defer cancel()
|
||||||
|
err := row.Scan(&dummy)
|
||||||
switch {
|
switch {
|
||||||
// No bucket exists, proceed to deletion of the key.
|
// No bucket exists, proceed to deletion of the key.
|
||||||
case err == sql.ErrNoRows:
|
case err == sql.ErrNoRows:
|
||||||
|
@ -395,11 +409,13 @@ func (b *readWriteBucket) Sequence() uint64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
var seq int64
|
var seq int64
|
||||||
err := b.tx.QueryRow(
|
row, cancel := b.tx.QueryRow(
|
||||||
"SELECT sequence FROM "+b.table+" WHERE id=$1 "+
|
"SELECT sequence FROM "+b.table+" WHERE id=$1 "+
|
||||||
"AND sequence IS NOT NULL",
|
"AND sequence IS NOT NULL",
|
||||||
b.id,
|
b.id,
|
||||||
).Scan(&seq)
|
)
|
||||||
|
defer cancel()
|
||||||
|
err := row.Scan(&seq)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case err == sql.ErrNoRows:
|
case err == sql.ErrNoRows:
|
||||||
|
|
|
@ -31,11 +31,13 @@ func (c *readWriteCursor) First() ([]byte, []byte) {
|
||||||
key []byte
|
key []byte
|
||||||
value []byte
|
value []byte
|
||||||
)
|
)
|
||||||
err := c.bucket.tx.QueryRow(
|
row, cancel := c.bucket.tx.QueryRow(
|
||||||
"SELECT key, value FROM "+c.bucket.table+" WHERE "+
|
"SELECT key, value FROM " + c.bucket.table + " WHERE " +
|
||||||
parentSelector(c.bucket.id)+
|
parentSelector(c.bucket.id) +
|
||||||
" ORDER BY key LIMIT 1",
|
" ORDER BY key LIMIT 1",
|
||||||
).Scan(&key, &value)
|
)
|
||||||
|
defer cancel()
|
||||||
|
err := row.Scan(&key, &value)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case err == sql.ErrNoRows:
|
case err == sql.ErrNoRows:
|
||||||
|
@ -59,11 +61,13 @@ func (c *readWriteCursor) Last() ([]byte, []byte) {
|
||||||
key []byte
|
key []byte
|
||||||
value []byte
|
value []byte
|
||||||
)
|
)
|
||||||
err := c.bucket.tx.QueryRow(
|
row, cancel := c.bucket.tx.QueryRow(
|
||||||
"SELECT key, value FROM "+c.bucket.table+" WHERE "+
|
"SELECT key, value FROM " + c.bucket.table + " WHERE " +
|
||||||
parentSelector(c.bucket.id)+
|
parentSelector(c.bucket.id) +
|
||||||
" ORDER BY key DESC LIMIT 1",
|
" ORDER BY key DESC LIMIT 1",
|
||||||
).Scan(&key, &value)
|
)
|
||||||
|
defer cancel()
|
||||||
|
err := row.Scan(&key, &value)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case err == sql.ErrNoRows:
|
case err == sql.ErrNoRows:
|
||||||
|
@ -87,12 +91,14 @@ func (c *readWriteCursor) Next() ([]byte, []byte) {
|
||||||
key []byte
|
key []byte
|
||||||
value []byte
|
value []byte
|
||||||
)
|
)
|
||||||
err := c.bucket.tx.QueryRow(
|
row, cancel := c.bucket.tx.QueryRow(
|
||||||
"SELECT key, value FROM "+c.bucket.table+" WHERE "+
|
"SELECT key, value FROM "+c.bucket.table+" WHERE "+
|
||||||
parentSelector(c.bucket.id)+
|
parentSelector(c.bucket.id)+
|
||||||
" AND key>$1 ORDER BY key LIMIT 1",
|
" AND key>$1 ORDER BY key LIMIT 1",
|
||||||
c.currKey,
|
c.currKey,
|
||||||
).Scan(&key, &value)
|
)
|
||||||
|
defer cancel()
|
||||||
|
err := row.Scan(&key, &value)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case err == sql.ErrNoRows:
|
case err == sql.ErrNoRows:
|
||||||
|
@ -116,12 +122,14 @@ func (c *readWriteCursor) Prev() ([]byte, []byte) {
|
||||||
key []byte
|
key []byte
|
||||||
value []byte
|
value []byte
|
||||||
)
|
)
|
||||||
err := c.bucket.tx.QueryRow(
|
row, cancel := c.bucket.tx.QueryRow(
|
||||||
"SELECT key, value FROM "+c.bucket.table+" WHERE "+
|
"SELECT key, value FROM "+c.bucket.table+" WHERE "+
|
||||||
parentSelector(c.bucket.id)+
|
parentSelector(c.bucket.id)+
|
||||||
" AND key<$1 ORDER BY key DESC LIMIT 1",
|
" AND key<$1 ORDER BY key DESC LIMIT 1",
|
||||||
c.currKey,
|
c.currKey,
|
||||||
).Scan(&key, &value)
|
)
|
||||||
|
defer cancel()
|
||||||
|
err := row.Scan(&key, &value)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case err == sql.ErrNoRows:
|
case err == sql.ErrNoRows:
|
||||||
|
@ -152,12 +160,14 @@ func (c *readWriteCursor) Seek(seek []byte) ([]byte, []byte) {
|
||||||
key []byte
|
key []byte
|
||||||
value []byte
|
value []byte
|
||||||
)
|
)
|
||||||
err := c.bucket.tx.QueryRow(
|
row, cancel := c.bucket.tx.QueryRow(
|
||||||
"SELECT key, value FROM "+c.bucket.table+" WHERE "+
|
"SELECT key, value FROM "+c.bucket.table+" WHERE "+
|
||||||
parentSelector(c.bucket.id)+
|
parentSelector(c.bucket.id)+
|
||||||
" AND key>=$1 ORDER BY key LIMIT 1",
|
" AND key>=$1 ORDER BY key LIMIT 1",
|
||||||
seek,
|
seek,
|
||||||
).Scan(&key, &value)
|
)
|
||||||
|
defer cancel()
|
||||||
|
err := row.Scan(&key, &value)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case err == sql.ErrNoRows:
|
case err == sql.ErrNoRows:
|
||||||
|
@ -180,12 +190,14 @@ func (c *readWriteCursor) Seek(seek []byte) ([]byte, []byte) {
|
||||||
func (c *readWriteCursor) Delete() error {
|
func (c *readWriteCursor) Delete() error {
|
||||||
// Get first record at or after cursor.
|
// Get first record at or after cursor.
|
||||||
var key []byte
|
var key []byte
|
||||||
err := c.bucket.tx.QueryRow(
|
row, cancel := c.bucket.tx.QueryRow(
|
||||||
"SELECT key FROM "+c.bucket.table+" WHERE "+
|
"SELECT key FROM "+c.bucket.table+" WHERE "+
|
||||||
parentSelector(c.bucket.id)+
|
parentSelector(c.bucket.id)+
|
||||||
" AND key>=$1 ORDER BY key LIMIT 1",
|
" AND key>=$1 ORDER BY key LIMIT 1",
|
||||||
c.currKey,
|
c.currKey,
|
||||||
).Scan(&key)
|
)
|
||||||
|
defer cancel()
|
||||||
|
err := row.Scan(&key)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case err == sql.ErrNoRows:
|
case err == sql.ErrNoRows:
|
||||||
|
|
|
@ -158,21 +158,11 @@ func (tx *readWriteTx) OnCommit(cb func()) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryRow executes a QueryRow call with a timeout context.
|
// QueryRow executes a QueryRow call with a timeout context.
|
||||||
func (tx *readWriteTx) QueryRow(query string, args ...interface{}) *sql.Row {
|
func (tx *readWriteTx) QueryRow(query string, args ...interface{}) (*sql.Row,
|
||||||
ctx, cancel := tx.db.getTimeoutCtx()
|
func()) {
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
return tx.tx.QueryRowContext(ctx, query, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Query executes a Query call with a timeout context.
|
|
||||||
func (tx *readWriteTx) Query(query string, args ...interface{}) (*sql.Rows,
|
|
||||||
error) {
|
|
||||||
|
|
||||||
ctx, cancel := tx.db.getTimeoutCtx()
|
ctx, cancel := tx.db.getTimeoutCtx()
|
||||||
defer cancel()
|
return tx.tx.QueryRowContext(ctx, query, args...), cancel
|
||||||
|
|
||||||
return tx.tx.QueryContext(ctx, query, args...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exec executes a Exec call with a timeout context.
|
// Exec executes a Exec call with a timeout context.
|
||||||
|
|
Loading…
Add table
Reference in a new issue