diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 38390c39671..90224c44fad 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -2457,6 +2457,9 @@ def _copartition(self, axis, other, how, sort, force_repartition=False): base_frame._partitions, make_reindexer(do_reindex_base, base_frame_idx), ) + base_frame._partition_mgr_cls._update_partition_dimension_caches( + reindexed_base[0] if axis else reindexed_base.T[0] + ) if axis: base_lengths = [obj.width() for obj in reindexed_base[0]] else: diff --git a/modin/core/dataframe/pandas/partitioning/partition.py b/modin/core/dataframe/pandas/partitioning/partition.py index 8b56a64802c..559948260a6 100644 --- a/modin/core/dataframe/pandas/partitioning/partition.py +++ b/modin/core/dataframe/pandas/partitioning/partition.py @@ -15,6 +15,7 @@ from abc import ABC from copy import copy +from typing import Union, Any import pandas from pandas.api.types import is_scalar @@ -272,6 +273,24 @@ def length(self): ------- int The length of the object. + + Notes + ----- + Subclasses where `build_length_cache` returns a future-like object instead of a concrete + value should override this method to force the future's materialization. + """ + return self.build_length_cache() + + def build_length_cache(self) -> Union[Any, int]: + """ + Attempt to set this partition's length cache, and return it. + + Returns + ------- + Any | int + Either a future-like object representing the length of the object wrapped by this + partition, or the concrete value of the length if it was already cached or was + just computed. """ if self._length_cache is None: cls = type(self) @@ -280,6 +299,21 @@ def length(self): self._length_cache = self.apply(preprocessed_func) return self._length_cache + def set_length_cache(self, length: int): + """ + Attempt to set this partition's length cache field. 
+ + This should be used in situations where the futures returned by ``build_length_cache`` + for multiple partitions were computed in parallel, and the value now needs to be + propagated back to this partition. + + Parameters + ---------- + length : int + The new value of the length cache. + """ + self._length_cache = length + def width(self): """ Get the width of the object wrapped by the partition. @@ -288,6 +322,24 @@ def width(self): ------- int The width of the object. + + Notes + ----- + Subclasses where `build_width_cache` returns a future-like object instead of a concrete + int should override this method to force the future's materialization. + """ + return self.build_width_cache() + + def build_width_cache(self) -> Union[Any, int]: + """ + Attempt to set this partition's width cache, and return it. + + Returns + ------- + Any | int + Either a future-like object representing the width of the object wrapped by this + partition, or the concrete value of the width if it was already cached or was + just computed. """ if self._width_cache is None: cls = type(self) @@ -296,6 +348,21 @@ def width(self): self._width_cache = self.apply(preprocessed_func) return self._width_cache + def set_width_cache(self, width: int): + """ + Attempt to set this partition's width cache field. + + This should be used in situations where the futures returned by ``build_width_cache`` + for multiple partitions were computed in parallel, and the value now needs to be + propagated back to this partition. + + Parameters + ---------- + width : int + The new value of the width cache. 
+ """ + self._width_cache = width + @classmethod def empty(cls): """ diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index dc2042bf997..da1eb4a17c8 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -866,6 +866,25 @@ def get_indices(cls, axis, partitions, index_func=None): total_idx = new_idx[0].append(new_idx[1:]) if new_idx else new_idx return total_idx, new_idx + @classmethod + def _update_partition_dimension_caches(cls, partitions: np.ndarray): + """ + Build and set the length and width caches of each _physical_ partition in the given list of partitions. + + Parameters + ---------- + partitions : np.ndarray + The partitions for which to update length and width caches. + + Notes + ----- + For backends that support parallel computations, these caches are computed asynchronously. + The naive implementation computes the length and width caches in serial. 
+ """ + for part in partitions: + part.build_width_cache() + part.build_length_cache() + @classmethod def _apply_func_to_list_of_partitions_broadcast( cls, func, partitions, other, **kwargs @@ -1210,6 +1229,19 @@ def apply_func_to_indices_both_axis( if col_widths is None: col_widths = [None] * len(col_partitions_list) + if row_lengths is None and col_widths is None: + # Before anything else, compute length/widths of each partition (possibly in parallel) + all_parts = np.array( + [ + [ + partition_copy[row_blk_idx, col_blk_idx] + for row_blk_idx, _ in row_partitions_list + ] + for col_blk_idx, _ in col_partitions_list + ] + ).flatten() + cls._update_partition_dimension_caches(all_parts) + def compute_part_size(indexer, remote_part, part_idx, axis): """Compute indexer length along the specified axis for the passed partition.""" if isinstance(indexer, slice): diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py b/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py index 0466e25b163..77b4e468a47 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/dataframe/dataframe.py @@ -56,35 +56,30 @@ def _get_partition_size_along_axis(self, partition, axis=0): Returns ------- list - A list of lengths along the specified axis that sum to the overall length of the partition - along the specified axis. + A list of Dask futures representing lengths along the specified axis that sum to + the overall length of the partition along the specified axis. Notes ----- This utility function is used to ensure that computation occurs asynchronously across all partitions whether the partitions are virtual or physical partitions. 
""" + + def len_fn(df): + return len(df) if not axis else len(df.columns) + if isinstance(partition, self._partition_mgr_cls._partition_class): - return [ - partition.apply( - lambda df: len(df) if not axis else len(df.columns) - )._data - ] + return [partition.apply(len_fn)._data] elif partition.axis == axis: return [ - ptn.apply(lambda df: len(df) if not axis else len(df.columns))._data - for ptn in partition.list_of_block_partitions + ptn.apply(len_fn)._data for ptn in partition.list_of_block_partitions ] - return [ - partition.list_of_block_partitions[0] - .apply(lambda df: len(df) if not axis else (len(df.columns))) - ._data - ] + return [partition.list_of_block_partitions[0].apply(len_fn)._data] @property def _row_lengths(self): """ - Compute ther row partitions lengths if they are not cached. + Compute the row partitions lengths if they are not cached. Returns ------- diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py index fe3e4649f11..339b1301a59 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition.py @@ -13,6 +13,8 @@ """Module houses class that wraps data (block partition) and its metadata.""" +from typing import Union + from distributed import Future from distributed.utils import get_ip from dask.distributed import wait @@ -256,12 +258,25 @@ def length(self): int The length of the object. """ - if self._length_cache is None: - self._length_cache = self.apply(lambda df: len(df))._data + self.build_length_cache() if isinstance(self._length_cache, Future): self._length_cache = DaskWrapper.materialize(self._length_cache) return self._length_cache + def build_length_cache(self) -> Union[Future, int]: + """ + Attempt to set this partition's length cache, and return it. 
+ + Returns + ------- + distributed.Future | int + Either a Dask future representing the length of the object wrapped by this partition, + or the concrete value of the length if it was already cached. + """ + if self._length_cache is None: + self._length_cache = self.apply(lambda df: len(df))._data + return self._length_cache + def width(self): """ Get the width of the object wrapped by the partition. @@ -271,12 +286,25 @@ def width(self): int The width of the object. """ - if self._width_cache is None: - self._width_cache = self.apply(lambda df: len(df.columns))._data + self.build_width_cache() if isinstance(self._width_cache, Future): self._width_cache = DaskWrapper.materialize(self._width_cache) return self._width_cache + def build_width_cache(self) -> Union[Future, int]: + """ + Attempt to set this partition's width cache, and return it. + + Returns + ------- + distributed.Future | int + Either a Dask future representing the width of the object wrapped by this partition, + or the concrete value of the width if it was already cached. + """ + if self._width_cache is None: + self._width_cache = self.apply(lambda df: len(df.columns))._data + return self._width_cache + def ip(self): """ Get the node IP address of the object wrapped by this partition. 
diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py index 4c4d1618c6b..050cf44160f 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py @@ -382,6 +382,20 @@ def length(self): """ if self._length_cache is None: if self.axis == 0: + caches = [ + obj.build_length_cache() + for obj in self.list_of_partitions_to_combine + ] + new_lengths = DaskWrapper.materialize( + [cache for cache in caches if isinstance(cache, Future)] + ) + dask_idx = 0 + for i, cache in enumerate(caches): + if isinstance(cache, Future): + self.list_of_partitions_to_combine[i].set_length_cache( + new_lengths[dask_idx] + ) + dask_idx += 1 self._length_cache = sum( obj.length() for obj in self.list_of_block_partitions ) @@ -402,6 +416,20 @@ def width(self): """ if self._width_cache is None: if self.axis == 1: + caches = [ + obj.build_width_cache() + for obj in self.list_of_partitions_to_combine + ] + new_widths = DaskWrapper.materialize( + [cache for cache in caches if isinstance(cache, Future)] + ) + dask_idx = 0 + for i, cache in enumerate(caches): + if isinstance(cache, Future): + self.list_of_partitions_to_combine[i].set_width_cache( + new_widths[dask_idx] + ) + dask_idx += 1 self._width_cache = sum( obj.width() for obj in self.list_of_block_partitions ) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/dataframe/dataframe.py b/modin/core/execution/ray/implementations/pandas_on_ray/dataframe/dataframe.py index 9070caa078c..60306c547a4 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/dataframe/dataframe.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/dataframe/dataframe.py @@ -13,6 +13,8 @@ """Module houses class that implements ``PandasDataframe`` using 
Ray.""" +import ray + from ..partitioning.partition_manager import PandasOnRayDataframePartitionManager from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe @@ -40,3 +42,123 @@ class PandasOnRayDataframe(PandasDataframe): """ _partition_mgr_cls = PandasOnRayDataframePartitionManager + + def _get_partition_size_along_axis(self, partition, axis=0): + """ + Compute the length along the specified axis of the specified partition. + + Parameters + ---------- + partition : ``PandasOnRayDataframeVirtualPartition`` or ``PandasOnRayDataframePartition`` + The partition whose size to compute. + axis : int, default: 0 + The axis along which to compute size. + + Returns + ------- + list + A list of Ray object IDs representing lengths along the specified axis that sum to the overall length of the partition + along the specified axis. + + Notes + ----- + This utility function is used to ensure that computation occurs asynchronously across all partitions + whether the partitions are virtual or physical partitions. + """ + + def len_fn(df): + return len(df) if not axis else len(df.columns) + + if isinstance(partition, self._partition_mgr_cls._partition_class): + return [partition.apply(len_fn)._data] + elif partition.axis == axis: + return [ + ptn.apply(len_fn)._data + for ptn in partition.list_of_partitions_to_combine + ] + return [partition.list_of_partitions_to_combine[0].apply(len_fn)._data] + + def _ray_get_nested(self, ray_list): + """ + Get the results of computations of a nested list of Ray object IDs, and return a nested list of the same dimensions. + + For example, calling `_ray_get_nested([id1, [id2], id3])` would return a list of the form `[val1, [val2], val3]`. + + This function does not work for lists that are nested more than 2 layers deep (e.g. `[[[id]]]`). + + Parameters + ---------- + ray_list : list + A 2D list of Ray object IDs. 
+ + Returns + ------- + list + A 2D list of computed values corresponding to the passed in object IDs, with the same + structure as the list that was passed in. + """ + # Lengths of lists in original `ray_list`, or -1 if just a single item and not a list + lens = [] + flat_obj_ids = [] + for lst_or_id in ray_list: + if isinstance(lst_or_id, list): + lens.append(len(lst_or_id)) + flat_obj_ids.extend(lst_or_id) + else: + lens.append(-1) + flat_obj_ids.append(lst_or_id) + flat_values = ray.get(flat_obj_ids) + nested_values = [] + flat_index = 0 + for length in lens: + if length == -1: + # Original list had a single element here + nested_values.append(flat_values[flat_index]) + flat_index += 1 + else: + # Original list had a nested list here + nested_values.append(flat_values[flat_index : flat_index + length]) + flat_index += length + return nested_values + + @property + def _row_lengths(self): + """ + Compute the row partitions lengths if they are not cached. + + Returns + ------- + list + A list of row partitions lengths. + """ + if self._row_lengths_cache is None: + row_lengths_list = self._ray_get_nested( + [ + self._get_partition_size_along_axis(obj, axis=0) + for obj in self._partitions.T[0] + ] + ) + self._row_lengths_cache = [sum(len_list) for len_list in row_lengths_list] + return self._row_lengths_cache + + @property + def _column_widths(self): + """ + Compute the column partitions widths if they are not cached. + + Returns + ------- + list + A list of column partitions widths. 
+ """ + if self._column_widths_cache is None: + col_widths_list = self._ray_get_nested( + [ + self._get_partition_size_along_axis(obj, axis=1) + for obj in self._partitions[0] + ] + ) + self._column_widths_cache = [ + sum(width_list) for width_list in col_widths_list + ] + return self._column_widths_cache diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py index b7514c288d7..d4c67727678 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py @@ -13,6 +13,8 @@ """Module houses class that wraps data (block partition) and its metadata.""" +from typing import Union + import ray from ray.util import get_node_ip_address import uuid @@ -285,15 +287,29 @@ def length(self): int The length of the object. """ + self.build_length_cache() + if isinstance(self._length_cache, ObjectIDType): + self._length_cache = ray.get(self._length_cache) + return self._length_cache + + def build_length_cache(self) -> Union[ray.ObjectRef, int]: + """ + Attempt to set this partition's length cache, and return it. + + The call queue is always drained synchronously when this method is called. + + Returns + ------- + ray.ObjectRef | int + Either a Ray object ID representing the length of the object wrapped by this partition, + or the concrete value of the length if it was already cached. 
+ """ if self._length_cache is None: if len(self.call_queue): self.drain_call_queue() - else: - self._length_cache, self._width_cache = _get_index_and_columns.remote( - self._data - ) - if isinstance(self._length_cache, ObjectIDType): - self._length_cache = ray.get(self._length_cache) + self._length_cache, self._width_cache = _get_index_and_columns.remote( + self._data + ) return self._length_cache def width(self): @@ -305,15 +321,29 @@ def width(self): int The width of the object. """ + self.build_width_cache() + if isinstance(self._width_cache, ObjectIDType): + self._width_cache = ray.get(self._width_cache) + return self._width_cache + + def build_width_cache(self) -> Union[ray.ObjectRef, int]: + """ + Attempt to set this partition's width cache, and return it. + + The call queue is always drained synchronously when this method is called. + + Returns + ------- + ray.ObjectRef | int + Either a Ray object ID representing the width of the object wrapped by this partition, + or the concrete value of the width if it was already cached. 
+ """ if self._width_cache is None: if len(self.call_queue): self.drain_call_queue() - else: - self._length_cache, self._width_cache = _get_index_and_columns.remote( - self._data - ) - if isinstance(self._width_cache, ObjectIDType): - self._width_cache = ray.get(self._width_cache) + self._length_cache, self._width_cache = _get_index_and_columns.remote( + self._data + ) return self._width_cache def ip(self): diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py index 881624e89ee..5e234189c81 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py @@ -16,9 +16,11 @@ import inspect import threading +import numpy as np import ray from modin.config import ProgressBar +from modin.core.execution.ray.common.utils import ObjectIDType from modin.core.execution.ray.generic.partitioning import ( GenericRayDataframePartitionManager, ) @@ -223,6 +225,42 @@ def map_axis_partitions( **kwargs, ) + @classmethod + def _update_partition_dimension_caches(cls, partitions: np.ndarray): + """ + Build and set the length and width caches of each _physical_ partition in the given list of partitions. + + Parameters + ---------- + partitions : np.ndarray + The partitions for which to update length caches. + """ + futures = [] + # If `part_idxs[i] = j`, that means `futures[i]` represents a computation corresponding + # to a dimension of `partitions[j]`. 
+ part_idxs = [] + # If `is_lens[i] = True`, then `futures[i]` is a computation for a length; otherwise + # it is for a width + is_lens = [] + for i, part in enumerate(partitions): + l_cache = part.build_length_cache() + if isinstance(l_cache, ObjectIDType): + futures.append(l_cache) + part_idxs.append(i) + is_lens.append(True) + w_cache = part.build_width_cache() + if isinstance(w_cache, ObjectIDType): + futures.append(w_cache) + part_idxs.append(i) + is_lens.append(False) + new_dims = ray.get(futures) + for i, new_cache in enumerate(new_dims): + part = partitions[part_idxs[i]] + if is_lens[i]: + part.set_length_cache(new_cache) + else: + part.set_width_cache(new_cache) + @classmethod @progress_bar_wrapper def _apply_func_to_list_of_partitions(cls, func, partitions, **kwargs): diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index bf3e40f0148..54e62dfb700 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -20,7 +20,7 @@ from modin.core.dataframe.pandas.partitioning.axis_partition import ( PandasDataframeAxisPartition, ) -from modin.core.execution.ray.common.utils import deserialize +from modin.core.execution.ray.common.utils import deserialize, ObjectIDType from .partition import PandasOnRayDataframePartition @@ -388,6 +388,20 @@ def length(self): """ if self._length_cache is None: if self.axis == 0: + caches = [ + obj.build_length_cache() + for obj in self.list_of_partitions_to_combine + ] + new_lengths = ray.get( + [cache for cache in caches if isinstance(cache, ObjectIDType)] + ) + ray_idx = 0 + for i, cache in enumerate(caches): + if isinstance(cache, ObjectIDType): + self.list_of_partitions_to_combine[i].set_length_cache( + new_lengths[ray_idx] + ) + ray_idx += 1 
self._length_cache = sum( obj.length() for obj in self.list_of_block_partitions ) @@ -408,6 +422,20 @@ def width(self): """ if self._width_cache is None: if self.axis == 1: + caches = [ + obj.build_width_cache() + for obj in self.list_of_partitions_to_combine + ] + new_widths = ray.get( + [cache for cache in caches if isinstance(cache, ObjectIDType)] + ) + ray_idx = 0 + for i, cache in enumerate(caches): + if isinstance(cache, ObjectIDType): + self.list_of_partitions_to_combine[i].set_width_cache( + new_widths[ray_idx] + ) + ray_idx += 1 self._width_cache = sum( obj.width() for obj in self.list_of_block_partitions )