Source code for openomics.database.base

from typing import Dict, Tuple, List
import copy
import difflib
import gzip
import itertools
import logging
import os
import zipfile
from abc import ABC, abstractmethod

import dask.dataframe as dd
import filetype
import pandas as pd
import rarfile
import validators

from openomics.utils.df import concat_uniques
from openomics.utils.io import get_pkg_data_filename


class Database(object):
    """This is a base class used to instantiate an external Database given a set
    of files from either local files or URLs.

    When creating a Database class, the `load_dataframe()` function is called,
    where the file_resources are used to load (Pandas or Dask) DataFrames, which
    are then wrangled to yield the dataframe at `self.data`. This class also
    provides an interface for -omics tables, e.g. `ExpressionData`, to annotate
    them with various annotations, expressions, sequences, and disease
    associations.
    """
    COLUMNS_RENAME_DICT = None  # Initialized here since subclasses may use this field to rename columns in dataframes.

    def __init__(
        self,
        path,
        file_resources=None,
        col_rename=None,
        npartitions=None,
        verbose=False,
    ):
        """
        Args:
            path: The folder or url path containing the data file resources. If a
                url path, the files will be downloaded and cached to the user's
                home folder (at ~/.astropy/).
            file_resources: Used to list required files for preprocessing of the
                database. A dictionary where keys are required filenames and
                values are file paths. If None, then the class constructor should
                automatically build the required file resources dict.
            col_rename (dict): default None. A dictionary to rename columns in
                the data table. If None, then automatically load defaults.
            npartitions (int): default None. If None or 0, then uses a Pandas
                DataFrame; if >1, then creates an out-of-memory Dask DataFrame
                with n partitions.
            verbose (bool): Default False.
        """
        self.npartitions = npartitions
        self.verbose = verbose

        self.validate_file_resources(path, file_resources, npartitions=npartitions, verbose=verbose)

        self.data = self.load_dataframe(file_resources, npartitions=npartitions)
        self.data = self.data.reset_index()
        if col_rename is not None:
            self.data = self.data.rename(columns=col_rename)

        if verbose:
            self.info()

    def info(self):
        logging.info("{}: {}".format(self.name(), self.data.columns.tolist()))

    def validate_file_resources(self, path, file_resources, npartitions=None, verbose=False) -> None:
        """For each file in file_resources, download the file if path+file is a
        URL, or load from disk if it is a local path. Additionally, unzip or
        unrar the file if it is compressed.

        Args:
            path (str): The folder or url path containing the data file
                resources. If a url path, the files will be downloaded and cached
                to the user's home folder (at ~/.astropy/).
            file_resources (dict): default None. Used to list required files for
                preprocessing of the database. A dictionary where keys are
                required filenames and values are file paths. If None, then the
                class constructor should automatically build the required file
                resources dict.
            npartitions (int): >0 if the files will be used to create a Dask
                DataFrame. Default None.
            verbose:
        """
        if validators.url(path):
            for filename, filepath in copy.copy(file_resources).items():
                # Download the file and replace the file_resource path
                data_file = get_pkg_data_filename(path, filepath)
                filepath_ext = filetype.guess(data_file)

                # This null if-clause is needed in case filepath_ext is None, which would cause the next clauses to fail
                if filepath_ext is None:
                    file_resources[filename] = data_file

                # Dask automatically handles decompression at dd.read_table(compression=filepath_ext)
                elif ".gtf" in filename and npartitions:
                    file_resources[filename] = data_file

                elif filepath_ext.extension == "gz":
                    logging.debug("Decompressed gzip file at {}".format(data_file))
                    file_resources[filename] = gzip.open(data_file, "rt")

                elif filepath_ext.extension == "zip":
                    logging.debug("Decompressed zip file at {}".format(data_file))
                    zf = zipfile.ZipFile(data_file, "r")
                    for subfile in zf.infolist():
                        # If the file extension matches
                        if os.path.splitext(subfile.filename)[-1] == os.path.splitext(filename)[-1]:
                            file_resources[filename] = zf.open(subfile.filename, mode="r")

                elif filepath_ext.extension == "rar":
                    logging.debug("Decompressed rar file at {}".format(data_file))
                    rf = rarfile.RarFile(data_file, "r")
                    for subfile in rf.infolist():
                        # If the file extension matches
                        if os.path.splitext(subfile.filename)[-1] == os.path.splitext(filename)[-1]:
                            file_resources[filename] = rf.open(subfile.filename, mode="r")

                else:
                    file_resources[filename] = data_file

        elif os.path.isdir(path) and os.path.exists(path):
            for _, filepath in file_resources.items():
                if not os.path.exists(filepath):
                    raise IOError(filepath)
        else:
            raise IOError(path)

        self.data_path = path
        self.file_resources = file_resources
        logging.info("{} file_resources: {}".format(self.name(), file_resources))

    def close(self):
        # Close opened file resources
        for filename, filepath in self.file_resources.items():
            if not isinstance(self.file_resources[filename], str):
                self.file_resources[filename].close()

    @abstractmethod
    def load_dataframe(self, file_resources: Dict[str, str], npartitions: int = None):
        """Handles data preprocessing given the file_resources input, and returns
        a DataFrame.

        Args:
            file_resources (dict): A dict with keys as filenames and values as
                full file paths.
            npartitions (int):
        """
        raise NotImplementedError
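
    # Usage sketch (hypothetical, not part of this module): a concrete Database
    # only needs to implement `load_dataframe()`. `MyAnnotationDB` and the
    # "annotations.txt" resource below are made-up names for illustration.
    #
    #     class MyAnnotationDB(Database):
    #         def load_dataframe(self, file_resources, npartitions=None):
    #             # Values in file_resources may be local paths or opened file handles
    #             return pd.read_table(file_resources["annotations.txt"])
    #
    #     db = MyAnnotationDB(
    #         path="/data/my_annotations/",
    #         file_resources={"annotations.txt": "/data/my_annotations/annotations.txt"})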

    @classmethod
    def name(cls):
        return cls.__name__

    @staticmethod
    def list_databases():
        return DEFAULT_LIBRARIES

    def get_annotations(self, index: str, columns: list, agg: str = "concat",
                        filter_values: pd.Series = None):
        """Returns the Database's DataFrame indexed by `index`, then applies a
        groupby operation and aggregates all other columns by concatenating
        their unique values.

        Args:
            index (str): The column name of the DataFrame to join by.
            columns (list): A list of column names.
            agg (str): Function to aggregate when there is more than one value
                for each index instance, e.g. ['first', 'last', 'sum', 'mean',
                'size', 'concat']. Default 'concat'.
            filter_values (pd.Series): The values on the `index` column to filter
                before performing the groupby-agg operations.

        Returns:
            DataFrame: A dataframe to be used for annotation.
        """
        if not set(columns).issubset(set(self.data.columns)):
            raise Exception(
                "The `columns` argument must be a subset of the columns in the dataframe.",
                "These columns do not exist in the database:",
                set(columns) - set(self.data.columns.tolist())
            )

        # Select the df columns. The `columns` list shouldn't contain the index column
        if index in columns:
            columns.pop(columns.index(index))

        df = self.data[columns + [index]]
        if filter_values is not None:
            df = df[df[index].isin(list(filter_values))]

        # if index != self.data.index.name and index in self.data.columns:
        #     df = df.set_index(index)

        # Groupby index
        groupby = df.groupby(index)

        # Aggregate all columns by concatenating unique values
        if agg == "concat":
            if isinstance(df, pd.DataFrame):
                aggregated = groupby.agg({col: concat_uniques for col in columns})

            elif isinstance(df, dd.DataFrame):
                collect_concat = dd.Aggregation(
                    name='collect_concat',
                    chunk=lambda s1: s1.apply(list),
                    agg=lambda s2: s2.apply(lambda chunks: filter(
                        lambda x: False if x == "None" or x is None else True,
                        set(itertools.chain.from_iterable(chunks)))),
                    finalize=lambda s3: s3.apply(lambda xx: '|'.join(xx))
                )

                aggregated = groupby.agg({col: collect_concat for col in columns})
            else:
                raise Exception("Unsupported dataframe: {}".format(df))

        # Any other aggregation function
        else:
            aggregated = groupby.agg({col: agg for col in columns})

        # if aggregated.index.duplicated().sum() > 0:
        #     raise ValueError("DataFrame must not have duplicates in index")
        return aggregated
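
    # Example (hypothetical column names, continuing the `MyAnnotationDB` sketch
    # above): group the table by "gene_name" and aggregate the remaining columns
    # with `concat_uniques`.
    #
    #     annotation_df = db.get_annotations(index="gene_name",
    #                                        columns=["gene_type", "transcript_id"],
    #                                        agg="concat")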

    def get_expressions(self, index):
        """
        Args:
            index:
        """
        # TODO: if indexed by gene, aggregate medians of transcript-level expressions
        return self.data.groupby(index).median()


class Annotatable(ABC):
    """This abstract class provides an interface for the -omics
    (:class:`Expression`) classes to annotate their genes list with external data
    downloaded from various databases. The database is imported as attribute
    information for the genes' annotations, or as interactions between the genes.
    """
    SEQUENCE_COL_NAME = "sequence"
    DISEASE_ASSOCIATIONS_COL = "disease_associations"

    def get_annotations(self):
        if hasattr(self, "annotations"):
            return self.annotations
        else:
            raise Exception(
                "{} must run initialize_annotations() first.".format(
                    self.name()))

    def get_annotation_expressions(self):
        if hasattr(self, "annotation_expressions"):
            return self.annotation_expressions
        else:
            raise Exception("{} must run annotate_expressions() first.".format(
                self.name()))

    def initialize_annotations(self, index, gene_list=None):
        """
        Args:
            index:
            gene_list:
        """
        if gene_list is None:
            gene_list = self.get_genes_list()

        self.annotations = pd.DataFrame(index=gene_list)
        self.annotations.index.name = index

    def annotate_attributes(self, database: Database, on: str,
                            columns: List[str], agg: str = "concat",
                            fuzzy_match: bool = False):
        """Performs a left outer join between the annotations and the Database's
        DataFrame on the `on` key, which must be a column present in both
        DataFrames. If the join yields overlapping columns, .fillna() is used to
        fill NaN values in the old column with non-NaN values from the new
        column.

        Args:
            database (Database): Database which contains a dataframe.
            on (str): The column name which exists in both the annotations and
                Database dataframe to perform the join on.
            columns ([str]): A list of column names to join to the annotations.
            agg (str): Function to aggregate when there is more than one value
                for each index instance, e.g. ['first', 'last', 'sum', 'mean',
                'concat']. Default 'concat'.
            fuzzy_match (bool): default False. Whether to join the annotations by
                applying a fuzzy match on the index with
                difflib.get_close_matches(). It is very computationally expensive
                and thus should only be used sparingly.
        """
        if not hasattr(self, "annotations"):
            raise Exception("Must run .initialize_annotations() on ",
                            self.__class__.__name__, " first.")

        if on in self.annotations.columns:
            filter_values = self.annotations[on]
        elif on == self.annotations.index.name:
            filter_values = self.annotations.index
        else:
            filter_values = None

        database_df = database.get_annotations(on, columns=columns, agg=agg,
                                                filter_values=filter_values)
        if len(database_df.columns) == 0:
            logging.warning("Database annotations are empty; there is nothing to annotate.")
            return

        if fuzzy_match:
            database_df.index = database_df.index.map(
                lambda x: difflib.get_close_matches(x, self.annotations.index, n=1)[0])

        # Performing the join on the index column
        if on == self.annotations.index.name and isinstance(database_df, pd.DataFrame):
            new_annotations = self.annotations.join(database_df, on=on, rsuffix="_")
        # elif on == self.annotations.index.name and isinstance(database_df, dd.DataFrame):
        #     new_annotations = dd.merge(self.annotations, database_df, how="left", on=on, suffixes=("_", ""))

        # Performing the join on a different column
        else:
            # if isinstance(self.annotations.index, pd.MultiIndex):
            #     old_index = self.annotations.index.names
            # else:
            #     old_index = self.annotations.index.name
            # Save the old index, reset it, set_index to the join column, perform the join, then change back to the old index.
            # This also ensures the index in self.annotations aligns with the gene_index in the self.expressions dataframes.
            # TODO: Could be very slow on dask dataframes
            # new_annotations = self.annotations.reset_index()
            # new_annotations = new_annotations.set_index(on)
            # new_annotations = new_annotations.join(
            #     database_df, on=on, rsuffix="_").reset_index()
            # new_annotations = new_annotations.set_index(old_index)
            new_annotations = dd.merge(self.annotations, database_df, how="left",
                                       on=on, suffixes=("_", ""))

        # Merge columns if the database DataFrame has columns overlapping with existing ones
        duplicate_cols = [col for col in new_annotations.columns if col[-1] == "_"]

        # Fill in null values, then drop the duplicate columns
        for new_col in duplicate_cols:
            old_col = new_col.strip("_")
            new_annotations[old_col].fillna(new_annotations[new_col], inplace=True, axis=0)
            new_annotations = new_annotations.drop(columns=new_col)

        # Assign the new results
        self.annotations = new_annotations
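
    # Usage sketch (hypothetical; `expression` stands for any Annotatable -omics
    # object and `db` for any Database instance, e.g. the `MyAnnotationDB` sketch
    # above):
    #
    #     expression.initialize_annotations(index="gene_name")
    #     expression.annotate_attributes(database=db, on="gene_name",
    #                                    columns=["gene_type", "transcript_id"])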

    def annotate_sequences(self, database, index, agg="longest", omic=None, **kwargs):
        """Annotate the genes list (based on `index`) with a dictionary of
        <gene_name: sequence>. If there are multiple sequences per gene name,
        then aggregate them.

        Args:
            database (Database): The database.
            index (str): The gene index column name.
            agg (str): The aggregation method, one of ["longest", "shortest",
                "all"]. Default "longest".
            omic (str): Default None. Declare the omic type to fetch sequences
                for.
            **kwargs:
        """
        if omic is None:
            omic = self.name()

        sequences_entries = database.get_sequences(index=index, omic=omic,
                                                   agg_sequences=agg, **kwargs)

        if isinstance(self.annotations.index, pd.MultiIndex):
            self.annotations[Annotatable.SEQUENCE_COL_NAME] = \
                self.annotations.index.get_level_values(index).map(sequences_entries)
        else:
            self.annotations[Annotatable.SEQUENCE_COL_NAME] = \
                self.annotations.index.map(sequences_entries)
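
    # Usage sketch (hypothetical; assumes `db` implements a get_sequences()
    # method, as the sequence-providing Database subclasses in openomics do):
    #
    #     expression.annotate_sequences(database=db, index="gene_name", agg="longest")
    #     expression.get_annotations()[Annotatable.SEQUENCE_COL_NAME].head()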

    def annotate_expressions(self, database, index, fuzzy_match=False):
        """Annotate the annotation_expressions DataFrame with expressions from
        the database, joined on `index`.

        Args:
            database:
            index:
            fuzzy_match:
        """
        self.annotation_expressions = pd.DataFrame(index=self.annotations.index)

        if self.annotations.index.name == index:
            self.annotation_expressions = self.annotation_expressions.join(
                database.get_expressions(index=index))
        else:
            raise Exception("index argument must be one of", database.data.index)

    def annotate_interactions(self, database, index):
        """
        Args:
            database (Interactions):
            index (str):
        """
        raise NotImplementedError

    def annotate_diseases(self, database, index):
        """
        Args:
            database (DiseaseAssociation):
            index (str):
        """
        self.annotations[Annotatable.DISEASE_ASSOCIATIONS_COL] = \
            self.annotations.index.map(
                database.get_disease_assocs(index=index))

    def set_index(self, new_index):
        """Resets the index of the annotations DataFrame to the `new_index` column.

        Args:
            new_index (str): The name of the column to set as the new index.
        """
        self.annotations[new_index].fillna(self.annotations.index.to_series(),
                                           axis=0, inplace=True)
        self.annotations = self.annotations.reset_index().set_index(new_index)
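
    # Example (hypothetical): switch the annotations index from "gene_id" to a
    # "gene_name" column obtained from a previous annotate_attributes() call.
    #
    #     expression.set_index("gene_name")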

    def get_rename_dict(self, from_index, to_index):
        """Retrieves a lookup dictionary to convert from one index to another,
        e.g. gene_id to gene_name, obtained from two columns in the annotations
        dataframe.

        Args:
            from_index (str): The column to use as the keys of the lookup dict.
            to_index (str): The column to use as the values of the lookup dict.

        Returns:
            Dict[str, str]: The lookup dictionary.
        """
        if self.annotations.index.name in [from_index, to_index]:
            dataframe = self.annotations.reset_index()
        else:
            dataframe = self.annotations

        dataframe = dataframe[dataframe[to_index].notnull()]
        return pd.Series(dataframe[to_index].values,
                         index=dataframe[from_index]).to_dict()
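
    # Example (hypothetical columns): build a gene_id -> gene_name mapping from
    # the annotations table, e.g. to relabel an expression matrix's index.
    #
    #     rename_dict = expression.get_rename_dict(from_index="gene_id", to_index="gene_name")
    #     # expression_df = expression_df.rename(index=rename_dict)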


DEFAULT_LIBRARIES = [
    "10KImmunomes",
    "BioGRID",
    "CCLE",
    "DisGeNET",
    "ENSEMBL",
    "GENCODE",
    "GeneMania",
    "GeneOntology",
    "GlobalBiobankEngine",
    "GTEx",
    "HMDD_miRNAdisease",
    "HPRD_PPI",
    "HUGO_Gene_names",
    "HumanBodyMapLincRNAs",
    "IntAct",
    "lncBase",
    "LNCipedia",
    "LncReg",
    "lncRInter",
    "lncrna2target",
    "lncRNA_data_repository",
    "lncrnadisease",
    "lncRNome",
    "mirbase",
    "miRTarBase",
    "NHLBI_Exome_Sequencing_Project",
    "NONCODE",
    "NPInter",
    "PIRD",
    "RegNetwork",
    "RISE_RNA_Interactions",
    "RNAcentral",
    "StarBase_v2.0",
    "STRING_PPI",
    "TargetScan",
]