Commit a3a020f

Fix Schema Duplication Errors in Self‑Referential INTERSECT/EXCEPT by Requalifying Input Sides (#18814)
## Which issue does this PR close?

* Closes #16295.

## Rationale for this change

Self-referential INTERSECT and EXCEPT queries (where both sides originate from the same table) failed during Substrait round‑trip consumption with the error:

> "Schema contains duplicate qualified field name"

This happened because the join-based implementation of set operations attempted to merge two identical schemas without requalification, resulting in duplicate or ambiguous field names. By ensuring both sides are requalified when needed, DataFusion can correctly construct valid logical plans for these operations.

### Before

```
❯ cargo test --test sqllogictests -- --substrait-round-trip intersection.slt:33
    Finished `test` profile [unoptimized + debuginfo] target(s) in 0.24s
     Running bin/sqllogictests.rs (target/debug/deps/sqllogictests-917e139464eeea33)
Completed 1 test files in 0 seconds
External error: 1 errors in file /Users/kosiew/GitHub/datafusion/datafusion/sqllogictest/test_files/intersection.slt

1. query failed: DataFusion error: Schema error: Schema contains duplicate qualified field name alltypes_plain.int_col
...
```

### After

```
❯ cargo test --test sqllogictests -- --substrait-round-trip intersection.slt:33
    Finished `test` profile [unoptimized + debuginfo] target(s) in 0.64s
     Running bin/sqllogictests.rs (target/debug/deps/sqllogictests-917e139464eeea33)
Completed 1 test files in 0 seconds
```

## What changes are included in this PR?

* Added a requalification step (`requalify_sides_if_needed`) inside `intersect_or_except` to avoid duplicate or ambiguous field names.
* Improved conflict detection logic in `requalify_sides_if_needed` to handle:
  1. Duplicate qualified fields
  2. Duplicate unqualified fields
  3. Ambiguous references (qualified vs. unqualified collisions)
* Updated optimizer tests to reflect correct aliasing (`left`, `right`).
* Added new Substrait round‑trip tests for:
  * INTERSECT and EXCEPT (both DISTINCT and ALL variants)
  * Self-referential queries that previously failed
* Minor formatting and consistency improvements in Substrait consumer code.

## Are these changes tested?

Yes. The PR includes comprehensive tests that:

* Reproduce the original failure modes.
* Validate that requalification produces stable and correct logical plans.
* Confirm correct behavior across INTERSECT, EXCEPT, ALL, and DISTINCT cases.

## Are there any user-facing changes?

No user-facing behavior changes. This is a correctness improvement ensuring that valid SQL queries (previously failing only in Substrait round‑trip mode) now work without error.

## LLM-generated code disclosure

This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and validated.
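The three conflict cases listed under "What changes are included in this PR?" can be illustrated with a small standalone model. This is only a sketch: `Col`, `Conflict`, and `find_conflict` are hypothetical names invented for illustration and are not DataFusion types; the real function works on `LogicalPlanBuilder` schemas and returns aliased builders rather than a conflict kind.

```rust
/// Simplified stand-in for a qualified column (hypothetical, not DataFusion's `Column`).
#[derive(Debug, Clone, PartialEq)]
struct Col {
    relation: Option<String>,
    name: String,
}

impl Col {
    fn new(relation: Option<&str>, name: &str) -> Self {
        Col {
            relation: relation.map(str::to_string),
            name: name.to_string(),
        }
    }
}

/// The kind of clash that would make merging two schemas fail.
#[derive(Debug, PartialEq)]
enum Conflict {
    /// Both sides carry the same `relation.name`.
    DuplicateQualified,
    /// Both sides carry the same bare name.
    DuplicateUnqualified,
    /// One side is qualified, the other is not, and the names match.
    Ambiguous,
}

/// Returns the first conflict found between the two sides, if any.
fn find_conflict(left: &[Col], right: &[Col]) -> Option<Conflict> {
    for l in left {
        for r in right {
            if l.name != r.name {
                continue;
            }
            match (&l.relation, &r.relation) {
                (Some(a), Some(b)) if a == b => return Some(Conflict::DuplicateQualified),
                (None, None) => return Some(Conflict::DuplicateUnqualified),
                (Some(_), None) | (None, Some(_)) => return Some(Conflict::Ambiguous),
                // Same name under different qualifiers is fine.
                _ => {}
            }
        }
    }
    None
}
```

In this model, a self-referential query such as `SELECT a FROM data ... INTERSECT SELECT a FROM data ...` corresponds to both sides holding `data.a`, which falls under the first case.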
1 parent 107cb5e commit a3a020f

4 files changed: +162 -22 lines changed

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 62 additions & 15 deletions
@@ -1352,6 +1352,15 @@ impl LogicalPlanBuilder {
             );
         }
 
+        // Requalify sides if needed to avoid duplicate qualified field names
+        // (e.g., when both sides reference the same table)
+        let left_builder = LogicalPlanBuilder::from(left_plan);
+        let right_builder = LogicalPlanBuilder::from(right_plan);
+        let (left_builder, right_builder, _requalified) =
+            requalify_sides_if_needed(left_builder, right_builder)?;
+        let left_plan = left_builder.build()?;
+        let right_plan = right_builder.build()?;
+
         let join_keys = left_plan
             .schema()
             .fields()
@@ -1731,23 +1740,61 @@ pub fn requalify_sides_if_needed(
 ) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
     let left_cols = left.schema().columns();
     let right_cols = right.schema().columns();
-    if left_cols.iter().any(|l| {
-        right_cols.iter().any(|r| {
-            l == r || (l.name == r.name && (l.relation.is_none() || r.relation.is_none()))
-        })
-    }) {
-        // These names have no connection to the original plan, but they'll make the columns
-        // (mostly) unique.
-        Ok((
-            left.alias(TableReference::bare("left"))?,
-            right.alias(TableReference::bare("right"))?,
-            true,
-        ))
-    } else {
-        Ok((left, right, false))
+
+    // Requalify if merging the schemas would cause an error during join.
+    // This can happen in several cases:
+    // 1. Duplicate qualified fields: both sides have same relation.name
+    // 2. Duplicate unqualified fields: both sides have same unqualified name
+    // 3. Ambiguous reference: one side qualified, other unqualified, same name
+    //
+    // Implementation note: This uses a simple O(n*m) nested loop rather than
+    // a HashMap-based O(n+m) approach. The nested loop is preferred because:
+    // - Schemas are typically small (in TPCH benchmark, max is 16 columns),
+    //   so n*m is negligible
+    // - Early return on first conflict makes common case very fast
+    // - Code is simpler and easier to reason about
+    // - Called only during plan construction, not in execution hot path
+    for l in &left_cols {
+        for r in &right_cols {
+            if l.name != r.name {
+                continue;
+            }
+
+            // Same name - check if this would cause a conflict
+            match (&l.relation, &r.relation) {
+                // Both qualified with same relation - duplicate qualified field
+                (Some(l_rel), Some(r_rel)) if l_rel == r_rel => {
+                    return Ok((
+                        left.alias(TableReference::bare("left"))?,
+                        right.alias(TableReference::bare("right"))?,
+                        true,
+                    ));
+                }
+                // Both unqualified - duplicate unqualified field
+                (None, None) => {
+                    return Ok((
+                        left.alias(TableReference::bare("left"))?,
+                        right.alias(TableReference::bare("right"))?,
+                        true,
+                    ));
+                }
+                // One qualified, one not - ambiguous reference
+                (Some(_), None) | (None, Some(_)) => {
+                    return Ok((
+                        left.alias(TableReference::bare("left"))?,
+                        right.alias(TableReference::bare("right"))?,
+                        true,
+                    ));
+                }
+                // Different qualifiers - OK, no conflict
+                _ => {}
+            }
+        }
     }
-}
 
+    // No conflicts found
+    Ok((left, right, false))
+}
 /// Add additional "synthetic" group by expressions based on functional
 /// dependencies.
 ///
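The implementation note in `requalify_sides_if_needed` mentions a HashMap-based O(n+m) alternative that the commit chose not to use. For comparison, here is a rough sketch of what such an approach might look like on a simplified model. The `Col` alias, `Seen` struct, and `has_conflict` function are hypothetical names made up for this illustration; columns are modeled as plain `(relation, name)` pairs rather than DataFusion's `Column`.

```rust
use std::collections::{HashMap, HashSet};

/// (relation, name) pair standing in for a qualified column (hypothetical model).
type Col<'a> = (Option<&'a str>, &'a str);

/// What the left side holds for one column name.
#[derive(Default)]
struct Seen<'a> {
    /// The name appears on the left without a qualifier.
    unqualified: bool,
    /// Qualifiers under which the name appears on the left.
    relations: HashSet<&'a str>,
}

/// Returns true when merging the two sides would hit one of the three
/// conflict cases (duplicate qualified, duplicate unqualified, ambiguous).
fn has_conflict<'a>(left: &[Col<'a>], right: &[Col<'a>]) -> bool {
    // One pass over the left side: index columns by name.
    let mut by_name: HashMap<&'a str, Seen<'a>> = HashMap::new();
    for &(relation, name) in left {
        let seen = by_name.entry(name).or_default();
        match relation {
            Some(rel) => {
                seen.relations.insert(rel);
            }
            None => seen.unqualified = true,
        }
    }

    // One pass over the right side: each lookup is O(1) on average.
    right.iter().any(|&(relation, name)| match by_name.get(name) {
        None => false,
        Some(seen) => match relation {
            // Unqualified on the right clashes with any same-named left column.
            None => true,
            // Qualified on the right clashes with an unqualified left column
            // or with the same qualifier on the left.
            Some(rel) => seen.unqualified || seen.relations.contains(rel),
        },
    })
}
```

The extra bookkeeping is visible even in this small model, which is consistent with the commit's stated preference for the nested loop on small schemas.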

datafusion/optimizer/tests/optimizer_integration.rs

Lines changed: 7 additions & 5 deletions
@@ -270,12 +270,14 @@ fn intersect() -> Result<()> {
     assert_snapshot!(
         format!("{plan}"),
         @r#"
-    LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8
-      Aggregate: groupBy=[[test.col_int32, test.col_utf8]], aggr=[[]]
-        LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8
-          Aggregate: groupBy=[[test.col_int32, test.col_utf8]], aggr=[[]]
+    LeftSemi Join: left.col_int32 = test.col_int32, left.col_utf8 = test.col_utf8
+      Aggregate: groupBy=[[left.col_int32, left.col_utf8]], aggr=[[]]
+        LeftSemi Join: left.col_int32 = right.col_int32, left.col_utf8 = right.col_utf8
+          Aggregate: groupBy=[[left.col_int32, left.col_utf8]], aggr=[[]]
+            SubqueryAlias: left
+              TableScan: test projection=[col_int32, col_utf8]
+          SubqueryAlias: right
             TableScan: test projection=[col_int32, col_utf8]
-          TableScan: test projection=[col_int32, col_utf8]
       TableScan: test projection=[col_int32, col_utf8]
     "#
     );
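The updated snapshot shows the inner join using `left`/`right` aliases while the outer join compares `left.*` against `test.*` with no extra alias. A plausible reading is that only the inner pair conflicts, and the outer pair already has distinct qualifiers. The sketch below models that reading; `Col`, `conflicts`, `alias`, `requalify_if_needed`, and `scan_test` are hypothetical helpers written for illustration, not DataFusion APIs.

```rust
/// (relation, name) pair standing in for a qualified column (hypothetical model).
type Col = (Option<String>, String);

/// True when two sides share a name that is unqualified on either side
/// or carries the same qualifier on both sides.
fn conflicts(left: &[Col], right: &[Col]) -> bool {
    left.iter().any(|(lr, ln)| {
        right
            .iter()
            .any(|(rr, rn)| ln == rn && (lr.is_none() || rr.is_none() || lr == rr))
    })
}

/// Replaces every qualifier with `name`, mimicking a subquery alias.
fn alias(cols: Vec<Col>, name: &str) -> Vec<Col> {
    cols.into_iter()
        .map(|(_, col)| (Some(name.to_string()), col))
        .collect()
}

/// Aliases both sides as `left` / `right` only when they would conflict.
fn requalify_if_needed(left: Vec<Col>, right: Vec<Col>) -> (Vec<Col>, Vec<Col>, bool) {
    if conflicts(&left, &right) {
        (alias(left, "left"), alias(right, "right"), true)
    } else {
        (left, right, false)
    }
}

/// Columns of a scan over the `test` table used in the snapshot.
fn scan_test() -> Vec<Col> {
    vec![
        (Some("test".to_string()), "col_int32".to_string()),
        (Some("test".to_string()), "col_utf8".to_string()),
    ]
}
```

Under this model, calling `requalify_if_needed(scan_test(), scan_test())` aliases both sides, and feeding the resulting `left.*` columns together with another `scan_test()` into the same function leaves them untouched.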

datafusion/substrait/src/logical_plan/consumer/rel/set_rel.rs

Lines changed: 3 additions & 2 deletions
@@ -81,7 +81,7 @@ async fn intersect_rels(
             rel,
             consumer.consume_rel(input).await?,
             is_all,
-        )?
+        )?;
     }
 
     Ok(rel)
@@ -95,7 +95,8 @@ async fn except_rels(
     let mut rel = consumer.consume_rel(&rels[0]).await?;
 
     for input in &rels[1..] {
-        rel = LogicalPlanBuilder::except(rel, consumer.consume_rel(input).await?, is_all)?
+        rel =
+            LogicalPlanBuilder::except(rel, consumer.consume_rel(input).await?, is_all)?;
     }
 
     Ok(rel)

datafusion/substrait/tests/cases/roundtrip_logical_plan.rs

Lines changed: 90 additions & 0 deletions
@@ -1162,6 +1162,96 @@ async fn simple_intersect_table_reuse() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn self_referential_intersect() -> Result<()> {
+    // Test INTERSECT with the same table on both sides
+    // This previously failed with "Schema contains duplicate qualified field name"
+    // The fix ensures requalify_sides_if_needed is called in intersect_or_except
+    // After roundtrip through Substrait, SubqueryAlias is lost and requalification
+    // produces "left" and "right" aliases
+    // Note: INTERSECT (without ALL) includes DISTINCT, but the outer Aggregate
+    // is optimized away, resulting in just the **LeftSemi** join
+    // (LeftSemi returns rows from left that exist in right)
+    assert_expected_plan(
+        "SELECT a FROM data WHERE a > 0 INTERSECT SELECT a FROM data WHERE a < 5",
+        "LeftSemi Join: left.a = right.a\
+        \n  SubqueryAlias: left\
+        \n    Aggregate: groupBy=[[data.a]], aggr=[[]]\
+        \n      Filter: data.a > Int64(0)\
+        \n        TableScan: data projection=[a], partial_filters=[data.a > Int64(0)]\
+        \n  SubqueryAlias: right\
+        \n    Filter: data.a < Int64(5)\
+        \n      TableScan: data projection=[a], partial_filters=[data.a < Int64(5)]",
+        true,
+    )
+    .await
+}
+
+#[tokio::test]
+async fn self_referential_except() -> Result<()> {
+    // Test EXCEPT with the same table on both sides
+    // This previously failed with "Schema contains duplicate qualified field name"
+    // The fix ensures requalify_sides_if_needed is called in intersect_or_except
+    // After roundtrip through Substrait, SubqueryAlias is lost and requalification
+    // produces "left" and "right" aliases
+    // Note: EXCEPT (without ALL) includes DISTINCT, but the outer Aggregate
+    // is optimized away, resulting in just the **LeftAnti** join
+    // (LeftAnti returns rows from left that don't exist in right)
+    assert_expected_plan(
+        "SELECT a FROM data WHERE a > 0 EXCEPT SELECT a FROM data WHERE a < 5",
+        "LeftAnti Join: left.a = right.a\
+        \n  SubqueryAlias: left\
+        \n    Aggregate: groupBy=[[data.a]], aggr=[[]]\
+        \n      Filter: data.a > Int64(0)\
+        \n        TableScan: data projection=[a], partial_filters=[data.a > Int64(0)]\
+        \n  SubqueryAlias: right\
+        \n    Filter: data.a < Int64(5)\
+        \n      TableScan: data projection=[a], partial_filters=[data.a < Int64(5)]",
+        true,
+    )
+    .await
+}
+
+#[tokio::test]
+async fn self_referential_intersect_all() -> Result<()> {
+    // Test INTERSECT ALL with the same table on both sides
+    // INTERSECT ALL preserves duplicates and does not include DISTINCT
+    // Uses **LeftSemi** join (returns rows from left that exist in right)
+    // The requalification ensures no duplicate field name errors
+    assert_expected_plan(
+        "SELECT a FROM data WHERE a > 0 INTERSECT ALL SELECT a FROM data WHERE a < 5",
+        "LeftSemi Join: left.a = right.a\
+        \n  SubqueryAlias: left\
+        \n    Filter: data.a > Int64(0)\
+        \n      TableScan: data projection=[a], partial_filters=[data.a > Int64(0)]\
+        \n  SubqueryAlias: right\
+        \n    Filter: data.a < Int64(5)\
+        \n      TableScan: data projection=[a], partial_filters=[data.a < Int64(5)]",
+        true,
+    )
+    .await
+}
+
+#[tokio::test]
+async fn self_referential_except_all() -> Result<()> {
+    // Test EXCEPT ALL with the same table on both sides
+    // EXCEPT ALL preserves duplicates and does not include DISTINCT
+    // Uses **LeftAnti** join (returns rows from left that don't exist in right)
+    // The requalification ensures no duplicate field name errors
+    assert_expected_plan(
+        "SELECT a FROM data WHERE a > 0 EXCEPT ALL SELECT a FROM data WHERE a < 5",
+        "LeftAnti Join: left.a = right.a\
+        \n  SubqueryAlias: left\
+        \n    Filter: data.a > Int64(0)\
+        \n      TableScan: data projection=[a], partial_filters=[data.a > Int64(0)]\
+        \n  SubqueryAlias: right\
+        \n    Filter: data.a < Int64(5)\
+        \n      TableScan: data projection=[a], partial_filters=[data.a < Int64(5)]",
+        true,
+    )
+    .await
+}
+
 #[tokio::test]
 async fn simple_window_function() -> Result<()> {
     roundtrip("SELECT RANK() OVER (PARTITION BY a ORDER BY b), d, sum(b) OVER (PARTITION BY a) FROM data;").await
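The test comments above describe the plan shapes: INTERSECT maps to a LeftSemi join, EXCEPT to a LeftAnti join, and the non-ALL variants add a DISTINCT (the `Aggregate` with an empty `aggr` list) on the left input. The following sketch mimics those shapes over plain integer rows. It is a toy model of the join-based plans shown in the tests, not of SQL-standard multiset semantics and not of DataFusion's execution; all function names are hypothetical, and NULL handling is ignored.

```rust
use std::collections::HashSet;

/// Keeps the first occurrence of each value, like a group-by with no aggregates.
fn distinct(rows: &[i64]) -> Vec<i64> {
    let mut seen = HashSet::new();
    rows.iter().copied().filter(|v| seen.insert(*v)).collect()
}

/// LeftSemi: rows from `left` that have a match in `right`.
fn left_semi(left: &[i64], right: &[i64]) -> Vec<i64> {
    let keys: HashSet<i64> = right.iter().copied().collect();
    left.iter().copied().filter(|v| keys.contains(v)).collect()
}

/// LeftAnti: rows from `left` that have no match in `right`.
fn left_anti(left: &[i64], right: &[i64]) -> Vec<i64> {
    let keys: HashSet<i64> = right.iter().copied().collect();
    left.iter().copied().filter(|v| !keys.contains(v)).collect()
}

/// INTERSECT [ALL] modeled as (optional DISTINCT on the left) + LeftSemi.
fn intersect(left: &[i64], right: &[i64], is_all: bool) -> Vec<i64> {
    if is_all {
        left_semi(left, right)
    } else {
        left_semi(&distinct(left), right)
    }
}

/// EXCEPT [ALL] modeled as (optional DISTINCT on the left) + LeftAnti.
fn except(left: &[i64], right: &[i64], is_all: bool) -> Vec<i64> {
    if is_all {
        left_anti(left, right)
    } else {
        left_anti(&distinct(left), right)
    }
}
```

For example, with a left input of `[1, 1, 3, 7, 7]` (rows where `a > 0`) and a right input of `[-1, 1, 1, 3]` (rows where `a < 5`), `intersect(..., false)` keeps `[1, 3]` while `except(..., true)` keeps `[7, 7]`.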
