Add feature to cache pv regression model #638
Conversation
Reviewer's Guide

Introduce a caching mechanism for the adjusted PV regression model, controlled by a new `adjusted_pv_model_max_age` parameter (configurable and runtime-overridable), including logic to reuse, refit, or recover from corrupted models, with corresponding tests and documentation updates.

Sequence diagram for adjusted PV model caching and loading

```mermaid
sequenceDiagram
    participant OptimCaller
    participant adjust_pv_forecast
    participant is_model_outdated
    participant FileSystem
    participant HomeAssistantAPI
    participant Forecast
    OptimCaller->>adjust_pv_forecast: call with logger, fcst, P_PV_forecast, configs
    adjust_pv_forecast->>FileSystem: build model_path data_path/adjust_pv_regressor.pkl
    adjust_pv_forecast->>is_model_outdated: model_path, max_age_hours, logger
    alt model_file_missing
        is_model_outdated->>FileSystem: check exists
        is_model_outdated-->>adjust_pv_forecast: True
    else max_age_zero
        is_model_outdated-->>adjust_pv_forecast: True
    else model_too_old_or_fresh
        is_model_outdated->>FileSystem: read mtime
        is_model_outdated-->>adjust_pv_forecast: True or False
    end
    alt need_fit == True
        adjust_pv_forecast->>HomeAssistantAPI: retrieve_home_assistant_data adjust_pv
        HomeAssistantAPI-->>adjust_pv_forecast: success, df_input_data
        opt success == False
            adjust_pv_forecast-->>OptimCaller: False
        end
        adjust_pv_forecast->>Forecast: adjust_pv_forecast_data_prep df_input_data
        adjust_pv_forecast->>Forecast: adjust_pv_forecast_fit n_splits, regression_model
    else need_fit == False
        adjust_pv_forecast->>FileSystem: open model_path (rb)
        alt model_load_success
            FileSystem-->>adjust_pv_forecast: loaded_model
            adjust_pv_forecast->>Forecast: set model_adjust_pv = loaded_model
        else model_load_recoverable_error
            adjust_pv_forecast->>HomeAssistantAPI: retrieve_home_assistant_data adjust_pv
            HomeAssistantAPI-->>adjust_pv_forecast: success, df_input_data
            opt success == False
                adjust_pv_forecast-->>OptimCaller: False
            end
            adjust_pv_forecast->>Forecast: adjust_pv_forecast_data_prep df_input_data
            adjust_pv_forecast->>Forecast: adjust_pv_forecast_fit n_splits, regression_model
        else model_load_unexpected_error
            adjust_pv_forecast-->>OptimCaller: False
        end
    end
    adjust_pv_forecast->>Forecast: adjust_pv_forecast_predict forecasted_pv
    Forecast-->>adjust_pv_forecast: adjusted_series
    adjust_pv_forecast-->>OptimCaller: adjusted_series
```
Hey there - I've reviewed your changes - here's some feedback:
- The `adjust_pv_forecast` function repeats the same 'retrieve_home_assistant_data → data_prep → fit' sequence in both the normal re-fit path and the corrupted-model recovery path; consider extracting this into a small helper to avoid duplication and keep the logic easier to maintain.
- In `adjust_pv_forecast`, `model_path` is built as `emhass_conf["data_path"] / model_filename`; if `data_path` can ever be a string in existing configs, it would be safer to normalize it to a `Path` (e.g., `pathlib.Path(emhass_conf["data_path"])`) before using the `/` operator.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The `adjust_pv_forecast` function repeats the same 'retrieve_home_assistant_data → data_prep → fit' sequence in both the normal re-fit path and the corrupted-model recovery path; consider extracting this into a small helper to avoid duplication and keep the logic easier to maintain.
- In `adjust_pv_forecast`, `model_path` is built as `emhass_conf["data_path"] / model_filename`; if `data_path` can ever be a string in existing configs, it would be safer to normalize it to a `Path` (e.g., `pathlib.Path(emhass_conf["data_path"])`) before using the `/` operator.
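The normalization suggested above is a one-liner; a minimal sketch (the `emhass_conf` value here is a stand-in, not the real config):

```python
from pathlib import Path

# Stand-in config: in older setups data_path may be a plain str
emhass_conf = {"data_path": "/tmp/emhass-data"}

# Path(...) accepts both str and Path, so this is safe either way
data_path = Path(emhass_conf["data_path"])
model_path = data_path / "adjust_pv_regressor.pkl"
print(model_path)
```

Since `Path(Path(...))` is a no-op, this normalization costs nothing when `data_path` is already a `Path`.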
## Individual Comments
### Comment 1
<location> `src/emhass/command_line.py:181-190` </location>
<code_context>
+ if need_fit:
</code_context>
<issue_to_address>
**suggestion:** Refactor the duplicated model re-fit logic into a helper to avoid divergence.
Both the `need_fit` and pickle-error recovery branches repeat the same steps: retrieve HA data, prepare it, and fit the model. Extract this into a helper (e.g. `_fit_adjust_pv_model(...)` that returns a success flag) so the flow is defined in one place and future changes (like fit parameters) can’t diverge between branches.
Suggested implementation:
```python
def _fit_adjust_pv_model(
logger,
get_data_from_file,
retrieve_hass_conf,
optim_conf,
rh,
emhass_conf,
test_df_literal,
) -> bool:
"""
Retrieve Home Assistant history and fit the adjust_pv model.
Returns
-------
bool
True if the model was successfully fitted, False otherwise.
"""
logger.info("Adjusting PV forecast, retrieving history data for model fit")
success, df_input_data, _ = retrieve_home_assistant_data(
"adjust_pv",
get_data_from_file,
retrieve_hass_conf,
optim_conf,
rh,
emhass_conf,
test_df_literal,
)
if not success:
logger.error("Failed to retrieve history data for adjust_pv model fit")
return False
# NOTE:
# The existing data preparation and model fitting steps that are currently
# duplicated in both the `need_fit` branch and the pickle-error recovery
# branch should be moved into this helper below. For now, we just signal
# that data retrieval succeeded; the remaining logic will be refactored
# into this function in a follow-up change.
#
# Example (to be adapted with the actual current implementation):
#
# model = _build_adjust_pv_model(...)
# model.fit(X_train, y_train)
# joblib.dump(model, model_path)
#
# return True
return True
# Check if we need to fit a new model or can use existing one
model_filename = "adjust_pv_regressor.pkl"
model_path = emhass_conf["data_path"] / model_filename
max_age_hours = optim_conf.get("adjusted_pv_model_max_age", 24)
# Check if model needs to be re-fitted
need_fit = is_model_outdated(model_path, max_age_hours, logger)
if need_fit:
if not _fit_adjust_pv_model(
logger,
get_data_from_file,
retrieve_hass_conf,
optim_conf,
rh,
emhass_conf,
test_df_literal,
):
logger.warning("Adjust PV model fit was requested but did not complete successfully")
```
To fully complete the refactor and ensure there is no divergence between branches, you should also:
1. Locate the pickle-error recovery branch where the adjust_pv model is re-fitted after a `pickle` load failure. Replace its inline “retrieve HA data, prepare it, and fit the model” logic with a call to `_fit_adjust_pv_model(...)` using the same arguments as above.
2. Move the actual data preparation and model-fitting steps (currently duplicated in both the `need_fit` branch and the pickle-error recovery branch) into `_fit_adjust_pv_model(...)`, replacing the placeholder comment and the simple `return True`. The flow inside the helper should be:
- Use `df_input_data` to build the training dataset for the adjust_pv model.
- Fit the model with the appropriate parameters.
- Persist the fitted model to `model_path` (if that is part of the current implementation).
- Return `True` on success and `False` with appropriate logging on failure.
3. Remove the now-duplicated code in both original branches so that all adjust_pv model fitting logic lives exclusively in `_fit_adjust_pv_model(...)`.
</issue_to_address>
### Comment 2
<location> `tests/test_command_line_utils.py:1166-1175` </location>
<code_context>
+ def test_is_model_outdated(self):
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding edge-case tests for negative max_age_hours and exact-threshold ages in is_model_outdated
To better pin down the function’s contract, consider adding:
- A case where `max_age_hours` is negative (e.g. `-1`), since the implementation treats `<= 0` as "force refit" but tests only cover `0`.
- A case where the model age is exactly equal to `max_age_hours` to confirm that "exactly at the threshold" is treated as up-to-date (making the `>` comparison explicit).
These will clarify the intended behavior and guard against regressions in the comparison logic.
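A self-contained sketch of those two edge cases, using a stand-in that mirrors the PR's staleness logic (the real `is_model_outdated` lives in `src/emhass/command_line.py` and should be imported in the actual test suite):

```python
import logging
import os
import tempfile
import time
from pathlib import Path

# Stand-in mirroring the reviewed logic; in the real tests, import instead:
# from emhass.command_line import is_model_outdated
def is_model_outdated(model_path: Path, max_age_hours: float, logger) -> bool:
    if not model_path.exists():
        return True
    if max_age_hours <= 0:  # zero *and* negative values force a re-fit
        return True
    age_hours = (time.time() - model_path.stat().st_mtime) / 3600.0
    return age_hours > max_age_hours  # strict '>': at the threshold counts as fresh

logger = logging.getLogger(__name__)
with tempfile.TemporaryDirectory() as tmp:
    model = Path(tmp) / "adjust_pv_regressor.pkl"
    model.write_bytes(b"dummy")
    # Negative max_age behaves like 0: always refit
    assert is_model_outdated(model, -1, logger) is True
    # Age just under the 2 h threshold (a few seconds of margin avoids clock
    # races; a real test could freeze time with mocking instead)
    almost_two_hours_ago = time.time() - 2 * 3600 + 30
    os.utime(model, (almost_two_hours_ago, almost_two_hours_ago))
    assert is_model_outdated(model, 2, logger) is False
    assert is_model_outdated(model, 1.5, logger) is True
print("edge cases pass")
```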
</issue_to_address>
### Comment 3
<location> `tests/test_command_line_utils.py:1221-1164` </location>
<code_context>
+ def test_adjusted_pv_model_max_age_runtime_override(self):
</code_context>
<issue_to_address>
**suggestion (testing):** Extend tests to verify behavioral impact of adjusted_pv_model_max_age on model (re)fitting, not just config propagation
Right now the test only checks that the runtime value overrides `input_data_dict['fcst'].optim_conf`, but not that it actually changes whether a cached model is reused vs refit.
Please add a behavior-focused test around `adjust_pv_forecast` that:
- Uses a temporary `data_path` with a synthetic model file.
- Mocks `Forecast.adjust_pv_forecast_fit` and `retrieve_home_assistant_data`.
- Asserts that with a large `adjusted_pv_model_max_age` and a fresh model file, the existing model is loaded and `adjust_pv_forecast_fit` is not called.
- Asserts that with `adjusted_pv_model_max_age = 0` (or a stale file), `adjust_pv_forecast_fit` is called.
That will validate the runtime caching behavior, not just configuration wiring.
Suggested implementation:
```python
def test_adjusted_pv_model_max_age_runtime_override(self):
"""Test that runtime adjusted_pv_model_max_age parameter overrides config parameter."""
# Build params with default config
params = TestCommandLineUtils.get_test_params(set_use_pv=True)
# Set adjusted_pv_model_max_age in config to 24
params["optim_conf"]["adjusted_pv_model_max_age"] = 24
# Add runtime parameters with adjusted_pv_model_max_age override
runtimeparams = {
"pv_power_forecast": [i + 1 for i in range(48)],
"adjusted_pv_model_max_age": 1,
}
# Build input data dict (or equivalent) using the runtime parameters
input_data_dict = TestCommandLineUtils.get_input_data_dict(params, runtimeparams)
# Verify that the optim_conf value was overridden by the runtime parameter
self.assertEqual(
1,
input_data_dict["fcst"].optim_conf["adjusted_pv_model_max_age"],
"Runtime adjusted_pv_model_max_age should override configuration value",
)
def test_adjusted_pv_model_max_age_affects_model_refit_behavior(self):
"""
Test that adjusted_pv_model_max_age controls whether a cached model is reused
vs. refit within adjust_pv_forecast.
"""
from pathlib import Path
import tempfile
from unittest.mock import patch
# Create a temporary data_path with a synthetic PV model file
with tempfile.TemporaryDirectory() as tmpdir:
data_path = Path(tmpdir)
model_path = data_path / "adjusted_pv_model.pkl"
# Write a dummy model file and set its mtime to "fresh"
model_path.write_bytes(b"dummy-model-bytes")
# Base params with PV enabled and pointing to the temporary data_path
params = TestCommandLineUtils.get_test_params(set_use_pv=True)
params["paths"]["data_path"] = str(data_path)
# Use a simple deterministic PV forecast for the runtime params
runtimeparams = {
"pv_power_forecast": [i + 1 for i in range(48)],
}
# Common mock for retrieve_home_assistant_data to avoid real I/O
def _mock_retrieve_ha_data(*_args, **_kwargs):
return {
"pv_power_forecast": runtimeparams["pv_power_forecast"],
}
# Patch in the namespace where adjust_pv_forecast looks up these symbols
# NOTE: adjust the patch targets to match the actual module name if needed.
with patch(
"command_line_utils.retrieve_home_assistant_data",
side_effect=_mock_retrieve_ha_data,
), patch(
"command_line_utils.Forecast.adjust_pv_forecast_fit"
) as mock_fit:
# Case 1: Large max_age with a fresh model -> model should be reused, no refit
params["optim_conf"]["adjusted_pv_model_max_age"] = 24
mock_fit.reset_mock()
adjust_pv_forecast(params=params, runtimeparams=runtimeparams)
mock_fit.assert_not_called()
# Case 2: max_age = 0 -> model should always be considered stale and refit
params["optim_conf"]["adjusted_pv_model_max_age"] = 0
mock_fit.reset_mock()
adjust_pv_forecast(params=params, runtimeparams=runtimeparams)
mock_fit.assert_called()
```
1. Ensure `tests/test_command_line_utils.py` (or the test package) has access to:
- `TestCommandLineUtils.get_input_data_dict`. If the helper is named differently in your codebase (e.g. `build_input_data_dict` or similar), update the call accordingly.
- `adjust_pv_forecast` imported from the module under test. For example, near the top of the file you may need something like:
```python
from my_project.command_line_utils import adjust_pv_forecast
```
2. Update the patch targets to match the actual module path where `adjust_pv_forecast` is defined. In the example above I used:
```python
"command_line_utils.retrieve_home_assistant_data"
"command_line_utils.Forecast.adjust_pv_forecast_fit"
```
If your module is named differently (e.g. `my_project.command_line_utils` or similar), change these strings accordingly so they patch in the correct namespace.
3. If `tests/test_command_line_utils.py` does not already import `tempfile`, `Path`, or `patch`, consider moving those imports to the top of the file to match your existing style:
```python
import tempfile
from pathlib import Path
from unittest.mock import patch
```
and remove the inline imports inside the test method.
4. If your code uses a different model filename or location under `data_path` for the adjusted PV model, update `model_path = data_path / "adjusted_pv_model.pkl"` to match the real filename so that the freshness check in `adjust_pv_forecast` sees the created file.
</issue_to_address>
### Comment 4
<location> `tests/test_command_line_utils.py:1280-1289` </location>
<code_context>
+ def test_adjust_pv_forecast_corrupted_model_recovery(self):
</code_context>
<issue_to_address>
**suggestion (testing):** Strengthen corrupted-model tests with assertions on data retrieval behavior and failure paths
To better exercise that logic, you could:
1. Assert that the recovery path actually calls `retrieve_home_assistant_data` with the expected arguments after the corrupted load.
2. Add a test where, after a corrupted model is detected, `retrieve_home_assistant_data` returns `success = False`, and verify that `adjust_pv_forecast` returns `False`.
3. Optionally, add a test for an unexpected exception during model loading (e.g., `open` or `pickle.load` raising a generic `Exception`) and assert that `adjust_pv_forecast` returns `False` and does not attempt to re-fit.
Together, these would cover both recovery and failure paths for corrupted/unloadable models.
Suggested implementation:
```python
def test_adjust_pv_forecast_corrupted_model_recovery(self):
"""Test that adjust_pv_forecast gracefully handles corrupted model files and refits."""
import pathlib
import tempfile
from unittest.mock import MagicMock, patch
from emhass.command_line import adjust_pv_forecast
# Create a corrupted pickle file on disk to simulate an existing-but-broken model
with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as tmp:
tmp_path = pathlib.Path(tmp.name)
# Write clearly invalid data that cannot be unpickled
tmp.write(b"not-a-valid-pickle")
tmp.flush()
dummy_logger = MagicMock(name="logger")
dummy_args = MagicMock(name="args")
# Simulate the corrupted model detection by making the initial model loading fail
# and then verifying that the recovery path calls retrieve_home_assistant_data.
with patch("emhass.command_line.Forecast") as MockForecast:
forecast_instance = MockForecast.return_value
# Simulate a successful recovery: data retrieval succeeds and fit completes.
forecast_instance.retrieve_home_assistant_data.return_value = (True, MagicMock(name="ha_data"))
forecast_instance.fit.return_value = None
# Exercise the code under test
result = adjust_pv_forecast(dummy_args, dummy_logger, get_data_from_file=True)
# 1. Assert that the recovery path actually calls retrieve_home_assistant_data
forecast_instance.retrieve_home_assistant_data.assert_called_once()
# 2. In the recovery case, adjust_pv_forecast should succeed
self.assertTrue(result)
def test_adjust_pv_forecast_corrupted_model_recovery_data_failure(self):
"""If recovery after a corrupted model fails to retrieve data, adjust_pv_forecast should return False."""
import pathlib
import tempfile
from unittest.mock import MagicMock, patch
from emhass.command_line import adjust_pv_forecast
# Create a corrupted pickle file to simulate a broken persisted model
with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as tmp:
tmp_path = pathlib.Path(tmp.name)
tmp.write(b"still-not-a-valid-pickle")
tmp.flush()
dummy_logger = MagicMock(name="logger")
dummy_args = MagicMock(name="args")
with patch("emhass.command_line.Forecast") as MockForecast:
forecast_instance = MockForecast.return_value
# Simulate failure during recovery: data retrieval fails
forecast_instance.retrieve_home_assistant_data.return_value = (False, None)
result = adjust_pv_forecast(dummy_args, dummy_logger, get_data_from_file=True)
# When data retrieval fails in the recovery path, the CLI helper should signal failure
self.assertFalse(result)
forecast_instance.retrieve_home_assistant_data.assert_called_once()
def test_adjust_pv_forecast_unexpected_model_load_exception(self):
"""Unexpected errors while loading the model should cause adjust_pv_forecast to fail without refitting."""
from unittest.mock import MagicMock, patch
from emhass.command_line import adjust_pv_forecast
dummy_logger = MagicMock(name="logger")
dummy_args = MagicMock(name="args")
# Patch the Forecast class used inside adjust_pv_forecast to raise an unexpected exception
with patch("emhass.command_line.Forecast") as MockForecast:
forecast_instance = MockForecast.return_value
# Simulate an unexpected exception during model loading/initialization
MockForecast.side_effect = Exception("unexpected load error")
result = adjust_pv_forecast(dummy_args, dummy_logger, get_data_from_file=True)
# The helper should surface a failure result
self.assertFalse(result)
# And should not attempt to refit if it cannot even construct/load the model
self.assertFalse(
hasattr(forecast_instance, "fit") and forecast_instance.fit.called,
"fit() should not be called when model construction itself fails",
)
```
The above changes assume:
1. `adjust_pv_forecast` accepts the signature `adjust_pv_forecast(args, logger, get_data_from_file=...)`, matching how it is used in other tests.
2. `adjust_pv_forecast` internally uses `emhass.command_line.Forecast` and that calling `Forecast(...)` (or equivalent) is the entry point to loading/using the persisted model. If your implementation instead uses a factory function or `Forecast.from_pickle`, you should:
- Patch that actual entry point in the tests (`patch("emhass.command_line.Forecast.from_pickle")`, or similar) instead of `Forecast`.
- Move the `side_effect` for the unexpected exception test to that entry point.
To better align with your concrete implementation, you may want to:
- Pass real `args`/`input_data_dict` objects that your other tests already use (e.g. created by helpers/fixtures) instead of `MagicMock`, to ensure the tests exercise the full flow.
- Strengthen the assertion on `retrieve_home_assistant_data` calls, e.g.:
```python
forecast_instance.retrieve_home_assistant_data.assert_called_once_with(
expected_start, expected_end, expected_other_kw
)
```
using the actual arguments `adjust_pv_forecast` passes when recovering from a corrupted model.
</issue_to_address>
### Comment 5
<location> `docs/forecasts.md:118` </location>
<code_context>
+The actual **Forecast Adjustment** is perfomed by the `adjust_pv_forecast_predict` method. This method applies the trained regression model to adjust PV forecast data. Before making predictions, the method enhances the data by adding date-based and solar-related features. It then uses the trained model to predict the adjusted forecast. A correction is applied based on solar elevation to prevent negative or unrealistic values, ensuring that the adjusted forecast remains physically meaningful. The correction based on solar elevation can be parametrized using a treshold value with parameter `adjusted_pv_solar_elevation_threshold` from the configuration.
</code_context>
<issue_to_address>
**issue (typo):** Fix spelling of “perfomed” and “treshold”.
These misspellings appear in the added Forecast Adjustment paragraph describing the `adjust_pv_forecast_predict` method.
```suggestion
The actual **Forecast Adjustment** is performed by the `adjust_pv_forecast_predict` method. This method applies the trained regression model to adjust PV forecast data. Before making predictions, the method enhances the data by adding date-based and solar-related features. It then uses the trained model to predict the adjusted forecast. A correction is applied based on solar elevation to prevent negative or unrealistic values, ensuring that the adjusted forecast remains physically meaningful. The correction based on solar elevation can be parametrized using a threshold value with parameter `adjusted_pv_solar_elevation_threshold` from the configuration.
```
</issue_to_address>
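For readers of that paragraph, the elevation-gated correction can be pictured roughly like this (illustrative only; the real logic lives in `Forecast.adjust_pv_forecast_predict`, and the default threshold value here is a made-up assumption):

```python
def clamp_by_solar_elevation(predicted_w, elevation_deg, threshold_deg=10.0):
    """Zero out predicted power when the sun is below a threshold elevation,
    and never return negative power (sketch of the idea, not EMHASS code)."""
    return [
        0.0 if elev < threshold_deg else max(0.0, pred)
        for pred, elev in zip(predicted_w, elevation_deg)
    ]

print(clamp_by_solar_elevation([-50.0, 800.0, 300.0], [45.0, 30.0, 4.0]))
# -> [0.0, 800.0, 0.0]  (negative value clipped; low-sun value zeroed)
```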
### Comment 6
<location> `src/emhass/command_line.py:135` </location>
<code_context>
+ return False
+
+
def adjust_pv_forecast(
logger: logging.Logger,
fcst: Forecast,
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the repeated fit logic into a helper and simplifying the model-loading exceptions to make `adjust_pv_forecast` shorter and easier to follow.
You can reduce complexity and duplication in `adjust_pv_forecast` by extracting the “retrieve + prep + fit” sequence into a small helper and simplifying the exception handling. This keeps the new behavior intact but makes the control flow easier to follow.
### 1. Extract a `_fit_adjusted_pv_model` helper
The logic in the `if need_fit:` branch and the “load failed, re-fit” branch is almost identical. Move it into a single helper so the main function only decides *when* to fit.
```python
def _fit_adjusted_pv_model(
logger: logging.Logger,
fcst: Forecast,
get_data_from_file: bool,
retrieve_hass_conf: dict,
optim_conf: dict,
rh: RetrieveHass,
emhass_conf: dict,
test_df_literal: pd.DataFrame,
) -> bool:
logger.info("Adjusting PV forecast, retrieving history data for model fit")
success, df_input_data, _ = retrieve_home_assistant_data(
"adjust_pv",
get_data_from_file,
retrieve_hass_conf,
optim_conf,
rh,
emhass_conf,
test_df_literal,
)
if not success:
logger.error("Failed to retrieve data for adjusted PV model fit")
return False
fcst.adjust_pv_forecast_data_prep(df_input_data)
fcst.adjust_pv_forecast_fit(
n_splits=5,
regression_model=optim_conf["adjusted_pv_regression_model"],
)
return True
```
Then `adjust_pv_forecast` becomes smaller and less repetitive:
```python
def adjust_pv_forecast(
logger: logging.Logger,
fcst: Forecast,
P_PV_forecast: pd.Series,
get_data_from_file: bool,
retrieve_hass_conf: dict,
optim_conf: dict,
rh: RetrieveHass,
emhass_conf: dict,
test_df_literal: pd.DataFrame,
) -> pd.Series:
model_filename = "adjust_pv_regressor.pkl"
model_path = emhass_conf["data_path"] / model_filename
max_age_hours = optim_conf.get("adjusted_pv_model_max_age", 24)
need_fit = is_model_outdated(model_path, max_age_hours, logger)
if need_fit:
if not _fit_adjusted_pv_model(
logger,
fcst,
get_data_from_file,
retrieve_hass_conf,
optim_conf,
rh,
emhass_conf,
test_df_literal,
):
return False
else:
logger.info("Loading existing adjusted PV model from file")
try:
with open(model_path, "rb") as inp:
fcst.model_adjust_pv = pickle.load(inp)
except Exception as e:
logger.error(
"Failed to load existing adjusted PV model: %s: %s",
type(e).__name__,
str(e),
)
logger.warning(
"Model file may be corrupted or incompatible. Falling back to re-fitting the model."
)
if not _fit_adjusted_pv_model(
logger,
fcst,
get_data_from_file,
retrieve_hass_conf,
optim_conf,
rh,
emhass_conf,
test_df_literal,
):
logger.error("Failed to retrieve data for model re-fit after load error")
return False
logger.info("Successfully re-fitted model after load failure")
P_PV_forecast = P_PV_forecast.rename("forecast").to_frame()
P_PV_forecast = fcst.adjust_pv_forecast_predict(forecasted_pv=P_PV_forecast)
return P_PV_forecast["adjusted_forecast"].rename(None)
```
This preserves:
- Age-based refitting.
- Load-then-refit-on-failure behavior.
- All logging and return semantics.
### 2. Simplify exception handling around model loading
If you don’t need different behavior per error type, catching `Exception` and logging the type is enough and much easier to read. If you still want to explicitly separate “expected pickle/mod import issues” from truly unexpected ones, you can keep a short tuple and fall back to a generic catch-all:
```python
try:
with open(model_path, "rb") as inp:
fcst.model_adjust_pv = pickle.load(inp)
except (pickle.UnpicklingError, EOFError, ImportError, ModuleNotFoundError) as e:
# expected load/compat issues, re-fit
...
except Exception as e:
logger.error(
"Unexpected error loading adjusted PV model: %s: %s",
type(e).__name__,
str(e),
)
return False
```
This keeps your “cannot recover” behavior for truly unexpected failures but reduces the noisy list and nesting.
### 3. Minor simplification of `is_model_outdated` (optional)
You can keep behavior identical while making the flow slightly more linear and reducing log noise:
```python
def is_model_outdated(
model_path: pathlib.Path, max_age_hours: int, logger: logging.Logger
) -> bool:
if not model_path.exists():
logger.info("Adjusted PV model file does not exist, will train new model")
return True
if max_age_hours <= 0:
logger.info(
"adjusted_pv_model_max_age is set to 0, forcing model re-fit"
)
return True
model_mtime = datetime.fromtimestamp(model_path.stat().st_mtime)
model_age_hours = (datetime.now() - model_mtime).total_seconds() / 3600.0
if model_age_hours > max_age_hours:
logger.info(
f"Adjusted PV model is outdated (age: {model_age_hours:.1f}h, "
f"max: {max_age_hours}h), will train new model"
)
return True
logger.info(
f"Using existing adjusted PV model (age: {model_age_hours:.1f}h, "
f"max: {max_age_hours}h)"
)
return False
```
Key change is computing the age once in hours, which also simplifies the log formatting.
</issue_to_address>
### Comment 7
<location> `src/emhass/command_line.py:179` </location>
<code_context>
def adjust_pv_forecast(
logger: logging.Logger,
fcst: Forecast,
P_PV_forecast: pd.Series,
get_data_from_file: bool,
retrieve_hass_conf: dict,
optim_conf: dict,
rh: RetrieveHass,
emhass_conf: dict,
test_df_literal: pd.DataFrame,
) -> pd.Series:
"""
Adjust the photovoltaic (PV) forecast using historical data and a regression model.
This method retrieves historical data, prepares it for model fitting, trains a regression
model, and adjusts the provided PV forecast based on the trained model.
:param logger: Logger object for logging information and errors.
:type logger: logging.Logger
:param fcst: Forecast object used for PV forecast adjustment.
:type fcst: Forecast
:param P_PV_forecast: The initial PV forecast to be adjusted.
:type P_PV_forecast: pd.Series
:param get_data_from_file: Whether to retrieve data from a file instead of Home Assistant.
:type get_data_from_file: bool
:param retrieve_hass_conf: Configuration dictionary for retrieving data from Home Assistant.
:type retrieve_hass_conf: dict
:param optim_conf: Configuration dictionary for optimization settings.
:type optim_conf: dict
:param rh: RetrieveHass object for interacting with Home Assistant.
:type rh: RetrieveHass
:param emhass_conf: Configuration dictionary for emhass paths and settings.
:type emhass_conf: dict
:param test_df_literal: DataFrame containing test data for debugging purposes.
:type test_df_literal: pd.DataFrame
:return: The adjusted PV forecast as a pandas Series.
:rtype: pd.Series
"""
# Check if we need to fit a new model or can use existing one
model_filename = "adjust_pv_regressor.pkl"
model_path = emhass_conf["data_path"] / model_filename
max_age_hours = optim_conf.get("adjusted_pv_model_max_age", 24)
# Check if model needs to be re-fitted
need_fit = is_model_outdated(model_path, max_age_hours, logger)
if need_fit:
logger.info("Adjusting PV forecast, retrieving history data for model fit")
# Retrieve data from Home Assistant
success, df_input_data, _ = retrieve_home_assistant_data(
"adjust_pv",
get_data_from_file,
retrieve_hass_conf,
optim_conf,
rh,
emhass_conf,
test_df_literal,
)
if not success:
return False
# Call data preparation method
fcst.adjust_pv_forecast_data_prep(df_input_data)
# Call the fit method
fcst.adjust_pv_forecast_fit(
n_splits=5,
regression_model=optim_conf["adjusted_pv_regression_model"],
)
else:
# Load existing model
logger.info("Loading existing adjusted PV model from file")
try:
with open(model_path, "rb") as inp:
fcst.model_adjust_pv = pickle.load(inp)
except (pickle.UnpicklingError, EOFError, AttributeError, ImportError, ModuleNotFoundError) as e:
logger.error(
f"Failed to load existing adjusted PV model: {type(e).__name__}: {str(e)}"
)
logger.warning(
"Model file may be corrupted or incompatible. Falling back to re-fitting the model."
)
# Retrieve data from Home Assistant for re-fit
success, df_input_data, _ = retrieve_home_assistant_data(
"adjust_pv",
get_data_from_file,
retrieve_hass_conf,
optim_conf,
rh,
emhass_conf,
test_df_literal,
)
if not success:
logger.error("Failed to retrieve data for model re-fit after load error")
return False
# Call data preparation method
fcst.adjust_pv_forecast_data_prep(df_input_data)
# Call the fit method to create new model
fcst.adjust_pv_forecast_fit(
n_splits=5,
regression_model=optim_conf["adjusted_pv_regression_model"],
)
logger.info("Successfully re-fitted model after load failure")
except Exception as e:
logger.error(
f"Unexpected error loading adjusted PV model: {type(e).__name__}: {str(e)}"
)
logger.error("Cannot recover from this error")
return False
# Call the predict method
P_PV_forecast = P_PV_forecast.rename("forecast").to_frame()
P_PV_forecast = fcst.adjust_pv_forecast_predict(forecasted_pv=P_PV_forecast)
# Update the PV forecast
return P_PV_forecast["adjusted_forecast"].rename(None)
</code_context>
<issue_to_address>
**issue (code-quality):** We've found these issues:
- Use named expression to simplify assignment and conditional ([`use-named-expression`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/use-named-expression/))
- Remove redundant exceptions from an except clause ([`remove-redundant-exception`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/remove-redundant-exception/))
</issue_to_address>
Hi @sokorn, please update your code to resolve the current conflict so that we can merge this new feature. Thanks
Done. As far as I can tell, the failures seem to come from another PR.
Yes, that's unexpected and annoying.
Sourcery is reporting one issue, but I don't see the output, so I can't resolve it. All other tests now look good. |
I can see that, thanks for addressing the issues. Merging... |



Added a parameter to cache the adjusted PV regression model. It works by checking the file age and only running a new fit when the file is older than the given number of hours (default 24 h). This could improve the performance of the day-ahead and MPC calls, especially when running at a high frequency.
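The behavior described above boils down to a load-or-refit pattern; a distilled, self-contained sketch (the function name and call shape here are illustrative, not EMHASS's actual API — the exception tuple mirrors the one used in the PR):

```python
import pickle
import time
from pathlib import Path


def load_or_refit(model_path: Path, max_age_hours: float, refit):
    """Reuse a pickled model while it is fresh; refit and re-cache otherwise."""
    if model_path.exists() and max_age_hours > 0:
        age_hours = (time.time() - model_path.stat().st_mtime) / 3600.0
        if age_hours <= max_age_hours:
            try:
                with open(model_path, "rb") as f:
                    return pickle.load(f)  # fresh cache hit
            except (pickle.UnpicklingError, EOFError, AttributeError,
                    ImportError, ModuleNotFoundError):
                pass  # corrupted/incompatible cache: fall through and refit
    model = refit()  # expensive fit only when cache is missing, stale, or broken
    with open(model_path, "wb") as f:
        pickle.dump(model, f)
    return model
```

With `max_age_hours = 0` the cache is bypassed entirely, matching the PR's "force refit" semantics.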
Summary by Sourcery
Add caching and age-based reuse of the adjusted PV regression model to avoid unnecessary re-training while retaining a fallback path when the cached model is invalid.