From 4a7785ea3dcb7a84a619e2a7c6daa8233742c88b Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 16 Sep 2025 21:17:59 +0800 Subject: [PATCH 01/13] feat: new vacuum table option `ALL` - `VACUUM TABLE ALL` will vacuum all applicable tables According to table's vacuum settings and global setting Using the vacuum2 implementation - Vacuum specific table will switch to the vacuum2 impl by default --- .../deploy/config/databend-query-node-1.toml | 2 +- src/admin_procedures/vacuu_all.sql | 19 +++++ src/query/ast/src/ast/statements/statement.rs | 5 ++ src/query/ast/src/ast/statements/table.rs | 55 ++++++++++++-- src/query/ast/src/parser/statement.rs | 34 +++++++-- .../interpreters/access/privilege_access.rs | 11 ++- .../interpreters/interpreter_table_vacuum.rs | 63 ++++++++++++---- .../fuse_vacuum2/fuse_vacuum2_table.rs | 11 ++- src/query/sql/src/planner/binder/binder.rs | 4 + src/query/sql/src/planner/binder/ddl/table.rs | 30 ++++---- src/query/sql/src/planner/plans/ddl/table.rs | 16 +++- src/query/storages/fuse/src/operations/mod.rs | 2 + .../storages/fuse/src/operations/vacuum.rs | 73 ++++++++++++++++++- 13 files changed, 274 insertions(+), 51 deletions(-) create mode 100644 src/admin_procedures/vacuu_all.sql diff --git a/scripts/ci/deploy/config/databend-query-node-1.toml b/scripts/ci/deploy/config/databend-query-node-1.toml index d613c17a2287a..d70ceda29caaf 100644 --- a/scripts/ci/deploy/config/databend-query-node-1.toml +++ b/scripts/ci/deploy/config/databend-query-node-1.toml @@ -84,7 +84,7 @@ join_spilling_memory_ratio = 60 [log] [log.file] -level = "DEBUG" +level = "INFO" format = "text" dir = "./.databend/logs_1" limit = 12 # 12 files, 1 file per hour diff --git a/src/admin_procedures/vacuu_all.sql b/src/admin_procedures/vacuu_all.sql new file mode 100644 index 0000000000000..dadb4f85fc337 --- /dev/null +++ b/src/admin_procedures/vacuu_all.sql @@ -0,0 +1,19 @@ +-- Vacuum objects in one go +CREATE OR REPLACE PROCEDURE sys_vacuum_all() +RETURNS STRING +LANGUAGE 
SQL +as +$$ +begin + +-- Vacuum all temporary tables that have not been cleaned by accident (session panic, force cluster restart etc.) +SELECT * FROM FUSE_VACUUM_TEMPORARY_TABLE(); + +-- Vacuum dropped database / tables +VACUUM DROP TABLE; + +-- Vacuum all table historical data (using vacuum2) +CALL SYSTEM$FUSE_VACUUM2(); + +end; +$$ diff --git a/src/query/ast/src/ast/statements/statement.rs b/src/query/ast/src/ast/statements/statement.rs index e121a6caf0a13..b26a0ee2021a4 100644 --- a/src/query/ast/src/ast/statements/statement.rs +++ b/src/query/ast/src/ast/statements/statement.rs @@ -188,6 +188,7 @@ pub enum Statement { OptimizeTable(OptimizeTableStmt), VacuumTable(VacuumTableStmt), VacuumDropTable(VacuumDropTableStmt), + VacuumAll(VacuumAllStmt), VacuumTemporaryFiles(VacuumTemporaryFiles), AnalyzeTable(AnalyzeTableStmt), ExistsTable(ExistsTableStmt), @@ -477,6 +478,7 @@ impl Statement { | Statement::VacuumTable(..) | Statement::VacuumDropTable(..) | Statement::VacuumTemporaryFiles(..) + | Statement::VacuumAll(..) | Statement::AnalyzeTable(..) | Statement::ExistsTable(..) | Statement::ShowCreateDictionary(..) 
@@ -1081,6 +1083,9 @@ impl Display for Statement { Statement::RenameWorkloadGroup(stmt) => write!(f, "{stmt}")?, Statement::SetWorkloadQuotasGroup(stmt) => write!(f, "{stmt}")?, Statement::UnsetWorkloadQuotasGroup(stmt) => write!(f, "{stmt}")?, + Statement::VacuumAll(_) => { + todo!() + } } Ok(()) } diff --git a/src/query/ast/src/ast/statements/table.rs b/src/query/ast/src/ast/statements/table.rs index 4e7f8fa8e8684..2645aa999af7c 100644 --- a/src/query/ast/src/ast/statements/table.rs +++ b/src/query/ast/src/ast/statements/table.rs @@ -666,24 +666,44 @@ impl Display for TruncateTableStmt { } } + + #[derive(Debug, Clone, PartialEq, Drive, DriveMut)] -pub struct VacuumTableStmt { +pub struct VacuumTargetTable { pub catalog: Option, pub database: Option, pub table: Identifier, +} + +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub enum VacuumTarget { + Table(VacuumTargetTable), + All, +} + +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub struct VacuumTableStmt { + pub target: VacuumTarget, pub option: VacuumTableOption, } impl Display for VacuumTableStmt { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { write!(f, "VACUUM TABLE ")?; - write_dot_separated_list( - f, - self.catalog - .iter() - .chain(&self.database) - .chain(Some(&self.table)), - )?; + match &self.target { + VacuumTarget::Table(target) => { + write_dot_separated_list( + f, + target.catalog + .iter() + .chain(&target.database) + .chain(Some(&target.table)), + )?; + } + VacuumTarget::All => { + write!(f, " ALL")?; + } + } write!(f, " {}", &self.option)?; Ok(()) @@ -711,6 +731,25 @@ impl Display for VacuumDropTableStmt { } } +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub struct VacuumAllStmt { + pub catalog: Option, + pub database: Option, +} + +impl Display for VacuumAllStmt { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "VACUUM ALL")?; + if self.catalog.is_some() || self.database.is_some() { + write!(f, " FROM ")?; + write_dot_separated_list(f, 
self.catalog.iter().chain(&self.database))?; + write!(f, " ")?; + } + Ok(()) + } +} + + #[derive(Debug, Clone, PartialEq, Drive, DriveMut)] pub struct VacuumTemporaryFiles { pub limit: Option, diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index b5268ec985362..ffb07a32f5aae 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -1163,9 +1163,11 @@ pub fn statement_body(i: Input) -> IResult { }, |(_, _, (catalog, database, table), option)| { Statement::VacuumTable(VacuumTableStmt { - catalog, - database, - table, + target: VacuumTarget::Table(VacuumTargetTable { + catalog, + database, + table, + }), option, }) }, @@ -1186,6 +1188,20 @@ pub fn statement_body(i: Input) -> IResult { }) }, ); + + let vacuum_all = map( + rule! { + VACUUM ~ ALL ~ (FROM ~ ^#dot_separated_idents_1_to_2)? + }, + |(_, _, database_option)| { + let (catalog, database) = database_option.map_or_else( + || (None, None), + |(_, catalog_database)| (catalog_database.0, Some(catalog_database.1)), + ); + Statement::VacuumAll(VacuumAllStmt { catalog, database }) + }, + ); + let analyze_table = map( rule! { ANALYZE ~ TABLE ~ #dot_separated_idents_1_to_3 ~ NOSCAN? @@ -2572,7 +2588,6 @@ pub fn statement_body(i: Input) -> IResult { | #show_indexes : "`SHOW INDEXES`" | #show_locks : "`SHOW LOCKS [IN ACCOUNT] [WHERE ...]`" | #kill_stmt : "`KILL (QUERY | CONNECTION) `" - | #vacuum_temp_files : "VACUUM TEMPORARY FILES [RETAIN number SECONDS|DAYS] [LIMIT number]" | #set_priority: "`SET PRIORITY (HIGH | MEDIUM | LOW) `" | #system_action: "`SYSTEM (ENABLE | DISABLE) EXCEPTION_BACKTRACE`" ), @@ -2676,8 +2691,6 @@ pub fn statement_body(i: Input) -> IResult { | #rename_table : "`RENAME TABLE [.] TO `" | #truncate_table : "`TRUNCATE TABLE [.]
`" | #optimize_table : "`OPTIMIZE TABLE [.]
(ALL | PURGE | COMPACT [SEGMENT])`" - | #vacuum_table : "`VACUUM TABLE [.]
[RETAIN number HOURS] [DRY RUN | DRY RUN SUMMARY]`" - | #vacuum_drop_table : "`VACUUM DROP TABLE [FROM [.]] [RETAIN number HOURS] [DRY RUN | DRY RUN SUMMARY]`" | #analyze_table : "`ANALYZE TABLE [.]
`" | #exists_table : "`EXISTS TABLE [.]
`" | #show_table_functions : "`SHOW TABLE_FUNCTIONS []`" @@ -2801,7 +2814,14 @@ AS | #call_procedure : "`CALL PROCEDURE ()`" ), rule!(#comment), - rule!(#vacuum_temporary_tables), + // Vacuum + rule!( + #vacuum_temporary_tables + | #vacuum_temp_files : "VACUUM TEMPORARY FILES [RETAIN number SECONDS|DAYS] [LIMIT number]" + | #vacuum_table : "`VACUUM TABLE [.]
[RETAIN number HOURS] [DRY RUN | DRY RUN SUMMARY]`" + | #vacuum_drop_table : "`VACUUM DROP TABLE [FROM [.]] [RETAIN number HOURS] [DRY RUN | DRY RUN SUMMARY]`" + | #vacuum_all : "`VACUUM ALL [FROM [.]]`" + ), ))(i) } diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index c839d99b832fe..237c0834f7b09 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -38,7 +38,7 @@ use databend_common_meta_app::principal::SYSTEM_TABLES_ALLOW_LIST; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_types::SeqV; use databend_common_sql::binder::MutationType; -use databend_common_sql::plans::InsertInputSource; +use databend_common_sql::plans::{InsertInputSource, VacuumTarget}; use databend_common_sql::plans::Mutation; use databend_common_sql::plans::OptimizeCompactBlock; use databend_common_sql::plans::PresignAction; @@ -1204,7 +1204,14 @@ impl AccessChecker for PrivilegeAccess { self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Super, false, false).await? } Plan::VacuumTable(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Super, false, false).await? + match &plan.target { + VacuumTarget::Table(tgt) => { + self.validate_table_access(&tgt.catalog, &tgt.database, &tgt.table, UserPrivilegeType::Super, false, false).await? + } + VacuumTarget::All => { + self.validate_access(&GrantObject::Global, UserPrivilegeType::Super, false, false).await? + } + } } Plan::VacuumDropTable(plan) => { self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Super, false).await? 
diff --git a/src/query/service/src/interpreters/interpreter_table_vacuum.rs b/src/query/service/src/interpreters/interpreter_table_vacuum.rs index 78a05c04c683a..ec63e45c8161c 100644 --- a/src/query/service/src/interpreters/interpreter_table_vacuum.rs +++ b/src/query/service/src/interpreters/interpreter_table_vacuum.rs @@ -23,6 +23,9 @@ use databend_common_expression::FromData; use databend_common_license::license::Feature::Vacuum; use databend_common_license::license_manager::LicenseManagerSwitch; use databend_common_sql::plans::VacuumTablePlan; +use databend_common_sql::plans::VacuumTarget; +use databend_common_sql::plans::VacuumTargetTable; +use databend_common_storages_fuse::operations::vacuum_all_tables; use databend_common_storages_fuse::FuseTable; use databend_common_storages_fuse::FUSE_TBL_BLOCK_PREFIX; use databend_common_storages_fuse::FUSE_TBL_SEGMENT_PREFIX; @@ -95,26 +98,34 @@ impl VacuumTableInterpreter { index_files, }) } -} -#[async_trait::async_trait] -impl Interpreter for VacuumTableInterpreter { - fn name(&self) -> &str { - "VacuumTableInterpreter" + async fn vacuum_table(&self, target: &VacuumTargetTable) -> Result { + let handler = get_vacuum_handler(); + let table = self + .ctx + .get_table(&target.catalog, &target.database, &target.table) + .await?; + let fuse_table = FuseTable::try_from_table(table.as_ref())?; + let target_removed = handler + .do_vacuum2(fuse_table, self.ctx.clone(), false) + .await?; + let res_block = DataBlock::new_from_columns(vec![StringType::from_data(target_removed)]); + PipelineBuildResult::from_blocks(vec![res_block]) } - fn is_ddl(&self) -> bool { - true + async fn vacuum_all(&self) -> Result { + let handler = get_vacuum_handler(); + let catalog = self.ctx.get_default_catalog()?; + let ctx: Arc = self.ctx.clone() as _; + let target_removed = vacuum_all_tables(&ctx, &handler, catalog.as_ref()).await?; + let res_block = DataBlock::new_from_columns(vec![StringType::from_data(target_removed)]); + 
PipelineBuildResult::from_blocks(vec![res_block]) } - #[async_backtrace::framed] - async fn execute2(&self) -> Result { - LicenseManagerSwitch::instance() - .check_enterprise_enabled(self.ctx.get_license_key(), Vacuum)?; - - let catalog_name = self.plan.catalog.clone(); - let db_name = self.plan.database.clone(); - let tbl_name = self.plan.table.clone(); + async fn legacy_vacuum_table(&self, target: &VacuumTargetTable) -> Result { + let catalog_name = target.catalog.clone(); + let db_name = target.database.clone(); + let tbl_name = target.table.clone(); let table = self .ctx .get_table(&catalog_name, &db_name, &tbl_name) @@ -183,3 +194,25 @@ impl Interpreter for VacuumTableInterpreter { } } } + +#[async_trait::async_trait] +impl Interpreter for VacuumTableInterpreter { + fn name(&self) -> &str { + "VacuumTableInterpreter" + } + + fn is_ddl(&self) -> bool { + true + } + + #[async_backtrace::framed] + async fn execute2(&self) -> Result { + LicenseManagerSwitch::instance() + .check_enterprise_enabled(self.ctx.get_license_key(), Vacuum)?; + + match &self.plan.target { + VacuumTarget::Table(tgt_table) => self.legacy_vacuum_table(tgt_table).await, + VacuumTarget::All => self.vacuum_all().await, + } + } +} diff --git a/src/query/service/src/table_functions/fuse_vacuum2/fuse_vacuum2_table.rs b/src/query/service/src/table_functions/fuse_vacuum2/fuse_vacuum2_table.rs index be84dff9d7ae2..384e6cf851c17 100644 --- a/src/query/service/src/table_functions/fuse_vacuum2/fuse_vacuum2_table.rs +++ b/src/query/service/src/table_functions/fuse_vacuum2/fuse_vacuum2_table.rs @@ -181,6 +181,14 @@ impl FuseVacuum2Table { &self, ctx: &Arc, catalog: &dyn Catalog, + ) -> Result> { + databend_common_storages_fuse::operations::vacuum_all_tables(ctx, self.handler.as_ref(), catalog).await + } + + async fn apply_all_tables_internal( + ctx: &Arc, + handler: & VacuumHandlerWrapper, + catalog: &dyn Catalog, ) -> Result> { let tenant_id = ctx.get_tenant(); let dbs = 
catalog.list_databases(&tenant_id).await?; @@ -229,7 +237,7 @@ impl FuseVacuum2Table { continue; } - let res = self.handler.do_vacuum2(tbl, ctx.clone(), false).await; + let res = handler.do_vacuum2(tbl, ctx.clone(), false).await; if let Err(e) = res { warn!( @@ -244,4 +252,5 @@ impl FuseVacuum2Table { Ok(vec![]) } + } diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index 4e6652443f34c..f9937a54921ac 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -329,6 +329,10 @@ impl Binder { Statement::VacuumTemporaryFiles(stmt) => { self.bind_vacuum_temporary_files(bind_context, stmt).await? } + + Statement::VacuumAll(stmt) => { + todo!() + } Statement::AnalyzeTable(stmt) => self.bind_analyze_table(stmt).await?, Statement::ExistsTable(stmt) => self.bind_exists_table(stmt).await?, // Dictionaries diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs index 453d77ec18504..afd569ec18011 100644 --- a/src/query/sql/src/planner/binder/ddl/table.rs +++ b/src/query/sql/src/planner/binder/ddl/table.rs @@ -119,7 +119,6 @@ use crate::parse_computed_expr_to_string; use crate::planner::semantic::normalize_identifier; use crate::planner::semantic::resolve_type_name; use crate::planner::semantic::IdentifierNormalizer; -use crate::plans::AddColumnOption; use crate::plans::AddTableColumnPlan; use crate::plans::AddTableConstraintPlan; use crate::plans::AddTableRowAccessPolicyPlan; @@ -159,6 +158,7 @@ use crate::plans::VacuumDropTablePlan; use crate::plans::VacuumTableOption; use crate::plans::VacuumTablePlan; use crate::plans::VacuumTemporaryFilesPlan; +use crate::plans::{AddColumnOption, VacuumTarget, VacuumTargetTable}; use crate::BindContext; use crate::DefaultExprBinder; use crate::Planner; @@ -1503,23 +1503,27 @@ impl Binder { _bind_context: &mut BindContext, stmt: &VacuumTableStmt, ) -> Result { - let VacuumTableStmt { - 
catalog, - database, - table, - option, - } = stmt; - let (catalog, database, table) = - self.normalize_object_identifier_triple(catalog, database, table); + let target = match &stmt.target { + databend_common_ast::ast::VacuumTarget::Table(tgt) => { + let (catalog, database, table) = + self.normalize_object_identifier_triple(&tgt.catalog, &tgt.database, &tgt.table); + VacuumTarget::Table(VacuumTargetTable { + catalog, database, table, + }) + } + databend_common_ast::ast::VacuumTarget::All => { + VacuumTarget::All + } + } ; + + let option = VacuumTableOption { - dry_run: option.dry_run, + dry_run: stmt.option.dry_run, }; Ok(Plan::VacuumTable(Box::new(VacuumTablePlan { - catalog, - database, - table, + target, option, }))) } diff --git a/src/query/sql/src/planner/plans/ddl/table.rs b/src/query/sql/src/planner/plans/ddl/table.rs index ef024a42a84bc..b82b67918766f 100644 --- a/src/query/sql/src/planner/plans/ddl/table.rs +++ b/src/query/sql/src/planner/plans/ddl/table.rs @@ -105,12 +105,24 @@ impl DropTablePlan { } } -/// Vacuum + #[derive(Clone, Debug)] -pub struct VacuumTablePlan { +pub struct VacuumTargetTable { pub catalog: String, pub database: String, pub table: String, +} + +#[derive(Clone, Debug)] +pub enum VacuumTarget { + Table(VacuumTargetTable), + All, +} + +/// Vacuum +#[derive(Clone, Debug)] +pub struct VacuumTablePlan { + pub target: VacuumTarget, pub option: VacuumTableOption, } diff --git a/src/query/storages/fuse/src/operations/mod.rs b/src/query/storages/fuse/src/operations/mod.rs index 911ab15637abe..f3723982a9391 100644 --- a/src/query/storages/fuse/src/operations/mod.rs +++ b/src/query/storages/fuse/src/operations/mod.rs @@ -57,3 +57,5 @@ pub use util::column_parquet_metas; pub use util::read_block; pub use util::set_backoff; pub use vacuum::vacuum_tables_from_info; +pub use vacuum::vacuum_all_tables; +pub use vacuum::vacuum_table; diff --git a/src/query/storages/fuse/src/operations/vacuum.rs b/src/query/storages/fuse/src/operations/vacuum.rs 
index 506bebf56a078..26a8e045ea573 100644 --- a/src/query/storages/fuse/src/operations/vacuum.rs +++ b/src/query/storages/fuse/src/operations/vacuum.rs @@ -16,14 +16,14 @@ use std::sync::Arc; -use databend_common_catalog::table::TableExt; +use databend_common_catalog::table::{Table, TableExt}; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_meta_app::schema::TableInfo; use databend_enterprise_vacuum_handler::VacuumHandlerWrapper; use log::info; use log::warn; - +use databend_common_catalog::catalog::Catalog; use crate::FuseTable; pub async fn vacuum_table( @@ -55,6 +55,7 @@ pub async fn vacuum_tables_from_info( table_infos: Vec, ctx: Arc, vacuum_handler: Arc, + ) -> Result<()> { for table_info in table_infos { let table = FuseTable::do_create(table_info)? @@ -66,3 +67,71 @@ pub async fn vacuum_tables_from_info( Ok(()) } + +pub async fn vacuum_all_tables( + ctx: &Arc, + handler: & VacuumHandlerWrapper, + catalog: &dyn Catalog, +) -> Result> { + let tenant_id = ctx.get_tenant(); + let dbs = catalog.list_databases(&tenant_id).await?; + let num_db = dbs.len(); + + for (idx_db, db) in dbs.iter().enumerate() { + if db.engine().to_uppercase() == "SYSTEM" { + info!("Bypass system database [{}]", db.name()); + continue; + } + + info!( + "Processing db {}, progress: {}/{}", + db.name(), + idx_db + 1, + num_db + ); + let tables = catalog.list_tables(&tenant_id, db.name()).await?; + info!("Found {} tables in db {}", tables.len(), db.name()); + + let num_tbl = tables.len(); + for (idx_tbl, table) in tables.iter().enumerate() { + info!( + "Processing table {}.{}, db level progress: {}/{}", + db.name(), + table.get_table_info().name, + idx_tbl + 1, + num_tbl + ); + + let Ok(tbl) = FuseTable::try_from_table(table.as_ref()) else { + info!( + "Bypass non-fuse table {}.{}", + db.name(), + table.get_table_info().name + ); + continue; + }; + + if tbl.is_read_only() { + info!( + "Bypass read only table {}.{}", + 
db.name(), + table.get_table_info().name + ); + continue; + } + + let res = handler.do_vacuum2(tbl, ctx.clone(), false).await; + + if let Err(e) = res { + warn!( + "vacuum2 table {}.{} failed: {}", + db.name(), + table.get_table_info().name, + e + ); + }; + } + } + + Ok(vec![]) +} From 2885ceebf91bc9cd857edc7a66f26af67195efb6 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 16 Sep 2025 22:55:40 +0800 Subject: [PATCH 02/13] add settings `fallback_to_legacy_vacuum` --- src/query/ast/src/ast/statements/table.rs | 6 +- .../interpreters/access/privilege_access.rs | 3 +- .../interpreters/interpreter_table_vacuum.rs | 52 ++++++------ .../fuse_vacuum2/fuse_vacuum2_table.rs | 79 ++----------------- src/query/settings/src/settings_default.rs | 11 +++ .../settings/src/settings_getter_setter.rs | 4 + src/query/sql/src/planner/binder/binder.rs | 2 +- src/query/sql/src/planner/binder/ddl/table.rs | 37 +++++---- src/query/sql/src/planner/plans/ddl/table.rs | 11 ++- src/query/storages/fuse/src/operations/mod.rs | 2 +- .../storages/fuse/src/operations/vacuum.rs | 57 ++++++------- 11 files changed, 114 insertions(+), 150 deletions(-) diff --git a/src/query/ast/src/ast/statements/table.rs b/src/query/ast/src/ast/statements/table.rs index 2645aa999af7c..297efe8ab8781 100644 --- a/src/query/ast/src/ast/statements/table.rs +++ b/src/query/ast/src/ast/statements/table.rs @@ -666,8 +666,6 @@ impl Display for TruncateTableStmt { } } - - #[derive(Debug, Clone, PartialEq, Drive, DriveMut)] pub struct VacuumTargetTable { pub catalog: Option, @@ -694,7 +692,8 @@ impl Display for VacuumTableStmt { VacuumTarget::Table(target) => { write_dot_separated_list( f, - target.catalog + target + .catalog .iter() .chain(&target.database) .chain(Some(&target.table)), @@ -749,7 +748,6 @@ impl Display for VacuumAllStmt { } } - #[derive(Debug, Clone, PartialEq, Drive, DriveMut)] pub struct VacuumTemporaryFiles { pub limit: Option, diff --git a/src/query/service/src/interpreters/access/privilege_access.rs 
b/src/query/service/src/interpreters/access/privilege_access.rs index 237c0834f7b09..7d4804920a62c 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -38,11 +38,12 @@ use databend_common_meta_app::principal::SYSTEM_TABLES_ALLOW_LIST; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_types::SeqV; use databend_common_sql::binder::MutationType; -use databend_common_sql::plans::{InsertInputSource, VacuumTarget}; +use databend_common_sql::plans::InsertInputSource; use databend_common_sql::plans::Mutation; use databend_common_sql::plans::OptimizeCompactBlock; use databend_common_sql::plans::PresignAction; use databend_common_sql::plans::RewriteKind; +use databend_common_sql::plans::VacuumTarget; use databend_common_sql::Planner; use databend_common_users::RoleCacheManager; use databend_common_users::UserApiProvider; diff --git a/src/query/service/src/interpreters/interpreter_table_vacuum.rs b/src/query/service/src/interpreters/interpreter_table_vacuum.rs index ec63e45c8161c..2cb4b486d6fb4 100644 --- a/src/query/service/src/interpreters/interpreter_table_vacuum.rs +++ b/src/query/service/src/interpreters/interpreter_table_vacuum.rs @@ -147,29 +147,27 @@ impl VacuumTableInterpreter { match purge_files_opt { None => { - return { - let stat = self.get_statistics(fuse_table).await?; - let total_files = stat.snapshot_files.0 - + stat.segment_files.0 - + stat.block_files.0 - + stat.index_files.0; - let total_size = stat.snapshot_files.1 - + stat.segment_files.1 - + stat.block_files.1 - + stat.index_files.1; - PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ - UInt64Type::from_data(vec![stat.snapshot_files.0]), - UInt64Type::from_data(vec![stat.snapshot_files.1]), - UInt64Type::from_data(vec![stat.segment_files.0]), - UInt64Type::from_data(vec![stat.segment_files.1]), - UInt64Type::from_data(vec![stat.block_files.0]), - 
UInt64Type::from_data(vec![stat.block_files.1]), - UInt64Type::from_data(vec![stat.index_files.0]), - UInt64Type::from_data(vec![stat.index_files.1]), - UInt64Type::from_data(vec![total_files]), - UInt64Type::from_data(vec![total_size]), - ])]) - }; + let stat = self.get_statistics(fuse_table).await?; + let total_files = stat.snapshot_files.0 + + stat.segment_files.0 + + stat.block_files.0 + + stat.index_files.0; + let total_size = stat.snapshot_files.1 + + stat.segment_files.1 + + stat.block_files.1 + + stat.index_files.1; + PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ + UInt64Type::from_data(vec![stat.snapshot_files.0]), + UInt64Type::from_data(vec![stat.snapshot_files.1]), + UInt64Type::from_data(vec![stat.segment_files.0]), + UInt64Type::from_data(vec![stat.segment_files.1]), + UInt64Type::from_data(vec![stat.block_files.0]), + UInt64Type::from_data(vec![stat.block_files.1]), + UInt64Type::from_data(vec![stat.index_files.0]), + UInt64Type::from_data(vec![stat.index_files.1]), + UInt64Type::from_data(vec![total_files]), + UInt64Type::from_data(vec![total_size]), + ])]) } Some(purge_files) => { let mut file_sizes = vec![]; @@ -211,7 +209,13 @@ impl Interpreter for VacuumTableInterpreter { .check_enterprise_enabled(self.ctx.get_license_key(), Vacuum)?; match &self.plan.target { - VacuumTarget::Table(tgt_table) => self.legacy_vacuum_table(tgt_table).await, + VacuumTarget::Table(tgt_table) => { + if self.plan.use_legacy_vacuum { + self.legacy_vacuum_table(tgt_table).await + } else { + self.vacuum_table(tgt_table).await + } + } VacuumTarget::All => self.vacuum_all().await, } } diff --git a/src/query/service/src/table_functions/fuse_vacuum2/fuse_vacuum2_table.rs b/src/query/service/src/table_functions/fuse_vacuum2/fuse_vacuum2_table.rs index 384e6cf851c17..fcbd3b94b7beb 100644 --- a/src/query/service/src/table_functions/fuse_vacuum2/fuse_vacuum2_table.rs +++ b/src/query/service/src/table_functions/fuse_vacuum2/fuse_vacuum2_table.rs @@ -17,7 
+17,6 @@ use std::sync::Arc; use databend_common_catalog::catalog::Catalog; use databend_common_catalog::catalog_kind::CATALOG_DEFAULT; use databend_common_catalog::plan::DataSourcePlan; -use databend_common_catalog::table::Table; use databend_common_catalog::table::TableExt; use databend_common_catalog::table_args::TableArgs; use databend_common_exception::ErrorCode; @@ -40,8 +39,6 @@ use databend_common_storages_fuse::table_functions::SimpleTableFunc; use databend_common_storages_fuse::FuseTable; use databend_enterprise_vacuum_handler::get_vacuum_handler; use databend_enterprise_vacuum_handler::VacuumHandlerWrapper; -use log::info; -use log::warn; use crate::sessions::TableContext; @@ -182,75 +179,11 @@ impl FuseVacuum2Table { ctx: &Arc, catalog: &dyn Catalog, ) -> Result> { - databend_common_storages_fuse::operations::vacuum_all_tables(ctx, self.handler.as_ref(), catalog).await + databend_common_storages_fuse::operations::vacuum_all_tables( + ctx, + self.handler.as_ref(), + catalog, + ) + .await } - - async fn apply_all_tables_internal( - ctx: &Arc, - handler: & VacuumHandlerWrapper, - catalog: &dyn Catalog, - ) -> Result> { - let tenant_id = ctx.get_tenant(); - let dbs = catalog.list_databases(&tenant_id).await?; - let num_db = dbs.len(); - - for (idx_db, db) in dbs.iter().enumerate() { - if db.engine().to_uppercase() == "SYSTEM" { - info!("Bypass system database [{}]", db.name()); - continue; - } - - info!( - "Processing db {}, progress: {}/{}", - db.name(), - idx_db + 1, - num_db - ); - let tables = catalog.list_tables(&tenant_id, db.name()).await?; - info!("Found {} tables in db {}", tables.len(), db.name()); - - let num_tbl = tables.len(); - for (idx_tbl, table) in tables.iter().enumerate() { - info!( - "Processing table {}.{}, db level progress: {}/{}", - db.name(), - table.get_table_info().name, - idx_tbl + 1, - num_tbl - ); - - let Ok(tbl) = FuseTable::try_from_table(table.as_ref()) else { - info!( - "Bypass non-fuse table {}.{}", - db.name(), - 
table.get_table_info().name - ); - continue; - }; - - if tbl.is_read_only() { - info!( - "Bypass read only table {}.{}", - db.name(), - table.get_table_info().name - ); - continue; - } - - let res = handler.do_vacuum2(tbl, ctx.clone(), false).await; - - if let Err(e) = res { - warn!( - "vacuum2 table {}.{} failed: {}", - db.name(), - table.get_table_info().name, - e - ); - }; - } - } - - Ok(vec![]) - } - } diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 1fba150428237..0501cd2e4746b 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1480,6 +1480,17 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=1)), }), + + // Note : + // - This is a temporary flag which will be removed when legacy vacuum impl is removed from codebase. + // - This flag has no effect on `VACUUM TABLE ALL` which will use the vacuum2 impl unconditionally. + ("fallback_to_legacy_vacuum", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Using legacy vacuum implementation when vacuum a specific table", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), ]); Ok(Arc::new(DefaultSettings { diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 3ba9d9b0fa2d0..7eada4e9d68b5 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -1087,4 +1087,8 @@ impl Settings { pub fn get_queries_queue_retry_timeout(&self) -> Result { self.try_get_u64("queries_queue_retry_timeout") } + + pub fn get_fallback_to_legacy_vacuum(&self) -> Result { + Ok(self.try_get_u64("fallback_to_legacy_vacuum")? 
!= 1) + } } diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index f9937a54921ac..4581f29683444 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -330,7 +330,7 @@ impl Binder { self.bind_vacuum_temporary_files(bind_context, stmt).await? } - Statement::VacuumAll(stmt) => { + Statement::VacuumAll(_stmt) => { todo!() } Statement::AnalyzeTable(stmt) => self.bind_analyze_table(stmt).await?, diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs index afd569ec18011..87e61c0221727 100644 --- a/src/query/sql/src/planner/binder/ddl/table.rs +++ b/src/query/sql/src/planner/binder/ddl/table.rs @@ -119,6 +119,7 @@ use crate::parse_computed_expr_to_string; use crate::planner::semantic::normalize_identifier; use crate::planner::semantic::resolve_type_name; use crate::planner::semantic::IdentifierNormalizer; +use crate::plans::AddColumnOption; use crate::plans::AddTableColumnPlan; use crate::plans::AddTableConstraintPlan; use crate::plans::AddTableRowAccessPolicyPlan; @@ -157,8 +158,9 @@ use crate::plans::VacuumDropTableOption; use crate::plans::VacuumDropTablePlan; use crate::plans::VacuumTableOption; use crate::plans::VacuumTablePlan; +use crate::plans::VacuumTarget; +use crate::plans::VacuumTargetTable; use crate::plans::VacuumTemporaryFilesPlan; -use crate::plans::{AddColumnOption, VacuumTarget, VacuumTargetTable}; use crate::BindContext; use crate::DefaultExprBinder; use crate::Planner; @@ -1503,21 +1505,23 @@ impl Binder { _bind_context: &mut BindContext, stmt: &VacuumTableStmt, ) -> Result { - - let target = match &stmt.target { - databend_common_ast::ast::VacuumTarget::Table(tgt) => { - let (catalog, database, table) = - self.normalize_object_identifier_triple(&tgt.catalog, &tgt.database, &tgt.table); - VacuumTarget::Table(VacuumTargetTable { - catalog, database, table, - }) - } - 
databend_common_ast::ast::VacuumTarget::All => { - VacuumTarget::All - } - } ; - - + let use_legacy_vacuum = self.ctx.get_settings().get_fallback_to_legacy_vacuum()?; + + let target = match &stmt.target { + databend_common_ast::ast::VacuumTarget::Table(tgt) => { + let (catalog, database, table) = self.normalize_object_identifier_triple( + &tgt.catalog, + &tgt.database, + &tgt.table, + ); + VacuumTarget::Table(VacuumTargetTable { + catalog, + database, + table, + }) + } + databend_common_ast::ast::VacuumTarget::All => VacuumTarget::All, + }; let option = VacuumTableOption { dry_run: stmt.option.dry_run, @@ -1525,6 +1529,7 @@ impl Binder { Ok(Plan::VacuumTable(Box::new(VacuumTablePlan { target, option, + use_legacy_vacuum, }))) } diff --git a/src/query/sql/src/planner/plans/ddl/table.rs b/src/query/sql/src/planner/plans/ddl/table.rs index b82b67918766f..f6429f1de0ec3 100644 --- a/src/query/sql/src/planner/plans/ddl/table.rs +++ b/src/query/sql/src/planner/plans/ddl/table.rs @@ -105,7 +105,6 @@ impl DropTablePlan { } } - #[derive(Clone, Debug)] pub struct VacuumTargetTable { pub catalog: String, @@ -115,7 +114,7 @@ pub struct VacuumTargetTable { #[derive(Clone, Debug)] pub enum VacuumTarget { - Table(VacuumTargetTable), + Table(VacuumTargetTable), All, } @@ -124,10 +123,18 @@ pub enum VacuumTarget { pub struct VacuumTablePlan { pub target: VacuumTarget, pub option: VacuumTableOption, + pub use_legacy_vacuum: bool, } impl VacuumTablePlan { pub fn schema(&self) -> DataSchemaRef { + if !self.use_legacy_vacuum { + return Arc::new(DataSchema::new(vec![DataField::new( + "object_removed", + DataType::String, + )])); + } + if let Some(summary) = self.option.dry_run { if summary { Arc::new(DataSchema::new(vec![ diff --git a/src/query/storages/fuse/src/operations/mod.rs b/src/query/storages/fuse/src/operations/mod.rs index f3723982a9391..96551fb38ee83 100644 --- a/src/query/storages/fuse/src/operations/mod.rs +++ b/src/query/storages/fuse/src/operations/mod.rs @@ -56,6 +56,6 @@ 
pub use util::acquire_task_permit; pub use util::column_parquet_metas; pub use util::read_block; pub use util::set_backoff; -pub use vacuum::vacuum_tables_from_info; pub use vacuum::vacuum_all_tables; pub use vacuum::vacuum_table; +pub use vacuum::vacuum_tables_from_info; diff --git a/src/query/storages/fuse/src/operations/vacuum.rs b/src/query/storages/fuse/src/operations/vacuum.rs index 26a8e045ea573..08889e361841c 100644 --- a/src/query/storages/fuse/src/operations/vacuum.rs +++ b/src/query/storages/fuse/src/operations/vacuum.rs @@ -16,14 +16,16 @@ use std::sync::Arc; -use databend_common_catalog::table::{Table, TableExt}; +use databend_common_catalog::catalog::Catalog; +use databend_common_catalog::table::Table; +use databend_common_catalog::table::TableExt; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_meta_app::schema::TableInfo; use databend_enterprise_vacuum_handler::VacuumHandlerWrapper; use log::info; use log::warn; -use databend_common_catalog::catalog::Catalog; + use crate::FuseTable; pub async fn vacuum_table( @@ -55,7 +57,6 @@ pub async fn vacuum_tables_from_info( table_infos: Vec, ctx: Arc, vacuum_handler: Arc, - ) -> Result<()> { for table_info in table_infos { let table = FuseTable::do_create(table_info)? 
@@ -70,7 +71,7 @@ pub async fn vacuum_tables_from_info( pub async fn vacuum_all_tables( ctx: &Arc, - handler: & VacuumHandlerWrapper, + handler: &VacuumHandlerWrapper, catalog: &dyn Catalog, ) -> Result> { let tenant_id = ctx.get_tenant(); @@ -84,39 +85,39 @@ pub async fn vacuum_all_tables( } info!( - "Processing db {}, progress: {}/{}", - db.name(), - idx_db + 1, - num_db - ); + "Processing db {}, progress: {}/{}", + db.name(), + idx_db + 1, + num_db + ); let tables = catalog.list_tables(&tenant_id, db.name()).await?; info!("Found {} tables in db {}", tables.len(), db.name()); let num_tbl = tables.len(); for (idx_tbl, table) in tables.iter().enumerate() { info!( - "Processing table {}.{}, db level progress: {}/{}", - db.name(), - table.get_table_info().name, - idx_tbl + 1, - num_tbl - ); + "Processing table {}.{}, db level progress: {}/{}", + db.name(), + table.get_table_info().name, + idx_tbl + 1, + num_tbl + ); let Ok(tbl) = FuseTable::try_from_table(table.as_ref()) else { info!( - "Bypass non-fuse table {}.{}", - db.name(), - table.get_table_info().name - ); + "Bypass non-fuse table {}.{}", + db.name(), + table.get_table_info().name + ); continue; }; if tbl.is_read_only() { info!( - "Bypass read only table {}.{}", - db.name(), - table.get_table_info().name - ); + "Bypass read only table {}.{}", + db.name(), + table.get_table_info().name + ); continue; } @@ -124,11 +125,11 @@ pub async fn vacuum_all_tables( if let Err(e) = res { warn!( - "vacuum2 table {}.{} failed: {}", - db.name(), - table.get_table_info().name, - e - ); + "vacuum2 table {}.{} failed: {}", + db.name(), + table.get_table_info().name, + e + ); }; } } From 5568557aa46e76209c3d51c3dfa2bb246454675f Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 17 Sep 2025 08:46:45 +0800 Subject: [PATCH 03/13] tweak unit test --- src/query/ast/tests/it/testdata/stmt.txt | 72 ++++++++++++++---------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/src/query/ast/tests/it/testdata/stmt.txt 
b/src/query/ast/tests/it/testdata/stmt.txt index 83aed60f959e9..09f0e59213534 100644 --- a/src/query/ast/tests/it/testdata/stmt.txt +++ b/src/query/ast/tests/it/testdata/stmt.txt @@ -15459,16 +15459,20 @@ VACUUM TABLE t ---------- AST ------------ VacuumTable( VacuumTableStmt { - catalog: None, - database: None, - table: Identifier { - span: Some( - 13..14, - ), - name: "t", - quote: None, - ident_type: None, - }, + target: Table( + VacuumTargetTable { + catalog: None, + database: None, + table: Identifier { + span: Some( + 13..14, + ), + name: "t", + quote: None, + ident_type: None, + }, + }, + ), option: VacuumTableOption { dry_run: None, }, @@ -15483,16 +15487,20 @@ VACUUM TABLE t DRY RUN ---------- AST ------------ VacuumTable( VacuumTableStmt { - catalog: None, - database: None, - table: Identifier { - span: Some( - 13..14, - ), - name: "t", - quote: None, - ident_type: None, - }, + target: Table( + VacuumTargetTable { + catalog: None, + database: None, + table: Identifier { + span: Some( + 13..14, + ), + name: "t", + quote: None, + ident_type: None, + }, + }, + ), option: VacuumTableOption { dry_run: Some( false, @@ -15509,16 +15517,20 @@ VACUUM TABLE t DRY RUN SUMMARY ---------- AST ------------ VacuumTable( VacuumTableStmt { - catalog: None, - database: None, - table: Identifier { - span: Some( - 13..14, - ), - name: "t", - quote: None, - ident_type: None, - }, + target: Table( + VacuumTargetTable { + catalog: None, + database: None, + table: Identifier { + span: Some( + 13..14, + ), + name: "t", + quote: None, + ident_type: None, + }, + }, + ), option: VacuumTableOption { dry_run: Some( true, From 2c8e797e02544234191dd7afee093b95f9b4d130 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 17 Sep 2025 09:17:22 +0800 Subject: [PATCH 04/13] remove VacuumAll, VacuumTable is enough --- src/query/ast/src/ast/statements/statement.rs | 5 ---- src/query/ast/src/ast/statements/table.rs | 18 ----------- src/query/ast/src/parser/statement.rs | 30 +++++++++---------- 
src/query/sql/src/planner/binder/binder.rs | 3 -- 4 files changed, 15 insertions(+), 41 deletions(-) diff --git a/src/query/ast/src/ast/statements/statement.rs b/src/query/ast/src/ast/statements/statement.rs index b26a0ee2021a4..e121a6caf0a13 100644 --- a/src/query/ast/src/ast/statements/statement.rs +++ b/src/query/ast/src/ast/statements/statement.rs @@ -188,7 +188,6 @@ pub enum Statement { OptimizeTable(OptimizeTableStmt), VacuumTable(VacuumTableStmt), VacuumDropTable(VacuumDropTableStmt), - VacuumAll(VacuumAllStmt), VacuumTemporaryFiles(VacuumTemporaryFiles), AnalyzeTable(AnalyzeTableStmt), ExistsTable(ExistsTableStmt), @@ -478,7 +477,6 @@ impl Statement { | Statement::VacuumTable(..) | Statement::VacuumDropTable(..) | Statement::VacuumTemporaryFiles(..) - | Statement::VacuumAll(..) | Statement::AnalyzeTable(..) | Statement::ExistsTable(..) | Statement::ShowCreateDictionary(..) @@ -1083,9 +1081,6 @@ impl Display for Statement { Statement::RenameWorkloadGroup(stmt) => write!(f, "{stmt}")?, Statement::SetWorkloadQuotasGroup(stmt) => write!(f, "{stmt}")?, Statement::UnsetWorkloadQuotasGroup(stmt) => write!(f, "{stmt}")?, - Statement::VacuumAll(_) => { - todo!() - } } Ok(()) } diff --git a/src/query/ast/src/ast/statements/table.rs b/src/query/ast/src/ast/statements/table.rs index 297efe8ab8781..d168d1dcf547d 100644 --- a/src/query/ast/src/ast/statements/table.rs +++ b/src/query/ast/src/ast/statements/table.rs @@ -730,24 +730,6 @@ impl Display for VacuumDropTableStmt { } } -#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] -pub struct VacuumAllStmt { - pub catalog: Option, - pub database: Option, -} - -impl Display for VacuumAllStmt { - fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "VACUUM ALL")?; - if self.catalog.is_some() || self.database.is_some() { - write!(f, " FROM ")?; - write_dot_separated_list(f, self.catalog.iter().chain(&self.database))?; - write!(f, " ")?; - } - Ok(()) - } -} - #[derive(Debug, Clone, PartialEq, Drive, DriveMut)] 
pub struct VacuumTemporaryFiles { pub limit: Option, diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index ffb07a32f5aae..5823580725d2c 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -1172,6 +1172,19 @@ pub fn statement_body(i: Input) -> IResult { }) }, ); + + let vacuum_all_table = map( + rule! { + VACUUM ~ TABLE ~ ALL ~ #vacuum_table_option + }, + |(_, _, _, option)| { + Statement::VacuumTable(VacuumTableStmt { + target: VacuumTarget::All, + option, + }) + }, + ); + let vacuum_drop_table = map( rule! { VACUUM ~ DROP ~ TABLE ~ (FROM ~ ^#dot_separated_idents_1_to_2)? ~ #vacuum_drop_table_option @@ -1189,19 +1202,6 @@ pub fn statement_body(i: Input) -> IResult { }, ); - let vacuum_all = map( - rule! { - VACUUM ~ ALL ~ (FROM ~ ^#dot_separated_idents_1_to_2)? - }, - |(_, _, database_option)| { - let (catalog, database) = database_option.map_or_else( - || (None, None), - |(_, catalog_database)| (catalog_database.0, Some(catalog_database.1)), - ); - Statement::VacuumAll(VacuumAllStmt { catalog, database }) - }, - ); - let analyze_table = map( rule! { ANALYZE ~ TABLE ~ #dot_separated_idents_1_to_3 ~ NOSCAN? @@ -2818,9 +2818,9 @@ AS rule!( #vacuum_temporary_tables | #vacuum_temp_files : "VACUUM TEMPORARY FILES [RETAIN number SECONDS|DAYS] [LIMIT number]" - | #vacuum_table : "`VACUUM TABLE [.]
[RETAIN number HOURS] [DRY RUN | DRY RUN SUMMARY]`" + | #vacuum_table : "`VACUUM TABLE [.]
[DRY RUN | DRY RUN SUMMARY]`" + | #vacuum_all_table : "`VACUUM TABLE ALL [DRY RUN | DRY RUN SUMMARY]`" | #vacuum_drop_table : "`VACUUM DROP TABLE [FROM [.]] [RETAIN number HOURS] [DRY RUN | DRY RUN SUMMARY]`" - | #vacuum_all : "`VACUUM ALL [FROM [.]]`" ), ))(i) } diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index 4581f29683444..cae70b425e087 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -330,9 +330,6 @@ impl Binder { self.bind_vacuum_temporary_files(bind_context, stmt).await? } - Statement::VacuumAll(_stmt) => { - todo!() - } Statement::AnalyzeTable(stmt) => self.bind_analyze_table(stmt).await?, Statement::ExistsTable(stmt) => self.bind_exists_table(stmt).await?, // Dictionaries From d1483092b178ac62c2b1e1128e3f34e1d4118d0d Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 17 Sep 2025 12:00:30 +0800 Subject: [PATCH 05/13] adjust result schema --- .../interpreters/interpreter_table_vacuum.rs | 11 +++-- .../interpreters/interpreter_txn_commit.rs | 2 +- .../fuse_vacuum2/fuse_vacuum2_table.rs | 41 ++++++++++++------- src/query/sql/src/planner/plans/ddl/table.rs | 4 +- .../storages/fuse/src/operations/vacuum.rs | 33 ++++++++------- 5 files changed, 56 insertions(+), 35 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_table_vacuum.rs b/src/query/service/src/interpreters/interpreter_table_vacuum.rs index 2cb4b486d6fb4..80fc2ff8ebe98 100644 --- a/src/query/service/src/interpreters/interpreter_table_vacuum.rs +++ b/src/query/service/src/interpreters/interpreter_table_vacuum.rs @@ -106,10 +106,12 @@ impl VacuumTableInterpreter { .get_table(&target.catalog, &target.database, &target.table) .await?; let fuse_table = FuseTable::try_from_table(table.as_ref())?; - let target_removed = handler + let obj_removed = handler .do_vacuum2(fuse_table, self.ctx.clone(), false) .await?; - let res_block = 
DataBlock::new_from_columns(vec![StringType::from_data(target_removed)]); + let num_obj_removed = obj_removed.len() as u64; + let res_block = + DataBlock::new_from_columns(vec![UInt64Type::from_data(vec![num_obj_removed])]); PipelineBuildResult::from_blocks(vec![res_block]) } @@ -117,8 +119,9 @@ impl VacuumTableInterpreter { let handler = get_vacuum_handler(); let catalog = self.ctx.get_default_catalog()?; let ctx: Arc = self.ctx.clone() as _; - let target_removed = vacuum_all_tables(&ctx, &handler, catalog.as_ref()).await?; - let res_block = DataBlock::new_from_columns(vec![StringType::from_data(target_removed)]); + let num_obj_removed = vacuum_all_tables(&ctx, &handler, catalog.as_ref()).await?; + let res_block = + DataBlock::new_from_columns(vec![UInt64Type::from_data(vec![num_obj_removed])]); PipelineBuildResult::from_blocks(vec![res_block]) } diff --git a/src/query/service/src/interpreters/interpreter_txn_commit.rs b/src/query/service/src/interpreters/interpreter_txn_commit.rs index 6f13fd58647f4..f8af9c3adabc7 100644 --- a/src/query/service/src/interpreters/interpreter_txn_commit.rs +++ b/src/query/service/src/interpreters/interpreter_txn_commit.rs @@ -103,7 +103,7 @@ pub async fn execute_commit_statement(ctx: Arc) -> Result<()> if let Err(e) = vacuum_tables_from_info(tables_need_purge, ctx.clone(), handler).await { - warn!( "Failed to vacuum tables after transaction commit (best-effort operation): {e}"); + warn!("Failed to vacuum tables after transaction commit : {e}"); } else { info!( "{num_tables} tables vacuumed after transaction commit in a best-effort manner" ); } diff --git a/src/query/service/src/table_functions/fuse_vacuum2/fuse_vacuum2_table.rs b/src/query/service/src/table_functions/fuse_vacuum2/fuse_vacuum2_table.rs index fcbd3b94b7beb..1f7e7baa72a8b 100644 --- a/src/query/service/src/table_functions/fuse_vacuum2/fuse_vacuum2_table.rs +++ b/src/query/service/src/table_functions/fuse_vacuum2/fuse_vacuum2_table.rs @@ -21,7 +21,9 @@ use 
databend_common_catalog::table::TableExt; use databend_common_catalog::table_args::TableArgs; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::types::NumberDataType; use databend_common_expression::types::StringType; +use databend_common_expression::types::UInt64Type; use databend_common_expression::DataBlock; use databend_common_expression::FromData; use databend_common_expression::TableDataType; @@ -80,7 +82,15 @@ impl SimpleTableFunc for FuseVacuum2Table { } fn schema(&self) -> TableSchemaRef { - TableSchemaRefExt::create(vec![TableField::new("vacuumed", TableDataType::String)]) + match &self.args { + Vacuum2TableArgs::SingleTable { .. } => { + TableSchemaRefExt::create(vec![TableField::new("vacuumed", TableDataType::String)]) + } + Vacuum2TableArgs::All => TableSchemaRefExt::create(vec![TableField::new( + "num_object_removed", + TableDataType::Number(NumberDataType::UInt64), + )]), + } } async fn apply( @@ -97,20 +107,23 @@ impl SimpleTableFunc for FuseVacuum2Table { arg_table_name, respect_flash_back, } => { - self.apply_single_table( - ctx, - catalog.as_ref(), - arg_database_name, - arg_table_name, - respect_flash_back.unwrap_or_default(), - ) - .await? 
+ let obj_removed = self + .apply_single_table( + ctx, + catalog.as_ref(), + arg_database_name, + arg_table_name, + respect_flash_back.unwrap_or_default(), + ) + .await?; + DataBlock::new_from_columns(vec![StringType::from_data(obj_removed)]) + } + Vacuum2TableArgs::All => { + let num_obj_removed = self.apply_all_tables(ctx, catalog.as_ref()).await?; + DataBlock::new_from_columns(vec![UInt64Type::from_data(vec![num_obj_removed])]) } - Vacuum2TableArgs::All => self.apply_all_tables(ctx, catalog.as_ref()).await?, }; - Ok(Some(DataBlock::new_from_columns(vec![ - StringType::from_data(res), - ]))) + Ok(Some(res)) } fn create(func_name: &str, table_args: TableArgs) -> Result @@ -178,7 +191,7 @@ impl FuseVacuum2Table { &self, ctx: &Arc, catalog: &dyn Catalog, - ) -> Result> { + ) -> Result { databend_common_storages_fuse::operations::vacuum_all_tables( ctx, self.handler.as_ref(), diff --git a/src/query/sql/src/planner/plans/ddl/table.rs b/src/query/sql/src/planner/plans/ddl/table.rs index f6429f1de0ec3..1ee9b3018ba0b 100644 --- a/src/query/sql/src/planner/plans/ddl/table.rs +++ b/src/query/sql/src/planner/plans/ddl/table.rs @@ -130,8 +130,8 @@ impl VacuumTablePlan { pub fn schema(&self) -> DataSchemaRef { if !self.use_legacy_vacuum { return Arc::new(DataSchema::new(vec![DataField::new( - "object_removed", - DataType::String, + "num_object_removed", + DataType::Number(NumberDataType::UInt64), )])); } diff --git a/src/query/storages/fuse/src/operations/vacuum.rs b/src/query/storages/fuse/src/operations/vacuum.rs index 08889e361841c..4ac56c8feeeff 100644 --- a/src/query/storages/fuse/src/operations/vacuum.rs +++ b/src/query/storages/fuse/src/operations/vacuum.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-// src/query/storages/fuse/src/vacuum/mod.rs - use std::sync::Arc; use databend_common_catalog::catalog::Catalog; @@ -73,11 +71,13 @@ pub async fn vacuum_all_tables( ctx: &Arc, handler: &VacuumHandlerWrapper, catalog: &dyn Catalog, -) -> Result> { +) -> Result { let tenant_id = ctx.get_tenant(); let dbs = catalog.list_databases(&tenant_id).await?; let num_db = dbs.len(); + let mut num_obj_removed = 0; + for (idx_db, db) in dbs.iter().enumerate() { if db.engine().to_uppercase() == "SYSTEM" { info!("Bypass system database [{}]", db.name()); @@ -96,9 +96,10 @@ pub async fn vacuum_all_tables( let num_tbl = tables.len(); for (idx_tbl, table) in tables.iter().enumerate() { info!( - "Processing table {}.{}, db level progress: {}/{}", + "Processing table {}.{}, progress of db {}: {}/{}", db.name(), table.get_table_info().name, + db.name(), idx_tbl + 1, num_tbl ); @@ -123,16 +124,20 @@ pub async fn vacuum_all_tables( let res = handler.do_vacuum2(tbl, ctx.clone(), false).await; - if let Err(e) = res { - warn!( - "vacuum2 table {}.{} failed: {}", - db.name(), - table.get_table_info().name, - e - ); - }; + match res { + Ok(removed) => { + num_obj_removed += removed.len() as u64; + } + Err(e) => { + warn!( + "vacuum2 table {}.{} failed: {}", + db.name(), + table.get_table_info().name, + e + ); + } + } } } - - Ok(vec![]) + Ok(num_obj_removed) } From 09d35edab223b6dff8c2c93395e0dad8e4996051 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 17 Sep 2025 14:51:47 +0800 Subject: [PATCH 06/13] fix : typo in setting getter --- src/query/settings/src/settings_getter_setter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 7eada4e9d68b5..637d9fd20f960 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -1089,6 +1089,6 @@ impl Settings { } pub fn get_fallback_to_legacy_vacuum(&self) -> Result { 
- Ok(self.try_get_u64("fallback_to_legacy_vacuum")? != 1) + Ok(self.try_get_u64("fallback_to_legacy_vacuum")? == 1) } } From 5d42e7f4fab7e24f2db0bd40f5b94b7da1f3d279 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 17 Sep 2025 20:50:34 +0800 Subject: [PATCH 07/13] Add integration test --- Cargo.lock | 1 + src/query/ee/Cargo.toml | 1 + .../it/storages/fuse/operations/vacuum2.rs | 169 +++++++++++------- .../ee/tests/it/testdata/vacuum_all_stmt.txt | 5 + src/query/sql/src/planner/plans/ddl/table.rs | 2 +- 5 files changed, 114 insertions(+), 64 deletions(-) create mode 100644 src/query/ee/tests/it/testdata/vacuum_all_stmt.txt diff --git a/Cargo.lock b/Cargo.lock index 07a476125133e..725a2c376bbb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4861,6 +4861,7 @@ dependencies = [ "derive-visitor", "futures", "futures-util", + "goldenfile", "jsonb", "jwt-simple", "log", diff --git a/src/query/ee/Cargo.toml b/src/query/ee/Cargo.toml index c3ed823e7435f..a87f8985f68fe 100644 --- a/src/query/ee/Cargo.toml +++ b/src/query/ee/Cargo.toml @@ -66,6 +66,7 @@ uuid = { workspace = true } [dev-dependencies] databend-common-functions = { workspace = true } +goldenfile = { workspace = true } jsonb = { workspace = true } tantivy = { workspace = true } walkdir = { workspace = true } diff --git a/src/query/ee/tests/it/storages/fuse/operations/vacuum2.rs b/src/query/ee/tests/it/storages/fuse/operations/vacuum2.rs index f799d0464846a..c5932d859405c 100644 --- a/src/query/ee/tests/it/storages/fuse/operations/vacuum2.rs +++ b/src/query/ee/tests/it/storages/fuse/operations/vacuum2.rs @@ -17,23 +17,84 @@ use std::path::Path; use databend_common_base::base::tokio; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::block_debug::pretty_format_blocks; +use databend_common_expression::DataBlock; use databend_enterprise_query::test_kits::context::EESetup; use databend_query::sessions::QueryContext; use 
databend_query::sessions::TableContext; use databend_query::test_kits::TestFixture; +use futures_util::TryStreamExt; +use goldenfile::Mint; // TODO investigate this // NOTE: SHOULD specify flavor = "multi_thread", otherwise query execution might be hanged #[tokio::test(flavor = "multi_thread")] -async fn test_vacuum2_all() -> Result<()> { +async fn test_table_function_fuse_vacuum2_all() -> Result<()> { let ee_setup = EESetup::new(); let fixture = TestFixture::setup_with_custom(ee_setup).await?; + + setup(&fixture).await?; + + + // vacuum them all + let res = fixture.execute_command("call system$fuse_vacuum2()").await; + + // Check that: + + // 1. non-fuse tables should not stop us + + assert!(res.is_ok()); + + // 2. fuse table data should be vacuumed + + let storage_root = fixture.storage_root(); + + let ctx = fixture.new_query_ctx().await?; + check_files_left(&ctx, storage_root, "db1", "t1").await?; + check_files_left(&ctx, storage_root, "default", "t1").await?; + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_vacuum_all_stmt() -> Result<()> { + let ee_setup = EESetup::new(); + let fixture = TestFixture::setup_with_custom(ee_setup).await?; + + setup(&fixture).await?; + + let mut mint = Mint::new("tests/it/testdata"); + let file = &mut mint.new_goldenfile("vacuum_all_stmt.txt").unwrap(); + + // execute the VACUUM TABLE ALL + let res_block = fixture + .execute_query("vacuum table all") + .await? 
+ .try_collect::>() + .await?; + + let block_string = pretty_format_blocks(&res_block).unwrap(); + use std::io::Write; + writeln!(file, "{}", block_string).unwrap(); + + // Check: fuse table data should be vacuumed + + let storage_root = fixture.storage_root(); + + let ctx = fixture.new_query_ctx().await?; + check_files_left(&ctx, storage_root, "db1", "t1").await?; + check_files_left(&ctx, storage_root, "default", "t1").await?; + + Ok(()) +} + +async fn setup(fixture: &TestFixture) -> Result<()>{ + // Adjust retention period to 0, so that dropped tables will be vacuumed immediately let session = fixture.default_session(); session.get_settings().set_data_retention_time_in_days(0)?; - let ctx = fixture.new_query_ctx().await?; - + // Prepare test db / tables let setup_statements = vec![ // create non-system db1, create fuse and non-fuse table in it. "create database db1", @@ -53,67 +114,49 @@ async fn test_vacuum2_all() -> Result<()> { for stmt in setup_statements { fixture.execute_command(stmt).await?; } - - // vacuum them all - let res = fixture.execute_command("call system$fuse_vacuum2()").await; - - // Check that: - - // 1. non-fuse tables should not stop us - - assert!(res.is_ok()); - - // 2. fuse table data should be vacuumed - - let storage_root = fixture.storage_root(); - - async fn check_files_left( - ctx: &QueryContext, - storage_root: &str, - db_name: &str, - tbl_name: &str, - ) -> Result<()> { - let tenant = ctx.get_tenant(); - let table = ctx - .get_default_catalog()? - .get_table(&tenant, db_name, tbl_name) - .await?; - - let db = ctx - .get_default_catalog()? 
- .get_database(&tenant, db_name) - .await?; - - let path = Path::new(storage_root) - .join(db.get_db_info().database_id.db_id.to_string()) - .join(table.get_id().to_string()); - - let walker = walkdir::WalkDir::new(path).into_iter(); - - let mut files_left = Vec::new(); - for entry in walker { - let entry = entry.unwrap(); - if entry.file_type().is_file() { - files_left.push(entry); - } + Ok(()) +} +async fn check_files_left( + ctx: &QueryContext, + storage_root: &str, + db_name: &str, + tbl_name: &str, +) -> Result<()> { + let tenant = ctx.get_tenant(); + let table = ctx + .get_default_catalog()? + .get_table(&tenant, db_name, tbl_name) + .await?; + + let db = ctx + .get_default_catalog()? + .get_database(&tenant, db_name) + .await?; + + let path = Path::new(storage_root) + .join(db.get_db_info().database_id.db_id.to_string()) + .join(table.get_id().to_string()); + + let walker = walkdir::WalkDir::new(path).into_iter(); + + let mut files_left = Vec::new(); + for entry in walker { + let entry = entry.unwrap(); + if entry.file_type().is_file() { + files_left.push(entry); } - - // There should be one snapshot file and one snapshot hint file left - assert_eq!(files_left.len(), 2); - - files_left.sort_by(|a, b| a.file_name().cmp(b.file_name())); - // First is the only snapshot left - files_left[0].path().to_string_lossy().contains("/_ss/"); - // Second one is the last snapshot location hint - files_left[1] - .path() - .to_string_lossy() - .contains("last_snapshot_location_hint_v2"); - Ok::<(), ErrorCode>(()) } - check_files_left(&ctx, storage_root, "db1", "t1").await?; - check_files_left(&ctx, storage_root, "default", "t1").await?; - - Ok(()) + // There should be one snapshot file and one snapshot hint file left + assert_eq!(files_left.len(), 2); + + files_left.sort_by(|a, b| a.file_name().cmp(b.file_name())); + // First is the only snapshot left + files_left[0].path().to_string_lossy().contains("/_ss/"); + // Second one is the last snapshot location hint + 
files_left[1] + .path() + .to_string_lossy() + .contains("last_snapshot_location_hint_v2"); + Ok::<(), ErrorCode>(()) } diff --git a/src/query/ee/tests/it/testdata/vacuum_all_stmt.txt b/src/query/ee/tests/it/testdata/vacuum_all_stmt.txt new file mode 100644 index 0000000000000..7e9094bd36643 --- /dev/null +++ b/src/query/ee/tests/it/testdata/vacuum_all_stmt.txt @@ -0,0 +1,5 @@ ++----------+ +| Column 0 | ++----------+ +| 20 | ++----------+ diff --git a/src/query/sql/src/planner/plans/ddl/table.rs b/src/query/sql/src/planner/plans/ddl/table.rs index 1ee9b3018ba0b..ac2f0d3d8d529 100644 --- a/src/query/sql/src/planner/plans/ddl/table.rs +++ b/src/query/sql/src/planner/plans/ddl/table.rs @@ -130,7 +130,7 @@ impl VacuumTablePlan { pub fn schema(&self) -> DataSchemaRef { if !self.use_legacy_vacuum { return Arc::new(DataSchema::new(vec![DataField::new( - "num_object_removed", + "num_of_objects_removed", DataType::Number(NumberDataType::UInt64), )])); } From 85c004d5bee575fa1367860812c783c05d5a610e Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 17 Sep 2025 20:52:01 +0800 Subject: [PATCH 08/13] revert test config --- scripts/ci/deploy/config/databend-query-node-1.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/ci/deploy/config/databend-query-node-1.toml b/scripts/ci/deploy/config/databend-query-node-1.toml index d70ceda29caaf..d613c17a2287a 100644 --- a/scripts/ci/deploy/config/databend-query-node-1.toml +++ b/scripts/ci/deploy/config/databend-query-node-1.toml @@ -84,7 +84,7 @@ join_spilling_memory_ratio = 60 [log] [log.file] -level = "INFO" +level = "DEBUG" format = "text" dir = "./.databend/logs_1" limit = 12 # 12 files, 1 file per hour From acdd29647a2be71ead9a526e4135ad7717b4ee08 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 17 Sep 2025 20:53:13 +0800 Subject: [PATCH 09/13] remove test admin procedure --- src/admin_procedures/vacuu_all.sql | 19 ------------------- 1 file changed, 19 deletions(-) delete mode 100644 
src/admin_procedures/vacuu_all.sql diff --git a/src/admin_procedures/vacuu_all.sql b/src/admin_procedures/vacuu_all.sql deleted file mode 100644 index dadb4f85fc337..0000000000000 --- a/src/admin_procedures/vacuu_all.sql +++ /dev/null @@ -1,19 +0,0 @@ --- Vacuum objects in one go -CREATE OR REPLACE PROCEDURE sys_vacuum_all() -RETURNS STRING -LANGUAGE SQL -as -$$ -begin - --- Vacuum all temporary tables that have not been cleaned by accident (session panic, force cluster restart etc.) -SELECT * FROM FUSE_VACUUM_TEMPORARY_TABLE(); - --- Vacuum dropped database / tables -VACUUM DROP TABLE; - --- Vacuum all table historical data (using vacuum2) -CALL SYSTEM$FUSE_VACUUM2(); - -end; -$$ From e71ce63ee4728afd098921183e19d5189c081aa3 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 17 Sep 2025 21:26:09 +0800 Subject: [PATCH 10/13] fmt --- src/query/ee/tests/it/storages/fuse/operations/vacuum2.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/query/ee/tests/it/storages/fuse/operations/vacuum2.rs b/src/query/ee/tests/it/storages/fuse/operations/vacuum2.rs index c5932d859405c..7284ffd841cd0 100644 --- a/src/query/ee/tests/it/storages/fuse/operations/vacuum2.rs +++ b/src/query/ee/tests/it/storages/fuse/operations/vacuum2.rs @@ -35,7 +35,6 @@ async fn test_table_function_fuse_vacuum2_all() -> Result<()> { setup(&fixture).await?; - // vacuum them all let res = fixture.execute_command("call system$fuse_vacuum2()").await; @@ -88,8 +87,7 @@ async fn test_vacuum_all_stmt() -> Result<()> { Ok(()) } -async fn setup(fixture: &TestFixture) -> Result<()>{ - +async fn setup(fixture: &TestFixture) -> Result<()> { // Adjust retention period to 0, so that dropped tables will be vacuumed immediately let session = fixture.default_session(); session.get_settings().set_data_retention_time_in_days(0)?; From 71165d8b0dc238078e05ca8aad83c40c7f486378 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 17 Sep 2025 22:42:10 +0800 Subject: [PATCH 11/13] tweak stateless 
tests --- tests/suites/5_ee/01_vacuum/01_0000_ee_vacuum.py | 4 ++-- .../suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh | 8 ++++---- .../5_ee/01_vacuum/01_003_vacuum_table_only_orphans.sh | 2 +- .../5_ee/04_attach_read_only/04_0001_check_mutations.sh | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/suites/5_ee/01_vacuum/01_0000_ee_vacuum.py b/tests/suites/5_ee/01_vacuum/01_0000_ee_vacuum.py index 0ed3b191b2ce6..3afe8c1a08931 100755 --- a/tests/suites/5_ee/01_vacuum/01_0000_ee_vacuum.py +++ b/tests/suites/5_ee/01_vacuum/01_0000_ee_vacuum.py @@ -66,7 +66,7 @@ def compact_data(name): mycursor.execute("select a from gc_test order by a;") old_datas = mycursor.fetchall() - mycursor.execute("vacuum table gc_test dry run;") + mycursor.execute("settings (fallback_to_legacy_vacuum=1) vacuum table gc_test dry run;") datas = mycursor.fetchall() print(datas) @@ -76,7 +76,7 @@ def compact_data(name): if old_datas != datas: print("vacuum dry run lose data: %s : %s" % (old_datas, datas)) - client1.send("vacuum table gc_test;") + client1.send("settings (fallback_to_legacy_vacuum=1) vacuum table gc_test;") client1.expect(prompt) mycursor.execute("select a from gc_test order by a;") diff --git a/tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh b/tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh index 0f4fb8b2453b8..3483cb4150254 100755 --- a/tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh +++ b/tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh @@ -9,12 +9,12 @@ echo "CREATE DATABASE test_vacuum_drop_dry_run" | $BENDSQL_CLIENT_CONNECT echo "create table test_vacuum_drop_dry_run.a(c int)" | $BENDSQL_CLIENT_CONNECT echo "INSERT INTO test_vacuum_drop_dry_run.a VALUES (1)" | $BENDSQL_CLIENT_OUTPUT_NULL echo "drop table test_vacuum_drop_dry_run.a" | $BENDSQL_CLIENT_CONNECT -count=$(echo "set data_retention_time_in_days=0; vacuum drop table dry run" | $BENDSQL_CLIENT_CONNECT | wc -l) +count=$(echo "set 
data_retention_time_in_days=0; settings (fallback_to_legacy_vacuum=1) vacuum drop table dry run" | $BENDSQL_CLIENT_CONNECT | wc -l) if [[ ! "$count" ]]; then echo "vacuum drop table dry run, count:$count" exit 1 fi -count=$(echo "set data_retention_time_in_days=0; vacuum drop table dry run summary" | $BENDSQL_CLIENT_CONNECT | wc -l) +count=$(echo "set data_retention_time_in_days=0; settings (fallback_to_legacy_vacuum=1) vacuum drop table dry run summary" | $BENDSQL_CLIENT_CONNECT | wc -l) if [[ ! "$count" ]]; then echo "vacuum drop table dry run summary, count:$count" exit 1 @@ -44,7 +44,7 @@ echo "INSERT INTO test_vacuum_drop.b VALUES (2)" | $BENDSQL_CLIENT_OUTPUT_NULL echo "drop table test_vacuum_drop.b" | $BENDSQL_CLIENT_CONNECT -echo "vacuum drop table from test_vacuum_drop" | $BENDSQL_CLIENT_CONNECT > /dev/null +echo "settings (fallback_to_legacy_vacuum=1) vacuum drop table from test_vacuum_drop" | $BENDSQL_CLIENT_CONNECT > /dev/null echo "undrop table test_vacuum_drop.b" | $BENDSQL_CLIENT_CONNECT @@ -125,7 +125,7 @@ if [[ "$count" != "4" ]]; then echo "vacuum table, count:$count" exit 1 fi -count=$(echo "set data_retention_time_in_days=0; vacuum table test_vacuum_drop_4.c dry run summary" | $BENDSQL_CLIENT_CONNECT | wc -l) +count=$(echo "set data_retention_time_in_days=0; settings (fallback_to_legacy_vacuum=1) vacuum table test_vacuum_drop_4.c dry run summary" | $BENDSQL_CLIENT_CONNECT | wc -l) if [[ "$count" != "1" ]]; then echo "vacuum table dry run summary, count:$count" exit 1 diff --git a/tests/suites/5_ee/01_vacuum/01_003_vacuum_table_only_orphans.sh b/tests/suites/5_ee/01_vacuum/01_003_vacuum_table_only_orphans.sh index f86d0cd356963..6ad630c23d7e1 100755 --- a/tests/suites/5_ee/01_vacuum/01_003_vacuum_table_only_orphans.sh +++ b/tests/suites/5_ee/01_vacuum/01_003_vacuum_table_only_orphans.sh @@ -57,7 +57,7 @@ ls -l /tmp/test_vacuum_table_only_orphans/"$PREFIX"/_sg/ | wc -l ls -l /tmp/test_vacuum_table_only_orphans/"$PREFIX"/_i_b_v2/ | wc -l -stmt "set 
data_retention_time_in_days=0; vacuum table test_vacuum_table_only_orphans.a" > /dev/null +stmt "set data_retention_time_in_days=0; settings (fallback_to_legacy_vacuum=1) vacuum table test_vacuum_table_only_orphans.a" > /dev/null echo "after vacuum" diff --git a/tests/suites/5_ee/04_attach_read_only/04_0001_check_mutations.sh b/tests/suites/5_ee/04_attach_read_only/04_0001_check_mutations.sh index e797fa52e6f06..e25ef3012ec6e 100755 --- a/tests/suites/5_ee/04_attach_read_only/04_0001_check_mutations.sh +++ b/tests/suites/5_ee/04_attach_read_only/04_0001_check_mutations.sh @@ -20,12 +20,12 @@ echo "CREATE INVERTED INDEX IF NOT EXISTS idx1 ON test_attach_only.test_json_rea echo "vacuum table" echo "vacuum table should fail" -echo "VACUUM TABLE test_attach_only.test_json_read_only;" | $BENDSQL_CLIENT_CONNECT +echo "settings (fallback_to_legacy_vacuum=1) VACUUM TABLE test_attach_only.test_json_read_only;" | $BENDSQL_CLIENT_CONNECT echo "vacuum drop table from db should not include the read_only attach table" # drop & vacuum echo "drop table test_attach_only.test_json_read_only" | $BENDSQL_CLIENT_CONNECT -echo "vacuum drop table from test_attach_only" | $BENDSQL_CLIENT_CONNECT > /dev/null +echo "settings (fallback_to_legacy_vacuum=1) vacuum drop table from test_attach_only" | $BENDSQL_CLIENT_CONNECT > /dev/null # attach it back echo "attach table test_attach_only.test_json_read_only 's3://testbucket/data/$storage_prefix' connection=(access_key_id ='minioadmin' secret_access_key ='minioadmin' endpoint_url='${STORAGE_S3_ENDPOINT_URL}')" | $BENDSQL_CLIENT_CONNECT echo "expect table data still there" From 3592fa81213bebd3d1ad06b882b2650eb62f5e14 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 17 Sep 2025 23:46:46 +0800 Subject: [PATCH 12/13] tweak stateless test --- .../service/src/interpreters/interpreter_view_describe.rs | 1 + .../suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git 
a/src/query/service/src/interpreters/interpreter_view_describe.rs b/src/query/service/src/interpreters/interpreter_view_describe.rs index 3b2b08f4f6dfc..523b718c0b5ea 100644 --- a/src/query/service/src/interpreters/interpreter_view_describe.rs +++ b/src/query/service/src/interpreters/interpreter_view_describe.rs @@ -65,6 +65,7 @@ impl Interpreter for DescribeViewInterpreter { if let Some(query) = tbl_info.options().get(QUERY) { let mut planner = Planner::new(self.ctx.clone()); let (plan, _) = planner.plan_sql(query).await?; + eprintln!("plan schema: {:#?}", plan.schema()); infer_table_schema(&plan.schema()) } else { return Err(ErrorCode::Internal( diff --git a/tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh b/tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh index 3483cb4150254..ec5b74a7e7be0 100755 --- a/tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh +++ b/tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh @@ -9,12 +9,12 @@ echo "CREATE DATABASE test_vacuum_drop_dry_run" | $BENDSQL_CLIENT_CONNECT echo "create table test_vacuum_drop_dry_run.a(c int)" | $BENDSQL_CLIENT_CONNECT echo "INSERT INTO test_vacuum_drop_dry_run.a VALUES (1)" | $BENDSQL_CLIENT_OUTPUT_NULL echo "drop table test_vacuum_drop_dry_run.a" | $BENDSQL_CLIENT_CONNECT -count=$(echo "set data_retention_time_in_days=0; settings (fallback_to_legacy_vacuum=1) vacuum drop table dry run" | $BENDSQL_CLIENT_CONNECT | wc -l) +count=$(echo "set data_retention_time_in_days=0; set fallback_to_legacy_vacuum=1; vacuum drop table dry run" | $BENDSQL_CLIENT_CONNECT | wc -l) if [[ ! "$count" ]]; then echo "vacuum drop table dry run, count:$count" exit 1 fi -count=$(echo "set data_retention_time_in_days=0; settings (fallback_to_legacy_vacuum=1) vacuum drop table dry run summary" | $BENDSQL_CLIENT_CONNECT | wc -l) +count=$(echo "set data_retention_time_in_days=0; set fallback_to_legacy_vacuum=1; vacuum drop table dry run summary" | $BENDSQL_CLIENT_CONNECT | wc -l) if [[ ! 
"$count" ]]; then echo "vacuum drop table dry run summary, count:$count" exit 1 @@ -44,7 +44,7 @@ echo "INSERT INTO test_vacuum_drop.b VALUES (2)" | $BENDSQL_CLIENT_OUTPUT_NULL echo "drop table test_vacuum_drop.b" | $BENDSQL_CLIENT_CONNECT -echo "settings (fallback_to_legacy_vacuum=1) vacuum drop table from test_vacuum_drop" | $BENDSQL_CLIENT_CONNECT > /dev/null +echo "set fallback_to_legacy_vacuum=1; vacuum drop table from test_vacuum_drop" | $BENDSQL_CLIENT_CONNECT > /dev/null echo "undrop table test_vacuum_drop.b" | $BENDSQL_CLIENT_CONNECT @@ -125,7 +125,7 @@ if [[ "$count" != "4" ]]; then echo "vacuum table, count:$count" exit 1 fi -count=$(echo "set data_retention_time_in_days=0; settings (fallback_to_legacy_vacuum=1) vacuum table test_vacuum_drop_4.c dry run summary" | $BENDSQL_CLIENT_CONNECT | wc -l) +count=$(echo "set data_retention_time_in_days=0; set fallback_to_legacy_vacuum = 1; vacuum table test_vacuum_drop_4.c dry run summary" | $BENDSQL_CLIENT_CONNECT | wc -l) if [[ "$count" != "1" ]]; then echo "vacuum table dry run summary, count:$count" exit 1 From dcc6ce49f2cc1a7ee6272963359fa667c9a987e0 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Thu, 18 Sep 2025 14:52:07 +0800 Subject: [PATCH 13/13] tweak stateless test --- tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh b/tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh index ec5b74a7e7be0..4fdf37bc8a0e5 100755 --- a/tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh +++ b/tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh @@ -120,7 +120,7 @@ echo "select * from test_vacuum_drop_4.b" | $BENDSQL_CLIENT_CONNECT ## test vacuum table output echo "create table test_vacuum_drop_4.c(c int)" | $BENDSQL_CLIENT_CONNECT echo "INSERT INTO test_vacuum_drop_4.c VALUES (1),(2)" | $BENDSQL_CLIENT_OUTPUT_NULL -count=$(echo "set data_retention_time_in_days=0; 
vacuum table test_vacuum_drop_4.c" | $BENDSQL_CLIENT_CONNECT | awk '{print $9}') +count=$(echo "set data_retention_time_in_days=0; set fallback_to_legacy_vacuum = 1; vacuum table test_vacuum_drop_4.c" | $BENDSQL_CLIENT_CONNECT | awk '{print $9}') if [[ "$count" != "4" ]]; then echo "vacuum table, count:$count" exit 1