|
import io
import os
import random
import shlex
import string
import subprocess

from easydict import EasyDict
from rdkit import Chem
from rdkit.Chem.rdForceFieldHelpers import UFFOptimizeMolecule

from .reconstruct import reconstruct_from_generated
|
|
|
|
|
def get_random_id(length=30):
    """Return a random identifier of `length` lowercase ASCII letters.

    Characters are drawn one at a time with `random.choice`, so the result
    follows the state of the global `random` generator.
    """
    alphabet = string.ascii_lowercase
    picked = [random.choice(alphabet) for _ in range(length)]
    return ''.join(picked)
|
|
|
|
|
def load_pdb(path):
    """Read the file at `path` in text mode and return its whole content."""
    with open(path, 'r') as pdb_file:
        pdb_text = pdb_file.read()
    return pdb_text
|
|
|
|
|
def parse_qvina_outputs(docked_sdf_path):
    """Collect the docked poses stored in an SDF file.

    Every molecule the supplier can read contributes one `EasyDict` with:

    * ``rdmol``    - the molecule itself,
    * ``mode_id``  - its position in the file (unreadable entries are skipped
      but still consume an index),
    * ``affinity``, ``rmsd_lb``, ``rmsd_ub`` - the third, fourth and fifth
      whitespace-separated tokens on the first line of the molecule's
      ``REMARK`` property, converted to float.

    Returns the list of these records in file order.
    """
    poses = []
    for mode_idx, pose in enumerate(Chem.SDMolSupplier(docked_sdf_path)):
        if pose is None:
            # Entry could not be parsed by the supplier; nothing to report.
            continue
        first_remark_line = pose.GetProp('REMARK').splitlines()[0]
        # Drop the two leading tokens; the numeric fields follow them.
        fields = first_remark_line.split()[2:]
        record = {
            'rdmol': pose,
            'mode_id': mode_idx,
            'affinity': float(fields[0]),
            'rmsd_lb': float(fields[1]),
            'rmsd_ub': float(fields[2]),
        }
        poses.append(EasyDict(record))
    return poses
|
|
|
class BaseDockingTask:
    """Common base for docking tasks.

    Stores the receptor PDB text and the ligand molecule; subclasses must
    provide `run` and `get_results`.
    """

    def __init__(self, pdb_block, ligand_rdmol):
        """Keep the receptor PDB text and the ligand molecule on the instance."""
        super().__init__()
        self.pdb_block, self.ligand_rdmol = pdb_block, ligand_rdmol

    def run(self):
        """Start the docking job. Must be overridden by subclasses."""
        raise NotImplementedError()

    def get_results(self):
        """Return the docking results. Must be overridden by subclasses."""
        raise NotImplementedError()
|
|
|
|
|
class QVinaDockingTask(BaseDockingTask):
    """Docking task that shells out to ``qvina2.1`` inside a conda environment.

    On construction the receptor PDB text and the (hydrogenated, optionally
    UFF-optimized) ligand are written into ``tmp_dir`` under a random task id.
    `run` starts a bash script that converts both files to PDBQT, docks the
    ligand in a 20 x 20 x 20 box around ``center`` and converts the docked
    poses back to SDF; `get_results` parses that SDF with
    `parse_qvina_outputs`.
    """

    @staticmethod
    def _read_protein_block(ligand_filename, protein_root):
        """Return the text of the receptor PDB paired with ``ligand_filename``.

        The receptor is looked up under ``protein_root`` in the ligand's
        directory, named by the first 10 characters of the ligand file's
        basename plus ``.pdb``.
        """
        protein_fn = os.path.join(
            os.path.dirname(ligand_filename),
            os.path.basename(ligand_filename)[:10] + '.pdb',
        )
        with open(os.path.join(protein_root, protein_fn), 'r') as f:
            return f.read()

    @classmethod
    def from_generated_data(cls, data, protein_root='./data/crossdocked', **kwargs):
        """Build a task whose ligand is reconstructed from generated ``data``.

        ``data.ligand_filename`` selects the receptor file; the ligand itself
        comes from ``reconstruct_from_generated(data)``. Extra keyword
        arguments are forwarded to the constructor.
        """
        pdb_block = cls._read_protein_block(data.ligand_filename, protein_root)
        ligand_rdmol = reconstruct_from_generated(data)
        return cls(pdb_block, ligand_rdmol, **kwargs)

    @classmethod
    def from_original_data(cls, data, ligand_root='./data/crossdocked_pocket10', protein_root='./data/crossdocked', **kwargs):
        """Build a task from the reference ligand stored on disk.

        The ligand is the first molecule of the SDF at
        ``ligand_root/data.ligand_filename``; the receptor is located from the
        same filename under ``protein_root``. Extra keyword arguments are
        forwarded to the constructor.
        """
        pdb_block = cls._read_protein_block(data.ligand_filename, protein_root)
        ligand_path = os.path.join(ligand_root, data.ligand_filename)
        ligand_rdmol = next(iter(Chem.SDMolSupplier(ligand_path)))
        return cls(pdb_block, ligand_rdmol, **kwargs)

    def __init__(self, pdb_block, ligand_rdmol, conda_env='adt', tmp_dir='./tmp', use_uff=True, center=None):
        """Write the docking inputs to ``tmp_dir`` and choose the box center.

        Args:
            pdb_block: Receptor structure as PDB text.
            ligand_rdmol: Ligand as an RDKit molecule with a conformer.
            conda_env: Conda environment activated before the tools are run.
            tmp_dir: Working directory for input and output files; created if
                missing.
            use_uff: If true, optimize the hydrogenated ligand with UFF before
                writing it.
            center: Box center as a length-3 sequence. When ``None`` the
                midpoint of the ligand's coordinate bounding box is used.
        """
        super().__init__(pdb_block, ligand_rdmol)
        self.conda_env = conda_env
        self.tmp_dir = os.path.realpath(tmp_dir)
        # Create the resolved directory, since every path below is built from it.
        os.makedirs(self.tmp_dir, exist_ok=True)

        self.task_id = get_random_id()
        self.receptor_id = self.task_id + '_receptor'
        self.ligand_id = self.task_id + '_ligand'

        self.receptor_path = os.path.join(self.tmp_dir, self.receptor_id + '.pdb')
        self.ligand_path = os.path.join(self.tmp_dir, self.ligand_id + '.sdf')

        with open(self.receptor_path, 'w') as f:
            f.write(pdb_block)

        ligand_rdmol = Chem.AddHs(ligand_rdmol, addCoords=True)
        if use_uff:
            UFFOptimizeMolecule(ligand_rdmol)
        sdf_writer = Chem.SDWriter(self.ligand_path)
        try:
            sdf_writer.write(ligand_rdmol)
        finally:
            # Always release the file handle, even if writing fails.
            sdf_writer.close()
        # Overrides the base-class attribute with the hydrogenated molecule.
        self.ligand_rdmol = ligand_rdmol

        if center is None:
            pos = ligand_rdmol.GetConformer(0).GetPositions()
            self.center = (pos.max(0) + pos.min(0)) / 2
        else:
            self.center = center

        self.proc = None
        self.results = None
        self.output = None
        self.docked_sdf_path = None

    def run(self, exhaustiveness=16):
        """Start the docking script in a background bash process.

        Returns immediately; use `get_results` to poll or `run_sync` to block.
        Cached output and results from an earlier run are discarded.

        Args:
            exhaustiveness: Value passed to ``qvina2.1 --exhaustiveness``.
        """
        # Forget results of any previous run so they are not returned as if
        # they belonged to this one.
        self.output = None
        self.results = None

        qvina_cmd = ' '.join([
            'qvina2.1',
            '--receptor {receptor_id}.pdbqt',
            '--ligand {ligand_id}.pdbqt',
            '--center_x {center_x:.4f}',
            '--center_y {center_y:.4f}',
            '--center_z {center_z:.4f}',
            '--size_x 20 --size_y 20 --size_z 20',
            '--exhaustiveness {exhaust}',
        ])
        commands = '\n'.join([
            'eval "$(conda shell.bash hook)"',
            'conda activate {env}',
            'cd {tmp}',
            '# Prepare receptor (PDB->PDBQT)',
            'prepare_receptor4.py -r {receptor_id}.pdb',
            '# Prepare ligand',
            'obabel {ligand_id}.sdf -O{ligand_id}.pdbqt',
            qvina_cmd,
            'obabel {ligand_id}_out.pdbqt -O{ligand_id}_out.sdf -h',
            '',
        ]).format(
            receptor_id=self.receptor_id,
            ligand_id=self.ligand_id,
            # Quote user-supplied values so spaces or shell metacharacters in
            # the path or environment name cannot break the script.
            env=shlex.quote(str(self.conda_env)),
            tmp=shlex.quote(self.tmp_dir),
            exhaust=exhaustiveness,
            center_x=self.center[0],
            center_y=self.center[1],
            center_z=self.center[2],
        )

        self.docked_sdf_path = os.path.join(self.tmp_dir, '%s_out.sdf' % self.ligand_id)

        # The script is passed with -c rather than through a stdin pipe, so
        # the child tools cannot consume it and communicate() in run_sync()
        # never has to deal with an already-closed stdin.
        self.proc = subprocess.Popen(
            ['/bin/bash', '-c', commands],
            shell=False,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )

    def run_sync(self):
        """Run the docking job and block until it has finished.

        Returns:
            The list produced by `get_results`: one record per docked pose, or
            an empty list when the docked SDF could not be parsed.
        """
        self.run()
        # communicate() waits for the process while draining stdout and
        # stderr, so the child cannot stall on a full pipe and no CPU is
        # burnt polling.
        stdout, _stderr = self.proc.communicate()
        self.output = io.BytesIO(stdout).readlines()
        results = self.get_results()
        if results:
            print('Best affinity:', results[0]['affinity'])
        return results

    def _parse_docked_poses(self):
        """Parse the docked SDF, reporting failures and returning ``[]`` for them."""
        try:
            return parse_qvina_outputs(self.docked_sdf_path)
        except Exception:
            print('[Error] Vina output error: %s' % self.docked_sdf_path)
            return []

    def get_results(self):
        """Return the docking results if the job has finished.

        Returns:
            ``None`` if the job was never started or is still running.
            Otherwise the parsed poses, or ``[]`` if the docked SDF could not
            be parsed. The value is cached, so repeated calls agree.
        """
        if self.proc is None or self.proc.poll() is None:
            return None
        if self.results is None:
            if self.output is None:
                # NOTE: stdout is only read after the process has exited. A
                # caller that polls this method does not drain the pipes
                # while waiting; run_sync() does.
                self.output = self.proc.stdout.readlines()
            self.results = self._parse_docked_poses()
        return self.results
|
|
|
|