| import os |
| import logging |
| import warnings |
| import re |
| import time |
|
|
| |
# Silence TensorFlow's startup noise before any TF-dependent package gets
# imported: the env var mutes the C++ backend, the logger call mutes the
# Python side, and the warning filters cover tensorflow/tf_keras warnings.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'  # 2 = suppress INFO and WARNING from the C++ runtime
logging.getLogger('tensorflow').setLevel(logging.ERROR)
warnings.filterwarnings('ignore', module='tensorflow')
warnings.filterwarnings('ignore', module='tf_keras')
|
|
| from typing import TypedDict, Optional, List, Annotated |
| from langchain_core.messages import HumanMessage, SystemMessage |
| from langgraph.graph import MessagesState, StateGraph, START, END |
| from langgraph.graph.message import add_messages |
| from langgraph.prebuilt import tools_condition |
| from langgraph.prebuilt import ToolNode |
| from langchain_google_genai import ChatGoogleGenerativeAI |
| from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint |
|
|
| from custom_tools import get_custom_tools_list |
| from system_prompt import SYSTEM_PROMPT |
| from utils import cleanup_answer, extract_text_from_content |
| import config |
| from langfuse_tracking import track_agent_execution, track_llm_call |
|
|
| |
# BeautifulSoup emits GuessedAtParserWarning when callers don't name a parser
# explicitly; suppress it, but tolerate environments without bs4 installed.
try:
    from bs4 import GuessedAtParserWarning
    warnings.filterwarnings('ignore', category=GuessedAtParserWarning)
except ImportError:
    pass
|
|
|
|
class AgentState(TypedDict):
    """Shared state threaded through the agent's graph nodes."""

    # The raw user question, set once at invocation time.
    question: str
    # Conversation history; add_messages makes node returns append rather
    # than replace, which the assistant/tools loop relies on.
    messages: Annotated[list, add_messages]
    # Final answer text. The graph is invoked with None here and the
    # assistant node fills it in, so the honest type is Optional[str]
    # (the original `str` annotation did not match actual usage).
    answer: Optional[str]
    # Number of assistant invocations so far; used to enforce the step cap.
    step_count: int
    # Name of an attached file, or "" when the question has none.
    file_name: str
|
|
|
|
class LangGraphAgent:
    """Tool-calling agent implemented as a LangGraph state machine.

    Graph topology::

        START -> init -> assistant -> (tools -> assistant)* -> END

    The assistant node invokes a tool-bound chat model. Whenever the model
    requests tool calls, the ToolNode executes them and control returns to
    the assistant. The loop ends when the model replies without tool calls
    or when the step budget is exhausted.
    """

    # Hard cap on assistant iterations; _should_continue forces END past it.
    MAX_STEPS = 40
    # Recursion limit handed to graph.invoke; kept at 2 * MAX_STEPS so the
    # step cap (not LangGraph's recursion error) is what ends runaway loops.
    RECURSION_LIMIT = 80

    def __init__(self):
        """Build the tool list, the tool-bound LLM client, and the graph."""
        # The analyze_youtube_video tool is Gemini-backed, so this key is
        # needed regardless of which provider serves the main model.
        if not os.getenv("GOOGLE_API_KEY"):
            print("WARNING: GOOGLE_API_KEY not found - analyze_youtube_video will fail")

        self.tools = get_custom_tools_list()
        self.llm_client_with_tools = self._create_llm_client()
        self.graph = self._build_graph()

    def _create_llm_client(self, model_provider: str = "google"):
        """Create and return the LLM client with tools bound based on the model provider.

        Args:
            model_provider: ``"google"`` (Gemini, the default) or
                ``"huggingface"`` (Llama 3.1 8B Instruct endpoint).

        Returns:
            A chat model with ``self.tools`` bound via ``bind_tools``.

        Raises:
            ValueError: If ``model_provider`` is not a supported value.
                (The original fell through and returned ``None``, which only
                surfaced later as an AttributeError in ``_assistant``.)
        """
        if model_provider == "google":
            return ChatGoogleGenerativeAI(
                model=config.ACTIVE_AGENT_LLM_MODEL,
                temperature=0,  # deterministic output for repeatable answers
                api_key=os.getenv("GOOGLE_API_KEY"),
                timeout=60,
            ).bind_tools(self.tools)

        if model_provider == "huggingface":
            endpoint = HuggingFaceEndpoint(
                repo_id="meta-llama/Llama-3.1-8B-Instruct",
                task="text-generation",
                max_new_tokens=512,
                temperature=0.7,
                do_sample=False,  # greedy decoding; temperature is effectively unused
                repetition_penalty=1.03,
                huggingfacehub_api_token=os.getenv("HUGGINGFACEHUB_API_TOKEN"),
            )
            return ChatHuggingFace(llm=endpoint).bind_tools(self.tools)

        raise ValueError(f"Unsupported model provider: {model_provider!r}")

    def _init_questions(self, state: AgentState):
        """Seed the conversation with the system prompt and the user question.

        Appends a note naming the attached file, when one was supplied, so
        the model knows to reach for file-handling tools.
        """
        question_content = state["question"]
        if state.get("file_name"):
            question_content += f'\n\nNote: This question references a file: {state["file_name"]}'

        return {
            "messages": [
                SystemMessage(content=SYSTEM_PROMPT),
                HumanMessage(content=question_content),
            ],
            "step_count": 0,
        }

    @staticmethod
    def _coerce_to_text(content) -> str:
        """Flatten an LLM response ``content`` payload into a plain string.

        Providers return a plain string, a dict with a ``'text'`` key, an
        object exposing ``.text``, or a list mixing any of those shapes.
        """
        if isinstance(content, list):
            parts = []
            for item in content:
                if isinstance(item, dict) and 'text' in item:
                    parts.append(item['text'])
                elif hasattr(item, 'text'):
                    parts.append(item.text)
                else:
                    parts.append(str(item))
            return " ".join(parts)
        if isinstance(content, dict) and 'text' in content:
            return content['text']
        if hasattr(content, 'text'):
            return content.text
        return str(content)

    @track_llm_call(config.ACTIVE_AGENT_LLM_MODEL)
    def _assistant(self, state: AgentState):
        """Assistant node: call the tool-bound LLM with the message history.

        Retries 504 DEADLINE_EXCEEDED errors with exponential backoff; any
        other failure aborts immediately. Returns either a final answer
        (response without tool calls) or the tool-call message for the
        ToolNode to execute.
        """
        current_step = state.get("step_count", 0) + 1
        print(f"[STEP {current_step}] Calling assistant with {len(state['messages'])} messages")

        max_retries = config.MAX_RETRIES
        delay = config.INITIAL_RETRY_DELAY

        for attempt in range(max_retries + 1):
            try:
                response = self.llm_client_with_tools.invoke(state["messages"])
                break
            except Exception as e:
                error_msg = str(e)
                # Only 504 timeouts are transient enough to be worth retrying.
                if "504" in error_msg and "DEADLINE_EXCEEDED" in error_msg:
                    if attempt < max_retries:
                        print(f"[RETRY] Attempt {attempt + 1}/{max_retries} failed with 504 DEADLINE_EXCEEDED")
                        print(f"[RETRY] Retrying in {delay:.1f} seconds...")
                        time.sleep(delay)
                        delay *= config.RETRY_BACKOFF_FACTOR
                        continue
                    print(f"[RETRY] All {max_retries} retries exhausted for 504 error")
                    print(f"[ERROR] LLM invocation failed after retries: {e}")
                    return {
                        "messages": [],
                        "answer": f"Error: LLM failed after {max_retries} retries - {str(e)[:100]}",
                        "step_count": current_step,
                    }
                print(f"[ERROR] LLM invocation failed: {e}")
                return {
                    "messages": [],
                    "answer": f"Error: LLM failed - {str(e)[:100]}",
                    "step_count": current_step,
                }

        # No tool calls: the model has produced its final answer.
        if not response.tool_calls:
            print(f"[FINAL ANSWER] Agent produced answer (no tool calls)")
            content = self._coerce_to_text(response.content).strip()
            print(f"[EXTRACTED TEXT] {content[:100]}{'...' if len(content) > 100 else ''}")
            return {
                "messages": [response],
                "answer": content,
                "step_count": current_step,
            }

        print(f"[TOOL CALLS] Agent requesting {len(response.tool_calls)} tool(s):")
        for tc in response.tool_calls:
            print(f"  - {tc['name']}")

        return {
            "messages": [response],
            "step_count": current_step,
        }

    def _should_continue(self, state: AgentState):
        """Routing edge: stop at the step cap, otherwise defer to tools_condition.

        Returns END once MAX_STEPS is reached; otherwise whatever
        ``tools_condition`` picks ("tools" when the last message carries
        tool calls, END otherwise).
        """
        step_count = state.get("step_count", 0)

        if step_count >= self.MAX_STEPS:
            print(f"[WARNING] Max steps ({self.MAX_STEPS}) reached, forcing termination")
            # NOTE(review): mutating state inside a conditional edge is not
            # guaranteed to persist in LangGraph, so this fallback answer may
            # be lost and __call__ would then report "Error: No answer
            # generated". Confirm against the LangGraph version in use.
            if not state.get("answer"):
                state["answer"] = "Error: Maximum iteration limit reached"
            return END

        return tools_condition(state)

    def _build_graph(self):
        """Build and return the compiled graph for the agent."""
        graph = StateGraph(AgentState)

        graph.add_node("init", self._init_questions)
        graph.add_node("assistant", self._assistant)
        graph.add_node("tools", ToolNode(self.tools))

        graph.add_edge(START, "init")
        graph.add_edge("init", "assistant")
        # Route to "tools" or END based on tool calls and the step budget.
        graph.add_conditional_edges("assistant", self._should_continue)
        graph.add_edge("tools", "assistant")

        return graph.compile()

    @track_agent_execution("LangGraph")
    def __call__(self, question: str, file_name: Optional[str] = None) -> str:
        """Invoke the agent graph with the given question and return the final answer.

        Args:
            question: The question to answer.
            file_name: Optional file name if the question references a file.

        Returns:
            The cleaned final answer, or an ``"Error: ..."`` string when the
            run fails or produces no answer.
        """
        print(f"\n{'='*60}")
        print(f"[LANGGRAPH AGENT START] Question: {question}")
        if file_name:
            print(f"[FILE] {file_name}")
        print(f"{'='*60}")

        start_time = time.time()

        try:
            response = self.graph.invoke(
                {"question": question, "messages": [], "answer": None, "step_count": 0, "file_name": file_name or ""},
                config={"recursion_limit": self.RECURSION_LIMIT},
            )

            elapsed_time = time.time() - start_time
            print(f"[LANGGRAPH AGENT COMPLETE] Time: {elapsed_time:.2f}s")
            print(f"{'='*60}\n")

            # Falsy covers both None and "" (the original re-checked
            # `answer is None` redundantly after `not answer`).
            answer = response.get("answer")
            if not answer:
                print("[WARNING] Agent completed but returned None as answer")
                return "Error: No answer generated"

            # Normalize provider-specific content shapes, then strip
            # formatting artifacts from the answer text.
            answer = extract_text_from_content(answer)
            answer = cleanup_answer(answer)

            print(f"[FINAL ANSWER] {answer}")
            return answer

        except Exception as e:
            elapsed_time = time.time() - start_time
            print(f"[LANGGRAPH AGENT ERROR] Failed after {elapsed_time:.2f}s: {e}")
            print(f"{'='*60}\n")
            return f"Error: {str(e)[:100]}"
|