Skip to content

Commit 2d549a9

Browse files
authored
Fix partition column projection with schema evolution (apache#2685)
Closes apache#2672 # Rationale for this change When performing column projection on partitioned tables with schema evolution, PyIceberg incorrectly uses the projected schema (containing only selected columns) instead of the full table schema when building partition types in `_get_column_projection_values()`. This causes `ValueError: Could not find field with id: X` when: 1. Reading from partitioned Iceberg tables 2. Using column projection (selecting specific columns, not `SELECT *`) 3. Selected columns do NOT include the partition field(s) 4. The table has undergone schema evolution (fields added/removed after initial creation) 5. Reading files that are missing some of the selected columns (written before schema evolution) The root cause is that `partition_spec.partition_type(projected_schema)` fails because the projected schema may be missing fields that exist in the partition specification. The fix passes the full table schema from `ArrowScan._table_metadata.schema()` through `_task_to_record_batches()` to `_get_column_projection_values()`, ensuring all fields are available when building partition accessors. ## Are these changes tested? Yes. Added a test `test_partition_column_projection_with_schema_evolution` that: - Creates a partitioned table with initial schema - Writes data with the initial schema - Evolves the schema by adding a new column - Writes data with the evolved schema - Performs column projection that excludes the partition field ## Are there any user-facing changes? No. Only internal helpers are changed.
1 parent 5773b7f commit 2d549a9

File tree

2 files changed

+79
-3
lines changed

2 files changed

+79
-3
lines changed

pyiceberg/io/pyarrow.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1492,14 +1492,18 @@ def _field_id(self, field: pa.Field) -> int:
14921492

14931493

14941494
def _get_column_projection_values(
1495-
file: DataFile, projected_schema: Schema, partition_spec: Optional[PartitionSpec], file_project_field_ids: Set[int]
1495+
file: DataFile,
1496+
projected_schema: Schema,
1497+
table_schema: Schema,
1498+
partition_spec: Optional[PartitionSpec],
1499+
file_project_field_ids: Set[int],
14961500
) -> Dict[int, Any]:
14971501
"""Apply Column Projection rules to File Schema."""
14981502
project_schema_diff = projected_schema.field_ids.difference(file_project_field_ids)
14991503
if len(project_schema_diff) == 0 or partition_spec is None:
15001504
return EMPTY_DICT
15011505

1502-
partition_schema = partition_spec.partition_type(projected_schema)
1506+
partition_schema = partition_spec.partition_type(table_schema)
15031507
accessors = build_position_accessors(partition_schema)
15041508

15051509
projected_missing_fields = {}
@@ -1517,6 +1521,7 @@ def _task_to_record_batches(
15171521
task: FileScanTask,
15181522
bound_row_filter: BooleanExpression,
15191523
projected_schema: Schema,
1524+
table_schema: Schema,
15201525
projected_field_ids: Set[int],
15211526
positional_deletes: Optional[List[ChunkedArray]],
15221527
case_sensitive: bool,
@@ -1541,7 +1546,7 @@ def _task_to_record_batches(
15411546

15421547
# Apply column projection rules: https://iceberg.apache.org/spec/#column-projection
15431548
projected_missing_fields = _get_column_projection_values(
1544-
task.file, projected_schema, partition_spec, file_schema.field_ids
1549+
task.file, projected_schema, table_schema, partition_spec, file_schema.field_ids
15451550
)
15461551

15471552
pyarrow_filter = None
@@ -1763,6 +1768,7 @@ def _record_batches_from_scan_tasks_and_deletes(
17631768
task,
17641769
self._bound_row_filter,
17651770
self._projected_schema,
1771+
self._table_metadata.schema(),
17661772
self._projected_field_ids,
17671773
deletes_per_file.get(task.file.file_path),
17681774
self._case_sensitive,

tests/io/test_pyarrow.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2846,6 +2846,7 @@ def test_task_to_record_batches_nanos(format_version: TableVersion, tmpdir: str)
28462846
FileScanTask(data_file),
28472847
bound_row_filter=AlwaysTrue(),
28482848
projected_schema=table_schema,
2849+
table_schema=table_schema,
28492850
projected_field_ids={1},
28502851
positional_deletes=None,
28512852
case_sensitive=True,
@@ -4590,3 +4591,72 @@ def test_orc_stripe_based_batching(tmp_path: Path) -> None:
45904591
# Verify total rows
45914592
total_rows = sum(batch.num_rows for batch in batches)
45924593
assert total_rows == 10000, f"Expected 10000 total rows, got {total_rows}"
4594+
4595+
4596+
def test_partition_column_projection_with_schema_evolution(catalog: InMemoryCatalog) -> None:
4597+
"""Test column projection on partitioned table after schema evolution (https://github.com/apache/iceberg-python/issues/2672)."""
4598+
initial_schema = Schema(
4599+
NestedField(1, "partition_date", DateType(), required=False),
4600+
NestedField(2, "id", IntegerType(), required=False),
4601+
NestedField(3, "name", StringType(), required=False),
4602+
NestedField(4, "value", IntegerType(), required=False),
4603+
)
4604+
4605+
partition_spec = PartitionSpec(
4606+
PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="partition_date"),
4607+
)
4608+
4609+
catalog.create_namespace("default")
4610+
table = catalog.create_table(
4611+
"default.test_schema_evolution_projection",
4612+
schema=initial_schema,
4613+
partition_spec=partition_spec,
4614+
)
4615+
4616+
data_v1 = pa.Table.from_pylist(
4617+
[
4618+
{"partition_date": date(2024, 1, 1), "id": 1, "name": "Alice", "value": 100},
4619+
{"partition_date": date(2024, 1, 1), "id": 2, "name": "Bob", "value": 200},
4620+
],
4621+
schema=pa.schema(
4622+
[
4623+
("partition_date", pa.date32()),
4624+
("id", pa.int32()),
4625+
("name", pa.string()),
4626+
("value", pa.int32()),
4627+
]
4628+
),
4629+
)
4630+
4631+
table.append(data_v1)
4632+
4633+
with table.update_schema() as update:
4634+
update.add_column("new_column", StringType())
4635+
4636+
table = catalog.load_table("default.test_schema_evolution_projection")
4637+
4638+
data_v2 = pa.Table.from_pylist(
4639+
[
4640+
{"partition_date": date(2024, 1, 2), "id": 3, "name": "Charlie", "value": 300, "new_column": "new1"},
4641+
{"partition_date": date(2024, 1, 2), "id": 4, "name": "David", "value": 400, "new_column": "new2"},
4642+
],
4643+
schema=pa.schema(
4644+
[
4645+
("partition_date", pa.date32()),
4646+
("id", pa.int32()),
4647+
("name", pa.string()),
4648+
("value", pa.int32()),
4649+
("new_column", pa.string()),
4650+
]
4651+
),
4652+
)
4653+
4654+
table.append(data_v2)
4655+
4656+
result = table.scan(selected_fields=("id", "name", "value", "new_column")).to_arrow()
4657+
4658+
assert set(result.schema.names) == {"id", "name", "value", "new_column"}
4659+
assert result.num_rows == 4
4660+
result_sorted = result.sort_by("name")
4661+
assert result_sorted["name"].to_pylist() == ["Alice", "Bob", "Charlie", "David"]
4662+
assert result_sorted["new_column"].to_pylist() == [None, None, "new1", "new2"]

0 commit comments

Comments
 (0)