
v3.15.0

@daniil-quix released this on 27 May at 15:55
a2f4ee7

What's Changed

💎 New streaming join: StreamingDataFrame.join_asof

With StreamingDataFrame.join_asof(), you can join two topics into a new stream where each left record is merged with the latest right record that has the same key and a timestamp less than or equal to the left record's timestamp.

This join is built with time series enrichment use cases in mind, where the left side represents measurements and the right side represents events.
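The matching rule can be modeled as a per-key index of right-side records sorted by timestamp, where each left record looks up the last entry at or before its own timestamp. The sketch below is a hypothetical, in-memory illustration only: the `AsOfStore` class and its `put`/`get_asof` methods are invented for this example and are not part of quixstreams, which keeps the right side in its state store.

```python
from bisect import bisect_right
from collections import defaultdict


class AsOfStore:
    """Hypothetical in-memory model of an as-of lookup (not the quixstreams implementation)."""

    def __init__(self):
        # key -> sorted list of right-side timestamps, plus a parallel list of values
        self._timestamps = defaultdict(list)
        self._values = defaultdict(list)

    def put(self, key, timestamp, value):
        """Store a right-side record, keeping timestamps sorted per key."""
        ts_list = self._timestamps[key]
        idx = bisect_right(ts_list, timestamp)
        ts_list.insert(idx, timestamp)
        self._values[key].insert(idx, value)

    def get_asof(self, key, timestamp):
        """Return the latest value for `key` with ts <= `timestamp`, or None if there is none.

        For equal timestamps, the most recently added value wins.
        """
        ts_list = self._timestamps.get(key, [])
        idx = bisect_right(ts_list, timestamp)
        if idx == 0:
            return None
        return self._values[key][idx - 1]


store = AsOfStore()
store.put("sensor-1", 1000, {"location": "hall"})
store.put("sensor-1", 5000, {"location": "lab"})

print(store.get_asof("sensor-1", 4999))  # {'location': 'hall'}
print(store.get_asof("sensor-1", 5000))  # {'location': 'lab'}
print(store.get_asof("sensor-1", 999))   # None: no right record at or before ts=999
print(store.get_asof("sensor-2", 5000))  # None: no right records for this key
```

Binary search keeps each lookup logarithmic in the number of stored right records per key, which is why a sorted index is a natural fit for this kind of join.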

Some examples:

  • Matching sensor measurements with events in the system.
  • Joining purchases with the effective prices of goods.

from datetime import timedelta

from quixstreams import Application

app = Application(...)

sdf_measurements = app.dataframe(app.topic("measurements"))
sdf_metadata = app.dataframe(app.topic("metadata"))

# Join records from the topic "measurements"
# with the latest effective records from the topic "metadata",
# using the "inner" join strategy and keeping the "metadata" records stored for 14 days in event time.
sdf_joined = sdf_measurements.join_asof(
    right=sdf_metadata,
    how="inner",                 # Emit a joined record only if a matching right record is found in the store.
    on_merge="keep-left",        # Prefer the columns from the left dataframe if they overlap with the right.
    grace_ms=timedelta(days=14), # Keep the state for 14 days (measured in event time, similar to windows).
)

if __name__ == '__main__':
    app.run()
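To make the options in the example more concrete, the sketch below models them on plain dictionaries: how two matched records are combined (`on_merge`), what happens when no match exists (`how`), and how a 14-day `grace_ms` translates into a retention cutoff in event time. It is a hypothetical illustration, not the quixstreams implementation: the helpers `merge`, `join_record`, `expire` and the constant `GRACE_MS` are invented here, and the extra option values modeled (`"left"`, `"keep-right"`, `"raise"`) are assumptions based on the usual meaning of these options; check the Joins docs page for the authoritative list.

```python
from datetime import timedelta
from typing import Optional

# Hypothetical model of the join options; NOT the quixstreams implementation.
# grace_ms=timedelta(days=14) expressed in milliseconds (timedelta // timedelta gives an int):
GRACE_MS = timedelta(days=14) // timedelta(milliseconds=1)  # 1_209_600_000


def merge(left: dict, right: dict, on_merge: str = "raise") -> dict:
    """Combine a left and a right record according to an (assumed) on_merge strategy."""
    if on_merge == "keep-left":
        return {**right, **left}  # left values win on overlapping keys
    if on_merge == "keep-right":
        return {**left, **right}  # right values win on overlapping keys
    if on_merge == "raise":
        overlap = left.keys() & right.keys()
        if overlap:
            raise ValueError(f"Overlapping columns: {sorted(overlap)}")
        return {**left, **right}
    raise ValueError(f"Unknown on_merge value: {on_merge!r}")


def join_record(
    left: dict, right: Optional[dict], how: str = "inner", on_merge: str = "raise"
) -> Optional[dict]:
    """Return the joined record, or None if nothing should be emitted."""
    if how not in ("inner", "left"):
        raise ValueError(f"Unknown how value: {how!r}")
    if right is None:
        # "inner" drops unmatched left records; "left" (assumed) emits them unchanged.
        return None if how == "inner" else dict(left)
    return merge(left, right, on_merge)


def expire(right_records: list, max_seen_ts: int, grace_ms: int = GRACE_MS) -> list:
    """Keep only (timestamp, value) pairs not older than max_seen_ts - grace_ms."""
    cutoff = max_seen_ts - grace_ms
    return [(ts, value) for ts, value in right_records if ts >= cutoff]


measurement = {"sensor": "s1", "value": 21.5}
metadata = {"sensor": "s1-meta", "location": "hall"}

print(join_record(measurement, metadata, how="inner", on_merge="keep-left"))
# {'sensor': 's1', 'location': 'hall', 'value': 21.5}
print(join_record(measurement, None, how="left"))
# {'sensor': 's1', 'value': 21.5}
```

Because retention is measured in event time, the cutoff in this sketch moves with the highest timestamp seen rather than with the wall clock, similar to how window grace periods work.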

Learn more about it on the Joins docs page.

By @gwaramadze and @daniil-quix in #874 and #841

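State improvements

The example below configures the application to recreate a corrupted RocksDB state store by passing RocksDBOptions(on_corrupted_recreate=True):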

from quixstreams import Application
from quixstreams.state.rocksdb import RocksDBOptions

app = Application(..., rocksdb_options=RocksDBOptions(on_corrupted_recreate=True))

Dependencies

  • Bump types-protobuf from 6.30.2.20250503 to 6.30.2.20250516 by @dependabot in #885

Full Changelog: v3.14.1...v3.15.0