import copy
import difflib
import logging
import os
import warnings
from abc import ABC, abstractmethod
from os.path import exists, join
from typing import IO, Any, Callable, Dict, List, Union
from urllib.error import URLError
import dask.dataframe as dd
import filetype
import networkx as nx
import pandas as pd
import tqdm
import validators
from logzero import logger
from ..io.files import get_pkg_data_filename, decompress_file
from ..transforms.agg import get_multi_aggregators, merge_concat
from ..transforms.df import drop_duplicate_columns, match_iterable_keys, has_iterables
__all__ = ['Database', 'Annotatable']
class Database(object):
"""Base class used to instantiate an external Database given a set
of files, from either local paths or URLs. When creating a Database class,
the `load_dataframe()` function is called, where the file_resources are
used to load (Pandas or Dask) DataFrames, which are then wrangled to yield
a dataframe at `self.data`. This class also provides an interface for
-omics tables, e.g. `ExpressionData`, to annotate attributes, expressions,
sequences, and disease associations.
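Example:
A minimal sketch of a subclass (all names here are illustrative):
>>> class MyDatabase(Database):
...     def load_dataframe(self, file_resources, blocksize=None):
...         return pd.read_table(file_resources["genes.txt"], index_col=0)
>>> db = MyDatabase(path="~/data/mydb/", file_resources={"genes.txt": "genes.txt"})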
"""
data: pd.DataFrame
COLUMNS_RENAME_DICT = None # Needs initialization since subclasses may use this field to rename columns in dataframes.
def __init__(self, path: str, file_resources: Dict = None, index_col=None, keys=None, usecols=None,
col_rename: Dict[str, str] = None, blocksize: int = None, verbose=False, **kwargs):
"""
Args:
path:
The folder or url path containing the data file resources. If
url path, the files will be downloaded and cached to the user's
home folder (at ~/.astropy/).
file_resources:
Used to list the required files for preprocessing of the
database. A dictionary where keys are required filenames and
values are file paths. If None, then the class constructor should
automatically build the required file resources dict.
index_col: str of column name, default None.
If provided, then set_index() the dataframe at self.data by this
column name.
keys: a pd.Index or a List of str
If provided, then filter the rows in self.data with `index_col`
containing these values.
col_rename (dict): default None,
A dictionary to rename columns in the data table. If None, then
automatically load defaults.
usecols (list, optional): list of strings, default None.
If provided when loading the dataframes, only load this subset of
columns, otherwise load all columns.
blocksize (str, int, optional): default None.
Number of bytes by which to cut up larger files. Can be a number
like 64000000 or a string like "64MB". If None, a single block is
used for each file.
verbose (bool): Default False.
"""
self.data_path = path
self.index_col = index_col
self.keys = keys.compute() if isinstance(keys, (dd.Index, dd.Series)) else keys
self.usecols = usecols
self.blocksize = blocksize
self.verbose = verbose
self.file_resources = self.load_file_resources(path, file_resources=file_resources, verbose=verbose)
self.data = self.load_dataframe(self.file_resources, blocksize=blocksize)
if self.data is not None and col_rename is not None:
self.data = self.data.rename(columns=col_rename)
if self.data.index.name in col_rename:
self.data.index = self.data.index.rename(col_rename[self.data.index.name])
def __repr__(self):
out = []
if hasattr(self, "data") and isinstance(self.data, (pd.DataFrame, dd.DataFrame)):
out.append("{}: {} {}".format(self.name(), self.data.index.name, self.data.columns.tolist()))
if hasattr(self, "network") and isinstance(self.network, nx.Graph):
out.append("{} {}".format(self.network.name, str(self.network)))
return "\n".join(out)
def load_file_resources(self, base_path: str, file_resources: Dict[str, str], verbose=False) -> Dict[str, Any]:
"""For each file in file_resources, download the file if path+file is a
URL or load from disk if a local path. Additionally unzip or unrar if
the file is compressed.
Args:
base_path (str): The folder or url path containing the data file
resources. If a url path, the files will be downloaded and
cached to the user's home folder (at ~/.astropy/).
file_resources (dict): default None. Used to list the required
files for preprocessing of the database. A dictionary where keys
are required filenames and values are file paths. If None, then
the class constructor should automatically build the required
file resources dict.
verbose: default False
Whether to show progress bar of files being loaded
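Example:
A minimal sketch (the URL and filenames are illustrative):
>>> resources = self.load_file_resources(
...     "ftp://ftp.ensembl.org/pub/release-109/gtf/homo_sapiens/",
...     file_resources={"annotation.gtf": "Homo_sapiens.GRCh38.109.gtf.gz"},
...     verbose=True)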
"""
file_resources_new = copy.copy(file_resources)
if base_path.startswith("~"):
base_path = os.path.expanduser(base_path)
files_prog = tqdm.tqdm(file_resources.items(), desc='Loading file_resources', disable=not verbose)
for name, filepath in files_prog:
if filepath.startswith("~"):
filepath = os.path.expanduser(filepath)
if verbose:
files_prog.set_description("Loading file_resources['{}']".format(name))
# Remote database file URL
if validators.url(filepath) or validators.url(join(base_path, filepath)):
try:
filepath = get_pkg_data_filename(base_path, filepath)
filepath_ext = filetype.guess(filepath)
except URLError as ue:
logger.error(f"{ue}. Skipping {filepath}")
continue
except TypeError:
filepath_ext = None
# Local database path
elif isinstance(filepath, str) and (exists(filepath) or exists(join(base_path, filepath))):
if not exists(filepath):
if exists(join(base_path, filepath)):
filepath = join(base_path, filepath)
else:
warnings.warn(f"`base_path` is a local file directory, so all file_resources must be local. "
f"Cannot use `filepath` = {filepath} with `base_path` = {base_path}")
continue
try:
filepath_ext = filetype.guess(filepath)
except Exception:
filepath_ext = None
else:
# `filepath` is neither a URL nor an existing local file under `base_path`
filepath_ext = None
# Record the resolved filepath for this resource
file_resources_new[name] = filepath
if filepath_ext:
filestream, new_filename = decompress_file(filepath, name, file_ext=filepath_ext)
file_resources_new[new_filename] = filestream
logging.info(f"{self.name()} file_resources: {file_resources_new}")
return file_resources_new
@abstractmethod
def load_dataframe(self, file_resources: Dict[str, str], blocksize: int = None) -> pd.DataFrame:
"""Handles data preprocessing given the file_resources input, and
returns a DataFrame.
Args:
file_resources (dict): A dict with keys as filenames and values as
full file paths.
blocksize (int): Number of bytes by which to chunk larger files
when loading them as Dask DataFrames.
"""
def close(self):
# Close file readers on file resources (from decompress_file)
for filename, filepath in self.file_resources.items():
if isinstance(filepath, IO) or hasattr(filepath, 'close'):
filepath.close()
@classmethod
def name(cls):
return cls.__name__
@staticmethod
def list_databases():
return DEFAULT_LIBRARIES
def get_mapper(self, col_a: str, col_b: str) -> pd.Series:
"""
Create a mapping from values in self.data[col_a] to values in
self.data[col_b]. If either column contains list-like data, then
expand the values in those lists.
Args:
col_a (str): column name whose values become the mapping's keys.
col_b (str): column name whose values become the mapping's values.
Returns:
pd.Series: a Series of col_b values indexed by col_a values.
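Example:
A minimal sketch (assuming a subclass instance `db` whose `db.data`
has 'gene_id' and 'gene_name' columns; names are illustrative):
>>> gene_id2name = db.get_mapper('gene_id', 'gene_name')
>>> df['gene_name'] = df['gene_id'].map(gene_id2name)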
"""
df: Union[pd.DataFrame, dd.DataFrame] = self.data.reset_index()[[col_a, col_b]].dropna()
if has_iterables(df[col_a]):
df = df.explode(col_a).dropna()
if has_iterables(df[col_b]):
df = df.explode(col_b).dropna()
mapping = df.set_index(col_a)[col_b]
return mapping
def get_annotations(self, on: Union[str, List[str]],
columns: List[str],
agg: str = "unique",
agg_for: Dict[str, Union[str, Callable, dd.Aggregation]] = None,
keys: pd.Index = None):
"""Returns the Database's DataFrame grouped by the key(s) in `on`, with
all other selected columns aggregated by concatenating their unique
values.
Args:
on (str, list): The column name(s) of the DataFrame to group by.
columns (list): a list of column names to aggregate.
agg (str): Function to aggregate when there is more than one value
for each group key. E.g. ['first', 'last', 'sum', 'mean',
'size', 'concat'], default 'unique'.
agg_for (Dict[str, Any]): Bypass the `agg` function for certain
columns with the functions specified in this dict of column
names to the `agg` function for that column.
keys (pd.Index): The values on the `on` column to filter rows by
before performing the groupby-agg operations.
Returns:
values: A filtered, grouped, and aggregated dataframe to be used for annotation.
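Example:
A minimal sketch (assuming a subclass instance `db`; column names are
illustrative):
>>> annotations = db.get_annotations(on='gene_id', columns=['gene_name'], agg='unique')
>>> annotations.head()  # one row per gene_id, with arrays of unique gene_name values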
"""
if not set(columns).issubset(set(self.data.columns).union([self.data.index.name])):
raise Exception(
f"The `columns` argument must be a subset of the columns in the dataframe. "
f"These columns don't exist in `self.data`: {list(set(columns) - set(self.data.columns.tolist()))}"
)
elif len(set(columns)) < len(columns):
raise Exception(f"Duplicate values in `columns`: {columns}")
# The `columns` list shouldn't contain the `on` column(s), which are the groupby keys
on_cols = [on] if not isinstance(on, list) else on
columns = [col for col in columns if col not in on_cols]
# All columns to select, including `on` and `columns`
select_cols = columns + on_cols
if self.data.index.name in select_cols:
# Remove self.data's index_col since we can't select index from the df
index_col = select_cols.pop(select_cols.index(self.data.index.name))
else:
index_col = None
if isinstance(self.data, pd.DataFrame):
df = self.data.filter(select_cols, axis="columns")
elif isinstance(self.data, dd.DataFrame):
df = self.data[select_cols]
else:
raise Exception(f"{self} must have self.data as a pd.DataFrame or dd.DataFrame")
if index_col and df.index.name != index_col:
df[index_col] = self.data.index
# Filter rows in the database if provided `keys` in the `on` column.
if keys is not None:
if isinstance(keys, (dd.Series, dd.Index)):
keys = keys.compute()
if not has_iterables(keys[:5]):
if on in df.columns:
df = df.loc[df[on].isin(keys)]
elif on == df.index.name:
df = df.loc[df.index.isin(keys)]
df = drop_duplicate_columns(df)
# Groupby includes column that was in the index
if on != df.index.name and df.index.name in columns:
groupby = df.reset_index().groupby(on)
# Groupby on index
elif on == df.index.name:
groupby = df.groupby(lambda x: x)
# Groupby on other columns
else:
groupby = df.groupby(on)
# Aggregate all columns by concatenating their unique values
agg_funcs = get_multi_aggregators(agg, agg_for=agg_for, use_dask=isinstance(df, dd.DataFrame))
values = groupby.agg({col: agg_funcs[col] for col in columns})
return values
def get_expressions(self, index):
"""Return the expressions in self.data grouped by `index` and aggregated by median.
Args:
index: the column name to group the expression rows by.
"""
# TODO if index by gene, aggregate medians of transcript-level expressions
return self.data.groupby(index).median()
class Annotatable(ABC):
"""This abstract class provides an interface for the -omics
(:class:`Expression`) classes to annotate their gene lists with external
data downloaded from various databases. The database content is imported
as attribute annotations of the genes, or as interactions between the
genes.
"""
SEQUENCE_COL = "sequence"
DISEASE_ASSOCIATIONS_COL = "disease_associations"
def get_annotations(self):
if hasattr(self, "annotations"):
return self.annotations
else:
raise Exception(
"{} must run init_annotations() first.".format(
self.name()))
def get_annotation_expressions(self):
if hasattr(self, "annotation_expressions"):
return self.annotation_expressions
else:
raise Exception("{} must run annotate_expressions() first.".format(
self.name()))
def init_annotations(self, index=None):
"""
Args:
index: a list of gene names to initialize the annotations index
with. If None, defaults to self.get_genes_list().
"""
if hasattr(self, 'annotations') and isinstance(self.annotations,
(pd.DataFrame, dd.DataFrame)) and not self.annotations.empty:
warnings.warn("Cannot initialize annotations because annotations already exists.")
return
if index is None:
index = self.get_genes_list()
self.annotations: pd.DataFrame = pd.DataFrame(index=index)
def annotate_attributes(self, database: Union[Database, pd.DataFrame], on: Union[str, List[str]],
columns: List[str], agg: str = "unique", agg_for: Dict[str, Any] = None,
fuzzy_match: bool = False, list_match=False):
"""Performs a left outer join between the annotations and the Database's
DataFrame on the keys in the `on` column. The `on` argument must be a
column present in both DataFrames. If the join yields overlapping
columns, values in the existing column are combined with the new
column's values (via `merge_concat`) to fill in missing entries.
Args:
database (Database): Database which contains a dataframe.
on (str): The column name which exists in both the annotations and
the Database dataframe to perform the join on.
columns ([str]): a list of column names to join to the annotations.
agg (str): Function to aggregate when there is more than one value
for each index instance. E.g. ['first', 'last', 'sum', 'mean',
'unique', 'concat'], default 'unique'.
agg_for (Dict[str, Any]): Bypass the `agg` function for certain
columns with the functions specified in this dict of column
names to the `agg` function for that column.
fuzzy_match (bool): default False.
Whether to join the annotations by applying a fuzzy match on the
string value index with difflib.get_close_matches(). It can be
slow and thus should only be used sparingly.
list_match (bool): default False.
Whether to match key values that are list-like between the
annotations' keys and the database's keys.
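Example:
A minimal sketch (assuming a hypothetical `gencode` Database instance
whose dataframe has 'gene_id', 'gene_name', and 'gene_type' columns):
>>> self.annotate_attributes(gencode, on='gene_id',
...                          columns=['gene_name', 'gene_type'], agg='unique')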
"""
if not hasattr(self, "annotations"):
raise Exception(f"Must run init_annotations() on {self.__class__.__name__} first.")
left_df = self.annotations
if isinstance(on, str) and on in left_df.columns:
keys = left_df[on]
elif isinstance(on, list) and set(on).issubset(left_df.columns):
keys = left_df[on]
elif on == left_df.index.name:
keys = left_df.index
elif hasattr(left_df.index, 'names') and on in left_df.index.names:
# MultiIndex
keys = left_df.index.get_level_values(on)
else:
keys = None
# Get grouped values from `database`
if isinstance(database, (pd.DataFrame, dd.DataFrame)):
df = database
if on == df.index.name and on in df.columns:
df = df.drop(columns=[on])  # Avoid an ambiguous groupby column error without mutating the caller's df
agg_funcs = get_multi_aggregators(agg=agg, agg_for=agg_for, use_dask=isinstance(df, dd.DataFrame))
if on == df.index.name or df.index.name in columns:
groupby = df.reset_index().groupby(on)
else:
groupby = df.groupby(on)
right_df = groupby.agg({col: agg_funcs[col] for col in columns})
else:
right_df = database.get_annotations(on, columns=columns, agg=agg, agg_for=agg_for, keys=keys)
# Match values between `self.annotations[on]` and `values.index`.
left_on = right_on = on
orig_keys = left_df.index
# Fuzzy match between string key values
if fuzzy_match:
left_on = right_df.index.map(
lambda x: difflib.get_close_matches(x, self.annotations.index, n=1)[0])
# Join on keys of 'list' values if they exist on either `left_keys` or `right_df.index`
elif list_match:
left_on, right_on = match_iterable_keys(left=keys, right=right_df.index)
# Set whether to join on the index
left_index = isinstance(left_on, str) and left_df.index.name == left_on
right_index = isinstance(right_on, str) and right_df.index.name == right_on
if left_index:
left_on = None
if right_index:
right_on = None
# Performing join if `on` is already left_df's index
try:
if isinstance(left_df, type(right_df)) and left_index:
merged = left_df.join(right_df, on=on, how="left", rsuffix="_")
# Perform merge if `on` not index, and choose appropriate merge func depending on dask or pd DF
else:
if isinstance(left_df, pd.DataFrame) and isinstance(right_df, dd.DataFrame):
merged = dd.merge(left_df, right_df, how="left", left_on=left_on, left_index=left_index,
right_on=right_on, right_index=right_index, suffixes=("", "_"))
else:
merged = left_df.merge(right_df, how="left", left_on=left_on, left_index=left_index,
right_on=right_on, right_index=right_index, suffixes=("", "_"))
except Exception as e:
logger.error(f"Merge failed with left_index={left_index}, left_on={left_on}, "
f"right_index={right_index}, right_on={right_on}")
raise e
if list_match:
if 'key_0' in merged.columns:
merged = merged.drop(columns=['key_0'])
# `left_on` may have been modified for matching, so restore the original index
merged.index = orig_keys
# Merge columns if the database DataFrame has overlapping columns with existing column
duplicate_cols = {col: col[:-1] for col in merged.columns if col.endswith("_")}  # strip the "_" suffix
if duplicate_cols:
new_annotations = merged[list(duplicate_cols.keys())].rename(columns=duplicate_cols)
logger.info(f"merging {new_annotations.columns}")
# Combine new values with old values in overlapping columns
assign_fn = {old_col: merged[old_col].combine(merged[new_col], func=merge_concat) \
for new_col, old_col in duplicate_cols.items()}
merged = merged.assign(**assign_fn)
# then drop duplicate columns with "_" suffix
merged = merged.drop(columns=list(duplicate_cols.keys()))
# Revert back to Pandas DF if not previously a Dask DF
if isinstance(left_df, pd.DataFrame) and isinstance(merged, dd.DataFrame):
merged = merged.compute()
# Assign the new results
self.annotations = merged
return self
def annotate_sequences(self,
database,
on: Union[str, List[str]],
agg="longest",
omic=None,
**kwargs):
"""Annotate a genes list (based on index) with a dictionary of
<gene_name: sequence>. If multiple sequences per gene name, then perform
some aggregation.
Args:
database (SequenceDatabase): The database
on (str): The gene index column name.
agg (str): The aggregation method, one of ["longest", "shortest", or
"all"]. Default longest.
omic (str): Default None. Declare the omic type to fetch sequences
for.
**kwargs:
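Example:
A minimal sketch (assuming a hypothetical `gencode` SequenceDatabase
instance providing transcript sequences):
>>> self.annotate_sequences(gencode, on='gene_id', agg='longest')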
"""
if omic is None:
omic = self.name()
sequences = database.get_sequences(index=on, omic=omic, agg=agg, **kwargs)
# Map sequences to the keys of the `on` column(s).
if isinstance(self.annotations.index, pd.MultiIndex) and on in self.annotations.index.names:
seqs = self.annotations.index.get_level_values(on).map(sequences)
elif self.annotations.index.name == on:
seqs = self.annotations.index.map(sequences)
elif isinstance(on, list):
# `on` spans multiple columns, so build a MultiIndex from them
seqs = pd.MultiIndex.from_frame(self.annotations.reset_index()[on]).map(sequences)
else:
seqs = pd.Index(self.annotations.reset_index()[on]).map(sequences)
if isinstance(self.annotations, dd.DataFrame) and isinstance(seqs, pd.Series):
seqs = dd.from_pandas(seqs, npartitions=self.annotations.npartitions)
self.annotations = self.annotations.assign(**{Annotatable.SEQUENCE_COL: seqs})
return self
def annotate_expressions(self, database, index, fuzzy_match=False):
"""
Args:
database (Database): Database which contains an expressions dataframe.
index (str): the column name to join the expressions on.
fuzzy_match (bool): default False.
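Example:
A minimal sketch (assuming a hypothetical `gtex` Database with
expression values indexed by 'gene_id'):
>>> self.annotate_expressions(gtex, index='gene_id')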
"""
self.annotation_expressions = pd.DataFrame(index=self.annotations.index)
if self.annotations.index.name == index:
self.annotation_expressions = self.annotation_expressions.join(
database.get_expressions(index=index))
else:
raise Exception(f"index argument must be one of {database.data.index}")
return self
def annotate_interactions(self, database, index):
"""
Args:
database (Interactions):
index (str):
"""
raise NotImplementedError("Use HeteroNetwork from `moge` package instead")
def annotate_diseases(self, database, on: Union[str, List[str]]):
"""
Args:
database (DiseaseAssociation):
on (str):
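Example:
A minimal sketch (assuming a hypothetical `disgenet` DiseaseAssociation
database):
>>> self.annotate_diseases(disgenet, on='gene_name')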
"""
if on == self.annotations.index.name or (
hasattr(self.annotations.index, 'names') and on == self.annotations.index.names):
keys = self.annotations.index
else:
keys = self.annotations[on]
groupby_agg = database.get_disease_assocs(index=on)
if isinstance(keys, (pd.DataFrame, pd.Series, pd.Index)):
if isinstance(keys, pd.DataFrame):
keys = pd.MultiIndex.from_frame(keys)
values = keys.map(groupby_agg)
elif isinstance(keys, dd.DataFrame):
values = keys.apply(lambda x: groupby_agg.loc[x], axis=1, meta=pd.Series([['']]))
elif isinstance(keys, dd.Series):
values = keys.map(groupby_agg)
else:
raise Exception(f"Unsupported type for `keys`: {type(keys)}")
self.annotations = self.annotations.assign(**{Annotatable.DISEASE_ASSOCIATIONS_COL: values})
def set_index(self, new_index):
"""Sets the annotations' index to the `new_index` column, first filling
missing values in that column with the current index values.
Args:
new_index (str): the column name to set as the new index.
"""
self.annotations[new_index] = self.annotations[new_index].fillna(
self.annotations.index.to_series())
self.annotations = self.annotations.reset_index().set_index(new_index)
def get_rename_dict(self, from_index, to_index):
"""
Utility function to retrieve a lookup dictionary that converts values from one
index to another, e.g., gene_id to gene_name, obtained from two columns in the
dataframe.
Args:
from_index (str): the column (or index) name providing the lookup keys.
to_index (str): the column (or index) name providing the lookup values.
Returns:
Dict[str, str]: the lookup dictionary.
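Example:
A minimal sketch (assuming 'gene_id' and 'gene_name' columns exist):
>>> renamer = self.get_rename_dict('gene_id', 'gene_name')
>>> df = df.rename(index=renamer)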
"""
if self.annotations.index.name in [from_index, to_index]:
dataframe = self.annotations.reset_index()
else:
dataframe = self.annotations
dataframe = dataframe[dataframe[to_index].notnull()]
return pd.Series(dataframe[to_index].values,
index=dataframe[from_index]).to_dict()
DEFAULT_LIBRARIES = [
"10KImmunomes",
"BioGRID",
"CCLE",
"DisGeNET",
"ENSEMBL",
"GENCODE",
"GeneMania",
"GeneOntology",
"GlobalBiobankEngine",
"GTEx",
"HMDD_miRNAdisease",
"HPRD_PPI",
"HUGO_Gene_names",
"HumanBodyMapLincRNAs",
"IntAct",
"lncBase",
"LNCipedia",
"LncReg",
"lncRInter",
"lncrna2target",
"lncRNA_data_repository",
"lncrnadisease",
"lncRNome",
"mirbase",
"miRTarBase",
"NHLBI_Exome_Sequencing_Project",
"NONCODE",
"NPInter",
"PIRD",
"RegNetwork",
"RISE_RNA_Interactions",
"RNAcentral",
"StarBase_v2.0",
"STRING_PPI",
"TargetScan",
]