
Commit db730b6

Authored by iameskild, brl0, and jbednar
Add ability to pass storage_options to pandas.to_parquet. Update docs/CHANGELOG (#79)
* build sindex takes args, returns self, cleanup
* Pass storage and engine options, add some typing
* Pass kwargs to _load_parquet_pandas_metadata
* Remove unused import in tests
* Fix argument order for pack_partitions_to_parquet
* Pass storage_options to read_parquet_dask
* Update docstrings for pack_partitions_to_parquet
* Update docstrings for validate_coerce_filesystem
* Pass storage_options to read_parquet_dask
* Update docstrings for read_parquet_dask
* Update CHANGELOG
* Add overwrite option, minor clean up
* Update CHANGELOG
* Implement version dependent pandas to_parquet function
* Update CHANGELOG
* Pass storage_options to read_parquet_dask
* Add pd_to_parquet to __init__
* Update import statement
* Move import statement
* Minor typing change
* Add __all__, minor cleanup
* Update to_parquet function call
* Update CHANGELOG
* Add engine_kwargs to pd.to_parquet call
* Add engine_kwargs to dask.to_parquet
* Add storage_options to read_parquet
* Add engine_kwargs to read_parquet
* Modify engine_kwargs to pack_partitions_to_parquet
* Update CHANGELOG.md
* Update CHANGELOG.md
* Fix missing final newline
* Modify how fs is handled in to_parquet
* Remove extra whitespace
* Fix return statement indent
* Remove extra whitespace
* Merge holoviz/spatialpandas master into fix/dask_parquet
* Update engine_kwargs formatting
* Move PANDAS_GE_12 into io/parquet.py

Co-authored-by: Brian Larsen <B_R_L@hotmail.com>
Co-authored-by: James A. Bednar <jbednar@users.noreply.github.com>
1 parent b50239a commit db730b6

File tree: 4 files changed (+73 −16 lines)


CHANGELOG.md

Lines changed: 10 additions & 0 deletions
```diff
@@ -1,3 +1,13 @@
+## Version 0.4.3
+
+Date: 2021-08-05
+
+This release primarily expands the optional arguments that can be passed to `to_parquet_dask`/`read_parquet_dask`, ensuring that `storage_options` is passed through where needed. It also adds the ability to pass `storage_options` to the `pandas.to_parquet` function (only for pandas >= 1.2) and renames any reference to `fname` to `path`, to align with the pandas convention.
+
+Bug fixes:
+ - Update `validate_coerce_filesystem` to pass `storage_options` through. ([#78](https://github.com/holoviz/spatialpandas/pull/78))
+
+
 ## Version 0.4.2
 
 This release primarily achieves compatibility with recent releases of Pandas. Many thanks to @Hoxbro for contributing the fixes and @philippjfr for ongoing maintenance of the project.
```
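A short round trip exercises the headline change. This is an illustrative sketch, not code from the commit: it assumes pandas >= 1.2, uses fsspec's in-memory filesystem as a stand-in for a real remote store, and the `PointArray` construction shown is one plausible way to build a small `GeoDataFrame`.

```python
from spatialpandas import GeoDataFrame
from spatialpandas.geometry import PointArray
from spatialpandas.io import read_parquet, to_parquet

# A tiny GeoDataFrame of two points.
gdf = GeoDataFrame({
    "geometry": PointArray([[0.0, 0.0], [1.0, 1.0]]),
    "value": [1, 2],
})

# `path` (formerly `fname`) plus the new storage_options keyword; on
# pandas >= 1.2 the options are forwarded to pandas.to_parquet itself.
to_parquet(gdf, "memory://data.parq", storage_options={})
round_tripped = read_parquet("memory://data.parq", storage_options={})
```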

spatialpandas/dask.py

Lines changed: 24 additions & 4 deletions
```diff
@@ -237,12 +237,16 @@ def pack_partitions_to_parquet(
             These directories are deleted as soon as possible during the execution
             of the function.
+        storage_options: Key/value pairs to be passed on to the file-system backend, if any.
+        engine_kwargs: pyarrow.parquet engine-related keyword arguments.
 
     Returns:
         DaskGeoDataFrame backed by newly written parquet dataset
     """
     from .io import read_parquet, read_parquet_dask
     from .io.utils import validate_coerce_filesystem
 
+    engine_kwargs = engine_kwargs or {}
+
     # Get fsspec filesystem object
     filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
 
@@ -346,9 +350,10 @@ def write_partition(df_part, part_path):
                 f,
                 compression=compression,
                 index=True,
-                **(engine_kwargs or {}),
+                **engine_kwargs,
             )
 
+
     def process_partition(df, i):
         subpart_paths = {}
         for out_partition, df_part in df.groupby('_partition'):
@@ -394,7 +399,12 @@ def read_parquet_retry(parts_tmp_path, subpart_paths, part_output_path):
             # Handle rare case where the task was resubmitted and the work has
             # already been done. This shouldn't happen with pure=False, but it
             # seems like it does very rarely.
-            return read_parquet(part_output_path, filesystem=filesystem)
+            return read_parquet(
+                part_output_path,
+                filesystem=filesystem,
+                storage_options=storage_options,
+                **engine_kwargs,
+            )
 
         ls_res = sorted(filesystem.ls(parts_tmp_path, **ls_kwargs))
         subpart_paths_stripped = sorted([filesystem._strip_protocol(_) for _ in subpart_paths])
@@ -414,7 +424,12 @@ def read_parquet_retry(parts_tmp_path, subpart_paths, part_output_path):
                     extras=list(extras)
                 )
             )
-        return read_parquet(parts_tmp_path, filesystem=filesystem)
+        return read_parquet(
+            parts_tmp_path,
+            filesystem=filesystem,
+            storage_options=storage_options,
+            **engine_kwargs,
+        )
 
     def concat_parts(parts_tmp_path, subpart_paths, part_output_path):
         filesystem.invalidate_cache()
@@ -512,7 +527,12 @@ def write_commonmetadata_file():
                 pq.write_metadata(new_schema, f)
         write_commonmetadata_file()
 
-        return read_parquet_dask(path, filesystem=filesystem)
+        return read_parquet_dask(
+            path,
+            filesystem=filesystem,
+            storage_options=storage_options,
+            engine_kwargs=engine_kwargs,
+        )
 
     def _compute_packing_npartitions(self, npartitions):
         if npartitions is None:
```
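Taken together, these hunks let a packed dataset live behind credentialed storage. A hedged sketch of the expanded call, assuming `ddf` is an existing `DaskGeoDataFrame`; the bucket, the credential flag, and the `row_group_size` value are placeholders rather than anything this commit prescribes.

```python
# `ddf` is assumed to be a spatialpandas.dask.DaskGeoDataFrame, e.g. from
# dask.dataframe.from_pandas(gdf, npartitions=2) on a spatialpandas GeoDataFrame.
packed = ddf.pack_partitions_to_parquet(
    "s3://my-bucket/packed.parq",              # placeholder bucket/path
    npartitions=16,
    storage_options={"anon": False},           # forwarded to fsspec (here s3fs)
    engine_kwargs={"row_group_size": 50_000},  # forwarded to the pyarrow writer
)
# The result is returned via read_parquet_dask, which (per the final hunk
# above) now receives the same storage_options and engine_kwargs.
```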

spatialpandas/io/parquet.py

Lines changed: 38 additions & 12 deletions
```diff
@@ -1,6 +1,7 @@
 import copy
 import json
 import pathlib
+from distutils.version import LooseVersion
 from functools import reduce
 from glob import has_magic
 from numbers import Number
```
```diff
@@ -29,6 +30,9 @@
     validate_coerce_filesystem,
 )
 
+# improve pandas compatibility, based on geopandas _compat.py
+PANDAS_GE_12 = str(pd.__version__) >= LooseVersion("1.2.0")
+
 _geometry_dtypes = [
     PointDtype, MultiPointDtype, RingDtype, LineDtype,
     MultiLineDtype, PolygonDtype, MultiPolygonDtype
```
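`LooseVersion` is used here instead of a plain string comparison because version components compare numerically, so "1.10" correctly sorts after "1.2". A quick illustration (not from this commit; note that `distutils` is deprecated on recent Python, where `packaging.version` is the usual replacement):

```python
from distutils.version import LooseVersion

assert "1.10.0" >= LooseVersion("1.2.0")  # numeric comparison: 10 > 2
assert not ("1.10.0" >= "1.2.0")          # plain strings compare lexicographically
```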
```diff
@@ -50,6 +54,7 @@ def _load_parquet_pandas_metadata(
     storage_options=None,
     engine_kwargs=None,
 ):
+    engine_kwargs = engine_kwargs or {}
     filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
     if not filesystem.exists(path):
         raise ValueError("Path not found: " + path)
@@ -59,7 +64,7 @@ def _load_parquet_pandas_metadata(
             path,
             filesystem=filesystem,
             validate_schema=False,
-            **(engine_kwargs or {}),
+            **engine_kwargs,
         )
         common_metadata = pqds.common_metadata
         if common_metadata is None:
```
```diff
@@ -98,20 +103,35 @@ def _get_geometry_columns(pandas_metadata):
 
 def to_parquet(
     df: GeoDataFrame,
-    fname: PathType,
+    path: PathType,
     compression: Optional[str] = "snappy",
+    filesystem: Optional[fsspec.AbstractFileSystem] = None,
     index: Optional[bool] = None,
+    storage_options: Optional[Dict[str, Any]] = None,
     **kwargs: Any,
 ) -> None:
+    if filesystem is not None:
+        filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
+
     # Standard pandas to_parquet with pyarrow engine
-    pd_to_parquet(
-        df,
-        fname,
-        engine="pyarrow",
-        compression=compression,
-        index=index,
+    to_parquet_args = {
+        "df": df,
+        "path": path,
+        "engine": "pyarrow",
+        "compression": compression,
+        "filesystem": filesystem,
+        "index": index,
         **kwargs,
-    )
+    }
+
+    if PANDAS_GE_12:
+        to_parquet_args.update({"storage_options": storage_options})
+    else:
+        if filesystem is None:
+            filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
+        to_parquet_args.update({"filesystem": filesystem})
+
+    pd_to_parquet(**to_parquet_args)
 
 
 def read_parquet(
```
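The net effect of the rewrite, as a hedged usage sketch (the bucket and the anonymous-access option are placeholders): on pandas >= 1.2 `storage_options` travels through pandas itself, while on older pandas an fsspec filesystem is coerced from the same options, since those versions have no `storage_options` parameter.

```python
from spatialpandas.io import to_parquet

# `gdf` is assumed to be a spatialpandas GeoDataFrame.
to_parquet(
    gdf,
    "s3://my-bucket/df.parq",        # `path` replaces the old `fname`
    storage_options={"anon": True},  # pandas >= 1.2: passed to pandas.to_parquet;
)                                    # pandas <  1.2: coerced into an fsspec filesystem
```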
```diff
@@ -122,6 +142,7 @@ def read_parquet(
     engine_kwargs: Optional[Dict[str, Any]] = None,
     **kwargs: Any,
 ) -> GeoDataFrame:
+    engine_kwargs = engine_kwargs or {}
     filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
 
     # Load pandas parquet metadata
@@ -154,7 +175,7 @@ def read_parquet(
         path,
         filesystem=filesystem,
         validate_schema=False,
-        **(engine_kwargs or {}),
+        **engine_kwargs,
         **kwargs,
     ).read(columns=columns).to_pandas()
 
```
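Because `engine_kwargs` is splatted directly into `pyarrow.parquet.ParquetDataset`, any keyword that constructor accepts should pass through. A sketch, with `memory_map` as one plausible pyarrow option and a local path so no storage options are needed:

```python
from spatialpandas.io import read_parquet

gdf = read_parquet(
    "data.parq",                         # assumed local file from an earlier write
    engine_kwargs={"memory_map": True},  # handed to pyarrow.parquet.ParquetDataset
)
```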
```diff
@@ -176,6 +197,8 @@ def to_parquet_dask(
     engine_kwargs: Optional[Dict[str, Any]] = None,
     **kwargs: Any,
 ) -> None:
+    engine_kwargs = engine_kwargs or {}
+
     if not isinstance(ddf, DaskGeoDataFrame):
         raise TypeError(f"Expected DaskGeoDataFrame not {type(ddf)}")
     filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
@@ -207,6 +230,7 @@ def to_parquet_dask(
             columns=[series_name],
             filesystem=filesystem,
             load_divisions=False,
+            storage_options=storage_options,
         )[series_name]
         partition_bounds[series_name] = series.partition_bounds.to_dict()
 
@@ -217,7 +241,7 @@ def to_parquet_dask(
         path,
         filesystem=filesystem,
         validate_schema=False,
-        **(engine_kwargs or {}),
+        **engine_kwargs,
     )
     all_metadata = copy.copy(pqds.common_metadata.metadata)
     all_metadata[b'spatialpandas'] = b_spatial_metadata
```
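A corresponding write-side sketch for the Dask path; the abfs container and account credentials are placeholders. Note that `storage_options` now also reaches the internal `read_parquet_dask` call used to compute partition bounds:

```python
from spatialpandas.io import to_parquet_dask

# `ddf` is assumed to be a spatialpandas.dask.DaskGeoDataFrame.
to_parquet_dask(
    ddf,
    "abfs://container/data.parq",
    storage_options={"account_name": "<name>", "account_key": "<key>"},
)
```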
```diff
@@ -268,6 +292,8 @@ def read_parquet_dask(
         data written by dask/fastparquet, not otherwise.
     build_sindex : boolean
         Whether to build partition level spatial indexes to speed up indexing.
+    storage_options: Key/value pairs to be passed on to the file-system backend, if any.
+    engine_kwargs: pyarrow.parquet engine-related keyword arguments.
     Returns:
         DaskGeoDataFrame
     """
@@ -321,12 +347,12 @@ def _perform_read_parquet_dask(
     storage_options=None,
     engine_kwargs=None,
 ):
+    engine_kwargs = engine_kwargs or {}
     filesystem = validate_coerce_filesystem(
         paths[0],
         filesystem,
         storage_options,
     )
-    engine_kwargs = engine_kwargs or {}
     datasets = [
         pa.parquet.ParquetDataset(
             path,
```
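And the matching read side (the bucket is a placeholder; `build_sindex` is the pre-existing option documented in the docstring hunk above):

```python
from spatialpandas.io import read_parquet_dask

ddf = read_parquet_dask(
    "s3://my-bucket/data.parq",
    storage_options={"anon": True},  # reaches _perform_read_parquet_dask and
                                     # every pyarrow ParquetDataset it opens
    build_sindex=True,               # also build partition-level spatial indexes
)
```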

spatialpandas/io/utils.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -20,6 +20,7 @@ def validate_coerce_filesystem(
         path: Path as a string
         filesystem: Optional fsspec filesystem object to use to open the file. If not
             provided, filesystem type is inferred from path
+        storage_options: Key/value pairs to be passed on to the file-system backend, if any.
 
     Returns:
         fsspec file system
```
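For reference, a minimal sketch of what the `storage_options` pass-through amounts to when no filesystem object is supplied; `_infer_fs` here is a hypothetical stand-in that mirrors, rather than reproduces, the helper's logic:

```python
import fsspec

def _infer_fs(path, storage_options=None):
    # Infer the protocol from the path and hand the options to fsspec.
    protocol = fsspec.utils.get_protocol(path)  # "s3" for "s3://...", etc.
    return fsspec.filesystem(protocol, **(storage_options or {}))

fs = _infer_fs("memory://tmp/data.parq")  # the memory filesystem needs no options
```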
