import base64
import json
from os.path import join
import pandas as pd
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_core.rate_limiters import InMemoryRateLimiter
from langchain_openai.chat_models import ChatOpenAI
from langfuse import Langfuse, get_client
from langfuse.langchain import CallbackHandler
from langgraph.graph import START, StateGraph, MessagesState
from langgraph.prebuilt import tools_condition
from langgraph.prebuilt import ToolNode
class Agent:
"""
Class representing a basic agent that can answer questions.
"""
def __init__(
self,
model: str,
tools: list,
system_prompt_path: str,
data_path: str,
        openai_api_key: str | None = None,
        langfuse_callback_handler: CallbackHandler | None = None
):
"""
Initialize the agent object.
:param model: The OpenAI model to use.
:param tools: List of tools the agent can use.
:param system_prompt_path: Path to the system prompt file.
:param data_path: Data to be used by the agent.
:param openai_api_key: OpenAI API key for authentication.
:param langfuse_callback_handler: Langfuse callback handler for
tracking and logging interactions.
"""
rate_limiter = InMemoryRateLimiter(
            # Deliberately slow: at most one request every 10 seconds
            requests_per_second=0.1,
            # Wake up every 100 ms to check whether a request is allowed
            check_every_n_seconds=0.1,
# Controls the maximum burst size.
max_bucket_size=10,
)
self.chat_model = ChatOpenAI(
model=model,
api_key=openai_api_key,
rate_limiter=rate_limiter
)
with open(system_prompt_path, "r") as file:
self.system_prompt = file.read()
self.data_path = data_path
self.tools = tools
if langfuse_callback_handler is not None:
self.chat_model.callbacks = [langfuse_callback_handler]
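        # Bind the tools so the model can emit tool calls; parallel tool
        # calls are disabled so the loop executes one tool at a time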
self.chat_model_with_tools = self.chat_model.bind_tools(
tools=tools,
parallel_tool_calls=False
)
self.graph = self.__build_graph()
def __call__(
self,
question: str,
        question_file: str | None = None
) -> tuple[str, str]:
"""
Reply to a question using the agent and return the agents full reply
with reasoning included.
:param question: The question to ask the agent.
:param question_file: The file that comes with the question.
:return: The agent's response.
"""
human_message = self.__format_human_message(
question=question,
question_file=question_file
)
final_state = self.graph.invoke(
input={
"messages": [
SystemMessage(content=self.system_prompt),
human_message
]
},
config={
"callbacks": self.chat_model.callbacks
}
)
content: str = final_state["messages"][-1].content
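        # The system prompt is expected to make the model reply with a JSON
        # object holding "reasoning" and "answer" keys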
        content = content.strip()
        if content.startswith("```json"):
            # Strip the Markdown code fence around the JSON payload
            content = content.removeprefix("```json").removesuffix("```")
reply = json.loads(content)
return reply["reasoning"], reply["answer"]
def __build_graph(self):
"""
Build the graph for the agent.
"""
builder = StateGraph(MessagesState)
# Define nodes: these do the work
builder.add_node("assistant", self.__assistant)
builder.add_node("tools", ToolNode(self.tools))
# Define edges: these determine how the control flow moves
builder.add_edge(START, "assistant")
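        # tools_condition routes to the "tools" node when the last message
        # contains tool calls, and to END otherwise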
builder.add_conditional_edges(
"assistant",
tools_condition,
)
builder.add_edge("tools", "assistant")
return builder.compile()
def __assistant(self, state: MessagesState) -> MessagesState:
"""
The assistant function that processes the state and returns a response.
:param state: The current state of the agent.
:return: Updated state with the assistant's response.
"""
response = self.chat_model_with_tools.invoke(state["messages"])
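        # MessagesState merges via the add_messages reducer, so returning a
        # single-item list appends the response to the history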
return {"messages": [response]}
def __format_human_message(
self,
question: str,
question_file: str | None
) -> HumanMessage:
"""
Format the human message for the agent.
:param question: The question to ask the agent.
:param question_file: The file that comes with the question.
:return: Formatted HumanMessage.
"""
if question_file is None or question_file == '':
human_message = HumanMessage(content=question)
else:
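            # Attach the file contents in a form the model can consume,
            # selected by file extension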
            if question_file.endswith('.png'):
with open(join(self.data_path, question_file), "rb") as file:
                    file_content = base64.b64encode(file.read()).decode("utf-8")
human_message = HumanMessage(
content=[
{
'type': 'text',
'text': question
},
{
'type': 'image',
'source_type': 'base64',
'data': file_content,
"mime_type": "image/png"
}
]
)
            elif question_file.endswith('.mp3'):
                # There is no support for audio files when using gpt-4o,
                # so a tool is used to transcribe the .mp3 file into text
                human_message = HumanMessage(
                    content=[
                        {
                            'type': 'text',
                            'text': (
                                f'{question}\n\nHere is the audio file:\n'
                                f'```audio\n{question_file}\n```'
                            )
                        },
                    ]
                )
            elif question_file.endswith('.py'):
                with open(join(self.data_path, question_file), "r") as file:
                    file_content = file.read()
                human_message = HumanMessage(
                    content=[
                        {
                            'type': 'text',
                            'text': (
                                f'{question}\n\nHere is the code:\n'
                                f'```python\n{file_content}\n```'
                            )
                        },
                    ]
                )
            elif question_file.endswith('.xlsx'):
                data = pd.read_excel(
                    join(self.data_path, question_file),
                )
                data = data.to_string()
                human_message = HumanMessage(
                    content=[
                        {
                            'type': 'text',
                            'text': (
                                f'{question}\n\nHere is the data:\n'
                                f'```\n{data}\n```'
                            )
                        },
                    ]
                )
            else:
                # Unsupported extensions fall back to the bare question so
                # human_message is never left unbound
                human_message = HumanMessage(content=question)
        return human_message
if __name__ == "__main__":
import os
from langchain_community.tools import DuckDuckGoSearchResults
from tools import multiply, add, subtract, divide, modulus
# Initialize Langfuse client with constructor arguments
Langfuse(
public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"),
secret_key=os.environ.get("LANGFUSE_SECRET_KEY"),
host='https://cloud.langfuse.com'
)
# Get the configured client instance
langfuse = get_client()
# Initialize the Langfuse handler
langfuse_handler = CallbackHandler()
tools = [multiply, add, subtract, divide, modulus]
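    # Add web search on top of the basic arithmetic tools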
tools.append(
DuckDuckGoSearchResults()
)
    agent = Agent(
        model="gpt-4o",
        tools=tools,
        system_prompt_path="prompts/system_prompt.txt",
        # Assumed directory for files that accompany questions; required
        # by Agent.__init__
        data_path="data",
        openai_api_key=os.environ.get("OPENAI_API_KEY"),
        langfuse_callback_handler=langfuse_handler
    )
    reasoning, answer = agent(
        question="Search for Tom Cruise and summarize the results for me."
    )
    print(f"Reasoning: {reasoning}\nAnswer: {answer}")