-
Notifications
You must be signed in to change notification settings - Fork 309
Add support for min_idle_time in Redis StreamSub and XAUTOCLAIM
#2607
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Lancetnik
merged 4 commits into
ag2ai:main
from
powersemmi:feat/add-redis-xautoclaim-support
Oct 28, 2025
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
72265af
Add support for `min_idle_time` in Redis StreamSub and XAUTOCLAIM
powersemmi 7dfbeb1
Add documentation for Redis Stream message claiming and `min_idle_tim…
powersemmi e588642
fix review
powersemmi f5d2d0c
Replace br.publish with br._connection.xadd
Lancetnik File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,149 @@ | ||
| --- | ||
| # 0.5 - API | ||
| # 2 - Release | ||
| # 3 - Contributing | ||
| # 5 - Template Page | ||
| # 10 - Default | ||
| search: | ||
| boost: 10 | ||
| --- | ||
|
|
||
| # Redis Stream Message Claiming | ||
|
|
||
| When working with Redis Stream Consumer Groups, there may be situations where messages remain in a pending state because a consumer failed to process them. FastStream provides a mechanism to automatically claim these pending messages using Redis's `XAUTOCLAIM` command through the `min_idle_time` parameter. | ||
|
|
||
| ## What is Message Claiming? | ||
|
|
||
| In Redis Streams, when a consumer reads a message from a consumer group but fails to acknowledge it (due to a crash, network issue, or processing error), the message remains in the [**Pending Entries List (PEL)**](https://redis.io/docs/latest/develop/data-types/streams/#working-with-multiple-consumer-groups) of that consumer group. These unacknowledged messages are associated with the original consumer and have an "idle time" - the duration since they were last delivered. | ||
|
|
||
| Message claiming allows another consumer to take ownership of these pending messages that have been idle for too long, ensuring that messages don't get stuck and workload can be redistributed among healthy consumers. | ||
|
|
||
| ## Using `min_idle_time` for Automatic Claiming | ||
|
|
||
| FastStream's `StreamSub` provides a `min_idle_time` parameter that enables automatic claiming of pending messages via Redis's `XAUTOCLAIM` command. When set, the consumer will automatically scan for and claim messages that have been pending for at least the specified duration (in milliseconds). | ||
|
|
||
| ### Basic Example | ||
|
|
||
| Here's a simple example that demonstrates automatic message claiming: | ||
|
|
||
| ```python linenums="1" | ||
| {! docs_src/redis/stream/claiming_basic.py !} | ||
| ``` | ||
|
|
||
| ## How It Works | ||
|
|
||
| When `min_idle_time` is set: | ||
|
|
||
| 1. **Circular Scanning**: Instead of using `XREADGROUP` to read new messages, the consumer uses `XAUTOCLAIM` to scan the Pending Entries List | ||
| 2. **Idle Time Check**: Only messages that have been pending for at least `min_idle_time` milliseconds are claimed | ||
| 3. **Ownership Transfer**: Claimed messages are automatically transferred from the failing consumer to the claiming consumer | ||
| 4. **Continuous Processing**: The scanning process is circular - after reaching the end of the [PEL](https://redis.io/docs/latest/develop/data-types/streams/#working-with-multiple-consumer-groups), it starts over from the beginning | ||
|
|
||
| ### Practical Use Case | ||
|
|
||
| Consider a scenario where you have multiple workers processing orders: | ||
|
|
||
| ```python linenums="1" | ||
| from faststream import FastStream | ||
| from faststream.redis import RedisBroker, StreamSub | ||
|
|
||
| broker = RedisBroker() | ||
| app = FastStream(broker) | ||
|
|
||
| # Worker that might fail | ||
| @broker.subscriber( | ||
| stream=StreamSub( | ||
| "orders", | ||
| group="order-processors", | ||
| consumer="worker-1", | ||
| ) | ||
| ) | ||
| async def worker_that_might_fail(order_id: str): | ||
| # Process order - might crash before acknowledging | ||
| await process_complex_order(order_id) | ||
| # If crash happens here, message stays pending | ||
|
|
||
| # Backup worker with message claiming | ||
| @broker.subscriber( | ||
| stream=StreamSub( | ||
| "orders", | ||
| group="order-processors", | ||
| consumer="worker-2", | ||
| min_idle_time=10000, # 10 seconds | ||
| ) | ||
| ) | ||
| async def backup_worker(order_id: str): | ||
| # This worker will automatically pick up messages | ||
| # that worker-1 failed to process within 10 seconds | ||
| print(f"Recovering and processing order: {order_id}") | ||
| await process_complex_order(order_id) | ||
| ``` | ||
|
|
||
| ## Combining with Manual Acknowledgment | ||
|
|
||
| You can combine `min_idle_time` with manual acknowledgment policies for fine-grained control: | ||
|
|
||
| ```python linenums="1" | ||
| {! docs_src/redis/stream/claiming_manual_ack.py !} | ||
| ``` | ||
|
|
||
| ## Configuration Guidelines | ||
|
|
||
| ### Choosing `min_idle_time` | ||
|
|
||
| The appropriate `min_idle_time` value depends on your use case: | ||
|
|
||
| - **Short duration (1-5 seconds)**: For fast-processing tasks where quick failure recovery is needed | ||
| - **Medium duration (10-60 seconds)**: For most general-purpose applications with moderate processing times | ||
| - **Long duration (5-30 minutes)**: For long-running tasks where you want to ensure a consumer has truly failed | ||
|
|
||
| !!! warning | ||
| Setting `min_idle_time` too low may cause messages to be unnecessarily transferred between healthy consumers. Set it based on your typical message processing time plus a safety buffer. | ||
|
|
||
| ### Deployment Patterns | ||
|
|
||
| #### Pattern 1: Dedicated Claiming Worker | ||
| Deploy a separate worker specifically for claiming abandoned messages: | ||
|
|
||
| ```python | ||
| # Main workers (fast path) | ||
| @broker.subscriber( | ||
| stream=StreamSub("tasks", group="workers", consumer="main-1") | ||
| ) | ||
| async def main_worker(task): ... | ||
|
|
||
| # Claiming worker (recovery path) | ||
| @broker.subscriber( | ||
| stream=StreamSub("tasks", group="workers", consumer="claimer", min_idle_time=15000) | ||
| ) | ||
| async def claiming_worker(task): ... | ||
| ``` | ||
|
|
||
| #### Pattern 2: All Workers Can Claim | ||
| All workers can claim messages from each other: | ||
|
|
||
| ```python | ||
| # Each worker can both process new messages and claim abandoned ones | ||
| @broker.subscriber( | ||
| stream=StreamSub( | ||
| "tasks", | ||
| group="workers", | ||
| consumer=f"worker-{instance_id}", | ||
| min_idle_time=10000, | ||
| ) | ||
| ) | ||
| async def worker(task): ... | ||
| ``` | ||
|
|
||
| ## Technical Details | ||
|
|
||
| - **Start ID**: FastStream automatically manages the `start_id` parameter for `XAUTOCLAIM`, enabling circular scanning through the Pending Entries List | ||
| - **Empty Results**: When no pending messages meet the idle time criteria, the consumer will continue polling | ||
| - **ACK Handling**: Claimed messages must still be acknowledged using `msg.ack()` to be removed from the [PEL](https://redis.io/docs/latest/develop/data-types/streams/#working-with-multiple-consumer-groups) | ||
|
|
||
| ## References | ||
|
|
||
| For more information about Redis Streams message claiming: | ||
|
|
||
| - [Redis XAUTOCLAIM Documentation](https://redis.io/docs/latest/commands/xautoclaim/){.external-link target="_blank"} | ||
| - [Redis Streams Claiming Guide](https://redis.io/docs/latest/develop/data-types/streams/#claiming-and-the-delivery-counter){.external-link target="_blank"} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| from faststream import FastStream, Logger | ||
| from faststream.redis import RedisBroker, StreamSub | ||
|
|
||
| broker = RedisBroker() | ||
| app = FastStream(broker) | ||
|
|
||
|
|
||
| @broker.subscriber( | ||
| stream=StreamSub( | ||
| "orders", | ||
| group="processors", | ||
| consumer="worker-1", | ||
| min_idle_time=5000, # Claim messages idle for 5+ seconds | ||
| ) | ||
| ) | ||
| async def handle(order_id: str, logger: Logger): | ||
| logger.info(f"Processing order: {order_id}") | ||
|
|
||
|
|
||
| @app.after_startup | ||
| async def publish_test(): | ||
| await broker.publish("order-123", stream="orders") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| from faststream import AckPolicy, FastStream, Logger | ||
| from faststream.redis import RedisBroker, RedisStreamMessage, StreamSub, Redis | ||
|
|
||
| broker = RedisBroker() | ||
| app = FastStream(broker) | ||
|
|
||
|
|
||
| @broker.subscriber( | ||
| stream=StreamSub( | ||
| "critical-tasks", | ||
| group="task-workers", | ||
| consumer="worker-failover", | ||
| min_idle_time=30000, # 30 seconds | ||
| ), | ||
| ack_policy=AckPolicy.MANUAL, | ||
| ) | ||
| async def handle(msg: RedisStreamMessage, logger: Logger, redis: Redis): | ||
| try: | ||
| # Process the claimed message | ||
| logger.info(f"Processing: {msg.body}") | ||
| # Explicitly acknowledge after successful processing | ||
| await msg.ack(redis=redis, group="critical-tasks") | ||
| except Exception as e: | ||
| # Don't acknowledge - let it be claimed by another consumer | ||
| logger.error(f"Failed to process: {e}") | ||
|
|
||
|
|
||
| @app.after_startup | ||
| async def publish_test(): | ||
| await broker.publish("critical-task-1", stream="critical-tasks") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.