From 63b50ee9769add77fdbdd7e218a42b5affb58e5f Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Fri, 4 Jul 2025 14:10:57 +0200 Subject: [PATCH 1/9] Heartbeat --- quixstreams/app.py | 58 +++++++++++++++++++ quixstreams/core/stream/functions/__init__.py | 1 + .../core/stream/functions/heartbeat.py | 33 +++++++++++ quixstreams/core/stream/stream.py | 5 ++ quixstreams/dataframe/windows/base.py | 4 ++ quixstreams/models/messagecontext.py | 7 +++ 6 files changed, 108 insertions(+) create mode 100644 quixstreams/core/stream/functions/heartbeat.py diff --git a/quixstreams/app.py b/quixstreams/app.py index 58c69e284..3fdbf1c49 100644 --- a/quixstreams/app.py +++ b/quixstreams/app.py @@ -5,6 +5,7 @@ import time import warnings from collections import defaultdict +from datetime import datetime from pathlib import Path from typing import Callable, List, Literal, Optional, Protocol, Tuple, Type, Union @@ -28,6 +29,8 @@ from .logging import LogLevel, configure_logging from .models import ( DeserializerType, + MessageContext, + Row, SerializerType, TimestampExtractor, Topic, @@ -151,6 +154,7 @@ def __init__( topic_create_timeout: float = 60, processing_guarantee: ProcessingGuarantee = "at-least-once", max_partition_buffer_size: int = 10000, + heartbeat_interval: float = 0.0, ): """ :param broker_address: Connection settings for Kafka. @@ -220,6 +224,11 @@ def __init__( It is a soft limit, and the actual number of buffered messages can be up to x2 higher. Lower value decreases the memory use, but increases the latency. Default - `10000`. + :param heartbeat_interval: the interval (seconds) at which to send heartbeat messages. + The heartbeat timing starts counting from application start. + The heartbeat is sent for every partition on every topic. + If the value is 0, no heartbeat messages will be sent. + Default - `0.0`.

***Error Handlers***
To handle errors, `Application` accepts callbacks triggered when @@ -363,6 +372,10 @@ def __init__( recovery_manager=recovery_manager, ) + self._heartbeat_active = heartbeat_interval > 0 + self._heartbeat_interval = heartbeat_interval + self._heartbeat_last_sent = datetime.now().timestamp() + self._source_manager = SourceManager() self._sink_manager = SinkManager() self._dataframe_registry = DataFrameRegistry() @@ -879,6 +892,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None): processing_context = self._processing_context source_manager = self._source_manager process_message = self._process_message + process_heartbeat = self._process_heartbeat printer = self._processing_context.printer run_tracker = self._run_tracker consumer = self._consumer @@ -902,6 +916,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None): run_tracker.timeout_refresh() else: process_message(dataframes_composed) + process_heartbeat(dataframes_composed) processing_context.commit_checkpoint() consumer.resume_backpressured() source_manager.raise_for_error() @@ -985,6 +1000,49 @@ def _process_message(self, dataframe_composed): if self._on_message_processed is not None: self._on_message_processed(topic_name, partition, offset) + def _process_heartbeat(self, dataframe_composed): + if not self._heartbeat_active: + return + + now = datetime.now().timestamp() + if self._heartbeat_last_sent > now - self._heartbeat_interval: + return + + value, key, timestamp, headers = None, None, int(now * 1000), {} + non_changelog_topics = self._topic_manager.non_changelog_topics + + for tp in self._consumer.assignment(): + if (topic := tp.topic) in non_changelog_topics: + row = Row( + value=value, + key=key, + timestamp=timestamp, + context=MessageContext( + topic=topic, + partition=tp.partition, + offset=-1, + size=-1, + heartbeat=True, + ), + headers=headers, + ) + context = copy_context() + context.run(set_message_context, row.context) + try: + context.run( + dataframe_composed[topic], + value, + key, + timestamp, + headers, + ) + except Exception as exc: + to_suppress = self._on_processing_error(exc, row, logger) + if not to_suppress: + raise + + self._heartbeat_last_sent = now + def _on_assign(self, _, topic_partitions: List[TopicPartition]): """ Assign new topic partitions to consumer and state. diff --git a/quixstreams/core/stream/functions/__init__.py b/quixstreams/core/stream/functions/__init__.py index 1561500e3..edaf9240b 100644 --- a/quixstreams/core/stream/functions/__init__.py +++ b/quixstreams/core/stream/functions/__init__.py @@ -5,3 +5,4 @@ from .transform import * from .types import * from .update import * +from .heartbeat import * \ No newline at end of file diff --git a/quixstreams/core/stream/functions/heartbeat.py b/quixstreams/core/stream/functions/heartbeat.py new file mode 100644 index 000000000..8f09c18ef --- /dev/null +++ b/quixstreams/core/stream/functions/heartbeat.py @@ -0,0 +1,33 @@ +from typing import Any, Literal, Union, cast, overload + +from quixstreams.context import message_context + +from .base import StreamFunction +from .types import VoidExecutor + +__all__ = ("HeartbeatFunction", "is_heartbeat_message") + + +class HeartbeatFunction(StreamFunction): + def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor: + child_executor = self._resolve_branching(*child_executors) + + func = self.func + + def wrapper( + value: Any, + key: Any, + timestamp: int, + headers: Any, + ): + if is_heartbeat_message(key, value) and (result := func(timestamp)): + for new_value, new_key, new_timestamp, new_headers in result: + child_executor(new_value, new_key, new_timestamp, new_headers) + + child_executor(value, key, timestamp, headers) + + return wrapper + + +def is_heartbeat_message(key: Any, value: Any) -> bool: + return message_context().heartbeat and key is None and value is None diff --git a/quixstreams/core/stream/stream.py b/quixstreams/core/stream/stream.py index f538f5307..115d66235 100644 --- a/quixstreams/core/stream/stream.py +++ b/quixstreams/core/stream/stream.py @@ -25,6 +25,7 @@ FilterFunction, FilterWithMetadataCallback, FilterWithMetadataFunction, + HeartbeatFunction, ReturningExecutor, StreamFunction, TransformCallback, @@ -280,6 +281,10 @@ def add_transform( """ return self._add(TransformFunction(func, expand=expand)) # type: ignore[call-overload] + def add_heartbeat(self, func) -> "Stream": + heartbeat_func = HeartbeatFunction(func) + return self._add(heartbeat_func) + def merge(self, other: "Stream") -> "Stream": """ Merge two Streams together and return a new Stream with two parents diff --git a/quixstreams/dataframe/windows/base.py b/quixstreams/dataframe/windows/base.py index 8040b2774..5bcf09360 100644 --- a/quixstreams/dataframe/windows/base.py +++ b/quixstreams/dataframe/windows/base.py @@ -401,6 +401,7 @@ def _as_windowed( processing_context: "ProcessingContext", store_name: str, stream_id: str, + heartbeat_func, ) -> TransformExpandedCallback: @functools.wraps(func) def wrapper( @@ -413,6 +414,9 @@ def wrapper( stream_id=stream_id, partition=ctx.partition, store_name=store_name ), ) + if is_heartbeat_message(key, value): + return heartbeat_func(timestamp, transaction) + if key is None: logger.warning( f"Skipping window processing for a message because the key is None, " diff --git a/quixstreams/models/messagecontext.py b/quixstreams/models/messagecontext.py index 351fe9157..a41c011bf 100644 --- a/quixstreams/models/messagecontext.py +++ b/quixstreams/models/messagecontext.py @@ -16,6 +16,7 @@ class MessageContext: "_size", "_headers", "_leader_epoch", + "_heartbeat", ) def __init__( @@ -25,12 +26,14 @@ def __init__( offset: int, size: int, leader_epoch: Optional[int] = None, + heartbeat: bool = False, ): self._topic = topic self._partition = partition self._offset = offset self._size = size self._leader_epoch = leader_epoch + self._heartbeat = heartbeat @property def topic(self) -> str: @@ -51,3 +54,7 @@ def size(self) -> int: @property def leader_epoch(self) -> Optional[int]: return self._leader_epoch + + @property + def heartbeat(self) -> bool: + return self._heartbeat From 2bf9fb5bf5d685f1bdbcc1fa8885e69b3ac9cdd1 Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Fri, 4 Jul 2025 14:16:08 +0200 Subject: [PATCH 2/9] Ignore heartbeat --- quixstreams/core/stream/functions/apply.py | 5 ++-- .../core/stream/functions/heartbeat.py | 25 ++++++++++++++++--- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/quixstreams/core/stream/functions/apply.py b/quixstreams/core/stream/functions/apply.py index bdf493953..c1df1d873 100644 --- a/quixstreams/core/stream/functions/apply.py +++ b/quixstreams/core/stream/functions/apply.py @@ -1,6 +1,7 @@ from typing import Any, Literal, Union, overload from .base import StreamFunction +from .heartbeat import ignore_heartbeat from .types import ( ApplyCallback, ApplyExpandedCallback, @@ -66,7 +67,7 @@ def wrapper( result = func(value) child_executor(result, key, timestamp, headers) - return wrapper + return ignore_heartbeat(wrapper) class ApplyWithMetadataFunction(StreamFunction): @@ -128,4 +129,4 @@ def wrapper( result = func(value, key, timestamp, headers) child_executor(result, key, timestamp, headers) - return wrapper + return ignore_heartbeat(wrapper) diff --git a/quixstreams/core/stream/functions/heartbeat.py b/quixstreams/core/stream/functions/heartbeat.py index 8f09c18ef..04544cfd2 100644 --- a/quixstreams/core/stream/functions/heartbeat.py +++ b/quixstreams/core/stream/functions/heartbeat.py @@ -1,11 +1,12 @@ -from typing import Any, Literal, Union, cast, overload +from functools import wraps +from typing import Any from quixstreams.context import message_context from .base import StreamFunction from .types import VoidExecutor -__all__ = ("HeartbeatFunction", "is_heartbeat_message") +__all__ = ("HeartbeatFunction", "is_heartbeat_message", "ignore_heartbeat") class HeartbeatFunction(StreamFunction): @@ -20,7 +21,7 @@ def wrapper( timestamp: int, headers: Any, ): - if is_heartbeat_message(key, value) and (result := func(timestamp)): + if is_heartbeat_message(key, value) and (result := func(timestamp)): for new_value, new_key, new_timestamp, new_headers in result: child_executor(new_value, new_key, new_timestamp, new_headers) @@ -31,3 +32,21 @@ def wrapper( def is_heartbeat_message(key: Any, value: Any) -> bool: return message_context().heartbeat and key is None and value is None + + +def ignore_heartbeat(func): + """ + Decorator that wraps a function to return early if the message is a heartbeat. + + The decorated function should expect (value, key, timestamp, headers) parameters. + If is_heartbeat_message(key, value) returns True, the function returns early + without executing the wrapped function. + """ + + @wraps(func) + def wrapper(value: Any, key: Any, timestamp: int, headers: Any): + if is_heartbeat_message(key, value): + return + return func(value, key, timestamp, headers) + + return wrapper From 06d48cb248d418134588f5b5e67c13cec7b62ef5 Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Fri, 4 Jul 2025 14:17:53 +0200 Subject: [PATCH 3/9] Heartbeat on SDF --- quixstreams/dataframe/dataframe.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/quixstreams/dataframe/dataframe.py b/quixstreams/dataframe/dataframe.py index 7e9b09844..fe85f11ab 100644 --- a/quixstreams/dataframe/dataframe.py +++ b/quixstreams/dataframe/dataframe.py @@ -191,6 +191,10 @@ def stream_id(self) -> str: def topics(self) -> tuple[Topic, ...]: return self._topics + def heartbeat(self, func) -> "StreamingDataFrame": + stream = self.stream.add_heartbeat(func) + return self.__dataframe_clone__(stream=stream) + @overload def apply( self, From 356c736bca859a81e13ee1aed69595962c9f9991 Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Mon, 7 Jul 2025 10:40:11 +0200 Subject: [PATCH 4/9] Correct heartbeat ignore --- quixstreams/core/stream/functions/__init__.py | 2 +- quixstreams/core/stream/functions/apply.py | 30 +++++++++++----- quixstreams/core/stream/functions/filter.py | 6 ++-- .../core/stream/functions/heartbeat.py | 34 +++++-------------- .../core/stream/functions/transform.py | 25 +++++++++----- quixstreams/core/stream/functions/types.py | 9 +++++ quixstreams/core/stream/functions/update.py | 8 +++-- quixstreams/dataframe/windows/base.py | 16 ++++++++- 8 files changed, 82 insertions(+), 48 deletions(-) diff --git a/quixstreams/core/stream/functions/__init__.py b/quixstreams/core/stream/functions/__init__.py index edaf9240b..1be9fd6a0 100644 --- a/quixstreams/core/stream/functions/__init__.py +++ b/quixstreams/core/stream/functions/__init__.py @@ -2,7 +2,7 @@ from .apply import * from .base import * from .filter import * +from .heartbeat import * from .transform import * from .types import * from .update import * -from .heartbeat import * \ No newline at end of file diff --git a/quixstreams/core/stream/functions/apply.py b/quixstreams/core/stream/functions/apply.py index c1df1d873..24f6fd92d 100644 --- a/quixstreams/core/stream/functions/apply.py +++ b/quixstreams/core/stream/functions/apply.py @@ -1,7 +1,7 @@ from typing import Any, Literal, Union, overload from .base import StreamFunction -from .heartbeat import ignore_heartbeat +from .heartbeat import is_heartbeat_message from .types import ( ApplyCallback, ApplyExpandedCallback, @@ -49,6 +49,10 @@ def wrapper( timestamp: int, headers: Any, ) -> None: + # Pass heartbeat messages downstream + if is_heartbeat_message(key, value): + child_executor(value, key, timestamp, headers) + # Execute a function on a single value and wrap results into a list # to expand them downstream result = func(value) @@ -63,11 +67,13 @@ def wrapper( timestamp: int, headers: Any, ) -> None: - # Execute a function on a single value and return its result - result = func(value) - child_executor(result, key, timestamp, headers) + # Pass heartbeat messages downstream or execute + # a function on a single value and return its result + if not is_heartbeat_message(key, value): + value = func(value) + child_executor(value, key, timestamp, headers) - return ignore_heartbeat(wrapper) + return wrapper class ApplyWithMetadataFunction(StreamFunction): @@ -111,6 +117,10 @@ def wrapper( timestamp: int, headers: Any, ): + # Pass heartbeat messages downstream + if is_heartbeat_message(key, value): + child_executor(value, key, timestamp, headers) + # Execute a function on a single value and wrap results into a list # to expand them downstream result = func(value, key, timestamp, headers) @@ -125,8 +135,10 @@ def wrapper( timestamp: int, headers: Any, ): - # Execute a function on a single value and return its result - result = func(value, key, timestamp, headers) - child_executor(result, key, timestamp, headers) + # Pass heartbeat messages downstream or execute + # a function on a single value and return its result + if not is_heartbeat_message(key, value): + value = func(value, key, timestamp, headers) + child_executor(value, key, timestamp, headers) - return ignore_heartbeat(wrapper) + return wrapper diff --git a/quixstreams/core/stream/functions/filter.py b/quixstreams/core/stream/functions/filter.py index e291880c7..ed1bf6038 100644 --- a/quixstreams/core/stream/functions/filter.py +++ b/quixstreams/core/stream/functions/filter.py @@ -1,5 +1,7 @@ from typing import Any +from quixstreams.core.stream.functions.heartbeat import is_heartbeat_message + from .base import StreamFunction from .types import FilterCallback, FilterWithMetadataCallback, VoidExecutor @@ -30,7 +32,7 @@ def wrapper( headers: Any, ): # Filter a single value - if func(value): + if is_heartbeat_message(key, value) or func(value): child_executor(value, key, timestamp, headers) return wrapper @@ -62,7 +64,7 @@ def wrapper( headers: Any, ): # Filter a single value - if func(value, key, timestamp, headers): + if is_heartbeat_message(key, value) or func(value, key, timestamp, headers): child_executor(value, key, timestamp, headers) return wrapper diff --git a/quixstreams/core/stream/functions/heartbeat.py b/quixstreams/core/stream/functions/heartbeat.py index 04544cfd2..065f43e6d 100644 --- a/quixstreams/core/stream/functions/heartbeat.py +++ b/quixstreams/core/stream/functions/heartbeat.py @@ -1,15 +1,18 @@ -from functools import wraps from typing import Any from quixstreams.context import message_context from .base import StreamFunction -from .types import VoidExecutor +from .types import HeartbeatCallback, VoidExecutor -__all__ = ("HeartbeatFunction", "is_heartbeat_message", "ignore_heartbeat") +__all__ = ("HeartbeatFunction", "is_heartbeat_message") class HeartbeatFunction(StreamFunction): + def __init__(self, func: HeartbeatCallback) -> None: + super().__init__(func) + self.func: HeartbeatCallback + def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor: child_executor = self._resolve_branching(*child_executors) @@ -21,10 +24,9 @@ def wrapper( timestamp: int, headers: Any, ): - if is_heartbeat_message(key, value) and (result := func(timestamp)): - for new_value, new_key, new_timestamp, new_headers in result: - child_executor(new_value, new_key, new_timestamp, new_headers) - + if is_heartbeat_message(key, value): + # TODO: Heartbeats may return values (like expired windows) + func(timestamp) child_executor(value, key, timestamp, headers) return wrapper @@ -32,21 +34,3 @@ def wrapper( def is_heartbeat_message(key: Any, value: Any) -> bool: return message_context().heartbeat and key is None and value is None - - -def ignore_heartbeat(func): - """ - Decorator that wraps a function to return early if the message is a heartbeat. - - The decorated function should expect (value, key, timestamp, headers) parameters. - If is_heartbeat_message(key, value) returns True, the function returns early - without executing the wrapped function. - """ - - @wraps(func) - def wrapper(value: Any, key: Any, timestamp: int, headers: Any): - if is_heartbeat_message(key, value): - return - return func(value, key, timestamp, headers) - - return wrapper diff --git a/quixstreams/core/stream/functions/transform.py b/quixstreams/core/stream/functions/transform.py index 219662b6b..2ca23718a 100644 --- a/quixstreams/core/stream/functions/transform.py +++ b/quixstreams/core/stream/functions/transform.py @@ -1,5 +1,7 @@ from typing import Any, Literal, Union, cast, overload +from quixstreams.core.stream.functions.heartbeat import is_heartbeat_message + from .base import StreamFunction from .types import TransformCallback, TransformExpandedCallback, VoidExecutor @@ -53,9 +55,13 @@ def wrapper( timestamp: int, headers: Any, ): - result = expanded_func(value, key, timestamp, headers) - for new_value, new_key, new_timestamp, new_headers in result: - child_executor(new_value, new_key, new_timestamp, new_headers) + # Pass heartbeat messages downstream + if is_heartbeat_message(key, value): + child_executor(value, key, timestamp, headers) + else: + result = expanded_func(value, key, timestamp, headers) + for new_value, new_key, new_timestamp, new_headers in result: + child_executor(new_value, new_key, new_timestamp, new_headers) else: func = cast(TransformCallback, self.func) @@ -66,10 +72,13 @@ def wrapper( timestamp: int, headers: Any, ): - # Execute a function on a single value and return its result - new_value, new_key, new_timestamp, new_headers = func( - value, key, timestamp, headers - ) - child_executor(new_value, new_key, new_timestamp, new_headers) + # Pass heartbeat messages downstream or execute + # a function on a single value and return its result + if not is_heartbeat_message(key, value): + value, key, timestamp, headers = func( + value, key, timestamp, headers + ) + + child_executor(value, key, timestamp, headers) return wrapper diff --git a/quixstreams/core/stream/functions/types.py b/quixstreams/core/stream/functions/types.py index 504299b53..ba9239591 100644 --- a/quixstreams/core/stream/functions/types.py +++ b/quixstreams/core/stream/functions/types.py @@ -36,6 +36,15 @@ def __bool__(self) -> bool: ... [Any, Any, int, Any], Iterable[Tuple[Any, Any, int, Any]] ] +HeartbeatCallback = Callable[ + [int], # timestamp + Union[ + None, + Tuple[Any, Any, int, Any], # single value + Iterable[Tuple[Any, Any, int, Any]], # expanded values + ], +] + StreamCallback = Union[ ApplyCallback, ApplyExpandedCallback, diff --git a/quixstreams/core/stream/functions/update.py b/quixstreams/core/stream/functions/update.py index b2d9a19bc..18b7928aa 100644 --- a/quixstreams/core/stream/functions/update.py +++ b/quixstreams/core/stream/functions/update.py @@ -1,5 +1,7 @@ from typing import Any +from quixstreams.core.stream.functions.heartbeat import is_heartbeat_message + from .base import StreamFunction from .types import UpdateCallback, UpdateWithMetadataCallback, VoidExecutor @@ -28,7 +30,8 @@ def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor: def wrapper(value: Any, key: Any, timestamp: int, headers: Any): # Update a single value and forward it - func(value) + if not is_heartbeat_message(key, value): + func(value) child_executor(value, key, timestamp, headers) return wrapper @@ -56,7 +59,8 @@ def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor: def wrapper(value: Any, key: Any, timestamp: int, headers: Any): # Update a single value and forward it - func(value, key, timestamp, headers) + if not is_heartbeat_message(key, value): + func(value, key, timestamp, headers) child_executor(value, key, timestamp, headers) return wrapper diff --git a/quixstreams/dataframe/windows/base.py b/quixstreams/dataframe/windows/base.py index 5bcf09360..147c35190 100644 --- a/quixstreams/dataframe/windows/base.py +++ b/quixstreams/dataframe/windows/base.py @@ -19,6 +19,7 @@ from quixstreams.context import message_context from quixstreams.core.stream import TransformExpandedCallback from quixstreams.core.stream.exceptions import InvalidOperation +from quixstreams.core.stream.functions.heartbeat import is_heartbeat_message from quixstreams.models.topics.manager import TopicManager from quixstreams.state import WindowedPartitionTransaction @@ -83,6 +84,7 @@ def _apply_window( self, func: TransformRecordCallbackExpandedWindowed, name: str, + heartbeat_func, ) -> "StreamingDataFrame": self.register_store() @@ -91,6 +93,7 @@ def _apply_window( stream_id=self._dataframe.stream_id, processing_context=self._dataframe.processing_context, store_name=name, + heartbeat_func=heartbeat_func, ) # Manually modify the Stream and clone the source StreamingDataFrame # to avoid adding "transform" API to it. @@ -140,9 +143,13 @@ def window_callback( for key, window in expired_windows: yield (window, key, window["start"], None) + def heartbeat_callback(timestamp: int) -> Iterable[Message]: + return [] + return self._apply_window( func=window_callback, name=self._name, + heartbeat_func=heartbeat_callback, ) def current(self) -> "StreamingDataFrame": @@ -188,7 +195,14 @@ def window_callback( for key, window in updated_windows: yield (window, key, window["start"], None) - return self._apply_window(func=window_callback, name=self._name) + def heartbeat_callback(timestamp: int) -> Iterable[Message]: + return [] + + return self._apply_window( + func=window_callback, + name=self._name, + heartbeat_func=heartbeat_callback, + ) # Implemented by SingleAggregationWindowMixin and MultiAggregationWindowMixin # Single aggregation and multi aggregation windows store aggregations and collections From f11415471769fe8cb367d5313778b449d40fecb9 Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Mon, 7 Jul 2025 11:45:43 +0200 Subject: [PATCH 5/9] HeartbeatFunction handles None or an iterable of new values --- quixstreams/core/stream/functions/heartbeat.py | 6 +++--- quixstreams/core/stream/functions/types.py | 11 ++--------- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/quixstreams/core/stream/functions/heartbeat.py b/quixstreams/core/stream/functions/heartbeat.py index 065f43e6d..589745aa2 100644 --- a/quixstreams/core/stream/functions/heartbeat.py +++ b/quixstreams/core/stream/functions/heartbeat.py @@ -24,9 +24,9 @@ def wrapper( timestamp: int, headers: Any, ): - if is_heartbeat_message(key, value): - # TODO: Heartbeats may return values (like expired windows) - func(timestamp) + if is_heartbeat_message(key, value) and (result := func(timestamp)): + for new_value, new_key, new_timestamp, new_headers in result: + child_executor(new_value, new_key, new_timestamp, new_headers) child_executor(value, key, timestamp, headers) return wrapper diff --git a/quixstreams/core/stream/functions/types.py b/quixstreams/core/stream/functions/types.py index ba9239591..eceb8fdb3 100644 --- a/quixstreams/core/stream/functions/types.py +++ b/quixstreams/core/stream/functions/types.py @@ -1,4 +1,4 @@ -from typing import Any, Callable, Iterable, Protocol, Tuple, Union +from typing import Any, Callable, Iterable, Optional, Protocol, Tuple, Union __all__ = ( "StreamCallback", @@ -36,14 +36,7 @@ def __bool__(self) -> bool: ... [Any, Any, int, Any], Iterable[Tuple[Any, Any, int, Any]] ] -HeartbeatCallback = Callable[ - [int], # timestamp - Union[ - None, - Tuple[Any, Any, int, Any], # single value - Iterable[Tuple[Any, Any, int, Any]], # expanded values - ], -] +HeartbeatCallback = Callable[[int], Optional[Iterable[Tuple[Any, Any, int, Any]]]] StreamCallback = Union[ ApplyCallback, From 9b40b99c1296cb2438b572990823c34d4f538864 Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Mon, 7 Jul 2025 12:59:55 +0200 Subject: [PATCH 6/9] Add TODO for placeholders --- quixstreams/dataframe/windows/base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/quixstreams/dataframe/windows/base.py b/quixstreams/dataframe/windows/base.py index 147c35190..9947de362 100644 --- a/quixstreams/dataframe/windows/base.py +++ b/quixstreams/dataframe/windows/base.py @@ -144,6 +144,7 @@ def window_callback( yield (window, key, window["start"], None) def heartbeat_callback(timestamp: int) -> Iterable[Message]: + # TODO: Implement heartbeat callback return [] return self._apply_window( @@ -196,6 +197,7 @@ def window_callback( yield (window, key, window["start"], None) def heartbeat_callback(timestamp: int) -> Iterable[Message]: + # TODO: Implement heartbeat callback return [] return self._apply_window( From 8cd4f82b29d32864239812ab828a0012c2990295 Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Mon, 7 Jul 2025 13:27:54 +0200 Subject: [PATCH 7/9] Correct heartbeat handling in windowing --- quixstreams/dataframe/windows/base.py | 33 ++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/quixstreams/dataframe/windows/base.py b/quixstreams/dataframe/windows/base.py index 9947de362..a0d90dee3 100644 --- a/quixstreams/dataframe/windows/base.py +++ b/quixstreams/dataframe/windows/base.py @@ -19,7 +19,6 @@ from quixstreams.context import message_context from quixstreams.core.stream import TransformExpandedCallback from quixstreams.core.stream.exceptions import InvalidOperation -from quixstreams.core.stream.functions.heartbeat import is_heartbeat_message from quixstreams.models.topics.manager import TopicManager from quixstreams.state import WindowedPartitionTransaction @@ -93,13 +92,19 @@ def _apply_window( stream_id=self._dataframe.stream_id, processing_context=self._dataframe.processing_context, store_name=name, - heartbeat_func=heartbeat_func, + ) + heartbeat_func = _as_heartbeat( + func=heartbeat_func, + stream_id=self._dataframe.stream_id, + processing_context=self._dataframe.processing_context, + store_name=name, ) # Manually modify the Stream and clone the source StreamingDataFrame # to avoid adding "transform" API to it. # Transform callbacks can modify record key and timestamp, # and it's prone to misuse. stream = self._dataframe.stream.add_transform(func=windowed_func, expand=True) + stream = stream.add_heartbeat(func=heartbeat_func) return self._dataframe.__dataframe_clone__(stream=stream) def final(self) -> "StreamingDataFrame": @@ -417,7 +422,6 @@ def _as_windowed( processing_context: "ProcessingContext", store_name: str, stream_id: str, - heartbeat_func, ) -> TransformExpandedCallback: @functools.wraps(func) def wrapper( @@ -430,9 +434,6 @@ def wrapper( stream_id=stream_id, partition=ctx.partition, store_name=store_name ), ) - if is_heartbeat_message(key, value): - return heartbeat_func(timestamp, transaction) - if key is None: logger.warning( f"Skipping window processing for a message because the key is None, " @@ -444,6 +445,26 @@ def wrapper( return wrapper +def _as_heartbeat( + func, # TODO: typing? + processing_context: "ProcessingContext", + store_name: str, + stream_id: str, +): # TODO: typing? + @functools.wraps(func) + def wrapper(timestamp: int) -> Iterable[Message]: + ctx = message_context() + transaction = cast( + WindowedPartitionTransaction, + processing_context.checkpoint.get_store_transaction( + stream_id=stream_id, partition=ctx.partition, store_name=store_name + ), + ) + return func(timestamp, transaction) + + return wrapper + + class WindowOnLateCallback(Protocol): def __call__( self, From 2cdcf35c0d3617d3fa502155812a066024f275d5 Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Mon, 7 Jul 2025 14:20:44 +0200 Subject: [PATCH 8/9] Correct callbacks --- quixstreams/dataframe/windows/base.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/quixstreams/dataframe/windows/base.py b/quixstreams/dataframe/windows/base.py index a0d90dee3..5847f99d2 100644 --- a/quixstreams/dataframe/windows/base.py +++ b/quixstreams/dataframe/windows/base.py @@ -148,7 +148,9 @@ def window_callback( for key, window in expired_windows: yield (window, key, window["start"], None) - def heartbeat_callback(timestamp: int) -> Iterable[Message]: + def heartbeat_callback( + timestamp: int, transaction: WindowedPartitionTransaction + ) -> Iterable[Message]: # TODO: Implement heartbeat callback return [] @@ -201,7 +203,9 @@ def window_callback( for key, window in updated_windows: yield (window, key, window["start"], None) - def heartbeat_callback(timestamp: int) -> Iterable[Message]: + def heartbeat_callback( + timestamp: int, transaction: WindowedPartitionTransaction + ) -> Iterable[Message]: # TODO: Implement heartbeat callback return [] From d980bc8d2cbabc68e8497c4dfd71fab5e627acad Mon Sep 17 00:00:00 2001 From: Remy Gwaramadze Date: Mon, 7 Jul 2025 15:15:15 +0200 Subject: [PATCH 9/9] PoC: wallclock window expiration --- quixstreams/dataframe/windows/base.py | 13 +++++++++++-- quixstreams/dataframe/windows/count_based.py | 8 ++++++++ quixstreams/dataframe/windows/time_based.py | 17 +++++++++++++++++ .../state/rocksdb/windowed/transaction.py | 10 +++++++--- quixstreams/state/types.py | 2 ++ 5 files changed, 45 insertions(+), 5 deletions(-) diff --git a/quixstreams/dataframe/windows/base.py b/quixstreams/dataframe/windows/base.py index 5847f99d2..4f326306d 100644 --- a/quixstreams/dataframe/windows/base.py +++ b/quixstreams/dataframe/windows/base.py @@ -69,6 +69,14 @@ def process_window( ) -> tuple[Iterable[WindowKeyResult], Iterable[WindowKeyResult]]: pass + @abstractmethod + def process_heartbeat( + self, + timestamp_ms: int, + transaction: WindowedPartitionTransaction, + ) -> Iterable[WindowKeyResult]: + pass + def register_store(self) -> None: TopicManager.ensure_topics_copartitioned(*self._dataframe.topics) # Create a config for the changelog topic based on the underlying SDF topics @@ -151,8 +159,9 @@ def window_callback( def heartbeat_callback( timestamp: int, transaction: WindowedPartitionTransaction ) -> Iterable[Message]: - # TODO: Implement heartbeat callback - return [] + # TODO: Check if this will work for sliding windows + for key, window in self.process_heartbeat(timestamp, transaction): + yield (window, key, window["start"], None) return self._apply_window( func=window_callback, diff --git a/quixstreams/dataframe/windows/count_based.py b/quixstreams/dataframe/windows/count_based.py index 57c6b36e5..649ee0ee0 100644 --- a/quixstreams/dataframe/windows/count_based.py +++ b/quixstreams/dataframe/windows/count_based.py @@ -189,6 +189,14 @@ def process_window( state.set(key=self.STATE_KEY, value=data) return updated_windows, expired_windows + def process_heartbeat( + self, + timestamp_ms: int, + transaction: WindowedPartitionTransaction, + ) -> Iterable[WindowKeyResult]: + # Count based windows cannot be expired by heartbeat + return [] + def _get_collection_start_id(self, window: CountWindowData) -> int: start_id = window.get("collection_start_id", _MISSING) if start_id is _MISSING: diff --git a/quixstreams/dataframe/windows/time_based.py b/quixstreams/dataframe/windows/time_based.py index c403cfdfa..171dae6d6 100644 --- a/quixstreams/dataframe/windows/time_based.py +++ b/quixstreams/dataframe/windows/time_based.py @@ -200,11 +200,27 @@ def process_window( return updated_windows, expired_windows + def process_heartbeat( + self, + timestamp_ms: int, + transaction: WindowedPartitionTransaction, + ) -> Iterable[WindowKeyResult]: + latest_expired_window_end = transaction.get_latest_expired(prefix=b"") + latest_timestamp = max(timestamp_ms, latest_expired_window_end) + max_expired_window_end = latest_timestamp - self._grace_ms + return self.expire_by_partition( + transaction, + max_expired_window_end, + self.collect, + advance_last_expired_timestamp=False, + ) + def expire_by_partition( self, transaction: WindowedPartitionTransaction, max_expired_end: int, collect: bool, + advance_last_expired_timestamp: bool = True, ) -> Iterable[WindowKeyResult]: for ( window_start, @@ -214,6 +230,7 @@ def expire_by_partition( step_ms=self._step_ms if self._step_ms else self._duration_ms, collect=collect, delete=True, + advance_last_expired_timestamp=advance_last_expired_timestamp, ): yield key, self._results(aggregated, collected, window_start, window_end) diff --git a/quixstreams/state/rocksdb/windowed/transaction.py b/quixstreams/state/rocksdb/windowed/transaction.py index 3779b3e29..dfa3fa12a 100644 --- a/quixstreams/state/rocksdb/windowed/transaction.py +++ b/quixstreams/state/rocksdb/windowed/transaction.py @@ -298,6 +298,7 @@ def expire_all_windows( step_ms: int, delete: bool = True, collect: bool = False, + advance_last_expired_timestamp: bool = True, ) -> Iterable[ExpiredWindowDetail]: """ Get all expired windows for all prefix from RocksDB up to the specified `max_end_time` timestamp. @@ -360,9 +361,12 @@ def expire_all_windows( if collect: self.delete_from_collection(end=start, prefix=prefix) - self._set_timestamp( - prefix=b"", cache=self._last_expired_timestamps, timestamp_ms=last_expired - ) + if advance_last_expired_timestamp: + self._set_timestamp( + prefix=b"", + cache=self._last_expired_timestamps, + timestamp_ms=last_expired, + ) def delete_windows( self, max_start_time: int, delete_values: bool, prefix: bytes diff --git a/quixstreams/state/types.py b/quixstreams/state/types.py index c80c9e2ad..9f07b9499 100644 --- a/quixstreams/state/types.py +++ b/quixstreams/state/types.py @@ -378,6 +378,7 @@ def expire_all_windows( step_ms: int, delete: bool = True, collect: bool = False, + advance_last_expired_timestamp: bool = True, ) -> Iterable[ExpiredWindowDetail[V]]: """ Get all expired windows for all prefix from RocksDB up to the specified `max_start_time` timestamp. @@ -388,6 +389,7 @@ def expire_all_windows( :param max_end_time: The timestamp up to which windows are considered expired, inclusive. :param delete: If True, expired windows will be deleted. :param collect: If True, values will be collected into windows. + :param advance_last_expired_timestamp: If True, the last expired timestamp will be persisted. """ ...