Skip to content

Commit 0610624

Browse files
authored
refactor(streaming): use ChangeBuffer for executors (#23508)
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
1 parent ccea659 commit 0610624

File tree

9 files changed

+66
-259
lines changed

9 files changed

+66
-259
lines changed

src/common/src/array/stream_record.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::convert::Infallible;
16+
1517
use auto_enums::auto_enum;
1618

1719
use super::StreamChunk;
@@ -76,6 +78,28 @@ impl<R> Record<R> {
7678
Record::Update { old_row, new_row } => Record::Update { old_row, new_row },
7779
}
7880
}
81+
82+
/// Try mapping the row in the record to another row, returning error if any of the mapping fails.
83+
pub fn try_map<R2, E>(self, f: impl Fn(R) -> Result<R2, E>) -> Result<Record<R2>, E> {
84+
Ok(match self {
85+
Record::Insert { new_row } => Record::Insert {
86+
new_row: f(new_row)?,
87+
},
88+
Record::Delete { old_row } => Record::Delete {
89+
old_row: f(old_row)?,
90+
},
91+
Record::Update { old_row, new_row } => Record::Update {
92+
old_row: f(old_row)?,
93+
new_row: f(new_row)?,
94+
},
95+
})
96+
}
97+
98+
/// Map the row in the record to another row.
99+
pub fn map<R2>(self, f: impl Fn(R) -> R2) -> Record<R2> {
100+
let Ok(record) = self.try_map::<R2, Infallible>(|row| Ok(f(row)));
101+
record
102+
}
79103
}
80104

81105
impl<R: Row> Record<R> {

src/stream/src/common/change_buffer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,12 @@ where
188188
}
189189
}
190190

191+
impl<K, R> Default for ChangeBuffer<K, R> {
192+
fn default() -> Self {
193+
Self::new()
194+
}
195+
}
196+
191197
impl<K, R> ChangeBuffer<K, R> {
192198
/// Create a new `ChangeBuffer` that panics on inconsistency.
193199
pub fn new() -> Self {

src/stream/src/executor/mview/materialize.rs

Lines changed: 3 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::assert_matches::assert_matches;
16-
use std::collections::hash_map::Entry;
17-
use std::collections::{HashMap, HashSet};
16+
use std::collections::HashSet;
1817
use std::marker::PhantomData;
1918
use std::ops::{Bound, Deref, Index};
2019

@@ -31,7 +30,6 @@ use risingwave_common::catalog::{
3130
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
3231
use risingwave_common::row::{CompactedRow, OwnedRow, RowExt};
3332
use risingwave_common::types::{DEBEZIUM_UNAVAILABLE_VALUE, DataType, ScalarImpl};
34-
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
3533
use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
3634
use risingwave_common::util::sort_util::{ColumnOrder, OrderType, cmp_datum};
3735
use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
@@ -507,7 +505,7 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
507505
)
508506
.await?;
509507

510-
match generate_output(change_buffer, data_types.clone())? {
508+
match change_buffer.into_chunk(data_types.clone()) {
511509
Some(output_chunk) => {
512510
self.state_table.write_chunk(output_chunk.clone());
513511
self.state_table.try_flush().await?;
@@ -1199,134 +1197,8 @@ fn handle_toast_columns_for_postgres_cdc(
11991197
OwnedRow::new(fixed_row_data)
12001198
}
12011199

1202-
/// Construct output `StreamChunk` from given buffer.
1203-
fn generate_output(
1204-
change_buffer: ChangeBuffer,
1205-
data_types: Vec<DataType>,
1206-
) -> StreamExecutorResult<Option<StreamChunk>> {
1207-
// construct output chunk
1208-
// TODO(st1page): when materialize partial columns(), we should construct some columns in the pk
1209-
let mut new_ops: Vec<Op> = vec![];
1210-
let mut new_rows: Vec<OwnedRow> = vec![];
1211-
for (_, row_op) in change_buffer.into_parts() {
1212-
match row_op {
1213-
ChangeBufferKeyOp::Insert(value) => {
1214-
new_ops.push(Op::Insert);
1215-
new_rows.push(value);
1216-
}
1217-
ChangeBufferKeyOp::Delete(old_value) => {
1218-
new_ops.push(Op::Delete);
1219-
new_rows.push(old_value);
1220-
}
1221-
ChangeBufferKeyOp::Update((old_value, new_value)) => {
1222-
// if old_value == new_value, we don't need to emit updates to downstream.
1223-
if old_value != new_value {
1224-
new_ops.push(Op::UpdateDelete);
1225-
new_ops.push(Op::UpdateInsert);
1226-
new_rows.push(old_value);
1227-
new_rows.push(new_value);
1228-
}
1229-
}
1230-
}
1231-
}
1232-
let mut data_chunk_builder = DataChunkBuilder::new(data_types, new_rows.len() + 1);
1233-
1234-
for row in new_rows {
1235-
let res = data_chunk_builder.append_one_row(row);
1236-
debug_assert!(res.is_none());
1237-
}
1238-
1239-
if let Some(new_data_chunk) = data_chunk_builder.consume_all() {
1240-
let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec());
1241-
Ok(Some(new_stream_chunk))
1242-
} else {
1243-
Ok(None)
1244-
}
1245-
}
1246-
1247-
/// `ChangeBuffer` is a buffer to handle chunk into `KeyOp`.
1248-
/// TODO(rc): merge with `TopNStaging`.
1249-
pub struct ChangeBuffer {
1250-
buffer: HashMap<Vec<u8>, ChangeBufferKeyOp>,
1251-
}
1200+
type ChangeBuffer = crate::common::change_buffer::ChangeBuffer<Vec<u8>, OwnedRow>;
12521201

1253-
/// `KeyOp` variant for `ChangeBuffer` that stores `OwnedRow` instead of Bytes
1254-
enum ChangeBufferKeyOp {
1255-
Insert(OwnedRow),
1256-
Delete(OwnedRow),
1257-
/// (`old_value`, `new_value`)
1258-
Update((OwnedRow, OwnedRow)),
1259-
}
1260-
1261-
impl ChangeBuffer {
1262-
fn new() -> Self {
1263-
Self {
1264-
buffer: HashMap::new(),
1265-
}
1266-
}
1267-
1268-
fn insert(&mut self, pk: Vec<u8>, value: OwnedRow) {
1269-
let entry = self.buffer.entry(pk);
1270-
match entry {
1271-
Entry::Vacant(e) => {
1272-
e.insert(ChangeBufferKeyOp::Insert(value));
1273-
}
1274-
Entry::Occupied(mut e) => {
1275-
if let ChangeBufferKeyOp::Delete(old_value) = e.get_mut() {
1276-
let old_val = std::mem::take(old_value);
1277-
e.insert(ChangeBufferKeyOp::Update((old_val, value)));
1278-
} else {
1279-
unreachable!();
1280-
}
1281-
}
1282-
}
1283-
}
1284-
1285-
fn delete(&mut self, pk: Vec<u8>, old_value: OwnedRow) {
1286-
let entry: Entry<'_, Vec<u8>, ChangeBufferKeyOp> = self.buffer.entry(pk);
1287-
match entry {
1288-
Entry::Vacant(e) => {
1289-
e.insert(ChangeBufferKeyOp::Delete(old_value));
1290-
}
1291-
Entry::Occupied(mut e) => match e.get_mut() {
1292-
ChangeBufferKeyOp::Insert(_) => {
1293-
e.remove();
1294-
}
1295-
ChangeBufferKeyOp::Update((prev, _curr)) => {
1296-
let prev = std::mem::take(prev);
1297-
e.insert(ChangeBufferKeyOp::Delete(prev));
1298-
}
1299-
ChangeBufferKeyOp::Delete(_) => {
1300-
unreachable!();
1301-
}
1302-
},
1303-
}
1304-
}
1305-
1306-
fn update(&mut self, pk: Vec<u8>, old_value: OwnedRow, new_value: OwnedRow) {
1307-
let entry = self.buffer.entry(pk);
1308-
match entry {
1309-
Entry::Vacant(e) => {
1310-
e.insert(ChangeBufferKeyOp::Update((old_value, new_value)));
1311-
}
1312-
Entry::Occupied(mut e) => match e.get_mut() {
1313-
ChangeBufferKeyOp::Insert(_) => {
1314-
e.insert(ChangeBufferKeyOp::Insert(new_value));
1315-
}
1316-
ChangeBufferKeyOp::Update((_prev, curr)) => {
1317-
*curr = new_value;
1318-
}
1319-
ChangeBufferKeyOp::Delete(_) => {
1320-
unreachable!()
1321-
}
1322-
},
1323-
}
1324-
}
1325-
1326-
fn into_parts(self) -> HashMap<Vec<u8>, ChangeBufferKeyOp> {
1327-
self.buffer
1328-
}
1329-
}
13301202
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
13311203
fn execute(self: Box<Self>) -> BoxedMessageStream {
13321204
self.execute_inner().boxed()

src/stream/src/executor/over_window/general.rs

Lines changed: 5 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use std::ops::RangeInclusive;
1818

1919
use delta_btree_map::Change;
2020
use itertools::Itertools;
21-
use risingwave_common::array::Op;
2221
use risingwave_common::array::stream_record::Record;
2322
use risingwave_common::row::RowExt;
2423
use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
@@ -33,6 +32,7 @@ use super::frame_finder::merge_rows_frames;
3332
use super::over_partition::{OverPartition, PartitionDelta};
3433
use super::range_cache::{CacheKey, PartitionCache};
3534
use crate::cache::ManagedLruCache;
35+
use crate::common::change_buffer::ChangeBuffer;
3636
use crate::common::metrics::MetricsInfo;
3737
use crate::consistency::consistency_panic;
3838
use crate::executor::monitor::OverWindowMetrics;
@@ -274,69 +274,11 @@ impl<S: StateStore> OverWindowExecutor<S> {
274274
this: &'_ ExecutorInner<S>,
275275
chunk: &'a StreamChunk,
276276
) -> impl Iterator<Item = Record<RowRef<'a>>> {
277-
let mut changes_merged = BTreeMap::new();
278-
for (op, row) in chunk.rows() {
279-
let pk = DefaultOrdered(this.get_input_pk(row));
280-
match op {
281-
Op::Insert | Op::UpdateInsert => {
282-
if let Some(prev_change) = changes_merged.get_mut(&pk) {
283-
match prev_change {
284-
Record::Delete { old_row } => {
285-
*prev_change = Record::Update {
286-
old_row: *old_row,
287-
new_row: row,
288-
};
289-
}
290-
_ => {
291-
consistency_panic!(
292-
?pk,
293-
?row,
294-
?prev_change,
295-
"inconsistent changes in input chunk, double-inserting"
296-
);
297-
if let Record::Update { old_row, .. } = prev_change {
298-
*prev_change = Record::Update {
299-
old_row: *old_row,
300-
new_row: row,
301-
};
302-
} else {
303-
*prev_change = Record::Insert { new_row: row };
304-
}
305-
}
306-
}
307-
} else {
308-
changes_merged.insert(pk, Record::Insert { new_row: row });
309-
}
310-
}
311-
Op::Delete | Op::UpdateDelete => {
312-
if let Some(prev_change) = changes_merged.get_mut(&pk) {
313-
match prev_change {
314-
Record::Insert { .. } => {
315-
changes_merged.remove(&pk);
316-
}
317-
Record::Update {
318-
old_row: real_old_row,
319-
..
320-
} => {
321-
*prev_change = Record::Delete {
322-
old_row: *real_old_row,
323-
};
324-
}
325-
_ => {
326-
consistency_panic!(
327-
?pk,
328-
"inconsistent changes in input chunk, double-deleting"
329-
);
330-
*prev_change = Record::Delete { old_row: row };
331-
}
332-
}
333-
} else {
334-
changes_merged.insert(pk, Record::Delete { old_row: row });
335-
}
336-
}
337-
}
277+
let mut cb = ChangeBuffer::with_capacity(chunk.cardinality());
278+
for record in chunk.records() {
279+
cb.apply_record(record, |row| this.get_input_pk(row));
338280
}
339-
changes_merged.into_values()
281+
cb.into_records()
340282
}
341283

342284
#[try_stream(ok = StreamChunk, error = StreamExecutorError)]

src/stream/src/executor/top_n/group_top_n.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::common::metrics::MetricsInfo;
3030
use crate::common::table::state_table::StateTablePostCommit;
3131
use crate::executor::monitor::GroupTopNMetrics;
3232
use crate::executor::prelude::*;
33+
use crate::executor::top_n::top_n_cache::TopNStaging;
3334

3435
pub type GroupTopNExecutor<K, S, const WITH_TIES: bool> =
3536
TopNExecutorWrapper<InnerGroupTopNExecutor<K, S, WITH_TIES>>;
@@ -169,7 +170,7 @@ where
169170
chunk: StreamChunk,
170171
) -> StreamExecutorResult<Option<StreamChunk>> {
171172
let keys = K::build_many(&self.group_by, chunk.data_chunk());
172-
let mut stagings = HashMap::new(); // K -> `TopNStaging`
173+
let mut stagings: HashMap<K, TopNStaging> = HashMap::new();
173174

174175
for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
175176
let Some((op, row_ref)) = r else {
@@ -232,8 +233,8 @@ where
232233
let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(chunk.capacity()));
233234
for staging in stagings.into_values() {
234235
for res in staging.into_deserialized_changes(&deserializer) {
235-
let (op, row) = res?;
236-
let _none = chunk_builder.append_row(op, row);
236+
let record = res?;
237+
let _none = chunk_builder.append_record(record);
237238
}
238239
}
239240
Ok(chunk_builder.take())

src/stream/src/executor/top_n/group_top_n_appendonly.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::common::metrics::MetricsInfo;
2929
use crate::common::table::state_table::StateTablePostCommit;
3030
use crate::executor::monitor::GroupTopNMetrics;
3131
use crate::executor::prelude::*;
32+
use crate::executor::top_n::top_n_cache::TopNStaging;
3233

3334
/// If the input is append-only, `AppendOnlyGroupTopNExecutor` does not need
3435
/// to keep all the rows seen. As long as a record
@@ -150,7 +151,7 @@ where
150151
chunk: StreamChunk,
151152
) -> StreamExecutorResult<Option<StreamChunk>> {
152153
let keys = K::build_many(&self.group_by, chunk.data_chunk());
153-
let mut stagings = HashMap::new(); // K -> `TopNStaging`
154+
let mut stagings: HashMap<K, TopNStaging> = HashMap::new(); // K -> `TopNStaging`
154155

155156
let data_types = self.schema.data_types();
156157
let deserializer = RowDeserializer::new(data_types.clone());
@@ -201,8 +202,8 @@ where
201202
let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(chunk.capacity()));
202203
for staging in stagings.into_values() {
203204
for res in staging.into_deserialized_changes(&deserializer) {
204-
let (op, row) = res?;
205-
let _none = chunk_builder.append_row(op, row);
205+
let record = res?;
206+
let _none = chunk_builder.append_record(record);
206207
}
207208
}
208209

src/stream/src/executor/top_n/top_n_appendonly.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,8 @@ where
133133
}
134134
let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(staging.len()));
135135
for res in staging.into_deserialized_changes(&deserializer) {
136-
let (op, row) = res?;
137-
let _none = chunk_builder.append_row(op, row);
136+
let record = res?;
137+
let _none = chunk_builder.append_record(record);
138138
}
139139
Ok(chunk_builder.take())
140140
}

0 commit comments

Comments
 (0)