Skip to content

Commit b50239a

Browse files
authored
Pass storage options, add some typing, minor clean up (#78)
* build sindex takes args, returns self, cleanup * Pass storage and engine options, add some typing * Pass kwargs to _load_parquet_pandas_metadata * Remove unused import in tests * Fix argument order for pack_partitions_to_parquet * Add overwrite option, minor clean up * Add __all__, minor cleanup
1 parent d5c079b commit b50239a

File tree

8 files changed

+206
-89
lines changed

8 files changed

+206
-89
lines changed

spatialpandas/__init__.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import param as _param
22

3-
from . import geometry, spatialindex, tools # noqa
4-
from .geodataframe import GeoDataFrame # noqa
5-
from .geoseries import GeoSeries # noqa
6-
from .tools.sjoin import sjoin # noqa
3+
from . import geometry, spatialindex, tools
4+
from .geodataframe import GeoDataFrame
5+
from .geoseries import GeoSeries
6+
from .tools.sjoin import sjoin
77

88
try:
99
import dask.dataframe # noqa
10+
1011
# Import to trigger registration of types with Dask
1112
import spatialpandas.dask # noqa
1213
except ImportError:
@@ -19,3 +20,12 @@
1920
archive_commit="$Format:%h$",
2021
reponame="spatialpandas",
2122
))
23+
24+
__all__ = [
25+
"GeoDataFrame",
26+
"GeoSeries",
27+
"geometry",
28+
"sjoin",
29+
"spatialindex",
30+
"tools",
31+
]

spatialpandas/dask.py

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,11 @@ def cx(self):
8383
def cx_partitions(self):
8484
return _DaskPartitionCoordinateIndexer(self, self.partition_sindex)
8585

86-
def build_sindex(self):
87-
def build_sindex(series):
88-
series.build_sindex()
86+
def build_sindex(self, **kwargs):
87+
def build_sindex(series, **kwargs):
88+
series.build_sindex(**kwargs)
8989
return series
90-
return self.map_partitions(build_sindex, meta=self._meta)
90+
return self.map_partitions(build_sindex, **kwargs, meta=self._meta)
9191

9292
def intersects_bounds(self, bounds):
9393
return self.map_partitions(lambda s: s.intersects_bounds(bounds))
@@ -200,8 +200,17 @@ def pack_partitions(self, npartitions=None, p=15, shuffle='tasks'):
200200
return ddf
201201

202202
def pack_partitions_to_parquet(
203-
self, path, filesystem=None, npartitions=None, p=15, compression="snappy",
204-
tempdir_format=None, _retry_args=None
203+
self,
204+
path,
205+
filesystem=None,
206+
npartitions=None,
207+
p=15,
208+
compression="snappy",
209+
tempdir_format=None,
210+
_retry_args=None,
211+
storage_options=None,
212+
engine_kwargs=None,
213+
overwrite=False,
205214
):
206215
"""
207216
Repartition and reorder dataframe spatially along a Hilbert space filling curve
@@ -235,14 +244,14 @@ def pack_partitions_to_parquet(
235244
from .io.utils import validate_coerce_filesystem
236245

237246
# Get fsspec filesystem object
238-
filesystem = validate_coerce_filesystem(path, filesystem)
247+
filesystem = validate_coerce_filesystem(path, filesystem, storage_options)
239248

240249
# Decorator for operations that should be retried
241250
if _retry_args is None:
242251
_retry_args = dict(
243252
wait_exponential_multiplier=100,
244253
wait_exponential_max=120000,
245-
stop_max_attempt_number=24
254+
stop_max_attempt_number=24,
246255
)
247256
retryit = retry(**_retry_args)
248257

@@ -320,7 +329,8 @@ def move_retry(p1, p2):
320329

321330
# Initialize output partition directory structure
322331
filesystem.invalidate_cache()
323-
rm_retry(path)
332+
if overwrite:
333+
rm_retry(path)
324334

325335
for out_partition in out_partitions:
326336
part_dir = os.path.join(path, "part.%d.parquet" % out_partition)
@@ -332,17 +342,24 @@ def move_retry(p1, p2):
332342
@retryit
333343
def write_partition(df_part, part_path):
334344
with filesystem.open(part_path, "wb") as f:
335-
df_part.to_parquet(f, compression=compression, index=True)
345+
df_part.to_parquet(
346+
f,
347+
compression=compression,
348+
index=True,
349+
**(engine_kwargs or {}),
350+
)
336351

337352
def process_partition(df, i):
338353
subpart_paths = {}
339354
for out_partition, df_part in df.groupby('_partition'):
340355
part_path = os.path.join(
341356
tempdir_format.format(partition=out_partition, uuid=dataset_uuid),
342-
'part.%d.parquet' % i
357+
'part.%d.parquet' % i,
343358
)
344-
df_part = df_part.drop('_partition', axis=1).set_index(
345-
'hilbert_distance', drop=True
359+
df_part = (
360+
df_part
361+
.drop('_partition', axis=1)
362+
.set_index('hilbert_distance', drop=True)
346363
)
347364
write_partition(df_part, part_path)
348365
subpart_paths[out_partition] = part_path
@@ -528,11 +545,11 @@ def _propagate_props_to_series(self, new_series):
528545
new_series._partition_sindex = self._partition_sindex[new_series.name]
529546
return new_series
530547

531-
def build_sindex(self):
532-
def build_sindex(df):
533-
df.build_sindex()
548+
def build_sindex(self, **kwargs):
549+
def build_sindex(df, **kwargs):
550+
df.build_sindex(**kwargs)
534551
return df
535-
return self.map_partitions(build_sindex, meta=self._meta)
552+
return self.map_partitions(build_sindex, **kwargs, meta=self._meta)
536553

537554
def persist(self, **kwargs):
538555
return self._propagate_props_to_dataframe(

spatialpandas/geodataframe.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,9 @@ def cx(self):
113113
self.geometry.array, parent=self
114114
)
115115

116-
def build_sindex(self):
117-
self.geometry.build_sindex()
116+
def build_sindex(self, **kwargs):
117+
self.geometry.build_sindex(**kwargs)
118+
return self
118119

119120
def _ensure_type(self, obj):
120121
# Override because a GeoDataFrame operation may result in a regular DataFrame,

spatialpandas/geometry/base.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -555,9 +555,10 @@ def sindex(self):
555555
self.build_sindex()
556556
return self._sindex
557557

558-
def build_sindex(self):
558+
def build_sindex(self, **kwargs):
559559
if self._sindex is None:
560-
self._sindex = HilbertRtree(self.bounds)
560+
self._sindex = HilbertRtree(self.bounds, **kwargs)
561+
return self
561562

562563
@property
563564
def cx(self):

spatialpandas/geoseries.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,9 @@ def cx(self):
7676
from .geometry.base import _CoordinateIndexer
7777
return _CoordinateIndexer(self.array, parent=self)
7878

79-
def build_sindex(self):
80-
self.array.build_sindex()
79+
def build_sindex(self, **kwargs):
80+
self.array.build_sindex(**kwargs)
81+
return self
8182

8283
def intersects_bounds(self, bounds):
8384
return pd.Series(

0 commit comments

Comments
 (0)