@@ -1,10 +1,9 @@
-import copy
 import json
 import pathlib
-from distutils.version import LooseVersion
 from functools import reduce
 from glob import has_magic
 from numbers import Number
+from packaging.version import Version
 from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
 
 import fsspec
@@ -31,7 +30,7 @@
 )
 
 # improve pandas compatibility, based on geopandas _compat.py
-PANDAS_GE_12 = str(pd.__version__) >= LooseVersion("1.2.0")
+PANDAS_GE_12 = Version(pd.__version__) >= Version("1.2.0")
 
 _geometry_dtypes = [
     PointDtype, MultiPointDtype, RingDtype, LineDtype,
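Note: `packaging.version.Version` implements PEP 440 ordering, which is what the comparison above relies on. A minimal standalone illustration (not part of the diff):

```python
from packaging.version import Version

# PEP 440 ordering, unlike plain string comparison:
assert Version("1.10.0") > Version("1.2.0")    # strings would sort "1.10" < "1.2"
assert Version("1.2.0rc1") < Version("1.2.0")  # pre-releases come before finals
```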
@@ -63,16 +62,18 @@ def _load_parquet_pandas_metadata(
         pqds = pq.ParquetDataset(
             path,
             filesystem=filesystem,
-            validate_schema=False,
+            #validate_schema=False,
+            use_legacy_dataset=False,
             **engine_kwargs,
         )
-        common_metadata = pqds.common_metadata
-        if common_metadata is None:
-            # Get metadata for first piece
-            piece = pqds.pieces[0]
-            metadata = piece.get_metadata().metadata
-        else:
-            metadata = pqds.common_metadata.metadata
+        filename = pathlib.Path(pqds.files[0]).parent.joinpath("_common_metadata")
+        try:
+            common_metadata = pq.read_metadata(filename)
+        except FileNotFoundError:
+            # Common metadata doesn't exist, so get metadata for first piece instead
+            filename = pathlib.Path(pqds.files[0])
+            common_metadata = pq.read_metadata(filename)
+        metadata = common_metadata.metadata
     else:
         with filesystem.open(path) as f:
             pf = pq.ParquetFile(f)
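For reference, the fallback above can be exercised on its own. A sketch assuming a pyarrow version that still accepts `use_legacy_dataset`; the helper name and path argument are illustrative:

```python
import pathlib
import pyarrow.parquet as pq

def load_common_metadata(dataset_path):
    # Prefer the dataset's _common_metadata sidecar; if it was never written,
    # fall back to the footer metadata of the first data file.
    pqds = pq.ParquetDataset(dataset_path, use_legacy_dataset=False)
    sidecar = pathlib.Path(pqds.files[0]).parent.joinpath("_common_metadata")
    try:
        return pq.read_metadata(sidecar)
    except FileNotFoundError:
        return pq.read_metadata(pqds.files[0])
```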
@@ -198,7 +199,7 @@ def to_parquet_dask(
     **kwargs: Any,
 ) -> None:
     engine_kwargs = engine_kwargs or {}
-
+
     if not isinstance(ddf, DaskGeoDataFrame):
         raise TypeError(f"Expected DaskGeoDataFrame not {type(ddf)}")
     filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
@@ -207,48 +208,27 @@ def to_parquet_dask(
             path and filesystem.isdir(path):
         filesystem.rm(path, recursive=True)
 
-    dd_to_parquet(
-        ddf,
-        path,
-        engine="pyarrow",
-        compression=compression,
-        storage_options=storage_options,
-        **kwargs,
-    )
-
-    # Write partition bounding boxes to the _metadata file
+    # Determine partition bounding boxes to save to _metadata file
     partition_bounds = {}
     for series_name in ddf.columns:
         series = ddf[series_name]
         if isinstance(series.dtype, GeometryDtype):
-            if series._partition_bounds is None:
-                # Bounds are not already computed. Compute bounds from the parquet file
-                # that was just written.
-                filesystem.invalidate_cache(path)
-                series = read_parquet_dask(
-                    path,
-                    columns=[series_name],
-                    filesystem=filesystem,
-                    load_divisions=False,
-                    storage_options=storage_options,
-                )[series_name]
             partition_bounds[series_name] = series.partition_bounds.to_dict()
 
     spatial_metadata = {'partition_bounds': partition_bounds}
     b_spatial_metadata = json.dumps(spatial_metadata).encode('utf')
 
-    pqds = pq.ParquetDataset(
+    dd_to_parquet(
+        ddf,
         path,
-        filesystem=filesystem,
-        validate_schema=False,
+        engine="pyarrow",
+        compression=compression,
+        storage_options=storage_options,
+        custom_metadata={b'spatialpandas': b_spatial_metadata},
+        write_metadata_file=True,
         **engine_kwargs,
+        **kwargs,
     )
-    all_metadata = copy.copy(pqds.common_metadata.metadata)
-    all_metadata[b'spatialpandas'] = b_spatial_metadata
-    schema = pqds.common_metadata.schema.to_arrow_schema()
-    new_schema = schema.with_metadata(all_metadata)
-    with filesystem.open(pqds.common_metadata_path, 'wb') as f:
-        pq.write_metadata(new_schema, f)
 
 
 def read_parquet_dask(
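The rewritten `to_parquet_dask` now leans on dask's own writer to embed the spatial metadata in a single pass instead of patching `_common_metadata` afterwards. A minimal sketch of that mechanism, assuming a dask version whose pyarrow engine supports `custom_metadata` and `write_metadata_file`; the data and output path are illustrative:

```python
import json

import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=2)
b_meta = json.dumps({"partition_bounds": {}}).encode("utf")
ddf.to_parquet(
    "/tmp/example.parq",                          # illustrative output path
    engine="pyarrow",
    custom_metadata={b"spatialpandas": b_meta},   # embedded in the parquet footers
    write_metadata_file=True,                     # write the _metadata sidecar files
)
```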
@@ -293,7 +273,7 @@ def read_parquet_dask(
     build_sindex : boolean
         Whether to build partition level spatial indexes to speed up indexing.
     storage_options: Key/value pairs to be passed on to the file-system backend, if any.
-    engine_kwargs: pyarrow.parquet engine-related keyword arguments.
+    engine_kwargs: pyarrow.parquet engine-related keyword arguments.
     Returns:
         DaskGeoDataFrame
     """
@@ -357,7 +337,8 @@ def _perform_read_parquet_dask(
         pa.parquet.ParquetDataset(
             path,
             filesystem=filesystem,
-            validate_schema=False,
+            #validate_schema=False,
+            use_legacy_dataset=False,
             **engine_kwargs,
         ) for path in paths
     ]
@@ -366,7 +347,7 @@ def _perform_read_parquet_dask(
     pieces = []
     for dataset in datasets:
         # Perform natural sort on pieces so that "part.10" comes after "part.2"
-        dataset_pieces = sorted(dataset.pieces, key=lambda piece: natural_sort_key(piece.path))
+        dataset_pieces = sorted(dataset.fragments, key=lambda piece: natural_sort_key(piece.path))
         pieces.extend(dataset_pieces)
 
     delayed_partitions = [
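`dataset.fragments` is the new-API counterpart of the legacy `pieces` list, and each fragment still exposes a `.path`, so the natural sort carries over unchanged. A sketch of the kind of key function assumed here (spatialpandas ships its own `natural_sort_key` helper; this stand-in is illustrative):

```python
import re

def natural_sort_key(s):
    # Split digit runs out of the string so numeric chunks compare as integers.
    return [int(t) if t.isdigit() else t for t in re.split(r"(\d+)", s)]

print(sorted(["part.10.parquet", "part.2.parquet"], key=natural_sort_key))
# ['part.2.parquet', 'part.10.parquet']
```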
@@ -419,7 +400,7 @@ def _perform_read_parquet_dask(
         cols_no_index = None
 
     meta = dd_read_parquet(
-        paths[0],
+        datasets[0].files[0],
         columns=cols_no_index,
         filesystem=filesystem,
         engine='pyarrow',
@@ -514,10 +495,15 @@ def _perform_read_parquet_dask(
 
 def _load_partition_bounds(pqds):
     partition_bounds = None
-    if (pqds.common_metadata is not None and
-            b'spatialpandas' in pqds.common_metadata.metadata):
+    filename = pathlib.Path(pqds.files[0]).parent.joinpath("_common_metadata")
+    try:
+        common_metadata = pq.read_metadata(filename)
+    except FileNotFoundError:
+        common_metadata = None
+
+    if common_metadata is not None and b'spatialpandas' in common_metadata.metadata:
         spatial_metadata = json.loads(
-            pqds.common_metadata.metadata[b'spatialpandas'].decode('utf')
+            common_metadata.metadata[b'spatialpandas'].decode('utf')
         )
         if "partition_bounds" in spatial_metadata:
             partition_bounds = {}
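Round-tripping the metadata written by `to_parquet_dask` can be checked directly; a sketch with an illustrative path:

```python
import json

import pyarrow.parquet as pq

md = pq.read_metadata("/tmp/example.parq/_common_metadata").metadata
if b"spatialpandas" in md:
    spatial = json.loads(md[b"spatialpandas"].decode("utf"))
    partition_bounds = spatial.get("partition_bounds", {})
```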