Merged
24 commits
5d9ee2e
Add PartitionSpec to FileScanTask and handling in RecordBatchTransfor…
mbutrovich Nov 3, 2025
ff2347a
format
mbutrovich Nov 3, 2025
b9e6f1e
Put back changes that accidentally got lost from not having an update…
mbutrovich Nov 3, 2025
e7ff597
Format, update comments
mbutrovich Nov 3, 2025
f481c66
Merge branch 'main' into partition-spec-support
mbutrovich Nov 4, 2025
da243bf
Address spec_id and serde feedback. Need to think about the name mapp…
mbutrovich Nov 4, 2025
75cc2bc
Add NameMapping. Still need to populate from table metadata. Want to …
mbutrovich Nov 4, 2025
29dde0e
Handle field ID conflicts in add_files with name mapping
mbutrovich Nov 4, 2025
c953837
clean up comments a bit, add a new test
mbutrovich Nov 4, 2025
b30195c
remove test accidentally brought in from #1778.
mbutrovich Nov 4, 2025
135f149
remove inadvertent changes, make comments more succinct
mbutrovich Nov 4, 2025
2b1c28a
Merge branch 'main' into partition-spec-support
mbutrovich Nov 4, 2025
72befec
Merge branch 'main' into partition-spec-support
mbutrovich Nov 5, 2025
37b1513
Fix test after merging in main.
mbutrovich Nov 5, 2025
c2fcc66
Address PR feedback.
mbutrovich Nov 6, 2025
668e78b
Adjust comments, mostly just to kick CI.
mbutrovich Nov 6, 2025
e7314bb
Merge branch 'main' into partition-spec-support
mbutrovich Nov 10, 2025
360d626
Address PR feedback.
mbutrovich Nov 10, 2025
fe45218
Apply name mapping in ArrowReader instead of RecordBatchTransformer t…
mbutrovich Nov 11, 2025
2938393
Merge branch 'main' into partition-spec-support
mbutrovich Nov 11, 2025
daf4d72
Fix tests in record_batch_transformer.rs to match new behavior.
mbutrovich Nov 11, 2025
e61f7a7
Address PR feedback.
mbutrovich Nov 12, 2025
65ae399
Merge branch 'main' into partition-spec-support
mbutrovich Nov 12, 2025
894eb58
Merge branch 'main' into partition-spec-support
liurenjie1024 Nov 13, 2025
3 changes: 3 additions & 0 deletions crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -881,6 +881,9 @@ mod tests {
project_field_ids: vec![2, 3],
predicate: None,
deletes: vec![pos_del, eq_del],
partition: None,
partition_spec: None,
name_mapping: None,
};

// Load the deletes - should handle both types without error
6 changes: 6 additions & 0 deletions crates/iceberg/src/arrow/delete_filter.rs
@@ -341,6 +341,9 @@ pub(crate) mod tests {
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_1, pos_del_2.clone()],
partition: None,
partition_spec: None,
name_mapping: None,
},
FileScanTask {
start: 0,
@@ -352,6 +355,9 @@
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_3],
partition: None,
partition_spec: None,
name_mapping: None,
},
];

211 changes: 208 additions & 3 deletions crates/iceberg/src/arrow/reader.rs
@@ -234,9 +234,14 @@ impl ArrowReader {

// RecordBatchTransformer performs any transformations required on the RecordBatches
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering
let mut record_batch_transformer =
RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
// and column re-ordering.
let mut record_batch_transformer = RecordBatchTransformer::build_with_partition_data(
task.schema_ref(),
task.project_field_ids(),
task.partition_spec.clone(),
task.partition.clone(),
task.name_mapping.clone(),
);

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
@@ -1948,6 +1953,9 @@ message schema {
project_field_ids: vec![1],
predicate: Some(predicate.bind(schema, true).unwrap()),
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -2266,6 +2274,9 @@ message schema {
project_field_ids: vec![1],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
};

// Task 2: read the second and third row groups
@@ -2279,6 +2290,9 @@
project_field_ids: vec![1],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
};

let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
@@ -2403,6 +2417,9 @@ message schema {
project_field_ids: vec![1, 2], // Request both columns 'a' and 'b'
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -2571,6 +2588,9 @@ message schema {
partition_spec_id: 0,
equality_ids: None,
}],
partition: None,
partition_spec: None,
name_mapping: None,
};

let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -2786,6 +2806,9 @@ message schema {
partition_spec_id: 0,
equality_ids: None,
}],
partition: None,
partition_spec: None,
name_mapping: None,
};

let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -2994,6 +3017,9 @@ message schema {
partition_spec_id: 0,
equality_ids: None,
}],
partition: None,
partition_spec: None,
name_mapping: None,
};

let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3094,6 +3120,9 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3188,6 +3217,9 @@ message schema {
project_field_ids: vec![1, 3],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3271,6 +3303,9 @@ message schema {
project_field_ids: vec![1, 2, 3],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3368,6 +3403,9 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3494,6 +3532,9 @@ message schema {
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3587,6 +3628,9 @@ message schema {
project_field_ids: vec![1, 5, 2],
predicate: None,
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3693,6 +3737,9 @@ message schema {
project_field_ids: vec![1, 2, 3],
predicate: Some(predicate.bind(schema, true).unwrap()),
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;
@@ -3708,4 +3755,162 @@
// Should return empty results
assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0));
}

/// Test bucket partitioning reads source column from data file (not partition metadata).
///
/// This is an integration test verifying the complete ArrowReader pipeline with bucket partitioning.
/// It corresponds to TestRuntimeFiltering tests in Iceberg Java (e.g., testRenamedSourceColumnTable).
///
/// # Iceberg Spec Requirements
///
/// Per the Iceberg spec "Column Projection" section:
/// > "Return the value from partition metadata if an **Identity Transform** exists for the field"
///
/// This means:
/// - Identity transforms (e.g., `identity(dept)`) use constants from partition metadata
/// - Non-identity transforms (e.g., `bucket(4, id)`) must read source columns from data files
/// - Partition metadata for bucket transforms stores bucket numbers (0-3), NOT source values
///
/// Java's PartitionUtil.constantsMap() implements this via:
/// ```java
/// if (field.transform().isIdentity()) {
/// idToConstant.put(field.sourceId(), converted);
/// }
/// ```
///
/// # What This Test Verifies
///
/// This test ensures the full ArrowReader → RecordBatchTransformer pipeline correctly handles
/// bucket partitioning when FileScanTask provides partition_spec and partition_data:
///
/// - Parquet file has field_id=1 named "id" with actual data [1, 5, 9, 13]
/// - FileScanTask specifies partition_spec with bucket(4, id) and partition_data with bucket=1
/// - RecordBatchTransformer.constants_map() excludes bucket-partitioned field from constants
/// - ArrowReader correctly reads [1, 5, 9, 13] from the data file
/// - Values are NOT replaced with constant 1 from partition metadata
///
/// # Why This Matters
///
/// Without correct handling:
/// - Runtime filtering would break (e.g., `WHERE id = 5` would fail)
/// - Query results would be incorrect (all rows would have id=1)
/// - Bucket partitioning would be unusable for query optimization
///
/// # References
/// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms"
/// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
/// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
#[tokio::test]
async fn test_bucket_partitioning_reads_source_column_from_file() {
use arrow_array::Int32Array;

use crate::spec::{Literal, PartitionSpec, Struct, Transform};

// Iceberg schema with id and name columns
let schema = Arc::new(
Schema::builder()
.with_schema_id(0)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
])
.build()
.unwrap(),
);

// Partition spec: bucket(4, id)
let partition_spec = Arc::new(
PartitionSpec::builder(schema.clone())
.with_spec_id(0)
.add_partition_field("id", "id_bucket", Transform::Bucket(4))
.unwrap()
.build()
.unwrap(),
);

// Partition data: bucket value is 1
let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]);

// Create Arrow schema with field IDs for Parquet file
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string(),
)])),
]));

// Write Parquet file with data
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();

let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef;
let name_data =
Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef;

let to_write =
RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap();

let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();
let file = File::create(format!("{}/data.parquet", &table_location)).unwrap();
let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap();
writer.write(&to_write).expect("Writing batch");
writer.close().unwrap();

// Read the Parquet file with partition spec and data
let reader = ArrowReaderBuilder::new(file_io).build();
let tasks = Box::pin(futures::stream::iter(
vec![Ok(FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: format!("{}/data.parquet", table_location),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1, 2],
predicate: None,
deletes: vec![],
partition: Some(partition_data),
partition_spec: Some(partition_spec),
name_mapping: None,
})]
.into_iter(),
)) as FileScanTaskStream;

let result = reader
.read(tasks)
.unwrap()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();

// Verify we got the correct data
assert_eq!(result.len(), 1);
let batch = &result[0];

assert_eq!(batch.num_columns(), 2);
assert_eq!(batch.num_rows(), 4);

// The id column MUST contain actual values from the Parquet file [1, 5, 9, 13],
// NOT the constant partition value 1
let id_col = batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(id_col.value(0), 1);
assert_eq!(id_col.value(1), 5);
assert_eq!(id_col.value(2), 9);
assert_eq!(id_col.value(3), 13);

let name_col = batch.column(1).as_string::<i32>();
assert_eq!(name_col.value(0), "Alice");
assert_eq!(name_col.value(1), "Bob");
assert_eq!(name_col.value(2), "Charlie");
assert_eq!(name_col.value(3), "Dave");
}
}
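
Editor's note (not part of the diff): the doc comment in the test above describes the rule that Java's PartitionUtil.constantsMap() implements and that this PR's RecordBatchTransformer mirrors — only identity-transformed partition fields may be filled from partition metadata constants. Below is a minimal, hedged Rust sketch of that rule using simplified stand-in types (PartitionFieldSketch, TransformKind, plain i64 partition values are all hypothetical), not the crate's actual PartitionSpec, Struct, or Literal APIs:

```rust
use std::collections::HashMap;

// Simplified stand-ins for the crate's partition types (hypothetical, for illustration only).
#[allow(dead_code)] // Bucket's arity is not used in this sketch
enum TransformKind {
    Identity,
    Bucket(u32),
}

struct PartitionFieldSketch {
    source_field_id: i32,
    transform: TransformKind,
}

/// Build a field-id -> constant map from partition metadata.
/// Only identity-transformed fields contribute constants; every other
/// transform's source column must be read from the data file itself.
fn constants_map_sketch(
    fields: &[PartitionFieldSketch],
    partition_values: &[i64], // one value per partition field (simplified encoding)
) -> HashMap<i32, i64> {
    fields
        .iter()
        .zip(partition_values)
        .filter(|(field, _)| matches!(field.transform, TransformKind::Identity))
        .map(|(field, value)| (field.source_field_id, *value))
        .collect()
}

fn main() {
    let fields = [
        // bucket(4, id): the partition tuple stores the bucket number, not id values
        PartitionFieldSketch { source_field_id: 1, transform: TransformKind::Bucket(4) },
        // identity(dept): the partition tuple stores the actual dept value
        PartitionFieldSketch { source_field_id: 2, transform: TransformKind::Identity },
    ];
    let constants = constants_map_sketch(&fields, &[1, 42]);
    assert!(!constants.contains_key(&1)); // id must be read from the data file
    assert_eq!(constants.get(&2), Some(&42)); // dept may be filled from metadata
    println!("constants: {constants:?}");
}
```

Applied to the test above: bucket(4, id) keeps field 1 out of the constants map, so the reader fetches id's actual values [1, 5, 9, 13] from the Parquet file instead of repeating the partition value 1.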