
Commit eb06a51

Improves Redshift and Aurora load
1 parent 3cab156 commit eb06a51

File tree

4 files changed: +62 -62 lines changed

awswrangler/aurora.py

Lines changed: 16 additions & 16 deletions
@@ -218,35 +218,35 @@ def load_table_postgres(dataframe: pd.DataFrame,
         connection.commit()
         logger.debug("CREATE TABLE committed.")
         for path in load_paths:
-            Aurora._load_object_postgres_with_retry(connection=connection,
-                                                    schema_name=schema_name,
-                                                    table_name=table_name,
-                                                    path=path,
-                                                    region=region)
+            sql = Aurora._get_load_sql(path=path,
+                                       schema_name=schema_name,
+                                       table_name=table_name,
+                                       engine="postgres",
+                                       region=region)
+            Aurora._load_object_postgres_with_retry(connection=connection, sql=sql)
+            logger.debug(f"Load committed for: {path}.")
 
     @staticmethod
     @tenacity.retry(retry=tenacity.retry_if_exception_type(exception_types=ProgrammingError),
                     wait=tenacity.wait_random_exponential(multiplier=0.5),
                     stop=tenacity.stop_after_attempt(max_attempt_number=5),
                     reraise=True,
                     after=tenacity.after_log(logger, INFO))
-    def _load_object_postgres_with_retry(connection: Any, schema_name: str, table_name: str, path: str,
-                                         region: str) -> None:
+    def _load_object_postgres_with_retry(connection: Any, sql: str) -> None:
+        logger.debug(sql)
         with connection.cursor() as cursor:
-            sql = Aurora._get_load_sql(path=path,
-                                       schema_name=schema_name,
-                                       table_name=table_name,
-                                       engine="postgres",
-                                       region=region)
-            logger.debug(sql)
             try:
                 cursor.execute(sql)
             except ProgrammingError as ex:
+                logger.debug(f"Exception: {ex}")
+                connection.rollback()
                 if "The file has been modified" in str(ex):
-                    connection.rollback()
                     raise ex
-        connection.commit()
-        logger.debug(f"Load committed for: {path}.")
+                elif "0 rows were copied successfully" in str(ex):
+                    raise ex
+                else:
+                    raise AuroraLoadError(str(ex))
+        connection.commit()
 
     @staticmethod
     def load_table_mysql(dataframe: pd.DataFrame,
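
Note on the retry flow: the LOAD statement is now built once and handed to the retry wrapper, so every tenacity attempt re-executes identical SQL after a rollback. Below is a minimal runnable sketch of the same tenacity pattern, with a hypothetical flaky_copy function and a stand-in exception type in place of the driver call and its ProgrammingError:

import logging

import tenacity

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class FakeProgrammingError(Exception):
    """Stand-in for the driver's ProgrammingError in this sketch."""


_attempts = {"n": 0}


@tenacity.retry(retry=tenacity.retry_if_exception_type(exception_types=FakeProgrammingError),
                wait=tenacity.wait_random_exponential(multiplier=0.5),  # jittered backoff: ~0.5s, ~1s, ...
                stop=tenacity.stop_after_attempt(max_attempt_number=5),  # give up after 5 tries
                reraise=True,  # surface the original exception, not tenacity's RetryError
                after=tenacity.after_log(logger, logging.INFO))  # log every failed attempt
def flaky_copy(sql: str) -> str:
    _attempts["n"] += 1
    if _attempts["n"] < 3:
        raise FakeProgrammingError("The file has been modified")  # transient, worth retrying
    return f"loaded after {_attempts['n']} attempts"


print(flaky_copy("LOAD DATA FROM S3 ..."))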

awswrangler/pandas.py

Lines changed: 41 additions & 44 deletions
@@ -664,12 +664,14 @@ def _read_sql_athena_regular(self,
         dtype, parse_timestamps, parse_dates, converters = self._get_query_dtype(
             query_execution_id=query_execution_id)
         path = f"{s3_output}{query_execution_id}.csv"
+        logger.debug("Start reading...")
         ret = self.read_csv(path=path,
                             dtype=dtype,
                             parse_dates=parse_timestamps,
                             converters=converters,
                             quoting=csv.QUOTE_ALL,
                             max_result_size=max_result_size)
+        logger.debug("Start type casting...")
         if max_result_size is None:
             if len(ret.index) > 0:
                 for col in parse_dates:
@@ -1129,7 +1131,6 @@ def write_csv_dataframe(dataframe, path, preserve_index, compression, fs, extra_
         elif serde == "LazySimpleSerDe":
             csv_extra_args["quoting"] = csv.QUOTE_NONE
             csv_extra_args["escapechar"] = "\\"
-        logger.debug(f"csv_extra_args: {csv_extra_args}")
         csv_buffer: bytes = bytes(
             dataframe.to_csv(None, header=False, index=preserve_index, compression=compression, **csv_extra_args),
             "utf-8")
@@ -1360,19 +1361,19 @@ def read_parquet(self,
         """
         procs_cpu_bound = procs_cpu_bound if procs_cpu_bound is not None else self._session.procs_cpu_bound if self._session.procs_cpu_bound is not None else 1
         logger.debug(f"procs_cpu_bound: {procs_cpu_bound}")
-        df: Optional[pd.DataFrame] = None
+        dfs: List[pd.DataFrame] = []
         session_primitives = self._session.primitives
         path = [path] if type(path) == str else path  # type: ignore
         bounders = calculate_bounders(len(path), procs_cpu_bound)
         logger.debug(f"len(bounders): {len(bounders)}")
         if len(bounders) == 1:
-            df = Pandas._read_parquet_paths(session_primitives=session_primitives,
-                                            path=path,
-                                            columns=columns,
-                                            filters=filters,
-                                            procs_cpu_bound=procs_cpu_bound,
-                                            wait_objects=wait_objects,
-                                            wait_objects_timeout=wait_objects_timeout)
+            dfs = Pandas._read_parquet_paths(session_primitives=session_primitives,
+                                             path=path,
+                                             columns=columns,
+                                             filters=filters,
+                                             procs_cpu_bound=procs_cpu_bound,
+                                             wait_objects=wait_objects,
+                                             wait_objects_timeout=wait_objects_timeout)
         else:
             procs = []
             receive_pipes = []
@@ -1398,15 +1399,16 @@ def read_parquet(self,
             logger.debug(f"len(procs): {len(bounders)}")
             for i in range(len(procs)):
                 logger.debug(f"Waiting pipe number: {i}")
-                df_received = receive_pipes[i].recv()
-                if df is None:
-                    df = df_received
-                else:
-                    df = pd.concat(objs=[df, df_received], ignore_index=True)
+                dfs_received: List[pd.DataFrame] = receive_pipes[i].recv()
+                dfs = dfs_received + dfs
                 logger.debug(f"Waiting proc number: {i}")
                 procs[i].join()
                 logger.debug(f"Closing proc number: {i}")
                 receive_pipes[i].close()
+        if len(dfs) == 1:
+            df: pd.DataFrame = dfs[0]
+        else:
+            df = pd.concat(objs=dfs, ignore_index=True)
         return df
 
     @staticmethod
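
Worker processes now pipe back a list of DataFrames, and the parent defers concatenation until every pipe has been drained. A self-contained sketch of that shape, with a hypothetical make_frames worker standing in for _read_parquet_paths_remote:

import multiprocessing as mp
from multiprocessing.connection import Connection
from typing import List

import pandas as pd


def make_frames(send_pipe: Connection, start: int) -> None:
    # Hypothetical worker: sends back a *list* of DataFrames,
    # mirroring _read_parquet_paths_remote after this change.
    dfs: List[pd.DataFrame] = [pd.DataFrame({"x": [start + i]}) for i in range(3)]
    send_pipe.send(dfs)
    send_pipe.close()


if __name__ == "__main__":
    procs, receive_pipes = [], []
    for start in (0, 100):
        receive_pipe, send_pipe = mp.Pipe()
        proc = mp.Process(target=make_frames, args=(send_pipe, start))
        proc.start()
        procs.append(proc)
        receive_pipes.append(receive_pipe)

    dfs: List[pd.DataFrame] = []
    for i, pipe in enumerate(receive_pipes):
        dfs.extend(pipe.recv())  # collect; concatenation is deferred
        procs[i].join()
        pipe.close()

    # Single concat at the end instead of one pd.concat per worker.
    df = dfs[0] if len(dfs) == 1 else pd.concat(objs=dfs, ignore_index=True)
    print(df)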
@@ -1418,14 +1420,14 @@ def _read_parquet_paths_remote(send_pipe: mp.connection.Connection,
                                    procs_cpu_bound: Optional[int] = None,
                                    wait_objects: bool = False,
                                    wait_objects_timeout: Optional[float] = 10.0):
-        df: pd.DataFrame = Pandas._read_parquet_paths(session_primitives=session_primitives,
-                                                      path=path,
-                                                      columns=columns,
-                                                      filters=filters,
-                                                      procs_cpu_bound=procs_cpu_bound,
-                                                      wait_objects=wait_objects,
-                                                      wait_objects_timeout=wait_objects_timeout)
-        send_pipe.send(df)
+        dfs: List[pd.DataFrame] = Pandas._read_parquet_paths(session_primitives=session_primitives,
+                                                             path=path,
+                                                             columns=columns,
+                                                             filters=filters,
+                                                             procs_cpu_bound=procs_cpu_bound,
+                                                             wait_objects=wait_objects,
+                                                             wait_objects_timeout=wait_objects_timeout)
+        send_pipe.send(dfs)
         send_pipe.close()
 
     @staticmethod
@@ -1435,7 +1437,7 @@ def _read_parquet_paths(session_primitives: "SessionPrimitives",
                             filters: Optional[Union[List[Tuple[Any]], List[List[Tuple[Any]]]]] = None,
                             procs_cpu_bound: Optional[int] = None,
                             wait_objects: bool = False,
-                            wait_objects_timeout: Optional[float] = 10.0) -> pd.DataFrame:
+                            wait_objects_timeout: Optional[float] = 10.0) -> List[pd.DataFrame]:
         """
         Read parquet data from S3
@@ -1459,24 +1461,19 @@ def _read_parquet_paths(session_primitives: "SessionPrimitives",
                                              procs_cpu_bound=procs_cpu_bound,
                                              wait_objects=wait_objects,
                                              wait_objects_timeout=wait_objects_timeout)
+            return [df]
         else:
-            df = Pandas._read_parquet_path(session_primitives=session_primitives,
-                                           path=path[0],
-                                           columns=columns,
-                                           filters=filters,
-                                           procs_cpu_bound=procs_cpu_bound,
-                                           wait_objects=wait_objects,
-                                           wait_objects_timeout=wait_objects_timeout)
-            for p in path[1:]:
-                df_aux = Pandas._read_parquet_path(session_primitives=session_primitives,
-                                                   path=p,
-                                                   columns=columns,
-                                                   filters=filters,
-                                                   procs_cpu_bound=procs_cpu_bound,
-                                                   wait_objects=wait_objects,
-                                                   wait_objects_timeout=wait_objects_timeout)
-                df = pd.concat(objs=[df, df_aux], ignore_index=True)
-        return df
+            dfs: List[pd.DataFrame] = []
+            for p in path:
+                df = Pandas._read_parquet_path(session_primitives=session_primitives,
+                                               path=p,
+                                               columns=columns,
+                                               filters=filters,
+                                               procs_cpu_bound=procs_cpu_bound,
+                                               wait_objects=wait_objects,
+                                               wait_objects_timeout=wait_objects_timeout)
+                dfs.append(df)
+            return dfs
 
     @staticmethod
     def _read_parquet_path(session_primitives: "SessionPrimitives",
@@ -1851,17 +1848,17 @@ def read_csv_list(
             procs.append(proc)
             receive_pipes.append(receive_pipe)
             utils.wait_process_release(processes=procs, target_number=procs_cpu_bound)
+        dfs: List[pd.DataFrame] = []
         for i in range(len(procs)):
             logger.debug(f"Waiting pipe number: {i}")
             df_received = receive_pipes[i].recv()
-            if df is None:
-                df = df_received
-            else:
-                df = pd.concat(objs=[df, df_received], ignore_index=True)
+            dfs.append(df_received)
             logger.debug(f"Waiting proc number: {i}")
             procs[i].join()
             logger.debug(f"Closing proc number: {i}")
             receive_pipes[i].close()
+        logger.debug(f"Concatenating all {len(paths)} DataFrames...")
+        df = pd.concat(objs=dfs, ignore_index=True)
         return df
 
     def _read_csv_list_iterator(
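
read_csv_list gets the same treatment: the incremental pd.concat per received frame is replaced by one concat over the collected list. Repeated pairwise concatenation recopies the accumulated frame on every iteration (roughly quadratic in total rows), while a single concat copies each row once. A small illustration of the two shapes, using hypothetical frames:

import pandas as pd

chunks = [pd.DataFrame({"x": range(3)}) for _ in range(4)]

# Before: pairwise concat inside the loop; the growing frame is recopied each time.
df = None
for chunk in chunks:
    df = chunk if df is None else pd.concat(objs=[df, chunk], ignore_index=True)

# After: collect everything, then concatenate once.
dfs = list(chunks)
df2 = pd.concat(objs=dfs, ignore_index=True)

assert df.equals(df2)  # same result, fewer copies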

awswrangler/redshift.py

Lines changed: 4 additions & 1 deletion
@@ -226,7 +226,10 @@ def load_table(dataframe,
         cursor.execute("-- AWS DATA WRANGLER\n SELECT pg_last_copy_id() AS query_id")
         query_id = cursor.fetchall()[0][0]
         sql = ("-- AWS DATA WRANGLER\n"
-               f"SELECT COUNT(*) as num_files_loaded FROM STL_LOAD_COMMITS WHERE query = {query_id}")
+               f"SELECT COUNT(DISTINCT filename) as num_files_loaded "
+               f"FROM STL_LOAD_COMMITS "
+               f"WHERE query = {query_id}")
+        logger.debug(sql)
         cursor.execute(sql)
         num_files_loaded = cursor.fetchall()[0][0]
         if num_files_loaded != num_files:
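
The verification query now counts distinct file names rather than raw rows, presumably because STL_LOAD_COMMITS can record more than one entry per loaded file, which would make COUNT(*) overshoot num_files. A minimal sketch of the check as a standalone helper (hypothetical function name and error type; cursor is any DB-API cursor on the Redshift connection):

def assert_all_files_loaded(cursor, num_files: int) -> None:
    # pg_last_copy_id() identifies the most recent COPY on this session.
    cursor.execute("-- AWS DATA WRANGLER\n SELECT pg_last_copy_id() AS query_id")
    query_id = cursor.fetchall()[0][0]
    # DISTINCT filename: a single loaded object may show up in more than
    # one STL_LOAD_COMMITS row, so COUNT(*) could exceed the file count.
    cursor.execute("-- AWS DATA WRANGLER\n"
                   f"SELECT COUNT(DISTINCT filename) as num_files_loaded "
                   f"FROM STL_LOAD_COMMITS "
                   f"WHERE query = {query_id}")
    num_files_loaded = cursor.fetchall()[0][0]
    if num_files_loaded != num_files:
        # The real code raises a wrangler-specific load error here.
        raise RuntimeError(f"Expected {num_files} files, found {num_files_loaded}.")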

testing/test_awswrangler/test_redshift.py

Lines changed: 1 addition & 1 deletion
@@ -129,7 +129,7 @@ def test_to_redshift_pandas(session, bucket, redshift_parameters, sample_name, m
     ],
 )
 def test_to_redshift_pandas_glue(session, bucket, redshift_parameters, sample_name, mode, factor, diststyle, distkey,
-                                sortstyle, sortkey):
+                                 sortstyle, sortkey):
 
     if sample_name == "micro":
         dates = ["date"]
