From 071c8804ddb1f08066076054b9846394b7f9269b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Yi=C4=9Fitsoy?= Date: Wed, 4 May 2022 17:43:01 +0200 Subject: [PATCH 1/7] feat: extend request handler to upload to s3 --- dash_uploader/configure_upload.py | 18 +++- dash_uploader/httprequesthandler.py | 129 +++++++++++++++++++++++++--- dash_uploader/settings.py | 22 +++++ setup.py | 3 + usage.py | 32 ++++++- 5 files changed, 187 insertions(+), 17 deletions(-) diff --git a/dash_uploader/configure_upload.py b/dash_uploader/configure_upload.py index 620ada4..9f327ad 100644 --- a/dash_uploader/configure_upload.py +++ b/dash_uploader/configure_upload.py @@ -9,7 +9,12 @@ def configure_upload( - app, folder, use_upload_id=True, upload_api=None, http_request_handler=None + app, + folder, + use_upload_id=True, + upload_api=None, + http_request_handler=None, + s3_config: settings.S3Configuration=None ): r""" Configure the upload APIs for dash app. @@ -44,6 +49,10 @@ def configure_upload( If you provide a class, use a subclass of HttpRequestHandler. See the documentation of dash_uploader.HttpRequestHandler for more details. + s3_config: None or class + Used for uploading file to a s3 bucket. If provided, `folder` will be used for + temp folder for chunks during multipart upload + """ settings.UPLOAD_FOLDER_ROOT = folder settings.app = app @@ -71,6 +80,7 @@ def configure_upload( upload_api, http_request_handler=http_request_handler, use_upload_id=use_upload_id, + s3_config=s3_config, ) @@ -80,6 +90,7 @@ def decorate_server( upload_api, http_request_handler, use_upload_id=True, + s3_config: settings.S3Configuration=None ): """ Parameters @@ -100,10 +111,13 @@ def decorate_server( session id) will use their own folder. If False, all files from all sessions are uploaded into same folder (not recommended). + s3_config: None or class + Used for uploading file to a s3 bucket. 
If provided, `folder` will be used for + temp folder for chunks during multipart upload """ handler = http_request_handler( - server, upload_folder=temp_base, use_upload_id=use_upload_id + server, upload_folder=temp_base, use_upload_id=use_upload_id, s3_config=s3_config ) server.add_url_rule(upload_api, None, handler.get, methods=["GET"]) diff --git a/dash_uploader/httprequesthandler.py b/dash_uploader/httprequesthandler.py index af0c9af..decd786 100644 --- a/dash_uploader/httprequesthandler.py +++ b/dash_uploader/httprequesthandler.py @@ -4,14 +4,27 @@ import shutil import time import traceback +from typing import Dict, Final from flask import request from flask import abort - +from dash_uploader.settings import S3Configuration from dash_uploader.utils import retry +# try importing boto3 as it is a feature dependency +try: + import boto3 +except ImportError: + HAS_BOTO = False +else: + HAS_BOTO = True + + logger = logging.getLogger(__name__) +# chunk size should be greater than 5Mb for s3 multipart upload +S3_MIN_CHUNK_SIZE: Final[int] = 5 * 1024 * 1024 + def get_chunk_name(uploaded_filename, chunk_number): return f"{uploaded_filename}_part_{chunk_number}" @@ -34,6 +47,8 @@ def __init__(self, request): """ # Available fields: https://github.com/flowjs/flow.js self.n_chunks_total = request.form.get("flowTotalChunks", type=int) + self.total_size = request.form.get("flowTotalSize", type=int) + self.chunk_size = request.form.get("flowChunkSize", type=int) self.chunk_number = request.form.get("flowChunkNumber", default=1, type=int) self.filename = request.form.get("flowFilename", default="error", type=str) # 'unique' identifier for the file that is being uploaded. @@ -56,9 +71,16 @@ def __init__(self, request): class BaseHttpRequestHandler: + UPLOADS: Dict[str,dict] = {} + remove_file = staticmethod(retry(wait_time=0.35, max_time=15.0)(remove_file)) - def __init__(self, server, upload_folder, use_upload_id): + def __init__( + self, + server, + upload_folder, + use_upload_id, + s3_config: S3Configuration=None): """ Parameters ---------- @@ -74,12 +96,36 @@ def __init__(self, server, upload_folder, use_upload_id): session id) will use their own folder. If False, all files from all sessions are uploaded into same folder (not recommended). + s3_config: None or class + Used for uploading file to a s3 bucket. If provided, `folder` will be used for + temp folder for chunks during multipart upload """ self.server = server self.upload_folder = pathlib.Path(upload_folder) self.use_upload_id = use_upload_id + + if not s3_config: + self.upload_to_s3 = False + else: + if not HAS_BOTO: + raise ValueError("`s3_config` is provided but boto3 is missing. 
Please re-install dash_uploader with 's3' feature enabled") + self.upload_to_s3 = True + self.s3 = boto3.client( + "s3", + region_name=s3_config.region_name, + use_ssl=s3_config.use_ssl, + endpoint_url=s3_config.endpoint_url, + aws_access_key_id=s3_config.aws_access_key_id, + aws_secret_access_key=s3_config.aws_secret_access_key, + ) + self.bucket = s3_config.bucket #"my-bucket" + pf = s3_config.prefix # "my-root-folder/" + # append trailing separator if provided + pf = pf + "/" if pf and not pf.endswith("/") else pf + # remove leading slash if present + self.prefix = pf.removeprefix("/") def post(self): try: return self._post() @@ -96,6 +142,22 @@ def _post(self): if not temporary_folder_for_file_chunks.exists(): temporary_folder_for_file_chunks.mkdir(parents=True) + if self.upload_to_s3: + # use multipart upload for multichunks + if r.n_chunks_total > 1: + # chunk size should be greater than 5Mb for s3 multipart upload + if r.chunk_number == 1 and r.chunk_size <= S3_MIN_CHUNK_SIZE: + # set chunkSize to a value greater than 5 for Upload component + abort(500, "Chunk size should be greater than 5 Mb for multipart upload") + + res = self.s3.create_multipart_upload(Bucket=self.bucket, Key=self.prefix + r.relative_path) + upload_id = res["UploadId"] + self.UPLOADS[r.unique_identifier] = {"UploadId" : upload_id, "Parts": []} + self.server.logger.debug("Start multipart upload %s" % upload_id) + else: + # do nothing for single chunks, just upload later + pass + # save the chunk data chunk_name = get_chunk_name(r.filename, r.chunk_number) @@ -108,6 +170,33 @@ def _post(self): os.utime(lock_file_path, None) r.chunk_data.save(chunk_file) + + if self.upload_to_s3: + with open(chunk_file, "rb") as stored_chunk_file: + if r.n_chunks_total > 1: + s3_upload = self.UPLOADS.get(r.unique_identifier) + part = self.s3.upload_part( + Body=stored_chunk_file, + Bucket=self.bucket, + Key=self.prefix + r.relative_path, + UploadId=s3_upload["UploadId"], + PartNumber=r.chunk_number + ) + self.server.logger.debug("Uploaded part to s3: %s - %s", r.chunk_number, part) + s3_upload["Parts"].append( + { + "PartNumber": r.chunk_number, + "ETag": part["ETag"] + } + ) + else: + # upload chunk directly + self.s3.upload_fileobj( + Fileobj=stored_chunk_file, + Bucket=self.bucket, + Key=self.prefix + r.relative_path, + ) + self.remove_file(lock_file_path) # check if the upload is complete @@ -147,18 +236,32 @@ def _post(self): ) time.sleep(1) - # Make sure some other chunk didn't trigger file reconstruction - target_file_name = os.path.join(upload_session_root, r.filename) - if os.path.exists(target_file_name): - logger.info("File %s exists already. 
Overwriting..", target_file_name) - self.remove_file(target_file_name) - - with open(target_file_name, "ab") as target_file: - for p in chunk_paths: - with open(p, "rb") as stored_chunk_file: - target_file.write(stored_chunk_file.read()) - self.server.logger.debug("File saved to: %s", target_file_name) + if self.upload_to_s3 and r.n_chunks_total > 1: + # we need to complete the multipart upload process + s3_upload = self.UPLOADS.get(r.unique_identifier) + result = self.s3.complete_multipart_upload( + Bucket=self.bucket, + Key=self.prefix + r.relative_path, + UploadId=s3_upload["UploadId"], + MultipartUpload={"Parts": s3_upload["Parts"]} + ) + self.server.logger.debug("Uploaded file to s3") + else: + # Make sure some other chunk didn't trigger file reconstruction + target_file_name = os.path.join(upload_session_root, r.filename) + if os.path.exists(target_file_name): + logger.info("File %s exists already. Overwriting..", target_file_name) + self.remove_file(target_file_name) + + with open(target_file_name, "ab") as target_file: + for p in chunk_paths: + with open(p, "rb") as stored_chunk_file: + target_file.write(stored_chunk_file.read()) + self.server.logger.debug("File saved to: %s", target_file_name) shutil.rmtree(temporary_folder_for_file_chunks) + if self.upload_to_s3 and r.n_chunks_total > 1: + # remove the upload record from the hash table + self.UPLOADS.pop(r.unique_identifier, None) return r.filename diff --git a/dash_uploader/settings.py b/dash_uploader/settings.py index 4153a36..d1696e4 100644 --- a/dash_uploader/settings.py +++ b/dash_uploader/settings.py @@ -1,3 +1,6 @@ +from typing import Optional +from dataclasses import dataclass + # The default upload api endpoint # The du.configure_upload can change this upload_api = "/API/dash-uploader" @@ -18,3 +21,22 @@ # `requests_pathname_prefix` and `routes_pathname_prefix`, # not `url_base_pathname`. routes_pathname_prefix = "/" + +# Confguration parameters if the files to be uploaded to a S3 bucket +@dataclass +class S3Configuration: + # s3 region name + region_name: str + # bucket name + bucket: str + # s3 endpoint URL like 'example.xxx.amazonaws.com' + # if "http/https" scheme provided `use_ssl` is ignored + endpoint_url: str + # access key id + aws_access_key_id: str + # secret key + aws_secret_access_key: str + # whether to use secure connection + use_ssl: Optional[bool] = True + # optional prefix under the bucket if provided should end with '/' + prefix: Optional[str] = "" diff --git a/setup.py b/setup.py index 113f4d1..3005e70 100644 --- a/setup.py +++ b/setup.py @@ -42,6 +42,9 @@ # Needs: import chromedriver_binary to the top of your test script. 
"chromedriver-binary", "Werkzeug~=2.0.3", + ], + "s3" : [ + "boto3>=1.22.5" ] }, classifiers=[ diff --git a/usage.py b/usage.py index 94b8451..c2d7e09 100644 --- a/usage.py +++ b/usage.py @@ -3,6 +3,8 @@ import dash_uploader as du import dash +from dash_uploader.settings import S3Configuration + if du.utils.dash_version_is_at_least("2.0.0"): from dash import html # if dash <= 2.0.0, use: import dash_html_components as html else: @@ -12,8 +14,31 @@ app = dash.Dash(__name__) -UPLOAD_FOLDER_ROOT = r"C:\tmp\Uploads" -du.configure_upload(app, UPLOAD_FOLDER_ROOT) +s3_config = None +### uncomment the following lines to get stored credentials from env or aws config files + +# import boto3 +# session = boto3.Session() +# credentials = session.get_credentials() +# credentials = credentials.get_frozen_credentials() +# access_key = credentials.access_key +# secret_key = credentials.secret_key +# s3_config = S3Configuration( +# region_name = "eu-central-1", +# endpoint_url="https://s3.eu-central-1.amazonaws.com", +# use_ssl=True, +# aws_access_key_id=credentials.access_key, +# aws_secret_access_key=credentials.secret_key, +# bucket="my-bucket", +# prefix="my-prefix", +# ) + +UPLOAD_FOLDER_ROOT = r"/tmp/Uploads" +du.configure_upload( + app=app, + folder=UPLOAD_FOLDER_ROOT, + s3_config=s3_config +) def get_upload_component(id): @@ -25,6 +50,7 @@ def get_upload_component(id): pause_button=True, max_file_size=130, # 130 Mb max_total_size=350, + # chunk_size=6, # 6 MB to use multipart upload to s3 # filetypes=["csv", "zip"], upload_id=uuid.uuid1(), # Unique session id max_files=10, @@ -59,6 +85,8 @@ def get_app_layout(): # This way we can use unique session id's as upload_id's app.layout = get_app_layout +# uncomment the following line to get the logs +# app.server.logger.setLevel(logging.DEBUG) # 3) Create a callback @du.callback( From ea556c98f8c9d8b8667e3ac96fe9eecf861e2140 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Yi=C4=9Fitsoy?= Date: Wed, 4 May 2022 17:58:08 +0200 Subject: [PATCH 2/7] chore: linting --- dash_uploader/configure_upload.py | 23 ++++--- dash_uploader/httprequesthandler.py | 95 ++++++++++++++++------------- dash_uploader/settings.py | 3 +- setup.py | 4 +- tests/conftest.py | 2 +- tests/test_chromedriver.py | 2 +- usage.py | 11 ++-- 7 files changed, 73 insertions(+), 67 deletions(-) diff --git a/dash_uploader/configure_upload.py b/dash_uploader/configure_upload.py index 9f327ad..52b0823 100644 --- a/dash_uploader/configure_upload.py +++ b/dash_uploader/configure_upload.py @@ -9,12 +9,12 @@ def configure_upload( - app, - folder, - use_upload_id=True, - upload_api=None, - http_request_handler=None, - s3_config: settings.S3Configuration=None + app, + folder, + use_upload_id=True, + upload_api=None, + http_request_handler=None, + s3_config: settings.S3Configuration = None, ): r""" Configure the upload APIs for dash app. @@ -50,7 +50,7 @@ def configure_upload( See the documentation of dash_uploader.HttpRequestHandler for more details. s3_config: None or class - Used for uploading file to a s3 bucket. If provided, `folder` will be used for + Used for uploading file to a s3 bucket. If provided, `folder` will be used for temp folder for chunks during multipart upload """ @@ -90,7 +90,7 @@ def decorate_server( upload_api, http_request_handler, use_upload_id=True, - s3_config: settings.S3Configuration=None + s3_config: settings.S3Configuration = None, ): """ Parameters @@ -112,12 +112,15 @@ def decorate_server( all files from all sessions are uploaded into same folder (not recommended). 
s3_config: None or class - Used for uploading file to a s3 bucket. If provided, `folder` will be used for + Used for uploading file to a s3 bucket. If provided, `folder` will be used for temp folder for chunks during multipart upload """ handler = http_request_handler( - server, upload_folder=temp_base, use_upload_id=use_upload_id, s3_config=s3_config + server, + upload_folder=temp_base, + use_upload_id=use_upload_id, + s3_config=s3_config, ) server.add_url_rule(upload_api, None, handler.get, methods=["GET"]) diff --git a/dash_uploader/httprequesthandler.py b/dash_uploader/httprequesthandler.py index decd786..a3b4305 100644 --- a/dash_uploader/httprequesthandler.py +++ b/dash_uploader/httprequesthandler.py @@ -71,16 +71,13 @@ def __init__(self, request): class BaseHttpRequestHandler: - UPLOADS: Dict[str,dict] = {} + UPLOADS: Dict[str, dict] = {} remove_file = staticmethod(retry(wait_time=0.35, max_time=15.0)(remove_file)) def __init__( - self, - server, - upload_folder, - use_upload_id, - s3_config: S3Configuration=None): + self, server, upload_folder, use_upload_id, s3_config: S3Configuration = None + ): """ Parameters ---------- @@ -97,7 +94,7 @@ def __init__( all files from all sessions are uploaded into same folder (not recommended). s3_config: None or class - Used for uploading file to a s3 bucket. If provided, `folder` will be used for + Used for uploading file to a s3 bucket. If provided, `folder` will be used for temp folder for chunks during multipart upload """ @@ -105,12 +102,13 @@ def __init__( self.upload_folder = pathlib.Path(upload_folder) self.use_upload_id = use_upload_id - if not s3_config: self.upload_to_s3 = False else: if not HAS_BOTO: - raise ValueError("`s3_config` is provided but boto3 is missing. Please re-install dash_uploader with 's3' feature enabled") + raise ValueError( + "`s3_config` is provided but boto3 is missing. 
Please re-install dash_uploader with 's3' feature enabled" + ) self.upload_to_s3 = True self.s3 = boto3.client( "s3", @@ -120,12 +118,13 @@ def __init__( aws_access_key_id=s3_config.aws_access_key_id, aws_secret_access_key=s3_config.aws_secret_access_key, ) - self.bucket = s3_config.bucket #"my-bucket" - pf = s3_config.prefix # "my-root-folder/" + self.bucket = s3_config.bucket # "my-bucket" + pf = s3_config.prefix # "my-root-folder/" # append trailing separator if provided - pf = pf + "/" if pf and not pf.endswith("/") else pf + pf = pf + "/" if pf and not pf.endswith("/") else pf # remove leading slash if present self.prefix = pf.removeprefix("/") + def post(self): try: return self._post() @@ -148,17 +147,24 @@ def _post(self): # chunk size should be greater than 5Mb for s3 multipart upload if r.chunk_number == 1 and r.chunk_size <= S3_MIN_CHUNK_SIZE: # set chunkSize to a value greater than 5 for Upload component - abort(500, "Chunk size should be greater than 5 Mb for multipart upload") + abort( + 500, + "Chunk size should be greater than 5 Mb for multipart upload", + ) - res = self.s3.create_multipart_upload(Bucket=self.bucket, Key=self.prefix + r.relative_path) + res = self.s3.create_multipart_upload( + Bucket=self.bucket, Key=self.prefix + r.relative_path + ) upload_id = res["UploadId"] - self.UPLOADS[r.unique_identifier] = {"UploadId" : upload_id, "Parts": []} + self.UPLOADS[r.unique_identifier] = { + "UploadId": upload_id, + "Parts": [], + } self.server.logger.debug("Start multipart upload %s" % upload_id) else: # do nothing for single chunks, just upload later pass - # save the chunk data chunk_name = get_chunk_name(r.filename, r.chunk_number) chunk_file = temporary_folder_for_file_chunks / chunk_name @@ -172,30 +178,29 @@ def _post(self): r.chunk_data.save(chunk_file) if self.upload_to_s3: - with open(chunk_file, "rb") as stored_chunk_file: - if r.n_chunks_total > 1: - s3_upload = self.UPLOADS.get(r.unique_identifier) - part = self.s3.upload_part( - Body=stored_chunk_file, - Bucket=self.bucket, - Key=self.prefix + r.relative_path, - UploadId=s3_upload["UploadId"], - PartNumber=r.chunk_number - ) - self.server.logger.debug("Uploaded part to s3: %s - %s", r.chunk_number, part) - s3_upload["Parts"].append( - { - "PartNumber": r.chunk_number, - "ETag": part["ETag"] - } - ) - else: - # upload chunk directly - self.s3.upload_fileobj( - Fileobj=stored_chunk_file, - Bucket=self.bucket, - Key=self.prefix + r.relative_path, - ) + with open(chunk_file, "rb") as stored_chunk_file: + if r.n_chunks_total > 1: + s3_upload = self.UPLOADS.get(r.unique_identifier) + part = self.s3.upload_part( + Body=stored_chunk_file, + Bucket=self.bucket, + Key=self.prefix + r.relative_path, + UploadId=s3_upload["UploadId"], + PartNumber=r.chunk_number, + ) + self.server.logger.debug( + "Uploaded part to s3: %s - %s", r.chunk_number, part + ) + s3_upload["Parts"].append( + {"PartNumber": r.chunk_number, "ETag": part["ETag"]} + ) + else: + # upload chunk directly + self.s3.upload_fileobj( + Fileobj=stored_chunk_file, + Bucket=self.bucket, + Key=self.prefix + r.relative_path, + ) self.remove_file(lock_file_path) @@ -238,19 +243,21 @@ def _post(self): if self.upload_to_s3 and r.n_chunks_total > 1: # we need to complete the multipart upload process - s3_upload = self.UPLOADS.get(r.unique_identifier) + s3_upload = self.UPLOADS.get(r.unique_identifier) result = self.s3.complete_multipart_upload( Bucket=self.bucket, Key=self.prefix + r.relative_path, UploadId=s3_upload["UploadId"], - MultipartUpload={"Parts": 
s3_upload["Parts"]} + MultipartUpload={"Parts": s3_upload["Parts"]}, ) - self.server.logger.debug("Uploaded file to s3") + self.server.logger.debug("Uploaded file to s3: %s", result) else: # Make sure some other chunk didn't trigger file reconstruction target_file_name = os.path.join(upload_session_root, r.filename) if os.path.exists(target_file_name): - logger.info("File %s exists already. Overwriting..", target_file_name) + logger.info( + "File %s exists already. Overwriting..", target_file_name + ) self.remove_file(target_file_name) with open(target_file_name, "ab") as target_file: diff --git a/dash_uploader/settings.py b/dash_uploader/settings.py index d1696e4..7d9cf31 100644 --- a/dash_uploader/settings.py +++ b/dash_uploader/settings.py @@ -22,6 +22,7 @@ # not `url_base_pathname`. routes_pathname_prefix = "/" + # Confguration parameters if the files to be uploaded to a S3 bucket @dataclass class S3Configuration: @@ -34,7 +35,7 @@ class S3Configuration: endpoint_url: str # access key id aws_access_key_id: str - # secret key + # secret key aws_secret_access_key: str # whether to use secure connection use_ssl: Optional[bool] = True diff --git a/setup.py b/setup.py index 3005e70..f0d2a58 100644 --- a/setup.py +++ b/setup.py @@ -43,9 +43,7 @@ "chromedriver-binary", "Werkzeug~=2.0.3", ], - "s3" : [ - "boto3>=1.22.5" - ] + "s3": ["boto3>=1.22.5"], }, classifiers=[ "Framework :: Dash", diff --git a/tests/conftest.py b/tests/conftest.py index d1dbd77..ba47a44 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -34,4 +34,4 @@ def pytest_collection_modifyitems(items): assert ( len(items) == N_items ), "Tests dropped out in reordering! This should never happen." - return items \ No newline at end of file + return items diff --git a/tests/test_chromedriver.py b/tests/test_chromedriver.py index e90c429..29aafbd 100644 --- a/tests/test_chromedriver.py +++ b/tests/test_chromedriver.py @@ -42,4 +42,4 @@ def test_chromedriver_version_okay(): print(msg) warnings.warn(msg) - raise e \ No newline at end of file + raise e diff --git a/usage.py b/usage.py index c2d7e09..a1cb2dd 100644 --- a/usage.py +++ b/usage.py @@ -3,7 +3,6 @@ import dash_uploader as du import dash -from dash_uploader.settings import S3Configuration if du.utils.dash_version_is_at_least("2.0.0"): from dash import html # if dash <= 2.0.0, use: import dash_html_components as html @@ -15,7 +14,7 @@ app = dash.Dash(__name__) s3_config = None -### uncomment the following lines to get stored credentials from env or aws config files +# uncomment the following lines to get stored credentials from env or aws config files # import boto3 # session = boto3.Session() @@ -23,6 +22,7 @@ # credentials = credentials.get_frozen_credentials() # access_key = credentials.access_key # secret_key = credentials.secret_key +# from dash_uploader.settings import S3Configuration # s3_config = S3Configuration( # region_name = "eu-central-1", # endpoint_url="https://s3.eu-central-1.amazonaws.com", @@ -34,11 +34,7 @@ # ) UPLOAD_FOLDER_ROOT = r"/tmp/Uploads" -du.configure_upload( - app=app, - folder=UPLOAD_FOLDER_ROOT, - s3_config=s3_config -) +du.configure_upload(app=app, folder=UPLOAD_FOLDER_ROOT, s3_config=s3_config) def get_upload_component(id): @@ -88,6 +84,7 @@ def get_app_layout(): # uncomment the following line to get the logs # app.server.logger.setLevel(logging.DEBUG) + # 3) Create a callback @du.callback( output=Output("callback-output", "children"), From b1305ade9b548adb76fdcef5fdf0c8f14b34c7d3 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Mehmet=20Yi=C4=9Fitsoy?= Date: Fri, 6 May 2022 17:34:29 +0200 Subject: [PATCH 3/7] feeat(s3): extend uploadstatus and provide more states in callback --- .flake8 | 6 +++++ dash_uploader/callbacks.py | 40 ++++++++++++++++++++++++----- dash_uploader/configure_upload.py | 13 ++++++---- dash_uploader/httprequesthandler.py | 33 +++++++++++++----------- dash_uploader/s3.py | 31 ++++++++++++++++++++++ dash_uploader/settings.py | 22 ++-------------- dash_uploader/uploadstatus.py | 8 ++++++ package.json | 2 +- usage.py | 17 +++++++----- 9 files changed, 119 insertions(+), 53 deletions(-) create mode 100644 .flake8 create mode 100644 dash_uploader/s3.py diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..8af1969 --- /dev/null +++ b/.flake8 @@ -0,0 +1,6 @@ +[flake8] +ignore = E203, E231, E501, W503, E402, E741 +exclude = ./.venv +max-line-length = 99 +max-complexity = 18 +select = B,C,E,F,W,T4,B9 \ No newline at end of file diff --git a/dash_uploader/callbacks.py b/dash_uploader/callbacks.py index 8ada20f..562d0f9 100644 --- a/dash_uploader/callbacks.py +++ b/dash_uploader/callbacks.py @@ -1,7 +1,9 @@ from pathlib import Path +from urllib.parse import urljoin from dash.exceptions import PreventUpdate from dash.dependencies import Input, State +from dash_uploader.s3 import S3Location import dash_uploader.settings as settings from dash_uploader.uploadstatus import UploadStatus @@ -21,29 +23,45 @@ def wrapper( uploaded_files_size, total_files_size, upload_id, + *args, + **kwargs, ): if not callbackbump: raise PreventUpdate() uploadedfilepaths = [] + s3_location = None if uploaded_filenames is not None: - if upload_id: - root_folder = Path(settings.UPLOAD_FOLDER_ROOT) / upload_id + + # get config and upload id + s3_config = settings.s3_config + s3_location:S3Location = s3_config.location if s3_config else None + _upload_id = upload_id or "" + + # build root folder + if s3_location: + _url = urljoin(s3_location.endpoint_url, s3_location.bucket) + _url = urljoin(_url, s3_location.prefix) + _url = urljoin(_url, _upload_id) + root_folder = Path(_url) else: - root_folder = Path(settings.UPLOAD_FOLDER_ROOT) + root_folder = Path(settings.UPLOAD_FOLDER_ROOT) / _upload_id + # construct full paths to the uploaded files, local or s3 for filename in uploaded_filenames: file = root_folder / filename uploadedfilepaths.append(str(file)) + status = UploadStatus( uploaded_files=uploadedfilepaths, n_total=total_files_count, uploaded_size_mb=uploaded_files_size, total_size_mb=total_files_size, upload_id=upload_id, + s3_location=s3_location, ) - return callback(status) + return callback(status, *args, **kwargs) return wrapper @@ -51,6 +69,7 @@ def wrapper( def callback( output, id="dash-uploader", + state=None, ): """ Add a callback to dash application. @@ -63,12 +82,15 @@ def callback( The output dash component id: str The id of the du.Upload component. + state: dash State(s) + The state dash component Example ------- @du.callback( output=Output('callback-output', 'children'), id='dash-uploader', + state=State('callback-state', 'children'), ) def get_a_list(filenames): return html.Ul([html.Li(filenames)]) @@ -108,6 +130,11 @@ def add_callback(function): # the `prevent_initial_call` option was added in Dash v.1.12 kwargs["prevent_initial_call"] = True + # input states from application + extra_states = [] + if state: + extra_states = [state] if isinstance(state, State) else state + # Input: Change in the props will trigger callback. 
# Whenever 'this.props.setProps' is called on the JS side, # (dash specific special prop that is passed to every @@ -126,8 +153,9 @@ def add_callback(function): State(id, "uploadedFilesSize"), State(id, "totalFilesSize"), State(id, "upload_id"), - ], - **kwargs + ] + + extra_states, + **kwargs, )(dash_callback) return function diff --git a/dash_uploader/configure_upload.py b/dash_uploader/configure_upload.py index 52b0823..e9cddb7 100644 --- a/dash_uploader/configure_upload.py +++ b/dash_uploader/configure_upload.py @@ -1,5 +1,5 @@ import logging - +from dash_uploader import s3 import dash_uploader.settings as settings from dash_uploader.upload import update_upload_api from dash_uploader.httprequesthandler import HttpRequestHandler @@ -14,7 +14,7 @@ def configure_upload( use_upload_id=True, upload_api=None, http_request_handler=None, - s3_config: settings.S3Configuration = None, + s3_config: s3.S3Configuration = None, ): r""" Configure the upload APIs for dash app. @@ -33,8 +33,10 @@ def configure_upload( use_upload_id: bool Determines if the uploads are put into folders defined by a "upload id" (upload_id). - If True, uploads will be put into `folder`//; - that is, every user (for example with different + If True, uploads will be put into + `folder`// or `s3_config.location.prefix`/// + if s3_config is provided. + That is, every user (for example with different session id) will use their own folder. If False, all files from all sessions are uploaded into same folder (not recommended). @@ -56,6 +58,7 @@ def configure_upload( """ settings.UPLOAD_FOLDER_ROOT = folder settings.app = app + settings.s3_config = s3_config if upload_api is None: upload_api = settings.upload_api @@ -90,7 +93,7 @@ def decorate_server( upload_api, http_request_handler, use_upload_id=True, - s3_config: settings.S3Configuration = None, + s3_config: s3.S3Configuration = None, ): """ Parameters diff --git a/dash_uploader/httprequesthandler.py b/dash_uploader/httprequesthandler.py index a3b4305..9ac3e19 100644 --- a/dash_uploader/httprequesthandler.py +++ b/dash_uploader/httprequesthandler.py @@ -8,7 +8,7 @@ from flask import request from flask import abort -from dash_uploader.settings import S3Configuration +from dash_uploader.s3 import S3Configuration from dash_uploader.utils import retry # try importing boto3 as it is a feature dependency @@ -112,14 +112,14 @@ def __init__( self.upload_to_s3 = True self.s3 = boto3.client( "s3", - region_name=s3_config.region_name, - use_ssl=s3_config.use_ssl, - endpoint_url=s3_config.endpoint_url, - aws_access_key_id=s3_config.aws_access_key_id, - aws_secret_access_key=s3_config.aws_secret_access_key, + region_name=s3_config.location.region_name, + use_ssl=s3_config.location.use_ssl, + endpoint_url=s3_config.location.endpoint_url, + aws_access_key_id=s3_config.credentials.aws_access_key_id, + aws_secret_access_key=s3_config.credentials.aws_secret_access_key, ) - self.bucket = s3_config.bucket # "my-bucket" - pf = s3_config.prefix # "my-root-folder/" + self.bucket = s3_config.location.bucket # "my-bucket" + pf = s3_config.location.prefix # "my-root-folder/" # append trailing separator if provided pf = pf + "/" if pf and not pf.endswith("/") else pf # remove leading slash if present @@ -153,14 +153,14 @@ def _post(self): ) res = self.s3.create_multipart_upload( - Bucket=self.bucket, Key=self.prefix + r.relative_path + Bucket=self.bucket, Key=self.get_s3_path(r) ) - upload_id = res["UploadId"] + s3_upload_id = res["UploadId"] self.UPLOADS[r.unique_identifier] = { - "UploadId": upload_id, 
+ "UploadId": s3_upload_id, "Parts": [], } - self.server.logger.debug("Start multipart upload %s" % upload_id) + self.server.logger.debug("Start multipart upload %s" % s3_upload_id) else: # do nothing for single chunks, just upload later pass @@ -184,7 +184,7 @@ def _post(self): part = self.s3.upload_part( Body=stored_chunk_file, Bucket=self.bucket, - Key=self.prefix + r.relative_path, + Key=self.get_s3_path(r), UploadId=s3_upload["UploadId"], PartNumber=r.chunk_number, ) @@ -199,7 +199,7 @@ def _post(self): self.s3.upload_fileobj( Fileobj=stored_chunk_file, Bucket=self.bucket, - Key=self.prefix + r.relative_path, + Key=self.get_s3_path(r), ) self.remove_file(lock_file_path) @@ -246,7 +246,7 @@ def _post(self): s3_upload = self.UPLOADS.get(r.unique_identifier) result = self.s3.complete_multipart_upload( Bucket=self.bucket, - Key=self.prefix + r.relative_path, + Key=self.get_s3_path(r), UploadId=s3_upload["UploadId"], MultipartUpload={"Parts": s3_upload["Parts"]}, ) @@ -316,6 +316,9 @@ def get_upload_session_root(self, upload_id): else self.upload_folder ) + def get_s3_path(self, r: RequestData): + return os.path.join(self.prefix, r.upload_id, r.relative_path) + class HttpRequestHandler(BaseHttpRequestHandler): # You may use the flask.request diff --git a/dash_uploader/s3.py b/dash_uploader/s3.py new file mode 100644 index 0000000..9ab4ba6 --- /dev/null +++ b/dash_uploader/s3.py @@ -0,0 +1,31 @@ +# Confguration parameters if the files to be uploaded to a S3 bucket +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class S3Location: + # s3 region name + region_name: str + # bucket name + bucket: str + # s3 endpoint URL like 'example.xxx.amazonaws.com' + # if "http/https" scheme provided `use_ssl` is ignored + endpoint_url: str + # whether to use secure connection + use_ssl: Optional[bool] = True + # optional prefix under the bucket if provided should end with '/' + prefix: Optional[str] = "" + + +@dataclass +class S3Credentials: + # access key id + aws_access_key_id: str + # secret key + aws_secret_access_key: str + +@dataclass +class S3Configuration: + location: S3Location + credentials: S3Credentials \ No newline at end of file diff --git a/dash_uploader/settings.py b/dash_uploader/settings.py index 7d9cf31..3f13deb 100644 --- a/dash_uploader/settings.py +++ b/dash_uploader/settings.py @@ -1,5 +1,3 @@ -from typing import Optional -from dataclasses import dataclass # The default upload api endpoint # The du.configure_upload can change this @@ -23,21 +21,5 @@ routes_pathname_prefix = "/" -# Confguration parameters if the files to be uploaded to a S3 bucket -@dataclass -class S3Configuration: - # s3 region name - region_name: str - # bucket name - bucket: str - # s3 endpoint URL like 'example.xxx.amazonaws.com' - # if "http/https" scheme provided `use_ssl` is ignored - endpoint_url: str - # access key id - aws_access_key_id: str - # secret key - aws_secret_access_key: str - # whether to use secure connection - use_ssl: Optional[bool] = True - # optional prefix under the bucket if provided should end with '/' - prefix: Optional[str] = "" +# s3 config to be used by httphandler and UploadStatus +s3_config = None diff --git a/dash_uploader/uploadstatus.py b/dash_uploader/uploadstatus.py index 1277711..596ae52 100644 --- a/dash_uploader/uploadstatus.py +++ b/dash_uploader/uploadstatus.py @@ -1,5 +1,6 @@ from pathlib import Path import warnings +from dash_uploader.s3 import S3Location class UploadStatus: @@ -22,6 +23,8 @@ class UploadStatus: Total size of files to be 
uploaded in Megabytes status.upload_id (str or None): The upload id used in the upload process, if any. + status.s3_location (S3Location or None): + The S3 location used for uploading, if any """ def __init__( @@ -31,6 +34,7 @@ def __init__( uploaded_size_mb, total_size_mb, upload_id=None, + s3_location: S3Location = None, ): """ Parameters @@ -47,6 +51,8 @@ def __init__( The size of all files to be uploaded upload_id: None or str The upload id used. + s3_location (S3Location or None): + The S3 location used for uploading, if any """ self.uploaded_files = [Path(x) for x in uploaded_files] @@ -65,6 +71,7 @@ def __init__( self.uploaded_size_mb = uploaded_size_mb self.total_size_mb = total_size_mb self.progress = uploaded_size_mb / total_size_mb + self.s3_location = s3_location def __str__(self): @@ -78,5 +85,6 @@ def __str__(self): f"total_size_mb = {self.total_size_mb}", f"progress = {self.progress}", f"upload_id = {self.upload_id}", + f"s3_location = {self.s3_location}", ] return "" diff --git a/package.json b/package.json index b4481b9..a20342c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "dash_uploader", - "version": "0.7.0-a1", + "version": "0.7.0-a3", "description": "Upload component for Dash", "repository": { "type": "git", diff --git a/usage.py b/usage.py index a1cb2dd..700bfc8 100644 --- a/usage.py +++ b/usage.py @@ -9,7 +9,7 @@ else: import dash_html_components as html -from dash.dependencies import Output +from dash.dependencies import Output, State app = dash.Dash(__name__) @@ -22,15 +22,19 @@ # credentials = credentials.get_frozen_credentials() # access_key = credentials.access_key # secret_key = credentials.secret_key -# from dash_uploader.settings import S3Configuration -# s3_config = S3Configuration( +# from dash_uploader import s3 +# s3_config = s3.S3Configuration( +# location=s3.S3Location( # region_name = "eu-central-1", # endpoint_url="https://s3.eu-central-1.amazonaws.com", # use_ssl=True, -# aws_access_key_id=credentials.access_key, -# aws_secret_access_key=credentials.secret_key, # bucket="my-bucket", # prefix="my-prefix", +# ), +# credentials=s3.S3Credentials( +# aws_access_key_id=credentials.access_key, +# aws_secret_access_key=credentials.secret_key, +# ) # ) UPLOAD_FOLDER_ROOT = r"/tmp/Uploads" @@ -89,8 +93,9 @@ def get_app_layout(): @du.callback( output=Output("callback-output", "children"), id="dash-uploader", + state=State("callback-output", "children"), ) -def callback_on_completion(status: du.UploadStatus): +def callback_on_completion(status: du.UploadStatus, state): return html.Ul([html.Li(str(x)) for x in status.uploaded_files]) From 0b05ec173baa815ae01e08aa0bd73560d45ce45e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Yi=C4=9Fitsoy?= Date: Tue, 10 May 2022 12:14:18 +0200 Subject: [PATCH 4/7] fix: add url fragments --- dash_uploader/callbacks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dash_uploader/callbacks.py b/dash_uploader/callbacks.py index 562d0f9..d38838b 100644 --- a/dash_uploader/callbacks.py +++ b/dash_uploader/callbacks.py @@ -41,8 +41,8 @@ def wrapper( # build root folder if s3_location: _url = urljoin(s3_location.endpoint_url, s3_location.bucket) - _url = urljoin(_url, s3_location.prefix) - _url = urljoin(_url, _upload_id) + _url = urljoin(_url, s3_location.prefix, allow_fragments=True) + _url = urljoin(_url, _upload_id, allow_fragments=True) root_folder = Path(_url) else: root_folder = Path(settings.UPLOAD_FOLDER_ROOT) / _upload_id From 2ea328ba44ba15090c91df95d37b0268afc5421b Mon Sep 
17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Yi=C4=9Fitsoy?= Date: Tue, 10 May 2022 12:18:08 +0200 Subject: [PATCH 5/7] chore: bump version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index a20342c..4bf9615 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "dash_uploader", - "version": "0.7.0-a3", + "version": "0.7.0-a4", "description": "Upload component for Dash", "repository": { "type": "git", From 75ffd86d9a4acc31d1488970173409aaec9eca0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Yi=C4=9Fitsoy?= Date: Tue, 10 May 2022 14:00:07 +0200 Subject: [PATCH 6/7] doc: update README --- README.md | 56 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/README.md b/README.md index f0e3ea4..9319210 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,62 @@ if __name__ == '__main__': ``` +## S3 upload support +Direct S3 support is can be enabled by installing the package with `s3` option (`python -m pip install dash-uploader[s3]`). +In order to upload the files directly to an S3 bucket, you could set up an `S3Configuration` object and pass it +to the `configure_upload` method. + +Here is a minimal example; + +```python +import boto3 +session = boto3.Session() +# credentials will be fetched from environment variables or local aws configuration +# see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html on +# how to configure credentials for s3 +credentials = session.get_credentials() +credentials = credentials.get_frozen_credentials() +access_key = credentials.access_key +secret_key = credentials.secret_key +from dash_uploader import s3 +s3_config = s3.S3Configuration( + location=s3.S3Location( + region_name = "eu-central-1", + endpoint_url="https://s3.eu-central-1.amazonaws.com", + use_ssl=True, + bucket="my-bucket", + prefix="my-prefix", + ), + credentials=s3.S3Credentials( + aws_access_key_id=credentials.access_key, + aws_secret_access_key=credentials.secret_key, + ) +) + +UPLOAD_FOLDER_ROOT = r"/tmp/Uploads" +du.configure_upload(app=app, folder=UPLOAD_FOLDER_ROOT, s3_config=s3_config) + +``` + +>⚠️ Large files will be uploaded to s3 in chunks using multiple upload functionality. +`boto3` supports multiple upload only if the chunks are larger than 5Mib. This can be set while creating the `du.Upload` component. + + +## Passing multiple states to `du.callback` + +Now it is possible to capture Dash components states in a `du.callback`. Simply pass `State` object(s) like you would typically do for a Dash callback. + +```python +@du.callback( + output=Output("callback-output", "children"), + id="dash-uploader", + state=State("callback-output", "children"), +) +def callback_on_completion(status: du.UploadStatus, state): + return html.Ul([html.Li(str(x)) for x in status.uploaded_files]) + +``` + ## Contributing From 111f23b8a1c327b392e7b9ddd299cdbcce6a06d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mehmet=20Yi=C4=9Fitsoy?= Date: Tue, 10 May 2022 14:13:41 +0200 Subject: [PATCH 7/7] docs: more documentation --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index 9319210..66cdee7 100644 --- a/README.md +++ b/README.md @@ -190,9 +190,15 @@ du.configure_upload(app=app, folder=UPLOAD_FOLDER_ROOT, s3_config=s3_config) ``` +> If `s3_config` is not specified, then a standard upload to local storage will be used. + +Files will be uploaded to `///[/]`. 
(`<upload_id>/` will be used only if `use_upload_id` is set to `True` in `du.configure_upload`) + + >⚠️ Large files will be uploaded to s3 in chunks using multipart upload. `boto3` supports multipart upload only if the chunks are larger than 5 MiB. This can be set via the `chunk_size` argument when creating the `du.Upload` component. + > `UPLOAD_FOLDER_ROOT` is still required to temporarily store the chunks of large files being uploaded to s3. + + ## Passing multiple states to `du.callback`
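
The `s3_location` attribute added to `du.UploadStatus` in this patch series also lets a callback tell S3 uploads apart from local ones. Below is a minimal sketch only, not part of the patch: it assumes the `app`/`du.configure_upload` setup and the `"dash-uploader"` / `"callback-output"` component ids from `usage.py` above, and dash >= 2.0 for `from dash import html`.

```python
import dash_uploader as du
from dash import html
from dash.dependencies import Output


@du.callback(
    output=Output("callback-output", "children"),
    id="dash-uploader",
)
def show_upload_locations(status: du.UploadStatus):
    # When an S3Configuration was passed to du.configure_upload, the entries in
    # status.uploaded_files are URL-like paths built from the S3Location
    # (endpoint_url / bucket / prefix / upload_id); otherwise they point into
    # the local UPLOAD_FOLDER_ROOT.
    if status.s3_location is not None:
        header = f"Uploaded to S3 bucket '{status.s3_location.bucket}':"
    else:
        header = "Uploaded to local storage:"
    return html.Div([header, html.Ul([html.Li(str(x)) for x in status.uploaded_files])])
```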