Spaces:
Running
Running
import os | |
import subprocess | |
import sys | |
import warnings | |
import pandas as pd | |
from rdkit import Chem, RDLogger | |
from sklearn.model_selection import train_test_split | |
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) | |
from utils import remove_atom_mapping, seed_everything | |
seed_everything(seed=42) | |
# Disable RDKit warnings and Python warnings | |
RDLogger.DisableLog("rdApp.*") | |
warnings.filterwarnings("ignore") | |
script_dir = os.path.abspath(os.path.dirname(__file__)) | |
project_root = os.path.abspath(os.path.join(script_dir, "..")) | |
data_dir = os.path.join(project_root, "data") | |
files_to_download = [ | |
"1ZPsoUYb4HcxFzK_ac9rb_pQj7oO3Gagh", | |
"1XwkxxHiaWFbSNhGyxnv6hAliutIMNrIp", | |
"1yIwUH_OhER9nuMo9HjBhBmyc6zvmrSPA", | |
"1skFRirstIUijhieshvJEScBD2aB3H1YU", | |
"1fa2MyLdN1vcA7Rysk8kLQENE92YejS9B", | |
] | |
for file_id in files_to_download: | |
subprocess.run( | |
f"gdown 'https://drive.google.com/uc?export=download&id={file_id}'", shell=True | |
) | |
# Move downloaded files to data directory | |
subprocess.run("mv *.smi " + data_dir, shell=True) | |
subprocess.run("mv *.tsv " + data_dir, shell=True) | |
# Function to process SMILES files and save canonicalized versions | |
def process_smiles_files(file_paths): | |
unique_smiles = set() | |
for file_path in file_paths: | |
suppl = Chem.SmilesMolSupplier(file_path) | |
for mol in suppl: | |
if mol is not None: | |
try: | |
sm = Chem.MolToSmiles(mol, canonical=True) | |
unique_smiles.add(sm) | |
except: | |
continue | |
df = pd.DataFrame({"smiles": list(unique_smiles)}) | |
df.to_csv(os.path.join(data_dir, "ZINC-canonicalized.csv"), index=False) | |
train, valid = train_test_split(df, test_size=0.1) | |
# Save train and validation data | |
train.to_csv(os.path.join(data_dir, "ZINC-canonicalized-train.csv"), index=False) | |
valid.to_csv(os.path.join(data_dir, "ZINC-canonicalized-valid.csv"), index=False) | |
# Process 16_p files | |
process_smiles_files([os.path.join(data_dir, f"16_p{i}.smi") for i in range(4)]) | |
# Load reaction data | |
ord_df = pd.read_csv( | |
os.path.join(data_dir, "all_ord_reaction_uniq_with_attr20240506_v1.tsv"), | |
sep="\t", | |
names=["id", "input", "product", "condition"], | |
) | |
def data_split(row): | |
categories = [ | |
"CATALYST", | |
"REACTANT", | |
"REAGENT", | |
"SOLVENT", | |
"INTERNAL_STANDARD", | |
"NoData", | |
] | |
data = {cat: [] for cat in categories} | |
input_data = row["input"] | |
if isinstance(input_data, str): | |
for item in input_data.split("."): | |
for cat in categories: | |
if cat in item: | |
data[cat].append(item[item.find(":") + 1 :]) | |
break | |
for key, value in data.items(): | |
data[key] = ".".join(value) | |
product_data = row["product"] | |
if isinstance(product_data, str): | |
product_data = product_data.replace(".PRODUCT", "PRODUCT") | |
pro_lis = [] | |
for item in product_data.split("PRODUCT:"): | |
if item != "": | |
pro_lis.append(item) | |
data["PRODUCT"] = ".".join(pro_lis) | |
else: | |
data["PRODUCT"] = None | |
condition_data = row["condition"] | |
if isinstance(condition_data, str): | |
data["YIELD"] = ( | |
float(condition_data.split(":")[1]) if "YIELD" in condition_data else None | |
) | |
temp_pos = condition_data.find("TEMP") | |
data["TEMP"] = ( | |
float(condition_data[temp_pos:].split(":")[1]) | |
if "TEMP" in condition_data | |
else None | |
) | |
else: | |
data["YIELD"] = None | |
data["TEMP"] = None | |
return list(data.values()) | |
# Split data and create cleaned DataFrame | |
categories = [ | |
"CATALYST", | |
"REACTANT", | |
"REAGENT", | |
"SOLVENT", | |
"INTERNAL_STANDARD", | |
"NoData", | |
"PRODUCT", | |
"YIELD", | |
"TEMP", | |
] | |
cleaned_data = {cat: [] for cat in categories} | |
for _, row in ord_df.iterrows(): | |
split_data = data_split(row) | |
for i, value in enumerate(split_data): | |
cleaned_data[categories[i]].append(value) | |
cleaned_df = pd.DataFrame(cleaned_data) | |
# Apply remove_atom_mapping function to relevant columns | |
for column in [ | |
"CATALYST", | |
"REACTANT", | |
"REAGENT", | |
"SOLVENT", | |
"INTERNAL_STANDARD", | |
"NoData", | |
"PRODUCT", | |
]: | |
cleaned_df[column] = cleaned_df[column].apply( | |
lambda x: remove_atom_mapping(x) if isinstance(x, str) else None | |
) | |
# Save cleaned DataFrame | |
cleaned_df.to_csv(os.path.join(data_dir, "preprocessed_ord.tsv"), index=False) | |
train, valid = train_test_split(cleaned_df, test_size=int(len(cleaned_df) * 0.1)) | |
train, test = train_test_split(train, test_size=int(len(cleaned_df) * 0.1)) | |
# Save train and validation data | |
train.to_csv(os.path.join(data_dir, "preprocessed_ord_train.csv"), index=False) | |
valid.to_csv(os.path.join(data_dir, "preprocessed_ord_valid.csv"), index=False) | |
test.to_csv(os.path.join(data_dir, "preprocessed_ord_test.csv"), index=False) | |