[FLINK-37288] Add Google Cloud Spanner dialect and catalog #156
Conversation
Thanks for opening this pull request! Please check out our contributing guidelines: https://flink.apache.org/contributing/how-to-contribute.html
```java
this.connectionProperties = Preconditions.checkNotNull(connectionProperties);
checkArgument(
        !StringUtils.isNullOrWhitespaceOnly(connectionProperties.getProperty(USER_KEY)));
checkArgument(
        !StringUtils.isNullOrWhitespaceOnly(
                connectionProperties.getProperty(PASSWORD_KEY)));
```
Spanner does not use password authentication.
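Since Spanner authenticates via Google Cloud credentials rather than a password, the unconditional password check above would reject valid Spanner configurations. A minimal, self-contained sketch of a dialect-aware check (class and parameter names here are hypothetical, not the PR's code):

```java
// Illustrative sketch: password validation is skipped for dialects, like
// Spanner, that do not use password authentication.
public class CredentialCheck {

    /** Validates user/password properties; the password is optional for some dialects. */
    public static void validate(String user, String password, boolean requiresPassword) {
        if (user == null || user.trim().isEmpty()) {
            throw new IllegalArgumentException("user must be provided");
        }
        if (requiresPassword && (password == null || password.trim().isEmpty())) {
            throw new IllegalArgumentException("password must be provided");
        }
    }

    public static void main(String[] args) {
        // Spanner-style: no password required
        validate("alice", null, false);
        // Password-based dialects still fail fast
        try {
            validate("alice", null, true);
            throw new AssertionError("expected IllegalArgumentException");
        } catch (IllegalArgumentException expected) {
            System.out.println("ok");
        }
    }
}
```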
```java
/**
 * URL has to be without database, like "jdbc:dialect://localhost:1234/" or
 * "jdbc:dialect://localhost:1234" rather than "jdbc:dialect://localhost:1234/db".
 */
protected static void validateJdbcUrl(String url) {
    String[] parts = url.trim().split("\\/+");

    checkArgument(parts.length == 2);
}
```
In the case of Spanner, the URL has the following form, so I have deleted this validation:

`jdbc:cloudspanner://hostname/projects/gcp_project_id/instances/instance_id/databases/database_id`
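Because the Spanner URL embeds the full database path, the generic two-part check cannot apply. If a Spanner-specific validation were wanted instead of deleting the check entirely, it could look like the following sketch (class, method, and pattern are illustrative assumptions, not the PR's code):

```java
import java.util.regex.Pattern;

// Hypothetical Spanner-specific URL validation sketch.
public class SpannerUrlCheck {
    // Matches jdbc:cloudspanner://host/projects/<p>/instances/<i>/databases/<d>,
    // with optional trailing query parameters.
    private static final Pattern SPANNER_URL = Pattern.compile(
            "jdbc:cloudspanner://[^/]*/projects/[^/]+/instances/[^/]+/databases/[^/?]+(\\?.*)?");

    public static boolean isValidSpannerUrl(String url) {
        return SPANNER_URL.matcher(url.trim()).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValidSpannerUrl(
                "jdbc:cloudspanner://localhost:9010/projects/p/instances/i/databases/d"));
        System.out.println(isValidSpannerUrl("jdbc:cloudspanner://localhost:9010/"));
    }
}
```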
```java
public interface SpannerTestBase extends DatabaseTest {
```

```java
/** Test for {@link AbstractJdbcCatalog}. */
class AbstractJdbcCatalogTest {
```
This class is no longer needed because URL validation has been removed.
```diff
-public Schema getTableSchema() {
+public Schema getTableSchema(String pkConstraintName) {
```
In the case of Spanner, the constraint name of the primary key is different, so this change makes it possible to specify the primary key constraint name.
I have applied code formatting using Spotless and updated the documentation. It is ready for review.
| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [Download](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) |
| Trino | `io.trino` | `trino-jdbc` | [Download](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) |
| OceanBase | `com.oceanbase` | `oceanbase-client` | [Download](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/) |
| Spanner | `com.google.cloud` | `google-cloud-spanner-jdbc` | [Download](https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner-jdbc) |
I see the download url points to Maven like the spanner doc. How do I get the jar file?
I have written the same URL as the official document.
The JAR files can be downloaded from the following:
- https://repo1.maven.org/maven2/com/google/cloud/google-cloud-spanner-jdbc/2.27.1/
- https://repo1.maven.org/maven2/com/google/cloud/google-cloud-spanner-jdbc/2.27.1/google-cloud-spanner-jdbc-2.27.1-single-jar-with-dependencies.jar
- https://repo1.maven.org/maven2/com/google/cloud/google-cloud-spanner-jdbc/2.27.1/google-cloud-spanner-jdbc-2.27.1.jar
It is necessary to use the single JAR file that includes dependencies:
https://repo1.maven.org/maven2/com/google/cloud/google-cloud-spanner-jdbc/2.27.1/google-cloud-spanner-jdbc-2.27.1-single-jar-with-dependencies.jar
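For example, the shaded driver could be fetched and placed on the Flink classpath like this (a setup sketch using the version and URL from the links above; the `$FLINK_HOME` destination is an assumption about a standalone Flink installation):

```shell
# Download the self-contained ("single jar with dependencies") Spanner JDBC driver
curl -O https://repo1.maven.org/maven2/com/google/cloud/google-cloud-spanner-jdbc/2.27.1/google-cloud-spanner-jdbc-2.27.1-single-jar-with-dependencies.jar

# Place it where Flink picks up connector JARs (path assumed for a standalone setup)
cp google-cloud-spanner-jdbc-2.27.1-single-jar-with-dependencies.jar "$FLINK_HOME/lib/"
```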
```diff
 Data Type Mapping
 ----------------
-Flink supports connect to several databases which uses dialect like MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2 and OceanBase. The Derby dialect usually used for testing purpose. The field data type mappings from relational databases data types to Flink SQL data types are listed in the following table, the mapping table can help define JDBC table in Flink easily.
+Flink supports connect to several databases which uses dialect like MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2, OceanBase and Spanner. The Derby dialect usually used for testing purpose. The field data type mappings from relational databases data types to Flink SQL data types are listed in the following table, the mapping table can help define JDBC table in Flink easily.
```
nits:
- connect -> the connection
- which uses dialect like -> using dialects e.g.
- The Derby dialect usually used for testing purpose. -> The Derby dialect is usually used for testing purpose.
- JDBC table -> a JDBC table
Fixed: 3c7d225
Does this repository not have an owner? Many PRs adding dialects are being left unreviewed. Does this mean that contributions are not accepted?
```java
// Check if base-url contains query parameters
int questionMarkIndex = baseUrl.indexOf('?');

if (questionMarkIndex == -1) {
    // No parameters: traditional baseUrl + databaseName
    return baseUrl + databaseName;
}

// Parameters present: insert database name before '?'
// Example: "jdbc:postgresql://localhost:5432/?sslmode=require"
//          -> "jdbc:postgresql://localhost:5432/mydb?sslmode=require"
String urlWithoutParams = baseUrl.substring(0, questionMarkIndex);
String params = baseUrl.substring(questionMarkIndex);
return urlWithoutParams + databaseName + params;
```
For Spanner tests, it is necessary to configure autoConfigEmulator to automatically set up the emulator. This enables parameter passing.
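The query-parameter handling above can be exercised in isolation; the following is a minimal self-contained copy of that logic (the `BaseUrlJoin` class and `appendDatabase` method names are illustrative wrappers, not the PR's actual class):

```java
// Sketch of the query-parameter-aware URL construction shown in the diff above.
public class BaseUrlJoin {
    public static String appendDatabase(String baseUrl, String databaseName) {
        int questionMarkIndex = baseUrl.indexOf('?');
        if (questionMarkIndex == -1) {
            // No parameters: traditional baseUrl + databaseName
            return baseUrl + databaseName;
        }
        // Parameters present: insert the database name before '?'
        String urlWithoutParams = baseUrl.substring(0, questionMarkIndex);
        String params = baseUrl.substring(questionMarkIndex);
        return urlWithoutParams + databaseName + params;
    }

    public static void main(String[] args) {
        System.out.println(appendDatabase(
                "jdbc:postgresql://localhost:5432/?sslmode=require", "mydb"));
        // jdbc:postgresql://localhost:5432/mydb?sslmode=require
    }
}
```

This is what allows an emulator flag such as `autoConfigEmulator=true` on the base URL to survive database-name insertion.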
```java
@Override
public void setArray(int fieldIndex, Array x) throws SQLException {
    for (int index : indexMapping[fieldIndex]) {
        statement.setArray(index, x);
    }
}

@Override
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
    return connection.createArrayOf(typeName, elements);
}
```
These implementations are necessary to support writing to Array types.
This implementation introduces the first full ARRAY type write support in the flink-connector-jdbc project. Currently, the PostgreSQL dialect only supports reading ARRAY types (it throws IllegalStateException on write; see PostgresDialectConverter.java:61-67). If this change is merged, the implementation pattern here could serve as a reference for adding ARRAY write support to PostgreSQL and other dialects in the future.
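The `setArray` fan-out over `indexMapping` (one logical field bound to several physical statement parameters) can be illustrated without a database. A minimal sketch with hypothetical names, recording which indexes would receive the bind:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for the index-mapping loop inside setArray above.
public class IndexFanOut {

    /** Returns the physical parameter indexes a logical field is bound to. */
    public static List<Integer> boundIndexes(int[][] indexMapping, int fieldIndex) {
        List<Integer> bound = new ArrayList<>();
        for (int index : indexMapping[fieldIndex]) {
            // In the real converter, statement.setArray(index, x) runs here.
            bound.add(index);
        }
        return bound;
    }

    public static void main(String[] args) {
        // Field 0 maps to parameter 1; field 1 maps to parameters 2 and 3.
        int[][] mapping = {{1}, {2, 3}};
        System.out.println(boundIndexes(mapping, 1)); // [2, 3]
    }
}
```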
**CI Failure Investigation**

I investigated the CI failure.

**Error Details**

Failed Test:

**Root Cause**

Exception:

**Analysis**

**Local Test Results**

I ran the same test locally with both Flink versions:

- Flink 2.1.1
- Flink 2.0.1

Both tests passed successfully in the local environment.

**Conclusion**

This appears to be a CI environment-specific issue rather than a code problem.

**Recommendation**
https://issues.apache.org/jira/browse/FLINK-37288