-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat: Added Ray Compute Engine and Ray Offline Store Support #5526
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Added Ray Compute Engine and Ray Offline Store Support #5526
Conversation
8549556
to
76af065
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces Ray as both a compute engine and an offline store in Feast, enabling scalable, distributed feature engineering and data access for handling large datasets and complex feature pipelines. The implementation provides automatic join strategy selection, resource-aware partitioning, and seamless integration with Feast's existing APIs.
- Added Ray compute engine with distributed DAG execution for feature transformations and materialization
- Implemented Ray offline store with high-performance data reading/writing using Ray Datasets
- Added comprehensive test coverage with unit and integration tests for both components
Reviewed Changes
Copilot reviewed 46 out of 48 changed files in this pull request and generated 5 comments.
Show a summary per file
File | Description |
---|---|
setup.py | Adds Ray as a new optional dependency with version requirement |
sdk/python/feast/repo_config.py | Registers Ray compute engine and offline store in configuration mappings |
sdk/python/feast/infra/ray_shared_utils.py | Shared utilities for Ray data processing and timestamp handling |
sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ | Complete Ray offline store implementation with optimization features |
sdk/python/tests/unit/infra/compute_engines/ray_compute/ | Unit tests for Ray compute engine nodes and configuration |
sdk/python/tests/integration/compute_engines/ray_compute/ | Integration tests for Ray compute engine functionality |
sdk/python/feast/transformation/pandas_transformation.py | Ray serialization compatibility fixes for PandasTransformation |
Comments suppressed due to low confidence (1)
sdk/python/feast/infra/registry/caching_registry.py:429
- [nitpick] Changing log levels from 'info' to 'debug' for registry refresh messages reduces observability. These messages may be important for debugging registry issues in production environments.
logger.debug("Skipping refresh if already in progress")
udf: Optional[Callable[[Any], Any]] = None, | ||
udf_string: Optional[str] = None, | ||
name: Optional[str] = None, | ||
tags: Optional[dict[str, str]] = None, | ||
description: str = "", | ||
owner: str = "", | ||
) -> "PandasTransformation": | ||
instance = super(PandasTransformation, cls).__new__( | ||
cls, | ||
mode=TransformationMode.PANDAS, | ||
udf=udf, | ||
name=name, | ||
udf_string=udf_string, | ||
tags=tags, | ||
description=description, | ||
owner=owner, | ||
# Handle Ray deserialization where parameters may not be provided | ||
if udf is None and udf_string is None: | ||
# Create a bare instance for deserialization | ||
instance = object.__new__(cls) | ||
return cast("PandasTransformation", instance) | ||
|
||
# Ensure required parameters are not None before calling parent constructor | ||
if udf is None: | ||
raise ValueError("udf parameter cannot be None") | ||
if udf_string is None: | ||
raise ValueError("udf_string parameter cannot be None") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Making previously required parameters optional (udf and udf_string) could break existing code that relies on these being required. Consider using a separate factory method for Ray deserialization instead of modifying the main constructor.
Copilot uses AI. Check for mistakes.
else 2 | ||
) | ||
ctx.max_parallelism = self.available_cpus * multiplier | ||
ctx.shuffle_strategy = "sort" # type: ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Using type: ignore comments should be avoided when possible. Consider using proper type annotations or handling the type checking issue explicitly.
ctx.shuffle_strategy = "sort" # type: ignore | |
ctx.shuffle_strategy = cast(str, "sort") |
Copilot uses AI. Check for mistakes.
ray_nodes.create_offline_store_retrieval_job = lambda **kwargs: mock_retrieval_job | ||
result = node.execute(mock_context) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Monkey patching module functions in tests can lead to hard-to-debug issues. Consider using proper mocking with unittest.mock.patch or dependency injection instead.
ray_nodes.create_offline_store_retrieval_job = lambda **kwargs: mock_retrieval_job | |
result = node.execute(mock_context) | |
with patch( | |
"feast.infra.compute_engines.ray.nodes.create_offline_store_retrieval_job", | |
return_value=mock_retrieval_job, | |
): | |
result = node.execute(mock_context) |
Copilot uses AI. Check for mistakes.
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
timestamp_col = self.column_info.timestamp_column | ||
requested_feats = getattr(self.column_info, "feature_cols", []) | ||
|
||
# Check if the feature dataset contains aggregated features (from aggregation node) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice
sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ray.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks great some small nits but i think we're almost there. can you make some issues to track the follow up here so we don't forget and I'll merge 👍
@@ -19,36 +19,61 @@ | |||
class PandasTransformation(Transformation): | |||
def __new__( | |||
cls, | |||
udf: Callable[[Any], Any], | |||
udf_string: str, | |||
udf: Optional[Callable[[Any], Any]] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why make this optional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not optional in real use, we are still maintaining strict validation (lines 72-75 enforce that they can't actually be None). We have to make it optional to ensures that PandasTransformation objects can be properly reconstructed across Ray workers without constructor failures.
Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
Thanks for this @ntkathole !!! |
# [0.52.0](v0.51.0...v0.52.0) (2025-08-14) ### Bug Fixes * Correct entity value type mapping for aliased feature views ([#5492](#5492)) ([bdf20bb](bdf20bb)) * Correct namespace reference in remote Feast project setup for operator upgrade and previous version tests ([df391ec](df391ec)) * dell pydantic v1 ([1189512](1189512)) * Fixed the entity to on-demand feature view relationship ([1c59bba](1c59bba)) * Make transformers optional ([#5544](#5544)) ([a4eef38](a4eef38)) * Push Source inherits the timestamp fields from Data Source ([#5550](#5550)) ([b7ea5cc](b7ea5cc)) * Remove the devcontainer folder. ([a9815c2](a9815c2)) ### Features * Added API for discovering Feature Views by popular tags ([#5558](#5558)) ([2e5f564](2e5f564)) * Added filtering support for featureView and featureServices api ([#5552](#5552)) ([897b3f3](897b3f3)) * Added global search api and necessary unit tests ([#5532](#5532)) ([dd3061f](dd3061f)) * Added Ray Compute Engine and Ray Offline Store Support ([#5526](#5526)) ([72de088](72de088)) * Added recent visit logging api for registry server ([#5545](#5545)) ([2adcf2c](2adcf2c)) * **auth:** support client-credentials & static token for OIDC client auth ([fc44222](fc44222)) * **auth:** support client-credentials & static token for OIDC client auth ([795fc06](795fc06)) * Implement and enhance remote document retrieval functionality ([#5487](#5487)) ([d095b96](d095b96)) * Implemented consistent error handling ([7f10151](7f10151)) * Offline Store historical features retrieval without entity df, but based on datatime range ([#5527](#5527)) ([df942b9](df942b9))
What this PR does / why we need it:
This PR introduces Ray as both a compute engine and an offline store in Feast. It enables scalable, distributed feature engineering and data access in Feast using Ray, making it easier to handle large datasets and complex feature pipelines.
Key Features
Ray Compute Engine
Ray Offline Store
Documentation added for both Ray compute engine and offline store, including configuration, resource management, and usage examples.