-
Notifications
You must be signed in to change notification settings - Fork 6.8k
[Data] decrease parquet metadata storage usage #54821
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
4e5eb34
b1c9309
2c0ffd3
ed52615
51b698a
e016ea0
16ef5ae
403bdd1
ae59f0a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -150,18 +150,20 @@ def fetch_func(fragments): | |
**ray_remote_args, | ||
) | ||
) | ||
|
||
else: | ||
raw_metadata = _fetch_metadata(fragments) | ||
|
||
return _dedupe_metadata(raw_metadata) | ||
return _dedupe_fragment_metadata(raw_metadata) | ||
sword865 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
|
||
def _fetch_metadata_serialization_wrapper( | ||
fragments: List["SerializedFragment"], | ||
retry_match: Optional[List[str]], | ||
retry_max_attempts: int, | ||
retry_max_interval: int, | ||
) -> List["pyarrow.parquet.FileMetaData"]: | ||
) -> List["_ParquetFileFragmentMetaData"]: | ||
from ray.data._internal.datasource.parquet_datasource import ( | ||
_deserialize_fragments_with_retry, | ||
) | ||
|
@@ -209,14 +211,14 @@ def _fetch_metadata_serialization_wrapper( | |
|
||
def _fetch_metadata( | ||
fragments: List["pyarrow.dataset.ParquetFileFragment"], | ||
) -> List["pyarrow.parquet.FileMetaData"]: | ||
fragment_metadata = [] | ||
) -> List[_ParquetFileFragmentMetaData]: | ||
fragment_metadata: List["pyarrow.parquet.FileMetaData"] = [] | ||
for f in fragments: | ||
try: | ||
fragment_metadata.append(f.metadata) | ||
except AttributeError: | ||
break | ||
return fragment_metadata | ||
return _dedupe_metadata(fragment_metadata) | ||
bveeramani marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
def _dedupe_metadata( | ||
|
@@ -227,8 +229,8 @@ def _dedupe_metadata( | |
memory usage by only keeping unique schema objects across all | ||
file fragments. This method deduplicates the schemas and returns | ||
a list of `_ParquetFileFragmentMetaData` objects.""" | ||
schema_to_id = {} # schema_id -> serialized_schema | ||
id_to_schema = {} # serialized_schema -> schema_id | ||
schema_to_id = {} # schema_ser -> schema_id | ||
id_to_schema = {} # schema_id -> schema_ser | ||
stripped_metadatas = [] | ||
for fragment_metadata in raw_metadatas: | ||
stripped_md = _ParquetFileFragmentMetaData(fragment_metadata) | ||
|
@@ -240,8 +242,50 @@ def _dedupe_metadata( | |
id_to_schema[schema_id] = schema_ser | ||
stripped_md.set_schema_pickled(schema_ser) | ||
else: | ||
schema_id = schema_to_id.get(schema_ser) | ||
schema_id = schema_to_id[schema_ser] # Direct access instead of .get() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the advantage of using direct access rather than There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Provided that the condition evaluates to false and schema_ser is present in schema_to_id, employ direct access, which will be consistent with the subsequent statement: |
||
existing_schema_ser = id_to_schema[schema_id] | ||
stripped_md.set_schema_pickled(existing_schema_ser) | ||
stripped_metadatas.append(stripped_md) | ||
return stripped_metadatas | ||
|
||
def _dedupe_fragment_metadata( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this name is slightly misleading. Based on the name, I'd expect that this function removes duplicate metadata objects from the input list. But in reality, it's deduplicating equivalent but distinct schema objects (not metadata objects). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. refine function to _dedupe_schemas. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Separately, the implementation of this method seems almost identical to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Eliminate duplicate code in the two function. |
||
metadatas: List[_ParquetFileFragmentMetaData], | ||
) -> List[_ParquetFileFragmentMetaData]: | ||
"""Deduplicates schema objects across existing _ParquetFileFragmentMetaData objects. | ||
|
||
For datasets with a large number of columns, the pickled schema can be very large. | ||
This function reduces memory usage by ensuring that identical schemas across multiple | ||
fragment metadata objects reference the same underlying pickled schema object, | ||
rather than each fragment maintaining its own copy. | ||
|
||
Args: | ||
metadatas: List of _ParquetFileFragmentMetaData objects that already have | ||
pickled schemas set. | ||
|
||
Returns: | ||
The same list of _ParquetFileFragmentMetaData objects, but with duplicate | ||
schemas deduplicated to reference the same object in memory. | ||
""" | ||
schema_to_id = {} # schema_ser -> schema_id | ||
id_to_schema = {} # schema_id -> schema_ser | ||
stripped_metadatas = [] | ||
|
||
for metadata in metadatas: | ||
# Get the current schema serialization | ||
schema_ser = metadata.schema_pickled | ||
|
||
if schema_ser not in schema_to_id: | ||
# This is a new unique schema | ||
schema_id = len(schema_to_id) | ||
schema_to_id[schema_ser] = schema_id | ||
id_to_schema[schema_id] = schema_ser | ||
# No need to set schema_pickled - it already has the correct value | ||
else: | ||
# This schema already exists, reuse the existing one | ||
schema_id = schema_to_id[schema_ser] | ||
existing_schema_ser = id_to_schema[schema_id] | ||
metadata.set_schema_pickled(existing_schema_ser) | ||
|
||
stripped_metadatas.append(metadata) | ||
sword865 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
return stripped_metadatas |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we not want to dedupe metadata for this code path?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the else branch handles only a single task that is already deduplicated in _fetch_metadata, there is no need to perform deduplication again (which is only required for multi-task scenarios).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Preference is to dedupe schemas in both branches to avoid making assumptions about the implementation, I'm not strongly opinionated on this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could introduce redundant computation, so I suggest avoiding it at this point in the
else
branch.