Commit b68e966

Merge pull request #3 from databendcloud/feat/support-schemaless-sync
feat: support schemaless data sync
2 parents c65032d + 6c8feaf commit b68e966

21 files changed, +569 -35 lines changed

.github/workflows/ci.yaml

Lines changed: 24 additions & 3 deletions
@@ -1,12 +1,33 @@
 name: TEST CI

 on:
+  push:
+    branches:
+      - main
+      - master
   pull_request:
-    branches: [main]
+    branches:
+      - main
+      - master

 jobs:
   build-connect:
-    runs-on: [self-hosted, X64, Linux, 8c16g]
+    runs-on: ubuntu-latest
+    services:
+      databend:
+        image: datafuselabs/databend
+        env:
+          QUERY_DEFAULT_USER: databend
+          QUERY_DEFAULT_PASSWORD: databend
+          MINIO_ENABLED: true
+        # options: >-
+        #   --health-cmd "curl -fs http://localhost:8000/v1/health || exit 1"
+        #   --health-interval 10s
+        #   --health-timeout 5s
+        #   --health-retries 5
+        ports:
+          - 8000:8000
+          - 9000:9000
     steps:
       - name: Checkout
         uses: actions/checkout@v3
@@ -23,7 +44,7 @@ jobs:
         maven-version: 3.8.2

       - name: Maven Build
-        run: mvn -passembly -Dmaven.test.skip package
+        run: mvn -Passembly -Dmaven.test.skip package

       - name: Databend Kafka Integration test
         run: mvn test -D test=DatabendSinkIT

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -293,7 +293,7 @@
     <dependency>
       <groupId>com.databend</groupId>
       <artifactId>databend-jdbc</artifactId>
-      <version>0.2.7</version>
+      <version>0.2.8</version>
     </dependency>
     <dependency>
       <groupId>org.apache.avro</groupId>

src/main/java/com/databend/kafka/connect/sink/DatabendClient.java

Lines changed: 8 additions & 6 deletions
@@ -439,7 +439,7 @@ public boolean tableExists(
     String[] tableTypes = tableTypes(metadata, this.tableTypes);
     String tableTypeDisplay = displayableTableTypes(tableTypes, "/");
     glog.info("Checking {} dialect for existence of {} {}", this, tableTypeDisplay, tableId);
-    glog.info("catalogName is {}, schemaName is {}, tableName is {}", tableId.catalogName(),tableId.schemaName(), tableId.tableName());
+    glog.info("catalogName is {}, schemaName is {}, tableName is {}", tableId.catalogName(), tableId.schemaName(), tableId.tableName());
     try (ResultSet rs = connection.getMetaData().getTables(
         tableId.catalogName(),
         tableId.schemaName(),
@@ -1622,15 +1622,17 @@ protected String getSqlType(SinkRecordField field) {
       case INT64:
        return "BIGINT";
       case FLOAT32:
-        return "REAL";
+        return "FLOAT";
       case FLOAT64:
-        return "DOUBLE PRECISION";
+        return "DOUBLE";
       case BOOLEAN:
        return "BOOLEAN";
       case STRING:
-        return "TEXT";
-      case BYTES:
-        return "BYTEA";
+        return "STRING";
+      case MAP:
+        return "STRING";
+      case STRUCT:
+        return "VARIANT";
       case ARRAY:
        SinkRecordField childField = new SinkRecordField(
            field.schema().valueSchema(),
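
For context, this mapping change makes the connector emit Databend-native column types (FLOAT, DOUBLE, STRING, VARIANT), adds MAP and STRUCT handling, and drops the BYTES -> BYTEA branch. Below is a minimal sketch, not part of this change (class, schema, and field names are made up), of a Connect value schema and the column types the new getSqlType mapping would pick:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class TypeMappingSketch {
  public static void main(String[] args) {
    // Hypothetical value schema; comments show the Databend type per the new mapping.
    Schema valueSchema = SchemaBuilder.struct().name("orders")
        .field("id", Schema.INT64_SCHEMA)       // BIGINT
        .field("weight", Schema.FLOAT32_SCHEMA) // FLOAT  (was REAL)
        .field("price", Schema.FLOAT64_SCHEMA)  // DOUBLE (was DOUBLE PRECISION)
        .field("note", Schema.STRING_SCHEMA)    // STRING (was TEXT)
        .field("attrs", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build()) // STRING (new MAP branch)
        .build();
    System.out.println(valueSchema.fields());
    // Nested STRUCT fields would map to VARIANT under the new branch.
  }
}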

src/main/java/com/databend/kafka/connect/sink/DatabendSinkTask.java

Lines changed: 22 additions & 0 deletions
@@ -1,6 +1,8 @@
 package com.databend.kafka.connect.sink;

 import com.databend.kafka.connect.databendclient.DatabendConnection;
+import com.databend.kafka.connect.sink.records.Record;
+import com.databend.kafka.connect.sink.records.SchemaType;
 import com.databend.kafka.connect.util.Version;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -15,7 +17,9 @@
 import java.sql.SQLException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;

 public class DatabendSinkTask extends SinkTask {
   private static final Logger log = LoggerFactory.getLogger(DatabendSinkConfig.class);
@@ -75,6 +79,10 @@ public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, Offs
     return offsets;
   }

+  public boolean isSchemaless(Record record) {
+    return record.getSchemaType().equals(SchemaType.SCHEMA_LESS);
+  }
+
   @Override
   public void put(Collection<SinkRecord> records) {
     log.info("###: {}", records);
@@ -89,6 +97,20 @@ public void put(Collection<SinkRecord> records) {
             + "database...",
         recordsCount, first.topic(), first.kafkaPartition(), first.kafkaOffset()
     );
+
+    // isSchemaless
+    if (isSchemaless(Record.convert(first))) {
+      try {
+        log.info("Writing {} schemaless records", records.size());
+        writer.writeSchemaLessData(records.stream().map(Record::convert).collect(Collectors.toList()));
+      } catch (SQLException | TableAlterOrCreateException e) {
+        log.error("Error while writing records to Databend", e);
+        throw new ConnectException(e);
+      }
+      log.info("Successfully wrote {} schemaless records", records.size());
+      return;
+    }
+
     try {
       log.info("Writing {} records", records.size());
       writer.write(records);
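
For a rough sense of when the new branch fires, the sketch below (not from this change) builds a schemaless sink record, i.e. one whose value is a plain Map with no value Schema attached; it assumes Record.convert classifies such records as SchemaType.SCHEMA_LESS, which is what the EmptyRecordConvertor added further down appears to be for:

import org.apache.kafka.connect.sink.SinkRecord;

import java.util.HashMap;
import java.util.Map;

public class SchemalessPutSketch {
  public static void main(String[] args) {
    // Value payload without an attached Connect Schema.
    Map<String, Object> value = new HashMap<>();
    value.put("id", 1);
    value.put("name", "databend");

    SinkRecord first = new SinkRecord(
        "mytopic", 0, // topic, partition
        null, null,   // key schema, key
        null, value,  // no value schema -> schemaless
        42L);         // offset

    // In put(), a record like this is expected to satisfy
    // isSchemaless(Record.convert(first)) and be routed to
    // writer.writeSchemaLessData(...) instead of writer.write(...).
    System.out.println(first.valueSchema()); // null
  }
}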

src/main/java/com/databend/kafka/connect/sink/DatabendWriter.java

Lines changed: 100 additions & 2 deletions
@@ -3,16 +3,25 @@
 import com.databend.kafka.connect.databendclient.CachedConnectionProvider;
 import com.databend.kafka.connect.databendclient.DatabendConnection;
 import com.databend.kafka.connect.databendclient.TableIdentity;
+import com.databend.kafka.connect.sink.records.Data;
+import com.databend.kafka.connect.sink.records.Record;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.time.LocalDateTime;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;

 public class DatabendWriter {
   private static final Logger log = LoggerFactory.getLogger(DatabendWriter.class);
@@ -43,11 +52,101 @@ protected void onConnect(final Connection connection) throws SQLException {
     };
   }

+  public void writeSchemaLessData(final Collection<Record> records) throws SQLException, TableAlterOrCreateException {
+    final com.databend.jdbc.DatabendConnection connection = (com.databend.jdbc.DatabendConnection) cachedConnectionProvider.getConnection();
+    log.info("DatabendWriter Writing {} records", records.size());
+    // new ObjectMapper
+    ObjectMapper objectMapper = new ObjectMapper();

+    StringBuilder sb = new StringBuilder();
+    try {
+      TableIdentity tableId = null;
+      for (Record record : records) {
+        tableId = destinationTable(record.getTopic());
+        Map<String, Data> recordMap = record.getJsonMap();
+
+        // Create a new Map to store the converted data
+        Map<String, Object> transformedMap = new HashMap<>();
+
+        for (Map.Entry<String, Data> entry : recordMap.entrySet()) {
+          String key = entry.getKey();
+          Data data = entry.getValue();
+
+          // Check the field type and handle the object accordingly
+          Object value;
+          switch (data.getFieldType()) {
+            case INT8:
+            case INT16:
+            case INT32:
+            case INT64:
+              log.info("DatabendWriter Writing record int data");
+              value = Integer.parseInt(data.getObject().toString());
+              break;
+            case FLOAT32:
+            case FLOAT64:
+              value = Double.parseDouble(data.getObject().toString());
+              break;
+            case BOOLEAN:
+              value = Boolean.parseBoolean(data.getObject().toString());
+              break;
+            case STRING:
+              log.info("DatabendWriter Writing record string data");
+              value = data.getObject().toString();
+              break;
+            default:
+              log.info("DatabendWriter Writing record string data");
+              value = data.getObject().toString();
+              break;
+          }
+
+          // Add the processed value to the map
+          transformedMap.put(key, value);
+        }
+        log.info("DatabendWriter Writing transformedMap is: {}", transformedMap);
+
+        String json = objectMapper.writeValueAsString(transformedMap);
+        sb.append(json).append("\n");
+      }
+      String jsonStr = sb.toString();
+      // log.info("DatabendWriter Writing jsonStr is: {}", jsonStr);
+      String uuid = UUID.randomUUID().toString();
+      String stagePrefix = String.format("%s/%s/%s/%s/%s/%s/%s/",
+          LocalDateTime.now().getYear(),
+          LocalDateTime.now().getMonthValue(),
+          LocalDateTime.now().getDayOfMonth(),
+          LocalDateTime.now().getHour(),
+          LocalDateTime.now().getMinute(),
+          LocalDateTime.now().getSecond(),
+          uuid);
+      InputStream inputStream = new ByteArrayInputStream(jsonStr.getBytes(StandardCharsets.UTF_8));
+      String fileName = String.format("%s.%s", uuid, "ndjson");
+      connection.uploadStream("~", stagePrefix, inputStream, fileName, jsonStr.length(), false);
+      assert tableId != null;
+      String copyIntoSQL = String.format(
+          "COPY INTO %s FROM %s FILE_FORMAT = (type = NDJSON missing_field_as = FIELD_DEFAULT COMPRESSION = AUTO) " +
+              "PURGE = %b FORCE = %b DISABLE_VARIANT_CHECK = %b",
+          tableId,
+          String.format("@~/%s/%s", stagePrefix, fileName),
+          true,
+          true,
+          true
+      );
+      try {
+        connection.createStatement().execute(copyIntoSQL);
+      } catch (Exception e) {
+        log.error("DatabendWriter writeSchemaLessData error: {}", e);
+      }
+    } catch (TableAlterOrCreateException e) {
+      throw e;
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   void write(final Collection<SinkRecord> records)
       throws SQLException, TableAlterOrCreateException {
     final Connection connection = cachedConnectionProvider.getConnection();
     log.info("DatabendWriter Writing {} records", records.size());
-    log.info("DatabendWriter Writing records is: {}", records);
     try {
       final Map<TableIdentity, BufferedRecords> bufferByTable = new HashMap<>();
       for (SinkRecord record : records) {
@@ -56,7 +155,6 @@ void write(final Collection<SinkRecord> records)
         log.info("DatabendWriter Writing record valueSchema is: {}", record.valueSchema().fields());
       }
       log.info("DatabendWriter Writing record key is: {}", record.key());
-      log.info("DatabendWriter Writing record value is: {}", record.value());
       log.info("DatabendWriter Writing record topic is: {}", record.topic());
       log.info("DatabendWriter Writing record timestamp is: {}", record.timestamp());
       final TableIdentity tableId = destinationTable(record.topic());
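
To make the upload path concrete: writeSchemaLessData serializes each record map to one JSON object per line (NDJSON), uploads the batch to the user stage "~" under a timestamp/UUID prefix, and then runs COPY INTO against the destination table. The standalone sketch below reproduces only the payload and SQL construction, with a hypothetical table name and a hard-coded stage prefix; the real code derives both from the record topic and the current time:

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class NdjsonCopySketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    Map<String, Object> row1 = new HashMap<>();
    row1.put("id", 1);
    row1.put("name", "a");
    Map<String, Object> row2 = new HashMap<>();
    row2.put("id", 2);
    row2.put("name", "b");
    List<Map<String, Object>> rows = Arrays.asList(row1, row2);

    // One JSON object per line = NDJSON, matching the FILE_FORMAT used in the COPY INTO above.
    StringBuilder ndjson = new StringBuilder();
    for (Map<String, Object> row : rows) {
      ndjson.append(mapper.writeValueAsString(row)).append("\n");
    }

    String uuid = UUID.randomUUID().toString();
    String stagePrefix = "2024/1/1/0/0/0/" + uuid + "/"; // illustrative; the real code uses LocalDateTime.now()
    String fileName = uuid + ".ndjson";

    String copyIntoSQL = String.format(
        "COPY INTO %s FROM %s FILE_FORMAT = (type = NDJSON missing_field_as = FIELD_DEFAULT COMPRESSION = AUTO) "
            + "PURGE = %b FORCE = %b DISABLE_VARIANT_CHECK = %b",
        "mytopic",                                        // hypothetical destination table
        String.format("@~/%s/%s", stagePrefix, fileName), // staged file location
        true, true, true);
    System.out.println(copyIntoSQL);
  }
}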

src/main/java/com/databend/kafka/connect/sink/kafka/OffsetContainer.java

Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
+package com.databend.kafka.connect.sink.kafka;
+
+public class OffsetContainer extends TopicPartitionContainer {
+  private long offset;
+
+
+  public OffsetContainer(String topic, int partition, long offset) {
+    super(topic, partition);
+    this.offset = offset;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+}

src/main/java/com/databend/kafka/connect/sink/kafka/TopicPartitionContainer.java

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+package com.databend.kafka.connect.sink.kafka;
+
+public class TopicPartitionContainer {
+
+  protected String topic;
+  protected int partition;
+
+  public TopicPartitionContainer(String topic, int partition) {
+    this.topic = topic;
+    this.partition = partition;
+  }
+
+  public String getTopic() {
+    return topic;
+  }
+
+  public int getPartition() {
+    return partition;
+  }
+
+  public String getTopicAndPartitionKey() {
+    return String.format("%s-%d", topic, partition);
+  }
+}

src/main/java/com/databend/kafka/connect/sink/records/Data.java

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+package com.databend.kafka.connect.sink.records;
+
+import org.apache.kafka.connect.data.Schema;
+
+public class Data {
+  private Schema.Type fieldType;
+  private Object object;
+
+  public Data(Schema.Type fieldType, Object object) {
+    this.fieldType = fieldType;
+    this.object = object;
+  }
+
+  public Schema.Type getFieldType() {
+    return fieldType;
+  }
+
+  public Object getObject() {
+    return object;
+  }
+
+  @Override
+  public String toString() {
+    if (object == null) {
+      return null;
+    }
+    return object.toString();
+  }
+  public String getJsonType() {
+    switch (fieldType) {
+      case INT8:
+      case INT16:
+      case INT32:
+      case INT64:
+        return "integer";
+      case FLOAT32:
+      case FLOAT64:
+        return "number";
+      case BOOLEAN:
+        return "boolean";
+      case STRING:
+        return "string";
+      default:
+        return "string";
+    }
+  }
+}
+

src/main/java/com/databend/kafka/connect/sink/records/EmptyRecordConvertor.java

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+package com.databend.kafka.connect.sink.records;
+
+import com.databend.kafka.connect.sink.kafka.OffsetContainer;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class EmptyRecordConvertor implements RecordConvertor {
+  @Override
+  public Record convert(SinkRecord sinkRecord) {
+    String topic = sinkRecord.topic();
+    int partition = sinkRecord.kafkaPartition().intValue();
+    long offset = sinkRecord.kafkaOffset();
+    List<Field> fields = new ArrayList<>();
+    return new Record(SchemaType.SCHEMA_LESS, new OffsetContainer(topic, partition, offset), fields, null, sinkRecord);
+  }
+}
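
Taken together, these new classes form the schemaless record model: OffsetContainer pins a record to its topic/partition/offset, Data pairs a field value with its Connect type, and EmptyRecordConvertor produces a Record tagged SCHEMA_LESS with no fields. A small usage sketch based only on the accessors shown above:

import com.databend.kafka.connect.sink.kafka.OffsetContainer;
import com.databend.kafka.connect.sink.records.Data;
import org.apache.kafka.connect.data.Schema;

public class RecordModelSketch {
  public static void main(String[] args) {
    OffsetContainer offset = new OffsetContainer("mytopic", 0, 42L);
    System.out.println(offset.getTopicAndPartitionKey()); // mytopic-0
    System.out.println(offset.getOffset());               // 42

    Data price = new Data(Schema.Type.FLOAT64, 9.99);
    System.out.println(price.getJsonType()); // number
    System.out.println(price);               // 9.99 (toString delegates to the wrapped object)
  }
}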
