3 changes: 2 additions & 1 deletion jupyter_drives/handlers.py
@@ -75,7 +75,8 @@ async def get(self, drive: str = "", path: str = ""):

@tornado.web.authenticated
async def post(self, drive: str = "", path: str = ""):
result = await self._manager.new_file(drive, path)
body = self.get_json_body()
result = await self._manager.new_file(drive, path, **body)
self.finish(result)

@tornado.web.authenticated
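With this change the POST handler reads the JSON body and forwards it to new_file as keyword arguments, so the payload must now carry the is_dir flag. A minimal sketch of the new request contract, assuming a hypothetical endpoint URL and token (neither appears in this diff):

import json
import urllib.request

payload = json.dumps({"is_dir": True}).encode()
req = urllib.request.Request(
    "http://localhost:8888/api/drives/my-drive/new-folder",  # placeholder URL
    data=payload,
    headers={"Content-Type": "application/json", "Authorization": "token <jupyter-token>"},
    method="POST",
)
# urllib.request.urlopen(req)  # uncomment to send; requires a running server
# On the server, get_json_body() yields {"is_dir": True}, so
# new_file(drive, path, **body) is called as new_file(drive, path, is_dir=True).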
197 changes: 177 additions & 20 deletions jupyter_drives/manager.py
@@ -2,6 +2,7 @@
import json
import logging
from typing import Dict, List, Optional, Tuple, Union, Any
from datetime import datetime

import os
import tornado
@@ -262,28 +263,33 @@ async def get_contents(self, drive_name, path):

return response

async def new_file(self, drive_name, path):
async def new_file(self, drive_name, path, is_dir):
"""Create a new file or directory at the given path.

Args:
drive_name: name of drive where the new content is created
path: path where new content should be created
is_dir: whether the new content is a directory or a file
"""
data = {}
try:
# eliminate leading and trailing slashes
path = path.strip('/')

# TO DO: switch to mode "created", which is not implemented yet
await obs.put_async(self._content_managers[drive_name]["store"], path, b"", mode = "overwrite")
metadata = await obs.head_async(self._content_managers[drive_name]["store"], path)

data = {
"path": path,
"content": "",
"last_modified": metadata["last_modified"].isoformat(),
"size": metadata["size"]
}
if not is_dir or self._config.provider != 's3':
# TO DO: switch to mode "created", which is not implemented yet
await obs.put_async(self._content_managers[drive_name]["store"], path, b"", mode = "overwrite")
metadata = await obs.head_async(self._content_managers[drive_name]["store"], path)
data = {
"path": path,
"content": "",
"last_modified": metadata["last_modified"].isoformat(),
"size": metadata["size"]
}
elif is_dir and self._config.provider == 's3':
# create an empty directory through boto3, as obstore does not allow it
data = self._create_empty_directory(drive_name, path)

except Exception as e:
raise tornado.web.HTTPError(
status_code= httpx.codes.BAD_REQUEST,
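The elif branch above works around the fact that S3 has no real directories: a zero-byte object whose key ends in a slash acts as a directory marker, which boto3 can create but obstore's put_async cannot. A standalone sketch of that trick, with bucket and region names as placeholders:

import boto3

# Placeholder bucket/region; credentials come from the usual boto3 sources.
s3 = boto3.client("s3", region_name="us-east-1")
s3.put_object(Bucket="my-drive", Key="new-folder/")  # zero-byte directory marker
head = s3.head_object(Bucket="my-drive", Key="new-folder/")
print(head["LastModified"].isoformat(), head["ContentLength"])  # ContentLength is 0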
@@ -361,18 +367,39 @@ async def rename_file(self, drive_name, path, new_path):
new_path: path of new file name
"""
data = {}
finished = False
try:
# eliminate leading and trailing slashes
path = path.strip('/')

await obs.rename_async(self._content_managers[drive_name]["store"], path, new_path)
metadata = await obs.head_async(self._content_managers[drive_name]["store"], new_path)

data = {
"path": new_path,
"last_modified": metadata["last_modified"].isoformat(),
"size": metadata["size"]
}
# get list of contents with given prefix (path)
stream = obs.list(self._content_managers[drive_name]["store"], path, chunk_size=100, return_arrow=True)
async for batch in stream:
contents_list = pyarrow.record_batch(batch).to_pylist()
# rename each object within directory
for object in contents_list:
finished = True
remaining_path = object["path"][len(path)+1:]
old_path = path if remaining_path == '' else os.path.join(path, remaining_path)
formatted_new_path = new_path if remaining_path == '' else os.path.join(new_path, remaining_path)
try:
await obs.rename_async(self._content_managers[drive_name]["store"], old_path, formatted_new_path)
except Exception as e:
# obstore cannot find S3 directory markers, so fall back to the boto3 helper for the directory rename
if self._config.provider == 's3':
self._rename_directory(drive_name, old_path, formatted_new_path)
else:
raise tornado.web.HTTPError(
status_code= httpx.codes.BAD_REQUEST,
reason=f"The following error occured when renaming the object: {e}",
)

# no extra S3 directories to rename
if data == {} and not finished:
# rename a single file from the root (it won't be listed above)
await obs.rename_async(self._content_managers[drive_name]["store"], path, new_path)

data = await self._get_metadata(drive_name, new_path)
except Exception as e:
raise tornado.web.HTTPError(
status_code= httpx.codes.BAD_REQUEST,
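When obstore cannot find the object and the _rename_directory fallback is used, the "rename" is really a server-side copy followed by a delete, since S3 has no rename primitive. A rough boto3 sketch of that pattern (bucket and key names are placeholders):

import boto3

s3 = boto3.client("s3", region_name="us-east-1")
# Copy the directory marker to its new key, then remove the old one.
s3.copy_object(
    Bucket="my-drive",
    CopySource="my-drive/old-folder/",
    Key="new-folder/",
)
s3.delete_object(Bucket="my-drive", Key="old-folder/")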
@@ -394,8 +421,22 @@ async def delete_file(self, drive_name, path):
try:
# eliminate leading and trailing slashes
path = path.strip('/')
await obs.delete_async(self._content_managers[drive_name]["store"], path)

# get list of contents with given prefix (path)
stream = obs.list(self._content_managers[drive_name]["store"], path, chunk_size=100, return_arrow=True)
async for batch in stream:
contents_list = pyarrow.record_batch(batch).to_pylist()
# delete each object within directory
for object in contents_list:
await obs.delete_async(self._content_managers[drive_name]["store"], object["path"])

# delete file
await obs.delete_async(self._content_managers[drive_name]["store"], path)

# when dealing with an S3 directory, use the helper function to delete any remaining directory markers
if self._config.provider == 's3':
await self._delete_directories(drive_name, path)

except Exception as e:
raise tornado.web.HTTPError(
status_code= httpx.codes.BAD_REQUEST,
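Deleting a directory follows the same listing idea: every object sharing the prefix is removed one by one, and _delete_directories then cleans up the S3 directory markers that obstore cannot see. For reference, a boto3-only sketch of an equivalent prefix sweep (not the code used in this PR, which lists through obstore):

import boto3

s3 = boto3.client("s3", region_name="us-east-1")  # placeholder region
paginator = s3.get_paginator("list_objects_v2")
# Remove every object under the prefix, directory markers included.
for page in paginator.paginate(Bucket="my-drive", Prefix="old-folder/"):
    for obj in page.get("Contents", []):
        s3.delete_object(Bucket="my-drive", Key=obj["Key"])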
@@ -500,6 +541,122 @@ def _check_object(self, drive_name, path):

return isDir

def _create_empty_directory(self, drive_name, path):
"""Helping function to create an empty directory, when dealing with S3 buckets.

Args:
drive_name: name of drive where to create object
path: path of new object
"""
data = {}
try:
location = self._content_managers[drive_name]["location"]
if location not in self._s3_clients:
self._s3_clients[location] = self._s3_session.client('s3', location)

self._s3_clients[location].put_object(Bucket = drive_name, Key = path + '/')
metadata = self._s3_clients[location].head_object(Bucket = drive_name, Key = path + '/')

data = {
"path": path,
"content": "",
"last_modified": metadata["LastModified"].isoformat(),
"size": 0
}
except Exception as e:
raise tornado.web.HTTPError(
status_code= httpx.codes.BAD_REQUEST,
reason=f"The following error occured when creating the directory: {e}",
)

return data

async def _delete_directories(self, drive_name, path):
"""Helping function to delete directories, when dealing with S3 buckets.

Args:
drive_name: name of drive containing the object
path: path of the directory to delete
"""
try:
location = self._content_managers[drive_name]["location"]
if location not in self._s3_clients:
self._s3_clients[location] = self._s3_session.client('s3', location)

# delete remaining sub-directories
stream = obs.list(self._content_managers[drive_name]["store"], path, chunk_size=100, return_arrow=True)
async for batch in stream:
contents_list = pyarrow.record_batch(batch).to_pylist()
for object in contents_list:
self._s3_clients[location].delete_object(Bucket=drive_name, Key=object["path"]+'/')

# delete main directory
self._s3_clients[location].delete_object(Bucket=drive_name, Key=path+'/')

except Exception as e:
raise tornado.web.HTTPError(
status_code= httpx.codes.BAD_REQUEST,
reason=f"The following error occured when deleting the directory: {e}",
)

return

def _rename_directory(self, drive_name, path, new_path):
"""Helping function to rename directories, when dealing with S3 buckets.

Args:
drive_name: name of drive containing the object
path: path of object
new_path: new path of object
"""
try:
location = self._content_managers[drive_name]["location"]
if location not in self._s3_clients:
self._s3_clients[location] = self._s3_session.client('s3', location)

self._s3_clients[location].copy_object(Bucket=drive_name, CopySource=os.path.join(drive_name, path)+'/', Key=new_path + '/')
self._s3_clients[location].delete_object(Bucket=drive_name, Key = path + '/')
except Exception:
# the object is not found when the path is not a directory marker; nothing to rename
pass

return

async def _get_metadata(self, drive_name, path):
"""Helping function to get metadata of object.

Args:
drive_name: name of drive containing the object
path: path of object
"""
try:
metadata = await obs.head_async(self._content_managers[drive_name]["store"], path)
data = {
"path": path,
"last_modified": metadata["last_modified"].isoformat(),
"size": metadata["size"]
}
except Exception:
try:
location = self._content_managers[drive_name]["location"]
if location not in self._s3_clients:
self._s3_clients[location] = self._s3_session.client('s3', location)

metadata = self._s3_clients[location].head_object(Bucket=drive_name, Key=path + '/')
data = {
"path": path,
"last_modified": metadata["last_modified"].isoformat(),
"size": metadata["size"]
}
except Exception:
data = {
"path": path,
"last_modified": datetime.now().isoformat(),
"size": 0
}

return data

async def _call_provider(
self,
url: str,
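All four S3 helpers above (_create_empty_directory, _delete_directories, _rename_directory and _get_metadata) share the same lazy client cache: a boto3 client is built once per bucket location and reused afterwards. Extracted as a standalone sketch, with illustrative names:

import boto3

_clients = {}

def s3_client(region: str):
    # Reuse one boto3 client per region instead of constructing it on every call.
    if region not in _clients:
        _clients[region] = boto3.client("s3", region_name=region)
    return _clients[region]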
1 change: 1 addition & 0 deletions src/contents.ts
@@ -300,6 +300,7 @@ export class Drive implements Contents.IDrive {
data = await createObject(currentDrive.name, {
name: name,
path: relativePath,
isDir: options.type === 'directory',
registeredFileTypes: this._registeredFileTypes
});
} else {