ntkathole
Member

What this PR does / why we need it:

This PR introduces Ray as both a compute engine and an offline store in Feast. It enables scalable, distributed feature engineering and data access in Feast using Ray, making it easier to handle large datasets and complex feature pipelines.

Key Features

Ray Compute Engine

  • Distributed DAG execution for feature transformations, aggregations, joins, and materialization.
  • Automatic selection between broadcast and distributed join strategies.
  • Resource-aware partitioning and parallelism.
  • Efficient point-in-time joins for historical feature retrieval.
  • Seamless integration with Feast’s feature view and materialization APIs.
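
To make the point-in-time join bullet concrete, here is a minimal single-process sketch of the idea: for each entity row, pick the latest feature value whose event timestamp is not after the entity timestamp, optionally bounded by a TTL. The function name `point_in_time_join`, the tuple layout, and the `ttl` parameter are assumptions made for illustration; this is not the Ray implementation from the PR.

```python
from bisect import bisect_right
from datetime import datetime, timedelta


def point_in_time_join(entity_rows, feature_rows, ttl=None):
    """Hypothetical helper illustrating a point-in-time join.

    entity_rows:  iterable of (entity_id, entity_timestamp)
    feature_rows: iterable of (entity_id, event_timestamp, value)
    ttl:          optional timedelta; older feature values are ignored
    """
    # Group feature rows per entity, sorted by event timestamp.
    by_entity = {}
    for entity_id, ts, value in sorted(feature_rows, key=lambda r: (r[0], r[1])):
        times, values = by_entity.setdefault(entity_id, ([], []))
        times.append(ts)
        values.append(value)

    joined = []
    for entity_id, ts in entity_rows:
        times, values = by_entity.get(entity_id, ([], []))
        # Index of the latest feature row with event_timestamp <= ts.
        idx = bisect_right(times, ts) - 1
        value = None
        if idx >= 0 and (ttl is None or ts - times[idx] <= ttl):
            value = values[idx]
        joined.append((entity_id, ts, value))
    return joined


def day(n):
    # Small helper for readable example timestamps.
    return datetime(2025, 1, 1) + timedelta(days=n)


features = [(1, day(1), 0.5), (1, day(3), 0.9)]
entities = [(1, day(2)), (1, day(4)), (1, day(0)), (2, day(2))]
result = point_in_time_join(entities, features)
```

A distributed engine would partition this work by entity key, but the per-row rule (never read a feature value from the future) stays the same.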

Ray Offline Store

  • High-performance data reading and writing using Ray Datasets.
  • Support for local and distributed Ray clusters.
  • Batch writing and saved dataset persistence.
  • Configurable resource limits for safe local development and testing.

Documentation added for both Ray compute engine and offline store, including configuration, resource management, and usage examples.

@ntkathole ntkathole self-assigned this Jul 20, 2025
@ntkathole ntkathole requested a review from a team as a code owner July 20, 2025 14:37
@ntkathole ntkathole marked this pull request as draft July 21, 2025 05:18
@ntkathole ntkathole force-pushed the ray_compute branch 20 times, most recently from 8549556 to 76af065 Compare July 26, 2025 13:14
@ntkathole ntkathole marked this pull request as ready for review July 26, 2025 13:16

@Copilot Copilot AI left a comment

Pull Request Overview

This PR introduces Ray as both a compute engine and an offline store in Feast, enabling scalable, distributed feature engineering and data access for handling large datasets and complex feature pipelines. The implementation provides automatic join strategy selection, resource-aware partitioning, and seamless integration with Feast's existing APIs.

  • Added Ray compute engine with distributed DAG execution for feature transformations and materialization
  • Implemented Ray offline store with high-performance data reading/writing using Ray Datasets
  • Added comprehensive test coverage with unit and integration tests for both components
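
The automatic join strategy selection mentioned in the overview can be sketched roughly as follows. This is a simplified illustration, not the PR's code: the function names, the row-dict layout, and the 100 MB threshold are all hypothetical.

```python
def choose_join_strategy(feature_size_mb, broadcast_threshold_mb=100.0):
    """Hypothetical rule: broadcast small tables, shuffle large ones.

    The 100 MB default is an assumed value for illustration only.
    """
    if feature_size_mb <= broadcast_threshold_mb:
        return "broadcast"
    return "distributed"


def broadcast_join(partitions, small_table, key):
    """Join every partition against a full copy of the small table.

    partitions:  list of partitions, each a list of row dicts
    small_table: list of row dicts that fits in memory on each worker
    """
    # Build the lookup once; a real engine would ship it to each worker.
    lookup = {row[key]: row for row in small_table}
    return [
        [{**row, **lookup[row[key]]} for row in part if row[key] in lookup]
        for part in partitions
    ]


partitions = [
    [{"driver_id": 1, "trip": "a"}],
    [{"driver_id": 2, "trip": "b"}, {"driver_id": 3, "trip": "c"}],
]
features = [{"driver_id": 1, "rating": 4.8}, {"driver_id": 2, "rating": 4.1}]

strategy = choose_join_strategy(feature_size_mb=0.001)
joined = broadcast_join(partitions, features, key="driver_id")
```

The trade-off is that a broadcast join avoids shuffling the large side, at the cost of holding a copy of the small side on every worker.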

Reviewed Changes

Copilot reviewed 46 out of 48 changed files in this pull request and generated 5 comments.

| File | Description |
| --- | --- |
| setup.py | Adds Ray as a new optional dependency with version requirement |
| sdk/python/feast/repo_config.py | Registers Ray compute engine and offline store in configuration mappings |
| sdk/python/feast/infra/ray_shared_utils.py | Shared utilities for Ray data processing and timestamp handling |
| sdk/python/feast/infra/offline_stores/contrib/ray_offline_store/ | Complete Ray offline store implementation with optimization features |
| sdk/python/tests/unit/infra/compute_engines/ray_compute/ | Unit tests for Ray compute engine nodes and configuration |
| sdk/python/tests/integration/compute_engines/ray_compute/ | Integration tests for Ray compute engine functionality |
| sdk/python/feast/transformation/pandas_transformation.py | Ray serialization compatibility fixes for PandasTransformation |

Comments suppressed due to low confidence (1)

sdk/python/feast/infra/registry/caching_registry.py:429

  • [nitpick] Changing log levels from 'info' to 'debug' for registry refresh messages reduces observability. These messages may be important for debugging registry issues in production environments.
            logger.debug("Skipping refresh if already in progress")

Comment on lines +22 to +40
        udf: Optional[Callable[[Any], Any]] = None,
        udf_string: Optional[str] = None,
        name: Optional[str] = None,
        tags: Optional[dict[str, str]] = None,
        description: str = "",
        owner: str = "",
    ) -> "PandasTransformation":
        instance = super(PandasTransformation, cls).__new__(
            cls,
            mode=TransformationMode.PANDAS,
            udf=udf,
            name=name,
            udf_string=udf_string,
            tags=tags,
            description=description,
            owner=owner,
        # Handle Ray deserialization where parameters may not be provided
        if udf is None and udf_string is None:
            # Create a bare instance for deserialization
            instance = object.__new__(cls)
            return cast("PandasTransformation", instance)

        # Ensure required parameters are not None before calling parent constructor
        if udf is None:
            raise ValueError("udf parameter cannot be None")
        if udf_string is None:
            raise ValueError("udf_string parameter cannot be None")

Copilot AI Aug 10, 2025

Making previously required parameters optional (udf and udf_string) could break existing code that relies on these being required. Consider using a separate factory method for Ray deserialization instead of modifying the main constructor.
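
One way the suggested separate factory could look is sketched below, using the standard `pickle` module as a stand-in for Ray's serializer (an assumption; Ray's actual serialization path may differ). The class `TransformationSketch` and the helper `_rebuild` are hypothetical names, not Feast APIs. The constructor keeps both parameters required, and `__reduce__` routes deserialization through the factory instead of `__new__`.

```python
import pickle


def _rebuild(cls, state):
    """Hypothetical factory used only when deserializing."""
    instance = object.__new__(cls)  # bypasses the validating __new__
    instance.__dict__.update(state)
    return instance


class TransformationSketch:
    """Stand-in class; udf and udf_string remain required."""

    def __new__(cls, udf, udf_string):
        if udf is None:
            raise ValueError("udf parameter cannot be None")
        if udf_string is None:
            raise ValueError("udf_string parameter cannot be None")
        instance = super().__new__(cls)
        instance.udf = udf
        instance.udf_string = udf_string
        return instance

    def __reduce__(self):
        # Tell pickle to rebuild through the factory, not the constructor.
        return (_rebuild, (type(self), dict(self.__dict__)))


original = TransformationSketch(len, "len")
clone = pickle.loads(pickle.dumps(original))
```

With this shape, callers cannot construct an instance without `udf` and `udf_string`, while serialized copies are still restored on the receiving side.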

Copilot uses AI. Check for mistakes.

else 2
)
ctx.max_parallelism = self.available_cpus * multiplier
ctx.shuffle_strategy = "sort" # type: ignore

Copilot AI Aug 10, 2025

[nitpick] Using type: ignore comments should be avoided when possible. Consider using proper type annotations or handling the type checking issue explicitly.

Suggested change
- ctx.shuffle_strategy = "sort" # type: ignore
+ ctx.shuffle_strategy = cast(str, "sort")
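
For context on this suggestion: `typing.cast` does nothing at runtime and only tells the type checker what type to assume. Whether `cast(str, "sort")` would actually silence the checker here depends on the declared type of `shuffle_strategy`, which the thread does not show. A small sketch of the runtime behaviour:

```python
from typing import cast

strategy = "sort"

# cast returns its second argument unchanged; nothing is converted or checked.
same = cast(str, strategy)

# Even a "wrong" cast succeeds at runtime: the value is still the str "sort".
not_converted = cast(int, strategy)
```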

Comment on lines +151 to +152
ray_nodes.create_offline_store_retrieval_job = lambda **kwargs: mock_retrieval_job
result = node.execute(mock_context)

Copilot AI Aug 10, 2025

Monkey patching module functions in tests can lead to hard-to-debug issues. Consider using proper mocking with unittest.mock.patch or dependency injection instead.

Suggested change
- ray_nodes.create_offline_store_retrieval_job = lambda **kwargs: mock_retrieval_job
- result = node.execute(mock_context)
+ with patch(
+     "feast.infra.compute_engines.ray.nodes.create_offline_store_retrieval_job",
+     return_value=mock_retrieval_job,
+ ):
+     result = node.execute(mock_context)
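
The practical difference between the two approaches is that `patch` restores the original attribute when the `with` block exits, whereas a direct assignment leaks into later tests. The sketch below shows this on a throwaway module object; `nodes_sketch` and `execute` are hypothetical stand-ins, not the Feast module or node under test.

```python
import types
from unittest.mock import MagicMock, patch

# Throwaway module standing in for the real nodes module (hypothetical).
nodes = types.ModuleType("nodes_sketch")


def create_offline_store_retrieval_job(**kwargs):
    raise RuntimeError("would hit the real offline store")


nodes.create_offline_store_retrieval_job = create_offline_store_retrieval_job


def execute():
    # Looks the function up on the module at call time, so patching works.
    return nodes.create_offline_store_retrieval_job(feature_view="fv")


mock_retrieval_job = MagicMock(name="mock_retrieval_job")

with patch.object(
    nodes, "create_offline_store_retrieval_job", return_value=mock_retrieval_job
) as patched:
    result = execute()
    patched.assert_called_once_with(feature_view="fv")

# After the block, the original function is back in place.
restored = (
    nodes.create_offline_store_retrieval_job is create_offline_store_retrieval_job
)
```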

Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
timestamp_col = self.column_info.timestamp_column
requested_feats = getattr(self.column_info, "feature_cols", [])

# Check if the feature dataset contains aggregated features (from aggregation node)

nice

Member

@franciscojavierarceo franciscojavierarceo left a comment

This looks great. Some small nits, but I think we're almost there. Can you make some issues to track the follow-ups here so we don't forget, and I'll merge 👍

@@ -19,36 +19,61 @@
 class PandasTransformation(Transformation):
     def __new__(
         cls,
-        udf: Callable[[Any], Any],
-        udf_string: str,
+        udf: Optional[Callable[[Any], Any]] = None,

why make this optional?

Member Author

It's not optional in real use; we still maintain strict validation (lines 72-75 enforce that they can't actually be None). We have to make it optional to ensure that PandasTransformation objects can be properly reconstructed across Ray workers without constructor failures.
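
The constructor failure described here can be reproduced with the standard `pickle` module, used below as an assumed stand-in for Ray's serializer. By default, unpickling calls `cls.__new__(cls)` with no further arguments, so a `__new__` with required parameters fails, while one that tolerates missing arguments and returns a bare instance succeeds. `StrictSketch` and `LenientSketch` are hypothetical classes, not the Feast implementation.

```python
import pickle


class StrictSketch:
    """__new__ requires both arguments."""

    def __new__(cls, udf, udf_string):
        instance = super().__new__(cls)
        instance.udf = udf
        instance.udf_string = udf_string
        return instance


class LenientSketch:
    """Accepts no arguments for deserialization, validates otherwise."""

    def __new__(cls, udf=None, udf_string=None):
        if udf is None and udf_string is None:
            # Bare instance; pickle fills in the attributes afterwards.
            return object.__new__(cls)
        if udf is None:
            raise ValueError("udf parameter cannot be None")
        if udf_string is None:
            raise ValueError("udf_string parameter cannot be None")
        instance = super().__new__(cls)
        instance.udf = udf
        instance.udf_string = udf_string
        return instance


def round_trip(obj):
    return pickle.loads(pickle.dumps(obj))


try:
    round_trip(StrictSketch(len, "len"))
    strict_error = None
except TypeError as exc:
    # Unpickling called StrictSketch.__new__(cls) without udf/udf_string.
    strict_error = exc

restored = round_trip(LenientSketch(len, "len"))
```

One side effect of the lenient form is that `LenientSketch()` with no arguments also yields an empty instance for ordinary callers, which is the trade-off the review question is probing.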

Signed-off-by: ntkathole <nikhilkathole2683@gmail.com>
@franciscojavierarceo
Member

Thanks for this @ntkathole !!!

@franciscojavierarceo franciscojavierarceo merged commit 72de088 into feast-dev:master Aug 12, 2025
19 checks passed
franciscojavierarceo pushed a commit that referenced this pull request Aug 14, 2025
# [0.52.0](v0.51.0...v0.52.0) (2025-08-14)

### Bug Fixes

* Correct entity value type mapping for aliased feature views ([#5492](#5492)) ([bdf20bb](bdf20bb))
* Correct namespace reference in remote Feast project setup for operator upgrade and previous version tests ([df391ec](df391ec))
* dell pydantic v1 ([1189512](1189512))
* Fixed the entity to on-demand feature view relationship ([1c59bba](1c59bba))
* Make transformers optional ([#5544](#5544)) ([a4eef38](a4eef38))
* Push Source inherits the timestamp fields from Data Source ([#5550](#5550)) ([b7ea5cc](b7ea5cc))
* Remove the devcontainer folder. ([a9815c2](a9815c2))

### Features

* Added API for discovering Feature Views by popular tags ([#5558](#5558)) ([2e5f564](2e5f564))
* Added filtering support for featureView and featureServices api ([#5552](#5552)) ([897b3f3](897b3f3))
* Added global search api and necessary unit tests ([#5532](#5532)) ([dd3061f](dd3061f))
* Added Ray Compute Engine and Ray Offline Store Support ([#5526](#5526)) ([72de088](72de088))
* Added recent visit logging api for registry server ([#5545](#5545)) ([2adcf2c](2adcf2c))
* **auth:** support client-credentials & static token for OIDC client auth ([fc44222](fc44222))
* **auth:** support client-credentials & static token for OIDC client auth ([795fc06](795fc06))
* Implement and enhance remote document retrieval functionality ([#5487](#5487)) ([d095b96](d095b96))
* Implemented consistent error handling ([7f10151](7f10151))
* Offline Store historical features retrieval without entity df, but based on datatime range ([#5527](#5527)) ([df942b9](df942b9))

4 participants