Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tom_common/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ def __get__(self, instance, owner):
if not isinstance(cipher, Fernet):
raise AttributeError(
f"A Fernet cipher must be set on the '{owner.__name__}' instance "
f"as '_cipher' to access property '{self.property_name}'."
f"as '_cipher' to access property '{self.property_name}'. "
f"Please use session_utils.get_encrypted_field() instead of direct access."
)

encrypted_value = getattr(instance, self.db_field_name)
Expand Down
133 changes: 132 additions & 1 deletion tom_observations/facility.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from abc import ABC, abstractmethod
import copy
from enum import Enum
import logging
import requests

Expand All @@ -8,13 +9,32 @@
from django import forms
from django.conf import settings
from django.contrib.auth.models import Group
from django.core.exceptions import ImproperlyConfigured
from django.core.files.base import ContentFile
from django.utils.module_loading import import_string

from tom_targets.models import Target

logger = logging.getLogger(__name__)


class CredentialStatus(Enum):
"""
Enum representing the status of facility credentials.

This enum is used to track the state of credentials throughout the facility lifecycle,
providing clear information about whether credentials are available, where they came from,
and any validation issues.
"""
NOT_INITIALIZED = "not_initialized" # set_user() hasn't been called yet
NO_PROFILE = "no_profile" # User has no Profile (raises ImproperlyConfigured)
PROFILE_EMPTY = "profile_empty" # Profile exists but credentials empty
USING_DEFAULTS = "using_defaults" # Using settings.FACILITIES defaults
USING_USER_CREDS = "using_user_creds" # Using user's Profile credentials
VALIDATION_FAILED_AUTH = "validation_failed_auth" # Credentials failed (401/403)
VALIDATION_FAILED_NETWORK = "validation_failed_network" # Network/server issue


DEFAULT_FACILITY_CLASSES = [
'tom_observations.facilities.lco.LCOFacility',
'tom_observations.facilities.gemini.GEMFacility',
Expand Down Expand Up @@ -66,8 +86,17 @@ class BaseObservationForm(forms.Form):
observation_type = forms.CharField(required=False, max_length=50, widget=forms.HiddenInput())

def __init__(self, *args, **kwargs):
# DEBUG: Log what parameters are being passed
logger.debug(f'BaseObservationForm.__init__ kwargs: {kwargs}')

# Accept facility parameter but don't require it (for backward compatibility)
facility = kwargs.pop('facility', None)
self.validation_message = 'This observation is valid.'
super().__init__(*args, **kwargs)

# Store facility reference if provided
if facility is not None:
self.facility = facility
self.helper = FormHelper()
if settings.TARGET_PERMISSIONS_ONLY:
self.common_layout = Layout('facility', 'target_id', 'observation_type')
Expand Down Expand Up @@ -194,10 +223,92 @@ class BaseObservationFacility(ABC):

def __init__(self):
self.user = None
self.credential_status = CredentialStatus.NOT_INITIALIZED

def set_user(self, user):
self.user = user


def _is_credential_empty(self, credential):
"""
Check if a credential is empty (None, empty string, or whitespace only).

Args:
credential: The credential value to check

Returns:
bool: True if credential is empty, False otherwise
"""
return credential is None or (isinstance(credential, str) and not credential.strip())

def _get_setting_credentials(self, facility_name, credential_keys):
"""
Safely get credentials from settings.FACILITIES.

Args:
facility_name (str): Name of the facility in settings.FACILITIES
credential_keys (list): List of credential key names to retrieve

Returns:
dict: Dictionary mapping credential keys to their values

Raises:
ImproperlyConfigured: If facility or required keys are missing from settings
"""
if not hasattr(settings, 'FACILITIES') or facility_name not in settings.FACILITIES:
raise ImproperlyConfigured(
f"No configuration found for '{facility_name}' in settings.FACILITIES. "
f"Please add default credentials to settings.FACILITIES['{facility_name}']."
)

facility_settings = settings.FACILITIES[facility_name]
credentials = {}

for key in credential_keys:
if key not in facility_settings:
raise ImproperlyConfigured(
f"Required credential key '{key}' not found in "
f"settings.FACILITIES['{facility_name}']. "
f"Please add '{key}' to the facility configuration."
)
credentials[key] = facility_settings[key]

return credentials

def _raise_no_profile_error(self, user, facility_name):
"""
Raise ImproperlyConfigured for missing user profile.

Args:
user: Django User instance
facility_name (str): Name of the facility

Raises:
ImproperlyConfigured: Always raises with informative message
"""
raise ImproperlyConfigured(
f"User '{user.username}' has no {facility_name}Profile configured. "
f"Please create a {facility_name}Profile for this user in the admin interface."
)

def _raise_no_defaults_error(self, user, facility_name):
"""
Raise ImproperlyConfigured when default credentials are needed but missing.

Args:
user: Django User instance
facility_name (str): Name of the facility

Raises:
ImproperlyConfigured: Always raises with informative message
"""
raise ImproperlyConfigured(
f"User '{user.username}' has no credentials configured and no default credentials "
f"found in settings.FACILITIES['{facility_name}']. "
f"Please configure either user credentials in {facility_name}Profile or "
f"default credentials in settings."
)

def all_data_products(self, observation_record):
from tom_dataproducts.models import DataProduct
products = {'saved': [], 'unsaved': []}
Expand Down Expand Up @@ -226,7 +337,27 @@ def all_data_products(self, observation_record):
def get_form(self, observation_type):
"""
This method takes in an observation type and returns the form type that matches it.
"""

Note: This method returns form classes, not instances, to support composite form creation
in ObservationCreateView. Use create_form_instance() for direct form instantiation.
"""

def create_form_instance(self, observation_type, **kwargs):
"""
Create a form instance with facility context injected.

The ObservationCreateView handles setting the user context on the facility instance
via set_user() in its dispatch() method. Forms receive the facility instance and
can query it for user-specific data (credentials, API clients, etc.) rather than
handling business logic themselves.

:param observation_type: The type of observation form to create
:param kwargs: Additional keyword arguments for form instantiation
:return: Form instance configured for the observation type
"""
form_class = self.get_form(observation_type)
kwargs['facility'] = self
return form_class(**kwargs)

def get_form_classes_for_display(self, **kwargs):
"""
Expand Down
164 changes: 164 additions & 0 deletions tom_observations/tests/test_credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
"""
Tests for credential management functionality in BaseObservationFacility.

This test module covers:
- CredentialStatus enum
- Credential helper methods in BaseObservationFacility
- Credential validation and fallback logic
"""
from django.test import TestCase, override_settings
from django.contrib.auth.models import User
from django.core.exceptions import ImproperlyConfigured

from tom_observations.facility import BaseObservationFacility, CredentialStatus


class TestFacility(BaseObservationFacility):
"""Concrete test facility for testing credential management."""
name = 'TestFacility'

def get_form(self, observation_type):
return None

def submit_observation(self, observation_payload):
return [1]

def validate_observation(self, observation_payload):
return True

def get_observation_url(self, observation_id):
return ''

def get_terminal_observing_states(self):
return ['COMPLETED', 'FAILED']

def get_observing_sites(self):
return {}


class CredentialStatusEnumTests(TestCase):
"""Test the CredentialStatus enum values and behavior."""

def test_enum_values_exist(self):
"""Test that all expected enum values are defined."""
self.assertEqual(CredentialStatus.NOT_INITIALIZED.value, "not_initialized")
self.assertEqual(CredentialStatus.NO_PROFILE.value, "no_profile")
self.assertEqual(CredentialStatus.PROFILE_EMPTY.value, "profile_empty")
self.assertEqual(CredentialStatus.USING_DEFAULTS.value, "using_defaults")
self.assertEqual(CredentialStatus.USING_USER_CREDS.value, "using_user_creds")
self.assertEqual(CredentialStatus.VALIDATION_FAILED_AUTH.value, "validation_failed_auth")
self.assertEqual(CredentialStatus.VALIDATION_FAILED_NETWORK.value, "validation_failed_network")

def test_enum_membership(self):
"""Test that we can check enum membership."""
self.assertIn(CredentialStatus.NOT_INITIALIZED, CredentialStatus)
self.assertIn(CredentialStatus.USING_USER_CREDS, CredentialStatus)

def test_enum_comparison(self):
"""Test that enum values can be compared."""
status1 = CredentialStatus.USING_USER_CREDS
status2 = CredentialStatus.USING_USER_CREDS
status3 = CredentialStatus.USING_DEFAULTS

self.assertEqual(status1, status2)
self.assertNotEqual(status1, status3)

def test_enum_string_representation(self):
"""Test that enum has useful string representation."""
status = CredentialStatus.USING_USER_CREDS
self.assertIn("using_user_creds", str(status.value))


class BaseObservationFacilityCredentialTests(TestCase):
"""Test credential-related methods in BaseObservationFacility."""

def setUp(self):
"""Set up test fixtures."""
self.facility = TestFacility()
self.user = User.objects.create_user(username='testuser', password='testpass')

def test_initial_credential_status(self):
"""Test that facility starts with NOT_INITIALIZED status."""
self.assertEqual(self.facility.credential_status, CredentialStatus.NOT_INITIALIZED)

def test_is_credential_empty_with_none(self):
"""Test that None is recognized as empty credential."""
self.assertTrue(self.facility._is_credential_empty(None))

def test_is_credential_empty_with_empty_string(self):
"""Test that empty string is recognized as empty credential."""
self.assertTrue(self.facility._is_credential_empty(''))
self.assertTrue(self.facility._is_credential_empty(""))

def test_is_credential_empty_with_whitespace(self):
"""Test that whitespace-only strings are recognized as empty."""
self.assertTrue(self.facility._is_credential_empty(' '))
self.assertTrue(self.facility._is_credential_empty('\t\n'))

def test_is_credential_empty_with_valid_credential(self):
"""Test that valid credentials are not recognized as empty."""
self.assertFalse(self.facility._is_credential_empty('valid_username'))
self.assertFalse(self.facility._is_credential_empty('p@ssw0rd'))

@override_settings(FACILITIES={
'TEST_FACILITY': {
'username': 'default_user',
'password': 'default_pass'
}
})
def test_get_setting_credentials_success(self):
"""Test successfully getting credentials from settings."""
creds = self.facility._get_setting_credentials(
'TEST_FACILITY',
['username', 'password']
)

self.assertEqual(creds['username'], 'default_user')
self.assertEqual(creds['password'], 'default_pass')

@override_settings(FACILITIES={})
def test_get_setting_credentials_no_facility(self):
"""Test that missing facility in settings raises ImproperlyConfigured."""
with self.assertRaises(ImproperlyConfigured) as cm:
self.facility._get_setting_credentials('MISSING_FACILITY', ['username'])

self.assertIn('MISSING_FACILITY', str(cm.exception))
self.assertIn('settings.FACILITIES', str(cm.exception))

@override_settings(FACILITIES={
'TEST_FACILITY': {
'username': 'default_user'
# 'password' is missing
}
})
def test_get_setting_credentials_missing_key(self):
"""Test that missing credential key raises ImproperlyConfigured."""
with self.assertRaises(ImproperlyConfigured) as cm:
self.facility._get_setting_credentials(
'TEST_FACILITY',
['username', 'password']
)

self.assertIn('password', str(cm.exception))
self.assertIn('TEST_FACILITY', str(cm.exception))

def test_raise_no_profile_error(self):
"""Test that _raise_no_profile_error raises with proper message."""
with self.assertRaises(ImproperlyConfigured) as cm:
self.facility._raise_no_profile_error(self.user, 'TestFacility')

exception_message = str(cm.exception)
self.assertIn('testuser', exception_message)
self.assertIn('TestFacility', exception_message)
self.assertIn('Profile', exception_message)

def test_raise_no_defaults_error(self):
"""Test that _raise_no_defaults_error raises with proper message."""
with self.assertRaises(ImproperlyConfigured) as cm:
self.facility._raise_no_defaults_error(self.user, 'TestFacility')

exception_message = str(cm.exception)
self.assertIn('testuser', exception_message)
self.assertIn('TestFacility', exception_message)
self.assertIn('default credentials', exception_message)
self.assertIn('settings.FACILITIES', exception_message)
Loading
Loading