Skip to content

Commit f9e0c15

Browse files
[FLINK-26820] Cassandra Sink V2 Implementation (FLIP-533) (Phase-1)
1 parent 84c3fa0 commit f9e0c15

File tree

74 files changed

+14246
-3
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+14246
-3
lines changed

flink-connector-cassandra/archunit-violations/dcfaa83d-a12c-48e1-9e51-b8d3808cd287

Lines changed: 75 additions & 3 deletions
Large diffs are not rendered by default.
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.connector.cassandra.sink;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.api.connector.sink2.Sink;
23+
import org.apache.flink.api.connector.sink2.SinkWriter;
24+
import org.apache.flink.connector.cassandra.sink.config.CassandraSinkConfig;
25+
import org.apache.flink.connector.cassandra.sink.config.write.RequestConfiguration;
26+
import org.apache.flink.connector.cassandra.sink.exception.CassandraFailureHandler;
27+
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
28+
29+
import java.io.IOException;
30+
31+
/**
32+
* A generic Flink {@link Sink} implementation for writing records to Apache Cassandra.
33+
*
34+
* <p>This sink supports multiple input types including POJOs, Tuples, Rows, and Scala Products.
35+
*
36+
* <p>Use {@link CassandraSinkBuilder} to create and configure instances of this sink.
37+
*
38+
* @param <INPUT> The input record type
39+
*/
40+
@PublicEvolving
41+
public class CassandraSink<INPUT> implements Sink<INPUT> {
42+
43+
private final CassandraSinkConfig<INPUT> sinkConfig;
44+
private final ClusterBuilder clusterBuilder;
45+
private final CassandraFailureHandler failureHandler;
46+
private final RequestConfiguration requestConfiguration;
47+
48+
/**
49+
* Package-private constructor - use {@link CassandraSinkBuilder} to create instances.
50+
*
51+
* @param sinkConfig The sink-specific configuration
52+
* @param clusterBuilder Factory for building Cassandra cluster connections
53+
* @param failureHandler Handler for retriable and fatal failures
54+
* @param requestConfiguration Settings for concurrency, timeouts, and retries
55+
*/
56+
CassandraSink(
57+
CassandraSinkConfig<INPUT> sinkConfig,
58+
ClusterBuilder clusterBuilder,
59+
CassandraFailureHandler failureHandler,
60+
RequestConfiguration requestConfiguration) {
61+
this.sinkConfig = sinkConfig;
62+
this.clusterBuilder = clusterBuilder;
63+
this.failureHandler = failureHandler;
64+
this.requestConfiguration = requestConfiguration;
65+
}
66+
67+
/**
68+
* Creates a new {@link SinkWriter} for this sink.
69+
*
70+
* @param context the sink context providing access to MailboxExecutor and metrics
71+
* @return a new CassandraSinkWriter instance
72+
* @throws IOException if writer creation fails
73+
*/
74+
@Override
75+
public SinkWriter<INPUT> createWriter(InitContext context) throws IOException {
76+
return new CassandraSinkWriter<>(
77+
this.requestConfiguration,
78+
this.sinkConfig,
79+
this.clusterBuilder,
80+
this.failureHandler,
81+
context.getMailboxExecutor(),
82+
context.metricGroup());
83+
}
84+
85+
// Package-private getters for testing
86+
CassandraSinkConfig<INPUT> getSinkConfig() {
87+
return sinkConfig;
88+
}
89+
90+
ClusterBuilder getClusterBuilder() {
91+
return clusterBuilder;
92+
}
93+
94+
CassandraFailureHandler getFailureHandler() {
95+
return failureHandler;
96+
}
97+
98+
RequestConfiguration getRequestConfiguration() {
99+
return requestConfiguration;
100+
}
101+
}

0 commit comments

Comments
 (0)