diff --git a/.gitignore b/.gitignore
index c8fc73a..2b43863 100644
--- a/.gitignore
+++ b/.gitignore
@@ -123,3 +123,6 @@ dmypy.json
 
 # Yarn cache
 .yarn/
+
+# untitled notebooks
+Untitled.ipynb
diff --git a/jupyter_drives/handlers.py b/jupyter_drives/handlers.py
index 696f082..9b3a775 100644
--- a/jupyter_drives/handlers.py
+++ b/jupyter_drives/handlers.py
@@ -88,7 +88,8 @@ async def get(self, drive: str = "", path: str = ""):
 
     @tornado.web.authenticated
     async def post(self, drive: str = "", path: str = ""):
-        result = await self._manager.new_file(drive, path)
+        body = self.get_json_body()
+        result = await self._manager.new_file(drive, path, **body)
         self.finish(result)
 
     @tornado.web.authenticated
diff --git a/jupyter_drives/manager.py b/jupyter_drives/manager.py
index 7c467b7..209276f 100644
--- a/jupyter_drives/manager.py
+++ b/jupyter_drives/manager.py
@@ -2,7 +2,7 @@ import json
 import logging
 from typing import Dict, List, Optional, Tuple, Union, Any
-from datetime import timedelta
+from datetime import timedelta, datetime
 import os
 
 import tornado
@@ -17,12 +17,17 @@ from libcloud.storage.providers import get_driver
 import pyarrow
 from aiobotocore.session import get_session
 
+import fsspec
+import s3fs
+
 from .log import get_logger
 from .base import DrivesConfig
 
 import re
 
+# constant used as suffix to deal with directory objects
+EMPTY_DIR_SUFFIX = '/.jupyter_drives_fix_dir'
 
 class JupyterDrivesManager():
     """ Jupyter-drives manager class.
@@ -40,13 +45,17 @@ def __init__(self, config: traitlets.config.Config) -> None:
         self._config = DrivesConfig(config=config)
         self._client = httpx.AsyncClient()
         self._content_managers = {}
-        self._max_files_listed = 1000
+        self._max_files_listed = 1025
+
+        # instantiate fsspec file system
+        self._file_system = fsspec.filesystem(self._config.provider, asynchronous=True)
 
         # initiate aiobotocore session if we are dealing with S3 drives
         if self._config.provider == 's3':
-            if self._config.access_key_id and self._config.secret_access_key:
+            if self._config.access_key_id and self._config.secret_access_key: 
                 self._s3_clients = {}
                 self._s3_session = get_session()
+                self._file_system = s3fs.S3FileSystem(anon=False, asynchronous=True, key=self._config.access_key_id, secret=self._config.secret_access_key, token=self._config.session_token)
             else:
                 raise tornado.web.HTTPError(
                     status_code= httpx.codes.BAD_REQUEST,
@@ -216,56 +225,47 @@ async def get_contents(self, drive_name, path):
 
         try :
             data = []
-            isDir = False
-            emptyDir = True # assume we are dealing with an empty directory
-
-            chunk_size = 100
-            if self._max_files_listed < chunk_size:
-                chunk_size = self._max_files_listed
-            no_batches = int(self._max_files_listed/chunk_size)
-
-            # using Arrow lists as they are recommended for large results
-            # stream will be an async iterable of RecordBatch
-            current_batch = 0
-            stream = obs.list(self._content_managers[drive_name]["store"], path, chunk_size=chunk_size, return_arrow=True)
-            async for batch in stream:
-                current_batch += 1
-                # reached last batch that can be shown (partially)
-                if current_batch == no_batches + 1:
-                    remaining_files = self._max_files_listed - no_batches*chunk_size
-
-                # if content exists we are dealing with a directory
-                if isDir is False and batch:
-                    isDir = True
-                    emptyDir = False
+            is_dir = await self._file_system._isdir(drive_name + '/' + path)
+
+            if is_dir:
+                chunk_size = 1024
+                if self._max_files_listed < chunk_size:
+                    chunk_size = self._max_files_listed
+                no_batches = int(self._max_files_listed/chunk_size)
+
+                # using Arrow lists as they are recommended for large results
+                # stream will be an async iterable of RecordBatch
+                current_batch = 0
+                stream = obs.list(self._content_managers[drive_name]["store"], path, chunk_size=chunk_size, return_arrow=True)
+                async for batch in stream:
+                    current_batch += 1
+                    # reached last batch that can be shown (partially)
+                    if current_batch == no_batches + 1:
+                        remaining_files = self._max_files_listed - no_batches*chunk_size
+
+                    contents_list = pyarrow.record_batch(batch).to_pylist()
+                    for object in contents_list:
+                        # when listing the last batch (partially), make sure we don't exceed limit
+                        if current_batch == no_batches + 1:
+                            if remaining_files <= 0:
+                                break
+                            remaining_files -= 1
+                        data.append({
+                            "path": object["path"],
+                            "last_modified": object["last_modified"].isoformat(),
+                            "size": object["size"],
+                        })
 
-                contents_list = pyarrow.record_batch(batch).to_pylist()
-                for object in contents_list:
-                    # when listing the last batch (partially), make sure we don't exceed limit
+                    # check if we reached the limit of files that can be listed
                     if current_batch == no_batches + 1:
-                        if remaining_files <= 0:
-                            break
-                        remaining_files -= 1
-                    data.append({
-                        "path": object["path"],
-                        "last_modified": object["last_modified"].isoformat(),
-                        "size": object["size"],
-                    })
-
-                # check if we reached the limit of files that can be listed
-                if current_batch == no_batches + 1:
-                    break
+                        break
 
-            # check if we are dealing with an empty drive
-            if isDir is False and path != '':
+            else:
                 content = b""
                 # retrieve contents of object
                 obj = await obs.get_async(self._content_managers[drive_name]["store"], path)
                 stream = obj.stream(min_chunk_size=5 * 1024 * 1024) # 5MB sized chunks
                 async for buf in stream:
-                    # if content exists we are dealing with a file
-                    if emptyDir is True and buf:
-                        emptyDir = False
                     content += buf
 
             # retrieve metadata of object
@@ -286,18 +286,6 @@ async def get_contents(self, drive_name, path):
                 "size": metadata["size"]
             }
 
-            # dealing with the case of an empty directory, making sure it is not an empty file
-            if emptyDir is True:
-                ext_list = ['.R', '.bmp', '.csv', '.gif', '.html', '.ipynb', '.jl', '.jpeg', '.jpg', '.json', '.jsonl', '.md', '.ndjson', '.pdf', '.png', '.py', '.svg', '.tif', '.tiff', '.tsv', '.txt', '.webp', '.yaml', '.yml']
-                object_name = os.path.basename(path)
-                # if object doesn't contain . or doesn't end in one of the registered extensions
-                if object_name.find('.') == -1 or ext_list.count(os.path.splitext(object_name)[1]) == 0:
-                    data = []
-
-            # remove upper logic once directories are fixed
-            check = self._check_object(drive_name, path)
-            print(check)
-
             response = {
                 "data": data
             }
@@ -309,27 +297,33 @@ async def get_contents(self, drive_name, path):
 
         return response
 
-    async def new_file(self, drive_name, path):
+    async def new_file(self, drive_name, path, type):
         """Create a new file or directory at the given path.
 
         Args:
             drive_name: name of drive where the new content is created
             path: path where new content should be created
+            type: whether we are dealing with a file or a directory
         """
         data = {}
         try:
             # eliminate leading and trailing backslashes
             path = path.strip('/')
 
-            # TO DO: switch to mode "created", which is not implemented yet
-            await obs.put_async(self._content_managers[drive_name]["store"], path, b"", mode = "overwrite")
-            metadata = await obs.head_async(self._content_managers[drive_name]["store"], path)
+            object_name = drive_name + '/' + path
+            # in the case of S3 directories, we need to add a suffix to feign the creation of a directory
+            if type == 'directory' and self._config.provider == 's3':
+                object_name = object_name + EMPTY_DIR_SUFFIX
+            await self._file_system._touch(object_name)
+            metadata = await self._file_system._info(object_name)
+
             data = {
                 "path": path,
                 "content": "",
-                "last_modified": metadata["last_modified"].isoformat(),
-                "size": metadata["size"]
+                "last_modified": metadata["LastModified"].isoformat(),
+                "size": metadata["size"],
+                "type": metadata["type"]
             }
         except Exception as e:
             raise tornado.web.HTTPError(
@@ -371,9 +365,8 @@ async def save_file(self, drive_name, path, content, options_format, content_for
                     byte_array = bytearray(slice_)
                     byte_arrays.append(byte_array)
 
-                # combine byte arrays and wrap in a BytesIO object
-                formatted_content = BytesIO(b"".join(byte_arrays))
-                formatted_content.seek(0) # reset cursor for any further reading
+                # combine byte arrays
+                formatted_content = b"".join(byte_arrays)
             elif options_format == 'text':
                 formatted_content = content.encode("utf-8")
             else:
@@ -381,13 +374,13 @@ async def save_file(self, drive_name, path, content, options_format, content_for
             if formatted_content is None or formatted_content == '':
                 formatted_content = b''
 
-            await obs.put_async(self._content_managers[drive_name]["store"], path, formatted_content, mode = "overwrite")
-            metadata = await obs.head_async(self._content_managers[drive_name]["store"], path)
+            await self._file_system._pipe(drive_name + '/' + path, formatted_content)
+            metadata = await self._file_system._info(drive_name + '/' + path)
 
             data = {
                 "path": path,
                 "content": content,
-                "last_modified": metadata["last_modified"].isoformat(),
+                "last_modified": metadata["LastModified"].isoformat(),
                 "size": metadata["size"]
             }
         except Exception as e:
@@ -414,12 +407,20 @@ async def rename_file(self, drive_name, path, new_path):
             # eliminate leading and trailing backslashes
             path = path.strip('/')
 
-            await obs.rename_async(self._content_managers[drive_name]["store"], path, new_path)
-            metadata = await obs.head_async(self._content_managers[drive_name]["store"], new_path)
+            object_name = drive_name + '/' + path
+            new_object_name = drive_name + '/' + new_path
+            is_dir = await self._file_system._isdir(object_name)
+            if is_dir:
+                object_name = object_name + EMPTY_DIR_SUFFIX
+                new_object_name = new_object_name + EMPTY_DIR_SUFFIX
+                await self._fix_dir(drive_name, path)
+
+            await self._file_system._mv_file(object_name, new_object_name)
+            metadata = await self._file_system._info(new_object_name)
 
             data = {
                 "path": new_path,
-                "last_modified": metadata["last_modified"].isoformat(),
+                "last_modified": metadata["LastModified"].isoformat(),
                 "size": metadata["size"]
             }
         except Exception as e:
@@ -443,7 +444,17 @@ async def delete_file(self, drive_name, path):
         try:
             # eliminate leading and trailing backslashes
             path = path.strip('/')
-            await obs.delete_async(self._content_managers[drive_name]["store"], path)
+            is_dir = await self._file_system._isdir(drive_name + '/' + path)
+            if is_dir:
+                await self._fix_dir(drive_name, path)
+            await self._file_system._rm(drive_name + '/' + path, recursive = True)
+
+            # checking for remaining directories and deleting them
+            stream = obs.list(self._content_managers[drive_name]["store"], path, chunk_size=100, return_arrow=True)
+            async for batch in stream:
+                contents_list = pyarrow.record_batch(batch).to_pylist()
+                for object in contents_list:
+                    await self._fix_dir(drive_name, object["path"], delete_only = True)
 
         except Exception as e:
             raise tornado.web.HTTPError(
@@ -453,25 +464,6 @@ async def delete_file(self, drive_name, path):
 
         return
 
-    async def check_file(self, drive_name, path):
-        """Check if an object already exists within a drive.
-
-        Args:
-            drive_name: name of drive where object exists
-            path: path where content is located
-        """
-        try:
-            # eliminate leading and trailing backslashes
-            path = path.strip('/')
-            await obs.head_async(self._content_managers[drive_name]["store"], path)
-        except Exception:
-            raise tornado.web.HTTPError(
-                status_code= httpx.codes.NOT_FOUND,
-                reason="Object does not already exist within drive.",
-            )
-
-        return
-
     async def copy_file(self, drive_name, path, to_path, to_drive):
         """Save file with new content.
 
@@ -486,29 +478,26 @@ async def copy_file(self, drive_name, path, to_path, to_drive):
             # eliminate leading and trailing backslashes
             path = path.strip('/')
 
-            # copy object within same drive
+            object_name = drive_name + '/' + path
+            # copy objects within same drive
             if to_drive == drive_name:
-                await obs.copy_async(self._content_managers[drive_name]["store"], path, to_path)
-                metadata = await obs.head_async(self._content_managers[drive_name]["store"], to_path)
-            # copy object to another drive
+                to_object_name = drive_name + '/' + to_path
+            # copy objects to another drive
             else:
-                content = b''
-                try:
-                    # retrieving contents of file
-                    file = await obs.get_async(self._content_managers[drive_name]["store"], path)
-                    stream = file.stream(min_chunk_size=5 * 1024 * 1024) # 5MB sized chunks
-                    async for buf in stream:
-                        content += buf
-                except:
-                    # dealing with a directory, no contents to retrieve
-                    pass
-
-                await obs.put_async(self._content_managers[to_drive]["store"], to_path, content)
-                metadata = await obs.head_async(self._content_managers[to_drive]["store"], to_path)
+                to_object_name = to_drive + '/' + to_path
+
+            is_dir = await self._file_system._isdir(object_name)
+            if is_dir:
+                object_name = object_name + EMPTY_DIR_SUFFIX
+                to_object_name = to_object_name + EMPTY_DIR_SUFFIX
+                await self._fix_dir(drive_name, path)
+
+            await self._file_system._copy(object_name, to_object_name)
+            metadata = await self._file_system._info(to_object_name)
 
             data = {
                 "path": to_path,
-                "last_modified": metadata["last_modified"].isoformat(),
+                "last_modified": metadata["LastModified"].isoformat(),
                 "size": metadata["size"]
             }
         except Exception as e:
@@ -572,29 +561,32 @@ async def _get_drive_location(self, drive_name):
 
         return location
 
-    def _check_object(self, drive_name, path):
-        """Helping function to check if we are dealing with an empty file or directory.
-
+    async def _fix_dir(self, drive_name, path, delete_only = False):
+        """Helper function to fix a directory. It applies to the S3 folders created in the AWS console.
+
         Args:
             drive_name: name of drive where object exists
-            path: path to object to check
+            path: path of object to fix
         """
-        isDir = False
-        try:
-            location = self._content_managers[drive_name]["location"]
-            if location not in self._s3_clients:
-                self._s3_clients[location] = self._s3_session.client('s3', location)
-
-            listing = self._s3_clients[location].list_objects_v2(Bucket = drive_name, Prefix = path + '/')
-            if 'Contents' in listing:
-                isDir = True
+        try: 
+            check = await self._file_system._exists(drive_name + '/' + path + EMPTY_DIR_SUFFIX)
+            if check: # directory has right format
+                return
+            else: # directory was created from console
+                # delete original object
+                async with self._s3_session.create_client('s3', aws_secret_access_key=self._config.secret_access_key, aws_access_key_id=self._config.access_key_id, aws_session_token=self._config.session_token) as client:
+                    await client.delete_object(Bucket=drive_name, Key=path+'/')
+                if delete_only:
+                    return
+                # create new directory
+                await self._file_system._touch(drive_name + '/' + path + EMPTY_DIR_SUFFIX)
         except Exception as e:
-            raise tornado.web.HTTPError(
+            raise tornado.web.HTTPError( 
                 status_code= httpx.codes.BAD_REQUEST,
-                reason=f"The following error occured when retriving the drive location: {e}",
+                reason=f"The following error occurred when fixing the directory object: {e}",
             )
 
-        return isDir
+        return
 
     async def _call_provider(
         self,
diff --git a/pyproject.toml b/pyproject.toml
index e5d9d31..eb2d91e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -26,6 +26,8 @@ dependencies = [
     "arro3-core>=0.2.1,<0.3",
     "pyarrow>=18.0.0,<19.0.0",
    "aiobotocore>=2.15.2,<2.16.0",
+    "fsspec>=2024.10.0,<2024.11.0",
+    "s3fs>=2024.10.0,<2024.11.0",
     "jupyter_server>=2.14.2,<3",
     "apache-libcloud>=3.8.0, <4",
     "entrypoints>=0.4, <0.5",
diff --git a/src/contents.ts b/src/contents.ts
index c628e1d..d430c36 100644
--- a/src/contents.ts
+++ b/src/contents.ts
@@ -310,6 +310,7 @@ export class Drive implements Contents.IDrive {
       data = await createObject(currentDrive.name, {
         name: name,
         path: relativePath,
+        type: options.type,
         registeredFileTypes: this._registeredFileTypes
       });
     } else {
diff --git a/src/requests.ts b/src/requests.ts
index 1b616c1..06816e4 100644
--- a/src/requests.ts
+++ b/src/requests.ts
@@ -218,6 +218,7 @@ export async function createObject(
   options: {
     name: string;
     path: string;
+    type: string;
     registeredFileTypes: IRegisteredFileTypes;
   }
 ) {
@@ -226,7 +227,10 @@ export async function createObject(
     : options.name;
   const response = await requestAPI(
     'drives/' + driveName + '/' + path,
-    'POST'
+    'POST',
+    {
+      type: options.type
+    }
   );
 
   const [fileType, fileMimeType, fileFormat] = getFileType(
@@ -263,26 +267,7 @@ export async function deleteObjects(
     path: string;
   }
 ) {
-  // get list of contents with given prefix (path)
-  const response = await requestAPI(
-    'drives/' + driveName + '/' + options.path,
-    'GET'
-  );
-
-  // deleting contents of a directory
-  if (response.data.length !== undefined && response.data.length !== 0) {
-    await Promise.all(
-      response.data.map(async (c: any) => {
-        return Private.deleteSingleObject(driveName, c.path);
-      })
-    );
-  }
-  try {
-    // always deleting the object (file or main directory)
-    return Private.deleteSingleObject(driveName, options.path);
-  } catch (error) {
-    // deleting failed if directory didn't exist and was only part of a path
-  }
+  await requestAPI('drives/' + driveName + '/' + options.path, 'DELETE');
 }
 
 /**
@@ -513,20 +498,6 @@ export const countObjectNameAppearances = async (
 };
 
 namespace Private {
-  /**
-   * Helping function for deleting files inside
-   * a directory, in the case of deleting the directory.
-   *
-   * @param driveName
-   * @param objectPath complete path of object to delete
-   */
-  export async function deleteSingleObject(
-    driveName: string,
-    objectPath: string
-  ) {
-    await requestAPI('drives/' + driveName + '/' + objectPath, 'DELETE');
-  }
-
   /**
    * Helping function for renaming files inside
    * a directory, in the case of deleting the directory.