Merged
24 changes: 24 additions & 0 deletions src/common/src/array/stream_record.rs
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::convert::Infallible;

use auto_enums::auto_enum;

use super::StreamChunk;
@@ -76,6 +78,28 @@ impl<R> Record<R> {
Record::Update { old_row, new_row } => Record::Update { old_row, new_row },
}
}

/// Try mapping the row in the record to another row, returning an error if any mapping fails.
pub fn try_map<R2, E>(self, f: impl Fn(R) -> Result<R2, E>) -> Result<Record<R2>, E> {
Ok(match self {
Record::Insert { new_row } => Record::Insert {
new_row: f(new_row)?,
},
Record::Delete { old_row } => Record::Delete {
old_row: f(old_row)?,
},
Record::Update { old_row, new_row } => Record::Update {
old_row: f(old_row)?,
new_row: f(new_row)?,
},
})
}

/// Map the row in the record to another row.
pub fn map<R2>(self, f: impl Fn(R) -> R2) -> Record<R2> {
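// `Infallible` has no values, so this `Ok` pattern is irrefutable and cannot panic.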
let Ok(record) = self.try_map::<R2, Infallible>(|row| Ok(f(row)));
record
}
}

impl<R: Row> Record<R> {
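As a usage sketch of the new combinators (illustrative only, not part of the PR): the snippet below re-declares a minimal stand-in for `Record<R>` and exercises both `map` and `try_map`. The integer row types and the `main` body are hypothetical.

```rust
// Sketch only: a self-contained stand-in for the Record<R> in
// stream_record.rs, to show how `map` piggybacks on `try_map`.
use std::convert::Infallible;

enum Record<R> {
    Insert { new_row: R },
    Delete { old_row: R },
    Update { old_row: R, new_row: R },
}

impl<R> Record<R> {
    fn try_map<R2, E>(self, f: impl Fn(R) -> Result<R2, E>) -> Result<Record<R2>, E> {
        Ok(match self {
            Record::Insert { new_row } => Record::Insert { new_row: f(new_row)? },
            Record::Delete { old_row } => Record::Delete { old_row: f(old_row)? },
            Record::Update { old_row, new_row } => Record::Update {
                old_row: f(old_row)?,
                new_row: f(new_row)?,
            },
        })
    }

    fn map<R2>(self, f: impl Fn(R) -> R2) -> Record<R2> {
        // `Infallible` has no values, so the `Ok` pattern is irrefutable.
        let Ok(record) = self.try_map::<R2, Infallible>(|row| Ok(f(row)));
        record
    }
}

fn main() {
    // Infallible mapping.
    let rec = Record::Insert { new_row: 5 }.map(|x| x * 10);
    assert!(matches!(rec, Record::Insert { new_row: 50 }));

    // Fallible mapping: take the first column, erroring on empty rows.
    let rec = Record::Update { old_row: vec![1, 2], new_row: vec![3, 4] };
    let projected = rec.try_map(|row| row.first().copied().ok_or("empty row"));
    assert!(matches!(projected, Ok(Record::Update { old_row: 1, new_row: 3 })));
}
```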
6 changes: 6 additions & 0 deletions src/stream/src/common/change_buffer.rs
@@ -188,6 +188,12 @@ where
}
}

impl<K, R> Default for ChangeBuffer<K, R> {
fn default() -> Self {
Self::new()
}
}

impl<K, R> ChangeBuffer<K, R> {
/// Create a new `ChangeBuffer` that panics on inconsistency.
pub fn new() -> Self {
134 changes: 3 additions & 131 deletions src/stream/src/executor/mview/materialize.rs
@@ -13,8 +13,7 @@
// limitations under the License.

use std::assert_matches::assert_matches;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::marker::PhantomData;
use std::ops::{Bound, Deref, Index};

@@ -31,7 +30,6 @@ use risingwave_common::catalog::{
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{CompactedRow, OwnedRow, RowExt};
use risingwave_common::types::{DEBEZIUM_UNAVAILABLE_VALUE, DataType, ScalarImpl};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType, cmp_datum};
use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
@@ -507,7 +505,7 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
)
.await?;

match generate_output(change_buffer, data_types.clone())? {
match change_buffer.into_chunk(data_types.clone()) {
Some(output_chunk) => {
self.state_table.write_chunk(output_chunk.clone());
self.state_table.try_flush().await?;
@@ -1199,134 +1197,8 @@ fn handle_toast_columns_for_postgres_cdc(
OwnedRow::new(fixed_row_data)
}

/// Construct an output `StreamChunk` from the given buffer.
fn generate_output(
change_buffer: ChangeBuffer,
data_types: Vec<DataType>,
) -> StreamExecutorResult<Option<StreamChunk>> {
// construct output chunk
// TODO(st1page): when materializing partial columns, we should construct some columns in the pk
let mut new_ops: Vec<Op> = vec![];
let mut new_rows: Vec<OwnedRow> = vec![];
for (_, row_op) in change_buffer.into_parts() {
match row_op {
ChangeBufferKeyOp::Insert(value) => {
new_ops.push(Op::Insert);
new_rows.push(value);
}
ChangeBufferKeyOp::Delete(old_value) => {
new_ops.push(Op::Delete);
new_rows.push(old_value);
}
ChangeBufferKeyOp::Update((old_value, new_value)) => {
// if old_value == new_value, we don't need to emit updates to downstream.
if old_value != new_value {
new_ops.push(Op::UpdateDelete);
new_ops.push(Op::UpdateInsert);
new_rows.push(old_value);
new_rows.push(new_value);
}
}
}
}
let mut data_chunk_builder = DataChunkBuilder::new(data_types, new_rows.len() + 1);

for row in new_rows {
let res = data_chunk_builder.append_one_row(row);
debug_assert!(res.is_none());
}

if let Some(new_data_chunk) = data_chunk_builder.consume_all() {
let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec());
Ok(Some(new_stream_chunk))
} else {
Ok(None)
}
}

/// `ChangeBuffer` buffers the changes of a chunk as per-key `KeyOp`s.
/// TODO(rc): merge with `TopNStaging`.
pub struct ChangeBuffer {
buffer: HashMap<Vec<u8>, ChangeBufferKeyOp>,
}
type ChangeBuffer = crate::common::change_buffer::ChangeBuffer<Vec<u8>, OwnedRow>;

/// `KeyOp` variant for `ChangeBuffer` that stores `OwnedRow` instead of Bytes
enum ChangeBufferKeyOp {
Insert(OwnedRow),
Delete(OwnedRow),
/// (`old_value`, `new_value`)
Update((OwnedRow, OwnedRow)),
}

impl ChangeBuffer {
fn new() -> Self {
Self {
buffer: HashMap::new(),
}
}

fn insert(&mut self, pk: Vec<u8>, value: OwnedRow) {
let entry = self.buffer.entry(pk);
match entry {
Entry::Vacant(e) => {
e.insert(ChangeBufferKeyOp::Insert(value));
}
Entry::Occupied(mut e) => {
if let ChangeBufferKeyOp::Delete(old_value) = e.get_mut() {
let old_val = std::mem::take(old_value);
e.insert(ChangeBufferKeyOp::Update((old_val, value)));
} else {
unreachable!();
}
}
}
}

fn delete(&mut self, pk: Vec<u8>, old_value: OwnedRow) {
let entry: Entry<'_, Vec<u8>, ChangeBufferKeyOp> = self.buffer.entry(pk);
match entry {
Entry::Vacant(e) => {
e.insert(ChangeBufferKeyOp::Delete(old_value));
}
Entry::Occupied(mut e) => match e.get_mut() {
ChangeBufferKeyOp::Insert(_) => {
e.remove();
}
ChangeBufferKeyOp::Update((prev, _curr)) => {
let prev = std::mem::take(prev);
e.insert(ChangeBufferKeyOp::Delete(prev));
}
ChangeBufferKeyOp::Delete(_) => {
unreachable!();
}
},
}
}

fn update(&mut self, pk: Vec<u8>, old_value: OwnedRow, new_value: OwnedRow) {
let entry = self.buffer.entry(pk);
match entry {
Entry::Vacant(e) => {
e.insert(ChangeBufferKeyOp::Update((old_value, new_value)));
}
Entry::Occupied(mut e) => match e.get_mut() {
ChangeBufferKeyOp::Insert(_) => {
e.insert(ChangeBufferKeyOp::Insert(new_value));
}
ChangeBufferKeyOp::Update((_prev, curr)) => {
*curr = new_value;
}
ChangeBufferKeyOp::Delete(_) => {
unreachable!()
}
},
}
}

fn into_parts(self) -> HashMap<Vec<u8>, ChangeBufferKeyOp> {
self.buffer
}
}
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
fn execute(self: Box<Self>) -> BoxedMessageStream {
self.execute_inner().boxed()
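For reference, the emission step that the removed `generate_output` above performed (now owned by the shared `ChangeBuffer::into_chunk`) can be sketched with stand-in types. This is a minimal sketch, not the real implementation: `Op` and `KeyOp` mirror the removed definitions, and the real code assembles a `StreamChunk` through `DataChunkBuilder`.

```rust
// Sketch of turning consolidated per-key ops into (Op, row) pairs,
// mirroring the removed generate_output. Row types are simplified.
#[derive(Debug)]
enum Op {
    Insert,
    Delete,
    UpdateDelete,
    UpdateInsert,
}

enum KeyOp<R> {
    Insert(R),
    Delete(R),
    Update(R, R), // (old_value, new_value)
}

fn emit<R: PartialEq>(ops: impl IntoIterator<Item = KeyOp<R>>) -> Vec<(Op, R)> {
    let mut out = Vec::new();
    for op in ops {
        match op {
            KeyOp::Insert(new) => out.push((Op::Insert, new)),
            KeyOp::Delete(old) => out.push((Op::Delete, old)),
            KeyOp::Update(old, new) => {
                // An update that doesn't change the row is dropped entirely,
                // so downstream never sees a no-op UpdateDelete/UpdateInsert.
                if old != new {
                    out.push((Op::UpdateDelete, old));
                    out.push((Op::UpdateInsert, new));
                }
            }
        }
    }
    out
}

fn main() {
    let changes = vec![
        KeyOp::Insert(1),
        KeyOp::Delete(0),
        KeyOp::Update(2, 2), // no-op update: nothing is emitted
        KeyOp::Update(3, 4),
    ];
    // Insert + Delete + UpdateDelete + UpdateInsert.
    assert_eq!(emit(changes).len(), 4);
}
```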
68 changes: 5 additions & 63 deletions src/stream/src/executor/over_window/general.rs
@@ -18,7 +18,6 @@ use std::ops::RangeInclusive;

use delta_btree_map::Change;
use itertools::Itertools;
use risingwave_common::array::Op;
use risingwave_common::array::stream_record::Record;
use risingwave_common::row::RowExt;
use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
@@ -33,6 +32,7 @@ use super::frame_finder::merge_rows_frames;
use super::over_partition::{OverPartition, PartitionDelta};
use super::range_cache::{CacheKey, PartitionCache};
use crate::cache::ManagedLruCache;
use crate::common::change_buffer::ChangeBuffer;
use crate::common::metrics::MetricsInfo;
use crate::consistency::consistency_panic;
use crate::executor::monitor::OverWindowMetrics;
@@ -274,69 +274,11 @@ impl<S: StateStore> OverWindowExecutor<S> {
this: &'_ ExecutorInner<S>,
chunk: &'a StreamChunk,
) -> impl Iterator<Item = Record<RowRef<'a>>> {
let mut changes_merged = BTreeMap::new();
for (op, row) in chunk.rows() {
let pk = DefaultOrdered(this.get_input_pk(row));
match op {
Op::Insert | Op::UpdateInsert => {
if let Some(prev_change) = changes_merged.get_mut(&pk) {
match prev_change {
Record::Delete { old_row } => {
*prev_change = Record::Update {
old_row: *old_row,
new_row: row,
};
}
_ => {
consistency_panic!(
?pk,
?row,
?prev_change,
"inconsistent changes in input chunk, double-inserting"
);
if let Record::Update { old_row, .. } = prev_change {
*prev_change = Record::Update {
old_row: *old_row,
new_row: row,
};
} else {
*prev_change = Record::Insert { new_row: row };
}
}
}
} else {
changes_merged.insert(pk, Record::Insert { new_row: row });
}
}
Op::Delete | Op::UpdateDelete => {
if let Some(prev_change) = changes_merged.get_mut(&pk) {
match prev_change {
Record::Insert { .. } => {
changes_merged.remove(&pk);
}
Record::Update {
old_row: real_old_row,
..
} => {
*prev_change = Record::Delete {
old_row: *real_old_row,
};
}
_ => {
consistency_panic!(
?pk,
"inconsistent changes in input chunk, double-deleting"
);
*prev_change = Record::Delete { old_row: row };
}
}
} else {
changes_merged.insert(pk, Record::Delete { old_row: row });
}
}
}
let mut cb = ChangeBuffer::with_capacity(chunk.cardinality());
for record in chunk.records() {
cb.apply_record(record, |row| this.get_input_pk(row));
}
changes_merged.into_values()
cb.into_records()
}

#[try_stream(ok = StreamChunk, error = StreamExecutorError)]
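The manual `BTreeMap` merging deleted above is the behavior the shared `ChangeBuffer` provides through `apply_record`. Below is a self-contained sketch of the merge rules with stand-in types; the real buffer keys entries by the extracted input pk and reports inconsistencies via `consistency_panic!` rather than `panic!`.

```rust
use std::collections::BTreeMap;

// Stand-in change record keyed by an integer pk; rows are plain i64s.
#[derive(Debug)]
enum Rec {
    Insert { new: i64 },
    Delete { old: i64 },
    Update { old: i64, new: i64 },
}

// Merge a stream of per-key changes, folding pairs that touch the same key.
fn merge(changes: impl IntoIterator<Item = (u32, Rec)>) -> BTreeMap<u32, Rec> {
    let mut merged = BTreeMap::new();
    for (pk, rec) in changes {
        match (merged.remove(&pk), rec) {
            // First change for this key: keep it as-is.
            (None, rec) => {
                merged.insert(pk, rec);
            }
            // Delete then Insert folds into an Update.
            (Some(Rec::Delete { old }), Rec::Insert { new }) => {
                merged.insert(pk, Rec::Update { old, new });
            }
            // Insert then Delete cancels out: the key disappears.
            (Some(Rec::Insert { .. }), Rec::Delete { .. }) => {}
            // Update then Delete keeps only the original old row.
            (Some(Rec::Update { old, .. }), Rec::Delete { .. }) => {
                merged.insert(pk, Rec::Delete { old });
            }
            // Double inserts, double deletes, etc. are inconsistent input;
            // the real code raises consistency_panic! here.
            (Some(prev), rec) => panic!("inconsistent change: {prev:?} then {rec:?}"),
        }
    }
    merged
}

fn main() {
    let out = merge([
        (1, Rec::Insert { new: 10 }),
        (1, Rec::Delete { old: 10 }), // cancels the insert above
        (2, Rec::Delete { old: 20 }),
        (2, Rec::Insert { new: 21 }), // folds into Update { 20, 21 }
    ]);
    assert!(!out.contains_key(&1));
    assert!(matches!(out[&2], Rec::Update { old: 20, new: 21 }));
}
```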
7 changes: 4 additions & 3 deletions src/stream/src/executor/top_n/group_top_n.rs
@@ -30,6 +30,7 @@ use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTablePostCommit;
use crate::executor::monitor::GroupTopNMetrics;
use crate::executor::prelude::*;
use crate::executor::top_n::top_n_cache::TopNStaging;

pub type GroupTopNExecutor<K, S, const WITH_TIES: bool> =
TopNExecutorWrapper<InnerGroupTopNExecutor<K, S, WITH_TIES>>;
@@ -169,7 +170,7 @@
chunk: StreamChunk,
) -> StreamExecutorResult<Option<StreamChunk>> {
let keys = K::build_many(&self.group_by, chunk.data_chunk());
let mut stagings = HashMap::new(); // K -> `TopNStaging`
let mut stagings: HashMap<K, TopNStaging> = HashMap::new();

for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
let Some((op, row_ref)) = r else {
@@ -232,8 +233,8 @@ where
let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(chunk.capacity()));
for staging in stagings.into_values() {
for res in staging.into_deserialized_changes(&deserializer) {
let (op, row) = res?;
let _none = chunk_builder.append_row(op, row);
let record = res?;
let _none = chunk_builder.append_record(record);
}
}
Ok(chunk_builder.take())
7 changes: 4 additions & 3 deletions src/stream/src/executor/top_n/group_top_n_appendonly.rs
@@ -29,6 +29,7 @@ use crate::common::metrics::MetricsInfo;
use crate::common::table::state_table::StateTablePostCommit;
use crate::executor::monitor::GroupTopNMetrics;
use crate::executor::prelude::*;
use crate::executor::top_n::top_n_cache::TopNStaging;

/// If the input is append-only, `AppendOnlyGroupTopNExecutor` does not need
/// to keep all the rows seen. As long as a record
@@ -150,7 +151,7 @@
chunk: StreamChunk,
) -> StreamExecutorResult<Option<StreamChunk>> {
let keys = K::build_many(&self.group_by, chunk.data_chunk());
let mut stagings = HashMap::new(); // K -> `TopNStaging`
let mut stagings: HashMap<K, TopNStaging> = HashMap::new();

let data_types = self.schema.data_types();
let deserializer = RowDeserializer::new(data_types.clone());
@@ -201,8 +202,8 @@ where
let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(chunk.capacity()));
for staging in stagings.into_values() {
for res in staging.into_deserialized_changes(&deserializer) {
let (op, row) = res?;
let _none = chunk_builder.append_row(op, row);
let record = res?;
let _none = chunk_builder.append_record(record);
}
}

4 changes: 2 additions & 2 deletions src/stream/src/executor/top_n/top_n_appendonly.rs
@@ -133,8 +133,8 @@
}
let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(staging.len()));
for res in staging.into_deserialized_changes(&deserializer) {
let (op, row) = res?;
let _none = chunk_builder.append_row(op, row);
let record = res?;
let _none = chunk_builder.append_record(record);
}
Ok(chunk_builder.take())
}