Skip to content

Commit 2a42950

Browse files
committed
filter out no-op updates in into_records
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
1 parent d58153e commit 2a42950

File tree

3 files changed

+27
-24
lines changed

3 files changed

+27
-24
lines changed

src/common/src/array/stream_chunk.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,13 @@ impl Default for OpRowMutRef<'_> {
533533
}
534534
}
535535

536+
impl PartialEq for OpRowMutRef<'_> {
537+
fn eq(&self, other: &Self) -> bool {
538+
self.row_ref() == other.row_ref()
539+
}
540+
}
541+
impl Eq for OpRowMutRef<'_> {}
542+
536543
impl<'a> OpRowMutRef<'a> {
537544
pub fn index(&self) -> usize {
538545
self.i

src/common/src/array/stream_chunk_builder.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -157,20 +157,6 @@ impl StreamChunkBuilder {
157157
}
158158
}
159159

160-
/// Append a record to the builder if it's not a no-op update, return a chunk if the builder is full.
161-
#[must_use]
162-
pub fn append_record_eliminate_noop_update(
163-
&mut self,
164-
record: Record<impl Row>,
165-
) -> Option<StreamChunk> {
166-
if let Record::Update { old_row, new_row } = &record
167-
&& Row::eq(old_row, new_row)
168-
{
169-
return None;
170-
}
171-
self.append_record(record)
172-
}
173-
174160
/// Take all the pending data and return a chunk. If there is no pending data, return `None`.
175161
/// Note that if this is an unlimited chunk builder, the only way to get a chunk is to call
176162
/// `take`.

src/stream/src/common/change_buffer.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ mod private {
5555
pub trait Key: Eq + std::hash::Hash {}
5656
impl<K> Key for K where K: Eq + std::hash::Hash {}
5757

58-
pub trait Row: Default {}
59-
impl<R> Row for R where R: Default {}
58+
pub trait Row: Eq + Default {}
59+
impl<R> Row for R where R: Eq + Default {}
6060
}
6161

6262
/// A buffer that accumulates changes and produce compacted changes.
@@ -175,6 +175,17 @@ where
175175
Op::Delete | Op::UpdateDelete => self.delete(key, row),
176176
}
177177
}
178+
179+
/// Consume the buffer and produce a list of change records.
180+
///
181+
/// No-op updates are filtered out.
182+
pub fn into_records(self) -> impl Iterator<Item = Record<R>> {
183+
self.buffer.into_values().filter(|record| match record {
184+
Record::Insert { .. } => true,
185+
Record::Delete { .. } => true,
186+
Record::Update { old_row, new_row } => old_row != new_row,
187+
})
188+
}
178189
}
179190

180191
impl<K, R> ChangeBuffer<K, R> {
@@ -206,19 +217,18 @@ impl<K, R> ChangeBuffer<K, R> {
206217
pub fn is_empty(&self) -> bool {
207218
self.buffer.is_empty()
208219
}
209-
210-
/// Consume the buffer and produce a list of change records.
211-
pub fn into_records(self) -> impl ExactSizeIterator<Item = Record<R>> {
212-
self.buffer.into_values()
213-
}
214220
}
215221

216-
impl<K, R: Row> ChangeBuffer<K, R> {
222+
impl<K, R> ChangeBuffer<K, R>
223+
where
224+
K: private::Key,
225+
R: private::Row + Row,
226+
{
217227
/// Consume the buffer and produce a single compacted chunk.
218228
pub fn into_chunk(self, data_types: Vec<DataType>) -> Option<StreamChunk> {
219229
let mut builder = StreamChunkBuilder::unlimited(data_types, Some(self.buffer.len()));
220230
for record in self.into_records() {
221-
let none = builder.append_record_eliminate_noop_update(record);
231+
let none = builder.append_record(record);
222232
debug_assert!(none.is_none());
223233
}
224234
builder.take()
@@ -229,7 +239,7 @@ impl<K, R: Row> ChangeBuffer<K, R> {
229239
let mut res = Vec::new();
230240
let mut builder = StreamChunkBuilder::new(chunk_size, data_types);
231241
for record in self.into_records() {
232-
if let Some(chunk) = builder.append_record_eliminate_noop_update(record) {
242+
if let Some(chunk) = builder.append_record(record) {
233243
res.push(chunk);
234244
}
235245
}

0 commit comments

Comments
 (0)