Skip to content

Commit 5888af3

Browse files
authored
[Backport 2.19-dev] Pushdown sort aggregate metrics (#4603) (#4658)
* Pushdown sort aggregate metrics (#4603) * convert sort aggregate metrics to term sort Signed-off-by: Lantao Jin <ltjin@amazon.com> * refactor Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix conflicts Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix conflicts2 Signed-off-by: Lantao Jin <ltjin@amazon.com> * Add more javadoc Signed-off-by: Lantao Jin <ltjin@amazon.com> * address comments Signed-off-by: Lantao Jin <ltjin@amazon.com> * delete incorrect comments Signed-off-by: Lantao Jin <ltjin@amazon.com> * convert composite agg to multi-terms agg for sort metrics on multiple buckets Signed-off-by: Lantao Jin <ltjin@amazon.com> * avoid the case of 'stat count, sum ... | sort count' Signed-off-by: Lantao Jin <ltjin@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com> (cherry picked from commit 5ebed84) * fix conflicts Signed-off-by: Lantao Jin <ltjin@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent 2893e87 commit 5888af3

File tree

41 files changed

+1253
-551
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1253
-551
lines changed

core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,22 @@
1414
import com.google.common.collect.ImmutableList;
1515
import java.lang.reflect.Method;
1616
import java.util.ArrayList;
17+
import java.util.HashSet;
1718
import java.util.List;
1819
import java.util.Objects;
20+
import java.util.Set;
1921
import java.util.function.Predicate;
2022
import java.util.stream.Collectors;
2123
import javax.annotation.Nullable;
2224
import org.apache.calcite.plan.RelOptTable;
2325
import org.apache.calcite.rel.RelHomogeneousShuttle;
2426
import org.apache.calcite.rel.RelNode;
2527
import org.apache.calcite.rel.RelShuttle;
28+
import org.apache.calcite.rel.core.Project;
29+
import org.apache.calcite.rel.core.Sort;
2630
import org.apache.calcite.rel.core.TableScan;
2731
import org.apache.calcite.rel.logical.LogicalProject;
32+
import org.apache.calcite.rel.logical.LogicalSort;
2833
import org.apache.calcite.rel.type.RelDataType;
2934
import org.apache.calcite.rex.RexCall;
3035
import org.apache.calcite.rex.RexCorrelVariable;
@@ -38,6 +43,7 @@
3843
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
3944
import org.apache.calcite.sql.type.SqlTypeName;
4045
import org.apache.calcite.tools.RelBuilder;
46+
import org.apache.calcite.util.Pair;
4147
import org.apache.calcite.util.Util;
4248
import org.opensearch.sql.ast.AbstractNodeVisitor;
4349
import org.opensearch.sql.ast.Node;
@@ -501,13 +507,51 @@ public Void visitInputRef(RexInputRef inputRef) {
501507
return selectedColumns;
502508
}
503509

510+
// `RelDecorrelator` may generate a Project with duplicated fields, e.g. Project($0,$0).
511+
// There will be problem if pushing down the pattern like `Aggregate(AGG($0),{1})-Project($0,$0)`,
512+
// as it will lead to field-name conflict.
513+
// We should wait and rely on `AggregateProjectMergeRule` to mitigate it by having this constraint
514+
// Nevertheless, that rule cannot handle all cases if there is RexCall in the Project,
515+
// e.g. Project($0, $0, +($0,1)). We cannot push down the Aggregate for this corner case.
516+
// TODO: Simplify the Project where there is RexCall by adding a new rule.
517+
static boolean distinctProjectList(LogicalProject project) {
518+
// Change to Set<Pair<RexNode, String>> to resolve
519+
// https://github.com/opensearch-project/sql/issues/4347
520+
Set<Pair<RexNode, String>> rexSet = new HashSet<>();
521+
return project.getNamedProjects().stream().allMatch(rexSet::add);
522+
}
523+
524+
static boolean containsRexOver(LogicalProject project) {
525+
return project.getProjects().stream().anyMatch(RexOver::containsOver);
526+
}
527+
528+
/**
529+
* The LogicalSort is a LIMIT that should be pushed down when its fetch field is not null and its
530+
* collation is empty. For example: <code>sort name | head 5</code> should not be pushed down
531+
* because it has a field collation.
532+
*
533+
* @param sort The LogicalSort to check.
534+
* @return True if the LogicalSort is a LIMIT, false otherwise.
535+
*/
536+
static boolean isLogicalSortLimit(LogicalSort sort) {
537+
return sort.fetch != null;
538+
}
539+
540+
static boolean projectContainsExpr(Project project) {
541+
return project.getProjects().stream().anyMatch(p -> p instanceof RexCall);
542+
}
543+
544+
static boolean sortByFieldsOnly(Sort sort) {
545+
return !sort.getCollation().getFieldCollations().isEmpty() && sort.fetch == null;
546+
}
547+
504548
/**
505549
* Get a string representation of the argument types expressed in ExprType for error messages.
506550
*
507551
* @param argTypes the list of argument types as {@link RelDataType}
508552
* @return a string in the format [type1,type2,...] representing the argument types
509553
*/
510-
public static String getActualSignature(List<RelDataType> argTypes) {
554+
static String getActualSignature(List<RelDataType> argTypes) {
511555
return "["
512556
+ argTypes.stream()
513557
.map(OpenSearchTypeFactory::convertRelDataTypeToExprType)

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,24 +1048,118 @@ public void testExplainCountsByAgg() throws IOException {
10481048
}
10491049

10501050
@Test
1051-
public void testExplainSortOnMetricsNoBucketNullable() throws IOException {
1052-
// TODO enhancement later: https://github.com/opensearch-project/sql/issues/4282
1051+
public void testExplainSortOnMetrics() throws IOException {
10531052
enabledOnlyWhenPushdownIsEnabled();
10541053
String expected = loadExpectedPlan("explain_agg_sort_on_metrics1.yaml");
10551054
assertYamlEqualsIgnoreId(
10561055
expected,
10571056
explainQueryYaml(
10581057
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() by"
10591058
+ " state | sort `count()`"));
1060-
10611059
expected = loadExpectedPlan("explain_agg_sort_on_metrics2.yaml");
1060+
assertYamlEqualsIgnoreId(
1061+
expected,
1062+
explainQueryYaml(
1063+
"source=opensearch-sql_test_index_account | stats bucket_nullable=false sum(balance)"
1064+
+ " as sum by state | sort - sum"));
1065+
// TODO limit should pushdown to non-composite agg
1066+
expected = loadExpectedPlan("explain_agg_sort_on_metrics3.yaml");
1067+
assertYamlEqualsIgnoreId(
1068+
expected,
1069+
explainQueryYaml(
1070+
String.format(
1071+
"source=%s | stats count() as cnt by span(birthdate, 1d) | sort - cnt",
1072+
TEST_INDEX_BANK)));
1073+
expected = loadExpectedPlan("explain_agg_sort_on_metrics4.yaml");
1074+
assertYamlEqualsIgnoreId(
1075+
expected,
1076+
explainQueryYaml(
1077+
String.format(
1078+
"source=%s | stats bucket_nullable=false sum(balance) by span(age, 5) | sort -"
1079+
+ " `sum(balance)`",
1080+
TEST_INDEX_BANK)));
1081+
}
1082+
1083+
@Test
1084+
public void testExplainSortOnMetricsMultiTerms() throws IOException {
1085+
enabledOnlyWhenPushdownIsEnabled();
1086+
String expected = loadExpectedPlan("explain_agg_sort_on_metrics_multi_terms.yaml");
10621087
assertYamlEqualsIgnoreId(
10631088
expected,
10641089
explainQueryYaml(
10651090
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() by"
10661091
+ " gender, state | sort `count()`"));
10671092
}
10681093

1094+
@Test
1095+
public void testExplainCompositeMultiBucketsAutoDateThenSortOnMetricsNotPushdown()
1096+
throws IOException {
1097+
enabledOnlyWhenPushdownIsEnabled();
1098+
assertYamlEqualsIgnoreId(
1099+
loadExpectedPlan("agg_composite_multi_terms_autodate_sort_agg_metric_not_push.yaml"),
1100+
explainQueryYaml(
1101+
String.format(
1102+
"source=%s | bin timestamp bins=3 | stats bucket_nullable=false avg(value), count()"
1103+
+ " as cnt by category, value, timestamp | sort cnt",
1104+
TEST_INDEX_TIME_DATA)));
1105+
}
1106+
1107+
@Test
1108+
public void testExplainCompositeRangeThenSortOnMetricsNotPushdown() throws IOException {
1109+
enabledOnlyWhenPushdownIsEnabled();
1110+
assertYamlEqualsIgnoreId(
1111+
loadExpectedPlan("agg_composite_range_sort_agg_metric_not_push.yaml"),
1112+
explainQueryYaml(
1113+
String.format(
1114+
"source=%s | eval value_range = case(value < 7000, 'small'"
1115+
+ " else 'great') | stats bucket_nullable=false avg(value), count() as cnt by"
1116+
+ " value_range, category | sort cnt",
1117+
TEST_INDEX_TIME_DATA)));
1118+
}
1119+
1120+
@Test
1121+
public void testExplainCompositeAutoDateThenSortOnMetricsNotPushdown() throws IOException {
1122+
enabledOnlyWhenPushdownIsEnabled();
1123+
assertYamlEqualsIgnoreId(
1124+
loadExpectedPlan("agg_composite_autodate_sort_agg_metric_not_push.yaml"),
1125+
explainQueryYaml(
1126+
String.format(
1127+
"source=%s | bin timestamp bins=3 | stats bucket_nullable=false avg(value), count()"
1128+
+ " as cnt by timestamp, category | sort cnt",
1129+
TEST_INDEX_TIME_DATA)));
1130+
}
1131+
1132+
@Test
1133+
public void testExplainCompositeRangeAutoDateThenSortOnMetricsNotPushdown() throws IOException {
1134+
enabledOnlyWhenPushdownIsEnabled();
1135+
assertYamlEqualsIgnoreId(
1136+
loadExpectedPlan("agg_composite_autodate_range_metric_sort_agg_metric_not_push.yaml"),
1137+
explainQueryYaml(
1138+
String.format(
1139+
"source=%s | bin timestamp bins=3 | eval value_range = case(value < 7000, 'small'"
1140+
+ " else 'great') | stats bucket_nullable=false avg(value), count() as cnt by"
1141+
+ " timestamp, value_range, category | sort cnt",
1142+
TEST_INDEX_TIME_DATA)));
1143+
}
1144+
1145+
@Test
1146+
public void testExplainMultipleAggregatorsWithSortOnOneMetricNotPushDown() throws IOException {
1147+
enabledOnlyWhenPushdownIsEnabled();
1148+
String expected =
1149+
loadExpectedPlan("explain_multiple_agg_with_sort_on_one_metric_not_push1.yaml");
1150+
assertYamlEqualsIgnoreId(
1151+
expected,
1152+
explainQueryYaml(
1153+
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() as c,"
1154+
+ " sum(balance) as s by state | sort c"));
1155+
expected = loadExpectedPlan("explain_multiple_agg_with_sort_on_one_metric_not_push2.yaml");
1156+
assertYamlEqualsIgnoreId(
1157+
expected,
1158+
explainQueryYaml(
1159+
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() as c,"
1160+
+ " sum(balance) as s by state | sort c, s"));
1161+
}
1162+
10691163
@Test
10701164
public void testExplainEvalMax() throws IOException {
10711165
String expected = loadExpectedPlan("explain_eval_max.json");
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])
5+
LogicalProject(avg(value)=[$3], cnt=[$4], timestamp=[$0], value_range=[$1], category=[$2])
6+
LogicalAggregate(group=[{0, 1, 2}], avg(value)=[AVG($3)], cnt=[COUNT()])
7+
LogicalProject(timestamp=[$9], value_range=[$10], category=[$1], value=[$2])
8+
LogicalFilter(condition=[AND(IS NOT NULL($9), IS NOT NULL($1))])
9+
LogicalProject(@timestamp=[$0], category=[$1], value=[$2], _id=[$4], _index=[$5], _score=[$6], _maxscore=[$7], _sort=[$8], _routing=[$9], timestamp=[WIDTH_BUCKET($3, 3, -(MAX($3) OVER (), MIN($3) OVER ()), MAX($3) OVER ())], value_range=[CASE(<($2, 7000), 'small':VARCHAR, 'great':VARCHAR)])
10+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]])
11+
physical: |
12+
EnumerableLimit(fetch=[10000])
13+
EnumerableSort(sort0=[$1], dir0=[ASC-nulls-first])
14+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2, 3},avg(value)=AVG($1),cnt=COUNT()), PROJECT->[avg(value), cnt, timestamp, value_range, category]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"timestamp":{"auto_date_histogram":{"field":"timestamp","buckets":3,"minimum_interval":null},"aggregations":{"value_range":{"range":{"field":"value","ranges":[{"key":"small","to":7000.0},{"key":"great","from":7000.0}],"keyed":true},"aggregations":{"avg(value)":{"avg":{"field":"value"}}}}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])
5+
LogicalProject(avg(value)=[$2], cnt=[$3], timestamp=[$0], category=[$1])
6+
LogicalAggregate(group=[{0, 1}], avg(value)=[AVG($2)], cnt=[COUNT()])
7+
LogicalProject(timestamp=[$9], category=[$1], value=[$2])
8+
LogicalFilter(condition=[AND(IS NOT NULL($9), IS NOT NULL($1))])
9+
LogicalProject(@timestamp=[$0], category=[$1], value=[$2], _id=[$4], _index=[$5], _score=[$6], _maxscore=[$7], _sort=[$8], _routing=[$9], timestamp=[WIDTH_BUCKET($3, 3, -(MAX($3) OVER (), MIN($3) OVER ()), MAX($3) OVER ())])
10+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]])
11+
physical: |
12+
EnumerableLimit(fetch=[10000])
13+
EnumerableSort(sort0=[$1], dir0=[ASC-nulls-first])
14+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},avg(value)=AVG($1),cnt=COUNT()), PROJECT->[avg(value), cnt, timestamp, category]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"timestamp":{"auto_date_histogram":{"field":"timestamp","buckets":3,"minimum_interval":null},"aggregations":{"avg(value)":{"avg":{"field":"value"}}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])
5+
LogicalProject(avg(value)=[$3], cnt=[$4], category=[$0], value=[$1], timestamp=[$2])
6+
LogicalAggregate(group=[{0, 1, 2}], avg(value)=[AVG($1)], cnt=[COUNT()])
7+
LogicalProject(category=[$1], value=[$2], timestamp=[$9])
8+
LogicalFilter(condition=[AND(IS NOT NULL($1), IS NOT NULL($2), IS NOT NULL($9))])
9+
LogicalProject(@timestamp=[$0], category=[$1], value=[$2], _id=[$4], _index=[$5], _score=[$6], _maxscore=[$7], _sort=[$8], _routing=[$9], timestamp=[WIDTH_BUCKET($3, 3, -(MAX($3) OVER (), MIN($3) OVER ()), MAX($3) OVER ())])
10+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]])
11+
physical: |
12+
EnumerableLimit(fetch=[10000])
13+
EnumerableSort(sort0=[$1], dir0=[ASC-nulls-first])
14+
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t1):DOUBLE], avg(value)=[$t4], cnt=[$t3], category=[$t0], value=[$t1], timestamp=[$t2])
15+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1, 2},cnt=COUNT())], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":false,"order":"asc"}}},{"value":{"terms":{"field":"value","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"timestamp":{"auto_date_histogram":{"field":"timestamp","buckets":3,"minimum_interval":null}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])
5+
LogicalProject(avg(value)=[$2], cnt=[$3], value_range=[$0], category=[$1])
6+
LogicalAggregate(group=[{0, 1}], avg(value)=[AVG($2)], cnt=[COUNT()])
7+
LogicalProject(value_range=[$10], category=[$1], value=[$2])
8+
LogicalFilter(condition=[IS NOT NULL($1)])
9+
LogicalProject(@timestamp=[$0], category=[$1], value=[$2], timestamp=[$3], _id=[$4], _index=[$5], _score=[$6], _maxscore=[$7], _sort=[$8], _routing=[$9], value_range=[CASE(<($2, 7000), 'small':VARCHAR, 'great':VARCHAR)])
10+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]])
11+
physical: |
12+
EnumerableLimit(fetch=[10000])
13+
EnumerableSort(sort0=[$1], dir0=[ASC-nulls-first])
14+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},avg(value)=AVG($1),cnt=COUNT()), PROJECT->[avg(value), cnt, value_range, category]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"value_range":{"range":{"field":"value","ranges":[{"key":"small","to":7000.0},{"key":"great","from":7000.0}],"keyed":true},"aggregations":{"avg(value)":{"avg":{"field":"value"}}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics1.json

Whitespace-only changes.

integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics1.yaml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,4 @@ calcite:
88
LogicalFilter(condition=[IS NOT NULL($7)])
99
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
1010
physical: |
11-
EnumerableLimit(fetch=[10000])
12-
EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first])
13-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), state]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
11+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), SORT_AGG_METRICS->[1 ASC FIRST], PROJECT->[count(), state], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"state":{"terms":{"field":"state.keyword","size":1000,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"count()":"asc"},{"_key":"asc"}]},"aggregations":{"count()":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
calcite:
22
logical: |
3-
LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])
4-
LogicalSort(sort0=[$0], dir0=[ASC-nulls-first])
5-
LogicalProject(count()=[$2], gender=[$0], state=[$1])
6-
LogicalAggregate(group=[{0, 1}], count()=[COUNT()])
7-
LogicalProject(gender=[$4], state=[$7])
8-
LogicalFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($7))])
3+
LogicalSystemLimit(sort0=[$0], dir0=[DESC-nulls-last], fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalSort(sort0=[$0], dir0=[DESC-nulls-last])
5+
LogicalProject(sum=[$1], state=[$0])
6+
LogicalAggregate(group=[{0}], sum=[SUM($1)])
7+
LogicalProject(state=[$7], balance=[$3])
8+
LogicalFilter(condition=[IS NOT NULL($7)])
99
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
1010
physical: |
11-
EnumerableLimit(fetch=[10000])
12-
EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first])
13-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count()=COUNT()), PROJECT->[count(), gender, state]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":false,"order":"asc"}}},{"state":{"terms":{"field":"state.keyword","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
11+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1},sum=SUM($0)), SORT_AGG_METRICS->[1 DESC LAST], PROJECT->[sum, state], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"state":{"terms":{"field":"state.keyword","size":1000,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"sum":"desc"},{"_key":"asc"}]},"aggregations":{"sum":{"sum":{"field":"balance"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

0 commit comments

Comments
 (0)