Spaces:
Running
Running
| # ===================== api.py ========================== | |
| from typing import List, Dict | |
| from pathlib import Path | |
| from fastapi import FastAPI, UploadFile, File, Form | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel | |
| from config.config import Config | |
| from config.system_prompt import PPLX_SYSTEM_PROMPT | |
| # Core routing / graph | |
| from rag.router import RouterAgent | |
| from rag.graph_deep import ( | |
| DeepResearchGraph, | |
| WebSearchGraph, | |
| RAGOnlyGraph, | |
| AgenticRAGGraph, | |
| AnalysisGraph, | |
| SummarizeGraph | |
| ) | |
| # Tools | |
| from tools.memory_tool import MemoryTool | |
| from tools.name_tool import NameTool | |
| from tools.search_tool import SearchTool | |
| from tools.browse_tool import BrowseTool | |
| from tools.reranker_tool import Reranker | |
| from tools.followup_tool import FollowUpGenerator | |
| from tools.image_tavily import TavilyImageSearch | |
| from tools.knowledge_panel import KnowledgePanel | |
| from tools.summarizer_tool import SummarizerTool | |
| # RAG pipeline | |
| from document_processing.processor import DocumentProcessor | |
| from vectorstore.store import VectorStore | |
| # File Manager for per-workspace RAG | |
| from files.file_manager import FileManager | |
| # ======================================================= | |
| # FastAPI App | |
| # ======================================================= | |
| app = FastAPI(title="Perplexity Clone API", version="8.0 - Production LangGraph") | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_headers=["*"], | |
| allow_methods=["*"], | |
| allow_credentials=True, | |
| ) | |
| import os | |
| # ======================================================= | |
| # Health Check Endpoint | |
| # ======================================================= | |
| async def health_check(): | |
| """Health check endpoint - shows API key status""" | |
| groq_key = os.getenv("GROQ_API_KEY", "") | |
| tavily_key = os.getenv("TAVILY_API_KEY", "") | |
| return { | |
| "status": "healthy", | |
| "service": "perplexity-clone-api", | |
| "api_keys": { | |
| "groq": "configured" if groq_key else "MISSING", | |
| "tavily": "configured" if tavily_key else "MISSING" | |
| }, | |
| "model": Config.LLM_MODEL | |
| } | |
| # ======================================================= | |
| # Global Components - with startup logging | |
| # ======================================================= | |
| print("\n" + "="*50) | |
| print("π PERPLEXITY CLONE API STARTING UP") | |
| print("="*50) | |
| # Check API keys at startup | |
| _groq_key = os.getenv("GROQ_API_KEY", "") | |
| _tavily_key = os.getenv("TAVILY_API_KEY", "") | |
| print(f"π GROQ_API_KEY: {'β Configured' if _groq_key else 'β MISSING'}") | |
| print(f"π TAVILY_API_KEY: {'β Configured' if _tavily_key else 'β MISSING'}") | |
| try: | |
| llm = Config.get_llm() | |
| print("β LLM initialized successfully") | |
| except Exception as e: | |
| print(f"β LLM initialization failed: {e}") | |
| llm = None | |
| router = RouterAgent() | |
| memory = MemoryTool() | |
| name_tool = NameTool() | |
| followup = FollowUpGenerator() | |
| try: | |
| search_tool = SearchTool() | |
| print("β SearchTool initialized") | |
| except Exception as e: | |
| print(f"β οΈ SearchTool init error: {e}") | |
| search_tool = None | |
| browse_tool = BrowseTool() | |
| try: | |
| image_search = TavilyImageSearch() | |
| print("β ImageSearch initialized") | |
| except Exception as e: | |
| print(f"β οΈ ImageSearch init error: {e}") | |
| image_search = None | |
| summarizer = SummarizerTool() | |
| # Load all components | |
| reranker = Reranker() | |
| knowledge_panel = KnowledgePanel() | |
| print("\nπ Loading demo documents...") | |
| # RAG demo vectorstore | |
| processor = DocumentProcessor( | |
| chunk_size=Config.CHUNK_SIZE, | |
| chunk_overlap=Config.CHUNK_OVERLAP, | |
| ) | |
| demo_docs = processor.load_url("https://lilianweng.github.io/posts/2023-06-23-agent/") | |
| demo_splits = processor.split(demo_docs) | |
| vector = VectorStore() | |
| vector.create(demo_splits) | |
| print("β Vector store initialized") | |
| # File manager for per-workspace document RAG | |
| file_manager = FileManager(base_dir="workspace_data") | |
| # ======================================================= | |
| # Initialize All LangGraph Pipelines | |
| # ======================================================= | |
| deep_graph = DeepResearchGraph(vector) | |
| rag_graph = RAGOnlyGraph(file_manager) | |
| agentic_graph = AgenticRAGGraph(file_manager, vector, image_search) | |
| web_graph = WebSearchGraph() | |
| analysis_graph = AnalysisGraph() | |
| summarize_graph = SummarizeGraph() | |
| print("β All LangGraph pipelines initialized!") | |
| # ======================================================= | |
| # Models | |
| # ======================================================= | |
| class ChatRequest(BaseModel): | |
| message: str | |
| workspace_id: str = "default" | |
| class ChatResponse(BaseModel): | |
| answer: str | |
| sources: List[Dict[str, str]] = [] | |
| links: List[Dict[str, str]] = [] | |
| images: List[Dict[str, str]] = [] | |
| followups: List[str] = [] | |
| default_tab: str = "answer" # "answer" | "links" | "images" | |
| workspace_id: str | |
| # ======================================================= | |
| # Utils | |
| # ======================================================= | |
| def build_context(ws: str, new_msg: str): | |
| """Inject full workspace chat history + system prompt.""" | |
| messages = [{"role": "system", "content": PPLX_SYSTEM_PROMPT}] | |
| for msg in memory.get_long_chat(ws): | |
| messages.append({"role": msg["role"], "content": msg["content"]}) | |
| messages.append({"role": "user", "content": new_msg}) | |
| return messages | |
| def guess_default_tab(query: str, mode: str) -> str: | |
| """Decide which UI tab should be first (Answer / Links / Images).""" | |
| q = query.lower() | |
| image_words = [ | |
| "image", "images", "photo", "photos", "picture", "pictures", | |
| "wallpaper", "logo", "flag", "screenshot", "pic" | |
| ] | |
| if any(w in q for w in image_words): | |
| return "images" | |
| if mode == "web": | |
| return "links" | |
| return "answer" | |
| def tavily_images_safe(query: str) -> List[Dict[str, str]]: | |
| """Safe wrapper around Tavily image search.""" | |
| try: | |
| return image_search.search(query, count=6) | |
| except Exception as e: | |
| print("Tavily image search error:", e) | |
| return [] | |
| def convert_links(results: List[Dict]) -> List[Dict[str, str]]: | |
| """Convert Tavily web search results to link objects.""" | |
| links = [] | |
| for r in results: | |
| url = r.get("url") | |
| if not url: | |
| continue | |
| links.append( | |
| { | |
| "title": r.get("title", "Result"), | |
| "url": url, | |
| "snippet": (r.get("content") or "")[:200], | |
| } | |
| ) | |
| return links | |
| # ======================================================= | |
| # Chat Endpoint | |
| # ======================================================= | |
| def chat(req: ChatRequest): | |
| q = req.message.strip() | |
| ws = req.workspace_id | |
| memory.add(ws, "user", q) | |
| # -------- Name memory special cases -------- | |
| extracted = name_tool.extract_name(q) | |
| if extracted: | |
| memory.set_name(ws, extracted) | |
| reply = f"Nice to meet you, {extracted}! Iβll remember your name." | |
| memory.add(ws, "assistant", reply) | |
| return ChatResponse(answer=reply, workspace_id=ws) | |
| if q.lower() in ["tell me my name", "what is my name"]: | |
| nm = memory.get_name(ws) | |
| ans = f"Your name is {nm} π" if nm else "You havenβt told me your name yet." | |
| memory.add(ws, "assistant", ans) | |
| return ChatResponse(answer=ans, workspace_id=ws) | |
| # -------- Routing -------- | |
| mode = router.route(q) | |
| default_tab = guess_default_tab(q, mode) | |
| answer = "" | |
| links: List[Dict[str, str]] = [] | |
| sources: List[Dict[str, str]] = [] | |
| follow: List[str] = [] | |
| # -------- LLM Mode (chat / creative / small talk) -------- | |
| if mode == "llm": | |
| msgs = build_context(ws, q) | |
| answer = llm.invoke(msgs).content | |
| follow = followup.generate(answer, q) | |
| # Optional small set of links | |
| try: | |
| res = search_tool.search(q, num_results=3) | |
| links = convert_links(res) | |
| except Exception as e: | |
| print("search error (llm mode):", e) | |
| # -------- Image Mode (image search queries) -------- | |
| elif mode == "image": | |
| # For image queries, provide brief context + focus on images tab | |
| try: | |
| res = search_tool.search(q, num_results=3) | |
| ctx = res[0].get("snippet", "") if res else "" | |
| answer = f"Here are images related to '{q}'." | |
| if ctx: | |
| answer += f"\n\n{ctx}" | |
| links = convert_links(res) | |
| except Exception as e: | |
| print("search error (image mode):", e) | |
| answer = f"Showing images for: {q}" | |
| follow = [] | |
| # -------- AGENTIC RAG Mode (files + web + images + knowledge) -------- | |
| elif mode == "rag": | |
| # PLANNER AGENT: Decide which agents to activate | |
| q_lower = q.lower() | |
| use_file_rag = any( | |
| w in q_lower for w in [ | |
| "summarize", "according to", "in this pdf", "in the document", | |
| "based on the file", "read my", "extract from", "uploaded", | |
| "this file", "the file", "my file", "from file" | |
| ] | |
| ) or len(q.split()) > 2 # multi-word questions likely need file RAG | |
| use_web = any( | |
| w in q_lower for w in [ | |
| "today", "latest", "current", "news", "stock", "price", | |
| "real-time", "weather", "who is", "what is", "where is", | |
| "when", "how much", "compare" | |
| ] | |
| ) | |
| use_images = any( | |
| w in q_lower for w in [ | |
| "image", "images", "logo", "flag", "photos", "look like", | |
| "picture", "show me", "wallpaper", "screenshot" | |
| ] | |
| ) | |
| # FILE AGENT: Retrieve from workspace uploaded docs | |
| ws_obj = file_manager.get_workspace(ws) | |
| file_chunks = [] | |
| if use_file_rag and ws_obj.initialized: | |
| file_chunks = ws_obj.retrieve(q, k=6) | |
| # REFERENCE AGENT: Retrieve from base vector store (demo docs) | |
| base_chunks = vector.retrieve(q, k=4) | |
| base_chunks = reranker.rerank(q, base_chunks, top_k=3) | |
| # WEB AGENT: Fetch live web content | |
| web_pages = [] | |
| web_results = [] | |
| if use_web: | |
| try: | |
| web_results = search_tool.search(q, num_results=4) | |
| for r in web_results: | |
| url = r.get("url") | |
| if not url: | |
| continue | |
| text = browse_tool.fetch_clean(url) | |
| if text: | |
| web_pages.append({ | |
| "title": r.get("title", ""), | |
| "url": url, | |
| "content": text[:1500] # Speed optimization | |
| }) | |
| except Exception as e: | |
| print(f"Web agent error: {e}") | |
| # IMAGE AGENT: Fetch relevant images | |
| images_result = tavily_images_safe(q) if use_images else [] | |
| # BUILD COMBINED CONTEXT | |
| contexts = [] | |
| if file_chunks: | |
| file_ctx = "\n\n".join(d.page_content for d in file_chunks) | |
| contexts.append(f"π FILE CONTEXT (from uploaded documents):\n{file_ctx}") | |
| if base_chunks: | |
| ref_ctx = "\n\n".join(d.page_content for d in base_chunks) | |
| contexts.append(f"π REFERENCE CONTEXT:\n{ref_ctx}") | |
| if web_pages: | |
| web_ctx = "\n\n".join(f"[{p['title']}]: {p['content']}" for p in web_pages) | |
| contexts.append(f"π WEB CONTEXT (live web data):\n{web_ctx}") | |
| full_context = "\n\n-----\n\n".join(contexts) if contexts else "No context available." | |
| # SYNTHESIZER AGENT: Generate final answer | |
| synth_prompt = f"""You are an AGENTIC RAG synthesis model like Perplexity AI. | |
| Combine information from FILE CONTEXT, REFERENCE CONTEXT and WEB CONTEXT. | |
| RULES: | |
| 1. PRIORITIZE info from FILE CONTEXT (user's uploaded documents) when available. | |
| 2. Use WEB CONTEXT to add current/live information. | |
| 3. Use REFERENCE CONTEXT for background knowledge. | |
| 4. Cite sources using [1], [2], etc. when referencing specific info. | |
| 5. If answering from a file, say "According to your uploaded document..." | |
| 6. Do NOT hallucinate - only use info from the provided contexts. | |
| 7. Be concise but comprehensive. | |
| AVAILABLE CONTEXT: | |
| {full_context} | |
| USER QUESTION: {q} | |
| FINAL ANSWER:""" | |
| msgs = build_context(ws, synth_prompt) | |
| answer = llm.invoke(msgs).content | |
| follow = followup.generate(answer, q) | |
| # BUILD SOURCES | |
| sources = [] | |
| if file_chunks: | |
| for d in file_chunks: | |
| sources.append({ | |
| "title": d.metadata.get("source", "π Uploaded File"), | |
| "url": d.metadata.get("file_path", "") | |
| }) | |
| if web_pages: | |
| for p in web_pages: | |
| sources.append({"title": p["title"], "url": p["url"]}) | |
| # BUILD LINKS | |
| links = convert_links(web_results) | |
| # Set images from image agent | |
| if images_result: | |
| # Will be set at the end with tavily_images_safe | |
| pass | |
| # -------- Web Mode (real-time / entities / news) -------- | |
| elif mode == "web": | |
| # Use WebSearchGraph for proper web search | |
| try: | |
| state = web_graph.run(q) | |
| answer = state.get("answer", "No answer generated.") | |
| sources = state.get("sources", []) | |
| links = state.get("links", []) | |
| follow = state.get("followups", []) | |
| except Exception as e: | |
| print(f"Web search error in chat: {e}") | |
| # Fallback to direct search | |
| res = search_tool.search(q, num_results=5) | |
| pages = [] | |
| for r in res: | |
| url = r.get("url") | |
| if not url: | |
| continue | |
| text = browse_tool.fetch_clean(url) | |
| if not text: | |
| continue | |
| pages.append( | |
| { | |
| "title": r.get("title", "Webpage"), | |
| "url": url, | |
| "content": text[:2000], | |
| } | |
| ) | |
| ctx = "\n\n".join(p["content"] for p in pages) | |
| prompt = ( | |
| "Use ONLY the following web content to answer. " | |
| "Cite sources using [1], [2], etc.\n\n" | |
| f"{ctx}\n\nQuestion: {q}" | |
| ) | |
| msgs = build_context(ws, prompt) | |
| answer = llm.invoke(msgs).content | |
| follow = followup.generate(answer, q) | |
| links = [ | |
| { | |
| "title": p["title"], | |
| "url": p["url"], | |
| "snippet": p["content"][:200], | |
| } | |
| for p in pages | |
| ] | |
| sources = [{"title": p["title"], "url": p["url"]} for p in pages] | |
| # -------- Fallback β LLM -------- | |
| else: | |
| msgs = build_context(ws, q) | |
| answer = llm.invoke(msgs).content | |
| follow = followup.generate(answer, q) | |
| # -------- Images (for Images tab) -------- | |
| images = tavily_images_safe(q) | |
| # Debug logging | |
| print(f"\n=== API Response Debug ===") | |
| print(f"Mode: {mode}") | |
| print(f"Links count: {len(links)}") | |
| print(f"Images count: {len(images)}") | |
| print(f"Sources count: {len(sources)}") | |
| if links: | |
| print(f"First link: {links[0]}") | |
| if images: | |
| print(f"First image: {images[0]}") | |
| print(f"========================\n") | |
| memory.add(ws, "assistant", answer) | |
| return ChatResponse( | |
| answer=answer, | |
| sources=sources, | |
| links=links, | |
| images=images, | |
| followups=follow, | |
| default_tab=default_tab, | |
| workspace_id=ws, | |
| ) | |
| # ======================================================= | |
| # Streaming Endpoint | |
| # ======================================================= | |
| def chat_stream(req: ChatRequest): | |
| q = req.message | |
| ws = req.workspace_id | |
| memory.add(ws, "user", q) | |
| msgs = build_context(ws, q) | |
| def generate(): | |
| full = "" | |
| for chunk in llm.stream(msgs): | |
| tok = getattr(chunk, "content", "") | |
| if tok: | |
| full += tok | |
| yield tok | |
| memory.add(ws, "assistant", full) | |
| return StreamingResponse(generate(), media_type="text/plain") | |
| # ======================================================= | |
| # Deep Research Endpoint | |
| # ======================================================= | |
| def deep_research(req: ChatRequest): | |
| q = req.message | |
| ws = req.workspace_id | |
| memory.add(ws, "user", q) | |
| try: | |
| state = deep_graph.run(q) | |
| answer = state.get("final_answer", "No answer generated.") | |
| sources = state.get("sources", []) | |
| # Get links from web_pages | |
| web_pages = state.get("web_pages", []) | |
| links = [{"title": p.get("title", ""), "url": p.get("url", ""), "snippet": p.get("content", "")[:200]} for p in web_pages] | |
| except Exception as e: | |
| print(f"Deep research error: {e}") | |
| answer = f"Deep research encountered an error. Please try again." | |
| sources = [] | |
| links = [] | |
| memory.add(ws, "assistant", answer) | |
| images = tavily_images_safe(q) | |
| follow = followup.generate(answer, q) | |
| return ChatResponse( | |
| answer=answer, | |
| sources=sources, | |
| links=links, | |
| images=images, | |
| followups=follow, | |
| default_tab="answer", | |
| workspace_id=ws, | |
| ) | |
| # ======================================================= | |
| # Knowledge Panel Endpoint | |
| # ======================================================= | |
| def get_knowledge_panel(q: str): | |
| """ | |
| Returns Wikipedia-style infobox + AI-generated facts. | |
| Used by UI to render a sidebar knowledge card. | |
| """ | |
| try: | |
| panel = knowledge_panel.build_panel(q) | |
| return panel | |
| except Exception as e: | |
| print("Knowledge panel error:", e) | |
| return {"wiki": {}, "facts": []} | |
| # ======================================================= | |
| # FILE UPLOAD (PDF / TXT / PPTX) - Perplexity Spaces Feature | |
| # ======================================================= | |
| async def upload_docs( | |
| workspace_id: str = Form("default"), | |
| files: List[UploadFile] = File(...) | |
| ): | |
| """ | |
| Upload one or more documents and index them for this workspace. | |
| Supports PDF, TXT, MD, PPT, PPTX files. | |
| """ | |
| ws = file_manager.get_workspace(workspace_id) | |
| saved_paths = [] | |
| for f in files: | |
| ext = Path(f.filename).suffix.lower() | |
| if ext not in [".pdf", ".txt", ".md", ".ppt", ".pptx"]: | |
| continue # skip unsupported types | |
| dest = Path(ws.base_dir) / f.filename | |
| with open(dest, "wb") as out: | |
| content = await f.read() | |
| out.write(content) | |
| saved_paths.append(dest) | |
| if saved_paths: | |
| ws.add_files(saved_paths) | |
| print(f"β Indexed {len(saved_paths)} files for workspace '{workspace_id}'") | |
| return { | |
| "workspace_id": workspace_id, | |
| "files": ws.files, | |
| "count": len(ws.files), | |
| "message": f"Successfully indexed {len(saved_paths)} files" | |
| } | |
| def get_workspace_files(workspace_id: str): | |
| """Get list of files uploaded to a workspace.""" | |
| ws = file_manager.get_workspace(workspace_id) | |
| return { | |
| "workspace_id": workspace_id, | |
| "files": ws.files, | |
| "initialized": ws.initialized | |
| } | |
| def clear_workspace(workspace_id: str): | |
| """Clear all files from a workspace.""" | |
| file_manager.clear_workspace(workspace_id) | |
| return {"message": f"Workspace '{workspace_id}' cleared"} | |
| # ======================================================= | |
| # MODE-SPECIFIC ENDPOINTS | |
| # ======================================================= | |
| class ModeRequest(BaseModel): | |
| message: str | |
| workspace_id: str = "default" | |
| mode: str = "auto" | |
| def focus_mode(req: ModeRequest): | |
| """Focus mode - concise, direct answers without web search.""" | |
| q = req.message.strip() | |
| ws = req.workspace_id | |
| memory.add(ws, "user", q) | |
| prompt = f"""You are in FOCUS mode. Provide a concise, direct answer. | |
| - No unnecessary elaboration | |
| - Get straight to the point | |
| - Use bullet points if helpful | |
| - Be accurate and helpful | |
| Question: {q} | |
| Answer:""" | |
| msgs = build_context(ws, prompt) | |
| answer = llm.invoke(msgs).content | |
| follow = followup.generate(answer, q) | |
| memory.add(ws, "assistant", answer) | |
| return ChatResponse( | |
| answer=answer, | |
| sources=[], | |
| links=[], | |
| images=[], | |
| followups=follow, | |
| default_tab="answer", | |
| workspace_id=ws | |
| ) | |
| def writing_mode(req: ModeRequest): | |
| """Writing mode - creative writing, essays, content generation.""" | |
| q = req.message.strip() | |
| ws = req.workspace_id | |
| memory.add(ws, "user", q) | |
| prompt = f"""You are in WRITING mode - a creative writing assistant. | |
| Help with: | |
| - Essays, articles, blog posts | |
| - Creative writing, stories | |
| - Professional emails and documents | |
| - Content improvement and editing | |
| - Grammar and style suggestions | |
| Be creative, engaging, and helpful. Format your response well. | |
| Request: {q} | |
| Response:""" | |
| msgs = build_context(ws, prompt) | |
| answer = llm.invoke(msgs).content | |
| follow = followup.generate(answer, q) | |
| memory.add(ws, "assistant", answer) | |
| return ChatResponse( | |
| answer=answer, | |
| sources=[], | |
| links=[], | |
| images=[], | |
| followups=follow, | |
| default_tab="answer", | |
| workspace_id=ws | |
| ) | |
| def math_mode(req: ModeRequest): | |
| """Math mode - mathematical calculations and explanations.""" | |
| q = req.message.strip() | |
| ws = req.workspace_id | |
| memory.add(ws, "user", q) | |
| prompt = f"""You are in MATH mode - a mathematical assistant. | |
| - Solve mathematical problems step by step | |
| - Show all work and calculations | |
| - Explain the reasoning | |
| - Use proper mathematical notation | |
| - Handle algebra, calculus, statistics, geometry, etc. | |
| Problem: {q} | |
| Solution:""" | |
| msgs = build_context(ws, prompt) | |
| answer = llm.invoke(msgs).content | |
| follow = followup.generate(answer, q) | |
| memory.add(ws, "assistant", answer) | |
| return ChatResponse( | |
| answer=answer, | |
| sources=[], | |
| links=[], | |
| images=[], | |
| followups=follow, | |
| default_tab="answer", | |
| workspace_id=ws | |
| ) | |
| def code_mode(req: ModeRequest): | |
| """Code mode - programming help and code generation.""" | |
| q = req.message.strip() | |
| ws = req.workspace_id | |
| memory.add(ws, "user", q) | |
| prompt = f"""You are in CODE mode - an expert programming assistant. | |
| - Write clean, efficient, well-commented code | |
| - Explain the code logic | |
| - Follow best practices | |
| - Handle any programming language | |
| - Debug and fix code issues | |
| - Suggest improvements | |
| Request: {q} | |
| Response:""" | |
| msgs = build_context(ws, prompt) | |
| answer = llm.invoke(msgs).content | |
| follow = followup.generate(answer, q) | |
| memory.add(ws, "assistant", answer) | |
| return ChatResponse( | |
| answer=answer, | |
| sources=[], | |
| links=[], | |
| images=[], | |
| followups=follow, | |
| default_tab="answer", | |
| workspace_id=ws | |
| ) | |
| def analyze_mode(req: ModeRequest): | |
| """ | |
| Analysis mode - deep analysis with web research. | |
| Production-level LangGraph implementation. | |
| """ | |
| q = req.message.strip() | |
| ws = req.workspace_id | |
| memory.add(ws, "user", q) | |
| try: | |
| # Run the AnalysisGraph pipeline | |
| state = analysis_graph.run(q) | |
| answer = state.get("answer", "No analysis generated.") | |
| sources = state.get("sources", []) | |
| links = state.get("links", []) | |
| follow = state.get("followups", []) | |
| except Exception as e: | |
| print(f"Analysis error: {e}") | |
| answer = f"Analysis encountered an error: {str(e)[:100]}" | |
| sources = [] | |
| links = [] | |
| follow = [] | |
| # Get related images | |
| images = tavily_images_safe(q) | |
| memory.add(ws, "assistant", answer) | |
| return ChatResponse( | |
| answer=answer, | |
| sources=sources, | |
| links=links, | |
| images=images, | |
| followups=follow, | |
| default_tab="answer", | |
| workspace_id=ws | |
| ) | |
| def summarize_mode(req: ModeRequest): | |
| """ | |
| Summarize mode - summarize uploaded documents OR web content. | |
| Prioritizes uploaded files, then falls back to web search. | |
| """ | |
| q = req.message.strip() | |
| ws = req.workspace_id | |
| memory.add(ws, "user", q) | |
| # STEP 1: Check for uploaded files first | |
| ws_obj = file_manager.get_workspace(ws) | |
| if ws_obj.initialized and ws_obj.files: | |
| # Summarize from uploaded files | |
| print(f"π SUMMARIZE MODE: Using uploaded files") | |
| try: | |
| # Retrieve relevant chunks from files | |
| chunks = ws_obj.retrieve(q, k=10) | |
| if chunks: | |
| # Combine chunk content for summarization | |
| content = "\n\n".join([c.page_content for c in chunks]) | |
| # Generate summary | |
| summary = summarizer.summarize(content, max_words=400) | |
| # Build sources from files | |
| seen_files = set() | |
| sources = [] | |
| for c in chunks: | |
| fname = c.metadata.get("source", "Document") | |
| if fname not in seen_files: | |
| sources.append({"title": f"π {fname}", "url": ""}) | |
| seen_files.add(fname) | |
| follow = followup.generate(summary, q) | |
| memory.add(ws, "assistant", summary) | |
| return ChatResponse( | |
| answer=summary, | |
| sources=sources, | |
| links=[], | |
| images=[], | |
| followups=follow, | |
| default_tab="answer", | |
| workspace_id=ws | |
| ) | |
| except Exception as e: | |
| print(f" β File summarize error: {e}") | |
| # STEP 2: Check if it's a URL | |
| if q.startswith("http"): | |
| print(f"π SUMMARIZE MODE: URL detected") | |
| try: | |
| content = browse_tool.fetch_clean(q) | |
| if content: | |
| summary = summarizer.summarize(content, max_words=400) | |
| sources = [{"title": "Source URL", "url": q}] | |
| links = [{"title": "Source", "url": q, "snippet": content[:200]}] | |
| follow = followup.generate(summary, q) | |
| memory.add(ws, "assistant", summary) | |
| return ChatResponse( | |
| answer=summary, | |
| sources=sources, | |
| links=links, | |
| images=[], | |
| followups=follow, | |
| default_tab="answer", | |
| workspace_id=ws | |
| ) | |
| except Exception as e: | |
| print(f" β URL fetch error: {e}") | |
| # STEP 3: Fall back to web search and summarize | |
| print(f"π SUMMARIZE MODE: Web search fallback") | |
| try: | |
| results = search_tool.search(q, num_results=3) | |
| content_parts = [] | |
| links = [] | |
| for r in results: | |
| url = r.get("url", "") | |
| title = r.get("title", "") | |
| text = browse_tool.fetch_clean(url) | |
| if text: | |
| content_parts.append(text[:1500]) | |
| links.append({"title": title, "url": url, "snippet": text[:150]}) | |
| if content_parts: | |
| combined = "\n\n".join(content_parts) | |
| summary = summarizer.summarize(combined, max_words=400) | |
| else: | |
| summary = "Could not find content to summarize." | |
| sources = [{"title": l["title"], "url": l["url"]} for l in links] | |
| follow = followup.generate(summary, q) | |
| memory.add(ws, "assistant", summary) | |
| return ChatResponse( | |
| answer=summary, | |
| sources=sources, | |
| links=links, | |
| images=[], | |
| followups=follow, | |
| default_tab="answer", | |
| workspace_id=ws | |
| ) | |
| except Exception as e: | |
| print(f" β Summarize error: {e}") | |
| return ChatResponse( | |
| answer=f"Error generating summary: {str(e)}", | |
| sources=[], | |
| links=[], | |
| images=[], | |
| followups=[], | |
| default_tab="answer", | |
| workspace_id=ws | |
| ) | |
| # ======================================================= | |
| # PRODUCTION-LEVEL MODE ENDPOINTS | |
| # ======================================================= | |
| def web_search_mode(req: ModeRequest): | |
| """ | |
| Web Search Mode - Real-time web search with source citations. | |
| Production-level LangGraph implementation. | |
| """ | |
| q = req.message.strip() | |
| ws = req.workspace_id | |
| memory.add(ws, "user", q) | |
| try: | |
| # Run the WebSearchGraph pipeline | |
| state = web_graph.run(q) | |
| answer = state.get("answer", "No answer generated.") | |
| sources = state.get("sources", []) | |
| links = state.get("links", []) | |
| follow = state.get("followups", []) | |
| except Exception as e: | |
| print(f"Web search error: {e}") | |
| answer = f"Web search encountered an error: {str(e)[:100]}" | |
| sources = [] | |
| links = [] | |
| follow = [] | |
| # Get images separately | |
| images = tavily_images_safe(q) | |
| memory.add(ws, "assistant", answer) | |
| return ChatResponse( | |
| answer=answer, | |
| sources=sources, | |
| links=links, | |
| images=images, | |
| followups=follow, | |
| default_tab="answer", | |
| workspace_id=ws | |
| ) | |
| def rag_mode(req: ModeRequest): | |
| """ | |
| RAG Mode - Search uploaded documents only. | |
| Production-level LangGraph implementation. | |
| """ | |
| q = req.message.strip() | |
| ws = req.workspace_id | |
| memory.add(ws, "user", q) | |
| try: | |
| # Run the RAGOnlyGraph pipeline | |
| state = rag_graph.run(q, ws) | |
| answer = state.get("answer", "No answer generated.") | |
| sources = state.get("sources", []) | |
| follow = state.get("followups", []) | |
| except Exception as e: | |
| print(f"RAG error: {e}") | |
| answer = f"RAG mode encountered an error: {str(e)[:100]}" | |
| sources = [] | |
| follow = [] | |
| memory.add(ws, "assistant", answer) | |
| return ChatResponse( | |
| answer=answer, | |
| sources=sources, | |
| links=[], | |
| images=[], | |
| followups=follow, | |
| default_tab="answer", | |
| workspace_id=ws | |
| ) | |
| def agentic_mode(req: ModeRequest): | |
| """ | |
| Agentic Mode - Multi-agent RAG with Planner, File, Web, Knowledge, Image, Synthesizer. | |
| Production-level LangGraph implementation. | |
| """ | |
| q = req.message.strip() | |
| ws = req.workspace_id | |
| memory.add(ws, "user", q) | |
| print(f"\nπ€ AGENTIC MODE (LangGraph): {q}") | |
| try: | |
| # Run the AgenticRAGGraph pipeline | |
| state = agentic_graph.run(q, ws) | |
| answer = state.get("answer", "No answer generated.") | |
| sources = state.get("sources", []) | |
| links = state.get("links", []) | |
| images = state.get("images", []) | |
| follow = state.get("followups", []) | |
| except Exception as e: | |
| print(f"Agentic error: {e}") | |
| answer = f"Agentic mode encountered an error: {str(e)[:100]}" | |
| sources = [] | |
| links = [] | |
| images = [] | |
| follow = [] | |
| # Always get images if none from agentic | |
| if not images: | |
| images = tavily_images_safe(q) | |
| memory.add(ws, "assistant", answer) | |
| print(f" β AgenticGraph: Completed with {len(sources)} sources, {len(images)} images") | |
| return ChatResponse( | |
| answer=answer, | |
| sources=sources, | |
| links=links, | |
| images=images, | |
| followups=follow, | |
| default_tab="answer", | |
| workspace_id=ws | |
| ) | |
| # ======================================================= | |
| # PRODUCT MVP ENDPOINT - Generates MVP Blueprints | |
| # ======================================================= | |
| class ProductMVPRequest(BaseModel): | |
| message: str | |
| workspace_id: str = "default" | |
| mode: str = "product_mvp" | |
| def product_mvp_mode(req: ProductMVPRequest): | |
| """ | |
| Product MVP Mode - Generates comprehensive MVP blueprints from product ideas. | |
| Includes product name, pitch, target users, features, architecture, tech stack, and more. | |
| """ | |
| q = req.message.strip() | |
| ws = req.workspace_id | |
| memory.add(ws, "user", q) | |
| print(f"\nπ PRODUCT MVP MODE: {q}") | |
| # Research similar products and market | |
| market_research = "" | |
| try: | |
| results = search_tool.search(f"{q} startup MVP product", num_results=3) | |
| if results: | |
| for r in results: | |
| url = r.get("url", "") | |
| text = browse_tool.fetch_clean(url) | |
| if text: | |
| market_research += text[:800] + "\n\n" | |
| except Exception as e: | |
| print(f"Market research error: {e}") | |
| prompt = f"""You are a PRODUCT BUILDER AI that creates comprehensive MVP blueprints. | |
| The user wants to build: {q} | |
| {f"MARKET RESEARCH (use for context):{chr(10)}{market_research}" if market_research else ""} | |
| Generate a COMPLETE MVP Blueprint with the following sections. Use markdown formatting with tables where appropriate: | |
| # π MVP Blueprint β [Product Name] | |
| A one-line description of the product. | |
| ## 1. Product Name | |
| Create a catchy, memorable product name. | |
| ## 2. OneβLine Pitch | |
| A compelling pitch in quotes that explains the value proposition. | |
| ## 3. Target Users | |
| Create a markdown table with columns: Persona | Age | Occupation | Goals | Pain Points | How [Product] Helps | |
| Include 4-5 different user personas. | |
| ## 4. Problems to Solve | |
| List 5 key problems the product solves with bullet points. | |
| ## 5. MVP Features | |
| Create a table with: Feature | Description | Priority (Must-have/Nice-to-have) | |
| Include 8-10 features. | |
| ## 6. User Journey (StepβbyβStep) | |
| Number each step of the user journey from landing to retention. | |
| ## 7. System Architecture | |
| Create an ASCII diagram showing the system components and their connections. | |
| Include: Frontend, Backend, Database, APIs, Third-party services. | |
| ## 8. Database Tables | |
| Create a table showing the main database tables with: Table | Columns | Notes | |
| ## 9. API Endpoints (REST) | |
| Create a table with: Method | Endpoint | Description | Auth Required | |
| ## 10. Tech Stack | |
| Create a table with: Layer | Technology | Reason | |
| Cover: Frontend, Backend, Auth, Database, Cache, Storage, Hosting, CI/CD, Monitoring | |
| ## 11. Future Features (PostβMVP) | |
| List 8 features for after MVP launch. | |
| ## Next Steps | |
| List 5 actionable next steps to start building. | |
| End with: **Happy building! π** | |
| Be detailed, practical, and use real-world best practices. Make it production-ready.""" | |
| msgs = build_context(ws, prompt) | |
| answer = llm.invoke(msgs).content | |
| # Generate follow-up questions | |
| follow = [ | |
| "Generate wireframes for core screens", | |
| "Create a development timeline", | |
| "Estimate the MVP budget", | |
| "Design the database schema in detail", | |
| "Write user stories for MVP features" | |
| ] | |
| memory.add(ws, "assistant", answer) | |
| print(f" β Product MVP: Blueprint generated") | |
| return ChatResponse( | |
| answer=answer, | |
| sources=[], | |
| links=[], | |
| images=[], | |
| followups=follow, | |
| default_tab="answer", | |
| workspace_id=ws | |
| ) | |
| # ======================================================= | |
| # VIDEO BRAIN ENDPOINT - YouTube Video Analysis with Transcript + Web Fallback | |
| # ======================================================= | |
| from tools.youtube_tool import YouTubeTool | |
| youtube_tool = YouTubeTool() | |
| # Store video transcripts in memory per workspace | |
| video_transcripts = {} | |
| class VideoBrainRequest(BaseModel): | |
| message: str | |
| workspace_id: str = "default" | |
| mode: str = "video_brain" | |
| youtube_url: str = "" | |
| transcript: str = "" # User can paste transcript directly | |
| def video_brain_mode(req: VideoBrainRequest): | |
| """ | |
| Video Brain Mode - Analyzes YouTube videos. | |
| Uses user-pasted transcript (priority), then tries auto-extraction, then web fallback. | |
| """ | |
| q = req.message.strip() | |
| ws = req.workspace_id | |
| youtube_url = req.youtube_url | |
| user_transcript = req.transcript.strip() if req.transcript else "" | |
| memory.add(ws, "user", q) | |
| print(f"\nπ₯ VIDEO BRAIN MODE: {q}") | |
| print(f" πΊ YouTube URL: {youtube_url}") | |
| print(f" π User transcript provided: {len(user_transcript)} chars") | |
| # Allow transcript-only mode (no URL required if user pastes transcript) | |
| if not youtube_url and not user_transcript: | |
| return ChatResponse( | |
| answer="β οΈ Please provide a YouTube URL or paste the transcript directly.", | |
| sources=[], | |
| links=[], | |
| images=[], | |
| followups=[], | |
| default_tab="answer", | |
| workspace_id=ws | |
| ) | |
| # Handle transcript-only mode | |
| if youtube_url == "user_provided_transcript" or not youtube_url: | |
| youtube_url = "user_provided_transcript" | |
| video_id = "user_transcript" | |
| else: | |
| video_id = youtube_tool.extract_video_id(youtube_url) | |
| cache_key = f"{ws}_{video_id}" | |
| # PRIORITY 1: User-pasted transcript | |
| transcript_text = "" | |
| transcript_data = None | |
| if user_transcript: | |
| transcript_text = user_transcript[:15000] # Allow longer user transcripts | |
| print(f" β Using user-pasted transcript: {len(transcript_text)} chars") | |
| # Cache it for follow-up questions | |
| video_transcripts[cache_key] = {"success": True, "transcript": transcript_text, "user_provided": True} | |
| # PRIORITY 2: Check cache | |
| if not transcript_text and cache_key in video_transcripts: | |
| transcript_data = video_transcripts[cache_key] | |
| if transcript_data.get("success"): | |
| transcript_text = transcript_data.get("transcript", "")[:8000] | |
| print(f" π Using cached transcript") | |
| # PRIORITY 3: Try auto-extraction | |
| if not transcript_text: | |
| print(f" π Attempting auto-extraction for video: {video_id}") | |
| transcript_data = youtube_tool.get_transcript(youtube_url) | |
| if transcript_data.get("success"): | |
| video_transcripts[cache_key] = transcript_data | |
| transcript_text = transcript_data.get("transcript", "")[:8000] | |
| print(f" β Auto-extracted transcript: {len(transcript_text)} chars") | |
| else: | |
| print(f" β οΈ Auto-extraction failed: {transcript_data.get('error')}") | |
| # If no transcript, use web search fallback | |
| video_context = "" | |
| sources = [] | |
| links = [] | |
| if not transcript_text: | |
| print(f" π Using web search fallback...") | |
| try: | |
| # Search for video info and summaries | |
| if search_tool: | |
| search_queries = [ | |
| f"youtube video {video_id} summary transcript", | |
| f"youtube {video_id} key points explained" | |
| ] | |
| for sq in search_queries[:1]: # Just one search to save time | |
| results = search_tool.search(sq, num_results=4) | |
| # Get Tavily AI answer | |
| if results and results[0].get("tavily_answer"): | |
| video_context += f"[Video Summary]: {results[0]['tavily_answer']}\n\n" | |
| for r in results: | |
| url = r.get("url", "") | |
| title = r.get("title", "") | |
| content = r.get("content", "") | |
| if content: | |
| video_context += f"[{title}]: {content[:1000]}\n\n" | |
| links.append({"title": title, "url": url, "snippet": content[:150]}) | |
| sources.append({"title": title, "url": url}) | |
| print(f" π Web fallback gathered: {len(video_context)} chars, {len(sources)} sources") | |
| except Exception as e: | |
| print(f" β Web search fallback error: {e}") | |
| # Build prompt | |
| q_lower = q.lower() | |
| is_summary = any(word in q_lower for word in ["summarize", "summary", "overview", "main points", "key takeaways", "what is this about"]) | |
| if transcript_text: | |
| # Have real transcript | |
| if is_summary: | |
| prompt = f"""You are VIDEO BRAIN AI - an expert at analyzing YouTube videos. | |
| VIDEO TRANSCRIPT (with timestamps [MM:SS]): | |
| {transcript_text} | |
| USER REQUEST: {q} | |
| Provide a comprehensive summary with: | |
| 1. **Overview**: One paragraph describing what the video is about | |
| 2. **Key Points**: 5-7 main takeaways with timestamps | |
| 3. **Important Details**: Any specific facts, figures, or examples mentioned | |
| 4. **Actionable Insights**: What viewers should do or remember | |
| Use the actual content from the transcript. Reference timestamps like [5:30] when citing specific parts.""" | |
| else: | |
| prompt = f"""You are VIDEO BRAIN AI - an expert at analyzing YouTube videos. | |
| VIDEO TRANSCRIPT (with timestamps [MM:SS]): | |
| {transcript_text} | |
| USER QUESTION: {q} | |
| Answer the question using ONLY the information from the transcript above. | |
| - Be specific and cite timestamps when relevant | |
| - If the answer is not in the transcript, say so honestly | |
| - Format your response clearly with bullet points if appropriate""" | |
| sources = [{"title": "π₯ YouTube Video (Transcript)", "url": youtube_url}] | |
| links = [{"title": "Source Video", "url": youtube_url, "snippet": f"Video ID: {video_id} - Full transcript available"}] | |
| elif video_context: | |
| # Have web search fallback context | |
| prompt = f"""You are VIDEO BRAIN AI. I couldn't get the direct transcript, but found related information about this video. | |
| VIDEO URL: {youtube_url} | |
| VIDEO ID: {video_id} | |
| AVAILABLE INFORMATION FROM WEB: | |
| {video_context[:6000]} | |
| USER QUESTION: {q} | |
| Based on the available information: | |
| 1. Answer the user's question as best as you can | |
| 2. Be clear that this is based on web search results, not the actual transcript | |
| 3. If summarizing, provide the key points found | |
| 4. Suggest the user can paste the transcript directly for more accurate analysis""" | |
| else: | |
| # No information available | |
| error_msg = transcript_data.get("error", "Unknown error") if transcript_data else "Could not fetch transcript" | |
| prompt = f"""I couldn't analyze the YouTube video. | |
| Video URL: {youtube_url} | |
| Error: {error_msg} | |
| User Question: {q} | |
| Please explain: | |
| 1. Why the transcript couldn't be fetched (network/DNS issues on this server) | |
| 2. Alternative: The user can: | |
| - Open YouTube, click "..." under the video, select "Show transcript" | |
| - Copy and paste the transcript text here | |
| - I can then analyze it accurately | |
| 3. Or they can try a different video""" | |
| try: | |
| msgs = build_context(ws, prompt) | |
| answer = llm.invoke(msgs).content | |
| except Exception as e: | |
| print(f" β LLM error: {e}") | |
| answer = f"Error generating response: {str(e)[:100]}" | |
| # Follow-up questions | |
| if transcript_text or video_context: | |
| follow = [ | |
| "What are the main arguments or points made?", | |
| "Summarize this in 3 bullet points", | |
| "What examples or case studies are mentioned?", | |
| "What should I learn from this video?", | |
| "Explain the most complex concept in simple terms" | |
| ] | |
| else: | |
| follow = [ | |
| "Paste the transcript text here", | |
| "Try a different YouTube video", | |
| "How do I get a YouTube transcript?" | |
| ] | |
| # Add video source if not already added | |
| if not sources: | |
| sources = [{"title": "π₯ YouTube Video", "url": youtube_url}] | |
| if not links: | |
| links = [{"title": "Source Video", "url": youtube_url, "snippet": f"Video ID: {video_id}"}] | |
| memory.add(ws, "assistant", answer) | |
| print(f" β Video Brain: Response generated") | |
| return ChatResponse( | |
| answer=answer, | |
| sources=sources, | |
| links=links, | |
| images=[], | |
| followups=follow, | |
| default_tab="answer", | |
| workspace_id=ws | |
| ) |