diff --git a/google-cloud-storage/clirr-ignored-differences.xml b/google-cloud-storage/clirr-ignored-differences.xml index c934dbe41b..99ba9b1799 100644 --- a/google-cloud-storage/clirr-ignored-differences.xml +++ b/google-cloud-storage/clirr-ignored-differences.xml @@ -138,12 +138,6 @@ com/google/cloud/storage/Storage com.google.cloud.storage.BlobAppendableUpload blobAppendableUpload(com.google.cloud.storage.BlobInfo, com.google.cloud.storage.BlobAppendableUploadConfig, com.google.cloud.storage.Storage$BlobWriteOption[]) - - 7005 - com/google/cloud/storage/Hasher$* - * validate(*) - * validate(*) - 7013 @@ -357,4 +351,19 @@ com/google/cloud/storage/multipartupload/model/ListPartsResponse$Builder com.google.cloud.storage.multipartupload.model.ListPartsResponse$Builder setUploadId(java.lang.String) + + + + 7006 + com/google/cloud/storage/Hasher* + * nullSafeConcat(*) + ** + + + 7005 + com/google/cloud/storage/Hasher* + ** + ** + + diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java index 4b922a4282..385d9cd2f6 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java @@ -44,7 +44,8 @@ final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByt RetrierWithAlg retrier, JsonResumableWrite resumableWrite, SettableApiFuture result, - LongConsumer committedBytesCallback) { + LongConsumer committedBytesCallback, + Hasher hasher) { this.session = ResumableSession.json(httpClientContext, retrier, resumableWrite); this.result = result; this.committedBytesCallback = committedBytesCallback; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java index 703c9f980c..edda3ce245 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java @@ -79,6 +79,7 @@ protected LazyWriteChannel newLazyWriteChannel() { blobChannelContext .getRetrier() .withAlg(blobChannelContext.getRetryAlgorithmManager().idempotent())) + .setHasher(start.getHasher()) .buffered(getBufferHandle()) .setStartAsync(ApiFutures.immediateFuture(start)) .build()); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ByteSizeConstants.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ByteSizeConstants.java index 463df327f5..c7110c1b12 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ByteSizeConstants.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ByteSizeConstants.java @@ -37,6 +37,7 @@ final class ByteSizeConstants { static final long _256KiBL = 262144L; static final long _512KiBL = 524288L; static final long _768KiBL = 786432L; + static final long _1MiBL = 1048576L; private ByteSizeConstants() {} } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java index 53a14ca264..6d163f1d81 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java @@ -193,7 +193,12 @@ public WritableByteChannelSession writeSession( ApiFuture startAsync = ApiFutures.immediateFuture( JsonResumableWrite.of( - encode, optionsMap, uploadIdSupplier.get(), 0L)); + encode, + optionsMap, + uploadIdSupplier.get(), + 0L, + opts.getHasher(), + opts.getHasher().initialValue())); return ResumableMedia.http() .write() diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java index 47a7b029e0..c1b506de2f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java @@ -73,8 +73,8 @@ default Crc32cLengthKnown hash(Supplier b) { void validateUnchecked(Crc32cValue expected, ByteString byteString) throws UncheckedChecksumMismatchException; - @Nullable Crc32cLengthKnown nullSafeConcat( - @Nullable Crc32cLengthKnown r1, @NonNull Crc32cLengthKnown r2); + @Nullable > C nullSafeConcat( + @Nullable C r1, @Nullable Crc32cLengthKnown r2); /** * The initial value to use for this hasher. @@ -123,8 +123,8 @@ public void validate(Crc32cValue expected, ByteString b) {} public void validateUnchecked(Crc32cValue expected, ByteString byteString) {} @Override - public @Nullable Crc32cLengthKnown nullSafeConcat( - @Nullable Crc32cLengthKnown r1, @NonNull Crc32cLengthKnown r2) { + public > @Nullable C nullSafeConcat( + @Nullable C r1, @Nullable Crc32cLengthKnown r2) { return null; } @@ -189,14 +189,16 @@ public void validateUnchecked(Crc32cValue expected, ByteString byteString) } } + @SuppressWarnings("unchecked") @Override - @Nullable - public Crc32cLengthKnown nullSafeConcat( - @Nullable Crc32cLengthKnown r1, @NonNull Crc32cLengthKnown r2) { + public > @Nullable C nullSafeConcat( + @Nullable C r1, @Nullable Crc32cLengthKnown r2) { if (r1 == null) { return null; + } else if (r2 == null) { + return r1; } else { - return r1.concat(r2); + return (C) r1.concat(r2); } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpContentRange.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpContentRange.java index a069c24950..43c68c02b9 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpContentRange.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpContentRange.java @@ -210,7 +210,7 @@ public String getHeaderValue() { @Override public boolean endOffsetEquals(long e) { - return false; + return e == Math.max(0, size - 1); } @Override diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpWritableByteChannelSessionBuilder.java index a5474b5aee..6cec9eab05 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpWritableByteChannelSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpWritableByteChannelSessionBuilder.java @@ -57,11 +57,13 @@ static final class ResumableUploadBuilder { @NonNull private final HttpClientContext httpClientContext; private RetrierWithAlg retrier; private LongConsumer committedBytesCallback; + private Hasher hasher; ResumableUploadBuilder(@NonNull HttpClientContext httpClientContext) { this.httpClientContext = httpClientContext; this.retrier = RetrierWithAlg.attemptOnce(); this.committedBytesCallback = l -> {}; + this.hasher = Hasher.defaultHasher(); } ResumableUploadBuilder setCommittedBytesCallback(@NonNull LongConsumer committedBytesCallback) { @@ -75,6 +77,11 @@ ResumableUploadBuilder withRetryConfig(@NonNull RetrierWithAlg retrier) { return this; } + ResumableUploadBuilder setHasher(@NonNull Hasher hasher) { + this.hasher = requireNonNull(hasher, "hasher must be non null"); + return this; + } + /** * Do not apply any intermediate buffering. Any call to {@link * java.nio.channels.WritableByteChannel#write(ByteBuffer)} will be segmented as is and sent to @@ -117,9 +124,15 @@ BufferedResumableUploadBuilder buffered(BufferHandle bufferHandle) { // function read them into local variables which will be closed over rather than the class // fields. RetrierWithAlg boundRetrier = retrier; + Hasher boundHasher = hasher; return (start, resultFuture) -> new ApiaryUnbufferedWritableByteChannel( - httpClientContext, boundRetrier, start, resultFuture, committedBytesCallback); + httpClientContext, + boundRetrier, + start, + resultFuture, + committedBytesCallback, + boundHasher); } final class UnbufferedResumableUploadBuilder { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java index d4347787e0..1577bba23b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java @@ -18,10 +18,14 @@ import com.google.api.services.storage.model.StorageObject; import com.google.cloud.storage.Conversions.Decoder; +import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; +import com.google.cloud.storage.HttpContentRange.HasRange; import com.google.cloud.storage.Retrying.RetrierWithAlg; import com.google.cloud.storage.spi.v1.HttpRpcContext; import com.google.cloud.storage.spi.v1.HttpStorageRpc; import io.opencensus.trace.EndSpanOptions; +import java.nio.ByteBuffer; +import java.nio.channels.GatheringByteChannel; import java.util.Locale; import java.util.concurrent.atomic.AtomicBoolean; import org.checkerframework.checker.nullness.qual.Nullable; @@ -54,28 +58,118 @@ final class JsonResumableSession { ResumableOperationResult<@Nullable StorageObject> put( RewindableContent content, HttpContentRange contentRange) { + Crc32cValue crc32cSoFar = resumableWrite.getCumulativeCrc32c(); + @Nullable Crc32cValue nextCumulativeCrc32c = + resumableWrite.getHasher().nullSafeConcat(crc32cSoFar, content.getCrc32c()); + @Nullable Crc32cValue finalChecksum = + contentRange.isFinalizing() ? nextCumulativeCrc32c : null; JsonResumableSessionPutTask task = - new JsonResumableSessionPutTask(context, resumableWrite, content, contentRange); + new JsonResumableSessionPutTask( + context, resumableWrite, content, contentRange, finalChecksum); HttpRpcContext httpRpcContext = HttpRpcContext.getInstance(); try { httpRpcContext.newInvocationId(); AtomicBoolean dirty = new AtomicBoolean(false); - return retrier.run( - () -> { - if (dirty.getAndSet(true)) { - ResumableOperationResult<@Nullable StorageObject> query = query(); - long persistedSize = query.getPersistedSize(); - if (contentRange.endOffsetEquals(persistedSize) || query.getObject() != null) { - return query; - } else { - task.rewindTo(persistedSize); - } - } - return task.call(); - }, - Decoder.identity()); + ResumableOperationResult<@Nullable StorageObject> result = + retrier.run( + () -> { + if (dirty.getAndSet(true)) { + ResumableOperationResult<@Nullable StorageObject> query = query(); + long persistedSize = query.getPersistedSize(); + if (contentRange.endOffsetEquals(persistedSize) || query.getObject() != null) { + return query; + } else { + task.rewindTo(persistedSize); + } + } + return task.call(); + }, + Decoder.identity()); + + if (nextCumulativeCrc32c != null) { + long persistedSize = result.getPersistedSize(); + if (contentRange.endOffsetEquals(persistedSize) || result.getObject() != null) { + resumableWrite.setCumulativeCrc32c(nextCumulativeCrc32c); + } else if (contentRange instanceof HasRange) { + ByteRangeSpec range = ((HasRange) contentRange).range(); + content.rewindTo(0); + long serverConsumedBytes = persistedSize - range.beginOffset(); + try (HashingGatheringByteChannel hashingChannel = + new HashingGatheringByteChannel(serverConsumedBytes)) { + StorageException.wrapIOException(() -> content.writeTo(hashingChannel)); + resumableWrite.setCumulativeCrc32c( + resumableWrite.getHasher().nullSafeConcat(crc32cSoFar, hashingChannel.cumulative)); + } + } else { + throw new StorageException( + 0, + String.format( + Locale.US, + "Result persistedSize (%d) did not match expected end of contentRange (%s) and contentRange does not have range to allow automatic recovery", + persistedSize, + contentRange)); + } + } + return result; } finally { httpRpcContext.clearInvocationId(); } } + + private static final class HashingGatheringByteChannel implements GatheringByteChannel { + private final long maxBytesToConsume; + + private Crc32cLengthKnown cumulative; + + private HashingGatheringByteChannel(long maxBytesToConsume) { + this.maxBytesToConsume = maxBytesToConsume; + this.cumulative = Crc32cValue.zero(); + } + + @Override + public int write(ByteBuffer src) { + return Math.toIntExact(write(new ByteBuffer[] {src}, 0, 1)); + } + + @Override + public long write(ByteBuffer[] srcs) { + return write(srcs, 0, srcs.length); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) { + Crc32cLengthKnown cum = Crc32cValue.zero(); + for (int i = offset; i < length; i++) { + long toConsume = maxBytesToConsume - cum.getLength(); + if (toConsume <= 0) { + if (cum.getLength() == 0) { + return -1; + } else { + break; + } + } + + ByteBuffer buf = srcs[i]; + if (buf.remaining() <= toConsume) { + cum = cum.concat(Hasher.enabled().hash(buf)); + } else { + ByteBuffer slice = buf.slice(); + int limit = Math.toIntExact(toConsume); + slice.limit(limit); + cum = cum.concat(Hasher.enabled().hash(slice)); + buf.position(buf.position() + limit); + } + } + cumulative = cumulative.concat(cum); + return cum.getLength(); + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() {} + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java index 92de549bd8..81018d5f40 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java @@ -43,6 +43,7 @@ final class JsonResumableSessionPutTask private final JsonResumableWrite jsonResumableWrite; private final RewindableContent content; private final HttpContentRange originalContentRange; + private final @Nullable Crc32cValue cumulativeCrc32c; private HttpContentRange contentRange; @@ -52,11 +53,22 @@ final class JsonResumableSessionPutTask JsonResumableWrite jsonResumableWrite, RewindableContent content, HttpContentRange originalContentRange) { + this(httpClientContext, jsonResumableWrite, content, originalContentRange, null); + } + + @VisibleForTesting + JsonResumableSessionPutTask( + HttpClientContext httpClientContext, + JsonResumableWrite jsonResumableWrite, + RewindableContent content, + HttpContentRange originalContentRange, + @Nullable Crc32cValue cumulativeCrc32c) { this.context = httpClientContext; this.jsonResumableWrite = jsonResumableWrite; this.content = content; this.originalContentRange = originalContentRange; this.contentRange = originalContentRange; + this.cumulativeCrc32c = cumulativeCrc32c; } public void rewindTo(long offset) { @@ -101,6 +113,9 @@ public void rewindTo(long offset) { for (Entry e : jsonResumableWrite.getExtraHeaders().entrySet()) { headers.set(e.getKey(), e.getValue()); } + if (cumulativeCrc32c != null) { + headers.set("x-goog-hash", "crc32c=" + Utils.crc32cCodec.encode(cumulativeCrc32c.getValue())); + } HttpResponse response = null; try { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableWrite.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableWrite.java index 41bbec72ec..b2347c47f3 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableWrite.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableWrite.java @@ -31,6 +31,7 @@ import java.io.StringReader; import java.util.Map; import java.util.Objects; +import org.checkerframework.checker.lock.qual.GuardedBy; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.NonNull; @@ -39,6 +40,8 @@ final class JsonResumableWrite implements Serializable { private static final Gson gson = new Gson(); @MonotonicNonNull private transient StorageObject object; + @MonotonicNonNull private transient Hasher hasher; + @MonotonicNonNull private transient Crc32cValue cumulativeCrc32c; @MonotonicNonNull private final Map options; @MonotonicNonNull private final String signedUrl; @@ -48,13 +51,20 @@ final class JsonResumableWrite implements Serializable { private volatile String objectJson; + @GuardedBy("objectJson") + private String base64CumulativeCrc32c; + private JsonResumableWrite( StorageObject object, + @MonotonicNonNull Hasher hasher, + @MonotonicNonNull Crc32cValue cumulativeCrc32c, Map options, String signedUrl, @NonNull String uploadId, long beginOffset) { this.object = object; + this.hasher = hasher; + this.cumulativeCrc32c = cumulativeCrc32c; this.options = options; this.signedUrl = signedUrl; this.uploadId = uploadId; @@ -85,7 +95,20 @@ public JsonResumableWrite withBeginOffset(long newBeginOffset) { "New beginOffset must be >= existing beginOffset (%s >= %s)", newBeginOffset, beginOffset); - return new JsonResumableWrite(object, options, signedUrl, uploadId, newBeginOffset); + return new JsonResumableWrite( + object, hasher, cumulativeCrc32c, options, signedUrl, uploadId, newBeginOffset); + } + + public @MonotonicNonNull Hasher getHasher() { + return hasher; + } + + public @MonotonicNonNull Crc32cValue getCumulativeCrc32c() { + return cumulativeCrc32c; + } + + public void setCumulativeCrc32c(Crc32cValue cumulativeCrc32c) { + this.cumulativeCrc32c = cumulativeCrc32c; } @Override @@ -99,6 +122,8 @@ public boolean equals(Object o) { JsonResumableWrite that = (JsonResumableWrite) o; return beginOffset == that.beginOffset && Objects.equals(object, that.object) + && Objects.equals(hasher, that.hasher) + && cumulativeCrc32c.eqValue(that.cumulativeCrc32c) && Objects.equals(options, that.options) && Objects.equals(signedUrl, that.signedUrl) && Objects.equals(uploadId, that.uploadId); @@ -106,13 +131,16 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(object, options, signedUrl, uploadId, beginOffset); + return Objects.hash( + object, hasher, cumulativeCrc32c.getValue(), options, signedUrl, uploadId, beginOffset); } @Override public String toString() { return MoreObjects.toStringHelper(this) .add("object", object) + .add("hasher", hasher) + .add("cumulativeCrc32c", cumulativeCrc32c) .add("options", options) .add("signedUrl", signedUrl) .add("uploadId", uploadId) @@ -125,6 +153,7 @@ private String getObjectJson() { synchronized (this) { if (objectJson == null) { objectJson = gson.toJson(object); + base64CumulativeCrc32c = Utils.crc32cCodec.encode(cumulativeCrc32c.getValue()); } } } @@ -140,14 +169,38 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE in.defaultReadObject(); JsonReader jsonReader = gson.newJsonReader(new StringReader(this.objectJson)); this.object = gson.fromJson(jsonReader, StorageObject.class); + if (base64CumulativeCrc32c != null) { + Integer decode = Utils.crc32cCodec.decode(base64CumulativeCrc32c); + if (decode == 0) { + this.cumulativeCrc32c = Crc32cValue.zero(); + } else { + this.cumulativeCrc32c = Crc32cValue.of(decode); + } + this.hasher = Hasher.enabled(); + } } static JsonResumableWrite of( StorageObject req, Map options, String uploadId, long beginOffset) { - return new JsonResumableWrite(req, options, null, uploadId, beginOffset); + return of(req, options, uploadId, beginOffset, Hasher.noop(), null); + } + + static JsonResumableWrite of( + StorageObject req, + Map options, + String uploadId, + long beginOffset, + Hasher hasher, + Crc32cValue initialValue) { + return new JsonResumableWrite(req, hasher, initialValue, options, null, uploadId, beginOffset); } static JsonResumableWrite of(String signedUrl, String uploadId, long beginOffset) { - return new JsonResumableWrite(null, null, signedUrl, uploadId, beginOffset); + Hasher hasher = Hasher.noop(); + if (beginOffset == 0) { + hasher = Hasher.defaultHasher(); + } + return new JsonResumableWrite( + null, hasher, hasher.initialValue(), null, signedUrl, uploadId, beginOffset); } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index d4d4217840..ebc4cbe5d7 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -216,7 +216,13 @@ public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOp optionsMap, retrier.withAlg(retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap))); JsonResumableWrite jsonResumableWrite = - JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); + JsonResumableWrite.of( + encode, + optionsMap, + uploadIdSupplier.get(), + 0, + opts.getHasher(), + opts.getHasher().initialValue()); JsonResumableSession session = ResumableSession.json( @@ -762,7 +768,13 @@ public StorageWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) optionsMap, retrier.withAlg(retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap))); JsonResumableWrite jsonResumableWrite = - JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); + JsonResumableWrite.of( + encode, + optionsMap, + uploadIdSupplier.get(), + 0, + opts.getHasher(), + opts.getHasher().initialValue()); return new BlobWriteChannelV2(BlobReadChannelContext.from(this), jsonResumableWrite); } @@ -1724,7 +1736,13 @@ public BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts chunks = ctc.chunkup(_256KiB); + ChecksummedTestContent chunk1 = chunks.get(0); + ChecksummedTestContent chunk2 = chunks.get(1); + + AtomicReference capturedInitialHash = new AtomicReference<>(); + AtomicReference capturedFinalHash = new AtomicReference<>(); + + HttpRequestHandler handler = + req -> { + String contentRange = req.headers().get(CONTENT_RANGE); + String currentHash = req.headers().get("x-goog-hash"); + + if (contentRange.contains("/*")) { // First chunk (non-final) + capturedInitialHash.set(currentHash); + FullHttpResponse resp = + new DefaultFullHttpResponse(req.protocolVersion(), RESUME_INCOMPLETE); + resp.headers() + .set( + HttpHeaderNames.RANGE, + ByteRangeSpec.explicit(0L, _256KiBL).getHttpRangeHeader()); + return resp; + } else { // Second chunk (final) + capturedFinalHash.set(currentHash); + + StorageObject so = + new StorageObject().setName("object").setSize(BigInteger.valueOf(_512KiBL)); + ByteBuf buf = Unpooled.wrappedBuffer(gson.toByteArray(so)); + FullHttpResponse resp = new DefaultFullHttpResponse(req.protocolVersion(), OK, buf); + resp.headers().set(CONTENT_TYPE, "application/json; charset=utf-8"); + return resp; + } + }; + + try (FakeHttpServer fakeHttpServer = FakeHttpServer.of(handler)) { + String uploadUrl = + fakeHttpServer + .createUri( + "/upload/{uploadId}", ImmutableMap.of("uploadId", UUID.randomUUID().toString())) + .toString(); + + JsonResumableWrite resumableWrite = + JsonResumableWrite.of( + null, ImmutableMap.of(), uploadUrl, 0, Hasher.enabled(), Crc32cValue.zero()); + JsonResumableSession session = + new JsonResumableSession(httpClientContext, RETRIER, resumableWrite); + + ResumableOperationResult<@Nullable StorageObject> put1 = + session.put( + RewindableContent.of(chunk1.asByteBuffer()), + HttpContentRange.of(ByteRangeSpec.explicit(0L, _256KiBL))); + assertThat(put1.getObject()).isNull(); + assertThat(put1.getPersistedSize()).isEqualTo(_256KiBL); + + ResumableOperationResult<@Nullable StorageObject> put2 = + session.put( + RewindableContent.of(chunk2.asByteBuffer()), + HttpContentRange.of(ByteRangeSpec.explicit(0L, _256KiBL), ctc.length())); + assertThat(put2.getObject()).isNotNull(); + assertThat(put2.getPersistedSize()).isEqualTo(_512KiBL); + + assertThat(capturedInitialHash.get()).isNull(); + assertThat(capturedFinalHash.get()).isEqualTo(expectedHashHeader); + } + } + + @Test + public void retriesOfPartiallyConsumedBytesChecksumCorrectly() throws Exception { + ChecksummedTestContent ctc = ChecksummedTestContent.gen(_1MiB); + String expectedFullHashHeader = "crc32c=" + ctc.getCrc32cBase64(); + ChecksummedTestContent chunk1 = ctc.slice(0, _256KiB); + ChecksummedTestContent chunk2 = ctc.slice(_256KiB, _256KiB); + ChecksummedTestContent chunk3 = ctc.slice(_512KiB, _512KiB); + HttpContentRange expectedContentRange1 = + HttpContentRange.of(ByteRangeSpec.explicit(0L, (long) chunk1.length())); + HttpContentRange expectedContentRange2 = + HttpContentRange.of(ByteRangeSpec.explicit(_256KiBL, _256KiBL + chunk2.length())); + HttpContentRange expectedContentrange3 = + HttpContentRange.of(ByteRangeSpec.explicit(_512KiBL, (long) ctc.length()), ctc.length()); + HttpContentRange retriedFullContentRange = + HttpContentRange.of(ByteRangeSpec.explicit(_768KiBL, (long) ctc.length()), ctc.length()); + + AtomicLong requestCount = new AtomicLong(0); + List hashes = Collections.synchronizedList(new ArrayList<>()); + + HttpRequestHandler handler = + req -> { + requestCount.incrementAndGet(); + HttpContentRange contentRange = + HttpContentRange.parse(req.headers().get("Content-Range")); + String hashHeader = req.headers().get("x-goog-hash"); + if (hashHeader != null) { + hashes.add(hashHeader); + } + + if (expectedContentRange1.equals(contentRange)) { + DefaultFullHttpResponse resp = + new DefaultFullHttpResponse(req.protocolVersion(), RESUME_INCOMPLETE); + resp.headers().set(RANGE, ByteRangeSpec.explicit(0L, _256KiBL).getHttpRangeHeader()); + return resp; + } else if (expectedContentRange2.equals(contentRange)) { + DefaultFullHttpResponse resp = + new DefaultFullHttpResponse(req.protocolVersion(), RESUME_INCOMPLETE); + resp.headers().set(RANGE, ByteRangeSpec.explicit(0L, _512KiBL).getHttpRangeHeader()); + return resp; + } else if (expectedContentrange3.equals(contentRange)) { + // simulate a broken connection -- except instead of breaking the connection (which is + // very difficult to do with netty) return a 503. + return new DefaultFullHttpResponse( + req.protocolVersion(), HttpResponseStatus.SERVICE_UNAVAILABLE); + } else if (HttpContentRange.query().equals(contentRange)) { + DefaultFullHttpResponse resp = + new DefaultFullHttpResponse(req.protocolVersion(), RESUME_INCOMPLETE); + resp.headers().set(RANGE, ByteRangeSpec.explicit(0L, _768KiBL).getHttpRangeHeader()); + return resp; + } else if (retriedFullContentRange.equals(contentRange)) { + StorageObject so = + new StorageObject().setName("object").setSize(BigInteger.valueOf(_1MiB)); + ByteBuf buf = Unpooled.wrappedBuffer(gson.toByteArray(so)); + DefaultFullHttpResponse resp = + new DefaultFullHttpResponse(req.protocolVersion(), OK, buf); + resp.headers().set(CONTENT_TYPE, "application/json; charset=utf-8"); + return resp; + } + return new DefaultFullHttpResponse( + req.protocolVersion(), HttpResponseStatus.INTERNAL_SERVER_ERROR); + }; + + try (FakeHttpServer fakeHttpServer = FakeHttpServer.of(handler)) { + String uploadUrl = + fakeHttpServer + .createUri( + "/upload/{uploadId}", ImmutableMap.of("uploadId", UUID.randomUUID().toString())) + .toString(); + + JsonResumableWrite resumableWrite = + JsonResumableWrite.of( + null, ImmutableMap.of(), uploadUrl, 0, Hasher.enabled(), Crc32cValue.zero()); + JsonResumableSession session = + new JsonResumableSession(httpClientContext, RETRIER, resumableWrite); + + ResumableOperationResult<@Nullable StorageObject> result1 = + session.put( + RewindableContent.of(chunk1.asByteBuffer()), + HttpContentRange.of(ByteRangeSpec.explicit(0L, _256KiBL))); + assertThat(result1.getObject()).isNull(); + assertThat(result1.getPersistedSize()).isEqualTo(_256KiBL); + ResumableOperationResult<@Nullable StorageObject> result2 = + session.put( + RewindableContent.of(chunk2.asByteBuffer()), + HttpContentRange.of(ByteRangeSpec.explicit(_256KiBL, _512KiBL))); + assertThat(result2.getObject()).isNull(); + assertThat(result2.getPersistedSize()).isEqualTo(_512KiBL); + ResumableOperationResult<@Nullable StorageObject> result3 = + session.put( + RewindableContent.of(chunk3.asByteBuffer()), + HttpContentRange.of(ByteRangeSpec.explicit(_512KiBL, _1MiBL), ctc.length())); + assertThat(result3.getObject()).isNotNull(); + assertThat(result3.getPersistedSize()).isEqualTo(_1MiBL); + + assertThat(requestCount.get()).isEqualTo(5); + assertThat(hashes) + .isEqualTo(ImmutableList.of(expectedFullHashHeader, expectedFullHashHeader)); + } + } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java index 1514c0f982..8905a48c90 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java @@ -266,7 +266,9 @@ protected Restorable[] restorableObjects() { Conversions.json().blobInfo().encode(BlobInfo.newBuilder("b", "n").build()), ImmutableMap.of(), "upload-id", - 0)); + 0, + Hasher.enabled(), + Crc32cValue.zero())); return new Restorable[] {readerV2, writer}; }