Source code for cobra.sampling.hr_sampler

"""Provide the base class and associated functions for Hit-and-Run samplers."""

import ctypes
import logging
from abc import ABC, abstractmethod
from multiprocessing import Array
from time import time
from typing import TYPE_CHECKING, NamedTuple, Optional, Tuple

import numpy as np
import pandas as pd
from optlang.interface import OPTIMAL
from optlang.symbolics import Zero

from cobra.util import constraint_matrices, create_stoichiometric_matrix, nullspace

    from cobra import Model

[docs]logger = logging.getLogger(__name__)
# Maximum number of retries for sampling
[docs]MAX_TRIES = 100
[docs]Problem = NamedTuple( "Problem", [ ("equalities", np.ndarray), ("b", np.ndarray), ("inequalities", np.ndarray), ("bounds", np.ndarray), ("variable_fixed", np.ndarray), ("variable_bounds", np.ndarray), ("nullspace", np.matrix), ("homogeneous", bool),
], ) """Define the matrix representation of a sampling problem. A named tuple consisting of 6 arrays, 1 matrix and 1 boolean. Attributes ---------- equalities : numpy.array All equality constraints in the model. b : numpy.array The right side of the equality constraints. inequalities : numpy.array All inequality constraints in the model. bounds : numpy.array The lower and upper bounds for the inequality constraints. variable_fixed : numpy.array A boolean vector indicating whether the variable at that index is fixed i.e., whether `variable.lower_bound == variable.upper_bound`. variable_bounds : numpy.array The lower and upper bounds for the variables. nullspace : numpy.matrix A matrix containing the nullspace of the equality constraints. Each column is one basis vector. homogeneous: bool Indicates whether the sampling problem is homogeneous, e.g. whether there exist no non-zero fixed variables or constraints. """
[docs]def shared_np_array( shape: Tuple[int, int], data: Optional[np.ndarray] = None, integer: bool = False ) -> np.ndarray: """Create a new numpy array that resides in shared memory. Parameters ---------- shape : tuple of int The shape of the new array. data : numpy.array, optional Data to copy to the new array. Has to have the same shape (default None). integer : bool, optional Whether to use an integer array. By default, float array is used (default False). Returns ------- numpy.array The newly created shared numpy array. Raises ------ ValueError If the input `data` (if provided) size is not equal to the created array. """ size = if integer: array = Array(ctypes.c_int64, int(size)) np_array = np.frombuffer(array.get_obj(), dtype="int64") else: array = Array(ctypes.c_double, int(size)) np_array = np.frombuffer(array.get_obj()) np_array = np_array.reshape(shape) if data is not None: if len(shape) != len(data.shape): raise ValueError("`data` must have the same shape as the created array.") same = all(x == y for x, y in zip(shape, data.shape)) if not same: raise ValueError("`data` must have the same shape as the created array.") np_array[:] = data return np_array
[docs]class HRSampler(ABC): """ The abstract base class for hit-and-run samplers. New samplers should derive from this class where possible to provide a uniform interface. Parameters ---------- model : cobra.Model The cobra model from which to generate samples. thinning : int The thinning factor of the generated sampling chain. A thinning of 10 means samples are returned every 10 steps. nproj : int > 0, optional How often to reproject the sampling point into the feasibility space. Avoids numerical issues at the cost of lower sampling. If you observe many equality constraint violations with `sampler.validate` you should lower this number (default None). seed : int > 0, optional Sets the random number seed. Initialized to the current time stamp if None (default None). Attributes ---------- feasibility_tol: float The tolerance used for checking equalities feasibility. bounds_tol: float The tolerance used for checking bounds feasibility. n_samples : int The total number of samples that have been generated by this sampler instance. retries : int The overall of sampling retries the sampler has observed. Larger values indicate numerical instabilities. problem : Problem A NamedTuple whose attributes define the entire sampling problem in matrix form. warmup : numpy.matrix A numpy matrix with as many columns as reactions in the model and more than 3 rows containing a warmup sample in each row. None if no warmup points have been generated yet. fwd_idx : numpy.array A numpy array having one entry for each reaction in the model, containing the index of the respective forward variable. rev_idx : numpy.array A numpy array having one entry for each reaction in the model, containing the index of the respective reverse variable. """ def __init__( self, model: "Model", thinning: int, nproj: Optional[int] = None, seed: Optional[int] = None, **kwargs, ) -> None: """Initialize a new sampler object. Other Parameters ---------------- kwargs : Further keyword arguments are passed on to the parent class. Raises ------ TypeError If integer problem is found. """ # This currently has to be done to reset the solver basis which is # required to get deterministic warmup point generation # (in turn required for a working `seed`) if model.solver.is_integer: raise TypeError("Sampling does not work with integer problems.") self.model = model.copy() self.feasibility_tol = model.tolerance self.bounds_tol = model.tolerance self.thinning = thinning if nproj is None: self.nproj = int(min(len(self.model.variables) ** 3, 1e6)) else: self.nproj = nproj self.n_samples = 0 self.retries = 0 self.problem = self.__build_problem() # Set up a map from reaction -> forward/reverse variable var_idx = {v: idx for idx, v in enumerate(self.model.variables)} self.fwd_idx = np.array( [var_idx[r.forward_variable] for r in self.model.reactions] ) self.rev_idx = np.array( [var_idx[r.reverse_variable] for r in self.model.reactions] ) self.warmup = None if seed is None: self._seed = int(time()) else: self._seed = seed # Avoid overflow self._seed = self._seed % np.iinfo(np.int32).max
[docs] def __build_problem(self) -> Problem: """Build the matrix representation of the sampling problem. Returns ------- Problem The matrix representation in the form of a NamedTuple. """ # Set up the mathematical problem prob = constraint_matrices(self.model, zero_tol=self.feasibility_tol) # check if there any non-zero equality constraints equalities = prob.equalities b = prob.b bounds = np.atleast_2d(prob.bounds).T var_bounds = np.atleast_2d(prob.variable_bounds).T homogeneous = all(np.abs(b) < self.feasibility_tol) fixed_non_zero = np.abs(prob.variable_bounds[:, 1]) > self.feasibility_tol fixed_non_zero &= prob.variable_fixed # check if there are any non-zero fixed variables, add them as # equalities to the stoichiometric matrix if any(fixed_non_zero): n_fixed = fixed_non_zero.sum() rows = np.zeros((n_fixed, prob.equalities.shape[1])) rows[range(n_fixed), np.where(fixed_non_zero)] = 1.0 equalities = np.vstack([equalities, rows]) var_b = prob.variable_bounds[:, 1] b = np.hstack([b, var_b[fixed_non_zero]]) homogeneous = False # Set up a projection that can cast point into the nullspace nulls = nullspace(equalities) # convert bounds to a matrix and add variable bounds as well return Problem( equalities=shared_np_array(equalities.shape, equalities), b=shared_np_array(b.shape, b), inequalities=shared_np_array(prob.inequalities.shape, prob.inequalities), bounds=shared_np_array(bounds.shape, bounds), variable_fixed=shared_np_array( prob.variable_fixed.shape, prob.variable_fixed, integer=True ), variable_bounds=shared_np_array(var_bounds.shape, var_bounds), nullspace=shared_np_array(nulls.shape, nulls), homogeneous=homogeneous,
[docs] def generate_fva_warmup(self) -> None: """Generate the warmup points for the sampler. Generates warmup points by setting each flux as the sole objective and minimizing/maximizing it. Also caches the projection of the warmup points into the nullspace for non-homogeneous problems (only if necessary). Raises ------ ValueError If flux cone contains a single point or the problem is inhomogeneous. """ self.n_warmup = 0 reactions = self.model.reactions self.warmup = np.zeros((2 * len(reactions), len(self.model.variables))) self.model.objective = Zero for sense in ("min", "max"): self.model.objective_direction = sense for i, r in enumerate(reactions): variables = ( self.model.variables[self.fwd_idx[i]], self.model.variables[self.rev_idx[i]], ) # Omit fixed reactions if they are non-homogeneous if r.upper_bound - r.lower_bound < self.bounds_tol:"Skipping fixed reaction {}") continue self.model.objective.set_linear_coefficients( {variables[0]: 1, variables[1]: -1} ) self.model.slim_optimize() if not self.model.solver.status == OPTIMAL:"Cannot maximize reaction {}, skipping it.") continue primals = self.model.solver.primal_values sol = [primals[] for v in self.model.variables] self.warmup[ self.n_warmup, ] = sol self.n_warmup += 1 # Reset objective self.model.objective.set_linear_coefficients( {variables[0]: 0, variables[1]: 0} ) # Shrink to measure self.warmup = self.warmup[0 : self.n_warmup, :] # Remove redundant search directions keep = np.logical_not(self._is_redundant(self.warmup)) self.warmup = self.warmup[keep, :] self.n_warmup = self.warmup.shape[0] # Catch some special cases if len(self.warmup.shape) == 1 or self.warmup.shape[0] == 1: raise ValueError("Flux cone only consists a single point.") elif self.n_warmup == 2: if not self.problem.homogeneous: raise ValueError( "Cannot sample from an inhomogenous problem " "with only 2 search directions." )"All search directions on a line, adding another one.") newdir =[0.25, 0.25]) self.warmup = np.vstack([self.warmup, newdir]) self.n_warmup += 1 # Shrink warmup points to measure self.warmup = shared_np_array( (self.n_warmup, len(self.model.variables)), self.warmup
[docs] def _reproject(self, p: np.ndarray) -> np.ndarray: """Reproject a point into the feasibility region. This function is guaranteed to return a new feasible point. However, no guarantee can be made in terms of proximity to the original point. Parameters ---------- p : numpy.array The current sample point. Returns ------- numpy.array A new feasible point. If `p` is feasible, it will return `p`. """ nulls = self.problem.nullspace equalities = self.problem.equalities # don't reproject if point is feasible if np.allclose(, self.problem.b, rtol=0, atol=self.feasibility_tol ): new = p else: f"Feasibility violated in sample {self.n_samples}, trying to reproject." ) new = # Projections may violate bounds # set to random point in space in that case if any(new != p): f"Re-projection failed in sample {self.n_samples}, " "using random point in space." ) new = self._random_point() return new
[docs] def _random_point(self) -> np.ndarray: """Find an approximately random point in the flux cone.""" idx = np.random.randint( self.n_warmup, size=min(2, np.ceil(np.sqrt(self.n_warmup))) ) return self.warmup[idx, :].mean(axis=0)
[docs] def _is_redundant(self, matrix: np.matrix, cutoff: Optional[float] = None) -> bool: """Identify redundant rows in a matrix that can be removed.""" cutoff = 1.0 - self.feasibility_tol # Avoid zero variances extra_col = matrix[:, 0] + 1 # Avoid zero rows being correlated with constant rows extra_col[matrix.sum(axis=1) == 0] = 2 corr = np.corrcoef(np.c_[matrix, extra_col]) corr = np.tril(corr, -1) return (np.abs(corr) > cutoff).any(axis=1)
[docs] def _bounds_dist(self, p: np.ndarray) -> np.ndarray: """Get the lower and upper bound distances. Negative is bad.""" prob = self.problem lb_dist = ( p - prob.variable_bounds[ 0, ] ).min() ub_dist = ( prob.variable_bounds[ 1, ] - p ).min() if prob.bounds.shape[0] > 0: const = const_lb_dist = ( const - prob.bounds[ 0, ] ).min() const_ub_dist = ( prob.bounds[ 1, ] - const ).min() lb_dist = min(lb_dist, const_lb_dist) ub_dist = min(ub_dist, const_ub_dist) return np.array([lb_dist, ub_dist])
[docs] def sample(self, n: int, fluxes: bool = True) -> pd.DataFrame: """Abstract sampling function. Should be overwritten by child classes. Parameters ---------- n : int The number of samples that are generated at once. fluxes : bool, optional Whether to return fluxes or the internal solver variables. If set to False, will return a variable for each forward and backward flux as well as all additional variables you might have defined in the model (default True). Returns ------- pandas.DataFrame Returns a pandas DataFrame with `n` rows, each containing a flux sample. """ raise NotImplementedError( "This method needs to be implemented by the subclass."
[docs] def batch( self, batch_size: int, batch_num: int, fluxes: bool = True ) -> pd.DataFrame: """Create a batch generator. This is useful to generate `batch_num` batches of `batch_size` samples each. Parameters ---------- batch_size : int The number of samples contained in each batch. batch_num : int The number of batches in the generator. fluxes : bool, optional Whether to return fluxes or the internal solver variables. If set to False, will return a variable for each forward and backward flux as well as all additional variables you might have defined in the model (default True). Yields ------ pandas.DataFrame A DataFrame with dimensions (batch_size x n_r) containing a valid flux sample for a total of n_r reactions (or variables if fluxes=False) in each row. """ for _ in range(batch_num): yield self.sample(batch_size, fluxes=fluxes)
[docs] def validate(self, samples: np.matrix) -> np.ndarray: """Validate a set of samples for equality and inequality feasibility. Can be used to check whether the generated samples and warmup points are feasible. Parameters ---------- samples : numpy.matrix Must be of dimension (samples x n_reactions). Contains the samples to be validated. Samples must be from fluxes. Returns ------- numpy.array A one-dimensional numpy array containing a code of 1 to 3 letters denoting the validation result: - 'v' means feasible in bounds and equality constraints - 'l' means a lower bound violation - 'u' means a lower bound validation - 'e' means and equality constraint violation Raises ------ ValueError If wrong number of columns. """ samples = np.atleast_2d(samples) prob = self.problem if samples.shape[1] == len(self.model.reactions): S = create_stoichiometric_matrix(self.model) b = np.array( [self.model.constraints[].lb for m in self.model.metabolites] ) bounds = np.array([r.bounds for r in self.model.reactions]).T elif samples.shape[1] == len(self.model.variables): S = prob.equalities b = prob.b bounds = prob.variable_bounds else: raise ValueError( "Wrong number of columns. Samples must have a " "column for each flux or variable defined in the " "model." ) feasibility = np.abs( - b).max(axis=1) lb_error = ( samples - bounds[ 0, ] ).min(axis=1) ub_error = ( bounds[ 1, ] - samples ).min(axis=1) if samples.shape[1] == len(self.model.variables) and prob.inequalities.shape[0]: consts = lb_error = np.minimum( lb_error, ( consts - prob.bounds[ 0, ] ).min(axis=1), ) ub_error = np.minimum( ub_error, ( prob.bounds[ 1, ] - consts ).min(axis=1), ) valid = ( (feasibility < self.feasibility_tol) & (lb_error > -self.bounds_tol) & (ub_error > -self.bounds_tol) ) codes = np.repeat("", valid.shape[0]).astype(np.dtype((str, 3))) codes[valid] = "v" codes[lb_error <= -self.bounds_tol] = np.char.add( codes[lb_error <= -self.bounds_tol], "l" ) codes[ub_error <= -self.bounds_tol] = np.char.add( codes[ub_error <= -self.bounds_tol], "u" ) codes[feasibility > self.feasibility_tol] = np.char.add( codes[feasibility > self.feasibility_tol], "e" ) return codes
# Required by ACHRSampler and OptGPSampler # Has to be declared outside of class to be used for multiprocessing
[docs]def step( sampler: HRSampler, x: np.ndarray, delta: np.ndarray, fraction: Optional[float] = None, tries: int = 0, ) -> np.ndarray: """Sample a new feasible point from the point `x` in direction `delta`.""" prob = sampler.problem valid = (np.abs(delta) > sampler.feasibility_tol) & np.logical_not( prob.variable_fixed ) # permissible alphas for staying in variable bounds valphas = ((1.0 - sampler.bounds_tol) * prob.variable_bounds - x)[:, valid] valphas = (valphas / delta[valid]).flatten() if prob.bounds.shape[0] > 0: # permissible alphas for staying in constraint bounds ineqs = valid = np.abs(ineqs) > sampler.feasibility_tol balphas = ((1.0 - sampler.bounds_tol) * prob.bounds -[ :, valid ] balphas = (balphas / ineqs[valid]).flatten() # combined alphas alphas = np.hstack([valphas, balphas]) else: alphas = valphas pos_alphas = alphas[alphas > 0.0] neg_alphas = alphas[alphas <= 0.0] alpha_range = np.array( [ neg_alphas.max() if len(neg_alphas) > 0 else 0, pos_alphas.min() if len(pos_alphas) > 0 else 0, ] ) if fraction: alpha = alpha_range[0] + fraction * (alpha_range[1] - alpha_range[0]) else: alpha = np.random.uniform(alpha_range[0], alpha_range[1]) p = x + alpha * delta # Numerical instabilities may cause bounds invalidation # reset sampler and sample from one of the original warmup directions # if that occurs. Also reset if we got stuck. if ( np.any(sampler._bounds_dist(p) < -sampler.bounds_tol) or np.abs(np.abs(alpha_range).max() * delta).max() < sampler.bounds_tol ): if tries > MAX_TRIES: raise RuntimeError( "Cannot escape sampling region, model seems to be " "numerically unstable. Reporting the model to " " " "will help us to fix this." )"Found bounds infeasibility in sample, resetting to center.") newdir = sampler.warmup[np.random.randint(sampler.n_warmup)] sampler.retries += 1 return step(sampler,, newdir -, None, tries + 1) return p