[SPARK-54122][SS] Implement TwsTester in Scala #53159
- Add TwsTester.scala: Main testing framework for TransformWithState
- Add InMemoryStatefulProcessorHandleImpl.scala: In-memory implementation for testing
- Add TwsTesterSuite.scala: Test suite for TwsTester framework
- Add processors: EventTimeWindow, MultiTimer, RunningCount, SessionTimeout, TopK, WordFrequency
HeartSaVioR
left a comment
Only minors and nits. Thanks for addressing all review comments!
...treaming/operators/stateful/transformwithstate/testing/InMemoryStatefulProcessorHandle.scala
Outdated
...treaming/operators/stateful/transformwithstate/testing/InMemoryStatefulProcessorHandle.scala
Outdated
...treaming/operators/stateful/transformwithstate/testing/InMemoryStatefulProcessorHandle.scala
sql/core/src/main/scala/org/apache/spark/sql/streaming/TwsTester.scala
Outdated
    * @param processor the StatefulProcessor to test.
    * @param initialState initial state for each key as a list of (key, state) tuples.
    * @param timeMode time mode (None, ProcessingTime or EventTime).
    * @param outputMode output mode (Append, Update, or Complete).
I'm OK with it.
sql/core/src/test/scala/org/apache/spark/sql/streaming/TwsTesterSuite.scala
Outdated
sql/core/src/test/scala/org/apache/spark/sql/streaming/TwsTesterSuite.scala
Outdated
sql/core/src/test/scala/org/apache/spark/sql/streaming/TwsTesterSuite.scala
Outdated
…ng/operators/stateful/transformwithstate/testing/InMemoryStatefulProcessorHandle.scala Co-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
HeartSaVioR
left a comment
+1 Thanks for your patience!
I see CI fails with document generation, but it does not seem to come from this PR.
Thanks! Merging to master.
### What changes were proposed in this pull request?
This PR fixes an issue where the illegal character `<` is used in a block comment in `TwsTester.scala`, which was introduced in #53159 and causes an error when generating Javadoc.
https://github.com/apache/spark/actions/runs/20839516325/job/59870814463
```
[error] Generating /home/runner/work/spark/spark/target/javaunidoc/org/apache/spark/sql/streaming/TwsTester.html...
```
### Why are the changes needed?
To recover CI.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA.
### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #53746 from sarutak/fix-genjavadoc-error.
Authored-by: Kousuke Saruta <sarutak@amazon.co.jp>
Signed-off-by: Kousuke Saruta <sarutak@apache.org>
### What changes were proposed in this pull request?
* Added TwsTester, a test helper for writing unit tests for StatefulProcessor implementations that will be used in the TransformWithState operator in streaming queries. It processes input rows and returns output rows equivalent to those that would be produced by the processor in an actual Spark streaming query.
* Supported functionality:
  * Processing input rows and producing output rows via `test()`.
  * Initial state setup via constructor parameter.
  * Direct state manipulation via `setValueState`, `setListState`, `setMapState`.
  * Direct state inspection via `peekValueState`, `peekListState`, `peekMapState`.
  * Timers (both ProcessingTime and EventTime modes).
  * TTL.
  * Real-time mode simulation.
* This is implemented in Scala. An equivalent TwsTester in Python for PySpark will be added in apache#53491.
### Why are the changes needed?
Some users requested unit testing functionality for TWS.
### Does this PR introduce _any_ user-facing change?
Yes, it adds a new public API to Spark:
* org.apache.spark.sql.streaming.TwsTester in Scala.
### How was this patch tested?
Added unit and end-to-end tests in this PR. End-to-end tests compare TwsTester output with results of a real streaming query.
### Was this patch authored or co-authored using generative AI tooling?
Yes. Cursor with claude-4.5-sonnet was used to assist with coding and to generate some of the documentation and tests.
Generated-by: claude-4.5-sonnet

Closes apache#53159 from fedimser/tws-tester-3.
Lead-authored-by: Dmytro Fedoriaka <dmytro.fedoriaka@databricks.com>
Co-authored-by: Dmytro Fedoriaka <fedimser@users.noreply.github.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
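To make the idea of an in-memory test harness concrete, here is a minimal Spark-free sketch. It is not the TwsTester API: `ToyProcessor`, `ToyTester`, and this `RunningCount` are hypothetical names invented for illustration, and the signatures given to `test`, `setValueState`, and `peekValueState` are assumptions (only the method names appear in the description above). The sketch shows per-key value state held in a plain map, with a `test` call that feeds rows to the processor and returns its output.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a keyed stateful processor; not Spark's StatefulProcessor.
trait ToyProcessor[K, I, S, O] {
  // Consumes the rows for one key and returns (output rows, new state).
  // Returning None as the new state clears the state for that key.
  def handleInputRows(key: K, rows: Seq[I], state: Option[S]): (Seq[O], Option[S])
}

// Hypothetical harness: keeps value state in memory instead of a state store.
final class ToyTester[K, I, S, O](
    processor: ToyProcessor[K, I, S, O],
    initialState: Seq[(K, S)] = Seq.empty) {

  private val state = mutable.Map.empty[K, S] ++= initialState

  // Runs the processor on the rows for one key and persists the resulting state.
  def test(key: K, rows: Seq[I]): Seq[O] = {
    val (out, next) = processor.handleInputRows(key, rows, state.get(key))
    next match {
      case Some(s) => state.update(key, s)
      case None    => state.remove(key)
    }
    out
  }

  // Direct state manipulation and inspection (assumed signatures).
  def setValueState(key: K, value: S): Unit = state.update(key, value)
  def peekValueState(key: K): Option[S] = state.get(key)
}

// Example processor: emits the running number of rows seen per key.
object RunningCount extends ToyProcessor[String, String, Long, (String, Long)] {
  override def handleInputRows(
      key: String,
      rows: Seq[String],
      state: Option[Long]): (Seq[(String, Long)], Option[Long]) = {
    val count = state.getOrElse(0L) + rows.size
    (Seq(key -> count), Some(count))
  }
}

object ToyTesterDemo {
  def main(args: Array[String]): Unit = {
    val tester = new ToyTester[String, String, Long, (String, Long)](
      RunningCount, initialState = Seq("a" -> 10L))

    assert(tester.test("a", Seq("x", "y")) == Seq("a" -> 12L)) // starts from initial state
    assert(tester.peekValueState("a") == Some(12L))

    tester.setValueState("b", 100L)                            // overwrite state directly
    assert(tester.test("b", Seq("q")) == Seq("b" -> 101L))
  }
}
```

Because state lives in an ordinary map, a test can seed it, run one batch, and inspect the result without starting a streaming query. Timers, TTL, list and map state are deliberately left out of this sketch.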