Skip to content

Commit a5b01a5

Browse files
committed
fix unit test and add more docs
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
1 parent b3bc9c1 commit a5b01a5

File tree

2 files changed

+13
-7
lines changed

2 files changed

+13
-7
lines changed

src/stream/src/common/change_buffer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ mod private {
6262
/// A buffer that accumulates changes and produce compacted changes.
6363
#[derive(Debug)]
6464
pub struct ChangeBuffer<K, R> {
65+
// We use an `IndexMap` to preserve the original order of the changes as much as possible.
6566
buffer: IndexMap<K, Record<R>>,
6667
ib: InconsistencyBehavior,
6768
}
@@ -104,6 +105,8 @@ where
104105
}
105106
Entry::Occupied(mut e) => match e.get_mut() {
106107
Record::Insert { .. } => {
108+
// FIXME: though preserving the order well,
109+
// this is not performant compared to `swap_remove`
107110
e.shift_remove();
108111
}
109112
Record::Update { old_row, .. } => {

src/stream/src/common/compact_chunk.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ impl StreamChunkCompactor {
192192
let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
193193
let mut cb = ChangeBuffer::with_capacity(estimate_size).with_inconsistency_behavior(ib);
194194

195-
for chunk in chunks.iter() {
195+
for chunk in &chunks {
196196
for record in chunk.records() {
197197
cb.apply_record(record, |&row| row.project(&key_indices));
198198
}
@@ -298,17 +298,20 @@ mod tests {
298298
vec![DataType::Int64, DataType::Int64, DataType::Int64],
299299
InconsistencyBehavior::Panic,
300300
);
301+
let chunk = chunks.into_iter().next().unwrap();
301302
assert_eq!(
302-
chunks.into_iter().next().unwrap(),
303+
chunk,
303304
StreamChunk::from_pretty(
304-
" I I I
305-
+ 2 5 5
306-
- 6 6 9
307-
+ 4 9 2
305+
" I I I
308306
U- 1 1 1
309307
U+ 1 1 2
308+
+ 4 9 2
309+
+ 2 5 5
310+
- 6 6 9
310311
+ 2 2 2",
311-
)
312+
),
313+
"{}",
314+
chunk.to_pretty()
312315
);
313316
}
314317
}

0 commit comments

Comments
 (0)