Skip to content

Commit b116dd6

Browse files
authored
[Backport 2.19-dev] Pushdown case function in aggregations as range queries (#4400) (#4630)
* Pushdown case function in aggregations as range queries (#4400) * WIP: implementing case range analyzer Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Correct case analyzer Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Create bucket aggregation parsers that supports parsing nested sub aggregations Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Fix unit tests Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Fix parsers to multi-range cases Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Update leaf bucket parser Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Unit test case range analyzer Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Add explain ITs for pushing down case in aggregations Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Update CaseRangeAnalyzerTest Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Add a yaml test that replicates issue 4201 Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Add integration tests for case in aggregation Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Fix unit tests Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Add a patch to CalcitePPLCaseFunctionIT Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Migrate all composite aggregation parser usage to bucket aggregate parser Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Create a parent abstract classes for BucketAggregationParsers Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Remove an unnecessary bucket agg in AggregationQueryBuilder Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Test pushing down case where there exists null values Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Return empty in CaseRangeAnalyzer to unblock the rest pushdown - Additionally test number as result expressions Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Document limitations of pushding case as range queries Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Make case pushdown a private method Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Chores: remove unused helper method Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Unify logics for creating nested aggregations Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Remove a note in condition.rst Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Optmize range aggregation Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Ignore testNestedAggregationsExplain when pushdown is disabled Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Fix explain ITs after merge Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> --------- Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> (cherry picked from commit 18ab4dc) # Conflicts: # integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java * Downgrade langauge level to java 11 Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> * Delete an IT due to system-incompatible formatting issue Signed-off-by: Yuanchun Shen <yuanchu@amazon.com> --------- Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent c36ea9e commit b116dd6

File tree

42 files changed

+2222
-162
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+2222
-162
lines changed

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,19 +1032,15 @@ void populate() {
10321032
XOR,
10331033
SqlStdOperatorTable.NOT_EQUALS,
10341034
PPLTypeChecker.family(SqlTypeFamily.BOOLEAN, SqlTypeFamily.BOOLEAN));
1035-
// SqlStdOperatorTable.CASE.getOperandTypeChecker is null. We manually create a
1036-
// type checker
1037-
// for it. The second and third operands are required to be of the same type. If
1038-
// not,
1039-
// it will throw an IllegalArgumentException with information Can't find
1040-
// leastRestrictive type
1035+
// SqlStdOperatorTable.CASE.getOperandTypeChecker is null. We manually create a type checker
1036+
// for it. The second and third operands are required to be of the same type. If not, it will
1037+
// throw an IllegalArgumentException with information Can't find leastRestrictive type
10411038
registerOperator(
10421039
IF,
10431040
SqlStdOperatorTable.CASE,
10441041
PPLTypeChecker.family(SqlTypeFamily.BOOLEAN, SqlTypeFamily.ANY, SqlTypeFamily.ANY));
10451042
// Re-define the type checker for is not null, is present, and is null since
1046-
// their original
1047-
// type checker ANY isn't compatible with struct types.
1043+
// their original type checker ANY isn't compatible with struct types.
10481044
registerOperator(
10491045
IS_NOT_NULL,
10501046
SqlStdOperatorTable.IS_NOT_NULL,

docs/user/ppl/functions/condition.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,14 @@ Argument type: all the supported data type, (NOTE : there is no comma before "el
207207

208208
Return type: any
209209

210+
Limitations
211+
>>>>>>>>>>>
212+
213+
When each condition is a field comparison with a numeric literal and each result expression is a string literal, the query will be optimized as `range aggregations <https://docs.opensearch.org/latest/aggregations/bucket/range>`_ if pushdown optimization is enabled. However, this optimization has the following limitations:
214+
215+
- Null values will not be grouped into any bucket of a range aggregation and will be ignored
216+
- The default ELSE clause will use the string literal ``"null"`` instead of actual NULL values
217+
210218
Example::
211219

212220
os> source=accounts | eval result = case(age > 35, firstname, age < 30, lastname else employer) | fields result, firstname, lastname, age, employer

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 116 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_LOGS;
1111
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_NESTED_SIMPLE;
1212
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_STRINGS;
13+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_TIME_DATA;
1314
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WEBLOGS;
1415
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WORKER;
1516
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_WORK_INFORMATION;
@@ -18,6 +19,7 @@
1819

1920
import java.io.IOException;
2021
import java.util.Locale;
22+
import org.junit.Assume;
2123
import org.junit.Ignore;
2224
import org.junit.Test;
2325
import org.opensearch.sql.ppl.ExplainIT;
@@ -512,22 +514,6 @@ public void testExplainStatsWithSubAggregation() throws IOException {
512514
+ " @timestamp, region"));
513515
}
514516

515-
@Test
516-
public void bucketNullableNotSupportSubAggregation() throws IOException {
517-
// TODO: Don't throw exception after addressing
518-
// https://github.com/opensearch-project/sql/issues/4317
519-
// When bucketNullable is true, sub aggregation is not supported. Hence we cannot pushdown the
520-
// aggregation in this query. Caused by issue
521-
// https://github.com/opensearch-project/sql/issues/4317,
522-
// bin aggregation on timestamp field won't work if not been push down.
523-
enabledOnlyWhenPushdownIsEnabled();
524-
assertThrows(
525-
Exception.class,
526-
() ->
527-
explainQueryToString(
528-
"source=events | bin @timestamp bins=3 | stats count() by @timestamp, region"));
529-
}
530-
531517
@Test
532518
public void testExplainBinWithSpan() throws IOException {
533519
String expected = loadExpectedPlan("explain_bin_span.yaml");
@@ -1169,4 +1155,118 @@ public void testReplaceCommandExplain() throws IOException {
11691155
"source=%s | replace 'IL' WITH 'Illinois' IN state | fields state",
11701156
TEST_INDEX_ACCOUNT)));
11711157
}
1158+
1159+
@Test
1160+
public void testCasePushdownAsRangeQueryExplain() throws IOException {
1161+
// CASE 1: Range - Metric
1162+
// 1.1 Range - Metric
1163+
assertYamlEqualsIgnoreId(
1164+
loadExpectedPlan("agg_range_metric_push.yaml"),
1165+
explainQueryYaml(
1166+
String.format(
1167+
"source=%s | eval age_range = case(age < 30, 'u30', age < 40, 'u40' else 'u100') |"
1168+
+ " stats avg(age) as avg_age by age_range",
1169+
TEST_INDEX_BANK)));
1170+
1171+
// 1.2 Range - Metric (COUNT)
1172+
assertYamlEqualsIgnoreId(
1173+
loadExpectedPlan("agg_range_count_push.yaml"),
1174+
explainQueryYaml(
1175+
String.format(
1176+
"source=%s | eval age_range = case(age < 30, 'u30', age >= 30 and age < 40, 'u40'"
1177+
+ " else 'u100') | stats avg(age) by age_range",
1178+
TEST_INDEX_BANK)));
1179+
1180+
// 1.3 Range - Range - Metric
1181+
assertYamlEqualsIgnoreId(
1182+
loadExpectedPlan("agg_range_range_metric_push.yaml"),
1183+
explainQueryYaml(
1184+
String.format(
1185+
"source=%s | eval age_range = case(age < 30, 'u30', age < 40, 'u40' else 'u100'),"
1186+
+ " balance_range = case(balance < 20000, 'medium' else 'high') | stats"
1187+
+ " avg(balance) as avg_balance by age_range, balance_range",
1188+
TEST_INDEX_BANK)));
1189+
1190+
// 1.5 Should not be pushed because the range is not closed-open
1191+
assertYamlEqualsIgnoreId(
1192+
loadExpectedPlan("agg_case_cannot_push.yaml"),
1193+
explainQueryYaml(
1194+
String.format(
1195+
"source=%s | eval age_range = case(age < 30, 'u30', age >= 30 and age <= 40, 'u40'"
1196+
+ " else 'u100') | stats avg(age) as avg_age by age_range",
1197+
TEST_INDEX_BANK)));
1198+
1199+
// 1.6 Should not be pushed as range query because the result expression is not a string
1200+
// literal.
1201+
// Range aggregation keys must be strings
1202+
assertYamlEqualsIgnoreId(
1203+
loadExpectedPlan("agg_case_num_res_cannot_push.yaml"),
1204+
explainQueryYaml(
1205+
String.format(
1206+
"source=%s | eval age_range = case(age < 30, 30 else 100) | stats count() by"
1207+
+ " age_range",
1208+
TEST_INDEX_BANK)));
1209+
1210+
// CASE 2: Composite - Range - Metric
1211+
// 2.1 Composite (term) - Range - Metric
1212+
assertYamlEqualsIgnoreId(
1213+
loadExpectedPlan("agg_composite_range_metric_push.yaml"),
1214+
explainQueryYaml(
1215+
String.format(
1216+
"source=%s | eval age_range = case(age < 30, 'u30' else 'a30') | stats avg(balance)"
1217+
+ " by state, age_range",
1218+
TEST_INDEX_BANK)));
1219+
1220+
// 2.2 Composite (date histogram) - Range - Metric
1221+
assertYamlEqualsIgnoreId(
1222+
loadExpectedPlan("agg_composite_date_range_push.yaml"),
1223+
explainQueryYaml(
1224+
"source=opensearch-sql_test_index_time_data | eval value_range = case(value < 7000,"
1225+
+ " 'small' else 'large') | stats avg(value) by value_range, span(@timestamp,"
1226+
+ " 1h)"));
1227+
1228+
// 2.3 Composite(2 fields) - Range - Metric (with count)
1229+
assertYamlEqualsIgnoreId(
1230+
loadExpectedPlan("agg_composite2_range_count_push.yaml"),
1231+
explainQueryYaml(
1232+
String.format(
1233+
"source=%s | eval age_range = case(age < 30, 'u30' else 'a30') | stats"
1234+
+ " avg(balance), count() by age_range, state, gender",
1235+
TEST_INDEX_BANK)));
1236+
1237+
// 2.4 Composite (2 fields) - Range - Range - Metric (with count)
1238+
assertYamlEqualsIgnoreId(
1239+
loadExpectedPlan("agg_composite2_range_range_count_push.yaml"),
1240+
explainQueryYaml(
1241+
String.format(
1242+
"source=%s | eval age_range = case(age < 35, 'u35' else 'a35'), balance_range ="
1243+
+ " case(balance < 20000, 'medium' else 'high') | stats avg(balance) as"
1244+
+ " avg_balance by age_range, balance_range, state",
1245+
TEST_INDEX_BANK)));
1246+
1247+
// 2.5 Should not be pushed down as range query because case result expression is not constant
1248+
assertYamlEqualsIgnoreId(
1249+
loadExpectedPlan("agg_case_composite_cannot_push.yaml"),
1250+
explainQueryYaml(
1251+
String.format(
1252+
"source=%s | eval age_range = case(age < 35, 'u35' else email) | stats avg(balance)"
1253+
+ " as avg_balance by age_range, state",
1254+
TEST_INDEX_BANK)));
1255+
}
1256+
1257+
@Test
1258+
public void testNestedAggregationsExplain() throws IOException {
1259+
// TODO: Remove after resolving: https://github.com/opensearch-project/sql/issues/4578
1260+
Assume.assumeFalse(
1261+
"The query runs into error when pushdown is disabled due to bin's implementation",
1262+
isPushdownDisabled());
1263+
assertYamlEqualsIgnoreId(
1264+
loadExpectedPlan("agg_composite_autodate_range_metric_push.yaml"),
1265+
explainQueryYaml(
1266+
String.format(
1267+
"source=%s | bin timestamp bins=3 | eval value_range = case(value < 7000, 'small'"
1268+
+ " else 'great') | stats bucket_nullable=false avg(value), count() by"
1269+
+ " timestamp, value_range, category",
1270+
TEST_INDEX_TIME_DATA)));
1271+
}
11721272
}

0 commit comments

Comments
 (0)