Commit a4af9a1

[FLINK-34468][Connector/Cassandra] Adding support for Flink 1.19
1 parent 997a12e commit a4af9a1

6 files changed: 6 additions & 27 deletions

.github/workflows/push_pr.yml

Lines changed: 2 additions & 1 deletion
@@ -25,8 +25,9 @@ jobs:
   compile_and_test:
     strategy:
       matrix:
-        flink: [ 1.19.0 ]
+        flink: [ 1.20.0 ]
         include:
+          - flink: 1.19.0
           - flink: 1.18.1
 
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
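With this change, CI builds run against Flink 1.20.0 as the main matrix entry, while 1.19.0 and 1.18.1 remain as extra combinations added via `include`. As a rough sketch, the resulting strategy block reads approximately as follows (reconstructed from the hunk above; indentation and surrounding workflow keys are approximate):

    jobs:
      compile_and_test:
        strategy:
          matrix:
            flink: [ 1.20.0 ]
            include:
              - flink: 1.19.0
              - flink: 1.18.1
        uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils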

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -1,5 +1,6 @@
 .eslintcache
 .cache
+.java-version
 scalastyle-output.xml
 .classpath
 .idea/*
Lines changed: 1 addition & 7 deletions
@@ -1,10 +1,4 @@
 Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (CassandraSource.java:138)
-Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSource.java:124)
-Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSource.java:125)
-Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (CassandraSource.java:126)
-Constructor <org.apache.flink.connector.cassandra.source.CassandraSource.<init>(org.apache.flink.streaming.connectors.cassandra.ClusterBuilder, long, java.lang.Class, java.lang.String, org.apache.flink.streaming.connectors.cassandra.MapperOptions)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, [Ljava.lang.Object;)> in (CassandraSource.java:127)
-Method <org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (CassandraSource.java:145)
-Method <org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (CassandraSource.java:149)
 Method <org.apache.flink.connector.cassandra.source.CassandraSource.checkQueryValidity(java.lang.String)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (CassandraSource.java:0)
 Method <org.apache.flink.connector.cassandra.source.reader.CassandraSplitReader.generateRangeQuery(java.lang.String, java.lang.String)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (CassandraSplitReader.java:0)
-Method <org.apache.flink.connector.cassandra.source.split.SplitsGenerator.estimateTableSize()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SplitsGenerator.java:0)
+Method <org.apache.flink.connector.cassandra.source.split.SplitsGenerator.estimateTableSize()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SplitsGenerator.java:0)

flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java

Lines changed: 0 additions & 6 deletions
@@ -176,7 +176,6 @@ public void testGenerateSplitsWithTooHighMaximumSplitSize(
     }
 
     // overridden to use unordered checks
-    @Override
     protected void checkResultWithSemantic(
             CloseableIterator<Pojo> resultIterator,
             List<List<Pojo>> testData,
@@ -197,36 +196,31 @@ protected void checkResultWithSemantic(
     }
 
     @Disabled("Not a unbounded source")
-    @Override
     public void testSourceMetrics(
             TestEnvironment testEnv,
             DataStreamSourceExternalContext<Pojo> externalContext,
             CheckpointingMode semantic)
             throws Exception {}
 
     @Disabled("Not a unbounded source")
-    @Override
     public void testSavepoint(
             TestEnvironment testEnv,
             DataStreamSourceExternalContext<Pojo> externalContext,
             CheckpointingMode semantic) {}
 
     @Disabled("Not a unbounded source")
-    @Override
     public void testScaleUp(
             TestEnvironment testEnv,
             DataStreamSourceExternalContext<Pojo> externalContext,
             CheckpointingMode semantic) {}
 
     @Disabled("Not a unbounded source")
-    @Override
     public void testScaleDown(
             TestEnvironment testEnv,
             DataStreamSourceExternalContext<Pojo> externalContext,
             CheckpointingMode semantic) {}
 
     @Disabled("Not a unbounded source")
-    @Override
     public void testTaskManagerFailure(
             TestEnvironment testEnv,
             DataStreamSourceExternalContext<Pojo> externalContext,

flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java

Lines changed: 1 addition & 12 deletions
@@ -42,7 +42,6 @@
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
-import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;
@@ -80,10 +79,7 @@
 @SuppressWarnings("serial")
 @Testcontainers
 @ExtendWith(RetryExtension.class)
-class CassandraConnectorITCase
-        extends WriteAheadSinkTestBase<
-                Tuple3<String, Integer, Integer>,
-                CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
+class CassandraConnectorITCase {
 
     private static final CassandraTestEnvironment cassandraTestEnvironment =
             new CassandraTestEnvironment(false);
@@ -284,7 +280,6 @@ void testAnnotatePojoWithTable() {
     // Exactly-once Tests
     // ------------------------------------------------------------------------
 
-    @Override
     protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createSink()
             throws Exception {
         return new CassandraTupleWriteAheadSink<>(
@@ -295,17 +290,14 @@ protected CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> createS
                 new CassandraCommitter(cassandraTestEnvironment.getBuilderForReading()));
     }
 
-    @Override
     protected TupleTypeInfo<Tuple3<String, Integer, Integer>> createTypeInfo() {
         return TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class, Integer.class);
     }
 
-    @Override
     protected Tuple3<String, Integer, Integer> generateValue(int counter, int checkpointID) {
         return new Tuple3<>(UUID.randomUUID().toString(), counter, checkpointID);
     }
 
-    @Override
     protected void verifyResultsIdealCircumstances(
             CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
@@ -325,7 +317,6 @@ protected void verifyResultsIdealCircumstances(
                 .isEmpty();
     }
 
-    @Override
     protected void verifyResultsDataPersistenceUponMissedNotify(
             CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
@@ -345,7 +336,6 @@ protected void verifyResultsDataPersistenceUponMissedNotify(
                 .isEmpty();
     }
 
-    @Override
     protected void verifyResultsDataDiscardingUponRestore(
             CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
@@ -368,7 +358,6 @@ protected void verifyResultsDataDiscardingUponRestore(
                 .isEmpty();
     }
 
-    @Override
     protected void verifyResultsWhenReScaling(
             CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink,
             int startElementCounter,
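Note: the @Override annotations removed throughout this file follow from the class declaration change above; once CassandraConnectorITCase no longer extends WriteAheadSinkTestBase, these methods are plain test helpers rather than overrides, and the annotation would no longer compile.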

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -42,7 +42,7 @@ under the License.
     </scm>
 
     <properties>
-        <flink.version>1.18.0</flink.version>
+        <flink.version>1.20.0</flink.version>
        <japicmp.referenceVersion>3.1.0-1.17</japicmp.referenceVersion>
        <guava.version>19.0</guava.version>
    </properties>
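Since flink.version is an ordinary Maven property, the 1.20.0 value above is only the default build target; command-line properties take precedence over pom properties, so a build against another supported version can override it, for example (hypothetical invocation):

    mvn clean verify -Dflink.version=1.19.0

which is presumably also how the CI matrix above exercises 1.19.0 and 1.18.1 without further pom changes.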
