Source code for optilab.functions.surrogate.locally_weighted_polynomial_regression

"""
Surrogate function which estimates the objective function with polynomial regression.
Points are weighted based on mahalanobis distance from query points.
"""

from typing import Callable

import faiss
import numpy as np
from sklearn.preprocessing import PolynomialFeatures

from ...data_classes import Point, PointList
from .surrogate_objective_function import SurrogateObjectiveFunction


[docs] def biquadratic_kernel_function(x: float) -> float: """ Biquadratic weighting function. Args: x: Distance between points. Returns: Weight value. """ if np.abs(x) >= 1: return 0 return (1 - x**2) ** 2
[docs] class LocallyWeightedPolynomialRegression(SurrogateObjectiveFunction): """ Surrogate function which estimates the objective function with polynomial regression. Points are weighted based on mahalanobis distance from query points. """
[docs] def __init__( self, degree: int, num_neighbors: int, train_set: PointList | None = None, covariance_matrix: np.ndarray | None = None, kernel_function: Callable[[float], float] = biquadratic_kernel_function, ) -> None: """ Class constructor. Args: degree: Degree of the polynomial used to approximate function. num_neighbors: Number of closest points to use in function approximation. train_set: Training set for the regressor, optional. covariance_matrix: Covariance class used in mahalanobis distance, optional. When no such matrix is provided an identity matrix is used. kernel_function: Function used to assign weights to points. """ self.is_ready = False super().__init__( f"locally_weighted_polynomial_regression_{degree}_degree", train_set, {"degree": degree, "num_neighbors": num_neighbors}, ) if covariance_matrix is not None: self.set_covariance_matrix(covariance_matrix) else: self.set_covariance_matrix(np.eye(self.metadata.dim)) if train_set: self.train(train_set) self.kernel_function = kernel_function self.preprocessor = PolynomialFeatures(degree=degree) self.weights = None self.index = None
[docs] def set_covariance_matrix(self, new_covariance_matrix: np.ndarray) -> None: """ Setter for the covariance matrix. Args: new_covariance_matrix: New covariance matrix to use for mahalanobis distance. """ self.inverse_sqrt_covariance = np.linalg.inv( np.linalg.cholesky(new_covariance_matrix) ).T
[docs] def train(self, train_set: PointList) -> None: """ Build FAISS index and preprocess data to use Mahalanobis distance. Args: train_set: Training set for the function """ super().train(train_set) x_train, y_train = self.train_set.pairs() # ignore warnings about overflows and zero divisions when covariance matrix # is ill-conditioned with np.errstate(divide="ignore", over="ignore", invalid="ignore"): x_train = x_train @ self.inverse_sqrt_covariance self.y_train = y_train self.index = faiss.IndexFlatL2(x_train.shape[1]) self.index.add( # pylint: disable=no-value-for-parameter x_train.astype(np.float32) )
[docs] def __call__(self, point: Point) -> Point: """ Estimate the value of a single point with the surrogate function. Since the surrogate model is built for each point independently, this is where the regressor is trained. Args: x: Point to estimate. Raises: ValueError: If dimensionality of x doesn't match self.dim. Return: Point: Estimated point. """ super().__call__(point) # ignore warnings about overflows and zero divisions when covariance matrix # is ill-conditioned with np.errstate(divide="ignore", over="ignore", invalid="ignore"): x_query = ( np.array([point.x], dtype=np.float64) @ self.inverse_sqrt_covariance ) distances, indices = ( self.index.search( # pylint: disable=no-value-for-parameter x_query.astype(np.float32), self.metadata.hyperparameters["num_neighbors"], ) ) distances = np.sqrt(distances.astype(np.float64)) knn_x = np.array([self.train_set[i].x for i in indices[0]], dtype=np.float64) knn_y = np.array([self.train_set[i].y for i in indices[0]], dtype=np.float64) bandwidth = distances[0][-1] if np.isclose(bandwidth, 0): # All neighbors are at the same location — use equal weights. weights = np.ones(len(knn_y), dtype=np.float64) else: weights = np.array( [np.sqrt(self.kernel_function(d / bandwidth)) for d in distances[0]], dtype=np.float64, ) weighted_x = weights[:, None] * self.preprocessor.fit_transform(knn_x) weighted_y = weights * knn_y self.weights = np.linalg.lstsq(weighted_x, weighted_y, rcond=None)[0] y_pred = float( np.dot( self.preprocessor.fit_transform([point.x])[0], self.weights, ) ) return Point( x=point.x, y=y_pred, is_evaluated=False, )