Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import os | |
| import pandas as pd | |
| from langchain.llms import OpenAI | |
| from langchain.prompts import PromptTemplate | |
| from langchain.chains import LLMChain | |
| from pydantic import BaseModel, Field | |
| from langchain.tools import BaseTool, StructuredTool, Tool, tool | |
| from langchain.memory import ChatMessageHistory | |
| from langchain.agents import initialize_agent | |
| from langchain.agents import AgentType | |
| from langchain.agents import load_tools | |
| #from langchain.tools import PubmedQueryRun | |
| from langchain.utilities import WikipediaAPIWrapper | |
| from langchain.agents import create_pandas_dataframe_agent | |
| from Bio import Entrez, SeqIO | |
| from Bio.Seq import Seq | |
| from Bio.SeqRecord import SeqRecord | |
| from Bio.SeqFeature import SeqFeature, FeatureLocation | |
| import random | |
# Set the OpenAI API key.
# setdefault keeps a key that is already supplied through the process
# environment (for example a deployment secret) instead of overwriting it
# with the empty placeholder below.
os.environ.setdefault("OPENAI_API_KEY", "")  # openai key

# Prep database of reactions: load the BKMS reactions table from the URL below
# into a module-level DataFrame that biosynthesize() queries.
# NOTE(review): the URL points at a .tar.gz archive but it is decompressed as
# plain gzip, so the parsed text presumably still carries the tar header in
# front of the first column name -- confirm, and consider compression="tar"
# if the deployed pandas version supports it.
biochem_data_url = "https://bkms.brenda-enzymes.org/download/Reactions_BKMS.tar.gz"
biochem_df = pd.read_csv(biochem_data_url, compression='gzip', header=0, sep="\t")
# Fixed amino-acid -> codon choices used to back-translate proteins into DNA.
_CODON_FOR_RESIDUE = {
    "A": "GCA", "C": "TGT", "D": "GAT", "E": "GAA", "F": "TTT",
    "G": "GGA", "H": "CAC", "I": "ATA", "K": "AAA", "L": "CTT",
    "M": "ATG", "N": "AAT", "P": "CCC", "Q": "CAA", "R": "AGG",
    "S": "TCA", "T": "ACA", "V": "GTA", "Y": "TAT", "W": "TGG",
    "*": "TAA",
}


def _parse_gene_table(table_text):
    """Parse a pipe-delimited "Gene | Function" table into a list of row dicts.

    Blank lines, the header row (first cell "Gene" in any letter case) and
    separator rows made only of dashes/colons are skipped. Leading and
    trailing pipes and whitespace around cells are ignored. Every returned
    dict has both a 'gene' and a 'function' key; 'function' is "" when the
    line had no second cell.
    """
    rows = []
    for raw_line in table_text.split("\n"):
        stripped = raw_line.strip()
        if not stripped:
            continue
        cells = [cell.strip() for cell in stripped.strip("|").split("|")]
        gene = cells[0]
        # Skip the header row and markdown-style separator rows ("----", ":--").
        if gene.casefold() == "gene" or not gene.strip("-: "):
            continue
        function = cells[1] if len(cells) > 1 else ""
        rows.append({"gene": gene, "function": function})
    return rows


def _random_dna(length):
    """Return a random DNA string of the given length (placeholder sequence)."""
    return ''.join(random.choice('CGTA') for _ in range(length))


def _back_translate(protein):
    """Back-translate a protein string into DNA, ending with one TAA stop codon.

    Residues without an entry in the codon table (e.g. X, B, Z, U) become
    "NNN" so the reading frame is preserved instead of raising KeyError.
    A trailing "*" on the input is dropped so the stop codon is not doubled.
    """
    residues = protein.upper().rstrip("*")
    codons = [_CODON_FOR_RESIDUE.get(residue, "NNN") for residue in residues]
    return "".join(codons) + "TAA"


def _fetch_first_protein(ec_number):
    """Return the first NCBI protein record found for ec_number, or None.

    Searches the "protein" database with the EC number as the query term and
    fetches the first hit as FASTA. Both Entrez handles are closed.
    """
    search_handle = Entrez.esearch(db="protein", term=ec_number)
    try:
        search_result = Entrez.read(search_handle)
    finally:
        search_handle.close()
    id_list = search_result['IdList']
    if not id_list:
        return None
    # Arbitrarily pick the first ID for now.
    fetch_handle = Entrez.efetch(db="protein", id=id_list[0], rettype="fasta")
    try:
        return SeqIO.read(fetch_handle, "fasta")
    finally:
        fetch_handle.close()


def biosynthesize(molecule, organism):
    """Draft a GenBank construct with candidate genes for making ``molecule``.

    Steps:
      1. Ask the OpenAI LLM for a Gene/Function table for the molecule.
      2. For each gene function, ask a pandas agent for the closest entry in
         the module-level reaction table ``biochem_df`` and take its EC number.
      3. Look each EC number up in the NCBI protein database (first hit).
      4. Emit promoter + RBS (random placeholder DNA) + back-translated CDS
         for every protein found, as features of one GenBank record.

    Args:
        molecule: Name of the target molecule.
        organism: Host organism name. Accepted for the UI but currently not
            used by any step.

    Returns:
        The text of the GenBank file, which is also written to
        "<molecule> Biosynthesis.gb" in the working directory.
    """
    # Initialize the LLM and ask it for the gene table.
    llm = OpenAI(temperature=0.5)
    prompt = PromptTemplate(
        input_variables=["molecule",],
        template="how do you biosynthesize {molecule}. Output a table of necessary genes with two columns: Gene and Function",
    )
    chain = LLMChain(llm=llm, prompt=prompt)
    gene_rows = _parse_gene_table(chain.run(molecule))

    # TODO: augment this table with a Wikipedia search agent (not wired in yet).

    # Pull out just the needed columns from the reaction database to keep the
    # pandas agent's working table small.
    reaction_columns = ['Reaction', 'Reaction_ID_MetaCyc', 'Recommended_Name']
    biochem_reactions = biochem_df[reaction_columns]
    pandas_agent = create_pandas_dataframe_agent(OpenAI(temperature=0), biochem_reactions, verbose=False)

    # Figure out the EC number of each gene.
    ec_rows = []
    for row in gene_rows:
        if not row['function']:
            # Nothing to match against the reaction table.
            continue
        answer = pandas_agent.run('which element most closely matches {}'.format(row['function']))
        matches = biochem_df[biochem_reactions.eq(answer).any(axis=1)]
        # See if anything was even found.
        if matches.empty:
            continue
        ec_number = matches.iloc[0]['EC_Number']
        if pd.isna(ec_number):
            # A missing EC number cannot be searched or used in a label.
            continue
        ec_rows.append({
            'EC_Number': str(ec_number),
            'gene': row['gene'],
            'function': row['function'],
        })

    # Find protein sequences for these predictions.
    Entrez.email = 'jayman1466@gmail.com'
    gb_array = []
    for row in ec_rows:
        protein = _fetch_first_protein(row['EC_Number'])
        if protein is None:
            # NCBI returned no hits for this EC number; skip the gene.
            continue
        gb_array.append({
            'sequence': str(protein.seq),
            'actual_name': protein.name,
            'ID': protein.id,
            'EC': row['EC_Number'],
            'desired_name': row['gene'],
        })

    # GenBank LOCUS names must not contain whitespace, so multi-word molecule
    # names are joined with underscores for the record name.
    locus_name = "_".join(molecule.split())
    record = SeqRecord(Seq(""),
                       id='1243',
                       name=locus_name,
                       description='Generated with synXNA',
                       annotations={"molecule_type": "DNA"})

    # Lay the genes out one after another: promoter, RBS, CDS.
    segments = []
    position = 0
    for item in gb_array:
        parts = (
            ('promoter', _random_dna(50), {'label': 'promoter'}),
            ('RBS', _random_dna(35), {'label': 'RBS'}),
            ('CDS', _back_translate(item['sequence']),
             {'label': item['EC'] + " " + item['desired_name'], 'translation': item['sequence']}),
        )
        for feature_type, dna, qualifiers in parts:
            end = position + len(dna)
            record.features.append(
                SeqFeature(FeatureLocation(start=position, end=end), type=feature_type, qualifiers=qualifiers)
            )
            segments.append(dna)
            position = end
    record.seq = Seq("".join(segments))

    # Write the GenBank file, then return its text.
    # The molecule name comes from user input, so path separators are replaced
    # to keep the file inside the working directory.
    safe_name = molecule.replace("/", "_").replace("\\", "_")
    output_path = '{} Biosynthesis.gb'.format(safe_name)
    with open(output_path, 'w') as output_file:
        SeqIO.write(record, output_file, 'genbank')
    with open(output_path, "r") as output_file:
        return output_file.read()
# Build the Gradio UI: two single-line text inputs are passed to
# biosynthesize(molecule, organism) and the result is shown as plain text.
molecule_box = gr.Textbox(lines=1, placeholder="Biosynthesize this molecule")
organism_box = gr.Textbox(lines=1, placeholder="In this bacteria")
demo = gr.Interface(
    fn=biosynthesize,
    inputs=[molecule_box, organism_box],
    outputs="text",
    title="What do you want to make",
)
# Start serving the interface.
demo.launch()