diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index ab5a96f751..de6bd1ed59 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -54,8 +54,9 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator; use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator; use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; +use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field}; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; -use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type}; +use crate::spec::{Datum, NameMapping, NestedField, PrimitiveLiteral, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; @@ -250,12 +251,20 @@ impl ArrowReader { initial_stream_builder }; + // Filter out metadata fields for Parquet projection (they don't exist in files) + let project_field_ids_without_metadata: Vec<i32> = task + .project_field_ids + .iter() + .filter(|&&id| !is_metadata_field(id)) + .copied() + .collect(); + // Create projection mask based on field IDs // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false) // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match) // - If fallback IDs: position-based projection (missing_field_ids=true) let projection_mask = Self::get_arrow_projection_mask( - &task.project_field_ids, + &project_field_ids_without_metadata, &task.schema, record_batch_stream_builder.parquet_schema(), record_batch_stream_builder.schema(), @@ -266,16 +275,20 @@ impl ArrowReader { record_batch_stream_builder.with_projection(projection_mask.clone()); // RecordBatchTransformer performs any transformations required on the RecordBatches - // that come back from the file, such as type promotion, default column insertion - // and column re-ordering. + // that come back from the file, such as type promotion, default column insertion, + // column re-ordering, partition constants, and virtual field addition (like _file). let mut record_batch_transformer_builder = - RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids()); + RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids()) + .with_constant( + RESERVED_FIELD_ID_FILE, + PrimitiveLiteral::String(task.data_file_path.clone()), + )?; if let (Some(partition_spec), Some(partition_data)) = (task.partition_spec.clone(), task.partition.clone()) { record_batch_transformer_builder = - record_batch_transformer_builder.with_partition(partition_spec, partition_data); + record_batch_transformer_builder.with_partition(partition_spec, partition_data)?; } let mut record_batch_transformer = record_batch_transformer_builder.build(); @@ -416,7 +429,10 @@ impl ArrowReader { record_batch_stream_builder .build()? .map(move |batch| match batch { - Ok(batch) => record_batch_transformer.process_record_batch(batch), + Ok(batch) => { + // Process the record batch (type promotion, column reordering, virtual fields, etc.)
+ record_batch_transformer.process_record_batch(batch) + } Err(err) => Err(err.into()), }); diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs index a20adb6a5a..8a3018b66a 100644 --- a/crates/iceberg/src/arrow/record_batch_transformer.rs +++ b/crates/iceberg/src/arrow/record_batch_transformer.rs @@ -19,18 +19,19 @@ use std::collections::HashMap; use std::sync::Arc; use arrow_array::{ - Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float32Array, - Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, StringArray, - StructArray, + Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, Decimal128Array, + Float32Array, Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, + RunArray, StringArray, StructArray, }; use arrow_buffer::NullBuffer; use arrow_cast::cast; use arrow_schema::{ - DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef, + DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef, }; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::arrow::schema_to_arrow_schema; +use crate::metadata_columns::get_metadata_field; use crate::spec::{ Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, Transform, }; @@ -146,13 +147,13 @@ enum SchemaComparison { /// Builder for RecordBatchTransformer to improve ergonomics when constructing with optional parameters. /// -/// See [`RecordBatchTransformer`] for details on partition spec and partition data. +/// Constant fields are pre-computed for both virtual/metadata fields (like _file) and +/// identity-partitioned fields to avoid duplicate work during batch processing. #[derive(Debug)] pub(crate) struct RecordBatchTransformerBuilder { snapshot_schema: Arc<IcebergSchema>, projected_iceberg_field_ids: Vec<i32>, - partition_spec: Option<Arc<PartitionSpec>>, - partition_data: Option<Struct>, + constant_fields: HashMap<i32, (DataType, PrimitiveLiteral)>, } impl RecordBatchTransformerBuilder { @@ -163,32 +164,49 @@ impl RecordBatchTransformerBuilder { Self { snapshot_schema, projected_iceberg_field_ids: projected_iceberg_field_ids.to_vec(), - partition_spec: None, - partition_data: None, + constant_fields: HashMap::new(), } } + /// Add a constant value for a specific field ID. + /// This is used for virtual/metadata fields like _file that have constant values per batch. + /// + /// # Arguments + /// * `field_id` - The field ID to associate with the constant + /// * `value` - The constant value for this field + pub(crate) fn with_constant(mut self, field_id: i32, value: PrimitiveLiteral) -> Result<Self> { + let arrow_type = RecordBatchTransformer::primitive_literal_to_arrow_type(&value)?; + self.constant_fields.insert(field_id, (arrow_type, value)); + Ok(self) + } + /// Set partition spec and data together for identifying identity-transformed partition columns. /// /// Both partition_spec and partition_data must be provided together since the spec defines /// which fields are identity-partitioned, and the data provides their constant values. - /// One without the other cannot produce a valid constants map. + /// This method computes the partition constants and merges them into constant_fields.
pub(crate) fn with_partition( mut self, partition_spec: Arc<PartitionSpec>, partition_data: Struct, - ) -> Self { - self.partition_spec = Some(partition_spec); - self.partition_data = Some(partition_data); - self + ) -> Result<Self> { + // Compute partition constants for identity-transformed fields + let partition_constants = constants_map(&partition_spec, &partition_data); + + // Add partition constants to constant_fields (compute REE types from literals) + for (field_id, value) in partition_constants { + let arrow_type = RecordBatchTransformer::primitive_literal_to_arrow_type(&value)?; + self.constant_fields.insert(field_id, (arrow_type, value)); + } + + Ok(self) } pub(crate) fn build(self) -> RecordBatchTransformer { RecordBatchTransformer { snapshot_schema: self.snapshot_schema, projected_iceberg_field_ids: self.projected_iceberg_field_ids, - partition_spec: self.partition_spec, - partition_data: self.partition_data, + constant_fields: self.constant_fields, batch_transform: None, } } @@ -228,16 +246,10 @@ impl RecordBatchTransformerBuilder { pub(crate) struct RecordBatchTransformer { snapshot_schema: Arc<IcebergSchema>, projected_iceberg_field_ids: Vec<i32>, - - /// Partition spec for identifying identity-transformed partition columns (spec rule #1). - /// Only fields with identity transforms use partition data constants; non-identity transforms - /// (bucket, truncate, etc.) must read source columns from data files. - partition_spec: Option<Arc<PartitionSpec>>, - - /// Partition data providing constant values for identity-transformed partition columns (spec rule #1). - /// For example, in a file at path `dept=engineering/file.parquet`, this would contain - /// the value "engineering" for the dept field. - partition_data: Option<Struct>, + // Pre-computed constant field information: field_id -> (arrow_type, value) + // Includes both virtual/metadata fields (like _file) and identity-partitioned fields + // Avoids type conversions during batch processing + constant_fields: HashMap<i32, (DataType, PrimitiveLiteral)>, // BatchTransform gets lazily constructed based on the schema of // the first RecordBatch we receive from the file @@ -279,8 +291,7 @@ impl RecordBatchTransformer { record_batch.schema_ref(), self.snapshot_schema.as_ref(), &self.projected_iceberg_field_ids, - self.partition_spec.as_ref().map(|s| s.as_ref()), - self.partition_data.as_ref(), + &self.constant_fields, )?); self.process_record_batch(record_batch)? @@ -299,8 +310,7 @@ impl RecordBatchTransformer { source_schema: &ArrowSchemaRef, snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], - partition_spec: Option<&PartitionSpec>, - partition_data: Option<&Struct>, + constant_fields: &HashMap<i32, (DataType, PrimitiveLiteral)>, ) -> Result<BatchTransform> { let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?); let field_id_to_mapped_schema_map = @@ -311,22 +321,39 @@ impl RecordBatchTransformer { let fields: Result<Vec<FieldRef>> = projected_iceberg_field_ids .iter() .map(|field_id| { - Ok(field_id_to_mapped_schema_map .get(field_id) .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))?
- .0 - .clone()) + // Check if this is a constant field (virtual or partition) + if constant_fields.contains_key(field_id) { + // For metadata/virtual fields (like _file), get name from metadata_columns + // For partition fields, get name from schema (they exist in schema) + if let Ok(field) = get_metadata_field(*field_id) { + // This is a metadata/virtual field - use the predefined field + Ok(field) + } else { + // This is a partition constant field (exists in schema but uses constant value) + let field = &field_id_to_mapped_schema_map + .get(field_id) + .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))? + .0; + let (arrow_type, _) = constant_fields.get(field_id).unwrap(); + // Use the type from constant_fields (REE for constants) + let constant_field = + Field::new(field.name(), arrow_type.clone(), field.is_nullable()) + .with_metadata(field.metadata().clone()); + Ok(Arc::new(constant_field)) + } + } else { + // Regular field - use schema as-is + Ok(field_id_to_mapped_schema_map + .get(field_id) + .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))? + .0 + .clone()) + } }) .collect(); let target_schema = Arc::new(ArrowSchema::new(fields?)); - let constants_map = if let (Some(spec), Some(data)) = (partition_spec, partition_data) { - constants_map(spec, data) - } else { - HashMap::new() - }; - match Self::compare_schemas(source_schema, &target_schema) { SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough), SchemaComparison::NameChangesOnly => Ok(BatchTransform::ModifySchema { target_schema }), @@ -336,8 +363,7 @@ impl RecordBatchTransformer { snapshot_schema, projected_iceberg_field_ids, field_id_to_mapped_schema_map, - constants_map, - partition_spec, + constant_fields, )?, target_schema, }), @@ -394,8 +420,7 @@ impl RecordBatchTransformer { snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>, - constants_map: HashMap<i32, PrimitiveLiteral>, - _partition_spec: Option<&PartitionSpec>, + constant_fields: &HashMap<i32, (DataType, PrimitiveLiteral)>, ) -> Result<Vec<ColumnSource>> { let field_id_to_source_schema_map = Self::build_field_id_to_arrow_schema_map(source_schema)?; @@ -403,6 +428,17 @@ impl RecordBatchTransformer { projected_iceberg_field_ids .iter() .map(|field_id| { + // Check if this is a constant field (metadata/virtual or identity-partitioned) + // Constant fields always use their pre-computed constant values, regardless of whether + // they exist in the Parquet file. This is per Iceberg spec rule #1: partition metadata + // is authoritative and should be preferred over file data. + if let Some((arrow_type, value)) = constant_fields.get(field_id) { + return Ok(ColumnSource::Add { + value: Some(value.clone()), + target_type: arrow_type.clone(), + }); + } + let (target_field, _) = field_id_to_mapped_schema_map .get(field_id) @@ -451,13 +487,8 @@ impl RecordBatchTransformer { ); // Apply spec's fallback steps for "not present" fields.
- let column_source = if let Some(constant_value) = constants_map.get(field_id) { - // Rule #1: Identity partition constant - ColumnSource::Add { - value: Some(constant_value.clone()), - target_type: target_type.clone(), - } - } else if let Some(source) = field_by_id { + // Rule #1 (constants) is handled at the beginning of this function + let column_source = if let Some(source) = field_by_id { source } else { // Rules #2, #3 and #4: @@ -471,6 +502,7 @@ impl RecordBatchTransformer { None } }); + ColumnSource::Add { value: default_value, target_type: target_type.clone(), @@ -539,83 +571,253 @@ impl RecordBatchTransformer { prim_lit: &Option, num_rows: usize, ) -> Result { - Ok(match (target_type, prim_lit) { - (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => { - Arc::new(BooleanArray::from(vec![*value; num_rows])) - } - (DataType::Boolean, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(BooleanArray::from(vals)) - } - (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => { - Arc::new(Int32Array::from(vec![*value; num_rows])) - } - (DataType::Int32, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Int32Array::from(vals)) - } - (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => { - Arc::new(Date32Array::from(vec![*value; num_rows])) - } - (DataType::Date32, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Date32Array::from(vals)) - } - (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => { - Arc::new(Int64Array::from(vec![*value; num_rows])) - } - (DataType::Int64, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Int64Array::from(vals)) - } - (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => { - Arc::new(Float32Array::from(vec![value.0; num_rows])) - } - (DataType::Float32, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Float32Array::from(vals)) - } - (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => { - Arc::new(Float64Array::from(vec![value.0; num_rows])) - } - (DataType::Float64, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(Float64Array::from(vals)) - } - (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => { - Arc::new(StringArray::from(vec![value.clone(); num_rows])) - } - (DataType::Utf8, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(StringArray::from(vals)) - } - (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => { - Arc::new(BinaryArray::from_vec(vec![value; num_rows])) - } - (DataType::Binary, None) => { - let vals: Vec> = vec![None; num_rows]; - Arc::new(BinaryArray::from_opt_vec(vals)) - } - (DataType::Struct(fields), None) => { - // Create a StructArray filled with nulls. Per Iceberg spec, optional struct fields - // default to null when added to the schema. We defer non-null default struct values - // and leave them as not implemented yet. 
- let null_arrays: Vec = fields - .iter() - .map(|field| Self::create_column(field.data_type(), &None, num_rows)) - .collect::>>()?; - - Arc::new(StructArray::new( - fields.clone(), - null_arrays, - Some(NullBuffer::new_null(num_rows)), + // Check if this is a RunEndEncoded type (for constant fields) + if let DataType::RunEndEncoded(_, values_field) = target_type { + // Helper to create a Run-End Encoded array + let create_ree_array = |values_array: ArrayRef| -> Result { + let run_ends = if num_rows == 0 { + Int32Array::from(Vec::::new()) + } else { + Int32Array::from(vec![num_rows as i32]) + }; + Ok(Arc::new( + RunArray::try_new(&run_ends, &values_array).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to create RunArray for constant value", + ) + .with_source(e) + })?, )) - } - (DataType::Null, _) => Arc::new(NullArray::new(num_rows)), - (dt, _) => { + }; + + // Create the values array based on the literal value + let values_array: ArrayRef = match (values_field.data_type(), prim_lit) { + (DataType::Boolean, Some(PrimitiveLiteral::Boolean(v))) => { + Arc::new(BooleanArray::from(vec![*v])) + } + (DataType::Boolean, None) => { + Arc::new(BooleanArray::from(vec![Option::::None])) + } + (DataType::Int32, Some(PrimitiveLiteral::Int(v))) => { + Arc::new(Int32Array::from(vec![*v])) + } + (DataType::Int32, None) => Arc::new(Int32Array::from(vec![Option::::None])), + (DataType::Date32, Some(PrimitiveLiteral::Int(v))) => { + Arc::new(Date32Array::from(vec![*v])) + } + (DataType::Date32, None) => Arc::new(Date32Array::from(vec![Option::::None])), + (DataType::Int64, Some(PrimitiveLiteral::Long(v))) => { + Arc::new(Int64Array::from(vec![*v])) + } + (DataType::Int64, None) => Arc::new(Int64Array::from(vec![Option::::None])), + (DataType::Float32, Some(PrimitiveLiteral::Float(v))) => { + Arc::new(Float32Array::from(vec![v.0])) + } + (DataType::Float32, None) => { + Arc::new(Float32Array::from(vec![Option::::None])) + } + (DataType::Float64, Some(PrimitiveLiteral::Double(v))) => { + Arc::new(Float64Array::from(vec![v.0])) + } + (DataType::Float64, None) => { + Arc::new(Float64Array::from(vec![Option::::None])) + } + (DataType::Utf8, Some(PrimitiveLiteral::String(v))) => { + Arc::new(StringArray::from(vec![v.as_str()])) + } + (DataType::Utf8, None) => Arc::new(StringArray::from(vec![Option::<&str>::None])), + (DataType::Binary, Some(PrimitiveLiteral::Binary(v))) => { + Arc::new(BinaryArray::from_vec(vec![v.as_slice()])) + } + (DataType::Binary, None) => { + Arc::new(BinaryArray::from_opt_vec(vec![Option::<&[u8]>::None])) + } + (DataType::Decimal128(_, _), Some(PrimitiveLiteral::Int128(v))) => { + Arc::new(arrow_array::Decimal128Array::from(vec![{ *v }])) + } + (DataType::Decimal128(_, _), Some(PrimitiveLiteral::UInt128(v))) => { + Arc::new(arrow_array::Decimal128Array::from(vec![*v as i128])) + } + (DataType::Decimal128(_, _), None) => { + Arc::new(arrow_array::Decimal128Array::from(vec![ + Option::::None, + ])) + } + (DataType::Struct(fields), None) => { + // Create a single-element StructArray with nulls + let null_arrays: Vec = fields + .iter() + .map(|f| { + // Recursively create null arrays for struct fields + // For primitive fields in structs, use simple null arrays (not REE within struct) + match f.data_type() { + DataType::Boolean => { + Arc::new(BooleanArray::from(vec![Option::::None])) + as ArrayRef + } + DataType::Int32 | DataType::Date32 => { + Arc::new(Int32Array::from(vec![Option::::None])) + } + DataType::Int64 => { + Arc::new(Int64Array::from(vec![Option::::None])) + } + 
DataType::Float32 => { + Arc::new(Float32Array::from(vec![Option::::None])) + } + DataType::Float64 => { + Arc::new(Float64Array::from(vec![Option::::None])) + } + DataType::Utf8 => { + Arc::new(StringArray::from(vec![Option::<&str>::None])) + } + DataType::Binary => { + Arc::new(BinaryArray::from_opt_vec(vec![Option::<&[u8]>::None])) + } + _ => panic!("Unsupported struct field type: {:?}", f.data_type()), + } + }) + .collect(); + Arc::new(arrow_array::StructArray::new( + fields.clone(), + null_arrays, + Some(arrow_buffer::NullBuffer::new_null(1)), + )) + } + _ => { + return Err(Error::new( + ErrorKind::Unexpected, + format!( + "Unsupported constant type combination: {:?} with {:?}", + values_field.data_type(), + prim_lit + ), + )); + } + }; + + // Wrap in Run-End Encoding + create_ree_array(values_array) + } else { + // Non-REE type (simple arrays for non-constant fields) + Ok(match (target_type, prim_lit) { + (DataType::Boolean, Some(PrimitiveLiteral::Boolean(value))) => { + Arc::new(BooleanArray::from(vec![*value; num_rows])) + } + (DataType::Boolean, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(BooleanArray::from(vals)) + } + (DataType::Int32, Some(PrimitiveLiteral::Int(value))) => { + Arc::new(Int32Array::from(vec![*value; num_rows])) + } + (DataType::Int32, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Int32Array::from(vals)) + } + (DataType::Date32, Some(PrimitiveLiteral::Int(value))) => { + Arc::new(Date32Array::from(vec![*value; num_rows])) + } + (DataType::Date32, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Date32Array::from(vals)) + } + (DataType::Int64, Some(PrimitiveLiteral::Long(value))) => { + Arc::new(Int64Array::from(vec![*value; num_rows])) + } + (DataType::Int64, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Int64Array::from(vals)) + } + (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => { + Arc::new(Float32Array::from(vec![value.0; num_rows])) + } + (DataType::Float32, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Float32Array::from(vals)) + } + (DataType::Float64, Some(PrimitiveLiteral::Double(value))) => { + Arc::new(Float64Array::from(vec![value.0; num_rows])) + } + (DataType::Float64, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Float64Array::from(vals)) + } + (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => { + Arc::new(StringArray::from(vec![value.clone(); num_rows])) + } + (DataType::Utf8, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(StringArray::from(vals)) + } + (DataType::Binary, Some(PrimitiveLiteral::Binary(value))) => { + Arc::new(BinaryArray::from_vec(vec![value; num_rows])) + } + (DataType::Binary, None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(BinaryArray::from_opt_vec(vals)) + } + (DataType::Decimal128(_, _), Some(PrimitiveLiteral::Int128(value))) => { + Arc::new(Decimal128Array::from(vec![*value; num_rows])) + } + (DataType::Decimal128(_, _), Some(PrimitiveLiteral::UInt128(value))) => { + Arc::new(Decimal128Array::from(vec![*value as i128; num_rows])) + } + (DataType::Decimal128(_, _), None) => { + let vals: Vec> = vec![None; num_rows]; + Arc::new(Decimal128Array::from(vals)) + } + (DataType::Struct(fields), None) => { + // Create a StructArray filled with nulls + let null_arrays: Vec = fields + .iter() + .map(|field| Self::create_column(field.data_type(), &None, num_rows)) + .collect::>>()?; + + Arc::new(StructArray::new( + fields.clone(), + null_arrays, + Some(NullBuffer::new_null(num_rows)), 
+ )) + } + (DataType::Null, _) => Arc::new(NullArray::new(num_rows)), + (dt, _) => { + return Err(Error::new( + ErrorKind::Unexpected, + format!("unexpected target column type {}", dt), + )); + } + }) + } + } + + /// Converts a PrimitiveLiteral to its corresponding Arrow DataType. + /// This is used for constant fields to determine the Arrow type. + /// For constant values, we use Run-End Encoding for all types to save memory. + fn primitive_literal_to_arrow_type(literal: &PrimitiveLiteral) -> Result { + // Helper to create REE type with the given values type + // Note: values field is nullable as Arrow expects this when building the + // final Arrow schema with `RunArray::try_new`. + let make_ree = |values_type: DataType| -> DataType { + let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); + let values_field = Arc::new(Field::new("values", values_type, true)); + DataType::RunEndEncoded(run_ends_field, values_field) + }; + + Ok(match literal { + PrimitiveLiteral::Boolean(_) => make_ree(DataType::Boolean), + PrimitiveLiteral::Int(_) => make_ree(DataType::Int32), + PrimitiveLiteral::Long(_) => make_ree(DataType::Int64), + PrimitiveLiteral::Float(_) => make_ree(DataType::Float32), + PrimitiveLiteral::Double(_) => make_ree(DataType::Float64), + PrimitiveLiteral::String(_) => make_ree(DataType::Utf8), + PrimitiveLiteral::Binary(_) => make_ree(DataType::Binary), + PrimitiveLiteral::Int128(_) => make_ree(DataType::Decimal128(38, 0)), + PrimitiveLiteral::UInt128(_) => make_ree(DataType::Decimal128(38, 0)), + PrimitiveLiteral::AboveMax | PrimitiveLiteral::BelowMin => { return Err(Error::new( ErrorKind::Unexpected, - format!("unexpected target column type {}", dt), + "Cannot create arrow type for AboveMax/BelowMin literal", )); } }) @@ -639,6 +841,54 @@ mod test { }; use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Struct, Type}; + /// Helper to extract string values from either StringArray or RunEndEncoded + /// Returns empty string for null values + fn get_string_value(array: &dyn Array, index: usize) -> String { + if let Some(string_array) = array.as_any().downcast_ref::() { + if string_array.is_null(index) { + String::new() + } else { + string_array.value(index).to_string() + } + } else if let Some(run_array) = array + .as_any() + .downcast_ref::>() + { + let values = run_array.values(); + let string_values = values + .as_any() + .downcast_ref::() + .expect("REE values should be StringArray"); + // For REE, all rows have the same value (index 0 in the values array) + if string_values.is_null(0) { + String::new() + } else { + string_values.value(0).to_string() + } + } else { + panic!("Expected StringArray or RunEndEncoded"); + } + } + + /// Helper to extract int values from either Int32Array or RunEndEncoded + fn get_int_value(array: &dyn Array, index: usize) -> i32 { + if let Some(int_array) = array.as_any().downcast_ref::() { + int_array.value(index) + } else if let Some(run_array) = array + .as_any() + .downcast_ref::>() + { + let values = run_array.values(); + let int_values = values + .as_any() + .downcast_ref::() + .expect("REE values should be Int32Array"); + int_values.value(0) + } else { + panic!("Expected Int32Array or RunEndEncoded"); + } + } + #[test] fn build_field_id_to_source_schema_map_works() { let arrow_schema = arrow_schema_already_same_as_target(); @@ -1137,6 +1387,7 @@ mod test { let mut transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) .with_partition(partition_spec, partition_data) + 
.expect("Failed to add partition constants") .build(); // Create a Parquet RecordBatch with actual data @@ -1257,6 +1508,7 @@ mod test { let mut transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) .with_partition(partition_spec, partition_data) + .expect("Failed to add partition constants") .build(); let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ @@ -1271,30 +1523,23 @@ mod test { assert_eq!(result.num_columns(), 3); assert_eq!(result.num_rows(), 2); - let id_column = result - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(id_column.value(0), 100); - assert_eq!(id_column.value(1), 200); + // Use helpers to handle both simple and REE arrays + assert_eq!(get_int_value(result.column(0).as_ref(), 0), 100); + assert_eq!(get_int_value(result.column(0).as_ref(), 1), 200); - let dept_column = result - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - // This value MUST come from partition metadata (constant) - assert_eq!(dept_column.value(0), "engineering"); - assert_eq!(dept_column.value(1), "engineering"); + // dept column comes from partition metadata (constant) - will be REE + assert_eq!( + get_string_value(result.column(1).as_ref(), 0), + "engineering" + ); + assert_eq!( + get_string_value(result.column(1).as_ref(), 1), + "engineering" + ); - let name_column = result - .column(2) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(name_column.value(0), "Alice"); - assert_eq!(name_column.value(1), "Bob"); + // name column comes from file + assert_eq!(get_string_value(result.column(2).as_ref(), 0), "Alice"); + assert_eq!(get_string_value(result.column(2).as_ref(), 1), "Bob"); } /// Test bucket partitioning with renamed source column. @@ -1372,6 +1617,7 @@ mod test { let mut transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) .with_partition(partition_spec, partition_data) + .expect("Failed to add partition constants") .build(); // Create a Parquet RecordBatch with actual data @@ -1476,6 +1722,7 @@ mod test { let mut transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) .with_partition(partition_spec, partition_data) + .expect("Failed to add partition constants") .build(); let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ @@ -1492,48 +1739,37 @@ mod test { // Verify each column demonstrates the correct spec rule: // Normal case: id from Parquet by field ID - let id_column = result - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(id_column.value(0), 100); - assert_eq!(id_column.value(1), 200); - - // Rule #1: dept from partition metadata (identity transform) - let dept_column = result - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(dept_column.value(0), "engineering"); - assert_eq!(dept_column.value(1), "engineering"); + // Use helpers to handle both simple and REE arrays + assert_eq!(get_int_value(result.column(0).as_ref(), 0), 100); + assert_eq!(get_int_value(result.column(0).as_ref(), 1), 200); + + // Rule #1: dept from partition metadata (identity transform) - will be REE + assert_eq!( + get_string_value(result.column(1).as_ref(), 0), + "engineering" + ); + assert_eq!( + get_string_value(result.column(1).as_ref(), 1), + "engineering" + ); - // Rule #2: data from Parquet via name mapping - let data_column = result - .column(2) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(data_column.value(0), "value1"); - assert_eq!(data_column.value(1), "value2"); + // Rule #2: data 
from Parquet via name mapping - will be regular array + assert_eq!(get_string_value(result.column(2).as_ref(), 0), "value1"); + assert_eq!(get_string_value(result.column(2).as_ref(), 1), "value2"); - // Rule #3: category from initial_default - let category_column = result - .column(3) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(category_column.value(0), "default_category"); - assert_eq!(category_column.value(1), "default_category"); + // Rule #3: category from initial_default - will be REE + assert_eq!( + get_string_value(result.column(3).as_ref(), 0), + "default_category" + ); + assert_eq!( + get_string_value(result.column(3).as_ref(), 1), + "default_category" + ); - // Rule #4: notes is null (no default, not in Parquet, not in partition) - let notes_column = result - .column(4) - .as_any() - .downcast_ref::() - .unwrap(); - assert!(notes_column.is_null(0)); - assert!(notes_column.is_null(1)); + // Rule #4: notes is null (no default, not in Parquet, not in partition) - will be REE with null + // For null REE arrays, we still use the helper which handles extraction + assert_eq!(get_string_value(result.column(4).as_ref(), 0), ""); + assert_eq!(get_string_value(result.column(4).as_ref(), 1), ""); } } diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index aae8efed74..8d8f40f72d 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -96,4 +96,5 @@ mod utils; pub mod writer; mod delete_vector; +pub mod metadata_columns; pub mod puffin; diff --git a/crates/iceberg/src/metadata_columns.rs b/crates/iceberg/src/metadata_columns.rs new file mode 100644 index 0000000000..30e4b4f2b9 --- /dev/null +++ b/crates/iceberg/src/metadata_columns.rs @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metadata columns (virtual/reserved fields) for Iceberg tables. +//! +//! This module defines metadata columns that can be requested in projections +//! but are not stored in data files. Instead, they are computed on-the-fly +//! during reading. Examples include the _file column (file path) and future +//! columns like partition values or row numbers. + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_schema::{DataType, Field}; +use once_cell::sync::Lazy; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use crate::{Error, ErrorKind, Result}; + +/// Reserved field ID for the file path (_file) column per Iceberg spec +pub const RESERVED_FIELD_ID_FILE: i32 = i32::MAX - 1; + +/// Reserved column name for the file path metadata column +pub const RESERVED_COL_NAME_FILE: &str = "_file"; + +/// Lazy-initialized Arrow Field definition for the _file metadata column. +/// Uses Run-End Encoding for memory efficiency. 
+static FILE_FIELD: Lazy<Arc<Field>> = Lazy::new(|| { + let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); + let values_field = Arc::new(Field::new("values", DataType::Utf8, true)); + Arc::new( + Field::new( + RESERVED_COL_NAME_FILE, + DataType::RunEndEncoded(run_ends_field, values_field), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + RESERVED_FIELD_ID_FILE.to_string(), + )])), + ) +}); + +/// Returns the Arrow Field definition for the _file metadata column. +/// +/// # Returns +/// A reference to the _file field definition (RunEndEncoded type) +pub fn file_field() -> &'static Arc<Field> { + &FILE_FIELD +} + +/// Returns the Arrow Field definition for a metadata field ID. +/// +/// # Arguments +/// * `field_id` - The metadata field ID +/// +/// # Returns +/// The Arrow Field definition for the metadata column, or an error if not a metadata field +pub fn get_metadata_field(field_id: i32) -> Result<Arc<Field>> { + match field_id { + RESERVED_FIELD_ID_FILE => Ok(Arc::clone(file_field())), + _ if is_metadata_field(field_id) => { + // Future metadata fields can be added here + Err(Error::new( + ErrorKind::Unexpected, + format!( + "Metadata field ID {} recognized but field definition not implemented", + field_id + ), + )) + } + _ => Err(Error::new( + ErrorKind::Unexpected, + format!("Field ID {} is not a metadata field", field_id), + )), + } +} + +/// Returns the field ID for a metadata column name. +/// +/// # Arguments +/// * `column_name` - The metadata column name +/// +/// # Returns +/// The field ID of the metadata column, or an error if the column name is not recognized +pub fn get_metadata_field_id(column_name: &str) -> Result<i32> { + match column_name { + RESERVED_COL_NAME_FILE => Ok(RESERVED_FIELD_ID_FILE), + _ => Err(Error::new( + ErrorKind::Unexpected, + format!("Unknown/unsupported metadata column name: {column_name}"), + )), + } +} + +/// Checks if a field ID is a metadata field. +/// +/// # Arguments +/// * `field_id` - The field ID to check +/// +/// # Returns +/// `true` if the field ID is a (currently supported) metadata field, `false` otherwise +pub fn is_metadata_field(field_id: i32) -> bool { + field_id == RESERVED_FIELD_ID_FILE + // Additional metadata fields can be checked here in the future +} + +/// Checks if a column name is a metadata column. +/// +/// # Arguments +/// * `column_name` - The column name to check +/// +/// # Returns +/// `true` if the column name is a metadata column, `false` otherwise +pub fn is_metadata_column_name(column_name: &str) -> bool { + get_metadata_field_id(column_name).is_ok() +} diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 3e319ca062..78aa068d62 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -36,6 +36,9 @@ use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; +use crate::metadata_columns::{ + RESERVED_COL_NAME_FILE, get_metadata_field_id, is_metadata_column_name, +}; use crate::runtime::spawn; use crate::spec::{DataContentType, SnapshotRef}; use crate::table::Table; @@ -124,6 +127,45 @@ impl<'a> TableScanBuilder<'a> { self } + /// Include the _file metadata column in the scan. + /// + /// This is a convenience method that adds the _file column to the current selection. + /// If no columns are currently selected (select_all), this will select all columns plus _file.
+ /// If specific columns are selected, this adds _file to that selection. + /// + /// # Example + /// ```no_run + /// # use iceberg::table::Table; + /// # async fn example(table: Table) -> iceberg::Result<()> { + /// // Select id, name, and _file + /// let scan = table + /// .scan() + /// .select(["id", "name"]) + /// .with_file_column() + /// .build()?; + /// # Ok(()) + /// # } + /// ``` + pub fn with_file_column(mut self) -> Self { + let mut columns = self.column_names.unwrap_or_else(|| { + // No explicit selection - get all column names from schema + self.table + .metadata() + .current_schema() + .as_struct() + .fields() + .iter() + .map(|f| f.name.clone()) + .collect() + }); + + // Add _file column + columns.push(RESERVED_COL_NAME_FILE.to_string()); + + self.column_names = Some(columns); + self + } + /// Set the snapshot to scan. When not set, it uses current snapshot. pub fn snapshot_id(mut self, snapshot_id: i64) -> Self { self.snapshot_id = Some(snapshot_id); @@ -217,9 +259,13 @@ impl<'a> TableScanBuilder<'a> { let schema = snapshot.schema(self.table.metadata())?; - // Check that all column names exist in the schema. + // Check that all column names exist in the schema (skip reserved columns). if let Some(column_names) = self.column_names.as_ref() { for column_name in column_names { + // Skip reserved columns that don't exist in the schema + if is_metadata_column_name(column_name) { + continue; + } if schema.field_by_name(column_name).is_none() { return Err(Error::new( ErrorKind::DataInvalid, @@ -240,6 +286,12 @@ impl<'a> TableScanBuilder<'a> { }); for column_name in column_names.iter() { + // Handle metadata columns (like "_file") + if is_metadata_column_name(column_name) { + field_ids.push(get_metadata_field_id(column_name)?); + continue; + } + let field_id = schema.field_id_by_name(column_name).ok_or_else(|| { Error::new( ErrorKind::DataInvalid, @@ -254,10 +306,10 @@ impl<'a> TableScanBuilder<'a> { Error::new( ErrorKind::FeatureUnsupported, format!( - "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}" - ), - ) - })?; + "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. 
Schema: {schema}" + ), + ) + })?; field_ids.push(field_id); } @@ -559,8 +611,10 @@ pub mod tests { use std::fs::File; use std::sync::Arc; + use arrow_array::cast::AsArray; use arrow_array::{ - ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, + Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, + StringArray, }; use futures::{TryStreamExt, stream}; use minijinja::value::Value; @@ -575,6 +629,7 @@ pub mod tests { use crate::arrow::ArrowReaderBuilder; use crate::expr::{BoundPredicate, Reference}; use crate::io::{FileIO, OutputFile}; + use crate::metadata_columns::RESERVED_COL_NAME_FILE; use crate::scan::FileScanTask; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry, @@ -1800,4 +1855,319 @@ pub mod tests { }; test_fn(task); } + + #[tokio::test] + async fn test_select_with_file_column() { + use arrow_array::cast::AsArray; + + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select regular columns plus the _file column + let table_scan = fixture + .table + .scan() + .select(["x", RESERVED_COL_NAME_FILE]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Verify we have 2 columns: x and _file + assert_eq!(batches[0].num_columns(), 2); + + // Verify the x column exists and has correct data + let x_col = batches[0].column_by_name("x").unwrap(); + let x_arr = x_col.as_primitive::(); + assert_eq!(x_arr.value(0), 1); + + // Verify the _file column exists + let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE); + assert!( + file_col.is_some(), + "_file column should be present in the batch" + ); + + // Verify the _file column contains a file path + let file_col = file_col.unwrap(); + assert!( + matches!( + file_col.data_type(), + arrow_schema::DataType::RunEndEncoded(_, _) + ), + "_file column should use RunEndEncoded type" + ); + + // Decode the RunArray to verify it contains the file path + let run_array = file_col + .as_any() + .downcast_ref::>() + .expect("_file column should be a RunArray"); + + let values = run_array.values(); + let string_values = values.as_string::(); + assert_eq!(string_values.len(), 1, "Should have a single file path"); + + let file_path = string_values.value(0); + assert!( + file_path.ends_with(".parquet"), + "File path should end with .parquet, got: {}", + file_path + ); + } + + #[tokio::test] + async fn test_select_file_column_position() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select columns in specific order: x, _file, z + let table_scan = fixture + .table + .scan() + .select(["x", RESERVED_COL_NAME_FILE, "z"]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches[0].num_columns(), 3); + + // Verify column order: x at position 0, _file at position 1, z at position 2 + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), "x"); + assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE); + assert_eq!(schema.field(2).name(), "z"); + + // Verify columns by name also works + assert!(batches[0].column_by_name("x").is_some()); + assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some()); + 
assert!(batches[0].column_by_name("z").is_some()); + } + + #[tokio::test] + async fn test_select_file_column_only() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select only the _file column + let table_scan = fixture + .table + .scan() + .select([RESERVED_COL_NAME_FILE]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Should have exactly 1 column + assert_eq!(batches[0].num_columns(), 1); + + // Verify it's the _file column + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE); + + // Verify the batch has the correct number of rows + // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted) + // Each file has 1024 rows, so total is 2048 rows + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 2048); + } + + #[tokio::test] + async fn test_file_column_with_multiple_files() { + use std::collections::HashSet; + + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select x and _file columns + let table_scan = fixture + .table + .scan() + .select(["x", RESERVED_COL_NAME_FILE]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Collect all unique file paths from the batches + let mut file_paths = HashSet::new(); + for batch in &batches { + let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap(); + let run_array = file_col + .as_any() + .downcast_ref::>() + .expect("_file column should be a RunArray"); + + let values = run_array.values(); + let string_values = values.as_string::(); + for i in 0..string_values.len() { + file_paths.insert(string_values.value(i).to_string()); + } + } + + // We should have multiple files (the test creates 1.parquet and 3.parquet) + assert!(!file_paths.is_empty(), "Should have at least one file path"); + + // All paths should end with .parquet + for path in &file_paths { + assert!( + path.ends_with(".parquet"), + "All file paths should end with .parquet, got: {}", + path + ); + } + } + + #[tokio::test] + async fn test_file_column_at_start() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select _file at the start + let table_scan = fixture + .table + .scan() + .select([RESERVED_COL_NAME_FILE, "x", "y"]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches[0].num_columns(), 3); + + // Verify _file is at position 0 + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE); + assert_eq!(schema.field(1).name(), "x"); + assert_eq!(schema.field(2).name(), "y"); + } + + #[tokio::test] + async fn test_file_column_at_end() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select _file at the end + let table_scan = fixture + .table + .scan() + .select(["x", "y", RESERVED_COL_NAME_FILE]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches[0].num_columns(), 3); 
+ + // Verify _file is at position 2 (the end) + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), "x"); + assert_eq!(schema.field(1).name(), "y"); + assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE); + } + + #[tokio::test] + async fn test_select_with_repeated_column_names() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select with repeated column names - both regular columns and virtual columns + // Repeated columns should appear multiple times in the result (duplicates are allowed) + let table_scan = fixture + .table + .scan() + .select([ + "x", + RESERVED_COL_NAME_FILE, + "x", // x repeated + "y", + RESERVED_COL_NAME_FILE, // _file repeated + "y", // y repeated + ]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Verify we have exactly 6 columns (duplicates are allowed and preserved) + assert_eq!( + batches[0].num_columns(), + 6, + "Should have exactly 6 columns with duplicates" + ); + + let schema = batches[0].schema(); + + // Verify columns appear in the exact order requested: x, _file, x, y, _file, y + assert_eq!(schema.field(0).name(), "x", "Column 0 should be x"); + assert_eq!( + schema.field(1).name(), + RESERVED_COL_NAME_FILE, + "Column 1 should be _file" + ); + assert_eq!( + schema.field(2).name(), + "x", + "Column 2 should be x (duplicate)" + ); + assert_eq!(schema.field(3).name(), "y", "Column 3 should be y"); + assert_eq!( + schema.field(4).name(), + RESERVED_COL_NAME_FILE, + "Column 4 should be _file (duplicate)" + ); + assert_eq!( + schema.field(5).name(), + "y", + "Column 5 should be y (duplicate)" + ); + + // Verify all columns have correct data types + assert!( + matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64), + "Column x should be Int64" + ); + assert!( + matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64), + "Column x (duplicate) should be Int64" + ); + assert!( + matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64), + "Column y should be Int64" + ); + assert!( + matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64), + "Column y (duplicate) should be Int64" + ); + assert!( + matches!( + schema.field(1).data_type(), + arrow_schema::DataType::RunEndEncoded(_, _) + ), + "_file column should use RunEndEncoded type" + ); + assert!( + matches!( + schema.field(4).data_type(), + arrow_schema::DataType::RunEndEncoded(_, _) + ), + "_file column (duplicate) should use RunEndEncoded type" + ); + } }
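Below is a minimal usage sketch of the _file metadata column this diff introduces; it is not part of the patch. The table handle and the "id" column are assumptions, while select(), with_file_column(), RESERVED_COL_NAME_FILE, to_arrow(), and the Run-End Encoded representation of _file are the APIs shown in the changes above.

use futures::TryStreamExt;
use iceberg::metadata_columns::RESERVED_COL_NAME_FILE;
use iceberg::table::Table;

// Assumes `table` is an already-loaded Iceberg table that has an "id" column.
async fn scan_with_file_paths(table: &Table) -> iceberg::Result<()> {
    // Either name "_file" explicitly in select(), or call .with_file_column() on the builder.
    let scan = table
        .scan()
        .select(["id", RESERVED_COL_NAME_FILE])
        .build()?;

    let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
    for batch in &batches {
        // The _file column is Run-End Encoded: one file path repeated for every row in the batch.
        let file_col = batch
            .column_by_name(RESERVED_COL_NAME_FILE)
            .expect("_file column present");
        println!("{} rows, _file type: {:?}", batch.num_rows(), file_col.data_type());
    }
    Ok(())
}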