diff --git a/package.xml b/package.xml
index fd9d0b246..3d2db5981 100644
--- a/package.xml
+++ b/package.xml
@@ -34,6 +34,14 @@
/
+
+ patch/addax-kerberos-jdk24-patch/target/addax-kerberos-jdk24-patch-${project.version}/lib/
+
+ *.*
+
+ 0644
+ lib
+
lib/addax-rdbms/target/addax-rdbms-${project.version}/lib/
diff --git a/patch/addax-kerberos-jdk24-patch/package.xml b/patch/addax-kerberos-jdk24-patch/package.xml
new file mode 100644
index 000000000..c2bc008e4
--- /dev/null
+++ b/patch/addax-kerberos-jdk24-patch/package.xml
@@ -0,0 +1,48 @@
+
+
+
+ release
+
+ dir
+
+ false
+
+
+ target/
+
+ ${project.artifactId}-${project.version}.jar
+
+ lib
+
+
+
+
+ false
+ lib
+ runtime
+
+ com.wgzhao.addax:addax-core
+
+
+
+
diff --git a/patch/addax-kerberos-jdk24-patch/pom.xml b/patch/addax-kerberos-jdk24-patch/pom.xml
new file mode 100644
index 000000000..52be55c72
--- /dev/null
+++ b/patch/addax-kerberos-jdk24-patch/pom.xml
@@ -0,0 +1,72 @@
+
+
+
+ 4.0.0
+
+
+ com.wgzhao.addax
+ addax-all
+ 6.0.3-SNAPSHOT
+ ../../pom.xml
+
+
+ addax-kerberos-jdk24-patch
+
+ addax-kerberos-jdk24-patch
+ Fix for Error When kerberos Auth running on jdk24
+ jar
+
+
+
+
+ com.fasterxml.woodstox
+ woodstox-core
+ ${woodstox.version}
+
+
+ com.google.protobuf
+ protobuf-java
+ ${protobuf.version}
+
+
+ org.apache.hadoop
+ hadoop-common
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ 24
+ 24
+
+
+
+
+
diff --git a/patch/addax-kerberos-jdk24-patch/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/patch/addax-kerberos-jdk24-patch/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
new file mode 100644
index 000000000..7eae75ca2
--- /dev/null
+++ b/patch/addax-kerberos-jdk24-patch/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
@@ -0,0 +1,2259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_TOKEN_FILES;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_TOKENS;
+import static org.apache.hadoop.security.UGIExceptionMessages.*;
+import static org.apache.hadoop.util.PlatformName.IBM_JAVA;
+import static org.apache.hadoop.util.StringUtils.getTrimmedStringCollection;
+
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.Principal;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.security.auth.DestroyFailedException;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
+import javax.security.auth.login.Configuration.Parameters;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import javax.security.auth.spi.LoginModule;
+
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.Time;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * User and group information for Hadoop.
+ * This class wraps around a JAAS Subject and provides methods to determine the
+ * user's username and groups. It supports both the Windows, Unix and Kerberos
+ * login modules.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class UserGroupInformation {
+ @VisibleForTesting
+ static final Logger LOG = LoggerFactory.getLogger(
+ UserGroupInformation.class);
+
+ /**
+ * Percentage of the ticket window to use before we renew ticket.
+ */
+ private static final float TICKET_RENEW_WINDOW = 0.80f;
+ private static boolean shouldRenewImmediatelyForTests = false;
+ static final String HADOOP_USER_NAME = "HADOOP_USER_NAME";
+ static final String HADOOP_PROXY_USER = "HADOOP_PROXY_USER";
+
+ /**
+ * For the purposes of unit tests, we want to test login
+ * from keytab and don't want to wait until the renew
+ * window (controlled by TICKET_RENEW_WINDOW).
+ * @param immediate true if we should login without waiting for ticket window
+ */
+ @VisibleForTesting
+ public static void setShouldRenewImmediatelyForTests(boolean immediate) {
+ shouldRenewImmediatelyForTests = immediate;
+ }
+
+ /**
+ * UgiMetrics maintains UGI activity statistics
+ * and publishes them through the metrics interfaces.
+ */
+ @Metrics(about="User and group related metrics", context="ugi")
+ static class UgiMetrics {
+ final MetricsRegistry registry = new MetricsRegistry("UgiMetrics");
+
+ @Metric("Rate of successful kerberos logins and latency (milliseconds)")
+ MutableRate loginSuccess;
+ @Metric("Rate of failed kerberos logins and latency (milliseconds)")
+ MutableRate loginFailure;
+ @Metric("GetGroups") MutableRate getGroups;
+ MutableQuantiles[] getGroupsQuantiles;
+ @Metric("Renewal failures since startup")
+ private MutableGaugeLong renewalFailuresTotal;
+ @Metric("Renewal failures since last successful login")
+ private MutableGaugeInt renewalFailures;
+
+ static UgiMetrics create() {
+ return DefaultMetricsSystem.instance().register(new UgiMetrics());
+ }
+
+ static void reattach() {
+ metrics = UgiMetrics.create();
+ }
+
+ void addGetGroups(long latency) {
+ getGroups.add(latency);
+ if (getGroupsQuantiles != null) {
+ for (MutableQuantiles q : getGroupsQuantiles) {
+ q.add(latency);
+ }
+ }
+ }
+
+ MutableGaugeInt getRenewalFailures() {
+ return renewalFailures;
+ }
+ }
+
+ /**
+ * A login module that looks at the Kerberos, Unix, or Windows principal and
+ * adds the corresponding UserName.
+ */
+ @InterfaceAudience.Private
+ public static class HadoopLoginModule implements LoginModule {
+ private Subject subject;
+
+ @Override
+ public boolean abort() throws LoginException {
+ return true;
+ }
+
+ private T getCanonicalUser(Class cls) {
+ for(T user: subject.getPrincipals(cls)) {
+ return user;
+ }
+ return null;
+ }
+
+ @Override
+ public boolean commit() throws LoginException {
+ LOG.debug("hadoop login commit");
+ // if we already have a user, we are done.
+ if (!subject.getPrincipals(User.class).isEmpty()) {
+ LOG.debug("Using existing subject: {}", subject.getPrincipals());
+ return true;
+ }
+ Principal user = getCanonicalUser(KerberosPrincipal.class);
+ if (user != null) {
+ LOG.debug("Using kerberos user: {}", user);
+ }
+ //If we don't have a kerberos user and security is disabled, check
+ //if user is specified in the environment or properties
+ if (!isSecurityEnabled() && (user == null)) {
+ String envUser = System.getenv(HADOOP_USER_NAME);
+ if (envUser == null) {
+ envUser = System.getProperty(HADOOP_USER_NAME);
+ }
+ user = envUser == null ? null : new User(envUser);
+ }
+ // use the OS user
+ if (user == null) {
+ user = getCanonicalUser(OS_PRINCIPAL_CLASS);
+ LOG.debug("Using local user: {}", user);
+ }
+ // if we found the user, add our principal
+ if (user != null) {
+ LOG.debug("Using user: \"{}\" with name: {}", user, user.getName());
+
+ User userEntry = null;
+ try {
+ // LoginContext will be attached later unless it's an external
+ // subject.
+ AuthenticationMethod authMethod = (user instanceof KerberosPrincipal)
+ ? AuthenticationMethod.KERBEROS : AuthenticationMethod.SIMPLE;
+ userEntry = new User(user.getName(), authMethod, null);
+ } catch (Exception e) {
+ throw (LoginException)(new LoginException(e.toString()).initCause(e));
+ }
+ LOG.debug("User entry: \"{}\"", userEntry);
+
+ subject.getPrincipals().add(userEntry);
+ return true;
+ }
+ throw new LoginException("Failed to find user in name " + subject);
+ }
+
+ @Override
+ public void initialize(Subject subject, CallbackHandler callbackHandler,
+ Map sharedState, Map options) {
+ this.subject = subject;
+ }
+
+ @Override
+ public boolean login() throws LoginException {
+ LOG.debug("Hadoop login");
+ return true;
+ }
+
+ @Override
+ public boolean logout() throws LoginException {
+ LOG.debug("Hadoop logout");
+ return true;
+ }
+ }
+
+ /**
+ * Reattach the class's metrics to a new metric system.
+ */
+ public static void reattachMetrics() {
+ UgiMetrics.reattach();
+ }
+
+ /** Metrics to track UGI activity */
+ static UgiMetrics metrics = UgiMetrics.create();
+ /** The auth method to use */
+ private static AuthenticationMethod authenticationMethod;
+ /** Server-side groups fetching service */
+ private static Groups groups;
+ /** Min time (in seconds) before relogin for Kerberos */
+ private static long kerberosMinSecondsBeforeRelogin;
+ /** Boolean flag to enable auto-renewal for keytab based loging. */
+ private static boolean kerberosKeyTabLoginRenewalEnabled;
+ /** A reference to Kerberos login auto renewal thread. */
+ private static Optional kerberosLoginRenewalExecutor =
+ Optional.empty();
+ /** The configuration to use */
+
+ private static Configuration conf;
+
+
+ /**Environment variable pointing to the token cache file*/
+ public static final String HADOOP_TOKEN_FILE_LOCATION =
+ "HADOOP_TOKEN_FILE_LOCATION";
+ /** Environment variable pointing to the base64 tokens. */
+ public static final String HADOOP_TOKEN = "HADOOP_TOKEN";
+
+ public static boolean isInitialized() {
+ return conf != null;
+ }
+
+ /**
+ * A method to initialize the fields that depend on a configuration.
+ * Must be called before useKerberos or groups is used.
+ */
+ private static void ensureInitialized() {
+ if (!isInitialized()) {
+ synchronized(UserGroupInformation.class) {
+ if (!isInitialized()) { // someone might have beat us
+ initialize(new Configuration(), false);
+ }
+ }
+ }
+ }
+
+ /**
+ * Initialize UGI and related classes.
+ * @param conf the configuration to use
+ */
+ private static synchronized void initialize(Configuration conf,
+ boolean overrideNameRules) {
+ authenticationMethod = SecurityUtil.getAuthenticationMethod(conf);
+ if (overrideNameRules || !HadoopKerberosName.hasRulesBeenSet()) {
+ try {
+ HadoopKerberosName.setConfiguration(conf);
+ } catch (IOException ioe) {
+ throw new RuntimeException(
+ "Problem with Kerberos auth_to_local name configuration", ioe);
+ }
+ }
+ try {
+ kerberosMinSecondsBeforeRelogin = 1000L * conf.getLong(
+ HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN,
+ HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN_DEFAULT);
+ }
+ catch(NumberFormatException nfe) {
+ throw new IllegalArgumentException("Invalid attribute value for " +
+ HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN + " of " +
+ conf.get(HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN));
+ }
+
+ kerberosKeyTabLoginRenewalEnabled = conf.getBoolean(
+ HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED,
+ HADOOP_KERBEROS_KEYTAB_LOGIN_AUTORENEWAL_ENABLED_DEFAULT);
+
+ // If we haven't set up testing groups, use the configuration to find it
+ if (!(groups instanceof TestingGroups)) {
+ groups = Groups.getUserToGroupsMappingService(conf);
+ }
+ UserGroupInformation.conf = conf;
+
+ if (metrics.getGroupsQuantiles == null) {
+ int[] intervals = conf.getInts(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS);
+ if (intervals != null && intervals.length > 0) {
+ final int length = intervals.length;
+ MutableQuantiles[] getGroupsQuantiles = new MutableQuantiles[length];
+ for (int i = 0; i < length; i++) {
+ getGroupsQuantiles[i] = metrics.registry.newQuantiles(
+ "getGroups" + intervals[i] + "s",
+ "Get groups", "ops", "latency", intervals[i]);
+ }
+ metrics.getGroupsQuantiles = getGroupsQuantiles;
+ }
+ }
+ }
+
+ /**
+ * Set the static configuration for UGI.
+ * In particular, set the security authentication mechanism and the
+ * group look up service.
+ * @param conf the configuration to use
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public static void setConfiguration(Configuration conf) {
+ initialize(conf, true);
+ }
+
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ public static void reset() {
+ authenticationMethod = null;
+ conf = null;
+ groups = null;
+ kerberosMinSecondsBeforeRelogin = 0;
+ kerberosKeyTabLoginRenewalEnabled = false;
+ kerberosLoginRenewalExecutor = Optional.empty();
+ setLoginUser(null);
+ HadoopKerberosName.setRules(null);
+ }
+
+ /**
+ * Determine if UserGroupInformation is using Kerberos to determine
+ * user identities or is relying on simple authentication
+ *
+ * @return true if UGI is working in a secure environment
+ */
+ public static boolean isSecurityEnabled() {
+ return !isAuthenticationMethodEnabled(AuthenticationMethod.SIMPLE);
+ }
+
+ @InterfaceAudience.Private
+ @InterfaceStability.Evolving
+ private static boolean isAuthenticationMethodEnabled(AuthenticationMethod method) {
+ ensureInitialized();
+ return (authenticationMethod == method);
+ }
+
+ @InterfaceAudience.Private
+ @InterfaceStability.Evolving
+ @VisibleForTesting
+ static boolean isKerberosKeyTabLoginRenewalEnabled() {
+ ensureInitialized();
+ return kerberosKeyTabLoginRenewalEnabled;
+ }
+
+ @InterfaceAudience.Private
+ @InterfaceStability.Evolving
+ @VisibleForTesting
+ static Optional getKerberosLoginRenewalExecutor() {
+ ensureInitialized();
+ return kerberosLoginRenewalExecutor;
+ }
+
+ /**
+ * Information about the logged in user.
+ */
+ private static final AtomicReference loginUserRef =
+ new AtomicReference<>();
+
+ private final Subject subject;
+ // All non-static fields must be read-only caches that come from the subject.
+ private final User user;
+
+ private static String OS_LOGIN_MODULE_NAME;
+ private static Class extends Principal> OS_PRINCIPAL_CLASS;
+
+ private static final boolean windows =
+ System.getProperty("os.name").startsWith("Windows");
+
+ /* Return the OS login module class name */
+ /* For IBM JDK, use the common OS login module class name for all platforms */
+ private static String getOSLoginModuleName() {
+ if (IBM_JAVA) {
+ return "com.ibm.security.auth.module.JAASLoginModule";
+ } else {
+ return windows ? "com.sun.security.auth.module.NTLoginModule"
+ : "com.sun.security.auth.module.UnixLoginModule";
+ }
+ }
+
+ /* Return the OS principal class */
+ /* For IBM JDK, use the common OS principal class for all platforms */
+ @SuppressWarnings("unchecked")
+ private static Class extends Principal> getOsPrincipalClass() {
+ ClassLoader cl = ClassLoader.getSystemClassLoader();
+ try {
+ String principalClass = null;
+ if (IBM_JAVA) {
+ principalClass = "com.ibm.security.auth.UsernamePrincipal";
+ } else {
+ principalClass = windows ? "com.sun.security.auth.NTUserPrincipal"
+ : "com.sun.security.auth.UnixPrincipal";
+ }
+ return (Class extends Principal>) cl.loadClass(principalClass);
+ } catch (ClassNotFoundException e) {
+ LOG.error("Unable to find JAAS classes:" + e.getMessage());
+ }
+ return null;
+ }
+ static {
+ OS_LOGIN_MODULE_NAME = getOSLoginModuleName();
+ OS_PRINCIPAL_CLASS = getOsPrincipalClass();
+ }
+
+ private static class RealUser implements Principal {
+ private final UserGroupInformation realUser;
+
+ RealUser(UserGroupInformation realUser) {
+ this.realUser = realUser;
+ }
+
+ @Override
+ public String getName() {
+ return realUser.getUserName();
+ }
+
+ public UserGroupInformation getRealUser() {
+ return realUser;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o == null || getClass() != o.getClass()) {
+ return false;
+ } else {
+ return realUser.equals(((RealUser) o).realUser);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return realUser.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return realUser.toString();
+ }
+ }
+
+ private static HadoopLoginContext
+ newLoginContext(String appName, Subject subject,
+ HadoopConfiguration loginConf)
+ throws LoginException {
+ // Temporarily switch the thread's ContextClassLoader to match this
+ // class's classloader, so that we can properly load HadoopLoginModule
+ // from the JAAS libraries.
+ Thread t = Thread.currentThread();
+ ClassLoader oldCCL = t.getContextClassLoader();
+ t.setContextClassLoader(HadoopLoginModule.class.getClassLoader());
+ try {
+ return new HadoopLoginContext(appName, subject, loginConf);
+ } finally {
+ t.setContextClassLoader(oldCCL);
+ }
+ }
+
+ // return the LoginContext only if it's managed by the ugi. externally
+ // managed login contexts will be ignored.
+ private HadoopLoginContext getLogin() {
+ LoginContext login = user.getLogin();
+ return (login instanceof HadoopLoginContext)
+ ? (HadoopLoginContext)login : null;
+ }
+
+ private void setLogin(LoginContext login) {
+ user.setLogin(login);
+ }
+
+ /**
+ * Set the last login time for logged in user
+ * @param loginTime the number of milliseconds since the beginning of time
+ */
+ private void setLastLogin(long loginTime) {
+ user.setLastLogin(loginTime);
+ }
+
+ /**
+ * Create a UserGroupInformation for the given subject.
+ * This does not change the subject or acquire new credentials.
+ *
+ * The creator of subject is responsible for renewing credentials.
+ * @param subject the user's subject
+ */
+ UserGroupInformation(Subject subject) {
+ this.subject = subject;
+ // do not access ANY private credentials since they are mutable
+ // during a relogin. no principal locking necessary since
+ // relogin/logout does not remove User principal.
+ this.user = subject.getPrincipals(User.class).iterator().next();
+ if (user == null || user.getName() == null) {
+ throw new IllegalStateException("Subject does not contain a valid User");
+ }
+ }
+
+ /**
+ * checks if logged in using kerberos
+ * @return true if the subject logged via keytab or has a Kerberos TGT
+ */
+ public boolean hasKerberosCredentials() {
+ return user.getAuthenticationMethod() == AuthenticationMethod.KERBEROS;
+ }
+
+ /**
+ * Return the current user, including any doAs in the current stack.
+ * @return the current user
+ * @throws IOException if login fails
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public static UserGroupInformation getCurrentUser() throws IOException {
+ ensureInitialized();
+ Subject subject = Subject.current();
+ if (subject == null || subject.getPrincipals(User.class).isEmpty()) {
+ return getLoginUser();
+ } else {
+ return new UserGroupInformation(subject);
+ }
+ }
+
+ /**
+ * Find the most appropriate UserGroupInformation to use
+ *
+ * @param ticketCachePath The Kerberos ticket cache path, or NULL
+ * if none is specfied
+ * @param user The user name, or NULL if none is specified.
+ *
+ * @return The most appropriate UserGroupInformation
+ * @throws IOException raised on errors performing I/O.
+ */
+ public static UserGroupInformation getBestUGI(
+ String ticketCachePath, String user) throws IOException {
+ if (ticketCachePath != null) {
+ return getUGIFromTicketCache(ticketCachePath, user);
+ } else if (user == null) {
+ return getCurrentUser();
+ } else {
+ return createRemoteUser(user);
+ }
+ }
+
+ /**
+ * Create a UserGroupInformation from a Kerberos ticket cache.
+ *
+ * @param user The principal name to load from the ticket
+ * cache
+ * @param ticketCache the path to the ticket cache file
+ *
+ * @throws IOException if the kerberos login fails
+ * @return UserGroupInformation.
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public static UserGroupInformation getUGIFromTicketCache(
+ String ticketCache, String user) throws IOException {
+ if (!isAuthenticationMethodEnabled(AuthenticationMethod.KERBEROS)) {
+ return getBestUGI(null, user);
+ }
+ LoginParams params = new LoginParams();
+ params.put(LoginParam.PRINCIPAL, user);
+ params.put(LoginParam.CCACHE, ticketCache);
+ return doSubjectLogin(null, params);
+ }
+
+ /**
+ * Create a UserGroupInformation from a Subject with Kerberos principal.
+ *
+ * @param subject The KerberosPrincipal to use in UGI.
+ * The creator of subject is responsible for
+ * renewing credentials.
+ *
+ * @throws IOException raised on errors performing I/O.
+ * @throws KerberosAuthException if the kerberos login fails
+ * @return UserGroupInformation.
+ */
+ public static UserGroupInformation getUGIFromSubject(Subject subject)
+ throws IOException {
+ if (subject == null) {
+ throw new KerberosAuthException(SUBJECT_MUST_NOT_BE_NULL);
+ }
+
+ if (subject.getPrincipals(KerberosPrincipal.class).isEmpty()) {
+ throw new KerberosAuthException(SUBJECT_MUST_CONTAIN_PRINCIPAL);
+ }
+
+ // null params indicate external subject login. no login context will
+ // be attached.
+ return doSubjectLogin(subject, null);
+ }
+
+ /**
+ * Get the currently logged in user. If no explicit login has occurred,
+ * the user will automatically be logged in with either kerberos credentials
+ * if available, or as the local OS user, based on security settings.
+ * @return the logged in user
+ * @throws IOException if login fails
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public static UserGroupInformation getLoginUser() throws IOException {
+ ensureInitialized();
+ UserGroupInformation loginUser = loginUserRef.get();
+ // a potential race condition exists only for the initial creation of
+ // the login user. there's no need to penalize all subsequent calls
+ // with sychronization overhead so optimistically create a login user
+ // and discard if we lose the race.
+ if (loginUser == null) {
+ UserGroupInformation newLoginUser = createLoginUser(null);
+ do {
+ // it's extremely unlikely that the login user will be non-null
+ // (lost CAS race), but be nulled before the subsequent get, but loop
+ // for correctness.
+ if (loginUserRef.compareAndSet(null, newLoginUser)) {
+ loginUser = newLoginUser;
+ // only spawn renewal if this login user is the winner.
+ loginUser.spawnAutoRenewalThreadForUserCreds(false);
+ } else {
+ loginUser = loginUserRef.get();
+ }
+ } while (loginUser == null);
+ }
+ return loginUser;
+ }
+
+ /**
+ * remove the login method that is followed by a space from the username
+ * e.g. "jack (auth:SIMPLE)" {@literal ->} "jack"
+ *
+ * @param userName input userName.
+ * @return userName without login method
+ */
+ public static String trimLoginMethod(String userName) {
+ int spaceIndex = userName.indexOf(' ');
+ if (spaceIndex >= 0) {
+ userName = userName.substring(0, spaceIndex);
+ }
+ return userName;
+ }
+
+ /**
+ * Log in a user using the given subject
+ * @param subject the subject to use when logging in a user, or null to
+ * create a new subject.
+ *
+ * If subject is not null, the creator of subject is responsible for renewing
+ * credentials.
+ *
+ * @throws IOException if login fails
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public static void loginUserFromSubject(Subject subject) throws IOException {
+ setLoginUser(createLoginUser(subject));
+ }
+
+ private static
+ UserGroupInformation createLoginUser(Subject subject) throws IOException {
+ UserGroupInformation realUser = doSubjectLogin(subject, null);
+ UserGroupInformation loginUser = null;
+ try {
+ // If the HADOOP_PROXY_USER environment variable or property
+ // is specified, create a proxy user as the logged in user.
+ String proxyUser = System.getenv(HADOOP_PROXY_USER);
+ if (proxyUser == null) {
+ proxyUser = System.getProperty(HADOOP_PROXY_USER);
+ }
+ loginUser = proxyUser == null ? realUser : createProxyUser(proxyUser, realUser);
+
+ // Load tokens from files
+ final Collection tokenFileLocations = new LinkedHashSet<>();
+ tokenFileLocations.addAll(getTrimmedStringCollection(
+ System.getProperty(HADOOP_TOKEN_FILES)));
+ tokenFileLocations.addAll(getTrimmedStringCollection(
+ conf.get(HADOOP_TOKEN_FILES)));
+ tokenFileLocations.addAll(getTrimmedStringCollection(
+ System.getenv(HADOOP_TOKEN_FILE_LOCATION)));
+ for (String tokenFileLocation : tokenFileLocations) {
+ if (tokenFileLocation != null && tokenFileLocation.length() > 0) {
+ File tokenFile = new File(tokenFileLocation);
+ LOG.debug("Reading credentials from location {}",
+ tokenFile.getCanonicalPath());
+ if (tokenFile.exists() && tokenFile.isFile()) {
+ Credentials cred = Credentials.readTokenStorageFile(
+ tokenFile, conf);
+ LOG.debug("Loaded {} tokens from {}", cred.numberOfTokens(),
+ tokenFile.getCanonicalPath());
+ loginUser.addCredentials(cred);
+ } else {
+ LOG.info("Token file {} does not exist",
+ tokenFile.getCanonicalPath());
+ }
+ }
+ }
+
+ // Load tokens from base64 encoding
+ final Collection tokensBase64 = new LinkedHashSet<>();
+ tokensBase64.addAll(getTrimmedStringCollection(
+ System.getProperty(HADOOP_TOKENS)));
+ tokensBase64.addAll(getTrimmedStringCollection(
+ conf.get(HADOOP_TOKENS)));
+ tokensBase64.addAll(getTrimmedStringCollection(
+ System.getenv(HADOOP_TOKEN)));
+ int numTokenBase64 = 0;
+ for (String tokenBase64 : tokensBase64) {
+ if (tokenBase64 != null && tokenBase64.length() > 0) {
+ try {
+ Token token = new Token<>();
+ token.decodeFromUrlString(tokenBase64);
+ Credentials cred = new Credentials();
+ cred.addToken(token.getService(), token);
+ loginUser.addCredentials(cred);
+ numTokenBase64++;
+ } catch (IOException ioe) {
+ LOG.error("Cannot add token {}: {}",
+ tokenBase64, ioe.getMessage());
+ }
+ }
+ }
+ if (numTokenBase64 > 0) {
+ LOG.debug("Loaded {} base64 tokens", numTokenBase64);
+ }
+ } catch (IOException ioe) {
+ LOG.debug("Failure to load login credentials", ioe);
+ throw ioe;
+ }
+ LOG.debug("UGI loginUser: {}", loginUser);
+ return loginUser;
+ }
+
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ @VisibleForTesting
+ public static void setLoginUser(UserGroupInformation ugi) {
+ // if this is to become stable, should probably logout the currently
+ // logged in ugi if it's different
+ loginUserRef.set(ugi);
+ }
+
+ private String getKeytab() {
+ HadoopLoginContext login = getLogin();
+ return (login != null)
+ ? login.getConfiguration().getParameters().get(LoginParam.KEYTAB)
+ : null;
+ }
+
+ /**
+ * Is the ugi managed by the UGI or an external subject?
+ * @return true if managed by UGI.
+ */
+ private boolean isHadoopLogin() {
+ // checks if the private hadoop login context is managing the ugi.
+ return getLogin() != null;
+ }
+
+ /**
+ * Is this user logged in from a keytab file managed by the UGI?
+ * @return true if the credentials are from a keytab file.
+ */
+ public boolean isFromKeytab() {
+ // can't simply check if keytab is present since a relogin failure will
+ // have removed the keytab from priv creds. instead, check login params.
+ return hasKerberosCredentials() && isHadoopLogin() && getKeytab() != null;
+ }
+
+ /**
+ * Is this user logged in from a ticket (but no keytab) managed by the UGI?
+ * @return true if the credentials are from a ticket cache.
+ */
+ private boolean isFromTicket() {
+ return hasKerberosCredentials() && isHadoopLogin() && getKeytab() == null;
+ }
+
+ /**
+ * Get the Kerberos TGT
+ * @return the user's TGT or null if none was found
+ */
+ private KerberosTicket getTGT() {
+ Set tickets = subject
+ .getPrivateCredentials(KerberosTicket.class);
+ for (KerberosTicket ticket : tickets) {
+ if (SecurityUtil.isOriginalTGT(ticket)) {
+ return ticket;
+ }
+ }
+ return null;
+ }
+
+ private long getRefreshTime(KerberosTicket tgt) {
+ long start = tgt.getStartTime().getTime();
+ long end = tgt.getEndTime().getTime();
+ return start + (long) ((end - start) * TICKET_RENEW_WINDOW);
+ }
+
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public boolean shouldRelogin() {
+ return hasKerberosCredentials() && isHadoopLogin();
+ }
+
+ /**
+ * Spawn a thread to do periodic renewals of kerberos credentials. NEVER
+ * directly call this method. This method should only be used for ticket cache
+ * based kerberos credentials.
+ *
+ * @param force - used by tests to forcibly spawn thread
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ @VisibleForTesting
+ void spawnAutoRenewalThreadForUserCreds(boolean force) {
+ if (!force && (!shouldRelogin() || isFromKeytab())) {
+ return;
+ }
+
+ //spawn thread only if we have kerb credentials
+ KerberosTicket tgt = getTGT();
+ if (tgt == null) {
+ return;
+ }
+ String cmd = conf.get("hadoop.kerberos.kinit.command", "kinit");
+ long nextRefresh = getRefreshTime(tgt);
+ executeAutoRenewalTask(getUserName(),
+ new TicketCacheRenewalRunnable(tgt, cmd, nextRefresh));
+ }
+
+ /**
+ * Spawn a thread to do periodic renewals of kerberos credentials from a
+ * keytab file.
+ */
+ private void spawnAutoRenewalThreadForKeytab() {
+ if (!shouldRelogin() || isFromTicket()) {
+ return;
+ }
+
+ // spawn thread only if we have kerb credentials
+ KerberosTicket tgt = getTGT();
+ if (tgt == null) {
+ return;
+ }
+ long nextRefresh = getRefreshTime(tgt);
+ executeAutoRenewalTask(getUserName(),
+ new KeytabRenewalRunnable(tgt, nextRefresh));
+ }
+
+ /**
+ * Spawn a thread to do periodic renewals of kerberos credentials from a
+ * keytab file. NEVER directly call this method.
+ *
+ * @param userName Name of the user for which login needs to be renewed.
+ * @param task The reference of the login renewal task.
+ */
+ private void executeAutoRenewalTask(final String userName,
+ AutoRenewalForUserCredsRunnable task) {
+ kerberosLoginRenewalExecutor = Optional.of(
+ Executors.newSingleThreadExecutor(
+ new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setDaemon(true);
+ t.setName("TGT Renewer for " + userName);
+ return t;
+ }
+ }
+ ));
+ kerberosLoginRenewalExecutor.get().submit(task);
+ }
+
+ /**
+ * An abstract class which encapsulates the functionality required to
+ * auto renew Kerbeors TGT. The concrete implementations of this class
+ * are expected to provide implementation required to perform actual
+ * TGT renewal (see {@code TicketCacheRenewalRunnable} and
+ * {@code KeytabRenewalRunnable}).
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ @VisibleForTesting
+ abstract class AutoRenewalForUserCredsRunnable implements Runnable {
+ private KerberosTicket tgt;
+ private RetryPolicy rp;
+ private long nextRefresh;
+ private boolean runRenewalLoop = true;
+
+ AutoRenewalForUserCredsRunnable(KerberosTicket tgt, long nextRefresh) {
+ this.tgt = tgt;
+ this.nextRefresh = nextRefresh;
+ this.rp = null;
+ }
+
+ public void setRunRenewalLoop(boolean runRenewalLoop) {
+ this.runRenewalLoop = runRenewalLoop;
+ }
+
+ /**
+ * This method is used to perform renewal of kerberos login ticket.
+ * The concrete implementations of this class should provide specific
+ * logic required to perform renewal as part of this method.
+ */
+ protected abstract void relogin() throws IOException;
+
+ @Override
+ public void run() {
+ do {
+ try {
+ long now = Time.now();
+ LOG.debug("Current time is {}, next refresh is {}", now, nextRefresh);
+ if (now < nextRefresh) {
+ Thread.sleep(nextRefresh - now);
+ }
+ relogin();
+ tgt = getTGT();
+ if (tgt == null) {
+ LOG.warn("No TGT after renewal. Aborting renew thread for " +
+ getUserName());
+ return;
+ }
+ nextRefresh = Math.max(getRefreshTime(tgt),
+ now + kerberosMinSecondsBeforeRelogin);
+ metrics.renewalFailures.set(0);
+ rp = null;
+ } catch (InterruptedException ie) {
+ LOG.warn("Terminating renewal thread");
+ return;
+ } catch (IOException ie) {
+ metrics.renewalFailuresTotal.incr();
+ final long now = Time.now();
+
+ if (tgt.isDestroyed()) {
+ LOG.error(String.format("TGT is destroyed. " +
+ "Aborting renew thread for %s.", getUserName()), ie);
+ return;
+ }
+
+ long tgtEndTime;
+ // As described in HADOOP-15593 we need to handle the case when
+ // tgt.getEndTime() throws NPE because of JDK issue JDK-8147772
+ // NPE is only possible if this issue is not fixed in the JDK
+ // currently used
+ try {
+ tgtEndTime = tgt.getEndTime().getTime();
+ } catch (NullPointerException npe) {
+ LOG.error("NPE thrown while getting KerberosTicket endTime. "
+ + "Aborting renew thread for {}.", getUserName(), ie);
+ return;
+ }
+
+ LOG.warn(
+ "Exception encountered while running the "
+ + "renewal command for {}. "
+ + "(TGT end time:{}, renewalFailures: {}, "
+ + "renewalFailuresTotal: {})",
+ getUserName(), tgtEndTime, metrics.renewalFailures.value(),
+ metrics.renewalFailuresTotal.value(), ie);
+ if (rp == null) {
+ // Use a dummy maxRetries to create the policy. The policy will
+ // only be used to get next retry time with exponential back-off.
+ // The final retry time will be later limited within the
+ // tgt endTime in getNextTgtRenewalTime.
+ rp = RetryPolicies.exponentialBackoffRetry(Long.SIZE - 2,
+ kerberosMinSecondsBeforeRelogin, TimeUnit.MILLISECONDS);
+ }
+ try {
+ nextRefresh = getNextTgtRenewalTime(tgtEndTime, now, rp);
+ } catch (Exception e) {
+ LOG.error("Exception when calculating next tgt renewal time", e);
+ return;
+ }
+ metrics.renewalFailures.incr();
+ // retry until close enough to tgt endTime.
+ if (now > nextRefresh) {
+ LOG.error("TGT is expired. Aborting renew thread for {}.",
+ getUserName());
+ return;
+ }
+ }
+ } while (runRenewalLoop);
+ }
+ }
+
+ /**
+ * A concrete implementation of {@code AutoRenewalForUserCredsRunnable} class
+ * which performs TGT renewal using kinit command.
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ @VisibleForTesting
+ final class TicketCacheRenewalRunnable
+ extends AutoRenewalForUserCredsRunnable {
+ private String kinitCmd;
+
+ TicketCacheRenewalRunnable(KerberosTicket tgt, String kinitCmd,
+ long nextRefresh) {
+ super(tgt, nextRefresh);
+ this.kinitCmd = kinitCmd;
+ }
+
+ @Override
+ public void relogin() throws IOException {
+ String output = Shell.execCommand(kinitCmd, "-R");
+ LOG.debug("Renewed ticket. kinit output: {}", output);
+ reloginFromTicketCache();
+ }
+ }
+
+ /**
+ * A concrete implementation of {@code AutoRenewalForUserCredsRunnable} class
+ * which performs TGT renewal using specified keytab.
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ @VisibleForTesting
+ final class KeytabRenewalRunnable extends AutoRenewalForUserCredsRunnable {
+
+ KeytabRenewalRunnable(KerberosTicket tgt, long nextRefresh) {
+ super(tgt, nextRefresh);
+ }
+
+ @Override
+ public void relogin() throws IOException {
+ reloginFromKeytab();
+ }
+ }
+
+ /**
+ * Get time for next login retry. This will allow the thread to retry with
+ * exponential back-off, until tgt endtime.
+ * Last retry is {@link #kerberosMinSecondsBeforeRelogin} before endtime.
+ *
+ * @param tgtEndTime EndTime of the tgt.
+ * @param now Current time.
+ * @param rp The retry policy.
+ * @return Time for next login retry.
+ */
+ @VisibleForTesting
+ static long getNextTgtRenewalTime(final long tgtEndTime, final long now,
+ final RetryPolicy rp) throws Exception {
+ final long lastRetryTime = tgtEndTime - kerberosMinSecondsBeforeRelogin;
+ final RetryPolicy.RetryAction ra = rp.shouldRetry(null,
+ metrics.renewalFailures.value(), 0, false);
+ return Math.min(lastRetryTime, now + ra.delayMillis);
+ }
+
+ /**
+ * Log a user in from a keytab file. Loads a user identity from a keytab
+ * file and logs them in. They become the currently logged-in user.
+ * @param user the principal name to load from the keytab
+ * @param path the path to the keytab file
+ * @throws IOException raised on errors performing I/O.
+ * @throws KerberosAuthException if it's a kerberos login exception.
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public
+ static void loginUserFromKeytab(String user,
+ String path
+ ) throws IOException {
+ if (!isSecurityEnabled())
+ return;
+
+ UserGroupInformation u = loginUserFromKeytabAndReturnUGI(user, path);
+ if (isKerberosKeyTabLoginRenewalEnabled()) {
+ u.spawnAutoRenewalThreadForKeytab();
+ }
+
+ setLoginUser(u);
+
+ LOG.info(
+ "Login successful for user {} using keytab file {}. Keytab auto"
+ + " renewal enabled : {}",
+ user, new File(path).getName(), isKerberosKeyTabLoginRenewalEnabled());
+ }
+
+ /**
+ * Log the current user out who previously logged in using keytab.
+ * This method assumes that the user logged in by calling
+ * {@link #loginUserFromKeytab(String, String)}.
+ *
+ * @throws IOException raised on errors performing I/O.
+ * @throws KerberosAuthException if a failure occurred in logout,
+ * or if the user did not log in by invoking loginUserFromKeyTab() before.
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public void logoutUserFromKeytab() throws IOException {
+ if (!hasKerberosCredentials()) {
+ return;
+ }
+
+ // Shutdown the background task performing login renewal.
+ if (getKerberosLoginRenewalExecutor().isPresent()) {
+ getKerberosLoginRenewalExecutor().get().shutdownNow();
+ }
+
+ HadoopLoginContext login = getLogin();
+ String keytabFile = getKeytab();
+ if (login == null || keytabFile == null) {
+ throw new KerberosAuthException(MUST_FIRST_LOGIN_FROM_KEYTAB);
+ }
+
+ try {
+ LOG.debug("Initiating logout for {}", getUserName());
+ // hadoop login context internally locks credentials.
+ login.logout();
+ } catch (LoginException le) {
+ KerberosAuthException kae = new KerberosAuthException(LOGOUT_FAILURE, le);
+ kae.setUser(user.toString());
+ kae.setKeytabFile(keytabFile);
+ throw kae;
+ }
+
+ LOG.info("Logout successful for user " + getUserName()
+ + " using keytab file " + keytabFile);
+ }
+
+ /**
+ * Re-login a user from keytab if TGT is expired or is close to expiry.
+ *
+ * @throws IOException raised on errors performing I/O.
+ * @throws KerberosAuthException if it's a kerberos login exception.
+ */
+ public void checkTGTAndReloginFromKeytab() throws IOException {
+ reloginFromKeytab(true);
+ }
+
+ // if the first kerberos ticket is not TGT, then remove and destroy it since
+ // the kerberos library of jdk always use the first kerberos ticket as TGT.
+ // See HADOOP-13433 for more details.
+ @VisibleForTesting
+ void fixKerberosTicketOrder() {
+ Set