
Commit a1ea75e

Bumping version to 0.0.21

1 parent d587118 · commit a1ea75e

File tree

6 files changed: +24 −30 lines


README.md

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 
 > Utility belt to handle data on AWS.
 
-[![Release](https://img.shields.io/badge/release-0.0.20-brightgreen.svg)](https://pypi.org/project/awswrangler/)
+[![Release](https://img.shields.io/badge/release-0.0.21-brightgreen.svg)](https://pypi.org/project/awswrangler/)
 [![Downloads](https://img.shields.io/pypi/dm/awswrangler.svg)](https://pypi.org/project/awswrangler/)
 [![Python Version](https://img.shields.io/badge/python-3.6%20%7C%203.7-brightgreen.svg)](https://pypi.org/project/awswrangler/)
 [![Documentation Status](https://readthedocs.org/projects/aws-data-wrangler/badge/?version=latest)](https://aws-data-wrangler.readthedocs.io/en/latest/?badge=latest)

awswrangler/__version__.py

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
 __title__ = "awswrangler"
 __description__ = "Utility belt to handle data on AWS."
-__version__ = "0.0.20"
+__version__ = "0.0.21"
 __license__ = "Apache License 2.0"

awswrangler/s3.py

Lines changed: 4 additions & 2 deletions
@@ -70,7 +70,9 @@ def parse_path(path):
     def parse_object_path(path):
         return path.replace("s3://", "").split("/", 1)
 
-    def delete_objects(self, path):
+    def delete_objects(self, path: str, procs_io_bound: Optional[int] = None) -> None:
+        if not procs_io_bound:
+            procs_io_bound = self._session.procs_io_bound
         bucket, path = self.parse_path(path=path)
         client = self._session.boto3_session.client(service_name="s3", config=self._session.botocore_config)
         procs = []
@@ -93,7 +95,7 @@ def delete_objects(self, path):
                 proc.daemon = False
                 proc.start()
                 procs.append(proc)
-                if len(procs) == self._session.procs_io_bound:
+                if len(procs) == procs_io_bound:
                     wait_process_release(procs)
         else:
             logger.debug(f"Starting last delete call...")
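
For orientation, a minimal usage sketch of the new optional argument. The Session() construction and bucket name below are assumptions for illustration; only the delete_objects signature comes from this diff:

import awswrangler

# Assumed session construction for illustration; the new keyword is the point.
session = awswrangler.Session()

# Falls back to session.procs_io_bound when the argument is omitted.
session.s3.delete_objects(path="s3://example-bucket/some-prefix/")

# New in this commit: callers can cap the number of IO-bound delete processes.
session.s3.delete_objects(path="s3://example-bucket/some-prefix/", procs_io_bound=1)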

awswrangler/spark.py

Lines changed: 9 additions & 13 deletions
@@ -1,4 +1,4 @@
-from typing import List, Tuple, Dict, Any
+from typing import List, Tuple, Dict, Any, Optional
 import logging
 import os
 
@@ -18,11 +18,7 @@
 class Spark:
     def __init__(self, session):
         self._session = session
-        cpus: int = os.cpu_count()
-        if cpus == 1:
-            self._procs_io_bound: int = 1
-        else:
-            self._procs_io_bound = int(cpus / 2)
+        self._procs_io_bound: int = 1
         logging.info(f"_procs_io_bound: {self._procs_io_bound}")
 
     def read_csv(self, **args) -> DataFrame:
@@ -61,9 +57,9 @@ def to_redshift(
         table: str,
         iam_role: str,
         diststyle: str = "AUTO",
-        distkey=None,
+        distkey: Optional[str] = None,
         sortstyle: str = "COMPOUND",
-        sortkey=None,
+        sortkey: Optional[str] = None,
         min_num_partitions: int = 200,
         mode: str = "append",
     ) -> None:
@@ -87,7 +83,7 @@ def to_redshift(
         logger.debug(f"Minimum number of partitions : {min_num_partitions}")
         if path[-1] != "/":
             path += "/"
-        self._session.s3.delete_objects(path=path)
+        self._session.s3.delete_objects(path=path, procs_io_bound=self._procs_io_bound)
         spark: SparkSession = self._session.spark_session
         casts: Dict[str, str] = Spark._extract_casts(dataframe.dtypes)
         dataframe = Spark.date2timestamp(dataframe)
@@ -125,9 +121,9 @@ def write(pandas_dataframe: pd.DataFrame) -> pd.DataFrame:
                 cast_columns=casts)
             return pd.DataFrame.from_dict({"objects_paths": paths})
 
-        df_objects_paths = dataframe.repartition(numPartitions=num_partitions) # type: ignore
-        df_objects_paths = df_objects_paths.withColumn(par_col_name, spark_partition_id()) # type: ignore
-        df_objects_paths = df_objects_paths.groupby(par_col_name).apply(write) # type: ignore
+        df_objects_paths: DataFrame = dataframe.repartition(numPartitions=num_partitions) # type: ignore
+        df_objects_paths: DataFrame = df_objects_paths.withColumn(par_col_name, spark_partition_id()) # type: ignore
+        df_objects_paths: DataFrame = df_objects_paths.groupby(par_col_name).apply(write) # type: ignore
 
         objects_paths: List[str] = list(df_objects_paths.toPandas()["objects_paths"])
         dataframe.unpersist()
@@ -155,7 +151,7 @@ def write(pandas_dataframe: pd.DataFrame) -> pd.DataFrame:
            sortkey=sortkey,
            mode=mode,
            cast_columns=casts)
-        self._session.s3.delete_objects(path=path)
+        self._session.s3.delete_objects(path=path, procs_io_bound=self._procs_io_bound)
 
     def create_glue_table(self,
                           database,
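
Because the diff only shows fragments, here is a self-contained sketch of the pattern this commit applies. The class names (FakeSession, FakeS3, FakeSparkWriter) are stand-ins, not the library's API: the Spark writer now pins its own IO-process budget to 1 and forwards it explicitly, rather than letting delete_objects fall back to the session-wide default.

from typing import Optional


class FakeSession:
    """Stand-in for the library session; holds the global IO default."""
    procs_io_bound: int = 8


class FakeS3:
    def __init__(self, session: FakeSession) -> None:
        self._session = session

    def delete_objects(self, path: str, procs_io_bound: Optional[int] = None) -> None:
        # Same fallback as the s3.py hunk: an explicit value wins,
        # otherwise the session-wide default is used.
        if not procs_io_bound:
            procs_io_bound = self._session.procs_io_bound
        print(f"delete under {path} with {procs_io_bound} IO-bound process(es)")


class FakeSparkWriter:
    def __init__(self, session: FakeSession, s3: FakeS3) -> None:
        self._session = session
        self._s3 = s3
        self._procs_io_bound: int = 1  # pinned to 1 by this commit

    def cleanup_staging(self, path: str) -> None:
        # Forward the writer's own budget instead of the session default.
        self._s3.delete_objects(path=path, procs_io_bound=self._procs_io_bound)


session = FakeSession()
FakeSparkWriter(session, FakeS3(session)).cleanup_staging("s3://example-bucket/staging/")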

building/publish.sh

Lines changed: 3 additions & 1 deletion
@@ -3,6 +3,8 @@ set -e
 
 cd ..
 rm -fr build dist .egg awswrangler.egg-info
-python setup.py sdist
+python3.6 setup.py bdist_egg
+python3.6 setup.py bdist_wheel
+python3.6 setup.py sdist
 twine upload dist/*
 rm -fr build dist .egg awswrangler.egg-info

testing/test_awswrangler/test_redshift.py

Lines changed: 6 additions & 12 deletions
@@ -281,12 +281,7 @@ def test_to_redshift_spark_big(session, bucket, redshift_parameters):
 
 
 def test_to_redshift_spark_bool(session, bucket, redshift_parameters):
-    dataframe = session.spark_session.createDataFrame(
-        pd.DataFrame({
-            "A": [1, 2, 3],
-            "B": [True, False, True]
-        })
-    )
+    dataframe = session.spark_session.createDataFrame(pd.DataFrame({"A": [1, 2, 3], "B": [True, False, True]}))
     print(dataframe)
     print(dataframe.dtypes)
     con = Redshift.generate_connection(
@@ -318,12 +313,11 @@ def test_to_redshift_spark_bool(session, bucket, redshift_parameters):
 
 
 def test_stress_to_redshift_spark_big(session, bucket, redshift_parameters):
-    dataframe = session.spark_session.createDataFrame(
-        pd.DataFrame({
-            "A": list(range(1_000_000)),
-            "B": list(range(1_000_000)),
-            "C": list(range(1_000_000))
-        }))
+    print("Creating DataFrame...")
+    dataframe = session.spark_session.createDataFrame(pd.DataFrame({
+        "A": list(range(10_000)),
+        "B": list(range(10_000))
+    }))
     dataframe.cache()
     for i in range(10):
         print(f"Run number: {i}")

0 commit comments
