diff --git a/.github/workflows/check.yaml b/.github/workflows/check.yaml index ae2432ac..f98f58a8 100644 --- a/.github/workflows/check.yaml +++ b/.github/workflows/check.yaml @@ -16,7 +16,7 @@ jobs: python-version: ${{ matrix.python-version }} - name: Install Poetry run: | - pipx install poetry==2.1.0 + pipx install poetry==2.1.3 - name: Install dependencies run: | diff --git a/pyproject.toml b/pyproject.toml index f79a99d4..8368eafb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ scipy = "^1.14.0" matplotlib = "^3.9.1" scikit-learn = "^1.5.2" PyQt5 = "^5.15.11" +statsmodels = "^0.14.5" [tool.poetry.group.dev.dependencies] pytest = "^8.2.2" @@ -60,6 +61,7 @@ ignore = ["PLR0913"] files = "pysatl_cpd" mypy_path = "pysatl_cpd" strict = true +ignore_missing_imports = true [build-system] diff --git a/pysatl_cpd/core/algorithms/arima/__init__.py b/pysatl_cpd/core/algorithms/arima/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pysatl_cpd/core/algorithms/arima/models/__init__.py b/pysatl_cpd/core/algorithms/arima/models/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pysatl_cpd/core/algorithms/arima/models/arima.py b/pysatl_cpd/core/algorithms/arima/models/arima.py new file mode 100644 index 00000000..5c60cb92 --- /dev/null +++ b/pysatl_cpd/core/algorithms/arima/models/arima.py @@ -0,0 +1,84 @@ +""" +Module for the implementation of an ARIMA prediction model for the Predict&Compare CPD algorithm. +""" + +__author__ = "Aleksandra Ri" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + + +import warnings +from typing import Any, Optional + +import numpy as np +from statsmodels.tools.sm_exceptions import ConvergenceWarning +from statsmodels.tsa.arima.model import ARIMA, ARIMAResults + + +class ArimaModel: + """ + A wrapper for the statsmodels ARIMA model to simplify its use in online + change point detection tasks. + """ + + def __init__(self) -> None: + """ + Initializes the ArimaModel instance, without any concrete values. + """ + self.__training_data: list[np.float64] = [] + self.__results: Optional[ARIMAResults] = None + self.__order: Optional[tuple[int, int, int]] = None + + def clear(self) -> None: + """ + Resets the model's state by clearing training data and results. + :return: + """ + self.__training_data = [] + self.__results = None + + def fit(self, training_data: list[np.float64], order: Optional[tuple[int, int, int]] = None) -> Any: + """ + Fits the ARIMA model on the provided training data. + + If the data variance is very low, it fits the model as a simple mean model. + Otherwise, the standard ARIMA(0,0,0) model is used, which is suitable for stationary data. + Convergence warnings are suppressed to avoid cluttering output. + + :param training_data: the time series data to train the model on. + :param order: the (p,d,q) order of the model for the autoregressive, differences, and moving average components. + :return: the residuals of the model after fitting. + """ + self.__training_data = training_data + if order: + self.__order = order + + assert self.__order is not None, "Model order was not specified." + + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=UserWarning, message=".*Non-(stationary|invertible) starting.*") + warnings.filterwarnings("ignore", category=ConvergenceWarning) + model = ARIMA(self.__training_data, order=self.__order) + self.__results = model.fit() + + return self.__results.resid + + def predict(self, steps: int) -> Any: + """ + Forecasts future values of the time series. + :param steps: the number of steps to forecast ahead. + :return: a list of forecasted values. + """ + assert self.__results is not None, "Model must be fitted before prediction." + + return self.__results.forecast(steps=steps) + + def update(self, observation: list[np.float64]) -> Any: + """ + Appends a new observation to the fitted ARIMA model. + :param observation: the new observation to add to the model. + :return: + """ + assert self.__results is not None, "Model must be fitted before prediction." + + self.__results = self.__results.append(observation) diff --git a/pysatl_cpd/core/algorithms/arima/stats_tests/__init__.py b/pysatl_cpd/core/algorithms/arima/stats_tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pysatl_cpd/core/algorithms/arima/stats_tests/cusum.py b/pysatl_cpd/core/algorithms/arima/stats_tests/cusum.py new file mode 100644 index 00000000..cd3fdfd1 --- /dev/null +++ b/pysatl_cpd/core/algorithms/arima/stats_tests/cusum.py @@ -0,0 +1,100 @@ +""" +Module for implementation of the CUSUM statistical test for Predict&Compare CPD algorithm. +""" + +__author__ = "Aleksandra Ri" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + + +import numpy as np + +ZERO = np.float64(0.0) + + +class CuSum: + """ + Implements the cumulative sum (CUSUM) algorithm to detect shifts in the value of a process. + + This class tracks two cumulative sums: one for detecting an increase (positive CUSUM) + and one for detecting a decrease (negative CUSUM). An alarm is triggered if either + sum exceeds a predefined threshold. + """ + + def __init__(self) -> None: + self.__k_threshold: np.float64 = ZERO + self.__h_threshold: np.float64 = ZERO + + self.__positive_cusum: np.float64 = ZERO + self.__negative_cusum: np.float64 = ZERO + + # History tracks when the CUSUM was at 0. + # Start with True to ensure a valid index is found on the first detection + self.__positive_reset_history: list[bool] = [True] + self.__negative_reset_history: list[bool] = [True] + + def update(self, residual: np.float64) -> None: + """ + Updates the CUSUM statistics with a new residual value. + :param residual: the difference between the observed and expected value. + """ + # Update the positive CUSUM, resetting to 0 if it goes negative. + self.__positive_cusum = np.maximum(ZERO, self.__positive_cusum + residual - self.__k_threshold) + + # Update the negative CUSUM, resetting to 0 if it goes positive. + self.__negative_cusum = np.minimum(ZERO, self.__negative_cusum + residual + self.__k_threshold) + + self.__positive_reset_history.append(self.__positive_cusum == ZERO) + self.__negative_reset_history.append(self.__negative_cusum == ZERO) + + def is_change_detected(self) -> np.bool: + """ + Checks if either CUSUM statistic has exceeded its threshold. + :return: True if a change is detected, False otherwise. + """ + return self.__positive_cusum > self.__h_threshold or self.__negative_cusum < -self.__h_threshold + + def get_last_reset_index(self) -> int: + """ + Finds the number of time steps since the last CUSUM reset. + After a change is detected (i.e., `is_change_detected()` is True), this + method calculates how many steps have passed since the cumulative sum value + was last at zero. This is essential for localizing the starting point of the + detected anomaly. + :return: the number of steps back from the current time to the last reset. + """ + assert self.is_change_detected(), "This method should only be called when a change is detected." + + last_reset_index: int + if self.__positive_cusum > self.__h_threshold: + last_reset_index = list(reversed(self.__positive_reset_history)).index(True) + elif self.__negative_cusum < -self.__h_threshold: + last_reset_index = list(reversed(self.__negative_reset_history)).index(True) + else: # This branch is unreachable if the assertion holds, but NUMPY + last_reset_index = len(self.__positive_reset_history) + + return last_reset_index + + def update_thresholds(self, k: np.float64, h: np.float64) -> None: + """ + Updates the 'k' and 'h' thresholds for the CUSUM detector. + :param k: the new value for the allowable slack. + :param h: the new value for the decision threshold. + :return: + """ + self.__k_threshold = k + self.__h_threshold = h + + def clear(self) -> None: + """ + Resets the CUSUM detector to its initial state. + :return: + """ + self.__k_threshold = ZERO + self.__h_threshold = ZERO + + self.__positive_cusum = ZERO + self.__negative_cusum = ZERO + + self.__positive_reset_history = [True] + self.__negative_reset_history = [True] diff --git a/pysatl_cpd/core/algorithms/arima_online_algorithm.py b/pysatl_cpd/core/algorithms/arima_online_algorithm.py new file mode 100644 index 00000000..f3bdb59e --- /dev/null +++ b/pysatl_cpd/core/algorithms/arima_online_algorithm.py @@ -0,0 +1,294 @@ +""" +Module for implementation of Predict&Compare CPD algorithm. +""" + +__author__ = "Aleksandra Ri" +__copyright__ = "Copyright (c) 2025 PySATL project" +__license__ = "SPDX-License-Identifier: MIT" + + +from typing import Optional + +import numpy as np +import numpy.typing as npt + +from pysatl_cpd.core.algorithms.arima.models.arima import ArimaModel +from pysatl_cpd.core.algorithms.arima.stats_tests.cusum import CuSum +from pysatl_cpd.core.algorithms.online_algorithm import OnlineAlgorithm + + +class ArimaCusumAlgorithm(OnlineAlgorithm): + """ + An online change point detection algorithm that combines an ARIMA model + with multiple CUSUM detectors to monitor the model's residuals. + """ + + def __init__( + self, + training_size: int, + h_coefficient: float = 5.0, + ema_alpha: float = 0.05, + p: int = 0, + d: int = 0, + q: int = 0, + ): + """ + Initializes the ArimaCusumAlgorithm. + :param training_size: the number of data points for initial training. + :param h_coefficient: a multiplier for the statistic's standard deviation to set the CUSUM 'h' threshold. + :param ema_alpha: the smoothing factor for the exponential moving average (EMA) of the residual variance. + :param p: the order of the model for the autoregressive. + :param d: the order of the model for the differences. + :param q: the order of the model for the moving average components. + """ + self.__arima_model: ArimaModel = ArimaModel() + self.__order: Optional[tuple[int, int, int]] = (p, d, q) + + self.__training_buffer: list[np.float64] = [] + self.__training_size = training_size + self.__is_training: bool = True + + self.__h_coefficient = np.float64(h_coefficient) + self.__ema_alpha = ema_alpha + self.__sigma2: Optional[np.float64] = None + + self.__current_time: int = 0 + + self.__data_history: list[np.float64] = [] + self.__was_change_point: bool = False + self.__change_point: Optional[int] = None + + self.__cusum_detectors: dict[str, CuSum] = { + "mean": CuSum(), + "variance": CuSum(), + "skewness": CuSum(), + "kurtosis": CuSum(), + } + self.__residual_moment_baselines: dict[str, np.float64] = { + "mean": np.float64(0.0), + "variance": np.float64(0.0), + "skewness": np.float64(0.0), + "kurtosis": np.float64(0.0), + } + + self.__prev_residual_norm: np.float64 = np.float64(0.0) + + def clear(self) -> None: + self.__arima_model.clear() + + self.__training_buffer = [] + self.__is_training = True + self.__sigma2 = None + + self.__current_time = 0 + self.__data_history = [] + self.__was_change_point = False + self.__change_point = None + + for _, detector in self.__cusum_detectors.items(): + detector.clear() + + for residual_moment_baseline in self.__residual_moment_baselines: + self.__residual_moment_baselines[residual_moment_baseline] = np.float64(0.0) + + def __fit_model(self) -> None: + """ + Fits the ARIMA model on the collected buffer and calibrates CUSUM detectors. + :return: + """ + assert len(self.__training_buffer) >= self.__training_size, ( + "Training buffer is smaller than required training size." + ) + residuals = self.__arima_model.fit(self.__training_buffer, self.__order) + assert residuals is not None and len(residuals) != 0, "ARIMA fit did not produce valid residuals." + + self.__sigma2 = np.maximum(np.var(residuals), 1e-6) + self.__recalculate_cusum_thresholds(residuals) + + self.__training_buffer = [] + self.__is_training = False + + def __collect_training_data(self, observation: np.float64) -> None: + """ + Appends an observation to the training buffer and fits the model when full. + :param observation: a new data point. + :return: + """ + self.__training_buffer.append(observation) + + if len(self.__training_buffer) >= self.__training_size: + self.__fit_model() + + def __update_cusum_detectors(self, residual: np.float64) -> None: + """ + Updates all CUSUM detectors based on a new residual. + :param residual: the residual from the ARIMA model. + :return: + """ + assert self.__sigma2 is not None, "Sigma squared must be initialized before updating detectors." + + sigma = np.maximum(np.sqrt(self.__sigma2), 1e-3) + normalized_residual = residual / sigma + + # Calculate statistics for each moment, centered around their expected values + stats = { + "mean": normalized_residual - self.__residual_moment_baselines["mean"], + "variance": ((normalized_residual**2) - 1) - self.__residual_moment_baselines["variance"], + "skewness": (normalized_residual**3) - self.__residual_moment_baselines["skewness"], + "kurtosis": ((normalized_residual**4) - 3) - self.__residual_moment_baselines["kurtosis"], + } + self.__prev_residual_norm = normalized_residual + + for name, detector in self.__cusum_detectors.items(): + detector.update(stats[name]) + + def __check_for_change(self) -> dict[str, np.bool]: + """ + Polls each CUSUM detector for a change signal. + :return: a dictionary indicating which, if any, detector has fired. + """ + return {name: detector.is_change_detected() for name, detector in self.__cusum_detectors.items()} + + def __recalculate_cusum_thresholds(self, training_residuals: npt.NDArray[np.float64]) -> None: + """ + Recalculates and updates the thresholds for all CUSUM detectors. + :param training_residuals: the residuals produced during the training phase. + :return: + """ + assert len(training_residuals) != 0, "Cannot calculate thresholds on empty residuals." + + train_sigma = np.std(training_residuals) + train_sigma = np.maximum(train_sigma, 1e-6) + + normalized_residuals = training_residuals / train_sigma + + stats_arrays = { + "mean": normalized_residuals, + "variance": (normalized_residuals**2) - 1, + "skewness": normalized_residuals**3, + "kurtosis": (normalized_residuals**4) - 3, + } + + # Update moment baselines + for name, arr in stats_arrays.items(): + self.__residual_moment_baselines[name] = np.float64(np.mean(arr)) + + # Set k and h thresholds for each detector + for name, stats_array in stats_arrays.items(): + stat_std = np.std(stats_array) + stat_std = np.maximum(stat_std, 1e-6) + + k = np.float64(0.5) * stat_std + h = self.__h_coefficient * stat_std + + self.__cusum_detectors[name].update_thresholds(k=k, h=h) + + def __update_model_and_detectors(self, observation: np.float64) -> None: + """ + Updates the model with a new observation and runs the detection logic. + :param observation: a new data point. + :return: + """ + prediction = self.__arima_model.predict(1)[0] + residual = observation - prediction + + # Update variance estimate using EMA + assert self.__sigma2 is not None, "Model must be trained before updating (sigma2 is None)." + self.__sigma2 = (1 - self.__ema_alpha) * self.__sigma2 + self.__ema_alpha * (residual**2) + + self.__update_cusum_detectors(residual) + self.__arima_model.update([observation]) + + def __process_point(self, observation: np.float64) -> None: + """ + Universal method for processing of another observation of a time series. + :param observation: new observation of a time series. + :return: + """ + self.__current_time += 1 + self.__data_history.append(observation) + + if self.__is_training: + self.__collect_training_data(observation) + else: + self.__update_model_and_detectors(observation) + detections = self.__check_for_change() + + if any(detections.values()): + self.__handle_change_point(detections) + return + + def __handle_change_point(self, detections: dict[str, np.bool]) -> None: + """ + Executes the logic to handle a detected change point, including localization + and resetting the algorithm for retraining. + :param detections: the dictionary of detector results. + :return: + """ + assert any(detections), "Handler called without any detection." + self.__was_change_point = True + + # Localize the change point based on detector priority + priority = ["mean", "variance", "skewness", "kurtosis"] + for stat_name in priority: + if detections.get(stat_name): + self.__change_point = self.__current_time - self.__cusum_detectors[stat_name].get_last_reset_index() + break + + self.__arima_model.clear() + self.__is_training = True + for detector in self.__cusum_detectors.values(): + detector.clear() + + self.__residual_moment_baselines = { + "mean": np.float64(0.0), + "variance": np.float64(0.0), + "skewness": np.float64(0.0), + "kurtosis": np.float64(0.0), + } + self.__prev_residual_norm = np.float64(0.0) + self.__sigma2 = None + + # Keep data after the change point for retraining + assert self.__change_point is not None + time_after_cpd = self.__current_time - self.__change_point + self.__data_history = self.__data_history[-time_after_cpd:] if time_after_cpd > 0 else [] + self.__training_buffer = [] + + # Retrain on the data collected after the change point + for observation in self.__data_history: + if self.__is_training: + self.__collect_training_data(observation) + else: + self.__update_model_and_detectors(observation) + + def detect(self, observation: np.float64 | npt.NDArray[np.float64]) -> bool: + """ + Function for finding change points in window + + :param observation: part of global data for finding change points + :return: the number of change points in the window + """ + if isinstance(observation, np.ndarray): + raise TypeError("Multivariate observations are not supported") + + self.__process_point(np.float64(observation)) + result = self.__was_change_point + self.__was_change_point = False + return result + + def localize(self, observation: np.float64 | npt.NDArray[np.float64]) -> Optional[int]: + """ + Function for finding coordinates of change points in window + + :param observation: part of global data for finding change points + :return: list of window change points + """ + if isinstance(observation, np.ndarray): + raise TypeError("Multivariate observations are not supported") + + self.__process_point(np.float64(observation)) + result = self.__change_point + self.__was_change_point = False + self.__change_point = None + return result diff --git a/pysatl_cpd/core/algorithms/bayesian_linear_heuristic.py b/pysatl_cpd/core/algorithms/bayesian_linear_heuristic.py index add4cfbc..4f433ef5 100644 --- a/pysatl_cpd/core/algorithms/bayesian_linear_heuristic.py +++ b/pysatl_cpd/core/algorithms/bayesian_linear_heuristic.py @@ -89,7 +89,7 @@ def detect(self, observation: np.float64 | npt.NDArray[np.float64]) -> bool: :param observation: a new observation from a time series. Note: only univariate data is supported for now. :return: whether a change point was detected by a main algorithm. """ - if observation is npt.NDArray[np.float64]: + if isinstance(observation, np.ndarray): raise TypeError("Multivariate observations are not supported") assert self.__main_algorithm is not None, "Main algorithm must be initialized" @@ -111,7 +111,7 @@ def localize(self, observation: np.float64 | npt.NDArray[np.float64]) -> Optiona :param observation: a new observation from a time series. Note: only univariate data is supported for now. :return: a change point, if it was localized, None otherwise. """ - if observation is npt.NDArray[np.float64]: + if isinstance(observation, np.ndarray): raise TypeError("Multivariate observations are not supported") assert self.__main_algorithm is not None, "Main algorithm must be initialized" diff --git a/pysatl_cpd/core/algorithms/bayesian_online_algorithm.py b/pysatl_cpd/core/algorithms/bayesian_online_algorithm.py index 1fb6610d..7331a248 100644 --- a/pysatl_cpd/core/algorithms/bayesian_online_algorithm.py +++ b/pysatl_cpd/core/algorithms/bayesian_online_algorithm.py @@ -191,7 +191,7 @@ def detect(self, observation: np.float64 | npt.NDArray[np.float64]) -> bool: :param observation: new observation of a time series. Note: multivariate time series aren't supported for now. :return: whether a change point was detected after processing the new observation. """ - if observation is npt.NDArray[np.float64]: + if isinstance(observation, np.ndarray): raise TypeError("Multivariate observations are not supported") self.__process_point(np.float64(observation), False) @@ -206,7 +206,7 @@ def localize(self, observation: np.float64 | npt.NDArray[np.float64]) -> Optiona :return: absolute location of a change point, acquired after processing the new observation, or None if there wasn't any. """ - if observation is npt.NDArray[np.float64]: + if isinstance(observation, np.ndarray): raise TypeError("Multivariate observations are not supported") self.__process_point(np.float64(observation), True) diff --git a/pysatl_cpd/core/algorithms/kliep_algorithm.py b/pysatl_cpd/core/algorithms/kliep_algorithm.py index 39eb255c..531339e5 100644 --- a/pysatl_cpd/core/algorithms/kliep_algorithm.py +++ b/pysatl_cpd/core/algorithms/kliep_algorithm.py @@ -59,7 +59,7 @@ def detect(self, window: npt.NDArray[np.float64]) -> int: objective_function=self._loss_function, ) - return np.count_nonzero(weights > self.threshold) + return int(np.count_nonzero(weights > self.threshold)) def localize(self, window: npt.NDArray[np.float64]) -> list[int]: """Localize the change points in the given data window using KLIEP. diff --git a/pysatl_cpd/core/algorithms/rulsif_algorithm.py b/pysatl_cpd/core/algorithms/rulsif_algorithm.py index f0f04da8..1b828c56 100644 --- a/pysatl_cpd/core/algorithms/rulsif_algorithm.py +++ b/pysatl_cpd/core/algorithms/rulsif_algorithm.py @@ -57,7 +57,7 @@ def detect(self, window: npt.NDArray[np.float64]) -> int: objective_function=self._loss_function, ) - return np.count_nonzero(weights > self.threshold) + return int(np.count_nonzero(weights > self.threshold)) def localize(self, window: npt.NDArray[np.float64]) -> list[int]: """Localize the change points in the given data window using RULSIF. diff --git a/tests/test_core/test_algorithms/test_arima_online_algorithm.py b/tests/test_core/test_algorithms/test_arima_online_algorithm.py new file mode 100644 index 00000000..2d5ca970 --- /dev/null +++ b/tests/test_core/test_algorithms/test_arima_online_algorithm.py @@ -0,0 +1,90 @@ +import numpy as np +import pytest + +import pysatl_cpd.generator.distributions as dstr +from pysatl_cpd.core.algorithms.arima_online_algorithm import ArimaCusumAlgorithm + + +@pytest.fixture(scope="module") +def set_seed(): + np.random.seed(1) + + +@pytest.fixture +def algorithm_factory(): + def _factory(): + return ArimaCusumAlgorithm(training_size=5, h_coefficient=5, ema_alpha=0.05, p=0, d=0, q=0) + + return _factory + + +@pytest.fixture(scope="function") +def data_params(): + base_params = { + "num_of_tests": 10, + "size": 500, + "change_point": 250, + } + return base_params + + +@pytest.fixture() +def generate_data(data_params): + np.random.seed(1) + cp = data_params["change_point"] + size = data_params["size"] + + left_distr = dstr.Distribution.from_str(str(dstr.Distributions.BETA), {"alpha": "0.5", "beta": "0.5"}) + right_distr = dstr.Distribution.from_str(str(dstr.Distributions.NORMAL), {"mean": "1.0", "variance": "1.0"}) + return np.concatenate([left_distr.scipy_sample(cp), right_distr.scipy_sample(size - cp)]) + + +class TestArimaCusumAlgorithm: + @pytest.fixture(autouse=True) + def setup(self, algorithm_factory): + self.algorithm_factory = algorithm_factory + + def test_consecutive_detection(self, generate_data, data_params): + for _ in range(data_params["num_of_tests"]): + algorithm = self.algorithm_factory() + was_change_point = False + for value in generate_data: + result = algorithm.detect(value) + if result: + was_change_point = True + + assert was_change_point, "There was undetected change point in data" + + def test_correctness_of_consecutive_detection(self, generate_data, data_params): + outer_algorithm = self.algorithm_factory() + for _ in range(data_params["num_of_tests"]): + inner_algorithm = self.algorithm_factory() + outer_result = [] + inner_result = [] + + for value in generate_data: + outer_result.append(outer_algorithm.detect(value)) + inner_result.append(inner_algorithm.detect(value)) + + outer_algorithm.clear() + assert outer_result == inner_result, "Consecutive and independent detection should give same results" + + def test_correctness_of_consecutive_localization(self, generate_data, data_params): + outer_algorithm = self.algorithm_factory() + for _ in range(data_params["num_of_tests"]): + inner_algorithm = self.algorithm_factory() + + for value in generate_data: + outer_result = outer_algorithm.localize(value) + inner_result = inner_algorithm.localize(value) + assert outer_result == inner_result, "Consecutive and independent localization should give same results" + + outer_algorithm.clear() + + def test_online_localization_correctness(self, generate_data, data_params): + for _ in range(data_params["num_of_tests"]): + algorithm = self.algorithm_factory() + for time, value in np.ndenumerate(generate_data): + result = algorithm.localize(value) + if result: + assert result <= time[0] + 1, "Change point cannot be in future"