Commit ce2b3f8
Add support for min_idle_time in Redis StreamSub and XAUTOCLAIM (#2607)
* Add support for `min_idle_time` in Redis StreamSub and XAUTOCLAIM
* Add documentation for Redis Stream message claiming and `min_idle_time` usage
* fix review
* Replace br.publish with br._connection.xadd

Signed-off-by: Victor Maleca <powersemmi@gmail.com>
Co-authored-by: Pastukhov Nikita <nikita@pastukhov-dev.ru>
1 parent 1354faa commit ce2b3f8

File tree

13 files changed: +749 −46 lines

docs/docs/en/redis/index.md

Lines changed: 4 additions & 4 deletions

@@ -22,15 +22,15 @@ search:

  #### 1. Pub/Sub

- [**Redis Pub/Sub**](https://redis.io/docs/interact/pubsub/){.external-link target="_blank"} implements the Publish/Subscribe messaging paradigm where senders (publishers) are not programmed to send their messages to specific receivers (subscribers). Instead, published messages are characterized into channels, without knowledge of what (if any) subscribers there may be.
+ [**Redis Pub/Sub**](https://redis.io/docs/latest/develop/pubsub/){.external-link target="_blank"} implements the Publish/Subscribe messaging paradigm where senders (publishers) are not programmed to send their messages to specific receivers (subscribers). Instead, published messages are characterized into channels, without knowledge of what (if any) subscribers there may be.

  #### 2. List

- In contrast, [**Redis List**](https://redis.io/docs/data-types/lists/#common-use-cases-for-lists){.external-link target="_blank"} capitalizes on a straightforward list data structure. Messages, pushed by producers, form a first-in, first-out (FIFO) queue. Consumers, in turn, retrieve messages from this ordered list, providing a simplified mechanism for sequential message processing.
+ In contrast, [**Redis List**](https://redis.io/docs/latest/develop/data-types/lists/#common-use-cases-for-lists){.external-link target="_blank"} capitalizes on a straightforward list data structure. Messages, pushed by producers, form a first-in, first-out (FIFO) queue. Consumers, in turn, retrieve messages from this ordered list, providing a simplified mechanism for sequential message processing.

  #### 3. Streams

- [**Redis Streams**](https://redis.io/docs/data-types/streams/){.external-link target="_blank"} introduce a more advanced concept, embracing an append-only log-like structure. Messages, organized as entries, allow for nuanced features like consumer groups, enabling parallel processing, and acknowledgment for precise message handling. Streams excel in scenarios demanding scalability, persistence, and ordered message processing.
+ [**Redis Streams**](https://redis.io/docs/latest/develop/data-types/streams/){.external-link target="_blank"} introduce a more advanced concept, embracing an append-only log-like structure. Messages, organized as entries, allow for nuanced features like consumer groups, enabling parallel processing, and acknowledgment for precise message handling. Streams excel in scenarios demanding scalability, persistence, and ordered message processing.

  Ultimately, the choice between **Pub/Sub**, **List**, or **Streams** hinges on the specific needs of the application. Redis Pub/Sub suits real-time communication, List offers simplicity in ordered processing, while Streams cater to complex, scalable, and ordered message handling, each providing tailored solutions based on distinct use case requirements.

@@ -58,4 +58,4 @@ Here's a simplified code example demonstrating how to establish a connection to

  This minimal example illustrates how **FastStream** simplifies the process of connecting to **Redis** and performing basic message processing from the *in-channel* to the *out-channel*. Depending on your specific use case and requirements, you can further customize your **Redis** integration with **FastStream** to build efficient and responsive applications.

- For more advanced configuration options and detailed usage instructions, please refer to the **FastStream Redis** documentation and the [official Redis documentation](https://redis.io/documentation){.external-link target="_blank"}.
+ For more advanced configuration options and detailed usage instructions, please refer to the **FastStream Redis** documentation and the [official Redis documentation](https://redis.io/docs/latest/){.external-link target="_blank"}.

docs/docs/en/redis/list/index.md

Lines changed: 1 addition & 1 deletion

@@ -10,6 +10,6 @@ search:

  # Redis Lists

- [Redis Lists](https://redis.io/docs/data-types/lists/){.external-link target="_blank"} are a simple and flexible data structure that function as ordered collections of strings. They are similar to lists in programming languages, and **Redis** provides commands to perform a variety of operations such as adding, retrieving, and removing elements from either end of the list.
+ [Redis Lists](https://redis.io/docs/latest/develop/data-types/lists/){.external-link target="_blank"} are a simple and flexible data structure that function as ordered collections of strings. They are similar to lists in programming languages, and **Redis** provides commands to perform a variety of operations such as adding, retrieving, and removing elements from either end of the list.

  **Redis Lists** are particularly useful for scenarios such as implementing queues, effectively using the list as a **FIFO** (First-In-First-Out) structure.

docs/docs/en/redis/pubsub/index.md

Lines changed: 1 addition & 1 deletion

@@ -10,7 +10,7 @@ search:

  # Redis Channels

- [**Redis Pub/Sub Channels**](https://redis.io/docs/interact/pubsub/){.external-link target="_blank"} are a feature of **Redis** that enables messaging between clients through a publish/subscribe (pub/sub) pattern. A **Redis** channel is essentially a medium through which messages are transmitted. Different clients can subscribe to these channels to listen for messages, while other clients can publish messages to these channels.
+ [**Redis Pub/Sub Channels**](https://redis.io/docs/latest/develop/pubsub/){.external-link target="_blank"} are a feature of **Redis** that enables messaging between clients through a publish/subscribe (pub/sub) pattern. A **Redis** channel is essentially a medium through which messages are transmitted. Different clients can subscribe to these channels to listen for messages, while other clients can publish messages to these channels.

  When a message is published to a **Redis** channel, all subscribers to that channel receive the message instantly. This makes **Redis** channels suitable for a variety of real-time applications such as chat rooms, notifications, live updates, and many more use cases where messages must be broadcast promptly to multiple clients.

docs/docs/en/redis/streams/claiming.md (new file)

Lines changed: 149 additions & 0 deletions

---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
  boost: 10
---

# Redis Stream Message Claiming

When working with Redis Stream Consumer Groups, there may be situations where messages remain in a pending state because a consumer failed to process them. FastStream provides a mechanism to automatically claim these pending messages using Redis's `XAUTOCLAIM` command through the `min_idle_time` parameter.

## What is Message Claiming?

In Redis Streams, when a consumer reads a message from a consumer group but fails to acknowledge it (due to a crash, network issue, or processing error), the message remains in the [**Pending Entries List (PEL)**](https://redis.io/docs/latest/develop/data-types/streams/#working-with-multiple-consumer-groups) of that consumer group. These unacknowledged messages are associated with the original consumer and have an "idle time" - the duration since they were last delivered.

Message claiming allows another consumer to take ownership of these pending messages that have been idle for too long, ensuring that messages don't get stuck and workload can be redistributed among healthy consumers.

## Using `min_idle_time` for Automatic Claiming

FastStream's `StreamSub` provides a `min_idle_time` parameter that enables automatic claiming of pending messages via Redis's `XAUTOCLAIM` command. When set, the consumer will automatically scan for and claim messages that have been pending for at least the specified duration (in milliseconds).

### Basic Example

Here's a simple example that demonstrates automatic message claiming:

```python linenums="1"
{! docs_src/redis/stream/claiming_basic.py !}
```

## How It Works

When `min_idle_time` is set:

1. **Circular Scanning**: Instead of using `XREADGROUP` to read new messages, the consumer uses `XAUTOCLAIM` to scan the Pending Entries List
2. **Idle Time Check**: Only messages that have been pending for at least `min_idle_time` milliseconds are claimed
3. **Ownership Transfer**: Claimed messages are automatically transferred from the failing consumer to the claiming consumer
4. **Continuous Processing**: The scanning process is circular - after reaching the end of the [PEL](https://redis.io/docs/latest/develop/data-types/streams/#working-with-multiple-consumer-groups), it starts over from the beginning
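The four steps above can be sketched as a plain-Python simulation of the circular scan. This is only an illustration of the claiming algorithm: `PendingEntry`, `FakePEL`, and its `autoclaim` method are invented in-memory stand-ins, not FastStream or redis-py APIs.

```python
from dataclasses import dataclass, field


@dataclass
class PendingEntry:
    """One unacknowledged message in a consumer group's PEL."""
    message_id: str
    consumer: str
    last_delivered_ms: int


@dataclass
class FakePEL:
    """In-memory stand-in for a consumer group's Pending Entries List."""
    entries: list = field(default_factory=list)

    def autoclaim(self, new_consumer, min_idle_ms, start, count, now_ms):
        """Scan up to `count` entries from index `start`; claim those idle
        for at least `min_idle_ms`. Returns (next_start, claimed_ids);
        next_start wraps to 0 at the end, making the scan circular."""
        claimed = []
        for entry in self.entries[start:start + count]:
            if now_ms - entry.last_delivered_ms >= min_idle_ms:
                entry.consumer = new_consumer       # ownership transfer
                entry.last_delivered_ms = now_ms    # idle timer resets
                claimed.append(entry.message_id)
        next_start = start + count
        if next_start >= len(self.entries):
            next_start = 0                          # wrap around (circular)
        return next_start, claimed
```

For example, with two pending messages and `min_idle_ms=5000`, a message idle for 10 seconds is claimed while one idle for only 1 second stays with its original consumer:

```python
pel = FakePEL([
    PendingEntry("1-0", "worker-1", last_delivered_ms=0),
    PendingEntry("2-0", "worker-1", last_delivered_ms=9000),
])
cursor, claimed = pel.autoclaim("worker-2", min_idle_ms=5000,
                                start=0, count=10, now_ms=10000)
# claimed == ["1-0"]; cursor wrapped back to 0
```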
### Practical Use Case

Consider a scenario where you have multiple workers processing orders:

```python linenums="1"
from faststream import FastStream
from faststream.redis import RedisBroker, StreamSub

broker = RedisBroker()
app = FastStream(broker)


# Worker that might fail
@broker.subscriber(
    stream=StreamSub(
        "orders",
        group="order-processors",
        consumer="worker-1",
    )
)
async def worker_that_might_fail(order_id: str):
    # Process order - might crash before acknowledging
    await process_complex_order(order_id)
    # If crash happens here, message stays pending


# Backup worker with message claiming
@broker.subscriber(
    stream=StreamSub(
        "orders",
        group="order-processors",
        consumer="worker-2",
        min_idle_time=10000,  # 10 seconds
    )
)
async def backup_worker(order_id: str):
    # This worker will automatically pick up messages
    # that worker-1 failed to process within 10 seconds
    print(f"Recovering and processing order: {order_id}")
    await process_complex_order(order_id)
```

## Combining with Manual Acknowledgment

You can combine `min_idle_time` with manual acknowledgment policies for fine-grained control:

```python linenums="1"
{! docs_src/redis/stream/claiming_manual_ack.py !}
```

## Configuration Guidelines

### Choosing `min_idle_time`

The appropriate `min_idle_time` value depends on your use case:

- **Short duration (1-5 seconds)**: For fast-processing tasks where quick failure recovery is needed
- **Medium duration (10-60 seconds)**: For most general-purpose applications with moderate processing times
- **Long duration (5-30 minutes)**: For long-running tasks where you want to ensure a consumer has truly failed

!!! warning
    Setting `min_idle_time` too low may cause messages to be unnecessarily transferred between healthy consumers. Set it based on your typical message processing time plus a safety buffer.
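The "typical processing time plus a safety buffer" guidance can be turned into a small sizing helper. This is a rough heuristic, not a FastStream API; the function name, the 3x safety factor, and the 1-second floor are all illustrative assumptions.

```python
def suggest_min_idle_time(
    typical_processing_ms: int,
    safety_factor: float = 3.0,
    floor_ms: int = 1000,
) -> int:
    """Suggest a min_idle_time (in ms): the typical processing time
    scaled by a safety factor, never below a minimum floor, so healthy
    consumers are not preempted mid-processing."""
    return max(floor_ms, int(typical_processing_ms * safety_factor))
```

For instance, a task that normally takes 4 seconds would get `suggest_min_idle_time(4000)` == 12000 ms, comfortably above its normal runtime, while very fast tasks are clamped to the floor rather than allowing aggressive re-claiming.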
### Deployment Patterns

#### Pattern 1: Dedicated Claiming Worker

Deploy a separate worker specifically for claiming abandoned messages:

```python
# Main workers (fast path)
@broker.subscriber(
    stream=StreamSub("tasks", group="workers", consumer="main-1")
)
async def main_worker(task): ...


# Claiming worker (recovery path)
@broker.subscriber(
    stream=StreamSub("tasks", group="workers", consumer="claimer", min_idle_time=15000)
)
async def claiming_worker(task): ...
```

#### Pattern 2: All Workers Can Claim

All workers can claim messages from each other:

```python
# Each worker can both process new messages and claim abandoned ones
@broker.subscriber(
    stream=StreamSub(
        "tasks",
        group="workers",
        consumer=f"worker-{instance_id}",
        min_idle_time=10000,
    )
)
async def worker(task): ...
```

## Technical Details

- **Start ID**: FastStream automatically manages the `start_id` parameter for `XAUTOCLAIM`, enabling circular scanning through the Pending Entries List
- **Empty Results**: When no pending messages meet the idle time criteria, the consumer will continue polling
- **ACK Handling**: Claimed messages must still be acknowledged using `msg.ack()` to be removed from the [PEL](https://redis.io/docs/latest/develop/data-types/streams/#working-with-multiple-consumer-groups)

## References

For more information about Redis Streams message claiming:

- [Redis XAUTOCLAIM Documentation](https://redis.io/docs/latest/commands/xautoclaim/){.external-link target="_blank"}
- [Redis Streams Claiming Guide](https://redis.io/docs/latest/develop/data-types/streams/#claiming-and-the-delivery-counter){.external-link target="_blank"}

docs/docs/en/redis/streams/index.md

Lines changed: 1 addition & 1 deletion

@@ -10,7 +10,7 @@ search:

  # Redis Streams

- [Redis Streams](https://redis.io/docs/data-types/streams/){.external-link target="_blank"} are a data structure introduced in **Redis 5.0** that offer a reliable and highly scalable way to handle streams of data. They are similar to logging systems like **Apache Kafka**, where data is stored in a log structure and can be consumed by multiple clients. **Streams** provide a sequence of ordered messages, and they are designed to handle a high volume of data by allowing partitioning and multiple consumers.
+ [Redis Streams](https://redis.io/docs/latest/develop/tools/insight/tutorials/insight-stream-consumer/){.external-link target="_blank"} are a data structure introduced in **Redis 5.0** that offer a reliable and highly scalable way to handle streams of data. They are similar to logging systems like **Apache Kafka**, where data is stored in a log structure and can be consumed by multiple clients. **Streams** provide a sequence of ordered messages, and they are designed to handle a high volume of data by allowing partitioning and multiple consumers.

  A **Redis Stream** is a collection of entries, each having an ID (which includes a timestamp) and a set of key-value pairs representing the message data. Clients can add to a stream by generating a new entry and can read from a stream to consume its messages.

docs/docs/navigation_template.txt

Lines changed: 1 addition & 0 deletions

@@ -117,6 +117,7 @@ search:
  - [Groups](redis/streams/groups.md)
  - [Batching](redis/streams/batch.md)
  - [Acknowledgement](redis/streams/ack.md)
+ - [Claiming](redis/streams/claiming.md)
  - [RPC](redis/rpc.md)
  - [Pipeline](redis/pipeline.md)
  - [Message Information](redis/message.md)
docs_src/redis/stream/claiming_basic.py (new file)

Lines changed: 22 additions & 0 deletions

from faststream import FastStream, Logger
from faststream.redis import RedisBroker, StreamSub

broker = RedisBroker()
app = FastStream(broker)


@broker.subscriber(
    stream=StreamSub(
        "orders",
        group="processors",
        consumer="worker-1",
        min_idle_time=5000,  # Claim messages idle for 5+ seconds
    )
)
async def handle(order_id: str, logger: Logger):
    logger.info(f"Processing order: {order_id}")


@app.after_startup
async def publish_test():
    await broker.publish("order-123", stream="orders")
docs_src/redis/stream/claiming_manual_ack.py (new file)

Lines changed: 30 additions & 0 deletions

from faststream import AckPolicy, FastStream, Logger
from faststream.redis import RedisBroker, RedisStreamMessage, StreamSub, Redis

broker = RedisBroker()
app = FastStream(broker)


@broker.subscriber(
    stream=StreamSub(
        "critical-tasks",
        group="task-workers",
        consumer="worker-failover",
        min_idle_time=30000,  # 30 seconds
    ),
    ack_policy=AckPolicy.MANUAL,
)
async def handle(msg: RedisStreamMessage, logger: Logger, redis: Redis):
    try:
        # Process the claimed message
        logger.info(f"Processing: {msg.body}")
        # Explicitly acknowledge after successful processing
        await msg.ack(redis=redis, group="critical-tasks")
    except Exception as e:
        # Don't acknowledge - let it be claimed by another consumer
        logger.error(f"Failed to process: {e}")


@app.after_startup
async def publish_test():
    await broker.publish("critical-task-1", stream="critical-tasks")

faststream/redis/schemas/stream_sub.py

Lines changed: 12 additions & 3 deletions

@@ -33,6 +33,12 @@ class StreamSub(NameRequired):
          https://redis.io/docs/latest/commands/xreadgroup/#differences-between-xread-and-xreadgroup
      polling_interval:
          Polling interval in seconds.
+     min_idle_time:
+         Minimum idle time in milliseconds for a message to be eligible for claiming via XAUTOCLAIM.
+         Messages that have been pending (unacknowledged) for at least this duration can be
+         reclaimed by this consumer. Only applicable when using consumer groups.
+
+         https://redis.io/docs/latest/commands/xautoclaim/
      """

      __slots__ = (
@@ -42,6 +48,7 @@ class StreamSub(NameRequired):
          "last_id",
          "max_records",
          "maxlen",
+         "min_idle_time",
          "name",
          "no_ack",
          "polling_interval",
@@ -58,11 +65,15 @@ def __init__(
          last_id: str | None = None,
          maxlen: int | None = None,
          max_records: int | None = None,
+         min_idle_time: int | None = None,
      ) -> None:
          if (group and not consumer) or (not group and consumer):
              msg = "You should specify `group` and `consumer` both"
              raise SetupError(msg)

+         if last_id is None:
+             last_id = ">" if group and consumer else "$"
+
          if group and consumer:
              if last_id != ">":
                  if polling_interval:
@@ -86,9 +97,6 @@
                  stacklevel=1,
              )

-         if last_id is None:
-             last_id = ">" if group and consumer else "$"
-
          super().__init__(stream)

@@ -99,6 +107,7 @@
          self.last_id = last_id
          self.maxlen = maxlen
          self.max_records = max_records
+         self.min_idle_time = min_idle_time

      def add_prefix(self, prefix: str) -> "StreamSub":
          new_stream = deepcopy(self)
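The parameter rules visible in this diff - `group` and `consumer` must be supplied together, and `last_id` defaults to `">"` for group reads or `"$"` otherwise - can be sketched as a standalone function. This is an illustrative re-statement of the validation logic, not FastStream's actual class; `resolve_stream_params` and the local `SetupError` are invented for the sketch.

```python
from typing import Optional


class SetupError(Exception):
    """Stand-in for FastStream's SetupError."""


def resolve_stream_params(
    group: Optional[str] = None,
    consumer: Optional[str] = None,
    last_id: Optional[str] = None,
) -> str:
    """Validate the group/consumer pairing and return the effective last_id."""
    if (group and not consumer) or (not group and consumer):
        # Either both or neither must be set
        raise SetupError("You should specify `group` and `consumer` both")
    if last_id is None:
        # ">" asks XREADGROUP for never-delivered messages;
        # "$" tails the stream when no consumer group is used
        last_id = ">" if group and consumer else "$"
    return last_id
```

Note the diff also moves this `last_id` defaulting *before* the `if group and consumer:` branch, so the subsequent `last_id != ">"` check sees the resolved value rather than `None`.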

0 commit comments