Source code for qbm4eo.rbm

"""
=============================================================================

This file is a part of qbm4eo.rst project.

https://github.com/FeralQubits/qbm4eo

=============================================================================

It has been modified as a part of the EuroHPC PL project funded at the Smart Growth
Operational Programme 2014-2020, Measure 4.2 under the grant agreement no.
POIR.04.02.00-00-D014/20-00.

=============================================================================
"""

import abc
import io
from itertools import islice
from typing import Any, Callable, Dict, Generator, Iterator, Optional, Tuple, Union

import dimod
import numpy as np
from dimod.core.sampler import Sampler
from numpy.typing import NDArray
from scipy.special import expit
from torch import Tensor
from torch.utils.data import DataLoader
from tqdm import tqdm

INITIAL_COEFFICIENT_SCALE: float = 0.1


[docs] def infinite_dataloader_generator( data_loader: Union[ DataLoader[Tuple[Any, Any]], Generator[Tuple[Any, Any], Any, Any], ], ) -> Generator[ Tuple[Any, Any], None, None ]: # https://www.linuxjournal.com/content/pythons-mypy-callables-and-generators """ A generator that infinitely yields data batches from the given dataloader. :param data_loader: A PyTorch DataLoader initialized with the desired dataset. :return: Yields an index of a batch and the data from the data_loader. """ while True: for batch_idx, (data, target) in enumerate(data_loader): yield batch_idx, (data, target)
[docs] def qubo_from_rbm_coefficients( weights: Union[Tensor, NDArray[np.float32]], v_bias: Union[Tensor, NDArray[np.float32]], h_bias: Union[Tensor, NDArray[np.float32]], ) -> dimod.BQM: """Create a QUBO problem representing RBM with given coefficients. :param weights: A square interaction matrix. :param v_bias: An N-element visible layer bias vector. :param h_bias: An M-element hidden layer bias vector. :return: A QUBO, represented as dimod.BQM with N+M variables, such that: - variables 0,...,N-1 correspond to hidden layer - variables 0,...,M-1 correspond to visible layer - visible layer biases correspond to linear coefficients of first N variables - hidden layer biases correspond to linear coefficients of second M variables - weights correspond to interaction terms between first N and second M variables. .. note:: This function does not allow for manipulating how RBM variables are mapped to the QUBO variables. This is not a problem if QUBO is to be used with an unstructured sampler. For sampler, the intended usage is to wrap it in the EmbeddingComposite. """ linear: Dict[int, float] = { **{i: float(bias) for i, bias in enumerate(v_bias)}, **{i: float(bias) for i, bias in enumerate(h_bias, start=len(v_bias))}, } quadratic: Dict[Tuple[int, int], float] = { (i, j + len(v_bias)): float(weights[i, j]) for i in range(len(v_bias)) for j in range(len(h_bias)) } return dimod.BQM(linear, quadratic, offset=0, vartype="BINARY")
[docs] class RBM: """ A class implementing the Restricted Boltzmann Machine. """ def __init__( self, num_visible: int, num_hidden: int, rng: Optional[np.random.Generator] = None, ) -> None: """ A default constructor for the RMB class instances. :param num_visible: Number of neurons in the visible layer. :param num_hidden: Number of neurons in the hidden layer. :param rng: Random number generator to be used by the RBM. """ self.num_visible: int = num_visible self.num_hidden: int = num_hidden self.rng: np.random.Generator = ( rng if rng is not None else np.random.default_rng() ) self.weights: NDArray[np.float32] = ( self.rng.normal(size=(self.num_visible, self.num_hidden)) * INITIAL_COEFFICIENT_SCALE ) self.v_bias: NDArray[np.float32] = ( self.rng.normal(size=self.num_visible) * INITIAL_COEFFICIENT_SCALE ) self.h_bias: NDArray[np.float32] = ( self.rng.normal(size=self.num_hidden) * INITIAL_COEFFICIENT_SCALE )
[docs] def h_probs_given_v(self, v_batch: NDArray[np.float32]) -> NDArray[np.float32]: """ Compute the probabilities of the hidden layer neurons being active given the visible layer neurons' states. :param v_batch: A batch of visible layer neurons' states. :return: A batch of probabilities of the hidden layer neurons being active. """ return expit(self.h_bias + v_batch @ self.weights)
[docs] def sample_h_given_v(self, v_batch: NDArray[np.float32]) -> NDArray[np.float32]: """ Sample the hidden layer neurons' states given the visible layer neurons' states. :param v_batch: A batch of visible layer neurons' states. :return: A batch of hidden layer neurons' states. """ probs: NDArray[np.float32] = self.h_probs_given_v(v_batch) return (self.rng.random(probs.shape) < probs).astype(float)
[docs] def v_probs_given_h(self, h_batch: NDArray[np.float32]) -> NDArray[np.float32]: """ Compute the probabilities of the visible layer neurons' states given the hidden layer neurons' states. :param h_batch: A batch of hidden layer neurons' states. :return: A batch of probabilities of the visible layer neurons' states. """ return expit(self.v_bias + h_batch @ self.weights.T)
[docs] def sample_v_given_h(self, h_batch: NDArray[np.float32]) -> NDArray[np.float32]: """ Sample the visible layer neurons' states given the hidden layer neurons' states. :param h_batch: A batch of hidden layer neurons' states. :return: A batch of visible layer neurons' states. """ probs: NDArray[np.float32] = self.v_probs_given_h(h_batch) return (self.rng.random(probs.shape) < probs).astype(float)
[docs] def reconstruct(self, v_batch: NDArray[np.float32]) -> NDArray[np.float32]: """ Reconstruct the visible layer neurons' states given the visible layer neurons' states. :param v_batch: A batch of visible layer neurons' states. :return: A batch of reconstructed visible layer neurons' states. """ return self.v_probs_given_h(self.h_probs_given_v(v_batch))
[docs] def save(self, file: Union[str, io.BytesIO]) -> None: """ Save the RBM to a file. :param file: A file to save the RBM to. """ np.savez(file, v_bias=self.v_bias, h_bias=self.h_bias, weights=self.weights)
[docs] @classmethod def load(cls, file: Union[str, io.BytesIO]) -> "RBM": """ Load the RBM from a file. :param file: A file to load the RBM from. :return: A loaded RBM. """ data: Dict[str, NDArray[np.float32]] = np.load(file) weights: NDArray[np.float32] = data["weights"] v_bias: NDArray[np.float32] = data["v_bias"] h_bias: NDArray[np.float32] = data["h_bias"] rbm: "RBM" = cls(num_visible=len(v_bias), num_hidden=len(h_bias)) rbm.weights = weights rbm.v_bias = v_bias rbm.h_bias = h_bias return rbm
[docs] class RBMTrainer: """ A base class for implementing the RBM training algorithms. TODO TR: There are some nasty casting between np.matrix and np.ndarray that is there to ensure proper transposition or multiplications when one of the matrix dimension is 1. This probable could be done more elegantly. """ def __init__(self, num_steps: int) -> None: """ A default constructor for the RBMTrainer class instances. :param num_steps: Number of training steps. """ self.num_steps: int = num_steps
[docs] def fit( self, rbm: RBM, data_loader: Union[ DataLoader[Tuple[Any, Any]], Generator[Tuple[Any, Any], Any, Any], ], callback: Callable[[int, RBM, float], None] = None, verbose: bool = False, ) -> None: """ Fits the RBM to the data. :param rbm: An RBM to be trained. :param data_loader: A data loader. :param callback: A callback function to be called after each training step. """ data_iterator: Union[tqdm, Iterator[Tuple[Any, Any]]] = enumerate( islice(infinite_dataloader_generator(data_loader), self.num_steps) ) if verbose: data_iterator = tqdm(data_iterator, total=self.num_steps) for i, (_, (batch, _)) in data_iterator: batch = np.array(batch.detach().cpu().numpy().squeeze()) self.training_step(rbm, batch) loss: float = ( (np.array(batch - rbm.reconstruct(batch)) ** 2).sum() / batch.shape[0] / batch.shape[1] ) if isinstance(data_iterator, tqdm): data_iterator.set_postfix(loss=loss) if callback is not None: callback(i, rbm, loss)
[docs] @abc.abstractmethod def training_step(self, rbm: RBM, batch: NDArray[np.float32]) -> None: """ A single training step. :param rbm: An RBM to be trained. :param batch: A batch of data. """ pass
[docs] class AnnealingRBMTrainer(RBMTrainer): """ A class for training the RBM using the annealing samplers (D-Wave of CIMSampler). """ def __init__( self, num_steps: int, sampler: Sampler, qubo_scale: float = 1.0, learning_rate: float = 0.01, **sampler_kwargs: Dict[str, Any], ) -> None: """ A default constructor for the AnnealingRBMTrainer class instances. :param num_steps: Number of training steps. :param sampler: A sampler to be used during the training. :param qubo_scale: A scaling factor for the QUBO. :param learning_rate: A learning rate. :param sampler_kwargs: Additional keyword arguments to be passed to the sampler. """ super().__init__(num_steps) self.sampler: Sampler = sampler self.sampler_kwargs: Dict[str, Any] = sampler_kwargs self.qubo_scale: float = qubo_scale self.learning_rate: float = learning_rate
[docs] def training_step(self, rbm: RBM, batch: NDArray[np.float32]) -> None: """ A single training step. :param rbm: An RBM to be trained. :param batch: The RBMs' visible layer neurons' states. """ # Conditional probabilities given visible batch input hidden: NDArray[np.float32] = rbm.h_probs_given_v(batch) # Construct QUBO from this RBM bqm: dimod.BQM = qubo_from_rbm_coefficients(rbm.weights, rbm.v_bias, rbm.h_bias) # Scaling to compensate the temperature difference. Strangely, it seems # that in dimod this operation has to be done in place. bqm.scale(self.qubo_scale) # Take a sample of the same size as batch, extract only visible and hidden # variables. If the sampler supports num_reads, use it, otherwise repeat # the sampling for each data point in the batch. if "num_reads" in self.sampler.parameters: sample = self.sampler.sample( bqm, num_reads=len(batch), **self.sampler_kwargs ).record["sample"] else: sample = dimod.concatenate( [ self.sampler.sample(bqm, **self.sampler_kwargs) for _ in range(len(batch)) ] ).record["sample"] # Split, remembering that first variables correspond to hidden layer. sample_v = sample[:, : rbm.num_visible] sample_h = sample[:, rbm.num_visible :] # Update weights. rbm.weights += ( self.learning_rate * (batch.T @ hidden - sample_v.T @ sample_h) / len(batch) ) # And biases rbm.v_bias += ( self.learning_rate * np.asarray(batch - sample_v).sum(axis=0).squeeze() ) rbm.h_bias += ( self.learning_rate * np.asarray(hidden - sample_h).sum(axis=0).squeeze() )
[docs] class CD1Trainer(RBMTrainer): """ A class for training the RBM using the Contrastive Divergence 1 algorithm. """ def __init__(self, num_steps: int, learning_rate: float = 0.01): """ A default constructor for the CD1Trainer class instances. :param num_steps: Number of training steps. :param learning_rate: A learning rate. """ super().__init__(num_steps) self.learning_rate = learning_rate
[docs] def training_step(self, rbm: RBM, batch: NDArray[np.float32]) -> None: """ A single training step. :param rbm: An RBM to be trained. :param batch: The RBMs' visible layer neurons' states. TODO: There seems to be a problem when batch_size is 1. Investigate. """ # Conditional probabilities given visible batch input hidden_1: NDArray[np.float32] = rbm.h_probs_given_v(batch) # Propagate hidden -> visible -> hidden again visible_2: NDArray[np.float32] = rbm.v_probs_given_h(hidden_1) hidden_2: NDArray[np.float32] = rbm.h_probs_given_v(visible_2) rbm.weights += np.array( self.learning_rate * (batch.T @ hidden_1 - visible_2.T @ hidden_2) / len(batch) ) # And biases rbm.v_bias += self.learning_rate * (batch - visible_2).sum(axis=0) rbm.h_bias += self.learning_rate * (hidden_1 - hidden_2).sum(axis=0)