Skip to content

Commit c1f23f4

Browse files
author
Grzegorz Kołakowski
committed
[FLINK-30702] Introduce ElasticDialect
1 parent 5ec3873 commit c1f23f4

File tree

10 files changed

+828
-1
lines changed

10 files changed

+828
-1
lines changed

flink-connector-jdbc/pom.xml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,26 @@ under the License.
195195
<scope>test</scope>
196196
</dependency>
197197

198+
<!-- Elastic tests -->
199+
<dependency>
200+
<groupId>org.testcontainers</groupId>
201+
<artifactId>elasticsearch</artifactId>
202+
<version>1.16.2</version>
203+
<scope>test</scope>
204+
</dependency>
205+
<dependency>
206+
<groupId>org.elasticsearch.plugin</groupId>
207+
<artifactId>x-pack-sql-jdbc</artifactId>
208+
<version>8.6.0</version>
209+
<scope>test</scope>
210+
</dependency>
211+
<dependency>
212+
<groupId>com.squareup.okhttp3</groupId>
213+
<artifactId>okhttp</artifactId>
214+
<version>4.10.0</version>
215+
<scope>test</scope>
216+
</dependency>
217+
198218
<!-- ArchUit test dependencies -->
199219

200220
<dependency>
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.jdbc.dialect.elastic;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
23+
import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
24+
import org.apache.flink.connector.jdbc.internal.converter.ElasticRowConverter;
25+
import org.apache.flink.table.types.logical.LogicalTypeRoot;
26+
import org.apache.flink.table.types.logical.RowType;
27+
28+
import java.util.EnumSet;
29+
import java.util.Optional;
30+
import java.util.Set;
31+
32+
/** JDBC dialect for Elastic. */
33+
@Internal
34+
public class ElasticDialect extends AbstractDialect {
35+
36+
private static final long serialVersionUID = 1L;
37+
38+
// Define MAX/MIN precision of TIMESTAMP type according to Elastic docs:
39+
// https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html
40+
private static final int MIN_TIMESTAMP_PRECISION = 0;
41+
private static final int MAX_TIMESTAMP_PRECISION = 9;
42+
43+
@Override
44+
public String dialectName() {
45+
return "Elasticsearch";
46+
}
47+
48+
@Override
49+
public Optional<String> defaultDriverName() {
50+
return Optional.of("org.elasticsearch.xpack.sql.jdbc.EsDriver");
51+
}
52+
53+
@Override
54+
public Set<LogicalTypeRoot> supportedTypes() {
55+
// The list of types supported by Elastic SQL.
56+
// https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html
57+
return EnumSet.of(
58+
LogicalTypeRoot.BIGINT,
59+
LogicalTypeRoot.BOOLEAN,
60+
LogicalTypeRoot.DATE,
61+
LogicalTypeRoot.DOUBLE,
62+
LogicalTypeRoot.INTEGER,
63+
LogicalTypeRoot.FLOAT,
64+
LogicalTypeRoot.SMALLINT,
65+
LogicalTypeRoot.TINYINT,
66+
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
67+
LogicalTypeRoot.VARBINARY,
68+
LogicalTypeRoot.VARCHAR);
69+
}
70+
71+
@Override
72+
public Optional<Range> timestampPrecisionRange() {
73+
return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
74+
}
75+
76+
@Override
77+
public JdbcRowConverter getRowConverter(RowType rowType) {
78+
return new ElasticRowConverter(rowType);
79+
}
80+
81+
@Override
82+
public String getLimitClause(long limit) {
83+
return "LIMIT " + limit;
84+
}
85+
86+
@Override
87+
public String quoteIdentifier(String identifier) {
88+
return '"' + identifier + '"';
89+
}
90+
91+
@Override
92+
public Optional<String> getUpsertStatement(
93+
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
94+
throw new UnsupportedOperationException("Upsert is not supported.");
95+
}
96+
97+
@Override
98+
public String getInsertIntoStatement(String tableName, String[] fieldNames) {
99+
throw new UnsupportedOperationException("Insert into is not supported.");
100+
}
101+
102+
@Override
103+
public String getUpdateStatement(
104+
String tableName, String[] fieldNames, String[] conditionFields) {
105+
throw new UnsupportedOperationException("Update is not supported.");
106+
}
107+
108+
@Override
109+
public String getDeleteStatement(String tableName, String[] conditionFields) {
110+
throw new UnsupportedOperationException("Delete is not supported.");
111+
}
112+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.jdbc.dialect.elastic;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
23+
import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
24+
25+
/** Factory for {@link ElasticDialect}. */
26+
@Internal
27+
public class ElasticDialectFactory implements JdbcDialectFactory {
28+
@Override
29+
public boolean acceptsURL(String url) {
30+
return url.startsWith("jdbc:elasticsearch:") || url.startsWith("jdbc:es:");
31+
}
32+
33+
@Override
34+
public JdbcDialect create() {
35+
return new ElasticDialect();
36+
}
37+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.jdbc.internal.converter;
20+
21+
import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
22+
import org.apache.flink.table.types.logical.LogicalType;
23+
import org.apache.flink.table.types.logical.RowType;
24+
25+
/**
26+
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
27+
* Elastic.
28+
*/
29+
public class ElasticRowConverter extends AbstractJdbcRowConverter {
30+
31+
private static final long serialVersionUID = 1L;
32+
33+
@Override
34+
public String converterName() {
35+
return "Elasticsearch";
36+
}
37+
38+
public ElasticRowConverter(RowType rowType) {
39+
super(rowType);
40+
}
41+
42+
@Override
43+
protected JdbcDeserializationConverter createInternalConverter(LogicalType type) {
44+
switch (type.getTypeRoot()) {
45+
case TINYINT:
46+
case DOUBLE:
47+
case FLOAT:
48+
return val -> val;
49+
default:
50+
return super.createInternalConverter(type);
51+
}
52+
}
53+
}

flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@ org.apache.flink.connector.jdbc.dialect.mysql.MySqlDialectFactory
1818
org.apache.flink.connector.jdbc.dialect.psql.PostgresDialectFactory
1919
org.apache.flink.connector.jdbc.dialect.oracle.OracleDialectFactory
2020
org.apache.flink.connector.jdbc.dialect.sqlserver.SqlServerDialectFactory
21+
org.apache.flink.connector.jdbc.dialect.elastic.ElasticDialectFactory

flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcDataTypeTest.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,19 @@ public static List<TestItem> testData() {
119119
createTestItem("oracle", "TIMESTAMP(3)"),
120120
createTestItem("oracle", "TIMESTAMP WITHOUT TIME ZONE"),
121121
createTestItem("oracle", "VARBINARY"),
122+
createTestItem("elasticsearch", "BIGINT"),
123+
createTestItem("elasticsearch", "BOOLEAN"),
124+
createTestItem("elasticsearch", "DATE"),
125+
createTestItem("elasticsearch", "DOUBLE"),
126+
createTestItem("elasticsearch", "INTEGER"),
127+
createTestItem("elasticsearch", "FLOAT"),
128+
createTestItem("elasticsearch", "SMALLINT"),
129+
createTestItem("elasticsearch", "TINYINT"),
130+
createTestItem("elasticsearch", "TIMESTAMP(3)"),
131+
createTestItem("elasticsearch", "TIMESTAMP(9)"),
132+
createTestItem("elasticsearch", "TIMESTAMP WITHOUT TIME ZONE"),
133+
createTestItem("elasticsearch", "VARBINARY"),
134+
createTestItem("elasticsearch", "VARCHAR"),
122135

123136
// Unsupported types throws errors.
124137
createTestItem(
@@ -168,7 +181,15 @@ public static List<TestItem> testData() {
168181
createTestItem(
169182
"oracle",
170183
"VARBINARY(10)",
171-
"The Oracle dialect doesn't support type: VARBINARY(10)."));
184+
"The Oracle dialect doesn't support type: VARBINARY(10)."),
185+
createTestItem(
186+
"elasticsearch",
187+
"TIME(0)",
188+
"The Elasticsearch dialect doesn't support type: TIME(0)."),
189+
createTestItem(
190+
"elasticsearch",
191+
"TIMESTAMP_LTZ(3)",
192+
"The Elasticsearch dialect doesn't support type: TIMESTAMP_LTZ(3)."));
172193
}
173194

174195
private static TestItem createTestItem(Object... args) {

0 commit comments

Comments
 (0)