Merged
9 changes: 5 additions & 4 deletions jupyter_drives/base.py
@@ -73,11 +73,12 @@ def set_default_api_base_url(self):

def __init__(self, **kwargs):
super().__init__(**kwargs)
self._load_credentials()

def _load_credentials(self):
# check if credentials were already set in jupyter_notebook_config.py
if self.access_key_id is not None and self.secret_access_key is not None:
self.credentials_already_set = self.access_key_id is not None and self.secret_access_key is not None
self.load_credentials()

def load_credentials(self):
if self.credentials_already_set:
return

# automatically extract credentials for S3 drives
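Note: for context, the refactored credential flow above amounts to the following minimal sketch (the traitlets configurable base class and the S3 credential-extraction logic are elided; names follow the diff):

class DrivesConfig:
    def __init__(self, access_key_id=None, secret_access_key=None):
        self.access_key_id = access_key_id
        self.secret_access_key = secret_access_key
        # Record whether credentials were already set, e.g. in
        # jupyter_notebook_config.py, before any automatic extraction runs.
        self.credentials_already_set = (
            self.access_key_id is not None and self.secret_access_key is not None
        )
        self.load_credentials()

    def load_credentials(self):
        # User-supplied credentials win: later refreshes become no-ops.
        if self.credentials_already_set:
            return
        # ...otherwise automatically extract credentials for S3 drives...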
77 changes: 55 additions & 22 deletions jupyter_drives/manager.py
@@ -25,9 +25,14 @@

import re

from tornado.ioloop import PeriodicCallback

# constant used as suffix to deal with directory objects
EMPTY_DIR_SUFFIX = '/.jupyter_drives_fix_dir'

# 15 minutes
CREDENTIALS_REFRESH = 15 * 60 * 1000

class JupyterDrivesManager():
"""
Jupyter-drives manager class.
@@ -46,21 +51,12 @@ def __init__(self, config: traitlets.config.Config) -> None:
self._client = httpx.AsyncClient()
self._content_managers = {}
self._max_files_listed = 1025
self._drives = None

# instate fsspec file system
self._file_system = fsspec.filesystem(self._config.provider, asynchronous=True)

# initiate aiobotocore session if we are dealing with S3 drives
if self._config.provider == 's3':
if self._config.access_key_id and self._config.secret_access_key:
self._s3_clients = {}
self._s3_session = get_session()
self._file_system = s3fs.S3FileSystem(anon=False, asynchronous=True, key=self._config.access_key_id, secret=self._config.secret_access_key, token=self._config.session_token)
else:
raise tornado.web.HTTPError(
status_code= httpx.codes.BAD_REQUEST,
reason="No credentials specified. Please set them in your user jupyter_server_config file.",
)
self._initialize_credentials_refresh()

@property
def base_api_url(self) -> str:
@@ -81,6 +77,45 @@ def per_page_argument(self) -> Optional[Tuple[str, int]]:
"""
return ("per_page", 100)

def _initialize_credentials_refresh(self):
self._drives_refresh_callback()
if not self._config.credentials_already_set:
self._drives_refresh_timer = PeriodicCallback(
Review comment:
Is JupyterDrivesManager initialized multiple times? This will need some deeper refactoring to make sure multiple refresh callbacks are not invoked, one per instance of JupyterDrivesManager.

Reply (Contributor, Author):
JupyterDrivesManager gets initialized only once.

self._drives_refresh_callback, CREDENTIALS_REFRESH
)
self._drives_refresh_timer.start()
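Note: the thread above asks whether multiple manager instances could each start a refresh timer. One possible hedge, not part of this PR, is a class-level guard so that at most one PeriodicCallback ever runs; a minimal sketch:

from tornado.ioloop import PeriodicCallback

CREDENTIALS_REFRESH = 15 * 60 * 1000  # refresh interval in ms (15 minutes)

class JupyterDrivesManager:
    _refresh_timer = None  # class attribute, shared by all instances

    def __init__(self, config):
        self._config = config
        self._initialize_credentials_refresh()

    def _drives_refresh_callback(self):
        # Reload credentials, then rebuild the file system and drives
        # as in the PR.
        self._config.load_credentials()

    def _initialize_credentials_refresh(self):
        self._drives_refresh_callback()
        if self._config.credentials_already_set:
            return
        # Hypothetical guard: start the periodic refresh at most once,
        # even if more than one manager instance were ever created.
        if JupyterDrivesManager._refresh_timer is None:
            JupyterDrivesManager._refresh_timer = PeriodicCallback(
                self._drives_refresh_callback, CREDENTIALS_REFRESH
            )
            JupyterDrivesManager._refresh_timer.start()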

def _drives_refresh_callback(self):
self._config.load_credentials()
self._initialize_s3_file_system()
self._initialize_drives()

def _initialize_s3_file_system(self):
# initiate aiobotocore session if we are dealing with S3 drives
if self._config.provider == 's3':
if self._config.access_key_id and self._config.secret_access_key:
self._s3_session = get_session()
self._file_system = s3fs.S3FileSystem(
Review comment:
According to the s3fs page, S3FS builds on [aiobotocore](https://aiobotocore.readthedocs.io/en/latest/) to provide a convenient Python filesystem interface for S3. Ideally this should pick up credentials from the SageMaker Studio instance automatically. Have you tested it?

According to this GitHub issue - nsidc/earthaccess#765 - it may not work.

Reply (Contributor, Author):
From my understanding it should ideally work; however, the current implementation explicitly provides the access key and token, so S3FS will respect those values and won't refresh them unless we do so explicitly again.

anon=False,
asynchronous=True,
key=self._config.access_key_id,
secret=self._config.secret_access_key,
token=self._config.session_token,
)
else:
raise tornado.web.HTTPError(
status_code=httpx.codes.BAD_REQUEST,
reason="No credentials specified. Please set them in your user jupyter_server_config file.",
)
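Note on the s3fs thread above: when key, secret, and token are passed explicitly (as in this PR), s3fs pins those static values and will not refresh them on its own; when they are omitted, it defers to aiobotocore's default credential chain (environment variables, shared AWS config, instance or container metadata), which can refresh automatically. A sketch of both constructions, with placeholder credentials:

import s3fs

# Explicit static credentials, as in this PR: s3fs uses exactly these
# values and never refreshes them by itself.
pinned = s3fs.S3FileSystem(
    anon=False,
    asynchronous=True,
    key="AKIA...",   # placeholder access key id
    secret="...",    # placeholder secret access key
    token="...",     # placeholder session token
)

# No explicit credentials: aiobotocore's default chain resolves them
# (env vars, ~/.aws/credentials, instance metadata) and can auto-refresh,
# e.g. on a SageMaker Studio instance.
chained = s3fs.S3FileSystem(anon=False, asynchronous=True)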

def _initialize_drives(self):
if self._config.provider == "s3":
S3Drive = get_driver(Provider.S3)
self._drives = [S3Drive(self._config.access_key_id, self._config.secret_access_key, True, None, None, None, self._config.session_token)]
elif self._config.provider == 'gcs':
GCSDrive = get_driver(Provider.GOOGLE_STORAGE)
self._drives = [GCSDrive(self._config.access_key_id, self._config.secret_access_key)] # verify credentials needed

def set_listing_limit(self, new_limit):
"""Set new limit for listing.

@@ -105,23 +140,21 @@ async def list_drives(self):
"""
data = []
if self._config.access_key_id and self._config.secret_access_key:
if self._config.provider == "s3":
S3Drive = get_driver(Provider.S3)
drives = [S3Drive(self._config.access_key_id, self._config.secret_access_key, True, None, None, None, self._config.session_token)]

elif self._config.provider == 'gcs':
GCSDrive = get_driver(Provider.GOOGLE_STORAGE)
drives = [GCSDrive(self._config.access_key_id, self._config.secret_access_key)] # verify credentials needed

else:
if self._drives is None:
raise tornado.web.HTTPError(
status_code= httpx.codes.NOT_IMPLEMENTED,
reason="Listing drives not supported for given provider.",
)

results = []
for drive in drives:
results += drive.list_containers()
for drive in self._drives:
Review comment:
We are changing the behavior of the list drives function.
old: create a new S3Drive for each call.
new: reuse the same cached S3Drive that is periodically refreshed.
Is this OK?

Reply (Contributor, Author):
Verified that this is okay; I also believe the drives are meant to be stored for subsequent calls, so it should be an improvement. The only callout is whether the connections get correctly disposed of when we refresh to a new instance and discard the old one.

try:
results += drive.list_containers()
except Exception as e:
raise tornado.web.HTTPError(
status_code=httpx.codes.BAD_REQUEST,
reason=f"The following error occured when listing drives: {e}",
)

for result in results:
data.append(
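Note on the cached-drive thread above: the pattern the PR lands on looks roughly like the sketch below (placeholder credentials; the old behavior constructed the driver inside list_drives on every call). Drivers discarded on refresh are left to garbage collection, which is the disposal question raised in the thread.

from libcloud.storage.providers import get_driver
from libcloud.storage.types import Provider

# Built once (and rebuilt on each credentials refresh), then cached.
S3Drive = get_driver(Provider.S3)
drives = [S3Drive("AKIA...", "...")]  # placeholder key id and secret

# Subsequent list_drives calls reuse the cached driver objects instead of
# constructing a new driver per request.
containers = []
for drive in drives:
    containers += drive.list_containers()
for container in containers:
    print(container.name)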