Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cats/CI_api_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def ciuk_parse_response_data(response: dict):
and is set up to cache data from call to call even accross different
processes within the same half hour window. The returned prediction data
is in half hour blocks starting from the half hour containing the current
time and extending for 48 hours into the future.
time and extending for 47 hours into the future.

:param response:
:return:
Expand Down
144 changes: 134 additions & 10 deletions cats/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
from .configure import get_runtime_config
from .constants import CATS_ASCII_BANNER_COLOUR, CATS_ASCII_BANNER_NO_COLOUR
from .plotting import plotplan
from .forecast import CarbonIntensityAverageEstimate, WindowedForecast
from .forecast import (
CarbonIntensityAverageEstimate,
ConstrainedWindowedForecast,
)

__version__ = "1.1.0"

Expand All @@ -28,6 +31,78 @@ def indent_lines(lines, spaces):
return "\n".join(" " * spaces + line for line in lines.split("\n"))


def parse_time_constraint(
time_str: str, timezone_info=None
) -> Optional[datetime.datetime]:
"""
Parse a time constraint string into a datetime object.

:param time_str: Time string in various formats (HH:MM, YYYY-MM-DDTHH:MM, etc.)
:param timezone_info: Default timezone if not specified in the string
:return: Parsed datetime object
:raises ValueError: If the time string cannot be parsed
"""
if not time_str:
return None

# If timezone_info is not provided, use system local timezone
if timezone_info is None:
timezone_info = datetime.datetime.now().astimezone().tzinfo

# Try to parse as full ISO format first
try:
if "T" in time_str:
# Full datetime string
if time_str.endswith("Z"):
time_str = time_str[:-1] + "+00:00"
elif time_str[-6] not in ["+", "-"] and time_str[-3] != ":":
# No timezone info, add default
dt = datetime.datetime.fromisoformat(time_str)
return dt.replace(tzinfo=timezone_info)
return datetime.datetime.fromisoformat(time_str)
else:
# Time only (HH:MM or HH:MM:SS)
time_part = datetime.time.fromisoformat(time_str)
today = datetime.datetime.now().date()
return datetime.datetime.combine(today, time_part, tzinfo=timezone_info)
except ValueError as e:
raise ValueError(f"Unable to parse time constraint '{time_str}': {e}")


def validate_window_constraints(
start_window: Optional[str], end_window: Optional[str], window_minutes: int
) -> tuple[Optional[datetime.datetime], Optional[datetime.datetime], int]:
"""
Validate and parse window constraints.

:param start_window: Start window constraint string
:param end_window: End window constraint string
:param window_minutes: Maximum window duration in minutes
:return: Tuple of (start_datetime, end_datetime, validated_window_minutes)
:raises ValueError: If constraints are invalid
"""
# Validate window minutes
if window_minutes < 1 or window_minutes > 2820:
raise ValueError("Window must be between 1 and 2820 minutes (47 hours)")

start_datetime = None
end_datetime = None

# Parse time constraints if provided
if start_window and start_window.strip():
start_datetime = parse_time_constraint(start_window)

if end_window and end_window.strip():
end_datetime = parse_time_constraint(end_window)

# Validate that start is before end if both are provided
if start_datetime and end_datetime:
if start_datetime >= end_datetime:
raise ValueError("Start window must be before end window")

return start_datetime, end_datetime, window_minutes


def parse_arguments():
"""
Parse command line arguments
Expand Down Expand Up @@ -201,6 +276,27 @@ def positive_integer(string):
"\"pip install 'climate-aware-task-scheduler[plots]'\"",
action="store_true",
)
parser.add_argument(
"--window",
type=positive_integer,
help="Maximum time window to search for optimal start time, in minutes. "
"Must be between 1 and 2820 (47 hours). Default: 2820 minutes (47 hours).",
default=2820,
)
parser.add_argument(
"--start-window",
type=str,
help="Earliest time the job is allowed to start, in ISO format (e.g., '2024-01-15T09:00'). "
"If only time is provided (e.g., '09:00'), today's date is assumed. "
"Timezone info is optional and defaults to system timezone.",
)
parser.add_argument(
"--end-window",
type=str,
help="Latest time the job is allowed to start, in ISO format (e.g., '2024-01-15T17:00'). "
"If only time is provided (e.g., '17:00'), today's date is assumed. "
"Timezone info is optional and defaults to system timezone.",
)

return parser

Expand Down Expand Up @@ -324,9 +420,6 @@ def main(arguments=None) -> int:
args = parser.parse_args(arguments)
colour_output = args.no_colour or args.no_color

# Print CATS ASCII art banner, before any output from printing or logging
print_banner(colour_output)

if args.command and not args.scheduler:
print(
"cats: To run a command or sbatch script with the -c or --command option, you must\n"
Expand All @@ -335,11 +428,27 @@ def main(arguments=None) -> int:
return 1

CI_API_interface, location, duration, jobinfo, PUE = get_runtime_config(args)
if duration > CI_API_interface.max_duration:
print(
f"""API allows a maximum job duration of {CI_API_interface.max_duration} minutes.
This is usually due to forecast limitations."""

# Validate and parse window constraints
try:
start_constraint, end_constraint, max_window = validate_window_constraints(
args.start_window, args.end_window, args.window
)
except ValueError as e:
print(f"Error in window constraints: {e}")
return 1
# Check against both API limit and user-specified window
effective_max_duration = min(CI_API_interface.max_duration, max_window)
if duration > effective_max_duration:
if max_window < CI_API_interface.max_duration:
print(
f"""Job duration ({duration} minutes) exceeds specified window ({max_window} minutes)."""
)
else:
print(
f"""API allows a maximum job duration of {CI_API_interface.max_duration} minutes.
This is usually due to forecast limitations."""
)
return 1

########################
Expand All @@ -362,8 +471,21 @@ def main(arguments=None) -> int:

# Find best possible average carbon intensity, along
# with corresponding job start time.
wf = WindowedForecast(
CI_forecast, duration, start=datetime.datetime.now().astimezone()
search_start = datetime.datetime.now().astimezone()

# Apply start window constraint if provided
if start_constraint:
# Ensure start constraint is in the same timezone as search_start
if start_constraint.tzinfo != search_start.tzinfo:
start_constraint = start_constraint.astimezone(search_start.tzinfo)
search_start = max(search_start, start_constraint)

wf = ConstrainedWindowedForecast(
CI_forecast,
duration,
start=search_start,
max_window_minutes=max_window,
end_constraint=end_constraint,
)
now_avg, best_avg = wf[0], min(wf)
output = CATSOutput(now_avg, best_avg, location, "GBR", colour=not colour_output)
Expand All @@ -390,6 +512,8 @@ def main(arguments=None) -> int:
dateformat = args.dateformat or ""
print(output.to_json(dateformat, sort_keys=True, indent=2))
else:
# Print CATS ASCII art banner, before any output from printing or logging
print_banner(colour_output)
print(output)
if args.plot:
plotplan(CI_forecast, output)
Expand Down
116 changes: 114 additions & 2 deletions cats/forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ class CarbonIntensityAverageEstimate:
value: float
start: datetime # Start of the time-integration window
end: datetime # End of the time-integration window
start_value: float CI point estimate at start time
end_value: float CI point estimate at end time
start_value: float # CI point estimate at start time
end_value: float # CI point estimate at end time


class WindowedForecast:
Expand Down Expand Up @@ -154,3 +154,115 @@ def __iter__(self):

def __len__(self):
return len(self.data) - self.ndata


class ConstrainedWindowedForecast:
"""
A wrapper around WindowedForecast that applies time window constraints.

This class filters the available forecast windows based on:
- Maximum window duration (cutoff time)
- End time constraint (latest allowed start time)
"""

def __init__(
self,
data: list[CarbonIntensityPointEstimate],
duration: int, # in minutes
start: datetime,
max_window_minutes: int = 2820,
end_constraint: Optional[datetime] = None,
):
self.max_window_minutes = max_window_minutes
self.end_constraint = end_constraint
self.duration = duration

# Filter data based on constraints
filtered_data = self._filter_data_by_constraints(
data, start, duration, max_window_minutes, end_constraint
)

# Create the underlying WindowedForecast with filtered data
self._wf = WindowedForecast(filtered_data, duration, start)

def _filter_data_by_constraints(
self,
data: list[CarbonIntensityPointEstimate],
start: datetime,
duration: int,
max_window_minutes: int,
end_constraint: Optional[datetime],
) -> list[CarbonIntensityPointEstimate]:
"""Filter forecast data based on time constraints."""

# Calculate the maximum time we need data for
search_window_end = start + timedelta(minutes=max_window_minutes)

if end_constraint:
# Ensure timezone compatibility
if end_constraint.tzinfo != start.tzinfo:
end_constraint = end_constraint.astimezone(start.tzinfo)
# Jobs must start before end_constraint
search_window_end = min(search_window_end, end_constraint)

# We need data points to cover jobs starting up to search_window_end
# plus the duration of those jobs
max_data_time = search_window_end + timedelta(minutes=duration)

# Filter data to respect the constraints
filtered_data = []
for d in data:
if d.datetime <= max_data_time:
filtered_data.append(d)
else:
break

if len(filtered_data) < 2:
raise ValueError(
"Insufficient forecast data for the specified time window constraints. "
f"Try increasing --window or adjusting --end-window."
)

return filtered_data

def __getitem__(self, index: int) -> CarbonIntensityAverageEstimate:
"""Get forecast window at index, but only if it respects constraints."""
if index >= len(self):
raise IndexError("Window index out of range")
return self._wf[index]

def __iter__(self):
"""Iterate over valid forecast windows."""
for index in range(len(self)):
yield self[index]

def __len__(self):
"""Return number of valid forecast windows respecting all constraints."""
base_length = len(self._wf)

if base_length <= 0:
return 0

max_valid_index = base_length - 1

# Check max window constraint
if self.max_window_minutes < 2820: # Only if different from default
data_stepsize_minutes = self._wf.data_stepsize.total_seconds() / 60
max_index_by_window = int(self.max_window_minutes / data_stepsize_minutes)
max_valid_index = min(max_valid_index, max_index_by_window)

# Check end constraint
if self.end_constraint:
if self.end_constraint.tzinfo != self._wf.start.tzinfo:
end_constraint = self.end_constraint.astimezone(self._wf.start.tzinfo)
else:
end_constraint = self.end_constraint

# Find the maximum index where job start time is before end_constraint
for i in range(min(base_length, max_valid_index + 1)):
window_start = self._wf.start + i * self._wf.data_stepsize
if window_start >= end_constraint:
max_valid_index = i - 1
break

return max(0, max_valid_index + 1)
Loading