Skip to content

Commit 5ebed84

Browse files
authored
Pushdown sort aggregate metrics (#4603)
* convert sort aggregate metrics to term sort Signed-off-by: Lantao Jin <ltjin@amazon.com> * refactor Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix conflicts Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix conflicts2 Signed-off-by: Lantao Jin <ltjin@amazon.com> * Add more javadoc Signed-off-by: Lantao Jin <ltjin@amazon.com> * address comments Signed-off-by: Lantao Jin <ltjin@amazon.com> * delete incorrect comments Signed-off-by: Lantao Jin <ltjin@amazon.com> * convert composite agg to multi-terms agg for sort metrics on multiple buckets Signed-off-by: Lantao Jin <ltjin@amazon.com> * avoid the case of 'stat count, sum ... | sort count' Signed-off-by: Lantao Jin <ltjin@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent d719670 commit 5ebed84

File tree

41 files changed

+1156
-500
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1156
-500
lines changed

core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,22 @@
1414
import com.google.common.collect.ImmutableList;
1515
import java.lang.reflect.Method;
1616
import java.util.ArrayList;
17+
import java.util.HashSet;
1718
import java.util.List;
1819
import java.util.Objects;
20+
import java.util.Set;
1921
import java.util.function.Predicate;
2022
import java.util.stream.Collectors;
2123
import javax.annotation.Nullable;
2224
import org.apache.calcite.plan.RelOptTable;
2325
import org.apache.calcite.rel.RelHomogeneousShuttle;
2426
import org.apache.calcite.rel.RelNode;
2527
import org.apache.calcite.rel.RelShuttle;
28+
import org.apache.calcite.rel.core.Project;
29+
import org.apache.calcite.rel.core.Sort;
2630
import org.apache.calcite.rel.core.TableScan;
2731
import org.apache.calcite.rel.logical.LogicalProject;
32+
import org.apache.calcite.rel.logical.LogicalSort;
2833
import org.apache.calcite.rel.type.RelDataType;
2934
import org.apache.calcite.rex.RexCall;
3035
import org.apache.calcite.rex.RexCorrelVariable;
@@ -38,6 +43,7 @@
3843
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
3944
import org.apache.calcite.sql.type.SqlTypeName;
4045
import org.apache.calcite.tools.RelBuilder;
46+
import org.apache.calcite.util.Pair;
4147
import org.apache.calcite.util.Util;
4248
import org.opensearch.sql.ast.AbstractNodeVisitor;
4349
import org.opensearch.sql.ast.Node;
@@ -474,13 +480,51 @@ public Void visitInputRef(RexInputRef inputRef) {
474480
return selectedColumns;
475481
}
476482

483+
// `RelDecorrelator` may generate a Project with duplicated fields, e.g. Project($0,$0).
484+
// There will be problem if pushing down the pattern like `Aggregate(AGG($0),{1})-Project($0,$0)`,
485+
// as it will lead to field-name conflict.
486+
// We should wait and rely on `AggregateProjectMergeRule` to mitigate it by having this constraint
487+
// Nevertheless, that rule cannot handle all cases if there is RexCall in the Project,
488+
// e.g. Project($0, $0, +($0,1)). We cannot push down the Aggregate for this corner case.
489+
// TODO: Simplify the Project where there is RexCall by adding a new rule.
490+
static boolean distinctProjectList(LogicalProject project) {
491+
// Change to Set<Pair<RexNode, String>> to resolve
492+
// https://github.com/opensearch-project/sql/issues/4347
493+
Set<Pair<RexNode, String>> rexSet = new HashSet<>();
494+
return project.getNamedProjects().stream().allMatch(rexSet::add);
495+
}
496+
497+
static boolean containsRexOver(LogicalProject project) {
498+
return project.getProjects().stream().anyMatch(RexOver::containsOver);
499+
}
500+
501+
/**
502+
* The LogicalSort is a LIMIT that should be pushed down when its fetch field is not null and its
503+
* collation is empty. For example: <code>sort name | head 5</code> should not be pushed down
504+
* because it has a field collation.
505+
*
506+
* @param sort The LogicalSort to check.
507+
* @return True if the LogicalSort is a LIMIT, false otherwise.
508+
*/
509+
static boolean isLogicalSortLimit(LogicalSort sort) {
510+
return sort.fetch != null;
511+
}
512+
513+
static boolean projectContainsExpr(Project project) {
514+
return project.getProjects().stream().anyMatch(p -> p instanceof RexCall);
515+
}
516+
517+
static boolean sortByFieldsOnly(Sort sort) {
518+
return !sort.getCollation().getFieldCollations().isEmpty() && sort.fetch == null;
519+
}
520+
477521
/**
478522
* Get a string representation of the argument types expressed in ExprType for error messages.
479523
*
480524
* @param argTypes the list of argument types as {@link RelDataType}
481525
* @return a string in the format [type1,type2,...] representing the argument types
482526
*/
483-
public static String getActualSignature(List<RelDataType> argTypes) {
527+
static String getActualSignature(List<RelDataType> argTypes) {
484528
return "["
485529
+ argTypes.stream()
486530
.map(OpenSearchTypeFactory::convertRelDataTypeToExprType)

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 97 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,24 +1018,118 @@ public void testExplainCountsByAgg() throws IOException {
10181018
}
10191019

10201020
@Test
1021-
public void testExplainSortOnMetricsNoBucketNullable() throws IOException {
1022-
// TODO enhancement later: https://github.com/opensearch-project/sql/issues/4282
1021+
public void testExplainSortOnMetrics() throws IOException {
10231022
enabledOnlyWhenPushdownIsEnabled();
10241023
String expected = loadExpectedPlan("explain_agg_sort_on_metrics1.yaml");
10251024
assertYamlEqualsIgnoreId(
10261025
expected,
10271026
explainQueryYaml(
10281027
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() by"
10291028
+ " state | sort `count()`"));
1030-
10311029
expected = loadExpectedPlan("explain_agg_sort_on_metrics2.yaml");
1030+
assertYamlEqualsIgnoreId(
1031+
expected,
1032+
explainQueryYaml(
1033+
"source=opensearch-sql_test_index_account | stats bucket_nullable=false sum(balance)"
1034+
+ " as sum by state | sort - sum"));
1035+
// TODO limit should pushdown to non-composite agg
1036+
expected = loadExpectedPlan("explain_agg_sort_on_metrics3.yaml");
1037+
assertYamlEqualsIgnoreId(
1038+
expected,
1039+
explainQueryYaml(
1040+
String.format(
1041+
"source=%s | stats count() as cnt by span(birthdate, 1d) | sort - cnt",
1042+
TEST_INDEX_BANK)));
1043+
expected = loadExpectedPlan("explain_agg_sort_on_metrics4.yaml");
1044+
assertYamlEqualsIgnoreId(
1045+
expected,
1046+
explainQueryYaml(
1047+
String.format(
1048+
"source=%s | stats bucket_nullable=false sum(balance) by span(age, 5) | sort -"
1049+
+ " `sum(balance)`",
1050+
TEST_INDEX_BANK)));
1051+
}
1052+
1053+
@Test
1054+
public void testExplainSortOnMetricsMultiTerms() throws IOException {
1055+
enabledOnlyWhenPushdownIsEnabled();
1056+
String expected = loadExpectedPlan("explain_agg_sort_on_metrics_multi_terms.yaml");
10321057
assertYamlEqualsIgnoreId(
10331058
expected,
10341059
explainQueryYaml(
10351060
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() by"
10361061
+ " gender, state | sort `count()`"));
10371062
}
10381063

1064+
@Test
1065+
public void testExplainCompositeMultiBucketsAutoDateThenSortOnMetricsNotPushdown()
1066+
throws IOException {
1067+
enabledOnlyWhenPushdownIsEnabled();
1068+
assertYamlEqualsIgnoreId(
1069+
loadExpectedPlan("agg_composite_multi_terms_autodate_sort_agg_metric_not_push.yaml"),
1070+
explainQueryYaml(
1071+
String.format(
1072+
"source=%s | bin timestamp bins=3 | stats bucket_nullable=false avg(value), count()"
1073+
+ " as cnt by category, value, timestamp | sort cnt",
1074+
TEST_INDEX_TIME_DATA)));
1075+
}
1076+
1077+
@Test
1078+
public void testExplainCompositeRangeThenSortOnMetricsNotPushdown() throws IOException {
1079+
enabledOnlyWhenPushdownIsEnabled();
1080+
assertYamlEqualsIgnoreId(
1081+
loadExpectedPlan("agg_composite_range_sort_agg_metric_not_push.yaml"),
1082+
explainQueryYaml(
1083+
String.format(
1084+
"source=%s | eval value_range = case(value < 7000, 'small'"
1085+
+ " else 'great') | stats bucket_nullable=false avg(value), count() as cnt by"
1086+
+ " value_range, category | sort cnt",
1087+
TEST_INDEX_TIME_DATA)));
1088+
}
1089+
1090+
@Test
1091+
public void testExplainCompositeAutoDateThenSortOnMetricsNotPushdown() throws IOException {
1092+
enabledOnlyWhenPushdownIsEnabled();
1093+
assertYamlEqualsIgnoreId(
1094+
loadExpectedPlan("agg_composite_autodate_sort_agg_metric_not_push.yaml"),
1095+
explainQueryYaml(
1096+
String.format(
1097+
"source=%s | bin timestamp bins=3 | stats bucket_nullable=false avg(value), count()"
1098+
+ " as cnt by timestamp, category | sort cnt",
1099+
TEST_INDEX_TIME_DATA)));
1100+
}
1101+
1102+
@Test
1103+
public void testExplainCompositeRangeAutoDateThenSortOnMetricsNotPushdown() throws IOException {
1104+
enabledOnlyWhenPushdownIsEnabled();
1105+
assertYamlEqualsIgnoreId(
1106+
loadExpectedPlan("agg_composite_autodate_range_metric_sort_agg_metric_not_push.yaml"),
1107+
explainQueryYaml(
1108+
String.format(
1109+
"source=%s | bin timestamp bins=3 | eval value_range = case(value < 7000, 'small'"
1110+
+ " else 'great') | stats bucket_nullable=false avg(value), count() as cnt by"
1111+
+ " timestamp, value_range, category | sort cnt",
1112+
TEST_INDEX_TIME_DATA)));
1113+
}
1114+
1115+
@Test
1116+
public void testExplainMultipleAggregatorsWithSortOnOneMetricNotPushDown() throws IOException {
1117+
enabledOnlyWhenPushdownIsEnabled();
1118+
String expected =
1119+
loadExpectedPlan("explain_multiple_agg_with_sort_on_one_metric_not_push1.yaml");
1120+
assertYamlEqualsIgnoreId(
1121+
expected,
1122+
explainQueryYaml(
1123+
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() as c,"
1124+
+ " sum(balance) as s by state | sort c"));
1125+
expected = loadExpectedPlan("explain_multiple_agg_with_sort_on_one_metric_not_push2.yaml");
1126+
assertYamlEqualsIgnoreId(
1127+
expected,
1128+
explainQueryYaml(
1129+
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() as c,"
1130+
+ " sum(balance) as s by state | sort c, s"));
1131+
}
1132+
10391133
@Test
10401134
public void testExplainEvalMax() throws IOException {
10411135
String expected = loadExpectedPlan("explain_eval_max.json");
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])
5+
LogicalProject(avg(value)=[$3], cnt=[$4], timestamp=[$0], value_range=[$1], category=[$2])
6+
LogicalAggregate(group=[{0, 1, 2}], avg(value)=[AVG($3)], cnt=[COUNT()])
7+
LogicalProject(timestamp=[$9], value_range=[$10], category=[$1], value=[$2])
8+
LogicalFilter(condition=[AND(IS NOT NULL($9), IS NOT NULL($1))])
9+
LogicalProject(@timestamp=[$0], category=[$1], value=[$2], _id=[$4], _index=[$5], _score=[$6], _maxscore=[$7], _sort=[$8], _routing=[$9], timestamp=[WIDTH_BUCKET($3, 3, -(MAX($3) OVER (), MIN($3) OVER ()), MAX($3) OVER ())], value_range=[CASE(<($2, 7000), 'small':VARCHAR, 'great':VARCHAR)])
10+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]])
11+
physical: |
12+
EnumerableLimit(fetch=[10000])
13+
EnumerableSort(sort0=[$1], dir0=[ASC-nulls-first])
14+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2, 3},avg(value)=AVG($1),cnt=COUNT()), PROJECT->[avg(value), cnt, timestamp, value_range, category]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"timestamp":{"auto_date_histogram":{"field":"timestamp","buckets":3,"minimum_interval":null},"aggregations":{"value_range":{"range":{"field":"value","ranges":[{"key":"small","to":7000.0},{"key":"great","from":7000.0}],"keyed":true},"aggregations":{"avg(value)":{"avg":{"field":"value"}}}}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])
5+
LogicalProject(avg(value)=[$2], cnt=[$3], timestamp=[$0], category=[$1])
6+
LogicalAggregate(group=[{0, 1}], avg(value)=[AVG($2)], cnt=[COUNT()])
7+
LogicalProject(timestamp=[$9], category=[$1], value=[$2])
8+
LogicalFilter(condition=[AND(IS NOT NULL($9), IS NOT NULL($1))])
9+
LogicalProject(@timestamp=[$0], category=[$1], value=[$2], _id=[$4], _index=[$5], _score=[$6], _maxscore=[$7], _sort=[$8], _routing=[$9], timestamp=[WIDTH_BUCKET($3, 3, -(MAX($3) OVER (), MIN($3) OVER ()), MAX($3) OVER ())])
10+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]])
11+
physical: |
12+
EnumerableLimit(fetch=[10000])
13+
EnumerableSort(sort0=[$1], dir0=[ASC-nulls-first])
14+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},avg(value)=AVG($1),cnt=COUNT()), PROJECT->[avg(value), cnt, timestamp, category]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"timestamp":{"auto_date_histogram":{"field":"timestamp","buckets":3,"minimum_interval":null},"aggregations":{"avg(value)":{"avg":{"field":"value"}}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])
5+
LogicalProject(avg(value)=[$3], cnt=[$4], category=[$0], value=[$1], timestamp=[$2])
6+
LogicalAggregate(group=[{0, 1, 2}], avg(value)=[AVG($1)], cnt=[COUNT()])
7+
LogicalProject(category=[$1], value=[$2], timestamp=[$9])
8+
LogicalFilter(condition=[AND(IS NOT NULL($1), IS NOT NULL($2), IS NOT NULL($9))])
9+
LogicalProject(@timestamp=[$0], category=[$1], value=[$2], _id=[$4], _index=[$5], _score=[$6], _maxscore=[$7], _sort=[$8], _routing=[$9], timestamp=[WIDTH_BUCKET($3, 3, -(MAX($3) OVER (), MIN($3) OVER ()), MAX($3) OVER ())])
10+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]])
11+
physical: |
12+
EnumerableLimit(fetch=[10000])
13+
EnumerableSort(sort0=[$1], dir0=[ASC-nulls-first])
14+
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t1):DOUBLE], avg(value)=[$t4], cnt=[$t3], category=[$t0], value=[$t1], timestamp=[$t2])
15+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1, 2},cnt=COUNT())], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":false,"order":"asc"}}},{"value":{"terms":{"field":"value","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"timestamp":{"auto_date_histogram":{"field":"timestamp","buckets":3,"minimum_interval":null}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])
5+
LogicalProject(avg(value)=[$2], cnt=[$3], value_range=[$0], category=[$1])
6+
LogicalAggregate(group=[{0, 1}], avg(value)=[AVG($2)], cnt=[COUNT()])
7+
LogicalProject(value_range=[$10], category=[$1], value=[$2])
8+
LogicalFilter(condition=[IS NOT NULL($1)])
9+
LogicalProject(@timestamp=[$0], category=[$1], value=[$2], timestamp=[$3], _id=[$4], _index=[$5], _score=[$6], _maxscore=[$7], _sort=[$8], _routing=[$9], value_range=[CASE(<($2, 7000), 'small':VARCHAR, 'great':VARCHAR)])
10+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]])
11+
physical: |
12+
EnumerableLimit(fetch=[10000])
13+
EnumerableSort(sort0=[$1], dir0=[ASC-nulls-first])
14+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_time_data]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 2},avg(value)=AVG($1),cnt=COUNT()), PROJECT->[avg(value), cnt, value_range, category]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"category":{"terms":{"field":"category","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"value_range":{"range":{"field":"value","ranges":[{"key":"small","to":7000.0},{"key":"great","from":7000.0}],"keyed":true},"aggregations":{"avg(value)":{"avg":{"field":"value"}}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics1.json

Lines changed: 0 additions & 6 deletions
This file was deleted.

integ-test/src/test/resources/expectedOutput/calcite/explain_agg_sort_on_metrics1.yaml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,4 @@ calcite:
88
LogicalFilter(condition=[IS NOT NULL($7)])
99
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
1010
physical: |
11-
EnumerableLimit(fetch=[10000])
12-
EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first])
13-
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), state]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
11+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), SORT_AGG_METRICS->[1 ASC FIRST], PROJECT->[count(), state], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"state":{"terms":{"field":"state.keyword","size":1000,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":[{"count()":"asc"},{"_key":"asc"}]},"aggregations":{"count()":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

0 commit comments

Comments
 (0)