# -*- coding: utf-8 -*-
"""
Created on Tue Oct 24 10:57:51 2023

@author: DELL
"""

import base64
import os

import numpy as np
import pandas as pd
from tqdm import tqdm
from matchms import Spectrum
from matchms.similarity import CosineGreedy
from rdkit import Chem, DataStructs
from rdkit.Chem import AllChem
from rdkit.Chem import rdFMCS

from core.identification import spectrum_processing


# Small offset that keeps the intensity normalisation finite.
_NORM_EPS = 10 ** -10
# Peaks below this fraction of the base peak are dropped by the loaders.
_MIN_RELATIVE_INTENSITY = 0.05
# With exclude_precursor, only peaks at least this far below the precursor
# m/z are kept.
_PRECURSOR_MARGIN = 0.1


def _read_ms_dial_table(filename):
    """Read an MS-DIAL export into a DataFrame.

    '.csv' files are read comma separated and '.txt' files tab separated
    (extension compared case-insensitively). Any other extension yields None.
    """
    ext = os.path.splitext(filename)[1].lower()
    if ext == '.csv':
        return pd.read_csv(filename)
    if ext == '.txt':
        # `sep` must be passed by keyword; recent pandas rejects it positionally.
        return pd.read_csv(filename, sep='\t')
    return None


def _parse_peak_string(text):
    """Parse a whitespace separated 'mz:intensity' string into two float arrays.

    Tokens without ':' (including the 'nan' text of an empty cell) are ignored,
    so a missing cell gives two empty arrays.
    """
    pairs = [tok.split(':') for tok in str(text).split() if ':' in tok]
    mz = np.array([float(p[0]) for p in pairs], dtype=float)
    intensity = np.array([float(p[1]) for p in pairs], dtype=float)
    return mz, intensity


def _clean_msms(mz, intensity, precursor_mz, exclude_precursor):
    """Drop zero-intensity peaks, normalise to the base peak and drop weak peaks.

    When exclude_precursor is set, peaks above precursor_mz - 0.1 are removed
    as well. Returns possibly empty (mz, intensity) arrays.
    """
    keep = intensity > 0
    if exclude_precursor:
        keep &= mz <= precursor_mz - _PRECURSOR_MARGIN
    mz, intensity = mz[keep], intensity[keep]
    if intensity.size == 0:
        # Nothing left to normalise; np.max would raise on an empty array.
        return mz, intensity
    intensity = intensity / (np.max(intensity) + _NORM_EPS)
    strong = intensity >= _MIN_RELATIVE_INTENSITY
    return mz[strong], intensity[strong]


def _encode_array(values):
    """Serialise a numpy array as base64 of its ASCII str() form."""
    return base64.b64encode(str(values).encode("ascii"))


def _decode_array(encoded):
    """Inverse of _encode_array: return the stored numbers as a list of floats.

    A missing (None) value decodes to an empty list.
    """
    if encoded is None:
        return []
    text = base64.b64decode(encoded).decode("ascii")
    text = text.replace('[', ' ').replace(']', ' ')
    return [float(tok) for tok in text.split()]


def _export_name(s, position):
    """Name used for export files: compound_name, else peak_index, else position."""
    name = s.get('compound_name') or s.get('peak_index')
    if name is None:
        name = 'spectrum_{}'.format(position)
    return str(name)


def load_MS_DIAL_Peaklist(filename, exclude_precursor = False):
    """
    Load a peak list exported by MS-DIAL and convert it into a list of
    matchms::spectrum objects.

    Rows without an MS/MS spectrum, or with at most one peak left after
    filtering, are skipped. Each spectrum is passed through
    spectrum_processing before being returned.

    Arguments:
        filename: str, the path of the MS-DIAL export ('.csv' or '.txt').
        exclude_precursor: bool, remove peaks above precursor m/z - 0.1.
    Returns:
        List of matchms::spectrum, or None for an unsupported file extension.
    """
    data = _read_ms_dial_table(filename)
    if data is None:
        return None

    output = []
    for i in tqdm(data.index):
        precursor_mz = float(data.loc[i, 'Precursor m/z'])
        mz, intensity = _parse_peak_string(data.loc[i, 'MSMS spectrum'])
        mz, intensity = _clean_msms(mz, intensity, precursor_mz, exclude_precursor)
        if len(mz) <= 1:
            continue

        rt = float(data.loc[i, 'RT (min)'])
        index = 'Peak_' + str(data.loc[i, 'PeakID'])
        smiles = str(data.loc[i, 'SMILES'])
        adduct = str(data.loc[i, 'Adduct'])
        isotope_mz, isotope_intensity = _parse_peak_string(data.loc[i, 'MS1 isotopes'])

        obj = Spectrum(mz = mz, intensities = intensity,
                       metadata={"precursor_mz": precursor_mz,
                                 "peak_index": index,
                                 "rt": rt,
                                 "smiles": smiles,
                                 "adduct": adduct,
                                 "isotope_mz": _encode_array(isotope_mz),
                                 "isotope_intensity": _encode_array(isotope_intensity)})
        output.append(spectrum_processing(obj))
    return output


def load_MS_DIAL_Alginment(filename, exclude_precursor = False, sample_cols = None):
    """
    Load an aligned result exported by MS-DIAL and convert it into a list of
    matchms::spectrum objects.

    Rows without an MS/MS spectrum, or with no peak left after filtering, are
    skipped. Each spectrum is passed through spectrum_processing before being
    returned.

    Arguments:
        filename: str, the path of the MS-DIAL export ('.csv' or '.txt').
        exclude_precursor: bool, remove peaks above precursor m/z - 0.1.
        sample_cols: list of column names holding per-sample abundances; their
            NaN-ignoring mean is stored as 'precursor_intensity' (NaN when no
            column is given or every value is missing).
    Returns:
        List of matchms::spectrum, or None for an unsupported file extension.
    Example:
        filename = 'example/Plasma/ms_dial_positive.csv'
        load_MS_DIAL_Alginment(filename)
    """
    sample_cols = [] if sample_cols is None else list(sample_cols)
    data = _read_ms_dial_table(filename)
    if data is None:
        return None

    output = []
    for i in tqdm(data.index):
        precursor_mz = float(data.loc[i, 'Average Mz'])
        mz, intensity = _parse_peak_string(data.loc[i, 'MS/MS spectrum'])
        mz, intensity = _clean_msms(mz, intensity, precursor_mz, exclude_precursor)
        if len(mz) == 0:
            continue

        rt = float(data.loc[i, 'Average Rt(min)'])
        index = 'Peak_' + str(data.loc[i, 'Alignment ID'])
        smiles = str(data.loc[i, 'SMILES'])
        adduct = str(data.loc[i, 'Adduct type'])
        isotope_mz, isotope_intensity = _parse_peak_string(data.loc[i, 'MS1 isotopic spectrum'])

        sample_abundance = np.asarray(data.loc[i, sample_cols], dtype=float)
        if sample_abundance.size and not np.all(np.isnan(sample_abundance)):
            precursor_intensity = float(np.nanmean(sample_abundance))
        else:
            # Same value np.nanmean would give, without the runtime warning.
            precursor_intensity = float('nan')

        obj = Spectrum(mz = mz, intensities = intensity,
                       metadata={"precursor_mz": precursor_mz,
                                 "peak_index": index,
                                 "rt": rt,
                                 "smiles": smiles,
                                 "adduct": adduct,
                                 "precursor_intensity": precursor_intensity,
                                 "isotope_mz": _encode_array(isotope_mz),
                                 "isotope_intensity": _encode_array(isotope_intensity)})
        output.append(spectrum_processing(obj))
    return output


def remove_duplicate(spectrums, rt_tol = 18, mz_tol = 0.01):
    """
    Collapse spectra that share ion mode, precursor m/z and retention time.

    Only spectra whose adduct is '[M+H]+' or '[M-H]-' are considered. A
    spectrum is a duplicate of an already kept one when both differences are
    below the tolerances and the ion modes are equal; of two duplicates the
    one with the higher (or equal, later) 'precursor_intensity' is kept.

    Arguments:
        spectrums: list of matchms::spectrum carrying the metadata keys
            'retention_time', 'precursor_mz', 'ionmode',
            'precursor_intensity' and 'adduct'.
        rt_tol: float, retention time tolerance, in the unit of the
            'retention_time' metadata.
        mz_tol: float, precursor m/z tolerance.
    Returns:
        List of matchms::spectrum, each passed through spectrum_processing.
    """
    new_spectrums = []
    kept_rt, kept_mz, kept_ionmode, kept_intensity = [], [], [], []
    for s in tqdm(spectrums):
        meta = s.metadata
        rt_ = meta['retention_time']
        mz_ = meta['precursor_mz']
        ionmode_ = meta['ionmode']
        intensity_ = meta['precursor_intensity']
        adduct_ = meta['adduct']
        if adduct_ not in ('[M+H]+', '[M-H]-'):
            continue

        # Combine the three conditions with `&`: np.logical_and takes only two
        # inputs and would treat a third array as its `out` buffer.
        same_ionmode = np.array([mode == ionmode_ for mode in kept_ionmode], dtype=bool)
        match = ((np.abs(np.asarray(kept_rt, dtype=float) - rt_) < rt_tol)
                 & (np.abs(np.asarray(kept_mz, dtype=float) - mz_) < mz_tol)
                 & same_ionmode)
        hits = np.flatnonzero(match)

        if len(hits) == 0:
            kept_rt.append(rt_)
            kept_mz.append(mz_)
            kept_ionmode.append(ionmode_)
            kept_intensity.append(intensity_)
            new_spectrums.append(spectrum_processing(s))
        else:
            w = hits[0]
            if intensity_ >= kept_intensity[w]:
                # Process the replacement too, so every returned spectrum has
                # gone through spectrum_processing.
                new_spectrums[w] = spectrum_processing(s)
                kept_intensity[w] = intensity_
    return new_spectrums


def save_as_sirius(spectrums, export_path):
    """
    Write every spectrum to '<export_path>/<name>.ms' for SIRIUS.

    The file holds the compound name, the adduct as ionization, the MS/MS
    peaks under a fixed collision energy of 35 and the stored MS1 isotope
    peaks. The name is the 'compound_name' metadata, falling back to
    'peak_index' and then to the position in the list.

    Arguments:
        spectrums: list of matchms::spectrum with 'adduct', 'isotope_mz' and
            'isotope_intensity' metadata as written by the loaders.
        export_path: str, existing directory that receives the files.
    """
    energy = 35
    for i, s in enumerate(spectrums):
        compound = _export_name(s, i)
        ionization = s.get('adduct')
        isotope_mz = _decode_array(s.get('isotope_mz'))
        isotope_intensity = _decode_array(s.get('isotope_intensity'))

        with open(os.path.join(export_path, compound + '.ms'), 'w') as ms:
            ms.write('>compound {}\n'.format(compound))
            ms.write('>ionization {}\n'.format(ionization))
            ms.write('\n')
            ms.write('>collision {}\n'.format(energy))
            for mz, intensity in zip(s.mz, s.intensities):
                ms.write('{} {}\n'.format(mz, intensity))
            ms.write('\n\n')
            ms.write('>ms1peaks\n')
            for mz, intensity in zip(isotope_mz, isotope_intensity):
                ms.write('{} {}\n'.format(mz, intensity))
            ms.write('\n')


def save_as_msfinder(spectrums, export_path):
    """
    Write every spectrum to '<export_path>/<name>.mat' for MS-FINDER.

    The file holds name, precursor m/z, adduct and capitalised ion mode,
    followed by the stored MS1 isotope peaks and the MS/MS peaks. The name is
    the 'compound_name' metadata, falling back to 'peak_index' and then to
    the position in the list.

    Arguments:
        spectrums: list of matchms::spectrum with 'precursor_mz', 'ionmode',
            'adduct', 'isotope_mz' and 'isotope_intensity' metadata.
        export_path: str, existing directory that receives the files.
    """
    for i, s in enumerate(spectrums):
        compound = _export_name(s, i)
        precursor_mz = s.get('precursor_mz')
        ionmode = s.get('ionmode').capitalize()
        ionization = s.get('adduct')
        isotope_mz = _decode_array(s.get('isotope_mz'))
        isotope_intensity = _decode_array(s.get('isotope_intensity'))

        with open(os.path.join(export_path, compound + '.mat'), 'w') as ms:
            ms.write('NAME: {}\n'.format(compound))
            ms.write('PRECURSORMZ: {}\n'.format(precursor_mz))
            ms.write('PRECURSORTYPE: {}\n'.format(ionization))
            ms.write('IONMODE: {}\n'.format(ionmode))
            ms.write('\n')
            ms.write('MSTYPE: MS1\n')
            ms.write('Num Peaks: {}\n'.format(len(isotope_mz)))
            for mz, intensity in zip(isotope_mz, isotope_intensity):
                ms.write('{}\t{}\n'.format(mz, intensity))
            ms.write('\n')
            ms.write('MSTYPE: MS2\n')
            ms.write('Num Peaks: {}\n'.format(len(s.mz)))
            for mz, intensity in zip(s.mz, s.intensities):
                ms.write('{}\t{}\n'.format(mz, intensity))
            ms.write('\n')


def spectrum_to_vector(s, min_mz = 0, max_mz = 1000, scale = 0.1):
    """
    Convert spectrum object to vector.

    Peaks inside [min_mz, max_mz] are placed into bins of width `scale` and
    the vector is divided by its maximum (plus 1e-6).

    Arguments:
        s: matchms::spectrum, or None for an all-zero vector.
        min_mz: float, start of mz value.
        max_mz: float, end of mz value.
        scale: float, scale of mz bin.
    Returns:
        Numpy array of spectrum, of length round((1 + max_mz - min_mz) / scale).
    """
    bit = round((1 + max_mz - min_mz) / scale)
    vec = np.zeros(bit)
    if s is None:
        return vec

    in_range = np.logical_and(min_mz <= s.mz, s.mz <= max_mz)
    idx = np.round((s.mz[in_range] - min_mz) / scale).astype(int)
    # NOTE: when several peaks fall into one bin the last one assigned wins;
    # intensities are neither summed nor maximised.
    vec[idx] = s.intensities[in_range]
    vec = vec / (np.max(vec) + 10 ** -6)
    return vec


def consensus_spectrum(spectrums, mz_window = 0.2):
    """
    Merge the peaks of several spectra into one consensus peak list.

    All peaks are pooled and sorted by m/z. Starting from the lowest peak,
    every peak below (start m/z + mz_window) joins its group; a group is
    reported as its intensity-weighted mean m/z and its maximum intensity,
    both rounded to 5 decimals.

    Arguments:
        spectrums: list of matchms::spectrum.
        mz_window: float, width of the m/z grouping window.
    Returns:
        Numpy array of shape (n_groups, 2) with columns m/z and intensity;
        shape (0, 2) when there is no peak at all.
    """
    blocks = [np.column_stack((s.peaks.mz, s.peaks.intensities)) for s in spectrums]
    if not blocks:
        return np.empty((0, 2))
    peaks = np.vstack(blocks)
    if len(peaks) == 0:
        return np.empty((0, 2))

    # np.searchsorted needs ascending m/z over the pooled peaks, which plain
    # concatenation of several spectra does not provide.
    peaks = peaks[np.argsort(peaks[:, 0], kind='stable')]
    sorted_mz = peaks[:, 0]

    mz, intensity = [], []
    start = 0
    while start < len(peaks):
        stop = int(np.searchsorted(sorted_mz, sorted_mz[start] + mz_window))
        # Always consume at least one peak so a non-positive window cannot
        # stall the loop.
        stop = max(stop, start + 1)
        group_mz = peaks[start:stop, 0]
        group_intensity = peaks[start:stop, 1]
        total = np.sum(group_intensity)
        if total > 0:
            centre = np.sum(group_mz * group_intensity) / total
        else:
            # No weight available; fall back to the plain mean.
            centre = np.mean(group_mz)
        mz.append(np.round(centre, 5))
        intensity.append(np.round(np.max(group_intensity), 5))
        start = stop

    output = np.vstack((mz, intensity)).T
    return output