Spaces:
Runtime error
Runtime error
from langgraph.graph import START, END, StateGraph | |
from langgraph.graph.state import CompiledStateGraph | |
from typing import Dict, Any, TypedDict, Literal, Optional | |
import logging | |
import datetime | |
from pathlib import Path | |
from args import Args | |
from management import Manager, Assistant | |
# Maximum number of interactions between Assistant and Manager | |
MAX_INTERACTIONS = 5 | |
# Maximum depth of recursion for Manager | |
MAX_DEPTH = 2 | |
# For both Assistant and Manager: | |
TEMPERATURE = 0.7 | |
MAX_TOKENS = 2000 | |
class State(TypedDict): | |
"""State for the agent graph.""" | |
initial_query: str | |
current_message: Optional[str] | |
nr_interactions: int | |
final_response: Optional[str] | |
class GraphBuilder: | |
def __init__(self): | |
""" | |
Initializes the GraphBuilder. | |
""" | |
self.assistant_agent = Assistant(TEMPERATURE, MAX_TOKENS) | |
self.manager_agent = Manager(TEMPERATURE, MAX_TOKENS, MAX_DEPTH) | |
self.final_answer_hint = "Final answer:" | |
def clear_chat_history(self): | |
self.assistant_agent.clear_context() | |
self.manager_agent.clear_context() | |
async def assistant_node(self, state: State) -> State: | |
""" | |
Assistant agent that evaluates the query and decides whether to give a final answer | |
or continue the conversation with the Manager. | |
Uses the existing Assistant implementation. | |
""" | |
if Args.LOGGER is None: | |
raise RuntimeError("LOGGER must be defined before running the assistant_node.") | |
Args.LOGGER.log(logging.INFO, "********** assistant_node **********") | |
if state["current_message"] is None: | |
# First time, just forward the query to the manager | |
response = state["initial_query"] | |
else: | |
assistent_input = f"This is the initial user query:\n\n{state["initial_query"]}\n\nThe manager provided the following answer:\n\n{state["current_message"]}\n" | |
response = await self.assistant_agent.query(assistent_input) | |
# Check if this is a final answer | |
if self.final_answer_hint in response: | |
# Extract the text after final answer hint | |
final_response = response.split(self.final_answer_hint)[-1] | |
final_response = final_response.strip() | |
state["final_response"] = final_response | |
state["current_message"] = response | |
state["nr_interactions"] += 1 | |
return state | |
async def manager_node(self, state: State) -> State: | |
""" | |
Manager agent that handles the queries from the Assistant and provides responses. | |
Uses the existing Manager implementation. | |
""" | |
if Args.LOGGER is None: | |
raise RuntimeError("LOGGER must be defined before running the manager_node.") | |
Args.LOGGER.log(logging.INFO, "********** manager_node **********") | |
if state["current_message"] is None: | |
raise ValueError("manager_node called with no current_message in state") | |
response = await self.manager_agent.query(state["current_message"]) | |
state["current_message"] = response | |
return state | |
async def final_answer_node(self, state: State) -> State: | |
""" | |
Final answer node that formats and returns the final response. | |
If there's already a final answer in the state, it uses that. | |
Otherwise, it asks the assistant to formulate a final answer. | |
""" | |
if Args.LOGGER is None: | |
raise RuntimeError("LOGGER must be defined before running the final_answer_node.") | |
Args.LOGGER.log(logging.INFO, "********** final_answer_node **********") | |
# If we already have a final answer, use it | |
final_response = state.get("final_response") | |
if final_response is not None: | |
Args.LOGGER.log(logging.INFO, f"==========\nFinal response:\n{final_response}\n==========") | |
return state | |
# Otherwise, have the assistant formulate a final answer | |
prompt = f"Based on the conversation so far, provide a final answer to the original query:\n\n{state['initial_query']}" | |
state["current_message"] = prompt | |
response = await self.assistant_agent.query(prompt) | |
# Format the response | |
if self.final_answer_hint not in response: | |
Args.LOGGER.log(logging.WARNING, f"Final_answer_hint '{self.final_answer_hint}' not in response !") | |
response = f"{self.final_answer_hint}{response}" | |
# Extract the text after final answer hint | |
final_response = response.split(self.final_answer_hint)[-1] | |
final_response = final_response.strip() | |
state["final_response"] = final_response | |
Args.LOGGER.log(logging.INFO, f"==========\nFinal response:\n{final_response}\n==========") | |
return state | |
def should_continue(self, state: State) -> Literal["manager", "final_answer"]: | |
""" | |
Decides whether to continue to the Manager or to provide a final answer. | |
Returns: | |
"manager": If the Assistant has decided to continue the conversation | |
"final_answer": If the Assistant has decided to provide a final answer | |
""" | |
if Args.LOGGER is None: | |
raise RuntimeError("LOGGER must be defined before running the should_continue edge.") | |
Args.LOGGER.log(logging.INFO, "++++++++++ should_continue edge ++++++++++") | |
if state["current_message"] is None: | |
raise ValueError("should_continue conditional edge was reached with no current_message in state") | |
message = state["current_message"] | |
if state["nr_interactions"] >= MAX_INTERACTIONS or self.final_answer_hint in message: | |
Args.LOGGER.log(logging.INFO, "++++++++++ should_continue edge decision: final_answer ++++++++++") | |
return "final_answer" | |
Args.LOGGER.log(logging.INFO, "++++++++++ should_continue edge decision: manager ++++++++++") | |
return "manager" | |
def build_agent_graph(self) -> CompiledStateGraph: | |
"""Build and return the agent graph.""" | |
graph = StateGraph(State) | |
# Add the nodes with sync wrappers | |
graph.add_node("assistant", self.assistant_node) | |
graph.add_node("manager", self.manager_node) | |
graph.add_node("final_answer", self.final_answer_node) | |
# Add the edges | |
graph.add_edge(START, "assistant") | |
graph.add_conditional_edges( | |
"assistant", | |
self.should_continue, | |
{ | |
"manager": "manager", | |
"final_answer": "final_answer" | |
} | |
) | |
graph.add_edge("manager", "assistant") | |
graph.add_edge("final_answer", END) | |
return graph.compile() | |
class Alfred: | |
def __init__(self): | |
print("Agent initialized.") | |
Args.LOGGER = self.set_logger() | |
self.graph_builder = GraphBuilder() | |
self.agent_graph = self.graph_builder.build_agent_graph() | |
async def __call__(self, question: str) -> str: | |
print(f"Agent received question (first 50 chars): {question[:50]}...") | |
result = await self.process_query(question) | |
response = result["final_response"] | |
print(f"Agent processed the response: {response}") | |
return response | |
async def process_query(self, query: str) -> Dict[str, Any]: | |
""" | |
Process a query through the agent graph. | |
Args: | |
query: The initial query to process | |
Returns: | |
The final state of the graph execution | |
""" | |
initial_state: State = { | |
"initial_query": query, | |
"current_message": None, | |
"nr_interactions": 0, | |
"final_response": None | |
} | |
self.graph_builder.clear_chat_history() | |
result = await self.agent_graph.ainvoke(initial_state) | |
return result | |
def set_logger(self) -> logging.Logger: | |
""" | |
Configure and return a logger with a file handler that writes to logs/<current date-time>.txt | |
Returns: | |
logging.Logger: Configured logger instance | |
""" | |
# Create logger | |
logger = logging.getLogger("Alfred") | |
logger.setLevel(logging.INFO) | |
# Create formatter | |
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
# Create logs directory if it doesn't exist | |
logs_dir = Path('logs') | |
logs_dir.mkdir(exist_ok=True) | |
# Generate log filename with current date-time | |
current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") | |
log_filename = f"{current_time}.txt" | |
log_filepath = logs_dir / log_filename | |
# Create file handler with UTF-8 encoding to handle all Unicode characters | |
file_handler = logging.FileHandler(log_filepath, encoding='utf-8') | |
file_handler.setLevel(logging.INFO) | |
file_handler.setFormatter(formatter) | |
# Add handler to logger | |
logger.addHandler(file_handler) | |
logger.info(f"Logging started at {current_time}") | |
return logger |