# Source code for cobra.io.web.biomodels_repository

"""Provide functions for loading metabolic models over the wire."""


import gzip
from io import BytesIO
from typing import List

import httpx
import pydantic

from .abstract_model_repository import AbstractModelRepository


class BioModelsFile(pydantic.BaseModel):
    """Define a single BioModels file description."""

    # File name exactly as reported by the BioModels REST API.
    name: str
    # Size in bytes; the JSON payload uses the key ``fileSize``,
    # hence the alias on the field.
    size: int = pydantic.Field(alias="fileSize")
class BioModelsFilesResponse(pydantic.BaseModel):
    """Define the BioModels files JSON response."""

    # The ``main`` entries list the primary model files; defaults to an
    # empty list when the response contains none (pydantic copies the
    # default per instance, so the mutable default is safe here).
    main: List[BioModelsFile] = []
class BioModels(AbstractModelRepository):
    """
    Define a concrete implementation of the BioModels repository.

    Attributes
    ----------
    name : str
        The name of the BioModels repository.

    """

    name: str = "BioModels"

    def __init__(
        self,
        **kwargs,
    ) -> None:
        """
        Initialize a BioModels repository interface.

        Other Parameters
        ----------------
        kwargs
            Passed to the parent constructor in order to enable multiple
            inheritance.

        """
        super().__init__(url="https://www.ebi.ac.uk/biomodels/model/", **kwargs)

    def get_sbml(self, model_id: str) -> bytes:
        """
        Attempt to download an SBML document from the repository.

        Parameters
        ----------
        model_id : str
            The identifier of the desired metabolic model. This is typically
            repository specific.

        Returns
        -------
        bytes
            A gzip-compressed, UTF-8 encoded SBML document.

        Raises
        ------
        httpx.HTTPError
            In case there are any connection problems.
        RuntimeError
            If the file listing for the model contains no XML (SBML)
            document.

        """
        data = BytesIO()
        # First fetch the file listing for the model to discover the name
        # and size of the SBML document.
        response = httpx.get(
            url=self._url.join(f"files/{model_id}"),
            headers={"Accept": "application/json"},
        )
        response.raise_for_status()
        files = BioModelsFilesResponse.parse_obj(response.json())
        for model in files.main:
            if model.name.endswith("xml"):
                break
        else:
            # Bug fix: the original code constructed this RuntimeError
            # without raising it, silently falling through with an undefined
            # (or non-XML) ``model`` variable.
            raise RuntimeError(f"Could not find an SBML document for '{model_id}'.")
        # Stream the document in chunks, updating the progress bar as we go.
        with self._progress, httpx.stream(
            method="GET",
            url=self._url.join(f"download/{model_id}"),
            params={"filename": model.name},
        ) as response:
            response.raise_for_status()
            task_id = self._progress.add_task(
                description="download",
                total=model.size,
                model_id=model_id,
            )
            for chunk in response.iter_bytes():
                data.write(chunk)
                self._progress.update(task_id=task_id, advance=len(chunk))
        data.seek(0)
        # The repository contract is to return a gzip-compressed document.
        return gzip.compress(data.read())