from keras import Model from keras.layers import Input from keras.layers import Multiply from keras.layers import Dense, Dropout, Activation, Flatten from keras.layers import Convolution1D, AveragePooling1D import pandas as pd import numpy as np import keras import requests from functools import reduce from operator import add from Bio.SeqRecord import SeqRecord from Bio.SeqFeature import SeqFeature, FeatureLocation from Bio.Seq import Seq from Bio import SeqIO ntmap = {'A': (1, 0, 0, 0), 'C': (0, 1, 0, 0), 'G': (0, 0, 1, 0), 'T': (0, 0, 0, 1) } def get_seqcode(seq): return np.array(reduce(add, map(lambda c: ntmap[c], seq.upper()))).reshape((1, len(seq), -1)) def Seq_DeepCpf1_model(input_shape): Seq_deepCpf1_Input_SEQ = Input(shape=input_shape) Seq_deepCpf1_C1 = Convolution1D(80, 5, activation='relu')(Seq_deepCpf1_Input_SEQ) Seq_deepCpf1_P1 = AveragePooling1D(2)(Seq_deepCpf1_C1) Seq_deepCpf1_F = Flatten()(Seq_deepCpf1_P1) Seq_deepCpf1_DO1 = Dropout(0.3)(Seq_deepCpf1_F) Seq_deepCpf1_D1 = Dense(80, activation='relu')(Seq_deepCpf1_DO1) Seq_deepCpf1_DO2 = Dropout(0.3)(Seq_deepCpf1_D1) Seq_deepCpf1_D2 = Dense(40, activation='relu')(Seq_deepCpf1_DO2) Seq_deepCpf1_DO3 = Dropout(0.3)(Seq_deepCpf1_D2) Seq_deepCpf1_D3 = Dense(40, activation='relu')(Seq_deepCpf1_DO3) Seq_deepCpf1_DO4 = Dropout(0.3)(Seq_deepCpf1_D3) Seq_deepCpf1_Output = Dense(1, activation='linear')(Seq_deepCpf1_DO4) Seq_deepCpf1 = Model(inputs=[Seq_deepCpf1_Input_SEQ], outputs=[Seq_deepCpf1_Output]) return Seq_deepCpf1 # seq-ca model (DeepCpf1) def DeepCpf1_model(input_shape): DeepCpf1_Input_SEQ = Input(shape=input_shape) DeepCpf1_C1 = Convolution1D(80, 5, activation='relu')(DeepCpf1_Input_SEQ) DeepCpf1_P1 = AveragePooling1D(2)(DeepCpf1_C1) DeepCpf1_F = Flatten()(DeepCpf1_P1) DeepCpf1_DO1 = Dropout(0.3)(DeepCpf1_F) DeepCpf1_D1 = Dense(80, activation='relu')(DeepCpf1_DO1) DeepCpf1_DO2 = Dropout(0.3)(DeepCpf1_D1) DeepCpf1_D2 = Dense(40, activation='relu')(DeepCpf1_DO2) DeepCpf1_DO3 = Dropout(0.3)(DeepCpf1_D2) DeepCpf1_D3_SEQ = Dense(40, activation='relu')(DeepCpf1_DO3) DeepCpf1_Input_CA = Input(shape=(1,)) DeepCpf1_D3_CA = Dense(40, activation='relu')(DeepCpf1_Input_CA) DeepCpf1_M = Multiply()([DeepCpf1_D3_SEQ, DeepCpf1_D3_CA]) DeepCpf1_DO4 = Dropout(0.3)(DeepCpf1_M) DeepCpf1_Output = Dense(1, activation='linear')(DeepCpf1_DO4) DeepCpf1 = Model(inputs=[DeepCpf1_Input_SEQ, DeepCpf1_Input_CA], outputs=[DeepCpf1_Output]) return DeepCpf1 def fetch_ensembl_transcripts(gene_symbol): url = f"https://rest.ensembl.org/lookup/symbol/homo_sapiens/{gene_symbol}?expand=1;content-type=application/json" response = requests.get(url) if response.status_code == 200: gene_data = response.json() if 'Transcript' in gene_data: return gene_data['Transcript'] else: print("No transcripts found for gene:", gene_symbol) return None else: print(f"Error fetching gene data from Ensembl: {response.text}") return None def fetch_ensembl_sequence(transcript_id): url = f"https://rest.ensembl.org/sequence/id/{transcript_id}?content-type=application/json" response = requests.get(url) if response.status_code == 200: sequence_data = response.json() if 'seq' in sequence_data: return sequence_data['seq'] else: print("No sequence found for transcript:", transcript_id) return None else: print(f"Error fetching sequence data from Ensembl: {response.text}") return None def find_crispr_targets(sequence, chr, start, strand, pam="TTTN", target_length=34): targets = [] len_sequence = len(sequence) for i in range(len_sequence - target_length + 1): target_seq = sequence[i:i + target_length] if target_seq[4:7] == 'TTT': tar_start = start + i tar_end = start + i + target_length gRNA = target_seq[8:28] targets.append([target_seq, gRNA, chr, str(tar_start), str(tar_end), str(strand)]) return targets def format_prediction_output(targets, seq_deepCpf1): formatted_data = [] for target in targets: # Predict encoded_seq = get_seqcode(target[0]) # 'target' seems to be the full sequence including PAM prediction = seq_deepCpf1.predict(encoded_seq) # Format output gRNA = target[1] # gRNA is presumably the guide RNA sequence chr = target[2] # Chromosome start = target[3] # Start position end = target[4] # End position strand = target[5] # Strand target_seq = target[0] # Full target sequence including PAM formatted_data.append([chr, start, end, strand, target_seq, gRNA, prediction[0][0]]) return formatted_data def process_gene(gene_symbol, model_path): transcripts = fetch_ensembl_transcripts(gene_symbol) all_data = [] gene_sequence = '' # Initialize an empty string for the gene sequence # Load the model seq_deepCpf1 = Seq_DeepCpf1_model(input_shape=(34, 4)) seq_deepCpf1.load_weights(model_path) if transcripts: for transcript in transcripts: transcript_id = transcript['id'] chr = transcript.get('seq_region_name', 'unknown') start = transcript.get('start', 0) strand = transcript.get('strand', 'unknown') # Fetch the sequence here and concatenate if multiple transcripts gene_sequence += fetch_ensembl_sequence(transcript_id) or '' if gene_sequence: targets = find_crispr_targets(gene_sequence, chr, start, strand) if targets: formatted_data = format_prediction_output(targets, seq_deepCpf1) all_data.extend(formatted_data) else: print("Failed to retrieve transcripts.") return all_data, gene_sequence def create_genbank_features(formatted_data): features = [] for data in formatted_data: try: # Attempt to convert start and end positions to integers start = int(data[1]) end = int(data[2]) except ValueError as e: # Log the error and skip this iteration if conversion fails print(f"Error converting start/end to int: {data[1]}, {data[2]} - {e}") continue # Skip this iteration # Proceed as normal if conversion is successful strand = 1 if data[3] == '+' else -1 location = FeatureLocation(start=start, end=end, strand=strand) feature = SeqFeature(location=location, type="misc_feature", qualifiers={ 'label': data[5], # gRNA as label 'note': f"Prediction: {data[6]}" # Prediction score in note }) features.append(feature) return features def generate_genbank_file_from_data(formatted_data, gene_sequence, gene_symbol, output_path): features = create_genbank_features(formatted_data) record = SeqRecord(Seq(gene_sequence), id=gene_symbol, name=gene_symbol, description='CRISPR Cas12 predicted targets', features=features) record.annotations["molecule_type"] = "DNA" SeqIO.write(record, output_path, "genbank") def create_csv_from_df(df, output_path): df.to_csv(output_path, index=False) def generate_bed_file_from_data(formatted_data, output_path): with open(output_path, 'w') as bed_file: for data in formatted_data: try: # Ensure data has the expected number of elements if len(data) < 7: raise ValueError("Incomplete data item") chrom = data[0] start = data[1] end = data[2] strand = '+' if data[3] == '+' else '-' gRNA = data[5] score = data[6] # Ensure this index exists bed_file.write(f"{chrom}\t{start}\t{end}\t{gRNA}\t{score}\t{strand}\n") except ValueError as e: print(f"Skipping an item due to error: {e}") continue