8 changes: 4 additions & 4 deletions docs/docs/en/redis/index.md
@@ -22,15 +22,15 @@ search:

#### 1. Pub/Sub

[**Redis Pub/Sub**](https://redis.io/docs/interact/pubsub/){.external-link target="_blank"} implements the Publish/Subscribe messaging paradigm where senders (publishers) are not programmed to send their messages to specific receivers (subscribers). Instead, published messages are characterized into channels, without knowledge of what (if any) subscribers there may be.
[**Redis Pub/Sub**](https://redis.io/docs/latest/develop/pubsub/){.external-link target="_blank"} implements the Publish/Subscribe messaging paradigm where senders (publishers) are not programmed to send their messages to specific receivers (subscribers). Instead, published messages are characterized into channels, without knowledge of what (if any) subscribers there may be.

#### 2. List

In contrast, [**Redis List**](https://redis.io/docs/data-types/lists/#common-use-cases-for-lists){.external-link target="_blank"} capitalizes on a straightforward list data structure. Messages, pushed by producers, form a first-in, first-out (FIFO) queue. Consumers, in turn, retrieve messages from this ordered list, providing a simplified mechanism for sequential message processing.
In contrast, [**Redis List**](https://redis.io/docs/latest/develop/data-types/lists/#common-use-cases-for-lists){.external-link target="_blank"} capitalizes on a straightforward list data structure. Messages, pushed by producers, form a first-in, first-out (FIFO) queue. Consumers, in turn, retrieve messages from this ordered list, providing a simplified mechanism for sequential message processing.

#### 3. Streams

[**Redis Streams**](https://redis.io/docs/data-types/streams/){.external-link target="_blank"} introduce a more advanced concept, embracing an append-only log-like structure. Messages, organized as entries, allow for nuanced features like consumer groups, enabling parallel processing, and acknowledgment for precise message handling. Streams excel in scenarios demanding scalability, persistence, and ordered message processing.
[**Redis Streams**](https://redis.io/docs/latest/develop/data-types/streams/){.external-link target="_blank"} introduce a more advanced concept, embracing an append-only log-like structure. Messages, organized as entries, allow for nuanced features like consumer groups, enabling parallel processing, and acknowledgment for precise message handling. Streams excel in scenarios demanding scalability, persistence, and ordered message processing.

Ultimately, the choice between **Pub/Sub**, **List**, or **Streams** hinges on the specific needs of the application. Redis Pub/Sub suits real-time communication, List offers simplicity in ordered processing, while Streams cater to complex, scalable, and ordered message handling, each providing tailored solutions based on distinct use case requirements.
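
As a rough sketch of how these three approaches look in **FastStream** (channel, list, and stream names here are illustrative, and the snippet omits any real processing):

```python
from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@broker.subscriber("notifications")  # Pub/Sub channel: broadcast to current subscribers
async def on_notification(msg: str): ...


@broker.subscriber(list="tasks")  # List: simple FIFO queue
async def on_task(msg: str): ...


@broker.subscriber(stream="events")  # Stream: persistent, append-only log
async def on_event(msg: str): ...
```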

@@ -58,4 +58,4 @@ Here's a simplified code example demonstrating how to establish a connection to

This minimal example illustrates how **FastStream** simplifies the process of connecting to **Redis** and performing basic message processing from the *in-channel* to the *out-channel*. Depending on your specific use case and requirements, you can further customize your **Redis** integration with **FastStream** to build efficient and responsive applications.

For more advanced configuration options and detailed usage instructions, please refer to the **FastStream Redis** documentation and the [official Redis documentation](https://redis.io/documentation){.external-link target="_blank"}.
For more advanced configuration options and detailed usage instructions, please refer to the **FastStream Redis** documentation and the [official Redis documentation](https://redis.io/docs/latest/){.external-link target="_blank"}.
2 changes: 1 addition & 1 deletion docs/docs/en/redis/list/index.md
@@ -10,6 +10,6 @@ search:

# Redis Lists

[Redis Lists](https://redis.io/docs/data-types/lists/){.external-link target="_blank"} are a simple and flexible data structure that function as ordered collections of strings. They are similar to lists in programming languages, and **Redis** provides commands to perform a variety of operations such as adding, retrieving, and removing elements from either end of the list.
[Redis Lists](https://redis.io/docs/latest/develop/data-types/lists/){.external-link target="_blank"} are a simple and flexible data structure that function as ordered collections of strings. They are similar to lists in programming languages, and **Redis** provides commands to perform a variety of operations such as adding, retrieving, and removing elements from either end of the list.

**Redis Lists** are particularly useful for scenarios such as implementing queues, effectively using the list as a **FIFO** (First-In-First-Out) structure.
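
For illustration, a minimal **FastStream** sketch of a list-backed queue might look like this (the `orders` list name and payload are illustrative):

```python
from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker()
app = FastStream(broker)


@broker.subscriber(list="orders")  # consume from the "orders" list in FIFO order
async def handle_order(order_id: str):
    print(f"Processing {order_id}")


@app.after_startup
async def produce():
    # push a test message onto the list
    await broker.publish("order-1", list="orders")
```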
2 changes: 1 addition & 1 deletion docs/docs/en/redis/pubsub/index.md
@@ -10,7 +10,7 @@ search:

# Redis Channels

[**Redis Pub/Sub Channels**](https://redis.io/docs/interact/pubsub/){.external-link target="_blank"} are a feature of **Redis** that enables messaging between clients through a publish/subscribe (pub/sub) pattern. A **Redis** channel is essentially a medium through which messages are transmitted. Different clients can subscribe to these channels to listen for messages, while other clients can publish messages to these channels.
[**Redis Pub/Sub Channels**](https://redis.io/docs/latest/develop/pubsub/){.external-link target="_blank"} are a feature of **Redis** that enables messaging between clients through a publish/subscribe (pub/sub) pattern. A **Redis** channel is essentially a medium through which messages are transmitted. Different clients can subscribe to these channels to listen for messages, while other clients can publish messages to these channels.

When a message is published to a **Redis** channel, all subscribers to that channel receive the message instantly. This makes **Redis** channels suitable for a variety of real-time applications such as chat rooms, notifications, live updates, and many more use cases where messages must be broadcast promptly to multiple clients.
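
As a minimal sketch (the `chat` channel name and payload are illustrative), a **FastStream** subscriber and publisher over a Pub/Sub channel could look like this:

```python
from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker()
app = FastStream(broker)


@broker.subscriber("chat")  # subscribe to the "chat" channel
async def on_message(msg: str):
    print(f"Received: {msg}")


@app.after_startup
async def announce():
    # every client currently subscribed to "chat" receives this message
    await broker.publish("Hello, subscribers!", channel="chat")
```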

149 changes: 149 additions & 0 deletions docs/docs/en/redis/streams/claiming.md
@@ -0,0 +1,149 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 10
---

# Redis Stream Message Claiming

When working with Redis Stream Consumer Groups, there may be situations where messages remain in a pending state because a consumer failed to process them. FastStream can automatically claim such pending messages with Redis's `XAUTOCLAIM` command, enabled through the `min_idle_time` parameter.

## What is Message Claiming?

In Redis Streams, when a consumer reads a message from a consumer group but fails to acknowledge it (due to a crash, network issue, or processing error), the message remains in the [**Pending Entries List (PEL)**](https://redis.io/docs/latest/develop/data-types/streams/#working-with-multiple-consumer-groups) of that consumer group. These unacknowledged messages stay associated with the original consumer and carry an "idle time": the duration since they were last delivered.

Message claiming allows another consumer to take ownership of these pending messages that have been idle for too long, ensuring that messages don't get stuck and workload can be redistributed among healthy consumers.
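
To make the PEL concrete, here is a small sketch that inspects pending entries with the raw `redis-py` client (the stream and group names are illustrative and assumed to already exist):

```python
import redis

r = redis.Redis(decode_responses=True)

# Summary for the whole group: how many entries are pending and for which consumers
print(r.xpending("orders", "processors"))

# Per-message detail: entry ID, owning consumer, idle time (ms), delivery count
for entry in r.xpending_range("orders", "processors", min="-", max="+", count=10):
    print(
        entry["message_id"],
        entry["consumer"],
        entry["time_since_delivered"],
        entry["times_delivered"],
    )
```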

## Using `min_idle_time` for Automatic Claiming

FastStream's `StreamSub` provides a `min_idle_time` parameter that enables automatic claiming of pending messages via Redis's `XAUTOCLAIM` command. When set, the consumer will automatically scan for and claim messages that have been pending for at least the specified duration (in milliseconds).

### Basic Example

Here's a simple example that demonstrates automatic message claiming:

```python linenums="1"
{! docs_src/redis/stream/claiming_basic.py !}
```

## How It Works

When `min_idle_time` is set:

1. **Circular Scanning**: Instead of using `XREADGROUP` to read new messages, the consumer uses `XAUTOCLAIM` to scan the Pending Entries List
2. **Idle Time Check**: Only messages that have been pending for at least `min_idle_time` milliseconds are claimed
3. **Ownership Transfer**: Claimed messages are automatically transferred from the failing consumer to the claiming consumer
4. **Continuous Processing**: The scanning process is circular: after reaching the end of the [PEL](https://redis.io/docs/latest/develop/data-types/streams/#working-with-multiple-consumer-groups), it starts over from the beginning, as sketched below
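
The snippet below is a rough illustration of this loop using the raw `redis.asyncio` client rather than FastStream's actual implementation; the stream, group, consumer, and timing values are assumptions:

```python
import asyncio

import redis.asyncio as redis


async def autoclaim_loop() -> None:
    r = redis.Redis(decode_responses=True)
    start_id = "0-0"
    while True:
        # Claim up to 10 entries that have been pending for at least 10 seconds
        reply = await r.xautoclaim(
            "orders",
            "order-processors",
            "worker-2",
            min_idle_time=10_000,
            start_id=start_id,
            count=10,
        )
        next_start, claimed = reply[0], reply[1]  # Redis >= 7.0 adds a third item: deleted IDs
        for msg_id, fields in claimed:
            ...  # process the claimed entry, then acknowledge it
            await r.xack("orders", "order-processors", msg_id)
        # The returned cursor drives the circular scan; "0-0" means the scan
        # has wrapped around to the beginning of the PEL
        start_id = next_start
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(autoclaim_loop())
```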

### Practical Use Case

Consider a scenario where you have multiple workers processing orders:

```python linenums="1"
from faststream import FastStream
from faststream.redis import RedisBroker, StreamSub

broker = RedisBroker()
app = FastStream(broker)


async def process_complex_order(order_id: str) -> None:
    ...  # placeholder for the real order-processing logic


# Worker that might fail
@broker.subscriber(
    stream=StreamSub(
        "orders",
        group="order-processors",
        consumer="worker-1",
    )
)
async def worker_that_might_fail(order_id: str):
    # Process order - might crash before acknowledging
    await process_complex_order(order_id)
    # If crash happens here, message stays pending


# Backup worker with message claiming
@broker.subscriber(
    stream=StreamSub(
        "orders",
        group="order-processors",
        consumer="worker-2",
        min_idle_time=10000,  # 10 seconds
    )
)
async def backup_worker(order_id: str):
    # This worker will automatically pick up messages
    # that worker-1 failed to process within 10 seconds
    print(f"Recovering and processing order: {order_id}")
    await process_complex_order(order_id)
```

## Combining with Manual Acknowledgment

You can combine `min_idle_time` with manual acknowledgment policies for fine-grained control:

```python linenums="1"
{! docs_src/redis/stream/claiming_manual_ack.py !}
```

## Configuration Guidelines

### Choosing `min_idle_time`

The appropriate `min_idle_time` value depends on your use case:

- **Short duration (1-5 seconds)**: For fast-processing tasks where quick failure recovery is needed
- **Medium duration (10-60 seconds)**: For most general-purpose applications with moderate processing times
- **Long duration (5-30 minutes)**: For long-running tasks where you want to ensure a consumer has truly failed

!!! warning
    Setting `min_idle_time` too low may cause messages to be unnecessarily transferred between healthy consumers. Set it based on your typical message processing time plus a safety buffer, as in the sketch below.
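
One way to size the value (the numbers here are purely illustrative) is to start from the typical processing time and add headroom:

```python
# Illustrative sizing: a typical order takes ~20 seconds to process,
# so only claim entries that have been idle for 2x that plus a 5-second buffer
TYPICAL_PROCESSING_MS = 20_000
MIN_IDLE_TIME_MS = 2 * TYPICAL_PROCESSING_MS + 5_000  # 45 seconds

stream = StreamSub(
    "orders",
    group="order-processors",
    consumer="claimer",
    min_idle_time=MIN_IDLE_TIME_MS,
)
```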

### Deployment Patterns

#### Pattern 1: Dedicated Claiming Worker

Deploy a separate worker specifically for claiming abandoned messages:

```python
# Main workers (fast path)
@broker.subscriber(
    stream=StreamSub("tasks", group="workers", consumer="main-1")
)
async def main_worker(task): ...


# Claiming worker (recovery path)
@broker.subscriber(
    stream=StreamSub("tasks", group="workers", consumer="claimer", min_idle_time=15000)
)
async def claiming_worker(task): ...
```

#### Pattern 2: All Workers Can Claim

All workers can claim messages from each other:

```python
import os

# instance_id is assumed to be provided per replica, e.g. via an environment variable
instance_id = os.getenv("WORKER_INSTANCE_ID", "0")


# Each worker can both process new messages and claim abandoned ones
@broker.subscriber(
    stream=StreamSub(
        "tasks",
        group="workers",
        consumer=f"worker-{instance_id}",
        min_idle_time=10000,
    )
)
async def worker(task): ...
```

## Technical Details

- **Start ID**: FastStream automatically manages the `start_id` parameter for `XAUTOCLAIM`, enabling circular scanning through the Pending Entries List
- **Empty Results**: When no pending messages meet the idle time criteria, the consumer will continue polling
- **ACK Handling**: Claimed messages must still be acknowledged using `msg.ack()` to be removed from the [PEL](https://redis.io/docs/latest/develop/data-types/streams/#working-with-multiple-consumer-groups)

## References

For more information about Redis Streams message claiming:

- [Redis XAUTOCLAIM Documentation](https://redis.io/docs/latest/commands/xautoclaim/){.external-link target="_blank"}
- [Redis Streams Claiming Guide](https://redis.io/docs/latest/develop/data-types/streams/#claiming-and-the-delivery-counter){.external-link target="_blank"}
2 changes: 1 addition & 1 deletion docs/docs/en/redis/streams/index.md
@@ -10,7 +10,7 @@ search:

# Redis Streams

[Redis Streams](https://redis.io/docs/data-types/streams/){.external-link target="_blank"} are a data structure introduced in **Redis 5.0** that offer a reliable and highly scalable way to handle streams of data. They are similar to logging systems like **Apache Kafka**, where data is stored in a log structure and can be consumed by multiple clients. **Streams** provide a sequence of ordered messages, and they are designed to handle a high volume of data by allowing partitioning and multiple consumers.
[Redis Streams](https://redis.io/docs/latest/develop/tools/insight/tutorials/insight-stream-consumer/){.external-link target="_blank"} are a data structure introduced in **Redis 5.0** that offer a reliable and highly scalable way to handle streams of data. They are similar to logging systems like **Apache Kafka**, where data is stored in a log structure and can be consumed by multiple clients. **Streams** provide a sequence of ordered messages, and they are designed to handle a high volume of data by allowing partitioning and multiple consumers.

A **Redis Stream** is a collection of entries, each having an ID (which includes a timestamp) and a set of key-value pairs representing the message data. Clients can add to a stream by generating a new entry and can read from a stream to consume its messages.
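
As a minimal sketch of these two operations with the raw `redis-py` client (the stream name and fields are illustrative):

```python
import redis

r = redis.Redis(decode_responses=True)

# Append an entry; Redis assigns an auto-generated ID such as "1719071234567-0"
entry_id = r.xadd("events", {"type": "page_view", "user": "alice"})

# Read entries from the beginning of the stream
for stream_name, entries in r.xread({"events": "0-0"}, count=10):
    for msg_id, fields in entries:
        print(stream_name, msg_id, fields)
```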

1 change: 1 addition & 0 deletions docs/docs/navigation_template.txt
@@ -117,6 +117,7 @@ search:
- [Groups](redis/streams/groups.md)
- [Batching](redis/streams/batch.md)
- [Acknowledgement](redis/streams/ack.md)
- [Claiming](redis/streams/claiming.md)
- [RPC](redis/rpc.md)
- [Pipeline](redis/pipeline.md)
- [Message Information](redis/message.md)
22 changes: 22 additions & 0 deletions docs/docs_src/redis/stream/claiming_basic.py
@@ -0,0 +1,22 @@
from faststream import FastStream, Logger
from faststream.redis import RedisBroker, StreamSub

broker = RedisBroker()
app = FastStream(broker)


@broker.subscriber(
    stream=StreamSub(
        "orders",
        group="processors",
        consumer="worker-1",
        min_idle_time=5000,  # Claim messages idle for 5+ seconds
    )
)
async def handle(order_id: str, logger: Logger):
    logger.info(f"Processing order: {order_id}")


@app.after_startup
async def publish_test():
    await broker.publish("order-123", stream="orders")
30 changes: 30 additions & 0 deletions docs/docs_src/redis/stream/claiming_manual_ack.py
@@ -0,0 +1,30 @@
from faststream import AckPolicy, FastStream, Logger
from faststream.redis import RedisBroker, RedisStreamMessage, StreamSub, Redis

broker = RedisBroker()
app = FastStream(broker)


@broker.subscriber(
    stream=StreamSub(
        "critical-tasks",
        group="task-workers",
        consumer="worker-failover",
        min_idle_time=30000,  # 30 seconds
    ),
    ack_policy=AckPolicy.MANUAL,
)
async def handle(msg: RedisStreamMessage, logger: Logger, redis: Redis):
    try:
        # Process the claimed message
        logger.info(f"Processing: {msg.body}")
        # Explicitly acknowledge after successful processing
        await msg.ack(redis=redis, group="critical-tasks")
    except Exception as e:
        # Don't acknowledge - let it be claimed by another consumer
        logger.error(f"Failed to process: {e}")


@app.after_startup
async def publish_test():
    await broker.publish("critical-task-1", stream="critical-tasks")
15 changes: 12 additions & 3 deletions faststream/redis/schemas/stream_sub.py
@@ -33,6 +33,12 @@ class StreamSub(NameRequired):
            https://redis.io/docs/latest/commands/xreadgroup/#differences-between-xread-and-xreadgroup
        polling_interval:
            Polling interval in seconds.
        min_idle_time:
            Minimum idle time in milliseconds for a message to be eligible for claiming via XAUTOCLAIM.
            Messages that have been pending (unacknowledged) for at least this duration can be
            reclaimed by this consumer. Only applicable when using consumer groups.

            https://redis.io/docs/latest/commands/xautoclaim/
    """

    __slots__ = (
@@ -42,6 +48,7 @@
        "last_id",
        "max_records",
        "maxlen",
        "min_idle_time",
        "name",
        "no_ack",
        "polling_interval",
@@ -58,11 +65,15 @@ def __init__(
        last_id: str | None = None,
        maxlen: int | None = None,
        max_records: int | None = None,
        min_idle_time: int | None = None,
    ) -> None:
        if (group and not consumer) or (not group and consumer):
            msg = "You should specify `group` and `consumer` both"
            raise SetupError(msg)

        if last_id is None:
            last_id = ">" if group and consumer else "$"

        if group and consumer:
            if last_id != ">":
                if polling_interval:
@@ -86,9 +97,6 @@
                        stacklevel=1,
                    )

        if last_id is None:
            last_id = ">" if group and consumer else "$"

        super().__init__(stream)

        self.group = group
@@ -99,6 +107,7 @@ def __init__(
        self.last_id = last_id
        self.maxlen = maxlen
        self.max_records = max_records
        self.min_idle_time = min_idle_time

    def add_prefix(self, prefix: str) -> "StreamSub":
        new_stream = deepcopy(self)