@@ -18,7 +18,6 @@ use std::ops::RangeInclusive;
1818
1919use delta_btree_map:: Change ;
2020use itertools:: Itertools ;
21- use risingwave_common:: array:: Op ;
2221use risingwave_common:: array:: stream_record:: Record ;
2322use risingwave_common:: row:: RowExt ;
2423use risingwave_common:: session_config:: OverWindowCachePolicy as CachePolicy ;
@@ -33,6 +32,7 @@ use super::frame_finder::merge_rows_frames;
3332use super :: over_partition:: { OverPartition , PartitionDelta } ;
3433use super :: range_cache:: { CacheKey , PartitionCache } ;
3534use crate :: cache:: ManagedLruCache ;
35+ use crate :: common:: change_buffer:: ChangeBuffer ;
3636use crate :: common:: metrics:: MetricsInfo ;
3737use crate :: consistency:: consistency_panic;
3838use crate :: executor:: monitor:: OverWindowMetrics ;
@@ -274,69 +274,11 @@ impl<S: StateStore> OverWindowExecutor<S> {
274274 this : & ' _ ExecutorInner < S > ,
275275 chunk : & ' a StreamChunk ,
276276 ) -> impl Iterator < Item = Record < RowRef < ' a > > > {
277- let mut changes_merged = BTreeMap :: new ( ) ;
278- for ( op, row) in chunk. rows ( ) {
279- let pk = DefaultOrdered ( this. get_input_pk ( row) ) ;
280- match op {
281- Op :: Insert | Op :: UpdateInsert => {
282- if let Some ( prev_change) = changes_merged. get_mut ( & pk) {
283- match prev_change {
284- Record :: Delete { old_row } => {
285- * prev_change = Record :: Update {
286- old_row : * old_row,
287- new_row : row,
288- } ;
289- }
290- _ => {
291- consistency_panic ! (
292- ?pk,
293- ?row,
294- ?prev_change,
295- "inconsistent changes in input chunk, double-inserting"
296- ) ;
297- if let Record :: Update { old_row, .. } = prev_change {
298- * prev_change = Record :: Update {
299- old_row : * old_row,
300- new_row : row,
301- } ;
302- } else {
303- * prev_change = Record :: Insert { new_row : row } ;
304- }
305- }
306- }
307- } else {
308- changes_merged. insert ( pk, Record :: Insert { new_row : row } ) ;
309- }
310- }
311- Op :: Delete | Op :: UpdateDelete => {
312- if let Some ( prev_change) = changes_merged. get_mut ( & pk) {
313- match prev_change {
314- Record :: Insert { .. } => {
315- changes_merged. remove ( & pk) ;
316- }
317- Record :: Update {
318- old_row : real_old_row,
319- ..
320- } => {
321- * prev_change = Record :: Delete {
322- old_row : * real_old_row,
323- } ;
324- }
325- _ => {
326- consistency_panic ! (
327- ?pk,
328- "inconsistent changes in input chunk, double-deleting"
329- ) ;
330- * prev_change = Record :: Delete { old_row : row } ;
331- }
332- }
333- } else {
334- changes_merged. insert ( pk, Record :: Delete { old_row : row } ) ;
335- }
336- }
337- }
277+ let mut cb = ChangeBuffer :: with_capacity ( chunk. cardinality ( ) ) ;
278+ for record in chunk. records ( ) {
279+ cb. apply_record ( record, |row| this. get_input_pk ( row) ) ;
338280 }
339- changes_merged . into_values ( )
281+ cb . into_records ( )
340282 }
341283
342284 #[ try_stream( ok = StreamChunk , error = StreamExecutorError ) ]
0 commit comments