File size: 3,790 Bytes
8918ac7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import argparse
import requests
import os
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor, as_completed

def download_fasta(uniprot_id, outdir, merge_output=False):
    """Download the FASTA record for a single UniProt ID.

    Args:
        uniprot_id: Identifier inserted into the UniProt download URL.
        outdir: Directory that receives ``<uniprot_id>.fasta``. Only used
            when ``merge_output`` is false.
        merge_output: When true, nothing is written to disk and the FASTA
            text is returned to the caller instead.

    Returns:
        A ``(uniprot_id, message, text)`` tuple. ``text`` is the FASTA
        content when ``merge_output`` is true and the download succeeded,
        otherwise ``None``. Every failure message contains the word
        ``"failed"``; the command-line driver in this file relies on that
        to detect errors.
    """
    if not merge_output:
        out_path = os.path.join(outdir, f"{uniprot_id}.fasta")
        # Check the disk before touching the network so that re-running on
        # a partially populated directory does not re-download anything.
        if os.path.exists(out_path):
            return uniprot_id, f"{uniprot_id}.fasta already exists, skipping", None

    url = f"https://www.uniprot.org/uniprot/{uniprot_id}.fasta"
    try:
        # A timeout keeps one stalled connection from blocking a worker
        # thread indefinitely.
        response = requests.get(url, timeout=30)
    except requests.RequestException as exc:
        # Report transport errors like HTTP errors instead of letting the
        # exception escape and abort an entire batch run.
        return uniprot_id, f"{uniprot_id}.fasta failed, {exc}", None

    if response.status_code != 200:
        return uniprot_id, f"{uniprot_id}.fasta failed, {response.status_code}", None

    if merge_output:
        return uniprot_id, f"{uniprot_id}.fasta successfully downloaded", response.text

    with open(out_path, 'w', encoding='utf-8') as file:
        file.write(response.text)
    return uniprot_id, f"{uniprot_id}.fasta successfully downloaded", None

if __name__ == '__main__':
    # Command-line entry point: download one UniProt ID (-i) or every ID
    # listed one per line in a file (-f), either as individual FASTA files
    # or merged into <out_dir>/merged.fasta (-m).
    parser = argparse.ArgumentParser(description='Download FASTA files from UniProt.')
    parser.add_argument('-i', '--uniprot_id', help='Single UniProt ID to download')
    parser.add_argument('-f', '--file', help='Input file containing UniProt IDs')
    # Default to the current directory; without a default, omitting -o made
    # os.makedirs(None) raise a TypeError.
    parser.add_argument('-o', '--out_dir', default='.', help='Directory to save FASTA files (default: current directory)')
    parser.add_argument('-n', '--num_workers', type=int, default=12, help='Number of workers to use for downloading')
    parser.add_argument('-m', '--merge', action='store_true', help='Merge all sequences into a single FASTA file')
    parser.add_argument('-e', '--error_file', help='File to save failed downloads. If not provided, errors will be printed to console')
    args = parser.parse_args()

    if not args.uniprot_id and not args.file:
        print("Error: Must provide either uniprot_id or file")
        raise SystemExit(1)

    os.makedirs(args.out_dir, exist_ok=True)
    error_proteins = []
    error_messages = []
    # (input position, FASTA text) pairs; the position lets the merged file
    # follow the input order even though downloads finish in any order.
    all_sequences = []

    def _record(position, uid, message, sequence):
        """File one download result under errors or merged sequences."""
        if "failed" in message:
            error_proteins.append(uid)
            error_messages.append(message)
        elif args.merge and sequence:
            all_sequences.append((position, sequence))

    if args.uniprot_id:
        uid = args.uniprot_id
        try:
            uid, message, sequence = download_fasta(uid, args.out_dir, args.merge)
        except Exception as exc:
            # Boundary handler: report the problem as a failed download
            # rather than ending with a traceback.
            message, sequence = f"{uid}.fasta failed, {exc}", None
        print(message)
        _record(0, uid, message, sequence)

    elif args.file:
        # Close the ID file promptly and ignore blank lines / stray
        # whitespace so they are not submitted as bogus IDs.
        with open(args.file, 'r') as id_file:
            uids = [line.strip() for line in id_file if line.strip()]

        with ThreadPoolExecutor(max_workers=args.num_workers) as executor:
            future_to_fasta = {
                executor.submit(download_fasta, uid, args.out_dir, args.merge): (position, uid)
                for position, uid in enumerate(uids)
            }

            with tqdm(total=len(uids), desc="Downloading Files") as bar:
                for future in as_completed(future_to_fasta):
                    position, submitted_uid = future_to_fasta[future]
                    try:
                        uid, message, sequence = future.result()
                    except Exception as exc:
                        # One broken download must not abort the batch;
                        # record it and keep draining the other futures.
                        uid = submitted_uid
                        message = f"{uid}.fasta failed, {exc}"
                        sequence = None
                    bar.set_description(message)
                    _record(position, uid, message, sequence)
                    bar.update(1)

    if args.merge and all_sequences:
        merged_file = os.path.join(args.out_dir, "merged.fasta")
        with open(merged_file, 'w') as f:
            # Write in input order and make sure every record ends with a
            # newline so the next header starts on its own line.
            for _, sequence in sorted(all_sequences, key=lambda item: item[0]):
                f.write(sequence if sequence.endswith('\n') else sequence + '\n')

    if error_proteins and args.error_file:
        with open(args.error_file, 'w') as f:
            for protein, message in zip(error_proteins, error_messages):
                f.write(f"{protein} - {message}\n")
    elif error_proteins:
        print("Failed downloads:")
        for protein, message in zip(error_proteins, error_messages):
            print(f"{protein} - {message}")