# -*- coding: utf-8 -*-
"""Module implementing flux sampling for cobra models.
New samplers should derive from the abstract `HRSampler` class
where possible to provide a uniform interface.
"""
from __future__ import absolute_import, division
import ctypes
from collections import namedtuple
from logging import getLogger
from multiprocessing import Array, Pool
from time import time
import numpy as np
import pandas
from optlang.interface import OPTIMAL
from optlang.symbolics import Zero
from cobra.util import (create_stoichiometric_matrix, constraint_matrices,
nullspace)
LOGGER = getLogger(__name__)
"""The logger for the package."""
bounds_tol = 1e-6
"""The tolerance used for checking bounds feasibility."""
feasibility_tol = 1e-6
"""The tolerance used for checking equalities feasibility."""
max_tries = 100
"""Maximum number of retries for sampling."""
Problem = namedtuple("Problem",
["equalities", "b", "inequalities", "bounds",
"variable_fixed", "variable_bounds", "nullspace",
"homogeneous"])
"""Defines the matrix representation of a sampling problem.
Attributes
----------
equalities : numpy.array
All equality constraints in the model.
b : numpy.array
The right side of the equality constraints.
inequalities : numpy.array
All inequality constraints in the model.
bounds : numpy.array
The lower and upper bounds for the inequality constraints.
variable_bounds : numpy.array
The lower and upper bounds for the variables.
homogeneous: boolean
Indicates whether the sampling problem is homogenous, e.g. whether there
exist no non-zero fixed variables or constraints.
nullspace : numpy.matrix
A matrix containing the nullspace of the equality constraints. Each column
is one basis vector.
"""
[docs]def mp_init(obj):
"""Initialize the multiprocessing pool."""
global sampler
sampler = obj
[docs]def shared_np_array(shape, data=None, integer=False):
"""Create a new numpy array that resides in shared memory.
Parameters
----------
shape : tuple of ints
The shape of the new array.
data : numpy.array
Data to copy to the new array. Has to have the same shape.
integer : boolean
Whether to use an integer array. Defaults to False which means
float array.
"""
size = np.prod(shape)
if integer:
array = Array(ctypes.c_int64, int(size))
np_array = np.frombuffer(array.get_obj(), dtype="int64")
else:
array = Array(ctypes.c_double, int(size))
np_array = np.frombuffer(array.get_obj())
np_array = np_array.reshape(shape)
if data is not None:
if len(shape) != len(data.shape):
raise ValueError("`data` must have the same dimensions"
"as the created array.")
same = all(x == y for x, y in zip(shape, data.shape))
if not same:
raise ValueError("`data` must have the same shape"
"as the created array.")
np_array[:] = data
return np_array
# Has to be declared outside of class to be used for multiprocessing :(
[docs]def _step(sampler, x, delta, fraction=None, tries=0):
"""Sample a new feasible point from the point `x` in direction `delta`."""
prob = sampler.problem
valid = ((np.abs(delta) > feasibility_tol) &
np.logical_not(prob.variable_fixed))
# permissible alphas for staying in variable bounds
valphas = ((1.0 - bounds_tol) * prob.variable_bounds - x)[:, valid]
valphas = (valphas / delta[valid]).flatten()
if prob.bounds.shape[0] > 0:
# permissible alphas for staying in constraint bounds
ineqs = prob.inequalities.dot(delta)
valid = np.abs(ineqs) > feasibility_tol
balphas = ((1.0 - bounds_tol) * prob.bounds -
prob.inequalities.dot(x))[:, valid]
balphas = (balphas / ineqs[valid]).flatten()
# combined alphas
alphas = np.hstack([valphas, balphas])
else:
alphas = valphas
pos_alphas = alphas[alphas > 0.0]
neg_alphas = alphas[alphas <= 0.0]
alpha_range = np.array([neg_alphas.max() if len(neg_alphas) > 0 else 0,
pos_alphas.min() if len(pos_alphas) > 0 else 0])
if fraction:
alpha = alpha_range[0] + fraction * (alpha_range[1] - alpha_range[0])
else:
alpha = np.random.uniform(alpha_range[0], alpha_range[1])
p = x + alpha * delta
# Numerical instabilities may cause bounds invalidation
# reset sampler and sample from one of the original warmup directions
# if that occurs. Also reset if we got stuck.
if (np.any(sampler._bounds_dist(p) < -bounds_tol) or
np.abs(np.abs(alpha_range).max() * delta).max() < bounds_tol):
if tries > max_tries:
raise RuntimeError("Can not escape sampling region, model seems"
" numerically unstable :( Reporting the "
"model to "
"https://github.com/opencobra/cobrapy/issues "
"will help us to fix this :)")
LOGGER.info("found bounds infeasibility in sample, "
"resetting to center")
newdir = sampler.warmup[np.random.randint(sampler.n_warmup)]
sampler.retries += 1
return _step(sampler, sampler.center, newdir - sampler.center, None,
tries + 1)
return p
[docs]class HRSampler(object):
"""The abstract base class for hit-and-run samplers.
Parameters
----------
model : cobra.Model
The cobra model from which to generate samples.
thinning : int
The thinning factor of the generated sampling chain. A thinning of 10
means samples are returned every 10 steps.
nproj : int > 0, optional
How often to reproject the sampling point into the feasibility space.
Avoids numerical issues at the cost of lower sampling. If you observe
many equality constraint violations with `sampler.validate` you should
lower this number.
seed : int > 0, optional
The random number seed that should be used.
Attributes
----------
model : cobra.Model
The cobra model from which the samples get generated.
thinning : int
The currently used thinning factor.
n_samples : int
The total number of samples that have been generated by this
sampler instance.
retries : int
The overall of sampling retries the sampler has observed. Larger
values indicate numerical instabilities.
problem : collections.namedtuple
A python object whose attributes define the entire sampling problem in
matrix form. See docstring of `Problem`.
warmup : a numpy matrix
A matrix of with as many columns as reactions in the model and more
than 3 rows containing a warmup sample in each row. None if no warmup
points have been generated yet.
nproj : int
How often to reproject the sampling point into the feasibility space.
seed : positive integer, optional
Sets the random number seed. Initialized to the current time stamp if
None.
fwd_idx : np.array
Has one entry for each reaction in the model containing the index of
the respective forward variable.
rev_idx : np.array
Has one entry for each reaction in the model containing the index of
the respective reverse variable.
"""
[docs] def __init__(self, model, thinning, nproj=None, seed=None):
"""Initialize a new sampler object."""
# This currently has to be done to reset the solver basis which is
# required to get deterministic warmup point generation
# (in turn required for a working `seed` arg)
if model.solver.is_integer:
raise TypeError("sampling does not work with integer problems :(")
self.model = model.copy()
self.thinning = thinning
if nproj is None:
self.nproj = int(min(len(self.model.variables)**3, 1e6))
else:
self.nproj = nproj
self.n_samples = 0
self.retries = 0
self.problem = self.__build_problem()
# Set up a map from reaction -> forward/reverse variable
var_idx = {v: idx for idx, v in enumerate(model.variables)}
self.fwd_idx = np.array([var_idx[r.forward_variable]
for r in model.reactions])
self.rev_idx = np.array([var_idx[r.reverse_variable]
for r in model.reactions])
self.warmup = None
if seed is None:
self._seed = int(time())
else:
self._seed = seed
# Avoid overflow
self._seed = self._seed % np.iinfo(np.int32).max
def __build_problem(self):
"""Build the matrix representation of the sampling problem."""
# Set up the mathematical problem
prob = constraint_matrices(self.model, zero_tol=feasibility_tol)
# check if there any non-zero equality constraints
equalities = prob.equalities
b = prob.b
bounds = np.atleast_2d(prob.bounds).T
var_bounds = np.atleast_2d(prob.variable_bounds).T
homogeneous = all(np.abs(b) < feasibility_tol)
fixed_non_zero = np.abs(prob.variable_bounds[:, 1]) > feasibility_tol
fixed_non_zero &= prob.variable_fixed
# check if there are any non-zero fixed variables, add them as
# equalities to the stoichiometric matrix
if any(fixed_non_zero):
n_fixed = fixed_non_zero.sum()
rows = np.zeros((n_fixed, prob.equalities.shape[1]))
rows[range(n_fixed), np.where(fixed_non_zero)] = 1.0
equalities = np.vstack([equalities, rows])
var_b = prob.variable_bounds[:, 1]
b = np.hstack([b, var_b[fixed_non_zero]])
homogeneous = False
# Set up a projection that can cast point into the nullspace
nulls = nullspace(equalities)
# convert bounds to a matrix and add variable bounds as well
return Problem(
equalities=shared_np_array(equalities.shape, equalities),
b=shared_np_array(b.shape, b),
inequalities=shared_np_array(prob.inequalities.shape,
prob.inequalities),
bounds=shared_np_array(bounds.shape, bounds),
variable_fixed=shared_np_array(prob.variable_fixed.shape,
prob.variable_fixed, integer=True),
variable_bounds=shared_np_array(var_bounds.shape, var_bounds),
nullspace=shared_np_array(nulls.shape, nulls),
homogeneous=homogeneous
)
[docs] def generate_fva_warmup(self):
"""Generate the warmup points for the sampler.
Generates warmup points by setting each flux as the sole objective
and minimizing/maximizing it. Also caches the projection of the
warmup points into the nullspace for non-homogeneous problems (only
if necessary).
"""
self.n_warmup = 0
reactions = self.model.reactions
self.warmup = np.zeros((2 * len(reactions), len(self.model.variables)))
self.model.objective = Zero
for sense in ("min", "max"):
self.model.objective_direction = sense
for i, r in enumerate(reactions):
variables = (self.model.variables[self.fwd_idx[i]],
self.model.variables[self.rev_idx[i]])
# Omit fixed reactions if they are non-homogeneous
if r.upper_bound - r.lower_bound < bounds_tol:
LOGGER.info("skipping fixed reaction %s" % r.id)
continue
self.model.objective.set_linear_coefficients(
{variables[0]: 1, variables[1]: -1})
self.model.slim_optimize()
if not self.model.solver.status == OPTIMAL:
LOGGER.info("can not maximize reaction %s, skipping it" %
r.id)
continue
primals = self.model.solver.primal_values
sol = [primals[v.name] for v in self.model.variables]
self.warmup[self.n_warmup, ] = sol
self.n_warmup += 1
# Reset objective
self.model.objective.set_linear_coefficients(
{variables[0]: 0, variables[1]: 0})
# Shrink to measure
self.warmup = self.warmup[0:self.n_warmup, :]
# Remove redundant search directions
keep = np.logical_not(self._is_redundant(self.warmup))
self.warmup = self.warmup[keep, :]
self.n_warmup = self.warmup.shape[0]
# Catch some special cases
if len(self.warmup.shape) == 1 or self.warmup.shape[0] == 1:
raise ValueError("Your flux cone consists only of a single point!")
elif self.n_warmup == 2:
if not self.problem.homogeneous:
raise ValueError("Can not sample from an inhomogenous problem"
" with only 2 search directions :(")
LOGGER.info("All search directions on a line, adding another one.")
newdir = self.warmup.T.dot([0.25, 0.25])
self.warmup = np.vstack([self.warmup, newdir])
self.n_warmup += 1
# Shrink warmup points to measure
self.warmup = shared_np_array(
(self.n_warmup, len(self.model.variables)), self.warmup)
[docs] def _reproject(self, p):
"""Reproject a point into the feasibility region.
This function is guaranteed to return a new feasible point. However,
no guarantees in terms of proximity to the original point can be made.
Parameters
----------
p : numpy.array
The current sample point.
Returns
-------
numpy.array
A new feasible point. If `p` was feasible it wil return p.
"""
nulls = self.problem.nullspace
equalities = self.problem.equalities
# don't reproject if point is feasible
if np.allclose(equalities.dot(p), self.problem.b,
rtol=0, atol=feasibility_tol):
new = p
else:
LOGGER.info("feasibility violated in sample"
" %d, trying to reproject" % self.n_samples)
new = nulls.dot(nulls.T.dot(p))
# Projections may violate bounds
# set to random point in space in that case
if any(new != p):
LOGGER.info("reprojection failed in sample"
" %d, using random point in space" % self.n_samples)
new = self._random_point()
return new
[docs] def _random_point(self):
"""Find an approximately random point in the flux cone."""
idx = np.random.randint(self.n_warmup,
size=min(2, np.ceil(np.sqrt(self.n_warmup))))
return self.warmup[idx, :].mean(axis=0)
[docs] def _is_redundant(self, matrix, cutoff=1.0 - feasibility_tol):
"""Identify rdeundant rows in a matrix that can be removed."""
# Avoid zero variances
extra_col = matrix[:, 0] + 1
# Avoid zero rows being correlated with constant rows
extra_col[matrix.sum(axis=1) == 0] = 2
corr = np.corrcoef(np.c_[matrix, extra_col])
corr = np.tril(corr, -1)
return (np.abs(corr) > cutoff).any(axis=1)
[docs] def _bounds_dist(self, p):
"""Get the lower and upper bound distances. Negative is bad."""
prob = self.problem
lb_dist = (p - prob.variable_bounds[0, ]).min()
ub_dist = (prob.variable_bounds[1, ] - p).min()
if prob.bounds.shape[0] > 0:
const = prob.inequalities.dot(p)
const_lb_dist = (const - prob.bounds[0, ]).min()
const_ub_dist = (prob.bounds[1, ] - const).min()
lb_dist = min(lb_dist, const_lb_dist)
ub_dist = min(ub_dist, const_ub_dist)
return np.array([lb_dist, ub_dist])
[docs] def sample(self, n, fluxes=True):
"""Abstract sampling function.
Should be overwritten by child classes.
"""
pass
[docs] def batch(self, batch_size, batch_num, fluxes=True):
"""Create a batch generator.
This is useful to generate n batches of m samples each.
Parameters
----------
batch_size : int
The number of samples contained in each batch (m).
batch_num : int
The number of batches in the generator (n).
fluxes : boolean
Whether to return fluxes or the internal solver variables. If set
to False will return a variable for each forward and backward flux
as well as all additional variables you might have defined in the
model.
Yields
------
pandas.DataFrame
A DataFrame with dimensions (batch_size x n_r) containing
a valid flux sample for a total of n_r reactions (or variables if
fluxes=False) in each row.
"""
for i in range(batch_num):
yield self.sample(batch_size, fluxes=fluxes)
[docs] def validate(self, samples):
"""Validate a set of samples for equality and inequality feasibility.
Can be used to check whether the generated samples and warmup points
are feasible.
Parameters
----------
samples : numpy.matrix
Must be of dimension (n_samples x n_reactions). Contains the
samples to be validated. Samples must be from fluxes.
Returns
-------
numpy.array
A one-dimensional numpy array of length containing
a code of 1 to 3 letters denoting the validation result:
- 'v' means feasible in bounds and equality constraints
- 'l' means a lower bound violation
- 'u' means a lower bound validation
- 'e' means and equality constraint violation
"""
samples = np.atleast_2d(samples)
prob = self.problem
if samples.shape[1] == len(self.model.reactions):
S = create_stoichiometric_matrix(self.model)
b = np.array([self.model.constraints[m.id].lb for m in
self.model.metabolites])
bounds = np.array([r.bounds for r in self.model.reactions]).T
elif samples.shape[1] == len(self.model.variables):
S = prob.equalities
b = prob.b
bounds = prob.variable_bounds
else:
raise ValueError("Wrong number of columns. samples must have a "
"column for each flux or variable defined in the "
"model!")
feasibility = np.abs(S.dot(samples.T).T - b)
feasibility = feasibility.max(axis=1)
lb_error = (samples - bounds[0, ]).min(axis=1)
ub_error = (bounds[1, ] - samples).min(axis=1)
if (samples.shape[1] == len(self.model.variables) and
prob.inequalities.shape[0]):
consts = prob.inequalities.dot(samples.T)
lb_error = np.minimum(
lb_error,
(consts - prob.bounds[0, ]).min(axis=1))
ub_error = np.minimum(
ub_error,
(prob.bounds[1, ] - consts).min(axis=1)
)
valid = (
(feasibility < feasibility_tol) &
(lb_error > -bounds_tol) &
(ub_error > -bounds_tol))
codes = np.repeat("", valid.shape[0]).astype(np.dtype((str, 3)))
codes[valid] = "v"
codes[lb_error <= -bounds_tol] = np.char.add(
codes[lb_error <= -bounds_tol], "l")
codes[ub_error <= -bounds_tol] = np.char.add(
codes[ub_error <= -bounds_tol], "u")
codes[feasibility > feasibility_tol] = np.char.add(
codes[feasibility > feasibility_tol], "e")
return codes
[docs]class ACHRSampler(HRSampler):
"""Artificial Centering Hit-and-Run sampler.
A sampler with low memory footprint and good convergence.
Parameters
----------
model : a cobra model
The cobra model from which to generate samples.
thinning : int, optional
The thinning factor of the generated sampling chain. A thinning of 10
means samples are returned every 10 steps.
nproj : int > 0, optional
How often to reproject the sampling point into the feasibility space.
Avoids numerical issues at the cost of lower sampling. If you observe
many equality constraint violations with `sampler.validate` you should
lower this number.
seed : int > 0, optional
Sets the random number seed. Initialized to the current time stamp if
None.
Attributes
----------
model : cobra.Model
The cobra model from which the samples get generated.
thinning : int
The currently used thinning factor.
n_samples : int
The total number of samples that have been generated by this
sampler instance.
problem : collections.namedtuple
A python object whose attributes define the entire sampling problem in
matrix form. See docstring of `Problem`.
warmup : a numpy matrix
A matrix of with as many columns as reactions in the model and more
than 3 rows containing a warmup sample in each row. None if no warmup
points have been generated yet.
retries : int
The overall of sampling retries the sampler has observed. Larger
values indicate numerical instabilities.
seed : positive integer, optional
Sets the random number seed. Initialized to the current time stamp if
None.
nproj : int
How often to reproject the sampling point into the feasibility space.
fwd_idx : np.array
Has one entry for each reaction in the model containing the index of
the respective forward variable.
rev_idx : np.array
Has one entry for each reaction in the model containing the index of
the respective reverse variable.
prev : numpy array
The current/last flux sample generated.
center : numpy array
The center of the sampling space as estimated by the mean of all
previously generated samples.
Notes
-----
ACHR generates samples by choosing new directions from the sampling space's
center and the warmup points. The implementation used here is the same
as in the Matlab Cobra Toolbox [2]_ and uses only the initial warmup points
to generate new directions and not any other previous iterates. This
usually gives better mixing since the startup points are chosen to span
the space in a wide manner. This also makes the generated sampling chain
quasi-markovian since the center converges rapidly.
Memory usage is roughly in the order of (2 * number reactions)^2
due to the required nullspace matrices and warmup points. So large
models easily take up a few GB of RAM.
References
----------
.. [1] Direction Choice for Accelerated Convergence in Hit-and-Run Sampling
David E. Kaufman Robert L. Smith
Operations Research 199846:1 , 84-95
https://doi.org/10.1287/opre.46.1.84
.. [2] https://github.com/opencobra/cobratoolbox
"""
[docs] def __init__(self, model, thinning=100, nproj=None, seed=None):
"""Initialize a new ACHRSampler."""
super(ACHRSampler, self).__init__(model, thinning, nproj=nproj,
seed=seed)
self.generate_fva_warmup()
self.prev = self.center = self.warmup.mean(axis=0)
np.random.seed(self._seed)
def __single_iteration(self):
pi = np.random.randint(self.n_warmup)
# mix in the original warmup points to not get stuck
delta = self.warmup[pi, ] - self.center
self.prev = _step(self, self.prev, delta)
if self.problem.homogeneous and (self.n_samples *
self.thinning % self.nproj == 0):
self.prev = self._reproject(self.prev)
self.center = self._reproject(self.center)
self.center = ((self.n_samples * self.center) / (self.n_samples + 1) +
self.prev / (self.n_samples + 1))
self.n_samples += 1
[docs] def sample(self, n, fluxes=True):
"""Generate a set of samples.
This is the basic sampling function for all hit-and-run samplers.
Parameters
----------
n : int
The number of samples that are generated at once.
fluxes : boolean
Whether to return fluxes or the internal solver variables. If set
to False will return a variable for each forward and backward flux
as well as all additional variables you might have defined in the
model.
Returns
-------
numpy.matrix
Returns a matrix with `n` rows, each containing a flux sample.
Notes
-----
Performance of this function linearly depends on the number
of reactions in your model and the thinning factor.
"""
samples = np.zeros((n, self.warmup.shape[1]))
for i in range(1, self.thinning * n + 1):
self.__single_iteration()
if i % self.thinning == 0:
samples[i//self.thinning - 1, ] = self.prev
if fluxes:
names = [r.id for r in self.model.reactions]
return pandas.DataFrame(
samples[:, self.fwd_idx] - samples[:, self.rev_idx],
columns=names)
else:
names = [v.name for v in self.model.variables]
return pandas.DataFrame(samples, columns=names)
# Unfortunately this has to be outside the class to be usable with
# multiprocessing :()
[docs]def _sample_chain(args):
"""Sample a single chain for OptGPSampler.
center and n_samples are updated locally and forgotten afterwards.
"""
n, idx = args # has to be this way to work in Python 2.7
center = sampler.center
np.random.seed((sampler._seed + idx) % np.iinfo(np.int32).max)
pi = np.random.randint(sampler.n_warmup)
prev = sampler.warmup[pi, ]
prev = _step(sampler, center, prev - center, 0.95)
n_samples = max(sampler.n_samples, 1)
samples = np.zeros((n, center.shape[0]))
for i in range(1, sampler.thinning * n + 1):
pi = np.random.randint(sampler.n_warmup)
delta = sampler.warmup[pi, ] - center
prev = _step(sampler, prev, delta)
if sampler.problem.homogeneous and (
n_samples * sampler.thinning % sampler.nproj == 0):
prev = sampler._reproject(prev)
center = sampler._reproject(center)
if i % sampler.thinning == 0:
samples[i//sampler.thinning - 1, ] = prev
center = ((n_samples * center) / (n_samples + 1) +
prev / (n_samples + 1))
n_samples += 1
return (sampler.retries, samples)
[docs]class OptGPSampler(HRSampler):
"""A parallel optimized sampler.
A parallel sampler with fast convergence and parallel execution. See [1]_
for details.
Parameters
----------
model : cobra.Model
The cobra model from which to generate samples.
processes: int
The number of processes used during sampling.
thinning : int, optional
The thinning factor of the generated sampling chain. A thinning of 10
means samples are returned every 10 steps.
nproj : int > 0, optional
How often to reproject the sampling point into the feasibility space.
Avoids numerical issues at the cost of lower sampling. If you observe
many equality constraint violations with `sampler.validate` you should
lower this number.
seed : int > 0, optional
Sets the random number seed. Initialized to the current time stamp if
None.
Attributes
----------
model : cobra.Model
The cobra model from which the samples get generated.
thinning : int
The currently used thinning factor.
n_samples : int
The total number of samples that have been generated by this
sampler instance.
problem : collections.namedtuple
A python object whose attributes define the entire sampling problem in
matrix form. See docstring of `Problem`.
warmup : a numpy matrix
A matrix of with as many columns as reactions in the model and more
than 3 rows containing a warmup sample in each row. None if no warmup
points have been generated yet.
retries : int
The overall of sampling retries the sampler has observed. Larger
values indicate numerical instabilities.
seed : positive integer, optional
Sets the random number seed. Initialized to the current time stamp if
None.
nproj : int
How often to reproject the sampling point into the feasibility space.
fwd_idx : np.array
Has one entry for each reaction in the model containing the index of
the respective forward variable.
rev_idx : np.array
Has one entry for each reaction in the model containing the index of
the respective reverse variable.
prev : numpy.array
The current/last flux sample generated.
center : numpy.array
The center of the sampling space as estimated by the mean of all
previously generated samples.
Notes
-----
The sampler is very similar to artificial centering where each process
samples its own chain. Initial points are chosen randomly from the warmup
points followed by a linear transformation that pulls the points towards
the a little bit towards the center of the sampling space.
If the number of processes used is larger than one the requested
number of samples is adjusted to the smallest multiple of the number of
processes larger than the requested sample number. For instance, if you
have 3 processes and request 8 samples you will receive 9.
Memory usage is roughly in the order of (2 * number reactions)^2
due to the required nullspace matrices and warmup points. So large
models easily take up a few GB of RAM. However, most of the large matrices
are kept in shared memory. So the RAM usage is independent of the number
of processes.
References
----------
.. [1] Megchelenbrink W, Huynen M, Marchiori E (2014)
optGpSampler: An Improved Tool for Uniformly Sampling the Solution-Space
of Genome-Scale Metabolic Networks.
PLoS ONE 9(2): e86587.
https://doi.org/10.1371/journal.pone.0086587
"""
[docs] def __init__(self, model, processes, thinning=100, nproj=None,
seed=None):
"""Initialize a new OptGPSampler."""
super(OptGPSampler, self).__init__(model, thinning, seed=seed)
self.generate_fva_warmup()
self.processes = processes
# This maps our saved center into shared memory,
# meaning they are synchronized across processes
self.center = shared_np_array((len(model.variables), ),
self.warmup.mean(axis=0))
[docs] def sample(self, n, fluxes=True):
"""Generate a set of samples.
This is the basic sampling function for all hit-and-run samplers.
Paramters
---------
n : int
The minimum number of samples that are generated at once
(see Notes).
fluxes : boolean
Whether to return fluxes or the internal solver variables. If set
to False will return a variable for each forward and backward flux
as well as all additional variables you might have defined in the
model.
Returns
-------
numpy.matrix
Returns a matrix with `n` rows, each containing a flux sample.
Notes
-----
Performance of this function linearly depends on the number
of reactions in your model and the thinning factor.
If the number of processes is larger than one, computation is split
across as the CPUs of your machine. This may shorten computation time.
However, there is also overhead in setting up parallel computation so
we recommend to calculate large numbers of samples at once
(`n` > 1000).
"""
if self.processes > 1:
n_process = np.ceil(n / self.processes).astype(int)
n = n_process * self.processes
# The cast to list is weird but not doing it gives recursion
# limit errors, something weird going on with multiprocessing
args = list(zip(
[n_process] * self.processes, range(self.processes)))
# No with statement or starmap here since Python 2.x
# does not support it :(
mp = Pool(self.processes, initializer=mp_init, initargs=(self,))
results = mp.map(_sample_chain, args, chunksize=1)
mp.close()
mp.join()
chains = np.vstack([r[1] for r in results])
self.retries += sum(r[0] for r in results)
else:
mp_init(self)
results = _sample_chain((n, 0))
chains = results[1]
# Update the global center
self.center = (self.n_samples * self.center +
np.atleast_2d(chains).sum(0)) / (self.n_samples + n)
self.n_samples += n
if fluxes:
names = [r.id for r in self.model.reactions]
return pandas.DataFrame(
chains[:, self.fwd_idx] - chains[:, self.rev_idx],
columns=names)
else:
names = [v.name for v in self.model.variables]
return pandas.DataFrame(chains, columns=names)
# Models can be large so don't pass them around during multiprocessing
[docs] def __getstate__(self):
"""Return the object for serialization."""
d = dict(self.__dict__)
del d['model']
return d
[docs]def sample(model, n, method="optgp", thinning=100, processes=1, seed=None):
"""Sample valid flux distributions from a cobra model.
The function samples valid flux distributions from a cobra model.
Currently we support two methods:
1. 'optgp' (default) which uses the OptGPSampler that supports parallel
sampling [1]_. Requires large numbers of samples to be performant
(n < 1000). For smaller samples 'achr' might be better suited.
or
2. 'achr' which uses artificial centering hit-and-run. This is a single
process method with good convergence [2]_.
Parameters
----------
model : cobra.Model
The model from which to sample flux distributions.
n : int
The number of samples to obtain. When using 'optgp' this must be a
multiple of `processes`, otherwise a larger number of samples will be
returned.
method : str, optional
The sampling algorithm to use.
thinning : int, optional
The thinning factor of the generated sampling chain. A thinning of 10
means samples are returned every 10 steps. Defaults to 100 which in
benchmarks gives approximately uncorrelated samples. If set to one
will return all iterates.
processes : int, optional
Only used for 'optgp'. The number of processes used to generate
samples.
seed : positive integer, optional
The random number seed to be used. Initialized to current time stamp
if None.
Returns
-------
pandas.DataFrame
The generated flux samples. Each row corresponds to a sample of the
fluxes and the columns are the reactions.
Notes
-----
The samplers have a correction method to ensure equality feasibility for
long-running chains, however this will only work for homogeneous models,
meaning models with no non-zero fixed variables or constraints (
right-hand side of the equalities are zero).
References
----------
.. [1] Megchelenbrink W, Huynen M, Marchiori E (2014)
optGpSampler: An Improved Tool for Uniformly Sampling the Solution-Space
of Genome-Scale Metabolic Networks.
PLoS ONE 9(2): e86587.
.. [2] Direction Choice for Accelerated Convergence in Hit-and-Run Sampling
David E. Kaufman Robert L. Smith
Operations Research 199846:1 , 84-95
"""
if method == "optgp":
sampler = OptGPSampler(model, processes, thinning=thinning, seed=seed)
elif method == "achr":
sampler = ACHRSampler(model, thinning=thinning, seed=seed)
else:
raise ValueError("method must be 'optgp' or 'achr'!")
return pandas.DataFrame(columns=[rxn.id for rxn in model.reactions],
data=sampler.sample(n))