Commit 6ab0d16

use change buffer for compactor

Signed-off-by: Bugen Zhao <i@bugenzhao.com>

1 parent 447f3cd

File tree

4 files changed: +45 −129 lines changed

src/common/src/array/data_chunk_iter.rs
src/common/src/row/project.rs
src/stream/src/common/change_buffer.rs
src/stream/src/common/compact_chunk.rs

src/common/src/array/data_chunk_iter.rs

Lines changed: 1 addition & 1 deletion

```diff
@@ -125,7 +125,7 @@ unsafe impl TrustedLen for DataChunkRefIterWithHoles<'_> {}
 mod row_ref {
     use super::*;
 
-    #[derive(Clone, Copy)]
+    #[derive(Clone, Copy, Default)]
     pub struct RowRef<'a> {
         columns: &'a [ArrayRef],
```

src/common/src/row/project.rs

Lines changed: 1 addition & 1 deletion

```diff
@@ -18,7 +18,7 @@ use super::Row;
 use crate::types::DatumRef;
 
 /// Row for the [`project`](super::RowExt::project) method.
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, Default)]
 pub struct Project<'i, R> {
     row: R,
     indices: &'i [usize],
```
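Why derive `Default` on borrowed view types at all? A plausible reading of these two hunks, together with the `Self::default()` and `..Default::default()` calls in `change_buffer.rs` below: `#[derive(Default)]` on a generic struct only emits an impl whose type parameters are themselves `Default`, so a `ChangeBuffer` keyed by `Project<'_, RowRef<'_>>` can only be constructed via `Default` if both row types implement it. A minimal standalone sketch of that rule; `Buffer` and `Key` are illustrative stand-ins, not RisingWave types:

```rust
// Sketch: `#[derive(Default)]` on a generic struct bounds every type
// parameter with `Default`, even when no field strictly needs it.
#[derive(Default)]
#[allow(dead_code)]
struct Buffer<K, R> {
    entries: Vec<(K, R)>, // `Vec` is `Default` regardless of `K`, `R`
}

// Mirrors `RowRef` / `Project`: a `Copy` view over borrowed data.
// `&[T]` implements `Default` (the empty slice), so this derive compiles.
#[derive(Clone, Copy, Default)]
#[allow(dead_code)]
struct Key<'a> {
    indices: &'a [usize],
}

fn main() {
    // The derived impl is `impl<K: Default, R: Default> Default for
    // Buffer<K, R>`, so this line requires `Key: Default`:
    let _buffer: Buffer<Key<'static>, String> = Buffer::default();
}
```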

src/stream/src/common/change_buffer.rs

Lines changed: 35 additions & 0 deletions

```diff
@@ -131,13 +131,36 @@ where
             },
         }
     }
+
+    pub fn apply_record(&mut self, record: Record<R>, key_fn: impl Fn(&R) -> K) {
+        match record {
+            Record::Insert { new_row } => self.insert(key_fn(&new_row), new_row),
+            Record::Delete { old_row } => self.delete(key_fn(&old_row), old_row),
+            Record::Update { old_row, new_row } => {
+                let old_key = key_fn(&old_row);
+                let new_key = key_fn(&new_row);
+                if old_key != new_key {
+                    self.ib
+                        .report("inconsistent changes: mismatched key in update");
+                }
+                self.update(old_key, old_row, new_row);
+            }
+        }
+    }
 }
 
 impl<K, R> ChangeBuffer<K, R> {
     pub fn new() -> Self {
         Self::default()
     }
 
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self {
+            buffer: IndexMap::with_capacity(capacity),
+            ..Default::default()
+        }
+    }
+
     pub fn with_inconsistency_behavior(mut self, ib: InconsistencyBehavior) -> Self {
         self.ib = ib;
         self
@@ -165,4 +188,16 @@ impl<K, R: Row> ChangeBuffer<K, R> {
         }
         builder.take()
     }
+
+    pub fn into_chunks(self, data_types: Vec<DataType>, chunk_size: usize) -> Vec<StreamChunk> {
+        let mut res = Vec::new();
+        let mut builder = StreamChunkBuilder::new(chunk_size, data_types);
+        for record in self.into_records() {
+            if let Some(chunk) = builder.append_record_eliminate_noop_update(record) {
+                res.push(chunk);
+            }
+        }
+        res.extend(builder.take());
+        res
+    }
 }
```
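For context, `apply_record` is just an adapter from chunk records onto the buffer's existing `insert`/`delete`/`update` primitives, which fold consecutive changes to the same key into a single pending change. The primitives themselves are outside this hunk, but their semantics can be read off the `RowOpMap` code deleted from `compact_chunk.rs` below. A self-contained model of that fold, simplified to `u64` keys and with the `InconsistencyBehavior` reporting elided; `Change` and the free functions are illustrative names, not the real API:

```rust
use std::collections::HashMap;

// One pending change per key, after folding.
#[derive(Debug, PartialEq)]
enum Change<R> {
    Insert(R),
    Delete(R),
    Update(R, R), // (old, new)
}

fn insert<R>(map: &mut HashMap<u64, Change<R>>, key: u64, new: R) {
    let folded = match map.remove(&key) {
        None => Change::Insert(new),
        // A delete (or update) followed by an insert of the same key
        // folds into an update from the originally-deleted row.
        Some(Change::Delete(old)) | Some(Change::Update(old, _)) => Change::Update(old, new),
        // Double insert: inconsistent input; the real buffer reports it
        // and keeps the newest row.
        Some(Change::Insert(_)) => Change::Insert(new),
    };
    map.insert(key, folded);
}

fn delete<R>(map: &mut HashMap<u64, Change<R>>, key: u64, old: R) {
    let folded = match map.remove(&key) {
        // Insert then delete cancels out: the key leaves no trace.
        Some(Change::Insert(_)) => return,
        // Update then delete folds into deleting the original row.
        Some(Change::Update(prev, _)) => Change::Delete(prev),
        // Double delete: inconsistent input; keep the newest row.
        Some(Change::Delete(_)) => Change::Delete(old),
        None => Change::Delete(old),
    };
    map.insert(key, folded);
}

fn main() {
    let mut map = HashMap::new();
    insert(&mut map, 1, "a");
    delete(&mut map, 1, "a"); // +a then -a: cancels out
    assert!(map.is_empty());

    delete(&mut map, 2, "b");
    insert(&mut map, 2, "b2"); // -b then +b2: folds into an update
    assert_eq!(map[&2], Change::Update("b", "b2"));
}
```

With the fold living in `ChangeBuffer`, `apply_record` only has to map `Record` variants onto it and sanity-check that an `Update` does not change the key.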

src/stream/src/common/compact_chunk.rs

Lines changed: 8 additions & 127 deletions

```diff
@@ -20,8 +20,6 @@ use std::mem;
 use itertools::Itertools;
 use prehash::{Passthru, Prehashed, new_prehashed_map_with_capacity};
 use risingwave_common::array::stream_chunk::{OpRowMutRef, StreamChunkMut};
-use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
-use risingwave_common::array::stream_record::Record;
 use risingwave_common::array::{Op, RowRef, StreamChunk};
 use risingwave_common::row::{Project, RowExt};
 use risingwave_common::types::DataType;
@@ -39,6 +37,7 @@ use risingwave_common::util::hash_util::Crc32FastBuilder;
 // of the same key, construct new chunks. A combination of `into_compacted_chunks`, `compact`,
 // and `StreamChunkBuilder`.
 pub use super::change_buffer::InconsistencyBehavior;
+use crate::common::change_buffer::ChangeBuffer;
 
 /// A helper to compact the stream chunks by modifying the `Ops` and visibility of the chunk.
 pub struct StreamChunkCompactor {
@@ -110,107 +109,6 @@ impl<'a> OpRowMutRefTuple<'a> {
 type OpRowMap<'a, 'b> =
     HashMap<Prehashed<Project<'b, RowRef<'a>>>, OpRowMutRefTuple<'a>, BuildHasherDefault<Passthru>>;
 
-#[derive(Clone, Debug)]
-pub enum RowOp<'a> {
-    Insert(RowRef<'a>),
-    Delete(RowRef<'a>),
-    /// (`old_value`, `new_value`)
-    Update((RowRef<'a>, RowRef<'a>)),
-}
-
-pub struct RowOpMap<'a, 'b> {
-    map: HashMap<Prehashed<Project<'b, RowRef<'a>>>, RowOp<'a>, BuildHasherDefault<Passthru>>,
-    ib: InconsistencyBehavior,
-}
-
-impl<'a, 'b> RowOpMap<'a, 'b> {
-    fn with_capacity(estimate_size: usize, ib: InconsistencyBehavior) -> Self {
-        Self {
-            map: new_prehashed_map_with_capacity(estimate_size),
-            ib,
-        }
-    }
-
-    pub fn insert(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
-        let entry = self.map.entry(k);
-        match entry {
-            Entry::Vacant(e) => {
-                e.insert(RowOp::Insert(v));
-            }
-            Entry::Occupied(mut e) => match e.get() {
-                RowOp::Delete(old_v) => {
-                    e.insert(RowOp::Update((*old_v, v)));
-                }
-                RowOp::Insert(_) => {
-                    self.ib
-                        .report("double insert for the same pk, breaking the sink's pk constraint");
-                    e.insert(RowOp::Insert(v));
-                }
-                RowOp::Update((old_v, _)) => {
-                    self.ib
-                        .report("double insert for the same pk, breaking the sink's pk constraint");
-                    e.insert(RowOp::Update((*old_v, v)));
-                }
-            },
-        }
-    }
-
-    pub fn delete(&mut self, k: Prehashed<Project<'b, RowRef<'a>>>, v: RowRef<'a>) {
-        let entry = self.map.entry(k);
-        match entry {
-            Entry::Vacant(e) => {
-                e.insert(RowOp::Delete(v));
-            }
-            Entry::Occupied(mut e) => match e.get() {
-                RowOp::Insert(_) => {
-                    e.remove();
-                }
-                RowOp::Update((prev, _)) => {
-                    e.insert(RowOp::Delete(*prev));
-                }
-                RowOp::Delete(_) => {
-                    self.ib.report("double delete for the same pk");
-                    e.insert(RowOp::Delete(v));
-                }
-            },
-        }
-    }
-
-    pub fn into_chunks(self, chunk_size: usize, data_types: Vec<DataType>) -> Vec<StreamChunk> {
-        let mut ret = vec![];
-        let mut builder = StreamChunkBuilder::new(chunk_size, data_types);
-        for (_, row_op) in self.map {
-            match row_op {
-                RowOp::Insert(row) => {
-                    if let Some(c) = builder.append_record(Record::Insert { new_row: row }) {
-                        ret.push(c)
-                    }
-                }
-                RowOp::Delete(row) => {
-                    if let Some(c) = builder.append_record(Record::Delete { old_row: row }) {
-                        ret.push(c)
-                    }
-                }
-                RowOp::Update((old, new)) => {
-                    if old == new {
-                        continue;
-                    }
-                    if let Some(c) = builder.append_record(Record::Update {
-                        old_row: old,
-                        new_row: new,
-                    }) {
-                        ret.push(c)
-                    }
-                }
-            }
-        }
-        if let Some(c) = builder.take() {
-            ret.push(c);
-        }
-        ret
-    }
-}
-
 impl StreamChunkCompactor {
     pub fn new(key: Vec<usize>, chunks: Vec<StreamChunk>) -> Self {
         Self { chunks, key }
@@ -292,32 +190,15 @@ impl StreamChunkCompactor {
         let (chunks, key_indices) = self.into_inner();
 
         let estimate_size = chunks.iter().map(|c| c.cardinality()).sum();
-        let chunks: Vec<(_, _, _)> = chunks
-            .into_iter()
-            .map(|c| {
-                let (c, ops) = c.into_parts();
-                let hash_values = c
-                    .get_hash_values(&key_indices, Crc32FastBuilder)
-                    .into_iter()
-                    .map(|hash| hash.value())
-                    .collect_vec();
-                (hash_values, ops, c)
-            })
-            .collect_vec();
-        let mut map = RowOpMap::with_capacity(estimate_size, ib);
-        for (hash_values, ops, c) in &chunks {
-            for row in c.rows() {
-                let hash = hash_values[row.index()];
-                let op = ops[row.index()];
-                let key = row.project(&key_indices);
-                let k = Prehashed::new(key, hash);
-                match op {
-                    Op::Insert | Op::UpdateInsert => map.insert(k, row),
-                    Op::Delete | Op::UpdateDelete => map.delete(k, row),
-                }
+        let mut cb = ChangeBuffer::with_capacity(estimate_size).with_inconsistency_behavior(ib);
+
+        for chunk in chunks.iter() {
+            for record in chunk.records() {
+                cb.apply_record(record, |&row| row.project(&key_indices));
             }
         }
-        map.into_chunks(chunk_size, data_types)
+
+        cb.into_chunks(data_types, chunk_size)
     }
 }
```
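A final behavioral note on the refactor: the deleted `RowOpMap::into_chunks` skipped updates whose old and new rows were equal (`if old == new { continue; }`). That responsibility now sits in `StreamChunkBuilder::append_record_eliminate_noop_update`, which the new `ChangeBuffer::into_chunks` calls. A standalone sketch of just that elimination, with the builder reduced to a plain `Vec` and `Record` as a simplified stand-in rather than RisingWave's type:

```rust
#[derive(Debug)]
#[allow(dead_code)]
enum Record<R> {
    Insert { new_row: R },
    Delete { old_row: R },
    Update { old_row: R, new_row: R },
}

// Stand-in for `append_record_eliminate_noop_update`: drop updates that
// do not actually change the row, append everything else.
fn append_eliminate_noop_update<R: PartialEq>(out: &mut Vec<Record<R>>, record: Record<R>) {
    if let Record::Update { old_row, new_row } = &record {
        if old_row == new_row {
            return; // a no-op update carries no information downstream
        }
    }
    out.push(record);
}

fn main() {
    let mut out = Vec::new();
    append_eliminate_noop_update(&mut out, Record::Update { old_row: 1, new_row: 1 });
    append_eliminate_noop_update(&mut out, Record::Update { old_row: 1, new_row: 2 });
    append_eliminate_noop_update(&mut out, Record::Insert { new_row: 3 });
    assert_eq!(out.len(), 2); // the no-op update was dropped
}
```

Pushing the check into the builder means every caller of `into_chunks` gets the elimination for free instead of re-implementing it at each call site.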