Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 169 additions & 7 deletions src/emhass/optimization.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

"""

def __init__(

Check failure on line 32 in src/emhass/optimization.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 17 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=davidusb-geek_emhass&issues=AZrLfvAZp8Xfr07A3f8q&open=AZrLfvAZp8Xfr07A3f8q&pullRequest=630
self,
retrieve_hass_conf: dict,
optim_conf: dict,
Expand Down Expand Up @@ -121,7 +121,29 @@
f"Solver configuration: lp_solver={self.lp_solver}, lp_solver_path={self.lp_solver_path}"
)
self.logger.debug(f"Number of threads: {self.num_threads}")


Check failure on line 124 in src/emhass/optimization.py

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

Ruff (W293)

src/emhass/optimization.py:124:1: W293 Blank line contains whitespace
# Capacity tariff configuration
self.set_capacity_tariff = optim_conf.get("set_capacity_tariff", False)
if self.set_capacity_tariff:
raw_threshold = optim_conf.get("capacity_tariff_threshold", 5000)
self.capacity_penalty = float(optim_conf.get("capacity_tariff_penalty", 10.0))

Check failure on line 130 in src/emhass/optimization.py

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

Ruff (W293)

src/emhass/optimization.py:130:1: W293 Blank line contains whitespace
# Handle both scalar and time-series thresholds
if isinstance(raw_threshold, (list, np.ndarray)):
self.capacity_threshold = [float(v) for v in raw_threshold]
self.capacity_threshold_is_list = True
self.logger.info(
f"Capacity tariff enabled with time-series thresholds, "
f"penalty={self.capacity_penalty}"
)
else:
self.capacity_threshold = float(raw_threshold)
self.capacity_threshold_is_list = False
self.logger.info(
f"Capacity tariff enabled: threshold={self.capacity_threshold}W, "
f"penalty={self.capacity_penalty}"
)

Check failure on line 146 in src/emhass/optimization.py

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

Ruff (W293)

src/emhass/optimization.py:146:1: W293 Blank line contains whitespace
def perform_optimization(
self,
data_opt: pd.DataFrame,
Expand Down Expand Up @@ -192,7 +214,26 @@
self.logger.debug(
f"Battery usage enabled. Initial SOC: {soc_init}, Final SOC: {soc_final}"
)


Check failure on line 217 in src/emhass/optimization.py

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

Ruff (W293)

src/emhass/optimization.py:217:1: W293 Blank line contains whitespace
# Prepare capacity tariff threshold array if needed
capacity_threshold_array = None
if self.set_capacity_tariff and self.capacity_threshold_is_list:
n = len(data_opt.index)
if len(self.capacity_threshold) != n:
self.logger.error(
f"Capacity tariff threshold list length ({len(self.capacity_threshold)}) "
f"does not match optimization period length ({n}). Disabling capacity tariff."
)
self.set_capacity_tariff = False
else:
capacity_threshold_array = np.array(self.capacity_threshold)
active_count = np.sum(capacity_threshold_array > 0)
unique_thresholds = np.unique(capacity_threshold_array[capacity_threshold_array > 0])
self.logger.info(
f"Capacity tariff active for {active_count}/{n} timesteps "
f"with {len(unique_thresholds)} unique threshold(s)"
)

Check failure on line 236 in src/emhass/optimization.py

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

Ruff (W293)

src/emhass/optimization.py:236:1: W293 Blank line contains whitespace
# If def_total_timestep os set, bypass def_total_hours
if def_total_timestep is not None:
if def_total_hours is None:
Expand Down Expand Up @@ -379,7 +420,6 @@
)
for i in set_I
}

## Define objective
P_def_sum = []
for i in set_I:
Expand Down Expand Up @@ -475,9 +515,56 @@
* self.optim_conf["nominal_power_of_deferrable_loads"][k]
for i in set_I
)


Check failure on line 518 in src/emhass/optimization.py

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

Ruff (W293)

src/emhass/optimization.py:518:1: W293 Blank line contains whitespace
# Add capacity tariff penalty to objective function
P_excess = {}

Check warning on line 520 in src/emhass/optimization.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this local variable "P_excess" to match the regular expression ^[_a-z][a-z0-9_]*$.

See more on https://sonarcloud.io/project/issues?id=davidusb-geek_emhass&issues=AZrLfvAZp8Xfr07A3f8r&open=AZrLfvAZp8Xfr07A3f8r&pullRequest=630
active_periods = {}

Check failure on line 522 in src/emhass/optimization.py

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

Ruff (W293)

src/emhass/optimization.py:522:1: W293 Blank line contains whitespace
if self.set_capacity_tariff:
if self.capacity_threshold_is_list:
# Build mapping of thresholds to timesteps
for i in set_I:
threshold_i = capacity_threshold_array[i]
if threshold_i > 0:
threshold_key = float(threshold_i)
if threshold_key not in active_periods:
active_periods[threshold_key] = []
active_periods[threshold_key].append(i)

Check failure on line 533 in src/emhass/optimization.py

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

Ruff (W293)

src/emhass/optimization.py:533:1: W293 Blank line contains whitespace
# Create excess variable for each active timestep
for threshold_val, timesteps in active_periods.items():
for i in timesteps:
var_name = f"P_excess_{int(threshold_val)}_{i}"
P_excess[i] = plp.LpVariable(var_name, lowBound=0, cat="Continuous")

Check failure on line 539 in src/emhass/optimization.py

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

Ruff (W293)

src/emhass/optimization.py:539:1: W293 Blank line contains whitespace
# Add per-timestep penalty
capacity_penalty_term = plp.lpSum(
-self.capacity_penalty * P_excess[i]
for i in P_excess.keys()
)
objective = objective + capacity_penalty_term
self.logger.info(
f"Added per-timestep capacity tariff penalty for {len(P_excess)} timesteps "
f"across {len(active_periods)} threshold level(s)"
)
else:
# Single threshold - create excess variable for each timestep
for i in set_I:
P_excess[i] = plp.LpVariable(f"P_excess_{i}", lowBound=0, cat="Continuous")

Check failure on line 554 in src/emhass/optimization.py

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

Ruff (W293)

src/emhass/optimization.py:554:1: W293 Blank line contains whitespace
# Add per-timestep penalty
capacity_penalty_term = plp.lpSum(
-self.capacity_penalty * P_excess[i]
for i in set_I
)
objective = objective + capacity_penalty_term
self.logger.info(
f"Added per-timestep capacity tariff penalty: "
f"threshold={self.capacity_threshold}W, penalty={self.capacity_penalty}"
)

opt_model.setObjective(objective)

## Setting constraints
# The main constraint: power balance
if self.plant_conf["inverter_is_hybrid"]:
Expand Down Expand Up @@ -711,6 +798,32 @@
}
)

# Capacity tariff constraints: define excess for each timestep
if self.set_capacity_tariff:
if self.capacity_threshold_is_list:
# For each active timestep: P_excess[i] >= P_grid_pos[i] - threshold[i]
for threshold_val, timesteps in active_periods.items():
for i in timesteps:
constraints[f"constraint_excess_{int(threshold_val)}_{i}"] = plp.LpConstraint(
e=P_excess[i] - (P_grid_pos[i] - threshold_val),
sense=plp.LpConstraintGE,
rhs=0
)
self.logger.info(
f"Added capacity tariff excess constraints for {len(P_excess)} timesteps"
)
else:
# Single threshold - constrain each timestep
for i in set_I:
constraints[f"constraint_excess_{i}"] = plp.LpConstraint(
e=P_excess[i] - (P_grid_pos[i] - self.capacity_threshold),
sense=plp.LpConstraintGE,
rhs=0
)
self.logger.info(
f"Added capacity tariff excess constraints for all {len(set_I)} timesteps"
)

# Treat deferrable loads constraints
predicted_temps = {}
for k in range(self.optim_conf["number_of_deferrable_loads"]):
Expand Down Expand Up @@ -1366,6 +1479,48 @@
opt_tp["P_PV_curtailment"] = [P_PV_curtailment[i].varValue for i in set_I]
opt_tp.index = data_opt.index

# Add capacity tariff results to output DataFrame
if self.set_capacity_tariff:
# Initialize arrays
excess_values = []
penalty_values = []

for i in set_I:
if i in P_excess:
excess_val = P_excess[i].varValue or 0.0
penalty_val = -self.capacity_penalty * excess_val
excess_values.append(excess_val)
penalty_values.append(penalty_val)
else:
excess_values.append(0.0)
penalty_values.append(0.0)

# Add columns to output
if self.capacity_threshold_is_list:
opt_tp["capacity_threshold"] = capacity_threshold_array.tolist()
else:
opt_tp["capacity_threshold"] = self.capacity_threshold

opt_tp["P_excess"] = excess_values
opt_tp["capacity_penalty_cost"] = penalty_values

# Log summary
total_penalty = sum(penalty_values)
total_excess = sum(excess_values)
self.logger.info(
f"Capacity tariff results: "
f"Total excess: {total_excess:.0f}W, "
f"Total penalty: {total_penalty:.2f}"
)

# Temporary diagnostic
# Check for duplicate or irregular timestamps
time_diffs = opt_tp.index.to_series().diff()
if time_diffs.nunique() > 1:
self.logger.warning(
f"Irregular index spacing detected: {time_diffs.value_counts()}"
)

# Lets compute the optimal cost function
P_def_sum_tp = []
for i in set_I:
Expand Down Expand Up @@ -1399,14 +1554,21 @@
]

if self.costfun == "profit":
# Get penalty values if capacity tariff is enabled
penalty_per_timestep = (
opt_tp["capacity_penalty_cost"].values
if self.set_capacity_tariff
else [0] * len(set_I)
)

if self.optim_conf["set_total_pv_sell"]:
opt_tp["cost_fun_profit"] = [
-0.001
* self.timeStep
* (
unit_load_cost[i] * (P_load[i] + P_def_sum_tp[i])
+ unit_prod_price[i] * P_grid_neg[i].varValue
)
) + penalty_per_timestep[i]
for i in set_I
]
else:
Expand All @@ -1416,7 +1578,7 @@
* (
unit_load_cost[i] * P_grid_pos[i].varValue
+ unit_prod_price[i] * P_grid_neg[i].varValue
)
) + penalty_per_timestep[i]
for i in set_I
]
elif self.costfun == "cost":
Expand Down
73 changes: 73 additions & 0 deletions src/emhass/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,80 @@ def treat_runtimeparams(
else:
beta = runtimeparams["beta"]
params["passed_data"]["beta"] = beta

# Treat capacity tariff parameters passed at runtime
if "set_capacity_tariff" not in runtimeparams.keys():
set_capacity_tariff = False
else:
set_capacity_tariff = ast.literal_eval(
str(runtimeparams["set_capacity_tariff"]).capitalize()
)
params["optim_conf"]["set_capacity_tariff"] = set_capacity_tariff

# Handle capacity_tariff_threshold (scalar or list)
if "capacity_tariff_threshold" not in runtimeparams.keys():
capacity_tariff_threshold = 5000
else:
capacity_tariff_threshold = runtimeparams["capacity_tariff_threshold"]

# Parse string input if needed
if isinstance(capacity_tariff_threshold, str):
try:
parsed = ast.literal_eval(capacity_tariff_threshold)
capacity_tariff_threshold = parsed if isinstance(parsed, list) else float(parsed)
except (ValueError, SyntaxError):
logger.warning(
f"Invalid capacity_tariff_threshold value: {capacity_tariff_threshold}, "
f"defaulting to 5000"
)
capacity_tariff_threshold = 5000

# Validate list length
if isinstance(capacity_tariff_threshold, list):
expected_len = len(forecast_dates)
actual_len = len(capacity_tariff_threshold)

if actual_len >= expected_len:
logger.info(
f"Time-series capacity tariff threshold with {actual_len} values"
)
else:
logger.warning(
f"Capacity tariff threshold list length ({actual_len}) is less than "
f"forecast length ({expected_len}). Using single threshold mode."
)
# Fall back to first non-zero value or default
capacity_tariff_threshold = next(
(x for x in capacity_tariff_threshold if x > 0), 5000
)

params["optim_conf"]["capacity_tariff_threshold"] = capacity_tariff_threshold

# Handle capacity_tariff_penalty
if "capacity_tariff_penalty" not in runtimeparams.keys():
capacity_tariff_penalty = 10.0
else:
try:
capacity_tariff_penalty = float(runtimeparams["capacity_tariff_penalty"])
except (ValueError, TypeError):
logger.warning(
f"Invalid capacity_tariff_penalty value: "
f"{runtimeparams['capacity_tariff_penalty']}, defaulting to 10.0"
)
capacity_tariff_penalty = 10.0

params["optim_conf"]["capacity_tariff_penalty"] = capacity_tariff_penalty

# Treat plant configuration parameters passed at runtime
if "maximum_power_from_grid" in runtimeparams.keys():
params["plant_conf"]["maximum_power_from_grid"] = runtimeparams[
"maximum_power_from_grid"
]
if "maximum_power_to_grid" in runtimeparams.keys():
params["plant_conf"]["maximum_power_to_grid"] = runtimeparams[
"maximum_power_to_grid"
]

# Param to save forecast cache (i.e. Solcast)
if "weather_forecast_cache" not in runtimeparams.keys():
weather_forecast_cache = False
Expand Down
Loading