| import hashlib
|
| import os
|
| import sys
|
| from io import StringIO
|
| from pathlib import Path
|
|
|
| import numpy as np
|
| import pandas as pd
|
| import rpy2.robjects as ro
|
| from rpy2.robjects import pandas2ri
|
| from rpy2.robjects.conversion import localconverter
|
|
|
| from r_functions import export_matrix_to_newick_r, export_similarity_network_r
|
| from usalign_runner import USalignRunner
|
|
|
|
|
def get_TM_mat_from_df(df):
    """Build a pairwise TM-score matrix (as a DataFrame) from USalign output rows.

    The diagonal is initialized to 1.0 (self-alignment). For each result row,
    TM1 is written at [chain2, chain1] and TM2 at [chain1, chain2], so the
    matrix is not necessarily symmetric. Labels are chain names with "/" and
    the ".pdb:A" suffix stripped.
    """
    chains = sorted(set(df["#PDBchain1"]) | set(df["PDBchain2"]))
    index_of = {name: pos for pos, name in enumerate(chains)}
    mat = np.eye(len(chains))

    for _, record in df.iterrows():
        first = record["#PDBchain1"]
        second = record["PDBchain2"]
        # Skip rows referencing chains outside the collected set.
        if first in index_of and second in index_of:
            fi = index_of[first]
            si = index_of[second]
            mat[si, fi] = record["TM1"]
            mat[fi, si] = record["TM2"]

    labels = [name.replace("/", "").replace(".pdb:A", "") for name in chains]
    return pd.DataFrame(mat, columns=labels, index=labels)
|
|
|
|
|
def calculate_md5(files):
    """Return the MD5 hex digest of the concatenated contents of *files*.

    Each entry must expose a ``.name`` path. Entries are processed in a
    stable order (sorted by ``.name``) so the digest does not depend on
    upload order; contents are streamed in 4 KiB chunks.
    """
    digest = hashlib.md5()
    for entry in sorted(files, key=lambda item: item.name):
        with open(entry.name, "rb") as handle:
            while chunk := handle.read(4096):
                digest.update(chunk)
    return digest.hexdigest()
|
|
|
|
|
def save_pdb_files(files, data_dir="./data"):
    """Save uploaded PDB files under ``<data_dir>/<md5>/pdb`` and write a list file.

    The MD5 of the combined file contents names the run directory, so the same
    upload set always maps to the same location and repeated uploads are
    idempotent.

    Args:
        files: uploaded file objects exposing a ``.name`` path (e.g. Gradio
            temp files); may be empty/None.
        data_dir: root directory for run data.

    Returns:
        A newline-joined, human-readable status report.
    """
    if not files:
        return "No files uploaded"

    data_path = Path(data_dir)
    md5_hash = calculate_md5(files)

    # Create <data_dir>/<md5>/pdb in one call. exist_ok/parents replaces the
    # previous try/except-pass pairs, which silently swallowed *every* mkdir
    # failure (including permission errors), not just "already exists".
    pdb_dir = data_path / md5_hash / "pdb"
    pdb_dir.mkdir(parents=True, exist_ok=True)
    print(f"Created directory: {pdb_dir}")

    list_file = data_path / md5_hash / "pdb_list"

    filenames = []
    results = []
    for file in files:
        original_filename = os.path.basename(file.name)
        filenames.append(original_filename)

        target_path = pdb_dir / original_filename
        print(f"Saving to: {target_path}")

        # Paired context managers ensure the source handle is closed; the old
        # code leaked the handle from open(file.name, "rb").read().
        with open(file.name, "rb") as src, open(target_path, "wb") as dst:
            dst.write(src.read())
        results.append(f"Saved {original_filename}")

    # Record the original filenames so downstream steps (USalign) know which
    # structures belong to this run.
    with open(list_file, "w") as f:
        f.write("\n".join(filenames))
    results.append(f"Created list file: {list_file}")

    return "\n".join(results)
|
|
|
|
|
def run_usalign(md5_hash):
    """Run USalign on the uploaded PDB files and return results as a DataFrame.

    Args:
        md5_hash: run identifier; inputs are read from ``./data/<md5>/pdb``
            with the file list at ``./data/<md5>/pdb_list``.

    Returns:
        A DataFrame parsed from USalign's tab-separated stdout on success,
        otherwise a single-column ``{"Error": [...]}`` DataFrame.
    """
    # Pre-initialize so the except handler below can always reference it.
    # Previously, an exception raised before run_alignment() returned caused
    # a NameError on `stderr` inside the handler, masking the real error.
    stderr = ""
    try:
        runner = USalignRunner()
        data_path = Path("./data")
        pdb_dir = os.path.join(data_path, md5_hash, "pdb")
        list_file = os.path.join(data_path, md5_hash, "pdb_list")
        print(pdb_dir)
        print(list_file)

        return_code, stdout, stderr = runner.run_alignment(
            target_dir=str(pdb_dir), pdb_list_file=str(list_file)
        )
        print(stdout)
        print(stderr)

        if return_code != 0:
            return pd.DataFrame({"Error": [stderr]})

        df = pd.read_csv(StringIO(stdout), sep="\t", encoding=sys.getdefaultencoding())
        # USalign pads column headers with whitespace; normalize them.
        df.columns = [col.strip() for col in df.columns]
        return df
    except Exception as e:
        return pd.DataFrame({"Error": [e, stderr]})
|
|
|
|
|
def run_community_analysis(results_df, data_dir, md5_hash, threshold):
    """Run the community-analysis pipeline and return its artifacts.

    Builds the TM-score matrix from the USalign results, exports a Newick
    tree and a similarity network through the R helpers, and writes the
    matrix to CSV.

    Args:
        results_df: USalign output table (columns used by get_TM_mat_from_df).
        data_dir: root data directory; output goes to ``<data_dir>/<md5_hash>``.
            (This parameter is now honored — paths were previously hard-coded
            to "data".)
        md5_hash: run identifier naming the output subdirectory.
        threshold: similarity cutoff forwarded to the R network export.

    Returns:
        dict with "tm_matrix" (DataFrame), "newick_str" (str) and "files"
        (list of written paths), or {"Error": message} on failure.
    """
    try:
        tm_matrix = get_TM_mat_from_df(results_df)

        run_dir = os.path.join(data_dir, md5_hash)
        tm_file = os.path.join(run_dir, "tm_matrix.csv")
        newick_file = os.path.join(run_dir, "clustering.newick")
        network_edges_file = os.path.join(run_dir, "network_cytoscape_export.xlsx")

        # Convert the pandas matrix into an R data frame for the R helpers.
        with localconverter(ro.default_converter + pandas2ri.converter):
            r_tm_matrix = ro.conversion.py2rpy(tm_matrix)

        result = export_matrix_to_newick_r(r_tm_matrix, newick_file)
        newick_str = result[0]  # R returns a character vector; take the scalar.

        export_similarity_network_r(threshold, r_tm_matrix, network_edges_file)

        tm_matrix.to_csv(tm_file)

        return {
            "tm_matrix": tm_matrix,
            "newick_str": newick_str,
            "files": [
                tm_file,
                newick_file,
                network_edges_file,
            ],
        }
    except Exception as e:
        print("Error", str(e))
        return {"Error": str(e)}
|
|
|