Skip to content

Commit 560e14e

Browse files
author
gituser
committed
Merge branch '1.10_release_4.1.x' into 1.10_release_4.2.x
2 parents b703ac9 + e1144af commit 560e14e

File tree

8 files changed

+579
-40
lines changed

8 files changed

+579
-40
lines changed

core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
<properties>
1818
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
1919
<project.package.name>core</project.package.name>
20-
<jackson.version>2.7.9</jackson.version>
20+
<jackson.version>2.11.3</jackson.version>
2121
<guava.version>19.0</guava.version>
2222
<logger.tool.version>1.0.0-SNAPSHOT</logger.tool.version>
2323
</properties>

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ExtendES5ApiCallBridge.java

Lines changed: 53 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import transwarp.org.elasticsearch.action.bulk.BulkProcessor;
3434
import transwarp.org.elasticsearch.client.transport.TransportClient;
3535
import transwarp.org.elasticsearch.common.network.NetworkModule;
36+
import transwarp.org.elasticsearch.common.settings.Setting;
3637
import transwarp.org.elasticsearch.common.settings.Settings;
3738
import transwarp.org.elasticsearch.common.transport.TransportAddress;
3839
import transwarp.org.elasticsearch.common.unit.TimeValue;
@@ -70,28 +71,41 @@ public ExtendES5ApiCallBridge(List<InetSocketAddress> transportAddresses, Elasti
7071
@Override
7172
public TransportClient createClient(Map<String, String> clientConfig) throws IOException{
7273

73-
//1. login kdc with keytab and krb5 conf
74-
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
75-
esTableInfo.getPrincipal(),
76-
esTableInfo.getKeytab(),
77-
esTableInfo.getKrb5conf());
78-
79-
//2. set transwarp attributes
80-
Settings settings = Settings.builder().put(clientConfig)
81-
.put("client.transport.sniff", true)
82-
.put("security.enable", true)
83-
.put(NetworkModule.TRANSPORT_TYPE_KEY, "security-netty3")
84-
.build();
85-
86-
//3. build transport client with transwarp plugins
87-
TransportClient transportClient = ugi.doAs((PrivilegedAction<TransportClient>) () -> {
88-
TransportClient tmpClient = new PreBuiltTransportClient(settings,
89-
Collections.singletonList(DoorKeeperClientPlugin.class));
74+
TransportClient transportClient;
75+
76+
if (esTableInfo.isEnableKrb()) {
77+
//1. login kdc with keytab and krb5 conf
78+
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
79+
esTableInfo.getPrincipal(),
80+
esTableInfo.getKeytab(),
81+
esTableInfo.getKrb5conf());
82+
83+
//2. set transwarp attributes
84+
Settings settings = Settings.builder().put(clientConfig)
85+
.put("client.transport.sniff", true)
86+
.put("security.enable", true)
87+
.put(NetworkModule.TRANSPORT_TYPE_KEY, "security-netty3")
88+
.build();
89+
90+
//3. build transport client with transwarp plugins
91+
transportClient = ugi.doAs((PrivilegedAction<TransportClient>) () -> {
92+
TransportClient tmpClient = new PreBuiltTransportClient(settings,
93+
Collections.singletonList(DoorKeeperClientPlugin.class));
94+
for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
95+
tmpClient.addTransportAddress(transport);
96+
}
97+
return tmpClient;
98+
});
99+
} else {
100+
Settings settings = Settings.builder().put(clientConfig)
101+
.put("client.transport.sniff", true)
102+
.build();
103+
104+
transportClient = new PreBuiltTransportClient(settings);
90105
for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
91-
tmpClient.addTransportAddress(transport);
106+
transportClient.addTransportAddress(transport);
92107
}
93-
return tmpClient;
94-
});
108+
}
95109

96110
return transportClient;
97111
}
@@ -140,18 +154,27 @@ public void configureBulkProcessorBackoff(
140154
@Override
141155
public boolean verifyClientConnection(TransportClient client) throws IOException {
142156

143-
//1. login kdc with keytab and krb5 conf
144-
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
145-
esTableInfo.getPrincipal(),
146-
esTableInfo.getKeytab(),
147-
esTableInfo.getKrb5conf());
148157

149-
//2. refresh availableNodes.
150-
boolean verifyResult = ugi.doAs((PrivilegedAction<Boolean>) () -> {
151-
LOG.info("Refresh client available nodes.");
158+
boolean verifyResult = false;
159+
160+
if (esTableInfo.isEnableKrb()) {
161+
//1. login kdc with keytab and krb5 conf
162+
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
163+
esTableInfo.getPrincipal(),
164+
esTableInfo.getKeytab(),
165+
esTableInfo.getKrb5conf());
166+
167+
//2. refresh availableNodes.
168+
verifyResult = ugi.doAs((PrivilegedAction<Boolean>) () -> {
169+
LOG.info("Refresh client available nodes.");
170+
client.refreshAvailableNodes();
171+
return client.connectedNodes().isEmpty();
172+
});
173+
} else {
152174
client.refreshAvailableNodes();
153-
return client.connectedNodes().isEmpty();
154-
});
175+
verifyResult = client.connectedNodes().isEmpty();
176+
}
177+
155178

156179
if (!verifyResult) {
157180
return true;

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -160,14 +160,6 @@ public boolean check() {
160160
Preconditions.checkArgument(NumberUtils.isNumber(number), "id must be a numeric type");
161161
});
162162
}
163-
164-
boolean allNotSet =
165-
Strings.isNullOrEmpty(principal) &&
166-
Strings.isNullOrEmpty(keytab) &&
167-
Strings.isNullOrEmpty(krb5conf);
168-
169-
Preconditions.checkState(!allNotSet, "xh's elasticsearch type of kerberos file is required");
170-
171163
return true;
172164
}
173165

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaSource.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,18 @@ protected Properties getKafkaProperties(KafkaSourceTableInfo kafkaSourceTableInf
6363
if (StringUtils.isNotBlank(kafkaSourceTableInfo.getGroupId())) {
6464
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaSourceTableInfo.getGroupId());
6565
}
66-
66+
if("kafka".equalsIgnoreCase(kafkaSourceTableInfo.getType())){
67+
String keyDeserializer = kafkaSourceTableInfo.getKafkaParam(KafkaSourceTableInfo.KEY_DESERIALIZER);
68+
if(StringUtils.isNotBlank(keyDeserializer)){
69+
kafkaSourceTableInfo.putKafkaParam(KafkaSourceTableInfo.KEY_DESERIALIZER, KafkaSourceTableInfo.DT_DESERIALIZER_CLASS_NAME);
70+
kafkaSourceTableInfo.putKafkaParam(KafkaSourceTableInfo.DT_KEY_DESERIALIZER, keyDeserializer);
71+
}
72+
String valueDeserializer = kafkaSourceTableInfo.getKafkaParam(KafkaSourceTableInfo.VALUE_DESERIALIZER);
73+
if(StringUtils.isNotBlank(valueDeserializer)){
74+
kafkaSourceTableInfo.putKafkaParam(KafkaSourceTableInfo.VALUE_DESERIALIZER, KafkaSourceTableInfo.DT_DESERIALIZER_CLASS_NAME);
75+
kafkaSourceTableInfo.putKafkaParam(KafkaSourceTableInfo.DT_VALUE_DESERIALIZER, valueDeserializer);
76+
}
77+
}
6778
for (String key : kafkaSourceTableInfo.getKafkaParamKeys()) {
6879
props.setProperty(key, kafkaSourceTableInfo.getKafkaParam(key));
6980
}

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceTableInfo.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,16 @@ public class KafkaSourceTableInfo extends AbstractSourceTableInfo {
6060

6161
public static final String TIMESTAMP_OFFSET = "timestampOffset";
6262

63+
public static final String KEY_DESERIALIZER = "key.deserializer";
64+
65+
public static final String DT_KEY_DESERIALIZER = "dt.key.deserializer";
66+
67+
public static final String VALUE_DESERIALIZER = "value.deserializer";
68+
69+
public static final String DT_VALUE_DESERIALIZER = "dt.value.deserializer";
70+
71+
public static final String DT_DESERIALIZER_CLASS_NAME = "com.dtstack.flink.sql.source.kafka.deserializer.DtKafkaDeserializer";
72+
6373
private String bootstrapServers;
6474

6575
private String topic;
@@ -132,6 +142,10 @@ public String getKafkaParam(String key) {
132142
return kafkaParams.get(key);
133143
}
134144

145+
public void putKafkaParam(String key, String value) {
146+
kafkaParams.put(key, value);
147+
}
148+
135149
public Set<String> getKafkaParamKeys() {
136150
return kafkaParams.keySet();
137151
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.dtstack.flink.sql.source.kafka.deserializer;
19+
20+
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
21+
import com.fasterxml.jackson.databind.ObjectMapper;
22+
import org.apache.kafka.common.errors.SerializationException;
23+
import org.apache.kafka.common.header.Headers;
24+
import org.apache.kafka.common.serialization.Deserializer;
25+
26+
import java.io.IOException;
27+
import java.nio.charset.StandardCharsets;
28+
import java.util.Map;
29+
30+
/**
31+
* Date: 2021/04/20
32+
* Company: www.dtstack.com
33+
*
34+
* @author tudou
35+
*/
36+
public class DtKafkaDeserializer<T> implements Deserializer<byte[]> {
37+
protected ObjectMapper objectMapper = new ObjectMapper();
38+
private Deserializer<T> deserializer;
39+
40+
@Override
41+
@SuppressWarnings("unchecked")
42+
public void configure(Map<String, ?> configs, boolean isKey) {
43+
String deserializerClassName;
44+
if(isKey){
45+
deserializerClassName = (String)configs.get(KafkaSourceTableInfo.DT_KEY_DESERIALIZER);
46+
}else{
47+
deserializerClassName = (String)configs.get(KafkaSourceTableInfo.DT_VALUE_DESERIALIZER);
48+
}
49+
try {
50+
this.deserializer = (Deserializer<T>)Class.forName(deserializerClassName).newInstance();
51+
} catch (Exception e) {
52+
throw new RuntimeException("Can't create instance: " + deserializerClassName, e);
53+
}
54+
this.deserializer.configure(configs, isKey);
55+
}
56+
57+
@Override
58+
public byte[] deserialize(String topic, Headers headers, byte[] data) {
59+
return toBytes(deserializer.deserialize(topic, headers, data));
60+
}
61+
62+
@Override
63+
public byte[] deserialize(String topic, byte[] data) {
64+
return toBytes(deserializer.deserialize(topic, data));
65+
}
66+
67+
/**
68+
* T value to byte[]
69+
* @param value
70+
* @return
71+
*/
72+
private byte[] toBytes(T value){
73+
if(value instanceof byte[]){
74+
return (byte[])value;
75+
}else{
76+
try {
77+
return this.objectMapper.readValue(this.objectMapper.writeValueAsBytes(value), String.class).getBytes(StandardCharsets.UTF_8);
78+
} catch (IOException e) {
79+
throw new SerializationException("Can't deserialize data", e);
80+
}
81+
}
82+
}
83+
84+
@Override
85+
public void close() {
86+
this.deserializer.close();
87+
}
88+
}

0 commit comments

Comments
 (0)