Source code for orion.algo.robo.dngo

"""
Wrapper for RoBO with DNGO
"""
from __future__ import annotations

import logging
from typing import OrderedDict, Sequence

import numpy as np
import torch
from orion.algo.space import Space
from pybnn.dngo import (
    DNGO,
    BaseModel,
    BayesianLinearRegression,
    Net,
    Prior,
    emcee,
    optim,
    optimize,
    time,
    zero_mean_unit_var_denormalization,
    zero_mean_unit_var_normalization,
)
from torch.nn import functional as F
from torch.utils.data import DataLoader, TensorDataset

from orion.algo.robo.base import (
    AcquisitionFnName,
    MaximizerName,
    Model,
    RoBO,
    build_bounds,
    build_kernel,
    infer_n_hypers,
)

logger = logging.getLogger(__name__)


# pylint: disable = too-many-instance-attributes
class OrionDNGOWrapper(DNGO, Model):
    """
    Wrapper for PyBNN's DNGO model

    Parameters
    ----------
    batch_size: int
        Batch size for training the neural network
    num_epochs: int
        Number of epochs for training
    learning_rate: float
        Initial learning rate for Adam
    adapt_epoch: int
        Defines after how many epochs the learning rate will be decayed by a factor 10
    n_units_1: int
        Number of units in layer 1
    n_units_2: int
        Number of units in layer 2
    n_units_3: int
        Number of units in layer 3
    alpha: float
        Hyperparameter of the Bayesian linear regression
    beta: float
        Hyperparameter of the Bayesian linear regression
    prior: Prior object
        Prior for alpha and beta. If set to None, the default prior is used.
    do_mcmc: bool
        If set to true, different values for alpha and beta are sampled via MCMC from the marginal
        log likelihood. Otherwise the marginal log likelihood is optimized with scipy's fmin function.
    n_hypers : int
        Number of samples for alpha and beta
    chain_length : int
        The chain length of the MCMC sampler
    burnin_steps: int
        The number of burnin steps before the sampling procedure starts
    normalize_output : bool
        Zero mean unit variance normalization of the output values
    normalize_input : bool
        Zero mean unit variance normalization of the input values
    rng: np.random.RandomState
        Random number generator
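
    Examples
    --------
    A minimal construction sketch (illustrative only; the 2-d bounds below are
    hypothetical, since ``lower`` and ``upper`` are normally produced by
    ``orion.algo.robo.base.build_bounds``)::

        import numpy as np

        model = OrionDNGOWrapper(
            lower=np.zeros(2),
            upper=np.ones(2),
            num_epochs=10,  # keep the sketch fast
            do_mcmc=False,  # optimize (alpha, beta) instead of sampling them
        )
        model.seed(42)
        model.train(np.random.rand(30, 2), np.random.rand(30))
        mean, variance = model.predict(np.random.rand(5, 2))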

    """

    # pylint: disable = too-many-locals
    def __init__(
        self,
        lower: np.ndarray,
        upper: np.ndarray,
        device: torch.device | str | None = None,
        batch_size: int = 10,
        num_epochs: int = 500,
        learning_rate: float = 0.01,
        adapt_epoch: int = 5000,
        n_units_1: int = 50,
        n_units_2: int = 50,
        n_units_3: int = 50,
        alpha: float = 1.0,
        beta: float = 1000,
        prior: Prior | None = None,
        do_mcmc: bool = True,
        n_hypers: int = 20,
        chain_length: int = 2000,
        burnin_steps: int = 2000,
        normalize_input: bool = True,
        normalize_output: bool = True,
        rng: np.random.RandomState | None = None,
    ):
        super().__init__(
            batch_size=batch_size,
            num_epochs=num_epochs,
            learning_rate=learning_rate,
            adapt_epoch=adapt_epoch,
            n_units_1=n_units_1,
            n_units_2=n_units_2,
            n_units_3=n_units_3,
            alpha=alpha,
            beta=beta,
            prior=prior,
            do_mcmc=do_mcmc,
            n_hypers=n_hypers,
            chain_length=chain_length,
            burnin_steps=burnin_steps,
            normalize_input=normalize_input,
            normalize_output=normalize_output,
            rng=rng,
        )
        self.burned: bool = False
        self.lower = lower
        self.upper = upper
        self.device = torch.device(
            device or ("cuda" if torch.cuda.is_available() else "cpu")
        )
        self.prior: Prior
        self.network: Net
        self.optimizer: optim.Adam
        self._initial_network_weights: OrderedDict[str, torch.Tensor]
        self._initial_optimizer_state: dict
        self._create_modules()

    def _create_modules(self):
        """Create the network and the optimizer, using the global torch RNG."""
        assert self.lower.ndim == 1
        self.network = Net(
            n_inputs=self.lower.shape[0],
            n_units=[self.n_units_1, self.n_units_2, self.n_units_3],
        ).to(self.device)

        self.optimizer = optim.Adam(
            self.network.parameters(), lr=self.init_learning_rate
        )
        # Copies of the network and optimizer parameters, so we just reload the weights rather than
        # recreate a new network at each iteration.
        self._initial_network_weights = self.network.state_dict()
        self._initial_optimizer_state = self.optimizer.state_dict()

    def set_state(self, state_dict: dict) -> None:
        """Restore the state of the optimizer"""
        torch.random.set_rng_state(state_dict["torch"])
        if torch.cuda.is_available() and state_dict["torch_cuda"] is not None:
            torch.cuda.set_rng_state_all(state_dict["torch_cuda"])
        self.rng.set_state(state_dict["rng"])
        self.prior.rng.set_state(state_dict["prior_rng"])

    def state_dict(self) -> dict:
        """Return the current state algorithm so that it can be restored."""
        # NOTE: In the case of DNGO, we only need to save the RNG states, since the network
        # weights are reset to their seed-derived initial values at each call to `train`.
        return {
            "torch": torch.random.get_rng_state(),
            "torch_cuda": (
                torch.cuda.get_rng_state_all() if torch.cuda.is_available() else None
            ),
            "rng": self.rng.get_state(),
            "prior_rng": self.prior.rng.get_state(),
        }

    def seed(self, seed: int) -> None:
        """Seed all internal RNGs"""
        self.rng = np.random.RandomState(seed)
        rand_nums = self.rng.randint(1, int(10e8), 2)
        pytorch_seed = rand_nums[0]

        if torch.cuda.is_available():
            torch.backends.cudnn.benchmark = False  # type: ignore
            torch.cuda.manual_seed_all(pytorch_seed)
            torch.backends.cudnn.deterministic = True  # type: ignore

        torch.manual_seed(pytorch_seed)

        self.prior.rng.seed(rand_nums[1])
        # Recreate the modules using that RNG seed.
        self._create_modules()

    def _to_tensor(self, v: np.ndarray) -> torch.Tensor:
        return torch.as_tensor(v, device=self.device, dtype=torch.float32)

    @staticmethod
    def _to_numpy(v: torch.Tensor) -> np.ndarray:
        return v.detach().cpu().numpy()

    def train_epoch(self, X: np.ndarray, y: np.ndarray) -> float:
        """Perform one training epoch, and return the average loss per batch."""
        train_err: torch.Tensor | float = 0.0
        batch_index = 0

        # Check if we have enough points to create a minibatch; otherwise use all data points
        batch_size = min(self.batch_size, self.X.shape[0])
        X_tensor = self._to_tensor(X)
        y_tensor = self._to_tensor(y)

        dataset = TensorDataset(X_tensor, y_tensor)
        epoch_dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
        for batch_index, (inputs, targets) in enumerate(epoch_dataloader):
            self.optimizer.zero_grad()
            output = self.network(inputs)
            loss = F.mse_loss(output, targets)
            loss.backward()
            self.optimizer.step()

            train_err += loss.detach()

        average_error = train_err / (batch_index + 1)

        logger.debug("Training loss:\t\t%.5g", average_error)
        return float(average_error)

    @BaseModel._check_shapes_train  # type: ignore  pylint: disable=protected-access
    def train(self, X: np.ndarray, y: np.ndarray, do_optimize: bool = True):
        """
        Trains the model on the provided data.

        NOTE: Overridden here to fix the following issues:
        - memory leak in the training loop caused by accumulating (and storing) live loss tensors.
        - missing CUDA support: training now runs on the GPU when one is available.
        - model and optimizer re-creation at each iteration: the initial weights are reloaded instead.

        Parameters
        ----------
        X: np.ndarray (N, D)
            Input data points. The dimensionality of X is (N, D),
            with N as the number of points and D is the number of features.
        y: np.ndarray (N,)
            The corresponding target values.
        do_optimize: boolean
            If set to true, the hyperparameters are optimized; otherwise
            the default hyperparameters are used.

        """
        start_time = time.time()
        logger.info("Starting training with %s samples", X.shape[0])

        self.network.load_state_dict(self._initial_network_weights)
        self.optimizer.load_state_dict(self._initial_optimizer_state)

        # Normalize inputs
        if self.normalize_input:
            self.X, self.X_mean, self.X_std = zero_mean_unit_var_normalization(X)
        else:
            self.X = X

        # Normalize outputs
        if self.normalize_output:
            self.y, self.y_mean, self.y_std = zero_mean_unit_var_normalization(y)
        else:
            self.y = y

        self.y = self.y[:, None]
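        # The reshape above turns the targets into a column vector of shape (N, 1),
        # matching the network's (N, 1) output in `train_epoch`.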

        # Start training
        loss_per_epoch = np.zeros([self.num_epochs])

        for epoch in range(self.num_epochs):
            epoch_start_time = time.time()

            average_batch_loss = self.train_epoch(self.X, self.y)

            loss_per_epoch[epoch] = average_batch_loss

            logger.debug("Epoch %d of %d", epoch + 1, self.num_epochs)
            curtime = time.time()
            epoch_time = curtime - epoch_start_time
            total_time = curtime - start_time

            logger.debug("Epoch time %.3fs, total time %.3fs", epoch_time, total_time)

        # Design matrix
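        # (each row holds the last hidden layer's activations for one training point;
        # these serve as the basis functions of the Bayesian linear regression below)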
        self.Theta = self._to_numpy(self.network.basis_funcs(self._to_tensor(self.X)))

        if do_optimize:
            if self.do_mcmc:
                self.sampler = emcee.EnsembleSampler(
                    self.n_hypers, 2, self.marginal_log_likelihood
                )

                # Do a burn-in in the first iteration
                if not self.burned:
                    # Initialize the walkers by sampling from the prior
                    self.p0 = self.prior.sample_from_prior(self.n_hypers)
                    # Run MCMC sampling
                    # NOTE: This has changed with the newer emcee versions:
                    self.p0, _, _ = self.sampler.run_mcmc(
                        self.p0, self.burnin_steps, rstate0=self.rng
                    )

                    self.burned = True

                # Start sampling
                pos, _, _ = self.sampler.run_mcmc(
                    self.p0, self.chain_length, rstate0=self.rng
                )

                # Save the current position, it will be the startpoint in
                # the next iteration
                self.p0 = pos

                # Take the last samples from each walker and set them back on a linear scale
                linear_theta = np.exp(self.sampler.chain[:, -1])
                self.hypers = linear_theta
                self.hypers[:, 1] = 1 / self.hypers[:, 1]
            else:
                # Optimize hyperparameters of the Bayesian linear regression
                p0 = self.prior.sample_from_prior(n_samples=1)
                res = optimize.fmin(self.negative_mll, p0)
                self.hypers = [[np.exp(res[0]), 1 / np.exp(res[1])]]
        else:
            self.hypers = [[self.alpha, self.beta]]

        logger.info("Hypers: %s", self.hypers)
        self.models: list = []
        for sample in self.hypers:
            # Instantiate a model for each hyperparameter configuration
            model = BayesianLinearRegression(
                alpha=sample[0], beta=sample[1], basis_func=None
            )
            model.train(self.Theta, self.y[:, 0], do_optimize=False)

            self.models.append(model)

    # pylint: disable=protected-access
    @BaseModel._check_shapes_predict  # type: ignore
    def predict(
        self, X_test: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray]:  # pylint: disable=unsubscriptable-object
        r"""
        Returns the predictive mean and variance of the objective function at
        the given test points.

        Parameters
        ----------
        X_test: np.ndarray (N, D)
            N input test points

        Returns
        -------
        np.array(N,)
            predictive mean
        np.array(N,)
            predictive variance

        """
        # Normalize inputs
        if self.normalize_input:
            X_, _, _ = zero_mean_unit_var_normalization(X_test, self.X_mean, self.X_std)
        else:
            X_ = X_test

        # Get features from the net
        theta = self._to_numpy(self.network.basis_funcs(self._to_tensor(X_)))

        # Marginalise predictions over hyperparameters of the BLR
        mu = np.zeros([len(self.models), X_test.shape[0]])
        var = np.zeros([len(self.models), X_test.shape[0]])

        for i, m in enumerate(self.models):
            mu[i], var[i] = m.predict(theta)

        # See the algorithm runtime prediction paper by Hutter et al
        # for the derivation of the total variance
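        # With K = len(self.models) hyperparameter samples, the computation below is
        #   m = mean_i(mu_i),  v = mean_i(mu_i**2 + var_i) - m**2,
        # i.e. the law of total variance over the sampled (alpha, beta) pairs.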
        m = np.mean(mu, axis=0)
        v = np.mean(mu**2 + var, axis=0) - m**2

        # Clip negative variances and set them to the smallest
        # positive float value
        if v.shape[0] == 1:
            v = np.clip(v, np.finfo(v.dtype).eps, np.inf)
        else:
            v = np.clip(v, np.finfo(v.dtype).eps, np.inf)
            v[np.where((v < np.finfo(v.dtype).eps) & (v > -np.finfo(v.dtype).eps))] = 0

        if self.normalize_output:
            m = zero_mean_unit_var_denormalization(m, self.y_mean, self.y_std)
            v *= self.y_std**2

        return m, v


# pylint: disable=unsubscriptable-object, too-few-public-methods
class RoBO_DNGO(RoBO[OrionDNGOWrapper]):
    """
    Wrapper for RoBO with DNGO

    For more information on the algorithm, see the original paper at
    http://proceedings.mlr.press/v37/snoek15.html.

    J. Snoek, O. Rippel, K. Swersky, R. Kiros, N. Satish, N. Sundaram,
    M. M. A. Patwary, Prabhat, R. P. Adams
    Scalable Bayesian Optimization Using Deep Neural Networks
    Proc. of ICML'15

    Parameters
    ----------
    space: ``orion.algo.space.Space``
        Optimisation space with priors for each dimension.
    seed: None, int or sequence of int
        Seed to sample initial points and candidate points.
        Default: 0.
    n_initial_points: int
        Number of initial points randomly sampled. If new points are requested and
        fewer than `n_initial_points` are observed, the next points will also be
        sampled randomly instead of being sampled from the model.
        Default: ``20``
    maximizer: str
        The optimizer for the acquisition function.
        Can be one of ``{"random", "scipy", "differential_evolution"}``.
        Defaults to 'random'
    acquisition_func: str
        Name of the acquisition function. Can be one of ``['ei', 'log_ei', 'pi', 'lcb']``.
    normalize_input: bool
        Normalize the input based on the provided bounds (zero mean and unit standard deviation).
        Defaults to ``True``.
    normalize_output: bool
        Normalize the output based on data (zero mean and unit standard deviation).
        Defaults to ``False``.
    chain_length : int
        The chain length of the MCMC sampler
    burnin_steps: int
        The number of burnin steps before the sampling procedure starts
    batch_size: int
        Batch size for training the neural network
    num_epochs: int
        Number of epochs for training
    learning_rate: float
        Initial learning rate for Adam
    adapt_epoch: int
        Defines after how many epochs the learning rate will be decayed by a factor 10

    """

    def __init__(
        self,
        space: Space,
        seed: int | Sequence[int] | None = 0,
        n_initial_points: int = 20,
        maximizer: MaximizerName = "random",
        acquisition_func: AcquisitionFnName = "log_ei",
        normalize_input: bool = True,
        normalize_output: bool = False,
        chain_length: int = 2000,
        burnin_steps: int = 2000,
        batch_size: int = 10,
        num_epochs: int = 500,
        learning_rate: float = 1e-2,
        adapt_epoch: int = 5000,
    ):
        super().__init__(
            space=space,
            seed=seed,
            n_initial_points=n_initial_points,
            maximizer=maximizer,
            acquisition_func=acquisition_func,
        )
        self.seed = seed
        self.n_initial_points = n_initial_points
        self.maximizer = maximizer
        self.acquisition_func = acquisition_func
        self.normalize_input = normalize_input
        self.normalize_output = normalize_output
        self.chain_length = chain_length
        self.burnin_steps = burnin_steps
        self.batch_size = batch_size
        self.num_epochs = num_epochs
        self.learning_rate = learning_rate
        self.adapt_epoch = adapt_epoch

    def build_model(self) -> OrionDNGOWrapper:
        """Build the model."""
        lower, upper = build_bounds(self.space)
        n_hypers = infer_n_hypers(build_kernel(lower, upper))
        return OrionDNGOWrapper(
            batch_size=self.batch_size,
            num_epochs=self.num_epochs,
            learning_rate=self.learning_rate,
            adapt_epoch=self.adapt_epoch,
            n_units_1=50,
            n_units_2=50,
            n_units_3=50,
            alpha=1.0,
            beta=1000,
            prior=None,
            do_mcmc=True,
            n_hypers=n_hypers,
            chain_length=self.chain_length,
            burnin_steps=self.burnin_steps,
            normalize_input=self.normalize_input,
            normalize_output=self.normalize_output,
            rng=None,
            lower=lower,
            upper=upper,
        )
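

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module's API). Assuming the
# algorithm is registered under the ``robo_dngo`` entry point and that the
# installed Orion client exposes ``build_experiment`` with an ``algorithms``
# argument, an experiment could select this wrapper roughly as follows:
#
#     from orion.client import build_experiment
#
#     experiment = build_experiment(
#         "dngo-example",                  # hypothetical experiment name
#         space={"x": "uniform(0, 1)"},    # hypothetical 1-d search space
#         algorithms={"robo_dngo": {"n_initial_points": 20, "num_epochs": 100}},
#     )
#     experiment.workon(
#         lambda x: [dict(name="objective", type="objective", value=(x - 0.5) ** 2)],
#         max_trials=30,
#     )
# ---------------------------------------------------------------------------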