1313// limitations under the License.
1414
1515use std:: assert_matches:: assert_matches;
16- use std:: collections:: hash_map:: Entry ;
17- use std:: collections:: { HashMap , HashSet } ;
16+ use std:: collections:: HashSet ;
1817use std:: marker:: PhantomData ;
1918use std:: ops:: { Bound , Deref , Index } ;
2019
@@ -31,7 +30,6 @@ use risingwave_common::catalog::{
3130use risingwave_common:: hash:: { VirtualNode , VnodeBitmapExt } ;
3231use risingwave_common:: row:: { CompactedRow , OwnedRow , RowExt } ;
3332use risingwave_common:: types:: { DEBEZIUM_UNAVAILABLE_VALUE , DataType , ScalarImpl } ;
34- use risingwave_common:: util:: chunk_coalesce:: DataChunkBuilder ;
3533use risingwave_common:: util:: iter_util:: { ZipEqDebug , ZipEqFast } ;
3634use risingwave_common:: util:: sort_util:: { ColumnOrder , OrderType , cmp_datum} ;
3735use risingwave_common:: util:: value_encoding:: { BasicSerde , ValueRowSerializer } ;
@@ -507,7 +505,7 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
507505 )
508506 . await ?;
509507
510- match generate_output ( change_buffer, data_types. clone ( ) ) ? {
508+ match change_buffer. into_chunk ( data_types. clone ( ) ) {
511509 Some ( output_chunk) => {
512510 self . state_table . write_chunk ( output_chunk. clone ( ) ) ;
513511 self . state_table . try_flush ( ) . await ?;
@@ -1195,134 +1193,8 @@ fn handle_toast_columns_for_postgres_cdc(
11951193 OwnedRow :: new ( fixed_row_data)
11961194}
11971195
1198- /// Construct output `StreamChunk` from given buffer.
1199- fn generate_output (
1200- change_buffer : ChangeBuffer ,
1201- data_types : Vec < DataType > ,
1202- ) -> StreamExecutorResult < Option < StreamChunk > > {
1203- // construct output chunk
1204- // TODO(st1page): when materialize partial columns(), we should construct some columns in the pk
1205- let mut new_ops: Vec < Op > = vec ! [ ] ;
1206- let mut new_rows: Vec < OwnedRow > = vec ! [ ] ;
1207- for ( _, row_op) in change_buffer. into_parts ( ) {
1208- match row_op {
1209- ChangeBufferKeyOp :: Insert ( value) => {
1210- new_ops. push ( Op :: Insert ) ;
1211- new_rows. push ( value) ;
1212- }
1213- ChangeBufferKeyOp :: Delete ( old_value) => {
1214- new_ops. push ( Op :: Delete ) ;
1215- new_rows. push ( old_value) ;
1216- }
1217- ChangeBufferKeyOp :: Update ( ( old_value, new_value) ) => {
1218- // if old_value == new_value, we don't need to emit updates to downstream.
1219- if old_value != new_value {
1220- new_ops. push ( Op :: UpdateDelete ) ;
1221- new_ops. push ( Op :: UpdateInsert ) ;
1222- new_rows. push ( old_value) ;
1223- new_rows. push ( new_value) ;
1224- }
1225- }
1226- }
1227- }
1228- let mut data_chunk_builder = DataChunkBuilder :: new ( data_types, new_rows. len ( ) + 1 ) ;
1229-
1230- for row in new_rows {
1231- let res = data_chunk_builder. append_one_row ( row) ;
1232- debug_assert ! ( res. is_none( ) ) ;
1233- }
1234-
1235- if let Some ( new_data_chunk) = data_chunk_builder. consume_all ( ) {
1236- let new_stream_chunk = StreamChunk :: new ( new_ops, new_data_chunk. columns ( ) . to_vec ( ) ) ;
1237- Ok ( Some ( new_stream_chunk) )
1238- } else {
1239- Ok ( None )
1240- }
1241- }
1242-
1243- /// `ChangeBuffer` is a buffer to handle chunk into `KeyOp`.
1244- /// TODO(rc): merge with `TopNStaging`.
1245- pub struct ChangeBuffer {
1246- buffer : HashMap < Vec < u8 > , ChangeBufferKeyOp > ,
1247- }
1196+ type ChangeBuffer = crate :: common:: change_buffer:: ChangeBuffer < Vec < u8 > , OwnedRow > ;
12481197
1249- /// `KeyOp` variant for `ChangeBuffer` that stores `OwnedRow` instead of Bytes
1250- enum ChangeBufferKeyOp {
1251- Insert ( OwnedRow ) ,
1252- Delete ( OwnedRow ) ,
1253- /// (`old_value`, `new_value`)
1254- Update ( ( OwnedRow , OwnedRow ) ) ,
1255- }
1256-
1257- impl ChangeBuffer {
1258- fn new ( ) -> Self {
1259- Self {
1260- buffer : HashMap :: new ( ) ,
1261- }
1262- }
1263-
1264- fn insert ( & mut self , pk : Vec < u8 > , value : OwnedRow ) {
1265- let entry = self . buffer . entry ( pk) ;
1266- match entry {
1267- Entry :: Vacant ( e) => {
1268- e. insert ( ChangeBufferKeyOp :: Insert ( value) ) ;
1269- }
1270- Entry :: Occupied ( mut e) => {
1271- if let ChangeBufferKeyOp :: Delete ( old_value) = e. get_mut ( ) {
1272- let old_val = std:: mem:: take ( old_value) ;
1273- e. insert ( ChangeBufferKeyOp :: Update ( ( old_val, value) ) ) ;
1274- } else {
1275- unreachable ! ( ) ;
1276- }
1277- }
1278- }
1279- }
1280-
1281- fn delete ( & mut self , pk : Vec < u8 > , old_value : OwnedRow ) {
1282- let entry: Entry < ' _ , Vec < u8 > , ChangeBufferKeyOp > = self . buffer . entry ( pk) ;
1283- match entry {
1284- Entry :: Vacant ( e) => {
1285- e. insert ( ChangeBufferKeyOp :: Delete ( old_value) ) ;
1286- }
1287- Entry :: Occupied ( mut e) => match e. get_mut ( ) {
1288- ChangeBufferKeyOp :: Insert ( _) => {
1289- e. remove ( ) ;
1290- }
1291- ChangeBufferKeyOp :: Update ( ( prev, _curr) ) => {
1292- let prev = std:: mem:: take ( prev) ;
1293- e. insert ( ChangeBufferKeyOp :: Delete ( prev) ) ;
1294- }
1295- ChangeBufferKeyOp :: Delete ( _) => {
1296- unreachable ! ( ) ;
1297- }
1298- } ,
1299- }
1300- }
1301-
1302- fn update ( & mut self , pk : Vec < u8 > , old_value : OwnedRow , new_value : OwnedRow ) {
1303- let entry = self . buffer . entry ( pk) ;
1304- match entry {
1305- Entry :: Vacant ( e) => {
1306- e. insert ( ChangeBufferKeyOp :: Update ( ( old_value, new_value) ) ) ;
1307- }
1308- Entry :: Occupied ( mut e) => match e. get_mut ( ) {
1309- ChangeBufferKeyOp :: Insert ( _) => {
1310- e. insert ( ChangeBufferKeyOp :: Insert ( new_value) ) ;
1311- }
1312- ChangeBufferKeyOp :: Update ( ( _prev, curr) ) => {
1313- * curr = new_value;
1314- }
1315- ChangeBufferKeyOp :: Delete ( _) => {
1316- unreachable ! ( )
1317- }
1318- } ,
1319- }
1320- }
1321-
1322- fn into_parts ( self ) -> HashMap < Vec < u8 > , ChangeBufferKeyOp > {
1323- self . buffer
1324- }
1325- }
13261198impl < S : StateStore , SD : ValueRowSerde > Execute for MaterializeExecutor < S , SD > {
13271199 fn execute ( self : Box < Self > ) -> BoxedMessageStream {
13281200 self . execute_inner ( ) . boxed ( )
0 commit comments