Skip to content
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions enterprise_gateway/services/sessions/kernelsessionmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import os
import threading

import requests
from jupyter_core.paths import jupyter_data_dir
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
from traitlets import Bool, Unicode, default
from traitlets.config.configurable import LoggingConfigurable

Expand Down Expand Up @@ -385,3 +387,143 @@ def _get_sessions_loc(self):
if not os.path.exists(path):
os.makedirs(path, 0o755)
return path


class WebhookKernelSessionManager(KernelSessionManager):
"""
Performs kernel session persistence operations against URL provided (EG_WEBHOOK_URL). The URL must have 4 endpoints
associated with it. 1 delete endpoint that takes a list of kernel ids in the body, 1 post endpoint that takes kernels id as a
url param and the kernel session as the body, 1 get endpoint that returns all kernel sessions, and 1 get endpoint that returns
a specific kernel session based on kernel id as url param.
"""

# Webhook URL
webhook_url_env = "EG_WEBHOOK_URL"
webhook_url = Unicode(
config=True,
help="""URL endpoint for webhook kernel session manager""",
)

@default("webhook_url")
def webhook_url_default(self):
return os.getenv(self.webhook_url_env, "")

# Webhook Username
webhook_username_env = "EG_WEBHOOK_USERNAME"
webhook_username = Unicode(
config=True,
help="""Username for webhook kernel session manager API auth""",
)

@default("webhook_username")
def webhook_username_default(self):
return os.getenv(self.webhook_username_env, "")

# Webhook Password
webhook_password_env = "EG_WEBHOOK_PASSWORD"
webhook_password = Unicode(
config=True,
help="""Password for webhook kernel session manager API auth""",
)

@default("webhook_password")
def webhook_password_default(self):
return os.getenv(self.webhook_password_env, "")

# Auth Type
auth_type_env = "EG_AUTH_TYPE"
auth_type = Unicode(
config=True,
help="""ROPC for webhook kernel session manager API auth Basic, Digest or None""",
)

@default("auth_type")
def auth_type_default(self):
return os.getenv(self.auth_type_env, "")

def __init__(self, kernel_manager, **kwargs):
super().__init__(kernel_manager, **kwargs)
if self.enable_persistence:
self.log.info("Webhook kernel session persistence activated")
self.auth = ""
if self.auth_type:
if self.webhook_username and self.webhook_password:
if self.auth_type.lower() == "basic":
self.auth = HTTPBasicAuth(self.webhook_username, self.webhook_password)
elif self.auth_type.lower() == "digest":
self.auth = HTTPDigestAuth(self.webhook_username, self.webhook_password)
elif self.auth_type.lower() == "none":
self.auth = ""
else:
self.log.error("No such option for auth_type/EG_AUTH_TYPE")
else:
self.log.error("Username and/or password aren't set")

def delete_sessions(self, kernel_ids):
"""
Deletes kernel sessions from database

:param list of strings kernel_ids: A list of kernel ids
"""
if self.enable_persistence:
response = requests.delete(self.webhook_url, auth=self.auth, json=kernel_ids)
self.log.debug(f"Webhook kernel session deleting: {kernel_ids}")
if response.status_code != 204:
self.log.error(response.raise_for_status())

def save_session(self, kernel_id):
"""
Saves kernel session to database

:param string kernel_id: A kernel id
"""
if self.enable_persistence:
if kernel_id is not None:
temp_session = dict()
temp_session[kernel_id] = self._sessions[kernel_id]
body = KernelSessionManager.pre_save_transformation(temp_session)
response = requests.post(
f"{self.webhook_url}/{kernel_id}", auth=self.auth, json=body
)
self.log.debug(f"Webhook kernel session saving: {kernel_id}")
if response.status_code != 204:
self.log.error(response.raise_for_status())

def load_sessions(self):
"""
Loads kernel sessions from database
"""
if self.enable_persistence:
response = requests.get(self.webhook_url, auth=self.auth)
if response.status_code == 200:
kernel_sessions = response.content
for kernel_session in kernel_sessions:
self._load_session_from_file(kernel_session)
else:
self.log.error(response.raise_for_status())

def load_session(self, kernel_id):
"""
Loads a kernel session from database

:param string kernel_id: A kernel id
"""
if self.enable_persistence:
if kernel_id is not None:
response = requests.get(f"{self.webhook_url}/{kernel_id}", auth=self.auth)
if response.status_code == 200:
kernel_session = response.content
self._load_session_from_file(kernel_session)
else:
self.log.error(response.raise_for_status())

def _load_session_from_file(self, kernel):
"""
Loads kernel session to current session

:param dictionary kernel: Kernel session information
"""
self.log.debug("Loading saved session(s)")
self._sessions.update(
KernelSessionManager.post_load_transformation(json.loads(kernel)["kernel"])
)