From 6e91dbb2c7a3a2ec2800a1ec01fdc85b27ee0bab Mon Sep 17 00:00:00 2001 From: isofinly Date: Sun, 5 Oct 2025 23:01:03 +0300 Subject: [PATCH 1/2] feat: Implement window constraints with new command-line options. --- cats/__init__.py | 144 ++++++++- cats/forecast.py | 116 ++++++- tests/test_window_constraints.py | 540 +++++++++++++++++++++++++++++++ 3 files changed, 788 insertions(+), 12 deletions(-) create mode 100644 tests/test_window_constraints.py diff --git a/cats/__init__.py b/cats/__init__.py index 71b5214..8c2fdfc 100644 --- a/cats/__init__.py +++ b/cats/__init__.py @@ -15,7 +15,10 @@ from .configure import get_runtime_config from .constants import CATS_ASCII_BANNER_COLOUR, CATS_ASCII_BANNER_NO_COLOUR from .plotting import plotplan -from .forecast import CarbonIntensityAverageEstimate, WindowedForecast +from .forecast import ( + CarbonIntensityAverageEstimate, + ConstrainedWindowedForecast, +) __version__ = "1.1.0" @@ -28,6 +31,78 @@ def indent_lines(lines, spaces): return "\n".join(" " * spaces + line for line in lines.split("\n")) +def parse_time_constraint( + time_str: str, timezone_info=None +) -> Optional[datetime.datetime]: + """ + Parse a time constraint string into a datetime object. + + :param time_str: Time string in various formats (HH:MM, YYYY-MM-DDTHH:MM, etc.) + :param timezone_info: Default timezone if not specified in the string + :return: Parsed datetime object + :raises ValueError: If the time string cannot be parsed + """ + if not time_str: + return None + + # If timezone_info is not provided, use system local timezone + if timezone_info is None: + timezone_info = datetime.datetime.now().astimezone().tzinfo + + # Try to parse as full ISO format first + try: + if "T" in time_str: + # Full datetime string + if time_str.endswith("Z"): + time_str = time_str[:-1] + "+00:00" + elif time_str[-6] not in ["+", "-"] and time_str[-3] != ":": + # No timezone info, add default + dt = datetime.datetime.fromisoformat(time_str) + return dt.replace(tzinfo=timezone_info) + return datetime.datetime.fromisoformat(time_str) + else: + # Time only (HH:MM or HH:MM:SS) + time_part = datetime.time.fromisoformat(time_str) + today = datetime.datetime.now().date() + return datetime.datetime.combine(today, time_part, tzinfo=timezone_info) + except ValueError as e: + raise ValueError(f"Unable to parse time constraint '{time_str}': {e}") + + +def validate_window_constraints( + start_window: Optional[str], end_window: Optional[str], window_minutes: int +) -> tuple[Optional[datetime.datetime], Optional[datetime.datetime], int]: + """ + Validate and parse window constraints. + + :param start_window: Start window constraint string + :param end_window: End window constraint string + :param window_minutes: Maximum window duration in minutes + :return: Tuple of (start_datetime, end_datetime, validated_window_minutes) + :raises ValueError: If constraints are invalid + """ + # Validate window minutes + if window_minutes < 1 or window_minutes > 2880: + raise ValueError("Window must be between 1 and 2880 minutes (48 hours)") + + start_datetime = None + end_datetime = None + + # Parse time constraints if provided + if start_window and start_window.strip(): + start_datetime = parse_time_constraint(start_window) + + if end_window and end_window.strip(): + end_datetime = parse_time_constraint(end_window) + + # Validate that start is before end if both are provided + if start_datetime and end_datetime: + if start_datetime >= end_datetime: + raise ValueError("Start window must be before end window") + + return start_datetime, end_datetime, window_minutes + + def parse_arguments(): """ Parse command line arguments @@ -201,6 +276,27 @@ def positive_integer(string): "\"pip install 'climate-aware-task-scheduler[plots]'\"", action="store_true", ) + parser.add_argument( + "--window", + type=positive_integer, + help="Maximum time window to search for optimal start time, in minutes. " + "Must be between 1 and 2880 (48 hours). Default: 2880 minutes (48 hours).", + default=2880, + ) + parser.add_argument( + "--start-window", + type=str, + help="Earliest time the job is allowed to start, in ISO format (e.g., '2024-01-15T09:00'). " + "If only time is provided (e.g., '09:00'), today's date is assumed. " + "Timezone info is optional and defaults to system timezone.", + ) + parser.add_argument( + "--end-window", + type=str, + help="Latest time the job is allowed to start, in ISO format (e.g., '2024-01-15T17:00'). " + "If only time is provided (e.g., '17:00'), today's date is assumed. " + "Timezone info is optional and defaults to system timezone.", + ) return parser @@ -324,9 +420,6 @@ def main(arguments=None) -> int: args = parser.parse_args(arguments) colour_output = args.no_colour or args.no_color - # Print CATS ASCII art banner, before any output from printing or logging - print_banner(colour_output) - if args.command and not args.scheduler: print( "cats: To run a command or sbatch script with the -c or --command option, you must\n" @@ -335,11 +428,27 @@ def main(arguments=None) -> int: return 1 CI_API_interface, location, duration, jobinfo, PUE = get_runtime_config(args) - if duration > CI_API_interface.max_duration: - print( - f"""API allows a maximum job duration of {CI_API_interface.max_duration} minutes. -This is usually due to forecast limitations.""" + + # Validate and parse window constraints + try: + start_constraint, end_constraint, max_window = validate_window_constraints( + args.start_window, args.end_window, args.window ) + except ValueError as e: + print(f"Error in window constraints: {e}") + return 1 + # Check against both API limit and user-specified window + effective_max_duration = min(CI_API_interface.max_duration, max_window) + if duration > effective_max_duration: + if max_window < CI_API_interface.max_duration: + print( + f"""Job duration ({duration} minutes) exceeds specified window ({max_window} minutes).""" + ) + else: + print( + f"""API allows a maximum job duration of {CI_API_interface.max_duration} minutes. +This is usually due to forecast limitations.""" + ) return 1 ######################## @@ -362,8 +471,21 @@ def main(arguments=None) -> int: # Find best possible average carbon intensity, along # with corresponding job start time. - wf = WindowedForecast( - CI_forecast, duration, start=datetime.datetime.now().astimezone() + search_start = datetime.datetime.now().astimezone() + + # Apply start window constraint if provided + if start_constraint: + # Ensure start constraint is in the same timezone as search_start + if start_constraint.tzinfo != search_start.tzinfo: + start_constraint = start_constraint.astimezone(search_start.tzinfo) + search_start = max(search_start, start_constraint) + + wf = ConstrainedWindowedForecast( + CI_forecast, + duration, + start=search_start, + max_window_minutes=max_window, + end_constraint=end_constraint, ) now_avg, best_avg = wf[0], min(wf) output = CATSOutput(now_avg, best_avg, location, "GBR", colour=not colour_output) @@ -390,6 +512,8 @@ def main(arguments=None) -> int: dateformat = args.dateformat or "" print(output.to_json(dateformat, sort_keys=True, indent=2)) else: + # Print CATS ASCII art banner, before any output from printing or logging + print_banner(colour_output) print(output) if args.plot: plotplan(CI_forecast, output) diff --git a/cats/forecast.py b/cats/forecast.py index 54ef6f0..882c902 100644 --- a/cats/forecast.py +++ b/cats/forecast.py @@ -29,8 +29,8 @@ class CarbonIntensityAverageEstimate: value: float start: datetime # Start of the time-integration window end: datetime # End of the time-integration window - start_value: float # CI point estimate at start time - end_value: float # CI point estimate at end time + start_value: float # CI point estimate at start time + end_value: float # CI point estimate at end time class WindowedForecast: @@ -154,3 +154,115 @@ def __iter__(self): def __len__(self): return len(self.data) - self.ndata + + +class ConstrainedWindowedForecast: + """ + A wrapper around WindowedForecast that applies time window constraints. + + This class filters the available forecast windows based on: + - Maximum window duration (cutoff time) + - End time constraint (latest allowed start time) + """ + + def __init__( + self, + data: list[CarbonIntensityPointEstimate], + duration: int, # in minutes + start: datetime, + max_window_minutes: int = 2880, + end_constraint: Optional[datetime] = None, + ): + self.max_window_minutes = max_window_minutes + self.end_constraint = end_constraint + self.duration = duration + + # Filter data based on constraints + filtered_data = self._filter_data_by_constraints( + data, start, duration, max_window_minutes, end_constraint + ) + + # Create the underlying WindowedForecast with filtered data + self._wf = WindowedForecast(filtered_data, duration, start) + + def _filter_data_by_constraints( + self, + data: list[CarbonIntensityPointEstimate], + start: datetime, + duration: int, + max_window_minutes: int, + end_constraint: Optional[datetime], + ) -> list[CarbonIntensityPointEstimate]: + """Filter forecast data based on time constraints.""" + + # Calculate the maximum time we need data for + search_window_end = start + timedelta(minutes=max_window_minutes) + + if end_constraint: + # Ensure timezone compatibility + if end_constraint.tzinfo != start.tzinfo: + end_constraint = end_constraint.astimezone(start.tzinfo) + # Jobs must start before end_constraint + search_window_end = min(search_window_end, end_constraint) + + # We need data points to cover jobs starting up to search_window_end + # plus the duration of those jobs + max_data_time = search_window_end + timedelta(minutes=duration) + + # Filter data to respect the constraints + filtered_data = [] + for d in data: + if d.datetime <= max_data_time: + filtered_data.append(d) + else: + break + + if len(filtered_data) < 2: + raise ValueError( + "Insufficient forecast data for the specified time window constraints. " + f"Try increasing --window or adjusting --end-window." + ) + + return filtered_data + + def __getitem__(self, index: int) -> CarbonIntensityAverageEstimate: + """Get forecast window at index, but only if it respects constraints.""" + if index >= len(self): + raise IndexError("Window index out of range") + return self._wf[index] + + def __iter__(self): + """Iterate over valid forecast windows.""" + for index in range(len(self)): + yield self[index] + + def __len__(self): + """Return number of valid forecast windows respecting all constraints.""" + base_length = len(self._wf) + + if base_length <= 0: + return 0 + + max_valid_index = base_length - 1 + + # Check max window constraint + if self.max_window_minutes < 2880: # Only if different from default + data_stepsize_minutes = self._wf.data_stepsize.total_seconds() / 60 + max_index_by_window = int(self.max_window_minutes / data_stepsize_minutes) + max_valid_index = min(max_valid_index, max_index_by_window) + + # Check end constraint + if self.end_constraint: + if self.end_constraint.tzinfo != self._wf.start.tzinfo: + end_constraint = self.end_constraint.astimezone(self._wf.start.tzinfo) + else: + end_constraint = self.end_constraint + + # Find the maximum index where job start time is before end_constraint + for i in range(min(base_length, max_valid_index + 1)): + window_start = self._wf.start + i * self._wf.data_stepsize + if window_start >= end_constraint: + max_valid_index = i - 1 + break + + return max(0, max_valid_index + 1) diff --git a/tests/test_window_constraints.py b/tests/test_window_constraints.py new file mode 100644 index 0000000..5df7453 --- /dev/null +++ b/tests/test_window_constraints.py @@ -0,0 +1,540 @@ +import csv +import pytest +from datetime import datetime, timedelta, timezone, time +from pathlib import Path + +from unittest.mock import MagicMock, patch +from zoneinfo import ZoneInfo + +from cats import main, parse_time_constraint, validate_window_constraints +from cats.forecast import ( + CarbonIntensityPointEstimate, + ConstrainedWindowedForecast, +) + + +@pytest.fixture(scope="session") +def sample_data() -> list[CarbonIntensityPointEstimate]: + """Load sample carbon intensity data for testing.""" + with open(Path(__file__).parent / "carbon_intensity_24h.csv", "r") as f: + csvfile = csv.reader(f, delimiter=",") + _ = next(csvfile) # Skip header line + data = [ + CarbonIntensityPointEstimate( + datetime=datetime.fromisoformat(datestr[:-1] + "+00:00"), + value=float(intensity_value), + ) + for datestr, _, _, intensity_value in csvfile + ] + return data + + +class TestParseTimeConstraint: + """Test the parse_time_constraint function.""" + + def test_parse_full_iso_datetime(self): + """Test parsing full ISO datetime strings.""" + result = parse_time_constraint("2024-01-15T09:30:00") + expected = datetime(2024, 1, 15, 9, 30, 0) + assert result is not None + assert result.replace(tzinfo=None) == expected + + def test_parse_iso_datetime_with_timezone(self): + """Test parsing ISO datetime with timezone info.""" + result = parse_time_constraint("2024-01-15T09:30:00+00:00") + expected = datetime(2024, 1, 15, 9, 30, 0, tzinfo=timezone.utc) + assert result == expected + + def test_parse_iso_datetime_with_z_suffix(self): + """Test parsing ISO datetime with Z suffix (UTC).""" + result = parse_time_constraint("2024-01-15T09:30:00Z") + expected = datetime(2024, 1, 15, 9, 30, 0, tzinfo=timezone.utc) + assert result == expected + + def test_parse_time_only(self): + """Test parsing time-only strings.""" + result = parse_time_constraint("09:30") + today = datetime.now().date() + expected_time = datetime.combine(today, time(9, 30)) + assert result is not None + assert result.replace(tzinfo=None) == expected_time.replace(tzinfo=None) + + def test_parse_time_with_seconds(self): + """Test parsing time with seconds.""" + result = parse_time_constraint("09:30:45") + today = datetime.now().date() + expected_time = datetime.combine(today, time(9, 30, 45)) + assert result is not None + assert result.replace(tzinfo=None) == expected_time.replace(tzinfo=None) + + def test_parse_empty_string_returns_none(self): + """Test that empty string returns None.""" + result = parse_time_constraint("") + assert result is None + + def test_parse_none_returns_none(self): + """Test that None returns None.""" + result = parse_time_constraint("") # Pass empty string instead of None + assert result is None + + def test_parse_with_custom_timezone(self): + """Test parsing with custom default timezone.""" + bst = timezone(timedelta(hours=1)) + result = parse_time_constraint("09:30", timezone_info=bst) + today = datetime.now().date() + expected = datetime.combine(today, time(9, 30), tzinfo=bst) + assert result == expected + + def test_parse_invalid_format_raises_error(self): + """Test that invalid formats raise ValueError.""" + with pytest.raises(ValueError, match="Unable to parse time constraint"): + _ = parse_time_constraint("not-a-time") + + def test_parse_invalid_date_raises_error(self): + """Test that invalid dates raise ValueError.""" + with pytest.raises(ValueError, match="Unable to parse time constraint"): + _ = parse_time_constraint("2024-13-45T09:30:00") + + +class TestValidateWindowConstraints: + """Test the validate_window_constraints function.""" + + def test_validate_valid_window_minutes(self): + """Test validation of valid window minutes.""" + start_dt, end_dt, window = validate_window_constraints("", "", 1440) + assert start_dt is None + assert end_dt is None + assert window == 1440 + + def test_validate_window_too_small_raises_error(self): + """Test that window < 1 raises ValueError.""" + with pytest.raises( + ValueError, match="Window must be between 1 and 2880 minutes" + ): + validate_window_constraints("", "", 0) + + def test_validate_window_too_large_raises_error(self): + """Test that window > 2880 raises ValueError.""" + with pytest.raises( + ValueError, match="Window must be between 1 and 2880 minutes" + ): + validate_window_constraints("", "", 2881) + + def test_validate_boundary_values(self): + """Test boundary values for window minutes.""" + # Test minimum + start_dt, end_dt, window = validate_window_constraints("", "", 1) + assert window == 1 + + # Test maximum + start_dt, end_dt, window = validate_window_constraints("", "", 2880) + assert window == 2880 + + def test_validate_start_before_end_constraint(self): + """Test that start must be before end.""" + with pytest.raises(ValueError, match="Start window must be before end window"): + _ = validate_window_constraints( + "2024-01-15T10:00:00", "2024-01-15T09:00:00", 1440 + ) + + def test_validate_same_start_and_end_raises_error(self): + """Test that start and end at same time raises error.""" + with pytest.raises(ValueError, match="Start window must be before end window"): + _ = validate_window_constraints( + "2024-01-15T09:00:00", "2024-01-15T09:00:00", 1440 + ) + + def test_validate_with_valid_time_constraints(self): + """Test validation with valid time constraints.""" + start_dt, end_dt, window = validate_window_constraints( + "2024-01-15T09:00:00", "2024-01-15T17:00:00", 480 + ) + assert start_dt is not None + assert end_dt is not None + assert start_dt == datetime(2024, 1, 15, 9, 0, 0).replace( + tzinfo=start_dt.tzinfo + ) + assert end_dt == datetime(2024, 1, 15, 17, 0, 0).replace(tzinfo=end_dt.tzinfo) + assert window == 480 + + def test_validate_with_only_start_constraint(self): + """Test validation with only start constraint.""" + start_dt, end_dt, window = validate_window_constraints( + "2024-01-15T09:00:00", "", 720 + ) + assert start_dt is not None + assert end_dt is None + assert window == 720 + + def test_validate_with_only_end_constraint(self): + """Test validation with only end constraint.""" + start_dt, end_dt, window = validate_window_constraints( + "", "2024-01-15T17:00:00", 360 + ) + assert start_dt is None + assert end_dt is not None + assert window == 360 + + +class TestConstrainedWindowedForecast: + """Test the ConstrainedWindowedForecast class.""" + + def test_basic_functionality_without_constraints( + self, sample_data: list[CarbonIntensityPointEstimate] + ): + """Test that ConstrainedWindowedForecast works like WindowedForecast without constraints.""" + duration = 180 # 3 hours + start = sample_data[0].datetime + + cwf = ConstrainedWindowedForecast(sample_data, duration, start) + + # Should have valid length + assert len(cwf) > 0 + + # Should be able to get first and find minimum + first = cwf[0] + minimum = min(cwf) + + assert first.start == start + assert first.end == start + timedelta(minutes=duration) + assert minimum.value <= first.value # minimum should be <= first + + def test_window_constraint_limits_search_space( + self, sample_data: list[CarbonIntensityPointEstimate] + ): + """Test that max_window_minutes limits the search space.""" + duration = 60 # 1 hour + start = sample_data[0].datetime + + # Full window + cwf_full = ConstrainedWindowedForecast(sample_data, duration, start) + + # Constrained window + cwf_constrained = ConstrainedWindowedForecast( + sample_data, + duration, + start, + max_window_minutes=240, # 4 hours + ) + + # Constrained should have fewer or equal options + assert len(cwf_constrained) <= len(cwf_full) + + def test_end_constraint_limits_start_times( + self, sample_data: list[CarbonIntensityPointEstimate] + ): + """Test that end_constraint limits when jobs can start.""" + duration = 60 # 1 hour + start = sample_data[0].datetime + end_constraint = start + timedelta(hours=6) # Jobs must start within 6 hours + + cwf = ConstrainedWindowedForecast( + sample_data, duration, start, end_constraint=end_constraint + ) + + # All start times should be before the constraint + for window in cwf: + assert window.start < end_constraint + + def test_combined_constraints( + self, sample_data: list[CarbonIntensityPointEstimate] + ): + """Test using both window and end constraints together.""" + duration = 90 # 1.5 hours + start = sample_data[0].datetime + max_window = 300 # 5 hours + end_constraint = start + timedelta(hours=4) # Must start within 4 hours + + cwf = ConstrainedWindowedForecast( + sample_data, + duration, + start, + max_window_minutes=max_window, + end_constraint=end_constraint, + ) + + # Should have valid options + assert len(cwf) > 0 + + # All options should respect both constraints + for window in cwf: + assert window.start < end_constraint + assert (window.start - start).total_seconds() / 60 <= max_window + + def test_insufficient_data_raises_error(self): + """Test that insufficient data raises appropriate error.""" + # Create minimal data + utc = ZoneInfo("UTC") + minimal_data = [ + CarbonIntensityPointEstimate( + datetime=datetime(2024, 1, 1, 12, 0, tzinfo=utc), value=100 + ) + ] + + with pytest.raises(ValueError, match="Insufficient forecast data"): + _ = ConstrainedWindowedForecast( + minimal_data, 60, minimal_data[0].datetime, max_window_minutes=30 + ) + + def test_timezone_handling_in_end_constraint( + self, sample_data: list[CarbonIntensityPointEstimate] + ): + """Test proper timezone handling for end constraints.""" + duration = 60 + start = sample_data[0].datetime + + # End constraint in different timezone + bst = timezone(timedelta(hours=1)) + end_constraint = (start + timedelta(hours=6)).astimezone(bst) + + cwf = ConstrainedWindowedForecast( + sample_data, duration, start, end_constraint=end_constraint + ) + + # Should still work correctly despite timezone difference + assert len(cwf) > 0 + + def test_index_out_of_range_raises_error( + self, sample_data: list[CarbonIntensityPointEstimate] + ): + """Test that accessing invalid index raises IndexError.""" + duration = 60 + start = sample_data[0].datetime + + cwf = ConstrainedWindowedForecast( + sample_data, duration, start, max_window_minutes=120 + ) + + with pytest.raises(IndexError, match="Window index out of range"): + cwf[len(cwf)] + + def test_iteration_works_correctly( + self, sample_data: list[CarbonIntensityPointEstimate] + ): + """Test that iteration over forecast works correctly.""" + duration = 60 + start = sample_data[0].datetime + + cwf = ConstrainedWindowedForecast( + sample_data, duration, start, max_window_minutes=240 + ) + + # Test that we can iterate and get consistent results + windows = list(cwf) + assert len(windows) == len(cwf) + + # Test that iteration and indexing give same results + for i, window in enumerate(cwf): + assert window == cwf[i] + + +class TestMainIntegration: + """Integration tests for main function with window constraints.""" + + @patch("cats.get_CI_forecast") + @patch("cats.configure.get_runtime_config") + def test_main_with_window_constraint( + self, mock_config: MagicMock, mock_forecast: MagicMock + ): + """Test main function with --window parameter.""" + # Mock the configuration + from cats.CI_api_interface import API_interfaces + + mock_config.return_value = ( + API_interfaces["carbonintensity.org.uk"], + "OX1", + 60, # duration + None, # jobinfo + None, # PUE + ) + + # Mock forecast data + utc = ZoneInfo("UTC") + base_time = datetime.now(utc) + mock_forecast.return_value = [ + CarbonIntensityPointEstimate( + datetime=base_time + timedelta(minutes=i * 30), value=100 - i * 5 + ) + for i in range(100) # 50 hours of data + ] + + # Test with window constraint + result = main(["-d", "60", "--loc", "OX1", "--window", "480"]) + assert result == 0 + + @patch("cats.get_CI_forecast") + @patch("cats.configure.get_runtime_config") + def test_main_with_time_window_constraints( + self, mock_config: MagicMock, mock_forecast: MagicMock + ): + """Test main function with --start-window and --end-window parameters.""" + # Mock the configuration + from cats.CI_api_interface import API_interfaces + + mock_config.return_value = ( + API_interfaces["carbonintensity.org.uk"], + "OX1", + 60, # duration + None, # jobinfo + None, # PUE + ) + + # Mock forecast data + utc = ZoneInfo("UTC") + now = datetime.now(utc) + mock_forecast.return_value = [ + CarbonIntensityPointEstimate( + datetime=now + timedelta(minutes=i * 30), value=100 - i * 2 + ) + for i in range(100) # 50 hours of data + ] + + # Test with both start and end window constraints + tomorrow = (now + timedelta(days=1)).strftime("%Y-%m-%dT%H:%M") + day_after = (now + timedelta(days=2)).strftime("%Y-%m-%dT%H:%M") + + result = main( + [ + "-d", + "60", + "--loc", + "OX1", + "--start-window", + tomorrow, + "--end-window", + day_after, + ] + ) + assert result == 0 + + @patch("cats.get_CI_forecast") + @patch("cats.configure.get_runtime_config") + def test_main_with_invalid_window_constraints( + self, mock_config: MagicMock, mock_forecast: MagicMock + ): + """Test main function with invalid window constraints.""" + # Mock the configuration + from cats.CI_api_interface import API_interfaces + + mock_config.return_value = ( + API_interfaces["carbonintensity.org.uk"], + "OX1", + 60, # duration + None, # jobinfo + None, # PUE + ) + + # Test with invalid window size + result = main(["-d", "60", "--loc", "OX1", "--window", "5000"]) + assert result == 1 # Should fail + + @patch("cats.get_CI_forecast") + @patch("cats.configure.get_runtime_config") + def test_main_with_duration_exceeds_window( + self, mock_config: MagicMock, mock_forecast: MagicMock + ): + """Test main function when job duration exceeds specified window.""" + # Mock the configuration + from cats.CI_api_interface import API_interfaces + + mock_config.return_value = ( + API_interfaces["carbonintensity.org.uk"], + "OX1", + 480, # 8 hour duration + None, # jobinfo + None, # PUE + ) + + # Test with window smaller than duration + result = main(["-d", "480", "--loc", "OX1", "--window", "240"]) + assert result == 1 # Should fail + + @patch("cats.get_CI_forecast") + @patch("cats.configure.get_runtime_config") + def test_main_with_conflicting_time_windows( + self, mock_config: MagicMock, mock_forecast: MagicMock + ): + """Test main function with conflicting start and end windows.""" + # Mock the configuration + from cats.CI_api_interface import API_interfaces + + mock_config.return_value = ( + API_interfaces["carbonintensity.org.uk"], + "OX1", + 60, + None, + None, + ) + + # Test with start after end + result = main( + [ + "-d", + "60", + "--loc", + "OX1", + "--start-window", + "2024-01-15T18:00", + "--end-window", + "2024-01-15T09:00", + ] + ) + assert result == 1 # Should fail + + def test_help_displays_new_options(self): + """Test that --help displays the new window options.""" + with pytest.raises(SystemExit): + _ = main(["--help"]) + + +class TestEdgeCases: + """Test edge cases and error conditions.""" + + def test_very_short_window_with_sample_data( + self, sample_data: list[CarbonIntensityPointEstimate] + ): + """Test behavior with very short window constraint.""" + duration = 30 # 30 minutes + start = sample_data[0].datetime + + # 1 hour window + cwf = ConstrainedWindowedForecast( + sample_data, duration, start, max_window_minutes=60 + ) + + # Should still have at least one option + assert len(cwf) > 0 + + def test_end_constraint_in_past_relative_to_start( + self, sample_data: list[CarbonIntensityPointEstimate] + ): + """Test end constraint that's before the start time.""" + duration = 60 + start = sample_data[5].datetime # Start later in the data + end_constraint = sample_data[2].datetime # End before start + + # This should raise an error due to insufficient data + with pytest.raises( + ValueError, match="No index found for closest data point past job end time" + ): + _ = ConstrainedWindowedForecast( + sample_data, duration, start, end_constraint=end_constraint + ) + + def test_window_exactly_matches_job_duration( + self, sample_data: list[CarbonIntensityPointEstimate] + ): + """Test when window size exactly matches job duration.""" + duration = 120 # 2 hours + start = sample_data[0].datetime + + cwf = ConstrainedWindowedForecast( + sample_data, duration, start, max_window_minutes=duration + ) + + # Should have limited options due to the window constraint + assert len(cwf) > 0 + # All valid start times should be within the window + for window in cwf: + time_diff = (window.start - start).total_seconds() / 60 + assert time_diff <= duration From 87e4abbdc0e47a50f75593f0fc555f2ec97b3704 Mon Sep 17 00:00:00 2001 From: isofinly Date: Thu, 9 Oct 2025 16:16:49 +0300 Subject: [PATCH 2/2] fix: Limit max windows time to 47 to prevent crashes --- cats/CI_api_interface.py | 2 +- cats/__init__.py | 8 ++++---- cats/forecast.py | 4 ++-- tests/test_window_constraints.py | 10 +++++----- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/cats/CI_api_interface.py b/cats/CI_api_interface.py index d9cea8e..5046d5a 100644 --- a/cats/CI_api_interface.py +++ b/cats/CI_api_interface.py @@ -47,7 +47,7 @@ def ciuk_parse_response_data(response: dict): and is set up to cache data from call to call even accross different processes within the same half hour window. The returned prediction data is in half hour blocks starting from the half hour containing the current - time and extending for 48 hours into the future. + time and extending for 47 hours into the future. :param response: :return: diff --git a/cats/__init__.py b/cats/__init__.py index 8c2fdfc..32fa718 100644 --- a/cats/__init__.py +++ b/cats/__init__.py @@ -82,8 +82,8 @@ def validate_window_constraints( :raises ValueError: If constraints are invalid """ # Validate window minutes - if window_minutes < 1 or window_minutes > 2880: - raise ValueError("Window must be between 1 and 2880 minutes (48 hours)") + if window_minutes < 1 or window_minutes > 2820: + raise ValueError("Window must be between 1 and 2820 minutes (47 hours)") start_datetime = None end_datetime = None @@ -280,8 +280,8 @@ def positive_integer(string): "--window", type=positive_integer, help="Maximum time window to search for optimal start time, in minutes. " - "Must be between 1 and 2880 (48 hours). Default: 2880 minutes (48 hours).", - default=2880, + "Must be between 1 and 2820 (47 hours). Default: 2820 minutes (47 hours).", + default=2820, ) parser.add_argument( "--start-window", diff --git a/cats/forecast.py b/cats/forecast.py index 882c902..c9f6a00 100644 --- a/cats/forecast.py +++ b/cats/forecast.py @@ -170,7 +170,7 @@ def __init__( data: list[CarbonIntensityPointEstimate], duration: int, # in minutes start: datetime, - max_window_minutes: int = 2880, + max_window_minutes: int = 2820, end_constraint: Optional[datetime] = None, ): self.max_window_minutes = max_window_minutes @@ -246,7 +246,7 @@ def __len__(self): max_valid_index = base_length - 1 # Check max window constraint - if self.max_window_minutes < 2880: # Only if different from default + if self.max_window_minutes < 2820: # Only if different from default data_stepsize_minutes = self._wf.data_stepsize.total_seconds() / 60 max_index_by_window = int(self.max_window_minutes / data_stepsize_minutes) max_valid_index = min(max_valid_index, max_index_by_window) diff --git a/tests/test_window_constraints.py b/tests/test_window_constraints.py index 5df7453..d32352f 100644 --- a/tests/test_window_constraints.py +++ b/tests/test_window_constraints.py @@ -109,14 +109,14 @@ def test_validate_valid_window_minutes(self): def test_validate_window_too_small_raises_error(self): """Test that window < 1 raises ValueError.""" with pytest.raises( - ValueError, match="Window must be between 1 and 2880 minutes" + ValueError, match="Window must be between 1 and 2820 minutes" ): validate_window_constraints("", "", 0) def test_validate_window_too_large_raises_error(self): - """Test that window > 2880 raises ValueError.""" + """Test that window > 2820 raises ValueError.""" with pytest.raises( - ValueError, match="Window must be between 1 and 2880 minutes" + ValueError, match="Window must be between 1 and 2820 minutes" ): validate_window_constraints("", "", 2881) @@ -127,8 +127,8 @@ def test_validate_boundary_values(self): assert window == 1 # Test maximum - start_dt, end_dt, window = validate_window_constraints("", "", 2880) - assert window == 2880 + start_dt, end_dt, window = validate_window_constraints("", "", 2820) + assert window == 2820 def test_validate_start_before_end_constraint(self): """Test that start must be before end."""