Merged
@@ -1809,18 +1809,16 @@ public RelNode visitMultisearch(Multisearch node, CalcitePlanContext context) {
   }

   /**
-   * Finds the timestamp field for multisearch ordering.
+   * Finds the @timestamp field for multisearch ordering. Only @timestamp field is used for
+   * timestamp interleaving. Other timestamp-like fields are ignored.
    *
-   * @param rowType The row type to search for timestamp fields
-   * @return The name of the timestamp field, or null if not found
+   * @param rowType The row type to search for @timestamp field
+   * @return "@timestamp" if the field exists, or null if not found
    */
   private String findTimestampField(RelDataType rowType) {
-    String[] candidates = {"@timestamp", "_time", "timestamp", "time"};
-    for (String fieldName : candidates) {
-      RelDataTypeField field = rowType.getField(fieldName, false, false);
-      if (field != null) {
-        return fieldName;
-      }
-    }
+    RelDataTypeField field = rowType.getField("@timestamp", false, false);
+    if (field != null) {
+      return "@timestamp";
+    }
     return null;
   }

Collaborator:

issue (non-blocking): Should we think about indices with different timestamp field names?

Contributor Author:

Currently it is set as a limitation: we want to only support @timestamp.
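The resolved behavior can be sketched without any Calcite dependency: a minimal stand-in for `findTimestampField` in which the row type is reduced to a plain list of field names. The class name and signature below are illustrative, not the PR's actual API.

```java
import java.util.List;

public class TimestampFieldLookup {
    // Mirrors the PR's findTimestampField: only the literal "@timestamp"
    // name qualifies for interleaving; other timestamp-like names
    // ("_time", "timestamp", "time") are deliberately ignored.
    static String findTimestampField(List<String> fieldNames) {
        return fieldNames.contains("@timestamp") ? "@timestamp" : null;
    }

    public static void main(String[] args) {
        System.out.println(findTimestampField(List.of("@timestamp", "value"))); // @timestamp
        System.out.println(findTimestampField(List.of("_time", "timestamp", "time"))); // null
    }
}
```

Note that even an exact-named `timestamp` field returns null here, matching the limitation stated in the thread.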
60 changes: 26 additions & 34 deletions core/src/main/java/org/opensearch/sql/calcite/SchemaUnifier.java
@@ -7,29 +7,27 @@

 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.validate.SqlValidatorUtil;

 /**
- * Utility class for unifying schemas across multiple RelNodes with type conflict resolution. Uses
- * the same strategy as append command - renames conflicting fields to avoid type conflicts.
+ * Utility class for unifying schemas across multiple RelNodes. Throws an exception when type
+ * conflicts are detected.
  */
 public class SchemaUnifier {

   /**
-   * Builds a unified schema for multiple nodes with type conflict resolution.
+   * Builds a unified schema for multiple nodes. Throws an exception if type conflicts are detected.
    *
    * @param nodes List of RelNodes to unify schemas for
    * @param context Calcite plan context
    * @return List of projected RelNodes with unified schema
+   * @throws IllegalArgumentException if type conflicts are detected
    */
   public static List<RelNode> buildUnifiedSchemaWithConflictResolution(
       List<RelNode> nodes, CalcitePlanContext context) {
@@ -41,7 +39,7 @@ public static List<RelNode> buildUnifiedSchemaWithConflictResolution(
       return nodes;
     }

-    // Step 1: Build the unified schema by processing all nodes
+    // Step 1: Build the unified schema by processing all nodes (throws on conflict)
     List<SchemaField> unifiedSchema = buildUnifiedSchema(nodes);

     // Step 2: Create projections for each node to align with unified schema
@@ -55,47 +53,37 @@ public static List<RelNode> buildUnifiedSchemaWithConflictResolution(
       projectedNodes.add(projectedNode);
     }

-    // Step 3: Unify names to handle type conflicts (this creates age0, age1, etc.)
-    List<String> uniqueNames =
-        SqlValidatorUtil.uniquify(fieldNames, SqlValidatorUtil.EXPR_SUGGESTER, true);
-
-    // Step 4: Re-project with unique names if needed
-    if (!uniqueNames.equals(fieldNames)) {
-      List<RelNode> renamedNodes = new ArrayList<>();
-      for (RelNode node : projectedNodes) {
-        RelNode renamedNode =
-            context.relBuilder.push(node).project(context.relBuilder.fields(), uniqueNames).build();
-        renamedNodes.add(renamedNode);
-      }
-      return renamedNodes;
-    }
-
     return projectedNodes;
   }

   /**
-   * Builds a unified schema by merging fields from all nodes. Fields with the same name but
-   * different types are added as separate entries (which will be renamed during uniquification).
+   * Builds a unified schema by merging fields from all nodes. Throws an exception if fields with
+   * the same name have different types.
    *
    * @param nodes List of RelNodes to merge schemas from
-   * @return List of SchemaField representing the unified schema (may contain duplicate names)
+   * @return List of SchemaField representing the unified schema
+   * @throws IllegalArgumentException if type conflicts are detected
    */
   private static List<SchemaField> buildUnifiedSchema(List<RelNode> nodes) {
     List<SchemaField> schema = new ArrayList<>();
-    Map<String, Set<RelDataType>> seenFields = new HashMap<>();
+    Map<String, RelDataType> seenFields = new HashMap<>();

     for (RelNode node : nodes) {
       for (RelDataTypeField field : node.getRowType().getFieldList()) {
         String fieldName = field.getName();
         RelDataType fieldType = field.getType();

-        // Track which (name, type) combinations we've seen
-        Set<RelDataType> typesForName = seenFields.computeIfAbsent(fieldName, k -> new HashSet<>());
-
-        if (!typesForName.contains(fieldType)) {
-          // New field or same name with different type - add to schema
+        RelDataType existingType = seenFields.get(fieldName);
+        if (existingType == null) {
+          // New field - add to schema
           schema.add(new SchemaField(fieldName, fieldType));
-          typesForName.add(fieldType);
+          seenFields.put(fieldName, fieldType);
+        } else if (!areTypesCompatible(existingType, fieldType)) {
+          // Same field name but different type - throw exception
+          throw new IllegalArgumentException(
+              String.format(
+                  "Unable to process column '%s' due to incompatible types: '%s' and '%s'",
+                  fieldName, existingType.getSqlTypeName(), fieldType.getSqlTypeName()));
         }
-        // If we've seen this exact (name, type) combination, skip it
       }

Contributor:

We recently found another issue of type conflicts here: RelDataType evaluates hash equality by its digested string as well, so "INTEGER" is not equal to "INTEGER NOT NULL". A quick fix would be aligning the same SqlType to be nullable. Ideally it won't affect data type resolution during execution. cc @xinyual

Contributor:

We can consider allowing the same SqlTypeName with different nullability to be merged here.

Contributor Author:

Thanks for the suggestion! I have updated the implementation to allow the same SqlTypeName with different nullability to be merged.
@@ -104,6 +92,10 @@ private static List<SchemaField> buildUnifiedSchema(List<RelNode> nodes) {
     return schema;
   }

+  private static boolean areTypesCompatible(RelDataType type1, RelDataType type2) {
+    return type1.getSqlTypeName() != null && type1.getSqlTypeName().equals(type2.getSqlTypeName());
+  }

Contributor (songkant-aws, Oct 29, 2025):

There is another concern with using this method to allow type merging. If index A's RelDataType 'INTEGER NOT NULL' is put into the unified schema, index B's same-named RelDataType 'INTEGER' will be merged silently, and index B's column values could contain NULL values.

The generated code could skip the null check because the merged unified schema has 'INTEGER NOT NULL'. It will probably throw an NPE when merging index B's NULL values. We could write some queries to double-check whether we can reproduce this scenario.

Contributor:

I tried some queries in my local tests and haven't seen such NPE errors yet. Not sure if there is an edge case, but for now we can leave it as is until a fix is needed.
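The merge-and-check logic of `buildUnifiedSchema`, together with the nullability-tolerant `areTypesCompatible` discussed above, can be sketched without Calcite. The `FieldType` record below is a stand-in for `RelDataType` that keeps only the two properties the check cares about; all names are illustrative, not the PR's actual API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaMergeSketch {
    // Stand-in for Calcite's RelDataType: a SQL type name plus nullability.
    public record FieldType(String sqlTypeName, boolean nullable) {}

    public record SchemaField(String name, FieldType type) {}

    // Same SqlTypeName is compatible even when nullability differs,
    // matching the fix agreed on in the review thread above.
    public static boolean areTypesCompatible(FieldType t1, FieldType t2) {
        return t1.sqlTypeName() != null && t1.sqlTypeName().equals(t2.sqlTypeName());
    }

    // First occurrence of a name wins; a later same-named field with an
    // incompatible type aborts the merge with IllegalArgumentException.
    public static List<SchemaField> buildUnifiedSchema(List<List<SchemaField>> schemas) {
        Map<String, FieldType> seen = new LinkedHashMap<>();
        for (List<SchemaField> schema : schemas) {
            for (SchemaField field : schema) {
                FieldType existing = seen.get(field.name());
                if (existing == null) {
                    seen.put(field.name(), field.type());
                } else if (!areTypesCompatible(existing, field.type())) {
                    throw new IllegalArgumentException(String.format(
                        "Unable to process column '%s' due to incompatible types: '%s' and '%s'",
                        field.name(), existing.sqlTypeName(), field.type().sqlTypeName()));
                }
            }
        }
        List<SchemaField> unified = new ArrayList<>();
        seen.forEach((name, type) -> unified.add(new SchemaField(name, type)));
        return unified;
    }
}
```

With this rule, `INTEGER` and `INTEGER NOT NULL` merge into a single column, while `INTEGER` vs `VARCHAR` aborts with the same error-message format the PR uses. The sketch does not model the silent-nullability concern raised in the comment below, which remains open.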

   /**
    * Builds a projection for a node to align with the unified schema. For each field in the unified
    * schema: - If the node has a matching field with the same type, use it - Otherwise, project NULL

@@ -125,8 +117,8 @@ private static List<RexNode> buildProjectionForNode(
       RelDataType expectedType = schemaField.getType();
       RelDataTypeField nodeField = nodeFieldMap.get(fieldName);

-      if (nodeField != null && nodeField.getType().equals(expectedType)) {
-        // Field exists with matching type - use it
+      if (nodeField != null && areTypesCompatible(nodeField.getType(), expectedType)) {
+        // Field exists with compatible type - use it
         projection.add(context.rexBuilder.makeInputRef(node, nodeField.getIndex()));
       } else {
         // Field missing or type mismatch - project NULL
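The NULL-padding side of `buildProjectionForNode` can be illustrated with a map-based stand-in for a row (no `RexNode` machinery; the names here are hypothetical). This sketch shows only the missing-field case and omits the type-compatibility branch:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ProjectionSketch {
    // For each field of the unified schema, take the node's value when the
    // field exists in the row; otherwise project NULL (a literal null slot),
    // so every projected row has the same width as the unified schema.
    public static List<Object> alignRow(List<String> unifiedFields, Map<String, Object> row) {
        List<Object> projected = new ArrayList<>();
        for (String field : unifiedFields) {
            projected.add(row.getOrDefault(field, null));
        }
        return projected;
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("firstname", "Amber", "age", 32);
        System.out.println(alignRow(List.of("firstname", "age", "impossible"), row));
    }
}
```

This is the mechanism behind the docs' "Type Compatibility - Missing Fields" example, where a field present in only one subsearch shows up as `null` for rows from the other.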
1 change: 1 addition & 0 deletions docs/category.json
@@ -40,6 +40,7 @@
     "user/ppl/cmd/rare.rst",
     "user/ppl/cmd/regex.rst",
     "user/ppl/cmd/rename.rst",
+    "user/ppl/cmd/multisearch.rst",
     "user/ppl/cmd/replace.rst",
     "user/ppl/cmd/rex.rst",
     "user/ppl/cmd/search.rst",
25 changes: 5 additions & 20 deletions docs/user/ppl/cmd/append.rst
@@ -24,6 +24,11 @@ append <sub-search>

 * sub-search: mandatory. Executes PPL commands as a secondary search.

+Limitations
+===========
+
+* **Schema Compatibility**: When fields with the same name exist between the main search and sub-search but have incompatible types, the query will fail with an error. To avoid type conflicts, ensure that fields with the same name have the same data type, or use different field names (e.g., by renaming with ``eval`` or using ``fields`` to select non-conflicting columns).
Contributor:

I understand the intention here. A strong-schema engine like SQL restricts the types to be the same, while some weak-schema engines resolve types at runtime and don't care about the data type. I think it's not easy to make them compatible.

Not sure what the better user experience and customer expectation is here. Do users accept this behavior, or do they expect a union anyway? cc @LantaoJin

Contributor Author:

We are planning to enable permissive mode in the future (#4349) to support schema merging with type conflicts. To avoid breaking changes later, we are marking this as a limitation now instead of using a workaround. cc @penghuo

Contributor:

Got it. Thanks for the change.


Example 1: Append rows from a count aggregation to existing search result
==========================================================================

@@ -64,23 +69,3 @@ PPL query::
| 101 | M | null |
+-----+--------+-------+

-Example 3: Append rows with column type conflict
-=============================================
-
-This example shows how column type conflicts are handled when appending results. Same name columns with different types will generate two different columns in appended result.
-
-PPL query::
-
-    os> source=accounts | stats sum(age) as sum by gender, state | sort -sum | head 5 | append [ source=accounts | stats sum(age) as sum by gender | eval sum = cast(sum as double) ];
-    fetched rows / total rows = 6/6
-    +------+--------+-------+-------+
-    | sum  | gender | state | sum0  |
-    |------+--------+-------+-------|
-    | 36   | M      | TN    | null  |
-    | 33   | M      | MD    | null  |
-    | 32   | M      | IL    | null  |
-    | 28   | F      | VA    | null  |
-    | null | F      | null  | 28.0  |
-    | null | M      | null  | 101.0 |
-    +------+--------+-------+-------+

87 changes: 23 additions & 64 deletions docs/user/ppl/cmd/multisearch.rst
@@ -30,10 +30,6 @@ Description
* **A/B Testing Analysis**: Combine results from different test groups for comparison
* **Time-series Data Merging**: Interleave events from multiple sources based on timestamps

-Version
-=======
-3.3.0
-
Syntax
======
| multisearch <subsearch1> <subsearch2> <subsearch3> ...
@@ -59,7 +55,7 @@ Limitations
===========

* **Minimum Subsearches**: At least two subsearches must be specified
-* **Schema Compatibility**: When fields with the same name exist across subsearches but have incompatible types, the system automatically resolves conflicts by renaming the conflicting fields. The first occurrence retains the original name, while subsequent conflicting fields are renamed with a numeric suffix (e.g., ``age`` becomes ``age0``, ``age1``, etc.). This ensures all data is preserved while maintaining schema consistency.
+* **Schema Compatibility**: When fields with the same name exist across subsearches but have incompatible types, the query will fail with an error. To avoid type conflicts, ensure that fields with the same name have the same data type across all subsearches, or use different field names (e.g., by renaming with ``eval`` or using ``fields`` to select non-conflicting columns).

Usage
=====
@@ -84,8 +80,8 @@ PPL query::
     |-----------+-----+-----------|
     | Nanette   | 28  | young     |
     | Amber     | 32  | adult     |
-    | Dale      | 33  | adult     |
     | Hattie    | 36  | adult     |
+    | Dale      | 37  | adult     |
     +-----------+-----+-----------+

Example 2: Success Rate Pattern
@@ -97,14 +93,14 @@ PPL query::

os> | multisearch [search source=accounts | where balance > 20000 | eval query_type = "high_balance" | fields firstname, balance, query_type] [search source=accounts | where balance > 0 AND balance <= 20000 | eval query_type = "regular" | fields firstname, balance, query_type] | sort balance desc;
     fetched rows / total rows = 4/4
-    +-----------+---------+-------------+
-    | firstname | balance | query_type  |
-    |-----------+---------+-------------|
-    | Amber     | 39225   | high_balance|
-    | Nanette   | 32838   | high_balance|
-    | Hattie    | 5686    | regular     |
-    | Dale      | 4180    | regular     |
-    +-----------+---------+-------------+
+    +-----------+---------+--------------+
+    | firstname | balance | query_type   |
+    |-----------+---------+--------------|
+    | Amber     | 39225   | high_balance |
+    | Nanette   | 32838   | high_balance |
+    | Hattie    | 5686    | regular      |
+    | Dale      | 4180    | regular      |
+    +-----------+---------+--------------+

Example 3: Timestamp Interleaving
==================================
Expand All @@ -113,37 +109,19 @@ Combine time-series data from multiple sources with automatic timestamp-based or

PPL query::

-    os> | multisearch [search source=time_data | where category IN ("A", "B")] [search source=time_data2 | where category IN ("E", "F")] | head 5;
+    os> | multisearch [search source=time_data | where category IN ("A", "B")] [search source=time_data2 | where category IN ("E", "F")] | fields @timestamp, category, value, timestamp | head 5;
     fetched rows / total rows = 5/5
-    +-------+---------------------+----------+-------+---------------------+
-    | index | @timestamp          | category | value | timestamp           |
-    |-------+---------------------+----------+-------+---------------------|
-    | null  | 2025-08-01 04:00:00 | E        | 2001  | 2025-08-01 04:00:00 |
-    | null  | 2025-08-01 03:47:41 | A        | 8762  | 2025-08-01 03:47:41 |
-    | null  | 2025-08-01 02:30:00 | F        | 2002  | 2025-08-01 02:30:00 |
-    | null  | 2025-08-01 01:14:11 | B        | 9015  | 2025-08-01 01:14:11 |
-    | null  | 2025-08-01 01:00:00 | E        | 2003  | 2025-08-01 01:00:00 |
-    +-------+---------------------+----------+-------+---------------------+
+    +---------------------+----------+-------+---------------------+
+    | @timestamp          | category | value | timestamp           |
+    |---------------------+----------+-------+---------------------|
+    | 2025-08-01 04:00:00 | E        | 2001  | 2025-08-01 04:00:00 |
+    | 2025-08-01 03:47:41 | A        | 8762  | 2025-08-01 03:47:41 |
+    | 2025-08-01 02:30:00 | F        | 2002  | 2025-08-01 02:30:00 |
+    | 2025-08-01 01:14:11 | B        | 9015  | 2025-08-01 01:14:11 |
+    | 2025-08-01 01:00:00 | E        | 2003  | 2025-08-01 01:00:00 |
+    +---------------------+----------+-------+---------------------+

-Example 4: Handling Empty Results
-==================================
-
-Multisearch gracefully handles cases where some subsearches return no results.
-
-PPL query::
-
-    os> | multisearch [search source=accounts | where age > 25 | fields firstname, age] [search source=accounts | where age > 200 | eval impossible = "yes" | fields firstname, age, impossible] | head 5;
-    fetched rows / total rows = 4/4
-    +-----------+-----+------------+
-    | firstname | age | impossible |
-    |-----------+-----+------------|
-    | Nanette   | 28  | null       |
-    | Amber     | 32  | null       |
-    | Hattie    | 36  | null       |
-    | Dale      | 37  | null       |
-    +-----------+-----+------------+
-
-Example 5: Type Compatibility - Missing Fields
+Example 4: Type Compatibility - Missing Fields
=================================================

Demonstrate how missing fields are handled with NULL insertion.
@@ -157,26 +135,7 @@ PPL query::
     |-----------+-----+------------|
     | Nanette   | 28  | yes        |
     | Amber     | 32  | null       |
-    | Dale      | 33  | null       |
     | Hattie    | 36  | null       |
+    | Dale      | 37  | null       |
     +-----------+-----+------------+

-Example 6: Type Conflict Resolution - Automatic Renaming
-===========================================================
-
-When the same field name has incompatible types across subsearches, the system automatically renames conflicting fields with numeric suffixes.
-
-PPL query::
-
-    os> | multisearch [search source=accounts | fields firstname, age, balance | head 2] [search source=locations | fields description, age, place_id | head 2];
-    fetched rows / total rows = 4/4
-    +-----------+------+---------+--------------+--------+----------+
-    | firstname | age  | balance | description  | age0   | place_id |
-    |-----------+------+---------+--------------+--------+----------|
-    | Amber     | 32   | 39225   | null         | null   | null     |
-    | Hattie    | 36   | 5686    | null         | null   | null     |
-    | null      | null | null    | Central Park | old    | 1001     |
-    | null      | null | null    | Times Square | modern | 1002     |
-    +-----------+------+---------+--------------+--------+----------+
-
-In this example, the ``age`` field has type ``bigint`` in accounts but type ``string`` in locations. The system keeps the first occurrence as ``age`` (bigint) and renames the second occurrence to ``age0`` (string), preserving all data while avoiding type conflicts.