diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index ad91ad0240..a7d9beea10 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -376,4 +376,18 @@ public static void enforceFeatureEnabledOrThrow( + "Defaults to enabled, but service providers may want to disable it.") .defaultValue(true) .buildFeatureConfiguration(); + + public static final FeatureConfiguration KMS_SUPPORT_LEVEL_S3 = + PolarisConfiguration.builder() + .key("ENABLE_KMS_SUPPORT_FOR_S3") + .catalogConfig("polaris.config.enable-kms-support-for-s3") + .description("If true, enables KMS support for S3 storage integration") + .defaultValue(KmsSupportLevel.NONE) + .buildFeatureConfiguration(); + + public enum KmsSupportLevel { + NONE, + CATALOG, + TABLE + } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java index 3e93ba7b4b..4657887c26 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java @@ -18,6 +18,7 @@ */ package org.apache.polaris.core.storage.aws; +import static org.apache.polaris.core.config.FeatureConfiguration.KMS_SUPPORT_LEVEL_S3; import static org.apache.polaris.core.config.FeatureConfiguration.STORAGE_CREDENTIAL_DURATION_SECONDS; import jakarta.annotation.Nonnull; @@ -27,6 +28,7 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Stream; +import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.config.RealmConfig; import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.InMemoryStorageIntegration; @@ -86,10 +88,11 @@ public AccessConfig getSubscopedCreds( .roleSessionName("PolarisAwsCredentialsStorageIntegration") .policy( policyString( - storageConfig.getAwsPartition(), + storageConfig, allowListOperation, allowedReadLocations, - allowedWriteLocations) + allowedWriteLocations, + realmConfig) .toJson()) .durationSeconds(storageCredentialDurationSeconds); credentialsProvider.ifPresent( @@ -158,10 +161,11 @@ public AccessConfig getSubscopedCreds( */ // TODO - add KMS key access private IamPolicy policyString( - String awsPartition, + AwsStorageConfigurationInfo awsStorageConfigurationInfo, boolean allowList, Set readLocations, - Set writeLocations) { + Set writeLocations, + RealmConfig realmConfig) { IamPolicy.Builder policyBuilder = IamPolicy.builder(); IamStatement.Builder allowGetObjectStatementBuilder = IamStatement.builder() @@ -171,7 +175,10 @@ private IamPolicy policyString( Map bucketListStatementBuilder = new HashMap<>(); Map bucketGetLocationStatementBuilder = new HashMap<>(); - String arnPrefix = arnPrefixForPartition(awsPartition); + String arnPrefix = arnPrefixForPartition(awsStorageConfigurationInfo.getAwsPartition()); + String roleARN = awsStorageConfigurationInfo.getRoleARN(); + String region = awsStorageConfigurationInfo.getRegion(); + String awsAccountId = awsStorageConfigurationInfo.getAwsAccountId(); Stream.concat(readLocations.stream(), writeLocations.stream()) .distinct() .forEach( @@ -232,13 +239,56 @@ private IamPolicy policyString( bucketGetLocationStatementBuilder .values() .forEach(statementBuilder -> policyBuilder.addStatement(statementBuilder.build())); - return policyBuilder.addStatement(allowGetObjectStatementBuilder.build()).build(); + + policyBuilder.addStatement(allowGetObjectStatementBuilder.build()); + + if (isKMSSupported(realmConfig)) { + policyBuilder.addStatement( + IamStatement.builder() + .effect(IamEffect.ALLOW) + .addAction("kms:GenerateDataKey") + .addAction("kms:Decrypt") + .addAction("kms:DescribeKey") + .addResource(getKMSArnPrefix(roleARN) + region + ":" + awsAccountId + ":key/*") + .addCondition(IamConditionOperator.STRING_EQUALS, "aws:PrincipalArn", roleARN) + .addCondition( + IamConditionOperator.STRING_LIKE, + "kms:EncryptionContext:aws:s3:arn", + arnPrefix + + StorageUtil.getBucket( + URI.create(awsStorageConfigurationInfo.getAllowedLocations().get(0))) + + "/*") + .addCondition( + IamConditionOperator.STRING_EQUALS, + "kms:ViaService", + getS3Endpoint(roleARN, region)) + .build()); + } + return policyBuilder.build(); } private static String arnPrefixForPartition(String awsPartition) { return String.format("arn:%s:s3:::", awsPartition != null ? awsPartition : "aws"); } + private static String getKMSArnPrefix(String roleArn) { + if (roleArn.contains("aws-cn")) { + return "arn:aws-cn:kms:"; + } else if (roleArn.contains("aws-us-gov")) { + return "arn:aws-us-gov:kms:"; + } else { + return "arn:aws:kms:"; + } + } + + private static String getS3Endpoint(String roleArn, String region) { + if (roleArn.contains("aws-cn")) { + return "s3." + region + ".amazonaws.com.cn"; + } else { + return "s3." + region + ".amazonaws.com"; + } + } + private static @Nonnull String parseS3Path(URI uri) { String bucket = StorageUtil.getBucket(uri); String path = trimLeadingSlash(uri.getPath()); @@ -251,4 +301,10 @@ private static String arnPrefixForPartition(String awsPartition) { } return path; } + + private boolean isKMSSupported(RealmConfig realmConfig) { + return !realmConfig + .getConfig(KMS_SUPPORT_LEVEL_S3) + .equals(FeatureConfiguration.KmsSupportLevel.NONE); + } } diff --git a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java index 7b4b50dece..93b4430731 100644 --- a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java @@ -21,10 +21,15 @@ import static org.assertj.core.api.Assertions.assertThat; import jakarta.annotation.Nonnull; +import jakarta.annotation.Nullable; import java.time.Instant; import java.util.List; import java.util.Optional; import java.util.Set; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.config.RealmConfigImpl; +import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.core.storage.AccessConfig; import org.apache.polaris.core.storage.BaseStorageIntegrationTest; import org.apache.polaris.core.storage.StorageAccessProperty; @@ -50,6 +55,20 @@ class AwsCredentialsStorageIntegrationTest extends BaseStorageIntegrationTest { + private static final RealmConfigImpl REALM_CONFIG_WITH_KMS = + new RealmConfigImpl( + new PolarisConfigurationStore() { + @Override + public @Nullable T getConfiguration( + @Nonnull RealmContext realmContext, String configName) { + if (FeatureConfiguration.KMS_SUPPORT_LEVEL_S3.key().equals(configName)) { + return (T) FeatureConfiguration.KmsSupportLevel.CATALOG; + } + return PolarisConfigurationStore.super.getConfiguration(realmContext, configName); + } + }, + () -> "realm"); + public static final Instant EXPIRE_TIME = Instant.now().plusMillis(3600_000); public static final AssumeRoleResponse ASSUME_ROLE_RESPONSE = @@ -117,6 +136,7 @@ public void testGetSubscopedCreds(String scheme) { public void testGetSubscopedCredsInlinePolicy(String awsPartition) { String roleARN; String region; + String accountId = "012345678901"; switch (awsPartition) { case AWS_PARTITION: roleARN = "arn:aws:iam::012345678901:role/jdoe"; @@ -133,7 +153,6 @@ public void testGetSubscopedCredsInlinePolicy(String awsPartition) { default: throw new IllegalArgumentException("Unknown aws partition: " + awsPartition); } - ; StsClient stsClient = Mockito.mock(StsClient.class); String externalId = "externalId"; String bucket = "bucket"; @@ -153,7 +172,7 @@ public void testGetSubscopedCredsInlinePolicy(String awsPartition) { assertThat(policy) .extracting(IamPolicy::statements) .asInstanceOf(InstanceOfAssertFactories.list(IamStatement.class)) - .hasSize(4) + .hasSize(5) .satisfiesExactly( statement -> assertThat(statement) @@ -228,7 +247,40 @@ public void testGetSubscopedCredsInlinePolicy(String awsPartition) { List.of( IamAction.create("s3:GetObject"), IamAction.create("s3:GetObjectVersion")), - IamStatement::actions)); + IamStatement::actions), + statement -> + assertThat(statement) + .returns(IamEffect.ALLOW, IamStatement::effect) + .returns( + List.of( + IamAction.create("kms:GenerateDataKey"), + IamAction.create("kms:Decrypt"), + IamAction.create("kms:DescribeKey")), + IamStatement::actions) + .satisfies( + st -> + assertThat(st.resources()) + .containsExactlyInAnyOrder( + IamResource.create( + kmsArn( + awsPartition, region, accountId)))) + .satisfies( + st -> + assertThat(st.conditions()) + .containsExactlyInAnyOrder( + IamCondition.create( + IamConditionOperator.STRING_EQUALS, + "aws:PrincipalArn", + roleARN), + IamCondition.create( + IamConditionOperator.STRING_EQUALS, + "kms:ViaService", + "s3." + region + ".amazonaws.com"), + IamCondition.create( + IamConditionOperator.STRING_LIKE, + "kms:EncryptionContext:aws:s3:arn", + s3Arn(awsPartition, bucket, null) + + "/*")))); }); return ASSUME_ROLE_RESPONSE; }); @@ -245,7 +297,7 @@ public void testGetSubscopedCredsInlinePolicy(String awsPartition) { .build(), stsClient) .getSubscopedCreds( - EMPTY_REALM_CONFIG, + REALM_CONFIG_WITH_KMS, true, Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), Set.of(s3Path(bucket, firstPath)), @@ -264,7 +316,7 @@ public void testGetSubscopedCredsInlinePolicy(String awsPartition) { .build(), stsClient) .getSubscopedCreds( - EMPTY_REALM_CONFIG, + REALM_CONFIG_WITH_KMS, true, Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), Set.of(s3Path(bucket, firstPath)), @@ -289,6 +341,8 @@ public void testGetSubscopedCredsInlinePolicyWithoutList() { String roleARN = "arn:aws:iam::012345678901:role/jdoe"; String externalId = "externalId"; String bucket = "bucket"; + String region = "us-east-2"; + String accountId = "012345678901"; String warehouseKeyPrefix = "path/to/warehouse"; String firstPath = warehouseKeyPrefix + "/namespace/table"; String secondPath = warehouseKeyPrefix + "/oldnamespace/table"; @@ -305,7 +359,7 @@ public void testGetSubscopedCredsInlinePolicyWithoutList() { assertThat(policy) .extracting(IamPolicy::statements) .asInstanceOf(InstanceOfAssertFactories.list(IamStatement.class)) - .hasSize(3) + .hasSize(4) .satisfiesExactly( statement -> assertThat(statement) @@ -351,7 +405,40 @@ public void testGetSubscopedCredsInlinePolicyWithoutList() { List.of( IamAction.create("s3:GetObject"), IamAction.create("s3:GetObjectVersion")), - IamStatement::actions)); + IamStatement::actions), + statement -> + assertThat(statement) + .returns(IamEffect.ALLOW, IamStatement::effect) + .returns( + List.of( + IamAction.create("kms:GenerateDataKey"), + IamAction.create("kms:Decrypt"), + IamAction.create("kms:DescribeKey")), + IamStatement::actions) + .satisfies( + st -> + assertThat(st.resources()) + .containsExactlyInAnyOrder( + IamResource.create( + kmsArn( + AWS_PARTITION, region, accountId)))) + .satisfies( + st -> + assertThat(st.conditions()) + .containsExactlyInAnyOrder( + IamCondition.create( + IamConditionOperator.STRING_EQUALS, + "aws:PrincipalArn", + roleARN), + IamCondition.create( + IamConditionOperator.STRING_EQUALS, + "kms:ViaService", + "s3." + region + ".amazonaws.com"), + IamCondition.create( + IamConditionOperator.STRING_LIKE, + "kms:EncryptionContext:aws:s3:arn", + s3Arn(AWS_PARTITION, bucket, null) + + "/*")))); }); return ASSUME_ROLE_RESPONSE; }); @@ -365,7 +452,7 @@ public void testGetSubscopedCredsInlinePolicyWithoutList() { .build(), stsClient) .getSubscopedCreds( - EMPTY_REALM_CONFIG, + REALM_CONFIG_WITH_KMS, false, /* allowList = false*/ Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), Set.of(s3Path(bucket, firstPath)), @@ -383,9 +470,11 @@ public void testGetSubscopedCredsInlinePolicyWithoutList() { @Test public void testGetSubscopedCredsInlinePolicyWithoutWrites() { StsClient stsClient = Mockito.mock(StsClient.class); - String roleARN = "arn:aws:iam::012345678901:role/jdoe"; + String accountId = "012345678901"; + String roleARN = "arn:aws:iam::" + accountId + ":role/jdoe"; String externalId = "externalId"; String bucket = "bucket"; + String region = "us-east-2"; String warehouseKeyPrefix = "path/to/warehouse"; String firstPath = warehouseKeyPrefix + "/namespace/table"; String secondPath = warehouseKeyPrefix + "/oldnamespace/table"; @@ -402,7 +491,7 @@ public void testGetSubscopedCredsInlinePolicyWithoutWrites() { assertThat(policy) .extracting(IamPolicy::statements) .asInstanceOf(InstanceOfAssertFactories.list(IamStatement.class)) - .hasSize(3) + .hasSize(4) .satisfiesExactly( statement -> assertThat(statement) @@ -446,7 +535,40 @@ public void testGetSubscopedCredsInlinePolicyWithoutWrites() { List.of( IamAction.create("s3:GetObject"), IamAction.create("s3:GetObjectVersion")), - IamStatement::actions)); + IamStatement::actions), + statement -> + assertThat(statement) + .returns(IamEffect.ALLOW, IamStatement::effect) + .returns( + List.of( + IamAction.create("kms:GenerateDataKey"), + IamAction.create("kms:Decrypt"), + IamAction.create("kms:DescribeKey")), + IamStatement::actions) + .satisfies( + st -> + assertThat(st.resources()) + .containsExactlyInAnyOrder( + IamResource.create( + kmsArn( + AWS_PARTITION, region, accountId)))) + .satisfies( + st -> + assertThat(st.conditions()) + .containsExactlyInAnyOrder( + IamCondition.create( + IamConditionOperator.STRING_EQUALS, + "aws:PrincipalArn", + roleARN), + IamCondition.create( + IamConditionOperator.STRING_EQUALS, + "kms:ViaService", + "s3." + region + ".amazonaws.com"), + IamCondition.create( + IamConditionOperator.STRING_LIKE, + "kms:EncryptionContext:aws:s3:arn", + s3Arn(AWS_PARTITION, bucket, null) + + "/*")))); }); return ASSUME_ROLE_RESPONSE; }); @@ -460,7 +582,7 @@ public void testGetSubscopedCredsInlinePolicyWithoutWrites() { .build(), stsClient) .getSubscopedCreds( - EMPTY_REALM_CONFIG, + REALM_CONFIG_WITH_KMS, true, /* allowList = true */ Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)), Set.of(), @@ -478,10 +600,12 @@ public void testGetSubscopedCredsInlinePolicyWithoutWrites() { @Test public void testGetSubscopedCredsInlinePolicyWithEmptyReadAndWrite() { StsClient stsClient = Mockito.mock(StsClient.class); - String roleARN = "arn:aws:iam::012345678901:role/jdoe"; + String accountId = "012345678901"; + String roleARN = "arn:aws:iam::" + accountId + ":role/jdoe"; String externalId = "externalId"; String bucket = "bucket"; String warehouseKeyPrefix = "path/to/warehouse"; + String region = "us-east-2"; Mockito.when(stsClient.assumeRole(Mockito.isA(AssumeRoleRequest.class))) .thenAnswer( invocation -> { @@ -495,7 +619,7 @@ public void testGetSubscopedCredsInlinePolicyWithEmptyReadAndWrite() { assertThat(policy) .extracting(IamPolicy::statements) .asInstanceOf(InstanceOfAssertFactories.list(IamStatement.class)) - .hasSize(2) + .hasSize(3) .satisfiesExactly( statement -> assertThat(statement) @@ -513,7 +637,40 @@ public void testGetSubscopedCredsInlinePolicyWithEmptyReadAndWrite() { List.of( IamAction.create("s3:GetObject"), IamAction.create("s3:GetObjectVersion")), - IamStatement::actions)); + IamStatement::actions), + statement -> + assertThat(statement) + .returns(IamEffect.ALLOW, IamStatement::effect) + .returns( + List.of( + IamAction.create("kms:GenerateDataKey"), + IamAction.create("kms:Decrypt"), + IamAction.create("kms:DescribeKey")), + IamStatement::actions) + .satisfies( + st -> + assertThat(st.resources()) + .containsExactlyInAnyOrder( + IamResource.create( + kmsArn( + AWS_PARTITION, region, accountId)))) + .satisfies( + st -> + assertThat(st.conditions()) + .containsExactlyInAnyOrder( + IamCondition.create( + IamConditionOperator.STRING_EQUALS, + "aws:PrincipalArn", + roleARN), + IamCondition.create( + IamConditionOperator.STRING_EQUALS, + "kms:ViaService", + "s3." + region + ".amazonaws.com"), + IamCondition.create( + IamConditionOperator.STRING_LIKE, + "kms:EncryptionContext:aws:s3:arn", + s3Arn(AWS_PARTITION, bucket, null) + + "/*")))); }); return ASSUME_ROLE_RESPONSE; }); @@ -527,7 +684,7 @@ public void testGetSubscopedCredsInlinePolicyWithEmptyReadAndWrite() { .build(), stsClient) .getSubscopedCreds( - EMPTY_REALM_CONFIG, + REALM_CONFIG_WITH_KMS, true, /* allowList = true */ Set.of(), Set.of(), @@ -600,7 +757,6 @@ public void testClientRegion(String awsPartition) { default: throw new IllegalArgumentException("Unknown aws partition: " + awsPartition); } - ; } @ParameterizedTest @@ -658,7 +814,6 @@ public void testNoClientRegion(String awsPartition) { default: throw new IllegalArgumentException("Unknown aws partition: " + awsPartition); } - ; } private static @Nonnull String s3Arn(String partition, String bucket, String keyPrefix) { @@ -672,4 +827,8 @@ public void testNoClientRegion(String awsPartition) { private static @Nonnull String s3Path(String bucket, String keyPrefix) { return "s3://" + bucket + "/" + keyPrefix; } + + private static @Nonnull String kmsArn(String partition, String region, String accountId) { + return "arn:" + partition + ":kms:" + region + ":" + accountId + ":key/*"; + } }