
Commit 2748953

Limiting the total number of processes and adding logs.
1 parent 49ff97e commit 2748953

File tree: 2 files changed, +70 -6 lines


awswrangler/pandas.py

Lines changed: 20 additions & 1 deletion
@@ -1,6 +1,7 @@
 from io import BytesIO
 import multiprocessing as mp
 import logging
+from math import floor
 
 import pandas
 import pyarrow
@@ -104,6 +105,7 @@ def to_csv(
         preserve_index=True,
         mode="append",
         procs_cpu_bound=None,
+        procs_io_bound=None,
     ):
         return self.to_s3(
             dataframe=dataframe,
@@ -115,6 +117,7 @@ def to_csv(
             preserve_index=preserve_index,
             mode=mode,
             procs_cpu_bound=procs_cpu_bound,
+            procs_io_bound=procs_io_bound,
         )
 
     def to_parquet(
@@ -127,6 +130,7 @@ def to_parquet(
         preserve_index=True,
         mode="append",
         procs_cpu_bound=None,
+        procs_io_bound=None,
     ):
         return self.to_s3(
             dataframe=dataframe,
@@ -138,6 +142,7 @@ def to_parquet(
             preserve_index=preserve_index,
             mode=mode,
             procs_cpu_bound=procs_cpu_bound,
+            procs_io_bound=procs_io_bound,
         )
 
     def to_s3(
@@ -151,6 +156,7 @@ def to_s3(
         preserve_index=True,
         mode="append",
         procs_cpu_bound=None,
+        procs_io_bound=None,
     ):
         if not partition_cols:
             partition_cols = []
@@ -167,6 +173,7 @@ def to_s3(
             file_format=file_format,
             mode=mode,
             procs_cpu_bound=procs_cpu_bound,
+            procs_io_bound=procs_io_bound,
         )
         if database:
             self._session.glue.metadata_to_glue(
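
The hunks above only thread the new procs_io_bound keyword from the public writers (to_csv, to_parquet, to_s3) down to data_to_s3. Assuming the Session-based API that this code implies (a Session exposing .pandas, .s3 and .glue, as the self._session calls suggest), a caller could now cap the I/O fan-out explicitly; this is a hedged usage sketch, and the bucket path and dataframe are placeholders:

import pandas as pd
import awswrangler

session = awswrangler.Session()
df = pd.DataFrame({"value": [1, 2, 3], "part": ["a", "a", "b"]})

# procs_cpu_bound caps the writer processes; the new procs_io_bound
# caps the helper processes used for S3 I/O (e.g. the cleanup run for
# mode="overwrite_partitions").
session.pandas.to_parquet(
    dataframe=df,
    path="s3://example-bucket/example-prefix/",  # placeholder path
    partition_cols=["part"],
    mode="overwrite_partitions",
    procs_cpu_bound=4,
    procs_io_bound=8,
)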
@@ -191,9 +198,14 @@ def data_to_s3(
         preserve_index=True,
         mode="append",
         procs_cpu_bound=None,
+        procs_io_bound=None,
     ):
         if not procs_cpu_bound:
             procs_cpu_bound = self._session.procs_cpu_bound
+        if not procs_io_bound:
+            procs_io_bound = self._session.procs_io_bound
+        LOGGER.debug(f"procs_cpu_bound: {procs_cpu_bound}")
+        LOGGER.debug(f"procs_io_bound: {procs_io_bound}")
         if path[-1] == "/":
             path = path[:-1]
         file_format = file_format.lower()
@@ -237,8 +249,15 @@ def data_to_s3(
             file_format=file_format,
         )
         if mode == "overwrite_partitions" and partition_cols:
+            if procs_io_bound > procs_cpu_bound:
+                num_procs = floor(
+                    float(procs_io_bound) / float(procs_cpu_bound))
+            else:
+                num_procs = 1
+            LOGGER.debug(
+                f"num_procs for delete_not_listed_objects: {num_procs}")
             self._session.s3.delete_not_listed_objects(
-                objects_paths=objects_paths)
+                objects_paths=objects_paths, procs_io_bound=num_procs)
         return objects_paths
 
     @staticmethod
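
Beyond logging, the only behavioral change in this file is the num_procs rule above: when overwriting partitions, the cleanup call no longer receives the full I/O budget but a share of it. A minimal sketch of that arithmetic (the helper name io_procs_per_writer is illustrative and not part of awswrangler; the goal of keeping the overall process count bounded is inferred from the commit title):

from math import floor

def io_procs_per_writer(procs_cpu_bound, procs_io_bound):
    # Same rule as the hunk above: if the I/O budget exceeds the CPU
    # budget, split it evenly (rounding down); otherwise fall back to
    # a single helper process.
    if procs_io_bound > procs_cpu_bound:
        return floor(float(procs_io_bound) / float(procs_cpu_bound))
    return 1

assert io_procs_per_writer(procs_cpu_bound=8, procs_io_bound=32) == 4
assert io_procs_per_writer(procs_cpu_bound=8, procs_io_bound=8) == 1
assert io_procs_per_writer(procs_cpu_bound=4, procs_io_bound=6) == 1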

awswrangler/s3.py

Lines changed: 50 additions & 5 deletions
@@ -63,12 +63,14 @@ def delete_objects(self, path):
             service_name="s3", config=self._session.botocore_config)
         procs = []
         args = {"Bucket": bucket, "MaxKeys": 1000, "Prefix": path}
+        LOGGER.debug(f"Arguments: \n{args}")
         next_continuation_token = True
         while next_continuation_token:
             res = client.list_objects_v2(**args)
             if not res.get("Contents"):
                 break
             keys = [{"Key": x.get("Key")} for x in res.get("Contents")]
+            LOGGER.debug(f"Number of listed keys: {len(keys)}")
             next_continuation_token = res.get("NextContinuationToken")
             if next_continuation_token:
                 args["ContinuationToken"] = next_continuation_token
@@ -79,15 +81,25 @@ def delete_objects(self, path):
                 proc.daemon = False
                 proc.start()
                 procs.append(proc)
+                while len(procs) >= self._session.procs_io_bound:
+                    LOGGER.debug(
+                        f"len(procs) ({len(procs)}) >= self._session.procs_io_bound ({self._session.procs_io_bound})"
+                    )
+                    procs[0].join()
+                    del procs[0]
+                    LOGGER.debug(f"Processes deleted from list.")
             else:
+                LOGGER.debug(f"Starting last delete call...")
                 self.delete_objects_batch(self._session.primitives, bucket,
                                           keys)
+        LOGGER.debug(f"Waiting final processes...")
         for proc in procs:
             proc.join()
 
     def delete_listed_objects(self, objects_paths, procs_io_bound=None):
         if not procs_io_bound:
             procs_io_bound = self._session.procs_io_bound
+        LOGGER.debug(f"procs_io_bound: {procs_io_bound}")
         buckets = {}
         for path in objects_paths:
             path_cleaned = path.replace("s3://", "")
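
The while loop added to delete_objects is the throttle the commit title refers to: a worker process is forked per page of up to 1000 listed keys, but never more than procs_io_bound of them are alive at once, because the oldest worker is joined before a new one is admitted. A standalone sketch of the same pattern (the _work and run_throttled names are illustrative only, not awswrangler APIs):

import multiprocessing as mp
import time

def _work(batch_id):
    # stand-in for S3.delete_objects_batch
    time.sleep(0.1)

def run_throttled(batches, procs_io_bound=4):
    """Never keep more than procs_io_bound worker processes alive."""
    procs = []
    for batch_id in batches:
        proc = mp.Process(target=_work, args=(batch_id,))
        proc.daemon = False
        proc.start()
        procs.append(proc)
        while len(procs) >= procs_io_bound:
            procs[0].join()   # wait for the oldest worker
            del procs[0]
    for proc in procs:        # drain whatever is still running
        proc.join()

if __name__ == "__main__":
    run_throttled(range(10), procs_io_bound=4)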
@@ -98,8 +110,11 @@ def delete_listed_objects(self, objects_paths, procs_io_bound=None):
 
         for bucket, batch in buckets.items():
             procs = []
+            LOGGER.debug(f"bucket: {bucket}")
             if procs_io_bound > 1:
+                LOGGER.debug(f"len(batch): {len(batch)}")
                 bounders = calculate_bounders(len(batch), procs_io_bound)
+                LOGGER.debug(f"bounders: {bounders}")
                 for bounder in bounders:
                     proc = mp.Process(
                         target=self.delete_objects_batch,
@@ -118,7 +133,11 @@ def delete_listed_objects(self, objects_paths, procs_io_bound=None):
             for proc in procs:
                 proc.join()
 
-    def delete_not_listed_objects(self, objects_paths):
+    def delete_not_listed_objects(self, objects_paths, procs_io_bound=None):
+        if not procs_io_bound:
+            procs_io_bound = self._session.procs_io_bound
+        LOGGER.debug(f"procs_io_bound: {procs_io_bound}")
+
         partitions = {}
         for object_path in objects_paths:
             partition_path = f"{object_path.rsplit('/', 1)[0]}/"
@@ -129,20 +148,35 @@ def delete_not_listed_objects(self, objects_paths):
         for partition_path, batch in partitions.items():
             proc = mp.Process(
                 target=self.delete_not_listed_batch,
-                args=(self._session.primitives, partition_path, batch),
+                args=(self._session.primitives, partition_path, batch, 1),
             )
             proc.daemon = False
             proc.start()
             procs.append(proc)
+            while len(procs) >= procs_io_bound:
+                LOGGER.debug(
+                    f"len(procs) ({len(procs)}) >= procs_io_bound ({procs_io_bound})"
+                )
+                procs[0].join()
+                del procs[0]
+                LOGGER.debug(f"Processes deleted from list.")
+        LOGGER.debug(f"Waiting final processes...")
        for proc in procs:
             proc.join()
 
     @staticmethod
-    def delete_not_listed_batch(session_primitives, partition_path, batch):
+    def delete_not_listed_batch(session_primitives,
+                                partition_path,
+                                batch,
+                                procs_io_bound=None):
         session = session_primitives.session
+        if not procs_io_bound:
+            procs_io_bound = session.procs_io_bound
+        LOGGER.debug(f"procs_io_bound: {procs_io_bound}")
         keys = session.s3.list_objects(path=partition_path)
         dead_keys = [key for key in keys if key not in batch]
-        session.s3.delete_listed_objects(objects_paths=dead_keys)
+        session.s3.delete_listed_objects(objects_paths=dead_keys,
+                                         procs_io_bound=1)
 
     @staticmethod
     def delete_objects_batch(session_primitives, bucket, batch):
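
delete_not_listed_objects groups the freshly written object paths by their parent partition prefix, starts one cleanup process per partition (throttled by the same pattern as above), and pins the nested calls inside each worker to a single process (the args=(..., 1) and procs_io_bound=1 changes), presumably so that workers do not fork workers of their own. A small worked example of the grouping step; the accumulation into the partitions dict is elided from the hunk context, so setdefault is used here purely for illustration, and the paths are made up:

objects_paths = [
    "s3://example-bucket/table/part=a/file0.parquet",
    "s3://example-bucket/table/part=a/file1.parquet",
    "s3://example-bucket/table/part=b/file0.parquet",
]

partitions = {}
for object_path in objects_paths:
    partition_path = f"{object_path.rsplit('/', 1)[0]}/"
    partitions.setdefault(partition_path, []).append(object_path)

# partitions now maps each partition prefix to the objects that should
# survive the cleanup, e.g.:
# {
#     "s3://example-bucket/table/part=a/": [file0, file1],
#     "s3://example-bucket/table/part=b/": [file0],
# }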
@@ -151,6 +185,7 @@ def delete_objects_batch(session_primitives, bucket, batch):
                                               config=session.botocore_config)
         num_requests = int(ceil((float(len(batch)) / 1000.0)))
         bounders = calculate_bounders(len(batch), num_requests)
+        LOGGER.debug(f"Bounders: {bounders}")
         for bounder in bounders:
             client.delete_objects(
                 Bucket=bucket,
@@ -193,25 +228,30 @@ def _get_objects_head_remote(send_pipe, session_primitives, objects_paths):
         client = session.boto3_session.client(service_name="s3",
                                               config=session.botocore_config)
         objects_sizes = {}
+        LOGGER.debug(f"len(objects_paths): {len(objects_paths)}")
         for object_path in objects_paths:
             bucket, key = object_path.replace("s3://", "").split("/", 1)
             res = S3._head_object_with_retry(client=client,
                                              bucket=bucket,
                                              key=key)
             size = res.get("ContentLength")
             objects_sizes[object_path] = size
+        LOGGER.debug(f"len(objects_sizes): {len(objects_sizes)}")
         send_pipe.send(objects_sizes)
         send_pipe.close()
 
     def get_objects_sizes(self, objects_paths, procs_io_bound=None):
         if not procs_io_bound:
             procs_io_bound = self._session.procs_io_bound
+        LOGGER.debug(f"procs_io_bound: {procs_io_bound}")
         objects_sizes = {}
         procs = []
         receive_pipes = []
         bounders = calculate_bounders(len(objects_paths), procs_io_bound)
+        LOGGER.debug(f"len(bounders): {len(bounders)}")
         for bounder in bounders:
             receive_pipe, send_pipe = mp.Pipe()
+            LOGGER.debug(f"bounder: {bounder}")
             proc = mp.Process(
                 target=self._get_objects_head_remote,
                 args=(
@@ -224,8 +264,13 @@ def get_objects_sizes(self, objects_paths, procs_io_bound=None):
             proc.start()
             procs.append(proc)
             receive_pipes.append(receive_pipe)
+        LOGGER.debug(f"len(procs): {len(bounders)}")
         for i in range(len(procs)):
-            objects_sizes.update(receive_pipes[i].recv())
+            LOGGER.debug(f"Waiting pipe number: {i}")
+            receved = receive_pipes[i].recv()
+            objects_sizes.update(receved)
+            LOGGER.debug(f"Waiting proc number: {i}")
             procs[i].join()
+            LOGGER.debug(f"Closing proc number: {i}")
             receive_pipes[i].close()
         return objects_sizes
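
For reference, the get_objects_sizes changes are logging only; the structure they instrument is a fan-out over mp.Pipe in which the parent drains each pipe before joining its child (joining first can deadlock if the payload exceeds the pipe buffer). A self-contained sketch of that shape, where calculate_bounders is replaced by simple striding and _sizes_worker / gather_sizes are illustrative names:

import multiprocessing as mp

def _sizes_worker(send_pipe, paths):
    # stand-in for S3._get_objects_head_remote: pretend each object's
    # size is just the length of its path.
    send_pipe.send({path: len(path) for path in paths})
    send_pipe.close()

def gather_sizes(paths, procs_io_bound=3):
    """Fan out one child per chunk, then recv() before join()."""
    chunks = [paths[i::procs_io_bound] for i in range(procs_io_bound)]
    procs, receive_pipes = [], []
    for chunk in chunks:
        receive_pipe, send_pipe = mp.Pipe()
        proc = mp.Process(target=_sizes_worker, args=(send_pipe, chunk))
        proc.daemon = False
        proc.start()
        procs.append(proc)
        receive_pipes.append(receive_pipe)
    sizes = {}
    for i in range(len(procs)):
        sizes.update(receive_pipes[i].recv())  # drain first ...
        procs[i].join()                        # ... then reap the child
        receive_pipes[i].close()
    return sizes

if __name__ == "__main__":
    print(gather_sizes([f"s3://bucket/key{i}" for i in range(10)]))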
