# Spaces: Runtime error
import argparse
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from tqdm import tqdm
def download_fasta(uniprot_id, outdir, merge_output=False, timeout=30):
    """Download the FASTA record for a single UniProt ID.

    Args:
        uniprot_id: UniProt identifier used to build the download URL and
            the output file name.
        outdir: Directory in which ``<uniprot_id>.fasta`` is written when
            ``merge_output`` is false.
        merge_output: When true, nothing is written to disk; the downloaded
            text is returned so the caller can merge it.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot block a worker indefinitely.

    Returns:
        A ``(uniprot_id, message, text)`` tuple. ``text`` is the response
        body only when ``merge_output`` is true and the download succeeded;
        otherwise it is ``None``. ``message`` contains the word "failed" for
        HTTP errors and network errors, which callers use to detect failures.
    """
    filename = f"{uniprot_id}.fasta"
    out_path = os.path.join(outdir, filename)

    # Check for an existing file BEFORE touching the network: a skipped ID
    # should not cost an HTTP round trip.
    if not merge_output and os.path.exists(out_path):
        return uniprot_id, f"{filename} already exists, skipping", None

    url = f"https://www.uniprot.org/uniprot/{uniprot_id}.fasta"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # Report network problems as an ordinary failed result instead of
        # raising, so one bad connection does not abort a whole batch.
        return uniprot_id, f"{filename} failed, {exc}", None

    if response.status_code != 200:
        return uniprot_id, f"{filename} failed, {response.status_code}", None

    if merge_output:
        return uniprot_id, f"{filename} successfully downloaded", response.text

    with open(out_path, 'w', encoding='utf-8') as file:
        file.write(response.text)
    return uniprot_id, f"{filename} successfully downloaded", None
if __name__ == '__main__':
    # Command-line entry point: download one UniProt ID (-i) or every ID
    # listed in a file (-f), optionally merging all records into a single
    # merged.fasta, and report failed downloads at the end.
    parser = argparse.ArgumentParser(description='Download FASTA files from UniProt.')
    parser.add_argument('-i', '--uniprot_id', help='Single UniProt ID to download')
    parser.add_argument('-f', '--file', help='Input file containing UniProt IDs')
    # Default to the current directory: without a default, omitting -o made
    # os.makedirs(None) raise a TypeError.
    parser.add_argument('-o', '--out_dir', default='.', help='Directory to save FASTA files (default: current directory)')
    parser.add_argument('-n', '--num_workers', type=int, default=12, help='Number of workers to use for downloading')
    parser.add_argument('-m', '--merge', action='store_true', help='Merge all sequences into a single FASTA file')
    parser.add_argument('-e', '--error_file', help='File to save failed downloads. If not provided, errors will be printed to console')
    args = parser.parse_args()

    if not args.uniprot_id and not args.file:
        print("Error: Must provide either uniprot_id or file", file=sys.stderr)
        sys.exit(1)

    os.makedirs(args.out_dir, exist_ok=True)

    error_proteins = []
    error_messages = []
    all_sequences = []

    def _safe_download(uid):
        # Turn any unexpected exception into a "failed" result so a single
        # bad ID cannot abort the run and discard the collected errors.
        try:
            return download_fasta(uid, args.out_dir, args.merge)
        except Exception as exc:
            return uid, f"{uid}.fasta failed, {exc}", None

    def _record_result(uid, message, sequence):
        # Failures are recognised by the word "failed" in the message
        # returned by download_fasta.
        if "failed" in message:
            error_proteins.append(uid)
            error_messages.append(message)
        elif args.merge and sequence:
            all_sequences.append(sequence)

    if args.uniprot_id:
        # A single ID takes precedence over -f when both are supplied.
        uid, message, sequence = _safe_download(args.uniprot_id)
        print(message)
        _record_result(uid, message, sequence)
    else:
        # Close the input file promptly and ignore blank lines and stray
        # whitespace, which would otherwise be submitted as empty IDs.
        with open(args.file, 'r') as id_file:
            uids = [line.strip() for line in id_file if line.strip()]

        # ThreadPoolExecutor rejects max_workers <= 0, so use at least one.
        num_workers = max(1, args.num_workers)
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = [executor.submit(_safe_download, uid) for uid in uids]
            with tqdm(total=len(uids), desc="Downloading Files") as bar:
                for future in as_completed(futures):
                    uid, message, sequence = future.result()
                    bar.set_description(message)
                    _record_result(uid, message, sequence)
                    bar.update(1)

    if args.merge and all_sequences:
        merged_file = os.path.join(args.out_dir, "merged.fasta")
        with open(merged_file, 'w', encoding='utf-8') as f:
            for sequence in all_sequences:
                # Guarantee a newline between records so the next header
                # never ends up glued to the previous sequence line.
                f.write(sequence if sequence.endswith('\n') else sequence + '\n')

    if error_proteins and args.error_file:
        with open(args.error_file, 'w', encoding='utf-8') as f:
            for protein, message in zip(error_proteins, error_messages):
                f.write(f"{protein} - {message}\n")
    elif error_proteins:
        print("Failed downloads:")
        for protein, message in zip(error_proteins, error_messages):
            print(f"{protein} - {message}")