# Copyright 2021-2024 Cambridge Quantum Computing Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Nelder-Mead Optimizer
=====================
The Nelder-Mead algorithm performs unconstrained optimization in
multidimensional spaces. It is a direct-search method that maintains a
simplex of candidate solutions, and it is particularly useful when the
(first and second) derivatives of the objective function are unknown or
unreliable. Unlike some other methods, it does not take into account
bounds or constraints on the variables.

Although the Nelder-Mead algorithm is generally robust and widely
applicable, it has some limitations. When derivatives can be accurately
computed, algorithms that use this information may offer better
performance: they can handle a wider range of scenarios and tend to
converge to better solutions.

Since the Nelder-Mead technique is a heuristic search approach, it may
converge to non-stationary points or sub-optimal solutions.

"""
from __future__ import annotations

from collections.abc import Callable, Iterable, Mapping
from typing import Any
import warnings

import numpy as np
from numpy.typing import ArrayLike

from lambeq.backend.tensor import Diagram
from lambeq.core.utils import flatten
from lambeq.training.optimizer import Optimizer
from lambeq.training.quantum_model import QuantumModel


class NelderMeadOptimizer(Optimizer):
    """An optimizer based on the Nelder-Mead algorithm.

    This implementation is based heavily on SciPy's `optimize.minimize
    <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html>`_.

    """

    model: QuantumModel
    bounds: np.ndarray | None

    def __init__(self,
                 *,
                 model: QuantumModel,
                 loss_fn: Callable[[Any, Any], float],
                 hyperparams: dict[str, float] | None = None,
                 bounds: ArrayLike | None = None) -> None:
        """Initialise the Nelder-Mead optimizer.

        The hyperparameters may contain the following key-value pairs:

        - `adaptive`: bool, default: False
            Adjust the algorithm's parameters based on the
            dimensionality of the problem. This is particularly helpful
            when minimizing functions in high-dimensional spaces.
        - `maxfev`: int, default: 1000
            Maximum number of function evaluations allowed.
        - `initial_simplex`: ArrayLike (N+1, N), default: None
            If provided, replaces the initial model weights. Each row
            should contain the coordinates of the `i`-th vertex of the
            `N+1` vertices in the simplex, where `N` is the dimension.
        - `xatol`: float, default: 1e-4
            The acceptable level of absolute error in the optimal model
            weights (optimal solution) between iterations that
            indicates convergence.
        - `fatol`: float, default: 1e-4
            The acceptable level of absolute error in the loss value
            between iterations that indicates convergence.

        Parameters
        ----------
        model : :py:class:`.QuantumModel`
            A lambeq quantum model.
        loss_fn : Callable[[ArrayLike, ArrayLike], float]
            A loss function of the form `loss(prediction, labels)`.
        hyperparams : dict of str to float, optional
            A dictionary containing the model's hyperparameters.
        bounds : ArrayLike, optional
            The range of each of the model parameters.

        Raises
        ------
        ValueError
            - If the hyperparameters are not set correctly, or if the
              length of `bounds` does not match the number of the model
              parameters.
            - If the lower bounds are greater than the upper bounds.
            - If the initial simplex is not a 2D array.
            - If the initial simplex does not have N+1 rows, where N is
              the number of model parameters.
        Warning
            - If the initial model weights are not within the bounds.

        References
        ----------
        Gao, Fuchang & Han, Lixing. (2012). Implementing the Nelder-Mead
        Simplex Algorithm with Adaptive Parameters. `Computational
        Optimization and Applications`, 51, 259-277.
        10.1007/s10589-010-9329-3

        """
        if hyperparams is None:
            hyperparams = {}

        super().__init__(model=model,
                         hyperparams=hyperparams,
                         loss_fn=loss_fn,
                         bounds=bounds)

        self.ncalls = 0
        self.current_sweep = 1

        self.adaptive = hyperparams.get('adaptive', False)
        self.maxfev = hyperparams.get('maxfev', 1000)
        self.initial_simplex = hyperparams.get('initial_simplex', None)
        self.xatol = hyperparams.get('xatol', 1e-4)
        self.fatol = hyperparams.get('fatol', 1e-4)

        if self.adaptive:
            # Dimension-dependent coefficients from Gao & Han (2012).
            self.dim = float(len(self.model.weights))
            self.rho = 1
            self.chi = 1 + 2 / self.dim
            self.psi = 0.75 - 1 / (2 * self.dim)
            self.sigma = 1 - 1 / self.dim
        else:
            self.rho = 1
            self.chi = 2
            self.psi = 0.5
            self.sigma = 0.5

        self.nonzdelt = 0.05
        self.zdelt = 0.00025

        if bounds is None:
            self.bounds = None
        else:
            self.bounds = np.asarray(bounds)
            if len(self.bounds) != len(self.model.weights):
                raise ValueError('Length of `bounds` must be the same '
                                 'as the number of the model parameters')

            lower_bound = self.bounds[:, 0]
            upper_bound = self.bounds[:, 1]

            # check bounds
            if (lower_bound > upper_bound).any():
                raise ValueError('Nelder-Mead lower bounds must be '
                                 'less than upper bounds.')
            if (np.any(lower_bound > self.model.weights)
                    or np.any(self.model.weights > upper_bound)):
                warnings.warn(
                    'Initial value of model weights is not within '
                    'the bounds.',
                    stacklevel=2,
                )

            self.model.weights = self.project(self.model.weights)

        self.N = len(self.model.weights)

        if self.initial_simplex is None:
            # Build the initial simplex around the current weights:
            # vertex 0 is the weight vector itself; vertex k+1 perturbs
            # coordinate k by 5% (or by `zdelt` if the coordinate is 0).
            self.sim = np.empty((self.N + 1, self.N),
                                dtype=self.model.weights.dtype)
            self.sim[0] = model.weights
            for k in range(self.N):
                y = np.array(self.model.weights, copy=True)
                if y[k] != 0:
                    y[k] = (1 + self.nonzdelt) * y[k]
                else:
                    y[k] = self.zdelt
                self.sim[k + 1] = y
        else:
            self.sim = np.asarray(self.initial_simplex,
                                  dtype=float).copy()
            if (self.sim.ndim != 2
                    or self.sim.shape[0] != self.sim.shape[1] + 1):
                raise ValueError('`initial_simplex` should be an array '
                                 f'of shape ({self.N + 1}, {self.N})')
            if len(self.model.weights) != self.sim.shape[1]:
                raise ValueError('Size of `initial_simplex` is not '
                                 'consistent with `model.weights`')
            self.N = self.sim.shape[1]

        self.sim = self.project(self.sim)
        self.fsim = np.full((self.N + 1,), np.inf, dtype=float)
        self.first_iter = True  # flag for first iteration
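
    # Worked illustration of the adaptive coefficients above (an example
    # for exposition, not library code): with `adaptive=True` and a
    # model of N = 4 weights,
    #     rho   = 1
    #     chi   = 1 + 2/4        = 1.5
    #     psi   = 0.75 - 1/(2*4) = 0.625
    #     sigma = 1 - 1/4        = 0.75
    # i.e. milder expansion and shrink steps than the classical values
    # (rho, chi, psi, sigma) = (1, 2, 0.5, 0.5), which helps convergence
    # in higher dimensions (Gao & Han, 2012).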

    def project(self, x: np.ndarray) -> np.ndarray:
        """Clip the given array to the optimizer bounds, if any."""
        if self.bounds is None:
            return x
        else:
            return x.clip(self.bounds[:, 0], self.bounds[:, 1])

    def objective(self,
                  x: Iterable[Any],
                  y: ArrayLike,
                  w: ArrayLike) -> float:
        """The objective function to be minimized.

        Parameters
        ----------
        x : Iterable
            The input data.
        y : ArrayLike
            The labels.
        w : ArrayLike
            The model parameters.

        Returns
        -------
        result : float
            The result of the objective function.

        Raises
        ------
        ValueError
            If the objective function does not return a scalar value.

        """
        self.ncalls += 1
        self.model.weights = np.copy(w)
        result = self.loss_fn(self.model(x), y)

        # `result` must be a scalar
        if not np.isscalar(result):
            try:
                result = np.asarray(result).item()
            except (TypeError, ValueError) as e:
                raise ValueError('Objective function must return '
                                 'a scalar') from e
        return result

    def backward(self, batch: tuple[Iterable[Any], np.ndarray]) -> float:
        """Calculate the gradients of the loss function.

        The gradients are calculated with respect to the model
        parameters. Note that Nelder-Mead is derivative-free: this
        method performs one simplex update and stores the best vertex
        in `self.gradient`, which `step` then copies into the model
        weights.

        Parameters
        ----------
        batch : tuple of Iterable and numpy.ndarray
            Current batch. Contains an Iterable of diagrams in index 0,
            and the targets in index 1.

        Returns
        -------
        float
            The calculated loss.

        """
        diagrams, targets = batch
        diags_gen = flatten(diagrams)

        # the symbolic parameters
        parameters = self.model.symbols

        # try to extract the relevant parameters from the diagrams
        relevant_params = set.union(*[diag.free_symbols
                                      for diag in diags_gen
                                      if isinstance(diag, Diagram)])
        if not relevant_params:
            relevant_params = set(parameters)
        mask = np.array([int(sym in relevant_params)
                         for sym in parameters])

        # Initialize ind, sim and fsim on the first iteration by
        # evaluating the objective at every vertex of the simplex.
        if self.first_iter:
            for k in range(self.N + 1):
                self.fsim[k] = self.objective(diagrams, targets,
                                              self.sim[k])
            self.ind = np.argsort(self.fsim)
            self.sim = self.sim.take(self.ind, 0)
            self.fsim = self.fsim.take(self.ind, 0)
            self.first_iter = False

        converged = (
            np.abs(self.sim[1:] - self.sim[0]).max() <= self.xatol
            and np.abs(self.fsim[0] - self.fsim[1:]).max() <= self.fatol
        )
        if not converged:
            # Reflect the worst vertex through the centroid of the rest.
            xbar = self.sim[:-1].sum(0) / self.N
            xr = self.project((1 + self.rho) * xbar
                              - self.rho * self.sim[-1])
            fxr = self.objective(diagrams, targets, xr)
            shrink = False

            if fxr < self.fsim[0]:
                # Reflection beat the best vertex: try expanding further.
                xe = self.project(
                    xbar + self.rho * self.chi * (xbar - self.sim[-1])
                )
                fxe = self.objective(diagrams, targets, xe)

                if fxe < fxr:
                    self.sim[-1] = xe
                    self.fsim[-1] = fxe
                else:
                    self.sim[-1] = xr
                    self.fsim[-1] = fxr
            elif fxr < self.fsim[-2]:  # and fsim[0] <= fxr
                self.sim[-1] = xr
                self.fsim[-1] = fxr
            elif fxr < self.fsim[-1]:  # and fxr >= fsim[-2] and fsim[0] <= fxr
                # Perform contraction
                xc = self.project(
                    xbar + self.psi * self.rho * (xbar - self.sim[-1])
                )
                fxc = self.objective(diagrams, targets, xc)

                if fxc <= fxr:
                    self.sim[-1] = xc
                    self.fsim[-1] = fxc
                else:
                    shrink = True
            else:
                # Perform an inside contraction
                xcc = self.project(xbar
                                   + self.psi * (self.sim[-1] - xbar))
                fxcc = self.objective(diagrams, targets, xcc)

                if fxcc < self.fsim[-1]:
                    self.sim[-1] = xcc
                    self.fsim[-1] = fxcc
                else:
                    shrink = True

            if shrink:
                # Shrink every vertex towards the current best vertex.
                for j in range(1, self.N + 1):
                    self.sim[j] = self.project(
                        self.sim[0]
                        + self.sigma * (self.sim[j] - self.sim[0])
                    )
                    self.fsim[j] = self.objective(diagrams, targets,
                                                  self.sim[j])

            self.ind = np.argsort(self.fsim)
            self.sim = self.sim.take(self.ind, 0)
            self.fsim = self.fsim.take(self.ind, 0)

        loss = float(np.min(self.fsim))
        # The best (masked) vertex plays the role of the "gradient".
        self.gradient = self.sim[0] * mask

        if self.ncalls >= self.maxfev:
            warnings.warn(
                'Maximum number of function evaluations exceeded.',
                stacklevel=3
            )

        return loss
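
    # For reference, the moves performed in `backward` above, written as
    # update formulas (a summary sketch; `xbar` is the centroid of the
    # best N vertices and `x_w` the worst vertex):
    #     reflection:          xr  = xbar + rho * (xbar - x_w)
    #     expansion:           xe  = xbar + rho * chi * (xbar - x_w)
    #     outside contraction: xc  = xbar + psi * rho * (xbar - x_w)
    #     inside contraction:  xcc = xbar + psi * (x_w - xbar)
    #     shrink:              x_j = x_0 + sigma * (x_j - x_0), j > 0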

    def step(self) -> None:
        """Perform optimisation step."""
        self.model.weights = np.copy(self.gradient)
        self.model.weights = self.project(self.model.weights)
        self.update_hyper_params()
        self.zero_grad()

    def update_hyper_params(self) -> None:
        """Update the hyperparameters of the Nelder-Mead algorithm."""
        self.current_sweep += 1

    def state_dict(self) -> dict[str, Any]:
        """Return optimizer states as dictionary.

        Returns
        -------
        dict
            A dictionary containing the current state of the optimizer.

        """
        return {'adaptive': self.adaptive,
                'initial_simplex': self.initial_simplex,
                'xatol': self.xatol,
                'fatol': self.fatol,
                'sim': self.sim,
                'fsim': self.fsim,
                'ncalls': self.ncalls,
                'first_iter': self.first_iter,
                'current_sweep': self.current_sweep}

    def load_state_dict(self, state_dict: Mapping[str, Any]) -> None:
        """Load state of the optimizer from the state dictionary.

        Parameters
        ----------
        state_dict : dict
            A dictionary containing a snapshot of the optimizer state.

        """
        self.adaptive = state_dict['adaptive']
        self.initial_simplex = state_dict['initial_simplex']
        self.xatol = state_dict['xatol']
        self.fatol = state_dict['fatol']
        self.sim = state_dict['sim']
        self.fsim = state_dict['fsim']
        self.ncalls = state_dict['ncalls']
        self.first_iter = state_dict['first_iter']
        self.current_sweep = state_dict['current_sweep']
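
# Checkpointing sketch (hypothetical usage mirroring `state_dict` and
# `load_state_dict` above): the returned state can be stored and later
# restored on a freshly constructed optimizer for the same model.
#
#     state = optimizer.state_dict()
#     new_optimizer = NelderMeadOptimizer(model=my_model, loss_fn=loss_fn)
#     new_optimizer.load_state_dict(state)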