# Scraped page header (not part of the script): "Spaces: Runtime error"
import argparse
import gzip
import os
import shutil
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
import requests
from tqdm import tqdm
# Maps the CLI file-type name to the suffix that follows the PDB ID in the
# RCSB download file name.
download_type_dict = {
    'cif': 'cif.gz',
    'pdb': 'pdb.gz',
    'pdb1': 'pdb1.gz',
    'xml': 'xml.gz',
    # These two suffixes carry their own leading separator ('-' / '_').
    'sf': '-sf.cif.gz',
    'mr': 'mr.gz',
    'mrstr': '_mr.str.gz',
}

# Root of the RCSB file download service; file names are appended to it.
BASE_URL = "https://files.rcsb.org/download"
def _remove_if_exists(path):
    """Delete ``path``, ignoring the case where it is already gone."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def download_and_unzip(file_name, out_dir, unzip, timeout=60):
    """Download ``file_name`` from ``BASE_URL`` into ``out_dir``.

    Args:
        file_name: Name of the remote file; also used as the local file name.
        out_dir: Existing directory that receives the file.
        unzip: When true and ``file_name`` ends in ``.gz``, decompress the
            download and keep only the decompressed file.
        timeout: Seconds passed to ``requests.get`` so a stalled server
            cannot block the caller forever.

    Returns:
        A status message. It is ``"<path> already exists, skipping"`` when
        the target is already present, ``"<file_name> successfully
        downloaded"`` on success, and ``"<file_name> failed, <error>"`` when
        anything goes wrong. No exception is raised for download errors.
    """
    out_path = os.path.join(out_dir, file_name)
    is_gz = out_path.endswith('.gz')
    # Only a '.gz' name has a meaningful decompressed counterpart.
    unzipped_path = out_path[:-3] if is_gz else None

    if os.path.exists(out_path):
        return f"{out_path} already exists, skipping"
    if unzip and unzipped_path and os.path.exists(unzipped_path):
        return f"{unzipped_path} already exists, skipping"

    # Data is written to '.part' files and renamed only once complete, so an
    # interrupted run never leaves a truncated file that a later run would
    # mistake for a finished download.
    part_path = out_path + '.part'
    unzipped_part = unzipped_path + '.part' if unzipped_path else None
    try:
        url = f"{BASE_URL}/{file_name}"
        # The context manager releases the streamed connection.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(part_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        if unzip and is_gz:
            with gzip.open(part_path, 'rb') as gz_file, \
                    open(unzipped_part, 'wb') as out_file:
                shutil.copyfileobj(gz_file, out_file)
            os.replace(unzipped_part, unzipped_path)
            os.remove(part_path)
        else:
            os.replace(part_path, out_path)
    except Exception as e:
        # Deliberately broad: this runs inside worker threads and reports
        # every failure through the returned message.
        _remove_if_exists(part_path)
        if unzipped_part:
            _remove_if_exists(unzipped_part)
        return f"{file_name} failed, {e}"
    return f"{file_name} successfully downloaded"
def build_file_name(pdb_id, suffix):
    """Return the RCSB file name for ``pdb_id`` and a download suffix.

    Suffixes that already start with their own separator ('-sf.cif.gz',
    '_mr.str.gz') are appended directly; all others are joined with a '.'.
    """
    if suffix.startswith(('-', '_')):
        return f"{pdb_id}{suffix}"
    return f"{pdb_id}.{suffix}"


def is_failure_message(message, file_name):
    """Tell whether ``message`` is the failure report for ``file_name``.

    Matches the exact "<file_name> failed, ..." prefix instead of searching
    for the word "failed", which could also occur inside an output path.
    """
    return message.startswith(f"{file_name} failed, ")


def read_pdb_ids(path):
    """Read PDB IDs from ``path``, one per line.

    Surrounding whitespace is stripped, blank lines are dropped, and
    duplicates are removed (first occurrence wins) so two workers never
    write the same output file.
    """
    with open(path, 'r') as id_file:
        stripped = (line.strip() for line in id_file)
        return list(dict.fromkeys(pdb for pdb in stripped if pdb))


def write_error_file(error_file, error_proteins, error_messages):
    """Write failed IDs and their messages to ``error_file`` as CSV."""
    error_file_dir = os.path.dirname(error_file)
    # A bare file name has an empty dirname, and os.makedirs('') raises.
    if error_file_dir:
        os.makedirs(error_file_dir, exist_ok=True)
    error_dict = {'protein': error_proteins, 'message': error_messages}
    pd.DataFrame(error_dict).to_csv(error_file, index=False)


def main():
    """Parse the command line and download the requested RCSB files."""
    parser = argparse.ArgumentParser(description='Download and optionally unzip files from RCSB.')
    parser.add_argument('-i', '--pdb_id', help='Single PDB ID to download')
    parser.add_argument('-f', '--pdb_id_file', help='Input file containing a list of PDB IDs')
    parser.add_argument('-o', '--out_dir', default='.', help='Output directory')
    parser.add_argument('-t', '--type', default='pdb', choices=list(download_type_dict), help='File type to download')
    parser.add_argument('-u', '--unzip', action='store_true', help='Unzip the downloaded files')
    parser.add_argument('-e', '--error_file', help='File to write PDB ids that failed to download')
    parser.add_argument('-n', '--num_workers', default=12, type=int, help='Number of workers to use for downloading')
    args = parser.parse_args()

    if not args.pdb_id and not args.pdb_id_file:
        print("Error: Must provide either pdb_id or pdb_id_file")
        sys.exit(1)

    os.makedirs(args.out_dir, exist_ok=True)
    suffix = download_type_dict[args.type]
    error_proteins = []
    error_messages = []

    if args.pdb_id:
        file_name = build_file_name(args.pdb_id, suffix)
        message = download_and_unzip(file_name, args.out_dir, args.unzip)
        print(message)
        if is_failure_message(message, file_name):
            error_proteins.append(args.pdb_id)
            error_messages.append(message)
    elif args.pdb_id_file:
        pdbs = read_pdb_ids(args.pdb_id_file)

        def download_file(pdb):
            # Return the file name too so the result loop can classify the
            # message without rebuilding it.
            file_name = build_file_name(pdb, suffix)
            message = download_and_unzip(file_name, args.out_dir, args.unzip)
            return pdb, file_name, message

        with ThreadPoolExecutor(max_workers=args.num_workers) as executor:
            futures = [executor.submit(download_file, pdb) for pdb in pdbs]
            with tqdm(total=len(pdbs), desc="Downloading Files") as bar:
                for future in as_completed(futures):
                    pdb, file_name, message = future.result()
                    bar.set_description(message)
                    if is_failure_message(message, file_name):
                        error_proteins.append(pdb)
                        error_messages.append(message)
                    bar.update(1)

    if error_proteins and args.error_file:
        write_error_file(args.error_file, error_proteins, error_messages)


if __name__ == "__main__":
    main()