Spaces:
Running
Running
| """ | |
| AI AGENT WITH LANGGRAPH + AI-DRIVEN TOOL CALLING | |
| Flow: | |
| 1. AI phân loại câu hỏi và quyết định tool | |
| 2. LangGraph nodes thực hiện tools | |
| 3. AI quyết định tiếp tục hoặc kết thúc | |
| 4. Qwen3-8B làm main reasoning engine | |
| Architecture: | |
| - Qwen3-8B via HuggingFace InferenceClient | |
| - LangGraph workflow với dynamic routing | |
| - AI-powered decision making (không hardcode) | |
| """ | |
| import os | |
| import json | |
| import tempfile | |
| import requests | |
| from typing import List, Dict, Any, Annotated | |
| from dotenv import load_dotenv | |
| # LangGraph imports | |
| from langgraph.graph import StateGraph, END | |
| from langgraph.graph.message import add_messages | |
| from typing_extensions import TypedDict | |
| # HuggingFace imports | |
| from huggingface_hub import InferenceClient | |
| # Other imports | |
| import wikipedia | |
| from PIL import Image | |
| import pandas as pd | |
| import yt_dlp | |
| from groq import Groq | |
# OCR alternative - fallback to basic image processing
# easyocr is optional: when missing, ocr_image_node degrades to reporting
# basic PIL image metadata instead of extracted text.
try:
    import easyocr
    OCR_AVAILABLE = True  # flag checked by ocr_image_node before using the reader
except ImportError:
    OCR_AVAILABLE = False
    print("⚠️ EasyOCR not available, using fallback image processing")
# Load environment
load_dotenv()  # pulls HF_TOKEN / GROQ_API_KEY from a local .env, if present
| # ============================================================================= | |
| # STATE DEFINITION | |
| # ============================================================================= | |
class AgentState(TypedDict):
    """Shared state dict threaded through every LangGraph node."""
    messages: Annotated[list, add_messages]  # chat history, merged by LangGraph's add_messages reducer
    question: str    # the user question being processed
    task_id: str     # API task id used to fetch an attached file
    file_name: str   # original attachment file name, if any
    ai_decision: Dict[str, Any]  # AI's decision about what to do
    tool_results: Dict[str, Any]  # outputs collected from tool nodes, keyed per tool
    answer: str      # final answer text produced by a terminal node
    continue_workflow: bool  # True -> route from a tool node to answer generation
| # ============================================================================= | |
| # QWEN3-8B AI BRAIN | |
| # ============================================================================= | |
class Qwen3Brain:
    """Main AI brain using Qwen3-8B for all decisions.

    All model access goes through the HuggingFace InferenceClient; errors
    are returned as strings so LangGraph nodes never crash the workflow.
    """

    def __init__(self):
        # provider="auto" lets the hub pick any available inference provider.
        self.client = InferenceClient(
            provider="auto",
            api_key=os.environ.get("HF_TOKEN", "")
        )
        self.model_name = "Qwen/Qwen3-8B"
        print("🧠 Qwen3-8B AI Brain initialized")

    def think(self, prompt: str) -> str:
        """Send a single-turn prompt to the model and return the reply text.

        Any failure is converted to an "AI Error: ..." string instead of
        raising, matching how callers consume the result.
        """
        try:
            completion = self.client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {
                        "role": "user",
                        "content": prompt
                    }
                ],
                max_tokens=2048,
                temperature=0.6
            )
            return completion.choices[0].message.content
        except Exception as e:
            return f"AI Error: {str(e)}"

    @staticmethod
    def _extract_json(text: str) -> Dict[str, Any]:
        """Parse the first JSON object found in *text*.

        LLM replies frequently wrap JSON in prose or markdown fences, so a
        plain json.loads on the raw reply often fails; fall back to the
        outermost '{...}' span. Raises ValueError when nothing parses to a dict.
        """
        try:
            parsed = json.loads(text)
            if isinstance(parsed, dict):
                return parsed
        except (json.JSONDecodeError, TypeError):
            pass
        start = text.find("{")
        end = text.rfind("}")
        if start != -1 and end > start:
            parsed = json.loads(text[start:end + 1])
            if isinstance(parsed, dict):
                return parsed
        raise ValueError("no JSON object found in model reply")

    def decide_action(self, question: str, task_id: str = "", file_name: str = "") -> Dict[str, Any]:
        """Ask the model which action/tool to run next.

        Returns a dict guaranteed to contain 'action', 'reasoning', 'params'
        and 'can_answer_now'; falls back to answering directly when the
        reply cannot be parsed as JSON.
        """
        prompt = f"""You are an intelligent AI agent. Analyze this question and decide the next action.
Question: {question}
Task ID: {task_id}
File name: {file_name}
Available actions:
1. "answer_directly" - if you can answer without tools
2. "transcribe_audio" - for audio files
3. "ocr_image" - for images with text
4. "read_file" - for Python/Excel/text files
5. "search_wikipedia" - for factual information
6. "calculate_math" - for math calculations
7. "get_youtube" - for YouTube videos
8. "download_file" - to get files from API
Respond in JSON format:
{{
    "action": "action_name",
    "reasoning": "why you chose this",
    "params": "parameters needed (if any)",
    "can_answer_now": true/false
}}
Be decisive and clear about your choice."""
        try:
            response = self.think(prompt)
            decision = self._extract_json(response)
            # Guarantee every key downstream nodes read, even if the model
            # returned a valid-but-partial JSON object.
            decision.setdefault("action", "answer_directly")
            decision.setdefault("reasoning", "")
            decision.setdefault("params", "")
            decision.setdefault("can_answer_now", True)
            return decision
        except (ValueError, json.JSONDecodeError):
            # Fallback if JSON parsing fails
            return {
                "action": "answer_directly",
                "reasoning": "JSON parsing failed, answering directly",
                "params": "",
                "can_answer_now": True
            }

    def final_answer(self, question: str, tool_results: Dict[str, Any]) -> str:
        """Generate final answer based on question and tool results."""
        prompt = f"""Generate the final answer based on the question and any tool results.
Question: {question}
Tool results: {json.dumps(tool_results, indent=2)}
Provide a clear, direct answer to the original question. Use the tool results if available."""
        return self.think(prompt)
| # ============================================================================= | |
| # TOOLS AS LANGGRAPH NODES | |
| # ============================================================================= | |
# Initialize components
ai_brain = Qwen3Brain()
# Initialize Groq client with error handling
try:
    groq_client = Groq(api_key=os.environ.get("GROQ_API_KEY", ""))
    print("✅ Groq client initialized")
except Exception as e:
    print(f"⚠️ Groq client initialization failed: {e}")
    groq_client = None  # tool nodes check for None before attempting transcription
# Initialize OCR with fallback
if OCR_AVAILABLE:
    ocr_reader = easyocr.Reader(['en'])  # English-only OCR model (loads weights on first use)
else:
    ocr_reader = None  # ocr_image_node falls back to basic PIL metadata
def ai_decision_node(state: AgentState) -> AgentState:
    """Ask the AI brain what to do next and store the decision in state.

    Uses .get for the log line because a reply that parsed as valid JSON
    may still lack 'action'/'reasoning' keys; indexing would raise KeyError.
    """
    question = state["question"]
    task_id = state.get("task_id", "")
    file_name = state.get("file_name", "")
    decision = ai_brain.decide_action(question, task_id, file_name)
    state["ai_decision"] = decision
    print(f"🧠 AI Decision: {decision.get('action', 'answer_directly')} - {decision.get('reasoning', '')}")
    return state
def answer_directly_node(state: AgentState) -> AgentState:
    """Answer the question with the LLM alone, bypassing every tool."""
    direct_prompt = f"Answer this question directly: {state['question']}"
    state["answer"] = ai_brain.think(direct_prompt)
    # Terminal node: nothing further to run.
    state["continue_workflow"] = False
    return state
def transcribe_audio_node(state: AgentState) -> AgentState:
    """Transcribe the task's audio attachment with Groq Whisper.

    Stores the transcript (or an error string) under
    state["tool_results"]["audio_transcript"], then routes to answer
    generation via continue_workflow.
    """
    task_id = state.get("task_id", "")
    try:
        file_path = download_file(task_id)
        if file_path.startswith("Error"):
            # Surface the download failure itself (previously a missing Groq
            # client could mask it).
            result = file_path
        elif groq_client is None:
            result = "Audio transcription not available - Groq client not initialized"
        else:
            with open(file_path, "rb") as f:
                transcription = groq_client.audio.transcriptions.create(
                    file=(file_path, f.read()),
                    model="whisper-large-v3-turbo",
                    response_format="text",
                    language="en"
                )
            # NOTE(review): with response_format="text" the Groq SDK returns
            # a plain str, so the original `.text` access raised; handle both
            # shapes — confirm against the installed groq version.
            result = transcription if isinstance(transcription, str) else transcription.text
        state["tool_results"]["audio_transcript"] = result
    except Exception as e:
        state["tool_results"]["audio_transcript"] = f"Audio error: {str(e)}"
    state["continue_workflow"] = True
    return state
def ocr_image_node(state: AgentState) -> AgentState:
    """Extract text from the task's image attachment.

    Prefers EasyOCR; when unavailable, falls back to reporting basic image
    metadata so the LLM at least knows a file exists.
    """
    task_id = state.get("task_id", "")
    try:
        file_path = download_file(task_id)
        if file_path.startswith("Error"):
            result = file_path
        elif OCR_AVAILABLE and ocr_reader:
            # EasyOCR yields (bbox, text, confidence) tuples; keep the text.
            detections = ocr_reader.readtext(file_path)
            text = " ".join(item[1] for item in detections)
            result = text if text.strip() else "No text found"
        else:
            # Fallback: basic image info only.
            try:
                img = Image.open(file_path)
                result = f"Image info: {img.format} {img.size} {img.mode} - OCR not available, please describe the image content"
            except Exception:
                # Narrowed from a bare except: don't swallow SystemExit/KeyboardInterrupt.
                result = "Image file detected but cannot process without OCR"
        state["tool_results"]["ocr_text"] = result
    except Exception as e:
        state["tool_results"]["ocr_text"] = f"OCR error: {str(e)}"
    state["continue_workflow"] = True
    return state
def read_file_node(state: AgentState) -> AgentState:
    """Load the task's attachment into tool_results, dispatching on extension.

    Handles Python source, Excel (.xlsx/.xls via pandas), CSV (pandas) and
    falls back to a tolerant text read for anything else.
    """
    task_id = state.get("task_id", "")
    try:
        file_path = download_file(task_id)
        if file_path.startswith("Error"):
            result = file_path
        elif file_path.endswith('.py'):
            with open(file_path, 'r', encoding='utf-8') as f:
                result = f"Python code:\n{f.read()}"
        elif file_path.endswith(('.xlsx', '.xls')):
            df = pd.read_excel(file_path)
            result = f"Excel data:\n{df.to_string()}"
        elif file_path.endswith('.csv'):
            df = pd.read_csv(file_path)
            result = f"CSV data:\n{df.to_string()}"
        else:
            # Unknown types often arrive as '.tmp' and may be binary;
            # errors='replace' avoids a UnicodeDecodeError crash that the
            # original strict read turned into a generic error message.
            with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                result = f"File content:\n{f.read()}"
        state["tool_results"]["file_content"] = result
    except Exception as e:
        state["tool_results"]["file_content"] = f"File reading error: {str(e)}"
    state["continue_workflow"] = True
    return state
def search_wikipedia_node(state: AgentState) -> AgentState:
    """Look up the question's topic on English Wikipedia.

    Tries a direct page fetch first, then a search-based retry; every
    failure mode still produces a result string for the answer node.
    """
    question = state["question"]
    params = state["ai_decision"].get("params", "")
    # Use AI to determine search query if params not provided
    if not params:
        query_prompt = f"Extract the main search term for Wikipedia from: '{question}'. Return only the search term."
        search_query = ai_brain.think(query_prompt).strip()
    else:
        search_query = params
    try:
        wikipedia.set_lang("en")
        page = wikipedia.page(search_query)
        result = f"Title: {page.title}\nSummary: {page.summary[:2000]}"
    except Exception:
        # Direct lookup failed (missing/disambiguation page); retry via search.
        # Narrowed from bare except so Ctrl-C still propagates.
        try:
            results = wikipedia.search(search_query, results=1)
            if results:
                page = wikipedia.page(results[0])
                result = f"Title: {page.title}\nSummary: {page.summary[:2000]}"
            else:
                result = f"No Wikipedia results for: {search_query}"
        except Exception:
            result = f"Wikipedia search failed for: {search_query}"
    state["tool_results"]["wikipedia"] = result
    state["continue_workflow"] = True
    return state
| def calculate_math_node(state: AgentState) -> AgentState: | |
| """Perform math calculations""" | |
| question = state["question"] | |
| # Extract math expression using AI | |
| extract_prompt = f"Extract ONLY the mathematical expression from: '{question}'. Return just the expression like '15+27'." | |
| expression = ai_brain.think(extract_prompt).strip() | |
| # Clean expression | |
| import re | |
| cleaned = re.findall(r'[\d+\-*/\(\)\.\s]+', expression) | |
| if cleaned: | |
| expression = cleaned[0].strip() | |
| try: | |
| # Safe evaluation | |
| allowed_chars = set('0123456789+-*/.() ') | |
| if all(c in allowed_chars for c in expression): | |
| result = str(eval(expression)) | |
| else: | |
| result = "Invalid mathematical expression" | |
| except Exception as e: | |
| result = f"Calculation error: {str(e)}" | |
| state["tool_results"]["calculation"] = result | |
| state["continue_workflow"] = True | |
| return state | |
def get_youtube_node(state: AgentState) -> AgentState:
    """Fetch title/description metadata for a YouTube URL without downloading."""
    video_url = state["ai_decision"].get("params", "")
    try:
        # skip_download keeps this metadata-only; subtitle options request
        # English captions info where available.
        options = {
            'writesubtitles': True,
            'writeautomaticsub': True,
            'subtitleslangs': ['en'],
            'skip_download': True,
            'quiet': True
        }
        with yt_dlp.YoutubeDL(options) as downloader:
            info = downloader.extract_info(video_url, download=False)
        video_title = info.get('title', 'Unknown')
        video_desc = info.get('description', 'No description')[:500]
        result = f"Video: {video_title}\nDescription: {video_desc}"
    except Exception as e:
        result = f"YouTube error: {str(e)}"
    state["tool_results"]["youtube"] = result
    state["continue_workflow"] = True
    return state
def download_file_node(state: AgentState) -> AgentState:
    """Fetch the task's attachment and record its local path (or an error)."""
    task_id = state.get("task_id", "")
    try:
        state["tool_results"]["downloaded_file"] = download_file(task_id)
    except Exception as e:
        state["tool_results"]["downloaded_file"] = f"Download error: {str(e)}"
    state["continue_workflow"] = True
    return state
def final_answer_node(state: AgentState) -> AgentState:
    """Compose the final answer from the question and collected tool output."""
    gathered = state.get("tool_results", {})
    state["answer"] = ai_brain.final_answer(state["question"], gathered)
    # Terminal node: stop the workflow.
    state["continue_workflow"] = False
    return state
| # ============================================================================= | |
| # HELPER FUNCTIONS | |
| # ============================================================================= | |
| def download_file(task_id: str) -> str: | |
| """Download file from API""" | |
| try: | |
| api_url = "https://agents-course-unit4-scoring.hf.space" | |
| file_url = f"{api_url}/files/{task_id}" | |
| response = requests.get(file_url, timeout=30) | |
| if response.status_code == 200: | |
| # Determine file extension | |
| content_type = response.headers.get('content-type', '') | |
| if 'audio' in content_type: | |
| suffix = '.mp3' | |
| elif 'image' in content_type: | |
| suffix = '.png' | |
| elif 'excel' in content_type: | |
| suffix = '.xlsx' | |
| elif 'python' in content_type: | |
| suffix = '.py' | |
| else: | |
| suffix = '.tmp' | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file: | |
| tmp_file.write(response.content) | |
| return tmp_file.name | |
| else: | |
| return f"Error: HTTP {response.status_code}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| # ============================================================================= | |
| # LANGGRAPH WORKFLOW | |
| # ============================================================================= | |
def create_ai_agent_workflow():
    """Build and compile the LangGraph workflow with AI-driven routing.

    Shape: decision -> (direct answer | one tool node); tool nodes then go
    to generate_answer or END depending on continue_workflow.
    """
    workflow = StateGraph(AgentState)

    # Add all nodes
    workflow.add_node("decision", ai_decision_node)
    workflow.add_node("direct_answer", answer_directly_node)
    workflow.add_node("audio_transcribe", transcribe_audio_node)
    workflow.add_node("image_ocr", ocr_image_node)
    workflow.add_node("file_read", read_file_node)
    workflow.add_node("wiki_search", search_wikipedia_node)
    workflow.add_node("math_calc", calculate_math_node)
    workflow.add_node("youtube_get", get_youtube_node)
    workflow.add_node("file_download", download_file_node)
    workflow.add_node("generate_answer", final_answer_node)

    # Single source of truth: AI action name -> graph node name.
    action_to_node = {
        "answer_directly": "direct_answer",
        "transcribe_audio": "audio_transcribe",
        "ocr_image": "image_ocr",
        "read_file": "file_read",
        "search_wikipedia": "wiki_search",
        "calculate_math": "math_calc",
        "get_youtube": "youtube_get",
        "download_file": "file_download"
    }

    # Dynamic routing based on AI decision
    def route_by_ai_decision(state: AgentState) -> str:
        action = state.get("ai_decision", {}).get("action", "answer_directly")
        # The model may invent an action name outside the prompt's list;
        # an unknown key would make the conditional edge fail, so fall back
        # to answering directly instead of crashing the graph run.
        if action not in action_to_node:
            action = "answer_directly"
        print(f"🔀 Routing to: {action}")
        return action

    # Conditional routing from decision
    workflow.add_conditional_edges("decision", route_by_ai_decision, action_to_node)

    # Continue to answer generation or end, based on workflow state.
    def should_continue(state: AgentState) -> str:
        return "generate_answer" if state.get("continue_workflow", False) else END

    # Add continue/end logic for every tool node.
    tool_nodes = [
        "audio_transcribe", "image_ocr", "file_read",
        "wiki_search", "math_calc", "youtube_get", "file_download"
    ]
    for node in tool_nodes:
        workflow.add_conditional_edges(
            node,
            should_continue,
            {
                "generate_answer": "generate_answer",
                END: END
            }
        )

    # Terminal edges.
    workflow.add_edge("direct_answer", END)
    workflow.add_edge("generate_answer", END)

    workflow.set_entry_point("decision")
    return workflow.compile()
| # ============================================================================= | |
| # MAIN AGENT CLASS | |
| # ============================================================================= | |
class LangGraphAIAgent:
    """LangGraph agent with AI-driven tool calling."""

    def __init__(self):
        self.workflow = create_ai_agent_workflow()
        print("🤖 LangGraph AI Agent with Qwen3-8B ready!")
        print("🔧 Available tools: transcribe_audio, ocr_image, read_file, search_wikipedia, calculate_math, get_youtube")

    def process_question(self, question: str, task_id: str = "", file_name: str = "") -> str:
        """Run one question through the compiled workflow and return the answer.

        Any workflow exception is converted to an "Agent error: ..." string.
        """
        try:
            # Seed every AgentState field so nodes can rely on the keys.
            seed_state = {
                "messages": [],
                "question": question,
                "task_id": task_id,
                "file_name": file_name,
                "ai_decision": {},
                "tool_results": {},
                "answer": "",
                "continue_workflow": False
            }
            final_state = self.workflow.invoke(seed_state)
            return final_state.get("answer", "No answer generated")
        except Exception as e:
            return f"Agent error: {str(e)}"
| # ============================================================================= | |
| # GLOBAL AGENT | |
| # ============================================================================= | |
# Create global agent instance
# NOTE: instantiating here means the workflow is compiled at import time.
agent = LangGraphAIAgent()
def process_question(question: str, task_id: str = "", file_name: str = "") -> str:
    """Main entry point: delegate to the module-level agent instance."""
    return agent.process_question(question, task_id, file_name)
| # ============================================================================= | |
| # TEST | |
| # ============================================================================= | |
if __name__ == "__main__":
    # Smoke-test the full agent pipeline on a few representative questions.
    sample_questions = [
        "What is 25 + 17?",
        "Who was Mercedes Sosa?",
        "What is the opposite of left?",
        "How many continents are there?"
    ]
    print("🧪 Testing LangGraph AI Agent:")
    for idx, query in enumerate(sample_questions, start=1):
        print(f"\n--- Test {idx} ---")
        print(f"Q: {query}")
        print(f"A: {process_question(query)}")
        print("-" * 50)