
Commit 6295eff

Merge pull request #1 from databendcloud/feat/support-avro-converter
feat: support avro converter and fix some bugs
2 parents 979f901 + ed35875 commit 6295eff

23 files changed: +717 additions, -281 deletions

.github/workflows/ci.yaml

Lines changed: 2 additions & 2 deletions
@@ -1,11 +1,11 @@
-name: Cron Datax test
+name: TEST CI
 
 on:
   pull_request:
     branches: [main]
 
 jobs:
-  build-datax:
+  build-connect:
     runs-on: [self-hosted, X64, Linux, 8c16g]
     steps:
       - name: Checkout

pom.xml

Lines changed: 132 additions & 18 deletions
@@ -41,6 +41,8 @@
     </licenses>
 
     <properties>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
         <derby.version>10.14.2.0</derby.version>
         <commons-io.version>2.4</commons-io.version>
         <kafka.connect.maven.plugin.version>0.11.1</kafka.connect.maven.plugin.version>
@@ -64,6 +66,26 @@
         <testcontainers.version>1.17.3</testcontainers.version>
         <databend.jdbc.driver.version>0.1.2</databend.jdbc.driver.version>
     </properties>
+    <repositories>
+        <repository>
+            <id>confluent</id>
+            <name>Confluent</name>
+            <url>https://packages.confluent.io/maven/</url>
+        </repository>
+
+        <repository>
+            <id>cloudera-repo</id>
+            <url>
+                https://repository.cloudera.com/content/repositories/releases/
+            </url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
 
 <!-- <repositories>-->
 <!-- <repository>-->
@@ -271,7 +293,103 @@
         <dependency>
             <groupId>com.databend</groupId>
             <artifactId>databend-jdbc</artifactId>
-            <version>0.1.2</version>
+            <version>0.2.7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.11.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-avro-serializer</artifactId>
+            <version>7.2.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.confluent</groupId>
+                    <artifactId>common-utils</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/io.confluent/kafka-connect-avro-converter -->
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-connect-avro-converter</artifactId>
+            <version>7.2.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>32.0.1-jre</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>failureaccess</artifactId>
+            <version>1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-schema-registry-client</artifactId>
+            <version>7.2.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.confluent</groupId>
+                    <artifactId>common-utils</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.confluent</groupId>
+                    <artifactId>common-config</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.swagger</groupId>
+                    <artifactId>swagger-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.swagger</groupId>
+                    <artifactId>swagger-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.11.3</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 
@@ -280,14 +398,10 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
-                <inherited>true</inherited>
+                <version>3.7.0</version>
                 <configuration>
-                    <compilerArgs>
-                        <arg>-Xlint:all</arg>
-                        <arg>-Werror</arg>
-                    </compilerArgs>
-                    <source>8</source>
-                    <target>8</target>
+                    <source>1.8</source>
+                    <target>1.8</target>
                 </configuration>
            </plugin>
            <plugin>
@@ -418,16 +532,16 @@
 <!-- </execution>-->
 <!-- </executions>-->
 <!-- </plugin>-->
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-release-plugin</artifactId>
-                <version>3.0.1</version>
-                <configuration>
-                    <autoVersionSubmodules>true</autoVersionSubmodules>
-                    <remoteTagging>false</remoteTagging>
-                    <tagNameFormat>v@{project.version}</tagNameFormat>
-                </configuration>
-            </plugin>
+<!-- <plugin>-->
+<!-- <groupId>org.apache.maven.plugins</groupId>-->
+<!-- <artifactId>maven-release-plugin</artifactId>-->
+<!-- <version>3.0.1</version>-->
+<!-- <configuration>-->
+<!-- <autoVersionSubmodules>true</autoVersionSubmodules>-->
+<!-- <remoteTagging>false</remoteTagging>-->
+<!-- <tagNameFormat>v@{project.version}</tagNameFormat>-->
+<!-- </configuration>-->
+<!-- </plugin>-->
         </plugins>
         <resources>
             <resource>

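With the Confluent Maven repository and the kafka-avro-serializer / kafka-connect-avro-converter dependencies above, the sink can consume Avro records that were serialized against a Schema Registry. A minimal connector properties sketch is shown below; the connector.class name, topic, and registry URL are illustrative assumptions, not values taken from this commit:

name=databend-sink
connector.class=com.databend.kafka.connect.DatabendSinkConnector
topics=orders
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

The converter resolves each record's schema from the registry and hands the sink a typed Connect Struct instead of raw bytes.
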
src/main/java/com/databend/kafka/connect/databendclient/TableDefinitions.java

Lines changed: 2 additions & 0 deletions
@@ -41,7 +41,9 @@ public TableDefinition get(
   ) throws SQLException {
     TableDefinition dbTable = cache.get(tableId);
     if (dbTable == null) {
+      log.info("Table {} not found in cache; checking database", tableId);
       if (dialect.tableExists(connection, tableId)) {
+        log.info("Table {} exists in database", tableId);
         dbTable = dialect.describeTable(connection, tableId);
         if (dbTable != null) {
           log.info("Setting metadata for table {} to {}", tableId, dbTable);

src/main/java/com/databend/kafka/connect/sink/BufferedRecords.java

Lines changed: 1 addition & 0 deletions
@@ -59,6 +59,7 @@ public BufferedRecords(
   }
 
   public List<SinkRecord> add(SinkRecord record) throws SQLException, TableAlterOrCreateException {
+    log.info("Adding record to buffer: {}", record);
     recordValidator.validate(record);
     final List<SinkRecord> flushed = new ArrayList<>();
 

src/main/java/com/databend/kafka/connect/sink/DatabendClient.java

Lines changed: 2 additions & 1 deletion
@@ -439,6 +439,7 @@ public boolean tableExists(
     String[] tableTypes = tableTypes(metadata, this.tableTypes);
     String tableTypeDisplay = displayableTableTypes(tableTypes, "/");
     glog.info("Checking {} dialect for existence of {} {}", this, tableTypeDisplay, tableId);
+    glog.info("catalogName is {}, schemaName is {}, tableName is {}", tableId.catalogName(), tableId.schemaName(), tableId.tableName());
     try (ResultSet rs = connection.getMetaData().getTables(
         tableId.catalogName(),
         tableId.schemaName(),
@@ -1399,7 +1400,7 @@ public String buildCreateTableStatement(
     SQLExpressionBuilder builder = expressionBuilder();
 
     final List<String> pkFieldNames = extractPrimaryKeyFieldNames(fields);
-    builder.append("CREATE TABLE ");
+    builder.append("CREATE TABLE IF NOT EXISTS ");
     builder.append(table);
     builder.append(" (");
     writeColumnsSpec(builder, fields);

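The buildCreateTableStatement change above switches automatic table creation to CREATE TABLE IF NOT EXISTS, so re-running the sink against a database that already contains the target table no longer fails with a "table already exists" error; the statement simply becomes a no-op.
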
src/main/java/com/databend/kafka/connect/sink/DatabendSinkTask.java

Lines changed: 26 additions & 1 deletion
@@ -28,6 +28,9 @@ public class DatabendSinkTask extends SinkTask {
 
     boolean shouldTrimSensitiveLogs;
 
+    public DatabendSinkTask() {
+    }
+
     @Override
     public void start(final Map<String, String> props) {
         log.info("Starting Databend Sink task");
@@ -53,19 +56,41 @@ void initWriter() {
         log.info("Databend writer initialized");
     }
 
+    @Override
+    public void open(final Collection<TopicPartition> partitions) {
+        // This method is called when the task's assigned partitions are changed.
+        // You can initialize resources related to the assigned partitions here.
+        // For now, we are just logging the assigned partitions.
+
+        log.info("Opening Databend Sink task for the following partitions:");
+        for (TopicPartition partition : partitions) {
+            log.info("Partition: {}", partition);
+        }
+    }
+
+    @Override
+    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) throws RetriableException {
+        // You can add any processing you need to do before committing the offsets here.
+        // For now, we are just returning the offsets as is.
+        return offsets;
+    }
+
     @Override
     public void put(Collection<SinkRecord> records) {
+        log.info("###: {}", records);
+        log.info("Received {} records", records.size());
         if (records.isEmpty()) {
             return;
         }
         final SinkRecord first = records.iterator().next();
         final int recordsCount = records.size();
-        log.debug(
+        log.info(
                 "Received {} records. First record kafka coordinates:({}-{}-{}). Writing them to the "
                         + "database...",
                 recordsCount, first.topic(), first.kafkaPartition(), first.kafkaOffset()
         );
         try {
+            log.info("Writing {} records", records.size());
             writer.write(records);
         } catch (TableAlterOrCreateException tace) {
             if (reporter != null) {

src/main/java/com/databend/kafka/connect/sink/DatabendWriter.java

Lines changed: 9 additions & 1 deletion
@@ -46,9 +46,17 @@ protected void onConnect(final Connection connection) throws SQLException {
   void write(final Collection<SinkRecord> records)
       throws SQLException, TableAlterOrCreateException {
     final Connection connection = cachedConnectionProvider.getConnection();
+    log.info("DatabendWriter Writing {} records", records.size());
+    log.info("DatabendWriter Writing records is: {}", records);
     try {
       final Map<TableIdentity, BufferedRecords> bufferByTable = new HashMap<>();
       for (SinkRecord record : records) {
+        log.info("DatabendWriter Writing record keySchema is: {}", record.keySchema());
+        log.info("DatabendWriter Writing record valueSchema is: {}", record.valueSchema().fields());
+        log.info("DatabendWriter Writing record key is: {}", record.key());
+        log.info("DatabendWriter Writing record value is: {}", record.value());
+        log.info("DatabendWriter Writing record topic is: {}", record.topic());
+        log.info("DatabendWriter Writing record timestamp is: {}", record.timestamp());
         final TableIdentity tableId = destinationTable(record.topic());
         BufferedRecords buffer = bufferByTable.get(tableId);
         if (buffer == null) {
@@ -64,7 +72,7 @@ void write(final Collection<SinkRecord> records)
         buffer.flush();
         buffer.close();
       }
-      connection.commit();
+      // connection.commit();
     } catch (SQLException | TableAlterOrCreateException e) {
       // e.addSuppressed(e);
       throw e;

src/main/java/com/databend/kafka/connect/sink/DbStructure.java

Lines changed: 2 additions & 0 deletions
@@ -111,6 +111,8 @@ boolean amendIfNecessary(
       final int maxRetries
   ) throws SQLException, TableAlterOrCreateException {
     final TableDefinition tableDefn = tableDefns.get(connection, tableId);
+    log.info("tableDefn: {}", tableDefn);
+    log.info("Amending table {} with fieldsMetadata: {}", tableId, fieldsMetadata);
 
     final Set<SinkRecordField> missingFields = missingFields(
         fieldsMetadata.allFields.values(),

src/main/java/com/databend/kafka/connect/sink/metadata/FieldsMetadata.java

Lines changed: 7 additions & 0 deletions
@@ -4,6 +4,8 @@
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
@@ -12,6 +14,7 @@ public class FieldsMetadata {
   public final Set<String> keyFieldNames;
   public final Set<String> nonKeyFieldNames;
   public final Map<String, SinkRecordField> allFields;
+  private static final Logger LOGGER = LoggerFactory.getLogger(DatabendSinkConfig.class);
 
   // visible for testing
   public FieldsMetadata(
@@ -86,6 +89,7 @@ public static FieldsMetadata extract(
     }
 
     final Set<String> nonKeyFieldNames = new LinkedHashSet<>();
+    LOGGER.info("@@Value schema is: {}", valueSchema);
     if (valueSchema != null) {
       for (Field field : valueSchema.fields()) {
         if (keyFieldNames.contains(field.name())) {
@@ -246,6 +250,9 @@ private static void extractRecordValuePk(
               DatabendSinkConfig.PrimaryKeyMode.RECORD_VALUE)
       );
     }
+
+    LOGGER.info("Value schema is: {}", valueSchema.toString());
+    LOGGER.info("Value fields are: {}", valueSchema.fields());
     if (configuredPkFields.isEmpty()) {
       for (Field keyField : valueSchema.fields()) {
         keyFieldNames.add(keyField.name());
src/main/java/com/databend/kafka/connect/sink/records/AvroConverterConfig.java

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+package com.databend.kafka.connect.sink.records;
+
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import java.util.Map;
+
+class AvroConverterConfig extends AbstractKafkaAvroSerDeConfig {
+    AvroConverterConfig(final Map<?, ?> props) {
+        super(baseConfigDef(), props);
+    }
+}

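AbstractKafkaAvroSerDeConfig.baseConfigDef() supplies the standard serializer settings (schema.registry.url, auto.register.schemas, and so on), so this small subclass is what turns raw converter properties into a validated config. A rough usage sketch follows, assuming a converter in the same package simply forwards the props it receives from Connect; the class name and values below are illustrative, not part of this commit:

package com.databend.kafka.connect.sink.records;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;

import java.util.HashMap;
import java.util.Map;

class AvroConverterConfigSketch {

    static void configureExample() {
        // Properties as a Connect worker would pass them to Converter.configure(...)
        Map<String, Object> props = new HashMap<>();
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

        // Validates the props against baseConfigDef() and exposes typed getters
        AvroConverterConfig config = new AvroConverterConfig(props);
        System.out.println(config.getSchemaRegistryUrls());
    }
}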