# Uploaded by AryanRajSaxena using huggingface_hub (commit 56793c5, verified).
# -*- coding: utf-8
"""Python wrapper for CDK descriptors and fingerprints"""
from __future__ import annotations

import io
import multiprocessing
import os
import shlex
import subprocess
import warnings
from copy import deepcopy
from enum import Enum, auto
from subprocess import Popen, PIPE
from typing import Iterable, List, Optional

import more_itertools
import numpy as np
import pandas as pd
from bounded_pool_executor import BoundedProcessPoolExecutor
from rdkit import Chem
from rdkit.rdBase import BlockLogs

from .utils import install_java, mktempfile, needsHs
class FPType(Enum):
    """Fingerprint types that can be requested from the CDK wrapper.

    The *name* of the selected member is what the wrapper stores and later
    forwards to the CDKdesc backend through the ``-f`` option; the integer
    values are plain sequential identifiers.
    """

    FP = 1
    ExtFP = 2
    EStateFP = 3
    GraphFP = 4
    MACCSFP = 5
    PubchemFP = 6
    SubFP = 7
    KRFP = 8
    AP2DFP = 9
    HybridFP = 10
    LingoFP = 11
    SPFP = 12
    SigFP = 13
    CircFP = 14
class CDK:
    """Wrapper to obtain molecular descriptors and fingerprints from CDK.

    Molecules are written to a temporary SD (V2000) or SMILES file, the bundled
    ``CDKdesc.jar`` is run on it with a Java runtime obtained through
    ``install_java`` and the text printed by the backend is parsed into a
    pandas DataFrame.
    """

    lock = multiprocessing.RLock()  # Ensure installation of JRE is thread safe
    # Path to the JAR file
    _jarfile = os.path.abspath(os.path.join(__file__, os.pardir, 'CDKdesc.jar'))
    # Header (space separated) forced onto the descriptor table returned by the backend
    _DESCRIPTOR_HEADER = (
        'Fsp3 nSmallRings nAromRings nRingBlocks nAromBlocks nRings3 nRings4 nRings5 nRings6 '
        'nRings7 nRings8 nRings9 tpsaEfficiency Zagreb XLogP WPATH WPOL Wlambda1.unity Wlambda2.unity '
        'Wlambda3.unity Wnu1.unity Wnu2.unity Wgamma1.unity Wgamma2.unity Wgamma3.unity Weta1.unity '
        'Weta2.unity Weta3.unity WT.unity WA.unity WV.unity WK.unity WG.unity WD.unity WTPT-1 WTPT-2 '
        'WTPT-3 WTPT-4 WTPT-5 MW VAdjMat VABC TopoPSA LipinskiFailures nRotB topoShape geomShape '
        'PetitjeanNumber MOMI-X MOMI-Y MOMI-Z MOMI-XY MOMI-XZ MOMI-YZ MOMI-R MDEC-11 MDEC-12 MDEC-13 '
        'MDEC-14 MDEC-22 MDEC-23 MDEC-24 MDEC-33 MDEC-34 MDEC-44 MDEO-11 MDEO-12 MDEO-22 MDEN-11 '
        'MDEN-12 MDEN-13 MDEN-22 MDEN-23 MDEN-33 MLogP nAtomLAC LOBMAX LOBMIN nAtomP nAtomLC khs.sLi '
        'khs.ssBe khs.ssssBe khs.ssBH khs.sssB khs.ssssB khs.sCH3 khs.dCH2 khs.ssCH2 khs.tCH khs.dsCH '
        'khs.aaCH khs.sssCH khs.ddC khs.tsC khs.dssC khs.aasC khs.aaaC khs.ssssC khs.sNH3 khs.sNH2 '
        'khs.ssNH2 khs.dNH khs.ssNH khs.aaNH khs.tN khs.sssNH khs.dsN khs.aaN khs.sssN khs.ddsN '
        'khs.aasN khs.ssssN khs.sOH khs.dO khs.ssO khs.aaO khs.sF khs.sSiH3 khs.ssSiH2 khs.sssSiH '
        'khs.ssssSi khs.sPH2 khs.ssPH khs.sssP khs.dsssP khs.sssssP khs.sSH khs.dS khs.ssS khs.aaS '
        'khs.dssS khs.ddssS khs.sCl khs.sGeH3 khs.ssGeH2 khs.sssGeH khs.ssssGe khs.sAsH2 khs.ssAsH '
        'khs.sssAs khs.sssdAs khs.sssssAs khs.sSeH khs.dSe khs.ssSe khs.aaSe khs.dssSe khs.ddssSe '
        'khs.sBr khs.sSnH3 khs.ssSnH2 khs.sssSnH khs.ssssSn khs.sI khs.sPbH3 khs.ssPbH2 khs.sssPbH '
        'khs.ssssPb Kier1 Kier2 Kier3 HybRatio nHBDon nHBAcc GRAV-1 GRAV-2 GRAV-3 GRAVH-1 GRAVH-2 '
        'GRAVH-3 GRAV-4 GRAV-5 GRAV-6 fragC FMF ECCEN PPSA-1 PPSA-2 PPSA-3 PNSA-1 PNSA-2 PNSA-3 '
        'DPSA-1 DPSA-2 DPSA-3 FPSA-1 FPSA-2 FPSA-3 FNSA-1 FNSA-2 FNSA-3 WPSA-1 WPSA-2 WPSA-3 WNSA-1 '
        'WNSA-2 WNSA-3 RPCG RNCG RPCS RNCS THSA TPSA RHSA RPSA SP-0 SP-1 SP-2 SP-3 SP-4 SP-5 SP-6 '
        'SP-7 VP-0 VP-1 VP-2 VP-3 VP-4 VP-5 VP-6 VP-7 SPC-4 SPC-5 SPC-6 VPC-4 VPC-5 VPC-6 SC-3 SC-4 '
        'SC-5 SC-6 VC-3 VC-4 VC-5 VC-6 SCH-3 SCH-4 SCH-5 SCH-6 SCH-7 VCH-3 VCH-4 VCH-5 VCH-6 VCH-7 '
        'C1SP1 C2SP1 C1SP2 C2SP2 C3SP2 C1SP3 C2SP3 C3SP3 C4SP3 bpol nB BCUTw-1l BCUTw-1h BCUTc-1l '
        'BCUTc-1h BCUTp-1l BCUTp-1h nBase ATSp1 ATSp2 ATSp3 ATSp4 ATSp5 ATSm1 ATSm2 ATSm3 ATSm4 '
        'ATSm5 ATSc1 ATSc2 ATSc3 ATSc4 ATSc5 nAtom nAromBond naAromAtom apol ALogP ALogp2 AMR nAcid '
        'JPLogP')

    def __init__(self, ignore_3D: bool = True, fingerprint: 'Optional[FPType]' = None, nbits: int = 1024,
                 depth: int = 6, backend_smiles: bool = False):
        """Instantiate a wrapper to calculate CDK molecular descriptors or a fingerprint.

        :param ignore_3D: if True (default), 3D molecular descriptors are left out of the results
        :param fingerprint: a fingerprint type to be calculated (default: None, calculates descriptors)
        :param nbits: number of bits (default: 1024 unless the fingerprint has a fixed size)
        :param depth: depth of the fingerprint (default: 6 unless the fingerprint does not depend on depth)
        :param backend_smiles: use SMILES as the interchange format to discuss with the CDKdesc backend;
        the default (i.e. backend_smiles=False) makes use of the V2000 SD format; is ignored
        (forced to False) if ignore_3D=False.
        :raises IOError: if the CDKdesc JAR file is missing
        :raises TypeError: if `fingerprint` is neither None nor a FPType member
        """
        # Ensure the jar file exists
        if not os.path.isfile(self._jarfile):
            raise IOError('The required CDKdesc JAR file is not present. Reinstall CDK-pywrapper.')
        if fingerprint is not None and not isinstance(fingerprint, FPType):
            raise TypeError(f'Fingerprint type not supported: {fingerprint}')
        self.include_3D = not ignore_3D
        self.fingerprint = None if fingerprint is None else fingerprint.name
        self.nbits = nbits
        self.depth = depth
        self.backend_smiles = backend_smiles and ignore_3D  # if include_3D, then always False

    def calculate(self, mols: 'List[Chem.Mol]', show_banner: bool = True, cdk_smiles: bool = False,
                  njobs: int = 1, chunksize: Optional[int] = 1000) -> pd.DataFrame:
        """Calculate molecular descriptors or fingerprints.

        :param mols: RDKit molecules for which descriptors/fingerprints should be calculated
        (must have 3D conformers if calculating 3D descriptors)
        :param show_banner: If True, show notice on this package usage
        :param cdk_smiles: If True, generate the canonical SMILES (generated by CDK) of molecules parsed by CDK
        :param njobs: number of concurrent processes; a negative value counts back from the
        number of CPUs (-1: all CPUs, -2: all but one, ...)
        :param chunksize: number of molecules to be processed by a process; ignored if njobs is 1;
        if None, molecules are split evenly across the processes
        :return: a pandas DataFrame containing all CDK descriptor/fingerprint values
        """
        if show_banner:
            self._show_banner()
        mols = list(mols)
        njobs = self._resolve_njobs(njobs)
        # Parallelize should need be (an empty input gains nothing from a pool
        # and would leave pd.concat without anything to concatenate)
        if njobs > 1 and mols:
            if chunksize is None:
                chunksize = max(1, -(-len(mols) // njobs))  # ceiling division
            with BoundedProcessPoolExecutor(max_workers=njobs) as worker:
                futures = [worker.submit(self._multiproc_calculate, list(chunk), cdk_smiles)
                           for chunk in more_itertools.batched(mols, chunksize)]
                return pd.concat([future.result() for future in futures]).reset_index(drop=True)
        # Single process
        return self._calculate(mols, cdk_smiles)

    @staticmethod
    def _resolve_njobs(njobs: int) -> int:
        """Translate a possibly negative job count into a number of processes (at least 1)."""
        if njobs < 0:
            # os.cpu_count() may return None when the count cannot be determined
            njobs = (os.cpu_count() or 1) + njobs + 1
        return max(1, njobs)

    def _show_banner(self):
        """Print info message for citing."""
        print("""The Chemistry Development Kit (CDK) is a collection of modular Java libraries
for processing chemical information (Cheminformatics). It can compute 14 different fingerprint
types and 287 molecular descriptors (it requires 3D molecular structures for the latter).
###################################
Should you publish results based on the CDK descriptors,
please cite:
Willighagen et al., (2017) J. Cheminf. 9(3), doi:10.1186/s13321-017-0220-4,
May and Steinbeck., (2014) J. Cheminf., doi:10.1186/1758-2946-6-3,
Steinbeck et al., (2006) Curr. Pharm. Des. 12(17):2111-2120, doi:10.2174/138161206777585274,
Steinbeck et al., (2003) J. Chem. Inf. Comput. Sci. 43(2):493-500, doi:10.1021/ci025584y.
###################################
""")

    def _prepare_command(self, mols: 'List[Chem.Mol]', cdk_smiles: bool = False) -> str:
        """Create the CDK command to be run to obtain molecular descriptors.

        Writes the valid molecules to a temporary file and records, on the instance, the
        path of that file (`_tmp_input`), the number of molecules written (`_n_mols`),
        the positions of the entries that are not RDKit molecules (`_skipped`) and the
        number of entries fully processed (`n`).

        :param mols: molecules to obtain molecular descriptors of
        :param cdk_smiles: If True, generate the canonical SMILES (generated by CDK) of molecules parsed by CDK
        :return: The command to run, with every argument shell-quoted so that it can be
        split back safely with `shlex.split` even when paths contain whitespace.
        :raises ValueError: if a molecule has more than 999 atoms (SD format only) or lacks
        a 3D conformer while 3D descriptors are requested
        """
        # 1) Ensure JRE is accessible
        with self.lock:
            self._java_path = install_java(19)
        # 2) Create temp SD v2k or SMILES file
        self._tmp_input = mktempfile('molecules_.smi') if self.backend_smiles else mktempfile('molecules_v2k.sd')
        self._n_mols = 0
        self._skipped = []
        self.n = 0
        block = BlockLogs()
        writer = None
        try:
            if self.backend_smiles:
                writer = Chem.SmilesWriter(self._tmp_input, includeHeader=False, isomericSmiles=True,
                                           kekuleSmiles=True)
            else:
                writer = Chem.SDWriter(self._tmp_input)
                # Ensure V2000 as CDK cannot properly process v3000
                writer.SetForceV3000(False)
            for i, mol in enumerate(mols):
                if isinstance(mol, Chem.Mol):
                    if not self.backend_smiles and mol.GetNumAtoms() > 999:
                        raise ValueError('Cannot calculate descriptors for molecules with more than 999 atoms.')
                    # Does molecule lack hydrogen atoms?
                    if needsHs(mol):
                        warnings.warn('Molecule lacks hydrogen atoms: this might affect the value of calculated descriptors')
                    # Are molecules 3D
                    if self.include_3D and self.fingerprint is None:
                        confs = list(mol.GetConformers())
                        if not (len(confs) > 0 and confs[-1].Is3D()):
                            raise ValueError('Cannot calculate the 3D descriptors of a conformer-less molecule')
                    writer.write(mol)
                    self._n_mols += 1
                else:
                    self._skipped.append(i)
                self.n += 1
            writer.close()
        except Exception:
            # Free resources whatever the failure, then let the error propagate
            if writer is not None:
                writer.close()
            try:
                os.remove(self._tmp_input)
            except OSError:
                # Best effort only: do not mask the original error
                pass
            raise
        finally:
            del block
        # 3) Create command (reuse the Java path obtained under the lock)
        args = [self._java_path, '-jar', self._jarfile,
                '-s' if self.backend_smiles else '-i', self._tmp_input]
        if self.fingerprint is not None:
            args += ['-f', self.fingerprint, '-nBits', self.nbits, '-depth', self.depth]
        if cdk_smiles:
            args.append('-S')
        return ' '.join(shlex.quote(str(arg)) for arg in args)

    def _cleanup(self) -> None:
        """Cleanup resources used for calculation."""
        # Remove temporary files
        os.remove(self._tmp_input)

    def _run_command(self, command: str) -> pd.DataFrame:
        """Run the CDK command and parse what it prints on its standard output.

        :param command: The command to be run (as created by `_prepare_command`).
        :return: a pandas DataFrame of the parsed descriptor/fingerprint values
        :raises RuntimeError: if pandas reports empty data while parsing fingerprints
        """
        # shlex.split honours the quoting applied by _prepare_command
        with Popen(shlex.split(command), stdout=PIPE, stderr=subprocess.DEVNULL) as process:
            values = process.stdout.read().decode()
        # CDK barf preventing correct parsing
        if 'not found' in values:
            # Omit error
            values = '\n'.join(line for line in values.split('\n') if 'not found' not in line)
        # Empty result file
        if len(values) == 0:
            details = self.get_details()
            values = pd.DataFrame(np.full((self._n_mols, details.shape[0]), np.nan),
                                  columns=details.Name)
        elif '{' not in values:
            if self.fingerprint is None:
                values = values.split('\n')
                # Ensure all columns are present in the header
                values[0] = (f'{"SMILES " if values[0].startswith("SMILES") else ""}'
                             + self._DESCRIPTOR_HEADER)
                # CDK uses uppercase exponents but pandas needs a lowercase
                values = '\n'.join([values[0]] + [line.replace('E', 'e') for line in values[1:]])
            # Parse with pandas
            values = pd.read_csv(io.StringIO(values), sep=' ')
        else:
            try:
                # NOTE(review): eval() executes whatever the backend printed; it is kept because the
                # exact output format cannot be confirmed from here. Consider ast.literal_eval once
                # the output is confirmed to contain only Python literals.
                values = pd.DataFrame.from_dict(eval('{%s}' % values), orient='index').fillna(0)
                # Separate SMILES when calculated ('|' must be matched literally, not as a regex)
                if (len(values.index) > 0 and isinstance(values.index[0], str)
                        and values.index.str.contains('|', regex=False).any()):
                    smiles = pd.Series(values.index.str.split('|').str[1], name='SMILES').reset_index(drop=True)
                    values = (pd.concat([smiles, values.reset_index(drop=True)], axis=1)
                              .reset_index(drop=True))
            except pd.errors.EmptyDataError:
                raise RuntimeError('CDK could not obtain molecular descriptors, maybe due to a faulty molecule')
        # If only 2D, remove 3D descriptors
        if not self.include_3D and self.fingerprint is None:
            # Get 3D descriptor names to remove
            details = self.get_details()
            names_3D = set(details[details.Dimensions == '3D'].Name)
            values = values[[col for col in values.columns if col not in names_3D]]
        return values

    @staticmethod
    def _reinsert_skipped(results: pd.DataFrame, skipped: List[int]) -> pd.DataFrame:
        """Insert NaN rows so that each index of `skipped` is a NaN row in the returned frame.

        :param results: values obtained for the molecules that were not skipped
        :param skipped: positions, in the original input, of the skipped entries
        :return: a new DataFrame with len(results) + len(skipped) rows
        """
        data = results.values
        if data.dtype.kind in 'biu':
            # NaN cannot be stored in boolean/integer arrays
            data = data.astype(float)
        # np.insert expects positions relative to the array *before* insertion,
        # whereas `skipped` holds positions in the final table: shift each one
        # by the number of rows inserted before it.
        positions = [index - shift for shift, index in enumerate(sorted(skipped))]
        return pd.DataFrame(np.insert(data, positions, np.nan, axis=0), columns=results.columns)

    def _calculate(self, mols: 'List[Chem.Mol]', cdk_smiles: bool = False) -> pd.DataFrame:
        """Calculate CDK molecular descriptors on one process.

        :param mols: RDKit molecules for which CDK descriptors and fingerprints should be calculated.
        :param cdk_smiles: If True, generate the canonical SMILES (generated by CDK) of molecules parsed by CDK
        :return: a pandas DataFrame containing CDK descriptor values
        """
        # Prepare inputs
        command = self._prepare_command(mols, cdk_smiles)
        try:
            # Run command and obtain results
            results = self._run_command(command)
        finally:
            # Cleanup, even if running or parsing failed
            self._cleanup()
        # Insert lines of skipped molecules
        if self._skipped:
            results = self._reinsert_skipped(results, self._skipped)
        # Omit SMILES column from casting if in values
        smiles_col = None
        if cdk_smiles and 'SMILES' in results.columns:
            smiles_col = results['SMILES']
            results = results.drop(columns=['SMILES'])
        results = results.apply(pd.to_numeric, errors='coerce', axis=1)
        # Insert SMILES column back
        if smiles_col is not None:
            results = pd.concat([smiles_col, results], axis=1)
        return results

    def _multiproc_calculate(self, mols: 'List[Chem.Mol]', cdk_smiles: bool = False) -> pd.DataFrame:
        """Calculate CDK descriptors and fingerprints in thread-safe manner.

        :param mols: RDKit molecules for which CDK descriptors and fingerprints should be calculated
        :param cdk_smiles: If True, generate the canonical SMILES (generated by CDK) of molecules parsed by CDK
        :return: a pandas DataFrame containing all CDK descriptor values
        """
        # Copy self instance to make thread safe
        cdk = deepcopy(self)
        # Run copy
        return cdk.calculate(mols, show_banner=False, njobs=1, cdk_smiles=cdk_smiles)

    @staticmethod
    def get_details(desc_name: Optional[str] = None):
        """Obtain details about either one or all descriptors.

        :param desc_name: the name of the descriptor to obtain details about (default: None).
        If None, returns details about all descriptors.
        :return: a pandas DataFrame read from the `descs.json` file located next to this module
        :raises ValueError: if `desc_name` is not the name of an available descriptor
        """
        details = pd.read_json(os.path.abspath(os.path.join(__file__, os.pardir, 'descs.json')), orient='index')
        if desc_name is not None:
            if desc_name not in details.Name.tolist():
                raise ValueError(f'descriptor name {desc_name} is not available')
            details = details[details.Name == desc_name]
        return details