diff --git a/docs/docs/en/redis/index.md b/docs/docs/en/redis/index.md index 13b545cfb6..1ca0feedd5 100644 --- a/docs/docs/en/redis/index.md +++ b/docs/docs/en/redis/index.md @@ -22,15 +22,15 @@ search: #### 1. Pub/Sub -[**Redis Pub/Sub**](https://redis.io/docs/interact/pubsub/){.external-link target="_blank"} implements the Publish/Subscribe messaging paradigm where senders (publishers) are not programmed to send their messages to specific receivers (subscribers). Instead, published messages are characterized into channels, without knowledge of what (if any) subscribers there may be. +[**Redis Pub/Sub**](https://redis.io/docs/latest/develop/pubsub/){.external-link target="_blank"} implements the Publish/Subscribe messaging paradigm where senders (publishers) are not programmed to send their messages to specific receivers (subscribers). Instead, published messages are characterized into channels, without knowledge of what (if any) subscribers there may be. #### 2. List -In contrast, [**Redis List**](https://redis.io/docs/data-types/lists/#common-use-cases-for-lists){.external-link target="_blank"} capitalizes on a straightforward list data structure. Messages, pushed by producers, form a first-in, first-out (FIFO) queue. Consumers, in turn, retrieve messages from this ordered list, providing a simplified mechanism for sequential message processing. +In contrast, [**Redis List**](https://redis.io/docs/latest/develop/data-types/lists/#common-use-cases-for-lists){.external-link target="_blank"} capitalizes on a straightforward list data structure. Messages, pushed by producers, form a first-in, first-out (FIFO) queue. Consumers, in turn, retrieve messages from this ordered list, providing a simplified mechanism for sequential message processing. #### 3. Streams -[**Redis Streams**](https://redis.io/docs/data-types/streams/){.external-link target="_blank"} introduce a more advanced concept, embracing an append-only log-like structure. Messages, organized as entries, allow for nuanced features like consumer groups, enabling parallel processing, and acknowledgment for precise message handling. Streams excel in scenarios demanding scalability, persistence, and ordered message processing. +[**Redis Streams**](https://redis.io/docs/latest/develop/data-types/streams/){.external-link target="_blank"} introduce a more advanced concept, embracing an append-only log-like structure. Messages, organized as entries, allow for nuanced features like consumer groups, enabling parallel processing, and acknowledgment for precise message handling. Streams excel in scenarios demanding scalability, persistence, and ordered message processing. Ultimately, the choice between **Pub/Sub**, **List**, or **Streams** hinges on the specific needs of the application. Redis Pub/Sub suits real-time communication, List offers simplicity in ordered processing, while Streams cater to complex, scalable, and ordered message handling, each providing tailored solutions based on distinct use case requirements. @@ -58,4 +58,4 @@ Here's a simplified code example demonstrating how to establish a connection to This minimal example illustrates how **FastStream** simplifies the process of connecting to **Redis** and performing basic message processing from the *in-channel* to the *out-channel*. Depending on your specific use case and requirements, you can further customize your **Redis** integration with **FastStream** to build efficient and responsive applications. 
-For more advanced configuration options and detailed usage instructions, please refer to the **FastStream Redis** documentation and the [official Redis documentation](https://redis.io/documentation){.external-link target="_blank"}. +For more advanced configuration options and detailed usage instructions, please refer to the **FastStream Redis** documentation and the [official Redis documentation](https://redis.io/docs/latest/){.external-link target="_blank"}. diff --git a/docs/docs/en/redis/list/index.md b/docs/docs/en/redis/list/index.md index c17045d4a7..d53b2cd165 100644 --- a/docs/docs/en/redis/list/index.md +++ b/docs/docs/en/redis/list/index.md @@ -10,6 +10,6 @@ search: # Redis Lists -[Redis Lists](https://redis.io/docs/data-types/lists/){.external-link target="_blank"} are a simple and flexible data structure that function as ordered collections of strings. They are similar to lists in programming languages, and **Redis** provides commands to perform a variety of operations such as adding, retrieving, and removing elements from either end of the list. +[Redis Lists](https://redis.io/docs/latest/develop/data-types/lists/){.external-link target="_blank"} are simple and flexible data structures that function as ordered collections of strings. They are similar to lists in programming languages, and **Redis** provides commands to perform a variety of operations such as adding, retrieving, and removing elements from either end of the list. **Redis Lists** are particularly useful for scenarios such as implementing queues, effectively using the list as a **FIFO** (First-In-First-Out) structure. diff --git a/docs/docs/en/redis/pubsub/index.md b/docs/docs/en/redis/pubsub/index.md index 795aaf16d8..c327e10166 100644 --- a/docs/docs/en/redis/pubsub/index.md +++ b/docs/docs/en/redis/pubsub/index.md @@ -10,7 +10,7 @@ search: # Redis Channels -[**Redis Pub/Sub Channels**](https://redis.io/docs/interact/pubsub/){.external-link target="_blank"} are a feature of **Redis** that enables messaging between clients through a publish/subscribe (pub/sub) pattern. A **Redis** channel is essentially a medium through which messages are transmitted. Different clients can subscribe to these channels to listen for messages, while other clients can publish messages to these channels. +[**Redis Pub/Sub Channels**](https://redis.io/docs/latest/develop/pubsub/){.external-link target="_blank"} are a feature of **Redis** that enables messaging between clients through a publish/subscribe (pub/sub) pattern. A **Redis** channel is essentially a medium through which messages are transmitted. Different clients can subscribe to these channels to listen for messages, while other clients can publish messages to these channels. When a message is published to a **Redis** channel, all subscribers to that channel receive the message instantly. This makes **Redis** channels suitable for a variety of real-time applications such as chat rooms, notifications, live updates, and many more use cases where messages must be broadcast promptly to multiple clients. 
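The channel semantics described above map directly onto FastStream's Redis API. Below is a minimal, illustrative sketch (the `notifications` channel name, the payload, and the local broker URL are assumptions, not part of this changeset):

```python
from faststream import FastStream, Logger
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)


@broker.subscriber("notifications")  # subscribe to the "notifications" Pub/Sub channel
async def on_notification(msg: str, logger: Logger):
    # every client subscribed to this channel receives each published message
    logger.info(f"Received: {msg}")


@app.after_startup
async def broadcast():
    # deliver to all current subscribers of the channel
    await broker.publish("user-42 logged in", channel="notifications")
```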
diff --git a/docs/docs/en/redis/streams/claiming.md b/docs/docs/en/redis/streams/claiming.md new file mode 100644 index 0000000000..6c5e149e8a --- /dev/null +++ b/docs/docs/en/redis/streams/claiming.md @@ -0,0 +1,149 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 10 +--- + +# Redis Stream Message Claiming + +When working with Redis Stream Consumer Groups, there may be situations where messages remain in a pending state because a consumer failed to process them. FastStream provides a mechanism to automatically claim these pending messages using Redis's `XAUTOCLAIM` command, configured through the `min_idle_time` parameter. + +## What is Message Claiming? + +In Redis Streams, when a consumer reads a message from a consumer group but fails to acknowledge it (due to a crash, network issue, or processing error), the message remains in the [**Pending Entries List (PEL)**](https://redis.io/docs/latest/develop/data-types/streams/#working-with-multiple-consumer-groups){.external-link target="_blank"} of that consumer group. These unacknowledged messages remain associated with the original consumer and have an "idle time": the duration since they were last delivered. + +Message claiming allows another consumer to take ownership of pending messages that have been idle for too long, ensuring that messages don't get stuck and that the workload can be redistributed among healthy consumers. + +## Using `min_idle_time` for Automatic Claiming + +FastStream's `StreamSub` provides a `min_idle_time` parameter that enables automatic claiming of pending messages via Redis's `XAUTOCLAIM` command. When set, the consumer automatically scans for and claims messages that have been pending for at least the specified duration (in milliseconds). + +### Basic Example + +Here's a simple example that demonstrates automatic message claiming: + +```python linenums="1" +{! docs_src/redis/stream/claiming_basic.py !} +``` + +## How It Works + +When `min_idle_time` is set: + +1. **Circular Scanning**: Instead of using `XREADGROUP` to read new messages, the consumer uses `XAUTOCLAIM` to scan the Pending Entries List +2. **Idle Time Check**: Only messages that have been pending for at least `min_idle_time` milliseconds are claimed +3. **Ownership Transfer**: Claimed messages are automatically transferred from the failing consumer to the claiming consumer +4. **Continuous Processing**: The scanning is circular; after reaching the end of the [PEL](https://redis.io/docs/latest/develop/data-types/streams/#working-with-multiple-consumer-groups){.external-link target="_blank"}, it starts over from the beginning
 + +### Practical Use Case + +Consider a scenario where you have multiple workers processing orders: + +```python linenums="1" +from faststream import FastStream +from faststream.redis import RedisBroker, StreamSub + +broker = RedisBroker() +app = FastStream(broker) + +# Worker that might fail +@broker.subscriber( +    stream=StreamSub( +        "orders", +        group="order-processors", +        consumer="worker-1", +    ) +) +async def worker_that_might_fail(order_id: str): +    # Process the order; the worker might crash before acknowledging +    await process_complex_order(order_id) +    # If a crash happens here, the message stays pending + +# Backup worker with message claiming +@broker.subscriber( +    stream=StreamSub( +        "orders", +        group="order-processors", +        consumer="worker-2", +        min_idle_time=10000,  # 10 seconds +    ) +) +async def backup_worker(order_id: str): +    # This worker will automatically pick up messages +    # that worker-1 failed to process within 10 seconds +    print(f"Recovering and processing order: {order_id}") +    await process_complex_order(order_id) +``` + +## Combining with Manual Acknowledgment + +You can combine `min_idle_time` with manual acknowledgment policies for fine-grained control: + +```python linenums="1" +{! docs_src/redis/stream/claiming_manual_ack.py !} +``` + +## Configuration Guidelines + +### Choosing `min_idle_time` + +The appropriate `min_idle_time` value depends on your use case: + +- **Short duration (1-5 seconds)**: For fast-processing tasks where quick failure recovery is needed +- **Medium duration (10-60 seconds)**: For most general-purpose applications with moderate processing times +- **Long duration (5-30 minutes)**: For long-running tasks where you want to ensure a consumer has truly failed + +!!! warning +    Setting `min_idle_time` too low may cause messages to be unnecessarily transferred between healthy consumers. Set it based on your typical message processing time plus a safety buffer. + +### Deployment Patterns + +#### Pattern 1: Dedicated Claiming Worker + +Deploy a separate worker specifically for claiming abandoned messages: + +```python +# Main workers (fast path) +@broker.subscriber( +    stream=StreamSub("tasks", group="workers", consumer="main-1") +) +async def main_worker(task): ... + +# Claiming worker (recovery path) +@broker.subscriber( +    stream=StreamSub("tasks", group="workers", consumer="claimer", min_idle_time=15000) +) +async def claiming_worker(task): ... +``` + +#### Pattern 2: All Workers Can Claim + +All workers can claim messages from each other: + +```python +# Each worker can both process new messages and claim abandoned ones +@broker.subscriber( +    stream=StreamSub( +        "tasks", +        group="workers", +        consumer=f"worker-{instance_id}", +        min_idle_time=10000, +    ) +) +async def worker(task): ...
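+
+# A claimed entry must still be ACKed: if it stays unacknowledged and idle
+# for another `min_idle_time`, a peer worker may claim it right back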
+``` + +## Technical Details + +- **Start ID**: FastStream automatically manages the `start_id` parameter for `XAUTOCLAIM`, enabling circular scanning through the Pending Entries List +- **Empty Results**: When no pending messages meet the idle time criteria, the consumer will continue polling +- **ACK Handling**: Claimed messages must still be acknowledged using `msg.ack()` to be removed from the [PEL](https://redis.io/docs/latest/develop/data-types/streams/#working-with-multiple-consumer-groups){.external-link target="_blank"} + +## References + +For more information about Redis Streams message claiming: + +- [Redis XAUTOCLAIM Documentation](https://redis.io/docs/latest/commands/xautoclaim/){.external-link target="_blank"} +- [Redis Streams Claiming Guide](https://redis.io/docs/latest/develop/data-types/streams/#claiming-and-the-delivery-counter){.external-link target="_blank"} diff --git a/docs/docs/en/redis/streams/index.md b/docs/docs/en/redis/streams/index.md index f9da536f9f..4dc2f743eb 100644 --- a/docs/docs/en/redis/streams/index.md +++ b/docs/docs/en/redis/streams/index.md @@ -10,7 +10,7 @@ search: # Redis Streams -[Redis Streams](https://redis.io/docs/data-types/streams/){.external-link target="_blank"} are a data structure introduced in **Redis 5.0** that offer a reliable and highly scalable way to handle streams of data. They are similar to logging systems like **Apache Kafka**, where data is stored in a log structure and can be consumed by multiple clients. **Streams** provide a sequence of ordered messages, and they are designed to handle a high volume of data by allowing partitioning and multiple consumers. +[Redis Streams](https://redis.io/docs/latest/develop/data-types/streams/){.external-link target="_blank"} are a data structure introduced in **Redis 5.0** that offer a reliable and highly scalable way to handle streams of data. They are similar to logging systems like **Apache Kafka**, where data is stored in a log structure and can be consumed by multiple clients. **Streams** provide a sequence of ordered messages, and they are designed to handle a high volume of data by allowing partitioning and multiple consumers. A **Redis Stream** is a collection of entries, each having an ID (which includes a timestamp) and a set of key-value pairs representing the message data. Clients can add to a stream by generating a new entry and can read from a stream to consume its messages. 
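As a stream-side counterpart to the channel example earlier, here is a minimal, illustrative sketch of appending and consuming entries with FastStream (the `events` stream name and handler are assumptions; publishing with `stream=` performs an `XADD`, so Redis assigns each entry a timestamp-based ID as described above):

```python
from faststream import FastStream, Logger
from faststream.redis import RedisBroker

broker = RedisBroker()
app = FastStream(broker)


@broker.subscriber(stream="events")  # read entries appended to the "events" stream
async def on_event(body: str, logger: Logger):
    logger.info(f"Got stream entry: {body}")


@app.after_startup
async def add_entry():
    # appends an entry to the stream; Redis generates its ID
    await broker.publish("something happened", stream="events")
```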
diff --git a/docs/docs/navigation_template.txt b/docs/docs/navigation_template.txt index f981eb876a..ec5b22b200 100644 --- a/docs/docs/navigation_template.txt +++ b/docs/docs/navigation_template.txt @@ -117,6 +117,7 @@ search: - [Groups](redis/streams/groups.md) - [Batching](redis/streams/batch.md) - [Acknowledgement](redis/streams/ack.md) + - [Claiming](redis/streams/claiming.md) - [RPC](redis/rpc.md) - [Pipeline](redis/pipeline.md) - [Message Information](redis/message.md) diff --git a/docs/docs_src/redis/stream/claiming_basic.py b/docs/docs_src/redis/stream/claiming_basic.py new file mode 100644 index 0000000000..97c901c615 --- /dev/null +++ b/docs/docs_src/redis/stream/claiming_basic.py @@ -0,0 +1,22 @@ +from faststream import FastStream, Logger +from faststream.redis import RedisBroker, StreamSub + +broker = RedisBroker() +app = FastStream(broker) + + +@broker.subscriber( +    stream=StreamSub( +        "orders", +        group="processors", +        consumer="worker-1", +        min_idle_time=5000,  # Claim messages idle for 5+ seconds +    ) +) +async def handle(order_id: str, logger: Logger): +    logger.info(f"Processing order: {order_id}") + + +@app.after_startup +async def publish_test(): +    await broker.publish("order-123", stream="orders") diff --git a/docs/docs_src/redis/stream/claiming_manual_ack.py b/docs/docs_src/redis/stream/claiming_manual_ack.py new file mode 100644 index 0000000000..f26400da04 --- /dev/null +++ b/docs/docs_src/redis/stream/claiming_manual_ack.py @@ -0,0 +1,30 @@ +from faststream import AckPolicy, FastStream, Logger +from faststream.redis import RedisBroker, RedisStreamMessage, StreamSub, Redis + +broker = RedisBroker() +app = FastStream(broker) + + +@broker.subscriber( +    stream=StreamSub( +        "critical-tasks", +        group="task-workers", +        consumer="worker-failover", +        min_idle_time=30000,  # 30 seconds +    ), +    ack_policy=AckPolicy.MANUAL, +) +async def handle(msg: RedisStreamMessage, logger: Logger, redis: Redis): +    try: +        # Process the claimed message +        logger.info(f"Processing: {msg.body}") +        # Explicitly acknowledge after successful processing +        await msg.ack(redis=redis, group="task-workers") +    except Exception as e: +        # Don't acknowledge; let it be claimed by another consumer +        logger.error(f"Failed to process: {e}") + + +@app.after_startup +async def publish_test(): +    await broker.publish("critical-task-1", stream="critical-tasks") diff --git a/faststream/redis/schemas/stream_sub.py b/faststream/redis/schemas/stream_sub.py index 9c1b39af34..8fbeee96e5 100644 --- a/faststream/redis/schemas/stream_sub.py +++ b/faststream/redis/schemas/stream_sub.py @@ -33,6 +33,13 @@ class StreamSub(NameRequired): https://redis.io/docs/latest/commands/xreadgroup/#differences-between-xread-and-xreadgroup polling_interval: Polling interval in seconds. + min_idle_time: + Minimum idle time in milliseconds for a message to be eligible for claiming via XAUTOCLAIM. + Messages that have been pending (unacknowledged) for at least this duration can be + reclaimed by this consumer. Only applicable when using consumer groups.
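 +        Defaults to ``None``, which disables claiming and keeps the regular ``XREADGROUP`` / ``XREAD`` reading path.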
+ + https://redis.io/docs/latest/commands/xautoclaim/ """ __slots__ = ( @@ -42,6 +48,7 @@ class StreamSub(NameRequired): "last_id", "max_records", "maxlen", + "min_idle_time", "name", "no_ack", "polling_interval", @@ -58,11 +65,15 @@ def __init__( last_id: str | None = None, maxlen: int | None = None, max_records: int | None = None, + min_idle_time: int | None = None, ) -> None: if (group and not consumer) or (not group and consumer): msg = "You should specify `group` and `consumer` both" raise SetupError(msg) + if last_id is None: + last_id = ">" if group and consumer else "$" + if group and consumer: if last_id != ">": if polling_interval: @@ -86,9 +97,6 @@ def __init__( stacklevel=1, ) - if last_id is None: - last_id = ">" if group and consumer else "$" - super().__init__(stream) self.group = group @@ -99,6 +107,7 @@ def __init__( self.last_id = last_id self.maxlen = maxlen self.max_records = max_records + self.min_idle_time = min_idle_time def add_prefix(self, prefix: str) -> "StreamSub": new_stream = deepcopy(self) diff --git a/faststream/redis/subscriber/usecases/stream_subscriber.py b/faststream/redis/subscriber/usecases/stream_subscriber.py index 3166511536..0e799fa394 100644 --- a/faststream/redis/subscriber/usecases/stream_subscriber.py +++ b/faststream/redis/subscriber/usecases/stream_subscriber.py @@ -1,5 +1,5 @@ import math -from collections.abc import AsyncIterator, Awaitable, Callable +from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine from typing import TYPE_CHECKING, Any, Optional, TypeAlias from redis.exceptions import ResponseError @@ -47,6 +47,8 @@ def __init__( assert config.stream_sub self._stream_sub = config.stream_sub self.last_id = config.stream_sub.last_id + self.min_idle_time = config.stream_sub.min_idle_time + self.autoclaim_start_id = b"0-0" @property def stream_sub(self) -> "StreamSub": @@ -139,7 +141,7 @@ def read( noack=stream.no_ack, ) - else: + elif self.stream_sub.min_idle_time is None: def read( last_id: str, @@ -164,6 +166,48 @@ def read( count=stream.max_records, ) + else: + + def read( + _: str, + ) -> Coroutine[ + Any, + Any, + tuple[ + tuple[ + TopicName, + tuple[ + tuple[ + Offset, + dict[bytes, bytes], + ], + ..., + ], + ], + ..., + ], + ]: + async def xautoclaim() -> tuple[ + tuple[TopicName, tuple[tuple[Offset, dict[bytes, bytes]], ...]], ... + ]: + stream_message = await client.xautoclaim( + name=self.stream_sub.name, + groupname=self.stream_sub.group, + consumername=self.stream_sub.consumer, + min_idle_time=self.min_idle_time, + start_id=self.autoclaim_start_id, + count=1, + ) + stream_name = self.stream_sub.name.encode() + (next_id, messages, _) = stream_message + # Update start_id for next call + self.autoclaim_start_id = next_id + if not messages: + return () + return ((stream_name, messages),) + + return xautoclaim() + await super().start(read) @override @@ -175,17 +219,41 @@ async def get_one( assert not self.calls, ( "You can't use `get_one` method if subscriber has registered handlers." 
) + if self.min_idle_time is None: + if self.stream_sub.group and self.stream_sub.consumer: + stream_message = await self._client.xreadgroup( + groupname=self.stream_sub.group, + consumername=self.stream_sub.consumer, + streams={self.stream_sub.name: self.last_id}, + block=math.ceil(timeout * 1000), + count=1, + ) + else: + stream_message = await self._client.xread( + {self.stream_sub.name: self.last_id}, + block=math.ceil(timeout * 1000), + count=1, + ) + if not stream_message: + return None - stream_message = await self._client.xread( - {self.stream_sub.name: self.last_id}, - block=math.ceil(timeout * 1000), - count=1, - ) - - if not stream_message: - return None - - ((stream_name, ((message_id, raw_message),)),) = stream_message + ((stream_name, ((message_id, raw_message),)),) = stream_message + else: + stream_message = await self._client.xautoclaim( + name=self.stream_sub.name, + groupname=self.stream_sub.group, + consumername=self.stream_sub.consumer, + min_idle_time=self.min_idle_time, + start_id=self.autoclaim_start_id, + count=1, + ) + (next_id, messages, _) = stream_message + # Update start_id for next call + self.autoclaim_start_id = next_id + if not messages: + return None + stream_name = self.stream_sub.name.encode() + ((message_id, raw_message),) = messages self.last_id = message_id.decode() @@ -221,16 +289,41 @@ async def __aiter__(self) -> AsyncIterator["RedisStreamMessage"]: # type: ignor async_parser, async_decoder = self._get_parser_and_decoder() while True: - stream_message = await self._client.xread( - {self.stream_sub.name: self.last_id}, - block=math.ceil(timeout * 1000), - count=1, - ) - - if not stream_message: - continue - - ((stream_name, ((message_id, raw_message),)),) = stream_message + if self.min_idle_time is None: + if self.stream_sub.group and self.stream_sub.consumer: + stream_message = await self._client.xreadgroup( + groupname=self.stream_sub.group, + consumername=self.stream_sub.consumer, + streams={self.stream_sub.name: self.last_id}, + block=math.ceil(timeout * 1000), + count=1, + ) + else: + stream_message = await self._client.xread( + {self.stream_sub.name: self.last_id}, + block=math.ceil(timeout * 1000), + count=1, + ) + if not stream_message: + continue + + ((stream_name, ((message_id, raw_message),)),) = stream_message + else: + stream_message = await self._client.xautoclaim( + name=self.stream_sub.name, + groupname=self.stream_sub.group, + consumername=self.stream_sub.consumer, + min_idle_time=self.min_idle_time, + start_id=self.autoclaim_start_id, + count=1, + ) + (next_id, messages, _) = stream_message + # Update start_id for next call + self.autoclaim_start_id = next_id + if not messages: + continue + stream_name = self.stream_sub.name.encode() + ((message_id, raw_message),) = messages self.last_id = message_id.decode() diff --git a/tests/brokers/redis/test_autoclaim.py b/tests/brokers/redis/test_autoclaim.py new file mode 100644 index 0000000000..b3fb44f1f7 --- /dev/null +++ b/tests/brokers/redis/test_autoclaim.py @@ -0,0 +1,361 @@ +import asyncio +from contextlib import suppress +from unittest.mock import MagicMock + +import pytest + +from faststream.redis import ( + StreamSub, +) +from tests.brokers.base.consume import BrokerRealConsumeTestcase + +from .basic import RedisTestcaseConfig + + +@pytest.mark.connected() +@pytest.mark.redis() +@pytest.mark.asyncio() +class TestAutoClaim(RedisTestcaseConfig, BrokerRealConsumeTestcase): + @pytest.mark.slow() + async def test_consume_stream_with_min_idle_time( + self, + queue: str, + mock: 
MagicMock, + ) -> None: + """Test consuming messages using XAUTOCLAIM with min_idle_time.""" + event = asyncio.Event() + + consume_broker = self.get_broker(apply_types=True) + + @consume_broker.subscriber( + stream=StreamSub( + queue, + group="test_group", + consumer="consumer1", + min_idle_time=100, # 100ms + ), + ) + async def handler(msg: str) -> None: + mock(msg) + event.set() + + async with self.patch_broker(consume_broker) as br: + await br.start() + + # First, publish a message and let it become pending + await br.publish("pending_message", stream=queue) + + # Wait a bit to ensure message becomes idle + await asyncio.sleep(0.2) + + # The subscriber with XAUTOCLAIM should reclaim it + await asyncio.wait( + (asyncio.create_task(event.wait()),), + timeout=3, + ) + + assert event.is_set() + mock.assert_called_once_with("pending_message") + + @pytest.mark.slow() + async def test_get_one_with_min_idle_time( + self, + queue: str, + ) -> None: + """Test get_one() with min_idle_time.""" + broker = self.get_broker(apply_types=True) + + async with self.patch_broker(broker) as br: + await br.start() + + # First, create a pending message + await br.publish({"data": "pending"}, stream=queue) + with suppress(Exception): + await br._connection.xgroup_create( + queue, "idle_group", id="0", mkstream=True + ) + + # Read it but don't ack to make it pending + await br._connection.xreadgroup( + groupname="idle_group", + consumername="temp_consumer", + streams={queue: ">"}, + count=1, + ) + + # Wait for it to become idle + await asyncio.sleep(0.1) + + # Now use get_one with min_idle_time + subscriber = br.subscriber( + stream=StreamSub( + queue, + group="idle_group", + consumer="claiming_consumer", + min_idle_time=1, + ) + ) + + message = await subscriber.get_one(timeout=3) + + assert message is not None + decoded = await message.decode() + assert decoded == {"data": "pending"} + + @pytest.mark.slow() + async def test_get_one_with_min_idle_time_no_pending( + self, + queue: str, + mock: MagicMock, + ) -> None: + """Test get_one() with min_idle_time when no pending messages.""" + broker = self.get_broker(apply_types=True) + + subscriber = broker.subscriber( + stream=StreamSub( + queue, + group="empty_group", + consumer="consumer1", + min_idle_time=100, + ) + ) + + async with self.patch_broker(broker) as br: + await br.start() + + # Should return None after timeout + result = await subscriber.get_one(timeout=0.5) + mock(result) + + mock.assert_called_once_with(None) + + @pytest.mark.slow() + async def test_iterator_with_min_idle_time( + self, + queue: str, + mock: MagicMock, + ) -> None: + """Test async iterator with min_idle_time.""" + broker = self.get_broker(apply_types=True) + + async with self.patch_broker(broker) as br: + await br.start() + + # Create pending messages + await br.publish({"data": "msg1"}, stream=queue) + await br.publish({"data": "msg2"}, stream=queue) + + with suppress(Exception): + await br._connection.xgroup_create( + queue, "iter_group", id="0", mkstream=True + ) + + # Read them but don't ack + await br._connection.xreadgroup( + groupname="iter_group", + consumername="temp", + streams={queue: ">"}, + count=10, + ) + + await asyncio.sleep(0.1) + + subscriber = br.subscriber( + stream=StreamSub( + queue, + group="iter_group", + consumer="iter_consumer", + min_idle_time=1, + ) + ) + + count = 0 + async for msg in subscriber: + decoded = await msg.decode() + mock(decoded) + count += 1 + if count >= 2: + break + + assert count == 2 + mock.assert_any_call({"data": "msg1"}) + 
mock.assert_any_call({"data": "msg2"}) + + @pytest.mark.slow() + async def test_consume_stream_batch_with_min_idle_time( + self, + queue: str, + mock: MagicMock, + ) -> None: + """Test batch consuming with min_idle_time.""" + event = asyncio.Event() + + consume_broker = self.get_broker(apply_types=True) + + @consume_broker.subscriber( + stream=StreamSub( + queue, + group="batch_group", + consumer="batch_consumer", + batch=True, + min_idle_time=1, + ), + ) + async def handler(msg: list) -> None: + mock(msg) + event.set() + + async with self.patch_broker(consume_broker) as br: + await br.start() + + # Create a pending message first + await br.publish({"data": "batch_msg"}, stream=queue) + + with suppress(Exception): + await br._connection.xgroup_create( + queue, "batch_group", id="0", mkstream=True + ) + + # Read but don't ack + await br._connection.xreadgroup( + groupname="batch_group", + consumername="temp", + streams={queue: ">"}, + count=1, + ) + + await asyncio.sleep(0.1) + + # Now the subscriber should reclaim it + await asyncio.wait( + (asyncio.create_task(event.wait()),), + timeout=3, + ) + + assert event.is_set() + # In batch mode, should receive list + assert mock.call_count == 1 + called_with = mock.call_args[0][0] + assert isinstance(called_with, list) + assert len(called_with) > 0 + + @pytest.mark.slow() + async def test_xautoclaim_with_deleted_messages( + self, + queue: str, + mock: MagicMock, + ) -> None: + """Test XAUTOCLAIM behavior when messages are deleted from stream.""" + consume_broker = self.get_broker(apply_types=True) + + async with self.patch_broker(consume_broker) as br: + await br.start() + + # Create and consume a message without ack + msg_id = await br.publish({"data": "will_delete"}, stream=queue) + + with suppress(Exception): + await br._connection.xgroup_create( + queue, "delete_group", id="0", mkstream=True + ) + + # Read to make it pending + await br._connection.xreadgroup( + groupname="delete_group", + consumername="temp", + streams={queue: ">"}, + count=1, + ) + + # Delete the message from stream + await br._connection.xdel(queue, msg_id) + + await asyncio.sleep(0.1) + + # XAUTOCLAIM should handle deleted messages gracefully + subscriber = br.subscriber( + stream=StreamSub( + queue, + group="delete_group", + consumer="delete_consumer", + min_idle_time=1, + ) + ) + + # Should timeout gracefully without errors + result = await subscriber.get_one(timeout=0.5) + mock(result) + + # Should return None (no valid messages to claim) + mock.assert_called_once_with(None) + + @pytest.mark.slow() + async def test_xautoclaim_circular_scanning_with_idle_timeout( + self, + queue: str, + mock: MagicMock, + ) -> None: + """Test that XAUTOCLAIM scans circularly and claims messages as they become idle.""" + consume_broker = self.get_broker(apply_types=True) + + async with self.patch_broker(consume_broker) as br: + await br.start() + + # Create multiple pending messages + msg_ids = [] + for i in range(5): + msg_id = await br.publish({"data": f"msg{i}"}, stream=queue) + msg_ids.append(msg_id) + + with suppress(Exception): + await br._connection.xgroup_create( + queue, "circular_group", id="0", mkstream=True + ) + + # Read all messages with consumer1 but don't ack - making them pending + await br._connection.xreadgroup( + groupname="circular_group", + consumername="consumer1", + streams={queue: ">"}, + count=10, + ) + + # Wait for messages to become idle + await asyncio.sleep(0.1) + + # Create subscriber with XAUTOCLAIM + subscriber = br.subscriber( + stream=StreamSub( + queue, 
+ group="circular_group", + consumer="consumer2", + min_idle_time=1, + ) + ) + + # First pass: claim all messages one by one + claimed_messages_first_pass = [] + for _ in range(5): + msg = await subscriber.get_one(timeout=1) + if msg: + decoded = await msg.decode() + claimed_messages_first_pass.append(decoded) + mock(f"first_pass_{decoded['data']}") + + # Should have claimed all 5 messages in order + assert len(claimed_messages_first_pass) == 5 + assert claimed_messages_first_pass == [{"data": f"msg{i}"} for i in range(5)] + + # After reaching the end, XAUTOCLAIM should restart from "0-0" + # and scan circularly - messages are still pending since we didn't ACK them + # Second pass: verify circular behavior by claiming messages again + msg = await subscriber.get_one(timeout=1) + assert msg is not None + decoded = await msg.decode() + # Should get msg0 again (circular scan restarted) + assert decoded["data"] == "msg0" + mock("second_pass_msg0") + + # Verify messages were claimed in both passes + mock.assert_any_call("first_pass_msg0") + mock.assert_any_call("second_pass_msg0") diff --git a/tests/brokers/redis/test_config.py b/tests/brokers/redis/test_config.py index c226cafd23..dad8ceb8ee 100644 --- a/tests/brokers/redis/test_config.py +++ b/tests/brokers/redis/test_config.py @@ -47,20 +47,6 @@ def test_stream_with_group() -> None: assert config.ack_policy is AckPolicy.REJECT_ON_ERROR -@pytest.mark.redis() -def test_custom_ack() -> None: - config = RedisSubscriberConfig( - _outer_config=MagicMock(), - stream_sub=StreamSub( - "test_stream", - group="test_group", - consumer="test_consumer", - ), - _ack_policy=AckPolicy.ACK, - ) - assert config.ack_policy is AckPolicy.ACK - - @pytest.mark.redis() def test_stream_sub_with_no_ack_group() -> None: with pytest.warns( @@ -74,11 +60,40 @@ def test_stream_sub_with_no_ack_group() -> None: group="test_group", consumer="test_consumer", no_ack=True, + last_id="$", ), ) assert config.ack_policy is AckPolicy.MANUAL +@pytest.mark.redis() +def test_stream_with_group_and_min_idle_time() -> None: + config = RedisSubscriberConfig( + _outer_config=MagicMock(), + stream_sub=StreamSub( + "test_stream", + group="test_group", + consumer="test_consumer", + min_idle_time=1000, + ), + ) + assert config.ack_policy is AckPolicy.REJECT_ON_ERROR + + +@pytest.mark.redis() +def test_custom_ack() -> None: + config = RedisSubscriberConfig( + _outer_config=MagicMock(), + stream_sub=StreamSub( + "test_stream", + group="test_group", + consumer="test_consumer", + ), + _ack_policy=AckPolicy.ACK, + ) + assert config.ack_policy is AckPolicy.ACK + + @pytest.mark.redis() def test_no_ack() -> None: config = RedisSubscriberConfig(_outer_config=MagicMock(), _no_ack=True) diff --git a/tests/docs/redis/stream/test_claiming.py b/tests/docs/redis/stream/test_claiming.py new file mode 100644 index 0000000000..b8977e073a --- /dev/null +++ b/tests/docs/redis/stream/test_claiming.py @@ -0,0 +1,23 @@ +import pytest + +from faststream.redis import TestApp, TestRedisBroker + + +@pytest.mark.connected() +@pytest.mark.redis() +@pytest.mark.asyncio() +async def test_stream_claiming_basic() -> None: + from docs.docs_src.redis.stream.claiming_basic import app, broker, handle + + async with TestRedisBroker(broker), TestApp(app): + handle.mock.assert_called_once_with("order-123") + + +@pytest.mark.connected() +@pytest.mark.redis() +@pytest.mark.asyncio() +async def test_stream_claiming_manual_ack() -> None: + from docs.docs_src.redis.stream.claiming_manual_ack import app, broker, handle + + async with 
TestRedisBroker(broker), TestApp(app): + handle.mock.assert_called_once_with("critical-task-1")
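

For readers tracing the subscriber changes above: the `(next_id, messages, _)` unpacking mirrors the raw redis-py reply shape. A minimal standalone sketch, assuming redis-py 5.x, a local Redis 7+ server, and illustrative stream/group names:

```python
import asyncio

from redis.asyncio import Redis


async def main() -> None:
    client = Redis()

    # XAUTOCLAIM returns the cursor for the next scan, the claimed entries,
    # and (on Redis 7+) the IDs of pending entries that no longer exist
    next_id, messages, deleted = await client.xautoclaim(
        name="orders",
        groupname="processors",
        consumername="worker-2",
        min_idle_time=5000,  # only entries idle for at least 5 seconds
        start_id="0-0",  # scan the Pending Entries List from the beginning
        count=1,
    )
    print(next_id, messages, deleted)

    await client.aclose()


asyncio.run(main())
```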