From 20cd1b9dcb407a3a9b3404f8bba5fd5b654918d9 Mon Sep 17 00:00:00 2001 From: alistkova Date: Wed, 26 Mar 2025 19:41:48 +0300 Subject: [PATCH 01/13] feat(kliep): rewrite kliep algorithm implementation --- pysatl_cpd/core/algorithms/kliep_algorithm.py | 176 ++++++++++++------ 1 file changed, 119 insertions(+), 57 deletions(-) diff --git a/pysatl_cpd/core/algorithms/kliep_algorithm.py b/pysatl_cpd/core/algorithms/kliep_algorithm.py index 39eb255c..beffca43 100644 --- a/pysatl_cpd/core/algorithms/kliep_algorithm.py +++ b/pysatl_cpd/core/algorithms/kliep_algorithm.py @@ -1,82 +1,144 @@ -from typing import cast +""" +Module for implementation of CPD algorithm using KLIEP-based divergence estimation. +""" + +__author__ = "Aleksandra Listkova" +__copyright__ = "Copyright (c) 2025 Aleksandra Listkova" +__license__ = "SPDX-License-Identifier: MIT" import numpy as np import numpy.typing as npt -from numpy import dtype, float64, ndarray +from scipy.optimize import minimize +from pysatl_cpd.core.algorithms.abstract_algorithm import Algorithm from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm -class KliepAlgorithm(DensityBasedAlgorithm): - """Kullback-Leibler Importance Estimation Procedure (KLIEP) algorithm - for change point detection. - - KLIEP estimates the density ratio between two distributions and uses - the importance weights for detecting changes in the data distribution. - """ +class KliepAlgorithm(Algorithm): + def __init__( + self, + bandwidth: float = 1.0, + regularization: float = 0.1, + threshold: float = 1.1, + max_iter: int = 100, + min_window_size: int = 10 + ): + """ + Initializes a new instance of KLIEP based change point detection algorithm. - def __init__(self, bandwidth: float, regularization_coef: float, threshold: float = 1.1): - """Initialize the KLIEP algorithm. + :param bandwidth: the bandwidth parameter for the kernel density estimation. + :param regularization: L2 regularization coefficient for the KLIEP optimization. + :param threshold: detection threshold for significant change points. + :param max_iter: maximum number of iterations for the L-BFGS-B optimizer. + :param min_window_size: minimum size of data segments to consider. + """ + self.bandwidth = bandwidth, + self.regularisation = regularization, + self.threshold = threshold, + self.max_iter = max_iter, + self.min_window_size = min_window_size - Args: - bandwidth (float): bandwidth parameter for density estimation. - regularization_coef (float): regularization parameter. - threshold (float, optional): threshold for detecting change points. - Defaults to 1.1. + def detect(self, window: npt.NDArray[np.float64]) -> int: """ - self.bandwidth = bandwidth - self.regularization_coef = regularization_coef - self.threshold = np.float64(threshold) + Finds change points in the given window. - def _loss_function(self, density_ratio: npt.NDArray[np.float64], alpha: npt.NDArray[np.float64]) -> float: - """Loss function for KLIEP. + :param window: input data window for change point detection. + :return: number of detected change points in the window. + """ + return len(self.localize(window)) - Args: - density_ratio (np.ndarray): estimated density ratio. - alpha (np.ndarray): coefficients for the density ratio. + def localize(self, window: npt.NDArray[np.float64]) -> list[int]: + """ + Identifies and returns the locations of change points in the window. - Returns: - float: the computed loss value. + :param window: input data window for change point localization. + :return: list of indices where change points were detected. """ - return -np.mean(density_ratio) + self.regularization_coef * np.sum(alpha**2) + window = self._validate_window(window) + if len(window) < self.min_window_size: + return [] - def detect(self, window: npt.NDArray[np.float64]) -> int: - """Detect the number of change points in the given data window - using KLIEP. + scores = self._compute_kliep_scores(window) + return self._find_change_points(scores) - Args: - window (Iterable[float]): the data window to detect change points. + def _validate_window(self, window: np.ndarray) -> np.ndarray: + """ + Validates and prepares the input window for processing. - Returns: - int: the number of detected change points. + :param window: input data window. + :return: validated window in 2D format. """ + window = np.asarray(window) + if window.ndim == 1: + window = window.reshape(-1, 1) + return window - window_sample = np.array(window) - weights = self._calculate_weights( - test_value=window_sample, - reference_value=window_sample, - bandwidth=self.bandwidth, - objective_function=self._loss_function, - ) + def _compute_kliep_scores(self, window: np.ndarray) -> np.ndarray: + """ + Computes KLIEP anomaly scores for each point in the window. - return np.count_nonzero(weights > self.threshold) + :param window: validated input data window. + :return: array of KLIEP scores for each point. + """ + n_points = window.shape(0) + scores = np.zeros(n_points) + + for i in range(self.min_window_size, n_points - self.min_window_size): + before = window[:i] + after = window[i:] + + before_density = DensityBasedAlgorithm._kernel_density_estimation( + before, self.bandwidth + ) + after_density = DensityBasedAlgorithm._kernel_density_estimation( + after, self.bandwidth + ) + + alpha = self._optimize_alpha(after_density, before_density) + scores[i] = np.mean(np.exp(after_density - before_density - alpha)) + + return scores + + def _optimize_alpha( + self, + test_density: np.ndarray, + ref_density: np.ndarray + ) -> np.ndarray: + """ + Optimizes the alpha parameters for KLIEP density ratio estimation. - def localize(self, window: npt.NDArray[np.float64]) -> list[int]: - """Localize the change points in the given data window using KLIEP. + :param test_density: density estimates for the test window. + :param ref_density: density estimates for the reference window. + :return: optimized alpha parameters. + """ + def loss(alpha: np.ndarray) -> float: + """Objective function for KLIEP optimization.""" + ratio = np.exp(test_density - ref_density - alpha) + return -np.mean(np.log(ratio)) + self.regularisation * np.sum(alpha**2) + + res = minimize( + loss, + np.zeros_like(test_density), + method='L-BFGS-B', + options={'maxiter': self.max_iter}, + bounds=[(0, None)] * len(test_density) + ) + return res.x - Args: - window (Iterable[float]): the data window to localize - change points. + def _find_change_points(self, scores: np.ndarray) -> list[int]: + """ + Identifies change points from computed KLIEP scores. - Returns: - List[int]: the indices of the detected change points. + :param scores: array of KLIEP scores for each point. + :return: list of detected change point indices. """ - window_sample = np.array(window) - weights: ndarray[tuple[int, ...], dtype[float64]] = self._calculate_weights( - test_value=window_sample, - reference_value=window_sample, - bandwidth=self.bandwidth, - objective_function=self._loss_function, - ) + candidates = np.where(scores > self.threshold)[0] + if len(candidates) == 0: + return [] + + change_points = [candidates[0]] + for points in candidates[1:]: + if points - change_points[-1] > self.min_window_size: + change_points.append(points) - return cast(list[int], np.where(weights > self.threshold)[0].tolist()) + return change_points From 8a430551005ee8df873638d742a8a9a35ee0bd34 Mon Sep 17 00:00:00 2001 From: Aleksandra Listkova <115529517+alistkova@users.noreply.github.com> Date: Tue, 8 Apr 2025 13:38:20 +0300 Subject: [PATCH 02/13] fix(kliep): typing fix --- pysatl_cpd/core/algorithms/kliep_algorithm.py | 44 +++++++++++-------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/pysatl_cpd/core/algorithms/kliep_algorithm.py b/pysatl_cpd/core/algorithms/kliep_algorithm.py index beffca43..4b3a1cd0 100644 --- a/pysatl_cpd/core/algorithms/kliep_algorithm.py +++ b/pysatl_cpd/core/algorithms/kliep_algorithm.py @@ -9,6 +9,8 @@ import numpy as np import numpy.typing as npt from scipy.optimize import minimize +from typing import List, Any +from typing import cast from pysatl_cpd.core.algorithms.abstract_algorithm import Algorithm from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm @@ -22,7 +24,7 @@ def __init__( threshold: float = 1.1, max_iter: int = 100, min_window_size: int = 10 - ): + ) -> None: """ Initializes a new instance of KLIEP based change point detection algorithm. @@ -32,10 +34,10 @@ def __init__( :param max_iter: maximum number of iterations for the L-BFGS-B optimizer. :param min_window_size: minimum size of data segments to consider. """ - self.bandwidth = bandwidth, - self.regularisation = regularization, - self.threshold = threshold, - self.max_iter = max_iter, + self.bandwidth = bandwidth + self.regularisation = regularization + self.threshold = threshold + self.max_iter = max_iter self.min_window_size = min_window_size def detect(self, window: npt.NDArray[np.float64]) -> int: @@ -61,27 +63,27 @@ def localize(self, window: npt.NDArray[np.float64]) -> list[int]: scores = self._compute_kliep_scores(window) return self._find_change_points(scores) - def _validate_window(self, window: np.ndarray) -> np.ndarray: + def _validate_window(self, window: npt.NDArray[Any]) -> npt.NDArray[np.float64]: """ Validates and prepares the input window for processing. :param window: input data window. :return: validated window in 2D format. """ - window = np.asarray(window) + window = np.asarray(window, dtype=np.float64) if window.ndim == 1: window = window.reshape(-1, 1) return window - def _compute_kliep_scores(self, window: np.ndarray) -> np.ndarray: + def _compute_kliep_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: """ Computes KLIEP anomaly scores for each point in the window. :param window: validated input data window. :return: array of KLIEP scores for each point. """ - n_points = window.shape(0) - scores = np.zeros(n_points) + n_points = window.shape[0] + scores = np.zeros(n_points, dtype=np.float64) for i in range(self.min_window_size, n_points - self.min_window_size): before = window[:i] @@ -101,9 +103,9 @@ def _compute_kliep_scores(self, window: np.ndarray) -> np.ndarray: def _optimize_alpha( self, - test_density: np.ndarray, - ref_density: np.ndarray - ) -> np.ndarray: + test_density: npt.NDArray[np.float64], + ref_density: npt.NDArray[np.float64] + ) -> npt.NDArray[np.float64]: """ Optimizes the alpha parameters for KLIEP density ratio estimation. @@ -111,21 +113,25 @@ def _optimize_alpha( :param ref_density: density estimates for the reference window. :return: optimized alpha parameters. """ - def loss(alpha: np.ndarray) -> float: + def loss(alpha: npt.NDArray[np.float64]) -> float: """Objective function for KLIEP optimization.""" ratio = np.exp(test_density - ref_density - alpha) - return -np.mean(np.log(ratio)) + self.regularisation * np.sum(alpha**2) + loss_val = -np.mean(np.log(ratio)) + self.regularisation * np.sum(alpha**2) + return float(loss_val) + + initial_alpha = np.zeros_like(test_density, dtype=np.float64) + bounds = [(0, None)] * len(test_density) res = minimize( loss, - np.zeros_like(test_density), + initial_alpha, method='L-BFGS-B', options={'maxiter': self.max_iter}, - bounds=[(0, None)] * len(test_density) + bounds=bounds ) - return res.x + return cast(npt.NDArray[np.float64], res.x) - def _find_change_points(self, scores: np.ndarray) -> list[int]: + def _find_change_points(self, scores: npt.NDArray[np.float64]) -> List[int]: """ Identifies change points from computed KLIEP scores. From 7b166756d0e304ce6606e82914b31911cddcf1c0 Mon Sep 17 00:00:00 2001 From: alistkova Date: Tue, 8 Apr 2025 14:04:06 +0300 Subject: [PATCH 03/13] fix(kliep): organize imports --- pysatl_cpd/core/algorithms/kliep_algorithm.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pysatl_cpd/core/algorithms/kliep_algorithm.py b/pysatl_cpd/core/algorithms/kliep_algorithm.py index 4b3a1cd0..e3909267 100644 --- a/pysatl_cpd/core/algorithms/kliep_algorithm.py +++ b/pysatl_cpd/core/algorithms/kliep_algorithm.py @@ -6,11 +6,11 @@ __copyright__ = "Copyright (c) 2025 Aleksandra Listkova" __license__ = "SPDX-License-Identifier: MIT" +from typing import Any, cast + import numpy as np import numpy.typing as npt from scipy.optimize import minimize -from typing import List, Any -from typing import cast from pysatl_cpd.core.algorithms.abstract_algorithm import Algorithm from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm @@ -131,7 +131,7 @@ def loss(alpha: npt.NDArray[np.float64]) -> float: ) return cast(npt.NDArray[np.float64], res.x) - def _find_change_points(self, scores: npt.NDArray[np.float64]) -> List[int]: + def _find_change_points(self, scores: npt.NDArray[np.float64]) -> list[int]: """ Identifies change points from computed KLIEP scores. From dda8d5fa69443bf74df2caa6b8ba99496da7798a Mon Sep 17 00:00:00 2001 From: Aleksandra Listkova <115529517+alistkova@users.noreply.github.com> Date: Sat, 3 May 2025 19:49:47 +0300 Subject: [PATCH 04/13] fix: correct type annotations --- pysatl_cpd/core/algorithms/kliep_algorithm.py | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/pysatl_cpd/core/algorithms/kliep_algorithm.py b/pysatl_cpd/core/algorithms/kliep_algorithm.py index e3909267..7950b0f6 100644 --- a/pysatl_cpd/core/algorithms/kliep_algorithm.py +++ b/pysatl_cpd/core/algorithms/kliep_algorithm.py @@ -6,11 +6,11 @@ __copyright__ = "Copyright (c) 2025 Aleksandra Listkova" __license__ = "SPDX-License-Identifier: MIT" -from typing import Any, cast +from typing import Any, List, Tuple, Optional, cast import numpy as np import numpy.typing as npt -from scipy.optimize import minimize +from scipy.optimize import minimize # type: ignore[import-untyped] from pysatl_cpd.core.algorithms.abstract_algorithm import Algorithm from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm @@ -49,7 +49,7 @@ def detect(self, window: npt.NDArray[np.float64]) -> int: """ return len(self.localize(window)) - def localize(self, window: npt.NDArray[np.float64]) -> list[int]: + def localize(self, window: npt.NDArray[np.float64]) -> List[int]: """ Identifies and returns the locations of change points in the window. @@ -102,36 +102,35 @@ def _compute_kliep_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[ return scores def _optimize_alpha( - self, - test_density: npt.NDArray[np.float64], - ref_density: npt.NDArray[np.float64] + self, + test_density: npt.NDArray[np.float64], + ref_density: npt.NDArray[np.float64] ) -> npt.NDArray[np.float64]: """ Optimizes the alpha parameters for KLIEP density ratio estimation. - - :param test_density: density estimates for the test window. - :param ref_density: density estimates for the reference window. - :return: optimized alpha parameters. """ - def loss(alpha: npt.NDArray[np.float64]) -> float: - """Objective function for KLIEP optimization.""" + def loss(alpha: npt.NDArray[np.float64]) -> np.float64: ratio = np.exp(test_density - ref_density - alpha) - loss_val = -np.mean(np.log(ratio)) + self.regularisation * np.sum(alpha**2) - return float(loss_val) + return np.float64(-np.mean(np.log(ratio)) + self.regularisation * np.sum(alpha**2)) - initial_alpha = np.zeros_like(test_density, dtype=np.float64) - bounds = [(0, None)] * len(test_density) + initial_alpha: npt.NDArray[np.float64] = np.zeros_like(test_density).flatten() + bounds: List[Tuple[float, Optional[float]]] = [(0.0, None) for _ in test_density.flatten()] + + def wrapped_loss(alpha_flat: npt.NDArray[np.float64]) -> float: + alpha = alpha_flat.reshape(test_density.shape) + return float(loss(alpha)) res = minimize( - loss, + wrapped_loss, initial_alpha, method='L-BFGS-B', options={'maxiter': self.max_iter}, bounds=bounds ) - return cast(npt.NDArray[np.float64], res.x) + + return cast(npt.NDArray[np.float64], res.x.reshape(test_density.shape)) - def _find_change_points(self, scores: npt.NDArray[np.float64]) -> list[int]: + def _find_change_points(self, scores: npt.NDArray[np.float64]) -> List[int]: """ Identifies change points from computed KLIEP scores. @@ -142,9 +141,10 @@ def _find_change_points(self, scores: npt.NDArray[np.float64]) -> list[int]: if len(candidates) == 0: return [] - change_points = [candidates[0]] - for points in candidates[1:]: - if points - change_points[-1] > self.min_window_size: - change_points.append(points) + change_points = [int(candidates[0])] + for point in candidates[1:]: + if point - change_points[-1] > self.min_window_size: + change_points.append(int(point)) return change_points + From 32545726d0adf58f083248784b572ff4b3a520d7 Mon Sep 17 00:00:00 2001 From: Aleksandra Listkova <115529517+alistkova@users.noreply.github.com> Date: Sat, 3 May 2025 20:02:25 +0300 Subject: [PATCH 05/13] style: modernize type annotations and clean imports --- pysatl_cpd/core/algorithms/kliep_algorithm.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/pysatl_cpd/core/algorithms/kliep_algorithm.py b/pysatl_cpd/core/algorithms/kliep_algorithm.py index 7950b0f6..f98fa541 100644 --- a/pysatl_cpd/core/algorithms/kliep_algorithm.py +++ b/pysatl_cpd/core/algorithms/kliep_algorithm.py @@ -6,8 +6,7 @@ __copyright__ = "Copyright (c) 2025 Aleksandra Listkova" __license__ = "SPDX-License-Identifier: MIT" -from typing import Any, List, Tuple, Optional, cast - +from typing import Any, Optional, cast import numpy as np import numpy.typing as npt from scipy.optimize import minimize # type: ignore[import-untyped] @@ -49,7 +48,7 @@ def detect(self, window: npt.NDArray[np.float64]) -> int: """ return len(self.localize(window)) - def localize(self, window: npt.NDArray[np.float64]) -> List[int]: + def localize(self, window: npt.NDArray[np.float64]) -> list[int]: """ Identifies and returns the locations of change points in the window. @@ -102,9 +101,9 @@ def _compute_kliep_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[ return scores def _optimize_alpha( - self, - test_density: npt.NDArray[np.float64], - ref_density: npt.NDArray[np.float64] + self, + test_density: npt.NDArray[np.float64], + ref_density: npt.NDArray[np.float64] ) -> npt.NDArray[np.float64]: """ Optimizes the alpha parameters for KLIEP density ratio estimation. @@ -114,8 +113,8 @@ def loss(alpha: npt.NDArray[np.float64]) -> np.float64: return np.float64(-np.mean(np.log(ratio)) + self.regularisation * np.sum(alpha**2)) initial_alpha: npt.NDArray[np.float64] = np.zeros_like(test_density).flatten() - bounds: List[Tuple[float, Optional[float]]] = [(0.0, None) for _ in test_density.flatten()] - + bounds: list[tuple[float, Optional[float]]] = [(0.0, None) for _ in test_density.flatten()] + def wrapped_loss(alpha_flat: npt.NDArray[np.float64]) -> float: alpha = alpha_flat.reshape(test_density.shape) return float(loss(alpha)) @@ -127,10 +126,10 @@ def wrapped_loss(alpha_flat: npt.NDArray[np.float64]) -> float: options={'maxiter': self.max_iter}, bounds=bounds ) - + return cast(npt.NDArray[np.float64], res.x.reshape(test_density.shape)) - def _find_change_points(self, scores: npt.NDArray[np.float64]) -> List[int]: + def _find_change_points(self, scores: npt.NDArray[np.float64]) -> list[int]: """ Identifies change points from computed KLIEP scores. From 0a316cb03df9357b38588f2f61255a015c0764e3 Mon Sep 17 00:00:00 2001 From: Aleksandra Listkova <115529517+alistkova@users.noreply.github.com> Date: Sat, 3 May 2025 20:14:49 +0300 Subject: [PATCH 06/13] style: organize imports --- pysatl_cpd/core/algorithms/kliep_algorithm.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pysatl_cpd/core/algorithms/kliep_algorithm.py b/pysatl_cpd/core/algorithms/kliep_algorithm.py index f98fa541..0735348a 100644 --- a/pysatl_cpd/core/algorithms/kliep_algorithm.py +++ b/pysatl_cpd/core/algorithms/kliep_algorithm.py @@ -7,12 +7,14 @@ __license__ = "SPDX-License-Identifier: MIT" from typing import Any, Optional, cast + import numpy as np import numpy.typing as npt from scipy.optimize import minimize # type: ignore[import-untyped] from pysatl_cpd.core.algorithms.abstract_algorithm import Algorithm -from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm +from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import \ + DensityBasedAlgorithm class KliepAlgorithm(Algorithm): @@ -114,7 +116,7 @@ def loss(alpha: npt.NDArray[np.float64]) -> np.float64: initial_alpha: npt.NDArray[np.float64] = np.zeros_like(test_density).flatten() bounds: list[tuple[float, Optional[float]]] = [(0.0, None) for _ in test_density.flatten()] - + def wrapped_loss(alpha_flat: npt.NDArray[np.float64]) -> float: alpha = alpha_flat.reshape(test_density.shape) return float(loss(alpha)) @@ -126,7 +128,7 @@ def wrapped_loss(alpha_flat: npt.NDArray[np.float64]) -> float: options={'maxiter': self.max_iter}, bounds=bounds ) - + return cast(npt.NDArray[np.float64], res.x.reshape(test_density.shape)) def _find_change_points(self, scores: npt.NDArray[np.float64]) -> list[int]: @@ -146,4 +148,3 @@ def _find_change_points(self, scores: npt.NDArray[np.float64]) -> list[int]: change_points.append(int(point)) return change_points - From 73945151381bc1540eed61380f68f8703b3cced1 Mon Sep 17 00:00:00 2001 From: Aleksandra Listkova <115529517+alistkova@users.noreply.github.com> Date: Sat, 3 May 2025 20:31:31 +0300 Subject: [PATCH 07/13] style: organize imports --- pysatl_cpd/core/algorithms/kliep_algorithm.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pysatl_cpd/core/algorithms/kliep_algorithm.py b/pysatl_cpd/core/algorithms/kliep_algorithm.py index 0735348a..e53aee38 100644 --- a/pysatl_cpd/core/algorithms/kliep_algorithm.py +++ b/pysatl_cpd/core/algorithms/kliep_algorithm.py @@ -13,8 +13,7 @@ from scipy.optimize import minimize # type: ignore[import-untyped] from pysatl_cpd.core.algorithms.abstract_algorithm import Algorithm -from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import \ - DensityBasedAlgorithm +from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm class KliepAlgorithm(Algorithm): From 8bc03e468b00b7daef2afa981a570498b39471b1 Mon Sep 17 00:00:00 2001 From: Aleksandra Listkova <115529517+alistkova@users.noreply.github.com> Date: Thu, 29 May 2025 22:18:14 +0300 Subject: [PATCH 08/13] fix: np type error --- pysatl_cpd/core/algorithms/kliep_algorithm.py | 162 +++++++----------- 1 file changed, 65 insertions(+), 97 deletions(-) diff --git a/pysatl_cpd/core/algorithms/kliep_algorithm.py b/pysatl_cpd/core/algorithms/kliep_algorithm.py index e53aee38..b0f8e639 100644 --- a/pysatl_cpd/core/algorithms/kliep_algorithm.py +++ b/pysatl_cpd/core/algorithms/kliep_algorithm.py @@ -6,144 +6,112 @@ __copyright__ = "Copyright (c) 2025 Aleksandra Listkova" __license__ = "SPDX-License-Identifier: MIT" -from typing import Any, Optional, cast - import numpy as np import numpy.typing as npt -from scipy.optimize import minimize # type: ignore[import-untyped] +from scipy.optimize import minimize # type: ignore -from pysatl_cpd.core.algorithms.abstract_algorithm import Algorithm from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm -class KliepAlgorithm(Algorithm): +class KliepAlgorithm(DensityBasedAlgorithm): def __init__( self, bandwidth: float = 1.0, regularization: float = 0.1, threshold: float = 1.1, max_iter: int = 100, - min_window_size: int = 10 + min_window_size: int = 10, ) -> None: """ - Initializes a new instance of KLIEP based change point detection algorithm. + Initializes a new instance of KLIEP-based change point detection algorithm. - :param bandwidth: the bandwidth parameter for the kernel density estimation. - :param regularization: L2 regularization coefficient for the KLIEP optimization. - :param threshold: detection threshold for significant change points. - :param max_iter: maximum number of iterations for the L-BFGS-B optimizer. - :param min_window_size: minimum size of data segments to consider. + :param bandwidth: kernel bandwidth for density estimation. + :param regularization: regularization coefficient for alpha optimization. + :param threshold: threshold for change point detection. + :param max_iter: maximum iterations for optimization solver. + :param min_window_size: minimum segment size for reliable estimation. """ - self.bandwidth = bandwidth - self.regularisation = regularization - self.threshold = threshold + super().__init__(min_window_size, threshold, bandwidth) + self.regularization = regularization self.max_iter = max_iter - self.min_window_size = min_window_size - - def detect(self, window: npt.NDArray[np.float64]) -> int: - """ - Finds change points in the given window. - - :param window: input data window for change point detection. - :return: number of detected change points in the window. - """ - return len(self.localize(window)) - - def localize(self, window: npt.NDArray[np.float64]) -> list[int]: - """ - Identifies and returns the locations of change points in the window. - - :param window: input data window for change point localization. - :return: list of indices where change points were detected. - """ - window = self._validate_window(window) - if len(window) < self.min_window_size: - return [] - - scores = self._compute_kliep_scores(window) - return self._find_change_points(scores) - - def _validate_window(self, window: npt.NDArray[Any]) -> npt.NDArray[np.float64]: - """ - Validates and prepares the input window for processing. - - :param window: input data window. - :return: validated window in 2D format. - """ - window = np.asarray(window, dtype=np.float64) - if window.ndim == 1: - window = window.reshape(-1, 1) - return window - def _compute_kliep_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: + def _compute_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: """ - Computes KLIEP anomaly scores for each point in the window. + Computes KLIEP-based change point scores for each position in the window. - :param window: validated input data window. - :return: array of KLIEP scores for each point. + :param window: input data window (1D array). + :return: array of change point scores at each index. """ n_points = window.shape[0] - scores = np.zeros(n_points, dtype=np.float64) + scores: npt.NDArray[np.float64] = np.zeros(n_points, dtype=np.float64) + common_grid = self._build_common_grid(window) for i in range(self.min_window_size, n_points - self.min_window_size): before = window[:i] after = window[i:] - before_density = DensityBasedAlgorithm._kernel_density_estimation( - before, self.bandwidth - ) - after_density = DensityBasedAlgorithm._kernel_density_estimation( - after, self.bandwidth - ) + before_density = self._kde_on_grid(before, self.bandwidth, common_grid) + after_density = self._kde_on_grid(after, self.bandwidth, common_grid) alpha = self._optimize_alpha(after_density, before_density) - scores[i] = np.mean(np.exp(after_density - before_density - alpha)) - + scores[i] = np.mean(np.log(after_density + 1e-10)) - np.mean(np.log(before_density + 1e-10)) - alpha return scores def _optimize_alpha( - self, - test_density: npt.NDArray[np.float64], - ref_density: npt.NDArray[np.float64] - ) -> npt.NDArray[np.float64]: - """ - Optimizes the alpha parameters for KLIEP density ratio estimation. + self, + test_density: npt.NDArray[np.float64], + ref_density: npt.NDArray[np.float64] + ) -> float: """ - def loss(alpha: npt.NDArray[np.float64]) -> np.float64: - ratio = np.exp(test_density - ref_density - alpha) - return np.float64(-np.mean(np.log(ratio)) + self.regularisation * np.sum(alpha**2)) - - initial_alpha: npt.NDArray[np.float64] = np.zeros_like(test_density).flatten() - bounds: list[tuple[float, Optional[float]]] = [(0.0, None) for _ in test_density.flatten()] + Optimizes alpha parameter for density ratio estimation. - def wrapped_loss(alpha_flat: npt.NDArray[np.float64]) -> float: - alpha = alpha_flat.reshape(test_density.shape) - return float(loss(alpha)) + :param test_density: KDE values for test segment (after potential CP). + :param ref_density: KDE values for reference segment (before potential CP). + :return: optimal alpha value for density ratio adjustment. + """ + def loss(alpha: float) -> float: + ratio = np.exp(np.log(test_density) - np.log(ref_density + 1e-10) - alpha) + return float(-np.mean(np.log(ratio + 1e-10)) + self.regularization * alpha**2) res = minimize( - wrapped_loss, - initial_alpha, + loss, + x0=0.0, method='L-BFGS-B', - options={'maxiter': self.max_iter}, - bounds=bounds + bounds=[(0.0, None)], + options={'maxiter': self.max_iter} ) + return float(res.x[0]) - return cast(npt.NDArray[np.float64], res.x.reshape(test_density.shape)) - - def _find_change_points(self, scores: npt.NDArray[np.float64]) -> list[int]: + def _build_common_grid(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: """ - Identifies change points from computed KLIEP scores. + Creates evaluation grid for density estimation. - :param scores: array of KLIEP scores for each point. - :return: list of detected change point indices. + :param window: input data window. + :return: grid spanning data range with bandwidth-adjusted margins. """ - candidates = np.where(scores > self.threshold)[0] - if len(candidates) == 0: - return [] + return np.linspace( + np.min(window) - 3 * self.bandwidth, + np.max(window) + 3 * self.bandwidth, + 1000, + dtype=np.float64 + ) - change_points = [int(candidates[0])] - for point in candidates[1:]: - if point - change_points[-1] > self.min_window_size: - change_points.append(int(point)) + def _kde_on_grid( + self, + observation: npt.NDArray[np.float64], + bandwidth: float, + grid: npt.NDArray[np.float64] + ) -> npt.NDArray[np.float64]: + """ + Computes kernel density estimate on specified grid. - return change_points + :param observation: data points for KDE. + :param bandwidth: kernel bandwidth parameter. + :param grid: evaluation grid points. + :return: density values at grid points. + """ + n = observation.shape[0] + diff = grid[:, np.newaxis] - observation + kernel_vals = np.exp(-0.5 * (diff / bandwidth) ** 2) + kde_vals = kernel_vals.sum(axis=1) + return np.asarray(kde_vals / (n * bandwidth * np.sqrt(2 * np.pi)), dtype=np.float64) From b77fbe012e23ea819f08aaf617e29c1d6d1f2a9b Mon Sep 17 00:00:00 2001 From: Aleksandra Listkova <115529517+alistkova@users.noreply.github.com> Date: Thu, 29 May 2025 22:26:53 +0300 Subject: [PATCH 09/13] fix: upd density based --- .../abstracts/density_based_algorithm.py | 189 +++++++++++------- 1 file changed, 116 insertions(+), 73 deletions(-) diff --git a/pysatl_cpd/core/algorithms/density/abstracts/density_based_algorithm.py b/pysatl_cpd/core/algorithms/density/abstracts/density_based_algorithm.py index 0cea21bb..f2412d2e 100644 --- a/pysatl_cpd/core/algorithms/density/abstracts/density_based_algorithm.py +++ b/pysatl_cpd/core/algorithms/density/abstracts/density_based_algorithm.py @@ -1,112 +1,155 @@ from abc import abstractmethod -from collections.abc import Callable -from typing import TypeAlias +from typing import Any, Callable, TypeAlias import numpy as np import numpy.typing as npt -from scipy.optimize import minimize from pysatl_cpd.core.algorithms.abstract_algorithm import Algorithm _TObjFunc: TypeAlias = Callable[[npt.NDArray[np.float64], npt.NDArray[np.float64]], float] _TMetrics: TypeAlias = dict[str, int | float] - class DensityBasedAlgorithm(Algorithm): - @staticmethod - def _kernel_density_estimation(observation: npt.NDArray[np.float64], bandwidth: float) -> npt.NDArray[np.float64]: - """Perform kernel density estimation on the given observations without fitting a model. + """Abstract base class for density-based change point detection algorithms. - :param observation: the data points for which to estimate the density. - :param bandwidth: the bandwidth parameter for the kernel density estimation. + Provides common infrastructure for methods that detect change points by + analyzing probability density changes in data segments. + """ + def __init__( + self, + min_window_size: int = 10, + threshold: float = 1.1, + bandwidth: float = 1.0 + ) -> None: + """ + Initializes density-based change point detector. - :return: estimated density values for the observations. + :param min_window_size: minimum data points required in each segment + :param threshold: detection sensitivity (higher = fewer detections) + :param bandwidth:kernel bandwidth for density estimation """ - n = len(observation) - x_grid = np.linspace(np.min(observation) - 3 * bandwidth, np.max(observation) + 3 * bandwidth, 1000) - kde_values = np.zeros_like(x_grid) - for x in observation: - kde_values += np.exp(-0.5 * ((x_grid - x) / bandwidth) ** 2) + self.min_window_size = min_window_size + self.threshold = threshold + self.bandwidth = bandwidth - kde_values /= n * bandwidth * np.sqrt(2 * np.pi) - return kde_values + def detect(self, window: npt.NDArray[np.float64]) -> int: + """Counts change points in the given data window. - def _calculate_weights( - self, - test_value: npt.NDArray[np.float64], - reference_value: npt.NDArray[np.float64], - bandwidth: float, - objective_function: _TObjFunc, - ) -> npt.NDArray[np.float64]: - """Calculate the weights based on the density ratio between test and reference values. + :param window: input data array (1D or 2D) + :return: number of detected change points + """ + return len(self.localize(window)) - :param test_value: the test data points. - :param reference_value: the reference data points. - :param bandwidth: the bandwidth parameter for the kernel density estimation. - :param objective_function: the objective function to minimize. + def localize(self, window: npt.NDArray[np.float64]) -> list[int]: + """Identifies positions of change points in the data window. - :return: the calculated density ratios normalized to their mean. + :param window: input data array (1D or 2D) + :return: list of change point indices """ - test_density = self._kernel_density_estimation(test_value, bandwidth) - reference_density = self._kernel_density_estimation(reference_value, bandwidth) + window = self._validate_window(window) + if not self._is_window_valid(window): + return [] + scores = self._compute_scores(window) + return self._find_change_points(scores) - def objective_function_wrapper(alpha: npt.NDArray[np.float64], /) -> float: - """Wrapper for the objective function to calculate the density ratio. + def _validate_window(self, window: npt.NDArray[Any]) -> npt.NDArray[np.float64]: + """Ensures input window meets processing requirements. - :param alpha: relative parameter that controls the weighting between the numerator distribution - and the denominator distribution in the density ratio estimation. + :param window: raw input data + :return: validated 2D float64 array + """ + window_arr = np.asarray(window, dtype=np.float64) + if window_arr.ndim == 1: + window_arr = window_arr.reshape(-1, 1).astype(np.float64) + return np.array(window_arr, dtype=np.float64) - :return: the value of the objective function to minimize. - """ - objective_density_ratio = np.exp(test_density - reference_density - alpha) - return objective_function(objective_density_ratio, alpha) + def _find_change_points(self, scores: npt.NDArray[np.float64]) -> list[int]: + """Filters candidate points using threshold and minimum separation. - res = minimize(objective_function_wrapper, np.zeros(len(test_value)), method="L-BFGS-B") - optimized_alpha: npt.NDArray[np.float64] = res.x - density_ratio: npt.NDArray[np.float64] = np.exp(test_density - reference_density - optimized_alpha) - return density_ratio / np.mean(density_ratio) + :param scores: change point scores for each position + :return: filtered list of change point indices + """ + candidates = np.where(scores > self.threshold)[0] + if not candidates.size: + return [] + change_points = [int(candidates[0])] + for point in candidates[1:]: + if point - change_points[-1] > self.min_window_size: + change_points.append(int(point)) + return change_points + + def _is_window_valid(self, window: npt.NDArray[np.float64]) -> bool: + """Verifies window meets minimum size requirements. + + :param window: input data window + :return: True if window can be processed, else False + """ + return len(window) >= 2 * self.min_window_size - @abstractmethod - def detect(self, window: npt.NDArray[np.float64]) -> int: - # maybe rtype tuple[int] - """Function for finding change points in window + @staticmethod + def _kernel_density_estimation( + observation: npt.NDArray[np.float64], + bandwidth: float + ) -> npt.NDArray[np.float64]: + """Computes kernel density estimate using Gaussian kernels. - :param window: part of global data for finding change points - :return: list of right borders of window change points + :param observation: data points for density estimation + :param bandwidth: smoothing parameter for KDE + :return: density values at evaluation points """ - raise NotImplementedError + n = observation.shape[0] + x_grid: npt.NDArray[np.float64] = np.linspace( + np.min(observation) - 3*bandwidth, + np.max(observation) + 3*bandwidth, + 1000, + dtype=np.float64 + ) + + diff = x_grid[:, np.newaxis] - observation + kernel_vals = np.exp(-0.5 * (diff / bandwidth) ** 2) + kde_vals = kernel_vals.sum(axis=1) + + return np.asarray(kde_vals / (n * bandwidth * np.sqrt(2*np.pi)), dtype=np.float64) @abstractmethod - def localize(self, window: npt.NDArray[np.float64]) -> list[int]: - """Function for finding coordinates of change points in window + def _compute_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: + """Computes change point scores. - :param window: part of global data for finding change points - :return: list of window change points + :param window: validated input data + :return: array of change point scores """ raise NotImplementedError @staticmethod - def evaluate_detection_accuracy(true_change_points: list[int], detected_change_points: list[int]) -> _TMetrics: - """Evaluate the accuracy of change point detection. - - :param true_change_points: list of true change point indices. - :param detected_change_points: list of detected change point indices. - - :return: a dictionary with evaluation metrics (precision, recall, F1 score). + def evaluate_detection_accuracy( + true_change_points: list[int], + detected_change_points: list[int] + ) -> _TMetrics: + """Computes detection performance metrics. + + :param true_change_points: ground truth change points + :param detected_change_points: algorithm-detected change points + :return: dictionary containing precision, recall, F1, and error counts """ - true_positive = len(set(true_change_points) & set(detected_change_points)) - false_positive = len(set(detected_change_points) - set(true_change_points)) - false_negative = len(set(true_change_points) - set(detected_change_points)) - - precision = true_positive / (true_positive + false_positive) if true_positive + false_positive > 0 else 0.0 - recall = true_positive / (true_positive + false_negative) if true_positive + false_negative > 0 else 0.0 - f1_score = (2 * precision * recall / (precision + recall)) if (precision + recall) > 0 else 0.0 + true_positives = len(set(true_change_points) & set(detected_change_points)) + false_positives = len(set(detected_change_points) - set(true_change_points)) + false_negatives = len(set(true_change_points) - set(detected_change_points)) + + precision = ( + true_positives / (true_positives + false_positives) + if (true_positives + false_positives) > 0 else 0.0 + ) + recall = ( + true_positives / (true_positives + false_negatives) + if (true_positives + false_negatives) > 0 else 0.0 + ) + f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0 return { "precision": precision, "recall": recall, - "f1_score": f1_score, - "true_positive": true_positive, - "false_positive": false_positive, - "false_negative": false_negative, + "f1_score": f1, + "true_positive": true_positives, + "false_positive": false_positives, + "false_negative": false_negatives, } From d313d3eab49556fcfca7ae76fd74a6e89cc8391c Mon Sep 17 00:00:00 2001 From: Aleksandra Listkova <115529517+alistkova@users.noreply.github.com> Date: Thu, 29 May 2025 22:32:27 +0300 Subject: [PATCH 10/13] feat: rewrite rulsif algorithm implementation --- .../core/algorithms/rulsif_algorithm.py | 115 ++++++++---------- 1 file changed, 50 insertions(+), 65 deletions(-) diff --git a/pysatl_cpd/core/algorithms/rulsif_algorithm.py b/pysatl_cpd/core/algorithms/rulsif_algorithm.py index f0f04da8..1a808b2e 100644 --- a/pysatl_cpd/core/algorithms/rulsif_algorithm.py +++ b/pysatl_cpd/core/algorithms/rulsif_algorithm.py @@ -1,79 +1,64 @@ -from typing import cast +""" +Module for implementation of CPD algorithm using RULSIF-based divergence estimation. +""" + +__author__ = "Aleksandra Listkova" +__copyright__ = "Copyright (c) 2025 Aleksandra Listkova" +__license__ = "SPDX-License-Identifier: MIT" import numpy as np import numpy.typing as npt +from scipy.linalg import solve from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm class RulsifAlgorithm(DensityBasedAlgorithm): - """Relative Unconstrained Least-Squares Importance Fitting (RULSIF) - algorithm for change point detection. - - RULSIF estimates the density ratio between two distributions and uses - the importance weights for detecting changes in the data distribution. - """ - - def __init__(self, bandwidth: float, regularization_coef: float, threshold: float = 1.1): - """Initialize the RULSIF algorithm. - - Args: - bandwidth (float): bandwidth parameter for density estimation. - regularization_coef (float): regularization parameter. - threshold (float, optional): threshold for detecting change points. - Defaults to 1.1. + def __init__( + self, + alpha: float = 0.1, + bandwidth: float = 1.0, + lambda_reg: float = 0.1, + threshold: float = 1.1, + min_window_size: int = 10, + ) -> None: """ - self.bandwidth = bandwidth - self.regularization_coef = regularization_coef - self.threshold = threshold - - def _loss_function(self, density_ratio: npt.NDArray[np.float64], alpha: npt.NDArray[np.float64]) -> float: - """Loss function for RULSIF. - - Args: - density_ratio (np.ndarray): estimated density ratio. - alpha (np.ndarray): coefficients for the density ratio. - - Returns: - float: the computed loss value. + Initializes RULSIF-based change point detector. + + :param alpha: mixture coefficient (0-1) for reference/test densities + :param bandwidth: kernel bandwidth for density estimation + :param lambda_reg: L2 regularization strength + :param threshold: detection sensitivity threshold + :param min_window_size: minimum segment size requirement + :raises ValueError: if alpha is not in (0,1) """ - return np.mean((density_ratio - 1) ** 2) + self.regularization_coef * np.sum(alpha**2) + super().__init__(min_window_size, threshold, bandwidth) + if not 0 < alpha < 1: + raise ValueError("Alpha must be between 0 and 1") + self.alpha = alpha + self.lambda_reg = lambda_reg - def detect(self, window: npt.NDArray[np.float64]) -> int: - """Detect the number of change points in the given data window - using RULSIF. - - Args: - window (Iterable[float]): the data window to detect change points. - - Returns: - int: the number of detected change points. + def _compute_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: """ - window_sample = np.array(window) - weights = self._calculate_weights( - test_value=window_sample, - reference_value=window_sample, - bandwidth=self.bandwidth, - objective_function=self._loss_function, - ) - - return np.count_nonzero(weights > self.threshold) + Computes RULSIF-based change point scores for each position. - def localize(self, window: npt.NDArray[np.float64]) -> list[int]: - """Localize the change points in the given data window using RULSIF. - - Args: - window (Iterable[float]): the data window to localize change points. - - Returns: - List[int]: the indices of the detected change points. + :param window: input data window (1D array) + :return: array of divergence scores at each index """ - window_sample = np.array(window) - weights = self._calculate_weights( - test_value=window_sample, - reference_value=window_sample, - bandwidth=self.bandwidth, - objective_function=self._loss_function, - ) - - return cast(list[int], np.where(weights > self.threshold)[0].tolist()) + n_points = window.shape[0] + scores: npt.NDArray[np.float64] = np.zeros(n_points, dtype=np.float64) + for i in range(self.min_window_size, n_points - self.min_window_size): + ref = window[:i] + test = window[i:] + K_ref = self._kernel_density_estimation(ref, self.bandwidth) + K_test = self._kernel_density_estimation(test, self.bandwidth) + H = ( + (1 - self.alpha) * (K_ref @ K_ref.T) / i + + self.alpha * (K_test @ K_test.T) / (n_points - i) + + self.lambda_reg * np.eye(K_ref.shape[0], dtype=np.float64) + ) + h = K_test.mean(axis=1) + theta = solve(H, h, assume_a='pos') + density_ratio = theta @ K_test + scores[i] = np.mean((density_ratio - 1) ** 2) + return scores From 88148128c787a8e312517e9c78051c75a034c7c2 Mon Sep 17 00:00:00 2001 From: alistkova Date: Thu, 29 May 2025 22:39:02 +0300 Subject: [PATCH 11/13] fix: correct minimize call and remove unused type ignore --- pysatl_cpd/core/algorithms/kliep_algorithm.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pysatl_cpd/core/algorithms/kliep_algorithm.py b/pysatl_cpd/core/algorithms/kliep_algorithm.py index b0f8e639..dad9e21e 100644 --- a/pysatl_cpd/core/algorithms/kliep_algorithm.py +++ b/pysatl_cpd/core/algorithms/kliep_algorithm.py @@ -69,15 +69,19 @@ def _optimize_alpha( :param ref_density: KDE values for reference segment (before potential CP). :return: optimal alpha value for density ratio adjustment. """ - def loss(alpha: float) -> float: + def loss(alpha_array: npt.NDArray[np.float64]) -> float: + alpha = alpha_array[0] ratio = np.exp(np.log(test_density) - np.log(ref_density + 1e-10) - alpha) return float(-np.mean(np.log(ratio + 1e-10)) + self.regularization * alpha**2) + initial_alpha = np.array([0.0], dtype=np.float64) + bounds = [(0.0, None)] + res = minimize( loss, - x0=0.0, + x0=initial_alpha, method='L-BFGS-B', - bounds=[(0.0, None)], + bounds=bounds, options={'maxiter': self.max_iter} ) return float(res.x[0]) From 063d6798fab681cbf6984a76a5d6c512cbb1b2e8 Mon Sep 17 00:00:00 2001 From: Aleksandra Listkova <115529517+alistkova@users.noreply.github.com> Date: Thu, 29 May 2025 23:14:39 +0300 Subject: [PATCH 12/13] fix: delete unused type ignore comment --- pysatl_cpd/core/algorithms/kliep_algorithm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pysatl_cpd/core/algorithms/kliep_algorithm.py b/pysatl_cpd/core/algorithms/kliep_algorithm.py index dad9e21e..ab44cb0d 100644 --- a/pysatl_cpd/core/algorithms/kliep_algorithm.py +++ b/pysatl_cpd/core/algorithms/kliep_algorithm.py @@ -8,7 +8,7 @@ import numpy as np import numpy.typing as npt -from scipy.optimize import minimize # type: ignore +from scipy.optimize import minimize from pysatl_cpd.core.algorithms.density.abstracts.density_based_algorithm import DensityBasedAlgorithm From 46895a6f82a6339ed0f9c6d7eac5cb01d4d8e8d8 Mon Sep 17 00:00:00 2001 From: Vladimir Kutuev Date: Thu, 10 Jul 2025 19:51:10 +0300 Subject: [PATCH 13/13] fix: typing problem with scipy.optimize.minimize call --- pysatl_cpd/core/algorithms/kliep_algorithm.py | 27 +++++-------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/pysatl_cpd/core/algorithms/kliep_algorithm.py b/pysatl_cpd/core/algorithms/kliep_algorithm.py index ab44cb0d..1cae33db 100644 --- a/pysatl_cpd/core/algorithms/kliep_algorithm.py +++ b/pysatl_cpd/core/algorithms/kliep_algorithm.py @@ -57,11 +57,7 @@ def _compute_scores(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.flo scores[i] = np.mean(np.log(after_density + 1e-10)) - np.mean(np.log(before_density + 1e-10)) - alpha return scores - def _optimize_alpha( - self, - test_density: npt.NDArray[np.float64], - ref_density: npt.NDArray[np.float64] - ) -> float: + def _optimize_alpha(self, test_density: npt.NDArray[np.float64], ref_density: npt.NDArray[np.float64]) -> float: """ Optimizes alpha parameter for density ratio estimation. @@ -69,7 +65,8 @@ def _optimize_alpha( :param ref_density: KDE values for reference segment (before potential CP). :return: optimal alpha value for density ratio adjustment. """ - def loss(alpha_array: npt.NDArray[np.float64]) -> float: + + def loss(alpha_array: npt.NDArray[np.float64], /) -> float: alpha = alpha_array[0] ratio = np.exp(np.log(test_density) - np.log(ref_density + 1e-10) - alpha) return float(-np.mean(np.log(ratio + 1e-10)) + self.regularization * alpha**2) @@ -77,13 +74,7 @@ def loss(alpha_array: npt.NDArray[np.float64]) -> float: initial_alpha = np.array([0.0], dtype=np.float64) bounds = [(0.0, None)] - res = minimize( - loss, - x0=initial_alpha, - method='L-BFGS-B', - bounds=bounds, - options={'maxiter': self.max_iter} - ) + res = minimize(loss, x0=initial_alpha, method="L-BFGS-B", bounds=bounds, options={"maxiter": self.max_iter}) return float(res.x[0]) def _build_common_grid(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]: @@ -94,17 +85,11 @@ def _build_common_grid(self, window: npt.NDArray[np.float64]) -> npt.NDArray[np. :return: grid spanning data range with bandwidth-adjusted margins. """ return np.linspace( - np.min(window) - 3 * self.bandwidth, - np.max(window) + 3 * self.bandwidth, - 1000, - dtype=np.float64 + np.min(window) - 3 * self.bandwidth, np.max(window) + 3 * self.bandwidth, 1000, dtype=np.float64 ) def _kde_on_grid( - self, - observation: npt.NDArray[np.float64], - bandwidth: float, - grid: npt.NDArray[np.float64] + self, observation: npt.NDArray[np.float64], bandwidth: float, grid: npt.NDArray[np.float64] ) -> npt.NDArray[np.float64]: """ Computes kernel density estimate on specified grid.