Source code for cobra.flux_analysis.variability

"""Provide variability based methods such as flux variability or gene essentiality."""


import logging
from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union
from warnings import warn

import numpy as np
import pandas as pd
from optlang.symbolics import Zero

from ..core import Configuration, get_solution
from ..util import ProcessPool
from ..util import solver as sutil
from .deletion import single_gene_deletion, single_reaction_deletion
from .helpers import normalize_cutoff
from .loopless import loopless_fva_iter
from .parsimonious import add_pfba


if TYPE_CHECKING:
    from cobra import Gene, Model, Reaction


logger = logging.getLogger(__name__)
configuration = Configuration()


def _init_worker(model: "Model", loopless: bool, sense: str) -> None:
    """Initialize a global model object for multiprocessing.

    Parameters
    ----------
    model: cobra.Model
        The model to operate on.
    loopless: bool
        Whether to use loopless version.
    sense: {"max", "min"}
        Whether to maximise or minimise objective.

    """
    global _model
    global _loopless
    _model = model
    _model.solver.objective.direction = sense
    _loopless = loopless


def _fva_step(reaction_id: str) -> Tuple[str, float]:
    """Take a step for calculating FVA.

    Parameters
    ----------
    reaction_id: str
        The ID of the reaction.

    Returns
    -------
    tuple of (str, float)
        The reaction ID with the flux value.

    """
    global _model
    global _loopless
    rxn = _model.reactions.get_by_id(reaction_id)
    # The previous objective assignment already triggers a reset
    # so directly update coefs here to not trigger redundant resets
    # in the history manager which can take longer than the actual
    # FVA for small models
    _model.solver.objective.set_linear_coefficients(
        {rxn.forward_variable: 1, rxn.reverse_variable: -1}
    )
    _model.slim_optimize()
    sutil.check_solver_status(_model.solver.status)
    if _loopless:
        value = loopless_fva_iter(_model, rxn)
    else:
        value = _model.solver.objective.value
    # handle infeasible case
    if value is None:
        value = float("nan")
        logger.warning(
            f"Could not get flux for reaction {rxn.id}, setting it to NaN. "
            "This is usually due to numerical instability."
        )
    _model.solver.objective.set_linear_coefficients(
        {rxn.forward_variable: 0, rxn.reverse_variable: 0}
    )
    return reaction_id, value
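
# Worker sketch (illustrative, not part of the module): this mirrors the
# single-process branch of flux_variability_analysis below, where the worker
# pair is initialized with an optimization direction and then stepped once per
# reaction. "PGI" is a hypothetical reaction ID used only for illustration.
#
#     _init_worker(model, loopless=False, sense="max")
#     rxn_id, max_flux = _fva_step("PGI")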


def flux_variability_analysis(
    model: "Model",
    reaction_list: Optional[List[Union["Reaction", str]]] = None,
    loopless: bool = False,
    fraction_of_optimum: float = 1.0,
    pfba_factor: Optional[float] = None,
    processes: Optional[int] = None,
) -> pd.DataFrame:
    """Determine the minimum and maximum flux value for each reaction.

    Parameters
    ----------
    model : cobra.Model
        The model for which to run the analysis. It will *not* be modified.
    reaction_list : list of cobra.Reaction or str, optional
        The reactions for which to obtain min/max fluxes. If None will use
        all reactions in the model (default None).
    loopless : bool, optional
        Whether to return only loopless solutions. This is significantly
        slower. Please also refer to the notes (default False).
    fraction_of_optimum : float, optional
        Must be <= 1.0. Requires that the objective value is at least the
        fraction times maximum objective value. A value of 0.85 for instance
        means that the objective has to be at least 85% of its maximum
        (default 1.0).
    pfba_factor : float, optional
        Add an additional constraint to the model that requires the total sum
        of absolute fluxes to not be larger than this value times the
        smallest possible sum of absolute fluxes, i.e., by setting the value
        to 1.1 the total sum of absolute fluxes must not be more than 10%
        larger than the pFBA solution. Since the pFBA solution is the one
        that optimally minimizes the total flux sum, the `pfba_factor`
        should, if set, be larger than one. Setting this value may lead to
        more realistic predictions of the effective flux bounds
        (default None).
    processes : int, optional
        The number of parallel processes to run. If not explicitly passed,
        will be set from the global configuration singleton (default None).

    Returns
    -------
    pandas.DataFrame
        A data frame with reaction identifiers as the index and two columns:

        - maximum: indicating the highest possible flux
        - minimum: indicating the lowest possible flux

    Notes
    -----
    This implements the fast version as described in [1]_. Please note that
    the flux distribution containing all minimal/maximal fluxes does not have
    to be a feasible solution for the model. Fluxes are minimized/maximized
    individually and a single minimal flux might require all others to be
    sub-optimal.

    Using the loopless option will lead to a significant increase in
    computation time (about a factor of 100 for large models). However, the
    algorithm used here (see [2]_) is still more than 1000x faster than the
    "naive" version using `add_loopless(model)`. Also note that if you have
    included constraints that force a loop (for instance by setting all
    fluxes in a loop to be non-zero) this loop will be included in the
    solution.

    References
    ----------
    .. [1] Computationally efficient flux variability analysis.
       Gudmundsson S, Thiele I.
       BMC Bioinformatics. 2010 Sep 29;11:489.
       doi: 10.1186/1471-2105-11-489, PMID: 20920235

    .. [2] CycleFreeFlux: efficient removal of thermodynamically infeasible
       loops from flux distributions.
       Desouki AA, Jarre F, Gelius-Dietrich G, Lercher MJ.
       Bioinformatics. 2015 Jul 1;31(13):2159-65.
       doi: 10.1093/bioinformatics/btv096.

    """
    if reaction_list is None:
        reaction_ids = [r.id for r in model.reactions]
    else:
        reaction_ids = [r.id for r in model.reactions.get_by_any(reaction_list)]

    if processes is None:
        processes = configuration.processes

    num_reactions = len(reaction_ids)
    processes = min(processes, num_reactions)

    fva_result = pd.DataFrame(
        {
            "minimum": np.zeros(num_reactions, dtype=float),
            "maximum": np.zeros(num_reactions, dtype=float),
        },
        index=reaction_ids,
    )
    prob = model.problem
    with model:
        # Safety check before setting up FVA.
        model.slim_optimize(
            error_value=None,
            message="There is no optimal solution for the chosen objective!",
        )
        # Add the previous objective as a variable to the model then set it to
        # zero. This also uses the fraction to create the lower/upper bound
        # for the old objective.
        # TODO: Use utility function here (fix_objective_as_constraint)?
        if model.solver.objective.direction == "max":
            fva_old_objective = prob.Variable(
                "fva_old_objective",
                lb=fraction_of_optimum * model.solver.objective.value,
            )
        else:
            fva_old_objective = prob.Variable(
                "fva_old_objective",
                ub=fraction_of_optimum * model.solver.objective.value,
            )
        fva_old_obj_constraint = prob.Constraint(
            model.solver.objective.expression - fva_old_objective,
            lb=0,
            ub=0,
            name="fva_old_objective_constraint",
        )
        model.add_cons_vars([fva_old_objective, fva_old_obj_constraint])

        if pfba_factor is not None:
            if pfba_factor < 1.0:
                warn(
                    "The 'pfba_factor' should be larger or equal to 1.",
                    UserWarning,
                )
            with model:
                add_pfba(model, fraction_of_optimum=0)
                ub = model.slim_optimize(error_value=None)
                flux_sum = prob.Variable("flux_sum", ub=pfba_factor * ub)
                flux_sum_constraint = prob.Constraint(
                    model.solver.objective.expression - flux_sum,
                    lb=0,
                    ub=0,
                    name="flux_sum_constraint",
                )
            model.add_cons_vars([flux_sum, flux_sum_constraint])

        model.objective = Zero  # This will trigger the reset as well
        for what in ("minimum", "maximum"):
            if processes > 1:
                # We create and destroy a new pool here in order to set the
                # objective direction for all reactions. This creates a
                # slight overhead but seems the most clean.
                chunk_size = len(reaction_ids) // processes
                with ProcessPool(
                    processes,
                    initializer=_init_worker,
                    initargs=(model, loopless, what[:3]),
                ) as pool:
                    for rxn_id, value in pool.imap_unordered(
                        _fva_step, reaction_ids, chunksize=chunk_size
                    ):
                        fva_result.at[rxn_id, what] = value
            else:
                _init_worker(model, loopless, what[:3])
                for rxn_id, value in map(_fva_step, reaction_ids):
                    fva_result.at[rxn_id, what] = value

    return fva_result[["minimum", "maximum"]]


def find_blocked_reactions(
    model: "Model",
    reaction_list: Optional[List[Union["Reaction", str]]] = None,
    zero_cutoff: Optional[float] = None,
    open_exchanges: bool = False,
    processes: Optional[int] = None,
) -> List["Reaction"]:
    """Find reactions that cannot carry any flux.

    The question whether or not a reaction is blocked is highly dependent
    on the current exchange reaction settings for a COBRA model. Hence an
    argument is provided to open all exchange reactions.

    Parameters
    ----------
    model : cobra.Model
        The model to analyze.
    reaction_list : list of cobra.Reaction or str, optional
        List of reactions to consider, the default includes all model
        reactions (default None).
    zero_cutoff : float, optional
        Flux value which is considered to effectively be zero. The default
        is set to use `model.tolerance` (default None).
    open_exchanges : bool, optional
        Whether or not to open all exchange reactions to very high flux
        ranges (default False).
    processes : int, optional
        The number of parallel processes to run. Can speed up the
        computations if the number of reactions is large. If not explicitly
        passed, it will be set from the global configuration singleton
        (default None).

    Returns
    -------
    list of cobra.Reaction
        List with the identifiers of blocked reactions.

    Notes
    -----
    Sink and demand reactions are left untouched. Please modify them manually.

    """
    zero_cutoff = normalize_cutoff(model, zero_cutoff)

    with model:
        if open_exchanges:
            for reaction in model.exchanges:
                reaction.bounds = (
                    min(reaction.lower_bound, -1000),
                    max(reaction.upper_bound, 1000),
                )

        if reaction_list is None:
            reaction_list = model.reactions

        # Limit the search space to reactions which have zero flux. If the
        # reactions already carry flux in this solution, then they cannot
        # be blocked.
        model.slim_optimize()
        solution = get_solution(model, reactions=reaction_list)
        reaction_list = solution.fluxes[
            solution.fluxes.abs() < zero_cutoff
        ].index.tolist()

        # Run FVA to find reactions where both the minimal and maximal flux
        # are zero (below the cut off).
        flux_span = flux_variability_analysis(
            model,
            fraction_of_optimum=0.0,
            reaction_list=reaction_list,
            processes=processes,
        )
        return flux_span[flux_span.abs().max(axis=1) < zero_cutoff].index.tolist()
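
# Usage sketch (illustrative, not part of the module), reusing the
# hypothetical `model` from the FVA sketch above:
#
#     from cobra.flux_analysis import find_blocked_reactions
#
#     blocked = find_blocked_reactions(model, open_exchanges=True)
#     # `blocked` holds the identifiers of reactions that can never carry flux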


def find_essential_genes(
    model: "Model",
    threshold: Optional[float] = None,
    processes: Optional[int] = None,
) -> Set["Gene"]:
    """Return a set of essential genes.

    A gene is considered essential if restricting the flux of all reactions
    that depend on it to zero causes the objective, e.g., the growth rate,
    to also be zero, below the threshold, or infeasible.

    Parameters
    ----------
    model : cobra.Model
        The model to find the essential genes for.
    threshold : float, optional
        Minimal objective flux to be considered viable. By default this is
        1% of the maximal objective (default None).
    processes : int, optional
        The number of parallel processes to run. Can speed up the
        computations if the number of knockouts to perform is large. If not
        explicitly passed, it will be set from the global configuration
        singleton (default None).

    Returns
    -------
    set of cobra.Gene
        Set of essential genes.

    """
    if threshold is None:
        threshold = model.slim_optimize(error_value=None) * 1e-02
    deletions = single_gene_deletion(model, method="fba", processes=processes)
    essential = deletions.loc[
        deletions["growth"].isna() | (deletions["growth"] < threshold), :
    ].ids
    return {model.genes.get_by_id(g) for ids in essential for g in ids}
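
# Usage sketch (illustrative, not part of the module):
#
#     from cobra.flux_analysis import find_essential_genes
#
#     essential_genes = find_essential_genes(model)
#     print(sorted(g.id for g in essential_genes))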


def find_essential_reactions(
    model: "Model",
    threshold: Optional[float] = None,
    processes: Optional[int] = None,
) -> Set["Reaction"]:
    """Return a set of essential reactions.

    A reaction is considered essential if restricting its flux to zero
    causes the objective, e.g., the growth rate, to also be zero, below the
    threshold, or infeasible.

    Parameters
    ----------
    model : cobra.Model
        The model to find the essential reactions for.
    threshold : float, optional
        Minimal objective flux to be considered viable. By default this is
        1% of the maximal objective (default None).
    processes : int, optional
        The number of parallel processes to run. Can speed up the
        computations if the number of knockouts to perform is large. If not
        explicitly passed, it will be set from the global configuration
        singleton (default None).

    Returns
    -------
    set of cobra.Reaction
        Set of essential reactions.

    """
    if threshold is None:
        threshold = model.slim_optimize(error_value=None) * 1e-02
    deletions = single_reaction_deletion(model, method="fba", processes=processes)
    essential = deletions.loc[
        deletions["growth"].isna() | (deletions["growth"] < threshold), :
    ].ids
    return {model.reactions.get_by_id(r) for ids in essential for r in ids}
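
# Usage sketch (illustrative, not part of the module): the same call with a
# custom viability threshold of 5% of the maximal objective instead of the
# default 1%.
#
#     from cobra.flux_analysis import find_essential_reactions
#
#     threshold = 0.05 * model.slim_optimize()
#     essential_rxns = find_essential_reactions(model, threshold=threshold)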