|
|
|
""" |
|
Created on Tue Aug 29 10:34:38 2023 |
|
|
|
@author: DELL |
|
""" |
|
|
|
|
|
import base64 |
|
import numpy as np |
|
import pandas as pd |
|
from rdkit import Chem |
|
from rdkit.Chem import DataStructs, AllChem |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
|
|
import matchms.filtering as msfilters |
|
from molmass import Formula |
|
from matchms import calculate_scores |
|
from matchms.similarity import CosineGreedy |
|
from spec2vec import SpectrumDocument |
|
from spec2vec.vector_operations import calc_vector |
|
|
|
from core.pycdk import IsotopeFromString, IsotopeSimilarity |
|
from core.pubchem import retrieve_by_formula, retrieve_by_exact_mass |
|
from core.pubchem import retrieve_by_formula_database, retrieve_by_exact_mass_database |
|
|
|
|
|
def spectrum_processing(s): |
|
"""This is how one would typically design a desired pre- and post- |
|
processing pipeline.""" |
|
s = msfilters.default_filters(s) |
|
if ('adduct_type' in s.metadata.keys()) and ('adduct' not in s.metadata.keys()): |
|
s.set('adduct', s.get('adduct_type')) |
|
s = msfilters.correct_charge(s) |
|
s = msfilters.add_parent_mass(s) |
|
s = msfilters.add_losses(s) |
|
s = msfilters.normalize_intensities(s) |
|
s = msfilters.select_by_mz(s, mz_from=0, mz_to=1000) |
|
return s |
|
|
|
|
|
def get_formula_mass(formula): |
|
formula = formula.replace('+','').replace('-','') |
|
f = Formula(formula) |
|
return f.isotope.mass |
|
|
|
|
|
def search_candidates(s, database, ms1_tolerence = 20): |
|
if 'formula' in s.metadata.keys(): |
|
formula = s.metadata['formula'] |
|
candidate = retrieve_by_formula_database(formula, database) |
|
return candidate |
|
if len(candidate) == 0: |
|
try: |
|
candidate = retrieve_by_formula(formula) |
|
return candidate |
|
except: |
|
return None |
|
|
|
elif 'parent_mass' in s.metadata.keys(): |
|
mass = float(s.metadata['parent_mass']) |
|
candidate = retrieve_by_exact_mass_database(mass, database, ppm = ms1_tolerence) |
|
return candidate |
|
if len(candidate) == 0: |
|
try: |
|
candidate = retrieve_by_exact_mass(mass) |
|
return candidate |
|
except: |
|
return None |
|
else: |
|
return None |
|
|
|
|
|
def calc_isotope_score(s, candidate_mol): |
|
isotope_score = [] |
|
if s.get('isotope_mz') and s.get('isotope_intensity'): |
|
isotope_mz = base64.b64decode(s.get('isotope_mz').split("'")[1]).decode("ascii").replace('\n', '') |
|
isotope_intensity = base64.b64decode(s.get('isotope_intensity').split("'")[1]).decode("ascii").replace('\n', '') |
|
isotope_mz = [float(s) for s in isotope_mz.replace('[', '').replace(']', '').split(' ') if s != ''] |
|
isotope_intensity = [float(s) for s in isotope_intensity.replace('[', '').replace(']', '').split(' ') if s != ''] |
|
isotope_mz = np.array(isotope_mz) |
|
|
|
adduct_mz = isotope_mz[np.argmax(isotope_intensity)] - s.metadata['parent_mass'] |
|
isotope_mz = isotope_mz - adduct_mz |
|
|
|
isotope_intensity = np.array(isotope_intensity) |
|
isotope_intensity = isotope_intensity / max(isotope_intensity) |
|
isotope_pattern = np.vstack((isotope_mz, isotope_intensity)).T |
|
else: |
|
return None |
|
for i in range(len(candidate_mol)): |
|
try: |
|
formula = AllChem.CalcMolFormula(candidate_mol[i]) |
|
isotope_ref = IsotopeFromString(formula, minI=0.001) |
|
except: |
|
isotope_score.append(0) |
|
isotope_score.append(IsotopeSimilarity(isotope_pattern, isotope_ref, 10)) |
|
return isotope_score |
|
|
|
|
|
def calc_deepmass_score(s, candidate_mol, reference_mol, query_vector, reference_vector): |
|
get_fp = lambda x: AllChem.GetMorganFingerprintAsBitVect(x, radius=2) |
|
get_sim = lambda x, y: DataStructs.FingerprintSimilarity(x, y) |
|
get_corr = lambda x, y: cosine_similarity([x], [y])[0][0] |
|
|
|
k, reference_fp = [], [] |
|
for i, m in enumerate(reference_mol): |
|
try: |
|
reference_fp.append(get_fp(m)) |
|
k.append(i) |
|
except: |
|
pass |
|
if len(k) != len(reference_mol): |
|
k = np.array(k) |
|
reference_mol = np.array(reference_mol)[k] |
|
reference_vector = np.array(reference_vector)[k,:] |
|
|
|
deepmass_score = [] |
|
for i in range(len(candidate_mol)): |
|
try: |
|
candidate_fp_i = get_fp(candidate_mol[i]) |
|
except: |
|
deepmass_score.append(0) |
|
candidate_vecsim_i = [get_corr(query_vector, reference_vector_) for reference_vector_ in reference_vector] |
|
candidate_vecsim_i = np.array(candidate_vecsim_i) |
|
candidate_fpsim_i = [get_sim(candidate_fp_i, reference_fp_) for reference_fp_ in reference_fp] |
|
candidate_fpsim_i = np.array(candidate_fpsim_i) |
|
top20 = np.argsort(-np.array(candidate_fpsim_i))[:20] |
|
candidate_score_i = np.sqrt(np.sum(candidate_vecsim_i[top20] * candidate_fpsim_i[top20])) |
|
deepmass_score.append(candidate_score_i / 20) |
|
deepmass_score = np.array(deepmass_score) |
|
deepmass_score /= np.max(deepmass_score) |
|
return deepmass_score |
|
|
|
|
|
def calc_wt_score(s, candidate_mol): |
|
candidate_mass = [AllChem.CalcExactMolWt(m) for m in candidate_mol] |
|
diff_mass = np.array([abs(m - float(s.get['parent_mass'])) for m in candidate_mass]) |
|
wt_score = 1 - 50000 * diff_mass / float(s.get['parent_mass']) |
|
return wt_score |
|
|
|
|
|
def identify_unknown(s, p, model, references, database): |
|
candidate = search_candidates(s, database) |
|
if candidate is None: |
|
return s |
|
if len(candidate) == 0: |
|
return s |
|
candidate_mol = [Chem.MolFromSmiles(s) for s in candidate['CanonicalSMILES']] |
|
query_vector = calc_vector(model, SpectrumDocument(s, n_decimals=2)) |
|
|
|
xq = np.array(query_vector).astype('float32') |
|
I, D = p.knn_query(xq, 300) |
|
|
|
reference_spectrum = np.array(references)[I[0,:]] |
|
reference_smile = [s.metadata['smiles'] for s in reference_spectrum] |
|
reference_mol = [Chem.MolFromSmiles(s) for s in reference_smile] |
|
reference_vector = np.array(p.get_items(I[0, :])) |
|
|
|
candidate_deepmass_score = calc_deepmass_score(s, candidate_mol, reference_mol, query_vector, reference_vector) |
|
candidate['DeepMass Score'] = np.round(candidate_deepmass_score, 4) |
|
|
|
if s.get('formula') is None: |
|
candidate_wt_score = calc_wt_score(s, candidate_mol) |
|
candidate['MolWt Score'] = np.round(candidate_wt_score, 4) |
|
if s.get('isotope_mz') and s.get('isotope_intensity'): |
|
candidate_isotopic_score = calc_isotope_score(s, candidate_mol) |
|
candidate['Isotope Score'] = np.round(candidate_isotopic_score, 4) |
|
candidate['Consensus Score'] = 0.8*candidate['DeepMass Score'] + 0.1*candidate['Isotope Score'] + 0.1*candidate['MolWt Score'] |
|
else: |
|
candidate['Consensus Score'] = 0.8*candidate['DeepMass Score'] + 0.2*candidate['MolWt Score'] |
|
candidate = candidate.sort_values('Consensus Score', ignore_index = True, ascending = False) |
|
else: |
|
candidate = candidate.sort_values('DeepMass Score', ignore_index = True, ascending = False) |
|
|
|
s.set('annotation', candidate) |
|
s.set('reference', reference_spectrum) |
|
return s |
|
|
|
|
|
def match_spectrum(s, precursors, references): |
|
precursor = s.get('precursor_mz') |
|
if precursor is None: |
|
return s |
|
lb, ub = precursor - 0.05, precursor + 0.05 |
|
li = np.searchsorted(precursors, lb) |
|
ui = np.searchsorted(precursors, ub) |
|
if ui <= li: |
|
return s |
|
match_scores = calculate_scores(references = references[li:ui], queries = [s], similarity_function = CosineGreedy()) |
|
|
|
match_scores = np.array([s[0].tolist()[0] for s in match_scores.scores]) |
|
w = np.argsort(-match_scores) |
|
match_scores = match_scores[w] |
|
reference = np.array(references)[li:ui][w] |
|
|
|
annotation, inchikeys = [], [] |
|
for i, r in enumerate(reference): |
|
mol = Chem.MolFromSmiles(r.get('smiles')) |
|
score = match_scores[i] |
|
if mol is None: |
|
continue |
|
inchikey = r.get('inchikey') |
|
if inchikey == '': |
|
continue |
|
title = r.get('compound_name') |
|
smiles = Chem.MolToSmiles(mol) |
|
try: |
|
formula = AllChem.CalcMolFormula(mol) |
|
except: |
|
formula = '' |
|
if inchikey not in inchikeys: |
|
inchikeys.append(inchikey) |
|
annotation.append([title, formula, smiles, inchikey, score]) |
|
annotation = pd.DataFrame(annotation, columns = ['Title', 'MolecularFormula', 'CanonicalSMILES', 'InChIKey', 'Matching Score']) |
|
|
|
if s.get('formula') is not None: |
|
annotation = annotation[annotation['MolecularFormula'] == s.get('formula')] |
|
annotation = annotation.reset_index(drop = True) |
|
s.set('annotation', annotation) |
|
s.set('reference', reference) |
|
return s |
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
''' |
|
import hnswlib |
|
import pickle |
|
import pandas as pd |
|
from matchms.importing import load_from_mgf |
|
from gensim.models import Word2Vec |
|
|
|
model = Word2Vec.load("model/Ms2Vec_allGNPSpositive.hdf5") |
|
p = hnswlib.Index(space='l2', dim=300) |
|
p.load_index('data/references_index_positive_spec2vec.bin') |
|
with open('data/references_spectrums_positive.pickle', 'rb') as file: |
|
references = pickle.load(file) |
|
references = np.array(references) |
|
precursors = [s.get('precursor_mz') for s in references] |
|
precursors = np.array(precursors) |
|
|
|
spectrums = [s for s in load_from_mgf("D:/DeepMASS2_Data_Processing/Example/CASMI/all_casmi.mgf")] |
|
s = spectrums[200] |
|
s = identify_unknown(s, p, model, reference, database) |
|
''' |
|
|