diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 1e8df1c67470..b606404b3959 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4858,4 +4858,25 @@
5m
Interval for cleaning up orphan snapshot local data versions corresponding to snapshots
+
+
+ ozone.om.revoked.sts.token.cleanup.service.interval
+ 3h
+ OZONE, OM, PERFORMANCE, SECURITY
+
+ A background job that periodically checks revoked STS token entries and
+ deletes ones that have existed for 12 hours. This entry controls the interval of this
+ cleanup check. Unit could be defined with postfix (ns,ms,s,m,h,d).
+
+
+
+ ozone.om.revoked.sts.token.cleanup.service.timeout
+ 15m
+ OZONE, OM, PERFORMANCE, SECURITY
+
+ A timeout value for the revoked STS token cleanup service. If this is set
+ greater than 0, the service will stop waiting for the deletion
+ completion after this time. Unit could be defined with postfix (ns,ms,s,m,h,d).
+
+
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index dd70a9056f96..41d3d0bac437 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -344,6 +344,7 @@ public static boolean isReadOnly(
case QuotaRepair:
case PutObjectTagging:
case DeleteObjectTagging:
+ case CleanupRevokedSTSTokens:
case UnknownCommand:
return false;
case EchoRPC:
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 469900aa8ea7..a414675c6718 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -682,6 +682,16 @@ public final class OMConfigKeys {
"ozone.om.snapshot.local.data.manager.service.interval";
public static final String OZONE_OM_SNAPSHOT_LOCAL_DATA_MANAGER_SERVICE_INTERVAL_DEFAULT = "5m";
+ public static final String OZONE_OM_REVOKED_STS_TOKEN_CLEANUP_SERVICE_INTERVAL
+ = "ozone.om.revoked.sts.token.cleanup.service.interval";
+ public static final String OZONE_OM_REVOKED_STS_TOKEN_CLEANUP_SERVICE_INTERVAL_DEFAULT
+ = "3h";
+
+ public static final String OZONE_OM_REVOKED_STS_TOKEN_CLEANUP_SERVICE_TIMEOUT
+ = "ozone.om.revoked.sts.token.cleanup.service.timeout";
+ public static final String OZONE_OM_REVOKED_STS_TOKEN_CLEANUP_SERVICE_TIMEOUT_DEFAULT
+ = "15m";
+
/**
* Never constructed.
*/
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index b24aff586cea..763d2619a709 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -158,6 +158,7 @@ enum Type {
DeleteObjectTagging = 142;
AssumeRole = 143;
RevokeSTSToken = 144;
+ CleanupRevokedSTSTokens = 145;
}
enum SafeMode {
@@ -308,6 +309,7 @@ message OMRequest {
repeated SetSnapshotPropertyRequest SetSnapshotPropertyRequests = 143;
optional AssumeRoleRequest assumeRoleRequest = 144;
optional RevokeSTSTokenRequest revokeSTSTokenRequest = 145;
+ optional CleanupRevokedSTSTokensRequest cleanupRevokedSTSTokensRequest = 146;
}
message OMResponse {
@@ -443,6 +445,7 @@ message OMResponse {
optional DeleteObjectTaggingResponse deleteObjectTaggingResponse = 142;
optional AssumeRoleResponse assumeRoleResponse = 143;
optional RevokeSTSTokenResponse revokeSTSTokenResponse = 144;
+ optional CleanupRevokedSTSTokensResponse cleanupRevokedSTSTokensResponse = 145;
}
enum Status {
@@ -2394,6 +2397,17 @@ message RevokeSTSTokenRequest {
message RevokeSTSTokenResponse {
}
+/**
+ This will contain a list of revoked STS temporary access key IDs whose entries should be removed from
+ the s3RevokedStsTokenTable.
+*/
+message CleanupRevokedSTSTokensRequest {
+ repeated string accessKeyId = 1;
+}
+
+message CleanupRevokedSTSTokensResponse {
+}
+
/**
The OM service that takes care of Ozone namespace.
*/
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index e6c5f916cc1e..d26c9139bb22 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -287,6 +287,7 @@
import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
import org.apache.hadoop.ozone.om.service.OMRangerBGSyncService;
import org.apache.hadoop.ozone.om.service.QuotaRepairTask;
+import org.apache.hadoop.ozone.om.service.RevokedSTSTokenCleanupService;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
@@ -438,6 +439,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private final boolean isSpnegoEnabled;
private final SecurityConfig secConfig;
private S3SecretManager s3SecretManager;
+ private RevokedSTSTokenCleanupService revokedSTSTokenCleanupService;
private final boolean isOmGrpcServerEnabled;
private volatile boolean isOmRpcServerRunning = false;
private volatile boolean isOmGrpcServerRunning = false;
@@ -1946,6 +1948,18 @@ public void start() throws IOException {
keyManager.start(configuration);
+ final long revokedSTSTokenCleanupInterval = configuration.getTimeDuration(
+ OMConfigKeys.OZONE_OM_REVOKED_STS_TOKEN_CLEANUP_SERVICE_INTERVAL,
+ OMConfigKeys.OZONE_OM_REVOKED_STS_TOKEN_CLEANUP_SERVICE_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ final long revokedSTSTokenCleanupTimeout = configuration.getTimeDuration(
+ OMConfigKeys.OZONE_OM_REVOKED_STS_TOKEN_CLEANUP_SERVICE_TIMEOUT,
+ OMConfigKeys.OZONE_OM_REVOKED_STS_TOKEN_CLEANUP_SERVICE_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ revokedSTSTokenCleanupService = new RevokedSTSTokenCleanupService(
+ revokedSTSTokenCleanupInterval, TimeUnit.MILLISECONDS, revokedSTSTokenCleanupTimeout, this);
+ revokedSTSTokenCleanupService.start();
+
try {
httpServer = new OzoneManagerHttpServer(configuration, this);
httpServer.start();
@@ -2524,6 +2538,9 @@ public boolean stop() {
if (edekCacheLoader != null) {
edekCacheLoader.shutdown();
}
+ if (revokedSTSTokenCleanupService != null) {
+ revokedSTSTokenCleanupService.shutdown();
+ }
return true;
} catch (Exception e) {
LOG.error("OzoneManager stop failed.", e);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 706e00f9537e..3d6f2d5f3f2f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -68,6 +68,7 @@
import org.apache.hadoop.ozone.om.request.key.acl.prefix.OMPrefixSetAclRequest;
import org.apache.hadoop.ozone.om.request.s3.multipart.S3ExpiredMultipartUploadsAbortRequest;
import org.apache.hadoop.ozone.om.request.s3.security.OMSetSecretRequest;
+import org.apache.hadoop.ozone.om.request.s3.security.S3CleanupRevokedSTSTokensRequest;
import org.apache.hadoop.ozone.om.request.s3.security.S3GetSecretRequest;
import org.apache.hadoop.ozone.om.request.s3.security.S3RevokeSTSTokenRequest;
import org.apache.hadoop.ozone.om.request.s3.security.S3RevokeSecretRequest;
@@ -199,6 +200,8 @@ public static OMClientRequest createClientRequest(OMRequest omRequest,
return new S3RevokeSecretRequest(omRequest);
case RevokeSTSToken:
return new S3RevokeSTSTokenRequest(omRequest);
+ case CleanupRevokedSTSTokens:
+ return new S3CleanupRevokedSTSTokensRequest(omRequest);
case PurgeKeys:
return new OMKeyPurgeRequest(omRequest);
case PurgeDirectories:
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3CleanupRevokedSTSTokensRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3CleanupRevokedSTSTokensRequest.java
new file mode 100644
index 000000000000..ab0bf76b6d92
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3CleanupRevokedSTSTokensRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.s3.security;
+
+import java.util.List;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.security.S3CleanupRevokedSTSTokensResponse;
+import org.apache.hadoop.ozone.om.service.RevokedSTSTokenCleanupService;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CleanupRevokedSTSTokensRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+/**
+ * Handles CleanupRevokedSTSTokens requests submitted by {@link RevokedSTSTokenCleanupService}.
+ */
+public class S3CleanupRevokedSTSTokensRequest extends OMClientRequest {
+
+ public S3CleanupRevokedSTSTokensRequest(OMRequest omRequest) {
+ super(omRequest);
+ }
+
+ @Override
+ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) {
+ final CleanupRevokedSTSTokensRequest request = getOmRequest().getCleanupRevokedSTSTokensRequest();
+ final OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(getOmRequest());
+
+ final List accessKeyIds = request.getAccessKeyIdList();
+ return new S3CleanupRevokedSTSTokensResponse(accessKeyIds, omResponse.build());
+ }
+}
+
+
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/S3CleanupRevokedSTSTokensResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/S3CleanupRevokedSTSTokensResponse.java
new file mode 100644
index 000000000000..8cb756aa5a47
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/S3CleanupRevokedSTSTokensResponse.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.response.s3.security;
+
+import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.S3_REVOKED_STS_TOKEN_TABLE;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
+
+import jakarta.annotation.Nonnull;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+/**
+ * Response for CleanupRevokedSTSTokens request.
+ */
+@CleanupTableInfo(cleanupTables = {S3_REVOKED_STS_TOKEN_TABLE})
+public class S3CleanupRevokedSTSTokensResponse extends OMClientResponse {
+
+ private final List accessKeyIds;
+
+ public S3CleanupRevokedSTSTokensResponse(List accessKeyIds, @Nonnull OMResponse omResponse) {
+ super(omResponse);
+ this.accessKeyIds = accessKeyIds;
+ }
+
+ @Override
+ public void addToDBBatch(OMMetadataManager omMetadataManager, BatchOperation batchOperation) throws IOException {
+ if (accessKeyIds == null || accessKeyIds.isEmpty()) {
+ return;
+ }
+ if (!getOMResponse().hasStatus() || getOMResponse().getStatus() != OK) {
+ return;
+ }
+
+ final Table table = omMetadataManager.getS3RevokedStsTokenTable();
+ if (table == null) {
+ return;
+ }
+
+ for (String accessKeyId : accessKeyIds) {
+ table.deleteWithBatch(batchOperation, accessKeyId);
+ }
+ }
+}
+
+
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/RevokedSTSTokenCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/RevokedSTSTokenCleanupService.java
new file mode 100644
index 000000000000..f1f3fe8b2fb8
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/RevokedSTSTokenCleanupService.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.time.Clock;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.ClientVersion;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CleanupRevokedSTSTokensRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background service that periodically scans the revoked STS token table and submits OM requests to
+ * remove entries whose session token has expired.
+ */
+public class RevokedSTSTokenCleanupService extends BackgroundService {
+ private static final Logger LOG = LoggerFactory.getLogger(RevokedSTSTokenCleanupService.class);
+
+ // Use a single thread
+ private static final int REVOKED_STS_TOKEN_CLEANER_CORE_POOL_SIZE = 1;
+ private static final Clock CLOCK = Clock.system(ZoneOffset.UTC);
+ private static final long CLEANUP_THRESHOLD = 12 * 60 * 60 * 1000L; // 12 hours in milliseconds
+
+ private final OzoneManager ozoneManager;
+ private final OMMetadataManager metadataManager;
+ private final AtomicBoolean suspended;
+ private final AtomicLong runCount;
+ private final AtomicLong submittedDeletedEntryCount;
+ // Dummy client ID to use for response, since this is triggered by a
+ // service, not the client.
+ private final ClientId clientId = ClientId.randomId();
+
+ /**
+ * Creates a Revoked STS Token cleanup service.
+ *
+ * @param interval the interval between successive runs
+ * @param unit the time unit for {@code interval}
+ * @param serviceTimeout timeout for a single run
+ * @param ozoneManager the OzoneManager instance
+ */
+ public RevokedSTSTokenCleanupService(long interval, TimeUnit unit, long serviceTimeout, OzoneManager ozoneManager) {
+ super(
+ "RevokedSTSTokenCleanupService", interval, unit, REVOKED_STS_TOKEN_CLEANER_CORE_POOL_SIZE,
+ serviceTimeout, ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.metadataManager = ozoneManager.getMetadataManager();
+ this.suspended = new AtomicBoolean(false);
+ this.runCount = new AtomicLong(0);
+ this.submittedDeletedEntryCount = new AtomicLong(0);
+ }
+
+ /**
+ * Returns the number of times this Background service has run.
+ * @return Long, run count.
+ */
+ @VisibleForTesting
+ public long getRunCount() {
+ return runCount.get();
+ }
+
+ /**
+ * Returns the number of entries this Background service has submitted for deletion.
+ * @return Long, submitted for deletion entry count.
+ */
+ @VisibleForTesting
+ public long getSubmittedDeletedEntryCount() {
+ return submittedDeletedEntryCount.get();
+ }
+
+ @Override
+ public BackgroundTaskQueue getTasks() {
+ final BackgroundTaskQueue queue = new BackgroundTaskQueue();
+ queue.add(new RevokedSTSTokenCleanupTask());
+ return queue;
+ }
+
+ private boolean shouldRun() {
+ return !suspended.get() && ozoneManager.isLeaderReady();
+ }
+
+ private class RevokedSTSTokenCleanupTask implements BackgroundTask {
+
+ @Override
+ public BackgroundTaskResult call() throws Exception {
+ if (!shouldRun()) {
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+
+ final long startTime = Time.monotonicNow();
+ runCount.incrementAndGet();
+ final Table revokedStsTokenTable = metadataManager.getS3RevokedStsTokenTable();
+
+ // Collect entries that have existed for over 12 hours during the scan
+ final List accessKeyIdsToCleanup = new ArrayList<>();
+
+ // Iterate over all entries in the revoked STS token table and remove
+ // those whose initialCreationTimeMillis is more than 12 hours
+ try (Table.KeyValueIterator iterator = revokedStsTokenTable.iterator()) {
+ iterator.seekToFirst();
+ while (iterator.hasNext()) {
+ final Table.KeyValue entry = iterator.next();
+ final String accessKeyId = entry.getKey();
+ final Long initialCreationTimeMillis = entry.getValue();
+
+ if (shouldCleanup(initialCreationTimeMillis)) {
+ accessKeyIdsToCleanup.add(accessKeyId);
+ }
+ }
+ } catch (IOException e) {
+ // IO exceptions while iterating should be logged and retried next run.
+ LOG.error("Failure while scanning s3RevokedStsTokenTable. It will be retried in the next interval", e);
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+
+ final long deletedInRun;
+ if (!accessKeyIdsToCleanup.isEmpty()) {
+ LOG.info("Found {} revoked STS token entries to clean up.", accessKeyIdsToCleanup.size());
+ final boolean success = submitCleanupRequest(accessKeyIdsToCleanup);
+ if (success) {
+ deletedInRun = accessKeyIdsToCleanup.size();
+ } else {
+ deletedInRun = 0;
+ LOG.warn(
+ "RevokedSTSTokenCleanupService failed to submit cleanup request. Expired entries will be retried in " +
+ "the next run.");
+ }
+ } else {
+ return BackgroundTaskResult.EmptyTaskResult.newResult();
+ }
+
+ if (deletedInRun > 0) {
+ submittedDeletedEntryCount.addAndGet(deletedInRun);
+ }
+
+ final long elapsed = Time.monotonicNow() - startTime;
+ LOG.info(
+ "RevokedSTSTokenCleanupService run completed. deletedEntriesInRun={}, totalDeletedEntries={}, " +
+ "elapsedTimeMs={}", deletedInRun, submittedDeletedEntryCount.get(), elapsed);
+
+ final long resultCount = deletedInRun;
+ return () -> (int) resultCount;
+ }
+
+ /**
+ * Returns true if the given STS session token has expired.
+ */
+ private boolean shouldCleanup(long initialCreationTimeMillis) {
+ final long now = CLOCK.millis();
+
+ if (now - initialCreationTimeMillis > CLEANUP_THRESHOLD) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Revoked STS token entry created at {} is older than 12 hours, will clean up. Current time: {}",
+ initialCreationTimeMillis, now);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Builds and submits an OMRequest to delete the provided revoked STS token(s).
+ */
+ private boolean submitCleanupRequest(List expiredAccessKeyIds) {
+ final CleanupRevokedSTSTokensRequest request = CleanupRevokedSTSTokensRequest.newBuilder()
+ .addAllAccessKeyId(expiredAccessKeyIds)
+ .build();
+
+ final OMRequest omRequest = OMRequest.newBuilder()
+ .setCmdType(Type.CleanupRevokedSTSTokens)
+ .setCleanupRevokedSTSTokensRequest(request)
+ .setClientId(clientId.toString())
+ .setVersion(ClientVersion.CURRENT_VERSION)
+ .build();
+
+ try {
+ final OMResponse omResponse = OzoneManagerRatisUtils.submitRequest(
+ ozoneManager, omRequest, clientId, runCount.get());
+ return omResponse != null && omResponse.getSuccess();
+ } catch (ServiceException e) {
+ LOG.error("Revoked STS token cleanup request failed. Will retry at next run.", e);
+ return false;
+ }
+ }
+ }
+}
+
+
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestRevokedSTSTokenCleanupService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestRevokedSTSTokenCleanupService.java
new file mode 100644
index 000000000000..0b58c3090107
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestRevokedSTSTokenCleanupService.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.service;
+
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+import com.google.protobuf.ServiceException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.hdds.utils.db.StringInMemoryTestTable;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CleanupRevokedSTSTokensRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ozone.test.TestClock;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+/**
+ * Unit tests for {@link RevokedSTSTokenCleanupService}.
+ */
+public class TestRevokedSTSTokenCleanupService {
+ private OzoneManager ozoneManager;
+ private StringInMemoryTestTable revokedStsTokenTable;
+ private TestClock testClock;
+
+ @BeforeEach
+ public void setUp() {
+ testClock = TestClock.newInstance();
+ ozoneManager = mock(OzoneManager.class);
+ final OMMetadataManager omMetadataManager = mock(OMMetadataManager.class);
+ revokedStsTokenTable = new StringInMemoryTestTable<>();
+
+ when(ozoneManager.isLeaderReady()).thenReturn(true);
+ when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
+ when(ozoneManager.getThreadNamePrefix()).thenReturn("om-");
+ when(omMetadataManager.getS3RevokedStsTokenTable()).thenReturn(revokedStsTokenTable);
+ }
+
+ @Test
+ public void submitsCleanupRequestForOnlyExpiredTokens() throws Exception {
+ // If there are two revoked entries, one expired and one not expired, only the expired access key id should be
+ // submitted for cleanup.
+ final long nowMillis = testClock.millis();
+ final long expiredCreationTimeMillis = nowMillis - TimeUnit.HOURS.toMillis(13); // older than 12h threshold
+ final long validCreationTimeMillis = nowMillis - TimeUnit.HOURS.toMillis(1);
+ revokedStsTokenTable.put("ASIA1234567890", expiredCreationTimeMillis);
+ revokedStsTokenTable.put("ASIA4567890123", validCreationTimeMillis);
+
+ final AtomicReference capturedRequest = new AtomicReference<>();
+
+ try (MockedStatic ozoneManagerRatisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+ mockRatisSubmitAndCapture(ozoneManagerRatisUtilsMock, capturedRequest);
+
+ // Run the cleanup service
+ final RevokedSTSTokenCleanupService revokedSTSTokenCleanupService = createAndRunCleanupService();
+
+ assertThat(revokedSTSTokenCleanupService.getRunCount()).isEqualTo(1);
+ assertThat(revokedSTSTokenCleanupService.getSubmittedDeletedEntryCount()).isEqualTo(1);
+
+ final OMRequest omRequest = capturedRequest.get();
+ assertThat(omRequest).isNotNull();
+ assertThat(omRequest.getCmdType()).isEqualTo(Type.CleanupRevokedSTSTokens);
+
+ final CleanupRevokedSTSTokensRequest cleanupRevokedSTSTokensRequest =
+ omRequest.getCleanupRevokedSTSTokensRequest();
+ assertThat(cleanupRevokedSTSTokensRequest.getAccessKeyIdList()).containsExactly("ASIA1234567890");
+ }
+ }
+
+ @Test
+ public void doesNotSubmitRequestWhenThereAreNoExpiredTokens() throws Exception {
+ // If only non-expired entries exist in the revoked sts token table, no cleanup request should be submitted and
+ // no metrics should be updated.
+ final long nowMillis = testClock.millis();
+ revokedStsTokenTable.put("ASIA1234567890", nowMillis - TimeUnit.HOURS.toMillis(1));
+ revokedStsTokenTable.put("ASIA0123456789", nowMillis - TimeUnit.HOURS.toMillis(2));
+
+ final AtomicReference capturedRequest = new AtomicReference<>();
+
+ try (MockedStatic ozoneManagerRatisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+ mockRatisSubmitAndCapture(ozoneManagerRatisUtilsMock, capturedRequest);
+
+ // Run the cleanup service
+ final RevokedSTSTokenCleanupService revokedSTSTokenCleanupService = createAndRunCleanupService();
+
+ assertThat(revokedSTSTokenCleanupService.getRunCount()).isEqualTo(1);
+ assertThat(revokedSTSTokenCleanupService.getSubmittedDeletedEntryCount()).isZero();
+ assertThat(capturedRequest.get()).isNull();
+ }
+ }
+
+ @Test
+ public void handlesNoEntriesInRevokedSTSTokenTable() throws Exception {
+ // If the table is empty (which most of the time it will be), no cleanup requests should be submitted and no metrics
+ // should be updated.
+ final AtomicReference capturedRequest = new AtomicReference<>();
+
+ try (MockedStatic ozoneManagerRatisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+ mockRatisSubmitAndCapture(ozoneManagerRatisUtilsMock, capturedRequest);
+
+ // Run the cleanup service
+ final RevokedSTSTokenCleanupService revokedSTSTokenCleanupService = createAndRunCleanupService();
+
+ assertThat(revokedSTSTokenCleanupService.getRunCount()).isEqualTo(1);
+ assertThat(revokedSTSTokenCleanupService.getSubmittedDeletedEntryCount()).isZero();
+ assertThat(capturedRequest.get()).isNull();
+ }
+ }
+
+ @Test
+ public void doesNotUpdateMetricsOnRatisSubmissionServiceExceptionFailure() throws Exception {
+ // If there are expired tokens in the table but the OM request submission to clean up the entries fails with a
+ // service exception, the metrics should not be updated
+ final long nowMillis = testClock.millis();
+ revokedStsTokenTable.put("ASIA1234567890", nowMillis - TimeUnit.HOURS.toMillis(13));
+ revokedStsTokenTable.put("ASIA0987654321", nowMillis - TimeUnit.HOURS.toMillis(14));
+
+ final AtomicInteger submitAttempts = new AtomicInteger(0);
+
+ try (MockedStatic ozoneManagerRatisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+ // Simulate Ratis submission failure
+ mockRatisSubmitToFail(ozoneManagerRatisUtilsMock, submitAttempts);
+
+ // Run the cleanup service
+ final RevokedSTSTokenCleanupService revokedSTSTokenCleanupService = createAndRunCleanupService();
+
+ assertThat(revokedSTSTokenCleanupService.getRunCount()).isEqualTo(1);
+ assertThat(submitAttempts.get()).isEqualTo(1);
+ assertThat(revokedSTSTokenCleanupService.getSubmittedDeletedEntryCount()).isZero();
+ }
+ }
+
+ @Test
+ public void doesNotUpdateMetricsOnNonSuccessfulResponse() throws Exception {
+ // If there is an expired token in the table but the OM request submission to clean up the entries gets a
+ // non-successful response, the metrics should not be updated
+ final long nowMillis = testClock.millis();
+ revokedStsTokenTable.put("ASIA1234567890", nowMillis - TimeUnit.HOURS.toMillis(20));
+
+ try (MockedStatic ozoneManagerRatisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+ // Return a non-successful response
+ mockRatisSubmitWithInternalErrorResponse(ozoneManagerRatisUtilsMock);
+
+ // Run the cleanup service
+ final RevokedSTSTokenCleanupService revokedSTSTokenCleanupService = createAndRunCleanupService();
+
+ assertThat(revokedSTSTokenCleanupService.getRunCount()).isEqualTo(1);
+ assertThat(revokedSTSTokenCleanupService.getSubmittedDeletedEntryCount()).isZero();
+ }
+ }
+
+ @Test
+ public void handlesAllExpiredTokens() throws Exception {
+ // If all the tokens in the table are expired on a particular run, ensure the metrics are updated appropriately
+ final long nowMillis = testClock.millis();
+ revokedStsTokenTable.put("ASIA1234567890", nowMillis - TimeUnit.HOURS.toMillis(13));
+ revokedStsTokenTable.put("ASIA0123456789", nowMillis - TimeUnit.HOURS.toMillis(14));
+ revokedStsTokenTable.put("ASIA9876543210", nowMillis - TimeUnit.HOURS.toMillis(15));
+
+ final AtomicReference capturedRequest = new AtomicReference<>();
+
+ try (MockedStatic ozoneManagerRatisUtilsMock = mockStatic(OzoneManagerRatisUtils.class)) {
+ mockRatisSubmitAndCapture(ozoneManagerRatisUtilsMock, capturedRequest);
+
+ // Run the cleanup service
+ final RevokedSTSTokenCleanupService revokedSTSTokenCleanupService = createAndRunCleanupService();
+
+ assertThat(revokedSTSTokenCleanupService.getRunCount()).isEqualTo(1);
+ assertThat(revokedSTSTokenCleanupService.getSubmittedDeletedEntryCount()).isEqualTo(3);
+
+ final OMRequest omRequest = capturedRequest.get();
+ assertThat(omRequest).isNotNull();
+ assertThat(omRequest.getCmdType()).isEqualTo(Type.CleanupRevokedSTSTokens);
+
+ final CleanupRevokedSTSTokensRequest cleanupRevokedSTSTokensRequest =
+ omRequest.getCleanupRevokedSTSTokensRequest();
+ assertThat(cleanupRevokedSTSTokensRequest.getAccessKeyIdList())
+ .containsExactlyInAnyOrder("ASIA1234567890", "ASIA0123456789", "ASIA9876543210");
+ }
+ }
+
+ private RevokedSTSTokenCleanupService createAndRunCleanupService() throws Exception {
+ final RevokedSTSTokenCleanupService revokedSTSTokenCleanupService =
+ new RevokedSTSTokenCleanupService(1, TimeUnit.HOURS, 1_000, ozoneManager);
+ revokedSTSTokenCleanupService.runPeriodicalTaskNow();
+ return revokedSTSTokenCleanupService;
+ }
+
+ private void mockRatisSubmitAndCapture(MockedStatic ozoneManagerRatisUtilsMock,
+ AtomicReference capturedRequest) {
+ ozoneManagerRatisUtilsMock.when(
+ () -> OzoneManagerRatisUtils.submitRequest(any(), any(), any(), anyLong()))
+ .thenAnswer(invocation -> {
+ final OMRequest omRequest = invocation.getArgument(1);
+ capturedRequest.set(omRequest);
+ return buildOkResponse(omRequest);
+ });
+ }
+
+ private void mockRatisSubmitToFail(MockedStatic ozoneManagerRatisUtilsMock,
+ AtomicInteger submitAttempts) {
+ ozoneManagerRatisUtilsMock.when(
+ () -> OzoneManagerRatisUtils.submitRequest(any(), any(), any(), anyLong()))
+ .thenAnswer(invocation -> {
+ submitAttempts.incrementAndGet();
+ throw new ServiceException("Simulated Ratis failure");
+ });
+ }
+
+ private void mockRatisSubmitWithInternalErrorResponse(MockedStatic omRatisUtilsMock) {
+ omRatisUtilsMock.when(
+ () -> OzoneManagerRatisUtils.submitRequest(any(), any(), any(), anyLong()))
+ .thenReturn(OMResponse.newBuilder()
+ .setCmdType(Type.CleanupRevokedSTSTokens)
+ .setStatus(Status.INTERNAL_ERROR)
+ .setSuccess(false)
+ .build());
+ }
+
+ private static OMResponse buildOkResponse(OMRequest omRequest) {
+ return OMResponse.newBuilder()
+ .setCmdType(omRequest.getCmdType())
+ .setStatus(OK)
+ .setSuccess(true)
+ .build();
+ }
+}
+
+
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/package-info.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/package-info.java
new file mode 100644
index 000000000000..71dfeed880d0
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Tests for OM services.
+ */
+package org.apache.hadoop.ozone.om.service;