# -*- coding: utf-8 -*- """ Created on Tue Aug 29 10:34:38 2023 @author: DELL """ import base64 import numpy as np import pandas as pd from rdkit import Chem from rdkit.Chem import DataStructs, AllChem from sklearn.metrics.pairwise import cosine_similarity import matchms.filtering as msfilters from molmass import Formula from matchms import calculate_scores from matchms.similarity import CosineGreedy from spec2vec import SpectrumDocument from spec2vec.vector_operations import calc_vector from core.pycdk import IsotopeFromString, IsotopeSimilarity from core.pubchem import retrieve_by_formula, retrieve_by_exact_mass from core.pubchem import retrieve_by_formula_database, retrieve_by_exact_mass_database def spectrum_processing(s): """This is how one would typically design a desired pre- and post- processing pipeline.""" s = msfilters.default_filters(s) if ('adduct_type' in s.metadata.keys()) and ('adduct' not in s.metadata.keys()): s.set('adduct', s.get('adduct_type')) s = msfilters.correct_charge(s) s = msfilters.add_parent_mass(s) s = msfilters.add_losses(s) s = msfilters.normalize_intensities(s) s = msfilters.select_by_mz(s, mz_from=0, mz_to=1000) return s def get_formula_mass(formula): formula = formula.replace('+','').replace('-','') f = Formula(formula) return f.isotope.mass def search_candidates(s, database, ms1_tolerence = 20): if 'formula' in s.metadata.keys(): formula = s.metadata['formula'] candidate = retrieve_by_formula_database(formula, database) return candidate if len(candidate) == 0: try: candidate = retrieve_by_formula(formula) return candidate except: return None elif 'parent_mass' in s.metadata.keys(): mass = float(s.metadata['parent_mass']) candidate = retrieve_by_exact_mass_database(mass, database, ppm = ms1_tolerence) return candidate if len(candidate) == 0: try: candidate = retrieve_by_exact_mass(mass) return candidate except: return None else: return None def calc_isotope_score(s, candidate_mol): isotope_score = [] if s.get('isotope_mz') and s.get('isotope_intensity'): isotope_mz = base64.b64decode(s.get('isotope_mz').split("'")[1]).decode("ascii").replace('\n', '') isotope_intensity = base64.b64decode(s.get('isotope_intensity').split("'")[1]).decode("ascii").replace('\n', '') isotope_mz = [float(s) for s in isotope_mz.replace('[', '').replace(']', '').split(' ') if s != ''] isotope_intensity = [float(s) for s in isotope_intensity.replace('[', '').replace(']', '').split(' ') if s != ''] isotope_mz = np.array(isotope_mz) adduct_mz = isotope_mz[np.argmax(isotope_intensity)] - s.metadata['parent_mass'] isotope_mz = isotope_mz - adduct_mz isotope_intensity = np.array(isotope_intensity) isotope_intensity = isotope_intensity / max(isotope_intensity) isotope_pattern = np.vstack((isotope_mz, isotope_intensity)).T else: return None for i in range(len(candidate_mol)): try: formula = AllChem.CalcMolFormula(candidate_mol[i]) isotope_ref = IsotopeFromString(formula, minI=0.001) except: isotope_score.append(0) isotope_score.append(IsotopeSimilarity(isotope_pattern, isotope_ref, 10)) return isotope_score def calc_deepmass_score(s, candidate_mol, reference_mol, query_vector, reference_vector): get_fp = lambda x: AllChem.GetMorganFingerprintAsBitVect(x, radius=2) get_sim = lambda x, y: DataStructs.FingerprintSimilarity(x, y) get_corr = lambda x, y: cosine_similarity([x], [y])[0][0] k, reference_fp = [], [] for i, m in enumerate(reference_mol): try: reference_fp.append(get_fp(m)) k.append(i) except: pass if len(k) != len(reference_mol): k = np.array(k) reference_mol = np.array(reference_mol)[k] reference_vector = np.array(reference_vector)[k,:] deepmass_score = [] for i in range(len(candidate_mol)): try: candidate_fp_i = get_fp(candidate_mol[i]) except: deepmass_score.append(0) candidate_vecsim_i = [get_corr(query_vector, reference_vector_) for reference_vector_ in reference_vector] candidate_vecsim_i = np.array(candidate_vecsim_i) candidate_fpsim_i = [get_sim(candidate_fp_i, reference_fp_) for reference_fp_ in reference_fp] candidate_fpsim_i = np.array(candidate_fpsim_i) top20 = np.argsort(-np.array(candidate_fpsim_i))[:20] candidate_score_i = np.sqrt(np.sum(candidate_vecsim_i[top20] * candidate_fpsim_i[top20])) deepmass_score.append(candidate_score_i / 20) deepmass_score = np.array(deepmass_score) deepmass_score /= np.max(deepmass_score) return deepmass_score def calc_wt_score(s, candidate_mol): candidate_mass = [AllChem.CalcExactMolWt(m) for m in candidate_mol] diff_mass = np.array([abs(m - float(s.get['parent_mass'])) for m in candidate_mass]) wt_score = 1 - 50000 * diff_mass / float(s.get['parent_mass']) return wt_score def identify_unknown(s, p, model, references, database): candidate = search_candidates(s, database) if candidate is None: return s if len(candidate) == 0: return s candidate_mol = [Chem.MolFromSmiles(s) for s in candidate['CanonicalSMILES']] query_vector = calc_vector(model, SpectrumDocument(s, n_decimals=2)) xq = np.array(query_vector).astype('float32') I, D = p.knn_query(xq, 300) reference_spectrum = np.array(references)[I[0,:]] reference_smile = [s.metadata['smiles'] for s in reference_spectrum] reference_mol = [Chem.MolFromSmiles(s) for s in reference_smile] reference_vector = np.array(p.get_items(I[0, :])) candidate_deepmass_score = calc_deepmass_score(s, candidate_mol, reference_mol, query_vector, reference_vector) candidate['DeepMass Score'] = np.round(candidate_deepmass_score, 4) if s.get('formula') is None: candidate_wt_score = calc_wt_score(s, candidate_mol) candidate['MolWt Score'] = np.round(candidate_wt_score, 4) if s.get('isotope_mz') and s.get('isotope_intensity'): candidate_isotopic_score = calc_isotope_score(s, candidate_mol) candidate['Isotope Score'] = np.round(candidate_isotopic_score, 4) candidate['Consensus Score'] = 0.8*candidate['DeepMass Score'] + 0.1*candidate['Isotope Score'] + 0.1*candidate['MolWt Score'] else: candidate['Consensus Score'] = 0.8*candidate['DeepMass Score'] + 0.2*candidate['MolWt Score'] candidate = candidate.sort_values('Consensus Score', ignore_index = True, ascending = False) else: candidate = candidate.sort_values('DeepMass Score', ignore_index = True, ascending = False) s.set('annotation', candidate) s.set('reference', reference_spectrum) return s def match_spectrum(s, precursors, references): precursor = s.get('precursor_mz') if precursor is None: return s lb, ub = precursor - 0.05, precursor + 0.05 li = np.searchsorted(precursors, lb) ui = np.searchsorted(precursors, ub) if ui <= li: return s match_scores = calculate_scores(references = references[li:ui], queries = [s], similarity_function = CosineGreedy()) # print(match_scores.scores) match_scores = np.array([s[0].tolist()[0] for s in match_scores.scores]) w = np.argsort(-match_scores) match_scores = match_scores[w] reference = np.array(references)[li:ui][w] annotation, inchikeys = [], [] for i, r in enumerate(reference): mol = Chem.MolFromSmiles(r.get('smiles')) score = match_scores[i] if mol is None: continue inchikey = r.get('inchikey') if inchikey == '': continue title = r.get('compound_name') smiles = Chem.MolToSmiles(mol) try: formula = AllChem.CalcMolFormula(mol) except: formula = '' if inchikey not in inchikeys: inchikeys.append(inchikey) annotation.append([title, formula, smiles, inchikey, score]) annotation = pd.DataFrame(annotation, columns = ['Title', 'MolecularFormula', 'CanonicalSMILES', 'InChIKey', 'Matching Score']) if s.get('formula') is not None: annotation = annotation[annotation['MolecularFormula'] == s.get('formula')] annotation = annotation.reset_index(drop = True) s.set('annotation', annotation) s.set('reference', reference) return s if __name__ == '__main__': ''' import hnswlib import pickle import pandas as pd from matchms.importing import load_from_mgf from gensim.models import Word2Vec model = Word2Vec.load("model/Ms2Vec_allGNPSpositive.hdf5") p = hnswlib.Index(space='l2', dim=300) p.load_index('data/references_index_positive_spec2vec.bin') with open('data/references_spectrums_positive.pickle', 'rb') as file: references = pickle.load(file) references = np.array(references) precursors = [s.get('precursor_mz') for s in references] precursors = np.array(precursors) spectrums = [s for s in load_from_mgf("D:/DeepMASS2_Data_Processing/Example/CASMI/all_casmi.mgf")] s = spectrums[200] s = identify_unknown(s, p, model, reference, database) '''