|
import os |
|
import torch |
|
import torch.nn as nn |
|
import torch.nn.functional as F |
|
from tqdm import tqdm |
|
import pandas as pd |
|
from typing import List |
|
|
|
from rdkit import Chem |
|
from rdkit.Chem import AllChem |
|
|
|
from transformers import PretrainedConfig |
|
from transformers import PreTrainedModel |
|
from transformers import AutoModel |
|
|
|
from torch_geometric.nn import GCNConv |
|
from torch_geometric.data import Data |
|
from torch_geometric.loader import DataLoader |
|
from torch_scatter import scatter |
|
|
|
|
|
class SmilesDataset(torch.utils.data.Dataset):
    """Dataset that converts SMILES strings into torch_geometric ``Data`` graphs.

    Each graph carries atom-type indices (``x``), 3D coordinates taken from an
    RDKit-embedded conformer (``pos``), undirected bond connectivity
    (``edge_index``) and the originating SMILES string.
    """

    # Supported elements mapped to integer atom-type indices.
    TYPES = {'H': 0, 'C': 1, 'N': 2, 'O': 3, 'S': 4}

    def __init__(self, smiles):
        self.smiles_list = smiles
        self.data_list = []

    def __len__(self):
        return len(self.data_list)

    def __getitem__(self, idx):
        return self.data_list[idx]

    def get_data(self, smiles):
        """Build and return the list of ``Data`` graphs for ``smiles``.

        Molecules that cannot be parsed, cannot be embedded in 3D, or that
        contain atoms outside the supported element set are skipped with a
        diagnostic message instead of aborting the whole batch.
        """
        self.smiles_list = smiles
        # Bug fix: reset the accumulator so repeated calls do not append
        # duplicate graphs from previous invocations.
        self.data_list = []

        types = self.TYPES

        for smi in self.smiles_list:
            mol = Chem.MolFromSmiles(smi)
            if mol is None:
                print("无法创建Mol对象", smi)
                continue

            # Add explicit hydrogens before 3D embedding.
            mol3d = Chem.AddHs(mol)
            if mol3d is None:
                print("无法创建mol3d对象", smi)
                continue

            # Deterministic embedding; on failure the molecule simply has
            # zero conformers, which is checked below.
            AllChem.EmbedMolecule(mol3d, randomSeed=1)
            if mol3d.GetNumConformers() == 0:
                print("无法创建comfor", smi)
                continue

            N = mol3d.GetNumAtoms()
            pos = torch.tensor(mol3d.GetConformer().GetPositions(),
                               dtype=torch.float)

            # Bug fix: skip molecules containing unsupported elements
            # instead of raising an unhandled KeyError mid-loop.
            symbols = [atom.GetSymbol() for atom in mol3d.GetAtoms()]
            if any(s not in types for s in symbols):
                print("unsupported atom type in", smi)
                continue
            type_idx = [types[s] for s in symbols]

            # Record each bond in both directions (undirected graph).
            row, col = [], []
            for bond in mol3d.GetBonds():
                start, end = bond.GetBeginAtomIdx(), bond.GetEndAtomIdx()
                row += [start, end]
                col += [end, start]

            edge_index = torch.tensor([row, col], dtype=torch.long)
            # Sort edges lexicographically by (source, target) index.
            perm = (edge_index[0] * N + edge_index[1]).argsort()
            edge_index = edge_index[:, perm]

            x = torch.tensor(type_idx).to(torch.float)
            data = Data(x=x, pos=pos, edge_index=edge_index, smiles=smi)
            self.data_list.append(data)

        return self.data_list
|
|
|
""" |
|
MLP Layer used after graph vector representation |
|
""" |
|
class MLPReadout(nn.Module):
    """MLP readout head applied to graph-level vector representations.

    Stacks ``L`` hidden linear layers, each halving the feature width, then a
    final projection to ``output_dim``. ReLU follows every hidden layer but
    not the output layer.
    """

    def __init__(self, input_dim, output_dim, L=2):
        super().__init__()
        layers = []
        for step in range(L):
            # Width halves at every hidden step: input_dim // 2**step -> // 2**(step+1).
            layers.append(nn.Linear(input_dim // 2 ** step,
                                    input_dim // 2 ** (step + 1),
                                    bias=True))
        # Final projection from the narrowest hidden width to the output.
        layers.append(nn.Linear(input_dim // 2 ** L, output_dim, bias=True))
        self.FC_layers = nn.ModuleList(layers)
        self.L = L

    def forward(self, x):
        out = x
        for hidden in self.FC_layers[:-1]:
            out = F.relu(hidden(out))
        # No activation on the last layer: raw regression/classification scores.
        return self.FC_layers[-1](out)
|
|
|
class GCNNet(torch.nn.Module):
    """GCN encoder: atom embedding -> repeated GCN layer -> pooling -> MLP head.

    NOTE(review): ``conv1`` is applied ``n_layers`` times with shared weights
    (the same module instance each iteration); this reproduces the original
    implementation — confirm the weight sharing is intentional.
    """

    def __init__(self, input_feature=64, emb_input=20, hidden_size=64, n_layers=6, num_classes=1):
        super(GCNNet, self).__init__()

        # Atom-type indices are embedded into hidden_size-dim vectors;
        # index 0 is treated as padding.
        self.embedding = torch.nn.Embedding(emb_input, hidden_size, padding_idx=0)
        self.input_feature = input_feature
        self.n_layers = n_layers
        self.num_classes = num_classes

        self.conv1 = GCNConv(hidden_size, hidden_size)
        self.conv2 = GCNConv(hidden_size, 32)
        self.mlp = MLPReadout(32, num_classes)

    def forward_features(self, data):
        """Return per-graph predictions for a torch_geometric batch."""
        node_types = data.x.long()
        edge_index = data.edge_index
        batch = data.batch

        h = self.embedding(node_types.reshape(-1))

        # Repeated application of the same GCN layer (weight sharing).
        for _ in range(self.n_layers):
            h = F.relu(self.conv1(h, edge_index))

        h = F.relu(self.conv2(h, edge_index))
        # Mean-pool node features per graph using the batch assignment vector.
        h = scatter(h, batch, dim=-2, reduce='mean')
        h = self.mlp(h)

        return h.squeeze(-1)
|
|
|
|
|
class GCNConfig(PretrainedConfig):
    """HuggingFace configuration for the custom GCN model.

    Holds the GCNNet architecture hyper-parameters plus the SMILES list and
    processor class name used by the SMILES pre-processing pipeline.
    """

    model_type = "gcn"

    def __init__(
        self,
        input_feature: int = 64,
        emb_input: int = 20,
        hidden_size: int = 64,
        n_layers: int = 6,
        num_classes: int = 1,
        smiles: List[str] = None,
        processor_class: str = "SmilesProcessor",
        **kwargs,
    ):
        # Architecture hyper-parameters followed by data-processing settings;
        # assignment order matches the parameter order above.
        settings = dict(
            input_feature=input_feature,
            emb_input=emb_input,
            hidden_size=hidden_size,
            n_layers=n_layers,
            num_classes=num_classes,
            smiles=smiles,
            processor_class=processor_class,
        )
        for name, value in settings.items():
            setattr(self, name, value)

        super().__init__(**kwargs)
|
|
|
|
|
class GCNModel(PreTrainedModel):
    """HuggingFace PreTrainedModel wrapper around :class:`GCNNet`.

    Bundles the GCN network with a SMILES-to-graph processor and provides
    :meth:`predict_smiles` for end-to-end scoring of SMILES strings.
    """

    config_class = GCNConfig

    def __init__(self, config):
        super().__init__(config)

        self.model = GCNNet(
            input_feature=config.input_feature,
            emb_input=config.emb_input,
            hidden_size=config.hidden_size,
            n_layers=config.n_layers,
            num_classes=config.num_classes,
        )
        # Converts SMILES strings into torch_geometric graphs.
        self.process = SmilesDataset(
            smiles=config.smiles,
        )

        # Populated lazily by predict_smiles().
        self.gcn_model = None
        self.dataset = None
        self.output = None
        self.data_loader = None
        self.pred_data = None

    def forward(self, tensor):
        """Delegate to the underlying GCN feature extractor."""
        return self.model.forward_features(tensor)

    def predict_smiles(self, smiles, device: str = 'cpu', result_dir: str = './', **kwargs):
        """Predict property values for a list of SMILES strings.

        Downloads pretrained weights from the HuggingFace hub, runs batched
        inference, writes the results to ``<result_dir>/gcn.csv`` and returns
        a human-readable report string.

        Parameters:
            smiles: list of SMILES strings to score.
            device: torch device string ('cpu' or 'cuda').
            result_dir: directory where the CSV result file is written.
            **kwargs: DataLoader options — batch_size (1), shuffle (False),
                drop_last (False), num_workers (0).

        Raises:
            ValueError: if none of the SMILES could be converted to graphs.
        """
        batch_size = kwargs.pop('batch_size', 1)
        shuffle = kwargs.pop('shuffle', False)
        drop_last = kwargs.pop('drop_last', False)
        num_workers = kwargs.pop('num_workers', 0)

        # NOTE(review): weights are re-downloaded/loaded on every call;
        # consider caching the loaded model on the instance.
        self.gcn_model = AutoModel.from_pretrained(
            "Huhujingjing/custom-gcn", trust_remote_code=True).to(device)
        self.gcn_model.eval()

        self.dataset = self.process.get_data(smiles)
        if not self.dataset:
            # Bug fix: the original indexed dataset[0] unconditionally and
            # crashed with a bare IndexError when no SMILES converted.
            raise ValueError(
                "no valid molecules could be built from the given SMILES")

        self.output = ""
        self.output += "predicted samples num: {}\n".format(len(self.dataset))
        self.output += "predicted samples:{}\n".format(self.dataset[0])

        self.data_loader = DataLoader(self.dataset,
                                      batch_size=batch_size,
                                      shuffle=shuffle,
                                      drop_last=drop_last,
                                      num_workers=num_workers
                                      )
        self.pred_data = {
            'smiles': [],
            'pred': []
        }

        for batch in tqdm(self.data_loader):
            batch = batch.to(device)
            with torch.no_grad():
                self.pred_data['smiles'] += batch['smiles']
                # Predictions are moved to CPU here, so no further device
                # handling is needed below.
                self.pred_data['pred'] += self.gcn_model(batch).cpu().tolist()

        # Bug fix: always store a plain Python list. The original only
        # converted when device == 'cuda', leaving a torch tensor in
        # pred_data otherwise, so round() downstream ran on tensor elements.
        self.pred_data['pred'] = torch.tensor(
            self.pred_data['pred']).reshape(-1).tolist()

        pred_df = pd.DataFrame(self.pred_data)
        pred_df['pred'] = pred_df['pred'].apply(lambda x: round(x, 2))
        self.output += ('-' * 40 + '\n' + 'predicted result: \n' + '{}\n'.format(pred_df))
        self.output += ('-' * 40)

        csv_path = os.path.join(result_dir, 'gcn.csv')
        pred_df.to_csv(csv_path, index=False)
        self.output += ('\nsave predicted result to {}\n'.format(csv_path))

        return self.output
|
|
|
|
|
if __name__ == "__main__":
    # Build a config matching the architecture the checkpoint was trained with;
    # the sample SMILES are placeholders stored in the config.
    gcn_config = GCNConfig(input_feature=64, emb_input=20, hidden_size=64, n_layers=6, num_classes=1,
                           smiles=["C", "CC", "CCC"], processor_class="SmilesProcessor")

    gcnd = GCNModel(gcn_config)
    # NOTE(review): hard-coded local Windows path to the trained weights —
    # adjust before running on another machine.
    gcnd.model.load_state_dict(torch.load(r'G:\Trans_MXM\gcn_model\gcn.pt'))
    # Export model + config in HuggingFace format to ./custom-gcn.
    gcnd.save_pretrained("custom-gcn")
|
|
|
|