Spaces:
Paused
Paused
# -*- coding: utf-8 -*-
| """Python wrapper for CDK descriptors and fingerprints""" | |
| from __future__ import annotations | |
| import io | |
| import multiprocessing | |
| import os | |
| import subprocess | |
| import warnings | |
| from copy import deepcopy | |
| from enum import Enum, auto | |
| from subprocess import Popen, PIPE | |
| from typing import Iterable, List, Optional | |
| import more_itertools | |
| import numpy as np | |
| import pandas as pd | |
| from bounded_pool_executor import BoundedProcessPoolExecutor | |
| from rdkit import Chem | |
| from rdkit.rdBase import BlockLogs | |
| from .utils import install_java, mktempfile, needsHs | |
class FPType(Enum):
    """Fingerprint types that can be requested from the CDK wrapper.

    Members are numbered consecutively from 1 in declaration order.
    """

    FP = 1
    ExtFP = 2
    EStateFP = 3
    GraphFP = 4
    MACCSFP = 5
    PubchemFP = 6
    SubFP = 7
    KRFP = 8
    AP2DFP = 9
    HybridFP = 10
    LingoFP = 11
    SPFP = 12
    SigFP = 13
    CircFP = 14
class CDK:
    """Wrapper to obtain molecular descriptors or fingerprints from CDK.

    Molecules are written to a temporary SD (V2000) or SMILES file, the bundled
    ``CDKdesc.jar`` is run on that file in a Java subprocess, and the text it
    prints on standard output is parsed into a pandas DataFrame.
    """

    lock = multiprocessing.RLock()  # Ensure installation of JRE is thread safe
    # Path to the JAR file
    _jarfile = os.path.abspath(os.path.join(__file__, os.pardir, 'CDKdesc.jar'))
    # Complete, space-separated descriptor header substituted for the first line
    # of the descriptor output so that all columns are present.
    _DESCRIPTOR_HEADER = (
        'Fsp3 nSmallRings nAromRings nRingBlocks nAromBlocks nRings3 nRings4 nRings5 nRings6 '
        'nRings7 nRings8 nRings9 tpsaEfficiency Zagreb XLogP WPATH WPOL Wlambda1.unity Wlambda2.unity '
        'Wlambda3.unity Wnu1.unity Wnu2.unity Wgamma1.unity Wgamma2.unity Wgamma3.unity Weta1.unity '
        'Weta2.unity Weta3.unity WT.unity WA.unity WV.unity WK.unity WG.unity WD.unity WTPT-1 WTPT-2 '
        'WTPT-3 WTPT-4 WTPT-5 MW VAdjMat VABC TopoPSA LipinskiFailures nRotB topoShape geomShape '
        'PetitjeanNumber MOMI-X MOMI-Y MOMI-Z MOMI-XY MOMI-XZ MOMI-YZ MOMI-R MDEC-11 MDEC-12 MDEC-13 '
        'MDEC-14 MDEC-22 MDEC-23 MDEC-24 MDEC-33 MDEC-34 MDEC-44 MDEO-11 MDEO-12 MDEO-22 MDEN-11 '
        'MDEN-12 MDEN-13 MDEN-22 MDEN-23 MDEN-33 MLogP nAtomLAC LOBMAX LOBMIN nAtomP nAtomLC khs.sLi '
        'khs.ssBe khs.ssssBe khs.ssBH khs.sssB khs.ssssB khs.sCH3 khs.dCH2 khs.ssCH2 khs.tCH khs.dsCH '
        'khs.aaCH khs.sssCH khs.ddC khs.tsC khs.dssC khs.aasC khs.aaaC khs.ssssC khs.sNH3 khs.sNH2 '
        'khs.ssNH2 khs.dNH khs.ssNH khs.aaNH khs.tN khs.sssNH khs.dsN khs.aaN khs.sssN khs.ddsN '
        'khs.aasN khs.ssssN khs.sOH khs.dO khs.ssO khs.aaO khs.sF khs.sSiH3 khs.ssSiH2 khs.sssSiH '
        'khs.ssssSi khs.sPH2 khs.ssPH khs.sssP khs.dsssP khs.sssssP khs.sSH khs.dS khs.ssS khs.aaS '
        'khs.dssS khs.ddssS khs.sCl khs.sGeH3 khs.ssGeH2 khs.sssGeH khs.ssssGe khs.sAsH2 khs.ssAsH '
        'khs.sssAs khs.sssdAs khs.sssssAs khs.sSeH khs.dSe khs.ssSe khs.aaSe khs.dssSe khs.ddssSe '
        'khs.sBr khs.sSnH3 khs.ssSnH2 khs.sssSnH khs.ssssSn khs.sI khs.sPbH3 khs.ssPbH2 khs.sssPbH '
        'khs.ssssPb Kier1 Kier2 Kier3 HybRatio nHBDon nHBAcc GRAV-1 GRAV-2 GRAV-3 GRAVH-1 GRAVH-2 '
        'GRAVH-3 GRAV-4 GRAV-5 GRAV-6 fragC FMF ECCEN PPSA-1 PPSA-2 PPSA-3 PNSA-1 PNSA-2 PNSA-3 '
        'DPSA-1 DPSA-2 DPSA-3 FPSA-1 FPSA-2 FPSA-3 FNSA-1 FNSA-2 FNSA-3 WPSA-1 WPSA-2 WPSA-3 WNSA-1 '
        'WNSA-2 WNSA-3 RPCG RNCG RPCS RNCS THSA TPSA RHSA RPSA SP-0 SP-1 SP-2 SP-3 SP-4 SP-5 SP-6 '
        'SP-7 VP-0 VP-1 VP-2 VP-3 VP-4 VP-5 VP-6 VP-7 SPC-4 SPC-5 SPC-6 VPC-4 VPC-5 VPC-6 SC-3 SC-4 '
        'SC-5 SC-6 VC-3 VC-4 VC-5 VC-6 SCH-3 SCH-4 SCH-5 SCH-6 SCH-7 VCH-3 VCH-4 VCH-5 VCH-6 VCH-7 '
        'C1SP1 C2SP1 C1SP2 C2SP2 C3SP2 C1SP3 C2SP3 C3SP3 C4SP3 bpol nB BCUTw-1l BCUTw-1h BCUTc-1l '
        'BCUTc-1h BCUTp-1l BCUTp-1h nBase ATSp1 ATSp2 ATSp3 ATSp4 ATSp5 ATSm1 ATSm2 ATSm3 ATSm4 '
        'ATSm5 ATSc1 ATSc2 ATSc3 ATSc4 ATSc5 nAtom nAromBond naAromAtom apol ALogP ALogp2 AMR nAcid '
        'JPLogP'
    )

    def __init__(self, ignore_3D: bool = True, fingerprint: Optional[FPType] = None, nbits: int = 1024,
                 depth: int = 6, backend_smiles: bool = False):
        """Instantiate a wrapper to calculate CDK molecular descriptors or a fingerprint.

        :param ignore_3D: if True (default), 3D molecular descriptors are left out of the results
        :param fingerprint: a fingerprint type to be calculated (default: None, calculates descriptors)
        :param nbits: number of bits (default: 1024 unless the fingerprint has a fixed size)
        :param depth: depth of the fingerprint (default: 6 unless the fingerprint does not depend on depth)
        :param backend_smiles: use SMILES as the interchange format to discuss with the CDKdesc backend;
            the default (i.e. backend_smiles=False) makes use of the V2000 SD format;
            is ignored (forced to False) if ignore_3D=False.
        :raises IOError: if the CDKdesc JAR file is missing
        :raises TypeError: if `fingerprint` is neither None nor a FPType member
        """
        # Ensure the jar file exists
        if not os.path.isfile(self._jarfile):
            raise IOError('The required CDKdesc JAR file is not present. Reinstall CDK-pywrapper.')
        if fingerprint is not None and not isinstance(fingerprint, FPType):
            raise TypeError(f'Fingerprint type not supported: {fingerprint}')
        self.include_3D = not ignore_3D
        self.fingerprint = None if fingerprint is None else fingerprint.name
        self.nbits = nbits
        self.depth = depth
        self.backend_smiles = backend_smiles and ignore_3D  # if include_3D, then always False

    @staticmethod
    def _resolve_njobs(njobs: int) -> int:
        """Translate a possibly negative `njobs` into a worker count.

        Non-negative values are returned unchanged. Negative values count back
        from the number of CPUs (-1 means all CPUs, -2 all but one, ...) and
        never resolve to fewer than one worker.
        """
        if njobs >= 0:
            return njobs
        # os.cpu_count() may return None when the count cannot be determined
        return max(1, (os.cpu_count() or 1) + njobs + 1)

    def calculate(self, mols: List[Chem.Mol], show_banner: bool = True, cdk_smiles: bool = False,
                  njobs: int = 1, chunksize: Optional[int] = 1000) -> pd.DataFrame:
        """Calculate molecular descriptors or fingerprints.

        :param mols: RDKit molecules for which descriptors/fingerprints should be calculated
            (must have 3D conformers if calculating 3D descriptors)
        :param show_banner: If True, show notice on this package usage
        :param cdk_smiles: If True, generate the canonical SMILES (generated by CDK) of molecules parsed by CDK
        :param njobs: number of concurrent processes; negative values count back from the
            number of CPUs (-1 uses all of them)
        :param chunksize: number of molecules to be processed by a process; ignored if njobs is 1;
            if None, molecules are split evenly across the processes
        :return: a pandas DataFrame containing all CDK descriptor/fingerprint values
        """
        if show_banner:
            self._show_banner()
        mols = list(mols)
        njobs = self._resolve_njobs(njobs)
        # Parallelize should need be (an empty input has nothing to distribute)
        if njobs > 1 and mols:
            if chunksize is None:
                # Ceiling division: one chunk per worker
                chunksize = max(1, -(-len(mols) // njobs))
            with BoundedProcessPoolExecutor(max_workers=njobs) as worker:
                futures = [worker.submit(self._multiproc_calculate, list(chunk), cdk_smiles)
                           for chunk in more_itertools.batched(mols, chunksize)]
            return pd.concat([future.result() for future in futures]).reset_index(drop=True)
        # Single process
        return self._calculate(mols, cdk_smiles)

    def _show_banner(self):
        """Print info message for citing."""
        print("""The Chemistry Development Kit (CDK) is a collection of modular Java libraries
for processing chemical information (Cheminformatics). It can compute 14 different fingerprint
types and 287 molecular descriptors (it requires 3D molecular structures for the latter).
###################################
Should you publish results based on the CDK descriptors,
please cite:
Willighagen et al., (2017) J. Cheminf. 9(3), doi:10.1186/s13321-017-0220-4,
May and Steinbeck., (2014) J. Cheminf., doi:10.1186/1758-2946-6-3,
Steinbeck et al., (2006) Curr. Pharm. Des. 12(17):2111-2120, doi:10.2174/138161206777585274,
Steinbeck et al., (2003) J. Chem. Inf. Comput. Sci. 43(2):493-500, doi:10.1021/ci025584y.
###################################
""")

    def _prepare_command(self, mols: List[Chem.Mol], cdk_smiles: bool = False) -> str:
        """Write the input file and create the CDK command to be run.

        Sets ``self._tmp_input`` (path of the temporary input file), ``self._n_mols``
        (number of molecules written), ``self._skipped`` (positions in `mols` of the
        entries that are not RDKit molecules) and ``self.n`` (number of entries seen).

        :param mols: molecules to obtain molecular descriptors of
        :param cdk_smiles: If True, generate the canonical SMILES (generated by CDK) of molecules parsed by CDK
        :return: The command to run, with every argument shell-quoted so that it can be
            split back with ``shlex.split``.
        :raises ValueError: if a molecule has more than 999 atoms (SD backend) or lacks a
            3D conformer while 3D descriptors are requested
        """
        import shlex

        # 1) Ensure JRE is accessible
        # NOTE(review): install_java is assumed to return the path of the java executable - confirm in utils
        with self.lock:
            self._java_path = install_java(19)
        # 2) Create temp SD v2k or SMILES file
        self._tmp_input = mktempfile('molecules_.smi') if self.backend_smiles else mktempfile('molecules_v2k.sd')
        self._n_mols = 0
        self._skipped = []
        self.n = 0
        block = BlockLogs()  # Silence RDKit logging until deleted
        try:
            if self.backend_smiles:
                writer = Chem.SmilesWriter(self._tmp_input, includeHeader=False, isomericSmiles=True,
                                           kekuleSmiles=True)
            else:
                writer = Chem.SDWriter(self._tmp_input)
                # Ensure V2000 as CDK cannot properly process v3000
                writer.SetForceV3000(False)
            try:
                for i, mol in enumerate(mols):
                    self.n += 1
                    if not isinstance(mol, Chem.Mol):  # also covers None
                        self._skipped.append(i)
                        continue
                    if not self.backend_smiles and mol.GetNumAtoms() > 999:
                        raise ValueError('Cannot calculate descriptors for molecules with more than 999 atoms.')
                    # Does molecule lack hydrogen atoms?
                    if needsHs(mol):
                        warnings.warn('Molecule lacks hydrogen atoms: this might affect the value of '
                                      'calculated descriptors')
                    # Are molecules 3D
                    if self.include_3D and self.fingerprint is None:
                        confs = list(mol.GetConformers())
                        if not (len(confs) > 0 and confs[-1].Is3D()):
                            raise ValueError('Cannot calculate the 3D descriptors of a conformer-less molecule')
                    writer.write(mol)
                    self._n_mols += 1
            finally:
                writer.close()
        except Exception:
            # Free resources and propagate the error unchanged
            if os.path.exists(self._tmp_input):
                os.remove(self._tmp_input)
            raise
        finally:
            del block
        # 3) Create command (reuse the java path obtained under the lock)
        args = [self._java_path, '-jar', self._jarfile,
                '-s' if self.backend_smiles else '-i', self._tmp_input]
        if self.fingerprint is not None:
            args += ['-f', self.fingerprint, '-nBits', self.nbits, '-depth', self.depth]
        if cdk_smiles:
            args.append('-S')
        # Quote each argument so that paths containing spaces survive the split in _run_command
        return ' '.join(shlex.quote(str(arg)) for arg in args)

    def _cleanup(self) -> None:
        """Cleanup resources used for calculation."""
        # Remove temporary files
        os.remove(self._tmp_input)

    def _run_command(self, command: str) -> pd.DataFrame:
        """Run the CDK command and parse what it prints on standard output.

        :param command: The command to be run, as returned by `_prepare_command`.
        :return: a pandas DataFrame of the values parsed from the output
        """
        import shlex

        # shlex.split undoes the quoting applied in _prepare_command
        with Popen(shlex.split(command), stdout=PIPE, stderr=subprocess.DEVNULL) as process:
            output = process.stdout.read().decode()
        return self._parse_output(output)

    def _parse_output(self, values: str) -> pd.DataFrame:
        """Turn the text printed by the CDK backend into a DataFrame.

        Three cases are handled: empty output (a NaN-filled frame with one row per
        written molecule), a space-separated table (output without any '{') and a
        dictionary-like listing (output containing '{').

        :param values: decoded standard output of the backend
        :return: the parsed values; 3D descriptors are dropped when only 2D
            descriptors were requested
        :raises RuntimeError: if the output cannot be parsed
        """
        import ast

        failure = 'CDK could not obtain molecular descriptors, maybe due to a faulty molecule'
        # CDK barf preventing correct parsing
        if 'not found' in values:
            # Omit error
            values = '\n'.join(line for line in values.split('\n') if 'not found' not in line)
        # Empty result file
        if len(values) == 0:
            details = self.get_details()
            values = pd.DataFrame(np.full((self._n_mols, details.shape[0]), np.nan),
                                  columns=details.Name)
        elif '{' not in values:
            if self.fingerprint is None:
                lines = values.split('\n')
                # Ensure all columns are present in the header
                smiles_prefix = 'SMILES ' if lines[0].startswith('SMILES') else ''
                lines[0] = smiles_prefix + self._DESCRIPTOR_HEADER
                # CDK uses uppercase exponents but pandas needs a lowercase
                values = '\n'.join([lines[0]] + [line.replace('E', 'e') for line in lines[1:]])
            # Parse with pandas
            try:
                values = pd.read_csv(io.StringIO(values), sep=' ')
            except pd.errors.EmptyDataError as err:
                raise RuntimeError(failure) from err
        else:
            # The listing is parsed as a Python literal; literal_eval (unlike eval)
            # cannot execute code embedded in the subprocess output.
            try:
                parsed = ast.literal_eval('{%s}' % values)
            except (ValueError, SyntaxError) as err:
                raise RuntimeError(failure) from err
            values = pd.DataFrame.from_dict(parsed, orient='index').fillna(0)
            # Separate SMILES when calculated; '|' must be matched literally, as a
            # regular expression it would match every string
            if (len(values.index) > 0 and isinstance(values.index[0], str)
                    and values.index.str.contains('|', regex=False).any()):
                smiles = pd.Series(values.index.str.split('|').str[1], name='SMILES').reset_index(drop=True)
                values = (pd.concat([smiles, values.reset_index(drop=True)], axis=1)
                          .reset_index(drop=True))
        # If only 2D, remove 3D descriptors
        if not self.include_3D and self.fingerprint is None:
            # Get 3D descriptor names to remove
            details = self.get_details()
            names_3D = set(details[details.Dimensions == '3D'].Name.tolist())
            values = values[[col for col in values.columns if col not in names_3D]]
        return values

    @staticmethod
    def _reinsert_skipped(results: pd.DataFrame, skipped: List[int]) -> pd.DataFrame:
        """Insert NaN-filled rows at the input positions of skipped molecules.

        :param results: one row per molecule that was actually processed, in input order
        :param skipped: positions in the original input of the skipped molecules
        :return: `results` itself if nothing was skipped, otherwise a new frame with
            ``len(results) + len(skipped)`` rows and a fresh 0-based index
        """
        if not skipped:
            return results
        total = len(results) + len(skipped)
        omitted = set(skipped)
        # Label every computed row with its position in the original input ...
        kept = [position for position in range(total) if position not in omitted]
        # ... so that reindexing on all positions creates the missing rows as NaN
        return results.set_axis(kept, axis=0).reindex(range(total)).reset_index(drop=True)

    def _calculate(self, mols: List[Chem.Mol], cdk_smiles: bool = False) -> pd.DataFrame:
        """Calculate CDK molecular descriptors on one process.

        :param mols: RDKit molecules for which CDK descriptors and fingerprints should be calculated.
        :param cdk_smiles: If True, generate the canonical SMILES (generated by CDK) of molecules parsed by CDK
        :return: a pandas DataFrame containing CDK descriptor values, with one row per
            entry of `mols` (NaN-filled for skipped entries)
        """
        # Prepare inputs
        command = self._prepare_command(mols, cdk_smiles)
        # Run command and obtain results; the temporary input is removed even on failure
        try:
            results = self._run_command(command)
        finally:
            self._cleanup()
        # Insert lines of skipped molecules
        results = self._reinsert_skipped(results, self._skipped)
        # Omit SMILES column from casting if in values
        has_smiles = cdk_smiles and 'SMILES' in results.columns
        if has_smiles:
            smiles_col = results['SMILES']
            results = results.drop(columns=['SMILES'])
        results = results.apply(pd.to_numeric, errors='coerce', axis=1)
        # Insert SMILES column back
        if has_smiles:
            results = pd.concat([smiles_col, results], axis=1)
        return results

    def _multiproc_calculate(self, mols: List[Chem.Mol], cdk_smiles: bool = False) -> pd.DataFrame:
        """Calculate CDK descriptors and fingerprints on a private copy of this instance.

        :param mols: RDKit molecules for which CDK descriptors and fingerprints should be calculated
        :param cdk_smiles: If True, generate the canonical SMILES (generated by CDK) of molecules parsed by CDK
        :return: a pandas DataFrame containing all CDK descriptor values
        """
        # Work on a copy: the calculation stores per-run state on the instance
        cdk = deepcopy(self)
        # Run copy
        return cdk.calculate(mols, show_banner=False, njobs=1, cdk_smiles=cdk_smiles)

    @staticmethod
    def get_details(desc_name: Optional[str] = None):
        """Obtain details about either one or all descriptors.

        Details are read from the ``descs.json`` file located next to this module.

        :param desc_name: the name of the descriptor to obtain details about (default: None).
            If None, returns details about all descriptors.
        :return: a pandas DataFrame of descriptor details
        :raises ValueError: if `desc_name` is not a known descriptor name
        """
        details = pd.read_json(os.path.abspath(os.path.join(__file__, os.pardir, 'descs.json')), orient='index')
        if desc_name is not None:
            if desc_name not in details.Name.tolist():
                raise ValueError(f'descriptor name {desc_name} is not available')
            details = details[details.Name == desc_name]
        return details