# synChat / app.py
# jayman1466 -- "Update app.py" (bb74fd7)
import gradio as gr
import os
import pandas as pd
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from pydantic import BaseModel, Field
from langchain.tools import BaseTool, StructuredTool, Tool, tool
from langchain.memory import ChatMessageHistory
from langchain.agents import initialize_agent
from langchain.agents import AgentType
from langchain.agents import load_tools
#from langchain.tools import PubmedQueryRun
from langchain.utilities import WikipediaAPIWrapper
from langchain.agents import create_pandas_dataframe_agent
from Bio import Entrez, SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.SeqFeature import SeqFeature, FeatureLocation
import random
#set API key
# setdefault keeps a key that is already configured in the environment; the
# previous unconditional assignment replaced it with this blank placeholder.
os.environ.setdefault("OPENAI_API_KEY", "") #openai key
#prep database of reactions
# The table is downloaded once at import time and shared by every request.
# NOTE(review): the URL points at a .tar.gz archive but is read as a plain
# gzip-compressed TSV -- confirm the first column name is not polluted by the
# tar member header.
biochem_data_url = "https://bkms.brenda-enzymes.org/download/Reactions_BKMS.tar.gz"
biochem_df = pd.read_csv(biochem_data_url, compression='gzip', header=0, sep="\t")
# One fixed codon per amino acid, used when back-translating protein to DNA.
_CODON_TABLE = {
    "A": "GCA", "C": "TGT", "D": "GAT", "E": "GAA", "F": "TTT",
    "G": "GGA", "H": "CAC", "I": "ATA", "K": "AAA", "L": "CTT",
    "M": "ATG", "N": "AAT", "P": "CCC", "Q": "CAA", "R": "AGG",
    "S": "TCA", "T": "ACA", "V": "GTA", "Y": "TAT", "W": "TGG",
    "*": "TAA",
}
# Emitted for residues missing from the table (e.g. X, B, Z, U) so that an
# ambiguous database sequence no longer aborts the whole run with a KeyError.
_UNKNOWN_CODON = "NNN"
_STOP_CODON = "TAA"


def _parse_gene_table(table_text):
    """Turn the LLM's pipe-delimited table into a list of gene/function dicts.

    Header rows (first cell "Gene", any case), separator rows made only of
    dashes/colons, blank lines, and lines without both a gene and a function
    cell are skipped. Leading/trailing pipes and padding are tolerated.
    """
    rows = []
    for raw_line in table_text.splitlines():
        cells = [cell.strip() for cell in raw_line.strip().strip("|").split("|")]
        if len(cells) < 2 or not cells[0] or not cells[1]:
            continue
        if cells[0].casefold() == "gene":
            continue
        if all(set(cell) <= set("-: ") for cell in cells):
            continue
        rows.append({"gene": cells[0], "function": cells[1]})
    return rows


def _random_dna(length):
    """Return a random DNA string of the given length (placeholder parts)."""
    return "".join(random.choices("CGTA", k=length))


def _orf_recode(sequence):
    """Back-translate a protein sequence into DNA and append one stop codon.

    Input is case-insensitive. A trailing '*' is dropped so the ORF does not
    end in two stop codons; residues outside the codon table become 'NNN'.
    """
    protein = sequence.upper().rstrip("*")
    codons = [_CODON_TABLE.get(residue, _UNKNOWN_CODON) for residue in protein]
    codons.append(_STOP_CODON)
    return "".join(codons)


def _safe_filename_stem(name):
    """Replace path separators and NULs so user input stays a plain file name."""
    return "".join("_" if ch in "/\\\x00" else ch for ch in name)


def _match_ec_numbers(gene_rows):
    """Map each gene's described function onto an EC number from biochem_df.

    A pandas agent is asked which reaction-table entry best matches the
    function text; the answer is then looked up exactly in the Reaction,
    Reaction_ID_MetaCyc and Recommended_Name columns. Genes with no match, or
    whose matches carry no EC number, are dropped.
    """
    if not gene_rows:
        return []
    #pull out just the needed columns from the reaction database for now to make it easier
    biochem_reactions = biochem_df[['Reaction', 'Reaction_ID_MetaCyc', 'Recommended_Name']]
    pandas_agent = create_pandas_dataframe_agent(OpenAI(temperature=0), biochem_reactions, verbose=False)
    matched = []
    for gene in gene_rows:
        answer = pandas_agent.run('which element most closely matches {}'.format(gene['function'])).strip()
        hits = biochem_df[
            (biochem_df.Reaction == answer)
            | (biochem_df.Reaction_ID_MetaCyc == answer)
            | (biochem_df.Recommended_Name == answer)
        ]
        # A row without an EC number cannot be searched for downstream.
        hits = hits.dropna(subset=['EC_Number'])
        if hits.empty:
            continue
        matched.append({
            'EC_Number': str(hits.iloc[0]['EC_Number']),
            'gene': gene['gene'],
            'function': gene['function'],
        })
    return matched


def _fetch_proteins(ec_rows):
    """Fetch one protein sequence from NCBI for each EC-number row.

    The first search hit is used (arbitrary choice); rows whose search returns
    no IDs are skipped instead of raising IndexError.
    """
    Entrez.email = 'jayman1466@gmail.com'
    proteins = []
    for gene in ec_rows:
        search_handle = Entrez.esearch(db="protein", term=gene['EC_Number'])
        try:
            search = Entrez.read(search_handle)
        finally:
            search_handle.close()
        if not search['IdList']:
            continue
        #arbitrarily pick the first ID for now
        this_id = search['IdList'][0]
        fetch_handle = Entrez.efetch(db="protein", id=this_id, rettype="fasta")
        try:
            fasta = SeqIO.read(fetch_handle, "fasta")
        finally:
            fetch_handle.close()
        proteins.append({
            'sequence': str(fasta.seq),
            'actual_name': fasta.name,
            'ID': fasta.id,
            'EC': gene['EC_Number'],
            'desired_name': gene['gene'],
        })
    return proteins


def _build_construct(molecule, proteins):
    """Assemble a SeqRecord with promoter + RBS + CDS features per protein.

    Promoter (50 nt) and RBS (35 nt) are random placeholder DNA; each CDS is
    the back-translated protein sequence.
    """
    record = SeqRecord(
        Seq(""),
        id='1243',
        # Whitespace is collapsed to underscores because Biopython's GenBank
        # writer appears to reject whitespace in the LOCUS name.
        name="_".join(molecule.split()),
        description='Generated with synXNA',
        annotations={"molecule_type": "DNA"},
    )
    parts = []
    cursor = 0
    for item in proteins:
        cds_label = item['EC'] + " " + item['desired_name']
        segments = (
            ('promoter', _random_dna(50), {'label': 'promoter'}),
            ('RBS', _random_dna(35), {'label': 'RBS'}),
            ('CDS', _orf_recode(item['sequence']),
             {'label': cds_label, 'translation': item['sequence']}),
        )
        for feature_type, dna, qualifiers in segments:
            location = FeatureLocation(start=cursor, end=cursor + len(dna))
            record.features.append(SeqFeature(location, type=feature_type, qualifiers=qualifiers))
            parts.append(dna)
            cursor += len(dna)
    record.seq = Seq("".join(parts))
    return record


def biosynthesize(molecule, organism):
    """Design a GenBank construct of genes proposed to biosynthesize a molecule.

    Steps: ask the LLM for a gene/function table, match each function to an
    EC number in the reaction table, fetch one protein per EC number from
    NCBI, back-translate it, and lay the genes out as promoter/RBS/CDS units.

    Args:
        molecule: Name of the target molecule (free text from the UI).
        organism: Host organism from the UI; accepted but currently unused.

    Returns:
        The GenBank file contents as a string. The same text is also written
        to '<molecule> Biosynthesis.gb' in the working directory.
    """
    #initialize LLM
    llm = OpenAI(temperature=0.5)
    #create prompt template for an openAI search
    prompt = PromptTemplate(
        input_variables=["molecule"],
        template="how do you biosynthesize {molecule}. Output a table of necessary genes with two columns: Gene and Function",
    )
    chain = LLMChain(llm=llm, prompt=prompt)
    gene_rows = _parse_gene_table(chain.run(molecule))

    # TODO: augment the gene table with a Wikipedia search agent (the earlier
    # tool setup was never wired to an agent, so it was dropped for now).

    ec_rows = _match_ec_numbers(gene_rows)
    proteins = _fetch_proteins(ec_rows)
    record = _build_construct(molecule, proteins)

    #write genbank file
    path = '{} Biosynthesis.gb'.format(_safe_filename_stem(molecule))
    with open(path, 'w') as output_file:
        SeqIO.write(record, output_file, 'genbank')
    with open(path, 'r') as saved_file:
        return saved_file.read()
#create the interface
# Two single-line text boxes (target molecule, host organism) feed
# biosynthesize; whatever string it returns is shown as plain text.
demo = gr.Interface(
    fn=biosynthesize,
    inputs=[
        gr.Textbox(lines=1, placeholder=hint)
        for hint in ("Biosynthesize this molecule", "In this bacteria")
    ],
    outputs="text",
    title="What do you want to make",
)
demo.launch()