VenusFactory / src /crawler /sequence /download_uniprot_seq.py
2dogey's picture
Upload folder using huggingface_hub
8918ac7 verified
import argparse
import requests
import os
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed
def download_fasta(uniprot_id, outdir, merge_output=False):
url = f"https://www.uniprot.org/uniprot/{uniprot_id}.fasta"
response = requests.get(url)
if not merge_output:
out_path = os.path.join(outdir, f"{uniprot_id}.fasta")
if os.path.exists(out_path):
return uniprot_id, f"{uniprot_id}.fasta already exists, skipping", None
if response.status_code != 200:
return uniprot_id, f"{uniprot_id}.fasta failed, {response.status_code}", None
if merge_output:
return uniprot_id, f"{uniprot_id}.fasta successfully downloaded", response.text
else:
output_file = os.path.join(outdir, f"{uniprot_id}.fasta")
with open(output_file, 'w') as file:
file.write(response.text)
return uniprot_id, f"{uniprot_id}.fasta successfully downloaded", None
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Download FASTA files from UniProt.')
parser.add_argument('-i', '--uniprot_id', help='Single UniProt ID to download')
parser.add_argument('-f', '--file', help='Input file containing UniProt IDs')
parser.add_argument('-o', '--out_dir', help='Directory to save FASTA files')
parser.add_argument('-n', '--num_workers', type=int, default=12, help='Number of workers to use for downloading')
parser.add_argument('-m', '--merge', action='store_true', help='Merge all sequences into a single FASTA file')
parser.add_argument('-e', '--error_file', help='File to save failed downloads. If not provided, errors will be printed to console')
args = parser.parse_args()
if not args.uniprot_id and not args.file:
print("Error: Must provide either uniprot_id or file")
exit(1)
os.makedirs(args.out_dir, exist_ok=True)
error_proteins = []
error_messages = []
all_sequences = []
if args.uniprot_id:
uid, message, sequence = download_fasta(args.uniprot_id, args.out_dir, args.merge)
print(message)
if "failed" in message:
error_proteins.append(uid)
error_messages.append(message)
elif args.merge and sequence:
all_sequences.append(sequence)
elif args.file:
uids = open(args.file, 'r').read().splitlines()
with ThreadPoolExecutor(max_workers=args.num_workers) as executor:
future_to_fasta = {executor.submit(download_fasta, uid, args.out_dir, args.merge): uid for uid in uids}
with tqdm(total=len(uids), desc="Downloading Files") as bar:
for future in as_completed(future_to_fasta):
uid, message, sequence = future.result()
bar.set_description(message)
if "failed" in message:
error_proteins.append(uid)
error_messages.append(message)
elif args.merge and sequence:
all_sequences.append(sequence)
bar.update(1)
if args.merge and all_sequences:
merged_file = os.path.join(args.out_dir, "merged.fasta")
with open(merged_file, 'w') as f:
f.write(''.join(all_sequences))
if error_proteins and args.error_file:
with open(args.error_file, 'w') as f:
for protein, message in zip(error_proteins, error_messages):
f.write(f"{protein} - {message}\n")
elif error_proteins:
print("Failed downloads:")
for protein, message in zip(error_proteins, error_messages):
print(f"{protein} - {message}")