From 170160f28a6b8e039bf841df0d806b187f7d7a10 Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Tue, 17 Jan 2023 13:54:12 +0200 Subject: [PATCH] kvdb+refactor: move all general sqlite code to seprate dir In this commit, all the sql, non-postgres-specific, code is moved out of the postgres package and into a new sqlbase package. This will make it more easily reusable for future sql integrations. --- kvdb/log.go | 4 +- kvdb/postgres/db.go | 263 +---------------- kvdb/postgres/fixture.go | 3 +- kvdb/sqlbase/db.go | 267 ++++++++++++++++++ kvdb/{postgres => sqlbase}/db_conn_set.go | 2 +- kvdb/{postgres => sqlbase}/log.go | 2 +- kvdb/{postgres/no_db.go => sqlbase/no_sql.go} | 2 +- .../{postgres => sqlbase}/readwrite_bucket.go | 2 +- .../{postgres => sqlbase}/readwrite_cursor.go | 2 +- kvdb/{postgres => sqlbase}/readwrite_tx.go | 6 +- kvdb/{postgres => sqlbase}/schema.go | 2 +- 11 files changed, 285 insertions(+), 270 deletions(-) create mode 100644 kvdb/sqlbase/db.go rename kvdb/{postgres => sqlbase}/db_conn_set.go (99%) rename kvdb/{postgres => sqlbase}/log.go (95%) rename kvdb/{postgres/no_db.go => sqlbase/no_sql.go} (78%) rename kvdb/{postgres => sqlbase}/readwrite_bucket.go (99%) rename kvdb/{postgres => sqlbase}/readwrite_cursor.go (99%) rename kvdb/{postgres => sqlbase}/readwrite_tx.go (98%) rename kvdb/{postgres => sqlbase}/schema.go (99%) diff --git a/kvdb/log.go b/kvdb/log.go index 6dc3ad714..824715432 100644 --- a/kvdb/log.go +++ b/kvdb/log.go @@ -2,7 +2,7 @@ package kvdb import ( "github.com/btcsuite/btclog" - "github.com/lightningnetwork/lnd/kvdb/postgres" + "github.com/lightningnetwork/lnd/kvdb/sqlbase" ) // log is a logger that is initialized as disabled. This means the package will @@ -13,5 +13,5 @@ var log = btclog.Disabled func UseLogger(logger btclog.Logger) { log = logger - postgres.UseLogger(log) + sqlbase.UseLogger(log) } diff --git a/kvdb/postgres/db.go b/kvdb/postgres/db.go index 2c31d46e0..90ca8324a 100644 --- a/kvdb/postgres/db.go +++ b/kvdb/postgres/db.go @@ -4,269 +4,14 @@ package postgres import ( "context" - "database/sql" - "errors" - "fmt" - "io" - "sync" - "time" "github.com/btcsuite/btcwallet/walletdb" + "github.com/lightningnetwork/lnd/kvdb/sqlbase" ) -const ( - // kvTableName is the name of the table that will contain all the kv - // pairs. - kvTableName = "kv" -) - -// SqlConfig holds a set of configuration options of a sql database connection. -type SqlConfig struct { - // DriverName is the string that defines the registered sql driver that - // is to be used. - DriverName string - - // Dsn is the database connection string that will be used to connect - // to the db. - Dsn string - - // Timeout is the time after which a query to the db will be canceled if - // it has not yet completed. - Timeout time.Duration - - // Schema is the name of the schema under which the sql tables should be - // created. It should be left empty for backends like sqlite that do not - // support having more than one schema. - Schema string - - // TableNamePrefix is the name that should be used as a table name - // prefix when constructing the KV style table. - TableNamePrefix string - - // SQLiteCmdReplacements define a one-to-one string mapping of sql - // keywords to the strings that should replace those keywords in any - // commands. Note that the sqlite keywords to be replaced are - // case-sensitive. - SQLiteCmdReplacements SQLiteCmdReplacements - - // WithTxLevelLock when set will ensure that there is a transaction - // level lock. - WithTxLevelLock bool -} - -// db holds a reference to the postgres connection. -type db struct { - // cfg is the sql db connection config. - cfg *SqlConfig - - // prefix is the table name prefix that is used to simulate namespaces. - // We don't use schemas because at least sqlite does not support that. - prefix string - - // ctx is the overall context for the database driver. - // - // TODO: This is an anti-pattern that is in place until the kvdb - // interface supports a context. - ctx context.Context - - // db is the underlying database connection instance. - db *sql.DB - - // lock is the global write lock that ensures single writer. This is - // only used if cfg.WithTxLevelLock is set. - lock sync.RWMutex - - // table is the name of the table that contains the data for all - // top-level buckets that have keys that cannot be mapped to a distinct - // sql table. - table string -} - -// Enforce db implements the walletdb.DB interface. -var _ walletdb.DB = (*db)(nil) - -var ( - // dbConns is a global set of database connections. - dbConns *dbConnSet - dbConnsMu sync.Mutex -) - -// Init initializes the global set of database connections. -func Init(maxConnections int) { - dbConnsMu.Lock() - defer dbConnsMu.Unlock() - - if dbConns != nil { - return - } - - dbConns = newDbConnSet(maxConnections) -} - -// NewSqlBackend returns a db object initialized with the passed backend -// config. If database connection cannot be established, then returns error. -func NewSqlBackend(ctx context.Context, cfg *SqlConfig) (*db, error) { - dbConnsMu.Lock() - defer dbConnsMu.Unlock() - - if dbConns == nil { - return nil, errors.New("db connection set not initialized") - } - - if cfg.TableNamePrefix == "" { - return nil, errors.New("empty table name prefix") - } - - table := fmt.Sprintf("%s_%s", cfg.TableNamePrefix, kvTableName) - - query := newKVSchemaCreationCmd( - table, cfg.Schema, cfg.SQLiteCmdReplacements, - ) - - dbConn, err := dbConns.Open(cfg.DriverName, cfg.Dsn) - if err != nil { - return nil, err - } - - _, err = dbConn.ExecContext(ctx, query) - if err != nil { - _ = dbConn.Close() - - return nil, err - } - - return &db{ - cfg: cfg, - ctx: ctx, - db: dbConn, - table: table, - prefix: cfg.TableNamePrefix, - }, nil -} - -// getTimeoutCtx gets a timeout context for database requests. -func (db *db) getTimeoutCtx() (context.Context, func()) { - if db.cfg.Timeout == time.Duration(0) { - return db.ctx, func() {} - } - - return context.WithTimeout(db.ctx, db.cfg.Timeout) -} - -// getPrefixedTableName returns a table name for this prefix (namespace). -func (db *db) getPrefixedTableName(table string) string { - return fmt.Sprintf("%s_%s", db.prefix, table) -} - -// catchPanic executes the specified function. If a panic occurs, it is returned -// as an error value. -func catchPanic(f func() error) (err error) { - defer func() { - if r := recover(); r != nil { - log.Criticalf("Caught unhandled error: %v", r) - - switch data := r.(type) { - case error: - err = data - - default: - err = errors.New(fmt.Sprintf("%v", data)) - } - } - }() - - err = f() - - return -} - -// View opens a database read transaction and executes the function f with the -// transaction passed as a parameter. After f exits, the transaction is rolled -// back. If f errors, its error is returned, not a rollback error (if any -// occur). The passed reset function is called before the start of the -// transaction and can be used to reset intermediate state. As callers may -// expect retries of the f closure (depending on the database backend used), the -// reset function will be called before each retry respectively. -func (db *db) View(f func(tx walletdb.ReadTx) error, reset func()) error { - return db.executeTransaction( - func(tx walletdb.ReadWriteTx) error { - return f(tx.(walletdb.ReadTx)) - }, - reset, true, - ) -} - -// Update opens a database read/write transaction and executes the function f -// with the transaction passed as a parameter. After f exits, if f did not -// error, the transaction is committed. Otherwise, if f did error, the -// transaction is rolled back. If the rollback fails, the original error -// returned by f is still returned. If the commit fails, the commit error is -// returned. As callers may expect retries of the f closure, the reset function -// will be called before each retry respectively. -func (db *db) Update(f func(tx walletdb.ReadWriteTx) error, reset func()) (err error) { - return db.executeTransaction(f, reset, false) -} - -// executeTransaction creates a new read-only or read-write transaction and -// executes the given function within it. -func (db *db) executeTransaction(f func(tx walletdb.ReadWriteTx) error, - reset func(), readOnly bool) error { - - reset() - - tx, err := newReadWriteTx(db, readOnly) - if err != nil { - return err - } - - err = catchPanic(func() error { return f(tx) }) - if err != nil { - if rollbackErr := tx.Rollback(); rollbackErr != nil { - log.Errorf("Error rolling back tx: %v", rollbackErr) - } - - return err - } - - return tx.Commit() -} - -// PrintStats returns all collected stats pretty printed into a string. -func (db *db) PrintStats() string { - return "stats not supported by Postgres driver" -} - -// BeginReadWriteTx opens a database read+write transaction. -func (db *db) BeginReadWriteTx() (walletdb.ReadWriteTx, error) { - return newReadWriteTx(db, false) -} - -// BeginReadTx opens a database read transaction. -func (db *db) BeginReadTx() (walletdb.ReadTx, error) { - return newReadWriteTx(db, true) -} - -// Copy writes a copy of the database to the provided writer. This call will -// start a read-only transaction to perform all operations. -// This function is part of the walletdb.Db interface implementation. -func (db *db) Copy(w io.Writer) error { - return errors.New("not implemented") -} - -// Close cleanly shuts down the database and syncs all data. -// This function is part of the walletdb.Db interface implementation. -func (db *db) Close() error { - dbConnsMu.Lock() - defer dbConnsMu.Unlock() - - log.Infof("Closing database %v", db.prefix) - - return dbConns.Close(db.cfg.Dsn) -} - // sqliteCmdReplacements defines a mapping from some SQLite keywords and phrases // to their postgres counterparts. -var sqliteCmdReplacements = SQLiteCmdReplacements{ +var sqliteCmdReplacements = sqlbase.SQLiteCmdReplacements{ "BLOB": "BYTEA", "INTEGER PRIMARY KEY": "BIGSERIAL PRIMARY KEY", } @@ -276,7 +21,7 @@ var sqliteCmdReplacements = SQLiteCmdReplacements{ func newPostgresBackend(ctx context.Context, config *Config, prefix string) ( walletdb.DB, error) { - cfg := &SqlConfig{ + cfg := &sqlbase.Config{ DriverName: "pgx", Dsn: config.Dsn, Timeout: config.Timeout, @@ -286,5 +31,5 @@ func newPostgresBackend(ctx context.Context, config *Config, prefix string) ( WithTxLevelLock: true, } - return NewSqlBackend(ctx, cfg) + return sqlbase.NewSqlBackend(ctx, cfg) } diff --git a/kvdb/postgres/fixture.go b/kvdb/postgres/fixture.go index c7e2b014f..26d95c33d 100644 --- a/kvdb/postgres/fixture.go +++ b/kvdb/postgres/fixture.go @@ -13,6 +13,7 @@ import ( "github.com/btcsuite/btcwallet/walletdb" embeddedpostgres "github.com/fergusstrange/embedded-postgres" + "github.com/lightningnetwork/lnd/kvdb/sqlbase" ) const ( @@ -32,7 +33,7 @@ const testMaxConnections = 50 // to be done once, because NewFixture will create random new databases on every // call. It returns a stop closure that stops the database if called. func StartEmbeddedPostgres() (func() error, error) { - Init(testMaxConnections) + sqlbase.Init(testMaxConnections) postgres := embeddedpostgres.NewDatabase( embeddedpostgres.DefaultConfig(). diff --git a/kvdb/sqlbase/db.go b/kvdb/sqlbase/db.go new file mode 100644 index 000000000..3e5d3c8e1 --- /dev/null +++ b/kvdb/sqlbase/db.go @@ -0,0 +1,267 @@ +//go:build kvdb_postgres + +package sqlbase + +import ( + "context" + "database/sql" + "errors" + "fmt" + "io" + "sync" + "time" + + "github.com/btcsuite/btcwallet/walletdb" +) + +const ( + // kvTableName is the name of the table that will contain all the kv + // pairs. + kvTableName = "kv" +) + +// Config holds a set of configuration options of a sql database connection. +type Config struct { + // DriverName is the string that defines the registered sql driver that + // is to be used. + DriverName string + + // Dsn is the database connection string that will be used to connect + // to the db. + Dsn string + + // Timeout is the time after which a query to the db will be canceled if + // it has not yet completed. + Timeout time.Duration + + // Schema is the name of the schema under which the sql tables should be + // created. It should be left empty for backends like sqlite that do not + // support having more than one schema. + Schema string + + // TableNamePrefix is the name that should be used as a table name + // prefix when constructing the KV style table. + TableNamePrefix string + + // SQLiteCmdReplacements define a one-to-one string mapping of sql + // keywords to the strings that should replace those keywords in any + // commands. Note that the sqlite keywords to be replaced are + // case-sensitive. + SQLiteCmdReplacements SQLiteCmdReplacements + + // WithTxLevelLock when set will ensure that there is a transaction + // level lock. + WithTxLevelLock bool +} + +// db holds a reference to the sql db connection. +type db struct { + // cfg is the sql db connection config. + cfg *Config + + // prefix is the table name prefix that is used to simulate namespaces. + // We don't use schemas because at least sqlite does not support that. + prefix string + + // ctx is the overall context for the database driver. + // + // TODO: This is an anti-pattern that is in place until the kvdb + // interface supports a context. + ctx context.Context + + // db is the underlying database connection instance. + db *sql.DB + + // lock is the global write lock that ensures single writer. This is + // only used if cfg.WithTxLevelLock is set. + lock sync.RWMutex + + // table is the name of the table that contains the data for all + // top-level buckets that have keys that cannot be mapped to a distinct + // sql table. + table string +} + +// Enforce db implements the walletdb.DB interface. +var _ walletdb.DB = (*db)(nil) + +var ( + // dbConns is a global set of database connections. + dbConns *dbConnSet + dbConnsMu sync.Mutex +) + +// Init initializes the global set of database connections. +func Init(maxConnections int) { + dbConnsMu.Lock() + defer dbConnsMu.Unlock() + + if dbConns != nil { + return + } + + dbConns = newDbConnSet(maxConnections) +} + +// NewSqlBackend returns a db object initialized with the passed backend +// config. If database connection cannot be established, then returns error. +func NewSqlBackend(ctx context.Context, cfg *Config) (*db, error) { + dbConnsMu.Lock() + defer dbConnsMu.Unlock() + + if dbConns == nil { + return nil, errors.New("db connection set not initialized") + } + + if cfg.TableNamePrefix == "" { + return nil, errors.New("empty table name prefix") + } + + table := fmt.Sprintf("%s_%s", cfg.TableNamePrefix, kvTableName) + + query := newKVSchemaCreationCmd( + table, cfg.Schema, cfg.SQLiteCmdReplacements, + ) + + dbConn, err := dbConns.Open(cfg.DriverName, cfg.Dsn) + if err != nil { + return nil, err + } + + _, err = dbConn.ExecContext(ctx, query) + if err != nil { + _ = dbConn.Close() + + return nil, err + } + + return &db{ + cfg: cfg, + ctx: ctx, + db: dbConn, + table: table, + prefix: cfg.TableNamePrefix, + }, nil +} + +// getTimeoutCtx gets a timeout context for database requests. +func (db *db) getTimeoutCtx() (context.Context, func()) { + if db.cfg.Timeout == time.Duration(0) { + return db.ctx, func() {} + } + + return context.WithTimeout(db.ctx, db.cfg.Timeout) +} + +// getPrefixedTableName returns a table name for this prefix (namespace). +func (db *db) getPrefixedTableName(table string) string { + return fmt.Sprintf("%s_%s", db.prefix, table) +} + +// catchPanic executes the specified function. If a panic occurs, it is returned +// as an error value. +func catchPanic(f func() error) (err error) { + defer func() { + if r := recover(); r != nil { + log.Criticalf("Caught unhandled error: %v", r) + + switch data := r.(type) { + case error: + err = data + + default: + err = errors.New(fmt.Sprintf("%v", data)) + } + } + }() + + err = f() + + return +} + +// View opens a database read transaction and executes the function f with the +// transaction passed as a parameter. After f exits, the transaction is rolled +// back. If f errors, its error is returned, not a rollback error (if any +// occur). The passed reset function is called before the start of the +// transaction and can be used to reset intermediate state. As callers may +// expect retries of the f closure (depending on the database backend used), the +// reset function will be called before each retry respectively. +func (db *db) View(f func(tx walletdb.ReadTx) error, reset func()) error { + return db.executeTransaction( + func(tx walletdb.ReadWriteTx) error { + return f(tx.(walletdb.ReadTx)) + }, + reset, true, + ) +} + +// Update opens a database read/write transaction and executes the function f +// with the transaction passed as a parameter. After f exits, if f did not +// error, the transaction is committed. Otherwise, if f did error, the +// transaction is rolled back. If the rollback fails, the original error +// returned by f is still returned. If the commit fails, the commit error is +// returned. As callers may expect retries of the f closure, the reset function +// will be called before each retry respectively. +func (db *db) Update(f func(tx walletdb.ReadWriteTx) error, + reset func()) error { + + return db.executeTransaction(f, reset, false) +} + +// executeTransaction creates a new read-only or read-write transaction and +// executes the given function within it. +func (db *db) executeTransaction(f func(tx walletdb.ReadWriteTx) error, + reset func(), readOnly bool) error { + + reset() + + tx, err := newReadWriteTx(db, readOnly) + if err != nil { + return err + } + + err = catchPanic(func() error { return f(tx) }) + if err != nil { + if rollbackErr := tx.Rollback(); rollbackErr != nil { + log.Errorf("Error rolling back tx: %v", rollbackErr) + } + + return err + } + + return tx.Commit() +} + +// PrintStats returns all collected stats pretty printed into a string. +func (db *db) PrintStats() string { + return "stats not supported by Postgres driver" +} + +// BeginReadWriteTx opens a database read+write transaction. +func (db *db) BeginReadWriteTx() (walletdb.ReadWriteTx, error) { + return newReadWriteTx(db, false) +} + +// BeginReadTx opens a database read transaction. +func (db *db) BeginReadTx() (walletdb.ReadTx, error) { + return newReadWriteTx(db, true) +} + +// Copy writes a copy of the database to the provided writer. This call will +// start a read-only transaction to perform all operations. +// This function is part of the walletdb.Db interface implementation. +func (db *db) Copy(w io.Writer) error { + return errors.New("not implemented") +} + +// Close cleanly shuts down the database and syncs all data. +// This function is part of the walletdb.Db interface implementation. +func (db *db) Close() error { + dbConnsMu.Lock() + defer dbConnsMu.Unlock() + + log.Infof("Closing database %v", db.prefix) + + return dbConns.Close(db.cfg.Dsn) +} diff --git a/kvdb/postgres/db_conn_set.go b/kvdb/sqlbase/db_conn_set.go similarity index 99% rename from kvdb/postgres/db_conn_set.go rename to kvdb/sqlbase/db_conn_set.go index 736f8516b..ee360a973 100644 --- a/kvdb/postgres/db_conn_set.go +++ b/kvdb/sqlbase/db_conn_set.go @@ -1,4 +1,4 @@ -package postgres +package sqlbase import ( "database/sql" diff --git a/kvdb/postgres/log.go b/kvdb/sqlbase/log.go similarity index 95% rename from kvdb/postgres/log.go rename to kvdb/sqlbase/log.go index 5a27e9eac..c6cbe6577 100644 --- a/kvdb/postgres/log.go +++ b/kvdb/sqlbase/log.go @@ -1,4 +1,4 @@ -package postgres +package sqlbase import "github.com/btcsuite/btclog" diff --git a/kvdb/postgres/no_db.go b/kvdb/sqlbase/no_sql.go similarity index 78% rename from kvdb/postgres/no_db.go rename to kvdb/sqlbase/no_sql.go index aa49471ae..92c002d9e 100644 --- a/kvdb/postgres/no_db.go +++ b/kvdb/sqlbase/no_sql.go @@ -1,5 +1,5 @@ //go:build !kvdb_postgres -package postgres +package sqlbase func Init(maxConnections int) {} diff --git a/kvdb/postgres/readwrite_bucket.go b/kvdb/sqlbase/readwrite_bucket.go similarity index 99% rename from kvdb/postgres/readwrite_bucket.go rename to kvdb/sqlbase/readwrite_bucket.go index 71856b213..c1eff6876 100644 --- a/kvdb/postgres/readwrite_bucket.go +++ b/kvdb/sqlbase/readwrite_bucket.go @@ -1,6 +1,6 @@ //go:build kvdb_postgres -package postgres +package sqlbase import ( "database/sql" diff --git a/kvdb/postgres/readwrite_cursor.go b/kvdb/sqlbase/readwrite_cursor.go similarity index 99% rename from kvdb/postgres/readwrite_cursor.go rename to kvdb/sqlbase/readwrite_cursor.go index 67c1a39ae..bb5a0251e 100644 --- a/kvdb/postgres/readwrite_cursor.go +++ b/kvdb/sqlbase/readwrite_cursor.go @@ -1,6 +1,6 @@ //go:build kvdb_postgres -package postgres +package sqlbase import ( "database/sql" diff --git a/kvdb/postgres/readwrite_tx.go b/kvdb/sqlbase/readwrite_tx.go similarity index 98% rename from kvdb/postgres/readwrite_tx.go rename to kvdb/sqlbase/readwrite_tx.go index c31b05603..7e82a1570 100644 --- a/kvdb/postgres/readwrite_tx.go +++ b/kvdb/sqlbase/readwrite_tx.go @@ -1,6 +1,6 @@ //go:build kvdb_postgres -package postgres +package sqlbase import ( "context" @@ -110,7 +110,9 @@ func (tx *readWriteTx) ReadWriteBucket(key []byte) walletdb.ReadWriteBucket { // CreateTopLevelBucket creates the top level bucket for a key if it // does not exist. The newly-created bucket it returned. -func (tx *readWriteTx) CreateTopLevelBucket(key []byte) (walletdb.ReadWriteBucket, error) { +func (tx *readWriteTx) CreateTopLevelBucket(key []byte) ( + walletdb.ReadWriteBucket, error) { + if len(key) == 0 { return nil, walletdb.ErrBucketNameRequired } diff --git a/kvdb/postgres/schema.go b/kvdb/sqlbase/schema.go similarity index 99% rename from kvdb/postgres/schema.go rename to kvdb/sqlbase/schema.go index 9276326e3..6cced1c3d 100644 --- a/kvdb/postgres/schema.go +++ b/kvdb/sqlbase/schema.go @@ -1,6 +1,6 @@ //go:build kvdb_postgres -package postgres +package sqlbase import ( "fmt"