Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions google-cloud-storage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,6 @@
<className>com/google/cloud/storage/Storage</className>
<method>com.google.cloud.storage.BlobAppendableUpload blobAppendableUpload(com.google.cloud.storage.BlobInfo, com.google.cloud.storage.BlobAppendableUploadConfig, com.google.cloud.storage.Storage$BlobWriteOption[])</method>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/storage/Hasher$*</className>
<method>* validate(*)</method>
<to>* validate(*)</to>
</difference>

<difference>
<differenceType>7013</differenceType>
Expand Down Expand Up @@ -357,4 +351,19 @@
<className>com/google/cloud/storage/multipartupload/model/ListPartsResponse$Builder</className>
<method>com.google.cloud.storage.multipartupload.model.ListPartsResponse$Builder setUploadId(java.lang.String)</method>
</difference>

<!-- Hasher is a package private interface, with inner classes in it -->
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/storage/Hasher*</className>
<method>* nullSafeConcat(*)</method>
<to>**</to>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/storage/Hasher*</className>
<method>**</method>
<to>**</to>
</difference>

</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByt
RetrierWithAlg retrier,
JsonResumableWrite resumableWrite,
SettableApiFuture<StorageObject> result,
LongConsumer committedBytesCallback) {
LongConsumer committedBytesCallback,
Hasher hasher) {
this.session = ResumableSession.json(httpClientContext, retrier, resumableWrite);
this.result = result;
this.committedBytesCallback = committedBytesCallback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ protected LazyWriteChannel<StorageObject> newLazyWriteChannel() {
blobChannelContext
.getRetrier()
.withAlg(blobChannelContext.getRetryAlgorithmManager().idempotent()))
.setHasher(start.getHasher())
.buffered(getBufferHandle())
.setStartAsync(ApiFutures.immediateFuture(start))
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ final class ByteSizeConstants {
static final long _256KiBL = 262144L;
static final long _512KiBL = 524288L;
static final long _768KiBL = 786432L;
static final long _1MiBL = 1048576L;

private ByteSizeConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,12 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
ApiFuture<JsonResumableWrite> startAsync =
ApiFutures.immediateFuture(
JsonResumableWrite.of(
encode, optionsMap, uploadIdSupplier.get(), 0L));
encode,
optionsMap,
uploadIdSupplier.get(),
0L,
opts.getHasher(),
opts.getHasher().initialValue()));

return ResumableMedia.http()
.write()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ default Crc32cLengthKnown hash(Supplier<ByteBuffer> b) {
void validateUnchecked(Crc32cValue<?> expected, ByteString byteString)
throws UncheckedChecksumMismatchException;

@Nullable Crc32cLengthKnown nullSafeConcat(
@Nullable Crc32cLengthKnown r1, @NonNull Crc32cLengthKnown r2);
@Nullable <C extends Crc32cValue<?>> C nullSafeConcat(
@Nullable C r1, @Nullable Crc32cLengthKnown r2);

/**
* The initial value to use for this hasher.
Expand Down Expand Up @@ -123,8 +123,8 @@ public void validate(Crc32cValue<?> expected, ByteString b) {}
public void validateUnchecked(Crc32cValue<?> expected, ByteString byteString) {}

@Override
public @Nullable Crc32cLengthKnown nullSafeConcat(
@Nullable Crc32cLengthKnown r1, @NonNull Crc32cLengthKnown r2) {
public <C extends Crc32cValue<?>> @Nullable C nullSafeConcat(
@Nullable C r1, @Nullable Crc32cLengthKnown r2) {
return null;
}

Expand Down Expand Up @@ -189,14 +189,16 @@ public void validateUnchecked(Crc32cValue<?> expected, ByteString byteString)
}
}

@SuppressWarnings("unchecked")
@Override
@Nullable
public Crc32cLengthKnown nullSafeConcat(
@Nullable Crc32cLengthKnown r1, @NonNull Crc32cLengthKnown r2) {
public <C extends Crc32cValue<?>> @Nullable C nullSafeConcat(
@Nullable C r1, @Nullable Crc32cLengthKnown r2) {
if (r1 == null) {
return null;
} else if (r2 == null) {
return r1;
} else {
return r1.concat(r2);
return (C) r1.concat(r2);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ public String getHeaderValue() {

@Override
public boolean endOffsetEquals(long e) {
return false;
return e == Math.max(0, size - 1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ static final class ResumableUploadBuilder {
@NonNull private final HttpClientContext httpClientContext;
private RetrierWithAlg retrier;
private LongConsumer committedBytesCallback;
private Hasher hasher;

ResumableUploadBuilder(@NonNull HttpClientContext httpClientContext) {
this.httpClientContext = httpClientContext;
this.retrier = RetrierWithAlg.attemptOnce();
this.committedBytesCallback = l -> {};
this.hasher = Hasher.defaultHasher();
}

ResumableUploadBuilder setCommittedBytesCallback(@NonNull LongConsumer committedBytesCallback) {
Expand All @@ -75,6 +77,11 @@ ResumableUploadBuilder withRetryConfig(@NonNull RetrierWithAlg retrier) {
return this;
}

ResumableUploadBuilder setHasher(@NonNull Hasher hasher) {
this.hasher = requireNonNull(hasher, "hasher must be non null");
return this;
}

/**
* Do not apply any intermediate buffering. Any call to {@link
* java.nio.channels.WritableByteChannel#write(ByteBuffer)} will be segmented as is and sent to
Expand Down Expand Up @@ -117,9 +124,15 @@ BufferedResumableUploadBuilder buffered(BufferHandle bufferHandle) {
// function read them into local variables which will be closed over rather than the class
// fields.
RetrierWithAlg boundRetrier = retrier;
Hasher boundHasher = hasher;
return (start, resultFuture) ->
new ApiaryUnbufferedWritableByteChannel(
httpClientContext, boundRetrier, start, resultFuture, committedBytesCallback);
httpClientContext,
boundRetrier,
start,
resultFuture,
committedBytesCallback,
boundHasher);
}

final class UnbufferedResumableUploadBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@

import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
import com.google.cloud.storage.HttpContentRange.HasRange;
import com.google.cloud.storage.Retrying.RetrierWithAlg;
import com.google.cloud.storage.spi.v1.HttpRpcContext;
import com.google.cloud.storage.spi.v1.HttpStorageRpc;
import io.opencensus.trace.EndSpanOptions;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -54,28 +58,118 @@ final class JsonResumableSession {

ResumableOperationResult<@Nullable StorageObject> put(
RewindableContent content, HttpContentRange contentRange) {
Crc32cValue<?> crc32cSoFar = resumableWrite.getCumulativeCrc32c();
@Nullable Crc32cValue<?> nextCumulativeCrc32c =
resumableWrite.getHasher().nullSafeConcat(crc32cSoFar, content.getCrc32c());
@Nullable Crc32cValue<?> finalChecksum =
contentRange.isFinalizing() ? nextCumulativeCrc32c : null;
JsonResumableSessionPutTask task =
new JsonResumableSessionPutTask(context, resumableWrite, content, contentRange);
new JsonResumableSessionPutTask(
context, resumableWrite, content, contentRange, finalChecksum);
HttpRpcContext httpRpcContext = HttpRpcContext.getInstance();
try {
httpRpcContext.newInvocationId();
AtomicBoolean dirty = new AtomicBoolean(false);
return retrier.run(
() -> {
if (dirty.getAndSet(true)) {
ResumableOperationResult<@Nullable StorageObject> query = query();
long persistedSize = query.getPersistedSize();
if (contentRange.endOffsetEquals(persistedSize) || query.getObject() != null) {
return query;
} else {
task.rewindTo(persistedSize);
}
}
return task.call();
},
Decoder.identity());
ResumableOperationResult<@Nullable StorageObject> result =
retrier.run(
() -> {
if (dirty.getAndSet(true)) {
ResumableOperationResult<@Nullable StorageObject> query = query();
long persistedSize = query.getPersistedSize();
if (contentRange.endOffsetEquals(persistedSize) || query.getObject() != null) {
return query;
} else {
task.rewindTo(persistedSize);
}
}
return task.call();
},
Decoder.identity());

if (nextCumulativeCrc32c != null) {
long persistedSize = result.getPersistedSize();
if (contentRange.endOffsetEquals(persistedSize) || result.getObject() != null) {
resumableWrite.setCumulativeCrc32c(nextCumulativeCrc32c);
} else if (contentRange instanceof HasRange) {
ByteRangeSpec range = ((HasRange<?>) contentRange).range();
content.rewindTo(0);
long serverConsumedBytes = persistedSize - range.beginOffset();
try (HashingGatheringByteChannel hashingChannel =
new HashingGatheringByteChannel(serverConsumedBytes)) {
StorageException.wrapIOException(() -> content.writeTo(hashingChannel));
resumableWrite.setCumulativeCrc32c(
resumableWrite.getHasher().nullSafeConcat(crc32cSoFar, hashingChannel.cumulative));
}
} else {
throw new StorageException(
0,
String.format(
Locale.US,
"Result persistedSize (%d) did not match expected end of contentRange (%s) and contentRange does not have range to allow automatic recovery",
persistedSize,
contentRange));
}
}
return result;
} finally {
httpRpcContext.clearInvocationId();
}
}

private static final class HashingGatheringByteChannel implements GatheringByteChannel {
private final long maxBytesToConsume;

private Crc32cLengthKnown cumulative;

private HashingGatheringByteChannel(long maxBytesToConsume) {
this.maxBytesToConsume = maxBytesToConsume;
this.cumulative = Crc32cValue.zero();
}

@Override
public int write(ByteBuffer src) {
return Math.toIntExact(write(new ByteBuffer[] {src}, 0, 1));
}

@Override
public long write(ByteBuffer[] srcs) {
return write(srcs, 0, srcs.length);
}

@Override
public long write(ByteBuffer[] srcs, int offset, int length) {
Crc32cLengthKnown cum = Crc32cValue.zero();
for (int i = offset; i < length; i++) {
long toConsume = maxBytesToConsume - cum.getLength();
if (toConsume <= 0) {
if (cum.getLength() == 0) {
return -1;
} else {
break;
}
}

ByteBuffer buf = srcs[i];
if (buf.remaining() <= toConsume) {
cum = cum.concat(Hasher.enabled().hash(buf));
} else {
ByteBuffer slice = buf.slice();
int limit = Math.toIntExact(toConsume);
slice.limit(limit);
cum = cum.concat(Hasher.enabled().hash(slice));
buf.position(buf.position() + limit);
}
}
cumulative = cumulative.concat(cum);
return cum.getLength();
}

@Override
public boolean isOpen() {
return true;
}

@Override
public void close() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ final class JsonResumableSessionPutTask
private final JsonResumableWrite jsonResumableWrite;
private final RewindableContent content;
private final HttpContentRange originalContentRange;
private final @Nullable Crc32cValue<?> cumulativeCrc32c;

private HttpContentRange contentRange;

Expand All @@ -52,11 +53,22 @@ final class JsonResumableSessionPutTask
JsonResumableWrite jsonResumableWrite,
RewindableContent content,
HttpContentRange originalContentRange) {
this(httpClientContext, jsonResumableWrite, content, originalContentRange, null);
}

@VisibleForTesting
JsonResumableSessionPutTask(
HttpClientContext httpClientContext,
JsonResumableWrite jsonResumableWrite,
RewindableContent content,
HttpContentRange originalContentRange,
@Nullable Crc32cValue<?> cumulativeCrc32c) {
this.context = httpClientContext;
this.jsonResumableWrite = jsonResumableWrite;
this.content = content;
this.originalContentRange = originalContentRange;
this.contentRange = originalContentRange;
this.cumulativeCrc32c = cumulativeCrc32c;
}

public void rewindTo(long offset) {
Expand Down Expand Up @@ -101,6 +113,9 @@ public void rewindTo(long offset) {
for (Entry<String, String> e : jsonResumableWrite.getExtraHeaders().entrySet()) {
headers.set(e.getKey(), e.getValue());
}
if (cumulativeCrc32c != null) {
headers.set("x-goog-hash", "crc32c=" + Utils.crc32cCodec.encode(cumulativeCrc32c.getValue()));
}

HttpResponse response = null;
try {
Expand Down
Loading