"""Additional helper functions for the optlang solvers.
All functions integrate well with the context manager, meaning that
all operations defined here are automatically reverted when used in a
`with model:` block.
The functions defined here together with the existing model functions
should allow you to implement custom flux analysis methods with ease.
"""
import re
from functools import partial
from types import ModuleType
from typing import TYPE_CHECKING, Dict, List, NamedTuple, Optional, Tuple, Union
from warnings import warn
import optlang
import pandas as pd
from optlang.interface import (
FEASIBLE,
INFEASIBLE,
ITERATION_LIMIT,
NUMERIC,
OPTIMAL,
SUBOPTIMAL,
TIME_LIMIT,
)
from optlang.symbolics import Basic, Zero
from cobra.exceptions import (
OPTLANG_TO_EXCEPTIONS_DICT,
OptimizationError,
SolverNotFound,
)
from cobra.util.context import get_context
# Used to avoid cyclic reference and enable third-party static type checkers to work
if TYPE_CHECKING:
from cobra import Model, Reaction
[docs]CONS_VARS = Union[optlang.interface.Constraint, optlang.interface.Variable]
# Define all the solvers that are found in optlang.
[docs]solvers = {
match.split("_interface")[0]: getattr(optlang, match)
for match in dir(optlang)
if "_interface" in match
}
# Defines all the QP solvers implemented in optlang.
[docs]qp_solvers = ["cplex", "gurobi"]
# optlang solution statuses which still allow retrieving primal values
[docs]has_primals = [NUMERIC, FEASIBLE, INFEASIBLE, SUBOPTIMAL, ITERATION_LIMIT, TIME_LIMIT]
[docs]class Components(NamedTuple):
"""Define an object for adding absolute expressions."""
[docs] variable: optlang.interface.Variable
[docs] upper_constraint: optlang.interface.Constraint
[docs] lower_constraint: optlang.interface.Constraint
[docs]def linear_reaction_coefficients(
model: "Model", reactions: Optional[List["Reaction"]] = None
) -> Dict["Reaction", float]:
"""Retrieve coefficient for the reactions in a linear objective.
Parameters
----------
model : cobra.Model
The cobra model defining the linear objective.
reactions : list of cobra.Reaction, optional
An optional list of the reactions to get the coefficients for.
By default, all reactions are considered (default None).
Returns
-------
dict
A dictionary where the keys are the reaction objects and the values
are the corresponding coefficient. Empty dictionary if there are no
linear terms in the objective.
"""
linear_coefficients = {}
reactions = model.reactions if not reactions else reactions
try:
objective_expression = model.solver.objective.expression
coefficients = objective_expression.as_coefficients_dict()
except AttributeError:
return linear_coefficients
for rxn in reactions:
forward_coefficient = coefficients.get(rxn.forward_variable, 0)
reverse_coefficient = coefficients.get(rxn.reverse_variable, 0)
if forward_coefficient != 0:
if forward_coefficient == -reverse_coefficient:
linear_coefficients[rxn] = float(forward_coefficient)
return linear_coefficients
[docs]def _valid_atoms(model: "Model", expression: optlang.symbolics.Basic) -> bool:
"""Check whether a sympy expression references the correct variables.
Parameters
----------
model : cobra.Model
The model in which to check for variables.
expression : sympy.Basic
A sympy expression.
Returns
-------
bool
True if all referenced variables are contained in model, False
otherwise.
"""
atoms = expression.atoms(optlang.interface.Variable)
return all(a.problem is model.solver for a in atoms)
[docs]def set_objective(
model: "Model",
value: Union[
optlang.interface.Objective,
optlang.symbolics.Basic,
Dict["Reaction", float],
],
additive: bool = False,
) -> None:
"""Set the model objective.
Parameters
----------
model : cobra.Model
The model to set the objective for.
value : optlang.interface.Objective, optlang.symbolics.Basic, dict
If the model objective is linear, then the value can be a new
optlang.interface.Objective or a dictionary with linear
coefficients where each key is a reaction and the corresponding
value is the new coefficient (float).
If the objective is non-linear and `additive` is True, then only
values of class optlang.interface.Objective, are accepted.
additive : bool
If True, add the terms to the current objective, otherwise start with
an empty objective.
Raises
------
ValueError
If model objective is non-linear and the `value` is a dict.
TypeError
If the type of `value` is not one of the accepted ones.
"""
interface = model.problem
reverse_value = model.solver.objective.expression
reverse_value = interface.Objective(
reverse_value, direction=model.solver.objective.direction, sloppy=True
)
if isinstance(value, dict):
if not model.objective.is_Linear:
raise ValueError(
"You can only update non-linear objectives additively using object of "
f"class optlang.interface.Objective, not of {type(value)}"
)
if not additive:
model.solver.objective = interface.Objective(
Zero, direction=model.solver.objective.direction
)
for reaction, coef in value.items():
model.solver.objective.set_linear_coefficients(
{reaction.forward_variable: coef, reaction.reverse_variable: -coef}
)
elif isinstance(value, (Basic, optlang.interface.Objective)):
if isinstance(value, Basic):
value = interface.Objective(
value, direction=model.solver.objective.direction, sloppy=False
)
# Check whether expression only uses variables from current model;
# clone the objective if not, faster than cloning without checking
if not _valid_atoms(model, value.expression):
value = interface.Objective.clone(value, model=model.solver)
if not additive:
model.solver.objective = value
else:
model.solver.objective += value.expression
else:
raise TypeError(f"{value} is not a valid objective for {model.solver}.")
context = get_context(model)
if context:
def reset():
model.solver.objective = reverse_value
model.solver.objective.direction = reverse_value.direction
context(reset)
[docs]def interface_to_str(interface: Union[str, ModuleType]) -> str:
"""Give a string representation for an optlang interface.
Parameters
----------
interface : str, ModuleType
Full name of the interface in optlang or cobra representation.
For instance, 'optlang.glpk_interface' or 'optlang-glpk'.
Returns
-------
str
The name of the interface as a string.
"""
if isinstance(interface, ModuleType):
interface = interface.__name__
return re.sub(r"optlang.|.interface", "", interface)
[docs]def get_solver_name(mip: bool = False, qp: bool = False) -> str:
"""Select a solver for a given optimization problem.
Parameters
----------
mip : bool
True if the solver requires mixed integer linear programming capabilities.
qp : bool
True if the solver requires quadratic programming capabilities.
Returns
-------
str
The name of the feasible solver.
Raises
------
SolverNotFound
If no suitable solver could be found.
"""
if len(solvers) == 0:
raise SolverNotFound("No solvers found.")
# Those lists need to be updated as optlang implements more solvers
mip_order = ["gurobi", "cplex", "glpk"]
lp_order = ["glpk", "cplex", "gurobi"]
qp_order = ["gurobi", "cplex"]
if mip is False and qp is False:
for solver_name in lp_order:
if solver_name in solvers:
return solver_name
# none of them are in the list order - so return the first one
return list(solvers)[0]
elif qp: # mip does not yet matter for this determination
for solver_name in qp_order:
if solver_name in solvers:
return solver_name
raise SolverNotFound("No QP-capable solver found.")
else:
for solver_name in mip_order:
if solver_name in solvers:
return solver_name
raise SolverNotFound("No MIP-capable solver found.")
[docs]def choose_solver(
model: "Model", solver: Optional[str] = None, qp: bool = False
) -> ModuleType:
"""Choose a solver given a solver name and model.
This will choose a solver compatible with the model and required
capabilities. Also respects model.solver where it can.
Parameters
----------
model : cobra.Model
The model for which to choose the solver.
solver : str, optional
The name of the solver to be used (default None).
qp : boolean, optional
True if the solver needs quadratic programming capabilities
(default False).
Returns
-------
optlang.interface
Valid solver for the problem.
Raises
------
SolverNotFound
If no suitable solver could be found.
"""
if solver is None:
solver = model.problem
else:
model.solver = solver
# Check for QP, raise error if no QP solver found
if qp and interface_to_str(solver) not in qp_solvers:
solver = solvers[get_solver_name(qp=True)]
return solver
[docs]def add_cons_vars_to_problem(
model: "Model",
what: Union[List[CONS_VARS], Tuple[CONS_VARS], Components],
**kwargs,
) -> None:
"""Add variables and constraints to a model's solver object.
Useful for variables and constraints that can not be expressed with
reactions and lower/upper bounds. It will integrate with the model's
context manager in order to revert changes upon leaving the context.
Parameters
----------
model : cobra.Model
The model to which to add the variables and constraints.
what : list or tuple of optlang.interface.Variable or
optlang.interface.Constraint
The variables and constraints to add to the model.
**kwargs : keyword arguments
Keyword arguments passed to solver's add() method.
"""
model.solver.add(what, **kwargs)
context = get_context(model)
if context:
context(partial(model.solver.remove, what))
[docs]def remove_cons_vars_from_problem(
model: "Model",
what: Union[List[CONS_VARS], Tuple[CONS_VARS], Components],
) -> None:
"""Remove variables and constraints from a model's solver object.
Useful to temporarily remove variables and constraints from a model's
solver object.
Parameters
----------
model : cobra.Model
The model from which to remove the variables and constraints.
what : list or tuple of optlang.interface.Variable or
optlang.interface.Constraint
The variables and constraints to remove from the model.
"""
model.solver.remove(what)
context = get_context(model)
if context:
context(partial(model.solver.add, what))
[docs]def add_absolute_expression(
model: "Model",
expression: str,
name: str = "abs_var",
ub: Optional[float] = None,
difference: float = 0.0,
add: bool = True,
) -> Components:
"""Add the absolute value of an expression to the model.
Also defines a variable for the absolute value that can be used in
other objectives or constraints.
Parameters
----------
model : cobra.Model
The model to which to add the absolute expression.
expression : str
Must be a valid symbolic expression within the model's solver object.
The absolute value is applied automatically on the expression.
name : str, optional
The name of the newly created variable (default "abs_var").
ub : positive float, optional
The upper bound for the variable (default None).
difference : positive float, optional
The difference between the expression and the variable
(default 0.0).
add : bool, optional
Whether to add the variable to the model at once (default True).
Returns
-------
Components
A named tuple with variable and two constraints (upper_constraint,
lower_constraint) describing the new variable and the constraints
that assign the absolute value of the expression to it.
"""
variable = model.problem.Variable(name, lb=0, ub=ub)
# The following constraints enforce variable > expression and
# variable > -expression
upper_constraint = model.problem.Constraint(
expression - variable, ub=difference, name="abs_pos_" + name
)
lower_constraint = model.problem.Constraint(
expression + variable, lb=difference, name="abs_neg_" + name
)
to_add = Components(variable, upper_constraint, lower_constraint)
if add:
add_cons_vars_to_problem(model, to_add)
return to_add
[docs]def fix_objective_as_constraint(
model: "Model",
fraction: float = 1.0,
bound: Optional[float] = None,
name: str = "fixed_objective_{}",
) -> float:
"""Fix current objective as an additional constraint.
When adding constraints to a model, such as done in pFBA which
minimizes total flux, these constraints can become too powerful,
resulting in solutions that satisfy optimality but sacrifices too
much for the original objective function. To avoid that, we can fix
the current objective value as a constraint to ignore solutions that
give a lower (or higher depending on the optimization direction)
objective value than the original model.
When done with the model as a context, the modification to the
objective will be reverted when exiting that context.
Parameters
----------
model : cobra.Model
The model to operate on.
fraction : float, optional
The fraction of the optimum the objective is allowed to reach
(default 1.0).
bound : float, optional
The bound to use instead of fraction of maximum optimal value.
If not None, `fraction` is ignored (default None).
name : str, optional
Name of the objective. May contain one "{}" placeholder which is
filled with the name of the old objective
(default "fixed_objective_{}").
Returns
-------
float
The value of the optimized objective * fraction
"""
fix_objective_name = name.format(model.objective.name)
if fix_objective_name in model.constraints:
model.solver.remove(fix_objective_name)
if bound is None:
bound = model.slim_optimize(error_value=None) * fraction
if model.objective.direction == "max":
ub, lb = None, bound
else:
ub, lb = bound, None
constraint = model.problem.Constraint(
model.objective.expression, name=fix_objective_name, ub=ub, lb=lb
)
add_cons_vars_to_problem(model, constraint, sloppy=True)
return bound
[docs]def check_solver_status(status: str = None, raise_error: bool = False) -> None:
"""Perform standard checks on a solver's status.
Parameters
----------
status: str, optional
The status string obtained from the solver (default None).
raise_error: bool, optional
If True, raise error or display warning if False (default False).
Returns
-------
None
Warns
-----
UserWarning
If `status` is not optimal and `raise_error` is set to True.
Raises
------
OptimizationError
If `status` is None or is not optimal and `raise_error` is set to
True.
"""
if status == OPTIMAL:
return None
elif (status in has_primals) and not raise_error:
warn(f"Solver status is '{status}'.", UserWarning)
elif status is None:
raise OptimizationError(
"Model is not optimized yet or solver context has been switched."
)
else:
raise OptimizationError(f"Solver status is '{status}'.")
[docs]def assert_optimal(model: "Model", message: str = "Optimization failed") -> None:
"""Assert model solver status is optimal.
Do nothing if model solver status is optimal, otherwise throw
appropriate exception depending on the status.
Parameters
----------
model : cobra.Model
The model to check the solver status for.
message : str, optional
Message for the exception if solver status is not optimal
(default "Optimization failed").
Returns
-------
None
Raises
------
OptimizationError
If solver status is not optimal.
"""
status = model.solver.status
if status != OPTIMAL:
exception_cls = OPTLANG_TO_EXCEPTIONS_DICT.get(status, OptimizationError)
raise exception_cls(f"{message} ({status}).")
[docs]def add_lp_feasibility(model: "Model") -> None:
"""Add a new objective and variables to ensure a feasible solution.
The optimized objective will be zero for a feasible solution and
otherwise represent the distance from feasibility (please see [1]_
for more information).
Parameters
----------
model : cobra.Model
The model whose feasibility is to be tested.
Returns
-------
None
References
----------
.. [1] Gomez, Jose A., Kai Höffner, and Paul I. Barton.
“DFBAlab: A Fast and Reliable MATLAB Code for Dynamic Flux Balance
Analysis.” BMC Bioinformatics 15, no. 1 (December 18, 2014): 409.
https://doi.org/10.1186/s12859-014-0409-8.
"""
obj_vars = []
prob = model.problem
for met in model.metabolites:
s_plus = prob.Variable("s_plus_" + met.id, lb=0)
s_minus = prob.Variable("s_minus_" + met.id, lb=0)
model.add_cons_vars([s_plus, s_minus])
model.constraints[met.id].set_linear_coefficients({s_plus: 1.0, s_minus: -1.0})
obj_vars.append(s_plus)
obj_vars.append(s_minus)
model.objective = prob.Objective(Zero, sloppy=True, direction="min")
model.objective.set_linear_coefficients({v: 1.0 for v in obj_vars})
[docs]def add_lexicographic_constraints(
model: "Model",
objectives: List["Reaction"],
objective_direction: Union[str, List[str]] = "max",
) -> pd.Series:
"""Successively optimize separate targets in a specific order.
For each objective, optimize the model and set the optimal value as a
constraint. Proceed in the order of the objectives given. Due to the
specific order this is called lexicographic FBA [1]_. This procedure
is useful for returning unique solutions for a set of important
fluxes. Typically this is applied to exchange fluxes.
Parameters
----------
model : cobra.Model
The model to be optimized.
objectives : list of cobra.Reaction
A list of reactions (or objectives) in the model for which unique
fluxes are to be determined.
objective_direction : str or list of str, optional
The desired objective direction for each reaction (if a list) or
the objective direction to use for all reactions (default "max").
Returns
-------
pandas.Series
A pandas Series containing the optimized fluxes for each of the
given reactions in `objectives`.
References
----------
.. [1] Gomez, Jose A., Kai Höffner, and Paul I. Barton.
“DFBAlab: A Fast and Reliable MATLAB Code for Dynamic Flux Balance
Analysis.” BMC Bioinformatics 15, no. 1 (December 18, 2014): 409.
https://doi.org/10.1186/s12859-014-0409-8.
"""
if type(objective_direction) is not list:
objective_direction = [objective_direction] * len(objectives)
constraints = []
for rxn_id, obj_dir in zip(objectives, objective_direction):
model.objective = model.reactions.get_by_id(rxn_id)
model.objective_direction = obj_dir
constraints.append(fix_objective_as_constraint(model))
return pd.Series(constraints, index=objectives)