Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quixstreams/dataframe/windows/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def window_callback(
)
# Use window start timestamp as a new record timestamp
for key, window in expired_windows:
yield (window, key, window["start"], None)
yield window, key, window["start"], None

return self._apply_window(
func=window_callback,
Expand Down
137 changes: 136 additions & 1 deletion quixstreams/dataframe/windows/time_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
Window,
WindowKeyResult,
WindowOnLateCallback,
WindowResult,
get_window_ranges,
)
from .triggers import TimeWindowTrigger, TriggerAction

if TYPE_CHECKING:
from quixstreams.dataframe.dataframe import StreamingDataFrame
Expand Down Expand Up @@ -122,6 +124,26 @@ def current(
self._closing_strategy = ClosingStrategy.new(closing_strategy)
return super().current()

def trigger(self, trigger: TimeWindowTrigger) -> "StreamingDataFrame":
def window_callback(
value: Any,
key: Any,
timestamp_ms: int,
_headers: Any,
transaction: WindowedPartitionTransaction,
) -> Iterable[tuple[WindowResult, Any, int, Any]]:
windows = self.process_window_new(
value=value,
key=key,
timestamp_ms=timestamp_ms,
transaction=transaction,
trigger=trigger,
)
for key, window in windows:
yield window, key, window["start"], None

return self._apply_window(func=window_callback, name=self._name)

def process_window(
self,
value: Any,
Expand Down Expand Up @@ -200,6 +222,119 @@ def process_window(

return updated_windows, expired_windows

def process_window_new(
self,
value: Any,
key: Any,
timestamp_ms: int,
transaction: WindowedPartitionTransaction,
trigger: TimeWindowTrigger,
) -> Iterable[WindowKeyResult]:
window_state = transaction.as_state(prefix=key)
duration_ms = self._duration_ms
grace_ms = self._grace_ms

collect = self.collect
aggregate = self.aggregate

ranges = get_window_ranges(
timestamp_ms=timestamp_ms,
duration_ms=duration_ms,
step_ms=self._step_ms,
)

if self._closing_strategy == ClosingStrategy.PARTITION:
latest_expired_window_end = transaction.get_latest_expired(prefix=b"")
latest_timestamp = max(timestamp_ms, latest_expired_window_end)
else:
state_ts = window_state.get_latest_timestamp() or 0
latest_timestamp = max(timestamp_ms, state_ts)

max_expired_window_end = latest_timestamp - grace_ms
max_expired_window_start = max_expired_window_end - duration_ms
updated_windows: list[WindowKeyResult] = []
for start, end in ranges:
if start <= max_expired_window_start:
late_by_ms = max_expired_window_end - timestamp_ms
self._on_expired_window(
value=value,
key=key,
start=start,
end=end,
timestamp_ms=timestamp_ms,
late_by_ms=late_by_ms,
)
continue

# When collecting values, we only mark the window existence with None
# since actual values are stored separately and combined into an array
# during window expiration.
aggregated = None
if aggregate:
current_window = window_state.get_window(start, end)
if current_window is None:
current_window = self._initialize_value()

aggregated = self._aggregate_value(current_window, value, timestamp_ms)

updated_window = self._results(aggregated, [], start, end)

window_state.update_window(
start, end, value=aggregated, timestamp_ms=timestamp_ms
)

trigger_state = transaction.as_trigger_state(
prefix=window_state.prefix, start_ms=start, end_ms=end
)

if (
action := trigger.on_window_updated(
value, start=start, end=end, state=trigger_state
)
) == TriggerAction.EMIT:
updated_windows.append((key, updated_window))
elif action == TriggerAction.DELETE:
# # TODO: Bump the event time here.
# # TODO: For count-windows, you want to add an element first, and only then emit/delete it.
# # But for session-close events, do you want this?
window_state.delete_window(start, end)
elif action == TriggerAction.EMIT_AND_DELETE:
# TODO: Bump the event time here.
# TODO: Do not update windows which are supposed to be deleted
# TODO: The deleted window must be returned in the sorted way together with other expired windows.
updated_windows.append((key, updated_window))
window_state.delete_window(start, end)

if collect:
# TODO: Is it correct? Looks like ".collect()" may not skip the late messages
# TODO: Create windows when "collect" is used too
# TODO: maybe rethink how "collect" is handled to accommodate the triggers
window_state.add_to_collection(
value=self._collect_value(value),
id=timestamp_ms,
)

if self._closing_strategy == ClosingStrategy.PARTITION:
expired_windows = self.expire_by_partition(
transaction, max_expired_window_end, collect
)
else:
expired_windows = self.expire_by_key(
key, window_state, max_expired_window_start, collect
)

for key, window in expired_windows:
# Ignore all other actions except "EMIT" and "EMIT_AND_DELETE"
# because the time has already moved beyond the window boundaries
if trigger.on_window_closed() in (
TriggerAction.EMIT,
TriggerAction.EMIT_AND_DELETE,
):
yield key, window

for key, window in updated_windows:
yield key, window

def expire_by_partition(
self,
transaction: WindowedPartitionTransaction,
Expand Down Expand Up @@ -231,7 +366,7 @@ def expire_by_key(
max_start_time=max_expired_start,
collect=collect,
):
yield (key, self._results(aggregated, collected, window_start, window_end))
yield key, self._results(aggregated, collected, window_start, window_end)

def _on_expired_window(
self,
Expand Down
2 changes: 2 additions & 0 deletions quixstreams/dataframe/windows/triggers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .base import TimeWindowTrigger as TimeWindowTrigger
from .base import TriggerAction as TriggerAction
71 changes: 71 additions & 0 deletions quixstreams/dataframe/windows/triggers/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import enum
from abc import ABC, abstractmethod
from typing import Any

from quixstreams.state.base.state import State

__all__ = ("TriggerAction", "TimeWindowTrigger")


class TriggerAction(enum.Enum):
EMIT = 1
CONTINUE = 2
DELETE = 3
EMIT_AND_DELETE = 4


# TODO: Should we support multiple triggers? Or rather a single trigger that can react to many events?
# TODO: If we skip a window update, we still must bump the event time (!)
# It means that we must detach time bump from the window update
# TODO: The trigger state can live in a pre-defined column family "__trigger__"
class TimeWindowTrigger(ABC):
@abstractmethod
def on_window_closed(self) -> TriggerAction:
"""
Called after the window is already closed and removed from the state.

TODO: "delete" and "emit-and-delete" actions don't make sense here
"""

@abstractmethod
def on_window_updated(
self,
value: Any,
start: int,
end: int,
state: State,
) -> TriggerAction:
# TODO: This looks like "on_window_update"
# TODO: Is it possible to support "collect()" for this one?
"""
Called after the window is updated.
"""
...

@abstractmethod
def clear(self, state: State):
# TODO: Clear the trigger state when the window is closed
# It's probably better to make the trigger to define it to avoid querying the whole CF for keys to delete.
...

# @abstractmethod
# def on_wallclock_time_advanced(
# self, timestamp_ms: int, state: State
# ) -> TriggerAction:
# """
# Called when the heartbeat is received.
# Can be used to trigger the processing-time based behavior
# when there're no new messages for some time.
#
# # TODO: The granularity of the heartbeats is regulated elsewhere
# # TODO: If we want to expire windows based on processing time,
# how do we avoid re-scanning the state on every heartbeat?
# 1. The default implementation is void
# 2. In "on_window_updated", we need to add a window key to a separate state to know when it's added
# 3. The state should have a cache on top of it, which is some heapq initialized with a limited number of keys (e.g. 10000)
# The queue is needed to limit the additional memory use.
# When the queue gets empty (e.g. the first 10000 windows are expired), we refill it with another 10000.
# 4. How do we even emit or expire windows from this trigger since we don't have them? Should this method have a different API?
# If we store the keys to be expired, could we also return them from this method?
#
# """
38 changes: 31 additions & 7 deletions quixstreams/state/base/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,27 @@ class TransactionState(State):
__slots__ = (
"_transaction",
"_prefix",
"_cf_name",
)

def __init__(self, prefix: bytes, transaction: "PartitionTransaction"):
def __init__(
self,
prefix: bytes,
transaction: "PartitionTransaction",
cf_name: str = "default", # TODO: Test
):
"""
Simple key-value state to be provided into `StreamingDataFrame` functions

:param transaction: instance of `PartitionTransaction`
"""
self._prefix = prefix
self._transaction = transaction
self._cf_name = cf_name

@property
def prefix(self) -> bytes:
return self._prefix

@overload
def get(self, key: K, default: Literal[None] = None) -> Optional[V]: ...
Expand All @@ -118,7 +129,9 @@ def get(self, key: K, default: Optional[V] = None) -> Optional[V]:
:param default: default value to return if the key is not found
:return: value or None if the key is not found and `default` is not provided
"""
return self._transaction.get(key=key, prefix=self._prefix, default=default)
return self._transaction.get(
key=key, prefix=self._prefix, default=default, cf_name=self._cf_name
)

@overload
def get_bytes(self, key: K, default: Literal[None] = None) -> Optional[bytes]: ...
Expand All @@ -135,7 +148,7 @@ def get_bytes(self, key: K, default: Optional[bytes] = None) -> Optional[bytes]:
:return: value or None if the key is not found and `default` is not provided
"""
return self._transaction.get_bytes(
key=key, prefix=self._prefix, default=default
key=key, prefix=self._prefix, default=default, cf_name=self._cf_name
)

def set(self, key: K, value: V) -> None:
Expand All @@ -144,15 +157,22 @@ def set(self, key: K, value: V) -> None:
:param key: key
:param value: value
"""
return self._transaction.set(key=key, value=value, prefix=self._prefix)
return self._transaction.set(
key=key, value=value, prefix=self._prefix, cf_name=self._cf_name
)

def set_bytes(self, key: K, value: bytes) -> None:
"""
Set value for the key.
:param key: key
:param value: value
"""
return self._transaction.set_bytes(key=key, value=value, prefix=self._prefix)
return self._transaction.set_bytes(
key=key,
value=value,
prefix=self._prefix,
cf_name=self._cf_name,
)

def delete(self, key: K):
"""
Expand All @@ -161,7 +181,9 @@ def delete(self, key: K):
This function always returns `None`, even if value is not found.
:param key: key
"""
return self._transaction.delete(key=key, prefix=self._prefix)
return self._transaction.delete(
key=key, prefix=self._prefix, cf_name=self._cf_name
)

def exists(self, key: K) -> bool:
"""
Expand All @@ -170,4 +192,6 @@ def exists(self, key: K) -> bool:
:return: True if key exists, False otherwise
"""

return self._transaction.exists(key=key, prefix=self._prefix)
return self._transaction.exists(
key=key, prefix=self._prefix, cf_name=self._cf_name
)
2 changes: 2 additions & 0 deletions quixstreams/state/rocksdb/windowed/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@
LATEST_TIMESTAMP_KEY = b"__latest_timestamp__"

VALUES_CF_NAME = "__values__"

TRIGGERS_CF_NAME = "__triggers__"
18 changes: 18 additions & 0 deletions quixstreams/state/rocksdb/windowed/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,24 @@ def update_window(
prefix=self._prefix,
)

def delete_window(
self,
start_ms: int,
end_ms: int,
) -> None:
# TODO: tests
"""
Delete a single window.

:param start_ms: start of the window in milliseconds
:param end_ms: end of the window in milliseconds
"""
return self._transaction.delete_window(
start_ms=start_ms,
end_ms=end_ms,
prefix=self._prefix,
)

def add_to_collection(self, value: Any, id: Optional[int]) -> int:
"""
Collect a value for collection-type window aggregations.
Expand Down
Loading