DiffAb / diffab /tools /relax /openmm_relaxer.py
luost26's picture
Update
753e275
import os
import time
import io
import logging
import pdbfixer
import openmm
from openmm import app as openmm_app
from openmm import unit
ENERGY = unit.kilocalories_per_mole
LENGTH = unit.angstroms
from diffab.tools.relax.base import RelaxTask
def current_milli_time():
return round(time.time() * 1000)
def _is_in_the_range(ch_rs_ic, flexible_residue_first, flexible_residue_last):
if ch_rs_ic[0] != flexible_residue_first[0]: return False
r_first, r_last = tuple(flexible_residue_first[1:]), tuple(flexible_residue_last[1:])
rs_ic = ch_rs_ic[1:]
return r_first <= rs_ic <= r_last
class ForceFieldMinimizer(object):
def __init__(self, stiffness=10.0, max_iterations=0, tolerance=2.39*unit.kilocalories_per_mole, platform='CUDA'):
super().__init__()
self.stiffness = stiffness
self.max_iterations = max_iterations
self.tolerance = tolerance
assert platform in ('CUDA', 'CPU')
self.platform = platform
def _fix(self, pdb_str):
fixer = pdbfixer.PDBFixer(pdbfile=io.StringIO(pdb_str))
fixer.findNonstandardResidues()
fixer.replaceNonstandardResidues()
fixer.findMissingResidues()
fixer.findMissingAtoms()
fixer.addMissingAtoms(seed=0)
fixer.addMissingHydrogens()
out_handle = io.StringIO()
openmm_app.PDBFile.writeFile(fixer.topology, fixer.positions, out_handle, keepIds=True)
return out_handle.getvalue()
def _get_pdb_string(self, topology, positions):
with io.StringIO() as f:
openmm_app.PDBFile.writeFile(topology, positions, f, keepIds=True)
return f.getvalue()
def _minimize(self, pdb_str, flexible_residue_first=None, flexible_residue_last=None):
pdb = openmm_app.PDBFile(io.StringIO(pdb_str))
force_field = openmm_app.ForceField("amber99sb.xml")
constraints = openmm_app.HBonds
system = force_field.createSystem(pdb.topology, constraints=constraints)
# Add constraints to non-generated regions
force = openmm.CustomExternalForce("0.5 * k * ((x-x0)^2 + (y-y0)^2 + (z-z0)^2)")
force.addGlobalParameter("k", self.stiffness)
for p in ["x0", "y0", "z0"]:
force.addPerParticleParameter(p)
if flexible_residue_first is not None and flexible_residue_last is not None:
for i, a in enumerate(pdb.topology.atoms()):
ch_rs_ic = (a.residue.chain.id, int(a.residue.id), a.residue.insertionCode)
if not _is_in_the_range(ch_rs_ic, flexible_residue_first, flexible_residue_last) and a.element.name != "hydrogen":
force.addParticle(i, pdb.positions[i])
system.addForce(force)
# Set up the integrator and simulation
integrator = openmm.LangevinIntegrator(0, 0.01, 0.0)
platform = openmm.Platform.getPlatformByName("CUDA")
simulation = openmm_app.Simulation(pdb.topology, system, integrator, platform)
simulation.context.setPositions(pdb.positions)
# Perform minimization
ret = {}
state = simulation.context.getState(getEnergy=True, getPositions=True)
ret["einit"] = state.getPotentialEnergy().value_in_unit(ENERGY)
ret["posinit"] = state.getPositions(asNumpy=True).value_in_unit(LENGTH)
simulation.minimizeEnergy(maxIterations=self.max_iterations, tolerance=self.tolerance)
state = simulation.context.getState(getEnergy=True, getPositions=True)
ret["efinal"] = state.getPotentialEnergy().value_in_unit(ENERGY)
ret["pos"] = state.getPositions(asNumpy=True).value_in_unit(LENGTH)
ret["min_pdb"] = self._get_pdb_string(simulation.topology, state.getPositions())
return ret['min_pdb'], ret
def _add_energy_remarks(self, pdb_str, ret):
pdb_lines = pdb_str.splitlines()
pdb_lines.insert(1, "REMARK 1 FINAL ENERGY: {:.3f} KCAL/MOL".format(ret['efinal']))
pdb_lines.insert(1, "REMARK 1 INITIAL ENERGY: {:.3f} KCAL/MOL".format(ret['einit']))
return "\n".join(pdb_lines)
def __call__(self, pdb_str, flexible_residue_first=None, flexible_residue_last=None, return_info=True):
if '\n' not in pdb_str and pdb_str.lower().endswith(".pdb"):
with open(pdb_str) as f:
pdb_str = f.read()
pdb_fixed = self._fix(pdb_str)
pdb_min, ret = self._minimize(pdb_fixed, flexible_residue_first, flexible_residue_last)
pdb_min = self._add_energy_remarks(pdb_min, ret)
if return_info:
return pdb_min, ret
else:
return pdb_min
def run_openmm(task: RelaxTask):
if not task.can_proceed() :
return task
if task.update_if_finished('openmm'):
return task
try:
minimizer = ForceFieldMinimizer()
with open(task.current_path, 'r') as f:
pdb_str = f.read()
pdb_min = minimizer(
pdb_str = pdb_str,
flexible_residue_first = task.flexible_residue_first,
flexible_residue_last = task.flexible_residue_last,
return_info = False,
)
out_path = task.set_current_path_tag('openmm')
with open(out_path, 'w') as f:
f.write(pdb_min)
task.mark_success()
except ValueError as e:
logging.warning(
f'{e.__class__.__name__}: {str(e)} ({task.current_path})'
)
task.mark_failure()
return task