File size: 5,086 Bytes
c712316
 
 
 
 
 
873150b
c712316
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
849b9bc
c712316
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
873150b
 
 
 
c712316
 
 
 
 
 
 
e359f1f
c712316
 
 
 
 
 
 
 
873150b
 
 
 
 
 
 
 
c712316
 
 
 
 
 
10fe79e
c712316
 
 
 
 
 
 
 
 
0be1ea7
2c37d62
29f5209
c712316
 
 
 
873150b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import requests
from io import StringIO
from Bio import SeqIO
import os
import time
import pandas as pd
import intervaltree

def find_domains(email, sequence, name):
    """Submit a protein sequence to InterProScan and collect its DOMAIN entries.

    The sequence is posted to the EBI ``iprscan5`` REST service, the JSON
    result is polled for, and every match whose signature entry has type
    ``'DOMAIN'`` is grouped by entry accession with its locations merged.

    Args:
        email: Value sent as the ``email`` form field of the job submission.
        sequence: Protein sequence, sent as the ``sequence`` form field
            (``stype`` is fixed to ``'p'``).
        name: Value written to the ``protein_name`` column of the result.

    Returns:
        A list whose first element is a status message:

        * ``['InterProScan job failed', job_id, response_text]`` when the
          submission is rejected or no result is available after polling.
        * ``['Domains found.', domains_df]`` where ``domains_df`` has the
          columns ``protein_name``, ``domain_accession``, ``domain_name`` and
          ``domain_locations`` (a list of ``'start-end'`` strings).
        * ``['No domains found.']`` when no DOMAIN entry is reported.
    """
    # send request to interproscan api
    submit_headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
        'Accept': 'text/plain',
    }
    payload = {
        'email': email,
        'stype': 'p',
        'sequence': f'{sequence}',
    }

    job_id_response = requests.post(
        'https://www.ebi.ac.uk/Tools/services/rest/iprscan5/run',
        headers=submit_headers,
        data=payload,
    )
    job_id = job_id_response.text

    # A rejected submission returns an error message instead of a job id;
    # polling a URL built from it could never succeed, so fail immediately.
    if not job_id_response.ok:
        return ['InterProScan job failed', job_id, job_id_response.text]

    # get results
    result_headers = {
        'Accept': 'application/json',
    }
    job_result_url = f'https://www.ebi.ac.uk/Tools/services/rest/iprscan5/result/{job_id}/json'

    max_attempts = 10
    json_output = None
    with requests.Session() as s:
        # try up to max_attempts times, waiting a minute between attempts
        for attempt in range(max_attempts):
            job_result_response = s.get(job_result_url, headers=result_headers)
            if job_result_response.status_code == 200:
                json_output = job_result_response.json()['results'][0]
                print('InterProScan job done')
                break
            # No point in sleeping after the final attempt.
            if attempt < max_attempts - 1:
                time.sleep(60)

    if json_output is None:
        return ['InterProScan job failed', job_id, job_result_response.text]

    entries = {}
    for elem in json_output['matches']:
        entry = elem['signature']['entry']

        # Only matches integrated into an entry of type DOMAIN are kept.
        if not isinstance(entry, dict) or entry['type'] != 'DOMAIN':
            continue

        location_list = [f"{i['start']}-{i['end']}" for i in elem['locations']]

        # The first match seen for an accession decides the stored name.
        record = entries.setdefault(
            entry['accession'],
            {'name': entry['name'], 'locations': []},
        )

        # De-duplicate, then merge overlapping spans and order them numerically.
        locations = list(set(record['locations'] + location_list))
        if len(locations) > 1:
            locations = sorted(
                merge_locations(locations),
                key=lambda loc: tuple(int(bound) for bound in loc.split('-')),
            )
        record['locations'] = locations

    if not entries:
        return ['No domains found.']

    # create domains dataframe
    domains_df = pd.DataFrame.from_dict(entries, orient='index').reset_index()
    domains_df['protein_name'] = name
    domains_df = domains_df[['protein_name', 'index', 'name', 'locations']]
    domains_df.columns = ['protein_name', 'domain_accession', 'domain_name', 'domain_locations']
    return ['Domains found.', domains_df]
                        
    # generate protein function predictions based on domain2go mappings


def merge_locations(locations):
    """Merge overlapping ``'start-end'`` location strings.

    Bounds are compared as integers and treated as inclusive coordinates, so
    two spans that share at least one position (e.g. ``'1-5'`` and ``'5-10'``)
    are merged, while spans that merely sit next to each other (``'1-5'`` and
    ``'6-10'``) are kept apart. Single-position spans such as ``'7-7'`` are
    accepted.

    Args:
        locations: Iterable of ``'start-end'`` strings with integer bounds.

    Returns:
        A list of merged ``'start-end'`` strings in ascending order.
    """
    # Convert to integers first: comparing the raw strings would order
    # '10' before '5' and produce wrong (or invalid) intervals.
    spans = sorted(
        (int(start), int(end))
        for start, end in (loc.split('-') for loc in locations)
    )

    merged = []
    for start, end in spans:
        if merged and start <= merged[-1][1]:
            # Overlaps the span being built; extend it if this one reaches further.
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])

    return [f'{start}-{end}' for start, end in merged]
    
def generate_function_predictions(domains_df, mapping_path):
    """Map the domains found in a protein to GO terms via Domain2GO mappings.

    Reads ``finalized_domain2go_mappings.txt`` from ``mapping_path`` as CSV and
    inner-joins it with ``domains_df`` on ``domain_accession`` == ``Interpro``.

    Args:
        domains_df: DataFrame with the columns ``protein_name``,
            ``domain_accession``, ``domain_name`` and ``domain_locations``.
        mapping_path: Directory containing ``finalized_domain2go_mappings.txt``.
            The file must provide the columns ``Interpro``, ``GO``,
            ``GO_name``, ``GO_aspect`` and ``s``.

    Returns:
        ``['No function predictions found.']`` when no domain has a mapping,
        otherwise ``['Function predictions found.', predictions_df]`` where
        ``predictions_df`` has the columns ``protein_name``, ``GO_ID``,
        ``GO_term``, ``GO_category``, ``sequence_region``, ``probability``,
        ``domain_accession`` and ``domain_name``.
    """
    # read domain2go mappings
    mapping_file = os.path.join(mapping_path, 'finalized_domain2go_mappings.txt')
    domain2go_df = pd.read_csv(mapping_file)
    print('Domain2GO mappings loaded')

    # merge domain2go mappings with domains found in protein sequence
    merged_df = pd.merge(domains_df, domain2go_df, left_on='domain_accession', right_on='Interpro')

    # nothing mapped: report that instead of claiming predictions were generated
    if merged_df.empty:
        return ['No function predictions found.']

    print('Function predictions generated.')

    # Set the column explicitly so it exists under this exact name even if the
    # merge had to suffix a clashing column from the mapping file.
    merged_df['protein_name'] = domains_df['protein_name'].iloc[0]
    merged_df = merged_df[['protein_name', 'GO', 'GO_name', 'GO_aspect', 'domain_locations', 's', 'domain_accession', 'domain_name']]
    merged_df.columns = ['protein_name', 'GO_ID', 'GO_term', 'GO_category', 'sequence_region', 'probability', 'domain_accession', 'domain_name']

    return ['Function predictions found.', merged_df]