1313// limitations under the License. 
1414
1515use  std:: assert_matches:: assert_matches; 
16- use  std:: collections:: HashSet ; 
16+ use  std:: collections:: hash_map:: Entry ; 
17+ use  std:: collections:: { HashMap ,  HashSet } ; 
1718use  std:: marker:: PhantomData ; 
1819use  std:: ops:: { Bound ,  Deref ,  Index } ; 
1920
@@ -30,6 +31,7 @@ use risingwave_common::catalog::{
3031use  risingwave_common:: hash:: { VirtualNode ,  VnodeBitmapExt } ; 
3132use  risingwave_common:: row:: { CompactedRow ,  OwnedRow ,  RowExt } ; 
3233use  risingwave_common:: types:: { DEBEZIUM_UNAVAILABLE_VALUE ,  DataType ,  ScalarImpl } ; 
34+ use  risingwave_common:: util:: chunk_coalesce:: DataChunkBuilder ; 
3335use  risingwave_common:: util:: iter_util:: { ZipEqDebug ,  ZipEqFast } ; 
3436use  risingwave_common:: util:: sort_util:: { ColumnOrder ,  OrderType ,  cmp_datum} ; 
3537use  risingwave_common:: util:: value_encoding:: { BasicSerde ,  ValueRowSerializer } ; 
@@ -505,7 +507,7 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
505507                                            ) 
506508                                            . await ?; 
507509
508-                                         match  change_buffer . into_chunk ( data_types. clone ( ) )  { 
510+                                         match  generate_output ( change_buffer ,   data_types. clone ( ) ) ?  { 
509511                                            Some ( output_chunk)  => { 
510512                                                self . state_table . write_chunk ( output_chunk. clone ( ) ) ; 
511513                                                self . state_table . try_flush ( ) . await ?; 
@@ -1193,8 +1195,134 @@ fn handle_toast_columns_for_postgres_cdc(
11931195    OwnedRow :: new ( fixed_row_data) 
11941196} 
11951197
1196- type  ChangeBuffer  = crate :: common:: change_buffer:: ChangeBuffer < Vec < u8 > ,  OwnedRow > ; 
1198+ /// Construct output `StreamChunk` from given buffer. 
1199+ fn  generate_output ( 
1200+     change_buffer :  ChangeBuffer , 
1201+     data_types :  Vec < DataType > , 
1202+ )  -> StreamExecutorResult < Option < StreamChunk > >  { 
1203+     // construct output chunk 
1204+     // TODO(st1page): when materialize partial columns(), we should construct some columns in the pk 
1205+     let  mut  new_ops:  Vec < Op >  = vec ! [ ] ; 
1206+     let  mut  new_rows:  Vec < OwnedRow >  = vec ! [ ] ; 
1207+     for  ( _,  row_op)  in  change_buffer. into_parts ( )  { 
1208+         match  row_op { 
1209+             ChangeBufferKeyOp :: Insert ( value)  => { 
1210+                 new_ops. push ( Op :: Insert ) ; 
1211+                 new_rows. push ( value) ; 
1212+             } 
1213+             ChangeBufferKeyOp :: Delete ( old_value)  => { 
1214+                 new_ops. push ( Op :: Delete ) ; 
1215+                 new_rows. push ( old_value) ; 
1216+             } 
1217+             ChangeBufferKeyOp :: Update ( ( old_value,  new_value) )  => { 
1218+                 // if old_value == new_value, we don't need to emit updates to downstream. 
1219+                 if  old_value != new_value { 
1220+                     new_ops. push ( Op :: UpdateDelete ) ; 
1221+                     new_ops. push ( Op :: UpdateInsert ) ; 
1222+                     new_rows. push ( old_value) ; 
1223+                     new_rows. push ( new_value) ; 
1224+                 } 
1225+             } 
1226+         } 
1227+     } 
1228+     let  mut  data_chunk_builder = DataChunkBuilder :: new ( data_types,  new_rows. len ( )  + 1 ) ; 
1229+ 
1230+     for  row in  new_rows { 
1231+         let  res = data_chunk_builder. append_one_row ( row) ; 
1232+         debug_assert ! ( res. is_none( ) ) ; 
1233+     } 
1234+ 
1235+     if  let  Some ( new_data_chunk)  = data_chunk_builder. consume_all ( )  { 
1236+         let  new_stream_chunk = StreamChunk :: new ( new_ops,  new_data_chunk. columns ( ) . to_vec ( ) ) ; 
1237+         Ok ( Some ( new_stream_chunk) ) 
1238+     }  else  { 
1239+         Ok ( None ) 
1240+     } 
1241+ } 
1242+ 
1243+ /// `ChangeBuffer` is a buffer to handle chunk into `KeyOp`. 
1244+ /// TODO(rc): merge with `TopNStaging`. 
1245+ pub  struct  ChangeBuffer  { 
1246+     buffer :  HashMap < Vec < u8 > ,  ChangeBufferKeyOp > , 
1247+ } 
11971248
1249+ /// `KeyOp` variant for `ChangeBuffer` that stores `OwnedRow` instead of Bytes 
1250+ enum  ChangeBufferKeyOp  { 
1251+     Insert ( OwnedRow ) , 
1252+     Delete ( OwnedRow ) , 
1253+     /// (`old_value`, `new_value`) 
1254+      Update ( ( OwnedRow ,  OwnedRow ) ) , 
1255+ } 
1256+ 
1257+ impl  ChangeBuffer  { 
1258+     fn  new ( )  -> Self  { 
1259+         Self  { 
1260+             buffer :  HashMap :: new ( ) , 
1261+         } 
1262+     } 
1263+ 
1264+     fn  insert ( & mut  self ,  pk :  Vec < u8 > ,  value :  OwnedRow )  { 
1265+         let  entry = self . buffer . entry ( pk) ; 
1266+         match  entry { 
1267+             Entry :: Vacant ( e)  => { 
1268+                 e. insert ( ChangeBufferKeyOp :: Insert ( value) ) ; 
1269+             } 
1270+             Entry :: Occupied ( mut  e)  => { 
1271+                 if  let  ChangeBufferKeyOp :: Delete ( old_value)  = e. get_mut ( )  { 
1272+                     let  old_val = std:: mem:: take ( old_value) ; 
1273+                     e. insert ( ChangeBufferKeyOp :: Update ( ( old_val,  value) ) ) ; 
1274+                 }  else  { 
1275+                     unreachable ! ( ) ; 
1276+                 } 
1277+             } 
1278+         } 
1279+     } 
1280+ 
1281+     fn  delete ( & mut  self ,  pk :  Vec < u8 > ,  old_value :  OwnedRow )  { 
1282+         let  entry:  Entry < ' _ ,  Vec < u8 > ,  ChangeBufferKeyOp >  = self . buffer . entry ( pk) ; 
1283+         match  entry { 
1284+             Entry :: Vacant ( e)  => { 
1285+                 e. insert ( ChangeBufferKeyOp :: Delete ( old_value) ) ; 
1286+             } 
1287+             Entry :: Occupied ( mut  e)  => match  e. get_mut ( )  { 
1288+                 ChangeBufferKeyOp :: Insert ( _)  => { 
1289+                     e. remove ( ) ; 
1290+                 } 
1291+                 ChangeBufferKeyOp :: Update ( ( prev,  _curr) )  => { 
1292+                     let  prev = std:: mem:: take ( prev) ; 
1293+                     e. insert ( ChangeBufferKeyOp :: Delete ( prev) ) ; 
1294+                 } 
1295+                 ChangeBufferKeyOp :: Delete ( _)  => { 
1296+                     unreachable ! ( ) ; 
1297+                 } 
1298+             } , 
1299+         } 
1300+     } 
1301+ 
1302+     fn  update ( & mut  self ,  pk :  Vec < u8 > ,  old_value :  OwnedRow ,  new_value :  OwnedRow )  { 
1303+         let  entry = self . buffer . entry ( pk) ; 
1304+         match  entry { 
1305+             Entry :: Vacant ( e)  => { 
1306+                 e. insert ( ChangeBufferKeyOp :: Update ( ( old_value,  new_value) ) ) ; 
1307+             } 
1308+             Entry :: Occupied ( mut  e)  => match  e. get_mut ( )  { 
1309+                 ChangeBufferKeyOp :: Insert ( _)  => { 
1310+                     e. insert ( ChangeBufferKeyOp :: Insert ( new_value) ) ; 
1311+                 } 
1312+                 ChangeBufferKeyOp :: Update ( ( _prev,  curr) )  => { 
1313+                     * curr = new_value; 
1314+                 } 
1315+                 ChangeBufferKeyOp :: Delete ( _)  => { 
1316+                     unreachable ! ( ) 
1317+                 } 
1318+             } , 
1319+         } 
1320+     } 
1321+ 
1322+     fn  into_parts ( self )  -> HashMap < Vec < u8 > ,  ChangeBufferKeyOp >  { 
1323+         self . buffer 
1324+     } 
1325+ } 
11981326impl < S :  StateStore ,  SD :  ValueRowSerde >  Execute  for  MaterializeExecutor < S ,  SD >  { 
11991327    fn  execute ( self :  Box < Self > )  -> BoxedMessageStream  { 
12001328        self . execute_inner ( ) . boxed ( ) 
0 commit comments