-
Notifications
You must be signed in to change notification settings - Fork 700
refactor(streaming): use ChangeBuffer for executors
#23508
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
5f7166e to
f4c1bbc
Compare
00296c2 to
a5b01a5
Compare
f4c1bbc to
c4aa20e
Compare
This reverts commit e79251b.
c4aa20e to
1ed8fec
Compare
This reverts commit 0fa45f6.
1ed8fec to
40945fd
Compare
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
40945fd to
8aca2f3
Compare
0fa45f6 to
c0303f2
Compare
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
ChangeBuffer for more executorsChangeBuffer for executors
8aca2f3 to
966e876
Compare
995c695 to
7cb115d
Compare
966e876 to
3bcf3d0
Compare
3bcf3d0 to
f2c4203
Compare
| // We expect one `CacheKey` to appear at most once in the staging, and, the order of | ||
| // the outputs of `TopN` doesn't really matter, so we can simply chain the three maps. | ||
| // Although the output order is not important, we still ensure that `Delete`s are emitted | ||
| // before `Insert`s, so that we can avoid temporary violation of the `LIMIT` constraint. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't matter any more as we switch internal to IndexMap.
The caller guarantees that when updating a row in staging, it always call delete before insert. Thus, by iterating the changes based on the operation order, we won't violate the LIMIT either.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR refactors streaming executors to use the centralized ChangeBuffer utility instead of maintaining duplicated change-buffering logic. The change eliminates redundant implementations in MaterializeCache, TopNStaging, and OverWindowExecutor, improving code maintainability and consistency.
Key Changes:
- Replaced custom change buffering logic with
ChangeBufferin multiple executors - Added
try_mapandmapmethods toRecord<R>for row transformation - Updated executors to use
Recordtypes andappend_recordinstead of tuples
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| src/stream/src/common/change_buffer.rs | Added Default trait implementation for ChangeBuffer |
| src/common/src/array/stream_record.rs | Added try_map and map methods to Record<R> for row transformations |
| src/stream/src/executor/mview/materialize.rs | Removed local ChangeBuffer implementation in favor of common ChangeBuffer; removed generate_output function |
| src/stream/src/executor/over_window/general.rs | Replaced manual change merging logic with ChangeBuffer |
| src/stream/src/executor/top_n/top_n_cache.rs | Refactored TopNStaging to use ChangeBuffer internally; simplified insert/delete operations |
| src/stream/src/executor/top_n/top_n_plain.rs | Updated to use Record type and append_record method |
| src/stream/src/executor/top_n/top_n_appendonly.rs | Updated to use Record type and append_record method |
| src/stream/src/executor/top_n/group_top_n_appendonly.rs | Added explicit type annotation for stagings map; updated to use append_record |
| src/stream/src/executor/top_n/group_top_n.rs | Added explicit type annotation for stagings map; updated to use append_record |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
With the introduction of the
ChangeBufferin the recent PR #23507, we can now streamline the implementation of executors that previously replicated this functionality, including:MaterializeCacheTopNStagingOverWindowExecutorActually, this was once proposed in #19451 (comment), where it seemed that we merged the PR in a rush and didn't get it done eventually.
Checklist
Documentation
Release note