# NOTE: removed page-scrape artifact ("Spaces: / Sleeping / Sleeping" — Hugging Face Space status text, not source code)
"""
Optimize protein sequences using ColiFormer.

This script provides a user-friendly interface for codon optimization,
supporting both single sequences and batch processing via FASTA files.

Usage:
    # Single sequence
    python scripts/optimize_sequence.py --input "MALWMRLLPLLALLALWGPDPAAAFVNQHLCGSHLVEALYLVCGERGFFYTPKTRREAEDLQVGQVELGG" --output optimized.fasta

    # Batch processing from FASTA file
    python scripts/optimize_sequence.py --input sequences.fasta --output optimized.fasta --batch

    # With GC content constraints
    python scripts/optimize_sequence.py --input protein.fasta --output optimized.fasta --gc-min 0.45 --gc-max 0.55
"""
| import argparse | |
| import os | |
| import sys | |
| from pathlib import Path | |
| from typing import Any, List, Tuple | |
| # Add parent directory to path to import CodonTransformer | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
def parse_fasta(fasta_path: str) -> List[Tuple[str, str]]:
    """
    Read a FASTA file and return its records in file order.

    Headers (`>` lines) start a new record; a bare `>` gets an
    auto-generated name. Sequence lines are uppercased and concatenated.

    Args:
        fasta_path: Path to FASTA file

    Returns:
        List of (name, sequence) tuples
    """
    records: List[Tuple[str, str]] = []
    name = None
    chunks: List[str] = []

    def flush():
        # Commit the record accumulated so far, if any.
        if name is not None:
            records.append((name, ''.join(chunks)))

    with open(fasta_path, 'r') as handle:
        for raw in handle:
            raw = raw.strip()
            if raw.startswith('>'):
                flush()
                name = raw[1:] if len(raw) > 1 else f"sequence_{len(records)+1}"
                chunks = []
            else:
                chunks.append(raw.upper())
    flush()  # commit the final record
    return records
def write_fasta(output_path: str, sequences: List[Tuple[str, str]]):
    """
    Write (name, sequence) records to a FASTA file.

    Each sequence is wrapped at 60 characters per line, the conventional
    FASTA width.

    Args:
        output_path: Output FASTA file path
        sequences: List of (name, sequence) tuples
    """
    with open(output_path, 'w') as out:
        for header, dna in sequences:
            out.write(f">{header}\n")
            wrapped = [dna[pos:pos + 60] for pos in range(0, len(dna), 60)]
            if wrapped:
                out.write("\n".join(wrapped) + "\n")
def optimize_single_sequence(
    protein: str,
    model: Any,
    tokenizer: Any,
    device: Any,
    organism: str = "Escherichia coli general",
    gc_min: float = None,
    gc_max: float = None,
    cai_weights: dict = None,
    tai_weights: dict = None
) -> dict:
    """
    Optimize a single protein sequence.

    Args:
        protein: Protein sequence string
        model: Loaded ColiFormer model
        tokenizer: Tokenizer
        device: PyTorch device
        organism: Target organism name
        gc_min: Minimum GC content (0-1); constrained search is only
            enabled when BOTH gc_min and gc_max are given.
        gc_max: Maximum GC content (0-1)
        cai_weights: CAI weights dictionary (optional)
        tai_weights: tAI weights dictionary (optional)

    Returns:
        Dictionary with keys 'protein', 'optimized_dna', 'gc_content'
        (fraction, 0-1), 'length', 'cai', and 'tai' ('cai'/'tai' are None
        when weights are absent or the metric computation fails).
    """
    # Lazy imports so `python scripts/optimize_sequence.py --help` works without ML deps installed.
    from CodonTransformer.CodonPrediction import predict_dna_sequence
    from CodonTransformer.CodonEvaluation import get_GC_content, calculate_tAI
    from CAI import CAI

    # Constrained beam search only makes sense with a full (min, max) window.
    gc_bounds = None
    use_constrained = False
    if gc_min is not None and gc_max is not None:
        gc_bounds = (gc_min, gc_max)
        use_constrained = True

    # Run optimization. A wider beam is used under GC constraints so the
    # search has enough candidates that satisfy the bounds.
    output = predict_dna_sequence(
        protein=protein,
        organism=organism,
        device=device,
        model=model,
        tokenizer=tokenizer,
        deterministic=True,
        match_protein=True,
        use_constrained_search=use_constrained,
        gc_bounds=gc_bounds,
        beam_size=20 if use_constrained else 5,
    )
    # predict_dna_sequence may return a list of candidates; keep the first.
    if isinstance(output, list):
        output = output[0]
    optimized_dna = output.predicted_dna

    # get_GC_content returns a percentage; normalize to a 0-1 fraction.
    gc_content = get_GC_content(optimized_dna) / 100.0

    metrics = {
        'protein': protein,
        'optimized_dna': optimized_dna,
        'gc_content': gc_content,
        'length': len(optimized_dna),
    }

    # BUGFIX: the original bare `except:` clauses also swallowed
    # KeyboardInterrupt/SystemExit; catch Exception so the metric failure
    # stays best-effort without masking interpreter-level signals.
    metrics['cai'] = None
    if cai_weights:
        try:
            metrics['cai'] = CAI(optimized_dna, weights=cai_weights)
        except Exception:
            metrics['cai'] = None

    metrics['tai'] = None
    if tai_weights:
        try:
            metrics['tai'] = calculate_tAI(optimized_dna, tai_weights)
        except Exception:
            metrics['tai'] = None

    return metrics
def load_reference_data(ref_sequences_path: str = None):
    """
    Load reference sequences and calculate CAI weights.

    Both loads are best-effort: a failure prints a warning and leaves the
    corresponding weights as None.

    Args:
        ref_sequences_path: Path to CSV with reference sequences

    Returns:
        Tuple of (cai_weights, tai_weights)
    """
    # Lazy imports so `--help` works without ML deps installed.
    import pandas as pd
    from CAI import relative_adaptiveness
    from CodonTransformer.CodonEvaluation import get_ecoli_tai_weights

    cai_weights, tai_weights = None, None

    # CAI weights come from a user-supplied CSV of reference genes.
    if ref_sequences_path and os.path.exists(ref_sequences_path):
        try:
            frame = pd.read_csv(ref_sequences_path)
            if 'dna_sequence' in frame.columns:
                refs = frame['dna_sequence'].tolist()
                cai_weights = relative_adaptiveness(sequences=refs)
                print(f"Loaded CAI weights from {len(refs)} reference sequences")
        except Exception as e:
            print(f"Warning: Could not load CAI weights: {e}")

    # tAI weights ship with CodonTransformer for E. coli.
    try:
        tai_weights = get_ecoli_tai_weights()
        print("Loaded E. coli tAI weights")
    except Exception as e:
        print(f"Warning: Could not load tAI weights: {e}")

    return cai_weights, tai_weights
def main():
    """Main entry point for sequence optimization.

    Parses CLI arguments, loads the model (explicit checkpoint, Hugging
    Face download, or base CodonTransformer fallback, in that order),
    optimizes one or more protein sequences, and writes the resulting DNA
    to a FASTA file. Exits with status 1 on any error.
    """
    parser = argparse.ArgumentParser(
        # BUGFIX: description previously said "ENCOT", contradicting the
        # module docstring and the ColiFormer model this script loads.
        description="Optimize protein sequences using ColiFormer",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Single sequence
  python scripts/optimize_sequence.py --input "MALWMRLLPLLALLALWGPDPAAAFVNQHLCGSHLVEALYLVCGERGFFYTPKTRREAEDLQVGQVELGG" --output optimized.fasta

  # Batch processing from FASTA file
  python scripts/optimize_sequence.py --input sequences.fasta --output optimized.fasta --batch

  # With GC content constraints
  python scripts/optimize_sequence.py --input protein.fasta --output optimized.fasta --gc-min 0.45 --gc-max 0.55

  # Use custom checkpoint
  python scripts/optimize_sequence.py --input protein.fasta --output optimized.fasta --checkpoint models/my_model.ckpt
"""
    )
    parser.add_argument(
        "--input",
        type=str,
        required=True,
        help="Input protein sequence (string) or FASTA file path"
    )
    parser.add_argument(
        "--output",
        type=str,
        required=True,
        help="Output FASTA file path"
    )
    parser.add_argument(
        "--checkpoint",
        type=str,
        default=None,
        help="Path to model checkpoint (default: auto-download from Hugging Face)"
    )
    parser.add_argument(
        "--organism",
        type=str,
        default="Escherichia coli general",
        help="Target organism (default: Escherichia coli general)"
    )
    parser.add_argument(
        "--gc-min",
        type=float,
        default=None,
        help="Minimum GC content (0-1, e.g., 0.45 for 45%%)"
    )
    parser.add_argument(
        "--gc-max",
        type=float,
        default=None,
        help="Maximum GC content (0-1, e.g., 0.55 for 55%%)"
    )
    parser.add_argument(
        "--batch",
        action="store_true",
        help="Process input as FASTA file with multiple sequences"
    )
    parser.add_argument(
        "--ref-sequences",
        type=str,
        default="data/ecoli_processed_genes.csv",
        help="Path to reference sequences CSV for CAI calculation"
    )
    parser.add_argument(
        "--use-gpu",
        action="store_true",
        help="Use GPU if available"
    )
    args = parser.parse_args()

    try:
        # Lazy imports so `--help` works without ML deps installed.
        import torch
        from transformers import AutoTokenizer
        from CodonTransformer.CodonPrediction import load_model
        import pandas as pd

        # Setup device (GPU only when requested AND available).
        device = torch.device("cuda" if torch.cuda.is_available() and args.use_gpu else "cpu")
        print(f"Using device: {device}")

        # Load model: explicit checkpoint > Hugging Face download > base model.
        print("Loading ColiFormer model...")
        if args.checkpoint:
            model = load_model(model_path=args.checkpoint, device=device)
            print(f"Loaded model from {args.checkpoint}")
        else:
            try:
                from huggingface_hub import hf_hub_download
                checkpoint_path = hf_hub_download(
                    repo_id="saketh11/ColiFormer",
                    filename="balanced_alm_finetune.ckpt",
                    cache_dir="./hf_cache"
                )
                model = load_model(model_path=checkpoint_path, device=device)
                print("Loaded model from Hugging Face (saketh11/ColiFormer)")
            except Exception as e:
                print(f"Warning: Could not load from Hugging Face: {e}")
                print("Falling back to base CodonTransformer model...")
                from transformers import BigBirdForMaskedLM
                model = BigBirdForMaskedLM.from_pretrained("adibvafa/CodonTransformer").to(device)

        # Load tokenizer
        tokenizer = AutoTokenizer.from_pretrained("adibvafa/CodonTransformer")

        # Load reference data for CAI/tAI metrics (best-effort; may be None)
        cai_weights, tai_weights = load_reference_data(args.ref_sequences)

        # Parse input: treat as FASTA when --batch is set or the path exists.
        if args.batch or os.path.exists(args.input):
            print(f"Reading sequences from {args.input}...")
            sequences = parse_fasta(args.input)
            print(f"Found {len(sequences)} sequences")
        else:
            # Single raw protein sequence passed on the command line.
            sequences = [("sequence_1", args.input.upper())]

        # Optimize sequences one at a time, collecting per-sequence metrics.
        optimized_sequences = []
        results = []
        for i, (name, protein_seq) in enumerate(sequences, 1):
            print(f"\nOptimizing sequence {i}/{len(sequences)}: {name}")
            metrics = optimize_single_sequence(
                protein=protein_seq,
                model=model,
                tokenizer=tokenizer,
                device=device,
                organism=args.organism,
                gc_min=args.gc_min,
                gc_max=args.gc_max,
                cai_weights=cai_weights,
                tai_weights=tai_weights
            )
            optimized_sequences.append((name, metrics['optimized_dna']))
            results.append({
                'name': name,
                'protein_length': len(protein_seq),
                'dna_length': metrics['length'],
                'gc_content': f"{metrics['gc_content']*100:.2f}%",
                'cai': metrics['cai'],
                'tai': metrics['tai'],
            })
            print(f"  GC content: {metrics['gc_content']*100:.2f}%")
            # BUGFIX: compare against None explicitly so a legitimate metric
            # value of 0.0 is still reported (truthiness hid it before).
            if metrics['cai'] is not None:
                print(f"  CAI: {metrics['cai']:.3f}")
            if metrics['tai'] is not None:
                print(f"  tAI: {metrics['tai']:.3f}")

        # Write output
        write_fasta(args.output, optimized_sequences)
        print(f"\nOptimized sequences saved to {args.output}")

        # Print a summary table only for multi-sequence (batch) runs.
        if len(results) > 1:
            print("\n" + "="*60)
            print("Summary Statistics")
            print("="*60)
            df = pd.DataFrame(results)
            print(df.to_string(index=False))
            print("="*60)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        sys.exit(1)
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()