| | import os |
| | import json |
| | from dotenv import load_dotenv |
| | from tavily import TavilyClient |
| | from cerebras.cloud.sdk import Cerebras |
| |
|
| | load_dotenv() |
| |
|
| | |
| |
|
class WebSearchTool:
    """Search the web through Tavily and render the hits as plain text."""

    # Maximum number of characters of each result's content kept in the output.
    SNIPPET_LENGTH = 300

    def __init__(self, api_key: str):
        """Create the Tavily client used by ``search``.

        Args:
            api_key: Tavily API key, passed straight to ``TavilyClient``.
        """
        self.client = TavilyClient(api_key=api_key)

    def search(self, query: str, max_results: int = 5) -> str:
        """Run an advanced-depth search and return formatted results.

        Args:
            query: Text to search for.
            max_results: Maximum number of results requested from the client.

        Returns:
            A text block with an optional ``Quick Answer:`` line followed by
            a numbered ``Search Results:`` list (title, URL, content snippet).
            Any exception is reported as ``"Search error: <message>"``
            instead of being raised.
        """
        try:
            response = self.client.search(
                query=query,
                search_depth="advanced",
                max_results=max_results,
                include_answer=True,
            )

            output = []

            answer = response.get("answer")
            if answer:
                output.append(f"Quick Answer: {answer}\n")

            output.append("Search Results:")
            # `or []` also covers an explicit ``"results": None`` payload.
            for i, result in enumerate(response.get("results") or [], 1):
                # Tolerate missing/None fields so that one incomplete hit
                # does not turn the whole search into a "Search error".
                title = result.get("title") or "(untitled)"
                url = result.get("url") or "(no URL)"
                content = result.get("content") or ""

                snippet = content[:self.SNIPPET_LENGTH]
                # Only mark the snippet as cut when text was actually dropped.
                if len(content) > self.SNIPPET_LENGTH:
                    snippet += "..."

                output.append(f"\n{i}. {title}")
                output.append(f" URL: {url}")
                output.append(f" {snippet}")

            return "\n".join(output)

        except Exception as e:
            # Callers expect a string in every case, so report instead of raising.
            return f"Search error: {str(e)}"
| |
|
class FileReaderTool:
    """Read files of several formats and return their content as text."""

    def read(self, file_path: str) -> str:
        """Read a file and return its content as text.

        The format is chosen from the lower-cased file extension:
        ``.docx`` (paragraphs, then table rows joined with `` | ``),
        ``.pdf`` (text of every page that yields any),
        ``.xlsx``/``.xls``/``.csv`` (pandas DataFrame rendered with
        ``to_string``) and ``.txt``/``.md``/``.json`` (raw UTF-8 text).

        Args:
            file_path: Path of the file to read.

        Returns:
            The extracted text, or a human-readable message starting with
            ``Error`` / ``Unsupported file type`` when the file is missing,
            the extension is not handled, an optional dependency is not
            installed, or reading fails. Failures are reported in the
            returned string rather than raised.
        """
        if not os.path.exists(file_path):
            return f"Error: File not found at {file_path}"

        ext = os.path.splitext(file_path)[1].lower()

        try:
            if ext == '.docx':
                try:
                    # Imported lazily so the dependency stays optional.
                    from docx import Document
                    doc = Document(file_path)
                    text = [para.text for para in doc.paragraphs if para.text.strip()]
                    for table in doc.tables:
                        for row in table.rows:
                            cells = [cell.text.strip() for cell in row.cells]
                            text.append(" | ".join(cells))
                    return "\n".join(text)
                except ImportError:
                    return "Error: python-docx not installed."

            elif ext == '.pdf':
                try:
                    import pdfplumber
                    with pdfplumber.open(file_path) as pdf:
                        # Extract each page exactly once: extraction is the
                        # costly step, so do not repeat it for the filter.
                        extracted = (page.extract_text() for page in pdf.pages)
                        # Consume the generator while the PDF is still open.
                        text = [page_text for page_text in extracted if page_text]
                    return "\n".join(text)
                except ImportError:
                    return "Error: pdfplumber not installed."

            elif ext in ['.xlsx', '.xls', '.csv']:
                try:
                    import pandas as pd
                    if ext == '.csv':
                        df = pd.read_csv(file_path)
                    else:
                        df = pd.read_excel(file_path)
                    return df.to_string()
                except ImportError:
                    return "Error: pandas or openpyxl not installed."

            elif ext in ['.txt', '.md', '.json']:
                with open(file_path, 'r', encoding='utf-8') as f:
                    return f.read()

            else:
                return f"Unsupported file type: {ext}"

        except Exception as e:
            # Any parsing/decoding/IO failure is reported as text to the caller.
            return f"Error reading file: {str(e)}"
| |
|
class ImageAnalysisTool:
    """Extract text from images using OCR (pytesseract + Pillow)."""

    def analyze(self, image_path: str, question: str = "Describe this image") -> str:
        """Run OCR on an image and return the text that was found.

        Args:
            image_path: Path of the image file.
            question: Kept in the signature for callers; the OCR
                implementation below does not use it.

        Returns:
            ``"Text extracted from image:\\n<text>"`` when OCR yields
            non-blank text, a "No text found" message when it does not, or
            an ``Error...`` message when the file is missing, the OCR
            dependencies are not installed, or processing fails.
        """
        if not os.path.exists(image_path):
            return f"Error: Image not found at {image_path}"

        try:
            # Imported lazily so the OCR dependencies stay optional.
            import pytesseract
            from PIL import Image

            # Use the image as a context manager so its underlying file
            # handle is closed once OCR is done, instead of leaking it.
            with Image.open(image_path) as img:
                text = pytesseract.image_to_string(img)

            if text.strip():
                return f"Text extracted from image:\n{text}"
            else:
                return "No text found in image (OCR returned empty)"

        except ImportError:
            return "Error: pytesseract or Pillow not installed."
        except Exception as e:
            return f"Error analyzing image: {str(e)}"
| |
|
| | |
| |
|
class BasicAgent:
    """Answer questions with an LLM, grounded in web search results.

    Renamed from SimpleResearchAgent to match app.py requirements.
    """

    def __init__(self):
        """Read API keys from the environment and build the LLM and tools.

        Raises:
            ValueError: If CEREBRAS_API_KEY or TAVILY_API_KEY is unset/empty.
        """
        print("--- Initializing BasicAgent ---")

        # HF_TOKEN is stored but not required by the check below.
        self.hf_token = os.getenv("HF_TOKEN")
        self.cerebras_key = os.getenv("CEREBRAS_API_KEY")
        self.tavily_key = os.getenv("TAVILY_API_KEY")

        if not (self.cerebras_key and self.tavily_key):
            raise ValueError("❌ Missing API Keys. Please check Space Settings.")

        # LLM client and the model name sent with every completion request.
        self.llm = Cerebras(api_key=self.cerebras_key)
        self.model = "gpt-oss-120b"

        # Tools available to the agent.
        self.web_search = WebSearchTool(self.tavily_key)
        self.file_reader = FileReaderTool()
        self.image_analyzer = ImageAnalysisTool()

        print("✅ BasicAgent initialized successfully.")

    def _call_llm(self, messages: list, temperature: float = 0.0) -> str:
        """Send chat messages to the LLM and return the stripped reply.

        Returns "Error: Empty response." when the reply has no content and
        "LLM Error: <message>" when the request raises.
        """
        try:
            completion = self.llm.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=temperature,
                max_tokens=200,
            )
            reply = completion.choices[0].message.content
            if not reply:
                return "Error: Empty response."
            return reply.strip()
        except Exception as e:
            return f"LLM Error: {str(e)}"

    def answer(self, question: str, mode="context") -> str:
        """Answer one question; this is the method app.py drives.

        Note: app.py only passes 'question', not 'file_path'. The ``mode``
        argument is accepted but not consulted by this implementation.
        """
        print(f"Processing: {question[:50]}...")

        # Questions containing these phrases are treated as pure reasoning
        # tasks and are answered without a web search.
        logic_markers = (
            'opposite', 'backwards', 'reversed', 'if you understand', 'python code',
        )
        lowered = question.lower()
        is_logic = any(marker in lowered for marker in logic_markers)

        if is_logic:
            context = "Logic/Reasoning Task (No Search Performed)"
        else:
            search_results = self.web_search.search(question)
            context = f"Web Search Results:\n{search_results}"

        system_prompt = (
            "You are a precise data extraction engine. "
            "Answer with ONLY the exact value requested. "
            "No explanations, no preambles, no conversational filler. "
            "Examples: '42', 'John Smith', 'Paris', 'right'. "
        )
        user_prompt = f"Context:\n{context}\n\nQuestion: {question}\n\nExact Answer:"

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        return self._call_llm(messages)

    def __call__(self, question: str) -> str:
        """Make the agent callable; delegates to ``answer``."""
        return self.answer(question)
| |
|
| | |
if __name__ == "__main__":
    # Manual smoke test: build the agent and print its answer to one question.
    demo_agent = BasicAgent()
    demo_reply = demo_agent("What is the capital of France?")
    print(demo_reply)