@fedimser fedimser commented Nov 21, 2025

What changes were proposed in this pull request?

  • Added TwsTester, a test helper for writing unit tests for StatefulProcessor implementations used with the TransformWithState operator in streaming queries. It processes input rows and returns the output rows that the processor would produce in an actual Spark streaming query.
  • Supported functionality:
    • Processing input rows and producing output rows via test().
    • Initial state setup via constructor parameter.
    • Direct state manipulation via setValueState, setListState, setMapState.
    • Direct state inspection via peekValueState, peekListState, peekMapState.
    • Timers (both ProcessingTime and EventTime modes).
    • TTL.
    • Real-time mode simulation.
  • TwsTester is implemented in Scala. An equivalent TwsTester for PySpark will be added in [SPARK-54805] Implement TwsTester in PySpark #53491.
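
To illustrate the idea behind the features listed above (process rows per key, seed initial state, then set and peek state directly), here is a minimal, self-contained sketch. It does not use Spark and is not the TwsTester API: `MiniProcessor`, `MiniTester`, and `RunningCount` are hypothetical names invented for this example, and the sketch models only a single value state per key, with no timers or TTL.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a stateful processor with one value state per key.
// It receives the current state (if any) and returns the new state plus output rows.
trait MiniProcessor[K, I, S, O] {
  def handleInputRows(key: K, rows: Iterator[I], state: Option[S]): (Option[S], Iterator[O])
}

// Hypothetical in-memory tester. State lives in a plain mutable map, so tests can
// seed it through the constructor and read or overwrite it between batches.
final class MiniTester[K, I, S, O](
    processor: MiniProcessor[K, I, S, O],
    initialState: Seq[(K, S)] = Seq.empty) {

  private val states = mutable.Map[K, S](initialState: _*)

  // Groups the batch by key (in order of first appearance) and calls the
  // processor once per key, as a grouped stateful operator would.
  def test(input: Seq[(K, I)]): Seq[O] = {
    val keysInOrder = input.map(_._1).distinct
    keysInOrder.flatMap { key =>
      val rows = input.collect { case (k, v) if k == key => v }
      val (newState, out) = processor.handleInputRows(key, rows.iterator, states.get(key))
      newState match {
        case Some(s) => states.update(key, s)
        case None => states.remove(key)
      }
      out.toList
    }
  }

  def peekValueState(key: K): Option[S] = states.get(key)

  def setValueState(key: K, value: S): Unit = states.update(key, value)
}

// Example processor: keeps a running count of rows seen per key.
object RunningCount extends MiniProcessor[String, String, Long, (String, Long)] {
  override def handleInputRows(
      key: String,
      rows: Iterator[String],
      state: Option[Long]): (Option[Long], Iterator[(String, Long)]) = {
    val count = state.getOrElse(0L) + rows.size
    (Some(count), Iterator((key, count)))
  }
}

object MiniTesterDemo {
  def main(args: Array[String]): Unit = {
    val tester = new MiniTester[String, String, Long, (String, Long)](
      RunningCount, initialState = Seq("a" -> 10L))
    // Key "a" starts at 10 and sees two rows; key "b" starts empty and sees one.
    val out = tester.test(Seq("a" -> "x", "b" -> "y", "a" -> "z"))
    assert(out == Seq(("a", 12L), ("b", 1L)))
    assert(tester.peekValueState("a").contains(12L))
  }
}
```

Keeping state in an ordinary map is what makes direct manipulation and inspection cheap in a unit test; the real helper additionally has to match the semantics of an actual streaming query, which this sketch does not attempt.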

Why are the changes needed?

Users have requested a way to unit-test StatefulProcessor implementations used with TransformWithState (TWS).

Does this PR introduce any user-facing change?

Yes, it adds a new public API to Spark:

  • org.apache.spark.sql.streaming.TwsTester in Scala.

How was this patch tested?

Added unit and end-to-end tests in this PR. End-to-end tests compare TwsTester output with results of a real streaming query.

Was this patch authored or co-authored using generative AI tooling?

Yes. Cursor with claude-4.5-sonnet was used to assist with coding and to generate some of the documentation and tests.
Generated-by: claude-4.5-sonnet

- Add TwsTester.scala: Main testing framework for TransformWithState
- Add InMemoryStatefulProcessorHandleImpl.scala: In-memory implementation for testing
- Add TwsTesterSuite.scala: Test suite for TwsTester framework
- Add processors: EventTimeWindow, MultiTimer, RunningCount, SessionTimeout, TopK, WordFrequency

@HeartSaVioR HeartSaVioR left a comment

Only minors and nits. Thanks for addressing all review comments!

* @param processor the StatefulProcessor to test.
* @param initialState initial state for each key as a list of (key, state) tuples.
* @param timeMode time mode (None, ProcessingTime or EventTime).
* @param outputMode output mode (Append, Update, or Complete).

I'm OK with it.

fedimser and others added 4 commits January 7, 2026 13:54
…ng/operators/stateful/transformwithstate/testing/InMemoryStatefulProcessorHandle.scala

Co-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
@fedimser fedimser requested a review from HeartSaVioR January 8, 2026 22:04

@HeartSaVioR HeartSaVioR left a comment

+1 Thanks for your patience!

@HeartSaVioR

I see CI fails with document generation, but it does not seem to come from this PR.

@HeartSaVioR

Thanks! Merging to master.

sarutak added a commit that referenced this pull request Jan 9, 2026
### What changes were proposed in this pull request?
This PR removes an illegal `<` character from a block comment in `TwsTester.scala`. The character was introduced in #53159 and causes an error when generating Javadoc.
https://github.com/apache/spark/actions/runs/20839516325/job/59870814463

```
[error] Generating /home/runner/work/spark/spark/target/javaunidoc/org/apache/spark/sql/streaming/TwsTester.html...
```

### Why are the changes needed?
To recover CI.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
GitHub Actions.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #53746 from sarutak/fix-genjavadoc-error.

Authored-by: Kousuke Saruta <sarutak@amazon.co.jp>
Signed-off-by: Kousuke Saruta <sarutak@apache.org>
Yicong-Huang pushed a commit to Yicong-Huang/spark that referenced this pull request Jan 9, 2026
Closes apache#53159 from fedimser/tws-tester-3.

Lead-authored-by: Dmytro Fedoriaka <dmytro.fedoriaka@databricks.com>
Co-authored-by: Dmytro Fedoriaka <fedimser@users.noreply.github.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Yicong-Huang pushed a commit to Yicong-Huang/spark that referenced this pull request Jan 9, 2026
Closes apache#53746 from sarutak/fix-genjavadoc-error.

Authored-by: Kousuke Saruta <sarutak@amazon.co.jp>
Signed-off-by: Kousuke Saruta <sarutak@apache.org>