Skip to content

Commit 80f391d

Browse files
committed
Add support for min_idle_time in Redis StreamSub and XAUTOCLAIM
Signed-off-by: Victor Maleca <powersemmi@gmail.com>
1 parent c87358d commit 80f391d

File tree

4 files changed

+498
-43
lines changed

4 files changed

+498
-43
lines changed

faststream/redis/schemas/stream_sub.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ class StreamSub(NameRequired):
3333
https://redis.io/docs/latest/commands/xreadgroup/#differences-between-xread-and-xreadgroup
3434
polling_interval:
3535
Polling interval in seconds.
36+
min_idle_time:
37+
Minimum idle time in milliseconds for a message to be eligible for claiming via XAUTOCLAIM.
38+
Messages that have been pending (unacknowledged) for at least this duration can be
39+
reclaimed by this consumer. Only applicable when using consumer groups.
40+
41+
https://redis.io/docs/latest/commands/xautoclaim/
3642
"""
3743

3844
__slots__ = (
@@ -42,6 +48,7 @@ class StreamSub(NameRequired):
4248
"last_id",
4349
"max_records",
4450
"maxlen",
51+
"min_idle_time",
4552
"name",
4653
"no_ack",
4754
"polling_interval",
@@ -58,11 +65,15 @@ def __init__(
5865
last_id: str | None = None,
5966
maxlen: int | None = None,
6067
max_records: int | None = None,
68+
min_idle_time: int | None = None,
6169
) -> None:
6270
if (group and not consumer) or (not group and consumer):
6371
msg = "You should specify `group` and `consumer` both"
6472
raise SetupError(msg)
6573

74+
if last_id is None:
75+
last_id = ">" if group and consumer else "$"
76+
6677
if group and consumer:
6778
if last_id != ">":
6879
if polling_interval:
@@ -86,9 +97,6 @@ def __init__(
8697
stacklevel=1,
8798
)
8899

89-
if last_id is None:
90-
last_id = ">" if group and consumer else "$"
91-
92100
super().__init__(stream)
93101

94102
self.group = group
@@ -99,6 +107,7 @@ def __init__(
99107
self.last_id = last_id
100108
self.maxlen = maxlen
101109
self.max_records = max_records
110+
self.min_idle_time = min_idle_time
102111

103112
def add_prefix(self, prefix: str) -> "StreamSub":
104113
new_stream = deepcopy(self)

faststream/redis/subscriber/usecases/stream_subscriber.py

Lines changed: 121 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import math
2-
from collections.abc import AsyncIterator, Awaitable, Callable
2+
from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine
33
from typing import TYPE_CHECKING, Any, Optional, TypeAlias
44

55
from redis.exceptions import ResponseError
@@ -47,6 +47,8 @@ def __init__(
4747
assert config.stream_sub
4848
self._stream_sub = config.stream_sub
4949
self.last_id = config.stream_sub.last_id
50+
self.min_idle_time = config.stream_sub.min_idle_time
51+
self.autoclaim_start_id = "0-0"
5052

5153
@property
5254
def stream_sub(self) -> "StreamSub":
@@ -139,7 +141,7 @@ def read(
139141
noack=stream.no_ack,
140142
)
141143

142-
else:
144+
elif self.stream_sub.min_idle_time is None:
143145

144146
def read(
145147
last_id: str,
@@ -164,6 +166,50 @@ def read(
164166
count=stream.max_records,
165167
)
166168

169+
else:
170+
171+
def read(
172+
_: str,
173+
) -> Coroutine[
174+
Any,
175+
Any,
176+
tuple[
177+
tuple[
178+
TopicName,
179+
tuple[
180+
tuple[
181+
Offset,
182+
dict[bytes, bytes],
183+
],
184+
...,
185+
],
186+
],
187+
...,
188+
],
189+
]:
190+
async def xautoclaim() -> tuple[
191+
tuple[TopicName, tuple[tuple[Offset, dict[bytes, bytes]], ...]], ...
192+
]:
193+
stream_message = await client.xautoclaim(
194+
name=self.stream_sub.name,
195+
groupname=self.stream_sub.group,
196+
consumername=self.stream_sub.consumer,
197+
min_idle_time=self.min_idle_time,
198+
start_id=self.autoclaim_start_id,
199+
count=1,
200+
)
201+
stream_name = self.stream_sub.name.encode()
202+
(next_id, messages, _) = stream_message
203+
# Update start_id for next call
204+
self.autoclaim_start_id = (
205+
next_id.decode() if isinstance(next_id, bytes) else next_id
206+
)
207+
if not messages:
208+
return ()
209+
return ((stream_name, messages),)
210+
211+
return xautoclaim()
212+
167213
await super().start(read)
168214

169215
@override
@@ -175,17 +221,43 @@ async def get_one(
175221
assert not self.calls, (
176222
"You can't use `get_one` method if subscriber has registered handlers."
177223
)
224+
if self.min_idle_time is None:
225+
if self.stream_sub.group and self.stream_sub.consumer:
226+
stream_message = await self._client.xreadgroup(
227+
groupname=self.stream_sub.group,
228+
consumername=self.stream_sub.consumer,
229+
streams={self.stream_sub.name: self.last_id},
230+
block=math.ceil(timeout * 1000),
231+
count=1,
232+
)
233+
else:
234+
stream_message = await self._client.xread(
235+
{self.stream_sub.name: self.last_id},
236+
block=math.ceil(timeout * 1000),
237+
count=1,
238+
)
239+
if not stream_message:
240+
return None
178241

179-
stream_message = await self._client.xread(
180-
{self.stream_sub.name: self.last_id},
181-
block=math.ceil(timeout * 1000),
182-
count=1,
183-
)
184-
185-
if not stream_message:
186-
return None
187-
188-
((stream_name, ((message_id, raw_message),)),) = stream_message
242+
((stream_name, ((message_id, raw_message),)),) = stream_message
243+
else:
244+
stream_message = await self._client.xautoclaim(
245+
name=self.stream_sub.name,
246+
groupname=self.stream_sub.group,
247+
consumername=self.stream_sub.consumer,
248+
min_idle_time=self.min_idle_time,
249+
start_id=self.autoclaim_start_id,
250+
count=1,
251+
)
252+
(next_id, messages, _) = stream_message
253+
# Update start_id for next call
254+
self.autoclaim_start_id = (
255+
next_id.decode() if isinstance(next_id, bytes) else next_id
256+
)
257+
if not messages:
258+
return None
259+
stream_name = self.stream_sub.name.encode()
260+
((message_id, raw_message),) = messages
189261

190262
self.last_id = message_id.decode()
191263

@@ -221,16 +293,43 @@ async def __aiter__(self) -> AsyncIterator["RedisStreamMessage"]: # type: ignor
221293
async_parser, async_decoder = self._get_parser_and_decoder()
222294

223295
while True:
224-
stream_message = await self._client.xread(
225-
{self.stream_sub.name: self.last_id},
226-
block=math.ceil(timeout * 1000),
227-
count=1,
228-
)
229-
230-
if not stream_message:
231-
continue
232-
233-
((stream_name, ((message_id, raw_message),)),) = stream_message
296+
if self.min_idle_time is None:
297+
if self.stream_sub.group and self.stream_sub.consumer:
298+
stream_message = await self._client.xreadgroup(
299+
groupname=self.stream_sub.group,
300+
consumername=self.stream_sub.consumer,
301+
streams={self.stream_sub.name: self.last_id},
302+
block=math.ceil(timeout * 1000),
303+
count=1,
304+
)
305+
else:
306+
stream_message = await self._client.xread(
307+
{self.stream_sub.name: self.last_id},
308+
block=math.ceil(timeout * 1000),
309+
count=1,
310+
)
311+
if not stream_message:
312+
continue
313+
314+
((stream_name, ((message_id, raw_message),)),) = stream_message
315+
else:
316+
stream_message = await self._client.xautoclaim(
317+
name=self.stream_sub.name,
318+
groupname=self.stream_sub.group,
319+
consumername=self.stream_sub.consumer,
320+
min_idle_time=self.min_idle_time,
321+
start_id=self.autoclaim_start_id,
322+
count=1,
323+
)
324+
(next_id, messages, _) = stream_message
325+
# Update start_id for next call
326+
self.autoclaim_start_id = (
327+
next_id.decode() if isinstance(next_id, bytes) else next_id
328+
)
329+
if not messages:
330+
continue
331+
stream_name = self.stream_sub.name.encode()
332+
((message_id, raw_message),) = messages
234333

235334
self.last_id = message_id.decode()
236335

tests/brokers/redis/test_config.py

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -48,35 +48,31 @@ def test_stream_with_group() -> None:
4848

4949

5050
@pytest.mark.redis()
51-
def test_custom_ack() -> None:
51+
def test_stream_with_group_and_min_idle_time() -> None:
5252
config = RedisSubscriberConfig(
5353
_outer_config=MagicMock(),
5454
stream_sub=StreamSub(
5555
"test_stream",
5656
group="test_group",
5757
consumer="test_consumer",
58+
min_idle_time=1000,
5859
),
59-
_ack_policy=AckPolicy.ACK,
6060
)
61-
assert config.ack_policy is AckPolicy.ACK
61+
assert config.ack_policy is AckPolicy.REJECT_ON_ERROR
6262

6363

6464
@pytest.mark.redis()
65-
def test_stream_sub_with_no_ack_group() -> None:
66-
with pytest.warns(
67-
RuntimeWarning,
68-
match="`no_ack` is not supported by consumer group with last_id other than `>`",
69-
):
70-
config = RedisSubscriberConfig(
71-
_outer_config=MagicMock(),
72-
stream_sub=StreamSub(
73-
"test_stream",
74-
group="test_group",
75-
consumer="test_consumer",
76-
no_ack=True,
77-
),
78-
)
79-
assert config.ack_policy is AckPolicy.MANUAL
65+
def test_custom_ack() -> None:
66+
config = RedisSubscriberConfig(
67+
_outer_config=MagicMock(),
68+
stream_sub=StreamSub(
69+
"test_stream",
70+
group="test_group",
71+
consumer="test_consumer",
72+
),
73+
_ack_policy=AckPolicy.ACK,
74+
)
75+
assert config.ack_policy is AckPolicy.ACK
8076

8177

8278
@pytest.mark.redis()

0 commit comments

Comments
 (0)