- 
                Notifications
    
You must be signed in to change notification settings  - Fork 177
 
          Fixes for Multisearch and Append command
          #4512
        
          New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1a1eb0a
              3fbe4df
              01b0618
              31931cf
              ae5dbb1
              527fb1c
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| 
          
            
          
           | 
    @@ -7,29 +7,27 @@ | |
| 
     | 
||
| import java.util.ArrayList; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.calcite.rel.RelNode; | ||
| import org.apache.calcite.rel.type.RelDataType; | ||
| import org.apache.calcite.rel.type.RelDataTypeField; | ||
| import org.apache.calcite.rex.RexNode; | ||
| import org.apache.calcite.sql.validate.SqlValidatorUtil; | ||
| 
     | 
||
| /** | ||
| * Utility class for unifying schemas across multiple RelNodes with type conflict resolution. Uses | ||
| * the same strategy as append command - renames conflicting fields to avoid type conflicts. | ||
| * Utility class for unifying schemas across multiple RelNodes. Throws an exception when type | ||
| * conflicts are detected. | ||
| */ | ||
| public class SchemaUnifier { | ||
| 
     | 
||
| /** | ||
| * Builds a unified schema for multiple nodes with type conflict resolution. | ||
| * Builds a unified schema for multiple nodes. Throws an exception if type conflicts are detected. | ||
| * | ||
| * @param nodes List of RelNodes to unify schemas for | ||
| * @param context Calcite plan context | ||
| * @return List of projected RelNodes with unified schema | ||
| * @throws IllegalArgumentException if type conflicts are detected | ||
| */ | ||
| public static List<RelNode> buildUnifiedSchemaWithConflictResolution( | ||
| List<RelNode> nodes, CalcitePlanContext context) { | ||
| 
        
          
        
         | 
    @@ -41,7 +39,7 @@ public static List<RelNode> buildUnifiedSchemaWithConflictResolution( | |
| return nodes; | ||
| } | ||
| 
     | 
||
| // Step 1: Build the unified schema by processing all nodes | ||
| // Step 1: Build the unified schema by processing all nodes (throws on conflict) | ||
| List<SchemaField> unifiedSchema = buildUnifiedSchema(nodes); | ||
| 
     | 
||
| // Step 2: Create projections for each node to align with unified schema | ||
| 
        
          
        
         | 
    @@ -55,47 +53,37 @@ public static List<RelNode> buildUnifiedSchemaWithConflictResolution( | |
| projectedNodes.add(projectedNode); | ||
| } | ||
| 
     | 
||
| // Step 3: Unify names to handle type conflicts (this creates age0, age1, etc.) | ||
| List<String> uniqueNames = | ||
| SqlValidatorUtil.uniquify(fieldNames, SqlValidatorUtil.EXPR_SUGGESTER, true); | ||
| 
     | 
||
| // Step 4: Re-project with unique names if needed | ||
| if (!uniqueNames.equals(fieldNames)) { | ||
| List<RelNode> renamedNodes = new ArrayList<>(); | ||
| for (RelNode node : projectedNodes) { | ||
| RelNode renamedNode = | ||
| context.relBuilder.push(node).project(context.relBuilder.fields(), uniqueNames).build(); | ||
| renamedNodes.add(renamedNode); | ||
| } | ||
| return renamedNodes; | ||
| } | ||
| 
     | 
||
| return projectedNodes; | ||
| } | ||
| 
     | 
||
| /** | ||
| * Builds a unified schema by merging fields from all nodes. Fields with the same name but | ||
| * different types are added as separate entries (which will be renamed during uniquification). | ||
| * Builds a unified schema by merging fields from all nodes. Throws an exception if fields with | ||
| * the same name have different types. | ||
| * | ||
| * @param nodes List of RelNodes to merge schemas from | ||
| * @return List of SchemaField representing the unified schema (may contain duplicate names) | ||
| * @return List of SchemaField representing the unified schema | ||
| * @throws IllegalArgumentException if type conflicts are detected | ||
| */ | ||
| private static List<SchemaField> buildUnifiedSchema(List<RelNode> nodes) { | ||
| List<SchemaField> schema = new ArrayList<>(); | ||
| Map<String, Set<RelDataType>> seenFields = new HashMap<>(); | ||
| Map<String, RelDataType> seenFields = new HashMap<>(); | ||
| 
     | 
||
| for (RelNode node : nodes) { | ||
| for (RelDataTypeField field : node.getRowType().getFieldList()) { | ||
| String fieldName = field.getName(); | ||
| RelDataType fieldType = field.getType(); | ||
| 
     | 
||
| // Track which (name, type) combinations we've seen | ||
| Set<RelDataType> typesForName = seenFields.computeIfAbsent(fieldName, k -> new HashSet<>()); | ||
| 
     | 
||
| if (!typesForName.contains(fieldType)) { | ||
| // New field or same name with different type - add to schema | ||
| RelDataType existingType = seenFields.get(fieldName); | ||
| 
         There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We recently found another type-conflict issue here. RelDataType also evaluates hash equality by its digest string. For example, "INTEGER" is not equal to "INTEGER NOT NULL". A quick fix would be to align fields of the same SqlType to be nullable. Ideally it won't affect data type resolution during execution. cc @xinyual There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can consider allowing fields with the same SqlTypeName but different nullability to be merged here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the suggestion! I have updated the implementation to allow fields with the same SqlTypeName but different nullability to be merged.  | 
||
| if (existingType == null) { | ||
| // New field - add to schema | ||
| schema.add(new SchemaField(fieldName, fieldType)); | ||
| typesForName.add(fieldType); | ||
| seenFields.put(fieldName, fieldType); | ||
| } else if (!areTypesCompatible(existingType, fieldType)) { | ||
| // Same field name but different type - throw exception | ||
| throw new IllegalArgumentException( | ||
| String.format( | ||
| "Unable to process column '%s' due to incompatible types: '%s' and '%s'", | ||
| fieldName, existingType.getSqlTypeName(), fieldType.getSqlTypeName())); | ||
| } | ||
| // If we've seen this exact (name, type) combination, skip it | ||
| } | ||
| 
        
          
        
         | 
    @@ -104,6 +92,10 @@ private static List<SchemaField> buildUnifiedSchema(List<RelNode> nodes) { | |
| return schema; | ||
| } | ||
| 
     | 
||
| private static boolean areTypesCompatible(RelDataType type1, RelDataType type2) { | ||
| 
         There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is another concern with using this method to allow type merging. If index A's RelDataType 'INTEGER NOT NULL' is put into the unified schema, index B's same-name RelDataType 'INTEGER' will be merged silently. Index B's column values could contain NULL values. The generated code could skip the null check because the merged unified schema has 'INTEGER NOT NULL'. It will probably throw an NPE when merging index B's NULL values. We could write some queries to double-check whether we can reproduce this scenario. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried some queries in my local test and haven't seen such NPE errors yet. I'm not sure whether there is an edge case, but for now we can leave it as is until a fix is needed.  | 
||
| return type1.getSqlTypeName() != null && type1.getSqlTypeName().equals(type2.getSqlTypeName()); | ||
| } | ||
| 
     | 
||
| /** | ||
| * Builds a projection for a node to align with the unified schema. For each field in the unified | ||
| * schema: - If the node has a matching field with the same type, use it - Otherwise, project NULL | ||
| 
        
          
        
         | 
    @@ -125,8 +117,8 @@ private static List<RexNode> buildProjectionForNode( | |
| RelDataType expectedType = schemaField.getType(); | ||
| RelDataTypeField nodeField = nodeFieldMap.get(fieldName); | ||
| 
     | 
||
| if (nodeField != null && nodeField.getType().equals(expectedType)) { | ||
| // Field exists with matching type - use it | ||
| if (nodeField != null && areTypesCompatible(nodeField.getType(), expectedType)) { | ||
| // Field exists with compatible type - use it | ||
| projection.add(context.rexBuilder.makeInputRef(node, nodeField.getIndex())); | ||
| } else { | ||
| // Field missing or type mismatch - project NULL | ||
| 
          
            
          
           | 
    ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| 
          
            
          
           | 
    @@ -24,6 +24,11 @@ append <sub-search> | |
| 
     | 
||
| * sub-search: mandatory. Executes PPL commands as a secondary search. | ||
| 
     | 
||
| Limitations | ||
| =========== | ||
| 
     | 
||
| * **Schema Compatibility**: When fields with the same name exist between the main search and sub-search but have incompatible types, the query will fail with an error. To avoid type conflicts, ensure that fields with the same name have the same data type, or use different field names (e.g., by renaming with ``eval`` or using ``fields`` to select non-conflicting columns). | ||
| 
         There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand the intention here. A strong-schema engine like SQL requires the types to be the same. Some weak-schema engines resolve types at runtime and don't care about the data type. I think it's not easy to make them compatible. I'm not sure what the better user experience and customer expectation are here. Do users accept this behavior, or do they expect the union to succeed anyway? cc @LantaoJin There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. Thanks for the change.  | 
||
| 
     | 
||
| Example 1: Append rows from a count aggregation to existing search result | ||
| =============================================================== | ||
| 
     | 
||
| 
          
            
          
           | 
    @@ -64,23 +69,3 @@ PPL query:: | |
| | 101 | M | null | | ||
| +-----+--------+-------+ | ||
| 
     | 
||
| Example 3: Append rows with column type conflict | ||
| ============================================= | ||
| 
     | 
||
| This example shows how column type conflicts are handled when appending results. Same name columns with different types will generate two different columns in appended result. | ||
| 
     | 
||
| PPL query:: | ||
| 
     | 
||
| os> source=accounts | stats sum(age) as sum by gender, state | sort -sum | head 5 | append [ source=accounts | stats sum(age) as sum by gender | eval sum = cast(sum as double) ]; | ||
| fetched rows / total rows = 6/6 | ||
| +------+--------+-------+-------+ | ||
| | sum | gender | state | sum0 | | ||
| |------+--------+-------+-------| | ||
| | 36 | M | TN | null | | ||
| | 33 | M | MD | null | | ||
| | 32 | M | IL | null | | ||
| | 28 | F | VA | null | | ||
| | null | F | null | 28.0 | | ||
| | null | M | null | 101.0 | | ||
| +------+--------+-------+-------+ | ||
| 
     | 
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (non-blocking): Should we think about indices with different timestamp field names?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently this is documented as a limitation: we only want to support @timestamp.