feat(plugins): Add RateLimitPlugin for global request rate limiting #3408
Conversation
Add a new RateLimitPlugin that enforces global rate limiting across all
LLM models using a sliding window algorithm. The plugin blocks (waits)
when the rate limit is exceeded, ensuring requests are processed within
the configured limit.
Key features:
- Global rate limiting (default 15 QPM) across all models
- Sliding window algorithm for accurate tracking
- Automatic blocking when limit exceeded (no errors thrown)
- Thread-safe with asyncio locks
- Automatic cleanup of expired timestamps
Example usage:
```python
from google.adk.plugins import RateLimitPlugin
from google.adk.runners import Runner

runner = Runner(
    agents=[agent],
    plugins=[RateLimitPlugin(max_requests_per_minute=15)],
)
```
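Conceptually, the sliding-window behavior boils down to something like the following standalone sketch. This is illustrative only; the class name SlidingWindowLimiter and its acquire method are made up here and are not part of the plugin's API.

```python
import asyncio
import time


class SlidingWindowLimiter:
  """Toy sliding-window limiter: at most max_requests per 60-second window."""

  def __init__(self, max_requests: int = 15):
    self.max_requests = max_requests
    self._timestamps: list[float] = []
    self._lock = asyncio.Lock()

  async def acquire(self) -> None:
    """Block until a request slot is free, then record the request."""
    while True:
      async with self._lock:
        now = time.time()
        # Drop timestamps that have fallen out of the 60-second window.
        self._timestamps = [t for t in self._timestamps if now - t < 60.0]
        if len(self._timestamps) < self.max_requests:
          self._timestamps.append(now)
          return
        # The oldest recorded request determines when the next slot frees up.
        wait = 60.0 - (now - self._timestamps[0])
      await asyncio.sleep(max(wait, 0.01))
```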
Code Review
This pull request introduces a RateLimitPlugin to enforce global rate limiting for LLM requests. The implementation uses a sliding window algorithm and is designed to be thread-safe with asyncio.Lock. While the feature is valuable and the test coverage is good, I've identified a critical race condition in the core rate-limiting logic. The current implementation can allow the rate limit to be exceeded under concurrent loads. My review includes a detailed explanation of the issue and a code suggestion to refactor the logic, which resolves the race condition and simplifies the implementation.
```python
async def _wait_for_rate_limit(self, current_time: float) -> None:
  """Wait until a request slot becomes available.

  Args:
    current_time: Current time in seconds since epoch.
  """
  while True:
    async with self._lock:
      timestamps = self._clean_old_timestamps(
          self._request_timestamps, time.time()
      )
      self._request_timestamps = timestamps

      if len(timestamps) < self.max_requests:
        # Slot available, exit loop
        return

      # Calculate wait time until the oldest request falls outside the window
      oldest_timestamp = timestamps[0]
      wait_seconds = 60.0 - (time.time() - oldest_timestamp) + 0.1

    # Wait outside the lock to allow other operations
    if wait_seconds > 0:
      await asyncio.sleep(wait_seconds)
    else:
      # Re-check immediately
      await asyncio.sleep(0.01)


async def before_model_callback(
    self, *, callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
  """Check and enforce rate limits before each LLM request.

  This callback is invoked before every LLM request. It checks whether
  the request would exceed the configured global rate limit across all models.
  If so, it blocks (waits) until the rate limit allows the request.

  Args:
    callback_context: Context containing agent, user, and session information.
    llm_request: The LLM request that is about to be sent.

  Returns:
    None to allow the request to proceed (after waiting if necessary).
  """
  current_time = time.time()

  async with self._lock:
    # Clean old timestamps
    timestamps = self._clean_old_timestamps(
        self._request_timestamps, current_time
    )
    self._request_timestamps = timestamps

    # Check if rate limit would be exceeded
    if len(timestamps) >= self.max_requests:
      # Need to wait
      pass
    else:
      # Slot available, record and proceed
      self._request_timestamps.append(current_time)
      return None

  # Wait for availability if limit exceeded
  await self._wait_for_rate_limit(current_time)

  # Record this request after waiting
  async with self._lock:
    current_time = time.time()
    self._request_timestamps.append(current_time)

  # Allow request to proceed
  return None
```
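Note that _clean_old_timestamps is referenced but not shown in this excerpt; assuming a 60-second window over a list of float timestamps, it presumably looks something like this sketch:

```python
def _clean_old_timestamps(
    self, timestamps: list[float], current_time: float
) -> list[float]:
  """Return only the timestamps still inside the 60-second sliding window."""
  return [t for t in timestamps if current_time - t < 60.0]
```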
The current implementation of the rate limiter has a critical race condition that can cause the rate limit to be exceeded under concurrent load. The before_model_callback method checks the limit, but if it's exceeded, it releases the lock before waiting. After waiting, it re-acquires the lock and appends a timestamp without re-validating the limit. Another request could have taken the available slot in the meantime, leading to more requests being processed than allowed.
To fix this and simplify the logic, the check and the action (appending the timestamp) should be atomic. This can be achieved by refactoring the logic into a single loop within before_model_callback and removing the separate _wait_for_rate_limit method. The proposed suggestion consolidates the logic, making it both correct and easier to understand.
```python
async def before_model_callback(
    self, *, callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
  """Check and enforce rate limits before each LLM request.

  This callback is invoked before every LLM request. It checks whether
  the request would exceed the configured global rate limit across all models.
  If so, it blocks (waits) until the rate limit allows the request.

  Args:
    callback_context: Context containing agent, user, and session information.
    llm_request: The LLM request that is about to be sent.

  Returns:
    None to allow the request to proceed (after waiting if necessary).
  """
  while True:
    async with self._lock:
      current_time = time.time()
      self._request_timestamps = self._clean_old_timestamps(
          self._request_timestamps, current_time
      )
      if len(self._request_timestamps) < self.max_requests:
        self._request_timestamps.append(current_time)
        return None  # Allow request to proceed

      # Rate limit is active, calculate necessary wait time.
      # Timestamps are sorted, so the oldest is at the front.
      oldest_timestamp = self._request_timestamps[0]
      wait_seconds = 60.0 - (current_time - oldest_timestamp) + 0.1

    # Wait outside the lock to avoid blocking other coroutines.
    if wait_seconds > 0:
      await asyncio.sleep(wait_seconds)
    else:
      # A small sleep to prevent a tight loop if wait_seconds is negative.
      await asyncio.sleep(0.01)
```
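To illustrate what the atomic check-and-append buys under concurrent load, here is a rough, hypothetical driver script (not part of the PR). It passes None stand-ins for callback_context and llm_request, which works only because the callback does not use them.

```python
import asyncio
import time

from google.adk.plugins import RateLimitPlugin


async def main() -> None:
  # 5 QPM keeps the demo short; 8 concurrent calls exceed the limit,
  # so the script takes roughly a minute to finish.
  plugin = RateLimitPlugin(max_requests_per_minute=5)
  admitted: list[float] = []

  async def one_call() -> None:
    # None stand-ins: the callback only touches the shared timestamp state.
    await plugin.before_model_callback(callback_context=None, llm_request=None)
    admitted.append(time.time())

  await asyncio.gather(*(one_call() for _ in range(8)))

  offsets = [round(t - min(admitted), 1) for t in sorted(admitted)]
  # With the atomic check-and-append, the 6th admission should land roughly
  # 60 seconds after the 1st; with the racy version it can slip in earlier.
  print(offsets)


if __name__ == "__main__":
  asyncio.run(main())
```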
Summary
This PR adds a new RateLimitPlugin that enforces global rate limiting across all LLM models using a sliding window algorithm.

Features
See the key features listed at the top of this description.

Implementation Details
New files:
- src/google/adk/plugins/rate_limit_plugin.py - Main plugin implementation
- tests/unittests/plugins/test_rate_limit_plugin.py - Comprehensive test suite (7 tests)

Modified files:
- src/google/adk/plugins/__init__.py - Exports RateLimitPlugin in the public API

Usage Example
See the example usage snippet at the top of this description.

Test Coverage
All 7 tests pass successfully.

Test Plan
- Run the test suite: pytest tests/unittests/plugins/test_rate_limit_plugin.py -v
- Verify the public import: from google.adk.plugins import RateLimitPlugin
- Formatting checked with isort and pyink
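For illustration, a test along these lines (hypothetical name, assuming pytest-asyncio is available; the PR's actual test file is not shown here) would cover the no-blocking path:

```python
import time

import pytest

from google.adk.plugins import RateLimitPlugin


@pytest.mark.asyncio
async def test_requests_under_limit_are_not_delayed():
  plugin = RateLimitPlugin(max_requests_per_minute=15)
  start = time.time()
  for _ in range(5):
    # None stand-ins are enough here because the callback only reads and
    # updates the plugin's own timestamp list.
    result = await plugin.before_model_callback(
        callback_context=None, llm_request=None
    )
    assert result is None  # None lets the request proceed.
  # Five requests against a 15 QPM limit should never block.
  assert time.time() - start < 1.0
```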
Related Issues
Implements global rate limiting for LLM requests to prevent quota exhaustion and ensure fair resource usage.