import io
import os
from typing import List, Union
import dask.dataframe as dd
import pandas as pd
import validators
from openomics.io.files import get_pkg_data_filename
# Column names shared by the clinical tables.
BCR_PATIENT_BARCODE_COL = "bcr_patient_barcode"
HISTOLOGIC_SUBTYPE_COL = "histologic_subtype"
PATHOLOGIC_STAGE_COL = "pathologic_stage"
TUMOR_NORMAL_COL = "tumor_normal"
PREDICTED_SUBTYPE_COL = "predicted_subtype"

# Labels written into the TUMOR_NORMAL_COL column.
TUMOR = "Tumor"
NORMAL = "Normal"
__all__ = ['ClinicalData']


class ClinicalData:
    """Manage the clinical data tables of a cohort.

    Holds the patients' phenotype table and, once loaded through the
    ``add_*`` / ``build_*`` methods, the treatment (drug response), biospecimen
    and per-sample tables associated with each patient.

    Attributes:
        patient_column: name of the patient ID column given at construction.
        patient: patient table, indexed by the patient ID column.
        patient_barcodes: values of the patient ID column, in table order.
        samples: per-sample table, set by ``build_clinical_samples``.
        drugs: drug response table, set by ``add_drug_response_data``.
        biospecimen: biospecimen table, set by ``add_biospecimen_data``.
        sample_barcodes: set by ``add_biospecimen_data``.
    """

    # Collapses pathologic sub-stages into their parent stage.
    pathologic_stage_map = {'Stage IA': 'Stage I', 'Stage IB': 'Stage I',
                            'Stage IIA': 'Stage II', 'Stage IIB': 'Stage II',
                            'Stage IIIA': 'Stage III', 'Stage IIIB': 'Stage III'}

    # Placeholder strings read as missing values in the patient table.
    _PATIENT_NA_VALUES = ["[Not Available]", "[Unknown]", "[Not Applicable]",
                          "[Discrepancy]"]
    # Placeholder strings read as missing values in the drug/biospecimen tables.
    _AUX_NA_VALUES = ["[Not Available]", "[Unknown]", "[Not Applicable]"]

    def __init__(self,
                 # Quoted (forward reference) so the annotation is not
                 # evaluated when the class is defined.
                 file: "Union[str, io.StringIO, pd.DataFrame, dd.DataFrame]",
                 patient_index: str,
                 columns: List[str] = None):
        """Load the patient table and index it by the patient ID column.

        Args:
            file (str, io.StringIO, pd.DataFrame): either a path or URL to the
                patients clinical data file, an in-memory text buffer, or a
                DataFrame.
            patient_index (str): the patient's ID column name.
            columns (List[str]): default None. Specifies the columns to import,
                if None, then import all columns. The list passed in is not
                modified.

        Raises:
            FileNotFoundError: if ``file`` is not a DataFrame, a StringIO, a
                valid URL or a path to an existing file.
        """
        self.patient_column = patient_index

        # The ID column is always needed; extend a copy so the caller's list
        # is left untouched.
        if columns and patient_index not in columns:
            columns = list(columns) + [patient_index]

        if isinstance(file, (pd.DataFrame, dd.DataFrame)):
            patient = file
        elif isinstance(file, io.StringIO):
            # Rewind: the buffer may already have been read (e.g. to extract
            # the column information).
            file.seek(0)
            patient = self._read_patient_table(file, columns)
        elif isinstance(file, str) and validators.url(file):
            dataurl, filename = os.path.split(file)
            file = get_pkg_data_filename(dataurl + "/", filename)
            # NOTE(review): unlike the other branches this read applies no
            # skiprows / na_values / usecols, so `columns` is ignored here —
            # confirm that this is intended for remote files.
            patient = pd.read_table(file)
        elif isinstance(file, str) and os.path.isfile(file):
            patient = self._read_patient_table(file, columns)
        else:
            raise FileNotFoundError("{}".format(file))

        barcodes = patient[patient_index]
        if hasattr(barcodes, "compute"):
            # Lazy (dask-style) series must be materialised before it can be
            # turned into a list.
            barcodes = barcodes.compute()
        self.patient_barcodes = barcodes.tolist()

        # Non-inplace calls: a DataFrame handed in by the caller is not
        # modified behind their back.
        patient = patient.set_index(patient_index)
        patient = patient.rename(columns={
            "ajcc_pathologic_tumor_stage": PATHOLOGIC_STAGE_COL,
            "histological_type": HISTOLOGIC_SUBTYPE_COL,
            "histologic_diagnosis.1": HISTOLOGIC_SUBTYPE_COL,
        })
        self.patient = patient.replace(
            {PATHOLOGIC_STAGE_COL: ClinicalData.pathologic_stage_map})

    @staticmethod
    def _read_patient_table(source, columns):
        """Read a patient table, skipping the two rows after the header."""
        return pd.read_table(source,
                             skiprows=[1, 2],
                             na_values=ClinicalData._PATIENT_NA_VALUES,
                             usecols=columns)

    @classmethod
    def name(cls) -> str:
        """Returns the name of the class, i.e. 'ClinicalData'"""
        # `cls` is already the class; `cls.__class__` would be its metaclass.
        return cls.__name__

    def build_clinical_samples(self, all_samples, index="bcr_patient_barcode"):
        """Build ``self.samples``: the patients' clinical data, one row per sample.

        Args:
            all_samples: sample barcodes; the last four characters of each are
                dropped and the remainder is used to look up the patient.
            index: name given to the samples index and used as the join key
                against the index of the patient table.
        """
        samples = pd.DataFrame(index=all_samples)
        # Cut sample barcode for TCGA. The name is applied afterwards so that
        # it lands on the new index object rather than on `all_samples`.
        samples.index = samples.index.str[:-4]
        samples.index.name = index

        # Merge patients clinical data with patient barcode as index
        samples = samples.join(self.patient, on=index, how="left", rsuffix="_")

        # The stage column only exists when the patient table provided it.
        if PATHOLOGIC_STAGE_COL in samples.columns:
            samples = samples[samples[PATHOLOGIC_STAGE_COL] != "[Discrepancy]"]

        # NOTE(review): "-11" / "-01" are searched anywhere in the already
        # truncated barcodes — confirm this is the intended sample-type test.
        samples.loc[samples.index.str.contains("-11"), TUMOR_NORMAL_COL] = NORMAL  # Label normal samples
        samples.loc[samples.index.str.contains("-01"), TUMOR_NORMAL_COL] = TUMOR  # Label tumor samples
        self.samples = samples

    def add_drug_response_data(self, file_path="nationwidechildrens.org_clinical_drug.txt",
                               patient_column="bcr_patient_barcode",
                               columns=None,
                               drug_name_col=None, response_column=None):
        """Load the drug response table into ``self.drugs``.

        Args:
            file_path: path to the drug table file.
            patient_column: patient ID column, used as the index of the table.
            columns: columns to import; None selects the patient barcode, drug
                name, therapy type and best response columns. The list passed
                in is not modified.
            drug_name_col: stored as ``self.drug_name_col``.
            response_column: stored as ``self.response_column``.

        Raises:
            FileNotFoundError: if ``file_path`` does not exist.
        """
        if columns is None:
            columns = ['bcr_patient_barcode', 'pharmaceutical_therapy_drug_name',
                       'pharmaceutical_therapy_type', 'treatment_best_response']
        # The index column must be among the columns read.
        if patient_column not in columns:
            columns = list(columns) + [patient_column]

        if not os.path.exists(file_path):
            raise FileNotFoundError(file_path)

        self.drug_name_col = drug_name_col
        self.response_column = response_column
        self.drugs = pd.read_table(file_path,
                                   sep="\t",
                                   skiprows=[1, 2],
                                   na_values=ClinicalData._AUX_NA_VALUES,
                                   usecols=columns)
        self.drugs.set_index(patient_column, inplace=True)

    def add_biospecimen_data(self, file_path="genome.wustl.edu_biospecimen_sample.txt",
                             patient_col_name="bcr_patient_barcode",
                             columns=('bcr_sample_barcode', 'sample_type')):
        """Load the biospecimen table into ``self.biospecimen``.

        Also sets ``self.sample_barcodes`` to the values of the
        ``patient_col_name`` column, in file order.

        Args:
            file_path: path to the biospecimen table file.
            patient_col_name: patient ID column, used as the index of the table.
            columns: columns to import, or None to import all columns.
                ``patient_col_name`` is always read in addition.

        Raises:
            FileNotFoundError: if ``file_path`` does not exist.
        """
        if columns is not None:
            # Copy, then make sure the column used below is actually read.
            columns = list(columns)
            if patient_col_name not in columns:
                columns.append(patient_col_name)

        if not os.path.exists(file_path):
            raise FileNotFoundError(file_path)

        self.biospecimen = pd.read_table(file_path, sep="\t", skiprows=[1, ],
                                         na_values=ClinicalData._AUX_NA_VALUES,
                                         usecols=columns)
        self.sample_barcodes = self.biospecimen[patient_col_name].tolist()
        self.biospecimen.set_index(patient_col_name, inplace=True)

    def get_patient_barcodes(self):
        """Return the patient barcodes collected when the table was loaded."""
        return self.patient_barcodes

    def get_sample_barcodes(self):
        """Return the barcodes collected by ``add_biospecimen_data``."""
        return self.sample_barcodes