diff --git a/lib/pg_repack.sql.in b/lib/pg_repack.sql.in index b059645..e682cd5 100644 --- a/lib/pg_repack.sql.in +++ b/lib/pg_repack.sql.in @@ -307,11 +307,11 @@ CREATE VIEW repack.tables AS 'DELETE FROM repack.log_' || R.oid AS delete_log, 'LOCK TABLE ' || repack.oid2text(R.oid) || ' IN ACCESS EXCLUSIVE MODE' AS lock_table, repack.get_order_by(CK.indexrelid, R.oid) AS ckey, - 'SELECT * FROM repack.log_' || R.oid || ' ORDER BY id LIMIT $1' AS sql_peek, + 'WITH peek_data AS (SELECT * FROM repack.log_' || R.oid || ' LIMIT $1), non_inserts AS (SELECT max(id) as id FROM peek_data where pk is not null group by pk), inserts AS (SELECT id FROM peek_data WHERE pk is null), ids_to_process AS (Select id FROM non_inserts UNION Select id FROM inserts) SELECT pd.* FROM peek_data pd JOIN ids_to_process ids on pd.id = ids.id order by id' AS sql_peek, 'INSERT INTO repack.table_' || R.oid || ' VALUES ($1.*)' AS sql_insert, 'DELETE FROM repack.table_' || R.oid || ' WHERE ' || repack.get_compare_pkey(PK.indexrelid, '$1') AS sql_delete, 'UPDATE repack.table_' || R.oid || ' SET ' || repack.get_assign(R.oid, '$2') || ' WHERE ' || repack.get_compare_pkey(PK.indexrelid, '$1') AS sql_update, - 'DELETE FROM repack.log_' || R.oid || ' WHERE id IN (' AS sql_pop + 'DELETE FROM repack.log_' || R.oid || ' WHERE id <= ' AS sql_pop FROM pg_class R LEFT JOIN pg_class T ON R.reltoastrelid = T.oid LEFT JOIN repack.primary_keys PK diff --git a/lib/repack.c b/lib/repack.c index 6aa519b..8c8bd29 100644 --- a/lib/repack.c +++ b/lib/repack.c @@ -280,6 +280,7 @@ repack_apply(PG_FUNCTION_ARGS) Oid argtypes[3]; /* id, pk, row */ Datum values[3]; /* id, pk, row */ bool nulls[3]; /* id, pk, row */ + char *pkid; /* peek tuple in log */ if (count <= 0) @@ -302,10 +303,15 @@ repack_apply(PG_FUNCTION_ARGS) resetStringInfo(&sql_pop); appendStringInfoString(&sql_pop, PG_GETARG_CSTRING(4)); + pkid = NULL; + for (i = 0; i < ntuples; i++, n++) { HeapTuple tuple; - char *pkid; + + if (pkid != NULL) { + pfree(pkid); + } tuple = tuptable->vals[i]; values[0] = SPI_getbinval(tuple, desc, 1, &nulls[0]); @@ -336,23 +342,13 @@ repack_apply(PG_FUNCTION_ARGS) plan_update = repack_prepare(sql_update, 2, &argtypes[1]); execute_plan(SPI_OK_UPDATE, plan_update, &values[1], (nulls[1] ? "n" : " ")); } - - /* Add the primary key ID of each row from the log - * table we have processed so far to this - * DELETE ... IN (...) query string, so we - * can delete all the rows we have processed at-once. - */ - if (i == 0) - appendStringInfoString(&sql_pop, pkid); - else - appendStringInfo(&sql_pop, ",%s", pkid); - pfree(pkid); } /* i must be > 0 (and hence we must have some rows to delete) * since SPI_processed > 0 */ Assert(i > 0); - appendStringInfoString(&sql_pop, ");"); + appendStringInfoString(&sql_pop, pkid); + pfree(pkid); /* Bulk delete of processed rows from the log table */ execute(SPI_OK_DELETE, sql_pop.data);