feat(plugins): Add RateLimitPlugin for global request rate limiting #3408
Open: Jacksunwei wants to merge 1 commit into main from feat/rate-limit-plugin
+404 −0
New file `google/adk/plugins/rate_limit_plugin.py` (path follows the docstring's import; the diff header reads `@@ -0,0 +1,163 @@`):

````python
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import asyncio
import time
from typing import Optional

from ..agents.callback_context import CallbackContext
from ..models.llm_request import LlmRequest
from ..models.llm_response import LlmResponse
from .base_plugin import BasePlugin


class RateLimitPlugin(BasePlugin):
  """Plugin that enforces global rate limiting on LLM requests.

  This plugin implements a sliding window rate limiter that restricts the
  total number of LLM requests across all models within a one-minute window.
  When the rate limit is exceeded, the plugin blocks (waits) until a slot
  becomes available.

  Example:
    ```python
    from google.adk import Agent, Runner
    from google.adk.plugins.rate_limit_plugin import RateLimitPlugin

    agent = Agent(name="assistant", model="gemini-2.5-flash", ...)

    runner = Runner(
        agents=[agent],
        plugins=[
            RateLimitPlugin(max_requests_per_minute=15)
        ]
    )
    ```

  Attributes:
    max_requests_per_minute: Maximum number of requests allowed per minute
      globally across all models.
  """

  def __init__(
      self,
      max_requests_per_minute: int = 15,
      name: str = 'rate_limit_plugin',
  ):
    """Initializes the rate limit plugin.

    Args:
      max_requests_per_minute: Maximum requests allowed per minute globally.
      name: Name of the plugin instance.
    """
    super().__init__(name)
    self.max_requests = max_requests_per_minute

    # Track request timestamps globally (all models) as a list of
    # timestamps in seconds since epoch.
    self._request_timestamps: list[float] = []

    # Lock for safe concurrent access to the timestamp list.
    self._lock = asyncio.Lock()

  def _clean_old_timestamps(
      self, timestamps: list[float], current_time: float
  ) -> list[float]:
    """Removes timestamps older than one minute from the tracking list.

    Args:
      timestamps: List of request timestamps.
      current_time: Current time in seconds since epoch.

    Returns:
      Filtered list containing only timestamps from the last minute.
    """
    # Keep only timestamps within the last 60 seconds.
    cutoff_time = current_time - 60.0
    return [ts for ts in timestamps if ts > cutoff_time]

  async def _wait_for_rate_limit(self, current_time: float) -> None:
    """Waits until a request slot becomes available.

    Args:
      current_time: Current time in seconds since epoch.
    """
    while True:
      async with self._lock:
        timestamps = self._clean_old_timestamps(
            self._request_timestamps, time.time()
        )
        self._request_timestamps = timestamps

        if len(timestamps) < self.max_requests:
          # Slot available, exit loop.
          return

        # Calculate wait time until the oldest request falls outside the
        # window.
        oldest_timestamp = timestamps[0]
        wait_seconds = 60.0 - (time.time() - oldest_timestamp) + 0.1

      # Wait outside the lock to allow other operations.
      if wait_seconds > 0:
        await asyncio.sleep(wait_seconds)
      else:
        # Re-check immediately.
        await asyncio.sleep(0.01)

  async def before_model_callback(
      self, *, callback_context: CallbackContext, llm_request: LlmRequest
  ) -> Optional[LlmResponse]:
    """Checks and enforces rate limits before each LLM request.

    This callback is invoked before every LLM request. It checks whether
    the request would exceed the configured global rate limit across all
    models. If so, it blocks (waits) until the rate limit allows the
    request.

    Args:
      callback_context: Context containing agent, user, and session
        information.
      llm_request: The LLM request that is about to be sent.

    Returns:
      None to allow the request to proceed (after waiting if necessary).
    """
    current_time = time.time()

    async with self._lock:
      # Clean old timestamps.
      timestamps = self._clean_old_timestamps(
          self._request_timestamps, current_time
      )
      self._request_timestamps = timestamps

      # Check whether the rate limit would be exceeded.
      if len(timestamps) < self.max_requests:
        # Slot available: record this request and proceed.
        self._request_timestamps.append(current_time)
        return None

    # Limit exceeded: wait for availability.
    await self._wait_for_rate_limit(current_time)

    # Record this request after waiting.
    async with self._lock:
      current_time = time.time()
      self._request_timestamps.append(current_time)

    # Allow request to proceed.
    return None
````
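For context, one way to exercise the limiter in isolation is to await `before_model_callback` from several concurrent tasks. This is a sketch, not part of the PR: it assumes the module path shown in the docstring above, and it passes `None` stubs for `callback_context` and `llm_request`, which works only because this plugin never inspects those arguments.

```python
import asyncio
import time

# Module path taken from the docstring example above; adjust if needed.
from google.adk.plugins.rate_limit_plugin import RateLimitPlugin


async def main() -> None:
  plugin = RateLimitPlugin(max_requests_per_minute=5)
  start = time.monotonic()

  async def one_request(i: int) -> None:
    # None stubs suffice here because the plugin ignores both arguments.
    await plugin.before_model_callback(callback_context=None, llm_request=None)
    print(f"request {i} admitted at t={time.monotonic() - start:.1f}s")

  # 8 concurrent requests against a 5-per-minute budget: roughly the first 5
  # should be admitted immediately, the rest only after the window slides.
  await asyncio.gather(*(one_request(i) for i in range(8)))


asyncio.run(main())
```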
Review comment:

The current implementation of the rate limiter has a critical race condition that can cause the rate limit to be exceeded under concurrent load. The `before_model_callback` method checks the limit, but if it is exceeded, it releases the lock before waiting. After waiting, it re-acquires the lock and appends a timestamp without re-validating the limit. Another request could have taken the available slot in the meantime, leading to more requests being processed than allowed.

To fix this and simplify the logic, the check and the action (appending the timestamp) should be atomic. This can be achieved by refactoring the logic into a single loop within `before_model_callback` and removing the separate `_wait_for_rate_limit` method. The proposed suggestion consolidates the logic, making it both correct and easier to understand.
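The suggestion block itself did not load on this page. Based on the description above, the consolidated version would look roughly like the sketch below, where the limit check and the timestamp append happen under a single lock acquisition so no other coroutine can claim the slot in between. This illustrates the fix the reviewer describes, not the exact suggested code.

```python
async def before_model_callback(
    self, *, callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
  while True:
    async with self._lock:
      now = time.time()
      self._request_timestamps = self._clean_old_timestamps(
          self._request_timestamps, now
      )

      if len(self._request_timestamps) < self.max_requests:
        # Check and record atomically: the slot cannot be taken by another
        # coroutine between the comparison and the append.
        self._request_timestamps.append(now)
        return None

      # Window is full: compute how long until the oldest entry ages out,
      # then loop back and re-validate under the lock.
      wait_seconds = 60.0 - (now - self._request_timestamps[0]) + 0.1

    # Sleep outside the lock so other coroutines can make progress.
    await asyncio.sleep(max(wait_seconds, 0.01))
```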