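# Spaces: Runtime error
# (The line above appears to be a status banner captured from the hosting page
# together with the source; it is not part of the program.)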
import gradio as gr | |
import os | |
import pandas as pd | |
from langchain.llms import OpenAI | |
from langchain.prompts import PromptTemplate | |
from langchain.chains import LLMChain | |
from pydantic import BaseModel, Field | |
from langchain.tools import BaseTool, StructuredTool, Tool, tool | |
from langchain.memory import ChatMessageHistory | |
from langchain.agents import initialize_agent | |
from langchain.agents import AgentType | |
from langchain.agents import load_tools | |
#from langchain.tools import PubmedQueryRun | |
from langchain.utilities import WikipediaAPIWrapper | |
from langchain.agents import create_pandas_dataframe_agent | |
from Bio import Entrez, SeqIO | |
from Bio.Seq import Seq | |
from Bio.SeqRecord import SeqRecord | |
from Bio.SeqFeature import SeqFeature, FeatureLocation | |
import random | |
# Set the OpenAI API key.
# Only fill in the placeholder when no key is present yet: assigning "" here
# unconditionally would erase a key supplied through the process environment.
os.environ.setdefault("OPENAI_API_KEY", "")  # openai key

# Prep database of reactions.
biochem_data_url = "https://bkms.brenda-enzymes.org/download/Reactions_BKMS.tar.gz"


def _load_reaction_table(url):
    """Download the reaction table at *url* and return it as a DataFrame.

    The URL names a gzip-compressed tar archive. Decompressing it with plain
    gzip leaves the tar framing (member header and trailing padding) inside
    the parsed text, so the archive is opened with ``tarfile`` and the first
    regular file in it is parsed as a tab-separated table with a header row.

    If the payload turns out not to be a tar archive, it is parsed as a
    gzip-compressed table instead, which is what the script did previously.

    Raises:
        ValueError: if the archive contains no regular file.
    """
    import io
    import tarfile
    import urllib.request

    with urllib.request.urlopen(url) as response:
        payload = response.read()

    try:
        with tarfile.open(fileobj=io.BytesIO(payload), mode="r:*") as archive:
            member = next((m for m in archive if m.isfile()), None)
            if member is None:
                raise ValueError("no regular file found in {}".format(url))
            # Parse while the archive is still open; the member stream is
            # only valid inside this ``with`` block.
            return pd.read_csv(archive.extractfile(member), header=0, sep="\t")
    except tarfile.ReadError:
        return pd.read_csv(
            io.BytesIO(payload), compression="gzip", header=0, sep="\t"
        )


biochem_df = _load_reaction_table(biochem_data_url)
# Codon emitted for each residue when back-translating a protein to DNA.
_BACK_TRANSLATION_CODONS = {
    "A": "GCA", "C": "TGT", "D": "GAT", "E": "GAA", "F": "TTT",
    "G": "GGA", "H": "CAC", "I": "ATA", "K": "AAA", "L": "CTT",
    "M": "ATG", "N": "AAT", "P": "CCC", "Q": "CAA", "R": "AGG",
    "S": "TCA", "T": "ACA", "V": "GTA", "Y": "TAT", "W": "TGG",
    "*": "TAA",
}
_STOP_CODON = "TAA"
# Placeholder for residues that have no entry in the table above.
_UNKNOWN_CODON = "NNN"
# Lengths of the random placeholder sequences placed in front of each CDS.
_PROMOTER_LENGTH = 50
_RBS_LENGTH = 35


def _parse_gene_table(text):
    """Parse a pipe-delimited "Gene | Function" table into a list of dicts.

    Each returned dict has the keys ``'gene'`` and ``'function'``. Blank
    lines, the header row, separator rows made of dashes, and rows that lack
    a function cell are skipped. Leading/trailing pipes are tolerated.
    """
    rows = []
    for raw_line in text.splitlines():
        cells = [cell.strip() for cell in raw_line.strip().strip("|").split("|")]
        gene = cells[0]
        # ``not gene.strip("-:=")`` is true for empty cells and for
        # separator cells such as "----" or ":---:".
        if gene.casefold() == "gene" or not gene.strip("-:="):
            continue
        function = cells[1] if len(cells) > 1 else ""
        if not function:
            # Without a function there is nothing to match against the
            # reaction database.
            continue
        rows.append({"gene": gene, "function": function})
    return rows


def _random_dna(length):
    """Return a random DNA string of *length* bases."""
    return "".join(random.choices("CGTA", k=length))


def _back_translate(protein):
    """Back-translate *protein* to DNA and append one stop codon.

    Input is case-insensitive. Residues missing from the codon table are
    written as ``NNN`` so that one unusual residue does not abort the run
    and the reading frame is preserved.
    """
    codons = [
        _BACK_TRANSLATION_CODONS.get(residue, _UNKNOWN_CODON)
        for residue in protein.upper()
    ]
    codons.append(_STOP_CODON)
    return "".join(codons)


def _locus_name(molecule):
    """Return *molecule* with each whitespace run collapsed to ``_``.

    The name is used for the GenBank LOCUS line, which cannot contain
    whitespace.
    """
    return "_".join(molecule.split())


def _genbank_filename(molecule):
    """Return the output file name for *molecule*.

    Path separators and NUL are replaced with ``_`` because the name comes
    straight from user input and must stay inside the working directory.
    """
    safe = molecule.translate(str.maketrans({"/": "_", "\\": "_", "\0": "_"}))
    return "{} Biosynthesis.gb".format(safe)


def _lookup_ec_number(agent, function):
    """Ask *agent* for the reaction matching *function*; return its EC number.

    The agent's answer is compared for equality against the ``Reaction``,
    ``Reaction_ID_MetaCyc`` and ``Recommended_Name`` columns of the
    module-level ``biochem_df``. Returns the ``EC_Number`` of the first
    matching row as a string, or ``None`` when nothing matches or the value
    is missing.
    """
    answer = agent.run(
        'which element most closely matches {}'.format(function)
    ).strip()
    hits = biochem_df[
        (biochem_df.Reaction == answer)
        | (biochem_df.Reaction_ID_MetaCyc == answer)
        | (biochem_df.Recommended_Name == answer)
    ]
    if hits.empty:
        return None
    ec_number = hits.iloc[0]['EC_Number']
    return None if pd.isna(ec_number) else str(ec_number)


def _fetch_first_protein(ec_number):
    """Return the first NCBI protein record found for *ec_number*, or ``None``.

    The first ID returned by the search is picked arbitrarily.
    """
    search_handle = Entrez.esearch(db="protein", term=ec_number)
    try:
        search_result = Entrez.read(search_handle)
    finally:
        search_handle.close()

    id_list = search_result['IdList']
    if not id_list:
        return None

    fetch_handle = Entrez.efetch(db="protein", id=id_list[0], rettype="fasta")
    try:
        return SeqIO.read(fetch_handle, "fasta")
    finally:
        fetch_handle.close()


def _assemble_record(molecule, genes):
    """Build a SeqRecord with a promoter, RBS and CDS feature for each gene.

    *genes* is a list of dicts with the keys ``'sequence'`` (protein),
    ``'EC'`` and ``'desired_name'``. Promoter and RBS are random placeholder
    sequences; the CDS is the back-translated protein.
    """
    features = []
    parts = []
    cursor = 0
    for gene in genes:
        elements = (
            ('promoter', _random_dna(_PROMOTER_LENGTH), {'label': 'promoter'}),
            ('RBS', _random_dna(_RBS_LENGTH), {'label': 'RBS'}),
            (
                'CDS',
                _back_translate(gene['sequence']),
                {
                    'label': gene['EC'] + " " + gene['desired_name'],
                    'translation': gene['sequence'],
                },
            ),
        )
        for feature_type, dna, qualifiers in elements:
            end = cursor + len(dna)
            features.append(
                SeqFeature(
                    FeatureLocation(start=cursor, end=end),
                    type=feature_type,
                    qualifiers=qualifiers,
                )
            )
            parts.append(dna)
            cursor = end

    return SeqRecord(
        Seq("".join(parts)),
        id='1243',
        name=_locus_name(molecule),
        description='Generated with synXNA',
        features=features,
        annotations={"molecule_type": "DNA"},
    )


def biosynthesize(molecule, organism):
    """Design a GenBank construct of genes proposed to biosynthesize *molecule*.

    Steps:
      1. Ask an OpenAI LLM for a "Gene | Function" table for the molecule.
      2. For each gene, ask a pandas agent which entry of the reaction
         database best matches the function, and take that entry's EC number.
      3. Search NCBI protein for the EC number and fetch the first hit.
      4. Emit promoter + RBS + back-translated CDS for every gene found.

    Genes with no database match or no NCBI hit are skipped.

    Args:
        molecule: Name of the target molecule.
        organism: Accepted for interface compatibility; currently unused.

    Returns:
        The GenBank-formatted text of the construct. The same text is also
        written to ``"<molecule> Biosynthesis.gb"`` in the working directory.

    Raises:
        ValueError: if *molecule* is empty or only whitespace.
    """
    molecule = (molecule or "").strip()
    if not molecule:
        raise ValueError("A molecule name is required.")

    # Ask the LLM for the gene table.
    llm = OpenAI(temperature=0.5)
    prompt = PromptTemplate(
        input_variables=["molecule"],
        template="how do you biosynthesize {molecule}. Output a table of necessary genes with two columns: Gene and Function",
    )
    gene_rows = _parse_gene_table(LLMChain(llm=llm, prompt=prompt).run(molecule))

    # Give the pandas agent only the columns it needs to match against.
    biochem_reactions = biochem_df[['Reaction', 'Reaction_ID_MetaCyc', 'Recommended_Name']]
    pandas_agent = create_pandas_dataframe_agent(
        OpenAI(temperature=0), biochem_reactions, verbose=False
    )

    Entrez.email = 'jayman1466@gmail.com'
    genes = []
    for row in gene_rows:
        ec_number = _lookup_ec_number(pandas_agent, row['function'])
        if ec_number is None:
            continue
        protein = _fetch_first_protein(ec_number)
        if protein is None:
            continue
        genes.append(
            {
                'sequence': str(protein.seq),
                'EC': ec_number,
                'desired_name': row['gene'],
            }
        )

    genbank_text = _assemble_record(molecule, genes).format("genbank")
    with open(_genbank_filename(molecule), "w", encoding="utf-8") as output_file:
        output_file.write(genbank_text)
    return genbank_text
# Build the web UI: two single-line text inputs (molecule, host organism)
# feed biosynthesize, and its return value is displayed as plain text.
molecule_box = gr.Textbox(lines=1, placeholder="Biosynthesize this molecule")
organism_box = gr.Textbox(lines=1, placeholder="In this bacteria")

demo = gr.Interface(
    fn=biosynthesize,
    inputs=[molecule_box, organism_box],
    outputs="text",
    title="What do you want to make",
)

# Start serving the interface.
demo.launch()