From 1a1eb0a73331aa201d91d70784351c33461a6b72 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Fri, 10 Oct 2025 11:58:39 -0700 Subject: [PATCH 1/6] Fix for Multisearch and Append command Signed-off-by: Kai Huang # Conflicts: # docs/category.json --- .../sql/calcite/CalciteRelNodeVisitor.java | 16 ++-- .../opensearch/sql/calcite/SchemaUnifier.java | 33 ++++--- docs/category.json | 1 + docs/user/ppl/cmd/append.rst | 25 ++---- docs/user/ppl/cmd/multisearch.rst | 87 +++++-------------- .../remote/CalciteMultisearchCommandIT.java | 39 ++++----- .../remote/CalcitePPLAppendCommandIT.java | 42 +++++---- 7 files changed, 87 insertions(+), 156 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 7cec960b82a..cbdd0a05234 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -1809,18 +1809,16 @@ public RelNode visitMultisearch(Multisearch node, CalcitePlanContext context) { } /** - * Finds the timestamp field for multisearch ordering. + * Finds the @timestamp field for multisearch ordering. Only @timestamp field is used for + * timestamp interleaving. Other timestamp-like fields are ignored. * - * @param rowType The row type to search for timestamp fields - * @return The name of the timestamp field, or null if not found + * @param rowType The row type to search for @timestamp field + * @return "@timestamp" if the field exists, or null if not found */ private String findTimestampField(RelDataType rowType) { - String[] candidates = {"@timestamp", "_time", "timestamp", "time"}; - for (String fieldName : candidates) { - RelDataTypeField field = rowType.getField(fieldName, false, false); - if (field != null) { - return fieldName; - } + RelDataTypeField field = rowType.getField("@timestamp", false, false); + if (field != null) { + return "@timestamp"; } return null; } diff --git a/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java b/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java index 627d1de8dc4..a6a392a8297 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java +++ b/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java @@ -16,20 +16,20 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.validate.SqlValidatorUtil; /** - * Utility class for unifying schemas across multiple RelNodes with type conflict resolution. Uses - * the same strategy as append command - renames conflicting fields to avoid type conflicts. + * Utility class for unifying schemas across multiple RelNodes. Throws an exception when type + * conflicts are detected. */ public class SchemaUnifier { /** - * Builds a unified schema for multiple nodes with type conflict resolution. + * Builds a unified schema for multiple nodes. Throws an exception if type conflicts are detected. * * @param nodes List of RelNodes to unify schemas for * @param context Calcite plan context * @return List of projected RelNodes with unified schema + * @throws IllegalArgumentException if type conflicts are detected */ public static List buildUnifiedSchemaWithConflictResolution( List nodes, CalcitePlanContext context) { @@ -55,19 +55,15 @@ public static List buildUnifiedSchemaWithConflictResolution( projectedNodes.add(projectedNode); } - // Step 3: Unify names to handle type conflicts (this creates age0, age1, etc.) - List uniqueNames = - SqlValidatorUtil.uniquify(fieldNames, SqlValidatorUtil.EXPR_SUGGESTER, true); - - // Step 4: Re-project with unique names if needed - if (!uniqueNames.equals(fieldNames)) { - List renamedNodes = new ArrayList<>(); - for (RelNode node : projectedNodes) { - RelNode renamedNode = - context.relBuilder.push(node).project(context.relBuilder.fields(), uniqueNames).build(); - renamedNodes.add(renamedNode); + // Step 3: Check for type conflicts and throw exception if found + Set uniqueFieldNames = new HashSet<>(); + for (String fieldName : fieldNames) { + if (!uniqueFieldNames.add(fieldName)) { + throw new IllegalArgumentException( + String.format( + "Schema unification failed: field '%s' has conflicting types across subsearches", + fieldName)); } - return renamedNodes; } return projectedNodes; @@ -75,10 +71,11 @@ public static List buildUnifiedSchemaWithConflictResolution( /** * Builds a unified schema by merging fields from all nodes. Fields with the same name but - * different types are added as separate entries (which will be renamed during uniquification). + * different types are added as separate entries (will cause an exception to be thrown). * * @param nodes List of RelNodes to merge schemas from - * @return List of SchemaField representing the unified schema (may contain duplicate names) + * @return List of SchemaField representing the unified schema (may contain duplicate names if + * there are type conflicts) */ private static List buildUnifiedSchema(List nodes) { List schema = new ArrayList<>(); diff --git a/docs/category.json b/docs/category.json index 49529b08bdc..d9605598800 100644 --- a/docs/category.json +++ b/docs/category.json @@ -40,6 +40,7 @@ "user/ppl/cmd/rare.rst", "user/ppl/cmd/regex.rst", "user/ppl/cmd/rename.rst", + "user/ppl/cmd/multisearch.rst", "user/ppl/cmd/replace.rst", "user/ppl/cmd/rex.rst", "user/ppl/cmd/search.rst", diff --git a/docs/user/ppl/cmd/append.rst b/docs/user/ppl/cmd/append.rst index 982e0e33024..25303aeb87b 100644 --- a/docs/user/ppl/cmd/append.rst +++ b/docs/user/ppl/cmd/append.rst @@ -24,6 +24,11 @@ append * sub-search: mandatory. Executes PPL commands as a secondary search. +Limitations +=========== + +* **Schema Compatibility**: When fields with the same name exist between the main search and sub-search but have incompatible types, the query will fail with an error. To avoid type conflicts, ensure that fields with the same name have the same data type, or use different field names (e.g., by renaming with ``eval`` or using ``fields`` to select non-conflicting columns). + Example 1: Append rows from a count aggregation to existing search result =============================================================== @@ -64,23 +69,3 @@ PPL query:: | 101 | M | null | +-----+--------+-------+ -Example 3: Append rows with column type conflict -============================================= - -This example shows how column type conflicts are handled when appending results. Same name columns with different types will generate two different columns in appended result. - -PPL query:: - - os> source=accounts | stats sum(age) as sum by gender, state | sort -sum | head 5 | append [ source=accounts | stats sum(age) as sum by gender | eval sum = cast(sum as double) ]; - fetched rows / total rows = 6/6 - +------+--------+-------+-------+ - | sum | gender | state | sum0 | - |------+--------+-------+-------| - | 36 | M | TN | null | - | 33 | M | MD | null | - | 32 | M | IL | null | - | 28 | F | VA | null | - | null | F | null | 28.0 | - | null | M | null | 101.0 | - +------+--------+-------+-------+ - diff --git a/docs/user/ppl/cmd/multisearch.rst b/docs/user/ppl/cmd/multisearch.rst index 10820badc54..2bac577ef23 100644 --- a/docs/user/ppl/cmd/multisearch.rst +++ b/docs/user/ppl/cmd/multisearch.rst @@ -30,10 +30,6 @@ Description * **A/B Testing Analysis**: Combine results from different test groups for comparison * **Time-series Data Merging**: Interleave events from multiple sources based on timestamps -Version -======= -3.3.0 - Syntax ====== | multisearch ... @@ -59,7 +55,7 @@ Limitations =========== * **Minimum Subsearches**: At least two subsearches must be specified -* **Schema Compatibility**: When fields with the same name exist across subsearches but have incompatible types, the system automatically resolves conflicts by renaming the conflicting fields. The first occurrence retains the original name, while subsequent conflicting fields are renamed with a numeric suffix (e.g., ``age`` becomes ``age0``, ``age1``, etc.). This ensures all data is preserved while maintaining schema consistency. +* **Schema Compatibility**: When fields with the same name exist across subsearches but have incompatible types, the query will fail with an error. To avoid type conflicts, ensure that fields with the same name have the same data type across all subsearches, or use different field names (e.g., by renaming with ``eval`` or using ``fields`` to select non-conflicting columns). Usage ===== @@ -84,8 +80,8 @@ PPL query:: |-----------+-----+-----------| | Nanette | 28 | young | | Amber | 32 | adult | + | Dale | 33 | adult | | Hattie | 36 | adult | - | Dale | 37 | adult | +-----------+-----+-----------+ Example 2: Success Rate Pattern @@ -97,14 +93,14 @@ PPL query:: os> | multisearch [search source=accounts | where balance > 20000 | eval query_type = "high_balance" | fields firstname, balance, query_type] [search source=accounts | where balance > 0 AND balance <= 20000 | eval query_type = "regular" | fields firstname, balance, query_type] | sort balance desc; fetched rows / total rows = 4/4 - +-----------+---------+-------------+ - | firstname | balance | query_type | - |-----------+---------+-------------| - | Amber | 39225 | high_balance| - | Nanette | 32838 | high_balance| - | Hattie | 5686 | regular | - | Dale | 4180 | regular | - +-----------+---------+-------------+ + +-----------+---------+--------------+ + | firstname | balance | query_type | + |-----------+---------+--------------| + | Amber | 39225 | high_balance | + | Nanette | 32838 | high_balance | + | Hattie | 5686 | regular | + | Dale | 4180 | regular | + +-----------+---------+--------------+ Example 3: Timestamp Interleaving ================================== @@ -113,37 +109,19 @@ Combine time-series data from multiple sources with automatic timestamp-based or PPL query:: - os> | multisearch [search source=time_data | where category IN ("A", "B")] [search source=time_data2 | where category IN ("E", "F")] | head 5; + os> | multisearch [search source=time_data | where category IN ("A", "B")] [search source=time_data2 | where category IN ("E", "F")] | fields @timestamp, category, value, timestamp | head 5; fetched rows / total rows = 5/5 - +-------+---------------------+----------+-------+---------------------+ - | index | @timestamp | category | value | timestamp | - |-------+---------------------+----------+-------+---------------------| - | null | 2025-08-01 04:00:00 | E | 2001 | 2025-08-01 04:00:00 | - | null | 2025-08-01 03:47:41 | A | 8762 | 2025-08-01 03:47:41 | - | null | 2025-08-01 02:30:00 | F | 2002 | 2025-08-01 02:30:00 | - | null | 2025-08-01 01:14:11 | B | 9015 | 2025-08-01 01:14:11 | - | null | 2025-08-01 01:00:00 | E | 2003 | 2025-08-01 01:00:00 | - +-------+---------------------+----------+-------+---------------------+ - -Example 4: Handling Empty Results -================================== - -Multisearch gracefully handles cases where some subsearches return no results. - -PPL query:: - - os> | multisearch [search source=accounts | where age > 25 | fields firstname, age] [search source=accounts | where age > 200 | eval impossible = "yes" | fields firstname, age, impossible] | head 5; - fetched rows / total rows = 4/4 - +-----------+-----+------------+ - | firstname | age | impossible | - |-----------+-----+------------| - | Nanette | 28 | null | - | Amber | 32 | null | - | Hattie | 36 | null | - | Dale | 37 | null | - +-----------+-----+------------+ - -Example 5: Type Compatibility - Missing Fields + +---------------------+----------+-------+---------------------+ + | @timestamp | category | value | timestamp | + |---------------------+----------+-------+---------------------| + | 2025-08-01 04:00:00 | E | 2001 | 2025-08-01 04:00:00 | + | 2025-08-01 03:47:41 | A | 8762 | 2025-08-01 03:47:41 | + | 2025-08-01 02:30:00 | F | 2002 | 2025-08-01 02:30:00 | + | 2025-08-01 01:14:11 | B | 9015 | 2025-08-01 01:14:11 | + | 2025-08-01 01:00:00 | E | 2003 | 2025-08-01 01:00:00 | + +---------------------+----------+-------+---------------------+ + +Example 4: Type Compatibility - Missing Fields ================================================= Demonstrate how missing fields are handled with NULL insertion. @@ -157,26 +135,7 @@ PPL query:: |-----------+-----+------------| | Nanette | 28 | yes | | Amber | 32 | null | + | Dale | 33 | null | | Hattie | 36 | null | - | Dale | 37 | null | +-----------+-----+------------+ -Example 6: Type Conflict Resolution - Automatic Renaming -=========================================================== - -When the same field name has incompatible types across subsearches, the system automatically renames conflicting fields with numeric suffixes. - -PPL query:: - - os> | multisearch [search source=accounts | fields firstname, age, balance | head 2] [search source=locations | fields description, age, place_id | head 2]; - fetched rows / total rows = 4/4 - +-----------+-----+---------+------------------+------+----------+ - | firstname | age | balance | description | age0 | place_id | - |-----------+-----+---------+------------------+------+----------| - | Amber | 32 | 39225 | null | null | null | - | Hattie | 36 | 5686 | null | null | null | - | null | null| null | Central Park | old | 1001 | - | null | null| null | Times Square | modern| 1002 | - +-----------+-----+---------+------------------+------+----------+ - -In this example, the ``age`` field has type ``bigint`` in accounts but type ``string`` in locations. The system keeps the first occurrence as ``age`` (bigint) and renames the second occurrence to ``age0`` (string), preserving all data while avoiding type conflicts. diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java index 72772c1cd84..918a1103ce3 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java @@ -300,30 +300,23 @@ public void testMultisearchNullFillingAcrossIndices() throws IOException { } @Test - public void testMultisearchWithDirectTypeConflict() throws IOException { - JSONObject result = - executeQuery( - String.format( - "| multisearch " - + "[search source=%s | fields firstname, age, balance | head 2] " - + "[search source=%s | fields description, age, place_id | head 2]", - TEST_INDEX_ACCOUNT, TEST_INDEX_LOCATIONS_TYPE_CONFLICT)); - - verifySchema( - result, - schema("firstname", null, "string"), - schema("age", null, "bigint"), - schema("balance", null, "bigint"), - schema("description", null, "string"), - schema("age0", null, "string"), - schema("place_id", null, "int")); + public void testMultisearchWithDirectTypeConflict() { + Exception exception = + assertThrows( + ResponseException.class, + () -> + executeQuery( + String.format( + "| multisearch " + + "[search source=%s | fields firstname, age, balance | head 2] " + + "[search source=%s | fields description, age, place_id | head 2]", + TEST_INDEX_ACCOUNT, TEST_INDEX_LOCATIONS_TYPE_CONFLICT))); - verifyDataRows( - result, - rows("Amber", 32L, 39225L, null, null, null), - rows("Hattie", 36L, 5686L, null, null, null), - rows(null, null, null, "Central Park", "old", 1001), - rows(null, null, null, "Times Square", "modern", 1002)); + assertTrue( + "Error message should indicate type conflict", + exception + .getMessage() + .contains("Schema unification failed: field 'age' has conflicting types")); } @Test diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendCommandIT.java index bc1e11a908c..8f2371b9c8a 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendCommandIT.java @@ -19,6 +19,7 @@ import java.util.Locale; import org.json.JSONObject; import org.junit.Test; +import org.opensearch.client.ResponseException; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.ppl.PPLIntegTestCase; @@ -215,28 +216,25 @@ public void testAppendWithMergedColumn() throws IOException { } @Test - public void testAppendWithConflictTypeColumn() throws IOException { - JSONObject actual = - executeQuery( - String.format( - Locale.ROOT, - "source=%s | stats sum(age) as sum by gender | append [ source=%s | stats sum(age)" - + " as sum by state | sort sum | eval sum = cast(sum as double) ] | head 5", - TEST_INDEX_ACCOUNT, - TEST_INDEX_ACCOUNT)); - verifySchemaInOrder( - actual, - schema("sum", "bigint"), - schema("gender", "string"), - schema("state", "string"), - schema("sum0", "double")); - verifyDataRows( - actual, - rows(14947, "F", null, null), - rows(15224, "M", null, null), - rows(null, null, "NV", 369d), - rows(null, null, "NM", 412d), - rows(null, null, "AZ", 414d)); + public void testAppendWithConflictTypeColumn() { + Exception exception = + assertThrows( + ResponseException.class, + () -> + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum by gender | append [ source=%s | stats" + + " sum(age) as sum by state | sort sum | eval sum = cast(sum as" + + " double) ] | head 5", + TEST_INDEX_ACCOUNT, + TEST_INDEX_ACCOUNT))); + + assertTrue( + "Error message should indicate type conflict", + exception + .getMessage() + .contains("Schema unification failed: field 'sum' has conflicting types")); } @Test From 3fbe4dfe425ee217f11e3fe86d47b8f9884438e3 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Fri, 10 Oct 2025 13:46:18 -0700 Subject: [PATCH 2/6] fix tests Signed-off-by: Kai Huang --- .../remote/CalciteMultisearchCommandIT.java | 33 ++++--- .../sql/ppl/calcite/CalcitePPLAppendTest.java | 97 +++++++++---------- .../calcite/CalcitePPLMultisearchTest.java | 16 ++- 3 files changed, 70 insertions(+), 76 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java index 918a1103ce3..ff5e3fd9ca3 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java @@ -210,8 +210,8 @@ public void testMultisearchWithDifferentIndicesSchemaMerge() throws IOException executeQuery( String.format( "| multisearch [search source=%s | where age > 35 | fields account_number," - + " firstname, age, balance] [search source=%s | where age > 35 | fields" - + " account_number, balance, age] | stats count() as total_count", + + " firstname, balance] [search source=%s | where age > 35 | fields" + + " account_number, balance] | stats count() as total_count", TEST_INDEX_ACCOUNT, TEST_INDEX_BANK)); verifySchema(result, schema("total_count", null, "bigint")); @@ -345,18 +345,23 @@ public void testMultisearchCrossIndexFieldSelection() throws IOException { } @Test - public void testMultisearchTypeConflictWithStats() throws IOException { - JSONObject result = - executeQuery( - String.format( - "| multisearch " - + "[search source=%s | fields age] " - + "[search source=%s | fields age] " - + "| stats count() as total", - TEST_INDEX_ACCOUNT, TEST_INDEX_LOCATIONS_TYPE_CONFLICT)); - - verifySchema(result, schema("total", null, "bigint")); + public void testMultisearchTypeConflictWithStats() { + Exception exception = + assertThrows( + ResponseException.class, + () -> + executeQuery( + String.format( + "| multisearch " + + "[search source=%s | fields age] " + + "[search source=%s | fields age] " + + "| stats count() as total", + TEST_INDEX_ACCOUNT, TEST_INDEX_LOCATIONS_TYPE_CONFLICT))); - verifyDataRows(result, rows(1010L)); + assertTrue( + "Error message should indicate type conflict", + exception + .getMessage() + .contains("Schema unification failed: field 'age' has conflicting types")); } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendTest.java index 614ed2ec32d..c57d1d2f136 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendTest.java @@ -9,6 +9,7 @@ import java.util.List; import org.apache.calcite.rel.RelNode; import org.apache.calcite.test.CalciteAssert; +import org.junit.Assert; import org.junit.Test; public class CalcitePPLAppendTest extends CalcitePPLAbstractTest { @@ -71,15 +72,16 @@ public void testAppendEmptySearchCommand() { @Test public void testAppendNested() { String ppl = - "source=EMP | append [ | where DEPTNO = 10 | append [ source=EMP | where DEPTNO = 20 ] ]"; + "source=EMP | fields ENAME, SAL | append [ | append [ source=EMP | where DEPTNO = 20 ] ]"; RelNode root = getRelNode(ppl); String expectedLogical = "LogicalUnion(all=[true])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," - + " SAL=[$5], COMM=[$6], DEPTNO=[$7], EMPNO0=[null:SMALLINT])\n" + + " LogicalProject(ENAME=[$1], SAL=[$5], EMPNO=[null:SMALLINT], JOB=[null:VARCHAR(9)]," + + " MGR=[null:SMALLINT], HIREDATE=[null:DATE], COMM=[null:DECIMAL(7, 2)]," + + " DEPTNO=[null:TINYINT])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[$1], JOB=[$2], MGR=[$3]," - + " HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], EMPNO0=[$0])\n" + + " LogicalProject(ENAME=[$1], SAL=[$5], EMPNO=[$0], JOB=[$2], MGR=[$3]," + + " HIREDATE=[$4], COMM=[$6], DEPTNO=[$7])\n" + " LogicalUnion(all=[true])\n" + " LogicalValues(tuples=[[]])\n" + " LogicalFilter(condition=[=($7, 20)])\n" @@ -88,12 +90,12 @@ public void testAppendNested() { verifyResultCount(root, 19); // 14 original table rows + 5 filtered subquery rows String expectedSparkSql = - "SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`, CAST(NULL AS" - + " SMALLINT) `EMPNO0`\n" + "SELECT `ENAME`, `SAL`, CAST(NULL AS SMALLINT) `EMPNO`, CAST(NULL AS STRING) `JOB`," + + " CAST(NULL AS SMALLINT) `MGR`, CAST(NULL AS DATE) `HIREDATE`, CAST(NULL AS" + + " DECIMAL(7, 2)) `COMM`, CAST(NULL AS TINYINT) `DEPTNO`\n" + "FROM `scott`.`EMP`\n" + "UNION ALL\n" - + "SELECT CAST(NULL AS SMALLINT) `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`," - + " `COMM`, `DEPTNO`, `EMPNO` `EMPNO0`\n" + + "SELECT `ENAME`, `SAL`, `EMPNO`, `JOB`, `MGR`, `HIREDATE`, `COMM`, `DEPTNO`\n" + "FROM (SELECT *\n" + "FROM (VALUES (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)) `t` (`EMPNO`," + " `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`)\n" @@ -109,61 +111,63 @@ public void testAppendNested() { public void testAppendEmptySourceWithJoin() { List emptySourceWithEmptySourceJoinPPLs = Arrays.asList( - "source=EMP | append [ | where DEPTNO = 10 | join on ENAME = DNAME DEPT ]", - "source=EMP | append [ | where DEPTNO = 10 | cross join on ENAME = DNAME DEPT ]", - "source=EMP | append [ | where DEPTNO = 10 | left join on ENAME = DNAME DEPT ]", - "source=EMP | append [ | where DEPTNO = 10 | semi join on ENAME = DNAME DEPT ]", - "source=EMP | append [ | where DEPTNO = 10 | anti join on ENAME = DNAME DEPT ]"); + "source=EMP | fields EMPNO, ENAME, JOB | append [ | where DEPTNO = 10 | join on ENAME" + + " = DNAME DEPT ]", + "source=EMP | fields EMPNO, ENAME, JOB | append [ | where DEPTNO = 10 | cross join on" + + " ENAME = DNAME DEPT ]", + "source=EMP | fields EMPNO, ENAME, JOB | append [ | where DEPTNO = 10 | left join on" + + " ENAME = DNAME DEPT ]", + "source=EMP | fields EMPNO, ENAME, JOB | append [ | where DEPTNO = 10 | semi join on" + + " ENAME = DNAME DEPT ]", + "source=EMP | fields EMPNO, ENAME, JOB | append [ | where DEPTNO = 10 | anti join on" + + " ENAME = DNAME DEPT ]"); for (String ppl : emptySourceWithEmptySourceJoinPPLs) { RelNode root = getRelNode(ppl); String expectedLogical = "LogicalUnion(all=[true])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + " LogicalValues(tuples=[[]])\n"; verifyLogical(root, expectedLogical); verifyResultCount(root, 14); String expectedSparkSql = - "SELECT *\n" + "SELECT `EMPNO`, `ENAME`, `JOB`\n" + "FROM `scott`.`EMP`\n" + "UNION ALL\n" + "SELECT *\n" - + "FROM (VALUES (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)) `t` (`EMPNO`," - + " `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`)\n" + + "FROM (VALUES (NULL, NULL, NULL)) `t` (`EMPNO`, `ENAME`, `JOB`)\n" + "WHERE 1 = 0"; verifyPPLToSparkSQL(root, expectedSparkSql); } List emptySourceWithRightOrFullJoinPPLs = Arrays.asList( - "source=EMP | append [ | where DEPTNO = 10 | right join on ENAME = DNAME DEPT ]", - "source=EMP | append [ | where DEPTNO = 10 | full join on ENAME = DNAME DEPT ]"); + "source=EMP | fields EMPNO, ENAME, JOB | append [ | where DEPTNO = 10 | right join on" + + " ENAME = DNAME DEPT ]", + "source=EMP | fields EMPNO, ENAME, JOB | append [ | where DEPTNO = 10 | full join on" + + " ENAME = DNAME DEPT ]"); for (String ppl : emptySourceWithRightOrFullJoinPPLs) { RelNode root = getRelNode(ppl); String expectedLogical = "LogicalUnion(all=[true])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4]," - + " SAL=[$5], COMM=[$6], DEPTNO=[$7], DEPTNO0=[null:TINYINT]," + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], DEPTNO=[null:TINYINT]," + " DNAME=[null:VARCHAR(14)], LOC=[null:VARCHAR(13)])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" + " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)]," - + " JOB=[null:VARCHAR(9)], MGR=[null:SMALLINT], HIREDATE=[null:DATE]," - + " SAL=[null:DECIMAL(7, 2)], COMM=[null:DECIMAL(7, 2)], DEPTNO=[null:TINYINT]," - + " DEPTNO0=[$0], DNAME=[$1], LOC=[$2])\n" + + " JOB=[null:VARCHAR(9)], DEPTNO=[$0], DNAME=[$1], LOC=[$2])\n" + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); String expectedSparkSql = - "SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`, CAST(NULL AS" - + " TINYINT) `DEPTNO0`, CAST(NULL AS STRING) `DNAME`, CAST(NULL AS STRING) `LOC`\n" + "SELECT `EMPNO`, `ENAME`, `JOB`, CAST(NULL AS TINYINT) `DEPTNO`, CAST(NULL AS STRING)" + + " `DNAME`, CAST(NULL AS STRING) `LOC`\n" + "FROM `scott`.`EMP`\n" + "UNION ALL\n" + "SELECT CAST(NULL AS SMALLINT) `EMPNO`, CAST(NULL AS STRING) `ENAME`, CAST(NULL AS" - + " STRING) `JOB`, CAST(NULL AS SMALLINT) `MGR`, CAST(NULL AS DATE) `HIREDATE`," - + " CAST(NULL AS DECIMAL(7, 2)) `SAL`, CAST(NULL AS DECIMAL(7, 2)) `COMM`, CAST(NULL" - + " AS TINYINT) `DEPTNO`, `DEPTNO` `DEPTNO0`, `DNAME`, `LOC`\n" + + " STRING) `JOB`, `DEPTNO`, `DNAME`, `LOC`\n" + "FROM `scott`.`DEPT`"; verifyPPLToSparkSQL(root, expectedSparkSql); } @@ -172,15 +176,15 @@ public void testAppendEmptySourceWithJoin() { @Test public void testAppendDifferentIndex() { String ppl = - "source=EMP | fields EMPNO, DEPTNO | append [ source=DEPT | fields DEPTNO, DNAME | where" + "source=EMP | fields EMPNO, ENAME | append [ source=DEPT | fields DEPTNO, DNAME | where" + " DEPTNO = 20 ]"; RelNode root = getRelNode(ppl); String expectedLogical = "LogicalUnion(all=[true])\n" - + " LogicalProject(EMPNO=[$0], DEPTNO=[$7], DEPTNO0=[null:TINYINT]," + + " LogicalProject(EMPNO=[$0], ENAME=[$1], DEPTNO=[null:TINYINT]," + " DNAME=[null:VARCHAR(14)])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalProject(EMPNO=[null:SMALLINT], DEPTNO=[null:TINYINT], DEPTNO0=[$0]," + + " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)], DEPTNO=[$0]," + " DNAME=[$1])\n" + " LogicalFilter(condition=[=($0, 20)])\n" + " LogicalProject(DEPTNO=[$0], DNAME=[$1])\n" @@ -188,11 +192,11 @@ public void testAppendDifferentIndex() { verifyLogical(root, expectedLogical); String expectedSparkSql = - "SELECT `EMPNO`, `DEPTNO`, CAST(NULL AS TINYINT) `DEPTNO0`, CAST(NULL AS STRING) `DNAME`\n" + "SELECT `EMPNO`, `ENAME`, CAST(NULL AS TINYINT) `DEPTNO`, CAST(NULL AS STRING) `DNAME`\n" + "FROM `scott`.`EMP`\n" + "UNION ALL\n" - + "SELECT CAST(NULL AS SMALLINT) `EMPNO`, CAST(NULL AS TINYINT) `DEPTNO`, `DEPTNO`" - + " `DEPTNO0`, `DNAME`\n" + + "SELECT CAST(NULL AS SMALLINT) `EMPNO`, CAST(NULL AS STRING) `ENAME`, `DEPTNO`," + + " `DNAME`\n" + "FROM (SELECT `DEPTNO`, `DNAME`\n" + "FROM `scott`.`DEPT`) `t0`\n" + "WHERE `DEPTNO` = 20"; @@ -227,22 +231,9 @@ public void testAppendWithMergedColumns() { public void testAppendWithConflictTypeColumn() { String ppl = "source=EMP | fields DEPTNO | append [ source=EMP | fields DEPTNO | eval DEPTNO = 20 ]"; - RelNode root = getRelNode(ppl); - String expectedLogical = - "LogicalUnion(all=[true])\n" - + " LogicalProject(DEPTNO=[$7], DEPTNO0=[null:INTEGER])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n" - + " LogicalProject(DEPTNO=[null:TINYINT], DEPTNO0=[20])\n" - + " LogicalTableScan(table=[[scott, EMP]])\n"; - verifyLogical(root, expectedLogical); - verifyResultCount(root, 28); - - String expectedSparkSql = - "SELECT `DEPTNO`, CAST(NULL AS INTEGER) `DEPTNO0`\n" - + "FROM `scott`.`EMP`\n" - + "UNION ALL\n" - + "SELECT CAST(NULL AS TINYINT) `DEPTNO`, 20 `DEPTNO0`\n" - + "FROM `scott`.`EMP`"; - verifyPPLToSparkSQL(root, expectedSparkSql); + Exception exception = + Assert.assertThrows(IllegalArgumentException.class, () -> getRelNode(ppl)); + verifyErrorMessageContains( + exception, "Schema unification failed: field 'DEPTNO' has conflicting types"); } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMultisearchTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMultisearchTest.java index e69030753f2..8746fe846e5 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMultisearchTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMultisearchTest.java @@ -142,30 +142,28 @@ public void testMultisearchCrossIndices() { // Test multisearch with different tables (indices) String ppl = "| multisearch [search source=EMP | where DEPTNO = 10 | fields EMPNO, ENAME," - + " DEPTNO] [search source=DEPT | where DEPTNO = 10 | fields DEPTNO, DNAME | eval EMPNO" - + " = DEPTNO, ENAME = DNAME]"; + + " JOB] [search source=DEPT | where DEPTNO = 10 | fields DEPTNO, DNAME, LOC]"; RelNode root = getRelNode(ppl); String expectedLogical = "LogicalUnion(all=[true])\n" - + " LogicalProject(EMPNO=[$0], ENAME=[$1], DEPTNO=[$7], DEPTNO0=[null:TINYINT]," - + " DNAME=[null:VARCHAR(14)], EMPNO0=[null:TINYINT], ENAME0=[null:VARCHAR(14)])\n" + + " LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], DEPTNO=[null:TINYINT]," + + " DNAME=[null:VARCHAR(14)], LOC=[null:VARCHAR(13)])\n" + " LogicalFilter(condition=[=($7, 10)])\n" + " LogicalTableScan(table=[[scott, EMP]])\n" + " LogicalProject(EMPNO=[null:SMALLINT], ENAME=[null:VARCHAR(10)]," - + " DEPTNO=[null:TINYINT], DEPTNO0=[$0], DNAME=[$1], EMPNO0=[$0], ENAME0=[$1])\n" + + " JOB=[null:VARCHAR(9)], DEPTNO=[$0], DNAME=[$1], LOC=[$2])\n" + " LogicalFilter(condition=[=($0, 10)])\n" + " LogicalTableScan(table=[[scott, DEPT]])\n"; verifyLogical(root, expectedLogical); String expectedSparkSql = - "SELECT `EMPNO`, `ENAME`, `DEPTNO`, CAST(NULL AS TINYINT) `DEPTNO0`, CAST(NULL AS STRING)" - + " `DNAME`, CAST(NULL AS TINYINT) `EMPNO0`, CAST(NULL AS STRING) `ENAME0`\n" + "SELECT `EMPNO`, `ENAME`, `JOB`, CAST(NULL AS TINYINT) `DEPTNO`, CAST(NULL AS STRING)" + + " `DNAME`, CAST(NULL AS STRING) `LOC`\n" + "FROM `scott`.`EMP`\n" + "WHERE `DEPTNO` = 10\n" + "UNION ALL\n" + "SELECT CAST(NULL AS SMALLINT) `EMPNO`, CAST(NULL AS STRING) `ENAME`, CAST(NULL AS" - + " TINYINT) `DEPTNO`, `DEPTNO` `DEPTNO0`, `DNAME`, `DEPTNO` `EMPNO0`, `DNAME`" - + " `ENAME0`\n" + + " STRING) `JOB`, `DEPTNO`, `DNAME`, `LOC`\n" + "FROM `scott`.`DEPT`\n" + "WHERE `DEPTNO` = 10"; verifyPPLToSparkSQL(root, expectedSparkSql); From 01b0618c1f31caae64a2cd4922c8544c3114afab Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Fri, 10 Oct 2025 14:17:32 -0700 Subject: [PATCH 3/6] fix test Signed-off-by: Kai Huang --- .../sql/calcite/remote/CalcitePPLAppendCommandIT.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendCommandIT.java index 8f2371b9c8a..148e274aa21 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendCommandIT.java @@ -243,7 +243,7 @@ public void testAppendSchemaMergeWithTimestampUDT() throws IOException { executeQuery( String.format( Locale.ROOT, - "source=%s | fields account_number, age | append [ source=%s | fields" + "source=%s | fields account_number, firstname | append [ source=%s | fields" + " account_number, age, birthdate ] | where isnotnull(birthdate) and" + " account_number > 30", TEST_INDEX_ACCOUNT, @@ -251,8 +251,8 @@ public void testAppendSchemaMergeWithTimestampUDT() throws IOException { verifySchemaInOrder( actual, schema("account_number", "bigint"), - schema("age", "bigint"), - schema("age0", "int"), + schema("firstname", "string"), + schema("age", "int"), schema("birthdate", "string")); verifyDataRows(actual, rows(32, null, 34, "2018-08-11 00:00:00")); } From 31931cf4bf1134eb2d1459109c91d8a175bf59bb Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Fri, 10 Oct 2025 14:59:24 -0700 Subject: [PATCH 4/6] remove error location Signed-off-by: Kai Huang --- .../opensearch/sql/calcite/SchemaUnifier.java | 41 ++++++++----------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java b/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java index a6a392a8297..1dbf779f7f3 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java +++ b/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java @@ -7,10 +7,8 @@ import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; @@ -41,7 +39,7 @@ public static List buildUnifiedSchemaWithConflictResolution( return nodes; } - // Step 1: Build the unified schema by processing all nodes + // Step 1: Build the unified schema by processing all nodes (throws on conflict) List unifiedSchema = buildUnifiedSchema(nodes); // Step 2: Create projections for each node to align with unified schema @@ -55,44 +53,37 @@ public static List buildUnifiedSchemaWithConflictResolution( projectedNodes.add(projectedNode); } - // Step 3: Check for type conflicts and throw exception if found - Set uniqueFieldNames = new HashSet<>(); - for (String fieldName : fieldNames) { - if (!uniqueFieldNames.add(fieldName)) { - throw new IllegalArgumentException( - String.format( - "Schema unification failed: field '%s' has conflicting types across subsearches", - fieldName)); - } - } - return projectedNodes; } /** - * Builds a unified schema by merging fields from all nodes. Fields with the same name but - * different types are added as separate entries (will cause an exception to be thrown). + * Builds a unified schema by merging fields from all nodes. Throws an exception if fields with + * the same name have different types. * * @param nodes List of RelNodes to merge schemas from - * @return List of SchemaField representing the unified schema (may contain duplicate names if - * there are type conflicts) + * @return List of SchemaField representing the unified schema + * @throws IllegalArgumentException if type conflicts are detected */ private static List buildUnifiedSchema(List nodes) { List schema = new ArrayList<>(); - Map> seenFields = new HashMap<>(); + Map seenFields = new HashMap<>(); for (RelNode node : nodes) { for (RelDataTypeField field : node.getRowType().getFieldList()) { String fieldName = field.getName(); RelDataType fieldType = field.getType(); - // Track which (name, type) combinations we've seen - Set typesForName = seenFields.computeIfAbsent(fieldName, k -> new HashSet<>()); - - if (!typesForName.contains(fieldType)) { - // New field or same name with different type - add to schema + RelDataType existingType = seenFields.get(fieldName); + if (existingType == null) { + // New field - add to schema schema.add(new SchemaField(fieldName, fieldType)); - typesForName.add(fieldType); + seenFields.put(fieldName, fieldType); + } else if (!existingType.equals(fieldType)) { + // Same field name but different type - throw exception + throw new IllegalArgumentException( + String.format( + "Schema unification failed: field '%s' has conflicting types across subsearches", + fieldName)); } // If we've seen this exact (name, type) combination, skip it } From ae5dbb1cd3afeb93ef6f87e0380083d685a7991a Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 15 Oct 2025 13:19:34 -0700 Subject: [PATCH 5/6] Allow same SqlTypeName but with different nullability to be merged Signed-off-by: Kai Huang --- .../java/org/opensearch/sql/calcite/SchemaUnifier.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java b/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java index 1dbf779f7f3..4cd3a1c9f04 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java +++ b/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java @@ -78,7 +78,7 @@ private static List buildUnifiedSchema(List nodes) { // New field - add to schema schema.add(new SchemaField(fieldName, fieldType)); seenFields.put(fieldName, fieldType); - } else if (!existingType.equals(fieldType)) { + } else if (!areTypesCompatible(existingType, fieldType)) { // Same field name but different type - throw exception throw new IllegalArgumentException( String.format( @@ -92,6 +92,10 @@ private static List buildUnifiedSchema(List nodes) { return schema; } + private static boolean areTypesCompatible(RelDataType type1, RelDataType type2) { + return type1.getSqlTypeName() != null && type1.getSqlTypeName().equals(type2.getSqlTypeName()); + } + /** * Builds a projection for a node to align with the unified schema. For each field in the unified * schema: - If the node has a matching field with the same type, use it - Otherwise, project NULL @@ -113,8 +117,8 @@ private static List buildProjectionForNode( RelDataType expectedType = schemaField.getType(); RelDataTypeField nodeField = nodeFieldMap.get(fieldName); - if (nodeField != null && nodeField.getType().equals(expectedType)) { - // Field exists with matching type - use it + if (nodeField != null && areTypesCompatible(nodeField.getType(), expectedType)) { + // Field exists with compatible type - use it projection.add(context.rexBuilder.makeInputRef(node, nodeField.getIndex())); } else { // Field missing or type mismatch - project NULL From 527fb1c102c24adf32c94b8c9b3cb614fc68e1d6 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Tue, 28 Oct 2025 11:21:21 -0700 Subject: [PATCH 6/6] Update error message Signed-off-by: Kai Huang --- .../main/java/org/opensearch/sql/calcite/SchemaUnifier.java | 4 ++-- .../sql/calcite/remote/CalciteMultisearchCommandIT.java | 4 ++-- .../sql/calcite/remote/CalcitePPLAppendCommandIT.java | 2 +- .../org/opensearch/sql/ppl/calcite/CalcitePPLAppendTest.java | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java b/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java index 4cd3a1c9f04..05380ce8c48 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java +++ b/core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java @@ -82,8 +82,8 @@ private static List buildUnifiedSchema(List nodes) { // Same field name but different type - throw exception throw new IllegalArgumentException( String.format( - "Schema unification failed: field '%s' has conflicting types across subsearches", - fieldName)); + "Unable to process column '%s' due to incompatible types: '%s' and '%s'", + fieldName, existingType.getSqlTypeName(), fieldType.getSqlTypeName())); } // If we've seen this exact (name, type) combination, skip it } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java index ff5e3fd9ca3..393b0a4a501 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteMultisearchCommandIT.java @@ -316,7 +316,7 @@ public void testMultisearchWithDirectTypeConflict() { "Error message should indicate type conflict", exception .getMessage() - .contains("Schema unification failed: field 'age' has conflicting types")); + .contains("Unable to process column 'age' due to incompatible types:")); } @Test @@ -362,6 +362,6 @@ public void testMultisearchTypeConflictWithStats() { "Error message should indicate type conflict", exception .getMessage() - .contains("Schema unification failed: field 'age' has conflicting types")); + .contains("Unable to process column 'age' due to incompatible types:")); } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendCommandIT.java index 148e274aa21..d01ddfb2a44 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendCommandIT.java @@ -234,7 +234,7 @@ public void testAppendWithConflictTypeColumn() { "Error message should indicate type conflict", exception .getMessage() - .contains("Schema unification failed: field 'sum' has conflicting types")); + .contains("Unable to process column 'sum' due to incompatible types:")); } @Test diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendTest.java index c57d1d2f136..a163af186d5 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendTest.java @@ -234,6 +234,6 @@ public void testAppendWithConflictTypeColumn() { Exception exception = Assert.assertThrows(IllegalArgumentException.class, () -> getRelNode(ppl)); verifyErrorMessageContains( - exception, "Schema unification failed: field 'DEPTNO' has conflicting types"); + exception, "Unable to process column 'DEPTNO' due to incompatible types:"); } }