"""Functions for parsing various file formats.""" |
|
|
import collections |
|
|
import dataclasses |
|
|
import re |
|
|
import string |
|
|
from typing import Dict, Iterable, List, Optional, Sequence, Tuple |
|
|
|
|
|
DeletionMatrix = Sequence[Sequence[int]] |
|
|
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True) |
|
|
class TemplateHit: |
|
|
"""Class representing a template hit.""" |
|
|
index: int |
|
|
name: str |
|
|
aligned_cols: int |
|
|
sum_probs: float |
|
|
query: str |
|
|
hit_sequence: str |
|
|
indices_query: List[int] |
|
|
indices_hit: List[int] |
|
|
|
|
|
|
|
|
def parse_fasta(fasta_string: str) -> Tuple[Sequence[str], Sequence[str]]: |
|
|
"""Parses FASTA string and returns list of strings with amino-acid sequences. |
|
|
|
|
|
Arguments: |
|
|
fasta_string: The string contents of a FASTA file. |
|
|
|
|
|
Returns: |
|
|
A tuple of two lists: |
|
|
* A list of sequences. |
|
|
* A list of sequence descriptions taken from the comment lines. In the |
|
|
same order as the sequences. |
|
|
""" |
|
|
sequences = [] |
|
|
descriptions = [] |
|
|
index = -1 |
|
|
for line in fasta_string.splitlines(): |
|
|
line = line.strip() |
|
|
if line.startswith('>'): |
|
|
index += 1 |
|
|
descriptions.append(line[1:]) |
|
|
sequences.append('') |
|
|
continue |
|
|
elif not line: |
|
|
continue |
|
|
sequences[index] += line |
|
|
|
|
|
return sequences, descriptions |
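
# Illustrative usage of parse_fasta (the FASTA content below is hypothetical,
# not from any real file):
#   seqs, descs = parse_fasta('>seq1 first\nMKV\nLLT\n>seq2 second\nGGA\n')
#   # seqs == ['MKVLLT', 'GGA'], descs == ['seq1 first', 'seq2 second']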


def parse_stockholm(
    stockholm_string: str
) -> Tuple[Sequence[str], DeletionMatrix, Sequence[str]]:
  """Parses sequences and deletion matrix from stockholm format alignment.

  Args:
    stockholm_string: The string contents of a stockholm file. The first
      sequence in the file should be the query sequence.

  Returns:
    A tuple of:
    * A list of sequences that have been aligned to the query. These
      might contain duplicates.
    * The deletion matrix for the alignment as a list of lists. The element
      at `deletion_matrix[i][j]` is the number of residues deleted from
      the aligned sequence i at residue position j.
    * The names of the targets matched, including the jackhmmer subsequence
      suffix.
  """
  name_to_sequence = collections.OrderedDict()
  for line in stockholm_string.splitlines():
    line = line.strip()
    # Skip blank lines, markup ('#...') and the end-of-alignment marker ('//').
    if not line or line.startswith(('#', '//')):
      continue
    name, sequence = line.split()
    if name not in name_to_sequence:
      name_to_sequence[name] = ''
    name_to_sequence[name] += sequence

  msa = []
  deletion_matrix = []

  query = ''
  keep_columns = []
  for seq_index, sequence in enumerate(name_to_sequence.values()):
    if seq_index == 0:
      # Gather the columns with gaps from the query.
      query = sequence
      keep_columns = [i for i, res in enumerate(query) if res != '-']

    # Remove the columns with gaps in the query from all sequences.
    aligned_sequence = ''.join([sequence[c] for c in keep_columns])

    msa.append(aligned_sequence)

    # Count the number of deletions w.r.t. the query.
    deletion_vec = []
    deletion_count = 0
    for seq_res, query_res in zip(sequence, query):
      if seq_res != '-' or query_res != '-':
        if query_res == '-':
          deletion_count += 1
        else:
          deletion_vec.append(deletion_count)
          deletion_count = 0
    deletion_matrix.append(deletion_vec)

  return msa, deletion_matrix, list(name_to_sequence.keys())
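
# Illustrative example of the deletion-matrix convention (hypothetical
# two-row alignment; the query is the first row):
#   msa, dels, names = parse_stockholm('query MK-V\nhit/1-4 MKAV\n//\n')
#   # Query gap columns are dropped: msa == ['MKV', 'MKV'].
#   # dels == [[0, 0, 0], [0, 0, 1]]: 'hit/1-4' has one residue ('A') deleted
#   # relative to the query before its third kept column.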


def parse_a3m(a3m_string: str) -> Tuple[Sequence[str], DeletionMatrix]:
  """Parses sequences and deletion matrix from a3m format alignment.

  Args:
    a3m_string: The string contents of an a3m file. The first sequence in the
      file should be the query sequence.

  Returns:
    A tuple of:
    * A list of sequences that have been aligned to the query. These
      might contain duplicates.
    * The deletion matrix for the alignment as a list of lists. The element
      at `deletion_matrix[i][j]` is the number of residues deleted from
      the aligned sequence i at residue position j.
  """
  sequences, _ = parse_fasta(a3m_string)
  deletion_matrix = []
  for msa_sequence in sequences:
    deletion_vec = []
    deletion_count = 0
    for j in msa_sequence:
      # Lowercase letters mark insertions relative to the query.
      if j.islower():
        deletion_count += 1
      else:
        deletion_vec.append(deletion_count)
        deletion_count = 0
    deletion_matrix.append(deletion_vec)

  # Make the aligned (insertion-free) sequences by stripping lowercase letters.
  deletion_table = str.maketrans('', '', string.ascii_lowercase)
  aligned_sequences = [s.translate(deletion_table) for s in sequences]
  return aligned_sequences, deletion_matrix
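
# Illustrative example (hypothetical A3M content):
#   seqs, dels = parse_a3m('>query\nMKV\n>hit\nMaKV\n')
#   # seqs == ['MKV', 'MKV'], dels == [[0, 0, 0], [0, 1, 0]]: 'hit' has one
#   # inserted residue ('a') counted before its second aligned column.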


def _convert_sto_seq_to_a3m(
    query_non_gaps: Sequence[bool], sto_seq: str) -> Iterable[str]:
  """Yields an A3M rendering of one Stockholm row.

  Columns where the query has a residue are kept as-is; columns where the
  query has a gap become lowercase insertions, and gaps there are dropped.
  """
  for is_query_res_non_gap, sequence_res in zip(query_non_gaps, sto_seq):
    if is_query_res_non_gap:
      yield sequence_res
    elif sequence_res != '-':
      yield sequence_res.lower()


def convert_stockholm_to_a3m(stockholm_format: str,
                             max_sequences: Optional[int] = None) -> str:
  """Converts MSA in Stockholm format to the A3M format."""
  descriptions = {}
  sequences = {}
  reached_max_sequences = False

  # First pass: gather the aligned sequences, honouring max_sequences.
  for line in stockholm_format.splitlines():
    reached_max_sequences = max_sequences and len(sequences) >= max_sequences
    if line.strip() and not line.startswith(('#', '//')):
      # Ignore blank lines, markup and end symbols - the remainder are
      # alignment sequence parts.
      seqname, aligned_seq = line.split(maxsplit=1)
      if seqname not in sequences:
        if reached_max_sequences:
          continue
        sequences[seqname] = ''
      sequences[seqname] += aligned_seq

  # Second pass: gather the '#=GS <name> DE <description>' markup rows.
  for line in stockholm_format.splitlines():
    if line[:4] == '#=GS':
      columns = line.split(maxsplit=3)
      seqname, feature = columns[1:3]
      value = columns[3] if len(columns) == 4 else ''
      if feature != 'DE':
        continue
      if reached_max_sequences and seqname not in sequences:
        continue
      descriptions[seqname] = value
      if len(descriptions) == len(sequences):
        break

  # Convert each Stockholm row to A3M; the gap columns of the query (the
  # first sequence) determine which columns become lowercase insertions.
  a3m_sequences = {}
  query_sequence = next(iter(sequences.values()))
  query_non_gaps = [res != '-' for res in query_sequence]
  for seqname, sto_sequence in sequences.items():
    a3m_sequences[seqname] = ''.join(
        _convert_sto_seq_to_a3m(query_non_gaps, sto_sequence))

  fasta_chunks = (f">{k} {descriptions.get(k, '')}\n{a3m_sequences[k]}"
                  for k in a3m_sequences)
  return '\n'.join(fasta_chunks) + '\n'  # Include terminating newline.
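
# Illustrative conversion (hypothetical Stockholm content; note the trailing
# space after '>query' because it has no DE description):
#   sto = '#=GS hit DE example hit\nquery MK-V\nhit   MKAV\n//\n'
#   convert_stockholm_to_a3m(sto)
#   # '>query \nMKV\n>hit example hit\nMKaV\n'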


def _get_hhr_line_regex_groups(
    regex_pattern: str, line: str) -> Sequence[Optional[str]]:
  match = re.match(regex_pattern, line)
  if match is None:
    raise RuntimeError(f'Could not parse query line {line}')
  return match.groups()


def _update_hhr_residue_indices_list(
    sequence: str, start_index: int, indices_list: List[int]):
  """Computes the relative indices for each residue with respect to the original sequence."""
  counter = start_index
  for symbol in sequence:
    if symbol == '-':
      indices_list.append(-1)  # Gaps are marked with -1.
    else:
      indices_list.append(counter)
      counter += 1
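
# Illustrative behaviour (hypothetical call):
#   indices = []
#   _update_hhr_residue_indices_list('AB-C', 10, indices)
#   # indices == [10, 11, -1, 12]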


def _parse_hhr_hit(detailed_lines: Sequence[str]) -> TemplateHit:
  """Parses the detailed HMM HMM comparison section for a single Hit.

  This works on .hhr files generated from both HHBlits and HHSearch.

  Args:
    detailed_lines: A list of lines from a single comparison section between 2
      sequences (which each have their own HMMs).

  Returns:
    A TemplateHit with the information from that detailed comparison section.

  Raises:
    RuntimeError: If a certain line cannot be processed.
  """
  # The first line is 'No <hit index>' and the second is the hit name.
  number_of_hit = int(detailed_lines[0].split()[-1])
  name_hit = detailed_lines[1][1:]

  # Parse the summary line, e.g.
  # 'Probab=... E-value=... Score=... Aligned_cols=... Identities=...% ...'.
  pattern = (
      r'Probab=(.*)[\t ]*E-value=(.*)[\t ]*Score=(.*)[\t ]*Aligned_cols=(.*)[\t'
      r' ]*Identities=(.*)%[\t ]*Similarity=(.*)[\t ]*Sum_probs=(.*)[\t '
      r']*Template_Neff=(.*)')
  match = re.match(pattern, detailed_lines[2])
  if match is None:
    raise RuntimeError(
        'Could not parse section: %s. Expected this: \n%s to contain summary.' %
        (detailed_lines, detailed_lines[2]))
  (prob_true, e_value, _, aligned_cols, _, _, sum_probs,
   neff) = [float(x) for x in match.groups()]

  # The remaining lines alternate between query ('Q ...') and template
  # ('T ...') blocks; annotation rows (ss_dssp, ss_pred, Consensus) are
  # skipped.
  query = ''
  hit_sequence = ''
  indices_query = []
  indices_hit = []
  length_block = None

  for line in detailed_lines[3:]:
    # Parse the query sequence line.
    if (line.startswith('Q ') and not line.startswith('Q ss_dssp') and
        not line.startswith('Q ss_pred') and
        not line.startswith('Q Consensus')):
      # The line has a fixed-width, 17-character name prefix; the remainder is
      # '<start> <sequence> <end> (<full query length>)'.
      patt = r'[\t ]*([0-9]*) ([A-Z-]*)[\t ]*([0-9]*) \([0-9]*\)'
      groups = _get_hhr_line_regex_groups(patt, line[17:])

      # Indices in .hhr files are 1-based, so shift the start to 0-based.
      start = int(groups[0]) - 1
      delta_query = groups[1]
      end = int(groups[2])
      num_insertions = len([x for x in delta_query if x == '-'])
      length_block = end - start + num_insertions
      assert length_block == len(delta_query)

      # Update the query sequence and indices list.
      query += delta_query
      _update_hhr_residue_indices_list(delta_query, start, indices_query)

    elif line.startswith('T '):
      # Parse the template (hit) sequence line.
      if (not line.startswith('T ss_dssp') and
          not line.startswith('T ss_pred') and
          not line.startswith('T Consensus')):
        # The hit line mirrors the query line; only the start index and the
        # sequence chunk are captured.
        patt = r'[\t ]*([0-9]*) ([A-Z-]*)[\t ]*[0-9]* \([0-9]*\)'
        groups = _get_hhr_line_regex_groups(patt, line[17:])
        start = int(groups[0]) - 1
        delta_hit_sequence = groups[1]
        assert length_block == len(delta_hit_sequence)

        # Update the hit sequence and indices list.
        hit_sequence += delta_hit_sequence
        _update_hhr_residue_indices_list(delta_hit_sequence, start, indices_hit)

  return TemplateHit(
      index=number_of_hit,
      name=name_hit,
      aligned_cols=int(aligned_cols),
      sum_probs=sum_probs,
      query=query,
      hit_sequence=hit_sequence,
      indices_query=indices_query,
      indices_hit=indices_hit,
  )


def parse_hhr(hhr_string: str) -> Sequence[TemplateHit]:
  """Parses the content of an entire HHR file."""
  lines = hhr_string.splitlines()

  # Each .hhr file starts with a results table, followed by one "paragraph"
  # per hit; each paragraph begins with a line starting with 'No '.
  block_starts = [i for i, line in enumerate(lines) if line.startswith('No ')]

  hits = []
  if block_starts:
    block_starts.append(len(lines))  # Add the end of the final block.
    for i in range(len(block_starts) - 1):
      hits.append(_parse_hhr_hit(lines[block_starts[i]:block_starts[i + 1]]))
  return hits
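
# Typical usage (hypothetical file path):
#   with open('results.hhr') as f:
#     hits = parse_hhr(f.read())
#   best_hit = max(hits, key=lambda hit: hit.sum_probs)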


def parse_e_values_from_tblout(tblout: str) -> Dict[str, float]:
  """Parses a target-to-E-value mapping from a Jackhmmer tblout string."""
  e_values = {'query': 0}
  # Keep only non-empty, non-comment lines.
  lines = [line for line in tblout.splitlines() if line and line[0] != '#']
  # Fields in a tblout file are space-delimited: the target name is the first
  # field and the full-sequence E-value is the fifth (index 4).
  for line in lines:
    fields = line.split()
    e_value = fields[4]
    target_name = fields[0]
    e_values[target_name] = float(e_value)
  return e_values
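
# Illustrative usage (hypothetical tblout row; real rows carry more columns):
#   parse_e_values_from_tblout('hit1 - query - 1e-10')
#   # {'query': 0, 'hit1': 1e-10}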