Skip to content

Commit e422dbb

Browse files
committed
[FLINK-38733] Add new SplitterEnumerator on JdbcSource
1 parent 59dea79 commit e422dbb

File tree

5 files changed

+9
-7
lines changed

5 files changed

+9
-7
lines changed

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void start() {
8787
} else {
8888
context.callAsync(
8989
() ->
90-
splitterEnumerator.hasFinishSplits()
90+
splitterEnumerator.isAllSplitsFinished()
9191
? Collections.emptyList()
9292
: splitterEnumerator.enumerateSplits(),
9393
(List<JdbcSourceSplit> splits, Throwable error) ->

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/PreparedSplitterEnumerator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class PreparedSplitterEnumerator implements SplitterEnumerator {
4444

4545
private final String sqlTemplate;
4646
private final Serializable[][] sqlParameters;
47-
private Boolean finished;
47+
private boolean finished;
4848

4949
protected PreparedSplitterEnumerator(String sqlTemplate, Serializable[][] sqlParameters) {
5050
this.sqlTemplate = Preconditions.checkNotNull(sqlTemplate);
@@ -68,7 +68,7 @@ public void start(JdbcConnectionProvider connectionProvider) {}
6868
public void close() {}
6969

7070
@Override
71-
public boolean hasFinishSplits() {
71+
public boolean isAllSplitsFinished() {
7272
return this.finished;
7373
}
7474

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/SlideTimingSplitterEnumerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public static SlideTimingSplitterEnumerator of(
5252
}
5353

5454
@Override
55-
public boolean hasFinishSplits() {
55+
public boolean isAllSplitsFinished() {
5656
return false;
5757
}
5858

flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/splitter/SplitterEnumerator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.connector.jdbc.core.datastream.source.enumerator.splitter;
2020

21+
import org.apache.flink.annotation.PublicEvolving;
2122
import org.apache.flink.connector.jdbc.core.datastream.source.split.JdbcSourceSplit;
2223
import org.apache.flink.connector.jdbc.datasource.connections.JdbcConnectionProvider;
2324

@@ -29,21 +30,22 @@
2930
import java.util.function.Supplier;
3031

3132
/** Interface for jdbc sql split enumerator. */
33+
@PublicEvolving
3234
public interface SplitterEnumerator extends AutoCloseable, Serializable {
3335

3436
void start(JdbcConnectionProvider connectionProvider);
3537

3638
void close();
3739

38-
boolean hasFinishSplits();
40+
boolean isAllSplitsFinished();
3941

4042
List<JdbcSourceSplit> enumerateSplits();
4143

4244
default List<JdbcSourceSplit> enumerateSplits(@Nonnull Supplier<Boolean> splitGettable) {
4345
return enumerateSplits(splitGettable.get());
4446
}
4547

46-
default List<JdbcSourceSplit> enumerateSplits(Boolean splitGettable) {
48+
default List<JdbcSourceSplit> enumerateSplits(boolean splitGettable) {
4749
if (!splitGettable) {
4850
return Collections.emptyList();
4951
}

flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/core/datastream/source/enumerator/JdbcSourceEnumeratorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void start(JdbcConnectionProvider connectionProvider) {}
114114
public void close() {}
115115

116116
@Override
117-
public boolean hasFinishSplits() {
117+
public boolean isAllSplitsFinished() {
118118
return false;
119119
}
120120

0 commit comments

Comments
 (0)