# -*- coding: utf-8 -*-
"""Define the ModelSummary class."""
from __future__ import absolute_import
import logging
from collections import OrderedDict
from operator import attrgetter
import numpy as np
import pandas as pd
from six import iteritems, iterkeys
from cobra.core import get_solution
from cobra.core.summary import Summary
from cobra.flux_analysis.variability import flux_variability_analysis
from cobra.util.solver import linear_reaction_coefficients
[docs]logger = logging.getLogger(__name__)
[docs]class ModelSummary(Summary):
"""
Define the model summary.
See Also
--------
Summary : Parent that defines further attributes.
MetaboliteSummary
ReactionSummary
"""
def __init__(self, model, **kwargs):
"""
Initialize a model summary.
Parameters
----------
model : cobra.Model
The metabolic model for which to generate a summary.
Other Parameters
----------------
kwargs :
Further keyword arguments are passed on to the parent class.
See Also
--------
Summary : Parent that has further default parameters.
MetaboliteSummary
ReactionSummary
"""
super(ModelSummary, self).__init__(model=model, **kwargs)
[docs] def _generate(self):
"""
Returns
-------
flux_summary: pandas.DataFrame
The DataFrame of flux summary data.
"""
if self.names:
emit = attrgetter('name')
else:
emit = attrgetter('id')
obj_rxns = linear_reaction_coefficients(self.model)
boundary_rxns = self.model.exchanges
summary_rxns = set(obj_rxns.keys()).union(boundary_rxns)
if self.solution is None:
self.model.slim_optimize(error_value=None)
self.solution = get_solution(self.model, reactions=summary_rxns)
# the order needs to be maintained
ord_obj_rxns = OrderedDict(obj_rxns)
obj_data = [(emit(rxn), self.solution[rxn.id] * stoich, np.nan, rxn,
np.nan, 1.0) for rxn, stoich in iteritems(ord_obj_rxns)]
# create a DataFrame of objective reactions
obj_df = pd.DataFrame.from_records(
data=obj_data,
index=[rxn.id for rxn in iterkeys(ord_obj_rxns)],
columns=['id', 'flux', 'metabolite', 'reaction', 'is_input',
'is_obj_rxn']
)
# create a collection of metabolites from the boundary reactions
mets = OrderedDict((met, rxn) for rxn in boundary_rxns
for met in sorted(rxn.metabolites,
key=attrgetter('id')))
index = [met.id for met in iterkeys(mets)]
met_data = [(emit(met), self.solution[rxn.id] * rxn.metabolites[met],
met, rxn) for met, rxn in iteritems(mets)]
# create a DataFrame of metabolites
flux_summary = pd.DataFrame.from_records(
data=met_data,
index=index,
columns=['id', 'flux', 'metabolite', 'reaction']
)
# Calculate FVA results if requested
if self.fva is not None:
if len(index) != len(boundary_rxns):
logger.warning(
'There exists more than one boundary reaction per '
'metabolite. Please be careful when evaluating flux '
'ranges.')
if hasattr(self.fva, 'columns'):
fva_results = self.fva
else:
fva_results = flux_variability_analysis(
self.model, reaction_list=boundary_rxns,
fraction_of_optimum=self.fva
)
# save old index
old_index = flux_summary.index
# generate new index for consistency with fva_results
flux_summary['rxn_id'] = flux_summary.apply(
lambda x: x.reaction.id, axis=1
)
# change to new index
flux_summary.set_index(['rxn_id'], inplace=True)
flux_summary = pd.concat([flux_summary, fva_results],
axis=1, sort=False)
flux_summary.rename(columns={'maximum': 'fmax', 'minimum': 'fmin'},
inplace=True)
# revert to old index
flux_summary.set_index(old_index)
def set_min_and_max(row):
"""Scale and set proper min and max values for flux."""
fmax = row.reaction.metabolites[row.metabolite] * row.fmax
fmin = row.reaction.metabolites[row.metabolite] * row.fmin
if abs(fmin) <= abs(fmax):
row.fmax = fmax
row.fmin = fmin
else:
# Reverse fluxes
row.fmax = fmin
row.fmin = fmax
return row
flux_summary = flux_summary.apply(set_min_and_max, axis=1)
flux_summary = self._process_flux_dataframe(flux_summary)
flux_summary = pd.concat([flux_summary, obj_df], sort=False)
return flux_summary
[docs] def to_frame(self):
"""
Returns
-------
A pandas.DataFrame of the summary.
"""
flux_df = self._generate()
# obtain separate DataFrames and join them instead of reorganizing the
# old DataFrame, since that is easier to understand
if self.fva is not None:
column_names = ['IN_FLUXES-ID', 'IN_FLUXES-FLUX',
'IN_FLUXES-FLUX_MIN', 'IN_FLUXES-FLUX_MAX',
'OUT_FLUXES-ID', 'OUT_FLUXES-FLUX',
'OUT_FLUXES-FLUX_MIN', 'OUT_FLUXES-FLUX_MAX',
'OBJECTIVES-ID', 'OBJECTIVES-FLUX']
# generate separate DataFrames
met_in_df = flux_df[flux_df.is_input == 1.0]\
.loc[:, ['id', 'flux', 'fmin', 'fmax']].reset_index(drop=True)
met_out_df = flux_df[flux_df.is_input == 0.0]\
.loc[:, ['id', 'flux', 'fmin', 'fmax']].reset_index(drop=True)
obj_df = flux_df[flux_df.is_obj_rxn == 1.0]\
.loc[:, ['id', 'flux']].reset_index(drop=True)
# concatenate DataFrames
concat_df = pd.concat([met_in_df, met_out_df, obj_df], axis=1)
del met_in_df, met_out_df, obj_df
# generate column names
concat_df.columns = pd.MultiIndex.from_tuples(
[tuple(c.split('-')) for c in column_names]
)
else:
column_names = ['IN_FLUXES-ID', 'IN_FLUXES-FLUX',
'OUT_FLUXES-ID', 'OUT_FLUXES-FLUX',
'OBJECTIVES-ID', 'OBJECTIVES-FLUX']
# generate separate DataFrames
met_in_df = flux_df[flux_df.is_input == 1.0]\
.loc[:, ['id', 'flux']].reset_index(drop=True)
met_out_df = flux_df[flux_df.is_input == 0.0]\
.loc[:, ['id', 'flux']].reset_index(drop=True)
obj_df = flux_df[flux_df.is_obj_rxn == 1.0]\
.loc[:, ['id', 'flux']].reset_index(drop=True)
# concatenate DataFrames
concat_df = pd.concat([met_in_df, met_out_df, obj_df], axis=1)
del met_in_df, met_out_df, obj_df
# generate column names
concat_df.columns = pd.MultiIndex.from_tuples(
[tuple(c.split('-')) for c in column_names]
)
return concat_df
[docs] def _to_table(self):
"""
Returns
-------
A string of the summary table.
"""
return self.to_frame().to_string(header=True, index=False, na_rep='',
float_format=self.float_format,
sparsify=False, justify='center')