Source code for cobra.sampling.hr_sampler

"""Provide the base class and associated functions for Hit-and-Run samplers."""

import ctypes
import logging
from abc import ABC, abstractmethod
from multiprocessing import Array
from time import time
from typing import TYPE_CHECKING, NamedTuple, Optional, Tuple

import numpy as np
import pandas as pd
from optlang.interface import OPTIMAL
from optlang.symbolics import Zero

from cobra.util import constraint_matrices, create_stoichiometric_matrix, nullspace


if TYPE_CHECKING:
    from cobra import Model


[docs]logger = logging.getLogger(__name__)
[docs]Problem = NamedTuple( "Problem", [ ("equalities", np.ndarray), ("b", np.ndarray), ("inequalities", np.ndarray), ("bounds", np.ndarray), ("variable_fixed", np.ndarray), ("variable_bounds", np.ndarray), ("nullspace", np.matrix), ("homogeneous", bool),
], ) """Define the matrix representation of a sampling problem. A named tuple consisting of 6 arrays, 1 matrix and 1 boolean. Attributes ---------- equalities : numpy.array All equality constraints in the model. b : numpy.array The right side of the equality constraints. inequalities : numpy.array All inequality constraints in the model. bounds : numpy.array The lower and upper bounds for the inequality constraints. variable_fixed : numpy.array A boolean vector indicating whether the variable at that index is fixed i.e., whether `variable.lower_bound == variable.upper_bound`. variable_bounds : numpy.array The lower and upper bounds for the variables. nullspace : numpy.matrix A matrix containing the nullspace of the equality constraints. Each column is one basis vector. homogeneous: bool Indicates whether the sampling problem is homogeneous, e.g. whether there exist no non-zero fixed variables or constraints. """
[docs]def shared_np_array( shape: Tuple[int, int], data: Optional[np.ndarray] = None, integer: bool = False ) -> np.ndarray: """Create a new numpy array that resides in shared memory. Parameters ---------- shape : tuple of int The shape of the new array. data : numpy.array, optional Data to copy to the new array. Has to have the same shape (default None). integer : bool, optional Whether to use an integer array. By default, float array is used (default False). Returns ------- numpy.array The newly created shared numpy array. Raises ------ ValueError If the input `data` (if provided) size is not equal to the created array. """ size = np.prod(shape) if integer: array = Array(ctypes.c_int64, int(size)) np_array = np.frombuffer(array.get_obj(), dtype="int64") else: array = Array(ctypes.c_double, int(size)) np_array = np.frombuffer(array.get_obj()) np_array = np_array.reshape(shape) if data is not None: if len(shape) != len(data.shape): raise ValueError("`data` must have the same shape as the created array.") same = all(x == y for x, y in zip(shape, data.shape)) if not same: raise ValueError("`data` must have the same shape as the created array.") np_array[:] = data return np_array
[docs]class HRSampler(ABC): """ The abstract base class for hit-and-run samplers. New samplers should derive from this class where possible to provide a uniform interface. Parameters ---------- model : cobra.Model The cobra model from which to generate samples. thinning : int The thinning factor of the generated sampling chain. A thinning of 10 means samples are returned every 10 steps. nproj : int > 0, optional How often to reproject the sampling point into the feasibility space. Avoids numerical issues at the cost of lower sampling. If you observe many equality constraint violations with `sampler.validate` you should lower this number (default None). seed : int > 0, optional Sets the random number seed. Initialized to the current time stamp if None (default None). Attributes ---------- feasibility_tol: float The tolerance used for checking equalities feasibility. bounds_tol: float The tolerance used for checking bounds feasibility. n_samples : int The total number of samples that have been generated by this sampler instance. retries : int The overall of sampling retries the sampler has observed. Larger values indicate numerical instabilities. problem : Problem A NamedTuple whose attributes define the entire sampling problem in matrix form. warmup : numpy.matrix A numpy matrix with as many columns as reactions in the model and more than 3 rows containing a warmup sample in each row. None if no warmup points have been generated yet. fwd_idx : numpy.array A numpy array having one entry for each reaction in the model, containing the index of the respective forward variable. rev_idx : numpy.array A numpy array having one entry for each reaction in the model, containing the index of the respective reverse variable. """ def __init__( self, model: "Model", thinning: int, nproj: Optional[int] = None, seed: Optional[int] = None, **kwargs, ) -> None: """Initialize a new sampler object. Other Parameters ---------------- kwargs : Further keyword arguments are passed on to the parent class. Raises ------ TypeError If integer problem is found. """ # This currently has to be done to reset the solver basis which is # required to get deterministic warmup point generation # (in turn required for a working `seed`) if model.solver.is_integer: raise TypeError("Sampling does not work with integer problems.") self.model = model.copy() self.feasibility_tol = model.tolerance self.bounds_tol = model.tolerance self.thinning = thinning if nproj is None: self.nproj = int(min(len(self.model.variables) ** 3, 1e6)) else: self.nproj = nproj self.n_samples = 0 self.retries = 0 self.problem = self.__build_problem() # Set up a map from reaction -> forward/reverse variable var_idx = {v: idx for idx, v in enumerate(self.model.variables)} self.fwd_idx = np.array( [var_idx[r.forward_variable] for r in self.model.reactions] ) self.rev_idx = np.array( [var_idx[r.reverse_variable] for r in self.model.reactions] ) self.warmup = None if seed is None: self._seed = int(time()) else: self._seed = seed # Avoid overflow self._seed = self._seed % np.iinfo(np.int32).max
[docs] def __build_problem(self) -> Problem: """Build the matrix representation of the sampling problem. Returns ------- Problem The matrix representation in the form of a NamedTuple. """ # Set up the mathematical problem prob = constraint_matrices(self.model, zero_tol=self.feasibility_tol) # check if there any non-zero equality constraints equalities = prob.equalities b = prob.b bounds = np.atleast_2d(prob.bounds).T var_bounds = np.atleast_2d(prob.variable_bounds).T homogeneous = all(np.abs(b) < self.feasibility_tol) fixed_non_zero = np.abs(prob.variable_bounds[:, 1]) > self.feasibility_tol fixed_non_zero &= prob.variable_fixed # check if there are any non-zero fixed variables, add them as # equalities to the stoichiometric matrix if any(fixed_non_zero): n_fixed = fixed_non_zero.sum() rows = np.zeros((n_fixed, prob.equalities.shape[1])) rows[range(n_fixed), np.where(fixed_non_zero)] = 1.0 equalities = np.vstack([equalities, rows]) var_b = prob.variable_bounds[:, 1] b = np.hstack([b, var_b[fixed_non_zero]]) homogeneous = False # Set up a projection that can cast point into the nullspace nulls = nullspace(equalities) # convert bounds to a matrix and add variable bounds as well return Problem( equalities=shared_np_array(equalities.shape, equalities), b=shared_np_array(b.shape, b), inequalities=shared_np_array(prob.inequalities.shape, prob.inequalities), bounds=shared_np_array(bounds.shape, bounds), variable_fixed=shared_np_array( prob.variable_fixed.shape, prob.variable_fixed, integer=True ), variable_bounds=shared_np_array(var_bounds.shape, var_bounds), nullspace=shared_np_array(nulls.shape, nulls), homogeneous=homogeneous,
)
[docs] def generate_fva_warmup(self) -> None: """Generate the warmup points for the sampler. Generates warmup points by setting each flux as the sole objective and minimizing/maximizing it. Also caches the projection of the warmup points into the nullspace for non-homogeneous problems (only if necessary). Raises ------ ValueError If flux cone contains a single point or the problem is inhomogeneous. """ self.n_warmup = 0 reactions = self.model.reactions self.warmup = np.zeros((2 * len(reactions), len(self.model.variables))) self.model.objective = Zero for sense in ("min", "max"): self.model.objective_direction = sense for i, r in enumerate(reactions): variables = ( self.model.variables[self.fwd_idx[i]], self.model.variables[self.rev_idx[i]], ) # Omit fixed reactions if they are non-homogeneous if r.upper_bound - r.lower_bound < self.bounds_tol: logger.info(f"Skipping fixed reaction {r.id}") continue self.model.objective.set_linear_coefficients( {variables[0]: 1, variables[1]: -1} ) self.model.slim_optimize() if not self.model.solver.status == OPTIMAL: logger.info(f"Cannot maximize reaction {r.id}, skipping it.") continue primals = self.model.solver.primal_values sol = [primals[v.name] for v in self.model.variables] self.warmup[ self.n_warmup, ] = sol self.n_warmup += 1 # Reset objective self.model.objective.set_linear_coefficients( {variables[0]: 0, variables[1]: 0} ) # Shrink to measure self.warmup = self.warmup[0 : self.n_warmup, :] # Remove redundant search directions keep = np.logical_not(self._is_redundant(self.warmup)) self.warmup = self.warmup[keep, :] self.n_warmup = self.warmup.shape[0] # Catch some special cases if len(self.warmup.shape) == 1 or self.warmup.shape[0] == 1: raise ValueError("Flux cone only consists a single point.") elif self.n_warmup == 2: if not self.problem.homogeneous: raise ValueError( "Cannot sample from an inhomogenous problem " "with only 2 search directions." ) logger.info("All search directions on a line, adding another one.") newdir = self.warmup.T.dot([0.25, 0.25]) self.warmup = np.vstack([self.warmup, newdir]) self.n_warmup += 1 # Shrink warmup points to measure self.warmup = shared_np_array( (self.n_warmup, len(self.model.variables)), self.warmup
)
[docs] def _reproject(self, p: np.ndarray) -> np.ndarray: """Reproject a point into the feasibility region. This function is guaranteed to return a new feasible point. However, no guarantee can be made in terms of proximity to the original point. Parameters ---------- p : numpy.array The current sample point. Returns ------- numpy.array A new feasible point. If `p` is feasible, it will return `p`. """ nulls = self.problem.nullspace equalities = self.problem.equalities # don't reproject if point is feasible if np.allclose( equalities.dot(p), self.problem.b, rtol=0, atol=self.feasibility_tol ): new = p else: logger.info( f"Feasibility violated in sample {self.n_samples}, trying to reproject." ) new = nulls.dot(nulls.T.dot(p)) # Projections may violate bounds # set to random point in space in that case if any(new != p): logger.info( f"Re-projection failed in sample {self.n_samples}, " "using random point in space." ) new = self._random_point() return new
[docs] def _random_point(self) -> np.ndarray: """Find an approximately random point in the flux cone.""" idx = np.random.randint( self.n_warmup, size=min(2, np.ceil(np.sqrt(self.n_warmup))) ) return self.warmup[idx, :].mean(axis=0)
[docs] def _is_redundant(self, matrix: np.matrix, cutoff: Optional[float] = None) -> bool: """Identify redundant rows in a matrix that can be removed.""" cutoff = 1.0 - self.feasibility_tol # Avoid zero variances extra_col = matrix[:, 0] + 1 # Avoid zero rows being correlated with constant rows extra_col[matrix.sum(axis=1) == 0] = 2 corr = np.corrcoef(np.c_[matrix, extra_col]) corr = np.tril(corr, -1) return (np.abs(corr) > cutoff).any(axis=1)
[docs] def _bounds_dist(self, p: np.ndarray) -> np.ndarray: """Get the lower and upper bound distances. Negative is bad.""" prob = self.problem lb_dist = ( p - prob.variable_bounds[ 0, ] ).min() ub_dist = ( prob.variable_bounds[ 1, ] - p ).min() if prob.bounds.shape[0] > 0: const = prob.inequalities.dot(p) const_lb_dist = ( const - prob.bounds[ 0, ] ).min() const_ub_dist = ( prob.bounds[ 1, ] - const ).min() lb_dist = min(lb_dist, const_lb_dist) ub_dist = min(ub_dist, const_ub_dist) return np.array([lb_dist, ub_dist])
@abstractmethod
[docs] def sample(self, n: int, fluxes: bool = True) -> pd.DataFrame: """Abstract sampling function. Should be overwritten by child classes. Parameters ---------- n : int The number of samples that are generated at once. fluxes : bool, optional Whether to return fluxes or the internal solver variables. If set to False, will return a variable for each forward and backward flux as well as all additional variables you might have defined in the model (default True). Returns ------- pandas.DataFrame Returns a pandas DataFrame with `n` rows, each containing a flux sample. """ raise NotImplementedError( "This method needs to be implemented by the subclass."
)
[docs] def batch( self, batch_size: int, batch_num: int, fluxes: bool = True ) -> pd.DataFrame: """Create a batch generator. This is useful to generate `batch_num` batches of `batch_size` samples each. Parameters ---------- batch_size : int The number of samples contained in each batch. batch_num : int The number of batches in the generator. fluxes : bool, optional Whether to return fluxes or the internal solver variables. If set to False, will return a variable for each forward and backward flux as well as all additional variables you might have defined in the model (default True). Yields ------ pandas.DataFrame A DataFrame with dimensions (batch_size x n_r) containing a valid flux sample for a total of n_r reactions (or variables if fluxes=False) in each row. """ for _ in range(batch_num): yield self.sample(batch_size, fluxes=fluxes)
[docs] def validate(self, samples: np.matrix) -> np.ndarray: """Validate a set of samples for equality and inequality feasibility. Can be used to check whether the generated samples and warmup points are feasible. Parameters ---------- samples : numpy.matrix Must be of dimension (samples x n_reactions). Contains the samples to be validated. Samples must be from fluxes. Returns ------- numpy.array A one-dimensional numpy array containing a code of 1 to 3 letters denoting the validation result: - 'v' means feasible in bounds and equality constraints - 'l' means a lower bound violation - 'u' means a lower bound validation - 'e' means and equality constraint violation Raises ------ ValueError If wrong number of columns. """ samples = np.atleast_2d(samples) prob = self.problem if samples.shape[1] == len(self.model.reactions): S = create_stoichiometric_matrix(self.model) b = np.array( [self.model.constraints[m.id].lb for m in self.model.metabolites] ) bounds = np.array([r.bounds for r in self.model.reactions]).T elif samples.shape[1] == len(self.model.variables): S = prob.equalities b = prob.b bounds = prob.variable_bounds else: raise ValueError( "Wrong number of columns. Samples must have a " "column for each flux or variable defined in the " "model." ) feasibility = np.abs(S.dot(samples.T).T - b).max(axis=1) lb_error = ( samples - bounds[ 0, ] ).min(axis=1) ub_error = ( bounds[ 1, ] - samples ).min(axis=1) if samples.shape[1] == len(self.model.variables) and prob.inequalities.shape[0]: consts = prob.inequalities.dot(samples.T) lb_error = np.minimum( lb_error, ( consts - prob.bounds[ 0, ] ).min(axis=1), ) ub_error = np.minimum( ub_error, ( prob.bounds[ 1, ] - consts ).min(axis=1), ) valid = ( (feasibility < self.feasibility_tol) & (lb_error > -self.bounds_tol) & (ub_error > -self.bounds_tol) ) codes = np.repeat("", valid.shape[0]).astype(np.dtype((str, 3))) codes[valid] = "v" codes[lb_error <= -self.bounds_tol] = np.char.add( codes[lb_error <= -self.bounds_tol], "l" ) codes[ub_error <= -self.bounds_tol] = np.char.add( codes[ub_error <= -self.bounds_tol], "u" ) codes[feasibility > self.feasibility_tol] = np.char.add( codes[feasibility > self.feasibility_tol], "e" ) return codes