Spaces:
Runtime error
Runtime error
| # ANARCI - Antibody Numbering and Antigen Receptor ClassIfication | |
| # Copyright (C) 2016 Oxford Protein Informatics Group (OPIG) | |
| # | |
| # This program is free software: you can redistribute it and/or modify | |
| # it under the terms of the GNU General Public License as published by | |
| # the Free Software Foundation, either version 3 of the License, or | |
| # (at your option) any later version. | |
| # | |
| # This program is distributed in the hope that it will be useful, | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| # GNU General Public License for more details.# | |
| # | |
| # You should have received a copy of the GNU General Public License | |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
| ''' | |
| ANARCI - Antigen Receptor Numbering And ClassIfication | |
| Oxford Protein Informatics Group (OPIG). 2015-17 | |
| ANARCI performs alignments of sequences to databases of Hidden Markov Models (HMMs). | |
| Those that align with a significant score are classified by species and chain type. | |
| They are then numbered with a scheme of the user's choosing. | |
| Currently implemented schemes: | |
| IMGT | |
| Chothia (IGs only) | |
| Kabat (IGs only) | |
| Martin / Enhanced Chothia (IGs only) | |
| AHo | |
| Wolfguy (IGs only) | |
| Currently recognisable species (chains): | |
| Human (heavy, kappa, lambda, alpha, beta) | |
| Mouse (heavy, kappa, lambda, alpha, beta) | |
| Rat (heavy, kappa, lambda) | |
| Rabbit (heavy, kappa, lambda) | |
| Pig (heavy, kappa, lambda) | |
| Rhesus Monkey (heavy, kappa) | |
| Notes: | |
| o Use assign_germline to get a better species assignment | |
| o Each scheme has been implemented to follow the published specification as closely as possible. However, in places some schemes | |
do not specify where insertions should be placed (e.g. imgt FW3). In these cases the HMM alignment is used. This can give rise
| to inserted positions that were not described by the respective paper. | |
| o AHo is implemented heuristically based on chain type. If one grafted a foreign CDR1 loop onto, say, a VH domain, it will be | |
| numbered as if it is a CDRH1 loop. | |
| ''' | |
| import os | |
| import sys | |
| import tempfile | |
| import gzip | |
| import math | |
| from functools import partial | |
| from textwrap import wrap | |
| from subprocess import Popen, PIPE | |
| from itertools import groupby, islice | |
| from multiprocessing import Pool | |
| from Bio.SearchIO.HmmerIO import Hmmer3TextParser as HMMERParser | |
| # Import from the schemes submodule | |
| from .schemes import * | |
| from .germlines import all_germlines | |
# Species available in the germline database (derived from heavy-chain V genes).
all_species = list(all_germlines['V']['H'].keys())
# The 20 standard amino-acid one-letter codes, sorted alphabetically.
amino_acids = sorted(list("QWERTYIPASDFGHKLCVNM"))
set_amino_acids = set(amino_acids)
# Directory containing this module (used to locate the bundled HMM data).
anarci_path = os.path.split(__file__)[0]
# Map short and long scheme aliases to the canonical scheme name.
scheme_short_to_long = { "m":"martin", "c":"chothia", "k":"kabat","imgt":"imgt", "kabat":"kabat", "chothia":"chothia", "martin":"martin", "i":"imgt", "a":"aho","aho":"aho","wolfguy":"wolfguy", "w":"wolfguy"}
scheme_names = list(scheme_short_to_long.keys())
# Map chain-type letters to output classes; kappa and lambda both report as light ('L').
chain_type_to_class = {"H":"H", "K":"L", "L":"L", "A":"A", "B":"B", "G":"G", "D":"D"}
# Location of the compiled HMM databases shipped with the package.
HMM_path = os.path.join( anarci_path, "dat", "HMMs" )
all_reference_states = list(range( 1, 129)) # These are the IMGT reference states (matches)
class HMMscanError(Exception):
    """Raised when the hmmscan subprocess writes to stderr (i.e. the run failed).

    The previous explicit ``__init__`` merely forwarded the message to the
    base class, which is exactly what ``Exception`` already does, so it has
    been removed.
    """
    pass
| ## Utility functions ## | |
def read_fasta(filename):
    """
    Read a fasta file and return its records as a list.

    @param filename: Path to the fasta file (optionally gzipped).
    @return: List of (description, sequence) tuples.
    """
    return list(fasta_iter(filename))
def fasta_iter(fasta_name):
    """
    Given a fasta file, yield tuples of (header, sequence).
    https://www.biostars.org/p/710/

    @param fasta_name: Path to the fasta file. Files ending in '.gz' are
                       transparently decompressed.
    """
    if fasta_name.endswith( '.gz' ): # IOError raised upon iteration if not a real gzip file.
        # 'rt' is essential: the default 'rb' yields bytes lines, so the
        # ``line[0] == ">"`` test below would never match and parsing would
        # silently produce garbage for gzipped input.
        fh = gzip.open(fasta_name, 'rt')
    else:
        fh = open(fasta_name)
    # Close the handle when iteration finishes (the original leaked it).
    with fh:
        # groupby alternates between header groups and sequence-line groups.
        faiter = (x[1] for x in groupby(fh, lambda line: line[0] == ">"))
        for header in faiter:
            header = next(header)[1:].strip()
            seq = "".join(s.strip() for s in next(faiter))
            yield header, seq
def write_fasta(sequences, f):
    """
    Write a list of (name, sequence) tuples to an open file in fasta format.

    Sequence lines are wrapped at 80 characters; embedded newlines in a
    sequence are preserved as block boundaries.

    @param sequences: Iterable of (name, sequence) tuples.
    @param f: An open, writable file object.
    """
    for name, sequence in sequences:
        print(">%s"%name, file=f)
        wrapped_blocks = []
        for block in sequence.splitlines():
            wrapped_blocks.append('\n'.join(wrap(block, width=80)))
        print('\n'.join(wrapped_blocks), file=f)
def validate_sequence(sequence):
    """
    Check that a submitted string looks like a protein sequence.

    Raises AssertionError if the sequence is 10000 characters or longer, or
    contains letters outside the 20 standard amino-acid codes.

    @param sequence: The sequence string to check.
    @return: True when the sequence passes both checks.
    """
    assert len(sequence) < 10000, "Sequence too long."
    unknown_letters = set(sequence.upper()) - set_amino_acids
    assert not unknown_letters, "Unknown amino acid letter found in sequence: %s"% ", ".join(list(unknown_letters))
    return True
def validate_numbering(xxx_todo_changeme, name_seq=[]):
    """
    Basic sanity checks on a numbering result.

    Verifies that the numbering indices never decrease along the sequence and
    that the numbered residues form a contiguous segment of the original
    sequence. Further validation could be added; these are development checks.

    @param xxx_todo_changeme: A (numbering, start, end) tuple.
    @param name_seq: The (name, sequence) pair the numbering belongs to.
    @return: The unpacked (numbering, start, end) tuple.
    """
    (numbering, start, end) = xxx_todo_changeme
    name, seq = name_seq
    previous_index = -1
    numbered_residues = []
    for (index, _), amino in numbering:
        assert index >= previous_index, "Numbering was found to decrease along the sequence %s. Please report."%name
        previous_index = index
        # Gap characters are not part of the underlying sequence.
        numbered_residues.append(amino.replace("-",""))
    nseq = "".join(numbered_residues)
    assert nseq in seq.replace("-",""), "The algorithm did not number a contiguous segment for sequence %s. Please report"%name
    return numbering, start, end
def grouper(n, iterable):
    '''
    Lazily split an iterable into consecutive chunks of length n.

    The final chunk may be shorter than n. Yields lists.
    '''
    source = iter(iterable)
    while True:
        chunk = list(islice(source, n))
        if not chunk:
            return
        yield chunk
def anarci_output(numbered, sequences, alignment_details, outfile, sequence_id=None, domain_id=None):
    """
    Write numbered sequences in the vertical ANARCI text format to an open file.

    If sequence_id is specified as an integer then only this sequence will be printed.
    Otherwise all sequences will be printed.
    If domain_id is specified as an integer then only this domain will be printed.
    Otherwise all domains will be printed.
    If domain_id is specified then sequence_id must also be specified.

    @param numbered: Per-sequence list of (numbering, start, end) tuples, or None for unnumbered sequences.
    @param sequences: List of (name, sequence) tuples in the same order.
    @param alignment_details: Per-sequence list of per-domain detail dictionaries in the same order.
    @param outfile: An open, writable file object.
    """
    assert (sequence_id is not None) or (sequence_id is None and domain_id is None), "If domain_id is specified, sequence_id must also be specified."
    for i in range(len(numbered)):
        if sequence_id is None:
            print("# %s"%sequences[i][0], file=outfile) # print the name
        if numbered[i] is not None:
            if sequence_id is not None:
                if i != sequence_id: continue
            print("# ANARCI numbered", file=outfile)
            for j in range( len(numbered[i])): # Iterate over domains
                if domain_id is not None:
                    if j != domain_id: continue
                print("# Domain %d of %d"%(j+1, len(numbered[i]) ), file=outfile)
                print("# Most significant HMM hit", file=outfile)
                print("#|species|chain_type|e-value|score|seqstart_index|seqend_index|", file=outfile)
                # The e-value is stringified in place so it can be formatted with %s below.
                alignment_details[i][j]["evalue"] = str( alignment_details[i][j]["evalue"] )
                print("#|%s|%s|%s|%.1f|%d|%d|"%tuple( [alignment_details[i][j][field] for field in
                                                      ["species","chain_type","evalue","bitscore"]]
                                                    +[ numbered[i][j][1], numbered[i][j][2]] ), file=outfile)
                if 'germlines' in alignment_details[i][j]:
                    print('# Most sequence-identical germlines', file=outfile)
                    print('#|species|v_gene|v_identity|j_gene|j_identity|', file=outfile)
                    # Fall back to 'unknown' with identity 0 when a gene assignment is missing.
                    (species, vgene), vid =alignment_details[i][j]['germlines'].get('v_gene', [['','unknown'],0])
                    if vgene is None:
                        vgene, vid = 'unknown', 0
                    (_,jgene), jid =alignment_details[i][j]['germlines'].get('j_gene', [['','unknown'],0])
                    if jgene is None:
                        jgene, jid = 'unknown', 0
                    print('#|%s|%s|%.2f|%s|%.2f|'%(species, vgene, vid, jgene, jid ), file=outfile)
                # Kappa and lambda are collapsed to the light-chain class 'L' for display.
                chain_type = chain_type_to_class[ alignment_details[i][j]["chain_type"] ]
                print("# Scheme = %s"%alignment_details[i][j]["scheme"], file=outfile)
                # An empty numbering means the chosen scheme could not be applied to this domain.
                if len( numbered[i][j][0] ) == 0:
                    print("# Warning: %s scheme could not be applied to this sequence."%alignment_details[i][j]["scheme"], file=outfile)
                for (index, insertion), aa in numbered[i][j][0]:
                    print(chain_type, ("%d"%index).ljust(5), insertion, aa, file=outfile)
        # Record terminator, one per input sequence.
        print("//", file=outfile)
def csv_output(sequences, numbered, details, outfileroot):
    '''
    Write numbered sequences to csv files. A csv file is written for each chain type.
    Kappa and Lambda chains are written to the same file
    The sequences will written aligned to the numbering scheme. Gaps in the sequences with respect to the alignment are written
    as a '-'
    @param sequences: List of name, sequence tuples
    @param numbered: Numbered sequences in the same order as the sequences list.
    @param details: List of alignment details in the same order as the sequences list.
    @param outfileroot: The file path for csv files to write. _<chain_type>.csv will be appended to this.
    '''
    chain_types = {}  # chain class -> list of (sequence index, domain index)
    pos_ranks = {}    # chain class -> {position: maximum insertion rank seen at that position}
    all_pos = {}      # chain class -> set of all numbering positions observed
    _lc = {'K':'KL','L':'KL'}  # kappa and lambda are merged into one light-chain class
    # Divide the set into chain types and find how to order the numbering for each type.
    for i in range( len(sequences) ): # Iterate over entries
        if numbered[i] is None: continue
        for j in range(len(numbered[i])): # Iterate over domains.
            # Record the chain type index
            c = details[i][j]['chain_type']
            c = _lc.get(c, c) # Consider lambda and kappa together.
            chain_types.setdefault( c, [] ).append( (i,j) )
            if c not in pos_ranks:
                pos_ranks[c] = {}
                all_pos[c] = set()
            # Update the insertion order for the scheme. i.e. is it A B C or C B A (e.g. imgt 111 and 112 respectively)
            l = -1  # numbering index of the previous position
            r = 0   # rank of the current position within its numbering index
            for p, _ in numbered[i][j][0]:
                if p[0] != l:
                    l = p[0]
                    r = 0
                else:
                    r +=1
                # Keep the largest rank seen for this position across all domains.
                pos_ranks[c][p] = max( r, pos_ranks[c].get( p, r ) )
                all_pos[c].add( p )
    # Write a new file for each chain type. Kappa and lambda are written together as light chains.
    for cts in ['H','KL','A','B','G','D']:
        if cts in chain_types:
            with open( outfileroot + '_%s.csv'%cts, 'w' ) as out:
                # Sort the positions by index and insertion order
                positions = sorted( all_pos[cts], key = lambda p: (p[0], pos_ranks[cts][p]) )
                # Header line
                fields = ['Id','domain_no','hmm_species','chain_type','e-value','score','seqstart_index','seqend_index',
                          'identity_species','v_gene','v_identity','j_gene','j_identity']
                fields += [ ('%d%s'%(p)).strip() for p in positions ]
                print(','.join( fields ), file=out)
                # Iterate over the domains identified
                for i,j in chain_types[cts]:
                    # Commas in names would break the csv format, so replace with spaces.
                    line = [ sequences[i][0].replace(',',' '),
                             str(j),
                             details[i][j].get('species',''),
                             details[i][j].get('chain_type',''),
                             str(details[i][j].get('evalue','')),
                             str(details[i][j].get('bitscore','')),
                             str(numbered[i][j][1]),
                             str(numbered[i][j][2]),
                             details[i][j].get('germlines',{}).get( 'v_gene',[['',''],0] )[0][0],
                             details[i][j].get('germlines',{}).get( 'v_gene',[['',''],0] )[0][1],
                             '%.2f'%details[i][j].get('germlines',{}).get( 'v_gene',[['',''],0] )[1],
                             details[i][j].get('germlines',{}).get( 'j_gene',[['',''],0] )[0][1],
                             '%.2f'%details[i][j].get('germlines',{}).get( 'j_gene',[['',''],0] )[1] ]
                    # Hash the numbering. Insertion order has been preserved in the positions sort.
                    d = dict( numbered[i][j][0] )
                    line += [ d.get(p,'-') for p in positions ]
                    assert len( line ) == len( fields )
                    print(','.join( line ), file=out)
| ## Parsing and recognising domain hits from hmmscan ## | |
| def _domains_are_same(dom1, dom2): | |
| """ | |
| Check to see if the domains are overlapping. | |
| @param dom1: | |
| @param dom2: | |
| @return: True or False | |
| """ | |
| dom1, dom2 = sorted( [dom1, dom2], key=lambda x: x.query_start ) | |
| if dom2.query_start >= dom1.query_end: | |
| return False | |
| return True | |
def _parse_hmmer_query(query, bit_score_threshold=80, hmmer_species=None):
    """
    Parse one hmmer query result, selecting the best hit for each distinct domain.

    @param query: hmmer query object from Biopython
    @param bit_score_threshold: the threshold for which to consider a hit a hit.
    @param hmmer_species: Optional list of species name prefixes; when given,
                          hits are restricted to those species if any pass the
                          bit-score threshold, otherwise all species are used.
    The function will identify multiple domains if they have been found and provide the details for the best alignment for each domain.
    This allows the ability to identify single chain fvs and engineered antibody sequences as well as the capability in the future for identifying constant domains.
    @return: (hit_table, state_vectors, top_descriptions) — the score table with a
             header row, one state vector per recognised domain, and one
             description dictionary per recognised domain.
    """
    hit_table = [ ['id', 'description', 'evalue', 'bitscore', 'bias',
                   'query_start', 'query_end' ] ]
    # Find the best hit for each domain in the sequence.
    top_descriptions, domains,state_vectors = [], [], []
    if query.hsps: # We have some hits
        # If we have specified a species, check to see we have hits for that species
        # Otherwise revert back to using any species
        if hmmer_species:
            #hit_correct_species = [hsp for hsp in query.hsps if hsp.hit_id.startswith(hmmer_species) and hsp.bitscore >= bit_score_threshold]
            hit_correct_species = []
            for hsp in query.hsps:
                if hsp.bitscore >= bit_score_threshold:
                    # hit ids are formatted "<species>_<chain type>" (see split below).
                    for species in hmmer_species:
                        if hsp.hit_id.startswith(species):
                            hit_correct_species.append(hsp)
            if hit_correct_species:
                hsp_list = hit_correct_species
            else:
                print("Limiting hmmer search to species %s was requested but hits did not achieve a high enough bitscore. Reverting to using any species" %(hmmer_species))
                hsp_list = query.hsps
        else:
            hsp_list = query.hsps
        for hsp in sorted(hsp_list, key=lambda x: x.evalue): # Iterate over the matches of the domains in order of their e-value (most significant first)
            new=True
            if hsp.bitscore >= bit_score_threshold: # Only look at those with hits that are over the threshold bit-score.
                for i in range( len(domains) ): # Check to see if we already have seen the domain
                    if _domains_are_same( domains[i], hsp ):
                        new = False
                        break
                hit_table.append( [ hsp.hit_id, hsp.hit_description, hsp.evalue, hsp.bitscore, hsp.bias, hsp.query_start, hsp.query_end] )
                if new: # It is a new domain and this is the best hit. Add it for further processing.
                    domains.append( hsp )
                    top_descriptions.append( dict( list(zip(hit_table[0], hit_table[-1])) ) ) # Add the last added to the descriptions list.
        # Reorder the domains according to the order they appear in the sequence.
        ordering = sorted( list(range(len(domains))), key=lambda x: domains[x].query_start)
        domains = [ domains[_] for _ in ordering ]
        top_descriptions = [ top_descriptions[_] for _ in ordering ]
    ndomains = len( domains )
    for i in range(ndomains): # If any significant hits were identified parse and align them to the reference state.
        domains[i].order = i
        # Hit ids encode "<species>_<chain type>", e.g. "human_H".
        species, chain = top_descriptions[i]["id"].split("_")
        state_vectors.append( _hmm_alignment_to_states(domains[i], ndomains, query.seq_len) ) # Alignment to the reference states.
        top_descriptions[i][ "species"] = species # Reparse
        top_descriptions[i][ "chain_type"] = chain
        top_descriptions[i][ "query_start"] = state_vectors[-1][0][-1] # Make sure the query_start agree if it was changed
    return hit_table, state_vectors, top_descriptions
def _hmm_alignment_to_states(hsp, n, seq_length):
    """
    Take a hit hsp and turn the alignment into a state vector with sequence indices.

    @param hsp: The hmmer hsp (must carry an 'order' attribute set by the caller).
    @param n: Total number of domains recognised in the query sequence.
    @param seq_length: Length of the full query sequence.
    @return: List of ((hmm_state, state_type), sequence_index) tuples where
             state_type is 'm' (match), 'i' (insert) or 'd' (delete; index None).
    """
    # Extract the strings for the reference states and the posterior probability strings
    reference_string = hsp.aln_annotation["RF"]
    state_string = hsp.aln_annotation["PP"]
    assert len(reference_string) == len(state_string), "Aligned reference and state strings had different lengths. Don't know how to handle"
    # Extract the start an end points of the hmm states and the sequence
    # These are python indices i.e list[ start:end ] and therefore start will be one less than in the text file
    _hmm_start = hsp.hit_start
    _hmm_end = hsp.hit_end
    _seq_start = hsp.query_start
    _seq_end = hsp.query_end
    # Extact the full length of the HMM hit
    species, ctype = hsp.hit_id.split('_')
    _hmm_length = get_hmm_length( species, ctype )
    # Handle cases where there are n terminal modifications.
    # In most cases the user is going to want these included in the numbered domain even though they are not 'antibody like' and
    # not matched to the germline. Only allow up to a maximum of 5 unmatched states at the start of the domain
    # Adds a bug here if there is a very short linker between a scfv domains with a modified n-term second domain
    # Thus this is only done for the first identified domain ( hence order attribute on hsp )
    if hsp.order == 0 and _hmm_start and _hmm_start < 5:
        n_extend = _hmm_start
        if _hmm_start > _seq_start:
            n_extend = min( _seq_start , _hmm_start - _seq_start )
        # '8' is a posterior-probability placeholder; 'x' marks the states as matches.
        state_string = '8'*n_extend + state_string
        reference_string = 'x'*n_extend + reference_string
        _seq_start = _seq_start - n_extend
        _hmm_start = _hmm_start - n_extend
    # Handle cases where the alignment should be extended to the end of the j-element
    # This occurs when there a c-terminal modifications of the variable domain that are significantly different to germline
    # Extension is only made when half of framework 4 has been recognised and there is only one domain recognised.
    if n==1 and _seq_end < seq_length and (123 < _hmm_end < _hmm_length): # Extend forwards
        n_extend = min( _hmm_length - _hmm_end, seq_length - _seq_end )
        state_string = state_string + '8'*n_extend
        reference_string = reference_string + 'x'*n_extend
        _seq_end = _seq_end + n_extend
        _hmm_end = _hmm_end + n_extend
    # Generate lists for the states and the sequence indices that are included in this alignment
    hmm_states = all_reference_states[ _hmm_start : _hmm_end ]
    sequence_indices = list(range(_seq_start, _seq_end))
    h, s = 0, 0 # initialise the current index in the hmm and the sequence
    state_vector = []
    # iterate over the state string (or the reference string)
    for i in range( len(state_string) ):
        if reference_string[i] == "x": # match state
            state_type = "m"
        else: # insert state
            state_type = "i"
        if state_string[i] == ".": # overloading if deleted relative to reference. delete_state
            state_type = "d"
            sequence_index = None
        else:
            sequence_index = sequence_indices[s]
        # Store the alignment as the state identifier (uncorrected IMGT annotation) and the index of the sequence
        state_vector.append(  ((hmm_states[h], state_type),  sequence_index )  )
        # Updates to the indices
        if state_type == "m":
            h+=1
            s+=1
        elif state_type == "i":
            s+=1
        else: # delete state
            h+=1
    return state_vector
def parse_hmmer_output(filedescriptor="", bit_score_threshold=80, hmmer_species=None):
    """
    Parse the output of HMMscan and return top alignment and the score table for each input sequence.

    @param filedescriptor: Either a filename (str) or an os-level file descriptor (int).
    @param bit_score_threshold: Minimum bitscore for a hit to be considered.
    @param hmmer_species: Optional list of species to prefer when selecting hits.
    @return: A list with one (hit_table, state_vectors, top_descriptions) tuple per query.
    @raise TypeError: If filedescriptor is neither a str nor an int.
    """
    results = []
    if isinstance(filedescriptor, str):
        openfile = open
    elif isinstance(filedescriptor, int):
        openfile = os.fdopen
    else:
        # Previously an unsupported type fell through and raised an opaque
        # UnboundLocalError on 'openfile'; fail with a clear message instead.
        raise TypeError("filedescriptor must be a filename (str) or an os file descriptor (int)")
    with openfile(filedescriptor) as inputfile:
        p = HMMERParser( inputfile )
        for query in p:
            results.append(_parse_hmmer_query(query,bit_score_threshold=bit_score_threshold,hmmer_species=hmmer_species ))
    return results
def run_hmmer(sequence_list,hmm_database="ALL",hmmerpath="", ncpu=None, bit_score_threshold=80, hmmer_species=None):
    """
    Run the sequences in sequence list against a precompiled hmm_database.
    Those sequence that have a significant hit with a bit score over a threshold will
    be recognised and an alignment given. The alignment will be used to number the
    sequence.

    @param sequence_list: a list of (name, sequence) tuples. Both are strings
    @param hmm_database: The hmm database to use. Currently, all hmms are in the ALL database.
                         The code to develop new models is in build_pipeline in the git repo.
    @param hmmerpath: The path to hmmer binaries if not in the path
    @param ncpu: The number of cpu's to allow hmmer to use.
    @param bit_score_threshold: Minimum bitscore for a hit to be considered.
    @param hmmer_species: Optional list of species to prefer when selecting hits.
    @return: Parsed hmmscan results, one entry per input sequence.
    @raise HMMscanError: If hmmscan wrote anything to stderr.
    """
    # Check that hmm_database is available
    assert hmm_database in ["ALL"], "Unknown HMM database %s"%hmm_database
    HMM = os.path.join( HMM_path, "%s.hmm"%hmm_database )
    # Create a fasta file for all the sequences. Label them with their sequence index
    # This will go to a temp file
    fasta_filehandle, fasta_filename =  tempfile.mkstemp( ".fasta", text=True )
    with os.fdopen(fasta_filehandle,'w') as outfile:
        write_fasta(sequence_list, outfile)
    output_filehandle, output_filename =  tempfile.mkstemp( ".txt", text=True )
    # Run hmmer as a subprocess
    if hmmerpath:
        hmmscan = os.path.join(hmmerpath,"hmmscan")
    else:
        hmmscan = "hmmscan"
    try:
        if ncpu is None:
            command = [ hmmscan, "-o", output_filename, HMM,  fasta_filename]
        else:
            command = [ hmmscan, "-o", output_filename, "--cpu", str(ncpu), HMM,  fasta_filename]
        process = Popen( command, stdout=PIPE, stderr=PIPE  )
        _, pr_stderr = process.communicate()
        if pr_stderr:
            # Anything on stderr is treated as a fatal hmmscan failure.
            _f = os.fdopen(output_filehandle) # This is to remove the filedescriptor from the os. I have had problems with it before.
            _f.close()
            raise HMMscanError(pr_stderr)
        # parse_hmmer_output wraps the descriptor in os.fdopen and closes it on exit.
        results = parse_hmmer_output(output_filehandle, bit_score_threshold=bit_score_threshold, hmmer_species=hmmer_species)
    finally:
        # clear up the temporary files whether or not hmmscan succeeded.
        os.remove(fasta_filename)
        os.remove(output_filename)
    return results
def get_hmm_length( species, ctype ):
    '''
    Get the length of an hmm given a species and chain type.
    This tells us how many non-insertion positions there could possibly be in a domain (127 or 128 positions under imgt)

    Falls back to 128 when the species/chain combination is not in the
    germline database.
    '''
    try:
        j_genes = all_germlines['J'][ctype][species]
        first_j = list(j_genes.values())[0]
        # Trailing gap characters are padding, not real positions.
        return len(first_j.rstrip('-'))
    except KeyError:
        return 128
def number_sequence_from_alignment(state_vector, sequence, scheme="imgt", chain_type=None):
    """
    Apply a numbering scheme to an existing HMM alignment.

    @param state_vector: List of states from the hmm. Effectively these are imgt columns but CDR3 has not been redone.
    @param sequence: The original sequence string or list.
    @param scheme: The numbering scheme to apply
    @param chain_type: The type of chain to apply numbering for. Some schemes do not require this (IMGT). Others (e.g. Chothia/Wolfguy) do.
    @return: A list of numbering identifier / amino acids tuples over the domain that has been numbered. The indices of the start (inclusive) and end point (exclusive) in the sequence for the numbering
    """
    scheme = scheme.lower()
    if scheme == "imgt":
        # IMGT numbering is chain-type agnostic.
        return number_imgt(state_vector, sequence)
    if scheme == "aho":
        # requires the chain type to heuristically put the CDR1 gap in position.
        return number_aho(state_vector, sequence, chain_type)
    # Remaining schemes dispatch on heavy vs light chain.
    heavy_light_numberers = {
        "chothia": (number_chothia_heavy, number_chothia_light),
        "kabat":   (number_kabat_heavy,   number_kabat_light),
        "martin":  (number_martin_heavy,  number_martin_light),
        "wolfguy": (number_wolfguy_heavy, number_wolfguy_light),
    }
    if scheme in heavy_light_numberers:
        heavy_numberer, light_numberer = heavy_light_numberers[scheme]
        if chain_type == "H":
            return heavy_numberer(state_vector, sequence)
        if chain_type in "KL":
            return light_numberer(state_vector, sequence)
    raise AssertionError("Unimplemented numbering scheme %s for chain %s"%( scheme, chain_type))
def number_sequences_from_alignment(sequences, alignments, scheme="imgt", allow=set(["H","K","L","A","B","G","D"]),
                                    assign_germline=False, allowed_species=None):
    '''
    Given a list of sequences and a corresponding list of alignments from run_hmmer apply a numbering scheme.

    @param sequences: List of (name, sequence) tuples.
    @param alignments: Per-sequence (hit_table, state_vectors, details) tuples from run_hmmer.
    @param scheme: The numbering scheme to apply.
    @param allow: Set of chain types that should be numbered; others keep their details only.
    @param assign_germline: Whether to also assign the closest germline genes.
    @param allowed_species: Optional list of species to restrict germline assignment to.
    @return: (numbered, alignment_details, hit_tables) lists, one entry per input
             sequence; numbered/alignment_details entries are None when no domain
             could be numbered.
    '''
    # Iteration over the sequence alignments performing the desired numbering
    numbered = []
    alignment_details = []
    hit_tables = []
    for i in range(len(sequences)):
        # Unpack
        hit_table, state_vectors, detailss = alignments[i] # We may have multiple domains per sequence (e.g. single chain fvs).
        # Iterate over all the domains in the sequence that have been recognised (typcially only 1 with the current hmms available)
        hit_numbered, hit_details = [], []
        for di in range( len( state_vectors ) ):
            state_vector = state_vectors[di]
            details      = detailss[di]
            details["scheme"]=scheme
            details["query_name"]=sequences[i][0]
            # Only number things that are allowed. We still keep the alignment details and hit_table
            if state_vector and details["chain_type"] in allow:
                try:
                    # Do the numbering and validate (for development purposes)
                    hit_numbered.append( validate_numbering(number_sequence_from_alignment(state_vector, sequences[i][1],
                                         scheme=scheme, chain_type=details["chain_type"]), sequences[i] ) )
                    if assign_germline:
                        details["germlines"] = run_germline_assignment( state_vector, sequences[i][1],
                                                                        details["chain_type"], allowed_species=allowed_species)
                    hit_details.append( details )
                except AssertionError as e: # Handle errors. Those I have implemented should be assertion.
                    print(str(e), file=sys.stderr)
                    raise e # Validation went wrong. Error message will go to stderr. Want this to be fatal during development.
                except Exception as e:
                    print("Error: Something really went wrong that has not been handled", file=sys.stderr)
                    print(str(e), file=sys.stderr)
                    raise e
        if hit_numbered:
            numbered.append( hit_numbered )
            alignment_details.append( hit_details )
        else:
            # No domain of an allowed chain type was numbered for this sequence.
            numbered.append( None )
            alignment_details.append( None )
        hit_tables.append(hit_table)
    return numbered, alignment_details, hit_tables
def get_identity( state_sequence, germline_sequence ):
    """
    Compute the partially matched sequence identity between two 128-position aligned sequences.

    Positions where the germline has a gap ('-') are ignored; gaps may appear
    in the state_sequence and simply count as mismatches.

    @param state_sequence: 128-character aligned query sequence (may contain '-').
    @param germline_sequence: 128-character aligned germline sequence.
    @return: Fraction of compared positions that match, or 0 if none compared.
    """
    # Ensure that the sequences are the expected full IMGT length.
    assert len( state_sequence) == len(germline_sequence ) == 128
    compared, matched = 0, 0
    for state_aa, germline_aa in zip(state_sequence, germline_sequence):
        if germline_aa == "-":
            continue
        compared += 1
        if state_aa.upper() == germline_aa:
            matched += 1
    if not compared:
        return 0
    return float(matched) / compared
def run_germline_assignment(state_vector, sequence, chain_type, allowed_species=None ):
    """
    Find the closest sequence identity match.

    @param state_vector: The HMM alignment state vector for the domain.
    @param sequence: The original sequence string.
    @param chain_type: The chain type letter (e.g. 'H', 'K', 'L').
    @param allowed_species: Optional list of species to restrict the search to.
    @return: Dict with 'v_gene' and 'j_gene' entries, each a
             [(species, gene), identity] pair (None entries when unassigned).
             Returns {} when allowed_species contains an unknown species.
    """
    genes={'v_gene': [None,None],
           'j_gene': [None,None],
         }
    # Extract the positions that correspond to match (germline) states.
    # Start from all-None so unaligned positions render as gaps below.
    state_dict = dict( ((i, 'm'),None) for i in range(1,129))
    state_dict.update(dict(state_vector))
    state_sequence = "".join([ sequence[state_dict[(i, 'm')]] if state_dict[(i,'m')] is not None else "-" for i in range(1,129) ])
    # Iterate over the v-germline sequences of the chain type of interest.
    # The maximum sequence identity is used to assign the germline
    if chain_type in all_germlines["V"]:
        if allowed_species is not None:
            if not all( [ sp in all_germlines['V'][chain_type] for sp in allowed_species ] ): # Made non-fatal
                return {}
        else:
            allowed_species = all_species
        seq_ids = {}
        for species in allowed_species:
            if species not in all_germlines["V"][ chain_type ]: continue # Previously bug.
            for gene, germline_sequence in all_germlines["V"][ chain_type ][ species ].items():
                seq_ids[ (species, gene) ] = get_identity( state_sequence , germline_sequence )
        genes['v_gene' ][0] = max( seq_ids, key=lambda x: seq_ids[x] )
        genes['v_gene' ][1] = seq_ids[ genes['v_gene' ][0] ]
        # Use the assigned species for the v-gene for the j-gene.
        # This assumption may affect exotically engineered abs but in general is fair.
        species = genes['v_gene' ][0][0]
        if chain_type in all_germlines["J"]:
            if species in all_germlines["J"][chain_type]:
                seq_ids = {}
                for gene, germline_sequence in all_germlines["J"][ chain_type ][ species ].items():
                    seq_ids[ (species, gene) ] = get_identity( state_sequence , germline_sequence )
                genes['j_gene' ][0] = max( seq_ids, key=lambda x: seq_ids[x] )
                genes['j_gene' ][1] = seq_ids[ genes['j_gene' ][0] ]
    return genes
def check_for_j( sequences, alignments, scheme ):
    '''
    As the length of CDR3 gets long (over 30ish) an alignment that does not include the J region becomes more favourable.
    This leads to really long CDR3s not being numberable.
    To overcome this problem, when no J region is detected we try without the v region.

    Modifies `alignments` in place when a J region is recovered.

    @param sequences: List of (name, sequence) tuples.
    @param alignments: Per-sequence (hit_table, state_vectors, details) from run_hmmer.
    @param scheme: The numbering scheme (currently unused here).
    '''
    for i in range( len( sequences ) ):
        # Check the alignment for J region
        if len(alignments[i][1]) ==1: # Only do for single domain chains.
            # Check whether a J region has been identified. If not check whether there is still a considerable amount of sequence
            # remaining.
            ali = alignments[i][1][0]
            # Find the last match position.
            last_state = ali[-1][0][0]
            last_si = ali[-1][1]
            if last_state < 120: # No or very little J region
                if last_si + 30 < len( sequences[i][1] ): # Considerable amount of sequence left...suspicious of a long CDR3
                    # Find the position of the conserved cysteine (imgt 104).
                    cys_si = dict( ali ).get( (104,'m'), None )
                    if cys_si is not None: # 104 found.
                        # Find the corresponding index in the alignment.
                        cys_ai = ali.index( ((104, 'm'), cys_si) )
                        # Try to identify a J region in the remaining sequence after the 104. A low bit score threshold is used.
                        _, re_states, re_details  = run_hmmer( [(sequences[i][0], sequences[i][1][cys_si+1:])],
                                                               bit_score_threshold=10 )[0]
                        # Check if a J region was detected in the remaining sequence.
                        if re_states and re_states[0][-1][0][0] >= 126 and re_states[0][0][0][0] <= 117:
                            # Sandwich the presumed CDR3 region between the V and J regions.
                            vRegion   = ali[:cys_ai+1]
                            jRegion   = [ (state, index+cys_si+1) for state, index in re_states[0] if state[0] >= 117 ]
                            cdrRegion = []
                            # 'next_state' (was 'next') avoids shadowing the builtin next().
                            next_state = 105
                            for si in range( cys_si+1, jRegion[0][1] ):
                                if next_state >= 116:
                                    # Residues beyond 116 are all insertions at 116.
                                    cdrRegion.append( ( (116, 'i'), si ) )
                                else:
                                    cdrRegion.append( ( (next_state, 'm'), si ) )
                                next_state +=1
                            # Update the alignment entry.
                            alignments[i][1][0] = vRegion + cdrRegion + jRegion
                            alignments[i][2][0]['query_end'] = jRegion[-1][1] + 1
| ################################## | |
| # High level numbering functions # | |
| ################################## | |
| # Main function for ANARCI | |
| # Name conflict with function, module and package is kept for legacy unless issues are reported in future. | |
def anarci(sequences, scheme="imgt", database="ALL", output=False, outfile=None, csv=False, allow=set(["H","K","L","A","B","G","D"]),
           hmmerpath="", ncpu=None, assign_germline=False, allowed_species=None, bit_score_threshold=80):
    """
    The main function for anarci. Identify antibody and TCR domains, number them and annotate their germline and species.

    It is advised to use one of the wrapper functions:
        o run_anarci - fasta file or sequence list in. Automated multiprocessing for large jobs. Sequences, numbering, details
                       and hit tables out.
        o number - single sequence in, numbering out

    @param sequences: A list or tuple of (Id, Sequence) pairs
                      e.g. [ ("seq1","EVQLQQSGAEVVRSG ..."),
                             ("seq2","DIVMTQSQKFMSTSV ...") ]
    @param scheme:    The numbering scheme that should be applied. Choose from imgt, chothia, kabat or martin
    @param output:    Boolean flag to say whether the result should be output.
    @param outfile:   The name of the file to output to. If output is True and outfile is None then output is printed
                      to stdout.
    @param csv:       Boolean flag to say whether the csv output alignment format or the vertical anarci format should be used.
    @param allow:     A set containing the chain types that should be recognised. If chothia, kabat or martin is used
                      as the scheme, anarci will ignore tcr chains. Choose a subset of ["H","K","L","A","B","G","D"]
    @param assign_germline: Using highest sequence identity assign the germline to the chain. Can be more accurate at identifying
                      species than the best HMM hit alone. (Bool)
    @param allowed_species: If assign_germline is true, limit the species that can be assigned to a limited set. Useful when the
                      animal species is known or when performing closest germline experiments. Choose a subset of ['human',
                      'mouse','rat','rabbit','rhesus','pig','alpaca'].
    @param bit_score_threshold: The threshold score from HMMER at which an alignment should be numbered. Lowering the threshold
                      means domain recognition is more permissive and can be useful for numbering heavily engineered molecules.
                      However, too low and false positive recognition of other ig-like molecules will occur.
    @param hmmerpath: The path to hmmscan. If left unspecified then the PATH will be searched.
    @param ncpu:      The number of cpu's that hmmer should be allowed to use. If not specified then the hmmscan
                      default is used. N.B. hmmscan must be compiled with multithreading enabled for this option to have effect.
                      Please consider using the run_anarci function for native multiprocessing with anarci.
    @param database:  The HMMER database that should be used. Normally not changed unless a custom db is created.

    @return: Three lists. Numbered, Alignment_details and Hit_tables.
             Each list is in the same order as the input sequences list.
             A description of each entry in the three lists is as followed.
               o Numbered: will be None if no domain was found for that sequence or a list of domains with their
                           numbering, start and finish indices.
               o Alignment_details: will be None if no domain was found for that sequence or a dictionary for each
                           domain identified containing the details of the alignment (chain type, e-value, species etc).
               o Hit_tables: None if no domain was found for that sequence or a nested list for each domain containing
                           the hit table from hmmscan.
    """
    # Validate the input scheme.
    try:
        scheme = scheme_short_to_long[scheme.lower()]
    except KeyError:
        raise AssertionError("Unrecognised or unimplemented scheme: %s"%scheme)

    # Check we have arguments for output before doing work.
    if csv:
        assert outfile, 'If csv output is True then an outfile must be specified'
        _path, _ = os.path.split(outfile)
        assert (not _path) or os.path.exists(_path), 'Output directory %s does not exist'%_path

    # Perform the alignments of the sequences to the hmm database.
    alignments = run_hmmer(sequences,hmm_database=database,hmmerpath=hmmerpath,ncpu=ncpu,bit_score_threshold=bit_score_threshold,hmmer_species=allowed_species )

    # Check the numbering for likely very long CDR3s that will have been missed by the first pass.
    # Modifies alignments in-place.
    check_for_j( sequences, alignments, scheme )

    # Apply the desired numbering scheme to all sequences.
    numbered, alignment_details, hit_tables = number_sequences_from_alignment(sequences, alignments, scheme=scheme, allow=allow,
                                                                              assign_germline=assign_germline,
                                                                              allowed_species=allowed_species)

    # Output if necessary.
    if output:
        if csv:
            # BUGFIX: previously passed the undefined name 'details' here, raising a
            # NameError whenever output=True and csv=True. The correct variable is
            # 'alignment_details' (as in the parallel code in run_anarci).
            csv_output(sequences, numbered, alignment_details, outfile)
        else:
            outto, close = sys.stdout, False
            if outfile:
                outto, close = open(outfile,'w'), True
            anarci_output(numbered, sequences, alignment_details, outto)
            if close:
                outto.close()

    return numbered, alignment_details, hit_tables
| # Wrapper to run anarci using multiple processes and automate fasta file reading. | |
def run_anarci( seq, ncpu=1, **kwargs):
    '''
    Run the anarci numbering protocol for single or multiple sequences.

    @param seq:       A single sequence string, a path to a fasta file, or a list/tuple of
                      (Id, Sequence) pairs
                      e.g. [ ("seq1","EVQLQQSGAEVVRSG ..."),
                             ("seq2","DIVMTQSQKFMSTSV ...") ]
    @param ncpu:      Number of worker processes to use when numbering many sequences.
    @param scheme:    The numbering scheme that should be applied. Choose from imgt, chothia, kabat or martin
    @param output:    Boolean flag to say whether the result should be output.
    @param outfile:   The name of the file to output to. If output is True and outfile is None then output is printed
                      to stdout.
    @param allow:     A set containing the chain types that should be recognised. If chothia, kabat or martin is used
                      as the scheme, anarci will ignore tcr chains. Choose a subset of ["H","K","L","A","B","G","D"]
    @param assign_germline: Using highest sequence identity assign the germline to the chain. Can be more accurate at identifying
                      species than the best HMM hit alone. (Bool)
    @param allowed_species: If assign_germline is true, limit the species that can be assigned to a limited set. Useful when the
                      animal species is known or when performing closest germline experiments. Choose a subset of ['human',
                      'mouse','rat','rabbit','rhesus','pig','alpaca'].
    @param bit_score_threshold: The threshold score from HMMER at which an alignment should be numbered. Lowering the threshold
                      means domain recognition is more permissive and can be useful for numbering heavily engineered molecules.
                      However, too low and false positive recognition of other ig-like molecules will occur.
    @param hmmerpath: The path to hmmscan. If left unspecified then the PATH will be searched.
    @param database:  The HMMER database that should be used. Normally not changed unless a custom db is created.

    @return: Four lists. Sequences, Numbered, Alignment_details and Hit_tables.
             Each list is in the same order.
             A description of each entry in the four lists is as followed.
               o Sequences: The list of sequences formatted as [(Id,sequence), ...].
               o Numbered: will be None if no domain was found for that sequence or a list of domains with their
                           numbering, start and finish indices.
               o Alignment_details: will be None if no domain was found for that sequence or a dictionary for each
                           domain identified containing the details of the alignment (chain type, e-value, species etc).
               o Hit_tables: None if no domain was found for that sequence or a nested list for each domain containing
                           the hit table from hmmscan.
    '''
    # Parse the input: a list of pairs, a fasta file, or a single sequence string.
    if isinstance(seq, (list, tuple)): # A list (or tuple) of (name,sequence) sequences
        assert all( len(_) == 2 for _ in seq ), "If list or tuple supplied as input format must be [ ('ID1','seq1'), ('ID2', 'seq2'), ... ]"
        sequences = seq
    elif os.path.isfile( seq ): # Fasta file.
        # Read the sequences. All are read into memory currently...
        sequences = read_fasta( seq )
        ncpu = int(max(1, ncpu ))
    elif isinstance(seq, str): # Single sequence.
        validate_sequence( seq )
        ncpu = 1
        sequences = [ ["Input sequence", seq ] ]
    else:
        # ROBUSTNESS FIX: previously an unrecognised input type left 'sequences' unbound
        # and the code below failed with a confusing UnboundLocalError.
        raise AssertionError("Unrecognised input type. Supply a sequence string, a fasta file path or a list of (Id, Sequence) pairs.")

    # Handle the output arguments to anarci before doing any work.
    output  = kwargs.get('output', False )
    outfile = kwargs.get('outfile', False )
    csv     = kwargs.get('csv', False )
    if csv: # Check output arguments before doing work.
        assert outfile, 'If csv output is True then an outfile must be specified'
        _path, _ = os.path.split(outfile)
        assert (not _path) or os.path.exists(_path), 'Output directory %s does not exist'%_path

    kwargs['ncpu'] = 1       # Set hmmscan ncpu to 1. HMMER has to be compiled appropriately for this to have an effect.
    kwargs['output'] = False # Override: the compiled results are written here instead of by each worker.
    anarci_partial = partial( anarci, **kwargs )
    chunksize = math.ceil( float( len(sequences) )/ncpu )

    # Run the anarci function using a pool of workers. Using map_async to get over the KeyboardInterrupt bug in python2.7
    if ncpu > 1:
        pool = Pool( ncpu )
        results = pool.map_async( anarci_partial, grouper( chunksize, sequences ) ).get()
        pool.close()
    else:
        results = list(map( anarci_partial, grouper( chunksize, sequences ) ))

    # Reformat the per-chunk results to flat lists.
    numbered          = sum( (_[0] for _ in results), [] )
    alignment_details = sum( (_[1] for _ in results ), [] )
    hit_tables        = sum( (_[2] for _ in results), [] )

    # Output if necessary.
    if output:
        if csv:
            csv_output(sequences, numbered, alignment_details, outfile)
        else:
            outto, close = sys.stdout, False
            if outfile:
                outto, close = open(outfile,'w'), True
            anarci_output(numbered, sequences, alignment_details, outto)
            if close:
                outto.close()

    # Return the results.
    return sequences, numbered, alignment_details, hit_tables
| # Wrapper function for simple sequence in numbering and chain type out behaviour. | |
def number(sequence, scheme="imgt", database="ALL", allow=set(["H","K","L","A","B","G","D"])):
    """
    Given a sequence string, use anarci to number it using the scheme of choice.
    Only the first domain will be recognised and numbered.

    For multiple sequences it is advised to use run_anarci instead of iterative use of this function.

    @param sequence: An amino acid sequence string
    @param scheme:   The numbering scheme that should be applied. Choose from imgt, chothia, kabat or martin
    @param database: The HMMER database that should be used. Normally not changed unless a custom db is created.
    @param allow:    A set containing the chain types that should be recognised. If chothia, kabat or martin is used
                     as the scheme, anarci will ignore tcr chains.

    @return: If the sequence can be numbered, a list containing the numbering and sequence; and the chain type.
             Otherwise both are False.
    """
    try:
        validate_sequence( sequence )
        # BUGFIX: error message read "Unrecognised to unimplemented scheme"; corrected to
        # match the wording used by anarci() ("Unrecognised or unimplemented scheme").
        scheme = scheme_short_to_long[scheme.lower()]
    except KeyError:
        raise AssertionError("Unrecognised or unimplemented scheme: %s"%scheme)

    # Length check. ANARCI can number fragments of chains well. Encourage full domain numbering.
    if len(sequence) < 70:
        return False, False

    try:
        numbered, alignment_details, _ = anarci( [("sequence_0", sequence)], scheme=scheme, database=database, output=False, allow=allow )
    except AssertionError: # Catch where the user has tried to number a TCR with an antibody scheme
        return False, False

    # We return the numbering list and the chain type where kappa and lambda chains are both "L" for light
    if numbered[0]:
        return numbered[0][0][0], chain_type_to_class[alignment_details[0][0]["chain_type"]]
    else:
        return False, False
| if __name__ == "__main__": | |
| # Test and example useage of the anarci function. | |
| sequences = [ ("12e8:H","EVQLQQSGAEVVRSGASVKLSCTASGFNIKDYYIHWVKQRPEKGLEWIGWIDPEIGDTEYVPKFQGKATMTADTSSNTAYLQLSSLTSEDTAVYYCNAGHDYDRGRFPYWGQGTLVTVSAAKTTPPSVYPLAP"), | |
| ("12e8:L","DIVMTQSQKFMSTSVGDRVSITCKASQNVGTAVAWYQQKPGQSPKLMIYSASNRYTGVPDRFTGSGSGTDFTLTISNMQSEDLADYFCQQYSSYPLTFGAGTKLELKRADAAPTVSIFPPSSEQLTSGGASV"), | |
| ("scfv:A","DIQMTQSPSSLSASVGDRVTITCRTSGNIHNYLTWYQQKPGKAPQLLIYNAKTLADGVPSRFSGSGSGTQFTLTISSLQPEDFANYYCQHFWSLPFTFGQGTKVEIKRTGGGGSGGGGSGGGGSGGGGSEVQLVESGGGLVQPGGSLRLSCAASGFDFSRYDMSWVRQAPGKRLEWVAYISSGGGSTYFPDTVKGRFTISRDNAKNTLYLQMNSLRAEDTAVYYCARQNKKLTWFDYWGQGTLVTVSSHHHHHH"), | |
| ("lysozyme:A","KVFGRCELAAAMKRHGLDNYRGYSLGNWVCAAKFESNFNTQATNRNTDGSTDYGILQINSRWWCNDGRTPGSRNLCNIPCSALLSSDITASVNCAKKIVSDGNGMNAWVAWRNRCKGTDVQAWIRGCRL")] | |
| results = anarci(sequences, scheme="imgt", output=True) | |
| numbering, alignment_details, hit_tables = results | |
| expect_one_VH_domain_numbering, expect_one_VL_domain_numbering, expect_VH_then_VL_numbering, expect_None = numbering | |
| assert len(expect_one_VH_domain_numbering) == 1 | |
| assert len(expect_one_VL_domain_numbering) == 1 | |
| assert len(expect_VH_then_VL_numbering) == 2 | |
| assert expect_None == None | |