# Source code for cobra.flux_analysis.variability

# -*- coding: utf-8 -*-

from __future__ import absolute_import

import logging
import multiprocessing
from builtins import map
from warnings import warn

from numpy import zeros
from optlang.symbolics import Zero
from pandas import DataFrame

from cobra.core import Configuration, get_solution
from cobra.flux_analysis.deletion import single_gene_deletion, single_reaction_deletion
from cobra.flux_analysis.helpers import normalize_cutoff
from cobra.flux_analysis.loopless import loopless_fva_iter
from cobra.util import solver as sutil

[docs]LOGGER = logging.getLogger(__name__)
[docs]CONFIGURATION = Configuration()

[docs]def _init_worker(model, loopless, sense):
"""Initialize a global model object for multiprocessing."""
global _model
global _loopless
_model = model
_model.solver.objective.direction = sense
_loopless = loopless

[docs]def _fva_step(reaction_id):
global _model
global _loopless
rxn = _model.reactions.get_by_id(reaction_id)
# The previous objective assignment already triggers a reset
# so directly update coefs here to not trigger redundant resets
# in the history manager which can take longer than the actual
# FVA for small models
_model.solver.objective.set_linear_coefficients(
{rxn.forward_variable: 1, rxn.reverse_variable: -1}
)
_model.slim_optimize()
sutil.check_solver_status(_model.solver.status)
if _loopless:
value = loopless_fva_iter(_model, rxn)
else:
value = _model.solver.objective.value
# handle infeasible case
if value is None:
value = float("nan")
LOGGER.warning(
"Could not get flux for reaction %s,  setting "
"it to NaN. This is usually due to numerical instability.",
rxn.id,
)
_model.solver.objective.set_linear_coefficients(
{rxn.forward_variable: 0, rxn.reverse_variable: 0}
)
return reaction_id, value

[docs]def flux_variability_analysis(
model,
reaction_list=None,
loopless=False,
fraction_of_optimum=1.0,
pfba_factor=None,
processes=None,
):
"""
Determine the minimum and maximum possible flux value for each reaction.

Parameters
----------
model : cobra.Model
The model for which to run the analysis. It will *not* be modified.
reaction_list : list of cobra.Reaction or str, optional
The reactions for which to obtain min/max fluxes. If None will use
all reactions in the model (default).
loopless : boolean, optional
Whether to return only loopless solutions. This is significantly
slower. Please also refer to the notes.
fraction_of_optimum : float, optional
Must be <= 1.0. Requires that the objective value is at least the
fraction times maximum objective value. A value of 0.85 for instance
means that the objective has to be at least at 85% percent of its
maximum.
pfba_factor : float, optional
Add an additional constraint to the model that requires the total sum
of absolute fluxes must not be larger than this value times the
smallest possible sum of absolute fluxes, i.e., by setting the value
to 1.1 the total sum of absolute fluxes must not be more than
10% larger than the pFBA solution. Since the pFBA solution is the
one that optimally minimizes the total flux sum, the pfba_factor
should, if set, be larger than one. Setting this value may lead to
more realistic predictions of the effective flux bounds.
processes : int, optional
The number of parallel processes to run. If not explicitly passed,
will be set from the global configuration singleton.

Returns
-------
pandas.DataFrame
A data frame with reaction identifiers as the index and two columns:
- maximum: indicating the highest possible flux
- minimum: indicating the lowest possible flux

Notes
-----
This implements the fast version as described in [1]_. Please note that
the flux distribution containing all minimal/maximal fluxes does not have
to be a feasible solution for the model. Fluxes are minimized/maximized
individually and a single minimal flux might require all others to be
suboptimal.

Using the loopless option will lead to a significant increase in
computation time (about a factor of 100 for large models). However, the
algorithm used here (see [2]_) is still more than 1000x faster than the
"naive" version using add_loopless(model). Also note that if you have
included constraints that force a loop (for instance by setting all fluxes
in a loop to be non-zero) this loop will be included in the solution.

References
----------
.. [1] Computationally efficient flux variability analysis.
Gudmundsson S, Thiele I.
BMC Bioinformatics. 2010 Sep 29;11:489.
doi: 10.1186/1471-2105-11-489, PMID: 20920235

.. [2] CycleFreeFlux: efficient removal of thermodynamically infeasible
loops from flux distributions.
Desouki AA, Jarre F, Gelius-Dietrich G, Lercher MJ.
Bioinformatics. 2015 Jul 1;31(13):2159-65.
doi: 10.1093/bioinformatics/btv096.
"""
if reaction_list is None:
reaction_ids = [r.id for r in model.reactions]
else:
reaction_ids = [r.id for r in model.reactions.get_by_any(reaction_list)]

if processes is None:
processes = CONFIGURATION.processes
num_reactions = len(reaction_ids)
processes = min(processes, num_reactions)

fva_result = DataFrame(
{
"minimum": zeros(num_reactions, dtype=float),
"maximum": zeros(num_reactions, dtype=float),
},
index=reaction_ids,
)
prob = model.problem
with model:
# Safety check before setting up FVA.
model.slim_optimize(
error_value=None,
message="There is no optimal solution for the " "chosen objective!",
)
# Add the previous objective as a variable to the model then set it to
# zero. This also uses the fraction to create the lower/upper bound for
# the old objective.
# TODO: Use utility function here (fix_objective_as_constraint)?
if model.solver.objective.direction == "max":
fva_old_objective = prob.Variable(
"fva_old_objective",
lb=fraction_of_optimum * model.solver.objective.value,
)
else:
fva_old_objective = prob.Variable(
"fva_old_objective",
ub=fraction_of_optimum * model.solver.objective.value,
)
fva_old_obj_constraint = prob.Constraint(
model.solver.objective.expression - fva_old_objective,
lb=0,
ub=0,
name="fva_old_objective_constraint",
)

if pfba_factor is not None:
if pfba_factor < 1.0:
warn(
"The 'pfba_factor' should be larger or equal to 1.",
UserWarning,
)
with model:
ub = model.slim_optimize(error_value=None)
flux_sum = prob.Variable("flux_sum", ub=pfba_factor * ub)
flux_sum_constraint = prob.Constraint(
model.solver.objective.expression - flux_sum,
lb=0,
ub=0,
name="flux_sum_constraint",
)

model.objective = Zero  # This will trigger the reset as well
for what in ("minimum", "maximum"):
if processes > 1:
# We create and destroy a new pool here in order to set the
# objective direction for all reactions. This creates a
# slight overhead but seems the most clean.
chunk_size = len(reaction_ids) // processes
pool = multiprocessing.Pool(
processes,
initializer=_init_worker,
initargs=(model, loopless, what[:3]),
)
for rxn_id, value in pool.imap_unordered(
_fva_step, reaction_ids, chunksize=chunk_size
):
fva_result.at[rxn_id, what] = value
pool.close()
pool.join()
else:
_init_worker(model, loopless, what[:3])
for rxn_id, value in map(_fva_step, reaction_ids):
fva_result.at[rxn_id, what] = value

return fva_result[["minimum", "maximum"]]

[docs]def find_blocked_reactions(
model,
reaction_list=None,
zero_cutoff=None,
open_exchanges=False,
processes=None,
):
"""
Find reactions that cannot carry any flux.

The question whether or not a reaction is blocked is highly dependent
on the current exchange reaction settings for a COBRA model. Hence an
argument is provided to open all exchange reactions.

Notes
-----
Sink and demand reactions are left untouched. Please modify them manually.

Parameters
----------
model : cobra.Model
The model to analyze.
reaction_list : list, optional
List of reactions to consider, the default includes all model
reactions.
zero_cutoff : float, optional
Flux value which is considered to effectively be zero
(default model.tolerance).
open_exchanges : bool, optional
Whether or not to open all exchange reactions to very high flux ranges.
processes : int, optional
The number of parallel processes to run. Can speed up the computations
if the number of reactions is large. If not explicitly
passed, it will be set from the global configuration singleton.

Returns
-------
list
List with the identifiers of blocked reactions.

"""
zero_cutoff = normalize_cutoff(model, zero_cutoff)

with model:
if open_exchanges:
for reaction in model.exchanges:
reaction.bounds = (
min(reaction.lower_bound, -1000),
max(reaction.upper_bound, 1000),
)
if reaction_list is None:
reaction_list = model.reactions
# Limit the search space to reactions which have zero flux. If the
# reactions already carry flux in this solution,
# then they cannot be blocked.
model.slim_optimize()
solution = get_solution(model, reactions=reaction_list)
reaction_list = solution.fluxes[
solution.fluxes.abs() < zero_cutoff
].index.tolist()
# Run FVA to find reactions where both the minimal and maximal flux
# are zero (below the cut off).
flux_span = flux_variability_analysis(
model,
fraction_of_optimum=0.0,
reaction_list=reaction_list,
processes=processes,
)
return flux_span[flux_span.abs().max(axis=1) < zero_cutoff].index.tolist()

[docs]def find_essential_genes(model, threshold=None, processes=None):
"""
Return a set of essential genes.

A gene is considered essential if restricting the flux of all reactions
that depend on it to zero causes the objective, e.g., the growth rate,
to also be zero, below the threshold, or infeasible.

Parameters
----------
model : cobra.Model
The model to find the essential genes for.
threshold : float, optional
Minimal objective flux to be considered viable. By default this is
1% of the maximal objective.
processes : int, optional
The number of parallel processes to run. If not passed,
will be set to the number of CPUs found.
processes : int, optional
The number of parallel processes to run. Can speed up the computations
if the number of knockouts to perform is large. If not explicitly
passed, it will be set from the global configuration singleton.

Returns
-------
set
Set of essential genes
"""
if threshold is None:
threshold = model.slim_optimize(error_value=None) * 1e-02
deletions = single_gene_deletion(model, method="fba", processes=processes)
essential = deletions.loc[
deletions["growth"].isna() | (deletions["growth"] < threshold), :
].ids
return {model.genes.get_by_id(g) for ids in essential for g in ids}

[docs]def find_essential_reactions(model, threshold=None, processes=None):
"""Return a set of essential reactions.

A reaction is considered essential if restricting its flux to zero
causes the objective, e.g., the growth rate, to also be zero, below the
threshold, or infeasible.

Parameters
----------
model : cobra.Model
The model to find the essential reactions for.
threshold : float, optional
Minimal objective flux to be considered viable. By default this is
1% of the maximal objective.
processes : int, optional
The number of parallel processes to run. Can speed up the computations
if the number of knockouts to perform is large. If not explicitly
passed, it will be set from the global configuration singleton.

Returns
-------
set
Set of essential reactions
"""
if threshold is None:
threshold = model.slim_optimize(error_value=None) * 1e-02
deletions = single_reaction_deletion(model, method="fba", processes=processes)
essential = deletions.loc[
deletions["growth"].isna() | (deletions["growth"] < threshold), :
].ids
return {model.reactions.get_by_id(r) for ids in essential for r in ids}