Skip to content

Commit 00296c2

Browse files
committed
add docs
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
1 parent 5eafb78 commit 00296c2

File tree

1 file changed

+34
-14
lines changed

1 file changed

+34
-14
lines changed

src/stream/src/common/change_buffer.rs

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
// Copyright 2025 RisingWave Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
115
use std::sync::LazyLock;
216

317
use indexmap::IndexMap;
@@ -10,10 +24,9 @@ use risingwave_common::types::DataType;
1024

1125
use crate::consistency::consistency_panic;
1226

13-
/// Behavior when inconsistency is detected during compaction.
14-
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
27+
/// Behavior when inconsistency is detected when applying changes.
28+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1529
pub enum InconsistencyBehavior {
16-
#[default]
1730
Panic,
1831
Warn,
1932
Tolerate,
@@ -46,26 +59,19 @@ mod private {
4659
impl<R> Row for R where R: Default {}
4760
}
4861

62+
/// A buffer that accumulates changes and produce compacted changes.
4963
#[derive(Debug)]
5064
pub struct ChangeBuffer<K, R> {
5165
buffer: IndexMap<K, Record<R>>,
5266
ib: InconsistencyBehavior,
5367
}
5468

55-
impl<K, R> Default for ChangeBuffer<K, R> {
56-
fn default() -> Self {
57-
Self {
58-
buffer: IndexMap::new(),
59-
ib: InconsistencyBehavior::default(),
60-
}
61-
}
62-
}
63-
6469
impl<K, R> ChangeBuffer<K, R>
6570
where
6671
K: private::Key,
6772
R: private::Row,
6873
{
74+
/// Apply an insertion of a row with the given key.
6975
pub fn insert(&mut self, key: K, new_row: R) {
7076
let entry = self.buffer.entry(key);
7177
match entry {
@@ -89,6 +95,7 @@ where
8995
}
9096
}
9197

98+
/// Apply a deletion of a row with the given key.
9299
pub fn delete(&mut self, key: K, old_row: R) {
93100
let entry = self.buffer.entry(key);
94101
match entry {
@@ -111,6 +118,7 @@ where
111118
}
112119
}
113120

121+
/// Apply an update of a row with the given key.
114122
pub fn update(&mut self, key: K, old_row: R, new_row: R) {
115123
let entry = self.buffer.entry(key);
116124
match entry {
@@ -132,6 +140,10 @@ where
132140
}
133141
}
134142

143+
/// Apply a change record, with the key extracted by the given function.
144+
///
145+
/// For `Record::Update`, inconsistency is reported if the old key and the new key are different.
146+
/// Further behavior is determined by the `InconsistencyBehavior`.
135147
pub fn apply_record(&mut self, record: Record<R>, key_fn: impl Fn(&R) -> K) {
136148
match record {
137149
Record::Insert { new_row } => self.insert(key_fn(&new_row), new_row),
@@ -150,36 +162,43 @@ where
150162
}
151163

152164
impl<K, R> ChangeBuffer<K, R> {
165+
/// Create a new `ChangeBuffer` that panics on inconsistency.
153166
pub fn new() -> Self {
154-
Self::default()
167+
Self::with_capacity(0)
155168
}
156169

170+
/// Create a new `ChangeBuffer` with the given capacity that panics on inconsistency.
157171
pub fn with_capacity(capacity: usize) -> Self {
158172
Self {
159173
buffer: IndexMap::with_capacity(capacity),
160-
..Default::default()
174+
ib: InconsistencyBehavior::Panic,
161175
}
162176
}
163177

178+
/// Set the inconsistency behavior.
164179
pub fn with_inconsistency_behavior(mut self, ib: InconsistencyBehavior) -> Self {
165180
self.ib = ib;
166181
self
167182
}
168183

184+
/// Get the number of keys that have pending changes in the buffer.
169185
pub fn len(&self) -> usize {
170186
self.buffer.len()
171187
}
172188

189+
/// Check if the buffer is empty.
173190
pub fn is_empty(&self) -> bool {
174191
self.buffer.is_empty()
175192
}
176193

194+
/// Consume the buffer and produce a list of change records.
177195
pub fn into_records(self) -> impl ExactSizeIterator<Item = Record<R>> {
178196
self.buffer.into_values()
179197
}
180198
}
181199

182200
impl<K, R: Row> ChangeBuffer<K, R> {
201+
/// Consume the buffer and produce a single compacted chunk.
183202
pub fn into_chunk(self, data_types: Vec<DataType>) -> Option<StreamChunk> {
184203
let mut builder = StreamChunkBuilder::unlimited(data_types, Some(self.buffer.len()));
185204
for record in self.into_records() {
@@ -189,6 +208,7 @@ impl<K, R: Row> ChangeBuffer<K, R> {
189208
builder.take()
190209
}
191210

211+
/// Consume the buffer and produce a list of compacted chunks with the given size at most.
192212
pub fn into_chunks(self, data_types: Vec<DataType>, chunk_size: usize) -> Vec<StreamChunk> {
193213
let mut res = Vec::new();
194214
let mut builder = StreamChunkBuilder::new(chunk_size, data_types);

0 commit comments

Comments
 (0)