diff --git a/bayes_opt/acquisition.py b/bayes_opt/acquisition.py index 4fa7eaa1b..a1c242b69 100644 --- a/bayes_opt/acquisition.py +++ b/bayes_opt/acquisition.py @@ -23,8 +23,7 @@ import abc import warnings from copy import deepcopy -from numbers import Number -from typing import TYPE_CHECKING, Callable +from typing import TYPE_CHECKING, Any, Literal, NoReturn import numpy as np from numpy.random import RandomState @@ -41,8 +40,15 @@ from bayes_opt.target_space import TargetSpace if TYPE_CHECKING: + from collections.abc import Callable + + from numpy.typing import NDArray + from scipy.optimize import OptimizeResult + from bayes_opt.constraint import ConstraintModel + Float = np.floating[Any] + class AcquisitionFunction(abc.ABC): """Base class for acquisition functions. @@ -53,7 +59,7 @@ class AcquisitionFunction(abc.ABC): Set the random state for reproducibility. """ - def __init__(self, random_state=None): + def __init__(self, random_state: int | RandomState | None = None) -> None: if random_state is not None: if isinstance(random_state, RandomState): self.random_state = random_state @@ -64,7 +70,7 @@ def __init__(self, random_state=None): self.i = 0 @abc.abstractmethod - def base_acq(self, *args, **kwargs): + def base_acq(self, *args: Any, **kwargs: Any) -> NDArray[Float]: """Provide access to the base acquisition function.""" def _fit_gp(self, gp: GaussianProcessRegressor, target_space: TargetSpace) -> None: @@ -80,10 +86,10 @@ def suggest( self, gp: GaussianProcessRegressor, target_space: TargetSpace, - n_random=10_000, - n_l_bfgs_b=10, + n_random: int = 10_000, + n_l_bfgs_b: int = 10, fit_gp: bool = True, - ): + ) -> NDArray[Float]: """Suggest a promising point to probe next. Parameters @@ -123,7 +129,9 @@ def suggest( acq = self._get_acq(gp=gp, constraint=target_space.constraint) return self._acq_min(acq, target_space.bounds, n_random=n_random, n_l_bfgs_b=n_l_bfgs_b) - def _get_acq(self, gp: GaussianProcessRegressor, constraint: ConstraintModel | None = None) -> Callable: + def _get_acq( + self, gp: GaussianProcessRegressor, constraint: ConstraintModel | None = None + ) -> Callable[[NDArray[Float]], NDArray[Float]]: """Prepare the acquisition function for minimization. Transforms a base_acq Callable, which takes `mean` and `std` as @@ -148,25 +156,36 @@ def _get_acq(self, gp: GaussianProcessRegressor, constraint: ConstraintModel | N dim = gp.X_train_.shape[1] if constraint is not None: - def acq(x): + def acq(x: NDArray[Float]) -> NDArray[Float]: x = x.reshape(-1, dim) with warnings.catch_warnings(): warnings.simplefilter("ignore") + mean: NDArray[Float] + std: NDArray[Float] + p_constraints: NDArray[Float] mean, std = gp.predict(x, return_std=True) p_constraints = constraint.predict(x) return -1 * self.base_acq(mean, std) * p_constraints else: - def acq(x): + def acq(x: NDArray[Float]) -> NDArray[Float]: x = x.reshape(-1, dim) with warnings.catch_warnings(): warnings.simplefilter("ignore") + mean: NDArray[Float] + std: NDArray[Float] mean, std = gp.predict(x, return_std=True) return -1 * self.base_acq(mean, std) return acq - def _acq_min(self, acq: Callable, bounds: np.ndarray, n_random=10_000, n_l_bfgs_b=10) -> np.ndarray: + def _acq_min( + self, + acq: Callable[[NDArray[Float]], NDArray[Float]], + bounds: NDArray[Float], + n_random: int = 10_000, + n_l_bfgs_b: int = 10, + ) -> NDArray[Float]: """Find the maximum of the acquisition function. Uses a combination of random sampling (cheap) and the 'L-BFGS-B' @@ -200,13 +219,14 @@ def _acq_min(self, acq: Callable, bounds: np.ndarray, n_random=10_000, n_l_bfgs_ raise ValueError(error_msg) x_min_r, min_acq_r = self._random_sample_minimize(acq, bounds, n_random=n_random) x_min_l, min_acq_l = self._l_bfgs_b_minimize(acq, bounds, n_x_seeds=n_l_bfgs_b) + # Either n_random or n_l_bfgs_b is not 0 => at least one of x_min_r and x_min_l is not None if min_acq_r < min_acq_l: return x_min_r return x_min_l def _random_sample_minimize( - self, acq: Callable, bounds: np.ndarray, n_random: int - ) -> tuple[np.ndarray, float]: + self, acq: Callable[[NDArray[Float]], NDArray[Float]], bounds: NDArray[Float], n_random: int + ) -> tuple[NDArray[Float] | None, float]: """Random search to find the minimum of `acq` function. Parameters @@ -239,8 +259,8 @@ def _random_sample_minimize( return x_min, min_acq def _l_bfgs_b_minimize( - self, acq: Callable, bounds: np.ndarray, n_x_seeds: int = 10 - ) -> tuple[np.ndarray, float]: + self, acq: Callable[[NDArray[Float]], NDArray[Float]], bounds: NDArray[Float], n_x_seeds: int = 10 + ) -> tuple[NDArray[Float] | None, float]: """Random search to find the minimum of `acq` function. Parameters @@ -268,10 +288,12 @@ def _l_bfgs_b_minimize( return None, np.inf x_seeds = self.random_state.uniform(bounds[:, 0], bounds[:, 1], size=(n_x_seeds, bounds.shape[0])) - min_acq = None + min_acq: float | None = None + x_try: NDArray[Float] + x_min: NDArray[Float] for x_try in x_seeds: # Find the minimum of minus the acquisition function - res = minimize(acq, x_try, bounds=bounds, method="L-BFGS-B") + res: OptimizeResult = minimize(acq, x_try, bounds=bounds, method="L-BFGS-B") # See if success if not res.success: @@ -317,7 +339,11 @@ class UpperConfidenceBound(AcquisitionFunction): """ def __init__( - self, kappa=2.576, exploration_decay=None, exploration_decay_delay=None, random_state=None + self, + kappa: float = 2.576, + exploration_decay: float | None = None, + exploration_decay_delay: int | None = None, + random_state: int | RandomState | None = None, ) -> None: if kappa < 0: error_msg = "kappa must be greater than or equal to 0." @@ -328,7 +354,7 @@ def __init__( self.exploration_decay = exploration_decay self.exploration_decay_delay = exploration_decay_delay - def base_acq(self, mean, std): + def base_acq(self, mean: NDArray[Float], std: NDArray[Float]) -> NDArray[Float]: """Calculate the upper confidence bound. Parameters @@ -350,10 +376,10 @@ def suggest( self, gp: GaussianProcessRegressor, target_space: TargetSpace, - n_random=10_000, - n_l_bfgs_b=10, + n_random: int = 10_000, + n_l_bfgs_b: int = 10, fit_gp: bool = True, - ) -> np.ndarray: + ) -> NDArray[Float]: """Suggest a promising point to probe next. Parameters @@ -432,14 +458,20 @@ class ProbabilityOfImprovement(AcquisitionFunction): Set the random state for reproducibility. """ - def __init__(self, xi, exploration_decay=None, exploration_decay_delay=None, random_state=None) -> None: + def __init__( + self, + xi: float, + exploration_decay: float | None = None, + exploration_decay_delay: int | None = None, + random_state: int | RandomState | None = None, + ) -> None: super().__init__(random_state=random_state) self.xi = xi self.exploration_decay = exploration_decay self.exploration_decay_delay = exploration_decay_delay self.y_max = None - def base_acq(self, mean, std): + def base_acq(self, mean: NDArray[Float], std: NDArray[Float]) -> NDArray[Float]: """Calculate the probability of improvement. Parameters @@ -473,10 +505,10 @@ def suggest( self, gp: GaussianProcessRegressor, target_space: TargetSpace, - n_random=10_000, - n_l_bfgs_b=10, + n_random: int = 10_000, + n_l_bfgs_b: int = 10, fit_gp: bool = True, - ) -> np.ndarray: + ) -> NDArray[Float]: """Suggest a promising point to probe next. Parameters @@ -565,14 +597,20 @@ class ExpectedImprovement(AcquisitionFunction): Set the random state for reproducibility. """ - def __init__(self, xi, exploration_decay=None, exploration_decay_delay=None, random_state=None) -> None: + def __init__( + self, + xi: float, + exploration_decay: float | None = None, + exploration_decay_delay: int | None = None, + random_state: int | RandomState | None = None, + ) -> None: super().__init__(random_state=random_state) self.xi = xi self.exploration_decay = exploration_decay self.exploration_decay_delay = exploration_decay_delay self.y_max = None - def base_acq(self, mean, std): + def base_acq(self, mean: NDArray[Float], std: NDArray[Float]) -> NDArray[Float]: """Calculate the expected improvement. Parameters @@ -607,10 +645,10 @@ def suggest( self, gp: GaussianProcessRegressor, target_space: TargetSpace, - n_random=10_000, - n_l_bfgs_b=10, + n_random: int = 10_000, + n_l_bfgs_b: int = 10, fit_gp: bool = True, - ) -> np.ndarray: + ) -> NDArray[Float]: """Suggest a promising point to probe next. Parameters @@ -701,19 +739,24 @@ class ConstantLiar(AcquisitionFunction): """ def __init__( - self, base_acquisition: AcquisitionFunction, strategy="max", random_state=None, atol=1e-5, rtol=1e-8 + self, + base_acquisition: AcquisitionFunction, + strategy: Literal["min", "mean", "max"] | float = "max", + random_state: int | RandomState | None = None, + atol: float = 1e-5, + rtol: float = 1e-8, ) -> None: super().__init__(random_state) self.base_acquisition = base_acquisition self.dummies = [] - if not isinstance(strategy, Number) and strategy not in ["min", "mean", "max"]: + if not isinstance(strategy, float) and strategy not in ["min", "mean", "max"]: error_msg = f"Received invalid argument {strategy} for strategy." raise ValueError(error_msg) - self.strategy = strategy + self.strategy: Literal["min", "mean", "max"] | float = strategy self.atol = atol self.rtol = rtol - def base_acq(self, *args, **kwargs): + def base_acq(self, *args: Any, **kwargs: Any) -> NDArray[Float]: """Calculate the acquisition function. Calls the base acquisition function's `base_acq` method. @@ -774,10 +817,10 @@ def suggest( self, gp: GaussianProcessRegressor, target_space: TargetSpace, - n_random=10_000, - n_l_bfgs_b=10, + n_random: int = 10_000, + n_l_bfgs_b: int = 10, fit_gp: bool = True, - ) -> np.ndarray: + ) -> NDArray[Float]: """Suggest a promising point to probe next. Parameters @@ -824,8 +867,9 @@ def suggest( # Create a copy of the target space dummy_target_space = self._copy_target_space(target_space) + dummy_target: float # Choose the dummy target value - if isinstance(self.strategy, Number): + if isinstance(self.strategy, float): dummy_target = self.strategy elif self.strategy == "min": dummy_target = target_space.target.min() @@ -875,14 +919,16 @@ class GPHedge(AcquisitionFunction): Set the random state for reproducibility. """ - def __init__(self, base_acquisitions: list[AcquisitionFunction], random_state=None) -> None: + def __init__( + self, base_acquisitions: list[AcquisitionFunction], random_state: int | RandomState | None = None + ) -> None: super().__init__(random_state) self.base_acquisitions = base_acquisitions self.n_acq = len(self.base_acquisitions) self.gains = np.zeros(self.n_acq) self.previous_candidates = None - def base_acq(self, *args, **kwargs): + def base_acq(self, *args: Any, **kwargs: Any) -> NoReturn: """Raise an error, since the base acquisition function is ambiguous.""" msg = ( "GPHedge base acquisition function is ambiguous." @@ -909,10 +955,10 @@ def suggest( self, gp: GaussianProcessRegressor, target_space: TargetSpace, - n_random=10_000, - n_l_bfgs_b=10, + n_random: int = 10_000, + n_l_bfgs_b: int = 10, fit_gp: bool = True, - ) -> np.ndarray: + ) -> NDArray[Float]: """Suggest a promising point to probe next. Parameters diff --git a/bayes_opt/bayesian_optimization.py b/bayes_opt/bayesian_optimization.py index b9152dffb..47a53f7c5 100644 --- a/bayes_opt/bayesian_optimization.py +++ b/bayes_opt/bayesian_optimization.py @@ -7,6 +7,7 @@ from __future__ import annotations from collections import deque +from typing import TYPE_CHECKING, Any from sklearn.gaussian_process import GaussianProcessRegressor from sklearn.gaussian_process.kernels import Matern @@ -18,30 +19,44 @@ from bayes_opt.target_space import TargetSpace from bayes_opt.util import ensure_rng +if TYPE_CHECKING: + from collections.abc import Callable, Iterable, Mapping, Sequence + + import numpy as np + from numpy.random import RandomState + from numpy.typing import NDArray + from scipy.optimize import NonlinearConstraint + + from bayes_opt.acquisition import AcquisitionFunction + from bayes_opt.constraint import ConstraintModel + from bayes_opt.domain_reduction import DomainTransformer + + Float = np.floating[Any] + class Observable: """Inspired by https://www.protechtraining.com/blog/post/879#simple-observer.""" - def __init__(self, events): + def __init__(self, events: Iterable[Any]) -> None: # maps event names to subscribers # str -> dict self._events = {event: dict() for event in events} - def get_subscribers(self, event): + def get_subscribers(self, event: Any) -> Any: """Return the subscribers of an event.""" return self._events[event] - def subscribe(self, event, subscriber, callback=None): + def subscribe(self, event: Any, subscriber: Any, callback: Callable[..., Any] | None = None) -> None: """Add subscriber to an event.""" if callback is None: callback = subscriber.update self.get_subscribers(event)[subscriber] = callback - def unsubscribe(self, event, subscriber): + def unsubscribe(self, event: Any, subscriber: Any) -> None: """Remove a subscriber for a particular event.""" del self.get_subscribers(event)[subscriber] - def dispatch(self, event): + def dispatch(self, event: Any) -> None: """Trigger callbacks for subscribers of an event.""" for callback in self.get_subscribers(event).values(): callback(event, self) @@ -56,14 +71,14 @@ class BayesianOptimization(Observable): Parameters ---------- - f: function + f: function or None. Function to be maximized. pbounds: dict Dictionary with parameters names as keys and a tuple with minimum and maximum values. - constraint: ConstraintModel. + constraint: NonlinearConstraint. Note that the names of arguments of the constraint function and of f need to be the same. @@ -87,18 +102,18 @@ class BayesianOptimization(Observable): def __init__( self, - f, - pbounds, - acquisition_function=None, - constraint=None, - random_state=None, - verbose=2, - bounds_transformer=None, - allow_duplicate_points=False, + f: Callable[..., float] | None, + pbounds: Mapping[str, tuple[float, float]], + acquisition_function: AcquisitionFunction | None = None, + constraint: NonlinearConstraint | None = None, + random_state: int | RandomState | None = None, + verbose: int = 2, + bounds_transformer: DomainTransformer | None = None, + allow_duplicate_points: bool = False, ): self._random_state = ensure_rng(random_state) self._allow_duplicate_points = allow_duplicate_points - self._queue = deque() + self._queue: deque[Mapping[str, float] | Sequence[float] | NDArray[Float]] = deque() if acquisition_function is None: if constraint is None: @@ -154,24 +169,24 @@ def __init__( super().__init__(events=DEFAULT_EVENTS) @property - def space(self): + def space(self) -> TargetSpace: """Return the target space associated with the optimizer.""" return self._space @property - def acquisition_function(self): + def acquisition_function(self) -> AcquisitionFunction: """Return the acquisition function associated with the optimizer.""" return self._acquisition_function @property - def constraint(self): + def constraint(self) -> ConstraintModel | None: """Return the constraint associated with the optimizer, if any.""" if self.is_constrained: return self._space.constraint return None @property - def max(self): + def max(self) -> dict[str, Any] | None: """Get maximum target value found and corresponding parameters. See `TargetSpace.max` for more information. @@ -179,14 +194,19 @@ def max(self): return self._space.max() @property - def res(self): + def res(self) -> list[dict[str, Any]]: """Get all target values and constraint fulfillment for all parameters. See `TargetSpace.res` for more information. """ return self._space.res() - def register(self, params, target, constraint_value=None): + def register( + self, + params: Mapping[str, float] | Sequence[float] | NDArray[Float], + target: float, + constraint_value: float | NDArray[Float] | None = None, + ) -> None: """Register an observation with known target. Parameters @@ -203,7 +223,9 @@ def register(self, params, target, constraint_value=None): self._space.register(params, target, constraint_value) self.dispatch(Events.OPTIMIZATION_STEP) - def probe(self, params, lazy=True): + def probe( + self, params: Mapping[str, float] | Sequence[float] | NDArray[Float], lazy: bool = True + ) -> None: """Evaluate the function at the given points. Useful to guide the optimizer. @@ -223,7 +245,7 @@ def probe(self, params, lazy=True): self._space.probe(params) self.dispatch(Events.OPTIMIZATION_STEP) - def suggest(self): + def suggest(self) -> dict[str, float]: """Suggest a promising point to probe next.""" if len(self._space) == 0: return self._space.array_to_params(self._space.random_sample()) @@ -233,7 +255,7 @@ def suggest(self): return self._space.array_to_params(suggestion) - def _prime_queue(self, init_points): + def _prime_queue(self, init_points: int) -> None: """Ensure the queue is not empty. Parameters @@ -247,14 +269,14 @@ def _prime_queue(self, init_points): for _ in range(init_points): self._queue.append(self._space.random_sample()) - def _prime_subscriptions(self): + def _prime_subscriptions(self) -> None: if not any([len(subs) for subs in self._events.values()]): _logger = _get_default_logger(self._verbose, self.is_constrained) self.subscribe(Events.OPTIMIZATION_START, _logger) self.subscribe(Events.OPTIMIZATION_STEP, _logger) self.subscribe(Events.OPTIMIZATION_END, _logger) - def maximize(self, init_points=5, n_iter=25): + def maximize(self, init_points: int = 5, n_iter: int = 25) -> None: r""" Maximize the given function over the target space. @@ -296,7 +318,7 @@ def maximize(self, init_points=5, n_iter=25): self.dispatch(Events.OPTIMIZATION_END) - def set_bounds(self, new_bounds): + def set_bounds(self, new_bounds: Mapping[str, NDArray[Float] | Sequence[float]]) -> None: """Modify the bounds of the search space. Parameters @@ -306,6 +328,6 @@ def set_bounds(self, new_bounds): """ self._space.set_bounds(new_bounds) - def set_gp_params(self, **params): + def set_gp_params(self, **params: Any) -> None: """Set parameters of the internal Gaussian Process Regressor.""" self._gp.set_params(**params) diff --git a/bayes_opt/constraint.py b/bayes_opt/constraint.py index 46cf3f793..a8243a167 100644 --- a/bayes_opt/constraint.py +++ b/bayes_opt/constraint.py @@ -2,11 +2,21 @@ from __future__ import annotations +from typing import TYPE_CHECKING, Any + import numpy as np from scipy.stats import norm from sklearn.gaussian_process import GaussianProcessRegressor from sklearn.gaussian_process.kernels import Matern +if TYPE_CHECKING: + from collections.abc import Callable + + from numpy.random import RandomState + from numpy.typing import NDArray + + Float = np.floating[Any] + class ConstraintModel: """Model constraints using GP regressors. @@ -40,7 +50,13 @@ class ConstraintModel: simply the product of the individual probabilities. """ - def __init__(self, fun, lb, ub, random_state=None): + def __init__( + self, + fun: Callable[..., float] | Callable[..., NDArray[Float]] | None, + lb: float | NDArray[Float], + ub: float | NDArray[Float], + random_state: int | RandomState | None = None, + ) -> None: self.fun = fun self._lb = np.atleast_1d(lb) @@ -62,21 +78,21 @@ def __init__(self, fun, lb, ub, random_state=None): ] @property - def lb(self): + def lb(self) -> NDArray[Float]: """Return lower bounds.""" return self._lb @property - def ub(self): + def ub(self) -> NDArray[Float]: """Return upper bounds.""" return self._ub @property - def model(self): + def model(self) -> list[GaussianProcessRegressor]: """Return GP regressors of the constraint function.""" return self._model - def eval(self, **kwargs: dict): # noqa: D417 + def eval(self, **kwargs: Any) -> float | NDArray[Float]: # noqa: D417 r"""Evaluate the constraint function. Parameters @@ -94,6 +110,10 @@ def eval(self, **kwargs: dict): # noqa: D417 TypeError If the kwargs' keys don't match the function argument names. """ + if self.fun is None: + error_msg = "No constraint function was provided." + raise ValueError(error_msg) + try: return self.fun(**kwargs) except TypeError as e: @@ -106,7 +126,7 @@ def eval(self, **kwargs: dict): # noqa: D417 e.args = (msg,) raise - def fit(self, X, Y): + def fit(self, X: NDArray[Float], Y: NDArray[Float]) -> None: """Fit internal GPRs to the data. Parameters @@ -127,7 +147,7 @@ def fit(self, X, Y): for i, gp in enumerate(self._model): gp.fit(X, Y[:, i]) - def predict(self, X): + def predict(self, X: NDArray[Float]) -> NDArray[Float]: r"""Calculate the probability that the constraint is fulfilled at `X`. Note that this does not try to approximate the values of the @@ -167,6 +187,12 @@ def predict(self, X): """ X_shape = X.shape X = X.reshape((-1, self._model[0].n_features_in_)) + + result: NDArray[Float] + y_mean: NDArray[Float] + y_std: NDArray[Float] + p_lower: NDArray[Float] + p_upper: NDArray[Float] if len(self._model) == 1: y_mean, y_std = self._model[0].predict(X, return_std=True) @@ -191,7 +217,7 @@ def predict(self, X): result = result * (p_upper - p_lower) return result.reshape(X_shape[:-1]) - def approx(self, X): + def approx(self, X: NDArray[Float]) -> NDArray[Float]: """ Approximate the constraint function using the internal GPR model. @@ -213,7 +239,7 @@ def approx(self, X): result = np.column_stack([gp.predict(X) for gp in self._model]) return result.reshape(X_shape[:-1] + (len(self._lb),)) - def allowed(self, constraint_values): + def allowed(self, constraint_values: NDArray[Float]) -> NDArray[np.bool_]: """Check whether `constraint_values` fulfills the specified limits. Parameters diff --git a/bayes_opt/domain_reduction.py b/bayes_opt/domain_reduction.py index 80a2253a1..39657f283 100644 --- a/bayes_opt/domain_reduction.py +++ b/bayes_opt/domain_reduction.py @@ -8,18 +8,26 @@ from __future__ import annotations from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any from warnings import warn import numpy as np from bayes_opt.target_space import TargetSpace +if TYPE_CHECKING: + from collections.abc import Iterable, Mapping, Sequence + + from numpy.typing import NDArray + + Float = np.floating[Any] + class DomainTransformer(ABC): """Base class.""" @abstractmethod - def __init__(self, **kwargs) -> None: + def __init__(self, **kwargs: Any) -> None: """To override with specific implementation.""" @abstractmethod @@ -27,7 +35,7 @@ def initialize(self, target_space: TargetSpace) -> None: """To override with specific implementation.""" @abstractmethod - def transform(self, target_space: TargetSpace) -> dict: + def transform(self, target_space: TargetSpace) -> dict[str, NDArray[Float]]: """To override with specific implementation.""" @@ -58,7 +66,7 @@ def __init__( gamma_osc: float = 0.7, gamma_pan: float = 1.0, eta: float = 0.9, - minimum_window: np.ndarray | list[float] | float | dict[str, float] | None = 0.0, + minimum_window: NDArray[Float] | Sequence[float] | float | Mapping[str, float] | None = 0.0, ) -> None: self.gamma_osc = gamma_osc self.gamma_pan = gamma_pan @@ -138,7 +146,7 @@ def _update(self, target_space: TargetSpace) -> None: self.r = self.contraction_rate * self.r - def _trim(self, new_bounds: np.ndarray, global_bounds: np.ndarray) -> np.ndarray: + def _trim(self, new_bounds: NDArray[Float], global_bounds: NDArray[Float]) -> NDArray[Float]: """ Adjust the new_bounds and verify that they adhere to global_bounds and minimum_window. @@ -158,6 +166,7 @@ def _trim(self, new_bounds: np.ndarray, global_bounds: np.ndarray) -> np.ndarray # sort bounds new_bounds = np.sort(new_bounds) + pbounds: NDArray[Float] # Validate each parameter's bounds against the global_bounds for i, pbounds in enumerate(new_bounds): # If the one of the bounds is outside the global bounds, reset the bound to the global bound @@ -222,7 +231,7 @@ def _trim(self, new_bounds: np.ndarray, global_bounds: np.ndarray) -> np.ndarray return new_bounds - def _window_bounds_compatibility(self, global_bounds: np.ndarray): + def _window_bounds_compatibility(self, global_bounds: NDArray[Float]) -> None: """Check if global bounds are compatible with the minimum window sizes. Parameters @@ -235,18 +244,19 @@ def _window_bounds_compatibility(self, global_bounds: np.ndarray): ValueError If global bounds are not compatible with the minimum window size. """ + entry: NDArray[Float] for i, entry in enumerate(global_bounds): global_window_width = abs(entry[1] - entry[0]) if global_window_width < self.minimum_window[i]: error_msg = "Global bounds are not compatible with the minimum window size." raise ValueError(error_msg) - def _create_bounds(self, parameters: dict, bounds: np.ndarray) -> dict: + def _create_bounds(self, parameters: Iterable[str], bounds: NDArray[Float]) -> dict[str, NDArray[Float]]: """Create a dictionary of bounds for each parameter. Parameters ---------- - parameters : dict + parameters : Iterable[str] The parameters for which to create the bounds. bounds : np.ndarray @@ -254,7 +264,7 @@ def _create_bounds(self, parameters: dict, bounds: np.ndarray) -> dict: """ return {param: bounds[i, :] for i, param in enumerate(parameters)} - def transform(self, target_space: TargetSpace) -> dict: + def transform(self, target_space: TargetSpace) -> dict[str, NDArray[Float]]: """Transform the bounds of the target space. Parameters diff --git a/bayes_opt/logger.py b/bayes_opt/logger.py index 2b9ae4722..a9756ca41 100644 --- a/bayes_opt/logger.py +++ b/bayes_opt/logger.py @@ -5,7 +5,7 @@ import json from contextlib import suppress from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any import numpy as np from colorama import Fore, just_fix_windows_console @@ -13,10 +13,15 @@ from bayes_opt.event import Events from bayes_opt.observer import _Tracker +if TYPE_CHECKING: + from os import PathLike + + from bayes_opt.bayesian_optimization import BayesianOptimization + just_fix_windows_console() -def _get_default_logger(verbose, is_constrained): +def _get_default_logger(verbose: int, is_constrained: bool) -> ScreenLogger: """ Return the default logger. @@ -57,19 +62,19 @@ class ScreenLogger(_Tracker): _colour_regular_message = Fore.RESET _colour_reset = Fore.RESET - def __init__(self, verbose=2, is_constrained=False): + def __init__(self, verbose: int = 2, is_constrained: bool = False) -> None: self._verbose = verbose self._is_constrained = is_constrained self._header_length = None super().__init__() @property - def verbose(self): + def verbose(self) -> int: """Return the verbosity level.""" return self._verbose @verbose.setter - def verbose(self, v): + def verbose(self, v: int) -> None: """Set the verbosity level. Parameters @@ -80,11 +85,11 @@ def verbose(self, v): self._verbose = v @property - def is_constrained(self): + def is_constrained(self) -> bool: """Return whether the logger is constrained.""" return self._is_constrained - def _format_number(self, x): + def _format_number(self, x: float) -> str: """Format a number. Parameters @@ -107,7 +112,7 @@ def _format_number(self, x): return s[: self._default_cell_size - 3] + "..." return s - def _format_bool(self, x): + def _format_bool(self, x: bool) -> str: """Format a boolean. Parameters @@ -122,7 +127,7 @@ def _format_bool(self, x): x_ = ("T" if x else "F") if self._default_cell_size < 5 else str(x) return f"{x_:<{self._default_cell_size}}" - def _format_key(self, key): + def _format_key(self, key: str) -> str: """Format a key. Parameters @@ -139,7 +144,7 @@ def _format_key(self, key): return s[: self._default_cell_size - 3] + "..." return s - def _step(self, instance, colour=_colour_regular_message): + def _step(self, instance: BayesianOptimization, colour: str = _colour_regular_message) -> str: """Log a step. Parameters @@ -167,7 +172,7 @@ def _step(self, instance, colour=_colour_regular_message): return "| " + " | ".join(colour + x + self._colour_reset for x in cells if x is not None) + " |" - def _header(self, instance): + def _header(self, instance: BayesianOptimization) -> str: """Print the header of the log. Parameters @@ -192,7 +197,7 @@ def _header(self, instance): self._header_length = len(line) return line + "\n" + ("-" * self._header_length) - def _is_new_max(self, instance): + def _is_new_max(self, instance: BayesianOptimization) -> bool: """Check if the step to log produced a new maximum. Parameters @@ -213,7 +218,7 @@ def _is_new_max(self, instance): self._previous_max = instance.max["target"] return instance.max["target"] > self._previous_max - def update(self, event, instance): + def update(self, event: str, instance: BayesianOptimization) -> None: """Handle incoming events. Parameters @@ -225,13 +230,12 @@ def update(self, event, instance): instance : bayesian_optimization.BayesianOptimization The instance associated with the step. """ + line = "" if event == Events.OPTIMIZATION_START: line = self._header(instance) + "\n" elif event == Events.OPTIMIZATION_STEP: is_new_max = self._is_new_max(instance) - if self._verbose == 1 and not is_new_max: - line = "" - else: + if self._verbose != 1 or is_new_max: colour = self._colour_new_max if is_new_max else self._colour_regular_message line = self._step(instance, colour=colour) + "\n" elif event == Events.OPTIMIZATION_END: @@ -250,7 +254,7 @@ class JSONLogger(_Tracker): Parameters ---------- - path : str or bytes or os.PathLike + path : str or os.PathLike Path to the file to write to. reset : bool @@ -258,14 +262,14 @@ class JSONLogger(_Tracker): """ - def __init__(self, path, reset=True): + def __init__(self, path: str | PathLike[str], reset: bool = True): self._path = Path(path) if reset: with suppress(OSError): self._path.unlink(missing_ok=True) super().__init__() - def update(self, event, instance): + def update(self, event: str, instance: BayesianOptimization) -> None: """ Handle incoming events. diff --git a/bayes_opt/observer.py b/bayes_opt/observer.py index 3571b660b..f40687e67 100644 --- a/bayes_opt/observer.py +++ b/bayes_opt/observer.py @@ -3,14 +3,18 @@ from __future__ import annotations from datetime import datetime +from typing import TYPE_CHECKING from bayes_opt.event import Events +if TYPE_CHECKING: + from bayes_opt.bayesian_optimization import BayesianOptimization + class _Tracker: """Parent class for ScreenLogger and JSONLogger.""" - def __init__(self): + def __init__(self) -> None: self._iterations = 0 self._previous_max = None @@ -19,7 +23,7 @@ def __init__(self): self._start_time = None self._previous_time = None - def _update_tracker(self, event, instance): + def _update_tracker(self, event: str, instance: BayesianOptimization) -> None: """Update the tracker. Parameters @@ -43,7 +47,7 @@ def _update_tracker(self, event, instance): self._previous_max = current_max["target"] self._previous_max_params = current_max["params"] - def _time_metrics(self): + def _time_metrics(self) -> tuple[str, float, float]: """Return time passed since last call.""" now = datetime.now() # noqa: DTZ005 if self._start_time is None: diff --git a/bayes_opt/target_space.py b/bayes_opt/target_space.py index 04bc3f13b..7dddbd515 100644 --- a/bayes_opt/target_space.py +++ b/bayes_opt/target_space.py @@ -2,6 +2,7 @@ from __future__ import annotations +from typing import TYPE_CHECKING, Any from warnings import warn import numpy as np @@ -10,8 +11,18 @@ from bayes_opt.exception import NotUniqueError from bayes_opt.util import ensure_rng +if TYPE_CHECKING: + from collections.abc import Callable, Mapping, Sequence -def _hashable(x): + from numpy.random import RandomState + from numpy.typing import NDArray + + from bayes_opt.constraint import ConstraintModel + + Float = np.floating[Any] + + +def _hashable(x: NDArray[Float]) -> tuple[float, ...]: """Ensure that a point is hashable by a python dict.""" return tuple(map(float, x)) @@ -23,7 +34,7 @@ class TargetSpace: Parameters ---------- - target_func : function + target_func : function or None. Function to be maximized. pbounds : dict @@ -53,39 +64,45 @@ class TargetSpace: """ def __init__( - self, target_func, pbounds, constraint=None, random_state=None, allow_duplicate_points=False - ): + self, + target_func: Callable[..., float] | None, + pbounds: Mapping[str, tuple[float, float]], + constraint: ConstraintModel | None = None, + random_state: int | RandomState | None = None, + allow_duplicate_points: bool | None = False, + ) -> None: self.random_state = ensure_rng(random_state) - self._allow_duplicate_points = allow_duplicate_points + self._allow_duplicate_points = allow_duplicate_points or False self.n_duplicate_points = 0 # The function to be optimized self.target_func = target_func # Get the name of the parameters - self._keys = sorted(pbounds) + self._keys: list[str] = sorted(pbounds) # Create an array with parameters bounds - self._bounds = np.array( + self._bounds: NDArray[Float] = np.array( [item[1] for item in sorted(pbounds.items(), key=lambda x: x[0])], dtype=float ) # preallocated memory for X and Y points - self._params = np.empty(shape=(0, self.dim)) - self._target = np.empty(shape=(0,)) + self._params: NDArray[Float] = np.empty(shape=(0, self.dim)) + self._target: NDArray[Float] = np.empty(shape=(0,)) # keep track of unique points we have seen so far - self._cache = {} + self._cache: dict[tuple[float, ...], float | tuple[float, float | NDArray[Float]]] = {} - self._constraint = constraint + self._constraint: ConstraintModel | None = constraint if constraint is not None: # preallocated memory for constraint fulfillment + self._constraint_values: NDArray[Float] if constraint.lb.size == 1: self._constraint_values = np.empty(shape=(0), dtype=float) else: self._constraint_values = np.empty(shape=(0, constraint.lb.size), dtype=float) - def __contains__(self, x): + def __contains__(self, x: NDArray[Float]) -> bool: """Check if this parameter has already been registered. Returns @@ -94,7 +111,7 @@ def __contains__(self, x): """ return _hashable(x) in self._cache - def __len__(self): + def __len__(self) -> int: """Return number of observations registered. Returns @@ -107,7 +124,7 @@ def __len__(self): return len(self._target) @property - def empty(self): + def empty(self) -> bool: """Check if anything has been registered. Returns @@ -117,7 +134,7 @@ def empty(self): return len(self) == 0 @property - def params(self): + def params(self) -> NDArray[Float]: """Get the parameter values registered to this TargetSpace. Returns @@ -127,7 +144,7 @@ def params(self): return self._params @property - def target(self): + def target(self) -> NDArray[Float]: """Get the target function values registered to this TargetSpace. Returns @@ -137,7 +154,7 @@ def target(self): return self._target @property - def dim(self): + def dim(self) -> int: """Get the number of parameter names. Returns @@ -147,7 +164,7 @@ def dim(self): return len(self._keys) @property - def keys(self): + def keys(self) -> list[str]: """Get the keys (or parameter names). Returns @@ -157,7 +174,7 @@ def keys(self): return self._keys @property - def bounds(self): + def bounds(self) -> NDArray[Float]: """Get the bounds of this TargetSpace. Returns @@ -167,7 +184,7 @@ def bounds(self): return self._bounds @property - def constraint(self): + def constraint(self) -> ConstraintModel | None: """Get the constraint model. Returns @@ -177,7 +194,7 @@ def constraint(self): return self._constraint @property - def constraint_values(self): + def constraint_values(self) -> NDArray[Float]: """Get the constraint values registered to this TargetSpace. Returns @@ -191,7 +208,7 @@ def constraint_values(self): return self._constraint_values @property - def mask(self): + def mask(self) -> NDArray[np.bool_]: """Return a boolean array of valid points. Points are valid if they satisfy both the constraint and boundary conditions. @@ -215,7 +232,7 @@ def mask(self): return mask - def params_to_array(self, params): + def params_to_array(self, params: Mapping[str, float]) -> NDArray[Float]: """Convert a dict representation of parameters into an array version. Parameters @@ -236,7 +253,7 @@ def params_to_array(self, params): raise ValueError(error_msg) return np.asarray([params[key] for key in self.keys]) - def array_to_params(self, x): + def array_to_params(self, x: NDArray[Float]) -> dict[str, float]: """Convert an array representation of parameters into a dict version. Parameters @@ -257,7 +274,7 @@ def array_to_params(self, x): raise ValueError(error_msg) return dict(zip(self.keys, x)) - def _as_array(self, x): + def _as_array(self, x: Any) -> NDArray[Float]: try: x = np.asarray(x, dtype=float) except TypeError: @@ -272,7 +289,12 @@ def _as_array(self, x): raise ValueError(error_msg) return x - def register(self, params, target, constraint_value=None): + def register( + self, + params: Mapping[str, float] | Sequence[float] | NDArray[Float], + target: float, + constraint_value: float | NDArray[Float] | None = None, + ) -> None: """Append a point and its target value to the known data. Parameters @@ -283,7 +305,7 @@ def register(self, params, target, constraint_value=None): target : float target function value - constraint_value : float or None + constraint_value : float or np.ndarray or None Constraint function value Raises @@ -330,8 +352,8 @@ def register(self, params, target, constraint_value=None): # Make copies of the data, so as not to modify the originals incase something fails # during the registration process. This prevents out-of-sync data. - params_copy = np.concatenate([self._params, x.reshape(1, -1)]) - target_copy = np.concatenate([self._target, [target]]) + params_copy: NDArray[Float] = np.concatenate([self._params, x.reshape(1, -1)]) + target_copy: NDArray[Float] = np.concatenate([self._target, [target]]) cache_copy = self._cache.copy() # shallow copy suffices if self._constraint is None: @@ -346,7 +368,9 @@ def register(self, params, target, constraint_value=None): raise ValueError(msg) # Insert data into unique dictionary cache_copy[_hashable(x.ravel())] = (target, constraint_value) - constraint_values_copy = np.concatenate([self._constraint_values, [constraint_value]]) + constraint_values_copy: NDArray[Float] = np.concatenate( + [self._constraint_values, [constraint_value]] + ) self._constraint_values = constraint_values_copy # Operations passed, update the variables @@ -354,7 +378,9 @@ def register(self, params, target, constraint_value=None): self._target = target_copy self._cache = cache_copy - def probe(self, params): + def probe( + self, params: Mapping[str, float] | Sequence[float] | NDArray[Float] + ) -> float | tuple[float, float | NDArray[Float]]: """Evaluate the target function on a point and register the result. Notes @@ -385,18 +411,21 @@ def probe(self, params): if x in self and not self._allow_duplicate_points: return self._cache[_hashable(x.ravel())] - params = dict(zip(self._keys, x)) - target = self.target_func(**params) + dict_params = self.array_to_params(x) + if self.target_func is None: + error_msg = "No target function has been provided." + raise ValueError(error_msg) + target = self.target_func(**dict_params) if self._constraint is None: self.register(x, target) return target - constraint_value = self._constraint.eval(**params) + constraint_value = self._constraint.eval(**dict_params) self.register(x, target, constraint_value) return target, constraint_value - def random_sample(self): + def random_sample(self) -> NDArray[Float]: """ Sample a random point from within the bounds of the space. @@ -418,7 +447,7 @@ def random_sample(self): data.T[col] = self.random_state.uniform(lower, upper, size=1) return data.ravel() - def _target_max(self): + def _target_max(self) -> float | None: """Get the maximum target value within the current parameter bounds. If there is a constraint present, the maximum value that fulfills the @@ -437,7 +466,7 @@ def _target_max(self): return self.target[self.mask].max() - def max(self): + def max(self) -> dict[str, Any] | None: """Get maximum target value found and corresponding parameters. If there is a constraint present, the maximum value that fulfills the @@ -467,7 +496,7 @@ def max(self): return res - def res(self): + def res(self) -> list[dict[str, Any]]: """Get all target values and constraint fulfillment for all parameters. Returns @@ -500,7 +529,7 @@ def res(self): ) ] - def set_bounds(self, new_bounds): + def set_bounds(self, new_bounds: Mapping[str, NDArray[Float] | Sequence[float]]) -> None: """Change the lower and upper search bounds. Parameters diff --git a/bayes_opt/util.py b/bayes_opt/util.py index b1e745b13..2795ca07a 100644 --- a/bayes_opt/util.py +++ b/bayes_opt/util.py @@ -3,14 +3,23 @@ from __future__ import annotations import json +from os import PathLike from pathlib import Path +from typing import TYPE_CHECKING import numpy as np from bayes_opt.exception import NotUniqueError +if TYPE_CHECKING: + from collections.abc import Iterable -def load_logs(optimizer, logs): + from bayes_opt.bayesian_optimization import BayesianOptimization + + +def load_logs( + optimizer: BayesianOptimization, logs: str | PathLike[str] | Iterable[str | PathLike[str]] +) -> BayesianOptimization: """Load previous ... Parameters @@ -18,7 +27,7 @@ def load_logs(optimizer, logs): optimizer : BayesianOptimizer Optimizer the register the previous observations with. - logs : str or bytes or os.PathLike + logs : str or os.PathLike File to load the logs from. Returns @@ -26,7 +35,7 @@ def load_logs(optimizer, logs): The optimizer with the state loaded. """ - if isinstance(logs, str): + if isinstance(logs, (str, PathLike)): logs = [logs] for log in logs: @@ -50,7 +59,7 @@ def load_logs(optimizer, logs): return optimizer -def ensure_rng(random_state=None): +def ensure_rng(random_state: int | np.random.RandomState | None = None) -> np.random.RandomState: """Create a random number generator based on an optional seed. Parameters diff --git a/docsrc/conf.py b/docsrc/conf.py index d6a02dc0b..99d447796 100644 --- a/docsrc/conf.py +++ b/docsrc/conf.py @@ -45,6 +45,7 @@ 'IPython.sphinxext.ipython_console_highlighting', 'sphinx.ext.mathjax', "sphinx.ext.napoleon", + 'sphinx_autodoc_typehints', 'sphinx.ext.intersphinx', 'sphinx_immaterial', ] @@ -63,9 +64,9 @@ # Link types to the corresponding documentations intersphinx_mapping = { 'python': ('https://docs.python.org/3', None), - 'numpy': ('https://numpy.org/doc/stable/', None), - 'scipy': ('https://docs.scipy.org/doc/scipy/reference/', None), - 'sklearn': ('https://scikit-learn.org/stable/', None), + 'numpy': ('https://numpy.org/doc/stable', None), + 'scipy': ('https://docs.scipy.org/doc/scipy/reference', None), + 'sklearn': ('https://scikit-learn.org/stable', None), } @@ -160,4 +161,43 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -# html_static_path = ['_static'] \ No newline at end of file +# html_static_path = ['_static'] + +## extensions configuration +### sphinx-autodoc-typehints +typehints_use_signature = True +""" +If True, typehints for parameters in the signature are shown. + +see more: https://github.com/tox-dev/sphinx-autodoc-typehints/blob/main/README.md#options +""" +typehints_use_signature_return = True +""" +If True, return annotations in the signature are shown. + +see more: https://github.com/tox-dev/sphinx-autodoc-typehints/blob/main/README.md#options +""" +### autodoc +autodoc_typehints = "both" +""" +This value controls how to represent typehints. The setting takes the following values: + - `signature`: Show typehints in the signature + - `description`: Show typehints as content of the function or method + The typehints of overloaded functions or methods will still be represented in the signature. + - `none`: Do not show typehints + - `both`: Show typehints in the signature and as content of the function or method + +Overloaded functions or methods will not have typehints included in the description +because it is impossible to accurately represent all possible overloads as a list of parameters. + +see more: https://www.sphinx-doc.org/en/master/usage/extensions/autodoc.html#confval-autodoc_typehints +""" +autodoc_typehints_format = "short" +""" +This value controls the format of typehints. The setting takes the following values: + - `fully-qualified`: Show the module name and its name of typehints + - `short`: Suppress the leading module names of the typehints + (e.g. io.StringIO -> StringIO) + +see more: https://www.sphinx-doc.org/en/master/usage/extensions/autodoc.html#confval-autodoc_typehints_format +""" diff --git a/pyproject.toml b/pyproject.toml index a2ae33d00..a31a3fa76 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,10 +32,16 @@ nbformat = "^5.9.2" nbconvert = "^7.14.2" jupyter = "^1.0.0" matplotlib = "^3.0" -sphinx = "^7.3.7" nbsphinx = "^0.9.4" sphinx-immaterial = "^0.12.0" -myst-parser = "^3.0.1" +sphinx = [ + {version = "^7.0.0", python = "<3.10"}, + {version = "^8.0.0", python = ">=3.10"} +] +sphinx-autodoc-typehints = [ + {version = "^2.3.0", python = "<3.10"}, + {version = "^2.4.0", python = ">=3.10"} +] [build-system] diff --git a/tests/test_constraint.py b/tests/test_constraint.py index ca9903953..586773ca2 100644 --- a/tests/test_constraint.py +++ b/tests/test_constraint.py @@ -162,3 +162,9 @@ def constraint_function_2_dim(x, y): with pytest.raises(ValueError): BayesianOptimization(f=target_function, constraint=conmod, pbounds=pbounds, verbose=0, random_state=1) + + +def test_null_constraint_function(): + constraint = ConstraintModel(None, np.array([0, 0]), np.array([1, 1])) + with pytest.raises(ValueError, match="No constraint function was provided."): + constraint.eval() diff --git a/tests/test_target_space.py b/tests/test_target_space.py index e1e9163c1..bd957e068 100644 --- a/tests/test_target_space.py +++ b/tests/test_target_space.py @@ -285,6 +285,12 @@ def test_set_bounds(): assert all(space.bounds[:, 1] == np.array([1, 8, 3, 4])) +def test_no_target_func(): + target_space = TargetSpace(None, PBOUNDS) + with pytest.raises(ValueError, match="No target function has been provided."): + target_space.probe({"p1": 1, "p2": 2}) + + if __name__ == "__main__": r""" CommandLine: