Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -349,4 +349,18 @@ public static void enforceFeatureEnabledOrThrow(
+ "it is still possible to enforce the uniqueness of table locations within a catalog.")
.defaultValue(false)
.buildFeatureConfiguration();

public static final FeatureConfiguration<KmsSupportLevel> KMS_SUPPORT_LEVEL_S3 =
PolarisConfiguration.<KmsSupportLevel>builder()
.key("ENABLE_KMS_SUPPORT_FOR_S3")
.catalogConfig("polaris.config.enable-kms-support-for-s3")
.description("If true, enables KMS support for S3 storage integration")
.defaultValue(KmsSupportLevel.NONE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to add an end-to-end test for non-default values... Otherwise, how do we know they work if configured? 😉

Ideally, we should test with MinIO too... It looks like MinIO has KMS.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have Java based e2e tests? I can work on adding one for this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.buildFeatureConfiguration();

public enum KmsSupportLevel {
NONE,
CATALOG,
TABLE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.polaris.core.storage.aws;

import static org.apache.polaris.core.config.FeatureConfiguration.KMS_SUPPORT_LEVEL_S3;
import static org.apache.polaris.core.config.FeatureConfiguration.STORAGE_CREDENTIAL_DURATION_SECONDS;

import jakarta.annotation.Nonnull;
Expand All @@ -27,6 +28,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.polaris.core.config.FeatureConfiguration;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.storage.AccessConfig;
import org.apache.polaris.core.storage.InMemoryStorageIntegration;
Expand Down Expand Up @@ -81,10 +83,11 @@ public AccessConfig getSubscopedCreds(
.roleSessionName("PolarisAwsCredentialsStorageIntegration")
.policy(
policyString(
storageConfig.getRoleARN(),
storageConfig,
allowListOperation,
allowedReadLocations,
allowedWriteLocations)
allowedWriteLocations,
callContext)
.toJson())
.durationSeconds(storageCredentialDurationSeconds);
credentialsProvider.ifPresent(
Expand Down Expand Up @@ -141,9 +144,12 @@ public AccessConfig getSubscopedCreds(
* ListBucket privileges with no resources. This prevents us from sending an empty policy to AWS
* and just assuming the role with full privileges.
*/
// TODO - add KMS key access
private IamPolicy policyString(
String roleArn, boolean allowList, Set<String> readLocations, Set<String> writeLocations) {
AwsStorageConfigurationInfo awsStorageConfigurationInfo,
boolean allowList,
Set<String> readLocations,
Set<String> writeLocations,
CallContext callContext) {
IamPolicy.Builder policyBuilder = IamPolicy.builder();
IamStatement.Builder allowGetObjectStatementBuilder =
IamStatement.builder()
Expand All @@ -153,7 +159,10 @@ private IamPolicy policyString(
Map<String, IamStatement.Builder> bucketListStatementBuilder = new HashMap<>();
Map<String, IamStatement.Builder> bucketGetLocationStatementBuilder = new HashMap<>();

String arnPrefix = getArnPrefixFor(roleArn);
String roleARN = awsStorageConfigurationInfo.getRoleARN();
String arnPrefix = getArnPrefixFor(roleARN);
String region = awsStorageConfigurationInfo.getRegion();
String awsAccountId = awsStorageConfigurationInfo.getAwsAccountId();
Stream.concat(readLocations.stream(), writeLocations.stream())
.distinct()
.forEach(
Expand Down Expand Up @@ -214,7 +223,32 @@ private IamPolicy policyString(
bucketGetLocationStatementBuilder
.values()
.forEach(statementBuilder -> policyBuilder.addStatement(statementBuilder.build()));
return policyBuilder.addStatement(allowGetObjectStatementBuilder.build()).build();

policyBuilder.addStatement(allowGetObjectStatementBuilder.build());

if (isKMSSupported(callContext)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to think the KMS config rather belongs in AwsStorageConfigurationInfo.

We can still keep the feature flag as a global kill switch (in case unexpected behaviour occurs in runtime), but using KMS or not is probably something to be configured at each specific storage integration point. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that's related to storageConfig. But that would mean bringing bucket-specific config to Catalog configuration.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why bucket-specific? Right now it's just an on/off flag. It looks similar to the pathStyleAccess setting (#2012)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point is that isKMSSupported is a global setting (in this PR), but there may be multiple catalogs in the system. Enabling KMS for one, will change the behaviour of "storage integration" code for all S3 catalogs.

policyBuilder.addStatement(
IamStatement.builder()
.effect(IamEffect.ALLOW)
.addAction("kms:GenerateDataKey")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we separate out the read and write paths into two different statements?

.addAction("kms:Decrypt")
.addAction("kms:DescribeKey")
.addResource(getKMSArnPrefix(roleARN) + region + ":" + awsAccountId + ":key/*")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this come from the catalog configuration instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean the key? I was thinking explicity key support can be added in a follow-up PR where we can support KMS at a table level

.addCondition(IamConditionOperator.STRING_EQUALS, "aws:PrincipalArn", roleARN)
.addCondition(
IamConditionOperator.STRING_LIKE,
"kms:EncryptionContext:aws:s3:arn",
getArnPrefixFor(roleARN)
+ StorageUtil.getBucket(
URI.create(awsStorageConfigurationInfo.getAllowedLocations().get(0)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this support any valid S3-object-ARN prefix(e.g., arn:aws:s3:::my-bucket/reports/2025/*), not just arn:aws:s3:::my-bucket/*? If that's so. We can create a policy for each individual s3 path in allowedLocations, or even better, we can apply to each path in readLocations and writeLocations, so that the polices only apply to table-level locations.

Copy link
Contributor Author

@fivetran-ashokborra fivetran-ashokborra Jul 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may run into issues like PackedPolicyTooLargeException if the table name or prefix path is longer. But yeah, we can restrict at a table level

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I agree with @flyrain this should be in the context of the allowed read and write paths, translating to read and write permissions on KMS with the restrictions.

With respect to encryption context, we should also support bucket keys in which case the encryption context will be at the bucket level.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems more straightforward to generate the KMS policy separately directly using readLocations and writeLocations, combining with the comment here: https://github.com/apache/polaris/pull/1424/files#r2237156985. This approach should allow us to support table-level KMS without additional complexity.

With that in place, we could simplify KMS_SUPPORT_LEVEL_S3 to a simple boolean (enabled / disabled). PackedPolicyTooLargeException is still a valid concern, but we can surface a clear error if it ever occurs assuming it's a edge case.

+ "/*")
.addCondition(
IamConditionOperator.STRING_EQUALS,
"kms:ViaService",
getS3Endpoint(roleARN, region))
.build());
}
return policyBuilder.build();
}

private String getArnPrefixFor(String roleArn) {
Expand All @@ -227,6 +261,24 @@ private String getArnPrefixFor(String roleArn) {
}
}

private static String getKMSArnPrefix(String roleArn) {
if (roleArn.contains("aws-cn")) {
return "arn:aws-cn:kms:";
} else if (roleArn.contains("aws-us-gov")) {
return "arn:aws-us-gov:kms:";
} else {
return "arn:aws:kms:";
}
}

private static String getS3Endpoint(String roleArn, String region) {
if (roleArn.contains("aws-cn")) {
Copy link
Contributor

@pavibhai pavibhai Jul 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do the following instead:

Region regionObj = Region.of(region);
String.format("s3.%s.%s", region.id(), region.metadata().partition().dnsSuffix()))

return "s3." + region + ".amazonaws.com.cn";
} else {
return "s3." + region + ".amazonaws.com";
}
}

private static @Nonnull String parseS3Path(URI uri) {
String bucket = StorageUtil.getBucket(uri);
String path = trimLeadingSlash(uri.getPath());
Expand All @@ -239,4 +291,11 @@ private String getArnPrefixFor(String roleArn) {
}
return path;
}

private boolean isKMSSupported(CallContext callContext) {
return !callContext
.getRealmConfig()
.getConfig(KMS_SUPPORT_LEVEL_S3)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this approach to getting the config value, is KmsSupportLevel.TABLE relevant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm deferring the table level support in a follow-up PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this method going to work in the follow-up PR? :)

.equals(FeatureConfiguration.KmsSupportLevel.NONE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,37 @@

package org.apache.polaris.core.storage;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.time.Clock;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.config.FeatureConfiguration;
import org.apache.polaris.core.config.PolarisConfigurationStore;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.persistence.BasePersistence;
import org.mockito.Mockito;

public abstract class BaseStorageIntegrationTest {
protected CallContext newCallContext() {
PolarisConfigurationStore configStore =
new PolarisConfigurationStore() {
@Override
public <T> @Nullable T getConfiguration(
@Nonnull RealmContext realmContext, String configName) {
if (FeatureConfiguration.KMS_SUPPORT_LEVEL_S3.key().equals(configName)) {
return (T) FeatureConfiguration.KmsSupportLevel.CATALOG;
}
return PolarisConfigurationStore.super.getConfiguration(realmContext, configName);
}
};

return new PolarisCallContext(
() -> "realm", Mockito.mock(BasePersistence.class), Mockito.mock(PolarisDiagnostics.class));
() -> "realm",
Mockito.mock(BasePersistence.class),
Mockito.mock(PolarisDiagnostics.class),
configStore,
Clock.systemDefaultZone());
}
}
Loading