Skip to content

Commit f114154

Browse files
committed
HeartbeatFunction handles None or an iterable of new values
1 parent 356c736 commit f114154

File tree

2 files changed

+5
-12
lines changed

2 files changed

+5
-12
lines changed

quixstreams/core/stream/functions/heartbeat.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ def wrapper(
2424
timestamp: int,
2525
headers: Any,
2626
):
27-
if is_heartbeat_message(key, value):
28-
# TODO: Heartbeats may return values (like expired windows)
29-
func(timestamp)
27+
if is_heartbeat_message(key, value) and (result := func(timestamp)):
28+
for new_value, new_key, new_timestamp, new_headers in result:
29+
child_executor(new_value, new_key, new_timestamp, new_headers)
3030
child_executor(value, key, timestamp, headers)
3131

3232
return wrapper

quixstreams/core/stream/functions/types.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Any, Callable, Iterable, Protocol, Tuple, Union
1+
from typing import Any, Callable, Iterable, Optional, Protocol, Tuple, Union
22

33
__all__ = (
44
"StreamCallback",
@@ -36,14 +36,7 @@ def __bool__(self) -> bool: ...
3636
[Any, Any, int, Any], Iterable[Tuple[Any, Any, int, Any]]
3737
]
3838

39-
HeartbeatCallback = Callable[
40-
[int], # timestamp
41-
Union[
42-
None,
43-
Tuple[Any, Any, int, Any], # single value
44-
Iterable[Tuple[Any, Any, int, Any]], # expanded values
45-
],
46-
]
39+
HeartbeatCallback = Callable[[int], Optional[Iterable[Tuple[Any, Any, int, Any]]]]
4740

4841
StreamCallback = Union[
4942
ApplyCallback,

0 commit comments

Comments
 (0)