"""Provide functions to remove thermodynamically infeasible loops."""
from typing import TYPE_CHECKING, Dict, Optional, Union
import numpy as np
from optlang.symbolics import Zero
from ..core import get_solution
from ..util import create_stoichiometric_matrix, nullspace
from .helpers import normalize_cutoff
if TYPE_CHECKING:
from cobra import Model, Reaction, Solution
[docs]def add_loopless(model: "Model", zero_cutoff: Optional[float] = None) -> None:
"""Modify a model so all feasible flux distributions are loopless.
It adds variables and constraints to a model which will disallow flux
distributions with loops. The used formulation is described in [1]_.
This function *will* modify your model.
In most cases you probably want to use the much faster
`loopless_solution`. May be used in cases where you want to add complex
constraints and objecives (for instance quadratic objectives) to the
model afterwards or use an approximation of Gibbs free energy directions
in your model.
Parameters
----------
model : cobra.Model
The model to which to add the constraints.
zero_cutoff : positive float, optional
Cutoff used for null space. Coefficients with an absolute value
smaller than `zero_cutoff` are considered to be zero. The default
uses the `model.tolerance` (default None).
References
----------
.. [1] Elimination of thermodynamically infeasible loops in steady-state
metabolic models. Schellenberger J, Lewis NE, Palsson BO. Biophys J.
2011 Feb 2;100(3):544-53. doi: 10.1016/j.bpj.2010.12.3707. Erratum
in: Biophys J. 2011 Mar 2;100(5):1381.
"""
zero_cutoff = normalize_cutoff(model, zero_cutoff)
internal = [i for i, r in enumerate(model.reactions) if not r.boundary]
s_int = create_stoichiometric_matrix(model)[:, np.array(internal)]
n_int = nullspace(s_int).T
max_bound = max(max(abs(b) for b in r.bounds) for r in model.reactions)
prob = model.problem
# Add indicator variables and new constraints
to_add = []
for i in internal:
rxn = model.reactions[i]
# indicator variable a_i
indicator = prob.Variable(f"indicator_{rxn.id}", type="binary")
# -M*(1 - a_i) <= v_i <= M*a_i
on_off_constraint = prob.Constraint(
rxn.flux_expression - max_bound * indicator,
lb=-max_bound,
ub=0,
name=f"on_off_{rxn.id}",
)
# -(max_bound + 1) * a_i + 1 <= G_i <= -(max_bound + 1) * a_i + 1000
delta_g = prob.Variable(f"delta_g_{rxn.id}")
delta_g_range = prob.Constraint(
delta_g + (max_bound + 1) * indicator,
lb=1,
ub=max_bound,
name=f"delta_g_range_{rxn.id}",
)
to_add.extend([indicator, on_off_constraint, delta_g, delta_g_range])
model.add_cons_vars(to_add)
# Add nullspace constraints for G_i
for i, row in enumerate(n_int):
name = f"nullspace_constraint_{str(i)}"
nullspace_constraint = prob.Constraint(Zero, lb=0, ub=0, name=name)
model.add_cons_vars([nullspace_constraint])
coefs = {
model.variables[f"delta_g_{model.reactions[ridx].id}"]: row[i]
for i, ridx in enumerate(internal)
if abs(row[i]) > zero_cutoff
}
model.constraints[name].set_linear_coefficients(coefs)
[docs]def _add_cycle_free(model: "Model", fluxes: Dict[str, float]) -> None:
"""Add constraints for CycleFreeFlux.
Parameters
----------
model : cobra.Model
The model to operate on.
fluxes : dict of {str: float}
A dictionary having keys as reaction IDs and values as their flux
values.
"""
model.objective = model.solver.interface.Objective(
Zero, direction="min", sloppy=True
)
objective_vars = []
for rxn in model.reactions:
flux = fluxes[rxn.id]
if rxn.boundary:
rxn.bounds = (flux, flux)
continue
if flux >= 0:
rxn.bounds = max(0, rxn.lower_bound), max(flux, rxn.upper_bound)
objective_vars.append(rxn.forward_variable)
else:
rxn.bounds = min(flux, rxn.lower_bound), min(0, rxn.upper_bound)
objective_vars.append(rxn.reverse_variable)
model.objective.set_linear_coefficients({v: 1.0 for v in objective_vars})
[docs]def loopless_solution(
model: "Model", fluxes: Optional[Dict[str, float]] = None
) -> "Solution":
"""Convert an existing solution to a loopless one.
Removes as many loops as possible (see Notes).
Uses the method from CycleFreeFlux [1]_ and is much faster than
`add_loopless` and should therefore be the preferred option to get
loopless flux distributions.
Parameters
----------
model : cobra.Model
The model to which to add the constraints.
fluxes : dict of {str, float}, optional
A dictionary having keys as reaction IDs and values as their flux
values. If not None will use the provided flux values to obtain a
close loopless solution (default None).
Returns
-------
cobra.Solution
A solution object containing the fluxes with the least amount of
loops possible or None if the optimization failed (usually happening
if the flux distribution in `fluxes` is infeasible).
Notes
-----
The returned flux solution has the following properties:
- It contains the minimal number of loops possible and no loops at all
if all flux bounds include zero.
- It has an objective value close to the original one and the same
objective value id the objective expression can not form a cycle
(which is usually true since it consumes metabolites).
- It has the same exact exchange fluxes as the previous solution.
- All fluxes have the same sign (flow in the same direction) as the
previous solution.
References
----------
.. [1] CycleFreeFlux: efficient removal of thermodynamically infeasible
loops from flux distributions. Desouki AA, Jarre F, Gelius-Dietrich
G, Lercher MJ. Bioinformatics. 2015 Jul 1;31(13):2159-65. doi:
10.1093/bioinformatics/btv096.
"""
# Need to reoptimize otherwise spurious solution artifacts can cause
# all kinds of havoc
# TODO: check solution status
if fluxes is None:
sol = model.optimize(objective_sense=None)
fluxes = sol.fluxes
with model:
prob = model.problem
# Needs one fixed bound for cplex...
loopless_obj_constraint = prob.Constraint(
model.objective.expression,
lb=-1e32,
name="loopless_obj_constraint",
)
model.add_cons_vars([loopless_obj_constraint])
_add_cycle_free(model, fluxes)
solution = model.optimize(objective_sense=None)
solution.objective_value = loopless_obj_constraint.primal
return solution
[docs]def loopless_fva_iter(
model: "Model",
reaction: "Reaction",
solution: bool = False,
zero_cutoff: Optional[float] = None,
) -> Union[float, Dict[str, float]]:
"""Plugin to get a loopless FVA solution from single FVA iteration.
Assumes the following about `model` and `reaction`:
1. The model objective is set to be `reaction`.
2. The model has been optimized and contains the minimum/maximum flux
for `reaction`.
3. The model contains an auxiliary variable called "fva_old_objective"
denoting the previous objective.
Parameters
----------
model : cobra.Model
The model to be used.
reaction : cobra.Reaction
The reaction currently minimized/maximized.
solution : bool, optional
Whether to return the entire solution or only the minimum/maximum
for `reaction` (default False).
zero_cutoff : positive float, optional
Cutoff used for loop removal. Fluxes with an absolute value smaller
than `zero_cutoff` are considered to be zero. The default is to use
`model.tolerance` (default None).
Returns
-------
single float or dict of {str: float}
Returns the minimized/maximized flux through `reaction` if
`solution` is False. Otherwise, returns a loopless flux
solution object containing the minimum/maximum flux for `reaction`.
"""
zero_cutoff = normalize_cutoff(model, zero_cutoff)
current = model.objective.value
sol = get_solution(model)
objective_dir = model.objective.direction
# Handle a suddenly infeasible solution,
# usually due to numerical instability
if current is None:
return None
# boundary reactions can not be part of cycles
if reaction.boundary:
if solution:
return sol
else:
return current
with model:
_add_cycle_free(model, sol.fluxes)
model.slim_optimize()
# If the previous optimum is maintained in the loopless solution it was
# loopless and we are done
if abs(reaction.flux - current) < zero_cutoff:
if solution:
return sol
return current
# If previous optimum was not in the loopless solution create a new
# almost loopless solution containing only loops including the current
# reaction. Than remove all of those loops.
ll_sol = get_solution(model).fluxes
reaction.bounds = (current, current)
model.slim_optimize()
almost_ll_sol = get_solution(model).fluxes
with model:
# find the reactions with loops using the current reaction and remove
# the loops
for rxn in model.reactions:
rid = rxn.id
if (abs(ll_sol[rid]) < zero_cutoff) and (
abs(almost_ll_sol[rid]) > zero_cutoff
):
rxn.bounds = max(0, rxn.lower_bound), min(0, rxn.upper_bound)
if solution:
best = model.optimize()
else:
model.slim_optimize()
best = reaction.flux
model.objective.direction = objective_dir
return best