diff --git a/google-cloud-storage/clirr-ignored-differences.xml b/google-cloud-storage/clirr-ignored-differences.xml
index c934dbe41b..99ba9b1799 100644
--- a/google-cloud-storage/clirr-ignored-differences.xml
+++ b/google-cloud-storage/clirr-ignored-differences.xml
@@ -138,12 +138,6 @@
com/google/cloud/storage/Storage
com.google.cloud.storage.BlobAppendableUpload blobAppendableUpload(com.google.cloud.storage.BlobInfo, com.google.cloud.storage.BlobAppendableUploadConfig, com.google.cloud.storage.Storage$BlobWriteOption[])
-
- 7005
- com/google/cloud/storage/Hasher$*
- * validate(*)
- * validate(*)
-
7013
@@ -357,4 +351,19 @@
com/google/cloud/storage/multipartupload/model/ListPartsResponse$Builder
com.google.cloud.storage.multipartupload.model.ListPartsResponse$Builder setUploadId(java.lang.String)
+
+
+
+ 7006
+ com/google/cloud/storage/Hasher*
+ * nullSafeConcat(*)
+ **
+
+
+ 7005
+ com/google/cloud/storage/Hasher*
+ **
+ **
+
+
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java
index 4b922a4282..385d9cd2f6 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java
@@ -44,7 +44,8 @@ final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByt
RetrierWithAlg retrier,
JsonResumableWrite resumableWrite,
SettableApiFuture result,
- LongConsumer committedBytesCallback) {
+ LongConsumer committedBytesCallback,
+ Hasher hasher) {
this.session = ResumableSession.json(httpClientContext, retrier, resumableWrite);
this.result = result;
this.committedBytesCallback = committedBytesCallback;
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java
index 703c9f980c..edda3ce245 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java
@@ -79,6 +79,7 @@ protected LazyWriteChannel newLazyWriteChannel() {
blobChannelContext
.getRetrier()
.withAlg(blobChannelContext.getRetryAlgorithmManager().idempotent()))
+ .setHasher(start.getHasher())
.buffered(getBufferHandle())
.setStartAsync(ApiFutures.immediateFuture(start))
.build());
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ByteSizeConstants.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ByteSizeConstants.java
index 463df327f5..c7110c1b12 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ByteSizeConstants.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ByteSizeConstants.java
@@ -37,6 +37,7 @@ final class ByteSizeConstants {
static final long _256KiBL = 262144L;
static final long _512KiBL = 524288L;
static final long _768KiBL = 786432L;
+ static final long _1MiBL = 1048576L;
private ByteSizeConstants() {}
}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java
index 53a14ca264..6d163f1d81 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java
@@ -193,7 +193,12 @@ public WritableByteChannelSession, BlobInfo> writeSession(
ApiFuture startAsync =
ApiFutures.immediateFuture(
JsonResumableWrite.of(
- encode, optionsMap, uploadIdSupplier.get(), 0L));
+ encode,
+ optionsMap,
+ uploadIdSupplier.get(),
+ 0L,
+ opts.getHasher(),
+ opts.getHasher().initialValue()));
return ResumableMedia.http()
.write()
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java
index 47a7b029e0..c1b506de2f 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java
@@ -73,8 +73,8 @@ default Crc32cLengthKnown hash(Supplier b) {
void validateUnchecked(Crc32cValue> expected, ByteString byteString)
throws UncheckedChecksumMismatchException;
- @Nullable Crc32cLengthKnown nullSafeConcat(
- @Nullable Crc32cLengthKnown r1, @NonNull Crc32cLengthKnown r2);
+ @Nullable > C nullSafeConcat(
+ @Nullable C r1, @Nullable Crc32cLengthKnown r2);
/**
* The initial value to use for this hasher.
@@ -123,8 +123,8 @@ public void validate(Crc32cValue> expected, ByteString b) {}
public void validateUnchecked(Crc32cValue> expected, ByteString byteString) {}
@Override
- public @Nullable Crc32cLengthKnown nullSafeConcat(
- @Nullable Crc32cLengthKnown r1, @NonNull Crc32cLengthKnown r2) {
+ public > @Nullable C nullSafeConcat(
+ @Nullable C r1, @Nullable Crc32cLengthKnown r2) {
return null;
}
@@ -189,14 +189,16 @@ public void validateUnchecked(Crc32cValue> expected, ByteString byteString)
}
}
+ @SuppressWarnings("unchecked")
@Override
- @Nullable
- public Crc32cLengthKnown nullSafeConcat(
- @Nullable Crc32cLengthKnown r1, @NonNull Crc32cLengthKnown r2) {
+ public > @Nullable C nullSafeConcat(
+ @Nullable C r1, @Nullable Crc32cLengthKnown r2) {
if (r1 == null) {
return null;
+ } else if (r2 == null) {
+ return r1;
} else {
- return r1.concat(r2);
+ return (C) r1.concat(r2);
}
}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpContentRange.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpContentRange.java
index a069c24950..43c68c02b9 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpContentRange.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpContentRange.java
@@ -210,7 +210,7 @@ public String getHeaderValue() {
@Override
public boolean endOffsetEquals(long e) {
- return false;
+ return e == Math.max(0, size - 1);
}
@Override
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpWritableByteChannelSessionBuilder.java
index a5474b5aee..6cec9eab05 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpWritableByteChannelSessionBuilder.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/HttpWritableByteChannelSessionBuilder.java
@@ -57,11 +57,13 @@ static final class ResumableUploadBuilder {
@NonNull private final HttpClientContext httpClientContext;
private RetrierWithAlg retrier;
private LongConsumer committedBytesCallback;
+ private Hasher hasher;
ResumableUploadBuilder(@NonNull HttpClientContext httpClientContext) {
this.httpClientContext = httpClientContext;
this.retrier = RetrierWithAlg.attemptOnce();
this.committedBytesCallback = l -> {};
+ this.hasher = Hasher.defaultHasher();
}
ResumableUploadBuilder setCommittedBytesCallback(@NonNull LongConsumer committedBytesCallback) {
@@ -75,6 +77,11 @@ ResumableUploadBuilder withRetryConfig(@NonNull RetrierWithAlg retrier) {
return this;
}
+ ResumableUploadBuilder setHasher(@NonNull Hasher hasher) {
+ this.hasher = requireNonNull(hasher, "hasher must be non null");
+ return this;
+ }
+
/**
* Do not apply any intermediate buffering. Any call to {@link
* java.nio.channels.WritableByteChannel#write(ByteBuffer)} will be segmented as is and sent to
@@ -117,9 +124,15 @@ BufferedResumableUploadBuilder buffered(BufferHandle bufferHandle) {
// function read them into local variables which will be closed over rather than the class
// fields.
RetrierWithAlg boundRetrier = retrier;
+ Hasher boundHasher = hasher;
return (start, resultFuture) ->
new ApiaryUnbufferedWritableByteChannel(
- httpClientContext, boundRetrier, start, resultFuture, committedBytesCallback);
+ httpClientContext,
+ boundRetrier,
+ start,
+ resultFuture,
+ committedBytesCallback,
+ boundHasher);
}
final class UnbufferedResumableUploadBuilder {
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java
index d4347787e0..1577bba23b 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java
@@ -18,10 +18,14 @@
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.storage.Conversions.Decoder;
+import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
+import com.google.cloud.storage.HttpContentRange.HasRange;
import com.google.cloud.storage.Retrying.RetrierWithAlg;
import com.google.cloud.storage.spi.v1.HttpRpcContext;
import com.google.cloud.storage.spi.v1.HttpStorageRpc;
import io.opencensus.trace.EndSpanOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -54,28 +58,118 @@ final class JsonResumableSession {
ResumableOperationResult<@Nullable StorageObject> put(
RewindableContent content, HttpContentRange contentRange) {
+ Crc32cValue> crc32cSoFar = resumableWrite.getCumulativeCrc32c();
+ @Nullable Crc32cValue> nextCumulativeCrc32c =
+ resumableWrite.getHasher().nullSafeConcat(crc32cSoFar, content.getCrc32c());
+ @Nullable Crc32cValue> finalChecksum =
+ contentRange.isFinalizing() ? nextCumulativeCrc32c : null;
JsonResumableSessionPutTask task =
- new JsonResumableSessionPutTask(context, resumableWrite, content, contentRange);
+ new JsonResumableSessionPutTask(
+ context, resumableWrite, content, contentRange, finalChecksum);
HttpRpcContext httpRpcContext = HttpRpcContext.getInstance();
try {
httpRpcContext.newInvocationId();
AtomicBoolean dirty = new AtomicBoolean(false);
- return retrier.run(
- () -> {
- if (dirty.getAndSet(true)) {
- ResumableOperationResult<@Nullable StorageObject> query = query();
- long persistedSize = query.getPersistedSize();
- if (contentRange.endOffsetEquals(persistedSize) || query.getObject() != null) {
- return query;
- } else {
- task.rewindTo(persistedSize);
- }
- }
- return task.call();
- },
- Decoder.identity());
+ ResumableOperationResult<@Nullable StorageObject> result =
+ retrier.run(
+ () -> {
+ if (dirty.getAndSet(true)) {
+ ResumableOperationResult<@Nullable StorageObject> query = query();
+ long persistedSize = query.getPersistedSize();
+ if (contentRange.endOffsetEquals(persistedSize) || query.getObject() != null) {
+ return query;
+ } else {
+ task.rewindTo(persistedSize);
+ }
+ }
+ return task.call();
+ },
+ Decoder.identity());
+
+ if (nextCumulativeCrc32c != null) {
+ long persistedSize = result.getPersistedSize();
+ if (contentRange.endOffsetEquals(persistedSize) || result.getObject() != null) {
+ resumableWrite.setCumulativeCrc32c(nextCumulativeCrc32c);
+ } else if (contentRange instanceof HasRange) {
+ ByteRangeSpec range = ((HasRange>) contentRange).range();
+ content.rewindTo(0);
+ long serverConsumedBytes = persistedSize - range.beginOffset();
+ try (HashingGatheringByteChannel hashingChannel =
+ new HashingGatheringByteChannel(serverConsumedBytes)) {
+ StorageException.wrapIOException(() -> content.writeTo(hashingChannel));
+ resumableWrite.setCumulativeCrc32c(
+ resumableWrite.getHasher().nullSafeConcat(crc32cSoFar, hashingChannel.cumulative));
+ }
+ } else {
+ throw new StorageException(
+ 0,
+ String.format(
+ Locale.US,
+ "Result persistedSize (%d) did not match expected end of contentRange (%s) and contentRange does not have range to allow automatic recovery",
+ persistedSize,
+ contentRange));
+ }
+ }
+ return result;
} finally {
httpRpcContext.clearInvocationId();
}
}
+
+ private static final class HashingGatheringByteChannel implements GatheringByteChannel {
+ private final long maxBytesToConsume;
+
+ private Crc32cLengthKnown cumulative;
+
+ private HashingGatheringByteChannel(long maxBytesToConsume) {
+ this.maxBytesToConsume = maxBytesToConsume;
+ this.cumulative = Crc32cValue.zero();
+ }
+
+ @Override
+ public int write(ByteBuffer src) {
+ return Math.toIntExact(write(new ByteBuffer[] {src}, 0, 1));
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs) {
+ return write(srcs, 0, srcs.length);
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs, int offset, int length) {
+ Crc32cLengthKnown cum = Crc32cValue.zero();
+ for (int i = offset; i < length; i++) {
+ long toConsume = maxBytesToConsume - cum.getLength();
+ if (toConsume <= 0) {
+ if (cum.getLength() == 0) {
+ return -1;
+ } else {
+ break;
+ }
+ }
+
+ ByteBuffer buf = srcs[i];
+ if (buf.remaining() <= toConsume) {
+ cum = cum.concat(Hasher.enabled().hash(buf));
+ } else {
+ ByteBuffer slice = buf.slice();
+ int limit = Math.toIntExact(toConsume);
+ slice.limit(limit);
+ cum = cum.concat(Hasher.enabled().hash(slice));
+ buf.position(buf.position() + limit);
+ }
+ }
+ cumulative = cumulative.concat(cum);
+ return cum.getLength();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return true;
+ }
+
+ @Override
+ public void close() {}
+ }
}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java
index 92de549bd8..81018d5f40 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java
@@ -43,6 +43,7 @@ final class JsonResumableSessionPutTask
private final JsonResumableWrite jsonResumableWrite;
private final RewindableContent content;
private final HttpContentRange originalContentRange;
+ private final @Nullable Crc32cValue> cumulativeCrc32c;
private HttpContentRange contentRange;
@@ -52,11 +53,22 @@ final class JsonResumableSessionPutTask
JsonResumableWrite jsonResumableWrite,
RewindableContent content,
HttpContentRange originalContentRange) {
+ this(httpClientContext, jsonResumableWrite, content, originalContentRange, null);
+ }
+
+ @VisibleForTesting
+ JsonResumableSessionPutTask(
+ HttpClientContext httpClientContext,
+ JsonResumableWrite jsonResumableWrite,
+ RewindableContent content,
+ HttpContentRange originalContentRange,
+ @Nullable Crc32cValue> cumulativeCrc32c) {
this.context = httpClientContext;
this.jsonResumableWrite = jsonResumableWrite;
this.content = content;
this.originalContentRange = originalContentRange;
this.contentRange = originalContentRange;
+ this.cumulativeCrc32c = cumulativeCrc32c;
}
public void rewindTo(long offset) {
@@ -101,6 +113,9 @@ public void rewindTo(long offset) {
for (Entry e : jsonResumableWrite.getExtraHeaders().entrySet()) {
headers.set(e.getKey(), e.getValue());
}
+ if (cumulativeCrc32c != null) {
+ headers.set("x-goog-hash", "crc32c=" + Utils.crc32cCodec.encode(cumulativeCrc32c.getValue()));
+ }
HttpResponse response = null;
try {
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableWrite.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableWrite.java
index 41bbec72ec..b2347c47f3 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableWrite.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableWrite.java
@@ -31,6 +31,7 @@
import java.io.StringReader;
import java.util.Map;
import java.util.Objects;
+import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;
@@ -39,6 +40,8 @@ final class JsonResumableWrite implements Serializable {
private static final Gson gson = new Gson();
@MonotonicNonNull private transient StorageObject object;
+ @MonotonicNonNull private transient Hasher hasher;
+ @MonotonicNonNull private transient Crc32cValue> cumulativeCrc32c;
@MonotonicNonNull private final Map options;
@MonotonicNonNull private final String signedUrl;
@@ -48,13 +51,20 @@ final class JsonResumableWrite implements Serializable {
private volatile String objectJson;
+ @GuardedBy("objectJson")
+ private String base64CumulativeCrc32c;
+
private JsonResumableWrite(
StorageObject object,
+ @MonotonicNonNull Hasher hasher,
+ @MonotonicNonNull Crc32cValue> cumulativeCrc32c,
Map options,
String signedUrl,
@NonNull String uploadId,
long beginOffset) {
this.object = object;
+ this.hasher = hasher;
+ this.cumulativeCrc32c = cumulativeCrc32c;
this.options = options;
this.signedUrl = signedUrl;
this.uploadId = uploadId;
@@ -85,7 +95,20 @@ public JsonResumableWrite withBeginOffset(long newBeginOffset) {
"New beginOffset must be >= existing beginOffset (%s >= %s)",
newBeginOffset,
beginOffset);
- return new JsonResumableWrite(object, options, signedUrl, uploadId, newBeginOffset);
+ return new JsonResumableWrite(
+ object, hasher, cumulativeCrc32c, options, signedUrl, uploadId, newBeginOffset);
+ }
+
+ public @MonotonicNonNull Hasher getHasher() {
+ return hasher;
+ }
+
+ public @MonotonicNonNull Crc32cValue> getCumulativeCrc32c() {
+ return cumulativeCrc32c;
+ }
+
+ public void setCumulativeCrc32c(Crc32cValue> cumulativeCrc32c) {
+ this.cumulativeCrc32c = cumulativeCrc32c;
}
@Override
@@ -99,6 +122,8 @@ public boolean equals(Object o) {
JsonResumableWrite that = (JsonResumableWrite) o;
return beginOffset == that.beginOffset
&& Objects.equals(object, that.object)
+ && Objects.equals(hasher, that.hasher)
+ && cumulativeCrc32c.eqValue(that.cumulativeCrc32c)
&& Objects.equals(options, that.options)
&& Objects.equals(signedUrl, that.signedUrl)
&& Objects.equals(uploadId, that.uploadId);
@@ -106,13 +131,16 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
- return Objects.hash(object, options, signedUrl, uploadId, beginOffset);
+ return Objects.hash(
+ object, hasher, cumulativeCrc32c.getValue(), options, signedUrl, uploadId, beginOffset);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("object", object)
+ .add("hasher", hasher)
+ .add("cumulativeCrc32c", cumulativeCrc32c)
.add("options", options)
.add("signedUrl", signedUrl)
.add("uploadId", uploadId)
@@ -125,6 +153,7 @@ private String getObjectJson() {
synchronized (this) {
if (objectJson == null) {
objectJson = gson.toJson(object);
+ base64CumulativeCrc32c = Utils.crc32cCodec.encode(cumulativeCrc32c.getValue());
}
}
}
@@ -140,14 +169,38 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
in.defaultReadObject();
JsonReader jsonReader = gson.newJsonReader(new StringReader(this.objectJson));
this.object = gson.fromJson(jsonReader, StorageObject.class);
+ if (base64CumulativeCrc32c != null) {
+ Integer decode = Utils.crc32cCodec.decode(base64CumulativeCrc32c);
+ if (decode == 0) {
+ this.cumulativeCrc32c = Crc32cValue.zero();
+ } else {
+ this.cumulativeCrc32c = Crc32cValue.of(decode);
+ }
+ this.hasher = Hasher.enabled();
+ }
}
static JsonResumableWrite of(
StorageObject req, Map options, String uploadId, long beginOffset) {
- return new JsonResumableWrite(req, options, null, uploadId, beginOffset);
+ return of(req, options, uploadId, beginOffset, Hasher.noop(), null);
+ }
+
+ static JsonResumableWrite of(
+ StorageObject req,
+ Map options,
+ String uploadId,
+ long beginOffset,
+ Hasher hasher,
+ Crc32cValue> initialValue) {
+ return new JsonResumableWrite(req, hasher, initialValue, options, null, uploadId, beginOffset);
}
static JsonResumableWrite of(String signedUrl, String uploadId, long beginOffset) {
- return new JsonResumableWrite(null, null, signedUrl, uploadId, beginOffset);
+ Hasher hasher = Hasher.noop();
+ if (beginOffset == 0) {
+ hasher = Hasher.defaultHasher();
+ }
+ return new JsonResumableWrite(
+ null, hasher, hasher.initialValue(), null, signedUrl, uploadId, beginOffset);
}
}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java
index d4d4217840..ebc4cbe5d7 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java
@@ -216,7 +216,13 @@ public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOp
optionsMap,
retrier.withAlg(retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)));
JsonResumableWrite jsonResumableWrite =
- JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);
+ JsonResumableWrite.of(
+ encode,
+ optionsMap,
+ uploadIdSupplier.get(),
+ 0,
+ opts.getHasher(),
+ opts.getHasher().initialValue());
JsonResumableSession session =
ResumableSession.json(
@@ -762,7 +768,13 @@ public StorageWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options)
optionsMap,
retrier.withAlg(retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)));
JsonResumableWrite jsonResumableWrite =
- JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);
+ JsonResumableWrite.of(
+ encode,
+ optionsMap,
+ uploadIdSupplier.get(),
+ 0,
+ opts.getHasher(),
+ opts.getHasher().initialValue());
return new BlobWriteChannelV2(BlobReadChannelContext.from(this), jsonResumableWrite);
}
@@ -1724,7 +1736,13 @@ public BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts chunks = ctc.chunkup(_256KiB);
+ ChecksummedTestContent chunk1 = chunks.get(0);
+ ChecksummedTestContent chunk2 = chunks.get(1);
+
+ AtomicReference capturedInitialHash = new AtomicReference<>();
+ AtomicReference capturedFinalHash = new AtomicReference<>();
+
+ HttpRequestHandler handler =
+ req -> {
+ String contentRange = req.headers().get(CONTENT_RANGE);
+ String currentHash = req.headers().get("x-goog-hash");
+
+ if (contentRange.contains("/*")) { // First chunk (non-final)
+ capturedInitialHash.set(currentHash);
+ FullHttpResponse resp =
+ new DefaultFullHttpResponse(req.protocolVersion(), RESUME_INCOMPLETE);
+ resp.headers()
+ .set(
+ HttpHeaderNames.RANGE,
+ ByteRangeSpec.explicit(0L, _256KiBL).getHttpRangeHeader());
+ return resp;
+ } else { // Second chunk (final)
+ capturedFinalHash.set(currentHash);
+
+ StorageObject so =
+ new StorageObject().setName("object").setSize(BigInteger.valueOf(_512KiBL));
+ ByteBuf buf = Unpooled.wrappedBuffer(gson.toByteArray(so));
+ FullHttpResponse resp = new DefaultFullHttpResponse(req.protocolVersion(), OK, buf);
+ resp.headers().set(CONTENT_TYPE, "application/json; charset=utf-8");
+ return resp;
+ }
+ };
+
+ try (FakeHttpServer fakeHttpServer = FakeHttpServer.of(handler)) {
+ String uploadUrl =
+ fakeHttpServer
+ .createUri(
+ "/upload/{uploadId}", ImmutableMap.of("uploadId", UUID.randomUUID().toString()))
+ .toString();
+
+ JsonResumableWrite resumableWrite =
+ JsonResumableWrite.of(
+ null, ImmutableMap.of(), uploadUrl, 0, Hasher.enabled(), Crc32cValue.zero());
+ JsonResumableSession session =
+ new JsonResumableSession(httpClientContext, RETRIER, resumableWrite);
+
+ ResumableOperationResult<@Nullable StorageObject> put1 =
+ session.put(
+ RewindableContent.of(chunk1.asByteBuffer()),
+ HttpContentRange.of(ByteRangeSpec.explicit(0L, _256KiBL)));
+ assertThat(put1.getObject()).isNull();
+ assertThat(put1.getPersistedSize()).isEqualTo(_256KiBL);
+
+ ResumableOperationResult<@Nullable StorageObject> put2 =
+ session.put(
+ RewindableContent.of(chunk2.asByteBuffer()),
+ HttpContentRange.of(ByteRangeSpec.explicit(0L, _256KiBL), ctc.length()));
+ assertThat(put2.getObject()).isNotNull();
+ assertThat(put2.getPersistedSize()).isEqualTo(_512KiBL);
+
+ assertThat(capturedInitialHash.get()).isNull();
+ assertThat(capturedFinalHash.get()).isEqualTo(expectedHashHeader);
+ }
+ }
+
+ @Test
+ public void retriesOfPartiallyConsumedBytesChecksumCorrectly() throws Exception {
+ ChecksummedTestContent ctc = ChecksummedTestContent.gen(_1MiB);
+ String expectedFullHashHeader = "crc32c=" + ctc.getCrc32cBase64();
+ ChecksummedTestContent chunk1 = ctc.slice(0, _256KiB);
+ ChecksummedTestContent chunk2 = ctc.slice(_256KiB, _256KiB);
+ ChecksummedTestContent chunk3 = ctc.slice(_512KiB, _512KiB);
+ HttpContentRange expectedContentRange1 =
+ HttpContentRange.of(ByteRangeSpec.explicit(0L, (long) chunk1.length()));
+ HttpContentRange expectedContentRange2 =
+ HttpContentRange.of(ByteRangeSpec.explicit(_256KiBL, _256KiBL + chunk2.length()));
+ HttpContentRange expectedContentrange3 =
+ HttpContentRange.of(ByteRangeSpec.explicit(_512KiBL, (long) ctc.length()), ctc.length());
+ HttpContentRange retriedFullContentRange =
+ HttpContentRange.of(ByteRangeSpec.explicit(_768KiBL, (long) ctc.length()), ctc.length());
+
+ AtomicLong requestCount = new AtomicLong(0);
+ List hashes = Collections.synchronizedList(new ArrayList<>());
+
+ HttpRequestHandler handler =
+ req -> {
+ requestCount.incrementAndGet();
+ HttpContentRange contentRange =
+ HttpContentRange.parse(req.headers().get("Content-Range"));
+ String hashHeader = req.headers().get("x-goog-hash");
+ if (hashHeader != null) {
+ hashes.add(hashHeader);
+ }
+
+ if (expectedContentRange1.equals(contentRange)) {
+ DefaultFullHttpResponse resp =
+ new DefaultFullHttpResponse(req.protocolVersion(), RESUME_INCOMPLETE);
+ resp.headers().set(RANGE, ByteRangeSpec.explicit(0L, _256KiBL).getHttpRangeHeader());
+ return resp;
+ } else if (expectedContentRange2.equals(contentRange)) {
+ DefaultFullHttpResponse resp =
+ new DefaultFullHttpResponse(req.protocolVersion(), RESUME_INCOMPLETE);
+ resp.headers().set(RANGE, ByteRangeSpec.explicit(0L, _512KiBL).getHttpRangeHeader());
+ return resp;
+ } else if (expectedContentrange3.equals(contentRange)) {
+ // simulate a broken connection -- except instead of breaking the connection (which is
+ // very difficult to do with netty) return a 503.
+ return new DefaultFullHttpResponse(
+ req.protocolVersion(), HttpResponseStatus.SERVICE_UNAVAILABLE);
+ } else if (HttpContentRange.query().equals(contentRange)) {
+ DefaultFullHttpResponse resp =
+ new DefaultFullHttpResponse(req.protocolVersion(), RESUME_INCOMPLETE);
+ resp.headers().set(RANGE, ByteRangeSpec.explicit(0L, _768KiBL).getHttpRangeHeader());
+ return resp;
+ } else if (retriedFullContentRange.equals(contentRange)) {
+ StorageObject so =
+ new StorageObject().setName("object").setSize(BigInteger.valueOf(_1MiB));
+ ByteBuf buf = Unpooled.wrappedBuffer(gson.toByteArray(so));
+ DefaultFullHttpResponse resp =
+ new DefaultFullHttpResponse(req.protocolVersion(), OK, buf);
+ resp.headers().set(CONTENT_TYPE, "application/json; charset=utf-8");
+ return resp;
+ }
+ return new DefaultFullHttpResponse(
+ req.protocolVersion(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
+ };
+
+ try (FakeHttpServer fakeHttpServer = FakeHttpServer.of(handler)) {
+ String uploadUrl =
+ fakeHttpServer
+ .createUri(
+ "/upload/{uploadId}", ImmutableMap.of("uploadId", UUID.randomUUID().toString()))
+ .toString();
+
+ JsonResumableWrite resumableWrite =
+ JsonResumableWrite.of(
+ null, ImmutableMap.of(), uploadUrl, 0, Hasher.enabled(), Crc32cValue.zero());
+ JsonResumableSession session =
+ new JsonResumableSession(httpClientContext, RETRIER, resumableWrite);
+
+ ResumableOperationResult<@Nullable StorageObject> result1 =
+ session.put(
+ RewindableContent.of(chunk1.asByteBuffer()),
+ HttpContentRange.of(ByteRangeSpec.explicit(0L, _256KiBL)));
+ assertThat(result1.getObject()).isNull();
+ assertThat(result1.getPersistedSize()).isEqualTo(_256KiBL);
+ ResumableOperationResult<@Nullable StorageObject> result2 =
+ session.put(
+ RewindableContent.of(chunk2.asByteBuffer()),
+ HttpContentRange.of(ByteRangeSpec.explicit(_256KiBL, _512KiBL)));
+ assertThat(result2.getObject()).isNull();
+ assertThat(result2.getPersistedSize()).isEqualTo(_512KiBL);
+ ResumableOperationResult<@Nullable StorageObject> result3 =
+ session.put(
+ RewindableContent.of(chunk3.asByteBuffer()),
+ HttpContentRange.of(ByteRangeSpec.explicit(_512KiBL, _1MiBL), ctc.length()));
+ assertThat(result3.getObject()).isNotNull();
+ assertThat(result3.getPersistedSize()).isEqualTo(_1MiBL);
+
+ assertThat(requestCount.get()).isEqualTo(5);
+ assertThat(hashes)
+ .isEqualTo(ImmutableList.of(expectedFullHashHeader, expectedFullHashHeader));
+ }
+ }
}
diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java
index 1514c0f982..8905a48c90 100644
--- a/google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java
+++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java
@@ -266,7 +266,9 @@ protected Restorable>[] restorableObjects() {
Conversions.json().blobInfo().encode(BlobInfo.newBuilder("b", "n").build()),
ImmutableMap.of(),
"upload-id",
- 0));
+ 0,
+ Hasher.enabled(),
+ Crc32cValue.zero()));
return new Restorable>[] {readerV2, writer};
}