From 817ac7d3de2cad37496b4ef35dc8f708ee27c39d Mon Sep 17 00:00:00 2001 From: "guojie.lgj" Date: Tue, 25 Jul 2023 19:14:43 +0800 Subject: [PATCH 1/4] fix offset index none --- parquet/src/column/writer/mod.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 1cacfe793328..933992752f55 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -500,14 +500,12 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { let metadata = self.write_column_metadata()?; self.page_writer.close()?; - let (column_index, offset_index) = if self.column_index_builder.valid() { - // build the column and offset index - let column_index = self.column_index_builder.build_to_thrift(); - let offset_index = self.offset_index_builder.build_to_thrift(); - (Some(column_index), Some(offset_index)) + let column_index = if self.column_index_builder.valid() { + Some(self.column_index_builder.build_to_thrift()) } else { - (None, None) + None }; + let offset_index = Some(self.offset_index_builder.build_to_thrift()); Ok(ColumnCloseResult { bytes_written: self.column_metrics.total_bytes_written, From 8509c81b974964208495e4e9fcecb47841dfbd9d Mon Sep 17 00:00:00 2001 From: "guojie.lgj" Date: Thu, 27 Jul 2023 14:48:01 +0800 Subject: [PATCH 2/4] add test --- parquet/src/arrow/arrow_writer/mod.rs | 32 +++++++++++++++++++++++++++ parquet/src/column/writer/mod.rs | 19 ++++++++++++---- 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index ccec4ffb20c0..732f1cd05219 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1650,6 +1650,38 @@ mod tests { writer.close().unwrap(); } + #[test] + fn check_page_offset_index() { + let values = Arc::new( + [Some(f64::NAN), Some(f64::NAN), Some(f64::NAN)] + .iter() + .cycle() + .copied() + 
.take(200_000) + .collect::<Float64Array>(), + ); + let schema = + Schema::new(vec![Field::new("col", values.data_type().clone(), true)]); + let expected_batch = + RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap(); + let file = tempfile::tempfile().unwrap(); + + let mut writer = ArrowWriter::try_new( + file.try_clone().unwrap(), + expected_batch.schema(), + None, + ) + .expect("Unable to write file"); + writer.write(&expected_batch).unwrap(); + let file_meta_data = writer.close().unwrap(); + for row_group in file_meta_data.row_groups { + for column in row_group.columns { + assert!(column.offset_index_offset.is_some()); + assert!(column.offset_index_length.is_some()); + } + } + } + #[test] fn i8_single_column() { required_and_optional::<Int8Array, _>(0..SMALL_SIZE as i8); diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 933992752f55..2fe987d81dac 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -500,12 +500,23 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { let metadata = self.write_column_metadata()?; self.page_writer.close()?; - let column_index = if self.column_index_builder.valid() { - Some(self.column_index_builder.build_to_thrift()) + // previous + let (column_index, offset_index) = if self.column_index_builder.valid() { + // build the column and offset index + let column_index = self.column_index_builder.build_to_thrift(); + let offset_index = self.offset_index_builder.build_to_thrift(); + (Some(column_index), Some(offset_index)) } else { - None + (None, None) }; - let offset_index = Some(self.offset_index_builder.build_to_thrift()); + + // modified + // let column_index = if self.column_index_builder.valid() { + // Some(self.column_index_builder.build_to_thrift()) + // } else { + // None + // }; + // let offset_index = Some(self.offset_index_builder.build_to_thrift()); Ok(ColumnCloseResult { bytes_written: self.column_metrics.total_bytes_written, From 
c058d960aee653a43fd7a7ada4ce7e43e0b5e3a4 Mon Sep 17 00:00:00 2001 From: "guojie.lgj" Date: Thu, 27 Jul 2023 19:16:00 +0800 Subject: [PATCH 3/4] add test --- parquet/src/arrow/arrow_writer/mod.rs | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 732f1cd05219..f59145be087e 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1651,7 +1651,7 @@ mod tests { } #[test] - fn check_page_offset_index() { + fn check_page_offset_index_with_nan() { let values = Arc::new( [Some(f64::NAN), Some(f64::NAN), Some(f64::NAN)] .iter() @@ -1676,10 +1676,25 @@ mod tests { let file_meta_data = writer.close().unwrap(); for row_group in file_meta_data.row_groups { for column in row_group.columns { - assert!(column.offset_index_offset.is_some()); - assert!(column.offset_index_length.is_some()); + // Note: Both offset_index_offset and offset_index_length are none. 
+ assert!(column.offset_index_offset.is_none()); + assert!(column.offset_index_length.is_none()); } } + + let file_reader = SerializedFileReader::new_with_options( + file, + ReadOptionsBuilder::new() + .with_reader_properties( + ReaderProperties::builder() + .set_read_bloom_filter(true) + .build(), + ) + .with_page_index() + .build(), + ) + .unwrap(); + file_reader.metadata(); } #[test] From b942c1db8114dd374e750d2a0a7df04008ee6fc1 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 28 Jul 2023 11:06:03 +0100 Subject: [PATCH 4/4] Cleanup --- parquet/src/arrow/arrow_writer/mod.rs | 48 ++++++--------------------- parquet/src/column/writer/mod.rs | 22 +++--------- 2 files changed, 16 insertions(+), 54 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index f59145be087e..d3d4e2626fe3 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1652,49 +1652,23 @@ mod tests { #[test] fn check_page_offset_index_with_nan() { - let values = Arc::new( - [Some(f64::NAN), Some(f64::NAN), Some(f64::NAN)] - .iter() - .cycle() - .copied() - .take(200_000) - .collect::<Float64Array>(), - ); - let schema = - Schema::new(vec![Field::new("col", values.data_type().clone(), true)]); - let expected_batch = - RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap(); - let file = tempfile::tempfile().unwrap(); + let values = Arc::new(Float64Array::from(vec![f64::NAN; 10])); + let schema = Schema::new(vec![Field::new("col", DataType::Float64, true)]); + let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap(); - let mut writer = ArrowWriter::try_new( - file.try_clone().unwrap(), - expected_batch.schema(), - None, - ) - .expect("Unable to write file"); - writer.write(&expected_batch).unwrap(); + let mut out = Vec::with_capacity(1024); + let mut writer = ArrowWriter::try_new(&mut out, batch.schema(), None) + .expect("Unable to write file"); + writer.write(&batch).unwrap(); let 
file_meta_data = writer.close().unwrap(); for row_group in file_meta_data.row_groups { for column in row_group.columns { - // Note: Both offset_index_offset and offset_index_length are none. - assert!(column.offset_index_offset.is_none()); - assert!(column.offset_index_length.is_none()); + assert!(column.offset_index_offset.is_some()); + assert!(column.offset_index_length.is_some()); + assert!(column.column_index_offset.is_none()); + assert!(column.column_index_length.is_none()); } } - - let file_reader = SerializedFileReader::new_with_options( - file, - ReadOptionsBuilder::new() - .with_reader_properties( - ReaderProperties::builder() - .set_read_bloom_filter(true) - .build(), - ) - .with_page_index() - .build(), - ) - .unwrap(); - file_reader.metadata(); } #[test] diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 2fe987d81dac..3d8ce283ae64 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -500,23 +500,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { let metadata = self.write_column_metadata()?; self.page_writer.close()?; - // previous - let (column_index, offset_index) = if self.column_index_builder.valid() { - // build the column and offset index - let column_index = self.column_index_builder.build_to_thrift(); - let offset_index = self.offset_index_builder.build_to_thrift(); - (Some(column_index), Some(offset_index)) - } else { - (None, None) - }; - - // modified - // let column_index = if self.column_index_builder.valid() { - // Some(self.column_index_builder.build_to_thrift()) - // } else { - // None - // }; - // let offset_index = Some(self.offset_index_builder.build_to_thrift()); + let column_index = self + .column_index_builder + .valid() + .then(|| self.column_index_builder.build_to_thrift()); + let offset_index = Some(self.offset_index_builder.build_to_thrift()); Ok(ColumnCloseResult { bytes_written: self.column_metrics.total_bytes_written,