"""
Wrapper for RoBO with GP (and MCMC)
"""
from __future__ import annotations
from typing import Sequence
import numpy
from orion.algo.space import Space
from robo.acquisition_functions.marginalization import MarginalizationGPMCMC
from robo.models.gaussian_process import GaussianProcess
from robo.models.gaussian_process_mcmc import GaussianProcessMCMC
from orion.algo.robo.base import (
AcquisitionFnName,
MaximizerName,
Model,
RoBO,
build_bounds,
build_kernel,
build_prior,
infer_n_hypers,
)
class RoBO_GP(RoBO):
    """
    Wrapper for RoBO with Gaussian processes

    Parameters
    ----------
    space: ``orion.algo.space.Space``
        Optimisation space with priors for each dimension.
    seed: None, int or sequence of int
        Seed to sample initial points and candidates points.
        Default: 0.
    n_initial_points: int
        Number of initial points randomly sampled. If new points
        are requested and less than `n_initial_points` are observed,
        the next points will also be sampled randomly instead of being
        suggested by the surrogate model.
        Default: ``20``
    maximizer: str
        The optimizer for the acquisition function.
        Can be one of ``{"random", "scipy", "differential_evolution"}``.
        Defaults to 'random'
    acquisition_func: str
        Name of the acquisition function. Can be one of ``['ei', 'log_ei', 'pi', 'lcb']``.
        Defaults to ``'log_ei'``.
    normalize_input: bool
        Normalize the input based on the provided bounds (zero mean and unit standard deviation).
        Defaults to ``True``.
    normalize_output: bool
        Normalize the output based on data (zero mean and unit standard deviation).
        Defaults to ``False``.

    """

    def __init__(
        self,
        space: Space,
        seed: int | Sequence[int] | None = 0,
        n_initial_points: int = 20,
        maximizer: MaximizerName = "random",
        acquisition_func: AcquisitionFnName = "log_ei",
        normalize_input: bool = True,
        normalize_output: bool = False,
    ):
        super().__init__(
            space=space,
            seed=seed,
            n_initial_points=n_initial_points,
            maximizer=maximizer,
            acquisition_func=acquisition_func,
        )
        # Kept on the instance because ``build_model`` forwards them to the
        # Gaussian process wrapper.
        self.normalize_input = normalize_input
        self.normalize_output = normalize_output

    def build_model(self) -> OrionGaussianProcessWrapper:
        """Builds the model for the optimisation.

        The kernel is derived from the bounds of ``self.space`` and the prior
        is derived from that kernel; both are handed to the Gaussian process
        wrapper together with the normalisation flags.

        Returns
        -------
        OrionGaussianProcessWrapper
            A freshly constructed Gaussian process model.
        """
        lower, upper = build_bounds(self.space)
        kernel = build_kernel(lower, upper)
        prior = build_prior(kernel)
        return OrionGaussianProcessWrapper(
            kernel,
            prior=prior,
            # NOTE(review): no RNG is supplied here; presumably the model
            # creates its own and is seeded afterwards through ``seed()``
            # -- confirm against the base class.
            rng=None,
            normalize_input=self.normalize_input,
            normalize_output=self.normalize_output,
            lower=lower,
            upper=upper,
        )
class RoBO_GP_MCMC(RoBO_GP):
    """
    Wrapper for RoBO with Gaussian processes using Markov chain Monte Carlo
    to marginalize out hyperparameters of the Bayesian Optimization.

    Parameters
    ----------
    space: ``orion.algo.space.Space``
        Optimisation space with priors for each dimension.
    seed: None, int or sequence of int
        Seed to sample initial points and candidates points.
        Default: 0.
    n_initial_points: int
        Number of initial points randomly sampled. If new points
        are requested and less than `n_initial_points` are observed,
        the next points will also be sampled randomly instead of being
        suggested by the surrogate model.
        Default: ``20``
    maximizer: str
        The optimizer for the acquisition function.
        Can be one of ``{"random", "scipy", "differential_evolution"}``.
        Defaults to 'random'
    acquisition_func: str
        Name of the acquisition function. Can be one of ``['ei', 'log_ei', 'pi', 'lcb']``.
        Defaults to ``'log_ei'``.
    normalize_input: bool
        Normalize the input based on the provided bounds (zero mean and unit standard deviation).
        Defaults to ``True``.
    normalize_output: bool
        Normalize the output based on data (zero mean and unit standard deviation).
        Defaults to ``False``.
    chain_length: int
        The length of the MCMC chain. We start ``n_hypers`` walkers for ``chain_length``
        steps and we use the last sample in the chain as a hyperparameter sample.
        ``n_hypers`` is automatically inferred based on dimensionality of the search space.
        Defaults to 2000.
    burnin_steps: int
        The number of burnin steps before the actual MCMC sampling starts.
        Defaults to 2000.

    """

    def __init__(
        self,
        space: Space,
        seed: int | Sequence[int] | None = 0,
        n_initial_points: int = 20,
        maximizer: MaximizerName = "random",
        acquisition_func: AcquisitionFnName = "log_ei",
        normalize_input: bool = True,
        normalize_output: bool = False,
        chain_length: int = 2000,
        burnin_steps: int = 2000,
    ):
        super().__init__(
            space,
            seed=seed,
            n_initial_points=n_initial_points,
            maximizer=maximizer,
            acquisition_func=acquisition_func,
            normalize_input=normalize_input,
            normalize_output=normalize_output,
        )
        # Kept on the instance because ``build_model`` forwards them to the
        # MCMC Gaussian process wrapper.
        self.chain_length = chain_length
        self.burnin_steps = burnin_steps

    def build_acquisition_func(self) -> MarginalizationGPMCMC:
        """Build a marginalized acquisition function with MCMC.

        The acquisition function built by the parent class is wrapped in
        ``MarginalizationGPMCMC``.
        """
        return MarginalizationGPMCMC(super().build_acquisition_func())

    def build_model(self) -> OrionGaussianProcessMCMCWrapper:
        """Builds the MCMC Gaussian process model for the optimisation.

        The kernel is derived from the bounds of ``self.space``, the prior is
        derived from that kernel, and the number of hyperparameter samples is
        obtained from ``infer_n_hypers(kernel)``.

        Returns
        -------
        OrionGaussianProcessMCMCWrapper
            A freshly constructed Gaussian process model using MCMC.
        """
        lower, upper = build_bounds(self.space)
        kernel = build_kernel(lower, upper)
        prior = build_prior(kernel)
        n_hypers = infer_n_hypers(kernel)
        return OrionGaussianProcessMCMCWrapper(
            kernel,
            prior=prior,
            n_hypers=n_hypers,
            chain_length=self.chain_length,
            burnin_steps=self.burnin_steps,
            normalize_input=self.normalize_input,
            normalize_output=self.normalize_output,
            # NOTE(review): no RNG is supplied here; presumably the model
            # creates its own and is seeded afterwards through ``seed()``
            # -- confirm against the base class.
            rng=None,
            lower=lower,
            upper=upper,
        )
class OrionGaussianProcessWrapper(GaussianProcess, Model):
    """
    Wrapper for RoBO's Gaussian processes model

    Adds state saving/restoring and seeding on top of RoBO's
    ``GaussianProcess``.

    Parameters
    ----------
    kernel : george kernel object
        Specifies the kernel that is used for all Gaussian Process
    prior : prior object
        Defines a prior for the hyperparameters of the GP. Make sure that
        it implements the Prior interface.
    noise : float
        Noise term that is added to the diagonal of the covariance matrix
        for the Cholesky decomposition.
    use_gradients : bool
        Use gradient information to optimize the negative log likelihood
    lower : numpy.array(D,)
        Lower bound of the input space which is used for the input space normalization
    upper : numpy.array(D,)
        Upper bound of the input space which is used for the input space normalization
    normalize_output : bool
        Zero mean unit variance normalization of the output values
    normalize_input : bool
        Normalize all inputs to be in [0, 1]. This is important to define good priors for the
        length scales.
    rng: numpy.random.RandomState
        Random number generator

    """

    def set_state(self, state_dict: dict) -> None:
        """Restore the model from a dictionary produced by ``state_dict``.

        The model RNG, the prior RNG, the kernel parameter vector and the
        noise term are restored, in that order.
        """
        model_rng = self.rng
        model_rng.set_state(state_dict["model_rng_state"])

        prior_rng = self.prior.rng
        prior_rng.set_state(state_dict["prior_rng_state"])

        kernel_parameters = state_dict["model_kernel_parameter_vector"]
        self.kernel.set_parameter_vector(kernel_parameters)

        self.noise = state_dict["noise"]

    def state_dict(self) -> dict:
        """Return a snapshot of the model that ``set_state`` can restore.

        The kernel parameter vector is converted with ``tolist()`` before
        being stored.
        """
        snapshot = {}
        snapshot["prior_rng_state"] = self.prior.rng.get_state()
        snapshot["model_rng_state"] = self.rng.get_state()
        parameter_vector = self.kernel.get_parameter_vector()
        snapshot["model_kernel_parameter_vector"] = parameter_vector.tolist()
        snapshot["noise"] = self.noise
        return snapshot

    def seed(self, seed: int | Sequence[int] | None) -> None:
        """Seed the model RNG and the prior RNG.

        Two integer seeds are drawn from a ``RandomState`` created from
        ``seed``; the first goes to the model RNG, the second to the prior RNG.
        """
        seed_source = numpy.random.RandomState(seed)
        model_seed, prior_seed = seed_source.randint(1, int(10e8), size=2)
        self.rng.seed(model_seed)
        self.prior.rng.seed(prior_seed)
class OrionGaussianProcessMCMCWrapper(GaussianProcessMCMC, Model):
    """
    Wrapper for RoBO's Gaussian processes with MCMC model

    Adds state saving/restoring and seeding on top of RoBO's
    ``GaussianProcessMCMC``.

    Parameters
    ----------
    kernel : george kernel object
        Specifies the kernel that is used for all Gaussian Process
    prior : prior object
        Defines a prior for the hyperparameters of the GP. Make sure that
        it implements the Prior interface. During MCMC sampling the
        lnlikelihood is multiplied with the prior.
    n_hypers : int
        The number of hyperparameter samples. This also determines the
        number of walker for MCMC sampling as each walker will
        return one hyperparameter sample.
    chain_length : int
        The length of the MCMC chain. We start n_hypers walker for
        chain_length steps and we use the last sample
        in the chain as a hyperparameter sample.
    lower : np.array(D,)
        Lower bound of the input space which is used for the input space normalization
    upper : np.array(D,)
        Upper bound of the input space which is used for the input space normalization
    burnin_steps : int
        The number of burnin steps before the actual MCMC sampling starts.
    rng: np.random.RandomState
        Random number generator

    """

    def set_state(self, state_dict: dict) -> None:
        """Restore the model from a dictionary produced by ``state_dict``.

        Both RNG states are restored first. If the dictionary holds a
        non-``None`` ``"model_p0"`` entry, it becomes ``self.p0`` (as a numpy
        array) and the model is flagged as burned. Otherwise any existing
        ``p0`` attribute is removed and the flag is cleared.
        """
        self.rng.set_state(state_dict["model_rng_state"])
        self.prior.rng.set_state(state_dict["prior_rng_state"])

        saved_p0 = state_dict.get("model_p0", None)
        if saved_p0 is not None:
            self.p0 = numpy.array(saved_p0)
            self.burned = True
            return

        # Nothing saved: drop a stale ``p0`` if one exists. When there is
        # none, ``burned`` is deliberately left untouched.
        if hasattr(self, "p0"):
            del self.p0
            self.burned = False

    def state_dict(self) -> dict:
        """Return a snapshot of the model that ``set_state`` can restore.

        ``"model_p0"`` is only included when the instance has a ``p0``
        attribute; it is stored via ``tolist()``.
        """
        snapshot = {}
        snapshot["prior_rng_state"] = self.prior.rng.get_state()
        snapshot["model_rng_state"] = self.rng.get_state()
        if hasattr(self, "p0"):
            snapshot["model_p0"] = self.p0.tolist()
        return snapshot

    def seed(self, seed: int | Sequence[int] | None) -> None:
        """Seed the model RNG and the prior RNG.

        Two integer seeds are drawn from a ``RandomState`` created from
        ``seed``; the first goes to the model RNG, the second to the prior RNG.
        """
        seed_source = numpy.random.RandomState(seed)
        model_seed, prior_seed = seed_source.randint(1, int(10e8), size=2)
        self.rng.seed(model_seed)
        self.prior.rng.seed(prior_seed)