Skip to content

Commit 8aca2f3

Browse files
committed
replace append_record_eliminate_noop_update with append_record
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
1 parent b6d274d commit 8aca2f3

File tree

5 files changed

+10
-4
lines changed

5 files changed

+10
-4
lines changed

src/stream/src/common/change_buffer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,12 @@ where
188188
}
189189
}
190190

191+
impl<K, R> Default for ChangeBuffer<K, R> {
192+
fn default() -> Self {
193+
Self::new()
194+
}
195+
}
196+
191197
impl<K, R> ChangeBuffer<K, R> {
192198
/// Create a new `ChangeBuffer` that panics on inconsistency.
193199
pub fn new() -> Self {

src/stream/src/executor/top_n/group_top_n.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ where
234234
for staging in stagings.into_values() {
235235
for res in staging.into_deserialized_changes(&deserializer) {
236236
let record = res?;
237-
let _none = chunk_builder.append_record_eliminate_noop_update(record);
237+
let _none = chunk_builder.append_record(record);
238238
}
239239
}
240240
Ok(chunk_builder.take())

src/stream/src/executor/top_n/group_top_n_appendonly.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ where
203203
for staging in stagings.into_values() {
204204
for res in staging.into_deserialized_changes(&deserializer) {
205205
let record = res?;
206-
let _none = chunk_builder.append_record_eliminate_noop_update(record);
206+
let _none = chunk_builder.append_record(record);
207207
}
208208
}
209209

src/stream/src/executor/top_n/top_n_appendonly.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ where
134134
let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(staging.len()));
135135
for res in staging.into_deserialized_changes(&deserializer) {
136136
let record = res?;
137-
let _none = chunk_builder.append_record_eliminate_noop_update(record);
137+
let _none = chunk_builder.append_record(record);
138138
}
139139
Ok(chunk_builder.take())
140140
}

src/stream/src/executor/top_n/top_n_plain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ where
171171
let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(staging.len()));
172172
for res in staging.into_deserialized_changes(&deserializer) {
173173
let record = res?;
174-
let _none = chunk_builder.append_record_eliminate_noop_update(record);
174+
let _none = chunk_builder.append_record(record);
175175
}
176176
Ok(chunk_builder.take())
177177
}

0 commit comments

Comments
 (0)