import ast
import re
import time
import json
import math
import zlib
from io import StringIO
from pathlib import Path
from xml.etree import ElementTree
from urllib.parse import urlparse, parse_qs, urlencode

import requests
from requests.adapters import HTTPAdapter, Retry
import unipressed
from unipressed import IdMappingClient
import Bio
from Bio import SeqIO
from Bio.PDB import *
import pandas as pd
import numpy as np

from utils import *
from add_sasa import *
UNIPROT_ANNOTATION_COLS = [
    'disulfide', 'intMet', 'intramembrane', 'naturalVariant', 'dnaBinding', 'activeSite',
    'nucleotideBinding', 'lipidation', 'site', 'transmembrane', 'crosslink', 'mutagenesis',
    'strand', 'helix', 'turn', 'metalBinding', 'repeat', 'topologicalDomain', 'caBinding',
    'bindingSite', 'region', 'signalPeptide', 'modifiedResidue', 'zincFinger', 'motif',
    'coiledCoil', 'peptide', 'transitPeptide', 'glycosylation', 'propeptide',
    'disulfideBinary', 'intMetBinary', 'intramembraneBinary', 'naturalVariantBinary',
    'dnaBindingBinary', 'activeSiteBinary', 'nucleotideBindingBinary', 'lipidationBinary',
    'siteBinary', 'transmembraneBinary', 'crosslinkBinary', 'mutagenesisBinary',
    'strandBinary', 'helixBinary', 'turnBinary', 'metalBindingBinary', 'repeatBinary',
    'topologicalDomainBinary', 'caBindingBinary', 'bindingSiteBinary', 'regionBinary',
    'signalPeptideBinary', 'modifiedResidueBinary', 'zincFingerBinary', 'motifBinary',
    'coiledCoilBinary', 'peptideBinary', 'transitPeptideBinary', 'glycosylationBinary',
    'propeptideBinary']
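# The first 30 names are positional annotation columns (lists of residue positions,
# later overwritten with minimum distances by match3D/match3DModels); the remaining
# '*Binary' names mirror them as hit/miss flags. Illustrative split:
#   UNIPROT_ANNOTATION_COLS[:30]  -> 'disulfide' ... 'propeptide'
#   UNIPROT_ANNOTATION_COLS[30:]  -> 'disulfideBinary' ... 'propeptideBinary'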
def get_pdb_ids(protein_id):
    """Map a UniProt accession to its PDB entries via the UniProt ID-mapping service."""
    try:
        request = IdMappingClient.submit(
            source="UniProtKB_AC-ID", dest="PDB", ids={protein_id})
        return [i['to'] for i in request.each_result()]
    except requests.exceptions.HTTPError:
        return []
    except unipressed.id_mapping.core.IdMappingError:
        print('IdMappingError caused by UniProt API service, please try later.')
        return []
    except KeyError:
        return []
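# Usage sketch (live UniProt call; the accession below is an arbitrary example):
#   >>> get_pdb_ids('P04637')
#   ['1A1U', '1AIE', ...]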
def fix_filename(filename):
    """Normalize Biopython's 'pdbXXXX.ent' style download names to 'XXXX.pdb'."""
    try:
        filename = Path(filename)
        if filename.suffix == '.pdb':
            return filename
        if filename.stem.startswith("pdb") or filename.stem.endswith("ent"):
            # Drop the 'pdb' prefix, e.g. 'pdb1abc.ent' -> '1abc.pdb'
            new_name = filename.with_name(filename.stem[3:]).with_suffix('.pdb')
        else:
            new_name = filename.with_suffix('.pdb')
        filename.rename(new_name)
        return new_name
    except FileNotFoundError:
        return None
def fetch_uniprot_ids(pdb_code):
    """Return the UniProt accessions mapped to a PDB entry by the PDBe SIFTS service."""
    response = requests.get(f"https://www.ebi.ac.uk/pdbe/api/mappings/uniprot/{pdb_code}")
    response.raise_for_status()
    resp = response.json()
    # Response shape: {pdb_code: {'UniProt': {accession: {...}, ...}}} -- the chained
    # .values()/.keys() calls peel off the PDB id and the 'UniProt' level.
    return list(list(list(resp.values())[0].values())[0].keys())
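# Usage sketch (live PDBe call; entry id and output are illustrative):
#   >>> fetch_uniprot_ids('1btl')
#   ['P62593']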
def addPDBinfo(data, path_to_output_files):
    """Collect PDB chains (with UniProt cross-references) for every query accession."""
    pdb_info = pd.DataFrame(columns=['uniprotID', 'pdbID', 'chain', 'resolution'])
    print('Retrieving PDB structures...\n')
    up_list = data.uniprotID.to_list()
    pdbs = [get_pdb_ids(i) for i in up_list]
    pdbs = [item for sublist in pdbs for item in sublist]
    pdbs = {i.lower() for i in filter(None, pdbs)}
    if not pdbs:
        print('No PDB structure found for the query. ')
    print('\n>>Starting PDB structures download...\n')
    print('\n>>Processing PDB structures...\n')
    index = 0
    for search in pdbs:
        print(f'Searching for {search.upper()}')
        try:
            pdb_url = f"https://files.rcsb.org/download/{search}.pdb"
            response = requests.get(pdb_url)
            response.raise_for_status()  # Check for a successful response
            pdb_data = response.text
            pdb_parser = PDBParser(QUIET=True)  # QUIET=True suppresses warnings
            structure = pdb_parser.get_structure(search, StringIO(pdb_data))
            header = structure.header
            # Keep only DBREF records that cross-reference UniProt ('UNP').
            dbref_lines = [line.split() for line in pdb_data.split('\n')
                           if line.startswith('DBREF') and 'UNP' in line.split()]
            for unp in dbref_lines:
                # DBREF tokens: [1]=pdbID, [2]=chain, [5]=database, [6]=accession,
                # [8]/[9]=start/end of the UniProt segment.
                if (unp[5] == 'UNP') and (unp[6].split('-')[0] in up_list):
                    pdb_info.at[index, 'uniprotID'] = unp[6].split('-')[0]
                    pdb_info.at[index, 'pdbID'] = unp[1].upper()
                    pdb_info.at[index, 'chain'] = unp[2].upper()
                    pdb_info.at[index, 'resolution'] = header.get('resolution', 'N/A')
                    pdb_info.at[index, 'start'] = unp[8]
                    pdb_info.at[index, 'end'] = unp[9]
                    index += 1
        except Exception:
            continue
    pdb_info.replace({'None': np.nan}, inplace=True)
    print('PDB file processing finished..')
    return pdb_info
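# Minimal usage sketch ('uniprotID' is the only input column addPDBinfo reads;
# the accession and output path are arbitrary examples):
#   query = pd.DataFrame({'uniprotID': ['P04637']})
#   pdb_info = addPDBinfo(query, Path('out'))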
def downloadPDB(pdbID, path_to_output_files):
    """Download a PDB structure (if not cached) and ensure a FreeSASA output exists for it."""
    pdb_dir = Path(path_to_output_files) / 'pdb_structures'
    existing_pdb = [p.name.split('.')[0].lower() for p in pdb_dir.glob('*')]
    file = pdb_dir / f'{pdbID}.pdb'
    if pdbID not in existing_pdb:
        pdbl = PDBList()
        downloaded = pdbl.retrieve_pdb_file(pdbID, pdir=pdb_dir, file_format="pdb")
        fix_filename(downloaded)  # rename Biopython's 'pdbXXXX.ent' to 'XXXX.pdb'
    else:
        print(f'PDB file for {pdbID.upper()} exists..')
    sasa_dir = Path(path_to_output_files) / 'freesasa_files'
    existing_free_sasa = [p.name.split('.')[0] for p in sasa_dir.glob('*')]
    if pdbID not in existing_free_sasa:
        run_freesasa(file, sasa_dir / f'{pdbID}.txt', include_hetatms=True,
                     outdir=None, force_rerun=False, file_type='pdb')
    return file
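# Usage sketch (arbitrary entry id; assumes run_freesasa is provided by add_sasa):
#   pdb_path = downloadPDB('1btl', Path('out'))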
def processFile(data, path_to_output_files):
    """Download each row's structure, compute SASA, and store CA coordinates as JSON."""
    for i in data.index:
        protein = data.at[i, 'uniprotID']
        pdbID = data.at[i, 'pdbID'].lower()
        chain = data.at[i, 'chain']
        pos = int(data.at[i, 'pos'])
        wt = data.at[i, 'wt']
        url = f'https://files.rcsb.org/download/{pdbID}.pdb'
        response = requests.get(url)
        if response.status_code == 200:
            with open(f'{path_to_output_files}/pdb_structures/{pdbID}.pdb', 'w') as f:
                f.write(response.text)
            print(f"Downloaded {pdbID}.pdb successfully.")
        else:
            print(f"Failed to download {pdbID}.pdb. Status code: {response.status_code}")
        file = Path(path_to_output_files) / 'pdb_structures' / f'{pdbID}.pdb'
        run_freesasa(file, Path(path_to_output_files) / 'freesasa_files' / f'{pdbID}.txt',
                     include_hetatms=True, outdir=None, force_rerun=False, file_type='pdb')
        filename = Path(path_to_output_files) / 'freesasa_files' / f'{pdbID}.txt'
        data.loc[i, 'sasa'] = sasa(protein, pos, wt, 1, filename, path_to_output_files, file_type='pdb')
        newCol = {}
        with open(file, encoding="utf8") as f:
            for line in f:
                # CA atoms of the target chain (or chainless ATOM records). PDB fixed
                # columns (1-based): x=31-38, y=39-46, z=47-54, resSeq=23-26.
                if line[0:4].strip() == 'ATOM' and line[13:15].strip() == 'CA' and \
                        (line[21].upper() == chain.upper() or line[21] == ' '):
                    coords = [line[30:38].strip(), line[38:46].strip(), line[46:54].strip()]
                    resnums_for_sasa = line[22:26].strip()
                    newCol[resnums_for_sasa] = coords
        data.at[i, 'coordinates'] = json.dumps(newCol)
    return data
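# Illustrative ATOM record and the slices used above (fixed-column PDB format):
#   'ATOM      2  CA  MET A   1      38.281   9.009  53.619  1.00 72.18           C'
#   line[13:15] -> 'CA', line[21] -> 'A', line[22:26] -> '   1',
#   line[30:38] / line[38:46] / line[46:54] -> x / y / z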
def distance(x1, y1, z1, x2, y2, z2):
    """Euclidean distance between two 3D points."""
    return math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2 + (z2 - z1) ** 2)
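# Worked example: distance(0, 0, 0, 1, 2, 2) == sqrt(1 + 4 + 4) == 3.0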
def find_distance(coordMut, coordAnnot):
    """Distance between two [x, y, z] triples, formatted to 2 dp; 'nan' on failure."""
    try:
        dist = distance(float(coordMut[0]), float(coordMut[1]), float(coordMut[2]),
                        float(coordAnnot[0]), float(coordAnnot[1]), float(coordAnnot[2]))
        return "%.2f" % dist
    except (TypeError, ValueError, IndexError):
        # Missing (NaN) or malformed coordinates end up here.
        return 'nan'
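# Examples: find_distance(['0', '0', '0'], ['3', '4', '0']) -> '5.00'
#           find_distance(np.nan, ['1', '2', '3'])          -> 'nan'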
def domainDistance(domStart, domEnd, coordinates, mutationPosition, matchList, posOnPDB):
    """Minimum distance between the mutated residue and any residue of the domain."""
    match_dict = ast.literal_eval(matchList)
    domainDistanceList = []
    for i in range(int(domStart), int(domEnd)):
        try:
            domainPos = match_dict[str(i)]
            coordMut = coordinates[str(posOnPDB)]
            coordDomain = coordinates[str(domainPos)]
            d = find_distance(coordMut, coordDomain)
            if d != 'nan':
                domainDistanceList.append(float(d))
        except KeyError:
            continue  # domain residue not resolved in the structure
    return min(domainDistanceList) if domainDistanceList else np.nan
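# Usage sketch (toy coordinates and UniProt->PDB position map, for illustration):
#   coords = {'10': ['0', '0', '0'], '11': ['3', '4', '0']}
#   domainDistance(11, 12, coords, 11, "{'11': '11'}", 10)  # -> 5.0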
def match3D(data):
    data.fillna(np.nan, inplace=True)
    for i in data.index:
        coordinates = ast.literal_eval(data.at[i, 'coordinates'])
        pos = str(data.at[i, 'pos'])
        matchList = data.at[i, 'MATCHDICT']
        try:
            posOnPDB = ast.literal_eval(matchList)[pos]
            coordMut = coordinates[str(posOnPDB)]
            if data.at[i, 'distance'] == -1000:  # sentinel: domain distance still unknown
                domStart = data.at[i, 'domStart']
                domEnd = data.at[i, 'domEnd']
                data.at[i, 'distance'] = domainDistance(domStart, domEnd, coordinates, pos,
                                                        matchList, posOnPDB)
        except KeyError:
            coordMut = np.nan
            data.at[i, 'distance'] = np.nan
        for col in UNIPROT_ANNOTATION_COLS[0:30]:
            allDist = []
            cell = data.at[i, col]
            if str(cell) not in ('nan', 'hit', '[]'):
                annotation_list = ast.literal_eval(cell)
                integer_list = [int(element) for element in annotation_list if element != 'null']
                for annotPosition in integer_list:
                    try:
                        coordAnnot = coordinates[str(annotPosition)]
                    except KeyError:
                        continue  # annotated residue not resolved in the structure
                    d = find_distance(coordMut, coordAnnot)
                    if d != 'nan':
                        allDist.append(float(d))
                if allDist:
                    data.at[i, col] = min(allDist)
    return data
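# match3D expects, per row: 'coordinates' (JSON/dict string from processFile), 'pos',
# 'MATCHDICT' (UniProt->PDB residue map), 'distance' (-1000 as the "not yet computed"
# sentinel), 'domStart'/'domEnd', and the 30 positional annotation columns, which it
# overwrites with minimum CA-CA distances.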
def domainDistanceModels(domStart, domEnd, coordinates, mutationPosition):
    """Minimum distance between the mutated residue and any domain residue (model structures)."""
    domainDistanceList = []
    for i in range(int(domStart), int(domEnd)):
        try:
            coordMut = coordinates[mutationPosition]
            coordDomain = coordinates[i]
            d = find_distance(coordMut, coordDomain)
            if d != 'nan':
                domainDistanceList.append(float(d))
        except KeyError:
            continue  # residue missing from the model
    return min(domainDistanceList) if domainDistanceList else np.nan
def match3DModels(data):
    data.fillna(np.nan, inplace=True)
    for i in data.index:
        pos = int(data.at[i, 'pos'])
        coords = data.at[i, 'coordinates']
        coordinates = coords if isinstance(coords, dict) else ast.literal_eval(coords)
        coordMut = coordinates[pos]
        if data.at[i, 'distance'] == -1000:  # sentinel: domain distance still unknown
            domStart = data.at[i, 'domStart']
            domEnd = data.at[i, 'domEnd']
            data.at[i, 'distance'] = domainDistanceModels(domStart, domEnd, coordinates, pos)
        for col in UNIPROT_ANNOTATION_COLS[0:30]:
            allDist = []
            cell = data.at[i, col]
            if str(cell) not in ('nan', 'hit', '[]'):
                annotation_list = ast.literal_eval(cell)
                integer_list = [int(element) for element in annotation_list]
                for annotPosition in integer_list:
                    try:
                        coordAnnot = coordinates[annotPosition]
                    except KeyError:
                        continue  # annotated residue missing from the model
                    d = find_distance(coordMut, coordAnnot)
                    if d != 'nan':
                        allDist.append(float(d))
                if allDist:
                    data.at[i, col] = min(allDist)
    return data
def selectMaxAnnot(data):
    """Per row, sum the annotation distances ('annotTotal') and count exact hits ('hitTotal')."""
    if len(data) > 0:
        for i in data.index:
            total = 0
            nanCounter = 0
            hitCounter = 0
            for col in UNIPROT_ANNOTATION_COLS[0:30]:
                cell = data.at[i, col]
                if (str(cell) != 'nan') and (cell != '[]') and (cell != 'hit') and (cell != ''):
                    total += float(cell)
                elif (str(cell) == 'nan') or (cell == '[]') or (cell == ''):
                    nanCounter += 1
                if cell == 'hit':
                    hitCounter += 1
            data.at[i, 'hitTotal'] = hitCounter if hitCounter > 0 else np.nan
            data.at[i, 'annotTotal'] = total if nanCounter != 30 else np.nan
    else:
        data['annotTotal'] = np.nan
    return data
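# Sketch of the aggregation on one row (toy values, for illustration):
#   disulfide='5.20', helix='hit', remaining 28 columns 'nan'
#   -> annotTotal = 5.2 (sum of numeric distances), hitTotal = 1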