Skip to content

Commit fa8ef00

Browse files
authored
fix: deserialize parquet error when stream's base table modifies column type (#18828)
* fix * add test * fix * fix * fix * fix * fix * fix * fix * fix test * fix * fix
1 parent a7973fd commit fa8ef00

File tree

5 files changed

+99
-14
lines changed

5 files changed

+99
-14
lines changed

src/query/expression/src/schema.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,13 @@ impl TableSchema {
647647
Ok(i)
648648
}
649649

650+
pub fn drop_column_unchecked(&mut self, column: &str) -> Result<FieldIndex> {
651+
let i = self.index_of(column)?;
652+
self.fields.remove(i);
653+
654+
Ok(i)
655+
}
656+
650657
pub fn to_leaf_column_id_set(&self) -> HashSet<ColumnId> {
651658
HashSet::from_iter(self.to_leaf_column_ids().iter().cloned())
652659
}

src/query/service/src/interpreters/interpreter_table_drop.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ impl Interpreter for DropTableInterpreter {
9393
let engine = tbl.get_table_info().engine();
9494
if matches!(engine, VIEW_ENGINE | STREAM_ENGINE) {
9595
return Err(ErrorCode::TableEngineNotSupported(format!(
96-
"{}.{} engine is {} that doesn't support drop, use `DROP {} {}.{}` instead",
96+
"{}.{} engine is {}, use `DROP {} {}.{}` instead",
9797
&self.plan.database,
9898
&self.plan.table,
9999
engine,

src/query/service/src/interpreters/interpreter_table_modify_column.rs

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::HashSet;
15+
use std::collections::HashMap;
1616
use std::sync::Arc;
1717

1818
use databend_common_catalog::catalog::Catalog;
@@ -151,28 +151,28 @@ impl ModifyTableColumnInterpreter {
151151
let schema = table.schema().as_ref().clone();
152152
let table_info = table.get_table_info();
153153
let mut new_schema = schema.clone();
154-
let mut default_expr_binder = DefaultExprBinder::try_new(self.ctx.clone())?;
155154
// first check default expr before lock table
156155
for (field, _comment) in field_and_comments {
157156
if let Some((i, old_field)) = schema.column_with_name(&field.name) {
158157
// if the field has different leaf column numbers, we need drop the old column
159158
// and add a new one to generate new column id. otherwise, leaf column ids will conflict.
160159
if old_field.data_type.num_leaf_columns() != field.data_type.num_leaf_columns() {
161-
let _ = new_schema.drop_column(&field.name);
162-
let _ = new_schema.add_column(field, i);
160+
let _ = new_schema.drop_column_unchecked(&field.name)?;
161+
new_schema.add_column(field, i)?;
163162
} else {
164163
// new field don't have `column_id`, assign field directly will cause `column_id` lost.
165164
new_schema.fields[i].data_type = field.data_type.clone();
166165
// TODO: support set computed field.
167166
new_schema.fields[i].computed_expr = field.computed_expr.clone();
168167
}
168+
169169
if let Some(default_expr) = &field.default_expr {
170170
let default_expr = default_expr.to_string();
171171
new_schema.fields[i].default_expr = Some(default_expr);
172-
let _ = default_expr_binder.get_scalar(&new_schema.fields[i])?;
173172
} else {
174173
new_schema.fields[i].default_expr = None;
175174
}
175+
176176
if old_field.data_type != field.data_type {
177177
// Check if this column is referenced by computed columns.
178178
let data_schema = DataSchema::from(&new_schema);
@@ -256,11 +256,12 @@ impl ModifyTableColumnInterpreter {
256256
return Ok(PipelineBuildResult::create());
257257
}
258258

259-
let mut modified_field_indices = HashSet::new();
259+
let mut modified_default_scalars = HashMap::new();
260+
let mut default_expr_binder = DefaultExprBinder::try_new(self.ctx.clone())?;
260261
let new_schema_without_computed_fields = new_schema.remove_computed_fields();
262+
let format_as_parquet = fuse_table.storage_format_as_parquet();
261263
if schema != new_schema {
262264
for (field, _) in field_and_comments {
263-
let field_index = new_schema_without_computed_fields.index_of(&field.name)?;
264265
let old_field = schema.field_with_name(&field.name)?;
265266
let is_alter_column_string_to_binary =
266267
is_string_to_binary(&old_field.data_type, &field.data_type);
@@ -269,20 +270,25 @@ impl ModifyTableColumnInterpreter {
269270
// 1. alter column from string to binary in parquet or data type not changed.
270271
// 2. default expr and computed expr not changed. Otherwise, we need fill value for
271272
// new added column.
272-
if ((table.storage_format_as_parquet() && is_alter_column_string_to_binary)
273+
if ((format_as_parquet && is_alter_column_string_to_binary)
273274
|| old_field.data_type == field.data_type)
274275
&& old_field.default_expr == field.default_expr
275276
&& old_field.computed_expr == field.computed_expr
276277
{
277278
continue;
278279
}
279-
modified_field_indices.insert(field_index);
280+
let field_index = new_schema_without_computed_fields.index_of(&field.name)?;
281+
let default_scalar = default_expr_binder
282+
.get_scalar(&new_schema_without_computed_fields.fields[field_index])?;
283+
modified_default_scalars.insert(field_index, default_scalar);
280284
}
281285
table_info.meta.schema = new_schema.clone().into();
282286
}
283287

284288
// if don't need rebuild table, only update table meta.
285-
if modified_field_indices.is_empty() {
289+
if modified_default_scalars.is_empty()
290+
|| base_snapshot.is_none_or(|v| v.summary.row_count == 0)
291+
{
286292
commit_table_meta(
287293
&self.ctx,
288294
table.as_ref(),
@@ -295,14 +301,24 @@ impl ModifyTableColumnInterpreter {
295301
return Ok(PipelineBuildResult::create());
296302
}
297303

304+
if fuse_table.change_tracking_enabled() {
305+
// Modifying columns while change tracking is active may break
306+
// the consistency between tracked changes and the current table schema,
307+
// leading to incorrect or incomplete change records.
308+
return Err(ErrorCode::AlterTableError(format!(
309+
"table {} has change tracking enabled, modifying columns should be avoided",
310+
table_info.desc
311+
)));
312+
}
313+
298314
// construct sql for selecting data from old table.
299315
// computed columns are ignored, as it is build from other columns.
300316
let query_fields = new_schema_without_computed_fields
301317
.fields()
302318
.iter()
303319
.enumerate()
304320
.map(|(index, field)| {
305-
if modified_field_indices.contains(&index) {
321+
if let Some(default_scalar) = modified_default_scalars.get(&index) {
306322
let old_field = schema.field_with_name(&field.name).unwrap();
307323
let need_remove_nullable =
308324
old_field.data_type.is_nullable() && !field.data_type.is_nullable();
@@ -427,7 +443,13 @@ impl ModifyTableColumnInterpreter {
427443
}
428444
(_, _) => {
429445
if need_remove_nullable {
430-
format!("remove_nullable(`{}`)", field.name)
446+
// If the column is being changed from NULLABLE to NOT NULL,
447+
// wrap it with `coalesce()` to replace NULL values with the default,
448+
// and `remove_nullable()` to mark the resulting expression as non-nullable.
449+
format!(
450+
"remove_nullable(coalesce(`{}`, {}))",
451+
field.name, default_scalar
452+
)
431453
} else {
432454
format!("`{}`", field.name)
433455
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
## Copyright 2023 Databend Cloud
2+
##
3+
## Licensed under the Elastic License, Version 2.0 (the "License");
4+
## you may not use this file except in compliance with the License.
5+
## You may obtain a copy of the License at
6+
##
7+
## https://www.elastic.co/licensing/elastic-license
8+
##
9+
## Unless required by applicable law or agreed to in writing, software
10+
## distributed under the License is distributed on an "AS IS" BASIS,
11+
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
## See the License for the specific language governing permissions and
13+
## limitations under the License.
14+
15+
statement ok
16+
create or replace table t_18827(a string, b int not null default 1);
17+
18+
query I
19+
insert into t_18827 values('a', 1),('b', 2);
20+
----
21+
2
22+
23+
statement ok
24+
create or replace stream s_18827 on table t_18827 append_only=false;
25+
26+
statement ok
27+
create or replace stream s1_18827 on table t_18827 append_only=true;
28+
29+
statement error 1132
30+
alter table t_18827 modify column b float64;
31+
32+
statement ok
33+
alter table t_18827 modify column a binary;
34+
35+
query T
36+
select a, b from t_18827;
37+
----
38+
61 1
39+
62 2
40+
41+
query T
42+
select a, b, change$action, change$is_update from s1_18827;
43+
----
44+
45+
query T
46+
select a, b, change$action, change$is_update from s_18827 order by change$action, a;
47+
----
48+
49+
statement ok
50+
drop stream s_18827;
51+
52+
statement ok
53+
drop stream s1_18827;
54+
55+
statement ok
56+
drop table t_18827 all;

tests/suites/0_stateless/17_altertable/17_0005_alter_table_modify_column_type.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ echo "SELECT a,b from test_modify_column_type.f order by b" | $BENDSQL_CLIENT_C
6161
echo "DESC test_modify_column_type.f" | $BENDSQL_CLIENT_CONNECT
6262

6363
echo "begin test modify column NULL to not NULL"
64-
echo "CREATE TABLE test_modify_column_type.g(a STRING NULL, b INT NULL) STORAGE_FORMAT='native'" | $BENDSQL_CLIENT_CONNECT
64+
echo "CREATE TABLE test_modify_column_type.g(a STRING NULL, b INT NULL)" | $BENDSQL_CLIENT_CONNECT
6565
echo "INSERT INTO test_modify_column_type.g VALUES('a',1),('b',NULL),(NULL,3),('d',4)" | $BENDSQL_CLIENT_CONNECT
6666
echo "SELECT a,b from test_modify_column_type.g" | $BENDSQL_CLIENT_CONNECT
6767
echo "ALTER TABLE test_modify_column_type.g MODIFY COLUMN a STRING NOT NULL" | $BENDSQL_CLIENT_CONNECT

0 commit comments

Comments
 (0)