Skip to content

Commit 0f4a58f

Browse files
author
gituser
committed
Merge branch 'hotfix_1.10_4.1.x_1434' into 1.10_release_4.1.x
2 parents a4d5912 + d108f23 commit 0f4a58f

File tree

7 files changed

+528
-5
lines changed

7 files changed

+528
-5
lines changed

core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
<properties>
1818
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
1919
<project.package.name>core</project.package.name>
20-
<jackson.version>2.7.9</jackson.version>
20+
<jackson.version>2.11.3</jackson.version>
2121
<guava.version>19.0</guava.version>
2222
<logger.tool.version>1.0.0-SNAPSHOT</logger.tool.version>
2323
</properties>

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaSource.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,18 @@ protected Properties getKafkaProperties(KafkaSourceTableInfo kafkaSourceTableInf
6363
if (StringUtils.isNotBlank(kafkaSourceTableInfo.getGroupId())) {
6464
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaSourceTableInfo.getGroupId());
6565
}
66-
66+
if("kafka".equalsIgnoreCase(kafkaSourceTableInfo.getType())){
67+
String keyDeserializer = kafkaSourceTableInfo.getKafkaParam(KafkaSourceTableInfo.KEY_DESERIALIZER);
68+
if(StringUtils.isNotBlank(keyDeserializer)){
69+
kafkaSourceTableInfo.putKafkaParam(KafkaSourceTableInfo.KEY_DESERIALIZER, KafkaSourceTableInfo.DT_DESERIALIZER_CLASS_NAME);
70+
kafkaSourceTableInfo.putKafkaParam(KafkaSourceTableInfo.DT_KEY_DESERIALIZER, keyDeserializer);
71+
}
72+
String valueDeserializer = kafkaSourceTableInfo.getKafkaParam(KafkaSourceTableInfo.VALUE_DESERIALIZER);
73+
if(StringUtils.isNotBlank(valueDeserializer)){
74+
kafkaSourceTableInfo.putKafkaParam(KafkaSourceTableInfo.VALUE_DESERIALIZER, KafkaSourceTableInfo.DT_DESERIALIZER_CLASS_NAME);
75+
kafkaSourceTableInfo.putKafkaParam(KafkaSourceTableInfo.DT_VALUE_DESERIALIZER, valueDeserializer);
76+
}
77+
}
6778
for (String key : kafkaSourceTableInfo.getKafkaParamKeys()) {
6879
props.setProperty(key, kafkaSourceTableInfo.getKafkaParam(key));
6980
}

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,16 @@ public class KafkaSourceTableInfo extends AbstractSourceTableInfo {
5757

5858
public static final String TIMESTAMP_OFFSET = "timestampOffset";
5959

60+
public static final String KEY_DESERIALIZER = "key.deserializer";
61+
62+
public static final String DT_KEY_DESERIALIZER = "dt.key.deserializer";
63+
64+
public static final String VALUE_DESERIALIZER = "value.deserializer";
65+
66+
public static final String DT_VALUE_DESERIALIZER = "dt.value.deserializer";
67+
68+
public static final String DT_DESERIALIZER_CLASS_NAME = "com.dtstack.flink.sql.source.kafka.deserializer.DtKafkaDeserializer";
69+
6070
private String bootstrapServers;
6171

6272
private String topic;
@@ -127,6 +137,10 @@ public String getKafkaParam(String key) {
127137
return kafkaParams.get(key);
128138
}
129139

140+
public void putKafkaParam(String key, String value) {
141+
kafkaParams.put(key, value);
142+
}
143+
130144
public Set<String> getKafkaParamKeys() {
131145
return kafkaParams.keySet();
132146
}

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package com.dtstack.flink.sql.source.kafka;
2020

21+
import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
22+
import org.apache.commons.lang3.StringUtils;
2123
import org.apache.flink.api.common.serialization.DeserializationSchema;
2224
import org.apache.flink.api.common.typeinfo.TypeInformation;
2325
import org.apache.flink.metrics.MetricGroup;
@@ -32,9 +34,6 @@
3234
import org.apache.flink.types.Row;
3335
import org.apache.flink.util.SerializedValue;
3436

35-
import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
36-
import org.apache.commons.lang3.StringUtils;
37-
3837
import java.util.Arrays;
3938
import java.util.Map;
4039
import java.util.Properties;
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.dtstack.flink.sql.source.kafka.deserializer;
19+
20+
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
21+
import com.fasterxml.jackson.databind.ObjectMapper;
22+
import org.apache.kafka.common.errors.SerializationException;
23+
import org.apache.kafka.common.header.Headers;
24+
import org.apache.kafka.common.serialization.Deserializer;
25+
26+
import java.io.IOException;
27+
import java.nio.charset.StandardCharsets;
28+
import java.util.Map;
29+
30+
/**
31+
* Date: 2021/04/20
32+
* Company: www.dtstack.com
33+
*
34+
* @author tudou
35+
*/
36+
public class DtKafkaDeserializer<T> implements Deserializer<byte[]> {
37+
protected ObjectMapper objectMapper = new ObjectMapper();
38+
private Deserializer<T> deserializer;
39+
40+
@Override
41+
@SuppressWarnings("unchecked")
42+
public void configure(Map<String, ?> configs, boolean isKey) {
43+
String deserializerClassName;
44+
if(isKey){
45+
deserializerClassName = (String)configs.get(KafkaSourceTableInfo.DT_KEY_DESERIALIZER);
46+
}else{
47+
deserializerClassName = (String)configs.get(KafkaSourceTableInfo.DT_VALUE_DESERIALIZER);
48+
}
49+
try {
50+
this.deserializer = (Deserializer<T>)Class.forName(deserializerClassName).newInstance();
51+
} catch (Exception e) {
52+
throw new RuntimeException("Can't create instance: " + deserializerClassName, e);
53+
}
54+
this.deserializer.configure(configs, isKey);
55+
}
56+
57+
@Override
58+
public byte[] deserialize(String topic, Headers headers, byte[] data) {
59+
return toBytes(deserializer.deserialize(topic, headers, data));
60+
}
61+
62+
@Override
63+
public byte[] deserialize(String topic, byte[] data) {
64+
return toBytes(deserializer.deserialize(topic, data));
65+
}
66+
67+
/**
68+
* T value to byte[]
69+
* @param value
70+
* @return
71+
*/
72+
private byte[] toBytes(T value){
73+
if(value instanceof byte[]){
74+
return (byte[])value;
75+
}else{
76+
try {
77+
return this.objectMapper.readValue(this.objectMapper.writeValueAsBytes(value), String.class).getBytes(StandardCharsets.UTF_8);
78+
} catch (IOException e) {
79+
throw new SerializationException("Can't deserialize data", e);
80+
}
81+
}
82+
}
83+
84+
@Override
85+
public void close() {
86+
this.deserializer.close();
87+
}
88+
}

0 commit comments

Comments
 (0)