From dcfd74180fce9c4dbd10955d132bc87d6abeb42e Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Mon, 21 Jul 2025 19:13:29 +0200 Subject: [PATCH 1/2] Window.final(): remove parenthesis --- quixstreams/dataframe/windows/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quixstreams/dataframe/windows/base.py b/quixstreams/dataframe/windows/base.py index 8040b2774..ac70641d4 100644 --- a/quixstreams/dataframe/windows/base.py +++ b/quixstreams/dataframe/windows/base.py @@ -138,7 +138,7 @@ def window_callback( ) # Use window start timestamp as a new record timestamp for key, window in expired_windows: - yield (window, key, window["start"], None) + yield window, key, window["start"], None return self._apply_window( func=window_callback, From c422c078af5e9a355c1caf58817a9305640cba7f Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Tue, 22 Jul 2025 15:29:23 +0200 Subject: [PATCH 2/2] WIP: Window triggers --- quixstreams/dataframe/windows/time_based.py | 137 +++++++++++++++++- .../dataframe/windows/triggers/__init__.py | 2 + .../dataframe/windows/triggers/base.py | 71 +++++++++ quixstreams/state/base/state.py | 38 ++++- .../state/rocksdb/windowed/metadata.py | 2 + quixstreams/state/rocksdb/windowed/state.py | 18 +++ .../state/rocksdb/windowed/transaction.py | 14 ++ quixstreams/state/types.py | 16 ++ 8 files changed, 290 insertions(+), 8 deletions(-) create mode 100644 quixstreams/dataframe/windows/triggers/__init__.py create mode 100644 quixstreams/dataframe/windows/triggers/base.py diff --git a/quixstreams/dataframe/windows/time_based.py b/quixstreams/dataframe/windows/time_based.py index c403cfdfa..b6ca4654b 100644 --- a/quixstreams/dataframe/windows/time_based.py +++ b/quixstreams/dataframe/windows/time_based.py @@ -11,8 +11,10 @@ Window, WindowKeyResult, WindowOnLateCallback, + WindowResult, get_window_ranges, ) +from .triggers import TimeWindowTrigger, TriggerAction if TYPE_CHECKING: from quixstreams.dataframe.dataframe import StreamingDataFrame @@ -122,6 +124,26 @@ def current( self._closing_strategy = ClosingStrategy.new(closing_strategy) return super().current() + def trigger(self, trigger: TimeWindowTrigger) -> "StreamingDataFrame": + def window_callback( + value: Any, + key: Any, + timestamp_ms: int, + _headers: Any, + transaction: WindowedPartitionTransaction, + ) -> Iterable[tuple[WindowResult, Any, int, Any]]: + windows = self.process_window_new( + value=value, + key=key, + timestamp_ms=timestamp_ms, + transaction=transaction, + trigger=trigger, + ) + for key, window in windows: + yield window, key, window["start"], None + + return self._apply_window(func=window_callback, name=self._name) + def process_window( self, value: Any, @@ -200,6 +222,119 @@ def process_window( return updated_windows, expired_windows + def process_window_new( + self, + value: Any, + key: Any, + timestamp_ms: int, + transaction: WindowedPartitionTransaction, + trigger: TimeWindowTrigger, + ) -> Iterable[WindowKeyResult]: + window_state = transaction.as_state(prefix=key) + duration_ms = self._duration_ms + grace_ms = self._grace_ms + + collect = self.collect + aggregate = self.aggregate + + ranges = get_window_ranges( + timestamp_ms=timestamp_ms, + duration_ms=duration_ms, + step_ms=self._step_ms, + ) + + if self._closing_strategy == ClosingStrategy.PARTITION: + latest_expired_window_end = transaction.get_latest_expired(prefix=b"") + latest_timestamp = max(timestamp_ms, latest_expired_window_end) + else: + state_ts = window_state.get_latest_timestamp() or 0 + latest_timestamp = max(timestamp_ms, state_ts) + + max_expired_window_end = latest_timestamp - grace_ms + max_expired_window_start = max_expired_window_end - duration_ms + updated_windows: list[WindowKeyResult] = [] + for start, end in ranges: + if start <= max_expired_window_start: + late_by_ms = max_expired_window_end - timestamp_ms + self._on_expired_window( + value=value, + key=key, + start=start, + end=end, + timestamp_ms=timestamp_ms, + late_by_ms=late_by_ms, + ) + continue + + # When collecting values, we only mark the window existence with None + # since actual values are stored separately and combined into an array + # during window expiration. + aggregated = None + if aggregate: + current_window = window_state.get_window(start, end) + if current_window is None: + current_window = self._initialize_value() + + aggregated = self._aggregate_value(current_window, value, timestamp_ms) + + updated_window = self._results(aggregated, [], start, end) + + window_state.update_window( + start, end, value=aggregated, timestamp_ms=timestamp_ms + ) + + trigger_state = transaction.as_trigger_state( + prefix=window_state.prefix, start_ms=start, end_ms=end + ) + + if ( + action := trigger.on_window_updated( + value, start=start, end=end, state=trigger_state + ) + ) == TriggerAction.EMIT: + updated_windows.append((key, updated_window)) + elif action == TriggerAction.DELETE: + # # TODO: Bump the event time here. + # # TODO: For count-windows, you want to add an element first, and only then emit/delete it. + # # But for session-close events, do you want this? + window_state.delete_window(start, end) + elif action == TriggerAction.EMIT_AND_DELETE: + # TODO: Bump the event time here. + # TODO: Do not update windows which are supposed to be deleted + # TODO: The deleted window must be returned in the sorted way together with other expired windows. + updated_windows.append((key, updated_window)) + window_state.delete_window(start, end) + + if collect: + # TODO: Is it correct? Looks like ".collect()" may not skip the late messages + # TODO: Create windows when "collect" is used too + # TODO: maybe rethink how "collect" is handled to accommodate the triggers + window_state.add_to_collection( + value=self._collect_value(value), + id=timestamp_ms, + ) + + if self._closing_strategy == ClosingStrategy.PARTITION: + expired_windows = self.expire_by_partition( + transaction, max_expired_window_end, collect + ) + else: + expired_windows = self.expire_by_key( + key, window_state, max_expired_window_start, collect + ) + + for key, window in expired_windows: + # Ignore all other actions except "EMIT" and "EMIT_AND_DELETE" + # because the time has already moved beyond the window boundaries + if trigger.on_window_closed() in ( + TriggerAction.EMIT, + TriggerAction.EMIT_AND_DELETE, + ): + yield key, window + + for key, window in updated_windows: + yield key, window + def expire_by_partition( self, transaction: WindowedPartitionTransaction, @@ -231,7 +366,7 @@ def expire_by_key( max_start_time=max_expired_start, collect=collect, ): - yield (key, self._results(aggregated, collected, window_start, window_end)) + yield key, self._results(aggregated, collected, window_start, window_end) def _on_expired_window( self, diff --git a/quixstreams/dataframe/windows/triggers/__init__.py b/quixstreams/dataframe/windows/triggers/__init__.py new file mode 100644 index 000000000..6dc6a0833 --- /dev/null +++ b/quixstreams/dataframe/windows/triggers/__init__.py @@ -0,0 +1,2 @@ +from .base import TimeWindowTrigger as TimeWindowTrigger +from .base import TriggerAction as TriggerAction diff --git a/quixstreams/dataframe/windows/triggers/base.py b/quixstreams/dataframe/windows/triggers/base.py new file mode 100644 index 000000000..4aecd70d4 --- /dev/null +++ b/quixstreams/dataframe/windows/triggers/base.py @@ -0,0 +1,71 @@ +import enum +from abc import ABC, abstractmethod +from typing import Any + +from quixstreams.state.base.state import State + +__all__ = ("TriggerAction", "TimeWindowTrigger") + + +class TriggerAction(enum.Enum): + EMIT = 1 + CONTINUE = 2 + DELETE = 3 + EMIT_AND_DELETE = 4 + + +# TODO: Should we support multiple triggers? Or rather a single trigger that can react to many events? +# TODO: If we skip a window update, we still must bump the event time (!) +# It means that we must detach time bump from the window update +# TODO: The trigger state can live in a pre-defined column family "__trigger__" +class TimeWindowTrigger(ABC): + @abstractmethod + def on_window_closed(self) -> TriggerAction: + """ + Called after the window is already closed and removed from the state. + + TODO: "delete" and "emit-and-delete" actions don't make sense here + """ + + @abstractmethod + def on_window_updated( + self, + value: Any, + start: int, + end: int, + state: State, + ) -> TriggerAction: + # TODO: This looks like "on_window_update" + # TODO: Is it possible to support "collect()" for this one? + """ + Called after the window is updated. + """ + ... + + @abstractmethod + def clear(self, state: State): + # TODO: Clear the trigger state when the window is closed + # It's probably better to make the trigger to define it to avoid querying the whole CF for keys to delete. + ... + + # @abstractmethod + # def on_wallclock_time_advanced( + # self, timestamp_ms: int, state: State + # ) -> TriggerAction: + # """ + # Called when the heartbeat is received. + # Can be used to trigger the processing-time based behavior + # when there're no new messages for some time. + # + # # TODO: The granularity of the heartbeats is regulated elsewhere + # # TODO: If we want to expire windows based on processing time, + # how do we avoid re-scanning the state on every heartbeat? + # 1. The default implementation is void + # 2. In "on_window_updated", we need to add a window key to a separate state to know when it's added + # 3. The state should have a cache on top of it, which is some heapq initialized with a limited number of keys (e.g. 10000) + # The queue is needed to limit the additional memory use. + # When the queue gets empty (e.g. the first 10000 windows are expired), we refill it with another 10000. + # 4. How do we even emit or expire windows from this trigger since we don't have them? Should this method have a different API? + # If we store the keys to be expired, could we also return them from this method? + # + # """ diff --git a/quixstreams/state/base/state.py b/quixstreams/state/base/state.py index 4114e44ab..86a8936fb 100644 --- a/quixstreams/state/base/state.py +++ b/quixstreams/state/base/state.py @@ -93,9 +93,15 @@ class TransactionState(State): __slots__ = ( "_transaction", "_prefix", + "_cf_name", ) - def __init__(self, prefix: bytes, transaction: "PartitionTransaction"): + def __init__( + self, + prefix: bytes, + transaction: "PartitionTransaction", + cf_name: str = "default", # TODO: Test + ): """ Simple key-value state to be provided into `StreamingDataFrame` functions @@ -103,6 +109,11 @@ def __init__(self, prefix: bytes, transaction: "PartitionTransaction"): """ self._prefix = prefix self._transaction = transaction + self._cf_name = cf_name + + @property + def prefix(self) -> bytes: + return self._prefix @overload def get(self, key: K, default: Literal[None] = None) -> Optional[V]: ... @@ -118,7 +129,9 @@ def get(self, key: K, default: Optional[V] = None) -> Optional[V]: :param default: default value to return if the key is not found :return: value or None if the key is not found and `default` is not provided """ - return self._transaction.get(key=key, prefix=self._prefix, default=default) + return self._transaction.get( + key=key, prefix=self._prefix, default=default, cf_name=self._cf_name + ) @overload def get_bytes(self, key: K, default: Literal[None] = None) -> Optional[bytes]: ... @@ -135,7 +148,7 @@ def get_bytes(self, key: K, default: Optional[bytes] = None) -> Optional[bytes]: :return: value or None if the key is not found and `default` is not provided """ return self._transaction.get_bytes( - key=key, prefix=self._prefix, default=default + key=key, prefix=self._prefix, default=default, cf_name=self._cf_name ) def set(self, key: K, value: V) -> None: @@ -144,7 +157,9 @@ def set(self, key: K, value: V) -> None: :param key: key :param value: value """ - return self._transaction.set(key=key, value=value, prefix=self._prefix) + return self._transaction.set( + key=key, value=value, prefix=self._prefix, cf_name=self._cf_name + ) def set_bytes(self, key: K, value: bytes) -> None: """ @@ -152,7 +167,12 @@ def set_bytes(self, key: K, value: bytes) -> None: :param key: key :param value: value """ - return self._transaction.set_bytes(key=key, value=value, prefix=self._prefix) + return self._transaction.set_bytes( + key=key, + value=value, + prefix=self._prefix, + cf_name=self._cf_name, + ) def delete(self, key: K): """ @@ -161,7 +181,9 @@ def delete(self, key: K): This function always returns `None`, even if value is not found. :param key: key """ - return self._transaction.delete(key=key, prefix=self._prefix) + return self._transaction.delete( + key=key, prefix=self._prefix, cf_name=self._cf_name + ) def exists(self, key: K) -> bool: """ @@ -170,4 +192,6 @@ def exists(self, key: K) -> bool: :return: True if key exists, False otherwise """ - return self._transaction.exists(key=key, prefix=self._prefix) + return self._transaction.exists( + key=key, prefix=self._prefix, cf_name=self._cf_name + ) diff --git a/quixstreams/state/rocksdb/windowed/metadata.py b/quixstreams/state/rocksdb/windowed/metadata.py index a41838f10..e47232ace 100644 --- a/quixstreams/state/rocksdb/windowed/metadata.py +++ b/quixstreams/state/rocksdb/windowed/metadata.py @@ -11,3 +11,5 @@ LATEST_TIMESTAMP_KEY = b"__latest_timestamp__" VALUES_CF_NAME = "__values__" + +TRIGGERS_CF_NAME = "__triggers__" diff --git a/quixstreams/state/rocksdb/windowed/state.py b/quixstreams/state/rocksdb/windowed/state.py index 3e3021b20..93010c385 100644 --- a/quixstreams/state/rocksdb/windowed/state.py +++ b/quixstreams/state/rocksdb/windowed/state.py @@ -63,6 +63,24 @@ def update_window( prefix=self._prefix, ) + def delete_window( + self, + start_ms: int, + end_ms: int, + ) -> None: + # TODO: tests + """ + Delete a single window. + + :param start_ms: start of the window in milliseconds + :param end_ms: end of the window in milliseconds + """ + return self._transaction.delete_window( + start_ms=start_ms, + end_ms=end_ms, + prefix=self._prefix, + ) + def add_to_collection(self, value: Any, id: Optional[int]) -> int: """ Collect a value for collection-type window aggregations. diff --git a/quixstreams/state/rocksdb/windowed/transaction.py b/quixstreams/state/rocksdb/windowed/transaction.py index 3779b3e29..c76cc99a6 100644 --- a/quixstreams/state/rocksdb/windowed/transaction.py +++ b/quixstreams/state/rocksdb/windowed/transaction.py @@ -1,5 +1,6 @@ from typing import TYPE_CHECKING, Any, Iterable, Optional, cast +from quixstreams.state.base.state import TransactionState from quixstreams.state.base.transaction import ( PartitionTransactionStatus, validate_transaction_status, @@ -26,6 +27,7 @@ LATEST_EXPIRED_WINDOW_TIMESTAMP_KEY, LATEST_TIMESTAMP_KEY, LATEST_TIMESTAMPS_CF_NAME, + TRIGGERS_CF_NAME, VALUES_CF_NAME, ) from .serialization import parse_window_key @@ -82,6 +84,18 @@ def as_state(self, prefix: Any = DEFAULT_PREFIX) -> WindowedTransactionState: # ), ) + def as_trigger_state( + self, prefix: bytes, start_ms: int, end_ms: int + ) -> TransactionState: + # TODO: Tests, docs + # TODO: Maybe make the serialization lazy (not every trigger needs the state). Additional serialization drops throughput by ~10%. + # TODO: Or maybe pass the already encoded window key as a prefix. Then there's no extra cost unless you call the state. + window_key = encode_integer_pair(start_ms, end_ms) + prefix = self._serialize_key(key=window_key, prefix=prefix) + return TransactionState( + transaction=self, prefix=prefix, cf_name=TRIGGERS_CF_NAME + ) + @validate_transaction_status(PartitionTransactionStatus.STARTED) def keys(self, cf_name: str = "default") -> Iterable[Any]: db_skip_keys: set[bytes] = set() diff --git a/quixstreams/state/types.py b/quixstreams/state/types.py index c80c9e2ad..916d9c0ed 100644 --- a/quixstreams/state/types.py +++ b/quixstreams/state/types.py @@ -3,6 +3,8 @@ from typing_extensions import TypeAlias, TypeVar, overload +from .base.state import State + logger = logging.getLogger(__name__) K = TypeVar("K", contravariant=True) @@ -21,6 +23,9 @@ class WindowedState(Protocol[K, V]): A windowed state to be provided into `StreamingDataFrame` window functions. """ + @property + def prefix(self) -> bytes: ... + @overload def get(self, key: K) -> Optional[V]: ... @@ -102,6 +107,15 @@ def update_window( """ ... + def delete_window(self, start_ms: int, end_ms: int) -> Optional[V]: + """ + Delete a single window. + + :param start_ms: start of the window in milliseconds + :param end_ms: end of the window in milliseconds + """ + ... + def add_to_collection(self, value: V, id: Optional[int]) -> int: """ Collect a value for collection-type window aggregations. @@ -250,6 +264,8 @@ def prepare(self, processed_offsets: Optional[dict[str, int]]): def as_state(self, prefix: Any) -> WindowedState[K, V]: ... + def as_trigger_state(self, prefix: bytes, start_ms: int, end_ms: int) -> State: ... + def get_window( self, start_ms: int,