# -*- coding: utf-8 -*-
"""Provide base class for Hit-and-Run samplers.
New samplers should derive from the abstract `HRSampler` class
where possible to provide a uniform interface."""
from __future__ import absolute_import, division
import ctypes
from collections import namedtuple
from logging import getLogger
from multiprocessing import Array
from time import time
import numpy as np
from optlang.interface import OPTIMAL
from optlang.symbolics import Zero
from cobra.util import (
constraint_matrices, create_stoichiometric_matrix, nullspace)
[docs]LOGGER = getLogger(__name__)
# Maximum number of retries for sampling
[docs]Problem = namedtuple("Problem",
["equalities", "b", "inequalities", "bounds",
"variable_fixed", "variable_bounds", "nullspace",
"homogeneous"])
"""Defines the matrix representation of a sampling problem.
Attributes
----------
equalities : numpy.array
All equality constraints in the model.
b : numpy.array
The right side of the equality constraints.
inequalities : numpy.array
All inequality constraints in the model.
bounds : numpy.array
The lower and upper bounds for the inequality constraints.
variable_bounds : numpy.array
The lower and upper bounds for the variables.
homogeneous: boolean
Indicates whether the sampling problem is homogenous, e.g. whether there
exist no non-zero fixed variables or constraints.
nullspace : numpy.matrix
A matrix containing the nullspace of the equality constraints. Each column
is one basis vector.
"""
[docs]def shared_np_array(shape, data=None, integer=False):
"""Create a new numpy array that resides in shared memory.
Parameters
----------
shape : tuple of ints
The shape of the new array.
data : numpy.array
Data to copy to the new array. Has to have the same shape.
integer : boolean
Whether to use an integer array. Defaults to False which means
float array.
"""
size = np.prod(shape)
if integer:
array = Array(ctypes.c_int64, int(size))
np_array = np.frombuffer(array.get_obj(), dtype="int64")
else:
array = Array(ctypes.c_double, int(size))
np_array = np.frombuffer(array.get_obj())
np_array = np_array.reshape(shape)
if data is not None:
if len(shape) != len(data.shape):
raise ValueError("`data` must have the same dimensions"
"as the created array.")
same = all(x == y for x, y in zip(shape, data.shape))
if not same:
raise ValueError("`data` must have the same shape"
"as the created array.")
np_array[:] = data
return np_array
[docs]class HRSampler(object):
"""The abstract base class for hit-and-run samplers.
Parameters
----------
model : cobra.Model
The cobra model from which to generate samples.
thinning : int
The thinning factor of the generated sampling chain. A thinning of 10
means samples are returned every 10 steps.
nproj : int > 0, optional
How often to reproject the sampling point into the feasibility space.
Avoids numerical issues at the cost of lower sampling. If you observe
many equality constraint violations with `sampler.validate` you should
lower this number.
seed : int > 0, optional
The random number seed that should be used.
Attributes
----------
model : cobra.Model
The cobra model from which the sampes get generated.
feasibility_tol: float
The tolerance used for checking equalities feasibility.
bounds_tol: float
The tolerance used for checking bounds feasibility.
thinning : int
The currently used thinning factor.
n_samples : int
The total number of samples that have been generated by this
sampler instance.
retries : int
The overall of sampling retries the sampler has observed. Larger
values indicate numerical instabilities.
problem : collections.namedtuple
A python object whose attributes define the entire sampling problem in
matrix form. See docstring of `Problem`.
warmup : numpy.matrix
A matrix of with as many columns as reactions in the model and more
than 3 rows containing a warmup sample in each row. None if no warmup
points have been generated yet.
nproj : int
How often to reproject the sampling point into the feasibility space.
seed : int > 0, optional
Sets the random number seed. Initialized to the current time stamp if
None.
fwd_idx : numpy.array
Has one entry for each reaction in the model containing the index of
the respective forward variable.
rev_idx : numpy.array
Has one entry for each reaction in the model containing the index of
the respective reverse variable.
"""
def __init__(self, model, thinning, nproj=None, seed=None):
"""Initialize a new sampler object."""
# This currently has to be done to reset the solver basis which is
# required to get deterministic warmup point generation
# (in turn required for a working `seed` arg)
if model.solver.is_integer:
raise TypeError("sampling does not work with integer problems :(")
self.model = model.copy()
self.feasibility_tol = model.tolerance
self.bounds_tol = model.tolerance
self.thinning = thinning
if nproj is None:
self.nproj = int(min(len(self.model.variables)**3, 1e6))
else:
self.nproj = nproj
self.n_samples = 0
self.retries = 0
self.problem = self.__build_problem()
# Set up a map from reaction -> forward/reverse variable
var_idx = {v: idx for idx, v in enumerate(self.model.variables)}
self.fwd_idx = np.array([var_idx[r.forward_variable]
for r in self.model.reactions])
self.rev_idx = np.array([var_idx[r.reverse_variable]
for r in self.model.reactions])
self.warmup = None
if seed is None:
self._seed = int(time())
else:
self._seed = seed
# Avoid overflow
self._seed = self._seed % np.iinfo(np.int32).max
[docs] def __build_problem(self):
"""Build the matrix representation of the sampling problem."""
# Set up the mathematical problem
prob = constraint_matrices(self.model, zero_tol=self.feasibility_tol)
# check if there any non-zero equality constraints
equalities = prob.equalities
b = prob.b
bounds = np.atleast_2d(prob.bounds).T
var_bounds = np.atleast_2d(prob.variable_bounds).T
homogeneous = all(np.abs(b) < self.feasibility_tol)
fixed_non_zero = np.abs(prob.variable_bounds[:, 1]) > \
self.feasibility_tol
fixed_non_zero &= prob.variable_fixed
# check if there are any non-zero fixed variables, add them as
# equalities to the stoichiometric matrix
if any(fixed_non_zero):
n_fixed = fixed_non_zero.sum()
rows = np.zeros((n_fixed, prob.equalities.shape[1]))
rows[range(n_fixed), np.where(fixed_non_zero)] = 1.0
equalities = np.vstack([equalities, rows])
var_b = prob.variable_bounds[:, 1]
b = np.hstack([b, var_b[fixed_non_zero]])
homogeneous = False
# Set up a projection that can cast point into the nullspace
nulls = nullspace(equalities)
# convert bounds to a matrix and add variable bounds as well
return Problem(
equalities=shared_np_array(equalities.shape, equalities),
b=shared_np_array(b.shape, b),
inequalities=shared_np_array(prob.inequalities.shape,
prob.inequalities),
bounds=shared_np_array(bounds.shape, bounds),
variable_fixed=shared_np_array(prob.variable_fixed.shape,
prob.variable_fixed, integer=True),
variable_bounds=shared_np_array(var_bounds.shape, var_bounds),
nullspace=shared_np_array(nulls.shape, nulls),
homogeneous=homogeneous
)
[docs] def generate_fva_warmup(self):
"""Generate the warmup points for the sampler.
Generates warmup points by setting each flux as the sole objective
and minimizing/maximizing it. Also caches the projection of the
warmup points into the nullspace for non-homogeneous problems (only
if necessary).
"""
self.n_warmup = 0
reactions = self.model.reactions
self.warmup = np.zeros((2 * len(reactions), len(self.model.variables)))
self.model.objective = Zero
for sense in ("min", "max"):
self.model.objective_direction = sense
for i, r in enumerate(reactions):
variables = (self.model.variables[self.fwd_idx[i]],
self.model.variables[self.rev_idx[i]])
# Omit fixed reactions if they are non-homogeneous
if r.upper_bound - r.lower_bound < self.bounds_tol:
LOGGER.info("skipping fixed reaction %s" % r.id)
continue
self.model.objective.set_linear_coefficients(
{variables[0]: 1, variables[1]: -1})
self.model.slim_optimize()
if not self.model.solver.status == OPTIMAL:
LOGGER.info("can not maximize reaction %s, skipping it" %
r.id)
continue
primals = self.model.solver.primal_values
sol = [primals[v.name] for v in self.model.variables]
self.warmup[self.n_warmup, ] = sol
self.n_warmup += 1
# Reset objective
self.model.objective.set_linear_coefficients(
{variables[0]: 0, variables[1]: 0})
# Shrink to measure
self.warmup = self.warmup[0:self.n_warmup, :]
# Remove redundant search directions
keep = np.logical_not(self._is_redundant(self.warmup))
self.warmup = self.warmup[keep, :]
self.n_warmup = self.warmup.shape[0]
# Catch some special cases
if len(self.warmup.shape) == 1 or self.warmup.shape[0] == 1:
raise ValueError("Your flux cone consists only of a single point!")
elif self.n_warmup == 2:
if not self.problem.homogeneous:
raise ValueError("Can not sample from an inhomogenous problem"
" with only 2 search directions :(")
LOGGER.info("All search directions on a line, adding another one.")
newdir = self.warmup.T.dot([0.25, 0.25])
self.warmup = np.vstack([self.warmup, newdir])
self.n_warmup += 1
# Shrink warmup points to measure
self.warmup = shared_np_array(
(self.n_warmup, len(self.model.variables)), self.warmup)
[docs] def _reproject(self, p):
"""Reproject a point into the feasibility region.
This function is guaranteed to return a new feasible point. However,
no guarantees in terms of proximity to the original point can be made.
Parameters
----------
p : numpy.array
The current sample point.
Returns
-------
numpy.array
A new feasible point. If `p` was feasible it wil return p.
"""
nulls = self.problem.nullspace
equalities = self.problem.equalities
# don't reproject if point is feasible
if np.allclose(equalities.dot(p), self.problem.b,
rtol=0, atol=self.feasibility_tol):
new = p
else:
LOGGER.info("feasibility violated in sample"
" %d, trying to reproject" % self.n_samples)
new = nulls.dot(nulls.T.dot(p))
# Projections may violate bounds
# set to random point in space in that case
if any(new != p):
LOGGER.info("reprojection failed in sample"
" %d, using random point in space" % self.n_samples)
new = self._random_point()
return new
[docs] def _random_point(self):
"""Find an approximately random point in the flux cone."""
idx = np.random.randint(self.n_warmup,
size=min(2, np.ceil(np.sqrt(self.n_warmup))))
return self.warmup[idx, :].mean(axis=0)
[docs] def _is_redundant(self, matrix, cutoff=None):
"""Identify rdeundant rows in a matrix that can be removed."""
cutoff = 1.0 - self.feasibility_tol
# Avoid zero variances
extra_col = matrix[:, 0] + 1
# Avoid zero rows being correlated with constant rows
extra_col[matrix.sum(axis=1) == 0] = 2
corr = np.corrcoef(np.c_[matrix, extra_col])
corr = np.tril(corr, -1)
return (np.abs(corr) > cutoff).any(axis=1)
[docs] def _bounds_dist(self, p):
"""Get the lower and upper bound distances. Negative is bad."""
prob = self.problem
lb_dist = (p - prob.variable_bounds[0, ]).min()
ub_dist = (prob.variable_bounds[1, ] - p).min()
if prob.bounds.shape[0] > 0:
const = prob.inequalities.dot(p)
const_lb_dist = (const - prob.bounds[0, ]).min()
const_ub_dist = (prob.bounds[1, ] - const).min()
lb_dist = min(lb_dist, const_lb_dist)
ub_dist = min(ub_dist, const_ub_dist)
return np.array([lb_dist, ub_dist])
[docs] def sample(self, n, fluxes=True):
"""Abstract sampling function.
Should be overwritten by child classes.
"""
pass
[docs] def batch(self, batch_size, batch_num, fluxes=True):
"""Create a batch generator.
This is useful to generate n batches of m samples each.
Parameters
----------
batch_size : int
The number of samples contained in each batch (m).
batch_num : int
The number of batches in the generator (n).
fluxes : boolean
Whether to return fluxes or the internal solver variables. If set
to False will return a variable for each forward and backward flux
as well as all additional variables you might have defined in the
model.
Yields
------
pandas.DataFrame
A DataFrame with dimensions (batch_size x n_r) containing
a valid flux sample for a total of n_r reactions (or variables if
fluxes=False) in each row.
"""
for i in range(batch_num):
yield self.sample(batch_size, fluxes=fluxes)
[docs] def validate(self, samples):
"""Validate a set of samples for equality and inequality feasibility.
Can be used to check whether the generated samples and warmup points
are feasible.
Parameters
----------
samples : numpy.matrix
Must be of dimension (n_samples x n_reactions). Contains the
samples to be validated. Samples must be from fluxes.
Returns
-------
numpy.array
A one-dimensional numpy array of length containing
a code of 1 to 3 letters denoting the validation result:
- 'v' means feasible in bounds and equality constraints
- 'l' means a lower bound violation
- 'u' means a lower bound validation
- 'e' means and equality constraint violation
"""
samples = np.atleast_2d(samples)
prob = self.problem
if samples.shape[1] == len(self.model.reactions):
S = create_stoichiometric_matrix(self.model)
b = np.array([self.model.constraints[m.id].lb for m in
self.model.metabolites])
bounds = np.array([r.bounds for r in self.model.reactions]).T
elif samples.shape[1] == len(self.model.variables):
S = prob.equalities
b = prob.b
bounds = prob.variable_bounds
else:
raise ValueError("Wrong number of columns. samples must have a "
"column for each flux or variable defined in the "
"model!")
feasibility = np.abs(S.dot(samples.T).T - b).max(axis=1)
lb_error = (samples - bounds[0, ]).min(axis=1)
ub_error = (bounds[1, ] - samples).min(axis=1)
if (samples.shape[1] == len(self.model.variables) and
prob.inequalities.shape[0]):
consts = prob.inequalities.dot(samples.T)
lb_error = np.minimum(
lb_error,
(consts - prob.bounds[0, ]).min(axis=1))
ub_error = np.minimum(
ub_error,
(prob.bounds[1, ] - consts).min(axis=1)
)
valid = (
(feasibility < self.feasibility_tol) &
(lb_error > -self.bounds_tol) &
(ub_error > -self.bounds_tol))
codes = np.repeat("", valid.shape[0]).astype(np.dtype((str, 3)))
codes[valid] = "v"
codes[lb_error <= -self.bounds_tol] = np.char.add(
codes[lb_error <= -self.bounds_tol], "l")
codes[ub_error <= -self.bounds_tol] = np.char.add(
codes[ub_error <= -self.bounds_tol], "u")
codes[feasibility > self.feasibility_tol] = np.char.add(
codes[feasibility > self.feasibility_tol], "e")
return codes
# Required by ACHRSampler and OptGPSampler
# Has to be declared outside of class to be used for multiprocessing :(
[docs]def step(sampler, x, delta, fraction=None, tries=0):
"""Sample a new feasible point from the point `x` in direction `delta`."""
prob = sampler.problem
valid = ((np.abs(delta) > sampler.feasibility_tol) &
np.logical_not(prob.variable_fixed))
# permissible alphas for staying in variable bounds
valphas = ((1.0 - sampler.bounds_tol) * prob.variable_bounds -
x)[:, valid]
valphas = (valphas / delta[valid]).flatten()
if prob.bounds.shape[0] > 0:
# permissible alphas for staying in constraint bounds
ineqs = prob.inequalities.dot(delta)
valid = np.abs(ineqs) > sampler.feasibility_tol
balphas = ((1.0 - sampler.bounds_tol) * prob.bounds -
prob.inequalities.dot(x))[:, valid]
balphas = (balphas / ineqs[valid]).flatten()
# combined alphas
alphas = np.hstack([valphas, balphas])
else:
alphas = valphas
pos_alphas = alphas[alphas > 0.0]
neg_alphas = alphas[alphas <= 0.0]
alpha_range = np.array([neg_alphas.max() if len(neg_alphas) > 0 else 0,
pos_alphas.min() if len(pos_alphas) > 0 else 0])
if fraction:
alpha = alpha_range[0] + fraction * (alpha_range[1] - alpha_range[0])
else:
alpha = np.random.uniform(alpha_range[0], alpha_range[1])
p = x + alpha * delta
# Numerical instabilities may cause bounds invalidation
# reset sampler and sample from one of the original warmup directions
# if that occurs. Also reset if we got stuck.
if (np.any(sampler._bounds_dist(p) < -sampler.bounds_tol) or
np.abs(np.abs(alpha_range).max() * delta).max() <
sampler.bounds_tol):
if tries > MAX_TRIES:
raise RuntimeError("Can not escape sampling region, model seems"
" numerically unstable :( Reporting the "
"model to "
"https://github.com/opencobra/cobrapy/issues "
"will help us to fix this :)")
LOGGER.info("found bounds infeasibility in sample, "
"resetting to center")
newdir = sampler.warmup[np.random.randint(sampler.n_warmup)]
sampler.retries += 1
return step(sampler, sampler.center, newdir - sampler.center, None,
tries + 1)
return p