| | import torch |
| | from typing import Annotated, TypedDict, Literal |
| | from langchain_community.tools import DuckDuckGoSearchRun |
| | from langchain_core.tools import tool |
| | from langgraph.prebuilt import ToolNode, tools_condition |
| | from langgraph.graph import StateGraph, START, END |
| | from langgraph.graph.message import add_messages |
| | from langchain_core.messages import SystemMessage, trim_messages, AIMessage, HumanMessage, ToolCall |
| |
|
| | from langchain_huggingface.llms import HuggingFacePipeline |
| | from langchain_huggingface import ChatHuggingFace |
| | from langchain_core.prompts import PromptTemplate, ChatPromptTemplate |
| | from langchain_core.runnables import chain |
| | from uuid import uuid4 |
| | import re |
| | import matplotlib.pyplot as plt |
| | import spaces |
| |
|
| | from dockstring import load_target |
| | from rdkit import Chem |
| | from rdkit.Chem import AllChem, QED |
| | from rdkit.Chem import Draw |
| | from rdkit.Chem.Draw import MolsToGridImage |
| | import os, re |
| | import gradio as gr |
| | from PIL import Image |
| |
|
# Prefer the GPU when torch can see one; otherwise fall back to the CPU.
# NOTE(review): `device` is not referenced again in the visible code — confirm
# whether it was meant to be handed to the pipeline.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"

# Text-generation pipeline around Phi-4-mini-instruct.
# NOTE(review): presumably `temperature` only takes effect when sampling is
# enabled — verify against the transformers generation settings.
hf = HuggingFacePipeline.from_model_id(
    model_id="microsoft/Phi-4-mini-instruct",
    task="text-generation",
    pipeline_kwargs=dict(max_new_tokens=500, temperature=0.4),
)

# Chat-style wrapper used by every node that queries the LLM.
chat_model = ChatHuggingFace(llm=hf)

# Number of CPU cores reported by the OS; the docking node passes it to dockstring.
cpuCount = os.cpu_count()
print(f"Number of CPUs: {cpuCount}")
| |
|
class State(TypedDict):
    '''
    The state of the agent.

    One instance is passed from node to node through the graph; each node
    reads the fields it needs and writes its results back.
    '''
    # Conversation messages; updates are merged through add_messages.
    messages: Annotated[list, add_messages]

    # Pair (first_tool, second_tool) chosen by first_node / retry_node;
    # an entry is None when no tool is assigned to that slot.
    tool_choice: tuple
    # Index into tool_choice of the next tool to run; reset to 0 when tools
    # are chosen and incremented by docking_node after each run.
    which_tool: int
    # Accumulated text output of the tools, used as CONTEXT for the LLM.
    props_string: str
    # Set to None by first_node; not written anywhere else in this file.
    similars_img: str
    # Routing flag read by loop_or_not: "loop_again", "finish_gracefully" or None.
    loop_again: str

    # Fields parsed from the user's message by first_node (None when absent).
    query_smiles: str
    query_task: str
    query_protein: str
| |
|
def docking_node(state: State) -> State:
    '''
    Docking tool: uses dockstring to dock the molecule into the protein.

    Reads ``query_protein`` and ``query_smiles`` from the state, docks the
    molecule, and appends the docking score plus an XYZ-formatted pose (with
    hydrogens added) to ``props_string``. When docking fails for any reason
    nothing is appended and the cause is printed. ``which_tool`` is always
    incremented so the router moves on to the next tool.

    Args:
        state: the agent state; ``props_string``, ``query_protein``,
            ``query_smiles`` and ``which_tool`` must be present.
    Returns:
        The same state object, updated in place.
    '''
    print("docking tool")
    print('===================================================')
    current_props_string = state["props_string"]
    # first_node stores None for a field the user did not supply; fall back to
    # an empty string so the failure is reported below instead of crashing on
    # None.strip().
    query_protein = (state["query_protein"] or "").strip()
    query_smiles = (state["query_smiles"] or "").strip()

    print(f'query_protein: {query_protein}')
    print(f'query_smiles: {query_smiles}')

    try:
        target = load_target(query_protein)
        print("===============================================")
        print(f"Docking molecule with {cpuCount} cpu cores.")
        score, aux = target.dock(query_smiles, num_cpus=cpuCount)
        mol = aux['ligand']
        print(f"Docking score: {score}")
        print("===============================================")
        # Add hydrogens and embed them using the docked ligand as the template.
        molH = Chem.AddHs(mol)
        AllChem.ConstrainedEmbed(molH, mol, useTethers=True)
        conformer = molH.GetConformer()
        atom_lines = []
        for atom in molH.GetAtoms():
            pos = conformer.GetAtomPosition(atom.GetIdx())
            atom_lines.append(f"{atom.GetSymbol()} {pos[0]} {pos[1]} {pos[2]}\n")
        # XYZ layout: atom count, blank comment line, then one line per atom.
        xyz_string = f"{molH.GetNumAtoms()}\n\n" + "".join(atom_lines)
        prop_string = f"Docking score: {score} kcal/mol \n\n"
        prop_string += f"pose structure: {xyz_string}\n"
    except Exception as err:
        # Best effort: a failed docking must not abort the agent, but the
        # reason is printed so the failure can be diagnosed.
        print(f"Molecule could not be docked!")
        print(f"Docking error: {type(err).__name__}: {err}")
        prop_string = ''

    state["props_string"] = current_props_string + prop_string
    state["which_tool"] += 1
    return state
| |
|
# Marker that precedes the model's answer in the text returned by chat_model.
_ASSISTANT_MARKER = '<|assistant|>'
# Field names recognised in the user's message, each followed by ':'.
_QUERY_FIELD_RE = re.compile(r'(query_smiles|query_task|query_protein)\s*:')


def _parse_query_fields(raw_input):
    '''Return a dict of the query_* fields found in the user's message.'''
    matches = list(_QUERY_FIELD_RE.finditer(raw_input))
    # A value runs from its own "name:" up to the next recognised "name:" (or
    # the end of the text), so commas and colons inside a value survive.
    ends = [m.start() for m in matches[1:]] + [len(raw_input)]
    fields = {}
    for match, end in zip(matches, ends):
        value = raw_input[match.end():end].strip()
        # Drop the ',' that separated this field from the next one.
        fields[match.group(1)] = value.rstrip(',').strip()
    return fields


def _none_if_blank(value):
    '''Map a missing, empty or literal "none" value to None.'''
    if value is None or value.strip().lower() in ('', 'none'):
        return None
    return value


def _reply_tool_names(res):
    '''Return the comma-separated names the LLM wrote before the "#".'''
    segments = str(res).split(_ASSISTANT_MARKER)
    if len(segments) < 2:
        # No assistant marker in the output: nothing usable was produced.
        return []
    return segments[1].split('#')[0].strip().split(',')


def _normalize_tool_name(name):
    '''Map one name from the LLM reply to a routable tool name or None.'''
    lowered = name.strip().lower()
    if ('autodock' in lowered) or ('docking' in lowered):
        return 'docking_tool'
    # "none", empty text or an invented tool name: the graph has no route
    # for it, so treat it as "no tool".
    return None


def _tool_choice_from_names(names):
    '''Build the (first_tool, second_tool) tuple from the reply names.'''
    if len(names) > 2:
        # More names than the two tool slots: treat the reply as unusable.
        return (None, None)
    tools = [tool for tool in map(_normalize_tool_name, names) if tool is not None]
    # Valid tools go first so that a "none, docking_tool" answer still runs.
    tools += [None] * (2 - len(tools))
    return tuple(tools)


def first_node(state: State) -> State:
    '''
    The first node of the agent. This node receives the input and asks the LLM
    to determine which is the best tool to use to answer the QUERY TASK.

    Input: the initial prompt from the user, as the last message in the state.
        It may contain one or more of the fields query_smiles, query_task and
        query_protein. Each value is separated from its name by a ':' and each
        field is separated from the previous one by a ','.
        All of these values are saved to the state (None when absent); a
        SMILES or protein given as "none" or left empty is stored as None.
    Output: the state with tool_choice set to a (first_tool, second_tool)
        tuple, which_tool reset to 0 and the working fields cleared.
    '''
    # Reset everything a previous pass may have left behind.
    state["query_smiles"] = None
    state["query_task"] = None
    state["query_protein"] = None
    state['similars_img'] = None
    state["props_string"] = ""
    state["loop_again"] = None

    raw_input = state["messages"][-1].content

    fields = _parse_query_fields(raw_input)
    query_smiles = _none_if_blank(fields.get('query_smiles'))
    query_task = fields.get('query_task')
    query_protein = _none_if_blank(fields.get('query_protein'))
    state["query_smiles"] = query_smiles
    state["query_task"] = query_task
    state["query_protein"] = query_protein

    prompt = f'For the QUERY_TASK given below, determine if one or two of the tools descibed below \
can complete the task. If so, reply with only the tool names followed by "#". If two tools \
are required, reply with both tool names separated by a comma and followed by "#". \
If the tools cannot complete the task, reply with "None #".\n \
QUERY_TASK: {query_task}.\n \
The information provided by the user is:\n \
QUERY_SMILES: {query_smiles}.\n \
QUERY_PROTEIN: {query_protein}.\n \
Tools: \n \
docking_tool: uses dockstring to dock the molecule into the protein, producing a pose structure and a docking score.\n \
'

    res = chat_model.invoke(prompt)
    print(res)

    tool_choices = _reply_tool_names(res)
    print(tool_choices)
    tool_choice = _tool_choice_from_names(tool_choices)

    state["tool_choice"] = tool_choice
    state["which_tool"] = 0
    print(f"First Node. The chosen tools are: {tool_choice}")

    return state


def retry_node(state: State) -> State:
    '''
    If the previous loop of the agent does not get enough information from the
    tools to answer the query, this node is called to retry the previous loop.

    The LLM reply is interpreted exactly as in first_node, so both nodes only
    ever produce tool names the graph can route.

    Input: the state left by the previous loop (query_task, query_smiles and
        query_protein are read).
    Output: the state with tool_choice replaced and which_tool reset to 0.
    '''
    query_task = state["query_task"]
    query_smiles = state["query_smiles"]
    query_protein = state["query_protein"]

    prompt = f'You were previously given the QUERY_TASK below, and asked to determine if one \
or two of the tools descibed below could complete the task. The tool choices did not succeed. \
Please re-examine the tool choices and determine if one or two of the tools descibed below \
can complete the task. If so, reply with only the tool names followed by "#". If two tools \
are required, reply with both tool names separated by a comma and followed by "#". \
If the tools cannot complete the task, reply with "None #".\n \
The information provided by the user is:\n \
QUERY_SMILES: {query_smiles}.\n \
QUERY_PROTEIN: {query_protein}.\n \
The task is: \
QUERY_TASK: {query_task}.\n \
Tool options: \n \
docking_tool: uses dockstring to dock the molecule into the protein using AutoDock Vina, producing a pose structure and a docking score.\n \
'

    res = chat_model.invoke(prompt)

    tool_choice = _tool_choice_from_names(_reply_tool_names(res))

    state["tool_choice"] = tool_choice
    state["which_tool"] = 0
    print(f"The chosen tools are (Retry): {tool_choice}")

    return state
| |
|
def loop_node(state: State) -> State:
    '''
    Pass-through node that the tool nodes return to.

    The node itself does not change anything: it returns the state it was
    given. The decision whether to call another tool or go on to the parser
    node is made by the conditional edge attached to this node.
    Input: the state after a tool has run.
    Output: the same state, unchanged.
    '''
    return state
| |
|
def parser_node(state: State) -> State:
    '''
    This is the third node in the agent. It receives the output from the tool,
    puts it into a prompt as CONTEXT, and asks the LLM to answer the original
    query. A second LLM call then judges whether that answer is good enough.

    Input: the state holding the tool output (props_string), the query_task
        and the tool_choice.
    Output: the state with the trial answer stored in messages and loop_again
        set to one of:
        "finish_gracefully" - no tool was assigned, so nothing was asked;
        "loop_again"        - the judge replied "LOOP";
        None                - any other verdict, so the agent proceeds.
    '''
    props_string = state["props_string"]
    query_task = state["query_task"]
    tool_choice = state["tool_choice"]

    no_tool_assigned = tool_choice is None or (
        isinstance(tool_choice, tuple)
        and tool_choice[0] is None
        and tool_choice[1] is None
    )
    if no_tool_assigned:
        state["loop_again"] = "finish_gracefully"
        return state

    marker = '<|assistant|>'

    prompt = f'Using the CONTEXT below, answer the original query, which \
was to answer the QUERY_TASK. Remember that the docking score was obtained with AutoDock Vina. End your answer with a "#" \
CONTEXT: {props_string}.\n \
QUERY_TASK: {query_task}.\n '

    res = chat_model.invoke(prompt)
    answer_segments = str(res).split(marker)
    # Fall back to the whole output when the assistant marker is missing
    # instead of failing with an IndexError.
    trial_answer = answer_segments[1] if len(answer_segments) > 1 else str(res)
    print('parser 1 ', trial_answer)
    state["messages"] = res

    check_prompt = f'Determine if the TRIAL ANSWER below answers the original \
QUERY TASK. If it does, respond with "PROCEED #" . If the TRIAL ANSWER did not \
answer the QUERY TASK, respond with "LOOP #" \n \
Only loop again if the TRIAL ANSWER did not answer the QUERY TASK. \
TRIAL ANSWER: {trial_answer}.\n \
QUERY_TASK: {query_task}.\n'

    res = chat_model.invoke(check_prompt)
    print('parser, loop again? ', res)

    verdict_segments = str(res).split(marker)
    verdict_text = verdict_segments[1] if len(verdict_segments) > 1 else ''
    verdict = verdict_text.split('#')[0].strip().lower()

    if verdict == "loop":
        state["loop_again"] = "loop_again"
    else:
        # Only an explicit "LOOP" retries. Every other verdict clears the
        # flag, so a stale "loop_again" from an earlier pass cannot keep the
        # agent retrying forever.
        state["loop_again"] = None
        print('trying to break loop')

    return state
| |
|
def reflect_node(state: State) -> State:
    '''
    Fourth node of the agent: asks the LLM to improve on its last answer.

    The content of the last message and the accumulated tool results are put
    into a single prompt and sent to the LLM.
    Input: the state holding the LLM's last answer and props_string.
    Output: a state update whose "messages" entry is the LLM's new reply.
    '''
    last_answer = state["messages"][-1].content
    tool_results = state["props_string"]

    improve_prompt = f'Look at the PREVIOUS ANSWER below which you provided and the \
TOOL RESULTS. Write an improved answer based on the PREVIOUS ANSWER and the \
TOOL RESULTS by adding additional clarifying and enriching information. Remember that the docking score was obtained with AutoDock Vina. \
End your new answer with a "#" \
PREVIOUS ANSWER: {last_answer}.\n \
TOOL RESULTS: {tool_results}. '

    return {"messages": chat_model.invoke(improve_prompt)}
| |
|
def graceful_exit_node(state: State) -> State:
    '''
    Called when the Agent cannot assign any tools for the task.

    Asks the LLM to summarize whatever is in props_string.
    Input: the state holding props_string.
    Output: a state update whose "messages" entry is the LLM's summary.
    '''
    context = state["props_string"]
    summary_prompt = f'Summarize the information in the CONTEXT, including any useful chemical information. Start your answer with: \
Here is what I found: \n \
CONTEXT: {context}'

    return {"messages": chat_model.invoke(summary_prompt)}
| | |
| |
|
def get_chemtool(state):
    '''
    Routing function: return the name of the next tool to run.

    Args:
        state: the agent state; reads "which_tool" (index of the next tool
            slot) and "tool_choice" (pair of tool names, or None).
    Returns:
        tool_choice[which_tool] when which_tool is 0 or 1, otherwise None.
        None is also returned when no tool choice has been made.
    '''
    which_tool = state["which_tool"]
    tool_choice = state["tool_choice"]
    print('in get_chemtool ',tool_choice)
    if tool_choice is None:
        return None
    if which_tool in (0, 1):
        return tool_choice[which_tool]
    # Both slots are used up, or the index is out of range: no tool left.
    return None
| |
|
def loop_or_not(state):
    '''
    Routing function that runs after the parser node.

    Returns:
        True when loop_again is "loop_again",
        'lets_get_outta_here' when it is "finish_gracefully",
        False for any other value.
    '''
    flag = state['loop_again']
    print(f"(line 417) Loop? {flag}")
    if flag == "finish_gracefully":
        return 'lets_get_outta_here'
    return flag == "loop_again"
| |
|
builder = StateGraph(State)

# Register every node under the name the edges below refer to.
for _node_name, _node_fn in (
    ("first_node", first_node),
    ("retry_node", retry_node),
    ("loop_node", loop_node),
    ("parser_node", parser_node),
    ("reflect_node", reflect_node),
    ("graceful_exit_node", graceful_exit_node),
    ("docking_node", docking_node),
):
    builder.add_node(_node_name, _node_fn)

builder.add_edge(START, "first_node")

# After tools have been chosen (first attempt or retry), get_chemtool picks
# the docking node or, when no tool is pending, the parser.
for _chooser in ("first_node", "retry_node"):
    builder.add_conditional_edges(
        _chooser,
        get_chemtool,
        {"docking_tool": "docking_node", None: "parser_node"},
    )

# Every tool run comes back through loop_node, which routes again.
builder.add_edge("docking_node", "loop_node")
builder.add_conditional_edges(
    "loop_node",
    get_chemtool,
    {
        "docking_tool": "docking_node",
        "loop_again": "first_node",
        None: "parser_node",
    },
)

# The parser's verdict decides between retrying, giving up and polishing.
builder.add_conditional_edges(
    "parser_node",
    loop_or_not,
    {
        True: "retry_node",
        'lets_get_outta_here': "graceful_exit_node",
        False: "reflect_node",
    },
)

# Both final nodes end the run.
for _final_node in ("reflect_node", "graceful_exit_node"):
    builder.add_edge(_final_node, END)

graph = builder.compile()
| |
|
@spaces.GPU
def DockAgent(task, smiles, protein):
    '''
    This Agent performs one task:
    1. Docks a molecule in a protein based on the molecule's SMILES string and the protein name.

    This docking is performed with AutoDockVina, through the DockString interface. Only a limited number of proteins are
    available, and they are IGF1R, JAK2, KIT, LCK, MAPK14, MAPKAPK2, MET, PTK2, PTPN1, SRC, ABL1, AKT1, AKT2, CDK2, CSF1R,
    EGFR, KDR, MAPK1, FGFR1, ROCK1, MAP2K1, PLK1, HSD11B1, PARP1, PDE5A, PTGS2, ACHE, MAOB, CA2, GBA, HMGCR, NOS1, REN,
    DHFR, ESR1, ESR2, NR3C1, PGR, PPARA, PPARD, PPARG, AR, THRB, ADAM17, F10, F2, BACE1, CASP3, MMP13, DPP4, ADRB1, ADRB2,
    DRD2, DRD3, ADORA2A, CYP2C9, CYP3A4, and HSP90AA1.

    Args:
        task: the specific task to perform
        smiles: the smiles string of the molecule to be studied
        protein: the name of the protein for docking
    Returns:
        replies[-1]: a text string containing the requested information.
        img: a blank image.
    '''
    # Remove any image left over from an earlier run so it is not shown again.
    if os.path.exists('Similars_image.png'):
        os.remove('Similars_image.png')

    graph_input = {
        "messages": [
            HumanMessage(f'query_smiles: {smiles}, query_task: {task}, query_protein: {protein}')
        ]
    }

    replies = []
    for chunk in graph.stream(graph_input):
        node_names = re.findall(r'[a-z]+\_node', str(chunk))
        if not node_names:
            continue
        try:
            reply_text = str(chunk[node_names[0]]['messages'])
        except (KeyError, TypeError):
            # The update has no usable "messages" entry; fall back to the
            # text of the whole chunk.
            reply_text = str(chunk)
        if 'assistant' in reply_text:
            # Keep only the model's answer: after the last assistant marker
            # and before the terminating '#'.
            reply_text = reply_text.split("<|assistant|>")[-1].split('#')[0].strip()
        # Always store text so the results textbox never receives a message object.
        replies.append(reply_text)

    if os.path.exists('Similars_image.png'):
        img = Image.open('Similars_image.png')
    else:
        # No image was produced: return a blank white placeholder.
        img = Image.new('RGB', (250, 250), color = (255, 255, 255))

    # Guard against a run that produced no reply at all.
    answer = replies[-1] if replies else "The agent did not return an answer."
    return answer, img
| |
|
# Protein targets offered in the dropdown, in the same order as the
# "Protein Options" list shown in the UI.
dudes = [
    # Kinase
    'IGF1R', 'JAK2', 'KIT', 'LCK', 'MAPK14', 'MAPKAPK2', 'MET', 'PTK2', 'PTPN1', 'SRC',
    'ABL1', 'AKT1', 'AKT2', 'CDK2', 'CSF1R', 'EGFR', 'KDR', 'MAPK1', 'FGFR1', 'ROCK1',
    'MAP2K1', 'PLK1',
    # Enzyme
    'HSD11B1', 'PARP1', 'PDE5A', 'PTGS2',
    'ACHE', 'MAOB',
    'CA2', 'GBA', 'HMGCR', 'NOS1', 'REN', 'DHFR',
    # Nuclear receptor
    'ESR1', 'ESR2', 'NR3C1', 'PGR', 'PPARA', 'PPARD', 'PPARG',
    'AR',
    'THRB',
    # Protease
    'ADAM17', 'F10', 'F2',
    'BACE1', 'CASP3', 'MMP13',
    'DPP4',
    # GPCR
    'ADRB1', 'ADRB2', 'DRD2', 'DRD3',
    'ADORA2A',
    # Cytochrome
    'CYP2C9', 'CYP3A4',
    # Chaperone
    'HSP90AA1',
]
| |
|
# Gradio front end: a header, a collapsible list of the available proteins,
# the three inputs on the left and the agent's outputs on the right.
with gr.Blocks(fill_height=True) as forest:
    gr.Markdown('''
    # Docking Agent
    - uses dockstring to dock a molecule into a protein using only a SMILES string and a protein name
    - produces a pose structure and a docking score
    ''')

    # Reference list of the selectable proteins, collapsed by default.
    with gr.Accordion("ProteinOptions", open=False):
        gr.Markdown('''
        # Protein Options
        ## Kinase
        ### Highest quality
        - IGF1R, JAK2, KIT, LCK, MAPK14, MAPKAPK2, MET, PTK2, PTPN1, SRC
        ### Medium quality
        - ABL1, AKT1, AKT2, CDK2, CSF1R, EGFR, KDR, MAPK1, FGFR1, ROCK1
        ### Lower quality
        - MAP2K1, PLK1
        ## Enzyme
        ### Highest quality
        - HSD11B1, PARP1, PDE5A, PTGS2
        ### Medium quality
        - ACHE, MAOB
        ### Lower quality
        - CA2, GBA, HMGCR, NOS1, REN, DHFR
        ## Nuclear Receptor
        ### Highest quality
        - ESR1, ESR2, NR3C1, PGR, PPARA, PPARD, PPARG
        ### Medium quality
        - AR
        ### Lower quality
        - THRB
        ## Protease
        ### Higher quality
        - ADAM17, F10, F2
        ### Medium quality
        - BACE1, CASP3, MMP13
        ### Lower quality
        - DPP4
        ## GPCR
        ### Medium quality
        - ADRB1, ADRB2, DRD2, DRD3
        ### Lower quality
        - ADORA2A
        ## Cytochrome
        ### Medium quality
        - CYP2C9, CYP3A4
        ## Chaperone
        ### Lower quality
        - HSP90AA1
        ''')
    with gr.Row():
        # Left column: the inputs handed to DockAgent.
        with gr.Column():
            smiles = gr.Textbox(label="Molecule SMILES of interest (optional): ", placeholder='none')
            protein = gr.Dropdown(dudes, label="Protein name (see options): ")
            task = gr.Textbox(label="Task for Agent: ")

            calc_btn = gr.Button(value = "Submit to Agent")
        # Right column: the text answer and the image returned by DockAgent.
        with gr.Column():
            props = gr.Textbox(label="Agent results: ", lines=20 )
            pic = gr.Image(label="Molecule")

    # Run the agent from the button or by submitting the task textbox.
    calc_btn.click(DockAgent, inputs = [task, smiles, protein], outputs = [props, pic])
    task.submit(DockAgent, inputs = [task, smiles, protein], outputs = [props, pic])

# Start the app; mcp_server=True is passed so Gradio also serves it over MCP.
forest.launch(debug=False, mcp_server=True)