
Commit 5f7166e

use change buffer for top-n
1 parent e631679

5 files changed (+28 −65 lines)

src/stream/src/executor/top_n/group_top_n.rs (4 additions, 3 deletions)

```diff
@@ -30,6 +30,7 @@ use crate::common::metrics::MetricsInfo;
 use crate::common::table::state_table::StateTablePostCommit;
 use crate::executor::monitor::GroupTopNMetrics;
 use crate::executor::prelude::*;
+use crate::executor::top_n::top_n_cache::TopNStaging;
 
 pub type GroupTopNExecutor<K, S, const WITH_TIES: bool> =
     TopNExecutorWrapper<InnerGroupTopNExecutor<K, S, WITH_TIES>>;
@@ -169,7 +170,7 @@ where
         chunk: StreamChunk,
     ) -> StreamExecutorResult<Option<StreamChunk>> {
         let keys = K::build_many(&self.group_by, chunk.data_chunk());
-        let mut stagings = HashMap::new(); // K -> `TopNStaging`
+        let mut stagings: HashMap<K, TopNStaging> = HashMap::new();
 
         for (r, group_cache_key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
             let Some((op, row_ref)) = r else {
@@ -232,8 +233,8 @@ where
         let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(chunk.capacity()));
         for staging in stagings.into_values() {
             for res in staging.into_deserialized_changes(&deserializer) {
-                let (op, row) = res?;
-                let _none = chunk_builder.append_row(op, row);
+                let record = res?;
+                let _none = chunk_builder.append_record_eliminate_noop_update(record);
             }
         }
         Ok(chunk_builder.take())
```
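Every executor file in this commit swaps `append_row(op, row)` for `append_record_eliminate_noop_update(record)`. The diff doesn't include that method's body, so the following is only a minimal sketch of its presumed behavior, using a hypothetical stand-in builder (`ChunkBuilderSketch` is not RisingWave's `StreamChunkBuilder`): an `Update` record whose old and new rows compare equal emits nothing, mirroring the `old_row != row` check that previously lived in `TopNStaging::insert` (see the `top_n_cache.rs` diff below).

```rust
// Hypothetical sketch of the assumed `append_record_eliminate_noop_update`
// semantics; the types are simplified stand-ins, not RisingWave's own.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Op {
    Insert,
    Delete,
    UpdateDelete,
    UpdateInsert,
}

enum Record<R> {
    Insert { new_row: R },
    Delete { old_row: R },
    Update { old_row: R, new_row: R },
}

struct ChunkBuilderSketch<R> {
    rows: Vec<(Op, R)>,
}

impl<R: PartialEq> ChunkBuilderSketch<R> {
    fn append_record_eliminate_noop_update(&mut self, record: Record<R>) {
        match record {
            Record::Insert { new_row } => self.rows.push((Op::Insert, new_row)),
            Record::Delete { old_row } => self.rows.push((Op::Delete, old_row)),
            // A no-op update carries no information, so it is dropped here
            // instead of emitting a spurious UpdateDelete/UpdateInsert pair.
            Record::Update { old_row, new_row } if old_row == new_row => {}
            Record::Update { old_row, new_row } => {
                self.rows.push((Op::UpdateDelete, old_row));
                self.rows.push((Op::UpdateInsert, new_row));
            }
        }
    }
}

fn main() {
    let mut builder = ChunkBuilderSketch { rows: Vec::new() };
    builder.append_record_eliminate_noop_update(Record::Update { old_row: 1, new_row: 1 });
    builder.append_record_eliminate_noop_update(Record::Update { old_row: 1, new_row: 2 });
    assert_eq!(builder.rows.len(), 2); // only the genuine update survives
}
```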

src/stream/src/executor/top_n/group_top_n_appendonly.rs (4 additions, 3 deletions)

```diff
@@ -29,6 +29,7 @@ use crate::common::metrics::MetricsInfo;
 use crate::common::table::state_table::StateTablePostCommit;
 use crate::executor::monitor::GroupTopNMetrics;
 use crate::executor::prelude::*;
+use crate::executor::top_n::top_n_cache::TopNStaging;
 
 /// If the input is append-only, `AppendOnlyGroupTopNExecutor` does not need
 /// to keep all the rows seen. As long as a record
@@ -150,7 +151,7 @@ where
         chunk: StreamChunk,
     ) -> StreamExecutorResult<Option<StreamChunk>> {
         let keys = K::build_many(&self.group_by, chunk.data_chunk());
-        let mut stagings = HashMap::new(); // K -> `TopNStaging`
+        let mut stagings: HashMap<K, TopNStaging> = HashMap::new(); // K -> `TopNStaging`
 
         let data_types = self.schema.data_types();
         let deserializer = RowDeserializer::new(data_types.clone());
@@ -201,8 +202,8 @@ where
         let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(chunk.capacity()));
         for staging in stagings.into_values() {
             for res in staging.into_deserialized_changes(&deserializer) {
-                let (op, row) = res?;
-                let _none = chunk_builder.append_row(op, row);
+                let record = res?;
+                let _none = chunk_builder.append_record_eliminate_noop_update(record);
             }
         }
```

src/stream/src/executor/top_n/top_n_appendonly.rs (2 additions, 2 deletions)

```diff
@@ -133,8 +133,8 @@ where
         }
         let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(staging.len()));
         for res in staging.into_deserialized_changes(&deserializer) {
-            let (op, row) = res?;
-            let _none = chunk_builder.append_row(op, row);
+            let record = res?;
+            let _none = chunk_builder.append_record_eliminate_noop_update(record);
         }
         Ok(chunk_builder.take())
     }
```

src/stream/src/executor/top_n/top_n_cache.rs (16 additions, 55 deletions)

```diff
@@ -13,19 +13,20 @@
 // limitations under the License.
 
 use std::cmp::Ordering;
-use std::collections::BTreeMap;
 use std::fmt::Debug;
 use std::future::Future;
 
 use itertools::Itertools;
-use risingwave_common::array::{Op, RowRef};
+use risingwave_common::array::RowRef;
+use risingwave_common::array::stream_record::Record;
 use risingwave_common::row::{CompactedRow, OwnedRow, Row, RowDeserializer, RowExt};
 use risingwave_common::types::DataType;
 use risingwave_common_estimate_size::EstimateSize;
 use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
 use risingwave_storage::StateStore;
 
 use super::{GroupKey, ManagedTopNState};
+use crate::common::change_buffer::ChangeBuffer;
 use crate::consistency::{consistency_error, enable_strict_consistency};
 use crate::executor::error::StreamExecutorResult;
@@ -824,9 +825,7 @@ impl AppendOnlyTopNCacheTrait for TopNCache<true> {
 /// It should be maintained when an entry is inserted or deleted from the `middle` cache.
 #[derive(Debug, Default)]
 pub struct TopNStaging {
-    to_delete: BTreeMap<CacheKey, CompactedRow>,
-    to_insert: BTreeMap<CacheKey, CompactedRow>,
-    to_update: BTreeMap<CacheKey, (CompactedRow, CompactedRow)>,
+    inner: ChangeBuffer<CacheKey, CompactedRow>,
 }
 
 impl TopNStaging {
@@ -837,75 +836,37 @@
     /// Insert a row into the staging changes. This method must be called when a row is
     /// added to the `middle` cache.
     fn insert(&mut self, cache_key: CacheKey, row: CompactedRow) {
-        if let Some(old_row) = self.to_delete.remove(&cache_key) {
-            if old_row != row {
-                self.to_update.insert(cache_key, (old_row, row));
-            }
-        } else {
-            self.to_insert.insert(cache_key, row);
-        }
+        self.inner.insert(cache_key, row);
     }
 
     /// Delete a row from the staging changes. This method must be called when a row is
     /// removed from the `middle` cache.
     fn delete(&mut self, cache_key: CacheKey, row: CompactedRow) {
-        if self.to_insert.remove(&cache_key).is_some() {
-            // do nothing more
-        } else if let Some((old_row, _)) = self.to_update.remove(&cache_key) {
-            self.to_delete.insert(cache_key, old_row);
-        } else {
-            self.to_delete.insert(cache_key, row);
-        }
+        self.inner.delete(cache_key, row);
     }
 
     /// Get the count of effective changes in the staging.
     pub fn len(&self) -> usize {
-        self.to_delete.len() + self.to_insert.len() + self.to_update.len()
+        self.inner.len()
     }
 
     /// Check if the staging is empty.
     pub fn is_empty(&self) -> bool {
-        self.to_delete.is_empty() && self.to_insert.is_empty() && self.to_update.is_empty()
-    }
-
-    /// Iterate over the changes in the staging.
-    pub fn into_changes(self) -> impl Iterator<Item = (Op, CompactedRow)> {
-        #[cfg(debug_assertions)]
-        {
-            let keys = self
-                .to_delete
-                .keys()
-                .chain(self.to_insert.keys())
-                .chain(self.to_update.keys())
-                .unique()
-                .count();
-            assert_eq!(
-                keys,
-                self.to_delete.len() + self.to_insert.len() + self.to_update.len(),
-                "should not have duplicate keys with different operations",
-            );
-        }
-
-        // We expect one `CacheKey` to appear at most once in the staging, and, the order of
-        // the outputs of `TopN` doesn't really matter, so we can simply chain the three maps.
-        // Although the output order is not important, we still ensure that `Delete`s are emitted
-        // before `Insert`s, so that we can avoid temporary violation of the `LIMIT` constraint.
-        self.to_update
-            .into_values()
-            .flat_map(|(old_row, new_row)| {
-                [(Op::UpdateDelete, old_row), (Op::UpdateInsert, new_row)]
-            })
-            .chain(self.to_delete.into_values().map(|row| (Op::Delete, row)))
-            .chain(self.to_insert.into_values().map(|row| (Op::Insert, row)))
+        self.inner.is_empty()
     }
 
     /// Iterate over the changes in the staging, and deserialize the rows.
     pub fn into_deserialized_changes(
         self,
         deserializer: &RowDeserializer,
-    ) -> impl Iterator<Item = StreamExecutorResult<(Op, OwnedRow)>> + '_ {
-        self.into_changes()
-            .map(|(op, row)| Ok((op, deserializer.deserialize(row.row.as_ref())?)))
+    ) -> impl Iterator<Item = StreamExecutorResult<Record<OwnedRow>>> + '_ {
+        self.inner.into_records().map(|record| {
+            record.try_map(|row| {
+                deserializer
+                    .deserialize(row.row.as_ref())
+                    .map_err(Into::into)
+            })
+        })
     }
 }
```
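The `ChangeBuffer` that replaces the three maps lives in `crate::common::change_buffer` and isn't shown in this diff, but the deleted `TopNStaging` code spells out exactly the compaction it must perform: per-key insert/delete pairs collapse into net `Insert`, `Delete`, and `Update` records. The sketch below reimplements those rules under assumed names (`ChangeBufferSketch`, `into_records`); note that the old `insert` also dropped no-op updates inline, a job that now appears to fall to `append_record_eliminate_noop_update` at chunk-building time.

```rust
use std::collections::BTreeMap;

// Assumed shape of the records produced by the buffer, mirroring
// `risingwave_common::array::stream_record::Record` as used in the diff.
enum Record<R> {
    Insert { new_row: R },
    Delete { old_row: R },
    Update { old_row: R, new_row: R },
}

// Hypothetical stand-in for `crate::common::change_buffer::ChangeBuffer`,
// reproducing the compaction rules of the deleted `TopNStaging` code.
struct ChangeBufferSketch<K: Ord, R> {
    to_delete: BTreeMap<K, R>,
    to_insert: BTreeMap<K, R>,
    to_update: BTreeMap<K, (R, R)>,
}

impl<K: Ord, R: PartialEq> ChangeBufferSketch<K, R> {
    fn new() -> Self {
        Self {
            to_delete: BTreeMap::new(),
            to_insert: BTreeMap::new(),
            to_update: BTreeMap::new(),
        }
    }

    fn insert(&mut self, key: K, row: R) {
        if let Some(old_row) = self.to_delete.remove(&key) {
            // A delete followed by an insert of the same key is a net update
            // (or nothing at all, if the row is unchanged).
            if old_row != row {
                self.to_update.insert(key, (old_row, row));
            }
        } else {
            self.to_insert.insert(key, row);
        }
    }

    fn delete(&mut self, key: K, row: R) {
        if self.to_insert.remove(&key).is_some() {
            // An insert followed by a delete cancels out entirely.
        } else if let Some((old_row, _)) = self.to_update.remove(&key) {
            // An update followed by a delete nets out to deleting the original row.
            self.to_delete.insert(key, old_row);
        } else {
            self.to_delete.insert(key, row);
        }
    }

    fn into_records(self) -> impl Iterator<Item = Record<R>> {
        // As the deleted code noted, deletes are emitted before inserts to
        // avoid temporarily violating the `LIMIT` constraint downstream.
        self.to_update
            .into_values()
            .map(|(old_row, new_row)| Record::Update { old_row, new_row })
            .chain(self.to_delete.into_values().map(|old_row| Record::Delete { old_row }))
            .chain(self.to_insert.into_values().map(|new_row| Record::Insert { new_row }))
    }
}

fn main() {
    let mut buf = ChangeBufferSketch::new();
    buf.delete(1, "a");
    buf.insert(1, "b"); // nets out to Update { "a" -> "b" }
    buf.insert(2, "c");
    buf.delete(2, "c"); // nets out to nothing
    assert_eq!(buf.into_records().count(), 1);
}
```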

src/stream/src/executor/top_n/top_n_plain.rs (2 additions, 2 deletions)

```diff
@@ -170,8 +170,8 @@ where
         }
         let mut chunk_builder = StreamChunkBuilder::unlimited(data_types, Some(staging.len()));
         for res in staging.into_deserialized_changes(&deserializer) {
-            let (op, row) = res?;
-            let _none = chunk_builder.append_row(op, row);
+            let record = res?;
+            let _none = chunk_builder.append_record_eliminate_noop_update(record);
         }
         Ok(chunk_builder.take())
     }
```
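`into_deserialized_changes` now threads the fallible row deserialization through each record with a `try_map` helper. Its exact signature isn't shown in the diff; a plausible shape, with a string-parsing stand-in for the deserializer, would be:

```rust
// Assumed `Record` shape as in the sketches above, plus a plausible
// `try_map`: apply a fallible transform to every row inside the record,
// short-circuiting on the first error.
#[derive(Debug, PartialEq)]
enum Record<R> {
    Insert { new_row: R },
    Delete { old_row: R },
    Update { old_row: R, new_row: R },
}

impl<R> Record<R> {
    fn try_map<O, E>(self, mut f: impl FnMut(R) -> Result<O, E>) -> Result<Record<O>, E> {
        Ok(match self {
            Record::Insert { new_row } => Record::Insert { new_row: f(new_row)? },
            Record::Delete { old_row } => Record::Delete { old_row: f(old_row)? },
            Record::Update { old_row, new_row } => Record::Update {
                old_row: f(old_row)?,
                new_row: f(new_row)?,
            },
        })
    }
}

fn main() {
    // Stand-in for `deserializer.deserialize(...)`: parsing can fail.
    let record = Record::Update { old_row: "1", new_row: "2" };
    let parsed = record.try_map(|s| s.parse::<i32>());
    assert_eq!(parsed.unwrap(), Record::Update { old_row: 1, new_row: 2 });
}
```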
