diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000..8af1969
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,6 @@
+[flake8]
+ignore = E203, E231, E501, W503, E402, E741
+exclude = ./.venv
+max-line-length = 99
+max-complexity = 18
+select = B,C,E,F,W,T4,B9
\ No newline at end of file
diff --git a/README.md b/README.md
index f0e3ea4..66cdee7 100644
--- a/README.md
+++ b/README.md
@@ -153,6 +153,68 @@ if __name__ == '__main__':
 ```
 
+## S3 upload support
+
+Direct S3 support can be enabled by installing the package with the `s3` extra (`python -m pip install dash-uploader[s3]`).
+To upload files directly to an S3 bucket, set up an `S3Configuration` object and pass it
+to `du.configure_upload`.
+
+Here is a minimal example:
+
+```python
+import boto3
+
+# credentials are fetched from environment variables or the local aws configuration; see
+# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html on
+# how to configure credentials for s3
+session = boto3.Session()
+credentials = session.get_credentials().get_frozen_credentials()
+
+from dash_uploader import s3
+
+s3_config = s3.S3Configuration(
+    location=s3.S3Location(
+        region_name="eu-central-1",
+        endpoint_url="https://s3.eu-central-1.amazonaws.com",
+        use_ssl=True,
+        bucket="my-bucket",
+        prefix="my-prefix",
+    ),
+    credentials=s3.S3Credentials(
+        aws_access_key_id=credentials.access_key,
+        aws_secret_access_key=credentials.secret_key,
+    ),
+)
+
+UPLOAD_FOLDER_ROOT = r"/tmp/Uploads"
+du.configure_upload(app=app, folder=UPLOAD_FOLDER_ROOT, s3_config=s3_config)
+```
+
+> If `s3_config` is not specified, the standard upload to local storage is used.
+
+Files will be uploaded to `<endpoint_url>/<bucket>/<prefix>/[<upload_id>/]`
+(the `<upload_id>/` part is used only if `use_upload_id` is set to `True` in `du.configure_upload`).
+
+> ⚠️ Large files are uploaded to S3 in chunks using S3's multipart upload functionality.
+> `boto3` supports multipart upload only if each chunk is larger than 5 MiB, so choose the
+> chunk size accordingly when creating the `du.Upload` component (see the example below).
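+
+For example, a component configured for S3 multipart uploads might look like the sketch
+below (the component id and the size limits are placeholders, not values required by the
+library):
+
+```python
+import dash_uploader as du
+
+upload = du.Upload(
+    id="dash-uploader",  # placeholder component id
+    max_file_size=1800,  # MB
+    chunk_size=6,        # MB; keep this above 5 MiB so large files use S3 multipart upload
+)
+```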
+
+> `UPLOAD_FOLDER_ROOT` is still required: it is used to temporarily store the chunks of
+> large files while they are being uploaded to S3.
+
+
+## Passing multiple states to `du.callback`
+
+It is now possible to capture the state of other Dash components in a `du.callback`.
+Simply pass `State` object(s) as you would for a regular Dash callback:
+
+```python
+@du.callback(
+    output=Output("callback-output", "children"),
+    id="dash-uploader",
+    state=State("callback-output", "children"),
+)
+def callback_on_completion(status: du.UploadStatus, state):
+    return html.Ul([html.Li(str(x)) for x in status.uploaded_files])
+```
+
 ## Contributing
diff --git a/dash_uploader/callbacks.py b/dash_uploader/callbacks.py
index 8ada20f..d38838b 100644
--- a/dash_uploader/callbacks.py
+++ b/dash_uploader/callbacks.py
@@ -1,7 +1,9 @@
 from pathlib import Path
+from urllib.parse import urljoin
 
 from dash.exceptions import PreventUpdate
 from dash.dependencies import Input, State
+from dash_uploader.s3 import S3Location
 import dash_uploader.settings as settings
 from dash_uploader.uploadstatus import UploadStatus
@@ -21,29 +23,45 @@ def wrapper(
         uploaded_filenames,
         total_files_count,
         uploaded_files_size,
         total_files_size,
         upload_id,
+        *args,
+        **kwargs,
     ):
         if not callbackbump:
             raise PreventUpdate()
 
         uploadedfilepaths = []
+        s3_location = None
         if uploaded_filenames is not None:
-            if upload_id:
-                root_folder = Path(settings.UPLOAD_FOLDER_ROOT) / upload_id
+
+            # get the S3 config and the upload id
+            s3_config = settings.s3_config
+            s3_location: S3Location = s3_config.location if s3_config else None
+            _upload_id = upload_id or ""
+
+            # build the root folder (local path or S3 URL)
+            if s3_location:
+                _url = urljoin(s3_location.endpoint_url, s3_location.bucket)
+                _url = urljoin(_url, s3_location.prefix, allow_fragments=True)
+                _url = urljoin(_url, _upload_id, allow_fragments=True)
+                root_folder = Path(_url)
             else:
-                root_folder = Path(settings.UPLOAD_FOLDER_ROOT)
+                root_folder = Path(settings.UPLOAD_FOLDER_ROOT) / _upload_id
 
+            # construct full paths to the uploaded files, local or S3
             for filename in uploaded_filenames:
                 file = root_folder / filename
                 uploadedfilepaths.append(str(file))
+
         status = UploadStatus(
             uploaded_files=uploadedfilepaths,
             n_total=total_files_count,
             uploaded_size_mb=uploaded_files_size,
             total_size_mb=total_files_size,
             upload_id=upload_id,
+            s3_location=s3_location,
         )
-        return callback(status)
+        return callback(status, *args, **kwargs)
 
     return wrapper
@@ -51,6 +69,7 @@ def wrapper(
 def callback(
     output,
     id="dash-uploader",
+    state=None,
 ):
     """
     Add a callback to dash application.
@@ -63,12 +82,15 @@
         The output dash component
     id: str
         The id of the du.Upload component.
+    state: None, dash State or list of States
+        Extra Dash State dependencies whose current values are passed to the
+        callback as additional arguments.
 
     Example
     -------
 
     @du.callback(
         output=Output('callback-output', 'children'),
         id='dash-uploader',
+        state=State('callback-state', 'children'),
     )
-    def get_a_list(filenames):
-        return html.Ul([html.Li(filenames)])
+    def get_a_list(status, state):
+        return html.Ul([html.Li(str(x)) for x in status.uploaded_files])
@@ -108,6 +130,11 @@ def add_callback(function):
         # the `prevent_initial_call` option was added in Dash v.1.12
         kwargs["prevent_initial_call"] = True
 
+        # extra input States requested by the user
+        extra_states = []
+        if state:
+            extra_states = [state] if isinstance(state, State) else state
+
         # Input: Change in the props will trigger callback.
         # Whenever 'this.props.setProps' is called on the JS side,
         # (dash specific special prop that is passed to every
@@ -126,8 +153,9 @@ def add_callback(function):
             State(id, "uploadedFilesSize"),
             State(id, "totalFilesSize"),
             State(id, "upload_id"),
-            ],
-            **kwargs
+            ]
+            + extra_states,
+            **kwargs,
         )(dash_callback)
 
         return function
diff --git a/dash_uploader/configure_upload.py b/dash_uploader/configure_upload.py
index 620ada4..e9cddb7 100644
--- a/dash_uploader/configure_upload.py
+++ b/dash_uploader/configure_upload.py
@@ -1,5 +1,5 @@
 import logging
-
+from dash_uploader import s3
 import dash_uploader.settings as settings
 from dash_uploader.upload import update_upload_api
 from dash_uploader.httprequesthandler import HttpRequestHandler
@@ -9,7 +9,12 @@ def configure_upload(
-    app, folder, use_upload_id=True, upload_api=None, http_request_handler=None
+    app,
+    folder,
+    use_upload_id=True,
+    upload_api=None,
+    http_request_handler=None,
+    s3_config: s3.S3Configuration = None,
 ):
     r"""
     Configure the upload APIs for dash app.
@@ -28,8 +33,10 @@ def configure_upload(
     use_upload_id: bool
         Determines if the uploads are put into
         folders defined by a "upload id" (upload_id).
-        If True, uploads will be put into `folder`/<upload_id>/;
-        that is, every user (for example with different
+        If True, uploads will be put into
+        `folder`/<upload_id>/ or `s3_config.location.prefix`/<upload_id>/<filename>
+        if s3_config is provided.
+        That is, every user (for example with different
         session id) will use their own folder. If False,
         all files from all sessions are uploaded into same
         folder (not recommended).
@@ -44,9 +51,14 @@ def configure_upload(
         If you provide a class, use a subclass of HttpRequestHandler.
         See the documentation of dash_uploader.HttpRequestHandler for
         more details.
+    s3_config: None or s3.S3Configuration
+        Used for uploading files to an S3 bucket. If provided, `folder` is used as a
+        temporary folder for the file chunks during multipart uploads.
+
     """
     settings.UPLOAD_FOLDER_ROOT = folder
     settings.app = app
+    settings.s3_config = s3_config
 
     if upload_api is None:
         upload_api = settings.upload_api
@@ -71,6 +83,7 @@ def configure_upload(
         upload_api,
         http_request_handler=http_request_handler,
         use_upload_id=use_upload_id,
+        s3_config=s3_config,
     )
 
 
@@ -80,6 +93,7 @@ def decorate_server(
     upload_api,
     http_request_handler,
     use_upload_id=True,
+    s3_config: s3.S3Configuration = None,
 ):
     """
     Parameters
     ----------
@@ -100,10 +114,16 @@ def decorate_server(
         session id) will use their own folder. If False,
         all files from all sessions are uploaded into same
         folder (not recommended).
+    s3_config: None or s3.S3Configuration
+        Used for uploading files to an S3 bucket.
If provided, `folder` will be used for + temp folder for chunks during multipart upload """ handler = http_request_handler( - server, upload_folder=temp_base, use_upload_id=use_upload_id + server, + upload_folder=temp_base, + use_upload_id=use_upload_id, + s3_config=s3_config, ) server.add_url_rule(upload_api, None, handler.get, methods=["GET"]) diff --git a/dash_uploader/httprequesthandler.py b/dash_uploader/httprequesthandler.py index af0c9af..9ac3e19 100644 --- a/dash_uploader/httprequesthandler.py +++ b/dash_uploader/httprequesthandler.py @@ -4,14 +4,27 @@ import shutil import time import traceback +from typing import Dict, Final from flask import request from flask import abort - +from dash_uploader.s3 import S3Configuration from dash_uploader.utils import retry +# try importing boto3 as it is a feature dependency +try: + import boto3 +except ImportError: + HAS_BOTO = False +else: + HAS_BOTO = True + + logger = logging.getLogger(__name__) +# chunk size should be greater than 5Mb for s3 multipart upload +S3_MIN_CHUNK_SIZE: Final[int] = 5 * 1024 * 1024 + def get_chunk_name(uploaded_filename, chunk_number): return f"{uploaded_filename}_part_{chunk_number}" @@ -34,6 +47,8 @@ def __init__(self, request): """ # Available fields: https://github.com/flowjs/flow.js self.n_chunks_total = request.form.get("flowTotalChunks", type=int) + self.total_size = request.form.get("flowTotalSize", type=int) + self.chunk_size = request.form.get("flowChunkSize", type=int) self.chunk_number = request.form.get("flowChunkNumber", default=1, type=int) self.filename = request.form.get("flowFilename", default="error", type=str) # 'unique' identifier for the file that is being uploaded. @@ -56,9 +71,13 @@ def __init__(self, request): class BaseHttpRequestHandler: + UPLOADS: Dict[str, dict] = {} + remove_file = staticmethod(retry(wait_time=0.35, max_time=15.0)(remove_file)) - def __init__(self, server, upload_folder, use_upload_id): + def __init__( + self, server, upload_folder, use_upload_id, s3_config: S3Configuration = None + ): """ Parameters ---------- @@ -74,12 +93,38 @@ def __init__(self, server, upload_folder, use_upload_id): session id) will use their own folder. If False, all files from all sessions are uploaded into same folder (not recommended). + s3_config: None or class + Used for uploading file to a s3 bucket. If provided, `folder` will be used for + temp folder for chunks during multipart upload """ self.server = server self.upload_folder = pathlib.Path(upload_folder) self.use_upload_id = use_upload_id + if not s3_config: + self.upload_to_s3 = False + else: + if not HAS_BOTO: + raise ValueError( + "`s3_config` is provided but boto3 is missing. 
Please re-install dash_uploader with 's3' feature enabled" + ) + self.upload_to_s3 = True + self.s3 = boto3.client( + "s3", + region_name=s3_config.location.region_name, + use_ssl=s3_config.location.use_ssl, + endpoint_url=s3_config.location.endpoint_url, + aws_access_key_id=s3_config.credentials.aws_access_key_id, + aws_secret_access_key=s3_config.credentials.aws_secret_access_key, + ) + self.bucket = s3_config.location.bucket # "my-bucket" + pf = s3_config.location.prefix # "my-root-folder/" + # append trailing separator if provided + pf = pf + "/" if pf and not pf.endswith("/") else pf + # remove leading slash if present + self.prefix = pf.removeprefix("/") + def post(self): try: return self._post() @@ -96,6 +141,29 @@ def _post(self): if not temporary_folder_for_file_chunks.exists(): temporary_folder_for_file_chunks.mkdir(parents=True) + if self.upload_to_s3: + # use multipart upload for multichunks + if r.n_chunks_total > 1: + # chunk size should be greater than 5Mb for s3 multipart upload + if r.chunk_number == 1 and r.chunk_size <= S3_MIN_CHUNK_SIZE: + # set chunkSize to a value greater than 5 for Upload component + abort( + 500, + "Chunk size should be greater than 5 Mb for multipart upload", + ) + + res = self.s3.create_multipart_upload( + Bucket=self.bucket, Key=self.get_s3_path(r) + ) + s3_upload_id = res["UploadId"] + self.UPLOADS[r.unique_identifier] = { + "UploadId": s3_upload_id, + "Parts": [], + } + self.server.logger.debug("Start multipart upload %s" % s3_upload_id) + else: + # do nothing for single chunks, just upload later + pass # save the chunk data chunk_name = get_chunk_name(r.filename, r.chunk_number) @@ -108,6 +176,32 @@ def _post(self): os.utime(lock_file_path, None) r.chunk_data.save(chunk_file) + + if self.upload_to_s3: + with open(chunk_file, "rb") as stored_chunk_file: + if r.n_chunks_total > 1: + s3_upload = self.UPLOADS.get(r.unique_identifier) + part = self.s3.upload_part( + Body=stored_chunk_file, + Bucket=self.bucket, + Key=self.get_s3_path(r), + UploadId=s3_upload["UploadId"], + PartNumber=r.chunk_number, + ) + self.server.logger.debug( + "Uploaded part to s3: %s - %s", r.chunk_number, part + ) + s3_upload["Parts"].append( + {"PartNumber": r.chunk_number, "ETag": part["ETag"]} + ) + else: + # upload chunk directly + self.s3.upload_fileobj( + Fileobj=stored_chunk_file, + Bucket=self.bucket, + Key=self.get_s3_path(r), + ) + self.remove_file(lock_file_path) # check if the upload is complete @@ -147,18 +241,34 @@ def _post(self): ) time.sleep(1) - # Make sure some other chunk didn't trigger file reconstruction - target_file_name = os.path.join(upload_session_root, r.filename) - if os.path.exists(target_file_name): - logger.info("File %s exists already. 
Overwriting..", target_file_name) - self.remove_file(target_file_name) - - with open(target_file_name, "ab") as target_file: - for p in chunk_paths: - with open(p, "rb") as stored_chunk_file: - target_file.write(stored_chunk_file.read()) - self.server.logger.debug("File saved to: %s", target_file_name) + if self.upload_to_s3 and r.n_chunks_total > 1: + # we need to complete the multipart upload process + s3_upload = self.UPLOADS.get(r.unique_identifier) + result = self.s3.complete_multipart_upload( + Bucket=self.bucket, + Key=self.get_s3_path(r), + UploadId=s3_upload["UploadId"], + MultipartUpload={"Parts": s3_upload["Parts"]}, + ) + self.server.logger.debug("Uploaded file to s3: %s", result) + else: + # Make sure some other chunk didn't trigger file reconstruction + target_file_name = os.path.join(upload_session_root, r.filename) + if os.path.exists(target_file_name): + logger.info( + "File %s exists already. Overwriting..", target_file_name + ) + self.remove_file(target_file_name) + + with open(target_file_name, "ab") as target_file: + for p in chunk_paths: + with open(p, "rb") as stored_chunk_file: + target_file.write(stored_chunk_file.read()) + self.server.logger.debug("File saved to: %s", target_file_name) shutil.rmtree(temporary_folder_for_file_chunks) + if self.upload_to_s3 and r.n_chunks_total > 1: + # remove the upload record from the hash table + self.UPLOADS.pop(r.unique_identifier, None) return r.filename @@ -206,6 +316,9 @@ def get_upload_session_root(self, upload_id): else self.upload_folder ) + def get_s3_path(self, r: RequestData): + return os.path.join(self.prefix, r.upload_id, r.relative_path) + class HttpRequestHandler(BaseHttpRequestHandler): # You may use the flask.request diff --git a/dash_uploader/s3.py b/dash_uploader/s3.py new file mode 100644 index 0000000..9ab4ba6 --- /dev/null +++ b/dash_uploader/s3.py @@ -0,0 +1,31 @@ +# Confguration parameters if the files to be uploaded to a S3 bucket +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class S3Location: + # s3 region name + region_name: str + # bucket name + bucket: str + # s3 endpoint URL like 'example.xxx.amazonaws.com' + # if "http/https" scheme provided `use_ssl` is ignored + endpoint_url: str + # whether to use secure connection + use_ssl: Optional[bool] = True + # optional prefix under the bucket if provided should end with '/' + prefix: Optional[str] = "" + + +@dataclass +class S3Credentials: + # access key id + aws_access_key_id: str + # secret key + aws_secret_access_key: str + +@dataclass +class S3Configuration: + location: S3Location + credentials: S3Credentials \ No newline at end of file diff --git a/dash_uploader/settings.py b/dash_uploader/settings.py index 4153a36..3f13deb 100644 --- a/dash_uploader/settings.py +++ b/dash_uploader/settings.py @@ -1,3 +1,4 @@ + # The default upload api endpoint # The du.configure_upload can change this upload_api = "/API/dash-uploader" @@ -18,3 +19,7 @@ # `requests_pathname_prefix` and `routes_pathname_prefix`, # not `url_base_pathname`. 
routes_pathname_prefix = "/" + + +# s3 config to be used by httphandler and UploadStatus +s3_config = None diff --git a/dash_uploader/uploadstatus.py b/dash_uploader/uploadstatus.py index 1277711..596ae52 100644 --- a/dash_uploader/uploadstatus.py +++ b/dash_uploader/uploadstatus.py @@ -1,5 +1,6 @@ from pathlib import Path import warnings +from dash_uploader.s3 import S3Location class UploadStatus: @@ -22,6 +23,8 @@ class UploadStatus: Total size of files to be uploaded in Megabytes status.upload_id (str or None): The upload id used in the upload process, if any. + status.s3_location (S3Location or None): + The S3 location used for uploading, if any """ def __init__( @@ -31,6 +34,7 @@ def __init__( uploaded_size_mb, total_size_mb, upload_id=None, + s3_location: S3Location = None, ): """ Parameters @@ -47,6 +51,8 @@ def __init__( The size of all files to be uploaded upload_id: None or str The upload id used. + s3_location (S3Location or None): + The S3 location used for uploading, if any """ self.uploaded_files = [Path(x) for x in uploaded_files] @@ -65,6 +71,7 @@ def __init__( self.uploaded_size_mb = uploaded_size_mb self.total_size_mb = total_size_mb self.progress = uploaded_size_mb / total_size_mb + self.s3_location = s3_location def __str__(self): @@ -78,5 +85,6 @@ def __str__(self): f"total_size_mb = {self.total_size_mb}", f"progress = {self.progress}", f"upload_id = {self.upload_id}", + f"s3_location = {self.s3_location}", ] return "" diff --git a/package.json b/package.json index b4481b9..4bf9615 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "dash_uploader", - "version": "0.7.0-a1", + "version": "0.7.0-a4", "description": "Upload component for Dash", "repository": { "type": "git", diff --git a/setup.py b/setup.py index 113f4d1..f0d2a58 100644 --- a/setup.py +++ b/setup.py @@ -42,7 +42,8 @@ # Needs: import chromedriver_binary to the top of your test script. "chromedriver-binary", "Werkzeug~=2.0.3", - ] + ], + "s3": ["boto3>=1.22.5"], }, classifiers=[ "Framework :: Dash", diff --git a/tests/conftest.py b/tests/conftest.py index d1dbd77..ba47a44 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -34,4 +34,4 @@ def pytest_collection_modifyitems(items): assert ( len(items) == N_items ), "Tests dropped out in reordering! This should never happen." 
- return items \ No newline at end of file + return items diff --git a/tests/test_chromedriver.py b/tests/test_chromedriver.py index e90c429..29aafbd 100644 --- a/tests/test_chromedriver.py +++ b/tests/test_chromedriver.py @@ -42,4 +42,4 @@ def test_chromedriver_version_okay(): print(msg) warnings.warn(msg) - raise e \ No newline at end of file + raise e diff --git a/usage.py b/usage.py index 94b8451..700bfc8 100644 --- a/usage.py +++ b/usage.py @@ -3,17 +3,42 @@ import dash_uploader as du import dash + if du.utils.dash_version_is_at_least("2.0.0"): from dash import html # if dash <= 2.0.0, use: import dash_html_components as html else: import dash_html_components as html -from dash.dependencies import Output +from dash.dependencies import Output, State app = dash.Dash(__name__) -UPLOAD_FOLDER_ROOT = r"C:\tmp\Uploads" -du.configure_upload(app, UPLOAD_FOLDER_ROOT) +s3_config = None +# uncomment the following lines to get stored credentials from env or aws config files + +# import boto3 +# session = boto3.Session() +# credentials = session.get_credentials() +# credentials = credentials.get_frozen_credentials() +# access_key = credentials.access_key +# secret_key = credentials.secret_key +# from dash_uploader import s3 +# s3_config = s3.S3Configuration( +# location=s3.S3Location( +# region_name = "eu-central-1", +# endpoint_url="https://s3.eu-central-1.amazonaws.com", +# use_ssl=True, +# bucket="my-bucket", +# prefix="my-prefix", +# ), +# credentials=s3.S3Credentials( +# aws_access_key_id=credentials.access_key, +# aws_secret_access_key=credentials.secret_key, +# ) +# ) + +UPLOAD_FOLDER_ROOT = r"/tmp/Uploads" +du.configure_upload(app=app, folder=UPLOAD_FOLDER_ROOT, s3_config=s3_config) def get_upload_component(id): @@ -25,6 +50,7 @@ def get_upload_component(id): pause_button=True, max_file_size=130, # 130 Mb max_total_size=350, + # chunk_size=6, # 6 MB to use multipart upload to s3 # filetypes=["csv", "zip"], upload_id=uuid.uuid1(), # Unique session id max_files=10, @@ -59,13 +85,17 @@ def get_app_layout(): # This way we can use unique session id's as upload_id's app.layout = get_app_layout +# uncomment the following line to get the logs +# app.server.logger.setLevel(logging.DEBUG) + # 3) Create a callback @du.callback( output=Output("callback-output", "children"), id="dash-uploader", + state=State("callback-output", "children"), ) -def callback_on_completion(status: du.UploadStatus): +def callback_on_completion(status: du.UploadStatus, state): return html.Ul([html.Li(str(x)) for x in status.uploaded_files])