| import logging, sys |
|
|
| import numpy as np |
| from tqdm import tqdm |
| from scipy.linalg import eigh |
|
|
| from rdkit import Chem |
| from rdkit.Chem import MolFromSmiles, MolToSmiles, AddHs |
|
|
| from nfp.preprocessing import features |
| from nfp.preprocessing.features import Tokenizer |
| import time |
|
|
|
|
| class SmilesPreprocessor(object): |
| """ Given a list of SMILES strings, encode these molecules as atom and |
| connectivity feature matricies. |
| |
| Example: |
| >>> preprocessor = SmilesPreprocessor(explicit_hs=False) |
| >>> inputs = preprocessor.fit(data.smiles) |
| """ |
|
|
| def __init__(self, explicit_hs=True, atom_features=None, bond_features=None): |
| """ |
| |
| explicit_hs : bool |
| whether to tell RDkit to add H's to a molecule. |
| atom_features : function |
| A function applied to an rdkit.Atom that returns some |
| representation (i.e., string, integer) for the Tokenizer class. |
| bond_features : function |
| A function applied to an rdkit Bond to return some description. |
| |
| """ |
|
|
| self.atom_tokenizer = Tokenizer() |
| self.bond_tokenizer = Tokenizer() |
| self.explicit_hs = explicit_hs |
|
|
| if atom_features is None: |
| atom_features = features.atom_features_v1 |
|
|
| if bond_features is None: |
| bond_features = features.bond_features_v1 |
|
|
| self.atom_features = atom_features |
| self.bond_features = bond_features |
|
|
|
|
| def fit(self, smiles_iterator): |
| """ Fit an iterator of SMILES strings, creating new atom and bond |
| tokens for unseen molecules. Returns a dictionary with 'atom' and |
| 'connectivity' entries """ |
| return list(self.preprocess(smiles_iterator, train=True)) |
|
|
|
|
| def predict(self, smiles_iterator): |
| """ Uses previously determined atom and bond tokens to convert a SMILES |
| iterator into 'atom' and 'connectivity' matrices. Ensures that atom and |
| bond classes commute with previously determined results. """ |
| return list(self.preprocess(smiles_iterator, train=False)) |
|
|
|
|
| def preprocess(self, smiles_iterator, train=True): |
|
|
| self.atom_tokenizer.train = train |
| self.bond_tokenizer.train = train |
|
|
| for smiles in tqdm(smiles_iterator): |
| yield self.construct_feature_matrices(smiles) |
|
|
|
|
| @property |
| def atom_classes(self): |
| """ The number of atom types found (includes the 0 null-atom type) """ |
| return self.atom_tokenizer.num_classes + 1 |
|
|
|
|
| @property |
| def bond_classes(self): |
| """ The number of bond types found (includes the 0 null-bond type) """ |
| return self.bond_tokenizer.num_classes + 1 |
|
|
|
|
| def construct_feature_matrices(self, smiles): |
| """ construct a molecule from the given smiles string and return atom |
| and bond classes. |
| |
| Returns |
| dict with entries |
| 'n_atom' : number of atoms in the molecule |
| 'n_bond' : number of bonds in the molecule |
| 'atom' : (n_atom,) length list of atom classes |
| 'bond' : (n_bond,) list of bond classes |
| 'connectivity' : (n_bond, 2) array of source atom, target atom pairs. |
| |
| """ |
|
|
| mol = MolFromSmiles(smiles) |
| if self.explicit_hs: |
| mol = AddHs(mol) |
|
|
| n_atom = len(mol.GetAtoms()) |
| n_bond = 2 * len(mol.GetBonds()) |
|
|
| |
| if n_bond == 0: |
| n_bond = 1 |
| |
| atom_feature_matrix = np.zeros(n_atom, dtype='int') |
| bond_feature_matrix = np.zeros(n_bond, dtype='int') |
| connectivity = np.zeros((n_bond, 2), dtype='int') |
|
|
| bond_index = 0 |
|
|
| atom_seq = mol.GetAtoms() |
| atoms = [atom_seq[i] for i in range(n_atom)] |
|
|
| for n, atom in enumerate(atoms): |
|
|
| |
| atom_feature_matrix[n] = self.atom_tokenizer( |
| self.atom_features(atom)) |
|
|
| start_index = atom.GetIdx() |
|
|
| for bond in atom.GetBonds(): |
| |
| rev = bond.GetBeginAtomIdx() != start_index |
|
|
| |
| bond_feature_matrix[n] = self.bond_tokenizer( |
| self.bond_features(bond, flipped=rev)) |
|
|
| |
| if not rev: |
| connectivity[bond_index, 0] = bond.GetBeginAtomIdx() |
| connectivity[bond_index, 1] = bond.GetEndAtomIdx() |
|
|
| else: |
| connectivity[bond_index, 0] = bond.GetEndAtomIdx() |
| connectivity[bond_index, 1] = bond.GetBeginAtomIdx() |
|
|
| bond_index += 1 |
|
|
|
|
| return { |
| 'n_atom': n_atom, |
| 'n_bond': n_bond, |
| 'atom': atom_feature_matrix, |
| 'bond': bond_feature_matrix, |
| 'connectivity': connectivity, |
| } |
| |
|
|
| class ConnectivityAPreprocessor(object): |
| """ Given a list of SMILES strings, encode these molecules as atom and |
| connectivity feature matricies. |
| |
| Example: |
| >>> preprocessor = SmilesPreprocessor(explicit_hs=False) |
| >>> inputs = preprocessor.fit(data.smiles) |
| """ |
|
|
| def __init__(self, explicit_hs=True, atom_features=None, bond_features=None): |
| """ |
| |
| explicit_hs : bool |
| whether to tell RDkit to add H's to a molecule. |
| atom_features : function |
| A function applied to an rdkit.Atom that returns some |
| representation (i.e., string, integer) for the Tokenizer class. |
| bond_features : function |
| A function applied to an rdkit Bond to return some description. |
| |
| """ |
|
|
| self.atom_tokenizer = Tokenizer() |
| self.bond_tokenizer = Tokenizer() |
| self.explicit_hs = explicit_hs |
|
|
| if atom_features is None: |
| atom_features = features.atom_features_v1 |
|
|
| if bond_features is None: |
| bond_features = features.bond_features_v1 |
|
|
| self.atom_features = atom_features |
| self.bond_features = bond_features |
|
|
|
|
| def fit(self, smiles_iterator): |
| """ Fit an iterator of SMILES strings, creating new atom and bond |
| tokens for unseen molecules. Returns a dictionary with 'atom' and |
| 'connectivity' entries """ |
| return list(self.preprocess(smiles_iterator, train=True)) |
|
|
|
|
| def predict(self, smiles_iterator): |
| """ Uses previously determined atom and bond tokens to convert a SMILES |
| iterator into 'atom' and 'connectivity' matrices. Ensures that atom and |
| bond classes commute with previously determined results. """ |
| return list(self.preprocess(smiles_iterator, train=False)) |
|
|
|
|
| def preprocess(self, smiles_iterator, train=True): |
|
|
| self.atom_tokenizer.train = train |
| self.bond_tokenizer.train = train |
|
|
| for smiles in tqdm(smiles_iterator): |
| yield self.construct_feature_matrices(smiles) |
|
|
|
|
| @property |
| def atom_classes(self): |
| """ The number of atom types found (includes the 0 null-atom type) """ |
| return self.atom_tokenizer.num_classes + 1 |
|
|
|
|
| @property |
| def bond_classes(self): |
| """ The number of bond types found (includes the 0 null-bond type) """ |
| return self.bond_tokenizer.num_classes + 1 |
|
|
|
|
| def construct_feature_matrices(self, smiles): |
| """ construct a molecule from the given smiles string and return atom |
| and bond classes. |
| |
| Returns |
| dict with entries |
| 'n_atom' : number of atoms in the molecule |
| 'n_bond' : number of bonds in the molecule |
| 'atom' : (n_atom,) length list of atom classes |
| 'bond' : (n_bond,) list of bond classes |
| 'connectivity' : (n_bond, 2) array of source atom, target atom pairs. |
| |
| """ |
|
|
| mol = MolFromSmiles(smiles) |
| if self.explicit_hs: |
| mol = AddHs(mol) |
|
|
| n_atom = len(mol.GetAtoms()) |
| n_bond = 2 * len(mol.GetBonds()) |
|
|
| |
| if n_bond == 0: |
| n_bond = 1 |
| |
| atom_feature_matrix = np.zeros(n_atom, dtype='int') |
| bond_feature_matrix = np.zeros(n_bond, dtype='int') |
| connectivity = np.zeros((n_bond, 2), dtype='int') |
|
|
| bond_index = 0 |
|
|
| atom_seq = mol.GetAtoms() |
| atoms = [atom_seq[i] for i in range(n_atom)] |
|
|
| for n, atom in enumerate(atoms): |
|
|
| |
| atom_feature_matrix[n] = self.atom_tokenizer( |
| self.atom_features(atom)) |
|
|
| start_index = atom.GetIdx() |
|
|
| for bond in atom.GetBonds(): |
| |
| rev = bond.GetBeginAtomIdx() != start_index |
|
|
| |
| bond_feature_matrix[n] = self.bond_tokenizer( |
| self.bond_features(bond, flipped=rev)) |
|
|
| |
| if not rev: |
| connectivity[bond_index, 0] = bond.GetBeginAtomIdx() |
| connectivity[bond_index, 1] = bond.GetEndAtomIdx() |
|
|
| else: |
| connectivity[bond_index, 0] = bond.GetEndAtomIdx() |
| connectivity[bond_index, 1] = bond.GetBeginAtomIdx() |
|
|
| bond_index += 1 |
|
|
|
|
| return { |
| 'n_atom': n_atom, |
| 'n_bond': n_bond, |
| 'atom': atom_feature_matrix, |
| 'bond': bond_feature_matrix, |
| 'connectivity': connectivity, |
| } |
|
|
|
|
| class MolPreprocessor(SmilesPreprocessor): |
| """ I should refactor this into a base class and separate |
| SmilesPreprocessor classes. But the idea is that we only need to redefine |
| the `construct_feature_matrices` method to have a working preprocessor that |
| handles 3D structures. |
| |
| We'll pass an iterator of mol objects instead of SMILES strings this time, |
| though. |
| |
| """ |
|
|
| def __init__(self, n_neighbors, cutoff, **kwargs): |
| """ A preprocessor class that also returns distances between |
| neighboring atoms. Adds edges for non-bonded atoms to include a maximum |
| of n_neighbors around each atom """ |
|
|
| self.n_neighbors = n_neighbors |
| self.cutoff = cutoff |
| super(MolPreprocessor, self).__init__(**kwargs) |
|
|
|
|
| def construct_feature_matrices(self, mol): |
| """ Given an rdkit mol, return atom feature matrices, bond feature |
| matrices, and connectivity matrices. |
| |
| Returns |
| dict with entries |
| 'n_atom' : number of atoms in the molecule |
| 'n_bond' : number of edges (likely n_atom * n_neighbors) |
| 'atom' : (n_atom,) length list of atom classes |
| 'bond' : (n_bond,) list of bond classes. 0 for no bond |
| 'distance' : (n_bond,) list of bond distances |
| 'connectivity' : (n_bond, 2) array of source atom, target atom pairs. |
| |
| """ |
|
|
| n_atom = len(mol.GetAtoms()) |
|
|
| |
| |
| |
| distance_matrix = Chem.Get3DDistanceMatrix(mol) |
|
|
| if self.n_neighbors <= (n_atom - 1): |
| n_bond = self.n_neighbors * n_atom |
| else: |
| |
| |
| n_bond = distance_matrix[(distance_matrix < self.cutoff) & (distance_matrix != 0)].size |
|
|
| if n_bond == 0: n_bond = 1 |
|
|
| |
| atom_feature_matrix = np.zeros(n_atom, dtype='int') |
| bond_feature_matrix = np.zeros(n_bond, dtype='int') |
| bond_distance_matrix = np.zeros(n_bond, dtype=np.float32) |
| connectivity = np.zeros((n_bond, 2), dtype='int') |
|
|
| |
| if mol is None: |
| raise RuntimeError("Issue in loading mol") |
| |
| |
| atom_seq = mol.GetAtoms() |
| atoms = [atom_seq[i] for i in range(n_atom)] |
|
|
| |
| |
| bond_index = 0 |
| for n, atom in enumerate(atoms): |
| |
| |
| atom_feature_matrix[n] = self.atom_tokenizer( |
| self.atom_features(atom)) |
| |
| |
| |
| if (self.n_neighbors + 1) > len(mol.GetAtoms()): |
| neighbor_end_index = len(mol.GetAtoms()) |
| else: |
| neighbor_end_index = (self.n_neighbors + 1) |
|
|
| distance_atom = distance_matrix[n, :] |
| cutoff_end_index = distance_atom[distance_atom < self.cutoff].size |
|
|
| end_index = min(neighbor_end_index, cutoff_end_index) |
|
|
| |
|
|
| neighbor_inds = distance_matrix[n, :].argsort()[1:end_index] |
| if len(neighbor_inds)==0: neighbor_inds = [n] |
| for neighbor in neighbor_inds: |
| |
| |
| bond = mol.GetBondBetweenAtoms(n, int(neighbor)) |
| if bond is None: |
| bond_feature_matrix[bond_index] = 0 |
| else: |
| rev = False if bond.GetBeginAtomIdx() == n else True |
| bond_feature_matrix[bond_index] = self.bond_tokenizer( |
| self.bond_features(bond, flipped=rev)) |
|
|
| distance = distance_matrix[n, neighbor] |
| bond_distance_matrix[bond_index] = distance |
| |
| |
| connectivity[bond_index, 0] = n |
| connectivity[bond_index, 1] = neighbor |
| |
| bond_index += 1 |
|
|
| return { |
| 'n_atom': n_atom, |
| 'n_bond': n_bond, |
| 'atom': atom_feature_matrix, |
| 'bond': bond_feature_matrix, |
| 'distance': bond_distance_matrix, |
| 'connectivity': connectivity, |
| } |
|
|
|
|
| class MolBPreprocessor(MolPreprocessor): |
| """ |
| This is a subclass of Molpreprocessor that preprocessor molecule with |
| bond property target |
| """ |
| def __init__(self, **kwargs): |
| """ |
| A preprocessor class that also returns bond_target_matrix, besides the bond matrix |
| returned by MolPreprocessor. The bond_target_matrix is then used as ref to reduce molecule |
| to bond property |
| """ |
| super(MolBPreprocessor, self).__init__(**kwargs) |
|
|
| def construct_feature_matrices(self, entry): |
| """ |
| Given an entry contining rdkit molecule, bond_index and for the target property, |
| return atom |
| feature matrices, bond feature matrices, distance matrices, connectivity matrices and bond |
| ref matrices. |
| |
| returns |
| dict with entries |
| see MolPreproccessor |
| 'bond_index' : ref array to the bond index |
| """ |
| mol, bond_index_array = entry |
| |
| n_atom = len(mol.GetAtoms()) |
| n_pro = len(bond_index_array) |
|
|
| |
| |
| |
| distance_matrix = Chem.Get3DDistanceMatrix(mol) |
|
|
| if self.n_neighbors <= (n_atom - 1): |
| n_bond = self.n_neighbors * n_atom |
| else: |
| |
| |
| n_bond = distance_matrix[(distance_matrix < self.cutoff) & (distance_matrix != 0)].size |
|
|
| if n_bond == 0: n_bond = 1 |
|
|
| |
| atom_feature_matrix = np.zeros(n_atom, dtype='int') |
| bond_feature_matrix = np.zeros(n_bond, dtype='int') |
| bond_distance_matrix = np.zeros(n_bond, dtype=np.float32) |
| bond_index_matrix = np.full(n_bond, -1, dtype='int') |
| connectivity = np.zeros((n_bond, 2), dtype='int') |
|
|
| |
| if mol is None: |
| raise RuntimeError("Issue in loading mol") |
| |
| |
| atom_seq = mol.GetAtoms() |
| atoms = [atom_seq[i] for i in range(n_atom)] |
|
|
| |
| |
| bond_index = 0 |
| for n, atom in enumerate(atoms): |
| |
| atom_feature_matrix[n] = self.atom_tokenizer( |
| self.atom_features(atom)) |
| |
| |
| |
| if (self.n_neighbors + 1) > len(mol.GetAtoms()): |
| neighbor_end_index = len(mol.GetAtoms()) |
| else: |
| neighbor_end_index = (self.n_neighbors + 1) |
|
|
| distance_atom = distance_matrix[n, :] |
| cutoff_end_index = distance_atom[distance_atom < self.cutoff].size |
|
|
| end_index = min(neighbor_end_index, cutoff_end_index) |
|
|
| |
|
|
| neighbor_inds = distance_matrix[n, :].argsort()[1:end_index] |
| if len(neighbor_inds)==0: neighbor_inds = [n] |
| for neighbor in neighbor_inds: |
| |
| |
| bond = mol.GetBondBetweenAtoms(n, int(neighbor)) |
| if bond is None: |
| bond_feature_matrix[bond_index] = 0 |
| else: |
| rev = False if bond.GetBeginAtomIdx() == n else True |
| bond_feature_matrix[bond_index] = self.bond_tokenizer( |
| self.bond_features(bond, flipped=rev)) |
| try: |
| bond_index_matrix[bond_index] = bond_index_array.tolist().index(bond.GetIdx()) |
| except: |
| pass |
|
|
| distance = distance_matrix[n, neighbor] |
| bond_distance_matrix[bond_index] = distance |
| |
| |
| connectivity[bond_index, 0] = n |
| connectivity[bond_index, 1] = neighbor |
| |
| bond_index += 1 |
| return { |
| 'n_atom': n_atom, |
| 'n_bond': n_bond, |
| 'n_pro': n_pro, |
| 'atom': atom_feature_matrix, |
| 'bond': bond_feature_matrix, |
| 'distance': bond_distance_matrix, |
| 'connectivity': connectivity, |
| 'bond_index': bond_index_matrix, |
| } |
|
|
| class MolAPreprocessor(MolPreprocessor): |
| """ |
| This is a subclass of Molpreprocessor that preprocessor molecule with |
| bond property target |
| """ |
| def __init__(self, **kwargs): |
| """ |
| A preprocessor class that also returns bond_target_matrix, besides the bond matrix |
| returned by MolPreprocessor. The bond_target_matrix is then used as ref to reduce molecule |
| to bond property |
| """ |
| super(MolAPreprocessor, self).__init__(**kwargs) |
|
|
| def construct_feature_matrices(self, entry): |
| """ |
| Given an entry contining rdkit molecule, bond_index and for the target property, |
| return atom |
| feature matrices, bond feature matrices, distance matrices, connectivity matrices and bond |
| ref matrices. |
| |
| returns |
| dict with entries |
| see MolPreproccessor |
| 'bond_index' : ref array to the bond index |
| """ |
| mol, atom_index_array = entry |
| |
| n_atom = len(mol.GetAtoms()) |
| n_pro = len(atom_index_array) |
|
|
| |
| |
| |
| distance_matrix = Chem.Get3DDistanceMatrix(mol) |
|
|
| if self.n_neighbors <= (n_atom - 1): |
| n_bond = self.n_neighbors * n_atom |
| else: |
| |
| |
| n_bond = distance_matrix[(distance_matrix < self.cutoff) & (distance_matrix != 0)].size |
|
|
| if n_bond == 0: n_bond = 1 |
|
|
| |
| atom_feature_matrix = np.zeros(n_atom, dtype='int') |
| bond_feature_matrix = np.zeros(n_bond, dtype='int') |
| bond_distance_matrix = np.zeros(n_bond, dtype=np.float32) |
| atom_index_matrix = np.full(n_atom, -1, dtype='int') |
| connectivity = np.zeros((n_bond, 2), dtype='int') |
|
|
| |
| if mol is None: |
| raise RuntimeError("Issue in loading mol") |
| |
| |
| atom_seq = mol.GetAtoms() |
| atoms = [atom_seq[i] for i in range(n_atom)] |
|
|
| |
| |
| bond_index = 0 |
| for n, atom in enumerate(atoms): |
| |
| atom_feature_matrix[n] = self.atom_tokenizer( |
| self.atom_features(atom)) |
| try: |
| atom_index_matrix[n] = atom_index_array.tolist().index(atom.GetIdx()) |
| except: |
| pass |
| |
| |
| if (self.n_neighbors + 1) > len(mol.GetAtoms()): |
| neighbor_end_index = len(mol.GetAtoms()) |
| else: |
| neighbor_end_index = (self.n_neighbors + 1) |
|
|
| distance_atom = distance_matrix[n, :] |
| cutoff_end_index = distance_atom[distance_atom < self.cutoff].size |
|
|
| end_index = min(neighbor_end_index, cutoff_end_index) |
|
|
| |
|
|
| neighbor_inds = distance_matrix[n, :].argsort()[1:end_index] |
| if len(neighbor_inds)==0: neighbor_inds = [n] |
| for neighbor in neighbor_inds: |
| |
| |
| bond = mol.GetBondBetweenAtoms(n, int(neighbor)) |
| if bond is None: |
| bond_feature_matrix[bond_index] = 0 |
| else: |
| rev = False if bond.GetBeginAtomIdx() == n else True |
| bond_feature_matrix[bond_index] = self.bond_tokenizer( |
| self.bond_features(bond, flipped=rev)) |
|
|
| distance = distance_matrix[n, neighbor] |
| bond_distance_matrix[bond_index] = distance |
| |
| |
| connectivity[bond_index, 0] = n |
| connectivity[bond_index, 1] = neighbor |
| |
| bond_index += 1 |
| return { |
| 'n_atom': n_atom, |
| 'n_bond': n_bond, |
| 'n_pro': n_pro, |
| 'atom': atom_feature_matrix, |
| 'bond': bond_feature_matrix, |
| 'distance': bond_distance_matrix, |
| 'connectivity': connectivity, |
| 'atom_index': atom_index_matrix, |
| } |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
| def get_max_atom_bond_size(smiles_iterator, explicit_hs=True): |
| """ Convienence function to get max_atoms, max_bonds for a set of input |
| SMILES """ |
|
|
| max_atoms = 0 |
| max_bonds = 0 |
| for smiles in tqdm(smiles_iterator): |
| mol = MolFromSmiles(smiles) |
| if explicit_hs: |
| mol = AddHs(mol) |
| max_atoms = max([max_atoms, len(mol.GetAtoms())]) |
| max_bonds = max([max_bonds, len(mol.GetBonds())]) |
|
|
| return dict(max_atoms=max_atoms, max_bonds=max_bonds*2) |
|
|
|
|
| def canonicalize_smiles(smiles, isomeric=True, sanitize=True): |
| try: |
| mol = MolFromSmiles(smiles, sanitize=sanitize) |
| return MolToSmiles(mol, isomericSmiles=isomeric) |
| except Exception: |
| pass |
|
|