ICLR_FLAG / utils /docking.py
zaixizhang
renew
10efe81
raw
history blame
5.79 kB
import os
import subprocess
import random
import string
from easydict import EasyDict
from rdkit import Chem
from rdkit.Chem.rdForceFieldHelpers import UFFOptimizeMolecule
from .reconstruct import reconstruct_from_generated
def get_random_id(length=30):
    """Build a random identifier made only of lowercase ASCII letters.

    Args:
        length: Number of characters to generate (default 30).

    Returns:
        A string of ``length`` characters, each drawn with ``random.choice``
        from ``string.ascii_lowercase``.
    """
    alphabet = string.ascii_lowercase
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return ''.join(picked)
def load_pdb(path):
    """Read a file in text mode and return its entire content.

    Args:
        path: Location of the file to read.

    Returns:
        The complete file content as a single string.
    """
    with open(path, 'r') as handle:
        pdb_text = handle.read()
    return pdb_text
def parse_qvina_outputs(docked_sdf_path):
    """Collect the docked poses stored in an SDF file.

    Each readable molecule must carry a ``REMARK`` property. The first line
    of that property is split on whitespace and, after skipping the first
    two tokens, the next three are read as affinity, RMSD lower bound and
    RMSD upper bound.

    Args:
        docked_sdf_path: Path of the SDF file to read.

    Returns:
        A list of ``EasyDict`` objects with the keys ``rdmol``, ``mode_id``,
        ``affinity``, ``rmsd_lb`` and ``rmsd_ub``. ``mode_id`` is the
        position of the molecule in the file; entries that RDKit could not
        parse are skipped but still consume an index.
    """
    poses = []
    for mode_id, rdmol in enumerate(Chem.SDMolSupplier(docked_sdf_path)):
        if rdmol is not None:
            remark_head = rdmol.GetProp('REMARK').splitlines()[0]
            # Drop the two leading tokens; the numeric fields follow them.
            fields = remark_head.split()[2:]
            record = {
                'rdmol': rdmol,
                'mode_id': mode_id,
                'affinity': float(fields[0]),
                'rmsd_lb': float(fields[1]),
                'rmsd_ub': float(fields[2]),
            }
            poses.append(EasyDict(record))
    return poses
class BaseDockingTask:
    """Common base for docking tasks: holds the receptor text and the ligand.

    Subclasses are expected to override ``run`` and ``get_results``; both
    raise ``NotImplementedError`` here.
    """

    def __init__(self, pdb_block, ligand_rdmol):
        """Store the receptor and ligand on the instance.

        Args:
            pdb_block: Receptor content, kept as ``self.pdb_block``.
            ligand_rdmol: Ligand object, kept as ``self.ligand_rdmol``.
        """
        super().__init__()
        self.ligand_rdmol = ligand_rdmol
        self.pdb_block = pdb_block

    def run(self):
        """Start the docking job. Must be provided by subclasses."""
        raise NotImplementedError

    def get_results(self):
        """Return the docking results. Must be provided by subclasses."""
        raise NotImplementedError
class QVinaDockingTask(BaseDockingTask):
    """Docking task that drives ``qvina2.1`` through a ``/bin/bash`` subprocess.

    On construction the receptor PDB text and the (hydrogen-added, optionally
    UFF-optimized) ligand are written into ``tmp_dir`` under a random task id.
    ``run`` starts a shell script that activates a conda environment, converts
    both inputs to PDBQT (``prepare_receptor4.py`` / ``obabel``), runs
    ``qvina2.1`` and converts the docked poses back to SDF. ``get_results``
    polls the process and parses that SDF once it has finished.
    """

    @staticmethod
    def _read_protein_block(data, protein_root):
        """Read the receptor PDB text that belongs to ``data.ligand_filename``.

        The receptor file lives in the same sub-directory as the ligand and is
        named after the first 10 characters of the ligand's base name plus
        ``.pdb``.
        """
        protein_fn = os.path.join(
            os.path.dirname(data.ligand_filename),
            os.path.basename(data.ligand_filename)[:10] + '.pdb',
        )
        with open(os.path.join(protein_root, protein_fn), 'r') as f:
            return f.read()

    @classmethod
    def from_generated_data(cls, data, protein_root='./data/crossdocked', **kwargs):
        """Create a task whose ligand is rebuilt from generated data.

        Args:
            data: Object exposing ``ligand_filename``; it is also handed to
                ``reconstruct_from_generated`` to obtain the ligand molecule.
            protein_root: Directory under which the receptor PDB is looked up.
            **kwargs: Forwarded to the constructor.

        Returns:
            A new instance of ``cls``.
        """
        pdb_block = cls._read_protein_block(data, protein_root)
        ligand_rdmol = reconstruct_from_generated(data)
        return cls(pdb_block, ligand_rdmol, **kwargs)

    @classmethod
    def from_original_data(cls, data, ligand_root='./data/crossdocked_pocket10', protein_root='./data/crossdocked', **kwargs):
        """Create a task whose ligand is read from its original SDF file.

        Args:
            data: Object exposing ``ligand_filename`` (relative to both roots).
            ligand_root: Directory under which the ligand SDF is looked up.
            protein_root: Directory under which the receptor PDB is looked up.
            **kwargs: Forwarded to the constructor.

        Returns:
            A new instance of ``cls`` built from the first molecule of the SDF.
        """
        pdb_block = cls._read_protein_block(data, protein_root)
        ligand_path = os.path.join(ligand_root, data.ligand_filename)
        ligand_rdmol = next(iter(Chem.SDMolSupplier(ligand_path)))
        return cls(pdb_block, ligand_rdmol, **kwargs)

    def __init__(self, pdb_block, ligand_rdmol, conda_env='adt', tmp_dir='./tmp', use_uff=True, center=None):
        """Write the docking inputs to disk and set up the search-box center.

        Args:
            pdb_block: Receptor PDB text; written to ``<task_id>_receptor.pdb``.
            ligand_rdmol: Ligand molecule; hydrogens are added (with
                coordinates) and the result is written to
                ``<task_id>_ligand.sdf``.
            conda_env: Name of the conda environment activated by ``run``.
            tmp_dir: Working directory for all files; created if missing.
            use_uff: If true, run ``UFFOptimizeMolecule`` on the ligand before
                writing it.
            center: Search-box center (indexable with 0, 1, 2). When ``None``
                the midpoint of the ligand's axis-aligned bounding box is used.
        """
        super().__init__(pdb_block, ligand_rdmol)
        self.conda_env = conda_env
        self.tmp_dir = os.path.realpath(tmp_dir)
        os.makedirs(self.tmp_dir, exist_ok=True)

        self.task_id = get_random_id()
        self.receptor_id = self.task_id + '_receptor'
        self.ligand_id = self.task_id + '_ligand'

        self.receptor_path = os.path.join(self.tmp_dir, self.receptor_id + '.pdb')
        self.ligand_path = os.path.join(self.tmp_dir, self.ligand_id + '.sdf')

        with open(self.receptor_path, 'w') as f:
            f.write(pdb_block)

        ligand_rdmol = Chem.AddHs(ligand_rdmol, addCoords=True)
        if use_uff:
            UFFOptimizeMolecule(ligand_rdmol)
        sdf_writer = Chem.SDWriter(self.ligand_path)
        try:
            sdf_writer.write(ligand_rdmol)
        finally:
            # Always release the file handle, even if writing fails.
            sdf_writer.close()
        self.ligand_rdmol = ligand_rdmol

        pos = ligand_rdmol.GetConformer(0).GetPositions()
        if center is None:
            self.center = (pos.max(0) + pos.min(0)) / 2
        else:
            self.center = center

        self.proc = None
        self.results = None
        self.output = None
        self.docked_sdf_path = None

    def run(self, exhaustiveness=16):
        """Start the docking pipeline in a background bash process.

        The method returns immediately; use ``get_results`` (or ``run_sync``)
        to obtain the outcome.

        Args:
            exhaustiveness: Value passed to ``qvina2.1 --exhaustiveness``.
        """
        import shlex

        # The backslash line continuations of the shell command are already
        # folded here, so qvina2.1 and all its options form one line.
        commands = (
            '\n'
            'eval "$(conda shell.bash hook)"\n'
            'conda activate {env}\n'
            'cd {tmp}\n'
            '# Prepare receptor (PDB->PDBQT)\n'
            'prepare_receptor4.py -r {receptor_id}.pdb\n'
            '# Prepare ligand\n'
            'obabel {ligand_id}.sdf -O{ligand_id}.pdbqt\n'
            'qvina2.1 '
            '--receptor {receptor_id}.pdbqt '
            '--ligand {ligand_id}.pdbqt '
            '--center_x {center_x:.4f} '
            '--center_y {center_y:.4f} '
            '--center_z {center_z:.4f} '
            '--size_x 20 --size_y 20 --size_z 20 '
            '--exhaustiveness {exhaust}\n'
            'obabel {ligand_id}_out.pdbqt -O{ligand_id}_out.sdf -h\n'
        ).format(
            receptor_id=self.receptor_id,
            ligand_id=self.ligand_id,
            # Quote user-supplied values so paths with spaces or shell
            # metacharacters do not break (or hijack) the script.
            env=shlex.quote(self.conda_env),
            tmp=shlex.quote(self.tmp_dir),
            exhaust=exhaustiveness,
            center_x=self.center[0],
            center_y=self.center[1],
            center_z=self.center[2],
        )

        self.docked_sdf_path = os.path.join(self.tmp_dir, '%s_out.sdf' % self.ligand_id)

        # Forget anything cached from a previous run of this task.
        self.output = None
        self.results = None

        # NOTE(review): stdout/stderr are pipes that are only read after the
        # process exits; an extremely verbose run could fill the pipe buffer
        # and stall the child.
        self.proc = subprocess.Popen(
            '/bin/bash',
            shell=False,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

        self.proc.stdin.write(commands.encode('utf-8'))
        self.proc.stdin.close()

    def run_sync(self, poll_interval=0.1):
        """Run the docking pipeline and block until it has finished.

        Args:
            poll_interval: Seconds to sleep between two status checks.

        Returns:
            The list returned by ``get_results``; empty when the docked SDF
            could not be parsed.
        """
        import time

        self.run()
        results = self.get_results()
        while results is None:
            # Sleep instead of spinning so waiting does not burn a CPU core.
            time.sleep(poll_interval)
            results = self.get_results()
        if results:
            print('Best affinity:', results[0]['affinity'])
        return results

    def get_results(self):
        """Return the docking results if the process has finished.

        Returns:
            ``None`` when ``run`` has not been called or the process is still
            running; otherwise the list produced by ``parse_qvina_outputs``,
            or an empty list when parsing the docked SDF failed. The outcome
            is cached, so repeated calls return the same object.
        """
        if self.proc is None:  # Not started
            return None
        if self.proc.poll() is None:  # In progress
            return None

        if self.output is None:
            self.output = self.proc.stdout.readlines()
        if self.results is None:
            try:
                self.results = parse_qvina_outputs(self.docked_sdf_path)
            except Exception:
                print('[Error] Vina output error: %s' % self.docked_sdf_path)
                # Cache the failure so later calls stay consistent.
                self.results = []
        return self.results