diff --git a/python/ray/data/BUILD b/python/ray/data/BUILD
index 6b0083700fd0..dee4569718c8 100644
--- a/python/ray/data/BUILD
+++ b/python/ray/data/BUILD
@@ -1532,3 +1532,31 @@ py_test(
         "//:ray_lib",
     ],
 )
+
+py_test(
+    name = "test_fillna",
+    size = "small",
+    srcs = ["tests/test_fillna.py"],
+    tags = [
+        "exclusive",
+        "team:data",
+    ],
+    deps = [
+        ":conftest",
+        "//:ray_lib",
+    ],
+)
+
+py_test(
+    name = "test_dropna",
+    size = "small",
+    srcs = ["tests/test_dropna.py"],
+    tags = [
+        "exclusive",
+        "team:data",
+    ],
+    deps = [
+        ":conftest",
+        "//:ray_lib",
+    ],
+)
diff --git a/python/ray/data/_internal/logical/operators/dropna_operator.py b/python/ray/data/_internal/logical/operators/dropna_operator.py
new file mode 100644
index 000000000000..8d9cfb194cef
--- /dev/null
+++ b/python/ray/data/_internal/logical/operators/dropna_operator.py
@@ -0,0 +1,103 @@
+"""
+DropNa logical operator.
+
+This module defines the DropNa logical operator for removing rows with missing
+values from Ray datasets.
+"""
+
+from typing import Any, Dict, List, Optional
+
+from ray.data._internal.compute import ComputeStrategy
+from ray.data._internal.logical.interfaces import LogicalOperator
+from ray.data._internal.logical.operators.map_operator import AbstractMap
+
+
+class DropNa(AbstractMap):
+    """Logical operator for the dropna operation.
+
+    This operator represents the logical intent to remove rows containing
+    missing values (null/None/NaN) from a dataset based on specified criteria.
+
+    Examples:
+        .. testcode::
+
+            # Drop rows with any missing values
+            dropna_op = DropNa(input_op, how="any")
+
+            # Drop rows only if all values are missing
+            dropna_op = DropNa(input_op, how="all")
+
+            # Drop rows with missing values in specific columns
+            dropna_op = DropNa(input_op, subset=["col1", "col2"])
+
+            # Keep rows with at least 2 non-missing values
+            dropna_op = DropNa(input_op, thresh=2)
+
+    Args:
+        input_op: The input logical operator.
+        how: Determines which rows to drop. 'any' drops rows with any missing
+            values, 'all' drops rows where all values are missing.
+        subset: Optional list of column names to consider for missing values.
+            If None, all columns will be considered.
+        thresh: Optional minimum number of non-missing values required to keep
+            a row. If specified, overrides the 'how' parameter.
+        compute: Optional compute strategy for the operation.
+        ray_remote_args: Optional Ray remote arguments for distributed execution.
+    """
+
+    def __init__(
+        self,
+        input_op: LogicalOperator,
+        how: str = "any",
+        subset: Optional[List[str]] = None,
+        thresh: Optional[int] = None,
+        compute: Optional[ComputeStrategy] = None,
+        ray_remote_args: Optional[Dict[str, Any]] = None,
+    ):
+        super().__init__(
+            "DropNa",
+            input_op=input_op,
+            compute=compute,
+            ray_remote_args=ray_remote_args,
+        )
+        self._how = how
+        self._subset = subset
+        self._thresh = thresh
+        self._batch_format = "pyarrow"
+        self._zero_copy_batch = True
+
+    @property
+    def how(self) -> str:
+        """The strategy for determining which rows to drop.
+
+        Returns:
+            Either 'any' (drop if any column has missing values) or
+            'all' (drop only if all columns have missing values).
+        """
+        return self._how
+
+    @property
+    def subset(self) -> Optional[List[str]]:
+        """The subset of columns to consider for missing values.
+
+        Returns:
+            List of column names, or None if all columns should be considered.
+        """
+        return self._subset
+
+    @property
+    def thresh(self) -> Optional[int]:
+        """The minimum number of non-missing values required to keep a row.
+
+        Returns:
+            Integer threshold, or None if not using threshold-based dropping.
+        """
+        return self._thresh
+
+    def can_modify_num_rows(self) -> bool:
+        """Check if this operator can modify the number of rows.
+
+        Returns:
+            True, as dropna operations can remove rows from the dataset.
+        """
+        return True
diff --git a/python/ray/data/_internal/logical/operators/fillna_operator.py b/python/ray/data/_internal/logical/operators/fillna_operator.py
new file mode 100644
index 000000000000..bbc1cd0d44a6
--- /dev/null
+++ b/python/ray/data/_internal/logical/operators/fillna_operator.py
@@ -0,0 +1,87 @@
+"""
+FillNa logical operator.
+
+This module defines the FillNa logical operator for filling missing values
+in Ray datasets.
+"""
+
+from typing import Any, Dict, List, Optional, Union
+
+from ray.data._internal.compute import ComputeStrategy
+from ray.data._internal.logical.interfaces import LogicalOperator
+from ray.data._internal.logical.operators.map_operator import AbstractMap
+
+
+class FillNa(AbstractMap):
+    """Logical operator for the fillna operation.
+
+    This operator represents the logical intent to fill missing values
+    (null/None/NaN) in a dataset with specified replacement values.
+
+    Examples:
+        .. testcode::
+
+            # Fill all missing values with 0
+            fillna_op = FillNa(input_op, value=0)
+
+            # Fill with column-specific values
+            fillna_op = FillNa(input_op, value={"col1": 0, "col2": "missing"})
+
+            # Fill only specific columns
+            fillna_op = FillNa(input_op, value=0, subset=["col1", "col2"])
+
+    Args:
+        input_op: The input logical operator.
+        value: Value(s) to use for filling missing entries. Can be a scalar
+            value to fill all columns, or a dictionary mapping column names
+            to fill values for column-specific filling.
+        subset: Optional list of column names to restrict the filling operation.
+            If None, all columns will be processed.
+        compute: Optional compute strategy for the operation.
+        ray_remote_args: Optional Ray remote arguments for distributed execution.
+    """
+
+    def __init__(
+        self,
+        input_op: LogicalOperator,
+        value: Union[Any, Dict[str, Any]],
+        subset: Optional[List[str]] = None,
+        compute: Optional[ComputeStrategy] = None,
+        ray_remote_args: Optional[Dict[str, Any]] = None,
+    ):
+        super().__init__(
+            "FillNa",
+            input_op=input_op,
+            compute=compute,
+            ray_remote_args=ray_remote_args,
+        )
+        self._value = value
+        self._subset = subset
+        self._batch_format = "pyarrow"
+        self._zero_copy_batch = True
+
+    @property
+    def value(self) -> Union[Any, Dict[str, Any]]:
+        """The fill value(s) to use for replacing missing entries.
+
+        Returns:
+            Either a scalar value or a dictionary of column-specific values.
+        """
+        return self._value
+
+    @property
+    def subset(self) -> Optional[List[str]]:
+        """The subset of columns to apply the fill operation to.
+
+        Returns:
+            List of column names, or None if all columns should be processed.
+        """
+        return self._subset
+
+    def can_modify_num_rows(self) -> bool:
+        """Check if this operator can modify the number of rows.
+
+        Returns:
+            False, as fillna operations preserve the number of rows.
+        """
+        return False
+""" + +from typing import List, Optional + +import pyarrow as pa +import pyarrow.compute as pc + +from ray.data._internal.execution.interfaces import PhysicalOperator +from ray.data._internal.execution.operators.map_operator import MapOperator +from ray.data._internal.logical.operators.dropna_operator import DropNa +from ray.data._internal.planner.plan_udf_map_op import ( + _create_map_transformer_for_block_based_map_op, + _generate_transform_fn_for_map_block, + _try_wrap_udf_exception, + get_compute, +) +from ray.data.context import DataContext + + +def plan_dropna_op( + op: DropNa, + physical_children: List[PhysicalOperator], + data_context: DataContext, +) -> MapOperator: + """Plan a DropNa logical operator into a physical MapOperator. + + This function converts a DropNa logical operator into a physical execution + plan that removes rows containing missing values (null/None/NaN) from the + dataset based on the specified criteria. + + Args: + op: The DropNa logical operator containing drop configuration. + physical_children: List containing exactly one input physical operator. + data_context: The execution context for data processing. + + Returns: + A MapOperator that performs the dropna operation on input data. + + Raises: + AssertionError: If physical_children doesn't contain exactly one operator. + """ + assert len(physical_children) == 1 + input_physical_dag = physical_children[0] + + how = op.how + subset = op.subset + thresh = op.thresh + + def fn(batch: pa.Table) -> pa.Table: + """Transform function that drops rows with missing values from a PyArrow table. + + Args: + batch: Input PyArrow table to process. + + Returns: + PyArrow table with rows containing missing values removed. + """ + try: + if batch.num_rows == 0: + return batch + + # Determine which columns to check for missing values + if subset: + # Filter out any nonexistent columns from subset + available_columns = set(batch.schema.names) + columns_to_check = [col for col in subset if col in available_columns] + # If subset is provided but no valid columns found, return original batch + if not columns_to_check: + return batch + else: + columns_to_check = list(batch.schema.names) + + # Create mask for rows to keep + if thresh is not None: + # Count non-missing values per row + mask = _create_thresh_mask(batch, columns_to_check, thresh) + else: + # Create mask based on how parameter ("any" or "all") + mask = _create_how_mask(batch, columns_to_check, how) + + # Filter the table and preserve schema + return _filter_with_schema_preservation(batch, mask) + + except Exception as e: + _try_wrap_udf_exception(e, batch) + + compute = get_compute(op._compute) + transform_fn = _generate_transform_fn_for_map_block(fn) + map_transformer = _create_map_transformer_for_block_based_map_op( + transform_fn, + ) + + return MapOperator.create( + map_transformer, + input_physical_dag, + data_context, + name="DropNa", + compute_strategy=compute, + ray_remote_args=op._ray_remote_args, + ) + + +def _is_not_missing(column: pa.Array) -> pa.Array: + """Check if values in a column are not missing (not null and not NaN). + + Args: + column: The PyArrow array to check for missing values. + + Returns: + Boolean array indicating which values are not missing. + + Examples: + .. 
+    """
+    # Count non-missing values across the specified columns.
+    non_missing_counts: Optional[pa.Array] = None
+
+    for col_name in columns_to_check:
+        column = batch.column(col_name)
+        is_not_missing = _is_not_missing(column)
+        is_not_missing_int = pc.cast(is_not_missing, pa.int32())
+
+        if non_missing_counts is None:
+            non_missing_counts = is_not_missing_int
+        else:
+            non_missing_counts = pc.add(non_missing_counts, is_not_missing_int)
+
+    # Keep rows with at least `thresh` non-missing values.
+    return pc.greater_equal(non_missing_counts, pa.scalar(thresh))
+
+
+def _create_how_mask(
+    batch: pa.Table, columns_to_check: List[str], how: str
+) -> pa.Array:
+    """Create mask for how-based row dropping ('any' or 'all').
+
+    Args:
+        batch: The PyArrow table to analyze.
+        columns_to_check: List of column names to check for missing values.
+        how: Either 'any' (drop if any column has missing values) or
+            'all' (drop only if all columns have missing values).
+
+    Returns:
+        Boolean array indicating which rows to keep.
+
+    Raises:
+        ValueError: If the 'how' parameter is not 'any' or 'all'.
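+
+    Examples:
+        An illustrative doctest (sample values are made up for demonstration):
+
+        .. doctest::
+
+            >>> import pyarrow as pa
+            >>> batch = pa.table({"a": [1, None, 3], "b": [4, None, None]})
+            >>> _create_how_mask(batch, ["a", "b"], "any").to_pylist()
+            [True, False, False]
+            >>> _create_how_mask(batch, ["a", "b"], "all").to_pylist()
+            [True, False, True]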
+ """ + filtered_batch = pc.filter(batch, mask) + + # Ensure schema is preserved (important for empty results) + if filtered_batch.schema != batch.schema: + try: + filtered_batch = filtered_batch.cast(batch.schema) + except pa.ArrowInvalid: + # If casting fails, create empty table with original schema + if filtered_batch.num_rows == 0: + filtered_batch = pa.table( + { + name: pa.array([], type=field.type) + for name, field in zip(batch.schema.names, batch.schema) + } + ) + + return filtered_batch diff --git a/python/ray/data/_internal/planner/plan_fillna_op.py b/python/ray/data/_internal/planner/plan_fillna_op.py new file mode 100644 index 000000000000..74f2bee56de0 --- /dev/null +++ b/python/ray/data/_internal/planner/plan_fillna_op.py @@ -0,0 +1,164 @@ +""" +Plan FillNa logical operator. + +This module contains the planning logic for converting FillNa logical operators +into physical execution plans using MapOperator. +""" + +from typing import Any, Dict, List + +import pyarrow as pa +import pyarrow.compute as pc + +from ray.data._internal.execution.interfaces import PhysicalOperator +from ray.data._internal.execution.operators.map_operator import MapOperator +from ray.data._internal.logical.operators.fillna_operator import FillNa +from ray.data._internal.planner.plan_udf_map_op import ( + _create_map_transformer_for_block_based_map_op, + _generate_transform_fn_for_map_block, + _try_wrap_udf_exception, + get_compute, +) +from ray.data.context import DataContext + + +def plan_fillna_op( + op: FillNa, + physical_children: List[PhysicalOperator], + data_context: DataContext, +) -> MapOperator: + """Plan a FillNa logical operator into a physical MapOperator. + + This function converts a FillNa logical operator into a physical execution + plan that fills missing values (null/None/NaN) in the dataset with specified + replacement values. + + Args: + op: The FillNa logical operator containing fill configuration. + physical_children: List containing exactly one input physical operator. + data_context: The execution context for data processing. + + Returns: + A MapOperator that performs the fillna operation on input data. + + Raises: + AssertionError: If physical_children doesn't contain exactly one operator. + """ + assert len(physical_children) == 1 + input_physical_dag = physical_children[0] + + value = op.value + subset = op.subset + + def fn(batch: pa.Table) -> pa.Table: + """Transform function that fills missing values in a PyArrow table. + + Args: + batch: Input PyArrow table to process. + + Returns: + PyArrow table with missing values filled. 
+ """ + try: + if batch.num_rows == 0: + return batch + + # If no subset specified, apply to all columns + columns_to_fill = subset if subset else batch.schema.names + + # Create a new table with filled values + new_columns: Dict[str, pa.Array] = {} + + for col_name in batch.schema.names: + column = batch.column(col_name) + + if col_name in columns_to_fill: + if isinstance(value, dict): + # Column-specific fill values + fill_value = value.get(col_name) + if fill_value is not None: + new_columns[col_name] = _fill_column(column, fill_value) + else: + new_columns[col_name] = column + else: + # Scalar fill value for all columns + new_columns[col_name] = _fill_column(column, value) + else: + new_columns[col_name] = column + + return pa.table(new_columns) + + except Exception as e: + _try_wrap_udf_exception(e, batch) + + compute = get_compute(op._compute) + transform_fn = _generate_transform_fn_for_map_block(fn) + map_transformer = _create_map_transformer_for_block_based_map_op( + transform_fn, + ) + + return MapOperator.create( + map_transformer, + input_physical_dag, + data_context, + name="FillNa", + compute_strategy=compute, + ray_remote_args=op._ray_remote_args, + ) + + +def _fill_column(column: pa.Array, fill_value: Any) -> pa.Array: + """Fill missing values in a single PyArrow column. + + Handles null values and NaN values for floating point columns using PyArrow's + built-in capabilities with appropriate type handling and casting. + + Args: + column: The PyArrow array to fill missing values in. + fill_value: The value to use for filling missing entries. + + Returns: + A new PyArrow array with missing values filled. + + Examples: + .. doctest:: + + >>> import pyarrow as pa + >>> column = pa.array([1, None, 3]) + >>> filled = _fill_column(column, 0) + >>> filled.to_pylist() + [1, 0, 3] + """ + try: + # For null type columns, let PyArrow infer the type from the fill value + if pa.types.is_null(column.type): + # Create a new array with the fill value repeated for each row + fill_array = pa.array([fill_value] * len(column)) + return fill_array + + # For regular columns, try to create a scalar with the column's type + try: + fill_scalar = pa.scalar(fill_value, type=column.type) + except (pa.ArrowInvalid, pa.ArrowTypeError): + # If type conversion fails, let PyArrow handle it by inferring from value + fill_scalar = pa.scalar(fill_value) + # Try to cast to column type, but if it fails, PyArrow will handle the type promotion + try: + fill_scalar = fill_scalar.cast(column.type) + except (pa.ArrowInvalid, pa.ArrowNotImplementedError): + # If casting fails, use the inferred type + pass + + # Use PyArrow's fill_null to handle null values + filled_column = pc.fill_null(column, fill_scalar) + + # For floating point columns, also handle NaN values + if pa.types.is_floating(filled_column.type): + nan_mask = pc.is_nan(filled_column) + filled_column = pc.if_else(nan_mask, fill_scalar, filled_column) + + return filled_column + + except Exception: + # If all else fails, return original column to maintain data integrity + return column diff --git a/python/ray/data/_internal/planner/planner.py b/python/ray/data/_internal/planner/planner.py index 3b42c4ab7f91..0b2b1da4e13c 100644 --- a/python/ray/data/_internal/planner/planner.py +++ b/python/ray/data/_internal/planner/planner.py @@ -21,6 +21,8 @@ AbstractAllToAll, ) from ray.data._internal.logical.operators.count_operator import Count +from ray.data._internal.logical.operators.dropna_operator import DropNa +from 
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 8766434c425a..3b25d9dc9f60 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -66,6 +66,8 @@
     Sort,
 )
 from ray.data._internal.logical.operators.count_operator import Count
+from ray.data._internal.logical.operators.dropna_operator import DropNa
+from ray.data._internal.logical.operators.fillna_operator import FillNa
 from ray.data._internal.logical.operators.input_data_operator import InputData
 from ray.data._internal.logical.operators.join_operator import Join
 from ray.data._internal.logical.operators.map_operator import (
@@ -782,6 +784,204 @@ def _map_batches_without_batch_size_validation(
         logical_plan = LogicalPlan(map_batches_op, self.context)
         return Dataset(plan, logical_plan)
 
+    @PublicAPI(api_group=BT_API_GROUP)
+    def fillna(
+        self,
+        value: Union[Any, Dict[str, Any]] = None,
+        *,
+        subset: Optional[List[str]] = None,
+        compute: Union[str, ComputeStrategy] = None,
+        **ray_remote_args,
+    ) -> "Dataset":
+        """Fill missing values (null/NaN) in the dataset.
+
+        This method fills missing values with the specified value, similar to
+        ``pandas.DataFrame.fillna()`` and PySpark ``DataFrame.na.fill()``.
+
+        Examples:
+            Fill all missing values with a scalar:
+
+            .. testcode::
+
+                import ray
+                import numpy as np
+
+                ds = ray.data.from_items([
+                    {"a": 1, "b": 2.0},
+                    {"a": None, "b": np.nan},
+                    {"a": 3, "b": None}
+                ])
+                ds.fillna(0).show()
+
+            Fill missing values with column-specific values:
+
+            .. testcode::
+
+                import ray
+                import numpy as np
+
+                ds = ray.data.from_items([
+                    {"a": 1, "b": 2.0},
+                    {"a": None, "b": np.nan},
+                    {"a": 3, "b": None}
+                ])
+                ds.fillna({"a": 0, "b": -1.0}).show()
+
+            Fill missing values in specific columns:
+
+            .. testcode::
+
+                import ray
+                import numpy as np
+
+                ds = ray.data.from_items([
+                    {"a": 1, "b": 2.0, "c": "x"},
+                    {"a": None, "b": np.nan, "c": None},
+                    {"a": 3, "b": None, "c": "z"}
+                ])
+                ds.fillna(0, subset=["a"]).show()
+
+        Time complexity: O(dataset size / parallelism)
+
+        Args:
+            value: Value to use for filling missing values. Can be a scalar value
+                to fill all missing values, or a dictionary mapping column names
+                to fill values for specific columns.
+            subset: List of column names to consider for filling. If None, all
+                columns are considered.
+            compute: This argument is deprecated. Use ``ray_remote_args`` to
+                configure distributed execution.
+            ray_remote_args: Additional resource requirements to request from
+                Ray (e.g., ``num_gpus=1`` to request GPUs for the map tasks). See
+                :func:`ray.remote` for details.
+
+        Returns:
+            A new dataset with missing values filled.
+
+        Raises:
+            ValueError: If ``value`` is None, since there is nothing to fill with.
+        """
+        if value is None:
+            raise ValueError(
+                "fillna() requires a fill `value`; specify a scalar or a "
+                "dict mapping column names to fill values."
+            )
+
+        plan = self._plan.copy()
+        fillna_op = FillNa(
+            self._logical_plan.dag,
+            value=value,
+            subset=subset,
+            compute=compute,
+            ray_remote_args=ray_remote_args,
+        )
+        logical_plan = LogicalPlan(fillna_op, self.context)
+        return Dataset(plan, logical_plan)
+    @PublicAPI(api_group=BT_API_GROUP)
+    def dropna(
+        self,
+        *,
+        how: str = "any",
+        subset: Optional[List[str]] = None,
+        thresh: Optional[int] = None,
+        compute: Union[str, ComputeStrategy] = None,
+        **ray_remote_args,
+    ) -> "Dataset":
+        """Drop rows with missing values (null/NaN) from the dataset.
+
+        This method removes rows containing missing values, similar to
+        ``pandas.DataFrame.dropna()`` and PySpark ``DataFrame.na.drop()``.
+
+        Examples:
+            Drop rows with any missing values:
+
+            .. testcode::
+
+                import ray
+                import numpy as np
+
+                ds = ray.data.from_items([
+                    {"a": 1, "b": 2.0},
+                    {"a": None, "b": 3.0},
+                    {"a": 2, "b": np.nan},
+                    {"a": 3, "b": 4.0}
+                ])
+                ds.dropna().show()
+
+            Drop rows where all values are missing:
+
+            .. testcode::
+
+                import ray
+                import numpy as np
+
+                ds = ray.data.from_items([
+                    {"a": 1, "b": 2.0},
+                    {"a": None, "b": None},
+                    {"a": 2, "b": np.nan},
+                    {"a": 3, "b": 4.0}
+                ])
+                ds.dropna(how="all").show()
+
+            Drop rows with missing values in specific columns:
+
+            .. testcode::
+
+                import ray
+                import numpy as np
+
+                ds = ray.data.from_items([
+                    {"a": 1, "b": 2.0, "c": "x"},
+                    {"a": None, "b": 3.0, "c": "y"},
+                    {"a": 2, "b": np.nan, "c": "z"},
+                    {"a": 3, "b": 4.0, "c": None}
+                ])
+                ds.dropna(subset=["a", "b"]).show()
+
+            Drop rows with fewer than a threshold of non-null values:
+
+            .. testcode::
+
+                import ray
+                import numpy as np
+
+                ds = ray.data.from_items([
+                    {"a": 1, "b": 2.0, "c": "x"},
+                    {"a": None, "b": None, "c": "y"},
+                    {"a": 2, "b": np.nan, "c": None},
+                    {"a": 3, "b": 4.0, "c": None}
+                ])
+                ds.dropna(thresh=2).show()
+
+        Time complexity: O(dataset size / parallelism)
+
+        Args:
+            how: Determines how to drop rows. "any" drops rows where any value
+                is missing (the default); "all" drops rows where all values
+                are missing.
+            subset: List of column names to consider for dropping. If None, all
+                columns are considered.
+            thresh: Minimum number of non-null values required to keep a row.
+                If specified, this overrides the ``how`` parameter.
+            compute: This argument is deprecated. Use ``ray_remote_args`` to
+                configure distributed execution.
+            ray_remote_args: Additional resource requirements to request from
+                Ray (e.g., ``num_gpus=1`` to request GPUs for the map tasks). See
+                :func:`ray.remote` for details.
+
+        Returns:
+            A new dataset with rows containing missing values removed.
+
+        Raises:
+            ValueError: If ``how`` is not "any" or "all".
+        """
+        if how not in ("any", "all"):
+            raise ValueError("'how' must be 'any' or 'all'")
+
+        plan = self._plan.copy()
+        dropna_op = DropNa(
+            self._logical_plan.dag,
+            how=how,
+            subset=subset,
+            thresh=thresh,
+            compute=compute,
+            ray_remote_args=ray_remote_args,
+        )
+        logical_plan = LogicalPlan(dropna_op, self.context)
+        return Dataset(plan, logical_plan)
+
     @PublicAPI(api_group=EXPRESSION_API_GROUP, stability="alpha")
     def with_column(self, column_name: str, expr: Expr, **ray_remote_args) -> "Dataset":
         """
+ """ + if how not in ["any", "all"]: + raise ValueError("'how' must be 'any' or 'all'") + + plan = self._plan.copy() + dropna_op = DropNa( + self._logical_plan.dag, + how=how, + subset=subset, + thresh=thresh, + compute=compute, + ray_remote_args=ray_remote_args, + ) + logical_plan = LogicalPlan(dropna_op, self.context) + return Dataset(plan, logical_plan) + @PublicAPI(api_group=EXPRESSION_API_GROUP, stability="alpha") def with_column(self, column_name: str, expr: Expr, **ray_remote_args) -> "Dataset": """ diff --git a/python/ray/data/tests/test_dropna.py b/python/ray/data/tests/test_dropna.py new file mode 100644 index 000000000000..527f349dc567 --- /dev/null +++ b/python/ray/data/tests/test_dropna.py @@ -0,0 +1,197 @@ +import numpy as np +import pandas as pd +import pyarrow as pa +import pytest + +import ray +from ray.data.tests.conftest import * # noqa + + +def test_dropna_any(): + """Test dropna with how='any' drops rows with any missing values.""" + ds = ray.data.from_items( + [ + {"a": 1, "b": 2.0, "c": "x"}, + {"a": None, "b": 3.0, "c": "y"}, + {"a": 2, "b": np.nan, "c": "z"}, + {"a": 3, "b": 4.0, "c": None}, + {"a": 4, "b": 5.0, "c": "w"}, + ] + ) + + result = ds.dropna(how="any") + rows = result.take_all() + + expected = [{"a": 1, "b": 2.0, "c": "x"}, {"a": 4, "b": 5.0, "c": "w"}] + + assert rows == expected + + +def test_dropna_all(): + """Test dropna with how='all' drops rows where all values are missing.""" + ds = ray.data.from_items( + [ + {"a": 1, "b": 2.0, "c": "x"}, + {"a": None, "b": None, "c": None}, + {"a": 2, "b": np.nan, "c": "z"}, + {"a": None, "b": 4.0, "c": None}, + {"a": 4, "b": 5.0, "c": "w"}, + ] + ) + + result = ds.dropna(how="all") + rows = result.take_all() + + expected = [ + {"a": 1, "b": 2.0, "c": "x"}, + {"a": 2, "b": np.nan, "c": "z"}, + {"a": None, "b": 4.0, "c": None}, + {"a": 4, "b": 5.0, "c": "w"}, + ] + + # Compare while handling NaN/None values + assert len(rows) == len(expected) + for i, (actual, exp) in enumerate(zip(rows, expected)): + for key in exp.keys(): + if pd.isna(exp[key]): + assert pd.isna(actual[key]) + else: + assert actual[key] == exp[key] + + +def test_dropna_subset(): + """Test dropna with subset parameter to consider only specified columns.""" + ds = ray.data.from_items( + [ + {"a": 1, "b": 2.0, "c": "x"}, + {"a": None, "b": 3.0, "c": "y"}, + {"a": 2, "b": np.nan, "c": "z"}, + {"a": 3, "b": 4.0, "c": None}, + {"a": 4, "b": 5.0, "c": "w"}, + ] + ) + + result = ds.dropna(subset=["a", "b"]) + rows = result.take_all() + + expected = [ + {"a": 1, "b": 2.0, "c": "x"}, + {"a": 3, "b": 4.0, "c": None}, + {"a": 4, "b": 5.0, "c": "w"}, + ] + + assert rows == expected + + +def test_dropna_thresh(): + """Test dropna with thresh parameter for minimum non-null values.""" + ds = ray.data.from_items( + [ + {"a": 1, "b": 2.0, "c": "x"}, # 3 non-null + {"a": None, "b": 3.0, "c": "y"}, # 2 non-null + {"a": 2, "b": np.nan, "c": None}, # 1 non-null + {"a": None, "b": None, "c": None}, # 0 non-null + {"a": 4, "b": 5.0, "c": "w"}, # 3 non-null + ] + ) + + result = ds.dropna(thresh=2) + rows = result.take_all() + + expected = [ + {"a": 1, "b": 2.0, "c": "x"}, + {"a": None, "b": 3.0, "c": "y"}, + {"a": 4, "b": 5.0, "c": "w"}, + ] + + # Compare while handling None values + assert len(rows) == len(expected) + for i, (actual, exp) in enumerate(zip(rows, expected)): + for key in exp.keys(): + if exp[key] is None: + assert actual[key] is None + else: + assert actual[key] == exp[key] + + +def test_dropna_thresh_with_subset(): + """Test dropna with thresh 
+
+
+if __name__ == "__main__":
+    import sys
+
+    sys.exit(pytest.main([__file__]))
diff --git a/python/ray/data/tests/test_fillna.py b/python/ray/data/tests/test_fillna.py
new file mode 100644
index 000000000000..d340ce61b1e6
--- /dev/null
+++ b/python/ray/data/tests/test_fillna.py
@@ -0,0 +1,148 @@
+import numpy as np
+import pandas as pd
+import pyarrow as pa
+import pytest
+
+import ray
+from ray.data.tests.conftest import *  # noqa
+
+
+def test_fillna_scalar_value():
+    """Test fillna with a scalar value fills all missing values."""
+    ds = ray.data.from_items(
+        [
+            {"a": 1, "b": 2.0, "c": "x"},
+            {"a": None, "b": np.nan, "c": None},
+            {"a": 3, "b": None, "c": "z"},
+            {"a": 4, "b": 5.0, "c": "w"},
+        ]
+    )
+
+    result = ds.fillna(0)
+    rows = result.take_all()
+
+    expected = [
+        {"a": 1, "b": 2.0, "c": "x"},
+        {"a": 0, "b": 0.0, "c": "0"},
+        {"a": 3, "b": 0.0, "c": "z"},
+        {"a": 4, "b": 5.0, "c": "w"},
+    ]
+
+    assert rows == expected
+
+
+def test_fillna_dict_values():
+    """Test fillna with dictionary values for column-specific filling."""
+    ds = ray.data.from_items(
+        [
+            {"a": 1, "b": 2.0, "c": "x"},
+            {"a": None, "b": np.nan, "c": None},
+            {"a": 3, "b": None, "c": "z"},
+            {"a": None, "b": 4.0, "c": None},
+        ]
+    )
+
+    result = ds.fillna({"a": -1, "b": -2.0, "c": "missing"})
+    rows = result.take_all()
+
+    expected = [
+        {"a": 1, "b": 2.0, "c": "x"},
+        {"a": -1, "b": -2.0, "c": "missing"},
+        {"a": 3, "b": -2.0, "c": "z"},
+        {"a": -1, "b": 4.0, "c": "missing"},
+    ]
+
+    assert rows == expected
+
+
+def test_fillna_with_subset():
+    """Test fillna with subset parameter to fill only specified columns."""
+    ds = ray.data.from_items(
+        [
+            {"a": 1, "b": 2.0, "c": "x"},
+            {"a": None, "b": np.nan, "c": None},
+            {"a": 3, "b": None, "c": "z"},
+            {"a": None, "b": 4.0, "c": None},
+        ]
+    )
+
+    result = ds.fillna(0, subset=["a"])
+    rows = result.take_all()
+
+    expected = [
+        {"a": 1, "b": 2.0, "c": "x"},
+        {"a": 0, "b": np.nan, "c": None},
+        {"a": 3, "b": None, "c": "z"},
+        {"a": 0, "b": 4.0, "c": None},
+    ]
+
+    # Compare while handling NaN values.
+    assert len(rows) == len(expected)
+    for actual, exp in zip(rows, expected):
+        assert actual["a"] == exp["a"]
+        assert actual["c"] == exp["c"]
+        if pd.isna(exp["b"]):
+            assert pd.isna(actual["b"])
+        else:
+            assert actual["b"] == exp["b"]
+
+
+def test_fillna_empty_dataset():
+    """Test fillna on an empty dataset."""
+    schema = pa.schema([("a", pa.int64()), ("b", pa.float64()), ("c", pa.string())])
+    ds = ray.data.from_arrow(pa.table({"a": [], "b": [], "c": []}, schema=schema))
+
+    result = ds.fillna(0)
+    assert result.count() == 0
+
+
+def test_fillna_no_missing_values():
+    """Test fillna on a dataset with no missing values."""
+    ds = ray.data.from_items(
+        [
+            {"a": 1, "b": 2.0, "c": "x"},
+            {"a": 2, "b": 3.0, "c": "y"},
+            {"a": 3, "b": 4.0, "c": "z"},
+        ]
+    )
+
+    result = ds.fillna(0)
+    rows = result.take_all()
+
+    expected = [
+        {"a": 1, "b": 2.0, "c": "x"},
+        {"a": 2, "b": 3.0, "c": "y"},
+        {"a": 3, "b": 4.0, "c": "z"},
+    ]
+
+    assert rows == expected
+
+
+def test_fillna_different_dtypes():
+    """Test fillna with different data types."""
+    ds = ray.data.from_items(
+        [
+            {"int_col": 1, "float_col": 1.5, "str_col": "a", "bool_col": True},
+            {"int_col": None, "float_col": np.nan, "str_col": None, "bool_col": None},
+            {"int_col": 3, "float_col": None, "str_col": "c", "bool_col": False},
+        ]
+    )
+
+    result = ds.fillna(
+        {"int_col": 0, "float_col": 0.0, "str_col": "missing", "bool_col": False}
+    )
+    rows = result.take_all()
+
+    expected = [
+        {"int_col": 1, "float_col": 1.5, "str_col": "a", "bool_col": True},
+        {"int_col": 0, "float_col": 0.0, "str_col": "missing", "bool_col": False},
+        {"int_col": 3, "float_col": 0.0, "str_col": "c", "bool_col": False},
+    ]
+
+    assert rows == expected
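+
+
+def test_fillna_subset_with_missing_column():
+    """Test that subset entries naming nonexistent columns are ignored.
+
+    A small additional check (sketch) for the subset-handling path; the
+    column name "nonexistent" is an arbitrary placeholder.
+    """
+    ds = ray.data.from_items([{"a": 1}, {"a": None}])
+
+    result = ds.fillna(0, subset=["a", "nonexistent"])
+    rows = result.take_all()
+
+    assert rows == [{"a": 1}, {"a": 0}]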
np.nan, "c": None}, + {"a": 3, "b": None, "c": "z"}, + {"a": None, "b": 4.0, "c": None}, + ] + ) + + result = ds.fillna(0, subset=["a"]) + rows = result.take_all() + + expected = [ + {"a": 1, "b": 2.0, "c": "x"}, + {"a": 0, "b": np.nan, "c": None}, + {"a": 3, "b": None, "c": "z"}, + {"a": 0, "b": 4.0, "c": None}, + ] + + # Compare while handling NaN values + assert len(rows) == len(expected) + for i, (actual, exp) in enumerate(zip(rows, expected)): + assert actual["a"] == exp["a"] + assert actual["c"] == exp["c"] + if pd.isna(exp["b"]): + assert pd.isna(actual["b"]) + else: + assert actual["b"] == exp["b"] + + +def test_fillna_empty_dataset(): + """Test fillna on empty dataset.""" + schema = pa.schema([("a", pa.int64()), ("b", pa.float64()), ("c", pa.string())]) + ds = ray.data.from_arrow(pa.table({"a": [], "b": [], "c": []}, schema=schema)) + + result = ds.fillna(0) + assert result.count() == 0 + + +def test_fillna_no_missing_values(): + """Test fillna on dataset with no missing values.""" + ds = ray.data.from_items( + [ + {"a": 1, "b": 2.0, "c": "x"}, + {"a": 2, "b": 3.0, "c": "y"}, + {"a": 3, "b": 4.0, "c": "z"}, + ] + ) + + result = ds.fillna(0) + rows = result.take_all() + + expected = [ + {"a": 1, "b": 2.0, "c": "x"}, + {"a": 2, "b": 3.0, "c": "y"}, + {"a": 3, "b": 4.0, "c": "z"}, + ] + + assert rows == expected + + +def test_fillna_different_dtypes(): + """Test fillna with different data types.""" + ds = ray.data.from_items( + [ + {"int_col": 1, "float_col": 1.5, "str_col": "a", "bool_col": True}, + {"int_col": None, "float_col": np.nan, "str_col": None, "bool_col": None}, + {"int_col": 3, "float_col": None, "str_col": "c", "bool_col": False}, + ] + ) + + result = ds.fillna( + {"int_col": 0, "float_col": 0.0, "str_col": "missing", "bool_col": False} + ) + rows = result.take_all() + + expected = [ + {"int_col": 1, "float_col": 1.5, "str_col": "a", "bool_col": True}, + {"int_col": 0, "float_col": 0.0, "str_col": "missing", "bool_col": False}, + {"int_col": 3, "float_col": 0.0, "str_col": "c", "bool_col": False}, + ] + + assert rows == expected + + +if __name__ == "__main__": + import sys + + sys.exit(pytest.main([__file__]))