import base64
import json
from os.path import join

import pandas as pd
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_core.rate_limiters import InMemoryRateLimiter
from langchain_openai.chat_models import ChatOpenAI
from langfuse import Langfuse, get_client
from langfuse.langchain import CallbackHandler
from langgraph.graph import START, StateGraph, MessagesState
from langgraph.prebuilt import tools_condition, ToolNode


class Agent:
    """
    Class representing a basic agent that can answer questions.
    """

    def __init__(
        self,
        model: str,
        tools: list,
        system_prompt_path: str,
        data_path: str,
        openai_api_key: str | None = None,
        langfuse_callback_handler: CallbackHandler | None = None
    ):
        """
        Initialize the agent object.

        :param model: The OpenAI model to use.
        :param tools: List of tools the agent can use.
        :param system_prompt_path: Path to the system prompt file.
        :param data_path: Path to the directory containing the files that
            accompany questions.
        :param openai_api_key: OpenAI API key for authentication.
        :param langfuse_callback_handler: Langfuse callback handler for
            tracking and logging interactions.
        """
        rate_limiter = InMemoryRateLimiter(
            # Super slow! We can only make one request every 10 seconds
            requests_per_second=0.1,
            # Wake up every 100 ms to check whether we're allowed to make
            # a request
            check_every_n_seconds=0.1,
            # Controls the maximum burst size
            max_bucket_size=10,
        )
        self.chat_model = ChatOpenAI(
            model=model,
            api_key=openai_api_key,
            rate_limiter=rate_limiter
        )
        with open(system_prompt_path, "r") as file:
            self.system_prompt = file.read()
        self.data_path = data_path
        self.tools = tools
        if langfuse_callback_handler is not None:
            self.chat_model.callbacks = [langfuse_callback_handler]
        self.chat_model_with_tools = self.chat_model.bind_tools(
            tools=tools,
            parallel_tool_calls=False
        )
        self.graph = self.__build_graph()

    def __call__(
        self,
        question: str,
        question_file: str | None = None
    ) -> tuple[str, str]:
        """
        Reply to a question using the agent and return the agent's full
        reply with reasoning included.

        :param question: The question to ask the agent.
        :param question_file: The file that comes with the question.
        :return: The agent's reasoning and answer.
        """
        human_message = self.__format_human_message(
            question=question,
            question_file=question_file
        )
        final_state = self.graph.invoke(
            input={
                "messages": [
                    SystemMessage(content=self.system_prompt),
                    human_message
                ]
            },
            config={
                "callbacks": self.chat_model.callbacks
            }
        )
        content: str = final_state["messages"][-1].content
        if content.startswith("```json"):
            # If the reply is wrapped in a Markdown code fence, remove it
            content = content.removeprefix("```json").removesuffix("```")
        reply = json.loads(content)
        return reply["reasoning"], reply["answer"]

    def __build_graph(self):
        """
        Build the graph for the agent.
        """
        builder = StateGraph(MessagesState)
        # Define nodes: these do the work
        builder.add_node("assistant", self.__assistant)
        builder.add_node("tools", ToolNode(self.tools))
        # Define edges: these determine how the control flow moves
        builder.add_edge(START, "assistant")
        # tools_condition routes to the "tools" node when the assistant's
        # last message contains tool calls, and to END otherwise
        builder.add_conditional_edges(
            "assistant",
            tools_condition,
        )
        builder.add_edge("tools", "assistant")
        return builder.compile()

    def __assistant(self, state: MessagesState) -> MessagesState:
        """
        The assistant function that processes the state and returns a response.

        :param state: The current state of the agent.
        :return: Updated state with the assistant's response.
        """
        response = self.chat_model_with_tools.invoke(state["messages"])
        return {"messages": [response]}

    def __format_human_message(
        self,
        question: str,
        question_file: str | None
    ) -> HumanMessage:
        """
        Format the human message for the agent.

        :param question: The question to ask the agent.
        :param question_file: The file that comes with the question.
        :return: Formatted HumanMessage.
        """
        if question_file is None or question_file == '':
            human_message = HumanMessage(content=question)
        elif question_file.endswith('.png'):
            with open(join(self.data_path, question_file), "rb") as file:
                file_content = base64.b64encode(file.read()).decode("utf-8")
            human_message = HumanMessage(
                content=[
                    {
                        'type': 'text',
                        'text': question
                    },
                    {
                        'type': 'image',
                        'source_type': 'base64',
                        'data': file_content,
                        'mime_type': 'image/png'
                    }
                ]
            )
        elif question_file.endswith('.mp3'):
            # There is no support for audio files when using gpt-4o,
            # so I will use a tool to transcribe the .mp3 file to text
            human_message = HumanMessage(
                content=[
                    {
                        'type': 'text',
                        'text': (
                            f'{question}\n\nHere is the audio file:\n'
                            f'```audio\n{question_file}\n```'
                        )
                    },
                ]
            )
        elif question_file.endswith('.py'):
            with open(join(self.data_path, question_file), "r") as file:
                file_content = file.read()
            human_message = HumanMessage(
                content=[
                    {
                        'type': 'text',
                        'text': (
                            f'{question}\n\nHere is the code:\n'
                            f'```python\n{file_content}\n```'
                        )
                    },
                ]
            )
        elif question_file.endswith('.xlsx'):
            data = pd.read_excel(join(self.data_path, question_file))
            data = data.to_string()
            human_message = HumanMessage(
                content=[
                    {
                        'type': 'text',
                        'text': (
                            f'{question}\n\nHere is the data:\n'
                            f'```\n{data}\n```'
                        )
                    },
                ]
            )
        else:
            # Fall back to passing the file name along with the question,
            # so an unknown file type does not leave human_message unbound
            human_message = HumanMessage(
                content=f'{question}\n\nHere is the file: {question_file}'
            )
        return human_message
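

# Note: __call__ expects the model's final reply to be JSON with
# "reasoning" and "answer" keys, so the file at system_prompt_path must
# instruct the model accordingly. A minimal sketch of such a prompt
# (an assumption, not the actual prompts/system_prompt.txt):
#
#     You are a helpful assistant. Reply with a single JSON object of the
#     form {"reasoning": "<step-by-step reasoning>",
#     "answer": "<final answer>"} and nothing else.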


if __name__ == "__main__":
    import os

    from langchain_community.tools import DuckDuckGoSearchResults
    from tools import multiply, add, subtract, divide, modulus
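    # The local tools module is not shown in this file. A minimal sketch of
    # what it might contain (an assumption, not the actual implementation):
    # plain typed functions with docstrings, which bind_tools can turn into
    # tool schemas, e.g.:
    #
    #     def multiply(a: float, b: float) -> float:
    #         """Multiply a by b."""
    #         return a * b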

    # Initialize Langfuse client with constructor arguments
    Langfuse(
        public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"),
        secret_key=os.environ.get("LANGFUSE_SECRET_KEY"),
        host='https://cloud.langfuse.com'
    )
    # Get the configured client instance
    langfuse = get_client()
    # Initialize the Langfuse handler
    langfuse_handler = CallbackHandler()

    tools = [multiply, add, subtract, divide, modulus]
    tools.append(DuckDuckGoSearchResults())

    agent = Agent(
        model="gpt-4o",
        tools=tools,
        system_prompt_path="prompts/system_prompt.txt",
        # data_path is required by __init__; "data" is an assumed location
        # for the question files
        data_path="data",
        openai_api_key=os.environ.get("OPENAI_API_KEY"),
        langfuse_callback_handler=langfuse_handler
    )
    reasoning, answer = agent(
        question="Search for Tom Cruise and summarize the results for me."
    )
    print(reasoning)
    print(answer)
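
# To run this example (an assumption based on the names referenced above):
# set the OPENAI_API_KEY, LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY
# environment variables, and make sure prompts/system_prompt.txt and the
# local tools module exist.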