Skip to content
60 changes: 52 additions & 8 deletions python/ray/data/datasource/parquet_meta_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,20 @@ def fetch_func(fragments):
**ray_remote_args,
)
)

else:
raw_metadata = _fetch_metadata(fragments)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not want to dedupe metadata for this code path?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The else branch runs only a single task, and that task's metadata is already deduplicated inside _fetch_metadata, so there is no need to deduplicate again here (a second pass is only required in the multi-task case).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. My preference is to dedupe schemas in both branches to avoid making assumptions about the implementation, but I'm not strongly opinionated on this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could introduce redundant computation, so I suggest avoiding it at this point in the else branch.


return _dedupe_metadata(raw_metadata)
return _dedupe_fragment_metadata(raw_metadata)



def _fetch_metadata_serialization_wrapper(
fragments: List["SerializedFragment"],
retry_match: Optional[List[str]],
retry_max_attempts: int,
retry_max_interval: int,
) -> List["pyarrow.parquet.FileMetaData"]:
) -> List["_ParquetFileFragmentMetaData"]:
from ray.data._internal.datasource.parquet_datasource import (
_deserialize_fragments_with_retry,
)
Expand Down Expand Up @@ -209,14 +211,14 @@ def _fetch_metadata_serialization_wrapper(

def _fetch_metadata(
fragments: List["pyarrow.dataset.ParquetFileFragment"],
) -> List["pyarrow.parquet.FileMetaData"]:
fragment_metadata = []
) -> List[_ParquetFileFragmentMetaData]:
fragment_metadata: List["pyarrow.parquet.FileMetaData"] = []
for f in fragments:
try:
fragment_metadata.append(f.metadata)
except AttributeError:
break
return fragment_metadata
return _dedupe_metadata(fragment_metadata)


def _dedupe_metadata(
Expand All @@ -227,8 +229,8 @@ def _dedupe_metadata(
memory usage by only keeping unique schema objects across all
file fragments. This method deduplicates the schemas and returns
a list of `_ParquetFileFragmentMetaData` objects."""
schema_to_id = {} # schema_id -> serialized_schema
id_to_schema = {} # serialized_schema -> schema_id
schema_to_id = {} # schema_ser -> schema_id
id_to_schema = {} # schema_id -> schema_ser
stripped_metadatas = []
for fragment_metadata in raw_metadatas:
stripped_md = _ParquetFileFragmentMetaData(fragment_metadata)
Expand All @@ -240,8 +242,50 @@ def _dedupe_metadata(
id_to_schema[schema_id] = schema_ser
stripped_md.set_schema_pickled(schema_ser)
else:
schema_id = schema_to_id.get(schema_ser)
schema_id = schema_to_id[schema_ser] # Direct access instead of .get()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the advantage of using direct access rather than get here?

Copy link
Contributor Author

@sword865 sword865 Aug 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the else branch the condition has evaluated to false, so schema_ser is guaranteed to be present in schema_to_id. Direct access is therefore safe, and it is consistent with the subsequent statement: existing_schema_ser = id_to_schema[schema_id]

existing_schema_ser = id_to_schema[schema_id]
stripped_md.set_schema_pickled(existing_schema_ser)
stripped_metadatas.append(stripped_md)
return stripped_metadatas

def _dedupe_fragment_metadata(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this name is slightly misleading. Based on the name, I'd expect that this function removes duplicate metadata objects from the input list. But in reality, it's deduplicating equivalent but distinct schema objects (not metadata objects).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refined the function and renamed it to _dedupe_schemas.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separately, the implementation of this method seems almost identical to _dedupe_metadata, and I don't think the distinction between the functions is obvious. Do we need both?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eliminated the duplicate code in the two functions.

metadatas: List[_ParquetFileFragmentMetaData],
) -> List[_ParquetFileFragmentMetaData]:
"""Deduplicates schema objects across existing _ParquetFileFragmentMetaData objects.

For datasets with a large number of columns, the pickled schema can be very large.
This function reduces memory usage by ensuring that identical schemas across multiple
fragment metadata objects reference the same underlying pickled schema object,
rather than each fragment maintaining its own copy.

Args:
metadatas: List of _ParquetFileFragmentMetaData objects that already have
pickled schemas set.

Returns:
The same list of _ParquetFileFragmentMetaData objects, but with duplicate
schemas deduplicated to reference the same object in memory.
"""
schema_to_id = {} # schema_ser -> schema_id
id_to_schema = {} # schema_id -> schema_ser
stripped_metadatas = []

for metadata in metadatas:
# Get the current schema serialization
schema_ser = metadata.schema_pickled

if schema_ser not in schema_to_id:
# This is a new unique schema
schema_id = len(schema_to_id)
schema_to_id[schema_ser] = schema_id
id_to_schema[schema_id] = schema_ser
# No need to set schema_pickled - it already has the correct value
else:
# This schema already exists, reuse the existing one
schema_id = schema_to_id[schema_ser]
existing_schema_ser = id_to_schema[schema_id]
metadata.set_schema_pickled(existing_schema_ser)

stripped_metadatas.append(metadata)

return stripped_metadatas