diff --git a/services/worker/src/worker/job_runners/config/parquet_and_info.py b/services/worker/src/worker/job_runners/config/parquet_and_info.py
index 48f3673d9..5046fcc21 100644
--- a/services/worker/src/worker/job_runners/config/parquet_and_info.py
+++ b/services/worker/src/worker/job_runners/config/parquet_and_info.py
@@ -19,7 +19,6 @@
 import datasets.exceptions
 import datasets.info
 import fsspec
-import numpy as np
 import pyarrow as pa
 import pyarrow.parquet as pq
 from datasets import DownloadConfig, Features, load_dataset_builder
@@ -32,10 +31,8 @@
 from datasets.utils.file_utils import (
     ArchiveIterable,
     FilesIterable,
-    is_relative_path,
-    url_or_path_join,
 )
-from datasets.utils.py_utils import asdict, map_nested
+from datasets.utils.py_utils import asdict
 from fsspec.core import filesystem, url_to_fs
 from fsspec.implementations.http import HTTPFileSystem
 from fsspec.implementations.local import LocalFileOpener, LocalFileSystem
@@ -310,34 +307,6 @@ def _fsspec_request_size(urlpath: str, storage_options: dict[str, Any]) -> Optio
     return None
 
 
-class _MockStreamingDownloadManager(StreamingDownloadManager):  # type: ignore
-    def __init__(self, *args: Any, **kwargs: Any) -> None:
-        super().__init__(*args, **kwargs)
-        self.ext_data_files: list[str] = []
-
-    def download(self, url_or_urls: Any) -> Any:
-        url_or_urls = map_nested(
-            self._download,
-            url_or_urls,
-            map_tuple=True,
-            parallel_min_length=np.inf,
-            # ^ parallel_min_length has int type, but is currently used in datasets for a comparison only
-            # and it works with np.inf. No conversion is involved
-            # (would raise: OverflowError: cannot convert float infinity to integer)
-        )
-        return url_or_urls
-
-    def _download(self, urlpath: Any) -> str:
-        urlpath_str = str(urlpath)
-        if is_relative_path(urlpath_str):
-            # append the relative path to the base_path
-            urlpath_str = url_or_path_join(self._base_path, urlpath_str)
-        elif not urlpath_str.startswith(self._base_path):
-            # it's an external file
-            self.ext_data_files.append(urlpath_str)
-        return urlpath_str
-
-
 def get_writer_batch_size_from_info(ds_config_info: datasets.info.DatasetInfo) -> Optional[int]:
     """
     Get the writer_batch_size that defines the maximum row group size in the parquet files.