Skip to content

Commit 2728f89

Browse files
committed
[feat-1721][elasticsearch-xh]support non kerberos es.
1 parent abfbc18 commit 2728f89

File tree

2 files changed

+61
-35
lines changed

2 files changed

+61
-35
lines changed

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ExtendES5ApiCallBridge.java

Lines changed: 54 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import transwarp.org.elasticsearch.action.bulk.BulkProcessor;
3434
import transwarp.org.elasticsearch.client.transport.TransportClient;
3535
import transwarp.org.elasticsearch.common.network.NetworkModule;
36+
import transwarp.org.elasticsearch.common.settings.Setting;
3637
import transwarp.org.elasticsearch.common.settings.Settings;
3738
import transwarp.org.elasticsearch.common.transport.TransportAddress;
3839
import transwarp.org.elasticsearch.common.unit.TimeValue;
@@ -65,33 +66,47 @@ public ExtendES5ApiCallBridge(List<InetSocketAddress> transportAddresses, Elasti
6566
Preconditions.checkArgument(transportAddresses != null && !transportAddresses.isEmpty());
6667
this.transportAddresses = transportAddresses;
6768
this.esTableInfo = esTableInfo;
69+
this.esTableInfo.judgeKrbEnable();
6870
}
6971

7072
@Override
7173
public TransportClient createClient(Map<String, String> clientConfig) throws IOException{
7274

73-
//1. login kdc with keytab and krb5 conf
74-
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
75-
esTableInfo.getPrincipal(),
76-
esTableInfo.getKeytab(),
77-
esTableInfo.getKrb5conf());
78-
79-
//2. set transwarp attributes
80-
Settings settings = Settings.builder().put(clientConfig)
81-
.put("client.transport.sniff", true)
82-
.put("security.enable", true)
83-
.put(NetworkModule.TRANSPORT_TYPE_KEY, "security-netty3")
84-
.build();
85-
86-
//3. build transport client with transwarp plugins
87-
TransportClient transportClient = ugi.doAs((PrivilegedAction<TransportClient>) () -> {
88-
TransportClient tmpClient = new PreBuiltTransportClient(settings,
89-
Collections.singletonList(DoorKeeperClientPlugin.class));
75+
TransportClient transportClient;
76+
77+
if (esTableInfo.isEnableKrb()) {
78+
//1. login kdc with keytab and krb5 conf
79+
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
80+
esTableInfo.getPrincipal(),
81+
esTableInfo.getKeytab(),
82+
esTableInfo.getKrb5conf());
83+
84+
//2. set transwarp attributes
85+
Settings settings = Settings.builder().put(clientConfig)
86+
.put("client.transport.sniff", true)
87+
.put("security.enable", true)
88+
.put(NetworkModule.TRANSPORT_TYPE_KEY, "security-netty3")
89+
.build();
90+
91+
//3. build transport client with transwarp plugins
92+
transportClient = ugi.doAs((PrivilegedAction<TransportClient>) () -> {
93+
TransportClient tmpClient = new PreBuiltTransportClient(settings,
94+
Collections.singletonList(DoorKeeperClientPlugin.class));
95+
for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
96+
tmpClient.addTransportAddress(transport);
97+
}
98+
return tmpClient;
99+
});
100+
} else {
101+
Settings settings = Settings.builder().put(clientConfig)
102+
.put("client.transport.sniff", true)
103+
.build();
104+
105+
transportClient = new PreBuiltTransportClient(settings);
90106
for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
91-
tmpClient.addTransportAddress(transport);
107+
transportClient.addTransportAddress(transport);
92108
}
93-
return tmpClient;
94-
});
109+
}
95110

96111
return transportClient;
97112
}
@@ -140,18 +155,27 @@ public void configureBulkProcessorBackoff(
140155
@Override
141156
public boolean verifyClientConnection(TransportClient client) throws IOException {
142157

143-
//1. login kdc with keytab and krb5 conf
144-
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
145-
esTableInfo.getPrincipal(),
146-
esTableInfo.getKeytab(),
147-
esTableInfo.getKrb5conf());
148158

149-
//2. refresh availableNodes.
150-
boolean verifyResult = ugi.doAs((PrivilegedAction<Boolean>) () -> {
151-
LOG.info("Refresh client available nodes.");
159+
boolean verifyResult = false;
160+
161+
if (esTableInfo.isEnableKrb()) {
162+
//1. login kdc with keytab and krb5 conf
163+
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
164+
esTableInfo.getPrincipal(),
165+
esTableInfo.getKeytab(),
166+
esTableInfo.getKrb5conf());
167+
168+
//2. refresh availableNodes.
169+
verifyResult = ugi.doAs((PrivilegedAction<Boolean>) () -> {
170+
LOG.info("Refresh client available nodes.");
171+
client.refreshAvailableNodes();
172+
return client.connectedNodes().isEmpty();
173+
});
174+
} else {
152175
client.refreshAvailableNodes();
153-
return client.connectedNodes().isEmpty();
154-
});
176+
verifyResult = client.connectedNodes().isEmpty();
177+
}
178+
155179

156180
if (!verifyResult) {
157181
return true;

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,14 @@ public boolean check() {
161161
});
162162
}
163163

164-
boolean allNotSet =
165-
Strings.isNullOrEmpty(principal) &&
166-
Strings.isNullOrEmpty(keytab) &&
167-
Strings.isNullOrEmpty(krb5conf);
168164

169-
Preconditions.checkState(!allNotSet, "xh's elasticsearch type of kerberos file is required");
165+
// all or nothing
166+
boolean allSet = Strings.isNullOrEmpty(principal)
167+
== Strings.isNullOrEmpty(keytab)
168+
== Strings.isNullOrEmpty(krb5conf);
169+
170+
171+
Preconditions.checkState(!allSet, "xh's elasticsearch type of kerberos file is required");
170172

171173
return true;
172174
}

0 commit comments

Comments
 (0)