
Commit e37b70f

Merge pull request #49 from CBIIT/CRDCDH-2169-001-pgu

CRDCDH 2169

2 parents ef8f91a + 7ec716e commit e37b70f

File tree

9 files changed, +263 -77 lines changed


requirements.txt

Lines changed: 2 additions & 1 deletion
@@ -1,4 +1,5 @@
 pyyaml
 boto3
 requests
-requests_aws4auth
+requests_aws4auth
+tqdm
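tqdm is the only new dependency; it drives the progress bars introduced in src/common/s3util.py below. A minimal sketch of the tqdm calls this commit relies on (the byte counts are illustrative):

from tqdm import tqdm

# simulate reporting four 256 KiB chunks against a 1 MiB total
progress = tqdm(total=1024 * 1024, unit='B', unit_scale=True, desc="Progress")
for _ in range(4):
    progress.update(256 * 1024)  # advance the bar by the bytes just transferred
progress.close()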

src/common/graphql_client.py

Lines changed: 3 additions & 1 deletion
@@ -166,7 +166,9 @@ def get_data_file_config(self, submissionID):
         if status == 200:
             results = response.json()
             if results.get("errors"):
-                self.log.error(f'Get data file config failed: {results.get("errors")[0].get("message")}.')
+                msg = f'Get data file config failed: {results.get("errors")[0].get("message")}.'
+                self.log.error(msg)
+                return False, None
             else:
                 data_file_config = results.get("data").get("retrieveFileNodeConfig")
                 if data_file_config:
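With this fix, the GraphQL error path returns an explicit (False, None) instead of falling through toward the success branch. A hedged sketch of how a caller would consume that tuple (graphql_client and submission_id are assumed names, not taken from this diff):

import sys

ok, data_file_config = graphql_client.get_data_file_config(submission_id)
if not ok:
    # the failure message was already logged inside get_data_file_config
    sys.exit(1)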

src/common/s3util.py

Lines changed: 49 additions & 8 deletions
@@ -1,5 +1,8 @@
 #!/usr/bin/env python
+import os
 import boto3
+from boto3.s3.transfer import TransferConfig, S3Transfer
+from tqdm import tqdm  # For progress bar
 from botocore.exceptions import ClientError

 from bento.common.utils import get_logger
@@ -55,14 +58,29 @@ def file_exists_on_s3(self, key):
             self.log.exception(e)
             return False, msg

-    def put_file_obj(self, key, data, md5_base64):
-        return self.bucket.put_object(Key=key,
-                                      Body=data,
-                                      ContentMD5=md5_base64,
-                                      ACL=BUCKET_OWNER_ACL)
+    def put_file_obj(self, file_size, key, data, md5_base64):
+        # Initialize the progress bar
+        progress = create_progress_bar(file_size)
+        chunk_size = 1024 * 1024 if file_size >= 1024 * 1024 else file_size  # chunk the data to display progress for small metadata files < 4,500,000,000 bytes
+        try:
+            # Upload the file in chunks
+            for chunk in iter(lambda: data.read(chunk_size), b''):
+                self.bucket.put_object(Key=key,
+                                       Body=chunk,
+                                       ContentMD5=md5_base64,
+                                       ACL=BUCKET_OWNER_ACL,
+                                       )
+                # Update the progress bar
+                progress.update(len(chunk))
+        finally:
+            # Close the progress bar
+            progress.close()

-    def upload_file_obj(self, key, data, config=None, extra_args={'ACL': BUCKET_OWNER_ACL}):
-        return self.bucket.upload_fileobj(data, key, ExtraArgs=extra_args, Config=config)
+    def upload_file_obj(self, file_size, key, data, config=None, extra_args={'ACL': BUCKET_OWNER_ACL}):
+        self.bucket.upload_fileobj(
+            data, key, ExtraArgs=extra_args, Config=config,
+            Callback=ProgressPercentage(file_size)
+        )

     def get_object_size(self, key):
         try:
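Both methods now take file_size as their first argument so they can size a progress display. A hedged usage sketch of the new put_file_obj signature (the s3_bucket instance and md5_base64 digest are assumed to exist; the path and key are made up):

import os

file_path = "metadata.tsv"  # hypothetical local metadata file
file_size = os.path.getsize(file_path)
with open(file_path, 'rb') as data:
    # streams the file in 1 MiB chunks, updating a tqdm bar per chunk
    s3_bucket.put_file_obj(file_size, "sub-123/metadata.tsv", data, md5_base64)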
@@ -90,7 +108,9 @@ def same_size_file_exists(self, key, file_size):

     def download_object(self, key, local_file_path):
         try:
-            self.bucket.download_file(key, local_file_path)
+            file_size, msg = self.get_object_size(key)
+            self.bucket.download_file(key, local_file_path,
+                                      Callback=ProgressPercentage(file_size))
             return True, None
         except ClientError as ce:
             msg = None
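download_object now fetches the object size up front so ProgressPercentage can report against a known total. A hedged call sketch (the key and local path are made up; s3_bucket is an assumed instance):

ok, err = s3_bucket.download_object("sub-123/metadata.tsv", "/tmp/metadata.tsv")
if not ok:
    print(f"Download failed: {err}")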
@@ -115,5 +135,26 @@ def close(self):
         self.client = None
         self.bucket = None
         self.s3 = None
+
+"""
+S3 util class to display upload progress
+"""
+class ProgressPercentage(object):
+    def __init__(self, file_size):
+        self._size = file_size
+        self._seen_so_far = 0
+        self._progress = create_progress_bar(file_size)
+
+    def __call__(self, bytes_amount):
+        self._seen_so_far += bytes_amount
+        self._progress.update(bytes_amount)
+
+    def __del__(self):
+        self._progress.close()
+
+def create_progress_bar(file_size):
+    progress_bar = tqdm(total=file_size, unit='B', unit_scale=True, desc="Progress", smoothing=0.0,
+                        bar_format="{l_bar}\033[1;32m{bar}\033[0m| {n_fmt}/{total_fmt} [elapsed: {elapsed} | remaining: {remaining}, {rate_fmt}]")
+    return progress_bar
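ProgressPercentage matches the callable protocol boto3 expects for its Callback parameter: boto3 invokes the object with the number of bytes moved in each chunk. A standalone sketch of that contract (the transfer and chunk sizes are illustrative):

cb = ProgressPercentage(1024 * 1024)  # pretend a 1 MiB transfer
for _ in range(4):
    cb(256 * 1024)  # boto3 calls the callback with bytes transferred per chunk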
src/common/utils.py

Lines changed: 51 additions & 0 deletions
@@ -4,6 +4,7 @@
 from uuid import UUID
 from datetime import datetime
 from common.constants import S3_START
+import os

 """
 clean_up_key_value(dict)
@@ -94,4 +95,54 @@ def extract_s3_info_from_url(url):
     prefix = "/".join(split_list[1:])
     return bucket_name, prefix

+def format_size(size_in_bytes):
+    """
+    Convert a size in bytes to a human-readable format with proper units.
+    """
+    # Define the units and their respective thresholds
+    units = ["Bytes", "KB", "MB", "GB", "TB", "PB"]
+    unit_index = 0
+
+    # Convert to the appropriate unit
+    while size_in_bytes >= 1024 and unit_index < len(units) - 1:
+        size_in_bytes /= 1024
+        unit_index += 1
+
+    # Return the formatted string with 2 decimal places
+    return f"{size_in_bytes:.2f} {units[unit_index]}"
+
+def calculate_eclipse_time(file_size, upload_speed):
+    """
+    Calculate the upload time in hh:mm:ss format given the file size and upload speed.
+
+    :param file_size: Size of the file in bytes.
+    :param upload_speed: Upload speed in bps (bits per second).
+    :return: Upload time in hh:mm:ss format.
+    """
+    # Convert file size from bytes to bits (1 byte = 8 bits)
+    file_size_bits = file_size * 8
+
+    # Calculate total upload time in seconds
+    upload_time_seconds = file_size_bits / upload_speed
+
+    # Format the time as hh:mm:ss
+    return format_time(upload_time_seconds)
+
+def format_time(seconds):
+    """
+    Format seconds into hh:mm:ss format.
+
+    :param seconds: Time in seconds.
+    :return: Formatted time string in hh:mm:ss format.
+    """
+    if seconds < 1:
+        return "less than 1 sec"
+    hours = int(seconds // 3600)
+    remaining_seconds = seconds % 3600
+    minutes = int(remaining_seconds // 60)
+    seconds = int(remaining_seconds % 60)
+    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
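Worked examples of the three new helpers, with outputs computed from the definitions above:

print(format_size(1536))                                  # 1.50 KB
print(format_size(3 * 1024 ** 3))                         # 3.00 GB
print(calculate_eclipse_time(10 * 1024 ** 2, 8_000_000))  # 10 MiB at 8 Mbps -> 00:00:10
print(format_time(3725))                                  # 01:02:05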

src/copier.py

Lines changed: 8 additions & 8 deletions
@@ -102,12 +102,12 @@ def copy_file(self, file_info, overwrite, dryrun):
             else:
                 file_info[SKIPPED] = False
                 # self.log.info(f'Copying from {org_url} to s3://{self.bucket_name}/{key.strip("/")} ...')
-                self.log.info(f'Copying from {org_url} to destination folder in S3 bucket ...')
-                dest_size, msg = self._upload_obj(org_url, key, org_size)
+                self.log.info(f'Uploading file, "{org_url}" to destination...')
+                dest_size = self._upload_obj(org_url, key, org_size)
                 if dest_size != org_size:
                     self.log.error(f'Uploading “{file_name}” failed - uploading was not complete. Please try again and contact the helpdesk if this error persists.')
                     return {self.STATUS: False}

             return succeed
         except ClientError as ce:
             self.log.debug(ce)
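copy_file now verifies the upload by comparing the destination object's size against the source size. A minimal standalone sketch of that check (verify_upload is a hypothetical helper, not part of this commit):

import os

def verify_upload(local_path, uploaded_size):
    # an upload is complete only if the destination byte count matches the source
    return uploaded_size == os.path.getsize(local_path)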
@@ -137,13 +137,13 @@ def _upload_obj(self, org_url, key, org_size):
             t_config = TransferConfig(multipart_threshold=self.MULTI_PART_THRESHOLD,
                                       multipart_chunksize=chunk_size)
             with open(org_url, 'rb') as stream:
-                self.bucket.upload_file_obj(key, stream, t_config)
+                self.bucket.upload_file_obj(org_size, key, stream, t_config)
         else:  # small file
             md5_obj = get_md5_hex_n_base64(org_url)
             md5_base64 = md5_obj['base64']
             with open(org_url, 'rb') as data:
-                self.bucket.put_file_obj(key, data, md5_base64)
+                self.bucket.put_file_obj(org_size, key, data, md5_base64)

         self.files_copied += 1
-        self.log.info(f'Copying file {key} SUCCEEDED!')
-        return self.bucket.get_object_size(key)
+        size, msg = self.bucket.get_object_size(key)
+        return size
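The multipart branch uses boto3's TransferConfig to control when and how multipart upload kicks in. A minimal sketch with illustrative 5 MiB values (the project's actual MULTI_PART_THRESHOLD is not shown in this diff):

from boto3.s3.transfer import TransferConfig

# files larger than multipart_threshold are split into multipart_chunksize parts
t_config = TransferConfig(multipart_threshold=5 * 1024 * 1024,
                          multipart_chunksize=5 * 1024 * 1024)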
