Skip to content

[Data] Add fillna and dropna functions #54844

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions python/ray/data/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1503,3 +1503,31 @@ py_test(
"//:ray_lib",
],
)

# Unit tests for the fillna operation (srcs: tests/test_fillna.py).
py_test(
    name = "test_fillna",
    size = "small",
    srcs = ["tests/test_fillna.py"],
    tags = [
        "exclusive",
        "team:data",
    ],
    deps = [
        ":conftest",
        "//:ray_lib",
    ],
)

# Unit tests for the dropna operation (srcs: tests/test_dropna.py).
py_test(
    name = "test_dropna",
    size = "small",
    srcs = ["tests/test_dropna.py"],
    tags = [
        "exclusive",
        "team:data",
    ],
    deps = [
        ":conftest",
        "//:ray_lib",
    ],
)
74 changes: 74 additions & 0 deletions python/ray/data/_internal/logical/operators/map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,3 +360,77 @@ def target_num_rows_per_block(self) -> int:

def can_modify_num_rows(self) -> bool:
return False


class FillNa(AbstractMap):
    """Logical operator for the ``fillna`` operation.

    Replaces null values with ``value`` in the columns named by ``subset``
    (every column when ``subset`` is ``None``).
    """

    def __init__(
        self,
        input_op: LogicalOperator,
        value: Any,
        subset: Optional[List[str]] = None,
        compute: Optional[ComputeStrategy] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ):
        super().__init__(
            "FillNa",
            input_op=input_op,
            compute=compute,
            ray_remote_args=ray_remote_args,
        )
        # The planner materializes this operator as a zero-copy pyarrow
        # batch transform, so pin the batch format here.
        self._batch_format = "pyarrow"
        self._zero_copy_batch = True
        self._value = value
        self._subset = subset

    @property
    def value(self) -> Any:
        """Fill value: a single scalar, or a per-column ``{name: value}`` dict."""
        return self._value

    @property
    def subset(self) -> Optional[List[str]]:
        """Columns to fill; ``None`` means all columns."""
        return self._subset

    def can_modify_num_rows(self) -> bool:
        # Filling nulls never adds or removes rows.
        return False


class DropNa(AbstractMap):
    """Logical operator for the ``dropna`` operation.

    Drops rows containing null values with pandas-like semantics:
    ``how="any"`` drops a row if any checked column is null,
    ``how="all"`` only if every checked column is null. When ``thresh``
    is given it takes precedence over ``how`` and keeps rows with at
    least ``thresh`` non-null values among the checked columns.

    Raises:
        ValueError: If ``how`` is not ``"any"`` or ``"all"``.
    """

    def __init__(
        self,
        input_op: LogicalOperator,
        how: str = "any",
        subset: Optional[List[str]] = None,
        thresh: Optional[int] = None,
        compute: Optional[ComputeStrategy] = None,
        ray_remote_args: Optional[Dict[str, Any]] = None,
    ):
        # Fail fast here: an invalid `how` would otherwise only surface
        # later inside a remote map task with an obscure error.
        if how not in ("any", "all"):
            raise ValueError(f"`how` must be 'any' or 'all', but got {how!r}.")
        super().__init__(
            "DropNa",
            input_op=input_op,
            compute=compute,
            ray_remote_args=ray_remote_args,
        )
        self._how = how
        self._subset = subset
        self._thresh = thresh
        # Planned as a zero-copy pyarrow batch transform.
        self._batch_format = "pyarrow"
        self._zero_copy_batch = True

    @property
    def how(self) -> str:
        """Drop criterion: ``"any"`` or ``"all"``."""
        return self._how

    @property
    def subset(self) -> Optional[List[str]]:
        """Columns checked for nulls; ``None`` means all columns."""
        return self._subset

    @property
    def thresh(self) -> Optional[int]:
        """Minimum non-null count a row must have to be kept."""
        return self._thresh

    def can_modify_num_rows(self) -> bool:
        # Rows are filtered out, so downstream row counts may change.
        return True
148 changes: 148 additions & 0 deletions python/ray/data/_internal/planner/plan_udf_map_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -820,3 +820,151 @@ def _transform(batch_iter: Iterable[T], task_context: TaskContext) -> Iterable[U
yield item

return _transform


def plan_fillna_op(
    op: "FillNa",
    physical_children: List[PhysicalOperator],
    data_context: DataContext,
) -> MapOperator:
    """Plan a FillNa logical operator.

    Builds a pyarrow-batch MapOperator that replaces nulls in the
    targeted columns with the configured fill value(s).
    """
    import pyarrow as pa
    import pyarrow.compute as pc

    assert len(physical_children) == 1
    input_physical_dag = physical_children[0]

    # Capture plain values so the UDF closure doesn't serialize `op`.
    value = op.value
    subset = op.subset

    def fn(batch: pa.Table) -> pa.Table:
        try:
            # No subset specified means every column is a fill target.
            columns_to_fill = subset if subset else batch.schema.names

            new_columns = {}
            for col_name in batch.schema.names:
                column = batch.column(col_name)

                if col_name not in columns_to_fill:
                    new_columns[col_name] = column
                    continue

                if isinstance(value, dict):
                    # Column-specific fill values; columns absent from
                    # the dict are left untouched.
                    fill_value = value.get(col_name)
                    if fill_value is None:
                        new_columns[col_name] = column
                        continue
                    # Cast to the column type so fill_null type-checks.
                    fill_scalar = pa.scalar(fill_value, type=column.type)
                else:
                    # One scalar fill value for all targeted columns.
                    fill_scalar = pa.scalar(value, type=column.type)
                new_columns[col_name] = pc.fill_null(column, fill_scalar)

            # Rebuild with the original schema to preserve column order
            # and types exactly.
            return pa.table(new_columns, schema=batch.schema)

        except Exception as e:
            _try_wrap_udf_exception(e, batch)

    compute = get_compute(op._compute)
    transform_fn = _generate_transform_fn_for_map_block(fn)
    map_transformer = _create_map_transformer_for_block_based_map_op(
        transform_fn,
        batch_format="pyarrow",
        zero_copy_batch=True,
    )

    return MapOperator.create(
        map_transformer,
        input_physical_dag,
        name="FillNa",
        compute_strategy=compute,
        ray_remote_args=op._ray_remote_args,
    )


def plan_dropna_op(
    op: "DropNa",
    physical_children: List[PhysicalOperator],
    data_context: DataContext,
) -> MapOperator:
    """Plan a DropNa logical operator.

    Builds a pyarrow-batch MapOperator that filters out rows containing
    nulls according to ``how``/``subset``/``thresh``.
    """
    import pyarrow as pa
    import pyarrow.compute as pc

    assert len(physical_children) == 1
    input_physical_dag = physical_children[0]

    # Capture plain values so the UDF closure doesn't serialize `op`.
    how = op.how
    subset = op.subset
    thresh = op.thresh

    def fn(batch: pa.Table) -> pa.Table:
        try:
            if batch.num_rows == 0:
                return batch

            # Determine which columns to check for nulls.
            columns_to_check = subset if subset else batch.schema.names
            if not columns_to_check:
                # An explicitly empty subset checks nothing, so no row
                # can be dropped. (Previously `mask` stayed None and
                # pc.filter(batch, None) failed.)
                return batch

            if thresh is not None:
                # Keep rows with at least `thresh` non-null values among
                # the checked columns; `thresh` takes precedence over `how`.
                non_null_counts = pa.array(
                    [0] * batch.num_rows, type=pa.int32()
                )
                for col_name in columns_to_check:
                    is_valid = pc.is_valid(batch.column(col_name)).cast(
                        pa.int32()
                    )
                    non_null_counts = pc.add(non_null_counts, is_valid)
                mask = pc.greater_equal(non_null_counts, pa.scalar(thresh))
            elif how == "any":
                # Drop rows where ANY checked column is null, i.e. keep
                # rows where every checked column is valid.
                mask = None
                for col_name in columns_to_check:
                    is_valid = pc.is_valid(batch.column(col_name))
                    mask = is_valid if mask is None else pc.and_(mask, is_valid)
            elif how == "all":
                # Drop rows where ALL checked columns are null, i.e. keep
                # rows where at least one checked column is valid.
                mask = None
                for col_name in columns_to_check:
                    is_valid = pc.is_valid(batch.column(col_name))
                    mask = is_valid if mask is None else pc.or_(mask, is_valid)
            else:
                # Previously an unrecognized `how` fell through with
                # `mask` unbound and raised an obscure NameError.
                raise ValueError(
                    f"`how` must be 'any' or 'all', but got {how!r}."
                )

            # Filter the table using the keep-mask.
            return pc.filter(batch, mask)

        except Exception as e:
            _try_wrap_udf_exception(e, batch)

    compute = get_compute(op._compute)
    transform_fn = _generate_transform_fn_for_map_block(fn)
    map_transformer = _create_map_transformer_for_block_based_map_op(
        transform_fn,
        batch_format="pyarrow",
        zero_copy_batch=True,
    )

    return MapOperator.create(
        map_transformer,
        input_physical_dag,
        name="DropNa",
        compute_strategy=compute,
        ray_remote_args=op._ray_remote_args,
    )
6 changes: 6 additions & 0 deletions python/ray/data/_internal/planner/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def _register_default_plan_logical_op_fns():
from ray.data._internal.logical.operators.input_data_operator import InputData
from ray.data._internal.logical.operators.map_operator import (
AbstractUDFMap,
DropNa,
FillNa,
Filter,
Project,
StreamingRepartition,
Expand All @@ -62,6 +64,8 @@ def _register_default_plan_logical_op_fns():
from ray.data._internal.planner.plan_all_to_all_op import plan_all_to_all_op
from ray.data._internal.planner.plan_read_op import plan_read_op
from ray.data._internal.planner.plan_udf_map_op import (
plan_dropna_op,
plan_fillna_op,
plan_filter_op,
plan_project_op,
plan_streaming_repartition_op,
Expand Down Expand Up @@ -130,6 +134,8 @@ def plan_count_op(logical_op, physical_children, data_context):
register_plan_logical_op_fn(Count, plan_count_op)

register_plan_logical_op_fn(Project, plan_project_op)
register_plan_logical_op_fn(FillNa, plan_fillna_op)
register_plan_logical_op_fn(DropNa, plan_dropna_op)

register_plan_logical_op_fn(StreamingRepartition, plan_streaming_repartition_op)

Expand Down
Loading