From 62a2f3c809fa1d459380b658145ed620ad900aa5 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 20 Dec 2021 14:57:28 +0100 Subject: [PATCH 1/2] kvdb/postgres: fix tests --- go.mod | 2 +- kvdb/postgres/fixture.go | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 693c09d3d..0af0bdadf 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,7 @@ require ( github.com/lightningnetwork/lnd/cert v1.1.0 github.com/lightningnetwork/lnd/clock v1.1.0 github.com/lightningnetwork/lnd/healthcheck v1.2.0 - github.com/lightningnetwork/lnd/kvdb v1.2.3 + github.com/lightningnetwork/lnd/kvdb v1.2.4 github.com/lightningnetwork/lnd/queue v1.1.0 github.com/lightningnetwork/lnd/ticker v1.1.0 github.com/ltcsuite/ltcd v0.0.0-20190101042124-f37f8bf35796 diff --git a/kvdb/postgres/fixture.go b/kvdb/postgres/fixture.go index fe6c323d5..16170f80a 100644 --- a/kvdb/postgres/fixture.go +++ b/kvdb/postgres/fixture.go @@ -26,10 +26,14 @@ func getTestDsn(dbName string) string { var testPostgres *embeddedpostgres.EmbeddedPostgres +const testMaxConnections = 50 + // StartEmbeddedPostgres starts an embedded postgres instance. This only needs // to be done once, because NewFixture will create random new databases on every // call. It returns a stop closure that stops the database if called. func StartEmbeddedPostgres() (func() error, error) { + Init(testMaxConnections) + postgres := embeddedpostgres.NewDatabase( embeddedpostgres.DefaultConfig(). Port(9876)) From 0cae55c1627ddb725eb043d6933caa6f10a1a5d2 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 22 Dec 2021 12:33:50 +0100 Subject: [PATCH 2/2] kvdb/postgres: fix context cancellation --- docs/release-notes/release-notes-0.14.2.md | 2 + kvdb/postgres/fixture.go | 4 +- kvdb/postgres/readwrite_bucket.go | 50 ++++++++++++++-------- kvdb/postgres/readwrite_cursor.go | 44 ++++++++++++------- kvdb/postgres/readwrite_tx.go | 16 ++----- 5 files changed, 69 insertions(+), 47 deletions(-) diff --git a/docs/release-notes/release-notes-0.14.2.md b/docs/release-notes/release-notes-0.14.2.md index 4514b13ae..d24e34d24 100644 --- a/docs/release-notes/release-notes-0.14.2.md +++ b/docs/release-notes/release-notes-0.14.2.md @@ -45,6 +45,8 @@ * [Clarify log message about not running within systemd](https://github.com/lightningnetwork/lnd/pull/6096) +* [Fix Postgres context cancellation](https://github.com/lightningnetwork/lnd/pull/6108) + ## RPC Server * [ChanStatusFlags is now diff --git a/kvdb/postgres/fixture.go b/kvdb/postgres/fixture.go index 16170f80a..94356c31a 100644 --- a/kvdb/postgres/fixture.go +++ b/kvdb/postgres/fixture.go @@ -10,6 +10,7 @@ import ( "encoding/hex" "fmt" "strings" + "time" "github.com/btcsuite/btcwallet/walletdb" embeddedpostgres "github.com/fergusstrange/embedded-postgres" @@ -81,7 +82,8 @@ func NewFixture(dbName string) (*fixture, error) { db, err := newPostgresBackend( context.Background(), &Config{ - Dsn: dsn, + Dsn: dsn, + Timeout: time.Minute, }, prefix, ) diff --git a/kvdb/postgres/readwrite_bucket.go b/kvdb/postgres/readwrite_bucket.go index 131151d86..5e01aa6cd 100644 --- a/kvdb/postgres/readwrite_bucket.go +++ b/kvdb/postgres/readwrite_bucket.go @@ -75,10 +75,12 @@ func (b *readWriteBucket) Get(key []byte) []byte { } var value *[]byte - err := b.tx.QueryRow( + row, cancel := b.tx.QueryRow( "SELECT value FROM "+b.table+" WHERE "+parentSelector(b.id)+ " AND key=$1", key, - ).Scan(&value) + ) + defer cancel() + err := row.Scan(&value) switch { case err == sql.ErrNoRows: @@ -106,10 +108,12 @@ func (b *readWriteBucket) NestedReadWriteBucket( } var id int64 - err := b.tx.QueryRow( + row, cancel := b.tx.QueryRow( "SELECT id FROM "+b.table+" WHERE "+parentSelector(b.id)+ " AND key=$1 AND value IS NULL", key, - ).Scan(&id) + ) + defer cancel() + err := row.Scan(&id) switch { case err == sql.ErrNoRows: @@ -139,10 +143,12 @@ func (b *readWriteBucket) CreateBucket(key []byte) ( value *[]byte id int64 ) - err := b.tx.QueryRow( + row, cancel := b.tx.QueryRow( "SELECT id,value FROM "+b.table+" WHERE "+parentSelector(b.id)+ " AND key=$1", key, - ).Scan(&id, &value) + ) + defer cancel() + err := row.Scan(&id, &value) switch { case err == sql.ErrNoRows: @@ -159,10 +165,12 @@ func (b *readWriteBucket) CreateBucket(key []byte) ( // Bucket does not yet exist, so create it. Postgres will generate a // bucket id for the new bucket. - err = b.tx.QueryRow( + row, cancel = b.tx.QueryRow( "INSERT INTO "+b.table+" (parent_id, key) "+ "VALUES($1, $2) RETURNING id", b.id, key, - ).Scan(&id) + ) + defer cancel() + err = row.Scan(&id) if err != nil { return nil, err } @@ -187,19 +195,23 @@ func (b *readWriteBucket) CreateBucketIfNotExists(key []byte) ( value *[]byte id int64 ) - err := b.tx.QueryRow( + row, cancel := b.tx.QueryRow( "SELECT id,value FROM "+b.table+" WHERE "+parentSelector(b.id)+ " AND key=$1", key, - ).Scan(&id, &value) + ) + defer cancel() + err := row.Scan(&id, &value) switch { // Bucket does not yet exist, so create it now. Postgres will generate a // bucket id for the new bucket. case err == sql.ErrNoRows: - err = b.tx.QueryRow( + row, cancel := b.tx.QueryRow( "INSERT INTO "+b.table+" (parent_id, key) "+ - "VALUES($1, $2) RETURNING id", b.id, key). - Scan(&id) + "VALUES($1, $2) RETURNING id", b.id, key, + ) + defer cancel() + err := row.Scan(&id) if err != nil { return nil, err } @@ -315,10 +327,12 @@ func (b *readWriteBucket) Delete(key []byte) error { // Check to see if a bucket with this key exists. var dummy int - err := b.tx.QueryRow( + row, cancel := b.tx.QueryRow( "SELECT 1 FROM "+b.table+" WHERE "+parentSelector(b.id)+ " AND key=$1 AND value IS NULL", key, - ).Scan(&dummy) + ) + defer cancel() + err := row.Scan(&dummy) switch { // No bucket exists, proceed to deletion of the key. case err == sql.ErrNoRows: @@ -395,11 +409,13 @@ func (b *readWriteBucket) Sequence() uint64 { } var seq int64 - err := b.tx.QueryRow( + row, cancel := b.tx.QueryRow( "SELECT sequence FROM "+b.table+" WHERE id=$1 "+ "AND sequence IS NOT NULL", b.id, - ).Scan(&seq) + ) + defer cancel() + err := row.Scan(&seq) switch { case err == sql.ErrNoRows: diff --git a/kvdb/postgres/readwrite_cursor.go b/kvdb/postgres/readwrite_cursor.go index d56363bbf..80e321e06 100644 --- a/kvdb/postgres/readwrite_cursor.go +++ b/kvdb/postgres/readwrite_cursor.go @@ -31,11 +31,13 @@ func (c *readWriteCursor) First() ([]byte, []byte) { key []byte value []byte ) - err := c.bucket.tx.QueryRow( - "SELECT key, value FROM "+c.bucket.table+" WHERE "+ - parentSelector(c.bucket.id)+ + row, cancel := c.bucket.tx.QueryRow( + "SELECT key, value FROM " + c.bucket.table + " WHERE " + + parentSelector(c.bucket.id) + " ORDER BY key LIMIT 1", - ).Scan(&key, &value) + ) + defer cancel() + err := row.Scan(&key, &value) switch { case err == sql.ErrNoRows: @@ -59,11 +61,13 @@ func (c *readWriteCursor) Last() ([]byte, []byte) { key []byte value []byte ) - err := c.bucket.tx.QueryRow( - "SELECT key, value FROM "+c.bucket.table+" WHERE "+ - parentSelector(c.bucket.id)+ + row, cancel := c.bucket.tx.QueryRow( + "SELECT key, value FROM " + c.bucket.table + " WHERE " + + parentSelector(c.bucket.id) + " ORDER BY key DESC LIMIT 1", - ).Scan(&key, &value) + ) + defer cancel() + err := row.Scan(&key, &value) switch { case err == sql.ErrNoRows: @@ -87,12 +91,14 @@ func (c *readWriteCursor) Next() ([]byte, []byte) { key []byte value []byte ) - err := c.bucket.tx.QueryRow( + row, cancel := c.bucket.tx.QueryRow( "SELECT key, value FROM "+c.bucket.table+" WHERE "+ parentSelector(c.bucket.id)+ " AND key>$1 ORDER BY key LIMIT 1", c.currKey, - ).Scan(&key, &value) + ) + defer cancel() + err := row.Scan(&key, &value) switch { case err == sql.ErrNoRows: @@ -116,12 +122,14 @@ func (c *readWriteCursor) Prev() ([]byte, []byte) { key []byte value []byte ) - err := c.bucket.tx.QueryRow( + row, cancel := c.bucket.tx.QueryRow( "SELECT key, value FROM "+c.bucket.table+" WHERE "+ parentSelector(c.bucket.id)+ " AND key<$1 ORDER BY key DESC LIMIT 1", c.currKey, - ).Scan(&key, &value) + ) + defer cancel() + err := row.Scan(&key, &value) switch { case err == sql.ErrNoRows: @@ -152,12 +160,14 @@ func (c *readWriteCursor) Seek(seek []byte) ([]byte, []byte) { key []byte value []byte ) - err := c.bucket.tx.QueryRow( + row, cancel := c.bucket.tx.QueryRow( "SELECT key, value FROM "+c.bucket.table+" WHERE "+ parentSelector(c.bucket.id)+ " AND key>=$1 ORDER BY key LIMIT 1", seek, - ).Scan(&key, &value) + ) + defer cancel() + err := row.Scan(&key, &value) switch { case err == sql.ErrNoRows: @@ -180,12 +190,14 @@ func (c *readWriteCursor) Seek(seek []byte) ([]byte, []byte) { func (c *readWriteCursor) Delete() error { // Get first record at or after cursor. var key []byte - err := c.bucket.tx.QueryRow( + row, cancel := c.bucket.tx.QueryRow( "SELECT key FROM "+c.bucket.table+" WHERE "+ parentSelector(c.bucket.id)+ " AND key>=$1 ORDER BY key LIMIT 1", c.currKey, - ).Scan(&key) + ) + defer cancel() + err := row.Scan(&key) switch { case err == sql.ErrNoRows: diff --git a/kvdb/postgres/readwrite_tx.go b/kvdb/postgres/readwrite_tx.go index 99701a7a8..942552580 100644 --- a/kvdb/postgres/readwrite_tx.go +++ b/kvdb/postgres/readwrite_tx.go @@ -158,21 +158,11 @@ func (tx *readWriteTx) OnCommit(cb func()) { } // QueryRow executes a QueryRow call with a timeout context. -func (tx *readWriteTx) QueryRow(query string, args ...interface{}) *sql.Row { - ctx, cancel := tx.db.getTimeoutCtx() - defer cancel() - - return tx.tx.QueryRowContext(ctx, query, args...) -} - -// Query executes a Query call with a timeout context. -func (tx *readWriteTx) Query(query string, args ...interface{}) (*sql.Rows, - error) { +func (tx *readWriteTx) QueryRow(query string, args ...interface{}) (*sql.Row, + func()) { ctx, cancel := tx.db.getTimeoutCtx() - defer cancel() - - return tx.tx.QueryContext(ctx, query, args...) + return tx.tx.QueryRowContext(ctx, query, args...), cancel } // Exec executes a Exec call with a timeout context.