Commit 6512714

Merge pull request #4 from databendcloud/fix/sync-json-schemaless
fix: json schemaless data sync failed
2 parents: b68e966 + 3593243

File tree

3 files changed: +34 / -13 lines changed


src/main/java/com/databend/kafka/connect/sink/DatabendWriter.java

Lines changed: 32 additions & 11 deletions
@@ -55,7 +55,6 @@ protected void onConnect(final Connection connection) throws SQLException {
     public void writeSchemaLessData(final Collection<Record> records) throws SQLException, TableAlterOrCreateException {
         final com.databend.jdbc.DatabendConnection connection = (com.databend.jdbc.DatabendConnection) cachedConnectionProvider.getConnection();
         log.info("DatabendWriter Writing {} records", records.size());
-        // new ObjectMapper
         ObjectMapper objectMapper = new ObjectMapper();

         StringBuilder sb = new StringBuilder();
@@ -65,7 +64,7 @@ public void writeSchemaLessData(final Collection<Record> records) throws SQLExce
                 tableId = destinationTable(record.getTopic());
                 Map<String, Data> recordMap = record.getJsonMap();

-                // 创建一个新的 Map 来存储转换后的数据
+                // create a new map to store the transformed data
                 Map<String, Object> transformedMap = new HashMap<>();

                 for (Map.Entry<String, Data> entry : recordMap.entrySet()) {
@@ -79,7 +78,6 @@ public void writeSchemaLessData(final Collection<Record> records) throws SQLExce
                         case INT16:
                         case INT32:
                         case INT64:
-                            log.info("DatabendWriter Writing record int data");
                             value = Integer.parseInt(data.getObject().toString());
                             break;
                         case FLOAT32:
@@ -90,11 +88,9 @@ public void writeSchemaLessData(final Collection<Record> records) throws SQLExce
                             value = Boolean.parseBoolean(data.getObject().toString());
                             break;
                         case STRING:
-                            log.info("DatabendWriter Writing record string data");
                             value = data.getObject().toString();
                             break;
                         default:
-                            log.info("DatabendWriter Writing record string data");
                             value = data.getObject().toString();
                             break;
                     }
@@ -107,34 +103,59 @@ public void writeSchemaLessData(final Collection<Record> records) throws SQLExce
                 String json = objectMapper.writeValueAsString(transformedMap);
                 sb.append(json).append("\n");
             }
+
+            // if there are no records to write, return
+            if (sb.length() == 0) {
+                log.info("No records to write");
+                return;
+            }
+
             String jsonStr = sb.toString();
-            // log.info("DatabendWriter Writing jsonStr is: {}", jsonStr);
+
+            // create uuid for the stage path
             String uuid = UUID.randomUUID().toString();
-            String stagePrefix = String.format("%s/%s/%s/%s/%s/%s/%s/",
+            String stagePath = String.format("%s/%s/%s/%s/%s/%s/%s",
                     LocalDateTime.now().getYear(),
                     LocalDateTime.now().getMonthValue(),
                     LocalDateTime.now().getDayOfMonth(),
                     LocalDateTime.now().getHour(),
                     LocalDateTime.now().getMinute(),
                     LocalDateTime.now().getSecond(),
                     uuid);
-            InputStream inputStream = new ByteArrayInputStream(jsonStr.getBytes(StandardCharsets.UTF_8));
-            String fileName = String.format("%s.%s", uuid, "ndjson");
-            connection.uploadStream("~", stagePrefix, inputStream, fileName, jsonStr.length(), false);
+
+            String fileName = String.format("%s.ndjson", uuid);
+
+            byte[] jsonBytes = jsonStr.getBytes(StandardCharsets.UTF_8);
+            InputStream inputStream = new ByteArrayInputStream(jsonBytes);
+            int contentLength = jsonBytes.length;
+
+            log.info("Uploading data, file size: {} bytes", contentLength);
+
+            connection.uploadStream("~", stagePath, inputStream, fileName, contentLength, false);
+
             assert tableId != null;
+
+            String stagePlusFileName = String.format("@~/%s/%s", stagePath, fileName);
+
+            log.info("Copying data from stage: {}", stagePlusFileName);
+
             String copyIntoSQL = String.format(
                     "COPY INTO %s FROM %s FILE_FORMAT = (type = NDJSON missing_field_as = FIELD_DEFAULT COMPRESSION = AUTO) " +
                             "PURGE = %b FORCE = %b DISABLE_VARIANT_CHECK = %b",
                     tableId,
-                    String.format("@~/%s/%s", stagePrefix, fileName),
+                    stagePlusFileName,
                     true,
                     true,
                     true
             );
+
             try {
+                log.info("Executing COPY INTO SQL: {}", copyIntoSQL);
                 connection.createStatement().execute(copyIntoSQL);
+                log.info("COPY INTO completed successfully");
             } catch (Exception e) {
                 log.error("DatabendWriter writeSchemaLessData error: {}", e);
+                throw e; // throw the exception to the caller
             }
         } catch (TableAlterOrCreateException e) {
             throw e;
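
The substantive change above is how the upload's content length is computed. A plausible root cause for the broken sync, inferred from the diff rather than stated in the commit message: the old code passed jsonStr.length() (UTF-16 code units) as the size of a stream that carried UTF-8 bytes, so any batch containing multi-byte characters declared a length shorter than its actual payload. A minimal standalone sketch of the mismatch:

// Sketch of the length mismatch (an inference from the diff, not stated
// in the commit message): String.length() counts UTF-16 code units,
// while the uploaded stream carries UTF-8 bytes.
import java.nio.charset.StandardCharsets;

public class ContentLengthDemo {
    public static void main(String[] args) {
        String json = "{\"name\":\"数据\"}\n"; // CJK characters in the value
        System.out.println(json.length());                                // 14 code units
        System.out.println(json.getBytes(StandardCharsets.UTF_8).length); // 18 bytes
        // Declaring 14 bytes for an 18-byte stream leaves the upload
        // truncated or rejected; the patch encodes once into jsonBytes
        // and passes jsonBytes.length so stream and length always agree.
    }
}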
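
Condensing the patched method, the new flow is: skip empty batches, serialize the batch to one NDJSON file, upload it to the user stage "~" under a timestamped path, then load it with COPY INTO. The sketch below reuses only calls that appear in the diff (uploadStream, createStatement().execute()); record-to-JSON conversion and logging are elided, and the class and method names are invented for illustration:

// Condensed sketch of the patched upload-then-copy flow (illustrative
// only; class and method names here are not from the repository).
import com.databend.jdbc.DatabendConnection;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.UUID;

public class SchemaLessUploadSketch {

    static void uploadAndCopy(DatabendConnection connection, String tableId, String ndjson)
            throws Exception {
        // Early return: an empty batch means nothing to stage or copy.
        if (ndjson.isEmpty()) {
            return;
        }

        // One file per batch under a year/month/day/hour/minute/second/uuid path.
        String uuid = UUID.randomUUID().toString();
        LocalDateTime now = LocalDateTime.now();
        String stagePath = String.format("%s/%s/%s/%s/%s/%s/%s",
                now.getYear(), now.getMonthValue(), now.getDayOfMonth(),
                now.getHour(), now.getMinute(), now.getSecond(), uuid);
        String fileName = String.format("%s.ndjson", uuid);

        // Encode once; the same byte[] feeds the stream and the declared length.
        byte[] bytes = ndjson.getBytes(StandardCharsets.UTF_8);
        InputStream in = new ByteArrayInputStream(bytes);
        connection.uploadStream("~", stagePath, in, fileName, bytes.length, false);

        // PURGE drops the staged file after a successful load; FORCE loads
        // the file even if one with the same name was loaded before.
        String copyIntoSQL = String.format(
                "COPY INTO %s FROM @~/%s/%s FILE_FORMAT = (type = NDJSON " +
                        "missing_field_as = FIELD_DEFAULT COMPRESSION = AUTO) " +
                        "PURGE = true FORCE = true DISABLE_VARIANT_CHECK = true",
                tableId, stagePath, fileName);
        connection.createStatement().execute(copyIntoSQL);
    }
}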

target/components/packages/databendCloud-databend-kafka-connect-0.0.1-SNAPSHOT/databendCloud-databend-kafka-connect-0.0.1-SNAPSHOT/manifest.json

Lines changed: 1 addition & 1 deletion
@@ -31,5 +31,5 @@
     "url" : "https://www.apache.org/licenses/LICENSE-2.0"
   } ],
   "component_types" : [ "sink" ],
-  "release_date" : "2024-08-06"
+  "release_date" : "2025-03-18"
 }

target/manifest.json

Lines changed: 1 addition & 1 deletion
@@ -31,5 +31,5 @@
     "url" : "https://www.apache.org/licenses/LICENSE-2.0"
   } ],
   "component_types" : [ "sink" ],
-  "release_date" : "2024-08-06"
+  "release_date" : "2025-03-18"
 }
