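"""FlexibleAgent: a tool-calling question-answering agent built on LangGraph.

The agent checks whether a question references an attached file, optionally
downloads that file, then loops between a tool-calling Gemini model, tool
execution, and a self-analysis step until it can extract a short final answer.
Runs are traced with Langfuse (when configured) and each conversation is
logged to the questions/ directory.
"""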

import os
import shutil
from typing import TypedDict, Optional, List, Annotated
from datetime import datetime

from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.graph import START, END, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langfuse.langchain import CallbackHandler

try:
    # Try relative imports first (when used as a package)
    from .tools import (
        wikipedia_search, youtube_search, decode_text,
        download_and_process_file, web_search, evaluate_computation
    )
except ImportError:
    # Fall back to absolute imports (when run directly)
    from tools import (
        wikipedia_search, youtube_search, decode_text,
        download_and_process_file, web_search, evaluate_computation
    )


# --- Agent State following the LangGraph pattern ---
class AgentState(TypedDict):
    # Messages for LLM interactions (the question and all conversation turns)
    messages: Annotated[List[BaseMessage], add_messages]
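    # (add_messages is a reducer: messages returned by graph nodes are
    # appended to this list rather than replacing it)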
    # Task ID for file downloads
    task_id: Optional[str]
    # File classification result
    requires_file: Optional[bool]
    # Final answer
    final_answer: Optional[str]


# --- Flexible Tool-Based Agent ---
class FlexibleAgent:
    def __init__(self):
        # Initialize the Gemini chat model for LangChain integration
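        # (ChatGoogleGenerativeAI authenticates via the GOOGLE_API_KEY
        # environment variable unless an API key is passed explicitly)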
        self.chat = ChatGoogleGenerativeAI(
            model="gemini-2.5-flash",
            temperature=0.0,
            max_tokens=None
        )
        # Define the available tools
        self.tools = [
            wikipedia_search,
            youtube_search,
            decode_text,
            web_search,
            download_and_process_file,
            evaluate_computation
        ]
        # Bind the tools to the LLM
        self.chat_with_tools = self.chat.bind_tools(self.tools)
        # Initialize the Langfuse CallbackHandler for tracing
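        # (CallbackHandler is typically configured through the
        # LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY and LANGFUSE_HOST
        # environment variables)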
        try:
            self.langfuse_handler = CallbackHandler()
            print("✅ Langfuse CallbackHandler initialized successfully")
        except Exception as e:
            print(f"⚠️ Warning: Could not initialize Langfuse CallbackHandler: {e}")
            print("   Continuing without Langfuse tracing...")
            self.langfuse_handler = None
        # Create the questions directory for logging
        self.questions_dir = "questions"
        # Clear previous question files
        if os.path.exists(self.questions_dir):
            shutil.rmtree(self.questions_dir)
        os.makedirs(self.questions_dir, exist_ok=True)
        self.question_counter = 0
        # Build the graph following the LangGraph pattern
        self._build_graph()
        print("FlexibleAgent initialized with Gemini LLM and LangGraph workflow.")

    def log_full_conversation(self, question: str, final_state: dict, answer: str):
        """Log the complete conversation, including all tool calls and LLM interactions."""
        self.question_counter += 1
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"question_{self.question_counter:03d}_{timestamp}.txt"
        filepath = os.path.join(self.questions_dir, filename)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(f"Question #{self.question_counter}\n")
            f.write(f"Timestamp: {datetime.now().isoformat()}\n")
            f.write(f"Question: {question}\n")
            f.write("=" * 60 + "\n")
            f.write("FULL CONVERSATION TRACE:\n")
            f.write("=" * 60 + "\n\n")
            # Log every message in the conversation
            messages = final_state.get("messages", [])
            for i, message in enumerate(messages):
                f.write(f"--- Message {i + 1}: {type(message).__name__} ---\n")
                f.write(f"Content: {message.content}\n")
                # If it's an AI message with tool calls, log the tool calls
                if hasattr(message, 'tool_calls') and message.tool_calls:
                    f.write(f"Tool Calls: {len(message.tool_calls)}\n")
                    for j, tool_call in enumerate(message.tool_calls):
                        # Handle both dict and object formats
                        if hasattr(tool_call, 'name'):
                            f.write(f"  Tool {j + 1}: {tool_call.name}\n")
                            f.write(f"  Arguments: {tool_call.args}\n")
                            f.write(f"  ID: {tool_call.id}\n")
                        elif isinstance(tool_call, dict):
                            f.write(f"  Tool {j + 1}: {tool_call.get('name', 'unknown')}\n")
                            f.write(f"  Arguments: {tool_call.get('args', {})}\n")
                            f.write(f"  ID: {tool_call.get('id', 'unknown')}\n")
                        else:
                            f.write(f"  Tool {j + 1}: {str(tool_call)}\n")
                # If it's a tool message, record which tool call it answers
                if hasattr(message, 'tool_call_id'):
                    f.write(f"Tool Call ID: {message.tool_call_id}\n")
                f.write("\n")
            f.write("=" * 60 + "\n")
            f.write(f"FINAL ANSWER: {answer}\n")
            f.write("=" * 60 + "\n")
        print(f"Logged full conversation to: {filename}")

    def classify_file_requirement(self, state: AgentState):
        """Check whether the question mentions an attached file."""
        messages = state["messages"]
        # Get the original question from the first message
        if messages and isinstance(messages[0], HumanMessage):
            question = messages[0].content.lower()
            # Simple keyword check for file attachments
            file_keywords = ["attached", "attachment", "see the file", "in the file",
                             "i've attached", "attached as", "attached file"]
            requires_file = any(keyword in question for keyword in file_keywords)
            return {"requires_file": requires_file}
        return {"requires_file": False}

    def download_file_content(self, state: AgentState):
        """Download the attached file and add its content to the messages."""
        task_id = state.get("task_id")
        if not task_id:
            # Add an error message
            return {
                "messages": [HumanMessage(content="Error: No task_id provided for file download")]
            }
        try:
            # Use the download tool directly
            file_result = download_and_process_file.invoke({"task_id": task_id})
            # Add the file content as a human message
            return {
                "messages": [HumanMessage(content=f"File content:\n{file_result}")]
            }
        except Exception as e:
            return {
                "messages": [HumanMessage(content=f"Error downloading file: {str(e)}")]
            }

    def call_model(self, state: AgentState):
        """Call the tool-bound model; it decides what to do next."""
        messages = state["messages"]
        response = self.chat_with_tools.invoke(messages)
        return {"messages": [response]}

    def analyze_tool_results(self, state: AgentState):
        """Analyze whether the tool results are sufficient to answer the question."""
        analysis_prompt = """Based on the tool results above, think through the following:
1. Do you have enough information to answer the original question?
2. Are the tool results relevant and helpful?
3. Do you need to use another tool to get more information?

If you don't need another tool, answer the question as best you can with the information you have.
Keep in mind that the answer may be phrased using synonyms or words similar to those in the question.
Even if you cannot watch a YouTube video, the answer may appear in the video's description.

Provide your reasoning and conclude with either:
- "READY_TO_ANSWER" if you have sufficient information
- "NEED_MORE_TOOLS" if you need additional tool calls

Format your response as:
REASONING: [your analysis here]
CONCLUSION: [READY_TO_ANSWER or NEED_MORE_TOOLS]"""
        messages = state["messages"] + [HumanMessage(content=analysis_prompt)]
        response = self.chat.invoke(messages)
        # Add the analysis to the messages
        return {"messages": [response]}

    def route_after_analysis(self, state: AgentState) -> str:
        """Route based on whether we can answer or need more tools."""
        messages = state["messages"]
        # Get the last message (should be the analysis)
        if messages:
            last_message = messages[-1]
            if isinstance(last_message, AIMessage):
                content = last_message.content.upper()
                # Check whether we are ready to answer
                if "READY_TO_ANSWER" in content:
                    return "extract_answer"
                elif "NEED_MORE_TOOLS" in content:
                    return "call_model"
        # Default: try to answer
        return "extract_answer"

    def extract_final_answer(self, state: AgentState):
        """Extract ONLY the final answer from the conversation."""
        # A dedicated extraction prompt that looks at the entire conversation
        extraction_prompt = """Based on all the information gathered above, provide ONLY the final answer to the original question.

Rules:
- Return ONLY the answer with NO explanations, sentences, or extra words
- If the answer is a number, write it in digits only
- No punctuation unless it's part of the answer
- No phrases like "The answer is" or "Based on..."

Examples:
- Question: "What is the capital of France?" → Answer: Paris
- Question: "How much is 2+2?" → Answer: 4
- Question: "What is the opposite of left?" → Answer: right

Final answer only:"""
        try:
            # Use the full conversation context for extraction
            messages = state["messages"] + [HumanMessage(content=extraction_prompt)]
            response = self.chat.invoke(messages)
            answer = response.content.strip()
            # Return a dict to update the state (LangGraph requirement)
            return {"final_answer": answer}
        except Exception as e:
            print(f"Answer extraction error: {e}")
            # Fallback: use the content of the last AI message without tool calls
            messages = state["messages"]
            for msg in reversed(messages):
                if isinstance(msg, AIMessage) and not getattr(msg, 'tool_calls', None):
                    return {"final_answer": msg.content.strip()}
            return {"final_answer": "No answer found"}

    def route_after_classification(self, state: AgentState) -> str:
        """Route based on the file requirement."""
        if state.get("requires_file"):
            return "download_file"
        else:
            return "call_model"

    def _build_graph(self):
        """Build the LangGraph workflow with a reasoning/analysis step."""
        graph = StateGraph(AgentState)
        # Add the nodes
        graph.add_node("classify_file", self.classify_file_requirement)
        graph.add_node("download_file", self.download_file_content)
        graph.add_node("call_model", self.call_model)
        graph.add_node("tools", ToolNode(self.tools))
        graph.add_node("analyze_results", self.analyze_tool_results)
        graph.add_node("extract_answer", self.extract_final_answer)
        # Define the flow
        graph.add_edge(START, "classify_file")
        # After classification, either download the file or go straight to the model
        graph.add_conditional_edges(
            "classify_file",
            self.route_after_classification,
            {
                "download_file": "download_file",
                "call_model": "call_model"
            }
        )
        # After downloading the file, call the model
        graph.add_edge("download_file", "call_model")
        # After the model call, check whether tools were called
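        # (tools_condition returns "tools" when the last AI message contains
        # tool calls and END otherwise; END is remapped here so a direct
        # answer goes straight to extraction)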
        graph.add_conditional_edges(
            "call_model",
            tools_condition,  # Built-in function that checks for tool calls
            {
                "tools": "tools",       # Tools were called: execute them
                END: "extract_answer",  # No tools: go straight to the answer
            }
        )
        # After the tools execute, analyze the results
        graph.add_edge("tools", "analyze_results")
        # After the analysis, decide on the next step
        graph.add_conditional_edges(
            "analyze_results",
            self.route_after_analysis,
            {
                "extract_answer": "extract_answer",  # Ready to answer
                "call_model": "call_model",          # Need more tools
            }
        )
        # After extracting the answer, we're done
        graph.add_edge("extract_answer", END)
        # Compile the graph
        self.compiled_graph = graph.compile()

    def __call__(self, question: str, task_id: Optional[str] = None) -> str:
        """Process a question using the LangGraph workflow."""
        print(f"Processing: {question[:50]}...")
        # Create the initial state with just the question as a message
        initial_state = {
            "messages": [HumanMessage(content=question)],
            "task_id": task_id,
            "requires_file": None,
            "final_answer": None
        }
        try:
            # Run the graph, with Langfuse tracing when available
            config = {"recursion_limit": 25}
            if self.langfuse_handler:
                config["callbacks"] = [self.langfuse_handler]
                print("🔍 Running with Langfuse tracing enabled")
            result = self.compiled_graph.invoke(initial_state, config=config)
            # Extract the final answer from the state
            answer = result.get("final_answer", "No answer found")
            print(f"Answer: {answer[:50]}...")
            # Log the complete conversation for review
            self.log_full_conversation(question, result, answer)
            return answer
        except Exception as e:
            print(f"Error: {e}")
            error_answer = "Error occurred."
            # Create a minimal state for error logging
            error_state = {
                "messages": [
                    HumanMessage(content=question),
                    AIMessage(content=f"Error: {str(e)}")
                ]
            }
            self.log_full_conversation(question, error_state, error_answer)
            return error_answer


if __name__ == "__main__":
    print("Testing FlexibleAgent with a simple question...")
    try:
        # Create an instance of the agent
        agent = FlexibleAgent()

        # Test with a simple math question
        test_question = "How much is 2+2?"
        print(f"\nQuestion: {test_question}")
        answer = agent(test_question)
        print(f"Answer: {answer}")
        if answer == "4":
            print("✅ Test passed! The agent correctly answered the math question.")
        else:
            print("❌ Test failed. Expected the answer to be '4'.")

        answer = agent("What is the surname of the equine veterinarian mentioned in 1.E Exercises from the chemistry materials licensed by Marisa Alviar-Agnew & Henry Agnew under the CK-12 license in LibreText's Introductory Chemistry materials as compiled 08/21/2023?")
        print(f"Answer: {answer}")
        if answer == "Louvrier":
            print("✅ Test passed! The agent correctly answered the question.")
        else:
            print("❌ Test failed. Expected the answer to be 'Louvrier'.")

        answer = agent("In the video https://www.youtube.com/watch?v=L1vXCYZAYYM, what is the highest number of bird species to be on camera simultaneously?")
        print(f"Answer: {answer}")
        if answer == "3":
            print("✅ Test passed! The agent correctly answered the question.")
        else:
            print("❌ Test failed. Expected the answer to be '3'.")
    except Exception as e:
        import traceback
        print(f"❌ Test failed with error: {e}")
        print("Full traceback:")
        traceback.print_exc()