diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index a3b6e128132..ee5c7548757 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -19,10 +19,12 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import jakarta.annotation.Nonnull;
 import java.io.IOException;
 import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -47,6 +49,7 @@
 import org.apache.ozone.erasurecode.rawcoder.RawErasureEncoder;
 import org.apache.ozone.erasurecode.rawcoder.util.CodecUtil;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.function.CheckedRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,6 +89,13 @@ public final class ECKeyOutputStream extends KeyOutputStream
   // how much data has been ingested into the stream
   private long writeOffset;
 
+  private List<CheckedRunnable<IOException>> preCommits = Collections.emptyList();
+
+  @Override
+  public void setPreCommits(@Nonnull List<CheckedRunnable<IOException>> preCommits) {
+    this.preCommits = preCommits;
+  }
+
   @VisibleForTesting
   public void insertFlushCheckpoint(long version) throws IOException {
     addStripeToQueue(new CheckpointDummyStripe(version));
@@ -485,6 +495,9 @@ public void close() throws IOException {
             "Expected: %d and actual %d write sizes do not match",
             expectedSize, offset));
       }
+      for (CheckedRunnable<IOException> preCommit : preCommits) {
+        preCommit.run();
+      }
       blockOutputStreamEntryPool.commitKey(offset);
     }
   } catch (ExecutionException e) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index f9a47a9f55e..fffe6e6e81d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -19,9 +19,11 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import jakarta.annotation.Nonnull;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -44,6 +46,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.ratis.util.function.CheckedRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,6 +85,12 @@ public class KeyDataStreamOutput extends AbstractDataStreamOutput
    */
   private boolean atomicKeyCreation;
 
+  private List<CheckedRunnable<IOException>> preCommits = Collections.emptyList();
+
+  public void setPreCommits(@Nonnull List<CheckedRunnable<IOException>> preCommits) {
+    this.preCommits = preCommits;
+  }
+
   @VisibleForTesting
   public List<BlockDataStreamOutputEntry> getStreamEntries() {
     return blockDataStreamOutputEntryPool.getStreamEntries();
   }
@@ -431,6 +440,9 @@ public void close() throws IOException {
           String.format("Expected: %d and actual %d write sizes do not match",
               expectedSize, offset));
     }
+    for (CheckedRunnable<IOException> preCommit : preCommits) {
+      preCommit.run();
+    }
     blockDataStreamOutputEntryPool.commitKey(offset);
   } finally {
     blockDataStreamOutputEntryPool.cleanup();
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 778ac7e2f4f..2f9edfa94ea 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -19,10 +19,12 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import jakarta.annotation.Nonnull;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -110,6 +112,11 @@ public class KeyOutputStream extends OutputStream
   private final int maxConcurrentWritePerKey;
   private final KeyOutputStreamSemaphore keyOutputStreamSemaphore;
 
+  private List<CheckedRunnable<IOException>> preCommits = Collections.emptyList();
+
+  public void setPreCommits(@Nonnull List<CheckedRunnable<IOException>> preCommits) {
+    this.preCommits = preCommits;
+  }
 
   @VisibleForTesting
   KeyOutputStreamSemaphore getRequestSemaphore() {
@@ -655,6 +662,9 @@ private void closeInternal() throws IOException {
           String.format("Expected: %d and actual %d write sizes do not match",
               expectedSize, offset));
     }
+    for (CheckedRunnable<IOException> preCommit : preCommits) {
+      preCommit.run();
+    }
     blockOutputStreamEntryPool.commitKey(offset);
   } finally {
     blockOutputStreamEntryPool.cleanup();
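The hunks above add the same hook to all three client output streams: a list of `CheckedRunnable<IOException>` checks that `close()` runs after all data has been written but before `commitKey`, so a failing check aborts the close and the key is never committed. A minimal sketch of that ordering, using Ratis' real `CheckedRunnable` but an otherwise hypothetical stand-in class:

```java
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.ratis.util.function.CheckedRunnable;

// Stand-in mirroring the hook added to KeyOutputStream, ECKeyOutputStream and
// KeyDataStreamOutput above; only the CheckedRunnable type is the real API.
class CommitSketch {
  private List<CheckedRunnable<IOException>> preCommits = Collections.emptyList();

  void setPreCommits(List<CheckedRunnable<IOException>> preCommits) {
    this.preCommits = preCommits;
  }

  void close(long offset) throws IOException {
    for (CheckedRunnable<IOException> preCommit : preCommits) {
      preCommit.run();   // may throw, in which case the key is never committed
    }
    commitKey(offset);   // only reached once every pre-commit check passed
  }

  private void commitKey(long offset) {
    // OM commit elided
  }
}
```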
+ """ + try: + import boto3 + + client_args = { + 'service_name': 's3', + 'region_name': region_name, + } + + if aws_access_key_id and aws_secret_access_key: + client_args['aws_access_key_id'] = aws_access_key_id + client_args['aws_secret_access_key'] = aws_secret_access_key + + if endpoint_url: + client_args['endpoint_url'] = endpoint_url + + s3_client = boto3.client(**client_args) + + params = { + 'Bucket': bucket_name, + 'Key': object_key, + } + + if content_type: + params['ContentType'] = content_type + + presigned_url = s3_client.generate_presigned_url( + ClientMethod='put_object', + Params=params, + ExpiresIn=expiration + ) + + return presigned_url + + except Exception as e: + raise Exception(f"Failed to generate presigned URL: {str(e)}") + + +def compute_sha256_file(path): + """Compute SHA256 hex digest for the entire file content at path.""" + with open(path, 'rb') as f: + return hashlib.sha256(f.read()).hexdigest() diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/presignedurl.robot b/hadoop-ozone/dist/src/main/smoketest/s3/presignedurl.robot new file mode 100644 index 00000000000..8cc4ff6cbf7 --- /dev/null +++ b/hadoop-ozone/dist/src/main/smoketest/s3/presignedurl.robot @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+*** Settings ***
+Documentation       S3 gateway presigned URL tests
+Library             OperatingSystem
+Library             String
+Library             ./presigned_url_helper.py
+Resource            ../commonlib.robot
+Resource            commonawslib.robot
+Test Timeout        5 minutes
+Suite Setup         Setup s3 tests
+
+*** Variables ***
+${ENDPOINT_URL}     http://s3g:9878
+${OZONE_TEST}       true
+${BUCKET}           generated
+
+*** Test Cases ***
+Presigned URL PUT Object
+    [Documentation]    Test presigned URL PUT object
+    Execute    echo "Randomtext" > /tmp/testfile
+    ${ACCESS_KEY} =    Execute    aws configure get aws_access_key_id
+    ${SECRET_ACCESS_KEY} =    Execute    aws configure get aws_secret_access_key
+    ${presigned_url}=    Generate Presigned Put Object Url    ${ACCESS_KEY}    ${SECRET_ACCESS_KEY}    ${BUCKET}    test-presigned-put    us-east-1    3600    ${EMPTY}    ${ENDPOINT_URL}
+    ${SHA256} =    Compute Sha256 File    /tmp/testfile
+    ${result} =    Execute    curl -X PUT -T "/tmp/testfile" -H "x-amz-content-sha256: ${SHA256}" "${presigned_url}"
+    Should Not Contain    ${result}    Error
+    ${head_result} =    Execute AWSS3APICli    head-object --bucket ${BUCKET} --key test-presigned-put
+    Should Not Contain    ${head_result}    Error
+
+Presigned URL PUT Object using wrong x-amz-content-sha256
+    [Documentation]    Test presigned URL PUT object with wrong x-amz-content-sha256
+    Execute    echo "Randomtext" > /tmp/testfile
+    ${ACCESS_KEY} =    Execute    aws configure get aws_access_key_id
+    ${SECRET_ACCESS_KEY} =    Execute    aws configure get aws_secret_access_key
+    ${presigned_url}=    Generate Presigned Put Object Url    ${ACCESS_KEY}    ${SECRET_ACCESS_KEY}    ${BUCKET}    test-presigned-put-wrong-sha    us-east-1    3600    ${EMPTY}    ${ENDPOINT_URL}
+    ${result} =    Execute    curl -X PUT -T "/tmp/testfile" -H "x-amz-content-sha256: wronghash" "${presigned_url}"
+    Should Contain    ${result}    The provided 'x-amz-content-sha256' header does not match the computed hash.
+    ${head_result} =    Execute AWSS3APICli and ignore error    head-object --bucket ${BUCKET} --key test-presigned-put-wrong-sha
+    Should Contain    ${head_result}    404
+    Should Contain    ${head_result}    Not Found
diff --git a/hadoop-ozone/integration-test-s3/pom.xml b/hadoop-ozone/integration-test-s3/pom.xml
index 55864a27902..46b3cbc6513 100644
--- a/hadoop-ozone/integration-test-s3/pom.xml
+++ b/hadoop-ozone/integration-test-s3/pom.xml
@@ -66,6 +66,11 @@
       <artifactId>hadoop-common</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kerby</groupId>
+      <artifactId>kerby-util</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.ozone</groupId>
      <artifactId>hdds-common</artifactId>
diff --git a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java
index 5e200e1350a..c872d19f527 100644
--- a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java
+++ b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java
@@ -1152,6 +1152,41 @@ public void testPresignedUrlPutObject() throws Exception {
     }
   }
 
+  @Test
+  public void testPresignedUrlPutSingleChunkWithWrongSha256() throws Exception {
+    final String keyName = getKeyName();
+
+    // Test PutObjectRequest presigned URL
+    GeneratePresignedUrlRequest generatePresignedUrlRequest =
+        new GeneratePresignedUrlRequest(BUCKET_NAME, keyName).withMethod(HttpMethod.PUT).withExpiration(expiration);
+    URL presignedUrl = s3Client.generatePresignedUrl(generatePresignedUrlRequest);
+
+    Map<String, List<String>> headers = new HashMap<>();
+    List<String> sha256Value = new ArrayList<>();
+    sha256Value.add("wrong-sha256-value");
+    headers.put("x-amz-content-sha256", sha256Value);
+
+    HttpURLConnection connection = null;
+    try {
+      connection = S3SDKTestUtils.openHttpURLConnection(presignedUrl, "PUT",
+          headers, CONTENT.getBytes(StandardCharsets.UTF_8));
+      int responseCode = connection.getResponseCode();
+      assertEquals(400, responseCode, "PutObject presigned URL should return 400 because of wrong SHA256");
+    } finally {
+      if (connection != null) {
+        connection.disconnect();
+      }
+    }
+
+    // Verify the object was not uploaded
+    AmazonServiceException ase = assertThrows(AmazonServiceException.class,
+        () -> s3Client.getObject(BUCKET_NAME, keyName));
+
+    assertEquals(ErrorType.Client, ase.getErrorType());
+    assertEquals(404, ase.getStatusCode());
+    assertEquals("NoSuchKey", ase.getErrorCode());
+  }
+
   @Test
   public void testPresignedUrlMultipartUpload(@TempDir Path tempDir) throws Exception {
     final String keyName = getKeyName();
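The SDK v1 tests above and the SDK v2 tests that follow both drive the presigned PUT through `S3SDKTestUtils.openHttpURLConnection`, whose implementation is not part of this diff. A plausible equivalent is sketched below purely to make the tests readable; the real helper may differ in detail:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the helper the tests call. Extra headers such as
// x-amz-content-sha256 are sent verbatim; the S3 gateway then compares the
// claimed hash against the digest of the actual request body.
final class PresignedPutSketch {
  static HttpURLConnection openHttpURLConnection(URL url, String method,
      Map<String, List<String>> headers, byte[] body) throws Exception {
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod(method);
    conn.setDoOutput(true);
    for (Map.Entry<String, List<String>> e : headers.entrySet()) {
      for (String value : e.getValue()) {
        conn.addRequestProperty(e.getKey(), value);
      }
    }
    try (OutputStream out = conn.getOutputStream()) {
      out.write(body);  // the caller's getResponseCode() completes the request
    }
    return conn;
  }
}
```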
diff --git a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java
index 09026dcb918..73dac51346d 100644
--- a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java
+++ b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java
@@ -67,6 +67,7 @@
 import org.apache.hadoop.ozone.s3.S3ClientFactory;
 import org.apache.hadoop.ozone.s3.awssdk.S3SDKTestUtils;
 import org.apache.hadoop.ozone.s3.endpoint.S3Owner;
+import org.apache.hadoop.ozone.s3.util.S3Consts;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ozone.test.NonHATests;
 import org.apache.ozone.test.OzoneTestBase;
@@ -645,6 +646,41 @@ public void testPresignedUrlPut() throws Exception {
     assertEquals(CONTENT, actualContent);
   }
 
+  @Test
+  public void testPresignedUrlPutSingleChunkWithWrongSha256() throws Exception {
+    final String keyName = getKeyName();
+
+    PutObjectRequest objectRequest = PutObjectRequest.builder().bucket(BUCKET_NAME).key(keyName).build();
+
+    PutObjectPresignRequest presignRequest = PutObjectPresignRequest.builder()
+        .signatureDuration(duration)
+        .putObjectRequest(objectRequest)
+        .build();
+
+    PresignedPutObjectRequest presignedRequest = presigner.presignPutObject(presignRequest);
+
+    Map<String, List<String>> headers = presignedRequest.signedHeaders();
+    List<String> sha256 = new ArrayList<>();
+    sha256.add("wrong-sha256-value");
+    headers.put(S3Consts.X_AMZ_CONTENT_SHA256, sha256);
+
+    // use http url connection
+    HttpURLConnection connection = null;
+    try {
+      connection = S3SDKTestUtils.openHttpURLConnection(presignedRequest.url(), "PUT",
+          headers, CONTENT.getBytes(StandardCharsets.UTF_8));
+      int responseCode = connection.getResponseCode();
+      assertEquals(400, responseCode, "PutObject presigned URL should return 400 because of wrong SHA256");
+    } finally {
+      if (connection != null) {
+        connection.disconnect();
+      }
+    }
+
+    // Verify the object was not uploaded
+    assertThrows(NoSuchKeyException.class, () -> s3Client.headObject(b -> b.bucket(BUCKET_NAME).key(keyName)));
+  }
+
   @Test
   public void testPresignedUrlMultipartUpload(@TempDir Path tempDir) throws Exception {
     final String keyName = getKeyName();
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/MultiDigestInputStream.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/MultiDigestInputStream.java
new file mode 100644
index 00000000000..587cbec0516
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/MultiDigestInputStream.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.s3;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An InputStream that computes multiple message digests simultaneously
+ * as data is read from the underlying stream.
+ *
+ * <p>
+ * This class extends {@link FilterInputStream} and allows multiple digest
+ * algorithms (for example, MD5 or SHA-256) to be computed in a single pass
+ * over the data. This is more efficient than reading the stream multiple
+ * times when multiple digests are required.
+ * </p>
+ *
+ * <p><b>Important note about relationship to {@code DigestInputStream}:</b></p>
+ * <ul>
+ *   <li>This class is conceptually similar to {@link java.security.DigestInputStream}.
+ * Several methods (notably {@link #read()}, {@link #read(byte[], int, int)} and
+ * {@link #on(boolean)}) follow the same behavior and semantics as in
+ * {@code DigestInputStream} and are documented here with that intent.
+ *   </li>
+ *   <li>Where method signatures differ from {@code DigestInputStream} (for
+ * example {@link #getMessageDigest(String)} which takes an algorithm name
+ * and returns the corresponding digest), the difference is explicitly
+ * documented on the method itself.</li>
+ * </ul>
+ *
+ * <p><b>Example usage:</b></p>
+ * <pre>
+ * MessageDigest md5 = MessageDigest.getInstance("MD5");
+ * MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
+ * MultiDigestInputStream mdis = new MultiDigestInputStream(inputStream, Arrays.asList(md5, sha256));
+ * // Read from mdis (reads will update all registered digests while 'on' is true)
+ * byte[] md5Hash = mdis.getMessageDigest("MD5").digest();
+ * byte[] sha256Hash = mdis.getMessageDigest("SHA-256").digest();
+ * </pre>
+ *
+ * <p><b>Notes:</b></p>
+ * <ul>
+ *   <li>The constructor accepts one or more already-created {@link MessageDigest}
+ * instances; the digests are kept and updated as data is read.</li>
+ *   <li>Call {@link #on(boolean)} with {@code false} to temporarily disable
+ * digest updates (for example, to skip computing during certain reads),
+ * and {@code true} to re-enable. This behavior mirrors
+ * {@link java.security.DigestInputStream#on(boolean)}.</li>
+ *   <li>{@link #getAllDigests()} returns a copy of the internal digest map.</li>
+ * </ul>
+ *
+ * @see java.security.DigestInputStream
+ */
+public class MultiDigestInputStream extends FilterInputStream {
+
+  private final Map<String, MessageDigest> digests;
+  private boolean on = true;
+
+  /**
+   * Creates a MultiDigestInputStream with the specified digests.
+   *
+   * @param in the underlying input stream
+   * @param inputDigests the message digest instances to compute (may be empty)
+   */
+  public MultiDigestInputStream(InputStream in, Collection<MessageDigest> inputDigests) {
+    super(in);
+    this.digests = new HashMap<>();
+    for (MessageDigest digest : inputDigests) {
+      digests.put(digest.getAlgorithm(), digest);
+    }
+  }
+
+  /**
+   * Reads the next byte of data from the input stream. If a byte is read and
+   * digest updates are enabled (see {@link #on(boolean)}), the byte is
+   * supplied to all registered digests.
+   *
+   * @return the next byte of data, or -1 if the end of the stream is reached
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public int read() throws IOException {
+    int ch = in.read();
+    if (ch != -1) {
+      updateDigests((byte) ch);
+    }
+    return ch;
+  }
+
+  /**
+   * Reads up to {@code len} bytes of data into an array of bytes from the
+   * input stream. If bytes are read and digest updates are enabled, the
+   * read bytes are supplied to all registered digests.
+   *
+   * @param b the buffer into which the data is read
+   * @param off the start offset in array {@code b} at which the data is written
+   * @param len the maximum number of bytes to read
+   * @return the total number of bytes read into the buffer, or -1 if there is
+   *         no more data because the end of the stream has been reached
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    int bytesRead = in.read(b, off, len);
+    if (bytesRead > 0) {
+      updateDigests(b, off, bytesRead);
+    }
+    return bytesRead;
+  }
+
+  private void updateDigests(byte b) {
+    if (!on) {
+      return;
+    }
+    for (MessageDigest digest : digests.values()) {
+      digest.update(b);
+    }
+  }
+
+  private void updateDigests(byte[] b, int off, int len) {
+    if (!on) {
+      return;
+    }
+    for (MessageDigest digest : digests.values()) {
+      digest.update(b, off, len);
+    }
+  }
+
+  /**
+   * Gets the {@link MessageDigest} instance for the specified algorithm.
+   *
+   * <p><b>Note:</b> {@code DigestInputStream#getMessageDigest()} returns
+   * the single digest instance associated with that stream. This class may
+   * manage multiple digests; therefore this method accepts an algorithm name
+   * and returns the corresponding {@link MessageDigest} or {@code null} if not
+   * registered.
+   *
+   * @param algorithm the digest algorithm name (for example, "MD5" or "SHA-256")
+   * @return the MessageDigest instance for the specified algorithm,
+   *         or {@code null} if the algorithm was not registered
+   * @see java.security.DigestInputStream#getMessageDigest()
+   */
+  public MessageDigest getMessageDigest(String algorithm) {
+    return digests.get(algorithm);
+  }
+
+  /**
+   * Returns a copy of the map of all digests being computed.
+   * Modifications to the returned map do not affect the stream's internal state.
+   *
+   * @return a shallow copy of the digests map (algorithm name to MessageDigest)
+   */
+  public Map<String, MessageDigest> getAllDigests() {
+    return new HashMap<>(digests);
+  }
+
+  /**
+   * Resets all message digests by calling {@link MessageDigest#reset()} on each
+   * registered digest.
+   */
+  public void resetDigests() {
+    for (MessageDigest digest : digests.values()) {
+      digest.reset();
+    }
+  }
+
+  /**
+   * Enable or disable updating of the registered digests while reading.
+   *
+   * @param enabled true to turn the digest function on, false to turn it off
+   */
+  public void on(boolean enabled) {
+    this.on = enabled;
+  }
+
+  /**
+   * Associates the given MessageDigest with the specified algorithm name,
+   * replacing any existing digest for that algorithm.
+   *
+   * @param algorithm the digest algorithm name
+   * @param digest the MessageDigest instance to set
+   */
+  public void setMessageDigest(String algorithm, MessageDigest digest) {
+    digests.put(algorithm, digest);
+  }
+
+  /**
+   * Adds a new message digest algorithm to be computed. If the algorithm name
+   * already exists in the map, it will be replaced by the newly created
+   * MessageDigest instance.
+   *
+   * @param algorithm the digest algorithm name
+   * @throws NoSuchAlgorithmException if the algorithm is not available
+   */
+  public void addMessageDigest(String algorithm)
+      throws NoSuchAlgorithmException {
+    digests.put(algorithm, MessageDigest.getInstance(algorithm));
+  }
+
+  /**
+   * Removes and returns the message digest instance for the specified
+   * algorithm name.
+   *
+   * @param algorithm the digest algorithm name to remove
+   * @return the removed MessageDigest, or {@code null} if not found
+   */
+  public MessageDigest removeMessageDigest(String algorithm) {
+    return digests.remove(algorithm);
+  }
+
+  /**
+   * Returns a string representation of this stream and its message digests.
+   *
+   * @return a string representation of the object
+   */
+  @Override
+  public String toString() {
+    return getClass().getName() + " [on=" + on + ", algorithms="
+        + digests.keySet() + "]";
+  }
+}
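`ObjectEndpoint` below wires this stream in so that the ETag (MD5) and the `x-amz-content-sha256` value (SHA-256) fall out of a single pass over the payload. A self-contained usage sketch, assuming the patched class is on the classpath; only the input data is made up:

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Arrays;
import org.apache.hadoop.ozone.s3.MultiDigestInputStream;

public final class MultiDigestDemo {
  public static void main(String[] args) throws Exception {
    byte[] payload = "Randomtext".getBytes(StandardCharsets.UTF_8);
    try (MultiDigestInputStream in = new MultiDigestInputStream(
        new ByteArrayInputStream(payload),
        Arrays.asList(MessageDigest.getInstance("MD5"),
            MessageDigest.getInstance("SHA-256")))) {
      byte[] buf = new byte[4096];
      while (in.read(buf, 0, buf.length) != -1) {
        // in the gateway, the bytes are streamed on to Ozone here
      }
      System.out.println("etag=" + toHex(in.getMessageDigest("MD5").digest()));
      System.out.println("sha256=" + toHex(in.getMessageDigest("SHA-256").digest()));
    }
  }

  private static String toHex(byte[] bytes) {
    StringBuilder sb = new StringBuilder();
    for (byte b : bytes) {
      sb.append(String.format("%02x", b));
    }
    return sb.toString();
  }
}
```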
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index c6a2b653909..45e20230337 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -74,6 +74,8 @@
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -124,6 +126,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.hadoop.ozone.s3.HeaderPreprocessor;
+import org.apache.hadoop.ozone.s3.MultiDigestInputStream;
 import org.apache.hadoop.ozone.s3.SignedChunksInputStream;
 import org.apache.hadoop.ozone.s3.UnsignedChunksInputStream;
 import org.apache.hadoop.ozone.s3.endpoint.S3Tagging.Tag;
@@ -139,6 +142,7 @@
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.http.HttpStatus;
+import org.apache.ratis.util.function.CheckedRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -155,6 +159,7 @@ public class ObjectEndpoint extends EndpointBase {
       LoggerFactory.getLogger(ObjectEndpoint.class);
 
   private static final ThreadLocal<MessageDigest> E_TAG_PROVIDER;
+  private static final ThreadLocal<MessageDigest> SHA_256_PROVIDER;
 
   static {
     E_TAG_PROVIDER = ThreadLocal.withInitial(() -> {
@@ -164,11 +169,19 @@ public class ObjectEndpoint extends EndpointBase {
         throw new RuntimeException(e);
       }
     });
+
+    SHA_256_PROVIDER = ThreadLocal.withInitial(() -> {
+      try {
+        return MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+      } catch (NoSuchAlgorithmException e) {
+        throw new RuntimeException(e);
+      }
+    });
   }
 
   /*FOR the feature Overriding Response Header
   https://docs.aws.amazon.com/de_de/AmazonS3/latest/API/API_GetObject.html */
-  private Map<String, String> overrideQueryParameter;
+  private final Map<String, String> overrideQueryParameter;
   private int bufferSize;
   private int chunkSize;
   private boolean datastreamEnabled;
@@ -226,7 +239,7 @@ public Response put(
     PerformanceStringBuilder perf = new PerformanceStringBuilder();
     String copyHeader = null, storageType = null, storageConfig = null;
-    DigestInputStream digestInputStream = null;
+    MultiDigestInputStream multiDigestInputStream = null;
     try {
       if (aclMarker != null) {
         s3GAction = S3GAction.PUT_OBJECT_ACL;
@@ -302,7 +315,7 @@ public Response put(
       // Normal put object
       S3ChunkInputStreamInfo chunkInputStreamInfo =
          getS3ChunkInputStreamInfo(body, length, amzDecodedLength, keyPath);
-      digestInputStream = chunkInputStreamInfo.getDigestInputStream();
+      multiDigestInputStream = chunkInputStreamInfo.getMultiDigestInputStream();
       length = chunkInputStreamInfo.getEffectiveLength();
 
       Map<String, String> customMetadata =
@@ -315,22 +328,37 @@ public Response put(
         perf.appendStreamMode();
         Pair<String, Long> keyWriteResult = ObjectEndpointStreaming
            .put(bucket, keyPath, length, replicationConfig, chunkSize,
-                customMetadata, tags, digestInputStream, perf);
+                customMetadata, tags, multiDigestInputStream, getHeaders(), signatureInfo.isSignPayload(), perf);
         eTag = keyWriteResult.getKey();
         putLength = keyWriteResult.getValue();
       } else {
+        final String amzContentSha256Header =
+            validateSignatureHeader(getHeaders(), keyPath, signatureInfo.isSignPayload());
         try (OzoneOutputStream output = getClientProtocol().createKey(
             volume.getName(), bucketName, keyPath, length, replicationConfig,
             customMetadata, tags)) {
           long metadataLatencyNs =
               getMetrics().updatePutKeyMetadataStats(startNanos);
           perf.appendMetaLatencyNanos(metadataLatencyNs);
-          putLength = IOUtils.copyLarge(digestInputStream, output, 0, length,
+          putLength = IOUtils.copyLarge(multiDigestInputStream, output, 0, length,
               new byte[getIOBufferSize(length)]);
           eTag = DatatypeConverter.printHexBinary(
-                  digestInputStream.getMessageDigest().digest())
+                  multiDigestInputStream.getMessageDigest(OzoneConsts.MD5_HASH).digest())
               .toLowerCase();
           output.getMetadata().put(OzoneConsts.ETAG, eTag);
+
+          // If sha256Digest exists, this request must validate x-amz-content-sha256
+          MessageDigest sha256Digest = multiDigestInputStream.getMessageDigest(OzoneConsts.FILE_HASH);
+          if (sha256Digest != null) {
+            final String actualSha256 = DatatypeConverter.printHexBinary(
+                sha256Digest.digest()).toLowerCase();
+            CheckedRunnable<IOException> preCommit = () -> {
+              if (!amzContentSha256Header.equals(actualSha256)) {
+                throw S3ErrorTable.newError(S3ErrorTable.X_AMZ_CONTENT_SHA256_MISMATCH, keyPath);
+              }
+            };
+            output.getKeyOutputStream().setPreCommits(Collections.singletonList(preCommit));
+          }
         }
       }
       getMetrics().incPutKeySuccessLength(putLength);
@@ -383,8 +411,8 @@
     } finally {
       // Reset the thread-local message digest instance in case of exception
       // and MessageDigest#digest is never called
-      if (digestInputStream != null) {
-        digestInputStream.getMessageDigest().reset();
+      if (multiDigestInputStream != null) {
+        multiDigestInputStream.resetDigests();
       }
       if (auditSuccess) {
         long opLatencyNs = getMetrics().updateCreateKeySuccessStats(startNanos);
@@ -941,13 +969,13 @@ private Response createMultipartKey(OzoneVolume volume, OzoneBucket ozoneBucket,
       throws IOException, OS3Exception {
     long startNanos = Time.monotonicNowNanos();
     String copyHeader = null;
-    DigestInputStream digestInputStream = null;
+    MultiDigestInputStream multiDigestInputStream = null;
     final String bucketName = ozoneBucket.getName();
     try {
       String amzDecodedLength = getHeaders().getHeaderString(DECODED_CONTENT_LENGTH_HEADER);
       S3ChunkInputStreamInfo chunkInputStreamInfo = getS3ChunkInputStreamInfo(
           body, length, amzDecodedLength, key);
-      digestInputStream = chunkInputStreamInfo.getDigestInputStream();
+      multiDigestInputStream = chunkInputStreamInfo.getMultiDigestInputStream();
       length = chunkInputStreamInfo.getEffectiveLength();
 
       copyHeader = getHeaders().getHeaderString(COPY_SOURCE_HEADER);
@@ -967,7 +995,7 @@
         perf.appendStreamMode();
         return ObjectEndpointStreaming
             .createMultipartKey(ozoneBucket, key, length, partNumber,
-                uploadID, chunkSize, digestInputStream, perf);
+                uploadID, chunkSize, multiDigestInputStream, perf);
       }
       // OmMultipartCommitUploadPartInfo can only be gotten after the
       // OzoneOutputStream is closed, so we need to save the OzoneOutputStream
@@ -1044,9 +1072,9 @@
           partNumber, uploadID)) {
         metadataLatencyNs =
             getMetrics().updatePutKeyMetadataStats(startNanos);
-        putLength = IOUtils.copyLarge(digestInputStream, ozoneOutputStream, 0, length,
+        putLength = IOUtils.copyLarge(multiDigestInputStream, ozoneOutputStream, 0, length,
            new byte[getIOBufferSize(length)]);
-        byte[] digest = digestInputStream.getMessageDigest().digest();
+        byte[] digest = multiDigestInputStream.getMessageDigest(OzoneConsts.MD5_HASH).digest();
         ozoneOutputStream.getMetadata()
             .put(OzoneConsts.ETAG, DatatypeConverter.printHexBinary(digest).toLowerCase());
         outputStream = ozoneOutputStream;
@@ -1095,8 +1123,8 @@
     } finally {
       // Reset the thread-local message digest instance in case of exception
      // and MessageDigest#digest is never called
-      if (digestInputStream != null) {
-        digestInputStream.getMessageDigest().reset();
+      if (multiDigestInputStream != null) {
+        multiDigestInputStream.resetDigests();
       }
     }
   }
@@ -1475,6 +1503,11 @@
   public MessageDigest getMessageDigestInstance() {
     return E_TAG_PROVIDER.get();
   }
 
+  @VisibleForTesting
+  public MessageDigest getSha256DigestInstance() {
+    return SHA_256_PROVIDER.get();
+  }
+
   private String extractPartsCount(String eTag) {
     if (eTag.contains("-")) {
       String[] parts = eTag.replace("\"", "").split("-");
@@ -1524,23 +1557,30 @@ private S3ChunkInputStreamInfo getS3ChunkInputStreamInfo(
       effectiveLength = contentLength;
     }
 
-    // DigestInputStream is used for ETag calculation
-    DigestInputStream digestInputStream = new DigestInputStream(chunkInputStream, getMessageDigestInstance());
-    return new S3ChunkInputStreamInfo(digestInputStream, effectiveLength);
+    // MessageDigest is used for ETag calculation
+    // and Sha256Digest is used for "x-amz-content-sha256" header verification
+    List<MessageDigest> digests = new ArrayList<>();
+    digests.add(getMessageDigestInstance());
+    if (!hasUnsignedPayload(amzContentSha256Header) && !hasMultiChunksPayload(amzContentSha256Header)) {
+      digests.add(getSha256DigestInstance());
+    }
+    MultiDigestInputStream multiDigestInputStream =
+        new MultiDigestInputStream(chunkInputStream, digests);
+    return new S3ChunkInputStreamInfo(multiDigestInputStream, effectiveLength);
   }
 
   @Immutable
   static final class S3ChunkInputStreamInfo {
-    private final DigestInputStream digestInputStream;
+    private final MultiDigestInputStream multiDigestInputStream;
     private final long effectiveLength;
 
-    S3ChunkInputStreamInfo(DigestInputStream digestInputStream, long effectiveLength) {
-      this.digestInputStream = digestInputStream;
+    S3ChunkInputStreamInfo(MultiDigestInputStream multiDigestInputStream, long effectiveLength) {
+      this.multiDigestInputStream = multiDigestInputStream;
       this.effectiveLength = effectiveLength;
     }
 
-    public DigestInputStream getDigestInputStream() {
-      return digestInputStream;
+    public MultiDigestInputStream getMultiDigestInputStream() {
+      return multiDigestInputStream;
     }
 
     public long getEffectiveLength() {
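The endpoint leans on `validateSignatureHeader`, `hasUnsignedPayload` and `hasMultiChunksPayload`, none of which appear in this diff. The sketch below reconstructs their likely shape from the call sites; every name and detail here is an assumption, not the real implementation:

```java
import javax.ws.rs.core.HttpHeaders;

// Assumed behavior only: SHA-256 verification is skipped for unsigned and
// aws-chunked (multi-chunk) payloads, while for single-chunk signed payloads
// the claimed hash is captured so the pre-commit hook can compare it later.
final class Sha256HeaderSketch {
  static final String UNSIGNED_PAYLOAD = "UNSIGNED-PAYLOAD";
  static final String STREAMING_PREFIX = "STREAMING-";  // e.g. STREAMING-AWS4-HMAC-SHA256-PAYLOAD

  static boolean hasUnsignedPayload(String amzContentSha256Header) {
    return UNSIGNED_PAYLOAD.equals(amzContentSha256Header);
  }

  static boolean hasMultiChunksPayload(String amzContentSha256Header) {
    return amzContentSha256Header != null
        && amzContentSha256Header.startsWith(STREAMING_PREFIX);
  }

  static String validateSignatureHeader(HttpHeaders headers, String keyPath,
      boolean isSignedPayload) {
    // The real S3Utils method presumably also rejects a missing or malformed
    // header on signed requests; that error handling is elided here.
    return headers.getHeaderString("x-amz-content-sha256");
  }
}
```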
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index 186719c2b78..8773bf3ca68 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -20,12 +20,15 @@
 import static org.apache.hadoop.ozone.audit.AuditLogger.PerformanceStringBuilder;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
+import static org.apache.hadoop.ozone.s3.util.S3Utils.validateSignatureHeader;
 import static org.apache.hadoop.ozone.s3.util.S3Utils.wrapInQuotes;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.security.DigestInputStream;
+import java.security.MessageDigest;
+import java.util.Collections;
 import java.util.Map;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
@@ -38,10 +41,12 @@
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.om.OmConfig;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.s3.MultiDigestInputStream;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
 import org.apache.hadoop.ozone.s3.metrics.S3GatewayMetrics;
 import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.function.CheckedRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,13 +67,14 @@
   public static Pair<String, Long> put(
       OzoneBucket bucket, String keyPath,
       long length, ReplicationConfig replicationConfig, int chunkSize,
       Map<String, String> keyMetadata,
-      Map<String, String> tags,
-      DigestInputStream body, PerformanceStringBuilder perf)
+      Map<String, String> tags, MultiDigestInputStream body,
+      HttpHeaders headers, boolean isSignedPayload,
+      PerformanceStringBuilder perf)
       throws IOException, OS3Exception {
 
     try {
       return putKeyWithStream(bucket, keyPath,
-          length, chunkSize, replicationConfig, keyMetadata, tags, body, perf);
+          length, chunkSize, replicationConfig, keyMetadata, tags, body, headers, isSignedPayload, perf);
     } catch (IOException ex) {
       LOG.error("Exception occurred in PutObject", ex);
       if (ex instanceof OMException) {
@@ -100,19 +106,36 @@ public static Pair<String, Long> putKeyWithStream(
       ReplicationConfig replicationConfig,
       Map<String, String> keyMetadata,
       Map<String, String> tags,
-      DigestInputStream body, PerformanceStringBuilder perf)
-      throws IOException {
+      MultiDigestInputStream body,
+      HttpHeaders headers,
+      boolean isSignedPayload,
+      PerformanceStringBuilder perf)
+      throws IOException, OS3Exception {
     long startNanos = Time.monotonicNowNanos();
+    final String amzContentSha256Header = validateSignatureHeader(headers, keyPath, isSignedPayload);
     long writeLen;
     String eTag;
     try (OzoneDataStreamOutput streamOutput = bucket.createStreamKey(keyPath,
         length, replicationConfig, keyMetadata, tags)) {
       long metadataLatencyNs = METRICS.updatePutKeyMetadataStats(startNanos);
       writeLen = writeToStreamOutput(streamOutput, body, bufferSize, length);
-      eTag = DatatypeConverter.printHexBinary(body.getMessageDigest().digest())
+      eTag = DatatypeConverter.printHexBinary(body.getMessageDigest(OzoneConsts.MD5_HASH).digest())
           .toLowerCase();
       perf.appendMetaLatencyNanos(metadataLatencyNs);
       ((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG, eTag);
+
+      // If sha256Digest exists, this request must validate x-amz-content-sha256
+      MessageDigest sha256Digest = body.getMessageDigest(OzoneConsts.FILE_HASH);
+      if (sha256Digest != null) {
+        final String actualSha256 = DatatypeConverter.printHexBinary(
+            sha256Digest.digest()).toLowerCase();
+        CheckedRunnable<IOException> preCommit = () -> {
+          if (!amzContentSha256Header.equals(actualSha256)) {
+            throw S3ErrorTable.newError(S3ErrorTable.X_AMZ_CONTENT_SHA256_MISMATCH, keyPath);
+          }
+        };
+        streamOutput.getKeyDataStreamOutput().setPreCommits(Collections.singletonList(preCommit));
+      }
     }
     return Pair.of(eTag, writeLen);
   }
@@ -163,7 +186,7 @@ private static long writeToStreamOutput(OzoneDataStreamOutput streamOutput,
   @SuppressWarnings("checkstyle:ParameterNumber")
   public static Response createMultipartKey(OzoneBucket ozoneBucket, String key,
       long length, int partNumber, String uploadID, int chunkSize,
-      DigestInputStream body, PerformanceStringBuilder perf)
+      MultiDigestInputStream body, PerformanceStringBuilder perf)
       throws IOException, OS3Exception {
     long startNanos = Time.monotonicNowNanos();
     String eTag;
@@ -174,7 +197,7 @@ public static Response createMultipartKey(OzoneBucket ozoneBucket, String key,
       long putLength = writeToStreamOutput(streamOutput, body, chunkSize, length);
       eTag = DatatypeConverter.printHexBinary(
-          body.getMessageDigest().digest()).toLowerCase();
+          body.getMessageDigest(OzoneConsts.MD5_HASH).digest()).toLowerCase();
       ((KeyMetadataAware)streamOutput).getMetadata().put(OzoneConsts.ETAG, eTag);
       METRICS.incPutKeySuccessLength(putLength);
       perf.appendMetaLatencyNanos(metadataLatencyNs);
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/OS3Exception.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/OS3Exception.java
index 00b36427d43..f93f4a7a4d7 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/OS3Exception.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/OS3Exception.java
@@ -36,7 +36,7 @@
  */
 @XmlRootElement(name = "Error")
 @XmlAccessorType(XmlAccessType.NONE)
-public class OS3Exception extends Exception {
+public class OS3Exception extends RuntimeException {
   private static final Logger LOG = LoggerFactory.getLogger(OS3Exception.class);
   private static ObjectMapper mapper;
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
index 060ed83d1bc..434087da746 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/exception/S3ErrorTable.java
@@ -160,6 +160,10 @@ public final class S3ErrorTable {
       "Access Denied", "User doesn't have permission to access this resource due to a " +
       "bucket ownership mismatch.", HTTP_FORBIDDEN);
 
+  public static final OS3Exception X_AMZ_CONTENT_SHA256_MISMATCH = new OS3Exception(
+      "XAmzContentSHA256Mismatch", "The provided 'x-amz-content-sha256' header does " +
+      "not match the computed hash.", HTTP_BAD_REQUEST);
+
   private static Function<Exception, OS3Exception> generateInternalError =
       e -> new OS3Exception("InternalError", e.getMessage(), HTTP_INTERNAL_ERROR);
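Re-parenting `OS3Exception` onto `RuntimeException` is what lets the pre-commit lambdas earlier in this patch throw `S3ErrorTable.newError(...)` from inside a `CheckedRunnable<IOException>` without declaring it. An illustrative fragment (the lambda itself is made up; the exception types are the patch's own):

```java
import java.io.IOException;
import org.apache.hadoop.ozone.s3.exception.S3ErrorTable;
import org.apache.ratis.util.function.CheckedRunnable;

class UncheckedErrorSketch {
  CheckedRunnable<IOException> preCommit(String expected, String actual, String keyPath) {
    return () -> {
      if (!expected.equals(actual)) {
        // Compiles only because OS3Exception is now unchecked; as a checked
        // Exception it could not escape a CheckedRunnable<IOException>.
        throw S3ErrorTable.newError(S3ErrorTable.X_AMZ_CONTENT_SHA256_MISMATCH, keyPath);
      }
    };
  }
}
```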
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/signature/StringToSignProducer.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/signature/StringToSignProducer.java
index e2f8d64a4d1..233a001400e 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/signature/StringToSignProducer.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/signature/StringToSignProducer.java
@@ -337,7 +337,7 @@ static void validateSignedHeader(
       }
       break;
     case X_AMZ_CONTENT_SHA256:
-      // TODO: Construct request payload and match HEX(SHA256(requestPayload))
+      // Validate x-amz-content-sha256 during upload, before committing the key.
       break;
     default:
       break;
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestMultiDigestInputStream.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestMultiDigestInputStream.java
new file mode 100644
index 00000000000..cd83d5d4900
--- /dev/null
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/TestMultiDigestInputStream.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.s3;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.security.MessageDigest;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.commons.io.IOUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Test {@link MultiDigestInputStream}.
+ */
+public class TestMultiDigestInputStream {
+
+  private static final String TEST_DATA = "1234567890";
+
+  static Stream<Arguments> algorithmAndDataTestCases() throws Exception {
+    return Stream.of(
+        // Empty stream
+        Arguments.of("empty stream with MD5",
+            Arrays.asList(MessageDigest.getInstance("MD5")), ""),
+        Arguments.of("empty stream with multiple algorithms",
+            Arrays.asList(MessageDigest.getInstance("MD5"),
+                MessageDigest.getInstance("SHA-256")), ""),
+        // Normal data
+        Arguments.of("MD5",
+            Arrays.asList(MessageDigest.getInstance("MD5")), TEST_DATA),
+        Arguments.of("MD5 and SHA-256",
+            Arrays.asList(MessageDigest.getInstance("MD5"),
+                MessageDigest.getInstance("SHA-256")), TEST_DATA),
+        Arguments.of("MD5, SHA-1 and SHA-256",
+            Arrays.asList(MessageDigest.getInstance("MD5"),
+                MessageDigest.getInstance("SHA-1"),
+                MessageDigest.getInstance("SHA-256")), TEST_DATA)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("algorithmAndDataTestCases")
+  void testRead(String testName, List<MessageDigest> digests, String data) throws Exception {
+    byte[] dataBytes = data.getBytes(UTF_8);
+
+    try (MultiDigestInputStream mdis = new MultiDigestInputStream(
+        new ByteArrayInputStream(dataBytes), digests)) {
+      String result = IOUtils.toString(mdis, UTF_8);
+      assertEquals(data, result);
+
+      for (MessageDigest digest : digests) {
+        String algorithm = digest.getAlgorithm();
+        byte[] expectedDigest = MessageDigest.getInstance(algorithm).digest(dataBytes);
+        assertArrayEquals(expectedDigest, mdis.getMessageDigest(algorithm).digest());
+      }
+    }
+  }
+
+  @Test
+  void testOnOffFunctionality() throws Exception {
+    byte[] data = TEST_DATA.getBytes(UTF_8);
+
+    try (MultiDigestInputStream mdis = new MultiDigestInputStream(new ByteArrayInputStream(data),
+        Collections.singletonList(MessageDigest.getInstance("MD5")))) {
+
+      mdis.on(false);
+
+      String result = IOUtils.toString(mdis, UTF_8);
+      assertEquals(TEST_DATA, result);
+
+      // Digest should be empty since it was turned off
+      MessageDigest md5 = mdis.getMessageDigest("MD5");
+      assertNotNull(md5);
+      byte[] emptyDigest = MessageDigest.getInstance("MD5").digest();
+      assertArrayEquals(emptyDigest, md5.digest());
+    }
+  }
+
+  @Test
+  void testOnOffWithPartialRead() throws Exception {
+    String firstPart = "12345";
+    String secondPart = "67890";
+    byte[] data = (firstPart + secondPart).getBytes(UTF_8);
+
+    try (MultiDigestInputStream mdis = new MultiDigestInputStream(new ByteArrayInputStream(data),
+        Collections.singletonList(MessageDigest.getInstance("MD5")))) {
+      // Read first part with digest on
+      byte[] buffer1 = new byte[firstPart.length()];
+      int bytesRead1 = mdis.read(buffer1, 0, buffer1.length);
+      assertEquals(firstPart.length(), bytesRead1);
+      assertEquals(firstPart, new String(buffer1, UTF_8));
+
+      mdis.on(false);
+      byte[] buffer2 = new byte[secondPart.length()];
+      int bytesRead2 = mdis.read(buffer2, 0, buffer2.length);
+      assertEquals(secondPart.length(), bytesRead2);
+      assertEquals(secondPart, new String(buffer2, UTF_8));
+
+      // Digest should only contain first part
+      MessageDigest md5 = mdis.getMessageDigest("MD5");
+      byte[] expectedDigest = MessageDigest.getInstance("MD5").digest(firstPart.getBytes(UTF_8));
+      assertArrayEquals(expectedDigest, md5.digest());
+    }
+  }
+
+  @Test
+  void testResetDigests() throws Exception {
+    byte[] data = TEST_DATA.getBytes(UTF_8);
+
+    try (MultiDigestInputStream mdis = new MultiDigestInputStream(new ByteArrayInputStream(data),
+        Collections.singletonList(MessageDigest.getInstance("MD5")))) {
+
+      int byte1 = mdis.read();
+      int byte2 = mdis.read();
+      assertTrue(byte1 != -1 && byte2 != -1);
+
+      mdis.resetDigests();
+
+      MessageDigest md5 = mdis.getMessageDigest("MD5");
+      byte[] emptyDigest = MessageDigest.getInstance("MD5").digest();
+      assertArrayEquals(emptyDigest, md5.digest());
+    }
+  }
+
+  @Test
+  void testDigestManagement() throws Exception {
+    byte[] data = TEST_DATA.getBytes(UTF_8);
+
+    try (MultiDigestInputStream mdis = new MultiDigestInputStream(new ByteArrayInputStream(data),
+        Arrays.asList(MessageDigest.getInstance("MD5"), MessageDigest.getInstance("SHA-1")))) {
+
+      // Test initial state - getAllDigests
+      Map<String, MessageDigest> allDigests = mdis.getAllDigests();
+      assertEquals(2, allDigests.size());
+      assertTrue(allDigests.containsKey("MD5"));
+      assertTrue(allDigests.containsKey("SHA-1"));
+
+      // Test add
+      mdis.addMessageDigest("SHA-256");
+      assertNotNull(mdis.getMessageDigest("SHA-256"));
+      assertEquals(3, mdis.getAllDigests().size());
+
+      // Test set - replace with new instance
+      MessageDigest newMd5 = MessageDigest.getInstance("MD5");
+      mdis.setMessageDigest("MD5", newMd5);
+      assertNotNull(mdis.getMessageDigest("MD5"));
+
+      // Test remove
+      MessageDigest removed = mdis.removeMessageDigest("SHA-1");
+      assertNotNull(removed);
+      assertNull(mdis.getMessageDigest("SHA-1"));
+      assertEquals(2, mdis.getAllDigests().size());
+
+      // Test get non-existent
+      assertNull(mdis.getMessageDigest("SHA-512"));
+
+      // Read data and verify remaining digests work correctly
+      String result = IOUtils.toString(mdis, UTF_8);
+      assertEquals(TEST_DATA, result);
+
+      byte[] expectedMd5 = MessageDigest.getInstance("MD5").digest(data);
+      assertArrayEquals(expectedMd5, mdis.getMessageDigest("MD5").digest());
+
+      byte[] expectedSha256 = MessageDigest.getInstance("SHA-256").digest(data);
+      assertArrayEquals(expectedSha256, mdis.getMessageDigest("SHA-256").digest());
+    }
+  }
+
+}
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java
index a9fd7da4200..7c1352e59dd 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java
@@ -81,7 +81,7 @@ public void init() throws OS3Exception, IOException {
     client.getObjectStore().createS3Bucket(BUCKET_NAME);
 
     headers = mock(HttpHeaders.class);
-    when(headers.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn("mockSignature");
+    when(headers.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn("UNSIGNED-PAYLOAD");
 
     rest = EndpointBuilder.newObjectEndpointBuilder()
         .setClient(client)
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
index a561343a518..ed6afb29a45 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
@@ -506,7 +506,7 @@ public void testPutEmptyObject() throws Exception {
 
   private HttpHeaders newMockHttpHeaders() {
     HttpHeaders httpHeaders = mock(HttpHeaders.class);
-    when(httpHeaders.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn("mockSignature");
+    when(httpHeaders.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn("UNSIGNED-PAYLOAD");
     return httpHeaders;
   }
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingDelete.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingDelete.java
index 6cf7eea1336..d60752dfedd 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingDelete.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingDelete.java
@@ -74,7 +74,7 @@ public void init() throws OS3Exception, IOException {
     // Create a key with object tags
     Mockito.when(headers.getHeaderString(TAG_HEADER)).thenReturn("tag1=value1&tag2=value2");
     Mockito.when(headers.getHeaderString(X_AMZ_CONTENT_SHA256))
-        .thenReturn("mockSignature");
+        .thenReturn("UNSIGNED-PAYLOAD");
 
     put(rest, BUCKET_NAME, KEY_WITH_TAG, CONTENT);
   }
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingGet.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingGet.java
index 94942479cff..f1e166a138d 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingGet.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingGet.java
@@ -59,7 +59,7 @@ public void init() throws Exception {
 
     HttpHeaders headers = Mockito.mock(HttpHeaders.class);
     Mockito.when(headers.getHeaderString(X_AMZ_CONTENT_SHA256))
-        .thenReturn("mockSignature");
+        .thenReturn("UNSIGNED-PAYLOAD");
 
     rest = EndpointBuilder.newObjectEndpointBuilder()
         .setClient(client)
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingPut.java
index f6f26515ea9..75ddd97bd24 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingPut.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectTaggingPut.java
@@ -65,7 +65,7 @@ void setup() throws Exception {
     clientStub.getObjectStore().createS3Bucket(BUCKET_NAME);
 
     HttpHeaders headers = mock(HttpHeaders.class);
-    when(headers.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn("mockSignature");
+    when(headers.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn("UNSIGNED-PAYLOAD");
 
     // Create PutObject and setClient to OzoneClientStub
     objectEndpoint = EndpointBuilder.newObjectEndpointBuilder()
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
index 4981069528a..2da2a42a05b 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
@@ -233,9 +233,13 @@ public void testPartUploadMessageDigestResetDuringException() throws IOException
     assertEquals(200, response.getStatus());
 
     MessageDigest messageDigest = mock(MessageDigest.class);
+    when(messageDigest.getAlgorithm()).thenReturn("MD5");
+    MessageDigest sha256Digest = mock(MessageDigest.class);
+    when(sha256Digest.getAlgorithm()).thenReturn("SHA-256");
     try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class)) {
       // Add the mocked methods only during the copy request
       when(objectEndpoint.getMessageDigestInstance()).thenReturn(messageDigest);
+      when(objectEndpoint.getSha256DigestInstance()).thenReturn(sha256Digest);
       mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class),
           anyLong(), anyLong(), any(byte[].class)))
           .thenThrow(IOException.class);
@@ -251,6 +255,7 @@ public void testPartUploadMessageDigestResetDuringException() throws IOException
       // Verify that the message digest is reset so that the instance can be reused for the
       // next request in the same thread
       verify(messageDigest, times(1)).reset();
+      verify(sha256Digest, times(1)).reset();
     }
   }
 }
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
index dbe21601dbd..e9d70f67982 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestUploadWithStream.java
@@ -66,7 +66,7 @@ public void setUp() throws Exception {
     client.getObjectStore().createS3Bucket(S3BUCKET);
 
     HttpHeaders headers = mock(HttpHeaders.class);
-    when(headers.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn("mockSignature");
+    when(headers.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn("UNSIGNED-PAYLOAD");
     when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn("STANDARD");
 
     OzoneConfiguration conf = new OzoneConfiguration();
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java
index c6c9face137..8aa4ba707ef 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java
@@ -90,7 +90,8 @@ public void setup() throws Exception {
     when(headers.getHeaderString(STORAGE_CLASS_HEADER)).thenReturn(
         "STANDARD");
     when(headers.getHeaderString(X_AMZ_CONTENT_SHA256))
-        .thenReturn("mockSignature");
+        .thenReturn("UNSIGNED-PAYLOAD");
+
     bucketEndpoint = EndpointBuilder.newBucketEndpointBuilder()
         .setClient(clientStub)
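For reference, the value a client must send in `x-amz-content-sha256` for a single-chunk signed upload is the lowercase hex SHA-256 of the exact request body. This is the Java analogue of the smoketest's `compute_sha256_file` helper; the file path is made up:

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.MessageDigest;

public final class ContentSha256 {
  public static void main(String[] args) throws Exception {
    byte[] body = Files.readAllBytes(Paths.get("/tmp/testfile"));
    byte[] digest = MessageDigest.getInstance("SHA-256").digest(body);
    StringBuilder hex = new StringBuilder();
    for (byte b : digest) {
      hex.append(String.format("%02x", b));
    }
    // Header value; any other body now yields 400 XAmzContentSHA256Mismatch
    // and the key is never committed.
    System.out.println(hex);
  }
}
```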