v3.15.0
What's Changed
New streaming join: StreamingDataFrame.join_asof
With StreamingDataFrame.join_asof(), you can join two topics into a new stream where each left record is merged with the latest right record that has the same key and a timestamp less than or equal to the left record's timestamp.
This join is designed for time-series enrichment use cases, where the left side represents measurements and the right side represents events.
Some examples:
- Matching sensor measurements with events in the system.
- Joining purchases with the effective prices of the goods.
from datetime import timedelta

from quixstreams import Application

app = Application(...)

sdf_measurements = app.dataframe(app.topic("measurements"))
sdf_metadata = app.dataframe(app.topic("metadata"))

# Join records from the topic "measurements"
# with the latest effective records from the topic "metadata",
# using the "inner" join strategy and keeping the "metadata" records
# stored for 14 days in event time.
sdf_joined = sdf_measurements.join_asof(
    right=sdf_metadata,
    how="inner",  # Emit updates only if the match is found in the store.
    on_merge="keep-left",  # Prefer the columns from the left dataframe if they overlap with the right.
    grace_ms=timedelta(days=14),  # Keep the state for 14 days (measured in event time, similar to windows).
)

if __name__ == '__main__':
    app.run()
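To make the matching rule concrete, here is a minimal pure-Python sketch of as-of matching over in-memory lists. It is only an illustration of the semantics described above, not how Quix Streams implements the join: the function name `join_asof_sketch`, the `(key, timestamp, value)` tuple layout, and the sample data are all hypothetical, and state retention (`grace_ms`) is not modeled.

```python
from bisect import bisect_right


def join_asof_sketch(left, right):
    """Toy "inner" as-of join with "keep-left" merging.

    Both inputs are lists of (key, timestamp, value_dict) tuples.
    This is an illustrative assumption, not the Quix Streams implementation.
    """
    # Index right-side records per key, ordered by timestamp.
    index = {}
    for key, ts, value in sorted(right, key=lambda record: record[1]):
        timestamps, values = index.setdefault(key, ([], []))
        timestamps.append(ts)
        values.append(value)

    joined = []
    for key, ts, value in left:
        timestamps, values = index.get(key, ([], []))
        # Position of the latest right timestamp that is <= the left timestamp.
        pos = bisect_right(timestamps, ts) - 1
        if pos < 0:
            # "inner" behavior: skip left records that have no match.
            continue
        # "keep-left" behavior: on overlapping columns the left value wins.
        joined.append((key, ts, {**values[pos], **value}))
    return joined


# Hypothetical sample data.
measurements = [
    ("sensor-1", 5, {"temp": 20.5}),
    ("sensor-1", 12, {"temp": 21.0, "unit": "C"}),
    ("sensor-2", 7, {"temp": 18.0}),
]
metadata = [
    ("sensor-1", 10, {"location": "roof", "unit": "F"}),
    ("sensor-1", 3, {"location": "lab"}),
]

result = join_asof_sketch(measurements, metadata)
for record in result:
    print(record)
```

In this sketch, the measurement at timestamp 5 is paired with the metadata from timestamp 3, the measurement at timestamp 12 is paired with the metadata from timestamp 10 (keeping the left `unit`), and the `sensor-2` measurement is dropped because no right record with that key exists.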
Learn more about it on the Joins docs page.
By @gwaramadze and @daniil-quix in #874 #841
State improvements
- Enable fsync in RocksDB by default by @gwaramadze in #883
- RocksDBStorePartition: log a number of bytes written by @daniil-quix in #890
- Optimize state operations by @daniil-quix in #891
- Add a parameter to clear corrupted RocksDBs by @gwaramadze in #888.
To re-create a corrupted RocksDB state store automatically, use the RocksDBOptions object:
from quixstreams import Application
from quixstreams.state.rocksdb import RocksDBOptions
app = Application(..., rocksdb_options=RocksDBOptions(on_corrupted_recreate=True))
Dependencies
- Bump types-protobuf from 6.30.2.20250503 to 6.30.2.20250516 by @dependabot in #885
Full Changelog: v3.14.1...v3.15.0