
Conversation

@v-pratap v-pratap commented Jan 6, 2026

This PR implements scalable multi-stream reading for Zonal Buckets by introducing a MultiStreamManager to pool and reuse gRPC connections. It mitigates performance regressions from frequent stream creation by reusing idle streams and load-balancing requests, while maintaining a proactive background connection to minimize latency during ramp-up.
Class UML diagrams of the final architecture: Link
Benchmarking script used: Link
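
To make the pooling policy concrete, here is a minimal, self-contained sketch of the idea (ToyStream, ToyMultiStreamManager, and the max_per_stream knob are illustrative only, not the actual members of MultiStreamManager): keep a list of open streams, hand each new range to the least-loaded stream, and fall back to the factory only when every existing stream is saturated. The real manager additionally keeps one pending stream warming up in the background, which this sketch omits.

#include <algorithm>
#include <cstddef>
#include <functional>
#include <iterator>
#include <list>
#include <string>

// Illustrative toy types only; the PR's real classes differ.
struct ToyStream {
  std::string name;
  std::size_t active_ranges = 0;
};

class ToyMultiStreamManager {
 public:
  using StreamFactory = std::function<ToyStream()>;

  // Open the first stream eagerly, mirroring the PR's constructor.
  explicit ToyMultiStreamManager(StreamFactory factory)
      : factory_(std::move(factory)) {
    streams_.push_back(factory_());
  }

  // Assign a read range to the least-loaded stream; open a new stream only
  // when every existing one already carries max_per_stream ranges.
  ToyStream& Assign(std::size_t max_per_stream) {
    auto it = std::min_element(streams_.begin(), streams_.end(),
                               [](ToyStream const& a, ToyStream const& b) {
                                 return a.active_ranges < b.active_ranges;
                               });
    if (it->active_ranges >= max_per_stream) {
      streams_.push_back(factory_());
      it = std::prev(streams_.end());
    }
    ++it->active_ranges;
    return *it;
  }

 private:
  StreamFactory factory_;
  std::list<ToyStream> streams_;
};

int main() {
  int counter = 0;
  ToyMultiStreamManager manager(
      [&counter] { return ToyStream{"stream-" + std::to_string(counter++)}; });
  // Four ranges with at most two per stream: the first stream is reused
  // twice, then a second stream is opened for the remainder.
  for (int i = 0; i != 4; ++i) manager.Assign(/*max_per_stream=*/2);
  return 0;
}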

Benchmarking the changes by reading 500 GB of data (thread count = 1):

Benchmarking the changes by reading 500 GB of data (thread count = 30):

Benchmarking with different MaximumRangeSizeOption values:

Code-accurate control flow diagram for this snippet:

auto async = MakeAsyncClient(project_id);
google::storage::v2::BidiReadObjectSpec spec;
spec.set_bucket("projects/_/buckets/" + bucket);
spec.set_object(object);
auto descriptor = *async.Open(spec).get();
auto [reader, token] = descriptor.Read(/*offset=*/0, /*limit=*/400 * 1024 * 1024);
for (auto t = std::move(token); t.valid();) {
  auto r = reader.Read(std::move(t)).get();
  t = std::get<1>(*r);  // std::get<0>(*r) holds the payload to process
}

@v-pratap v-pratap requested review from a team as code owners January 6, 2026 12:25
@product-auto-label product-auto-label bot added the api: storage (Issues related to the Cloud Storage API) label Jan 6, 2026
@gemini-code-assist

Summary of Changes

Hello @v-pratap, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the asynchronous download capabilities for Google Cloud Storage by introducing a MultiStreamManager. This new component intelligently pools and reuses gRPC connections, distributing read operations across multiple streams. The primary goal is to improve performance and scalability for large data transfers, especially in multi-threaded environments, by reducing connection overhead and proactively managing stream availability. The changes involve a substantial refactoring of the internal object descriptor logic to leverage this new multi-streaming architecture.

Highlights

  • Multi-Stream Manager Implementation: Introduced a new MultiStreamManager component to efficiently manage and reuse gRPC streams for asynchronous downloads, particularly benefiting Zonal Buckets.
  • Performance Improvement: Benchmarking demonstrates significant performance gains, with multi-stream reading achieving up to 1.49 GB/s (a ~37% increase) compared to 1.09 GB/s for single-stream in multi-threaded scenarios.
  • Stream Reuse and Load Balancing: The system now reuses idle streams and load-balances read requests across available streams, mitigating performance regressions caused by frequent stream creation.
  • Proactive Stream Creation: A background mechanism ensures a new stream is always being established, minimizing ramp-up latency and keeping streams readily available.
  • Refactored ObjectDescriptorImpl: The core ObjectDescriptorImpl class has been refactored to integrate with the MultiStreamManager, centralizing stream management logic and improving maintainability.
  • Enhanced Test Coverage: New unit tests for MultiStreamManager and updated tests for ObjectDescriptorImpl have been added to validate the new multi-streaming and stream reuse behaviors, including scenarios for proactive stream creation and error handling.

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a MultiStreamManager to support multiple gRPC streams for asynchronous downloads, aiming to improve performance through connection pooling and reuse. The implementation is well-structured, encapsulating stream management logic effectively. The changes include proactive stream creation and reuse of idle streams, with corresponding updates to ObjectDescriptorImpl. The test suite has been significantly enhanced to cover the new multi-stream functionality, including scenarios for proactive creation, stream reuse, and various error conditions. I've found one critical issue regarding unhandled exceptions that could lead to application termination.

Comment on lines +102 to +127
stream_future.then([w = WeakFromThis()](auto f) {
auto self = w.lock();
if (!self) return;

auto stream_result = f.get();
if (!stream_result) {
// Stream creation failed.
// The next call to AssurePendingStreamQueued will retry creation.
return;
}

std::unique_lock<std::mutex> lk(self->mu_);
if (self->cancelled_) return;

auto read_stream =
std::make_shared<ReadStream>(std::move(stream_result->stream),
self->resume_policy_prototype_->clone());

auto new_it = self->stream_manager_->AddStream(std::move(read_stream));

// Now that we consumed pending_stream_, queue the next one immediately.
self->AssurePendingStreamQueued(lk);

lk.unlock();
self->OnRead(new_it, std::move(stream_result->first_response));
});

critical

The call to f.get() on line 106 may throw an unhandled exception if the stream_future is cancelled, which would terminate the application. The ObjectDescriptorImpl::Cancel() method does cancel pending_stream_, which is where stream_future comes from. The continuation lambda should handle this potential exception, for example by wrapping the f.get() call in a try-catch block.

  stream_future.then([w = WeakFromThis()](auto f) {
    auto self = w.lock();
    if (!self) return;

    StatusOr<OpenStreamResult> stream_result;
    try {
      stream_result = f.get();
    } catch (...) {
      // The future was cancelled, which is expected if the ObjectDescriptor is
      // destroyed. This is not an error. The next call to
      // AssurePendingStreamQueued() will create a new stream if needed.
      return;
    }
    if (!stream_result) {
      // Stream creation failed.
      // The next call to AssurePendingStreamQueued will retry creation.
      return;
    }

    std::unique_lock<std::mutex> lk(self->mu_);
    if (self->cancelled_) return;

    auto read_stream =
        std::make_shared<ReadStream>(std::move(stream_result->stream),
                                     self->resume_policy_prototype_->clone());

    auto new_it = self->stream_manager_->AddStream(std::move(read_stream));

    // Now that we consumed pending_stream_, queue the next one immediately.
    self->AssurePendingStreamQueued(lk);

    lk.unlock();
    self->OnRead(new_it, std::move(stream_result->first_response));
  });

codecov bot commented Jan 6, 2026

Codecov Report

❌ Patch coverage is 98.02632% with 15 lines in your changes missing coverage. Please review.
✅ Project coverage is 92.96%. Comparing base (95a7914) to head (f3db622).

Files with missing lines                                 Patch %   Lines
...rage/internal/async/object_descriptor_impl_test.cc    97.74%    9 Missing ⚠️
...loud/storage/internal/async/multi_stream_manager.h    93.10%    4 Missing ⚠️
...d/storage/internal/async/object_descriptor_impl.cc    98.34%    2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #15858      +/-   ##
==========================================
+ Coverage   92.95%   92.96%   +0.01%     
==========================================
  Files        2458     2460       +2     
  Lines      227590   228211     +621     
==========================================
+ Hits       211548   212158     +610     
- Misses      16042    16053      +11     

  // Constructor creates the first stream using the factory immediately.
  explicit MultiStreamManager(StreamFactory stream_factory)
      : stream_factory_(std::move(stream_factory)) {
    streams_.push_back(Stream{stream_factory_(), {}});
Member

nit: here and other places where push_back or push_front is called while constructing the element at the same time are good opportunities to use emplace_back or emplace_front instead.
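
For illustration only (Item is a stand-in type, not the PR's Stream), this is the difference the nit is pointing at:

#include <deque>
#include <string>

struct Item {
  std::string id;
  int pending = 0;
};

int main() {
  std::deque<Item> items;
  // push_back constructs a temporary Item and then moves it into the deque.
  items.push_back(Item{"a", 0});
  // emplace_back forwards the arguments and constructs the element in place.
  // For an aggregate like Item this parenthesized form needs C++20; before
  // that, the type needs a matching constructor.
  items.emplace_back("b", 0);
  return 0;
}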

  }

  bool Empty() const { return streams_.empty(); }
  StreamIterator End() { return streams_.end(); }
Member

Can End be made const?
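
One possible shape for that, sketched with a placeholder element type rather than the actual class: add a const overload returning a const_iterator, so callers holding only const access can still obtain the end sentinel for comparisons.

#include <list>

struct Stream {};  // placeholder for the PR's Stream type

class Manager {
 public:
  using StreamIterator = std::list<Stream>::iterator;
  using ConstStreamIterator = std::list<Stream>::const_iterator;

  bool Empty() const { return streams_.empty(); }
  StreamIterator End() { return streams_.end(); }
  // const overload: read-only access to the end sentinel.
  ConstStreamIterator End() const { return streams_.end(); }

 private:
  std::list<Stream> streams_;
};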
