@@ -83,7 +83,6 @@
import javax.annotation.PostConstruct;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HEAD;
import javax.ws.rs.HeaderParam;
@@ -92,7 +91,6 @@
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
@@ -222,23 +220,23 @@ public void init() {
* See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html for
* more details.
*/
@SuppressWarnings({"checkstyle:MethodLength", "checkstyle:ParameterNumber"})
@SuppressWarnings("checkstyle:MethodLength")
@PUT
public Response put(
@PathParam(BUCKET) String bucketName,
@PathParam(PATH) String keyPath,
@HeaderParam(HttpHeaders.CONTENT_LENGTH) long length,
@QueryParam(QueryParams.PART_NUMBER) int partNumber,
@QueryParam(QueryParams.UPLOAD_ID) @DefaultValue("") String uploadID,
@QueryParam(QueryParams.TAGGING) String taggingMarker,
@QueryParam(QueryParams.ACL) String aclMarker,
final InputStream body) throws IOException, OS3Exception {
final InputStream body
) throws IOException, OS3Exception {
final String aclMarker = queryParams().get(QueryParams.ACL);
final String taggingMarker = queryParams().get(QueryParams.TAGGING);
final String uploadID = queryParams().get(QueryParams.UPLOAD_ID);
long startNanos = Time.monotonicNowNanos();
S3GAction s3GAction = S3GAction.CREATE_KEY;
boolean auditSuccess = true;
PerformanceStringBuilder perf = new PerformanceStringBuilder();

String copyHeader = null, storageType = null, storageConfig = null;
String copyHeader = null;
MultiDigestInputStream multiDigestInputStream = null;
try {
if (aclMarker != null) {
@@ -261,17 +259,13 @@
}
// If uploadID is specified, it is a request for upload part
return createMultipartKey(volume, bucket, keyPath, length,
partNumber, uploadID, body, perf);
body, perf);
}

copyHeader = getHeaders().getHeaderString(COPY_SOURCE_HEADER);
storageType = getHeaders().getHeaderString(STORAGE_CLASS_HEADER);
storageConfig = getHeaders().getHeaderString(CUSTOM_METADATA_HEADER_PREFIX + STORAGE_CONFIG_HEADER);
boolean storageTypeDefault = StringUtils.isEmpty(storageType);

// Normal put object
ReplicationConfig replicationConfig =
getReplicationConfig(bucket, storageType, storageConfig);
ReplicationConfig replicationConfig = getReplicationConfig(bucket);

boolean enableEC = false;
if ((replicationConfig != null &&
@@ -284,8 +278,7 @@
//Copy object, as copy source available.
s3GAction = S3GAction.COPY_OBJECT;
CopyObjectResponse copyObjectResponse = copyObject(volume,
copyHeader, bucketName, keyPath, replicationConfig,
storageTypeDefault, perf);
bucketName, keyPath, replicationConfig, perf);
return Response.status(Status.OK).entity(copyObjectResponse).header(
"Connection", "close").build();
}
@@ -431,17 +424,18 @@ public Response put(
* https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadListParts.html
* for more details.
*/
@SuppressWarnings({"checkstyle:MethodLength", "checkstyle:ParameterNumber"})
@SuppressWarnings("checkstyle:MethodLength")
@GET
public Response get(
@PathParam(BUCKET) String bucketName,
@PathParam(PATH) String keyPath,
@QueryParam(QueryParams.PART_NUMBER) int partNumber,
@QueryParam(QueryParams.UPLOAD_ID) String uploadId,
@QueryParam(QueryParams.MAX_PARTS) @DefaultValue("1000") int maxParts,
@QueryParam(QueryParams.PART_NUMBER_MARKER) String partNumberMarker,
@QueryParam(QueryParams.TAGGING) String taggingMarker)
throws IOException, OS3Exception {
@PathParam(PATH) String keyPath
) throws IOException, OS3Exception {
final int maxParts = queryParams().getInt(QueryParams.MAX_PARTS, 1000);
final int partNumber = queryParams().getInt(QueryParams.PART_NUMBER, 0);
final String partNumberMarker = queryParams().get(QueryParams.PART_NUMBER_MARKER);
final String taggingMarker = queryParams().get(QueryParams.TAGGING);
final String uploadId = queryParams().get(QueryParams.UPLOAD_ID);

long startNanos = Time.monotonicNowNanos();
S3GAction s3GAction = S3GAction.GET_KEY;
PerformanceStringBuilder perf = new PerformanceStringBuilder();
@@ -748,10 +742,11 @@ private Response abortMultipartUpload(OzoneVolume volume, String bucket,
@SuppressWarnings("emptyblock")
public Response delete(
@PathParam(BUCKET) String bucketName,
@PathParam(PATH) String keyPath,
@QueryParam(QueryParams.UPLOAD_ID) @DefaultValue("") String uploadId,
@QueryParam(QueryParams.TAGGING) String taggingMarker) throws
IOException, OS3Exception {
@PathParam(PATH) String keyPath
) throws IOException, OS3Exception {
final String taggingMarker = queryParams().get(QueryParams.TAGGING);
final String uploadId = queryParams().get(QueryParams.UPLOAD_ID);

long startNanos = Time.monotonicNowNanos();
S3GAction s3GAction = S3GAction.DELETE_KEY;

@@ -826,24 +821,20 @@ public Response delete(
public Response initializeMultipartUpload(
@PathParam(BUCKET) String bucket,
@PathParam(PATH) String key
)
throws IOException, OS3Exception {
) throws IOException, OS3Exception {
long startNanos = Time.monotonicNowNanos();
S3GAction s3GAction = S3GAction.INIT_MULTIPART_UPLOAD;

try {
OzoneBucket ozoneBucket = getBucket(bucket);
S3Owner.verifyBucketOwnerCondition(getHeaders(), bucket, ozoneBucket.getOwner());
String storageType = getHeaders().getHeaderString(STORAGE_CLASS_HEADER);
String storageConfig = getHeaders().getHeaderString(CUSTOM_METADATA_HEADER_PREFIX + STORAGE_CONFIG_HEADER);

Map<String, String> customMetadata =
getCustomMetadataFromHeaders(getHeaders().getRequestHeaders());

Map<String, String> tags = getTaggingFromHeaders(getHeaders());

ReplicationConfig replicationConfig =
getReplicationConfig(ozoneBucket, storageType, storageConfig);
ReplicationConfig replicationConfig = getReplicationConfig(ozoneBucket);

OmMultipartInfo multipartInfo =
ozoneBucket.initiateMultipartUpload(key, replicationConfig, customMetadata, tags);
@@ -873,8 +864,9 @@ public Response initializeMultipartUpload(
}
}

private ReplicationConfig getReplicationConfig(OzoneBucket ozoneBucket,
String storageType, String storageConfig) throws OS3Exception {
private ReplicationConfig getReplicationConfig(OzoneBucket ozoneBucket) throws OS3Exception {
String storageType = getHeaders().getHeaderString(STORAGE_CLASS_HEADER);
String storageConfig = getHeaders().getHeaderString(CUSTOM_METADATA_HEADER_PREFIX + STORAGE_CONFIG_HEADER);

ReplicationConfig clientConfiguredReplicationConfig =
OzoneClientUtils.getClientConfiguredReplicationConfig(getOzoneConfiguration());
@@ -891,9 +883,9 @@ private ReplicationConfig getReplicationConfig(OzoneBucket ozoneBucket,
public Response completeMultipartUpload(
@PathParam(BUCKET) String bucket,
@PathParam(PATH) String key,
@QueryParam(QueryParams.UPLOAD_ID) @DefaultValue("") String uploadID,
CompleteMultipartUploadRequest multipartUploadRequest)
throws IOException, OS3Exception {
CompleteMultipartUploadRequest multipartUploadRequest
) throws IOException, OS3Exception {
final String uploadID = queryParams().get(QueryParams.UPLOAD_ID, "");
long startNanos = Time.monotonicNowNanos();
S3GAction s3GAction = S3GAction.COMPLETE_MULTIPART_UPLOAD;
OzoneVolume volume = getVolume();
@@ -962,12 +954,14 @@ public Response completeMultipartUpload(
}
}

@SuppressWarnings({"checkstyle:MethodLength", "checkstyle:ParameterNumber"})
@SuppressWarnings("checkstyle:MethodLength")
private Response createMultipartKey(OzoneVolume volume, OzoneBucket ozoneBucket,
String key, long length, int partNumber, String uploadID,
String key, long length,
final InputStream body, PerformanceStringBuilder perf)
throws IOException, OS3Exception {
long startNanos = Time.monotonicNowNanos();
final String uploadID = queryParams().get(QueryParams.UPLOAD_ID);
final int partNumber = queryParams().getInt(QueryParams.PART_NUMBER, 0);
String copyHeader = null;
MultiDigestInputStream multiDigestInputStream = null;
final String bucketName = ozoneBucket.getName();
@@ -979,10 +973,7 @@ private Response createMultipartKey(OzoneVolume volume, OzoneBucket ozoneBucket,
length = chunkInputStreamInfo.getEffectiveLength();

copyHeader = getHeaders().getHeaderString(COPY_SOURCE_HEADER);
String storageType = getHeaders().getHeaderString(STORAGE_CLASS_HEADER);
String storageConfig = getHeaders().getHeaderString(CUSTOM_METADATA_HEADER_PREFIX + STORAGE_CONFIG_HEADER);
ReplicationConfig replicationConfig =
getReplicationConfig(ozoneBucket, storageType, storageConfig);
ReplicationConfig replicationConfig = getReplicationConfig(ozoneBucket);

boolean enableEC = false;
if ((replicationConfig != null &&
@@ -1227,12 +1218,14 @@ void copy(OzoneVolume volume, DigestInputStream src, long srcKeyLen,
perf.appendSizeBytes(copyLength);
}

@SuppressWarnings("checkstyle:ParameterNumber")
private CopyObjectResponse copyObject(OzoneVolume volume,
String copyHeader, String destBucket, String destkey,
ReplicationConfig replicationConfig, boolean storageTypeDefault,
String destBucket, String destkey, ReplicationConfig replicationConfig,
PerformanceStringBuilder perf)
throws OS3Exception, IOException {
String copyHeader = getHeaders().getHeaderString(COPY_SOURCE_HEADER);
String storageType = getHeaders().getHeaderString(STORAGE_CLASS_HEADER);
boolean storageTypeDefault = StringUtils.isEmpty(storageType);

long startNanos = Time.monotonicNowNanos();
Pair<String, String> result = parseSourceHeader(copyHeader);

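The ObjectEndpoint hunks above replace per-parameter @QueryParam/@DefaultValue injection with a single queryParams() accessor, so put, get and delete keep only their two path parameters and no longer need the checkstyle:ParameterNumber suppressions. The accessor's implementation is not part of this diff; the class below is only a rough sketch of the shape it exposes, under the assumption that it wraps the request's query-parameter map and offers the calls seen above (get, get with a default, getInt, plus the set/setInt used via queryParamsForTest()). The name QueryParameters is made up for illustration.

import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.MultivaluedMap;

/**
 * Hypothetical stand-in for the object returned by queryParams() /
 * queryParamsForTest(); the real class is not shown in this diff.
 */
public final class QueryParameters {

  private final MultivaluedMap<String, String> params;

  /** In the endpoint this would wrap UriInfo.getQueryParameters(). */
  public QueryParameters(MultivaluedMap<String, String> params) {
    this.params = params;
  }

  /** Empty map, e.g. for tests that stage parameters via set()/setInt(). */
  public QueryParameters() {
    this(new MultivaluedHashMap<>());
  }

  /** First value of the parameter, or null if absent (like @QueryParam). */
  public String get(String name) {
    return params.getFirst(name);
  }

  /** First value of the parameter, or defaultValue if absent (like @DefaultValue). */
  public String get(String name, String defaultValue) {
    String value = params.getFirst(name);
    return value != null ? value : defaultValue;
  }

  /** Integer value of the parameter, or defaultValue if absent. */
  public int getInt(String name, int defaultValue) {
    String value = params.getFirst(name);
    return value != null ? Integer.parseInt(value) : defaultValue;
  }

  /** Used by tests (queryParamsForTest()) to stage a parameter before a call. */
  public void set(String name, String value) {
    params.putSingle(name, value);
  }

  /** Integer convenience for set(). */
  public void setInt(String name, int value) {
    set(name, String.valueOf(value));
  }
}

The test-utility hunks that follow adapt the helpers in EndpointTestUtils to stage parameters through the same accessor before each call.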
@@ -28,7 +28,9 @@
import javax.ws.rs.core.Response;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.s3.exception.OS3Exception;
import org.apache.hadoop.ozone.s3.util.S3Consts;
import org.apache.http.HttpStatus;
import org.apache.ratis.util.function.CheckedRunnable;
import org.apache.ratis.util.function.CheckedSupplier;

/** Utilities for unit-testing S3 endpoints. */
@@ -40,7 +42,7 @@ public static Response get(
String bucket,
String key
) throws IOException, OS3Exception {
return subject.get(bucket, key, 0, null, 0, null, null);
return subject.get(bucket, key);
}

/** Get key tags. */
@@ -49,7 +51,8 @@ public static Response getTagging(
String bucket,
String key
) throws IOException, OS3Exception {
return subject.get(bucket, key, 0, null, 0, null, "");
subject.queryParamsForTest().set(S3Consts.QueryParams.TAGGING, "");
return subject.get(bucket, key);
}

/** List parts of MPU. */
@@ -61,7 +64,10 @@ public static Response listParts(
int maxParts,
int nextPart
) throws IOException, OS3Exception {
return subject.get(bucket, key, 0, uploadID, maxParts, String.valueOf(nextPart), null);
subject.queryParamsForTest().set(S3Consts.QueryParams.UPLOAD_ID, uploadID);
subject.queryParamsForTest().setInt(S3Consts.QueryParams.MAX_PARTS, maxParts);
subject.queryParamsForTest().setInt(S3Consts.QueryParams.PART_NUMBER_MARKER, nextPart);
return subject.get(bucket, key);
}

/** Put without content. */
@@ -90,12 +96,13 @@ public static Response putTagging(
String key,
String content
) throws IOException, OS3Exception {
subject.queryParamsForTest().set(S3Consts.QueryParams.TAGGING, "");
if (content == null) {
return subject.put(bucket, key, 0, 0, null, "", null, null);
return subject.put(bucket, key, 0, null);
} else {
final long length = content.length();
try (ByteArrayInputStream body = new ByteArrayInputStream(content.getBytes(UTF_8))) {
return subject.put(bucket, key, length, 0, null, "", null, body);
return subject.put(bucket, key, length, body);
}
}
}
@@ -109,12 +116,17 @@ public static Response put(
String uploadID,
String content
) throws IOException, OS3Exception {
if (uploadID != null) {
subject.queryParamsForTest().set(S3Consts.QueryParams.UPLOAD_ID, uploadID);
}
subject.queryParamsForTest().setInt(S3Consts.QueryParams.PART_NUMBER, partNumber);

if (content == null) {
return subject.put(bucket, key, 0, partNumber, uploadID, null, null, null);
return subject.put(bucket, key, 0, null);
} else {
final long length = content.length();
try (ByteArrayInputStream body = new ByteArrayInputStream(content.getBytes(UTF_8))) {
return subject.put(bucket, key, length, partNumber, uploadID, null, null, body);
return subject.put(bucket, key, length, body);
}
}
}
@@ -125,7 +137,7 @@ public static Response delete(
String bucket,
String key
) throws IOException, OS3Exception {
return subject.delete(bucket, key, null, null);
return subject.delete(bucket, key);
}

/** Delete key tags. */
@@ -134,7 +146,8 @@ public static Response deleteTagging(
String bucket,
String key
) throws IOException, OS3Exception {
return subject.delete(bucket, key, null, "");
subject.queryParamsForTest().set(S3Consts.QueryParams.TAGGING, "");
return subject.delete(bucket, key);
}

/** Initiate multipart upload.
@@ -185,7 +198,9 @@ public static void completeMultipartUpload(
CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest();
completeMultipartUploadRequest.setPartList(parts);

try (Response response = subject.completeMultipartUpload(bucket, key, uploadID, completeMultipartUploadRequest)) {
subject.queryParamsForTest().set(S3Consts.QueryParams.UPLOAD_ID, uploadID);

try (Response response = subject.completeMultipartUpload(bucket, key, completeMultipartUploadRequest)) {
assertEquals(HttpStatus.SC_OK, response.getStatus());

CompleteMultipartUploadResponse completeMultipartUploadResponse =
@@ -205,7 +220,8 @@ public static Response abortMultipartUpload(
String key,
String uploadID
) throws IOException, OS3Exception {
return subject.delete(bucket, key, uploadID, null);
subject.queryParamsForTest().set(S3Consts.QueryParams.UPLOAD_ID, uploadID);
return subject.delete(bucket, key);
}

/** Verify response is success for {@code request}. */
@@ -220,7 +236,15 @@ public static <E extends Exception> void assertStatus(int status, CheckedSupplie
}
}

/** Verify error response for {@code request} matching {@code expected} {@link OS3Exception}. */
/** Verify error response for {@code request} matches {@code expected} {@link OS3Exception}. */
public static OS3Exception assertErrorResponse(OS3Exception expected, CheckedRunnable<?> request) {
OS3Exception actual = assertThrows(OS3Exception.class, request::run);
assertEquals(expected.getCode(), actual.getCode());
assertEquals(expected.getHttpCode(), actual.getHttpCode());
return actual;
}

/** Verify error response for {@code request} matches {@code expected} {@link OS3Exception}. */
public static OS3Exception assertErrorResponse(OS3Exception expected, CheckedSupplier<Response, ?> request) {
OS3Exception actual = assertThrows(OS3Exception.class, () -> request.get().close());
assertEquals(expected.getCode(), actual.getCode());
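With the updated helpers, a test stages query parameters on the endpoint and then invokes the slimmed-down two-argument methods. A short usage sketch, assuming keyEndpoint, bucketName, keyName and uploadID come from a fixture like the one in the metrics test diff below (stub client, existing key, open multipart upload); the asserted status code is illustrative, not taken from this PR.

// GET ?tagging= : stage the marker, then call the two-argument get().
keyEndpoint.queryParamsForTest().set(S3Consts.QueryParams.TAGGING, "");
try (Response response = keyEndpoint.get(bucketName, keyName)) {
  assertEquals(HttpStatus.SC_OK, response.getStatus());
}

// DELETE ?uploadId=<id> aborts the multipart upload through the same pattern
// (this is what EndpointTestUtils.abortMultipartUpload does above).
keyEndpoint.queryParamsForTest().set(S3Consts.QueryParams.UPLOAD_ID, uploadID);
keyEndpoint.delete(bucketName, keyName).close();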
@@ -19,10 +19,12 @@

import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.util.Collections.emptyList;
import static org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.abortMultipartUpload;
import static org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.assertErrorResponse;
import static org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.assertStatus;
import static org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.assertSucceeds;
import static org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.completeMultipartUpload;
import static org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.delete;
import static org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.deleteTagging;
import static org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.get;
@@ -54,7 +56,6 @@
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientStub;
import org.apache.hadoop.ozone.s3.endpoint.BucketEndpoint;
import org.apache.hadoop.ozone.s3.endpoint.CompleteMultipartUploadRequest;
import org.apache.hadoop.ozone.s3.endpoint.EndpointBuilder;
import org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils;
import org.apache.hadoop.ozone.s3.endpoint.ObjectEndpoint;
@@ -406,9 +407,8 @@ public void testAbortMultiPartUploadFailure() {
public void testCompleteMultiPartUploadSuccess() throws Exception {
long oriMetric = metrics.getCompleteMultiPartUploadSuccess();
String uploadID = initiateMultipartUpload(bucketName, keyName);
CompleteMultipartUploadRequest request = new CompleteMultipartUploadRequest();

assertSucceeds(() -> keyEndpoint.completeMultipartUpload(bucketName, keyName, uploadID, request));
completeMultipartUpload(keyEndpoint, bucketName, keyName, uploadID, emptyList());

long curMetric = metrics.getCompleteMultiPartUploadSuccess();
assertEquals(1L, curMetric - oriMetric);
@@ -417,10 +417,9 @@
@Test
public void testCompleteMultiPartUploadFailure() {
long oriMetric = metrics.getCompleteMultiPartUploadFailure();
CompleteMultipartUploadRequest request = new CompleteMultipartUploadRequest();

assertErrorResponse(S3ErrorTable.NO_SUCH_UPLOAD,
() -> keyEndpoint.completeMultipartUpload(bucketName, "key2", "random", request));
() -> completeMultipartUpload(keyEndpoint, bucketName, "key2", "random", emptyList()));

long curMetric = metrics.getCompleteMultiPartUploadFailure();
assertEquals(1L, curMetric - oriMetric);