Merge pull request #5992 from bottlepay/pg-max-conn

postgres: add connection limit
This commit is contained in:
Olaoluwa Osuntokun 2021-11-17 11:53:52 -08:00 committed by GitHub
commit 451280daa4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 130 additions and 7 deletions

View file

@ -194,7 +194,7 @@ ifeq ($(dbbackend),postgres)
# Start a fresh postgres instance. Allow a maximum of 500 connections. # Start a fresh postgres instance. Allow a maximum of 500 connections.
# This is required for the async benchmark to pass. # This is required for the async benchmark to pass.
docker run --name lnd-postgres -e POSTGRES_PASSWORD=postgres -p 6432:5432 -d postgres:13-alpine -N 500 docker run --name lnd-postgres -e POSTGRES_PASSWORD=postgres -p 6432:5432 -d postgres:13-alpine
docker logs -f lnd-postgres & docker logs -f lnd-postgres &
# Wait for the instance to be started. # Wait for the instance to be started.

View file

@ -634,6 +634,8 @@ messages directly. There is no routing/path finding involved.
* [Fixes a bug that would cause pruned nodes to stall out](https://github.com/lightningnetwork/lnd/pull/5970) * [Fixes a bug that would cause pruned nodes to stall out](https://github.com/lightningnetwork/lnd/pull/5970)
* [Add Postgres connection limit](https://github.com/lightningnetwork/lnd/pull/5992)
## Documentation ## Documentation
The [code contribution guidelines have been updated to mention the new The [code contribution guidelines have been updated to mention the new

View file

@ -6,4 +6,5 @@ import "time"
type Config struct { type Config struct {
Dsn string `long:"dsn" description:"Database connection string."` Dsn string `long:"dsn" description:"Database connection string."`
Timeout time.Duration `long:"timeout" description:"Database connection timeout. Set to zero to disable."` Timeout time.Duration `long:"timeout" description:"Database connection timeout. Set to zero to disable."`
MaxConnections int `long:"maxconnections" description:"The maximum number of open connections to the database. Set to zero for unlimited."`
} }

View file

@ -13,7 +13,6 @@ import (
"time" "time"
"github.com/btcsuite/btcwallet/walletdb" "github.com/btcsuite/btcwallet/walletdb"
_ "github.com/jackc/pgx/v4/stdlib"
) )
const ( const (
@ -58,6 +57,14 @@ type db struct {
// Enforce db implements the walletdb.DB interface. // Enforce db implements the walletdb.DB interface.
var _ walletdb.DB = (*db)(nil) var _ walletdb.DB = (*db)(nil)
// Global set of database connections.
var dbConns *dbConnSet
// Init initializes the global set of database connections.
func Init(maxConnections int) {
dbConns = newDbConnSet(maxConnections)
}
// newPostgresBackend returns a db object initialized with the passed backend // newPostgresBackend returns a db object initialized with the passed backend
// config. If postgres connection cannot be estabished, then returns error. // config. If postgres connection cannot be estabished, then returns error.
func newPostgresBackend(ctx context.Context, config *Config, prefix string) ( func newPostgresBackend(ctx context.Context, config *Config, prefix string) (
@ -67,7 +74,11 @@ func newPostgresBackend(ctx context.Context, config *Config, prefix string) (
return nil, errors.New("empty postgres prefix") return nil, errors.New("empty postgres prefix")
} }
dbConn, err := sql.Open("pgx", config.Dsn) if dbConns == nil {
return nil, errors.New("db connection set not initialized")
}
dbConn, err := dbConns.Open(config.Dsn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -245,5 +256,5 @@ func (db *db) Copy(w io.Writer) error {
// Close cleanly shuts down the database and syncs all data. // Close cleanly shuts down the database and syncs all data.
// This function is part of the walletdb.Db interface implementation. // This function is part of the walletdb.Db interface implementation.
func (db *db) Close() error { func (db *db) Close() error {
return db.db.Close() return dbConns.Close(db.cfg.Dsn)
} }

View file

@ -0,0 +1,89 @@
package postgres
import (
"database/sql"
"fmt"
"sync"
_ "github.com/jackc/pgx/v4/stdlib"
)
// dbConn stores the actual connection and a user count.
type dbConn struct {
db *sql.DB
count int
}
// dbConnSet stores a set of connections.
type dbConnSet struct {
dbConn map[string]*dbConn
maxConnections int
sync.Mutex
}
// newDbConnSet initializes a new set of connections.
func newDbConnSet(maxConnections int) *dbConnSet {
return &dbConnSet{
dbConn: make(map[string]*dbConn),
maxConnections: maxConnections,
}
}
// Open opens a new database connection. If a connection already exists for the
// given dsn, the existing connection is returned.
func (d *dbConnSet) Open(dsn string) (*sql.DB, error) {
d.Lock()
defer d.Unlock()
if dbConn, ok := d.dbConn[dsn]; ok {
dbConn.count++
return dbConn.db, nil
}
db, err := sql.Open("pgx", dsn)
if err != nil {
return nil, err
}
// Limit maximum number of open connections. This is useful to prevent
// the server from running out of connections and returning an error.
// With this client-side limit in place, lnd will wait for a connection
// to become available.
if d.maxConnections != 0 {
db.SetMaxOpenConns(d.maxConnections)
}
d.dbConn[dsn] = &dbConn{
db: db,
count: 1,
}
return db, nil
}
// Close closes the connection with the given dsn. If there are still other
// users of the same connection, this function does nothing.
func (d *dbConnSet) Close(dsn string) error {
d.Lock()
defer d.Unlock()
dbConn, ok := d.dbConn[dsn]
if !ok {
return fmt.Errorf("connection not found: %v", dsn)
}
// Reduce user count.
dbConn.count--
// Do not close if there are other users.
if dbConn.count > 0 {
return nil
}
// Close connection.
delete(d.dbConn, dsn)
return dbConn.db.Close()
}

6
kvdb/postgres/no_db.go Normal file
View file

@ -0,0 +1,6 @@
//go:build !kvdb_postgres
// +build !kvdb_postgres
package postgres
func Init(maxConnections int) {}

View file

@ -23,6 +23,8 @@ const (
PostgresBackend = "postgres" PostgresBackend = "postgres"
DefaultBatchCommitInterval = 500 * time.Millisecond DefaultBatchCommitInterval = 500 * time.Millisecond
defaultPostgresMaxConnections = 50
// NSChannelDB is the namespace name that we use for the combined graph // NSChannelDB is the namespace name that we use for the combined graph
// and channel state DB. // and channel state DB.
NSChannelDB = "channeldb" NSChannelDB = "channeldb"
@ -71,6 +73,9 @@ func DefaultDB() *DB {
AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge, AutoCompactMinAge: kvdb.DefaultBoltAutoCompactMinAge,
DBTimeout: kvdb.DefaultDBTimeout, DBTimeout: kvdb.DefaultDBTimeout,
}, },
Postgres: &postgres.Config{
MaxConnections: defaultPostgresMaxConnections,
},
} }
} }
@ -113,7 +118,8 @@ func (db *DB) Validate() error {
// on configuration. // on configuration.
func (db *DB) Init(ctx context.Context, dbPath string) error { func (db *DB) Init(ctx context.Context, dbPath string) error {
// Start embedded etcd server if requested. // Start embedded etcd server if requested.
if db.Backend == EtcdBackend && db.Etcd.Embedded { switch {
case db.Backend == EtcdBackend && db.Etcd.Embedded:
cfg, _, err := kvdb.StartEtcdTestBackend( cfg, _, err := kvdb.StartEtcdTestBackend(
dbPath, db.Etcd.EmbeddedClientPort, dbPath, db.Etcd.EmbeddedClientPort,
db.Etcd.EmbeddedPeerPort, db.Etcd.EmbeddedLogFile, db.Etcd.EmbeddedPeerPort, db.Etcd.EmbeddedLogFile,
@ -125,6 +131,9 @@ func (db *DB) Init(ctx context.Context, dbPath string) error {
// Override the original config with the config for // Override the original config with the config for
// the embedded instance. // the embedded instance.
db.Etcd = cfg db.Etcd = cfg
case db.Backend == PostgresBackend:
postgres.Init(db.Postgres.MaxConnections)
} }
return nil return nil

View file

@ -1177,6 +1177,11 @@ litecoin.node=ltcd
; disable. ; disable.
; db.postgres.timeout= ; db.postgres.timeout=
; Postgres maximum number of connections. Set to zero for unlimited. It is
; recommended to set a limit that is below the server connection limit.
; Otherwise errors may occur in lnd under high-load conditions.
; db.postgres.maxconnections=
[bolt] [bolt]
; If true, prevents the database from syncing its freelist to disk. ; If true, prevents the database from syncing its freelist to disk.