From ba31241d3be49f728e298047d16475bf887be3a9 Mon Sep 17 00:00:00 2001 From: Shreyas Sinha Date: Mon, 22 Dec 2025 05:33:19 +0000 Subject: [PATCH 1/6] chore: replacing storage.writer with MPU in JSON flow. --- .../storage/BaseStorageWriteChannel.java | 3 +- .../google/cloud/storage/MPUWriteChannel.java | 156 ++++++++++++++++++ .../storage/MultipartUploadWriteChannel.java | 142 ++++++++++++++++ .../com/google/cloud/storage/StorageImpl.java | 24 +-- 4 files changed, 312 insertions(+), 13 deletions(-) create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/MPUWriteChannel.java create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageWriteChannel.java index 10f79c8dff..ee995af553 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BaseStorageWriteChannel.java @@ -116,8 +116,7 @@ public final int write(ByteBuffer src) throws IOException { if (!tmp.isOpen()) { return 0; } - int write = tmp.write(src); - return write; + return tmp.write(src); } catch (StorageException e) { throw new IOException(e); } catch (IOException e) { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/MPUWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/MPUWriteChannel.java new file mode 100644 index 0000000000..89225ef836 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/MPUWriteChannel.java @@ -0,0 +1,156 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// package com.google.cloud.storage; +// +// import com.google.api.core.ApiFuture; +// import com.google.api.core.SettableApiFuture; +// import com.google.cloud.RestorableState; +// import com.google.cloud.WriteChannel; +// import com.google.cloud.storage.Conversions.Decoder; +// import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; +// import com.google.cloud.storage.multipartupload.model.AbortMultipartUploadRequest; +// import com.google.cloud.storage.multipartupload.model.CompleteMultipartUploadRequest; +// import com.google.cloud.storage.multipartupload.model.CompleteMultipartUploadResponse; +// import com.google.cloud.storage.multipartupload.model.CompletedPart; +// import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadRequest; +// import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadResponse; +// import com.google.cloud.storage.multipartupload.model.UploadPartRequest; +// import com.google.cloud.storage.multipartupload.model.UploadPartResponse; +// import com.google.storage.v2.Object; +// import com.google.storage.v2.WriteObjectResponse; +// import java.io.IOException; +// import java.nio.ByteBuffer; +// import java.util.ArrayList; +// import java.util.List; +// import java.util.concurrent.Callable; +// +// final class MPUWriteChannel extends BaseStorageWriteChannel { +// +// private final MultipartUploadClient multipartUploadClient; +// private final CreateMultipartUploadRequest createRequest; +// +// MPUWriteChannel( +// MultipartUploadClient multipartUploadClient, CreateMultipartUploadRequest createRequest) { +// super(decode()); +// this.multipartUploadClient = multipartUploadClient; +// this.createRequest = createRequest; +// } +// +// @Override +// public RestorableState capture() { +// return CrossTransportUtils.throwHttpJsonOnly(WriteChannel.class, "capture"); +// } +// +// @Override +// protected LazyWriteChannel newLazyWriteChannel() { +// return new LazyWriteChannel<>( +// () -> +// new UnbufferedWritableByteChannelSession<>( +// new MultipartUploadSession(multipartUploadClient, createRequest), +// UnbufferedWritableByteChannel.of())); +// } +// +// private static class MultipartUploadSession +// implements Callable { +// +// private final SettableApiFuture result = SettableApiFuture.create(); +// private final MultipartUploadClient client; +// private final CreateMultipartUploadRequest createRequest; +// +// private String uploadId; +// private final List parts = new ArrayList<>(); +// private int partNumber = 1; +// private boolean open = true; +// +// private MultipartUploadSession( +// MultipartUploadClient client, CreateMultipartUploadRequest createRequest) { +// this.client = client; +// this.createRequest = createRequest; +// } +// +// @Override +// public UnbufferedWritableByteChannel call() throws Exception { +// CreateMultipartUploadResponse createResponse = client.createMultipartUpload(createRequest); +// this.uploadId = createResponse.getUploadId(); +// return new UnbufferedWritableByteChannel() { +// @Override +// public int write(ByteBuffer src) throws IOException { +// if (!open) { +// throw new IOException("Channel is closed."); +// } +// int remaining = src.remaining(); +// UploadPartRequest partRequest = +// UploadPartRequest.newBuilder( +// createRequest.getBucket(), createRequest.getObject(), uploadId, partNumber) +// .build(); +// UploadPartResponse partResponse = +// client.uploadPart(partRequest, RequestBody.of(src.duplicate())); +// parts.add( +// CompletedPart.newBuilder(partNumber, partResponse.getEtag()) +// .build()); +// partNumber++; +// return remaining; +// } +// +// @Override +// public boolean isOpen() { +// return open; +// } +// +// @Override +// public void close() throws IOException { +// if (!open) { +// return; +// } +// open = false; +// if (result.isCancelled()) { +// client.abortMultipartUpload( +// AbortMultipartUploadRequest.newBuilder( +// createRequest.getBucket(), createRequest.getObject(), uploadId) +// .build()); +// } else { +// CompleteMultipartUploadRequest completeRequest = +// CompleteMultipartUploadRequest.newBuilder( +// createRequest.getBucket(), createRequest.getObject(), uploadId, parts) +// .build(); +// CompleteMultipartUploadResponse completeResponse = +// client.completeMultipartUpload(completeRequest); +// +// Object resource = +// Object.newBuilder() +// .setBucket(completeResponse.getBucket()) +// .setName(completeResponse.getObject()) +// .setEtag(completeResponse.getEtag()) +// .setSize(completeResponse.getSize()) +// .build(); +// WriteObjectResponse responseObject = +// WriteObjectResponse.newBuilder().setResource(resource).build(); +// result.set(responseObject); +// } +// } +// }; +// } +// +// ApiFuture getResult() { +// return result; +// } +// } +// +// private static Decoder decode() { +// return Conversions.grpc().blobInfo().compose(WriteObjectResponse::getResource); +// } +// } \ No newline at end of file diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java new file mode 100644 index 0000000000..0d8b0fe0ff --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java @@ -0,0 +1,142 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.storage; + +import com.google.api.core.ApiFuture; +import com.google.api.core.SettableApiFuture; +import com.google.cloud.RestorableState; +import com.google.cloud.WriteChannel; +import com.google.cloud.storage.BlobReadChannelV2.BlobReadChannelContext; +import com.google.cloud.storage.multipartupload.model.CompleteMultipartUploadRequest; +import com.google.cloud.storage.multipartupload.model.CompletedMultipartUpload; +import com.google.cloud.storage.multipartupload.model.CompletedPart; +import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadRequest; +import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadResponse; +import com.google.cloud.storage.multipartupload.model.Part; +import com.google.cloud.storage.multipartupload.model.UploadPartRequest; +import com.google.cloud.storage.multipartupload.model.UploadPartResponse; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class MultipartUploadWriteChannel implements StorageWriteChannel { + + private static final int MIN_CHUNK_SIZE = 5 * 1024 * 1024; // 5MB + + private final SettableApiFuture result; + private final String bucketName; + private final String blobName; + private final String uploadId; + private final MultipartUploadClientImpl client; + private final ByteBuffer buffer; + List completedParts = new ArrayList<>(); + private int partNumber = 1; + private boolean open = true; + + public MultipartUploadWriteChannel( + BlobReadChannelContext context, String bucketName, String blobName) { + this.result = SettableApiFuture.create(); + this.bucketName = bucketName; + this.blobName = blobName; + this.buffer = ByteBuffer.allocate(MIN_CHUNK_SIZE); + + HttpStorageOptions options = context.getStorageOptions(); + this.client = + new MultipartUploadClientImpl( + context.getRetrier(), + MultipartUploadHttpRequestManager.createFrom(options), + options.getRetryAlgorithmManager()); + + CreateMultipartUploadRequest createRequest = CreateMultipartUploadRequest.builder().bucket(bucketName).key(blobName).build(); + CreateMultipartUploadResponse createResponse = client.createMultipartUpload(createRequest); + this.uploadId = createResponse.uploadId(); + } + + @Override + public ApiFuture getObject() { + return result; + } + + @Override + public void setChunkSize(int i) { + // The chunk size is fixed at 5MB + } + + @Override + public RestorableState capture() { + return null; + } + + @Override + public int write(ByteBuffer src) throws IOException { + if (!open) { + throw new IOException("Channel is closed"); + } + int bytesWritten = 0; + while (src.hasRemaining()) { + if (buffer.remaining() == 0) { + uploadPart(); + } + int bytesToCopy = Math.min(src.remaining(), buffer.remaining()); + byte[] array = new byte[bytesToCopy]; + src.get(array); + buffer.put(array); + bytesWritten += bytesToCopy; + } + return bytesWritten; + } + + private void uploadPart() { + buffer.flip(); + RequestBody requestBody = RequestBody.of(buffer); + UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(bucketName).key(blobName).uploadId(uploadId).partNumber(partNumber).build(); + UploadPartResponse uploadPartResponse = + client.uploadPart(uploadRequest, requestBody); + CompletedPart part = CompletedPart.builder() + .partNumber(partNumber) + .eTag(uploadPartResponse.eTag()) + .build(); + completedParts.add(part); + partNumber++; + buffer.clear(); + } + + + + @Override + public boolean isOpen() { + return open; + } + + @Override + public void close() throws IOException { + if (!open) { + return; + } + if (buffer.position() > 0) { + uploadPart(); + } + CompletedMultipartUpload completedMultipartUpload = + CompletedMultipartUpload.builder().parts(completedParts).build(); + CompleteMultipartUploadRequest completeRequest = + CompleteMultipartUploadRequest.builder().uploadId(uploadId).bucket(bucketName).key(blobName).multipartUpload(completedMultipartUpload).build(); + client.completeMultipartUpload(completeRequest); + result.set(null); // Or the actual BlobInfo if available + open = false; + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index d4d4217840..85c3fa31d2 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -753,17 +753,19 @@ public StorageWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); StorageObject encode = codecs.blobInfo().encode(updated); - // open the resumable session outside the write channel - // the exception behavior of open is different from #write(ByteBuffer) - Supplier uploadIdSupplier = - ResumableMedia.startUploadForBlobInfo( - getOptions(), - updated, - optionsMap, - retrier.withAlg(retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap))); - JsonResumableWrite jsonResumableWrite = - JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); - return new BlobWriteChannelV2(BlobReadChannelContext.from(this), jsonResumableWrite); + // // open the resumable session outside the write channel + // // the exception behavior of open is different from #write(ByteBuffer) + // Supplier uploadIdSupplier = + // ResumableMedia.startUploadForBlobInfo( + // getOptions(), + // updated, + // optionsMap, + // retrier.withAlg(retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap))); + // JsonResumableWrite jsonResumableWrite = + // JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); + // return new BlobWriteChannelV2(BlobReadChannelContext.from(this), jsonResumableWrite); + System.out.println("Using JSON writer with MPU"); + return new MultipartUploadWriteChannel(BlobReadChannelContext.from(this), updated.getBucket(), updated.getName()); } @Override From 7af29da8da69c9c934ad92e9af98279253b72a7f Mon Sep 17 00:00:00 2001 From: Shreyas Sinha Date: Mon, 22 Dec 2025 06:12:22 +0000 Subject: [PATCH 2/6] chore: adding parallelism to MPUWriteChannel --- .../storage/MultipartUploadWriteChannel.java | 86 ++++++++++++++----- 1 file changed, 66 insertions(+), 20 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java index 0d8b0fe0ff..308c0c47ce 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java @@ -16,6 +16,7 @@ package com.google.cloud.storage; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.cloud.RestorableState; import com.google.cloud.WriteChannel; @@ -25,14 +26,19 @@ import com.google.cloud.storage.multipartupload.model.CompletedPart; import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadRequest; import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadResponse; -import com.google.cloud.storage.multipartupload.model.Part; import com.google.cloud.storage.multipartupload.model.UploadPartRequest; import com.google.cloud.storage.multipartupload.model.UploadPartResponse; -import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; public class MultipartUploadWriteChannel implements StorageWriteChannel { @@ -44,7 +50,8 @@ public class MultipartUploadWriteChannel implements StorageWriteChannel { private final String uploadId; private final MultipartUploadClientImpl client; private final ByteBuffer buffer; - List completedParts = new ArrayList<>(); + private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(16)); + List> completedParts = new ArrayList<>(); private int partNumber = 1; private boolean open = true; @@ -103,16 +110,43 @@ public int write(ByteBuffer src) throws IOException { private void uploadPart() { buffer.flip(); - RequestBody requestBody = RequestBody.of(buffer); - UploadPartRequest uploadRequest = UploadPartRequest.builder().bucket(bucketName).key(blobName).uploadId(uploadId).partNumber(partNumber).build(); - UploadPartResponse uploadPartResponse = - client.uploadPart(uploadRequest, requestBody); - CompletedPart part = CompletedPart.builder() - .partNumber(partNumber) - .eTag(uploadPartResponse.eTag()) - .build(); - completedParts.add(part); - partNumber++; + final int partNumber = this.partNumber++; + final ByteBuffer partBuffer = ByteBuffer.allocate(buffer.remaining()); + partBuffer.put(buffer); + partBuffer.flip(); + Callable uploadTask = + () -> { + RequestBody requestBody = RequestBody.of(partBuffer); + UploadPartRequest uploadRequest = + UploadPartRequest.builder() + .bucket(bucketName) + .key(blobName) + .uploadId(uploadId) + .partNumber(partNumber) + .build(); + UploadPartResponse uploadPartResponse = client.uploadPart(uploadRequest, requestBody); + return CompletedPart.builder() + .partNumber(partNumber) + .eTag(uploadPartResponse.eTag()) + .build(); + }; + ListenableFuture listenableFuture = executor.submit(uploadTask); + SettableApiFuture settableApiFuture = SettableApiFuture.create(); + Futures.addCallback( + listenableFuture, + new FutureCallback() { + @Override + public void onFailure(Throwable t) { + settableApiFuture.setException(t); + } + + @Override + public void onSuccess(CompletedPart result) { + settableApiFuture.set(result); + } + }, + executor); + completedParts.add(settableApiFuture); buffer.clear(); } @@ -131,12 +165,24 @@ public void close() throws IOException { if (buffer.position() > 0) { uploadPart(); } - CompletedMultipartUpload completedMultipartUpload = - CompletedMultipartUpload.builder().parts(completedParts).build(); - CompleteMultipartUploadRequest completeRequest = - CompleteMultipartUploadRequest.builder().uploadId(uploadId).bucket(bucketName).key(blobName).multipartUpload(completedMultipartUpload).build(); - client.completeMultipartUpload(completeRequest); - result.set(null); // Or the actual BlobInfo if available - open = false; + try { + List parts = ApiFutures.allAsList(completedParts).get(); + CompletedMultipartUpload completedMultipartUpload = + CompletedMultipartUpload.builder().parts(parts).build(); + CompleteMultipartUploadRequest completeRequest = + CompleteMultipartUploadRequest.builder() + .uploadId(uploadId) + .bucket(bucketName) + .key(blobName) + .multipartUpload(completedMultipartUpload) + .build(); + client.completeMultipartUpload(completeRequest); + result.set(null); // Or the actual BlobInfo if available + } catch (Exception e) { + throw new IOException(e); + } finally { + open = false; + executor.shutdown(); + } } } From e3d9c33bf2416545ff10d95e81c93905f0708dc2 Mon Sep 17 00:00:00 2001 From: Shreyas Sinha Date: Mon, 22 Dec 2025 13:47:00 +0000 Subject: [PATCH 3/6] chore: implemented buffer pool to reduces copy operation. --- .../google/cloud/storage/MPUWriteChannel.java | 156 ------------------ .../storage/MultipartUploadWriteChannel.java | 137 +++++++++------ 2 files changed, 85 insertions(+), 208 deletions(-) delete mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/MPUWriteChannel.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/MPUWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/MPUWriteChannel.java deleted file mode 100644 index 89225ef836..0000000000 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/MPUWriteChannel.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// package com.google.cloud.storage; -// -// import com.google.api.core.ApiFuture; -// import com.google.api.core.SettableApiFuture; -// import com.google.cloud.RestorableState; -// import com.google.cloud.WriteChannel; -// import com.google.cloud.storage.Conversions.Decoder; -// import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; -// import com.google.cloud.storage.multipartupload.model.AbortMultipartUploadRequest; -// import com.google.cloud.storage.multipartupload.model.CompleteMultipartUploadRequest; -// import com.google.cloud.storage.multipartupload.model.CompleteMultipartUploadResponse; -// import com.google.cloud.storage.multipartupload.model.CompletedPart; -// import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadRequest; -// import com.google.cloud.storage.multipartupload.model.CreateMultipartUploadResponse; -// import com.google.cloud.storage.multipartupload.model.UploadPartRequest; -// import com.google.cloud.storage.multipartupload.model.UploadPartResponse; -// import com.google.storage.v2.Object; -// import com.google.storage.v2.WriteObjectResponse; -// import java.io.IOException; -// import java.nio.ByteBuffer; -// import java.util.ArrayList; -// import java.util.List; -// import java.util.concurrent.Callable; -// -// final class MPUWriteChannel extends BaseStorageWriteChannel { -// -// private final MultipartUploadClient multipartUploadClient; -// private final CreateMultipartUploadRequest createRequest; -// -// MPUWriteChannel( -// MultipartUploadClient multipartUploadClient, CreateMultipartUploadRequest createRequest) { -// super(decode()); -// this.multipartUploadClient = multipartUploadClient; -// this.createRequest = createRequest; -// } -// -// @Override -// public RestorableState capture() { -// return CrossTransportUtils.throwHttpJsonOnly(WriteChannel.class, "capture"); -// } -// -// @Override -// protected LazyWriteChannel newLazyWriteChannel() { -// return new LazyWriteChannel<>( -// () -> -// new UnbufferedWritableByteChannelSession<>( -// new MultipartUploadSession(multipartUploadClient, createRequest), -// UnbufferedWritableByteChannel.of())); -// } -// -// private static class MultipartUploadSession -// implements Callable { -// -// private final SettableApiFuture result = SettableApiFuture.create(); -// private final MultipartUploadClient client; -// private final CreateMultipartUploadRequest createRequest; -// -// private String uploadId; -// private final List parts = new ArrayList<>(); -// private int partNumber = 1; -// private boolean open = true; -// -// private MultipartUploadSession( -// MultipartUploadClient client, CreateMultipartUploadRequest createRequest) { -// this.client = client; -// this.createRequest = createRequest; -// } -// -// @Override -// public UnbufferedWritableByteChannel call() throws Exception { -// CreateMultipartUploadResponse createResponse = client.createMultipartUpload(createRequest); -// this.uploadId = createResponse.getUploadId(); -// return new UnbufferedWritableByteChannel() { -// @Override -// public int write(ByteBuffer src) throws IOException { -// if (!open) { -// throw new IOException("Channel is closed."); -// } -// int remaining = src.remaining(); -// UploadPartRequest partRequest = -// UploadPartRequest.newBuilder( -// createRequest.getBucket(), createRequest.getObject(), uploadId, partNumber) -// .build(); -// UploadPartResponse partResponse = -// client.uploadPart(partRequest, RequestBody.of(src.duplicate())); -// parts.add( -// CompletedPart.newBuilder(partNumber, partResponse.getEtag()) -// .build()); -// partNumber++; -// return remaining; -// } -// -// @Override -// public boolean isOpen() { -// return open; -// } -// -// @Override -// public void close() throws IOException { -// if (!open) { -// return; -// } -// open = false; -// if (result.isCancelled()) { -// client.abortMultipartUpload( -// AbortMultipartUploadRequest.newBuilder( -// createRequest.getBucket(), createRequest.getObject(), uploadId) -// .build()); -// } else { -// CompleteMultipartUploadRequest completeRequest = -// CompleteMultipartUploadRequest.newBuilder( -// createRequest.getBucket(), createRequest.getObject(), uploadId, parts) -// .build(); -// CompleteMultipartUploadResponse completeResponse = -// client.completeMultipartUpload(completeRequest); -// -// Object resource = -// Object.newBuilder() -// .setBucket(completeResponse.getBucket()) -// .setName(completeResponse.getObject()) -// .setEtag(completeResponse.getEtag()) -// .setSize(completeResponse.getSize()) -// .build(); -// WriteObjectResponse responseObject = -// WriteObjectResponse.newBuilder().setResource(resource).build(); -// result.set(responseObject); -// } -// } -// }; -// } -// -// ApiFuture getResult() { -// return result; -// } -// } -// -// private static Decoder decode() { -// return Conversions.grpc().blobInfo().compose(WriteObjectResponse::getResource); -// } -// } \ No newline at end of file diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java index 308c0c47ce..f261ca500b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2024 Google LLC + * Copyright 2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,30 +37,46 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Executors; public class MultipartUploadWriteChannel implements StorageWriteChannel { private static final int MIN_CHUNK_SIZE = 5 * 1024 * 1024; // 5MB + private static final int POOL_SIZE = 8; private final SettableApiFuture result; private final String bucketName; private final String blobName; private final String uploadId; private final MultipartUploadClientImpl client; - private final ByteBuffer buffer; - private final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(16)); + private final ListeningExecutorService executor = + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(POOL_SIZE)); + private final BlockingQueue bufferQueue; + private ByteBuffer currentBuffer; List> completedParts = new ArrayList<>(); private int partNumber = 1; private boolean open = true; - public MultipartUploadWriteChannel( + MultipartUploadWriteChannel( BlobReadChannelContext context, String bucketName, String blobName) { this.result = SettableApiFuture.create(); this.bucketName = bucketName; this.blobName = blobName; - this.buffer = ByteBuffer.allocate(MIN_CHUNK_SIZE); + + this.bufferQueue = new ArrayBlockingQueue<>(POOL_SIZE); + for (int i = 0; i < POOL_SIZE; i++) { + //noinspection ResultOfMethodCallIgnored + this.bufferQueue.offer(ByteBuffer.allocate(MIN_CHUNK_SIZE)); + } + try { + this.currentBuffer = bufferQueue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } HttpStorageOptions options = context.getStorageOptions(); this.client = @@ -69,7 +85,8 @@ public MultipartUploadWriteChannel( MultipartUploadHttpRequestManager.createFrom(options), options.getRetryAlgorithmManager()); - CreateMultipartUploadRequest createRequest = CreateMultipartUploadRequest.builder().bucket(bucketName).key(blobName).build(); + CreateMultipartUploadRequest createRequest = + CreateMultipartUploadRequest.builder().bucket(bucketName).key(blobName).build(); CreateMultipartUploadResponse createResponse = client.createMultipartUpload(createRequest); this.uploadId = createResponse.uploadId(); } @@ -96,62 +113,70 @@ public int write(ByteBuffer src) throws IOException { } int bytesWritten = 0; while (src.hasRemaining()) { - if (buffer.remaining() == 0) { + if (currentBuffer.remaining() == 0) { uploadPart(); } - int bytesToCopy = Math.min(src.remaining(), buffer.remaining()); - byte[] array = new byte[bytesToCopy]; - src.get(array); - buffer.put(array); + int bytesToCopy = Math.min(src.remaining(), currentBuffer.remaining()); + int originalLimit = src.limit(); + src.limit(src.position() + bytesToCopy); + currentBuffer.put(src); + src.limit(originalLimit); bytesWritten += bytesToCopy; } return bytesWritten; } - private void uploadPart() { - buffer.flip(); - final int partNumber = this.partNumber++; - final ByteBuffer partBuffer = ByteBuffer.allocate(buffer.remaining()); - partBuffer.put(buffer); - partBuffer.flip(); - Callable uploadTask = - () -> { - RequestBody requestBody = RequestBody.of(partBuffer); - UploadPartRequest uploadRequest = - UploadPartRequest.builder() - .bucket(bucketName) - .key(blobName) - .uploadId(uploadId) + private void uploadPart() throws IOException { + try { + final ByteBuffer bufferToUpload = currentBuffer; + currentBuffer = bufferQueue.take(); + + bufferToUpload.flip(); + final int partNumber = this.partNumber++; + Callable uploadTask = + () -> { + try { + RequestBody requestBody = RequestBody.of(bufferToUpload); + UploadPartRequest uploadRequest = + UploadPartRequest.builder() + .bucket(bucketName) + .key(blobName) + .uploadId(uploadId) + .partNumber(partNumber) + .build(); + UploadPartResponse uploadPartResponse = client.uploadPart(uploadRequest, requestBody); + return CompletedPart.builder() .partNumber(partNumber) + .eTag(uploadPartResponse.eTag()) .build(); - UploadPartResponse uploadPartResponse = client.uploadPart(uploadRequest, requestBody); - return CompletedPart.builder() - .partNumber(partNumber) - .eTag(uploadPartResponse.eTag()) - .build(); - }; - ListenableFuture listenableFuture = executor.submit(uploadTask); - SettableApiFuture settableApiFuture = SettableApiFuture.create(); - Futures.addCallback( - listenableFuture, - new FutureCallback() { - @Override - public void onFailure(Throwable t) { - settableApiFuture.setException(t); - } - - @Override - public void onSuccess(CompletedPart result) { - settableApiFuture.set(result); - } - }, - executor); - completedParts.add(settableApiFuture); - buffer.clear(); + } finally { + bufferToUpload.clear(); + bufferQueue.put(bufferToUpload); + } + }; + ListenableFuture listenableFuture = executor.submit(uploadTask); + SettableApiFuture settableApiFuture = SettableApiFuture.create(); + Futures.addCallback( + listenableFuture, + new FutureCallback() { + @Override + public void onFailure(Throwable t) { + settableApiFuture.setException(t); + } + + @Override + public void onSuccess(CompletedPart result) { + settableApiFuture.set(result); + } + }, + executor); + completedParts.add(settableApiFuture); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting for a buffer", e); + } } - - @Override public boolean isOpen() { return open; @@ -162,9 +187,17 @@ public void close() throws IOException { if (!open) { return; } - if (buffer.position() > 0) { + if (currentBuffer.position() > 0) { uploadPart(); } + + if (currentBuffer != null) { + currentBuffer.clear(); + //noinspection ResultOfMethodCallIgnored + bufferQueue.offer(currentBuffer); + currentBuffer = null; + } + try { List parts = ApiFutures.allAsList(completedParts).get(); CompletedMultipartUpload completedMultipartUpload = From 52909c703bec93674a94f3145ee07f6d5e107419 Mon Sep 17 00:00:00 2001 From: Shreyas Sinha Date: Mon, 22 Dec 2025 14:05:27 +0000 Subject: [PATCH 4/6] chore: improved logging message. --- .../com/google/cloud/storage/MultipartUploadWriteChannel.java | 2 +- .../src/main/java/com/google/cloud/storage/StorageImpl.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java index f261ca500b..ccd681d2a2 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java @@ -45,7 +45,7 @@ public class MultipartUploadWriteChannel implements StorageWriteChannel { private static final int MIN_CHUNK_SIZE = 5 * 1024 * 1024; // 5MB - private static final int POOL_SIZE = 8; + private static final int POOL_SIZE = 16; private final SettableApiFuture result; private final String bucketName; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index 85c3fa31d2..6fd7a60ce8 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -764,7 +764,7 @@ public StorageWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) // JsonResumableWrite jsonResumableWrite = // JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); // return new BlobWriteChannelV2(BlobReadChannelContext.from(this), jsonResumableWrite); - System.out.println("Using JSON writer with MPU"); + System.out.println("Using JSON writer with XML MPU API"); return new MultipartUploadWriteChannel(BlobReadChannelContext.from(this), updated.getBucket(), updated.getName()); } From 02fe3ff6a6d9784f60f959b5dc9e24d3003b6bfa Mon Sep 17 00:00:00 2001 From: Shreyas Sinha Date: Mon, 22 Dec 2025 14:18:10 +0000 Subject: [PATCH 5/6] chore: refactoring code a bit --- .../storage/MultipartUploadWriteChannel.java | 197 +++++++++++------- .../com/google/cloud/storage/StorageImpl.java | 2 +- 2 files changed, 124 insertions(+), 75 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java index ccd681d2a2..d305868fcd 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.google.cloud.storage; import com.google.api.core.ApiFuture; @@ -21,6 +22,7 @@ import com.google.cloud.RestorableState; import com.google.cloud.WriteChannel; import com.google.cloud.storage.BlobReadChannelV2.BlobReadChannelContext; +import com.google.cloud.storage.multipartupload.model.AbortMultipartUploadRequest; import com.google.cloud.storage.multipartupload.model.CompleteMultipartUploadRequest; import com.google.cloud.storage.multipartupload.model.CompletedMultipartUpload; import com.google.cloud.storage.multipartupload.model.CompletedPart; @@ -40,6 +42,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; public class MultipartUploadWriteChannel implements StorageWriteChannel { @@ -47,48 +50,31 @@ public class MultipartUploadWriteChannel implements StorageWriteChannel { private static final int MIN_CHUNK_SIZE = 5 * 1024 * 1024; // 5MB private static final int POOL_SIZE = 16; - private final SettableApiFuture result; + private final SettableApiFuture result = SettableApiFuture.create(); + private final BlobReadChannelContext context; private final String bucketName; private final String blobName; - private final String uploadId; - private final MultipartUploadClientImpl client; - private final ListeningExecutorService executor = - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(POOL_SIZE)); + private final ListeningExecutorService executor; private final BlockingQueue bufferQueue; + private final List> completedParts = new ArrayList<>(); + + private MultipartUploadClientImpl client; + private String uploadId; private ByteBuffer currentBuffer; - List> completedParts = new ArrayList<>(); private int partNumber = 1; private boolean open = true; MultipartUploadWriteChannel( BlobReadChannelContext context, String bucketName, String blobName) { - this.result = SettableApiFuture.create(); + this.context = context; this.bucketName = bucketName; this.blobName = blobName; - + this.executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(POOL_SIZE)); this.bufferQueue = new ArrayBlockingQueue<>(POOL_SIZE); for (int i = 0; i < POOL_SIZE; i++) { - //noinspection ResultOfMethodCallIgnored - this.bufferQueue.offer(ByteBuffer.allocate(MIN_CHUNK_SIZE)); + bufferQueue.offer(ByteBuffer.allocate(MIN_CHUNK_SIZE)); } - try { - this.currentBuffer = bufferQueue.take(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - - HttpStorageOptions options = context.getStorageOptions(); - this.client = - new MultipartUploadClientImpl( - context.getRetrier(), - MultipartUploadHttpRequestManager.createFrom(options), - options.getRetryAlgorithmManager()); - - CreateMultipartUploadRequest createRequest = - CreateMultipartUploadRequest.builder().bucket(bucketName).key(blobName).build(); - CreateMultipartUploadResponse createResponse = client.createMultipartUpload(createRequest); - this.uploadId = createResponse.uploadId(); + this.currentBuffer = bufferQueue.poll(); // Non-blocking } @Override @@ -98,7 +84,7 @@ public ApiFuture getObject() { @Override public void setChunkSize(int i) { - // The chunk size is fixed at 5MB + // The chunk size is fixed at 5MB for multipart uploads. } @Override @@ -111,11 +97,15 @@ public int write(ByteBuffer src) throws IOException { if (!open) { throw new IOException("Channel is closed"); } + lazyInit(); + int bytesWritten = 0; while (src.hasRemaining()) { if (currentBuffer.remaining() == 0) { - uploadPart(); + flushBuffer(); } + // The simplest way to handle incoming buffers is to copy their content into + // our managed, fixed-size buffers. int bytesToCopy = Math.min(src.remaining(), currentBuffer.remaining()); int originalLimit = src.limit(); src.limit(src.position() + bytesToCopy); @@ -126,13 +116,35 @@ public int write(ByteBuffer src) throws IOException { return bytesWritten; } - private void uploadPart() throws IOException { + private void lazyInit() throws IOException { + if (client == null) { + HttpStorageOptions options = context.getStorageOptions(); + this.client = + new MultipartUploadClientImpl( + context.getRetrier(), + MultipartUploadHttpRequestManager.createFrom(options), + options.getRetryAlgorithmManager()); + } + if (uploadId == null) { + try { + CreateMultipartUploadRequest createRequest = + CreateMultipartUploadRequest.builder().bucket(bucketName).key(blobName).build(); + CreateMultipartUploadResponse createResponse = client.createMultipartUpload(createRequest); + this.uploadId = createResponse.uploadId(); + } catch (Exception e) { + throw new IOException("Failed to initiate multipart upload", e); + } + } + } + + private void flushBuffer() throws IOException { try { final ByteBuffer bufferToUpload = currentBuffer; - currentBuffer = bufferQueue.take(); + currentBuffer = bufferQueue.take(); // Block until a buffer is available bufferToUpload.flip(); - final int partNumber = this.partNumber++; + final int partNum = this.partNumber++; + Callable uploadTask = () -> { try { @@ -142,35 +154,23 @@ private void uploadPart() throws IOException { .bucket(bucketName) .key(blobName) .uploadId(uploadId) - .partNumber(partNumber) + .partNumber(partNum) .build(); UploadPartResponse uploadPartResponse = client.uploadPart(uploadRequest, requestBody); return CompletedPart.builder() - .partNumber(partNumber) + .partNumber(partNum) .eTag(uploadPartResponse.eTag()) .build(); } finally { + // Return buffer to the pool bufferToUpload.clear(); bufferQueue.put(bufferToUpload); } }; - ListenableFuture listenableFuture = executor.submit(uploadTask); - SettableApiFuture settableApiFuture = SettableApiFuture.create(); - Futures.addCallback( - listenableFuture, - new FutureCallback() { - @Override - public void onFailure(Throwable t) { - settableApiFuture.setException(t); - } - @Override - public void onSuccess(CompletedPart result) { - settableApiFuture.set(result); - } - }, - executor); - completedParts.add(settableApiFuture); + ListenableFuture future = executor.submit(uploadTask); + completedParts.add(toApiFuture(future)); + } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException("Interrupted while waiting for a buffer", e); @@ -187,35 +187,84 @@ public void close() throws IOException { if (!open) { return; } - if (currentBuffer.position() > 0) { - uploadPart(); - } - - if (currentBuffer != null) { - currentBuffer.clear(); - //noinspection ResultOfMethodCallIgnored - bufferQueue.offer(currentBuffer); - currentBuffer = null; - } + open = false; try { + // If an upload was initiated and there's data in the current buffer, flush it. + if (uploadId != null && currentBuffer.position() > 0) { + flushBuffer(); + } + + // Wait for all parts to finish uploading. List parts = ApiFutures.allAsList(completedParts).get(); - CompletedMultipartUpload completedMultipartUpload = - CompletedMultipartUpload.builder().parts(parts).build(); - CompleteMultipartUploadRequest completeRequest = - CompleteMultipartUploadRequest.builder() - .uploadId(uploadId) - .bucket(bucketName) - .key(blobName) - .multipartUpload(completedMultipartUpload) - .build(); - client.completeMultipartUpload(completeRequest); - result.set(null); // Or the actual BlobInfo if available - } catch (Exception e) { + + if (!parts.isEmpty()) { + completeUpload(parts); + BlobId blobId = BlobId.of(bucketName, blobName); + result.set(BlobInfo.newBuilder(blobId).build()); + } else if (uploadId != null) { + // If no parts were successfully uploaded, but an upload was initiated, abort it. + abortUpload(); + result.set(null); + } else { + // No upload was ever started. + result.set(null); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); } finally { - open = false; + // Return the current buffer to the pool + if (currentBuffer != null) { + currentBuffer.clear(); + bufferQueue.offer(currentBuffer); + currentBuffer = null; + } executor.shutdown(); } } + + private void completeUpload(List parts) { + CompletedMultipartUpload completedMultipartUpload = + CompletedMultipartUpload.builder().parts(parts).build(); + CompleteMultipartUploadRequest completeRequest = + CompleteMultipartUploadRequest.builder() + .uploadId(uploadId) + .bucket(bucketName) + .key(blobName) + .multipartUpload(completedMultipartUpload) + .build(); + client.completeMultipartUpload(completeRequest); + } + + private void abortUpload() { + AbortMultipartUploadRequest abortRequest = + AbortMultipartUploadRequest.builder() + .bucket(bucketName) + .key(blobName) + .uploadId(uploadId) + .build(); + client.abortMultipartUpload(abortRequest); + } + + private static ApiFuture toApiFuture(ListenableFuture listenableFuture) { + SettableApiFuture settable = SettableApiFuture.create(); + Futures.addCallback( + listenableFuture, + new FutureCallback() { + @Override + public void onSuccess(T result) { + settable.set(result); + } + + @Override + public void onFailure(Throwable t) { + settable.setException(t); + } + }, + MoreExecutors.directExecutor()); + return settable; + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index 6fd7a60ce8..593e4af538 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -764,7 +764,7 @@ public StorageWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) // JsonResumableWrite jsonResumableWrite = // JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); // return new BlobWriteChannelV2(BlobReadChannelContext.from(this), jsonResumableWrite); - System.out.println("Using JSON writer with XML MPU API"); + System.out.println("JSON writer with XML MPU API"); return new MultipartUploadWriteChannel(BlobReadChannelContext.from(this), updated.getBucket(), updated.getName()); } From 50c3756db1cf33b47a00f72b7ba85afa312f10f4 Mon Sep 17 00:00:00 2001 From: Shreyas Sinha Date: Mon, 22 Dec 2025 14:25:30 +0000 Subject: [PATCH 6/6] chore: adding lint fixes. --- .../google/cloud/storage/MultipartUploadWriteChannel.java | 7 +++---- .../main/java/com/google/cloud/storage/StorageImpl.java | 6 ++++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java index d305868fcd..f8c40c48d7 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/MultipartUploadWriteChannel.java @@ -64,8 +64,7 @@ public class MultipartUploadWriteChannel implements StorageWriteChannel { private int partNumber = 1; private boolean open = true; - MultipartUploadWriteChannel( - BlobReadChannelContext context, String bucketName, String blobName) { + MultipartUploadWriteChannel(BlobReadChannelContext context, String bucketName, String blobName) { this.context = context; this.bucketName = bucketName; this.blobName = blobName; @@ -255,12 +254,12 @@ private static ApiFuture toApiFuture(ListenableFuture listenableFuture listenableFuture, new FutureCallback() { @Override - public void onSuccess(T result) { + public void onSuccess(T result) { settable.set(result); } @Override - public void onFailure(Throwable t) { + public void onFailure(Throwable t) { settable.setException(t); } }, diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index 593e4af538..a16d3b74ed 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -760,12 +760,14 @@ public StorageWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) // getOptions(), // updated, // optionsMap, - // retrier.withAlg(retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap))); + // + // retrier.withAlg(retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap))); // JsonResumableWrite jsonResumableWrite = // JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); // return new BlobWriteChannelV2(BlobReadChannelContext.from(this), jsonResumableWrite); System.out.println("JSON writer with XML MPU API"); - return new MultipartUploadWriteChannel(BlobReadChannelContext.from(this), updated.getBucket(), updated.getName()); + return new MultipartUploadWriteChannel( + BlobReadChannelContext.from(this), updated.getBucket(), updated.getName()); } @Override