Skip to content

Commit e1144af

Browse files
author
gituser
committed
Merge branch '1.10_release_4.0.x' into 1.10_release_4.1.x
2 parents 0f4a58f + 2a44f3f commit e1144af

File tree

2 files changed

+53
-38
lines changed

2 files changed

+53
-38
lines changed

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ExtendES5ApiCallBridge.java

Lines changed: 53 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import transwarp.org.elasticsearch.action.bulk.BulkProcessor;
3434
import transwarp.org.elasticsearch.client.transport.TransportClient;
3535
import transwarp.org.elasticsearch.common.network.NetworkModule;
36+
import transwarp.org.elasticsearch.common.settings.Setting;
3637
import transwarp.org.elasticsearch.common.settings.Settings;
3738
import transwarp.org.elasticsearch.common.transport.TransportAddress;
3839
import transwarp.org.elasticsearch.common.unit.TimeValue;
@@ -70,28 +71,41 @@ public ExtendES5ApiCallBridge(List<InetSocketAddress> transportAddresses, Elasti
7071
@Override
7172
public TransportClient createClient(Map<String, String> clientConfig) throws IOException{
7273

73-
//1. login kdc with keytab and krb5 conf
74-
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
75-
esTableInfo.getPrincipal(),
76-
esTableInfo.getKeytab(),
77-
esTableInfo.getKrb5conf());
78-
79-
//2. set transwarp attributes
80-
Settings settings = Settings.builder().put(clientConfig)
81-
.put("client.transport.sniff", true)
82-
.put("security.enable", true)
83-
.put(NetworkModule.TRANSPORT_TYPE_KEY, "security-netty3")
84-
.build();
85-
86-
//3. build transport client with transwarp plugins
87-
TransportClient transportClient = ugi.doAs((PrivilegedAction<TransportClient>) () -> {
88-
TransportClient tmpClient = new PreBuiltTransportClient(settings,
89-
Collections.singletonList(DoorKeeperClientPlugin.class));
74+
TransportClient transportClient;
75+
76+
if (esTableInfo.isEnableKrb()) {
77+
//1. login kdc with keytab and krb5 conf
78+
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
79+
esTableInfo.getPrincipal(),
80+
esTableInfo.getKeytab(),
81+
esTableInfo.getKrb5conf());
82+
83+
//2. set transwarp attributes
84+
Settings settings = Settings.builder().put(clientConfig)
85+
.put("client.transport.sniff", true)
86+
.put("security.enable", true)
87+
.put(NetworkModule.TRANSPORT_TYPE_KEY, "security-netty3")
88+
.build();
89+
90+
//3. build transport client with transwarp plugins
91+
transportClient = ugi.doAs((PrivilegedAction<TransportClient>) () -> {
92+
TransportClient tmpClient = new PreBuiltTransportClient(settings,
93+
Collections.singletonList(DoorKeeperClientPlugin.class));
94+
for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
95+
tmpClient.addTransportAddress(transport);
96+
}
97+
return tmpClient;
98+
});
99+
} else {
100+
Settings settings = Settings.builder().put(clientConfig)
101+
.put("client.transport.sniff", true)
102+
.build();
103+
104+
transportClient = new PreBuiltTransportClient(settings);
90105
for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
91-
tmpClient.addTransportAddress(transport);
106+
transportClient.addTransportAddress(transport);
92107
}
93-
return tmpClient;
94-
});
108+
}
95109

96110
return transportClient;
97111
}
@@ -140,18 +154,27 @@ public void configureBulkProcessorBackoff(
140154
@Override
141155
public boolean verifyClientConnection(TransportClient client) throws IOException {
142156

143-
//1. login kdc with keytab and krb5 conf
144-
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
145-
esTableInfo.getPrincipal(),
146-
esTableInfo.getKeytab(),
147-
esTableInfo.getKrb5conf());
148157

149-
//2. refresh availableNodes.
150-
boolean verifyResult = ugi.doAs((PrivilegedAction<Boolean>) () -> {
151-
LOG.info("Refresh client available nodes.");
158+
boolean verifyResult = false;
159+
160+
if (esTableInfo.isEnableKrb()) {
161+
//1. login kdc with keytab and krb5 conf
162+
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
163+
esTableInfo.getPrincipal(),
164+
esTableInfo.getKeytab(),
165+
esTableInfo.getKrb5conf());
166+
167+
//2. refresh availableNodes.
168+
verifyResult = ugi.doAs((PrivilegedAction<Boolean>) () -> {
169+
LOG.info("Refresh client available nodes.");
170+
client.refreshAvailableNodes();
171+
return client.connectedNodes().isEmpty();
172+
});
173+
} else {
152174
client.refreshAvailableNodes();
153-
return client.connectedNodes().isEmpty();
154-
});
175+
verifyResult = client.connectedNodes().isEmpty();
176+
}
177+
155178

156179
if (!verifyResult) {
157180
return true;

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -160,14 +160,6 @@ public boolean check() {
160160
Preconditions.checkArgument(NumberUtils.isNumber(number), "id must be a numeric type");
161161
});
162162
}
163-
164-
boolean allNotSet =
165-
Strings.isNullOrEmpty(principal) &&
166-
Strings.isNullOrEmpty(keytab) &&
167-
Strings.isNullOrEmpty(krb5conf);
168-
169-
Preconditions.checkState(!allNotSet, "xh's elasticsearch type of kerberos file is required");
170-
171163
return true;
172164
}
173165

0 commit comments

Comments
 (0)