Source code for openomics.utils.read_gtf

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function, division, absolute_import

import logging
from collections import OrderedDict
from os.path import exists

import dask.dataframe as dd
import numpy as np
import pandas as pd
from six import string_types
from six.moves import intern


def expand_attribute_strings(attribute_strings, quote_char='\"', nan_value=None, usecols=None):
    """The last column of GTF has a variable number of key value pairs of the
    format: "key1 value1; key2 value2;" Parse these into a dictionary mapping
    each key onto a list of values, where the value is None for any row where
    the key was missing.

    Args:
        attribute_strings (list of str):
        quote_char (str): Quote character to remove from values
        nan_value (any): If an attribute is missing from a row, give it this
            value.
        usecols (list of str or None): If not None, then only expand columns
            included in this set, otherwise use all columns.
    """
    n = len(attribute_strings)

    extra_columns = {}
    column_order = []

    #
    # SOME NOTES ABOUT THE BIZARRE STRING INTERNING GOING ON BELOW
    #
    # While parsing millions of repeated strings (e.g. "gene_id" and "TP53"),
    # we can save a lot of memory by making sure there's only one string
    # object per unique string. The canonical way to do this is using
    # the 'intern' function. One problem is that Py2 won't let you intern
    # unicode objects, so to get around this we call intern(str(...)).
    #
    # It also turns out to be faster to check interned strings ourselves
    # using a local dictionary, hence the two dictionaries below
    # and pair of try/except blocks in the loop.
    column_interned_strings = {}
    value_interned_strings = {}

    for (i, attribute_string) in enumerate(attribute_strings):
        for kv in attribute_string.split(";"):
            # We're slicing the first two elements out of split() because
            # Ensembl release 79 added values like:
            #   transcript_support_level "1 (assigned to previous version 5)";
            # ...which gets mangled by splitting on spaces.
            parts = kv.strip().split(" ", 2)[:2]

            if len(parts) != 2:
                continue

            column_name, value = parts

            try:
                column_name = column_interned_strings[column_name]
            except KeyError:
                column_name = intern(str(column_name))
                column_interned_strings[column_name] = column_name

            if usecols is not None and column_name not in usecols:
                continue

            try:
                column = extra_columns[column_name]
            except KeyError:
                column = [nan_value] * n
                extra_columns[column_name] = column
                column_order.append(column_name)

            value = value.replace(quote_char, "") if value.startswith(quote_char) else value

            try:
                value = value_interned_strings[value]
            except KeyError:
                value = intern(str(value))
                value_interned_strings[value] = value

            # if an attribute is used repeatedly then
            # keep track of all its values in a list
            old_value = column[i]
            if old_value is nan_value:
                column[i] = value
            else:
                column[i] = "%s,%s" % (old_value, value)

    return OrderedDict(
        (column_name, extra_columns[column_name])
        for column_name in column_order)


REQUIRED_COLUMNS = [
    "seqname",
    "source",
    "feature",
    "start",
    "end",
    "score",
    "strand",
    "frame",
    "attribute",
]

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def parse_gtf(filepath_or_buffer, chunksize=1024 * 1024, features=None,
              intern_columns=["seqname", "source", "strand", "frame"], fix_quotes_columns=["attribute"]):
    """
    Args:
        filepath_or_buffer (str or buffer object):
        chunksize (int): Default 1048576.
        features (set or None): Drop entries which aren't one of these features
        intern_columns (list): These columns are short strings which should be
            interned
        fix_quotes_columns (list): Most commonly the 'attribute' column which
            had broken quotes on some Ensembl release GTF files.
    """
    if features is not None:
        features = set(features)

    def parse_frame(s):
        if s == ".":
            return 0
        else:
            return int(s)

    # GTF columns:
    # 1) seqname: str ("1", "X", "chrX", etc...)
    # 2) source : str
    #      Different versions of GTF use second column as of:
    #      (a) gene biotype
    #      (b) transcript biotype
    #      (c) the annotation source
    #      See: https://www.biostars.org/p/120306/#120321
    # 3) feature : str ("gene", "transcript", &c)
    # 4) start : int
    # 5) end : int
    # 6) score : float or "."
    # 7) strand : "+", "-", or "."
    # 8) frame : 0, 1, 2 or "."
    # 9) attribute : key-value pairs separated by semicolons
    # (see more complete description in docstring at top of file)

    chunk_iterator = pd.read_csv(
        filepath_or_buffer,
        sep="\t",
        comment="#",
        names=REQUIRED_COLUMNS,
        skipinitialspace=True,
        skip_blank_lines=True,
        error_bad_lines=True,
        warn_bad_lines=True,
        chunksize=chunksize,
        engine="c",
        dtype={
            "start": np.int64,
            "end": np.int64,
            "score": np.float32,
            "seqname": str,
        },
        na_values=".",
        converters={"frame": parse_frame})

    dataframes = []
    try:
        for df in chunk_iterator:
            for intern_column in intern_columns:
                df[intern_column] = [intern(str(s)) for s in df[intern_column]]

            # compare feature strings after interning
            if features is not None:
                df = df[df["feature"].isin(features)]

            for fix_quotes_column in fix_quotes_columns:
                # Catch mistaken semicolons by replacing "xyz;" with "xyz"
                # Required to do this since the Ensembl GTF for Ensembl
                # release 78 has mistakes such as:
                #   gene_name = "PRAMEF6;" transcript_name = "PRAMEF6;-201"
                df[fix_quotes_column] = [
                    s.replace(';\"', '\"').replace(";-", "-")
                    for s in df[fix_quotes_column]
                ]
            dataframes.append(df)
    except Exception as e:
        raise e
        # raise Exception("ParsingError:" + str(e))

    df = pd.concat(dataframes)

    return df

def parse_gtf_dask(filepath_or_buffer, npartitions=None, compression=None, features=None):
    """
    Args:
        filepath_or_buffer (str or buffer object):
        npartitions (int): Number of partitions for the dask dataframe. Default None.
        compression (str): Compression type to be passed into dask.dataframe.read_table(). Default None.
        features (set or None): Drop entries which aren't one of these features
    """
    if features is not None:
        features = set(features)

    def parse_frame(s):
        if s == ".":
            return 0
        else:
            return int(s)

    # GTF columns:
    # 1) seqname: str ("1", "X", "chrX", etc...)
    # 2) source : str
    #      Different versions of GTF use second column as of:
    #      (a) gene biotype
    #      (b) transcript biotype
    #      (c) the annotation source
    #      See: https://www.biostars.org/p/120306/#120321
    # 3) feature : str ("gene", "transcript", &c)
    # 4) start : int
    # 5) end : int
    # 6) score : float or "."
    # 7) strand : "+", "-", or "."
    # 8) frame : 0, 1, 2 or "."
    # 9) attribute : key-value pairs separated by semicolons
    # (see more complete description in docstring at top of file)

    # Uses Dask
    logging.debug("dask.datafame.read_table, file={}, compression={}".format(filepath_or_buffer, compression))
    dataframe = dd.read_table(
        filepath_or_buffer,
        sep="\t",
        compression=compression,
        blocksize=None,
        comment="#",
        names=REQUIRED_COLUMNS,
        skipinitialspace=True,
        skip_blank_lines=True,
        error_bad_lines=True,
        warn_bad_lines=True,
        # chunksize=chunksize,
        engine="c",
        dtype={
            "start": np.int64,
            "end": np.int64,
            "score": np.float32,
            "seqname": str,
        },
        na_values=".",
        converters={"frame": parse_frame})

    return dataframe


def parse_gtf_and_expand_attributes(filepath_or_buffer, npartitions=None, compression=None, chunksize=1024 * 1024,
                                    restrict_attribute_columns=None, features=None):
    """Parse lines into column->values dictionary and then expand the
    'attribute' column into multiple columns. This expansion happens by
    replacing strings of semi-colon separated key-value values in the
    'attribute' column with one column per distinct key, with a list of values
    for each row (using None for rows where key didn't occur).

    Args:
        compression:
        filepath_or_buffer (str or buffer object):
        npartitions:
        chunksize (int):
        restrict_attribute_columns (list/set of str or None): If given, then only uses attribute columns.
        features (set or None): Ignore entries which don't correspond to one of the supplied features
    """
    if npartitions:
        df = parse_gtf_dask(filepath_or_buffer, npartitions=npartitions, compression=compression, features=features)
        df = df.reset_index(drop=False)
        df = df.set_index("index")

        attribute_values = df.pop("attribute")

        for column_name, values in expand_attribute_strings(attribute_values,
                                                            usecols=restrict_attribute_columns).items():
            series = dd.from_array(np.array(values, dtype=str))
            df[column_name] = series
    else:
        df = parse_gtf(filepath_or_buffer, chunksize=chunksize, features=features)

        attribute_values = df.pop("attribute")

        for column_name, values in expand_attribute_strings(attribute_values,
                                                            usecols=restrict_attribute_columns).items():
            df[column_name] = values

    return df


[docs]def read_gtf(filepath_or_buffer, npartitions=None, compression=None, expand_attribute_column=True, infer_biotype_column=False, column_converters={}, usecols=None, features=None, chunksize=1024 * 1024): """Parse a GTF into a dictionary mapping column names to sequences of values. Args: filepath_or_buffer (str or buffer object): Path to GTF file (may be gzip compressed) or buffer object such as StringIO npartitions (int): Number of partitions for the dask dataframe. Default None. compression (str): Compression type to be passed into dask.dataframe.read_table(). Default None. expand_attribute_column (bool): Replace strings of semi-colon separated key-value values in the 'attribute' column with one column per distinct key, with a list of values for each row (using None for rows where key didn't occur). infer_biotype_column (bool): Due to the annoying ambiguity of the second GTF column across multiple Ensembl releases, figure out if an older GTF's source column is actually the gene_biotype or transcript_biotype. column_converters (dict, optional): Dictionary mapping column names to conversion functions. Will replace empty strings with None and otherwise passes them to given conversion function. usecols (list of str or None): Restrict which columns are loaded to the give set. If None, then load all columns. features (set of str or None): Drop rows which aren't one of the features in the supplied set chunksize (int): """ if isinstance(filepath_or_buffer, string_types) and not exists(filepath_or_buffer): raise ValueError("GTF file does not exist: %s" % filepath_or_buffer) if expand_attribute_column: result_df = parse_gtf_and_expand_attributes(filepath_or_buffer, npartitions=npartitions, compression=compression, chunksize=chunksize, restrict_attribute_columns=usecols) else: if npartitions: result_df = parse_gtf(filepath_or_buffer, features=features) else: result_df = parse_gtf_dask(filepath_or_buffer, npartitions=npartitions, features=features, compression=compression) for column_name, column_type in list(column_converters.items()): result_df[column_name] = [ column_type(string_value) if len(string_value) > 0 else None for string_value in result_df[column_name] ] # Hackishly infer whether the values in the 'source' column of this GTF # are actually representing a biotype by checking for the most common # gene_biotype and transcript_biotype value 'protein_coding' if infer_biotype_column: unique_source_values = set(result_df["source"]) if "protein_coding" in unique_source_values: column_names = set(result_df.columns) # Disambiguate between the two biotypes by checking if # gene_biotype is already present in another column. If it is, # the 2nd column is the transcript_biotype (otherwise, it's the # gene_biotype) if "gene_biotype" not in column_names: logging.info("Using column 'source' to replace missing 'gene_biotype'") result_df["gene_biotype"] = result_df["source"] if "transcript_biotype" not in column_names: logging.info("Using column 'source' to replace missing 'transcript_biotype'") result_df["transcript_biotype"] = result_df["source"] if usecols is not None: column_names = set(result_df.columns) valid_columns = [c for c in usecols if c in column_names] result_df = result_df[valid_columns] return result_df