Source code for cobra.util.solver

# -*- coding: utf-8 -*-

"""Additional helper functions for the optlang solvers.

All functions integrate well with the context manager, meaning that
all operations defined here are automatically reverted when used in a
`with model:` block.

The functions defined here together with the existing model functions should
allow you to implement custom flux analysis methods with ease.
"""

from __future__ import absolute_import

import re
from collections import namedtuple
from functools import partial
from types import ModuleType
from warnings import warn

import optlang
import pandas as pd
from optlang.interface import (
    FEASIBLE, INFEASIBLE, ITERATION_LIMIT, NUMERIC, OPTIMAL, SUBOPTIMAL,
    TIME_LIMIT)
from optlang.symbolics import Basic, Zero

from cobra.exceptions import (
    OPTLANG_TO_EXCEPTIONS_DICT, OptimizationError, SolverNotFound)
from cobra.util.context import get_context


# Define all the solvers that are found in optlang.
[docs]solvers = {match.split("_interface")[0]: getattr(optlang, match) for match in dir(optlang) if "_interface" in match}
# Defines all the QP solvers implemented in optlang.
[docs]qp_solvers = ["cplex", "gurobi"]
# optlang solution statuses which still allow retrieving primal values
[docs]has_primals = [NUMERIC, FEASIBLE, INFEASIBLE, SUBOPTIMAL, ITERATION_LIMIT, TIME_LIMIT]
[docs]def linear_reaction_coefficients(model, reactions=None): """Coefficient for the reactions in a linear objective. Parameters ---------- model : cobra model the model object that defined the objective reactions : list an optional list for the reactions to get the coefficients for. All reactions if left missing. Returns ------- dict A dictionary where the key is the reaction object and the value is the corresponding coefficient. Empty dictionary if there are no linear terms in the objective. """ linear_coefficients = {} reactions = model.reactions if not reactions else reactions try: objective_expression = model.solver.objective.expression coefficients = objective_expression.as_coefficients_dict() except AttributeError: return linear_coefficients for rxn in reactions: forward_coefficient = coefficients.get(rxn.forward_variable, 0) reverse_coefficient = coefficients.get(rxn.reverse_variable, 0) if forward_coefficient != 0: if forward_coefficient == -reverse_coefficient: linear_coefficients[rxn] = float(forward_coefficient) return linear_coefficients
[docs]def _valid_atoms(model, expression): """Check whether a sympy expression references the correct variables. Parameters ---------- model : cobra.Model The model in which to check for variables. expression : sympy.Basic A sympy expression. Returns ------- boolean True if all referenced variables are contained in model, False otherwise. """ atoms = expression.atoms(optlang.interface.Variable) return all(a.problem is model.solver for a in atoms)
[docs]def set_objective(model, value, additive=False): """Set the model objective. Parameters ---------- model : cobra model The model to set the objective for value : model.problem.Objective, e.g. optlang.glpk_interface.Objective, sympy.Basic or dict If the model objective is linear, the value can be a new Objective object or a dictionary with linear coefficients where each key is a reaction and the element the new coefficient (float). If the objective is not linear and `additive` is true, only values of class Objective. additive : boolmodel.reactions.Biomass_Ecoli_core.bounds = (0.1, 0.1) If true, add the terms to the current objective, otherwise start with an empty objective. """ interface = model.problem reverse_value = model.solver.objective.expression reverse_value = interface.Objective( reverse_value, direction=model.solver.objective.direction, sloppy=True) if isinstance(value, dict): if not model.objective.is_Linear: raise ValueError('can only update non-linear objectives ' 'additively using object of class ' 'model.problem.Objective, not %s' % type(value)) if not additive: model.solver.objective = interface.Objective( Zero, direction=model.solver.objective.direction) for reaction, coef in value.items(): model.solver.objective.set_linear_coefficients( {reaction.forward_variable: coef, reaction.reverse_variable: -coef}) elif isinstance(value, (Basic, optlang.interface.Objective)): if isinstance(value, Basic): value = interface.Objective( value, direction=model.solver.objective.direction, sloppy=False) # Check whether expression only uses variables from current model # clone the objective if not, faster than cloning without checking if not _valid_atoms(model, value.expression): value = interface.Objective.clone(value, model=model.solver) if not additive: model.solver.objective = value else: model.solver.objective += value.expression else: raise TypeError( '%r is not a valid objective for %r.' % (value, model.solver)) context = get_context(model) if context: def reset(): model.solver.objective = reverse_value model.solver.objective.direction = reverse_value.direction context(reset)
[docs]def interface_to_str(interface): """Give a string representation for an optlang interface. Parameters ---------- interface : string, ModuleType Full name of the interface in optlang or cobra representation. For instance 'optlang.glpk_interface' or 'optlang-glpk'. Returns ------- string The name of the interface as a string """ if isinstance(interface, ModuleType): interface = interface.__name__ return re.sub(r"optlang.|.interface", "", interface)
[docs]def get_solver_name(mip=False, qp=False): """Select a solver for a given optimization problem. Parameters ---------- mip : bool Does the solver require mixed integer linear programming capabilities? qp : bool Does the solver require quadratic programming capabilities? Returns ------- string The name of feasible solver. Raises ------ SolverNotFound If no suitable solver could be found. """ if len(solvers) == 0: raise SolverNotFound("no solvers installed") # Those lists need to be updated as optlang implements more solvers mip_order = ["gurobi", "cplex", "glpk"] lp_order = ["glpk", "cplex", "gurobi"] qp_order = ["gurobi", "cplex"] if mip is False and qp is False: for solver_name in lp_order: if solver_name in solvers: return solver_name # none of them are in the list order - so return the first one return list(solvers)[0] elif qp: # mip does not yet matter for this determination for solver_name in qp_order: if solver_name in solvers: return solver_name raise SolverNotFound("no qp-capable solver found") else: for solver_name in mip_order: if solver_name in solvers: return solver_name raise SolverNotFound("no mip-capable solver found")
[docs]def choose_solver(model, solver=None, qp=False): """Choose a solver given a solver name and model. This will choose a solver compatible with the model and required capabilities. Also respects model.solver where it can. Parameters ---------- model : a cobra model The model for which to choose the solver. solver : str, optional The name of the solver to be used. qp : boolean, optional Whether the solver needs Quadratic Programming capabilities. Returns ------- solver : an optlang solver interface Returns a valid solver for the problem. Raises ------ SolverNotFound If no suitable solver could be found. """ if solver is None: solver = model.problem else: model.solver = solver # Check for QP, raise error if no QP solver found if qp and interface_to_str(solver) not in qp_solvers: solver = solvers[get_solver_name(qp=True)] return solver
[docs]def add_cons_vars_to_problem(model, what, **kwargs): """Add variables and constraints to a Model's solver object. Useful for variables and constraints that can not be expressed with reactions and lower/upper bounds. Will integrate with the Model's context manager in order to revert changes upon leaving the context. Parameters ---------- model : a cobra model The model to which to add the variables and constraints. what : list or tuple of optlang variables or constraints. The variables or constraints to add to the model. Must be of class `model.problem.Variable` or `model.problem.Constraint`. **kwargs : keyword arguments passed to solver.add() """ context = get_context(model) model.solver.add(what, **kwargs) if context: context(partial(model.solver.remove, what))
[docs]def remove_cons_vars_from_problem(model, what): """Remove variables and constraints from a Model's solver object. Useful to temporarily remove variables and constraints from a Models's solver object. Parameters ---------- model : a cobra model The model from which to remove the variables and constraints. what : list or tuple of optlang variables or constraints. The variables or constraints to remove from the model. Must be of class `model.problem.Variable` or `model.problem.Constraint`. """ context = get_context(model) model.solver.remove(what) if context: context(partial(model.solver.add, what))
[docs]def add_absolute_expression(model, expression, name="abs_var", ub=None, difference=0, add=True): """Add the absolute value of an expression to the model. Also defines a variable for the absolute value that can be used in other objectives or constraints. Parameters ---------- model : a cobra model The model to which to add the absolute expression. expression : A sympy expression Must be a valid expression within the Model's solver object. The absolute value is applied automatically on the expression. name : string The name of the newly created variable. ub : positive float The upper bound for the variable. difference : positive float The difference between the expression and the variable. add : bool Whether to add the variable to the model at once. Returns ------- namedtuple A named tuple with variable and two constraints (upper_constraint, lower_constraint) describing the new variable and the constraints that assign the absolute value of the expression to it. """ Components = namedtuple('Components', ['variable', 'upper_constraint', 'lower_constraint']) variable = model.problem.Variable(name, lb=0, ub=ub) # The following constraints enforce variable > expression and # variable > -expression upper_constraint = model.problem.Constraint(expression - variable, ub=difference, name="abs_pos_" + name), lower_constraint = model.problem.Constraint(expression + variable, lb=difference, name="abs_neg_" + name) to_add = Components(variable, upper_constraint, lower_constraint) if add: add_cons_vars_to_problem(model, to_add) return to_add
[docs]def fix_objective_as_constraint(model, fraction=1, bound=None, name='fixed_objective_{}'): """Fix current objective as an additional constraint. When adding constraints to a model, such as done in pFBA which minimizes total flux, these constraints can become too powerful, resulting in solutions that satisfy optimality but sacrifices too much for the original objective function. To avoid that, we can fix the current objective value as a constraint to ignore solutions that give a lower (or higher depending on the optimization direction) objective value than the original model. When done with the model as a context, the modification to the objective will be reverted when exiting that context. Parameters ---------- model : cobra.Model The model to operate on fraction : float The fraction of the optimum the objective is allowed to reach. bound : float, None The bound to use instead of fraction of maximum optimal value. If not None, fraction is ignored. name : str Name of the objective. May contain one `{}` placeholder which is filled with the name of the old objective. Returns ------- The value of the optimized objective * fraction """ fix_objective_name = name.format(model.objective.name) if fix_objective_name in model.constraints: model.solver.remove(fix_objective_name) if bound is None: bound = model.slim_optimize(error_value=None) * fraction if model.objective.direction == 'max': ub, lb = None, bound else: ub, lb = bound, None constraint = model.problem.Constraint( model.objective.expression, name=fix_objective_name, ub=ub, lb=lb) add_cons_vars_to_problem(model, constraint, sloppy=True) return bound
[docs]def check_solver_status(status, raise_error=False): """Perform standard checks on a solver's status.""" if status == OPTIMAL: return elif (status in has_primals) and not raise_error: warn("solver status is '{}'".format(status), UserWarning) elif status is None: raise OptimizationError( "model was not optimized yet or solver context switched") else: raise OptimizationError("solver status is '{}'".format(status))
[docs]def assert_optimal(model, message='optimization failed'): """Assert model solver status is optimal. Do nothing if model solver status is optimal, otherwise throw appropriate exception depending on the status. Parameters ---------- model : cobra.Model The model to check the solver status for. message : str (optional) Message to for the exception if solver status was not optimal. """ status = model.solver.status if status != OPTIMAL: exception_cls = OPTLANG_TO_EXCEPTIONS_DICT.get( status, OptimizationError) raise exception_cls("{} ({})".format(message, status))
[docs]def add_lp_feasibility(model): """ Add a new objective and variables to ensure a feasible solution. The optimized objective will be zero for a feasible solution and otherwise represent the distance from feasibility (please see [1]_ for more information). Parameters ---------- model : cobra.Model The model whose feasibility is to be tested. References ---------- .. [1] Gomez, Jose A., Kai Höffner, and Paul I. Barton. “DFBAlab: A Fast and Reliable MATLAB Code for Dynamic Flux Balance Analysis.” BMC Bioinformatics 15, no. 1 (December 18, 2014): 409. https://doi.org/10.1186/s12859-014-0409-8. """ obj_vars = [] prob = model.problem for met in model.metabolites: s_plus = prob.Variable("s_plus_" + met.id, lb=0) s_minus = prob.Variable("s_minus_" + met.id, lb=0) model.add_cons_vars([s_plus, s_minus]) model.constraints[met.id].set_linear_coefficients( {s_plus: 1.0, s_minus: -1.0}) obj_vars.append(s_plus) obj_vars.append(s_minus) model.objective = prob.Objective(Zero, sloppy=True, direction="min") model.objective.set_linear_coefficients({v: 1.0 for v in obj_vars})
[docs]def add_lexicographic_constraints(model, objectives, objective_direction='max'): """ Successively optimize separate targets in a specific order. For each objective, optimize the model and set the optimal value as a constraint. Proceed in the order of the objectives given. Due to the specific order this is called lexicographic FBA [1]_. This procedure is useful for returning unique solutions for a set of important fluxes. Typically this is applied to exchange fluxes. Parameters ---------- model : cobra.Model The model to be optimized. objectives : list A list of reactions (or objectives) in the model for which unique fluxes are to be determined. objective_direction : str or list, optional The desired objective direction for each reaction (if a list) or the objective direction to use for all reactions (default maximize). Returns ------- optimized_fluxes : pandas.Series A vector containing the optimized fluxes for each of the given reactions in `objectives`. References ---------- .. [1] Gomez, Jose A., Kai Höffner, and Paul I. Barton. “DFBAlab: A Fast and Reliable MATLAB Code for Dynamic Flux Balance Analysis.” BMC Bioinformatics 15, no. 1 (December 18, 2014): 409. https://doi.org/10.1186/s12859-014-0409-8. """ if type(objective_direction) is not list: objective_direction = [objective_direction] * len(objectives) constraints = [] for rxn_id, obj_dir in zip(objectives, objective_direction): model.objective = model.reactions.get_by_id(rxn_id) model.objective_direction = obj_dir constraints.append(fix_objective_as_constraint(model)) return pd.Series(constraints, index=objectives)