import networkx as nx
import numpy as np
import obonet
import pandas as pd
from Bio.UniProt import GOA
from .base import Database
from ..utils.df import slice_adj
class Ontology(Database):
    # Delimiter joining multiple ontology term IDs within one string annotation.
    DELIM = "|"

    def __init__(self,
                 path,
                 file_resources=None,
                 col_rename=None,
                 npartitions=0,
                 verbose=False):
        """
        Manages dataset input processing from tables and constructs an ontology
        network from an .obo file. The ontology network is G(V, E) where there
        exists e_ij for child i to parent j to represent "node i is_a node j".

        Args:
            path: folder or URL path prefix for the data files.
            file_resources (dict): mapping of file name to a file path/URL.
            col_rename (dict): mapping to rename columns of the loaded table.
            npartitions (int): number of partitions for parallel loading.
            verbose (bool): whether to print progress/debug messages.
        """
        # Build the ontology graph before Database.__init__, which may trigger
        # load_dataframe() — subclasses read self.network there.
        self.network, self.node_list = self.load_network(file_resources)

        super(Ontology, self).__init__(
            path=path,
            file_resources=file_resources,
            col_rename=col_rename,
            npartitions=npartitions,
            verbose=verbose,
        )

    def load_network(self, file_resources) -> (nx.MultiDiGraph, list):
        """Parse `file_resources` into (network, node_list). Subclasses must implement."""
        raise NotImplementedError

    def get_adjacency_matrix(self, node_list):
        """
        Return the adjacency matrix of the ontology graph over `node_list`.

        The matrix over the FULL self.node_list is computed once and cached on
        the instance; subsequent calls slice the cached matrix.

        Args:
            node_list: None (or exactly self.node_list) for the full matrix,
                or a strict subset of self.node_list for a sliced sub-matrix.

        Raises:
            Exception: if node_list contains a node not in self.node_list.
        """
        if hasattr(self, "adjacency_matrix"):
            adjacency_matrix = self.adjacency_matrix
        else:
            # Always build over the FULL node set so the cached matrix aligns
            # with self.node_list — slice_adj() below indexes rows/cols by
            # self.node_list. (Building over the passed `node_list` was a bug:
            # the slice labels, and any later cached reuse, were mis-aligned.)
            # nx.adj_matrix is a deprecated alias of nx.adjacency_matrix.
            adjacency_matrix = nx.adjacency_matrix(self.network,
                                                   nodelist=self.node_list)
            self.adjacency_matrix = adjacency_matrix

        if node_list is None or list(node_list) == list(self.node_list):
            return adjacency_matrix
        elif set(node_list) < set(self.node_list):
            return slice_adj(adjacency_matrix, list(self.node_list), node_list,
                             None)
        elif not (set(node_list) < set(self.node_list)):
            raise Exception("A node in node_list is not in self.node_list.")

        return adjacency_matrix

    def filter_network(self, namespace):
        """Restrict the ontology network to one namespace. Subclasses must implement."""
        raise NotImplementedError

    def filter_annotation(self, annotation: pd.Series):
        """
        Keep only terms present in this ontology for each annotation entry.

        Args:
            annotation (pd.Series): each entry a list of term IDs (non-list
                entries are replaced with an empty list).
        """
        go_terms = set(self.node_list)
        filtered_annotation = annotation.map(lambda x: list(set(x) & go_terms)
                                             if isinstance(x, list) else [])
        return filtered_annotation

    def get_child_nodes(self):
        """Return terms with no incoming edges in the adjacency matrix (leaf terms)."""
        adj = self.get_adjacency_matrix(self.node_list)
        leaf_terms = self.node_list[np.nonzero(adj.sum(axis=0) == 0)[1]]
        return leaf_terms

    def get_root_nodes(self):
        """Return terms with no outgoing edges in the adjacency matrix (root terms)."""
        adj = self.get_adjacency_matrix(self.node_list)
        parent_terms = self.node_list[np.nonzero(adj.sum(axis=1) == 0)[0]]
        return parent_terms

    def get_dfs_paths(self, root_nodes: list, filter_duplicates=False):
        """
        Return all depth-first search paths from root node(s) to children node by
        traversing the ontology directed graph.

        Args:
            root_nodes (list): ["GO:0008150"] if biological processes,
                ["GO:0003674"] if molecular_function, or ["GO:0005575"] if
                cellular_component
            filter_duplicates (bool): whether to remove duplicated paths that end
                up at the same leaf nodes

        Returns: pd.DataFrame of all paths starting from the root nodes.
        """
        if not isinstance(root_nodes, list):
            root_nodes = list(root_nodes)

        # Reverse the graph so traversal runs parent -> child.
        paths = list(dfs_path(self.network.reverse(copy=True), root_nodes))
        paths = list(flatten_list(paths))
        paths_df = pd.DataFrame(paths)

        if filter_duplicates:
            paths_df = paths_df[~paths_df.duplicated(keep="first")]
            paths_df = filter_dfs_paths(paths_df)

        return paths_df

    def remove_predecessor_terms(self, annotation: pd.Series):
        """
        Reduce each annotation to only its leaf (most specific) terms.

        Args:
            annotation (pd.Series): entries are lists of terms, or
                DELIM-separated strings (split automatically).
        """
        leaf_terms = self.get_child_nodes()
        if not annotation.map(lambda x: isinstance(x, list)).any():
            annotation = annotation.str.split(self.DELIM)

        # Hoist the set construction out of the per-row lambda.
        leaf_term_set = set(leaf_terms)
        go_terms_parents = annotation.map(lambda x: list(
            set(x) & leaf_term_set) if isinstance(x, list) else None)
        return go_terms_parents

    @staticmethod
    def get_node_color(
        file="~/Bioinformatics_ExternalData/GeneOntology/go_colors_biological.csv",
    ):
        """
        Load a GO term -> HCL color mapping from a CSV file.

        Args:
            file (str): path to a CSV with digit-named level columns holding GO
                terms and an "HCL.color" column.

        Returns:
            pd.Series: indexed by GO term ("node"), values are HCL colors.
        """
        go_colors = pd.read_csv(file)

        def selectgo(x):
            # Pick the last (most specific) string-valued term in the row.
            terms = [term for term in x if isinstance(term, str)]
            if len(terms) > 0:
                return terms[-1]
            else:
                return None

        # Level columns are the ones whose header is all digits.
        go_colors["node"] = go_colors[[
            col for col in go_colors.columns if col.isdigit()
        ]].apply(selectgo, axis=1)
        go_id_colors = go_colors[go_colors["node"].notnull()].set_index(
            "node")["HCL.color"]
        go_id_colors = go_id_colors[~go_id_colors.index.duplicated(
            keep="first")]

        print(go_id_colors.unique().shape,
              go_colors["HCL.color"].unique().shape)
        return go_id_colors
class HumanPhenotypeOntology(Ontology):
    """Placeholder handler for the Human Phenotype Ontology (HPO).

    No behavior is defined yet; everything is inherited from Ontology
    unchanged (subclasses must still implement load_network/filter_network).
    """
    pass
class GeneOntology(Ontology):
    # GAF column names -> canonical column names used across the package.
    COLUMNS_RENAME_DICT = {
        "DB_Object_Symbol": "gene_name",
        "DB_Object_ID": "gene_id",
        "GO_ID": "go_id",
    }

    def __init__(
        self,
        path="http://geneontology.org/gene-associations/",
        file_resources=None,
        col_rename=COLUMNS_RENAME_DICT,
        npartitions=0,
        verbose=False,
    ):
        """
        Handles downloading the latest Gene Ontology obo and annotation data, and
        preprocesses them. Provides functionalities to create a directed acyclic
        graph of GO terms, filter terms, and filter annotations.

        Args:
            path (str): base URL/folder for the annotation files.
            file_resources (dict): mapping of file name to path/URL; defaults to
                the go-basic.obo graph plus the human GAF annotation files.
            col_rename (dict): column renaming applied by Database.
            npartitions (int): number of partitions for parallel loading.
            verbose (bool): whether to print progress/debug messages.
        """
        if file_resources is None:
            file_resources = {
                "go-basic.obo":
                "http://purl.obolibrary.org/obo/go/go-basic.obo",
                "goa_human.gaf": "goa_human.gaf.gz",
                "goa_human_rna.gaf": "goa_human_rna.gaf.gz",
                "goa_human_isoform.gaf": "goa_human_isoform.gaf.gz",
            }
        super(GeneOntology, self).__init__(
            path,
            file_resources,
            col_rename=col_rename,
            npartitions=npartitions,
            verbose=verbose,
        )

    def info(self):
        """Print a short summary of the GO term network."""
        # NOTE(review): nx.info() is deprecated (removed in networkx 3.0);
        # switch to str(self.network) when upgrading networkx.
        print("network {}".format(nx.info(self.network)))

    def load_dataframe(self, file_resources, npartitions=None):
        """
        Concatenate all .gaf annotation files into one DataFrame and annotate
        each record with term metadata (name, namespace, is_a) from the
        .obo graph loaded in self.network.

        Args:
            file_resources (dict): mapping of file name to an open file/handle.
            npartitions: unused here; kept for interface compatibility.
        """
        go_annotation_dfs = []
        for file in file_resources:
            if ".gaf" in file:
                # One list of GAF records per annotation file.
                go_lines = []
                for line in GOA.gafiterator(file_resources[file]):
                    go_lines.append(line)
                go_annotation_dfs.append(pd.DataFrame(go_lines))
        go_annotations = pd.concat(go_annotation_dfs)

        # Term metadata table indexed by GO ID, taken from the obo graph nodes.
        go_terms = pd.DataFrame.from_dict(self.network.nodes,
                                          orient="index",
                                          dtype="object")
        go_annotations["go_name"] = go_annotations["GO_ID"].map(
            go_terms["name"])
        go_annotations["namespace"] = go_annotations["GO_ID"].map(
            go_terms["namespace"])
        go_annotations["is_a"] = go_annotations["GO_ID"].map(go_terms["is_a"])
        return go_annotations

    def load_network(self, file_resources):
        """
        Parse the .obo resource into a MultiDiGraph of GO terms.

        Returns:
            (network, node_list): the obonet graph and an np.array of its nodes;
            (None, None) if no ".obo" file is present in file_resources.
        """
        # Initialize so a missing .obo resource doesn't raise UnboundLocalError.
        network, node_list = None, None
        for file in file_resources:
            if ".obo" in file:
                network = obonet.read_obo(file_resources[file])
                # network = network.reverse(copy=True)
                node_list = np.array(network.nodes)
        return network, node_list

    def filter_network(self, namespace):
        """
        Filter the subgraph node_list to only `namespace` terms.

        Args:
            namespace: one of {"biological_process", "cellular_component",
                "molecular_function"}
        """
        terms = self.data[self.data["namespace"] ==
                          namespace]["go_id"].unique()
        print("{} terms: {}".format(namespace,
                                    len(terms))) if self.verbose else None
        self.network = self.network.subgraph(nodes=list(terms))
        self.node_list = np.array(list(terms))

    def get_predecessor_terms(self, annotation: pd.Series, type="is_a"):
        """
        For each list of GO terms, collect all reachable predecessor terms in
        the ontology graph (non-list entries yield an empty list).

        Args:
            annotation (pd.Series): entries are lists of GO term IDs.
            type: currently unused; reserved for edge-type filtering.
        """
        go_terms_parents = annotation.map(lambda x: list({
            parent
            for term in x
            for parent in list(nx.descendants(self.network, term))
        }) if isinstance(x, list) else [
        ])  # flatten(self.traverse_predecessors(term, type))}) \
        return go_terms_parents

    def add_predecessor_terms(self, annotation: pd.Series, return_str=False):
        """
        Extend each annotation with all predecessor (ancestor) GO terms.

        Args:
            annotation (pd.Series): entries are lists of GO terms, or "|"/";"
                delimited strings (split automatically).
            return_str (bool): if True, join each result list back into a
                "|"-delimited string.
        """
        # np.object was removed in NumPy 1.24; the builtin `object` is the
        # exact dtype it aliased. Raw string avoids the invalid "\|" escape.
        if (annotation.dtypes == object
                and annotation.str.contains(r"\||;", regex=True).any()):
            go_terms_annotations = annotation.str.split("|")
        else:
            go_terms_annotations = annotation

        # Expand from the SPLIT lists: get_predecessor_terms only expands
        # list-valued entries, so passing the raw string Series (as the old
        # code did) silently contributed no predecessors at all.
        go_terms_parents = go_terms_annotations + self.get_predecessor_terms(
            go_terms_annotations)

        if return_str:
            go_terms_parents = go_terms_parents.map(
                lambda x: "|".join(x) if isinstance(x, list) else None)
        return go_terms_parents
def traverse_predecessors(network, seed_node, type=["is_a", "part_of"]):
    """
    Returns all predecessor terms from seed_node by traversing the ontology
    network along edges whose key matches `type`.

    Args:
        network: a MultiDiGraph-like object exposing `.pred[node]` as a
            mapping {parent: {edge_key: attrs, ...}}.
        seed_node: seed node of the traversal
        type: the ontology edge type(s) to include

    Returns:
        generator of list of lists for each dfs branch.
    """
    parents = dict(network.pred[seed_node])
    for parent, edges in parents.items():
        # Follow the edge if ANY of its keys matches: a MultiDiGraph can hold
        # several typed edges between the same pair of nodes, and the previous
        # code inspected only the first key, skipping valid relations.
        if any(key in type for key in edges):
            yield [parent] + list(traverse_predecessors(network, parent, type))
def flatten(lst):
    """Recursively flatten arbitrarily nested lists into a single flat list."""
    out = []
    for item in lst:
        if isinstance(item, list):
            out.extend(flatten(item))
        else:
            out.append(item)
    return out
def dfs_path(graph, path):
    """
    Depth-first extend `path` through `graph` from its last node.

    Yields the completed path at each leaf (node with no successors); at
    branch nodes, yields one materialized sub-generator result per child,
    producing a nested list structure (flattened later by flatten_list).
    """
    children = list(graph.successors(path[-1]))
    if not children:
        # Leaf reached: the path is complete.
        yield path
        return
    for child in children:
        yield list(dfs_path(graph, path + [child]))
def flatten_list(list_in):
    """
    Lazily flatten the nested list structure produced by dfs_path into the
    individual leaf paths (lists of str).

    Yields each leaf list exactly once. The previous implementation yielded
    the leaf list once PER ELEMENT (the check ran inside the loop over its
    items), duplicating every path `len(path)` times in get_dfs_paths.

    Args:
        list_in: an arbitrarily nested list whose leaves are lists of str,
            or a non-list scalar (yielded as-is).
    """
    if not isinstance(list_in, list):
        # Non-list scalar: pass through unchanged.
        yield list_in
    elif list_in and isinstance(list_in[0], list):
        # Nested level: recurse into every element.
        for sublist in list_in:
            yield from flatten_list(sublist)
    elif list_in and isinstance(list_in[0], str):
        # Leaf path (list of str): yield it once.
        yield list_in
def filter_dfs_paths(paths_df: pd.DataFrame):
    """
    Drop DFS paths that terminate early at a term already reached by an
    earlier (kept) path of at least the same depth.

    A row is removed when, at some depth column, its term duplicates a
    previously-seen term at that depth AND the path ends there (the next
    column is null).

    Args:
        paths_df (pd.DataFrame): one path per row, one term per depth column.
    """
    masks = {}
    # Walk depth columns from deepest to shallowest (excluding the last).
    for depth in sorted(paths_df.columns[:-1], reverse=True):
        is_duplicate_leaf = (paths_df[depth].notnull()
                             & paths_df[depth].duplicated(keep="first")
                             & paths_df[depth + 1].isnull())
        masks[depth] = ~is_duplicate_leaf
    masks = pd.DataFrame(masks)
    # Keep only rows that survive every depth's check.
    return paths_df[masks.all(axis=1)]
def write_taxonomy(network, root_nodes, file_path):
    """
    Append a taxonomy listing to `file_path`: a "Root" line naming the root
    nodes, then one tab-separated line per internal node listing its children
    (BFS order from each root).

    Args:
        network: A network with edge(i, j) where i is a node and j is a child of i.
        root_nodes (list): a list of node names
        file_path (str): output path; lines are appended ("a" mode preserved).
    """
    # Context manager guarantees the handle is closed even if BFS raises
    # (the old open()/close() pair leaked the handle on exceptions).
    with open(file_path, "a") as file:
        file.write("Root\t" + "\t".join(root_nodes) + "\n")

        for root_node in root_nodes:
            for node, children in nx.traversal.bfs_successors(network, root_node):
                if len(children) > 0:
                    file.write(node + "\t" + "\t".join(children) + "\n")