Skip to content

Commit de26741

Browse files
committed
null adaptor
1 parent 64d74db commit de26741

File tree

5 files changed

+161
-39
lines changed

5 files changed

+161
-39
lines changed

src/query/expression/src/aggregate/aggregate_function.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
7373
vec![StateSerdeItem::Binary(self.serialize_size_per_row())]
7474
}
7575

76-
fn serialize(&self, place: AggrState, builder: &mut ColumnBuilder) -> Result<()> {
77-
let binary_builder = builder.as_tuple_mut().unwrap()[0].as_binary_mut().unwrap();
76+
fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> {
77+
let binary_builder = builders[0].as_binary_mut().unwrap();
7878
self.serialize_binary(place, &mut binary_builder.data)?;
7979
binary_builder.commit_row();
8080
Ok(())
@@ -86,8 +86,8 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
8686
None
8787
}
8888

89-
fn merge(&self, place: AggrState, data: ScalarRef) -> Result<()> {
90-
let mut binary = *data.as_tuple().unwrap()[0].as_binary().unwrap();
89+
fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
90+
let mut binary = *data[0].as_binary().unwrap();
9191
self.merge_binary(place, &mut binary)
9292
}
9393

@@ -102,7 +102,10 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
102102
) -> Result<()> {
103103
let column = state.to_column();
104104
for (place, data) in places.iter().zip(column.iter()) {
105-
self.merge(AggrState::new(*place, loc), data)?;
105+
self.merge(
106+
AggrState::new(*place, loc),
107+
data.as_tuple().unwrap().as_slice(),
108+
)?;
106109
}
107110

108111
Ok(())
@@ -111,7 +114,7 @@ pub trait AggregateFunction: fmt::Display + Sync + Send {
111114
fn batch_merge_single(&self, place: AggrState, state: &BlockEntry) -> Result<()> {
112115
let column = state.to_column();
113116
for data in column.iter() {
114-
self.merge(place, data)?;
117+
self.merge(place, data.as_tuple().unwrap().as_slice())?;
115118
}
116119
Ok(())
117120
}

src/query/expression/src/aggregate/payload_flush.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,8 @@ impl Payload {
149149
.enumerate()
150150
{
151151
{
152-
let builder = &mut builders[idx];
153-
func.serialize(AggrState::new(*place, loc), builder)?;
152+
let builders = builders[idx].as_tuple_mut().unwrap().as_mut_slice();
153+
func.serialize(AggrState::new(*place, loc), builders)?;
154154
}
155155
}
156156
}

src/query/functions/src/aggregates/adaptors/aggregate_null_adaptor.rs

Lines changed: 58 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ use databend_common_expression::utils::column_merge_validity;
2323
use databend_common_expression::ColumnBuilder;
2424
use databend_common_expression::ProjectedBlock;
2525
use databend_common_expression::Scalar;
26-
use databend_common_io::prelude::BinaryWrite;
26+
use databend_common_expression::ScalarRef;
27+
use databend_common_expression::StateSerdeItem;
2728

2829
use super::AggrState;
2930
use super::AggrStateLoc;
@@ -183,12 +184,24 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction for AggregateNullUnaryAdapto
183184
.accumulate_row(place, not_null_columns, validity, row)
184185
}
185186

186-
fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
187-
self.0.serialize(place, writer)
187+
fn serialize_type(&self) -> Vec<StateSerdeItem> {
188+
self.0.serialize_type()
188189
}
189190

190-
fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> {
191-
self.0.merge(place, reader)
191+
fn serialize(&self, place: AggrState, builder: &mut [ColumnBuilder]) -> Result<()> {
192+
self.0.serialize(place, builder)
193+
}
194+
195+
fn serialize_binary(&self, _: AggrState, _: &mut Vec<u8>) -> Result<()> {
196+
unreachable!()
197+
}
198+
199+
fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
200+
self.0.merge(place, data)
201+
}
202+
203+
fn merge_binary(&self, _: AggrState, _: &mut &[u8]) -> Result<()> {
204+
unreachable!()
192205
}
193206

194207
fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> {
@@ -304,12 +317,20 @@ impl<const NULLABLE_RESULT: bool> AggregateFunction
304317
.accumulate_row(place, not_null_columns, validity, row)
305318
}
306319

307-
fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
308-
self.0.serialize(place, writer)
320+
fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> {
321+
self.0.serialize(place, builders)
322+
}
323+
324+
fn serialize_binary(&self, _: AggrState, _: &mut Vec<u8>) -> Result<()> {
325+
unreachable!()
326+
}
327+
328+
fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
329+
self.0.merge(place, data)
309330
}
310331

311-
fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> {
312-
self.0.merge(place, reader)
332+
fn merge_binary(&self, _: AggrState, _: &mut &[u8]) -> Result<()> {
333+
unreachable!()
313334
}
314335

315336
fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> {
@@ -488,24 +509,42 @@ impl<const NULLABLE_RESULT: bool> CommonNullAdaptor<NULLABLE_RESULT> {
488509
.accumulate_row(place.remove_last_loc(), not_null_columns, row)
489510
}
490511

491-
fn serialize(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
512+
fn serialize_type(&self) -> Vec<StateSerdeItem> {
492513
if !NULLABLE_RESULT {
493-
return self.nested.serialize_binary(place, writer);
514+
return self.nested.serialize_type();
494515
}
495-
496516
self.nested
497-
.serialize_binary(place.remove_last_loc(), writer)?;
517+
.serialize_type()
518+
.into_iter()
519+
.chain(Some(StateSerdeItem::Bool))
520+
.collect()
521+
}
522+
523+
fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> {
524+
if !NULLABLE_RESULT {
525+
return self.nested.serialize(place, builders);
526+
}
527+
let n = builders.len();
528+
debug_assert_eq!(self.nested.serialize_type().len() + 1, n);
529+
498530
let flag = get_flag(place);
499-
writer.write_scalar(&flag)
531+
builders
532+
.last_mut()
533+
.and_then(ColumnBuilder::as_boolean_mut)
534+
.unwrap()
535+
.push(flag);
536+
self.nested
537+
.serialize(place.remove_last_loc(), &mut builders[..(n - 1)])
500538
}
501539

502-
fn merge(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> {
540+
fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
503541
if !NULLABLE_RESULT {
504-
return self.nested.merge_binary(place, reader);
542+
return self.nested.merge(place, data);
505543
}
506544

507-
let flag = reader[reader.len() - 1];
508-
if flag == 0 {
545+
let n = data.len();
546+
let flag = *data.last().and_then(ScalarRef::as_boolean).unwrap();
547+
if !flag {
509548
return Ok(());
510549
}
511550

@@ -514,8 +553,7 @@ impl<const NULLABLE_RESULT: bool> CommonNullAdaptor<NULLABLE_RESULT> {
514553
self.init_state(place);
515554
}
516555
set_flag(place, true);
517-
self.nested
518-
.merge_binary(place.remove_last_loc(), &mut &reader[..reader.len() - 1])
556+
self.nested.merge(place.remove_last_loc(), &data[..n - 1])
519557
}
520558

521559
fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> {

src/query/functions/src/aggregates/adaptors/aggregate_ornull_adaptor.rs

Lines changed: 90 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ use databend_common_expression::AggrStateType;
2323
use databend_common_expression::ColumnBuilder;
2424
use databend_common_expression::ProjectedBlock;
2525
use databend_common_expression::Scalar;
26-
use databend_common_io::prelude::BinaryWrite;
26+
use databend_common_expression::ScalarRef;
27+
use databend_common_expression::StateSerdeItem;
2728

2829
use super::AggrState;
2930
use super::AggrStateLoc;
@@ -177,23 +178,45 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
177178
Ok(())
178179
}
179180

180-
#[inline]
181-
fn serialize_binary(&self, place: AggrState, writer: &mut Vec<u8>) -> Result<()> {
181+
fn serialize_type(&self) -> Vec<StateSerdeItem> {
182182
self.inner
183-
.serialize_binary(place.remove_last_loc(), writer)?;
184-
let flag = get_flag(place) as u8;
185-
writer.write_scalar(&flag)
183+
.serialize_type()
184+
.into_iter()
185+
.chain(Some(StateSerdeItem::Bool))
186+
.collect()
186187
}
187188

188-
#[inline]
189-
fn merge_binary(&self, place: AggrState, reader: &mut &[u8]) -> Result<()> {
190-
let flag = get_flag(place) || reader[reader.len() - 1] > 0;
189+
fn serialize(&self, place: AggrState, builders: &mut [ColumnBuilder]) -> Result<()> {
190+
let n = builders.len();
191+
debug_assert_eq!(self.inner.serialize_type().len() + 1, n);
192+
193+
let flag = get_flag(place);
194+
builders
195+
.last_mut()
196+
.and_then(ColumnBuilder::as_boolean_mut)
197+
.unwrap()
198+
.push(flag);
199+
200+
self.inner
201+
.serialize(place.remove_last_loc(), &mut builders[..n - 1])
202+
}
203+
204+
fn serialize_binary(&self, _: AggrState, _: &mut Vec<u8>) -> Result<()> {
205+
unreachable!()
206+
}
207+
208+
fn merge(&self, place: AggrState, data: &[ScalarRef]) -> Result<()> {
209+
let flag = get_flag(place) || *data.last().and_then(ScalarRef::as_boolean).unwrap();
191210
self.inner
192-
.merge_binary(place.remove_last_loc(), &mut &reader[..reader.len() - 1])?;
211+
.merge(place.remove_last_loc(), &data[..data.len() - 1])?;
193212
set_flag(place, flag);
194213
Ok(())
195214
}
196215

216+
fn merge_binary(&self, _: AggrState, _: &mut &[u8]) -> Result<()> {
217+
unreachable!()
218+
}
219+
197220
fn merge_states(&self, place: AggrState, rhs: AggrState) -> Result<()> {
198221
self.inner
199222
.merge_states(place.remove_last_loc(), rhs.remove_last_loc())?;
@@ -237,6 +260,63 @@ impl AggregateFunction for AggregateFunctionOrNullAdaptor {
237260
unsafe fn drop_state(&self, place: AggrState) {
238261
self.inner.drop_state(place.remove_last_loc())
239262
}
263+
264+
fn batch_merge(
265+
&self,
266+
places: &[StateAddr],
267+
loc: &[AggrStateLoc],
268+
state: &databend_common_expression::BlockEntry,
269+
) -> Result<()> {
270+
let column = state.to_column();
271+
for (place, data) in places.iter().zip(column.iter()) {
272+
self.merge(
273+
AggrState::new(*place, loc),
274+
data.as_tuple().unwrap().as_slice(),
275+
)?;
276+
}
277+
278+
Ok(())
279+
}
280+
281+
fn batch_merge_single(
282+
&self,
283+
place: AggrState,
284+
state: &databend_common_expression::BlockEntry,
285+
) -> Result<()> {
286+
let column = state.to_column();
287+
for data in column.iter() {
288+
self.merge(place, data.as_tuple().unwrap().as_slice())?;
289+
}
290+
Ok(())
291+
}
292+
293+
fn batch_merge_states(
294+
&self,
295+
places: &[StateAddr],
296+
rhses: &[StateAddr],
297+
loc: &[AggrStateLoc],
298+
) -> Result<()> {
299+
for (place, rhs) in places.iter().zip(rhses.iter()) {
300+
self.merge_states(AggrState::new(*place, loc), AggrState::new(*rhs, loc))?;
301+
}
302+
Ok(())
303+
}
304+
305+
fn batch_merge_result(
306+
&self,
307+
places: &[StateAddr],
308+
loc: Box<[AggrStateLoc]>,
309+
builder: &mut ColumnBuilder,
310+
) -> Result<()> {
311+
for place in places {
312+
self.merge_result(AggrState::new(*place, &loc), builder)?;
313+
}
314+
Ok(())
315+
}
316+
317+
fn get_if_condition(&self, _columns: ProjectedBlock) -> Option<Bitmap> {
318+
None
319+
}
240320
}
241321

242322
impl fmt::Display for AggregateFunctionOrNullAdaptor {

src/query/service/src/pipelines/processors/transforms/aggregator/transform_single_key.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ impl AccumulatingTransform for PartialSingleStateAggregator {
155155
)
156156
.zip(builders.iter_mut())
157157
{
158-
func.serialize(place, builder)?;
158+
let builders = builder.as_tuple_mut().unwrap().as_mut_slice();
159+
func.serialize(place, builders)?;
159160
}
160161

161162
let columns = builders.into_iter().map(|b| b.build()).collect();

0 commit comments

Comments
 (0)