"""Provide the base class and associated functions for Hit-and-Run samplers."""
import ctypes
import logging
from abc import ABC, abstractmethod
from multiprocessing import Array
from time import time
from typing import TYPE_CHECKING, NamedTuple, Optional, Tuple
import numpy as np
import pandas as pd
from optlang.interface import OPTIMAL
from optlang.symbolics import Zero
from cobra.util import constraint_matrices, create_stoichiometric_matrix, nullspace
if TYPE_CHECKING:
from cobra import Model
[docs]logger = logging.getLogger(__name__)
[docs]Problem = NamedTuple(
"Problem",
[
("equalities", np.ndarray),
("b", np.ndarray),
("inequalities", np.ndarray),
("bounds", np.ndarray),
("variable_fixed", np.ndarray),
("variable_bounds", np.ndarray),
("nullspace", np.matrix),
("homogeneous", bool),
],
)
"""Define the matrix representation of a sampling problem.
A named tuple consisting of 6 arrays, 1 matrix and 1 boolean.
Attributes
----------
equalities : numpy.array
All equality constraints in the model.
b : numpy.array
The right side of the equality constraints.
inequalities : numpy.array
All inequality constraints in the model.
bounds : numpy.array
The lower and upper bounds for the inequality constraints.
variable_fixed : numpy.array
A boolean vector indicating whether the variable at that index is
fixed i.e., whether `variable.lower_bound == variable.upper_bound`.
variable_bounds : numpy.array
The lower and upper bounds for the variables.
nullspace : numpy.matrix
A matrix containing the nullspace of the equality constraints.
Each column is one basis vector.
homogeneous: bool
Indicates whether the sampling problem is homogeneous, e.g. whether
there exist no non-zero fixed variables or constraints.
"""
[docs]def shared_np_array(
shape: Tuple[int, int], data: Optional[np.ndarray] = None, integer: bool = False
) -> np.ndarray:
"""Create a new numpy array that resides in shared memory.
Parameters
----------
shape : tuple of int
The shape of the new array.
data : numpy.array, optional
Data to copy to the new array. Has to have the same shape
(default None).
integer : bool, optional
Whether to use an integer array. By default, float array is used
(default False).
Returns
-------
numpy.array
The newly created shared numpy array.
Raises
------
ValueError
If the input `data` (if provided) size is not equal to the created
array.
"""
size = np.prod(shape)
if integer:
array = Array(ctypes.c_int64, int(size))
np_array = np.frombuffer(array.get_obj(), dtype="int64")
else:
array = Array(ctypes.c_double, int(size))
np_array = np.frombuffer(array.get_obj())
np_array = np_array.reshape(shape)
if data is not None:
if len(shape) != len(data.shape):
raise ValueError("`data` must have the same shape as the created array.")
same = all(x == y for x, y in zip(shape, data.shape))
if not same:
raise ValueError("`data` must have the same shape as the created array.")
np_array[:] = data
return np_array
[docs]class HRSampler(ABC):
"""
The abstract base class for hit-and-run samplers.
New samplers should derive from this class where possible to provide
a uniform interface.
Parameters
----------
model : cobra.Model
The cobra model from which to generate samples.
thinning : int
The thinning factor of the generated sampling chain. A thinning of
10 means samples are returned every 10 steps.
nproj : int > 0, optional
How often to reproject the sampling point into the feasibility
space. Avoids numerical issues at the cost of lower sampling. If
you observe many equality constraint violations with
`sampler.validate` you should lower this number (default None).
seed : int > 0, optional
Sets the random number seed. Initialized to the current time stamp
if None (default None).
Attributes
----------
feasibility_tol: float
The tolerance used for checking equalities feasibility.
bounds_tol: float
The tolerance used for checking bounds feasibility.
n_samples : int
The total number of samples that have been generated by this
sampler instance.
retries : int
The overall of sampling retries the sampler has observed. Larger
values indicate numerical instabilities.
problem : Problem
A NamedTuple whose attributes define the entire sampling problem in
matrix form.
warmup : numpy.matrix
A numpy matrix with as many columns as reactions in the model and
more than 3 rows containing a warmup sample in each row. None if no
warmup points have been generated yet.
fwd_idx : numpy.array
A numpy array having one entry for each reaction in the model,
containing the index of the respective forward variable.
rev_idx : numpy.array
A numpy array having one entry for each reaction in the model,
containing the index of the respective reverse variable.
"""
def __init__(
self,
model: "Model",
thinning: int,
nproj: Optional[int] = None,
seed: Optional[int] = None,
**kwargs,
) -> None:
"""Initialize a new sampler object.
Other Parameters
----------------
kwargs :
Further keyword arguments are passed on to the parent class.
Raises
------
TypeError
If integer problem is found.
"""
# This currently has to be done to reset the solver basis which is
# required to get deterministic warmup point generation
# (in turn required for a working `seed`)
if model.solver.is_integer:
raise TypeError("Sampling does not work with integer problems.")
self.model = model.copy()
self.feasibility_tol = model.tolerance
self.bounds_tol = model.tolerance
self.thinning = thinning
if nproj is None:
self.nproj = int(min(len(self.model.variables) ** 3, 1e6))
else:
self.nproj = nproj
self.n_samples = 0
self.retries = 0
self.problem = self.__build_problem()
# Set up a map from reaction -> forward/reverse variable
var_idx = {v: idx for idx, v in enumerate(self.model.variables)}
self.fwd_idx = np.array(
[var_idx[r.forward_variable] for r in self.model.reactions]
)
self.rev_idx = np.array(
[var_idx[r.reverse_variable] for r in self.model.reactions]
)
self.warmup = None
if seed is None:
self._seed = int(time())
else:
self._seed = seed
# Avoid overflow
self._seed = self._seed % np.iinfo(np.int32).max
[docs] def __build_problem(self) -> Problem:
"""Build the matrix representation of the sampling problem.
Returns
-------
Problem
The matrix representation in the form of a NamedTuple.
"""
# Set up the mathematical problem
prob = constraint_matrices(self.model, zero_tol=self.feasibility_tol)
# check if there any non-zero equality constraints
equalities = prob.equalities
b = prob.b
bounds = np.atleast_2d(prob.bounds).T
var_bounds = np.atleast_2d(prob.variable_bounds).T
homogeneous = all(np.abs(b) < self.feasibility_tol)
fixed_non_zero = np.abs(prob.variable_bounds[:, 1]) > self.feasibility_tol
fixed_non_zero &= prob.variable_fixed
# check if there are any non-zero fixed variables, add them as
# equalities to the stoichiometric matrix
if any(fixed_non_zero):
n_fixed = fixed_non_zero.sum()
rows = np.zeros((n_fixed, prob.equalities.shape[1]))
rows[range(n_fixed), np.where(fixed_non_zero)] = 1.0
equalities = np.vstack([equalities, rows])
var_b = prob.variable_bounds[:, 1]
b = np.hstack([b, var_b[fixed_non_zero]])
homogeneous = False
# Set up a projection that can cast point into the nullspace
nulls = nullspace(equalities)
# convert bounds to a matrix and add variable bounds as well
return Problem(
equalities=shared_np_array(equalities.shape, equalities),
b=shared_np_array(b.shape, b),
inequalities=shared_np_array(prob.inequalities.shape, prob.inequalities),
bounds=shared_np_array(bounds.shape, bounds),
variable_fixed=shared_np_array(
prob.variable_fixed.shape, prob.variable_fixed, integer=True
),
variable_bounds=shared_np_array(var_bounds.shape, var_bounds),
nullspace=shared_np_array(nulls.shape, nulls),
homogeneous=homogeneous,
)
[docs] def generate_fva_warmup(self) -> None:
"""Generate the warmup points for the sampler.
Generates warmup points by setting each flux as the sole objective
and minimizing/maximizing it. Also caches the projection of the
warmup points into the nullspace for non-homogeneous problems (only
if necessary).
Raises
------
ValueError
If flux cone contains a single point or the problem is
inhomogeneous.
"""
self.n_warmup = 0
reactions = self.model.reactions
self.warmup = np.zeros((2 * len(reactions), len(self.model.variables)))
self.model.objective = Zero
for sense in ("min", "max"):
self.model.objective_direction = sense
for i, r in enumerate(reactions):
variables = (
self.model.variables[self.fwd_idx[i]],
self.model.variables[self.rev_idx[i]],
)
# Omit fixed reactions if they are non-homogeneous
if r.upper_bound - r.lower_bound < self.bounds_tol:
logger.info(f"Skipping fixed reaction {r.id}")
continue
self.model.objective.set_linear_coefficients(
{variables[0]: 1, variables[1]: -1}
)
self.model.slim_optimize()
if not self.model.solver.status == OPTIMAL:
logger.info(f"Cannot maximize reaction {r.id}, skipping it.")
continue
primals = self.model.solver.primal_values
sol = [primals[v.name] for v in self.model.variables]
self.warmup[
self.n_warmup,
] = sol
self.n_warmup += 1
# Reset objective
self.model.objective.set_linear_coefficients(
{variables[0]: 0, variables[1]: 0}
)
# Shrink to measure
self.warmup = self.warmup[0 : self.n_warmup, :]
# Remove redundant search directions
keep = np.logical_not(self._is_redundant(self.warmup))
self.warmup = self.warmup[keep, :]
self.n_warmup = self.warmup.shape[0]
# Catch some special cases
if len(self.warmup.shape) == 1 or self.warmup.shape[0] == 1:
raise ValueError("Flux cone only consists a single point.")
elif self.n_warmup == 2:
if not self.problem.homogeneous:
raise ValueError(
"Cannot sample from an inhomogenous problem "
"with only 2 search directions."
)
logger.info("All search directions on a line, adding another one.")
newdir = self.warmup.T.dot([0.25, 0.25])
self.warmup = np.vstack([self.warmup, newdir])
self.n_warmup += 1
# Shrink warmup points to measure
self.warmup = shared_np_array(
(self.n_warmup, len(self.model.variables)), self.warmup
)
[docs] def _reproject(self, p: np.ndarray) -> np.ndarray:
"""Reproject a point into the feasibility region.
This function is guaranteed to return a new feasible point. However,
no guarantee can be made in terms of proximity to the original
point.
Parameters
----------
p : numpy.array
The current sample point.
Returns
-------
numpy.array
A new feasible point. If `p` is feasible, it will return `p`.
"""
nulls = self.problem.nullspace
equalities = self.problem.equalities
# don't reproject if point is feasible
if np.allclose(
equalities.dot(p), self.problem.b, rtol=0, atol=self.feasibility_tol
):
new = p
else:
logger.info(
f"Feasibility violated in sample {self.n_samples}, trying to reproject."
)
new = nulls.dot(nulls.T.dot(p))
# Projections may violate bounds
# set to random point in space in that case
if any(new != p):
logger.info(
f"Re-projection failed in sample {self.n_samples}, "
"using random point in space."
)
new = self._random_point()
return new
[docs] def _random_point(self) -> np.ndarray:
"""Find an approximately random point in the flux cone."""
idx = np.random.randint(
self.n_warmup, size=min(2, np.ceil(np.sqrt(self.n_warmup)))
)
return self.warmup[idx, :].mean(axis=0)
[docs] def _is_redundant(self, matrix: np.matrix, cutoff: Optional[float] = None) -> bool:
"""Identify redundant rows in a matrix that can be removed."""
cutoff = 1.0 - self.feasibility_tol
# Avoid zero variances
extra_col = matrix[:, 0] + 1
# Avoid zero rows being correlated with constant rows
extra_col[matrix.sum(axis=1) == 0] = 2
corr = np.corrcoef(np.c_[matrix, extra_col])
corr = np.tril(corr, -1)
return (np.abs(corr) > cutoff).any(axis=1)
[docs] def _bounds_dist(self, p: np.ndarray) -> np.ndarray:
"""Get the lower and upper bound distances. Negative is bad."""
prob = self.problem
lb_dist = (
p
- prob.variable_bounds[
0,
]
).min()
ub_dist = (
prob.variable_bounds[
1,
]
- p
).min()
if prob.bounds.shape[0] > 0:
const = prob.inequalities.dot(p)
const_lb_dist = (
const
- prob.bounds[
0,
]
).min()
const_ub_dist = (
prob.bounds[
1,
]
- const
).min()
lb_dist = min(lb_dist, const_lb_dist)
ub_dist = min(ub_dist, const_ub_dist)
return np.array([lb_dist, ub_dist])
@abstractmethod
[docs] def sample(self, n: int, fluxes: bool = True) -> pd.DataFrame:
"""Abstract sampling function.
Should be overwritten by child classes.
Parameters
----------
n : int
The number of samples that are generated at once.
fluxes : bool, optional
Whether to return fluxes or the internal solver variables. If
set to False, will return a variable for each forward and
backward flux as well as all additional variables you might
have defined in the model (default True).
Returns
-------
pandas.DataFrame
Returns a pandas DataFrame with `n` rows, each containing a
flux sample.
"""
raise NotImplementedError(
"This method needs to be implemented by the subclass."
)
[docs] def batch(
self, batch_size: int, batch_num: int, fluxes: bool = True
) -> pd.DataFrame:
"""Create a batch generator.
This is useful to generate `batch_num` batches of `batch_size`
samples each.
Parameters
----------
batch_size : int
The number of samples contained in each batch.
batch_num : int
The number of batches in the generator.
fluxes : bool, optional
Whether to return fluxes or the internal solver variables. If
set to False, will return a variable for each forward and
backward flux as well as all additional variables you might
have defined in the model (default True).
Yields
------
pandas.DataFrame
A DataFrame with dimensions (batch_size x n_r) containing
a valid flux sample for a total of n_r reactions (or variables
if fluxes=False) in each row.
"""
for _ in range(batch_num):
yield self.sample(batch_size, fluxes=fluxes)
[docs] def validate(self, samples: np.matrix) -> np.ndarray:
"""Validate a set of samples for equality and inequality feasibility.
Can be used to check whether the generated samples and warmup points
are feasible.
Parameters
----------
samples : numpy.matrix
Must be of dimension (samples x n_reactions). Contains the
samples to be validated. Samples must be from fluxes.
Returns
-------
numpy.array
A one-dimensional numpy array containing
a code of 1 to 3 letters denoting the validation result:
- 'v' means feasible in bounds and equality constraints
- 'l' means a lower bound violation
- 'u' means a lower bound validation
- 'e' means and equality constraint violation
Raises
------
ValueError
If wrong number of columns.
"""
samples = np.atleast_2d(samples)
prob = self.problem
if samples.shape[1] == len(self.model.reactions):
S = create_stoichiometric_matrix(self.model)
b = np.array(
[self.model.constraints[m.id].lb for m in self.model.metabolites]
)
bounds = np.array([r.bounds for r in self.model.reactions]).T
elif samples.shape[1] == len(self.model.variables):
S = prob.equalities
b = prob.b
bounds = prob.variable_bounds
else:
raise ValueError(
"Wrong number of columns. Samples must have a "
"column for each flux or variable defined in the "
"model."
)
feasibility = np.abs(S.dot(samples.T).T - b).max(axis=1)
lb_error = (
samples
- bounds[
0,
]
).min(axis=1)
ub_error = (
bounds[
1,
]
- samples
).min(axis=1)
if samples.shape[1] == len(self.model.variables) and prob.inequalities.shape[0]:
consts = prob.inequalities.dot(samples.T)
lb_error = np.minimum(
lb_error,
(
consts
- prob.bounds[
0,
]
).min(axis=1),
)
ub_error = np.minimum(
ub_error,
(
prob.bounds[
1,
]
- consts
).min(axis=1),
)
valid = (
(feasibility < self.feasibility_tol)
& (lb_error > -self.bounds_tol)
& (ub_error > -self.bounds_tol)
)
codes = np.repeat("", valid.shape[0]).astype(np.dtype((str, 3)))
codes[valid] = "v"
codes[lb_error <= -self.bounds_tol] = np.char.add(
codes[lb_error <= -self.bounds_tol], "l"
)
codes[ub_error <= -self.bounds_tol] = np.char.add(
codes[ub_error <= -self.bounds_tol], "u"
)
codes[feasibility > self.feasibility_tol] = np.char.add(
codes[feasibility > self.feasibility_tol], "e"
)
return codes