db: fix sqlite3 code which manipulates columns.

Because it used internal routines, it didn't pass operations through the
db hook!  So make it use the generic routines, with the twist that they
are not translated.

And when we use this in a migration hook, we're actually in a
transaction.

This, in turn, introduces an issue: we need to be outside a transaction
to "PRAGMA foreign_keys = OFF", but completing the transaction when
there is a db hook actually enters the io loop, freeing the tmpctx!

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell 2022-09-11 15:21:31 +09:30 committed by Christian Decker
parent d7aa2749c3
commit e853cdc3ff
5 changed files with 109 additions and 107 deletions

View file

@ -277,19 +277,11 @@ static bool db_postgres_rename_column(struct db *db,
const char *tablename,
const char *from, const char *to)
{
PGresult *res;
char *cmd;
cmd = tal_fmt(db, "ALTER TABLE %s RENAME %s TO %s;",
tablename, from, to);
res = PQexec(db->conn, cmd);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
db->error = tal_fmt(db, "Rename '%s' failed: %s",
cmd, PQerrorMessage(db->conn));
PQclear(res);
return false;
}
PQclear(res);
db_exec_prepared_v2(take(db_prepare_untranslated(db, cmd)));
return true;
}
@ -297,7 +289,6 @@ static bool db_postgres_delete_columns(struct db *db,
const char *tablename,
const char **colnames, size_t num_cols)
{
PGresult *res;
char *cmd;
cmd = tal_fmt(db, "ALTER TABLE %s ", tablename);
@ -307,14 +298,8 @@ static bool db_postgres_delete_columns(struct db *db,
tal_append_fmt(&cmd, "DROP %s", colnames[i]);
}
tal_append_fmt(&cmd, ";");
res = PQexec(db->conn, cmd);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
db->error = tal_fmt(db, "Delete '%s' failed: %s",
cmd, PQerrorMessage(db->conn));
PQclear(res);
return false;
}
PQclear(res);
db_exec_prepared_v2(take(db_prepare_untranslated(db, cmd)));
return true;
}

View file

@ -1,7 +1,9 @@
#include "config.h"
#include <ccan/ccan/tal/str/str.h>
#include <common/utils.h>
#include <db/bindings.h>
#include <db/common.h>
#include <db/exec.h>
#include <db/utils.h>
#if HAVE_SQLITE3
@ -461,7 +463,8 @@ static const char *find_column_name(const tal_t *ctx,
return tal_strndup(ctx, sqlpart + start, *after - start);
}
/* Move table out the way, return columns */
/* Move table out the way, return columns.
* Note: with db_hook, frees tmpctx! */
static char **prepare_table_manip(const tal_t *ctx,
struct db *db, const char *tablename)
{
@ -488,39 +491,35 @@ static char **prepare_table_manip(const tal_t *ctx,
sql = tal_strdup(tmpctx, (const char *)sqlite3_column_text(stmt, 0));
sqlite3_finalize(stmt);
/* We MUST use generic routines to write to db, since they
* mirror changes to the db hook! */
bracket = strchr(sql, '(');
if (!strstarts(sql, "CREATE TABLE") || !bracket) {
db->error = tal_fmt(db, "strange schema for %s: %s",
tablename, sql);
return NULL;
}
if (!strstarts(sql, "CREATE TABLE") || !bracket)
db_fatal("Bad sql from prepare_table_manip %s: %s",
tablename, sql);
/* Split after ( by commas: any lower case is assumed to be a field */
parts = tal_strsplit(ctx, bracket + 1, ",", STR_EMPTY_OK);
/* Turn off foreign keys first. */
sqlite3_prepare_v2(wrapper->conn, "PRAGMA foreign_keys = OFF;", -1, &stmt, NULL);
if (sqlite3_step(stmt) != SQLITE_DONE)
goto sqlite_stmt_err;
sqlite3_finalize(stmt);
/* Now, we actually need to turn OFF transactions for a moment, as
* this pragma only has an effect outside a tx! */
db_commit_transaction(db);
/* But core insists we're "in a transaction" for all ops, so fake it */
db->in_transaction = "Not really";
/* Turn off foreign keys first. */
db_prepare_for_changes(db);
db_exec_prepared_v2(take(db_prepare_untranslated(db,
"PRAGMA foreign_keys = OFF;")));
db_report_changes(db, NULL, 0);
db->in_transaction = NULL;
db_begin_transaction(db);
cmd = tal_fmt(tmpctx, "ALTER TABLE %s RENAME TO temp_%s;",
tablename, tablename);
sqlite3_prepare_v2(wrapper->conn, cmd, -1, &stmt, NULL);
if (sqlite3_step(stmt) != SQLITE_DONE)
goto sqlite_stmt_err;
sqlite3_finalize(stmt);
/* Make sure we do the same to backup! */
replicate_statement(wrapper, "PRAGMA foreign_keys = OFF;");
replicate_statement(wrapper, cmd);
db_exec_prepared_v2(take(db_prepare_untranslated(db, cmd)));
return parts;
sqlite_stmt_err:
db->error = tal_fmt(db, "%s", sqlite3_errmsg(wrapper->conn));
sqlite3_finalize(stmt);
return tal_free(parts);
}
static bool complete_table_manip(struct db *db,
@ -528,9 +527,7 @@ static bool complete_table_manip(struct db *db,
const char **coldefs,
const char **oldcolnames)
{
sqlite3_stmt *stmt;
char *create_cmd, *insert_cmd, *drop_cmd;
struct db_sqlite3 *wrapper = (struct db_sqlite3 *)db->conn;
/* Create table */
create_cmd = tal_fmt(tmpctx, "CREATE TABLE %s (", tablename);
@ -541,13 +538,7 @@ static bool complete_table_manip(struct db *db,
}
tal_append_fmt(&create_cmd, ";");
sqlite3_prepare_v2(wrapper->conn, create_cmd, -1, &stmt, NULL);
if (sqlite3_step(stmt) != SQLITE_DONE)
goto sqlite_stmt_err;
sqlite3_finalize(stmt);
/* Make sure we do the same to backup! */
replicate_statement(wrapper, create_cmd);
db_exec_prepared_v2(take(db_prepare_untranslated(db, create_cmd)));
/* Populate table from old one */
insert_cmd = tal_fmt(tmpctx, "INSERT INTO %s SELECT ", tablename);
@ -558,33 +549,24 @@ static bool complete_table_manip(struct db *db,
}
tal_append_fmt(&insert_cmd, " FROM temp_%s;", tablename);
sqlite3_prepare_v2(wrapper->conn, insert_cmd, -1, &stmt, NULL);
if (sqlite3_step(stmt) != SQLITE_DONE)
goto sqlite_stmt_err;
sqlite3_finalize(stmt);
replicate_statement(wrapper, insert_cmd);
db_exec_prepared_v2(take(db_prepare_untranslated(db, insert_cmd)));
/* Cleanup temp table */
drop_cmd = tal_fmt(tmpctx, "DROP TABLE temp_%s;", tablename);
sqlite3_prepare_v2(wrapper->conn, drop_cmd, -1, &stmt, NULL);
if (sqlite3_step(stmt) != SQLITE_DONE)
goto sqlite_stmt_err;
sqlite3_finalize(stmt);
replicate_statement(wrapper, drop_cmd);
db_exec_prepared_v2(take(db_prepare_untranslated(db, drop_cmd)));
db_commit_transaction(db);
/* Allow links between them (esp. cascade deletes!) */
sqlite3_prepare_v2(wrapper->conn, "PRAGMA foreign_keys = ON;", -1, &stmt, NULL);
if (sqlite3_step(stmt) != SQLITE_DONE)
goto sqlite_stmt_err;
sqlite3_finalize(stmt);
replicate_statement(wrapper, "PRAGMA foreign_keys = ON;");
db->in_transaction = "Not really";
db_prepare_for_changes(db);
db_exec_prepared_v2(take(db_prepare_untranslated(db,
"PRAGMA foreign_keys = ON;")));
db_report_changes(db, NULL, 0);
db->in_transaction = NULL;
/* migrations are performed inside transactions, so start one. */
db_begin_transaction(db);
return true;
sqlite_stmt_err:
db->error = tal_fmt(db, "%s", sqlite3_errmsg(wrapper->conn));
sqlite3_finalize(stmt);
return false;
}
static bool db_sqlite3_rename_column(struct db *db,
@ -595,10 +577,11 @@ static bool db_sqlite3_rename_column(struct db *db,
const char **coldefs, **oldcolnames;
bool colname_found = false;
parts = prepare_table_manip(tmpctx, db, tablename);
parts = prepare_table_manip(NULL, db, tablename);
if (!parts)
return false;
tal_steal(tmpctx, parts);
coldefs = tal_arr(tmpctx, const char *, 0);
oldcolnames = tal_arr(tmpctx, const char *, 0);
@ -643,10 +626,11 @@ static bool db_sqlite3_delete_columns(struct db *db,
const char **coldefs, **oldcolnames;
size_t colnames_found = 0;
parts = prepare_table_manip(tmpctx, db, tablename);
parts = prepare_table_manip(NULL, db, tablename);
if (!parts)
return false;
tal_steal(tmpctx, parts);
coldefs = tal_arr(tmpctx, const char *, 0);
oldcolnames = tal_arr(tmpctx, const char *, 0);

View file

@ -61,11 +61,38 @@ static void db_stmt_free(struct db_stmt *stmt)
}
static struct db_stmt *db_prepare_core(struct db *db,
const char *location,
const struct db_query *db_query)
{
struct db_stmt *stmt = tal(db, struct db_stmt);
size_t num_slots = db_query->placeholders;
/* Allocate the slots for placeholders/bindings, zeroed next since
* that sets the type to DB_BINDING_UNINITIALIZED for later checks. */
stmt->bindings = tal_arrz(stmt, struct db_binding, num_slots);
stmt->location = location;
stmt->error = NULL;
stmt->db = db;
stmt->query = db_query;
stmt->executed = false;
stmt->inner_stmt = NULL;
tal_add_destructor(stmt, db_stmt_free);
list_add(&db->pending_statements, &stmt->list);
#if DEVELOPER
stmt->cols_used = NULL;
#endif /* DEVELOPER */
return stmt;
}
struct db_stmt *db_prepare_v2_(const char *location, struct db *db,
const char *query_id)
{
struct db_stmt *stmt = tal(db, struct db_stmt);
size_t num_slots, pos;
size_t pos;
/* Normalize query_id paths, because unit tests are compiled with this
* prefix. */
@ -81,39 +108,32 @@ struct db_stmt *db_prepare_v2_(const char *location, struct db *db,
for (;;) {
if (!db->queries->query_table[pos].name)
db_fatal("Could not resolve query %s", query_id);
if (streq(query_id, db->queries->query_table[pos].name)) {
stmt->query = &db->queries->query_table[pos];
if (streq(query_id, db->queries->query_table[pos].name))
break;
}
pos = (pos + 1) % db->queries->query_table_size;
}
num_slots = stmt->query->placeholders;
/* Allocate the slots for placeholders/bindings, zeroed next since
* that sets the type to DB_BINDING_UNINITIALIZED for later checks. */
stmt->bindings = tal_arr(stmt, struct db_binding, num_slots);
for (size_t i=0; i<num_slots; i++)
stmt->bindings[i].type = DB_BINDING_UNINITIALIZED;
stmt->location = location;
stmt->error = NULL;
stmt->db = db;
stmt->executed = false;
stmt->inner_stmt = NULL;
tal_add_destructor(stmt, db_stmt_free);
list_add(&db->pending_statements, &stmt->list);
#if DEVELOPER
stmt->cols_used = NULL;
#endif /* DEVELOPER */
return stmt;
return db_prepare_core(db, location, &db->queries->query_table[pos]);
}
#define db_prepare_v2(db,query) \
db_prepare_v2_(__FILE__ ":" stringify(__LINE__), db, query)
/* Provides replication and hook interface for raw SQL too */
struct db_stmt *db_prepare_untranslated(struct db *db, const char *query)
{
struct db_query *db_query = tal(NULL, struct db_query);
struct db_stmt *stmt;
db_query->name = db_query->query = query;
db_query->placeholders = strcount(query, "?");
db_query->readonly = false;
/* Use raw accessors! */
db_query->colnames = NULL;
db_query->num_colnames = 0;
stmt = db_prepare_core(db, "db_prepare_untranslated", db_query);
tal_steal(stmt, db_query);
return stmt;
}
bool db_query_prepared(struct db_stmt *stmt)
{

View file

@ -97,4 +97,12 @@ void db_assert_no_outstanding_statements(struct db *db);
*/
const char **db_changes(struct db *db);
/**
* Accessor for internal use.
*
* Like db_prepare_v2() but creates temporary noop translation, and
* assumes not a read-only op. Use this inside db-specific backends
* to re-use the normal db hook and replication logic.
*/
struct db_stmt *db_prepare_untranslated(struct db *db, const char *query);
#endif /* LIGHTNING_DB_UTILS_H */

View file

@ -201,17 +201,22 @@ static bool test_manip_columns(void)
CHECK_MSG(db_exec_prepared_v2(stmt), "db_exec_prepared must succeed");
CHECK_MSG(!db_err, "Simple correct SQL command");
tal_free(stmt);
/* Don't let it try to set a version field (we don't have one!) */
db->dirty = false;
db->changes = tal_arr(db, const char *, 0);
db_commit_transaction(db);
/* Needs vars table, since this changes db. */
stmt = db_prepare_v2(db, SQL("CREATE TABLE vars (name VARCHAR(32), intval);"));
CHECK_MSG(db_exec_prepared_v2(stmt), "db_exec_prepared must succeed");
CHECK_MSG(!db_err, "Simple correct SQL command");
tal_free(stmt);
stmt = db_prepare_v2(db, SQL("INSERT INTO vars VALUES ('data_version', 0);"));
CHECK_MSG(db_exec_prepared_v2(stmt), "db_exec_prepared must succeed");
CHECK_MSG(!db_err, "Simple correct SQL command");
tal_free(stmt);
/* Rename tablea.field1 -> table1.field1a. */
CHECK(db->config->rename_column(db, "tablea", "field1", "field1a"));
/* Remove tableb.field1. */
CHECK(db->config->delete_columns(db, "tableb", &field1, 1));
db_begin_transaction(db);
stmt = db_prepare_v2(db, SQL("SELECT id, field1a FROM tablea;"));
CHECK_MSG(db_query_prepared(stmt), "db_query_prepared must succeed");
CHECK_MSG(!db_err, "Simple correct SQL command");