Skip to content

Commit 99e38ae

Browse files
authored
Pushdown distinct count approx (#4614)
* enable pushdown distinct count approx Signed-off-by: xinyual <xinyual@amazon.com> * enable pushdown distinct count approx Signed-off-by: xinyual <xinyual@amazon.com> * fix IT Signed-off-by: xinyual <xinyual@amazon.com> * fix IT Signed-off-by: xinyual <xinyual@amazon.com> --------- Signed-off-by: xinyual <xinyual@amazon.com>
1 parent e76754c commit 99e38ae

File tree

6 files changed

+32
-6
lines changed

6 files changed

+32
-6
lines changed

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,19 @@ public void testExplainOnAggregationWithSumEnhancement() throws IOException {
617617
TEST_INDEX_BANK)));
618618
}
619619

620+
@Test
621+
public void testStatsDistinctCountApproxFunctionExplainWithPushDown() throws IOException {
622+
enabledOnlyWhenPushdownIsEnabled();
623+
String query =
624+
"source=opensearch-sql_test_index_account | stats distinct_count_approx(state) as"
625+
+ " distinct_states by gender";
626+
var result = explainQueryToString(query);
627+
String expected =
628+
loadFromFile(
629+
"expectedOutput/calcite/explain_agg_with_distinct_count_approx_enhancement.json");
630+
assertJsonEqualsIgnoreId(expected, result);
631+
}
632+
620633
@Test
621634
public void testExplainRegexMatchInWhereWithScriptPushdown() throws IOException {
622635
enabledOnlyWhenPushdownIsEnabled();
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite":{
3+
"logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(distinct_states=[$1], gender=[$0])\n LogicalAggregate(group=[{0}], distinct_states=[DISTINCT_COUNT_APPROX($1)])\n LogicalProject(gender=[$4], state=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical":"CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},distinct_states=DISTINCT_COUNT_APPROX($1)), PROJECT->[distinct_states, gender], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"distinct_states\":{\"cardinality\":{\"field\":\"state.keyword\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"calcite": {
3-
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[APPROX_DISTINCT_COUNT($7) OVER ()])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4-
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(aggs [APPROX_DISTINCT_COUNT($7)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[DISTINCT_COUNT_APPROX($7) OVER ()])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(aggs [DISTINCT_COUNT_APPROX($7)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
55
}
66
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"calcite": {
3-
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[APPROX_DISTINCT_COUNT($7) OVER (PARTITION BY $4)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4-
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(partition {4} aggs [APPROX_DISTINCT_COUNT($7)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], balance=[$3], gender=[$4], city=[$5], employer=[$6], state=[$7], age=[$8], email=[$9], lastname=[$10], distinct_states=[DISTINCT_COUNT_APPROX($7) OVER (PARTITION BY $4)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n",
4+
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableWindow(window#0=[window(partition {4} aggs [DISTINCT_COUNT_APPROX($7)])])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[account_number, firstname, address, balance, gender, city, employer, state, age, email, lastname]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"balance\",\"gender\",\"city\",\"employer\",\"state\",\"age\",\"email\",\"lastname\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
55
}
66
}

opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package org.opensearch.sql.opensearch.executor;
77

8+
import static org.opensearch.sql.expression.function.BuiltinFunctionName.DISTINCT_COUNT_APPROX;
9+
810
import java.security.AccessController;
911
import java.security.PrivilegedAction;
1012
import java.sql.PreparedStatement;
@@ -282,10 +284,10 @@ private void registerOpenSearchFunctions() {
282284
SqlUserDefinedAggFunction approxDistinctCountFunction =
283285
UserDefinedFunctionUtils.createUserDefinedAggFunction(
284286
DistinctCountApproxAggFunction.class,
285-
"APPROX_DISTINCT_COUNT",
287+
DISTINCT_COUNT_APPROX.toString(),
286288
ReturnTypes.BIGINT_FORCE_NULLABLE,
287289
null);
288290
PPLFuncImpTable.INSTANCE.registerExternalAggOperator(
289-
BuiltinFunctionName.DISTINCT_COUNT_APPROX, approxDistinctCountFunction);
291+
DISTINCT_COUNT_APPROX, approxDistinctCountFunction);
290292
}
291293
}

opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,11 @@ yield switch (functionName) {
495495
}
496496
yield Pair.of(aggBuilder, new SinglePercentileParser(aggFieldName));
497497
}
498+
case DISTINCT_COUNT_APPROX -> Pair.of(
499+
helper.build(
500+
!args.isEmpty() ? args.getFirst() : null,
501+
AggregationBuilders.cardinality(aggFieldName)),
502+
new SingleValueParser(aggFieldName));
498503
default -> throw new AggregateAnalyzer.AggregateAnalyzerException(
499504
String.format("Unsupported push-down aggregator %s", aggCall.getAggregation()));
500505
};

0 commit comments

Comments
 (0)