# NOTE: "Spaces: Runtime error" banner below was build-log residue pasted from
# the Hugging Face Space UI — it is not part of the program source.
| import os | |
| import math | |
| import asyncio | |
| import subprocess | |
| import requests | |
| from io import BytesIO | |
| from bs4 import BeautifulSoup | |
| from pydantic import Field | |
| # ----- LlamaIndex & LangChain Imports ----- | |
| from llama_index.core.llms import ChatMessage, LLMMetadata, LLM, CompletionResponse | |
| from llama_index.core.agent import ReActAgent | |
| from llama_index.core.callbacks.llama_debug import LlamaDebugHandler | |
| from llama_index.core.tools import FunctionTool | |
| from llama_index.llms.huggingface import HuggingFaceInferenceAPI | |
| from langchain_community.retrievers import TavilySearchAPIRetriever | |
# ---------- BASIC SETUP ----------
# Browser-like User-Agent so scraped sites are less likely to reject our requests.
HEADERS = {"User-Agent": "Mozilla/5.0"}
def check_required_keys() -> None:
    """Print a warning if any required API-key environment variable is unset."""
    required = ("TAVILY_API_KEY", "HUGGINGFACE_TOKEN")
    absent = [key for key in required if not os.getenv(key)]
    if not absent:
        print("✅ All required API keys are present.")
    else:
        print(f"⚠️ WARNING: Missing API keys: {', '.join(absent)}")
# Run the key check once at import time so misconfiguration shows up in the logs.
check_required_keys()
# Monkey-patch required by LlamaIndex: some code paths expect `msg.message`;
# expose the ChatMessage object itself under that attribute.
# NOTE(review): patches a library class globally at import time — confirm it is
# still needed with the installed llama_index version.
ChatMessage.message = property(lambda self: self)
# ---------- HUGGING FACE LLM WRAPPER (Command R+) ----------
class HuggingFaceLLM(LLM):
    """Wrapper around the Hugging Face Inference API, tuned for Command R+.

    Implements the llama_index ``LLM`` interface: ``chat``/``achat`` are
    supported; ``complete`` is intentionally unsupported because the target
    model is chat-only.
    """

    # Hub model id queried through the Inference API.
    model_name: str = Field(default="CohereForAI/c4ai-command-r-plus")
    # Near-greedy sampling default; the API rejects exactly 0 (see chat()).
    temperature: float = Field(default=0.01)
    # Generation budget; raised for longer answers.
    max_new_tokens: int = Field(default=2048)
    # Inference client, built in __init__.
    # NOTE(review): pydantic treats leading-underscore fields as private
    # attributes — confirm this declaration/assignment pattern works under the
    # pydantic version in use (PrivateAttr() is the canonical form).
    _client: HuggingFaceInferenceAPI = None

    class Config:
        extra = "allow"

    def __init__(self, **kwargs):
        """Build the inference client.

        Raises:
            ValueError: if HUGGINGFACE_TOKEN is not set in the environment.
        """
        super().__init__(**kwargs)
        api_key = os.getenv("HUGGINGFACE_TOKEN")
        if not api_key:
            raise ValueError("HUGGINGFACE_TOKEN no configurado en los secrets del Space")
        self._client = HuggingFaceInferenceAPI(model_name=self.model_name, token=api_key)
        # Guarantee a callback manager with at least a debug handler attached.
        if self.callback_manager is None:
            from llama_index.core.callbacks.base import CallbackManager
            self.callback_manager = CallbackManager([])
        if not self.callback_manager.handlers:
            self.callback_manager.add_handler(LlamaDebugHandler())

    # FIX: the base LLM class declares `metadata` as a property; without the
    # decorator llama_index reads a bound method object instead of LLMMetadata.
    @property
    def metadata(self) -> LLMMetadata:
        """Static capabilities advertised to llama_index."""
        return LLMMetadata(
            context_window=128000,
            num_output=self.max_new_tokens,
            is_chat_model=True,
            is_function_calling_model=True,
            model_name=self.model_name,
        )

    def chat(self, messages: list[ChatMessage], **kwargs) -> ChatMessage:
        """Send a chat transcript and return the assistant reply.

        API failures are reported as an assistant message instead of being
        raised, so the surrounding agent loop can keep going.
        """
        # NOTE(review): HuggingFaceInferenceAPI may not expose `.tokenizer` /
        # `.text_generation` publicly — verify against the installed
        # llama-index-llms-huggingface version.
        prompt = self._client.tokenizer.apply_chat_template(
            [{"role": msg.role.value, "content": msg.content} for msg in messages],
            tokenize=False, add_generation_prompt=True
        )
        try:
            response = self._client.text_generation(
                prompt, max_new_tokens=self.max_new_tokens,
                temperature=self.temperature if self.temperature > 0 else 0.01,  # temp must be > 0
                do_sample=True, top_p=0.95
            )
            return ChatMessage(role="assistant", content=response)
        except Exception as e:
            print(f"[ERROR] HuggingFace API call failed: {e}")
            return ChatMessage(role="assistant", content=f"Error: API call failed. Reason: {e}")

    async def achat(self, messages: list[ChatMessage], **kwargs) -> ChatMessage:
        """Async chat: run the blocking `chat` in a worker thread."""
        return await asyncio.to_thread(self.chat, messages, **kwargs)

    def complete(self, prompt: str, **kwargs) -> CompletionResponse:
        """Completion endpoint is unsupported; this model is chat-only."""
        raise NotImplementedError("Use .chat() for this model.")
| # ---------- TOOLING ---------- | |
| def _pd_safe_import(): | |
| try: | |
| import pandas as pd | |
| return pd | |
| except ModuleNotFoundError: | |
| return None | |
def web_search(query: str, num_results: int = 5) -> str:
    """Tavily search -> concatenated, citation‑ready snippet list (includes URL)."""
    try:
        retriever = TavilySearchAPIRetriever(api_key=os.getenv("TAVILY_API_KEY"), k=num_results)
        chunks = []
        for idx, doc in enumerate(retriever.invoke(query), 1):
            title = doc.metadata.get('title', '')
            source = doc.metadata.get('source', '')
            chunks.append(f"Result {idx}:\nTitle: {title}\nURL: {source}\nContent: {doc.page_content}\n")
        return "\n\n".join(chunks)
    except Exception as exc:
        # Errors come back as a string so the agent sees them as an Observation.
        return f"Error web_search: {exc}"
def scrape_url_text(url: str) -> str:
    """Downloads a webpage and returns cleaned visible text."""
    try:
        response = requests.get(url, headers=HEADERS, timeout=20)
        response.raise_for_status()
        page = response.text
        # A Cloudflare interstitial page contains no useful content.
        if "Just a moment" in page and "cloudflare" in page.lower():
            return "Error: The site is protected by Cloudflare and cannot be scraped directly. Use information from web_search instead."
        soup = BeautifulSoup(page, "html.parser")
        # Drop non-content elements before extracting text.
        for element in soup(["script", "style", "noscript", "header", "footer", "nav"]):
            element.decompose()
        stripped = (ln.strip() for ln in soup.get_text("\n").splitlines())
        visible = "\n".join(ln for ln in stripped if ln)
        # Cap the payload so the LLM context is not flooded.
        return visible[:8000]
    except Exception as exc:
        return f"Error scrape_url_text: {exc}"
def analyze_markdown_table(table_md: str, question: str) -> str:
    """Analyze a markdown table: commutativity check or CSV dump.

    If *question* mentions commutativity ("conmut..."), the table is treated
    as an operation table (first column = row labels) and the elements that
    break commutativity are returned (comma-separated, sorted), or
    "Conmutativa" if none do. Otherwise the table is returned as CSV.

    Requires pandas (imported lazily); failures are returned as error strings
    instead of raised, so the agent loop can continue.
    """
    try:
        import pandas as pd
    except ModuleNotFoundError:
        return "Error: pandas library is required for this tool but not installed."
    try:
        def _is_separator(line: str) -> bool:
            # FIX: accept any alignment row (`|---|`, `| :--- |`, `|:---:|`…),
            # not only lines that start exactly with "|---"; the old check let
            # common separator variants through as data rows.
            body = line.strip().strip("|")
            return bool(body) and set(body) <= set("-: |")
        clean = [ln for ln in table_md.strip().splitlines()
                 if ln.strip() and not _is_separator(ln)]
        rows = [[c.strip() for c in ln.strip("|").split("|")] for ln in clean]
        if len(rows) < 2:
            return "Error: malformed markdown table"
        df = pd.DataFrame(rows[1:], columns=rows[0])
        if "conmut" in question.lower():
            offenders: set[str] = set()
            header, cols = df.columns[0], df.columns[1:]
            # Compare table[x][y] against table[y][x] for every pair.
            for x in cols:
                for y in cols:
                    try:
                        val_xy = df.loc[df[header] == x, y].iat[0]
                        val_yx = df.loc[df[header] == y, x].iat[0]
                        if val_xy != val_yx:
                            offenders.update([x, y])
                    except (IndexError, KeyError):
                        continue  # row label missing for this column — skip pair
            return ", ".join(sorted(offenders)) or "Conmutativa"
        return df.to_csv(index=False)
    except Exception as exc:
        return f"Error analyze_markdown_table: {exc}"
def execute_code(code: str) -> str:
    """Runs short python code in a sandboxed subprocess.

    Returns "Output: ..." on success (exit code 0), "Error: <stderr>" on a
    non-zero exit, and "Error execute_code: ..." on timeout or launch failure.
    """
    import sys
    try:
        # FIX: use the current interpreter rather than whatever "python" is on
        # PATH (which may be missing or a different version). -S skips the
        # site module for a leaner, slightly more isolated child process.
        res = subprocess.run(
            [sys.executable, "-S", "-c", code],
            capture_output=True, text=True, timeout=10,
        )
        if res.returncode == 0:
            output = res.stdout.strip()
            return f"Output: {output if output else '(No output)'}"
        return f"Error: {res.stderr.strip()}"
    except Exception as exc:
        return f"Error execute_code: {exc}"
# ... (other tools such as classify_botanical_foods, etc. go here, unchanged) ...
def reverse_text(text: str) -> str:
    """Return *text* with its characters in reverse order."""
    return "".join(reversed(text))
# ---------- TOOL DEFINITIONS & PROMPT ----------
# (callable, tool name, tool description) triples registered with the agent.
tool_defs = [
    (web_search, "web_search", "Searches the web via Tavily."),
    (scrape_url_text, "scrape_url_text", "Fetch any URL and return visible text."),
    (analyze_markdown_table, "analyze_markdown_table", "Analyze a markdown table."),
    (execute_code, "execute_code", "Run short python snippets securely."),
    (reverse_text, "reverse_text", "Reverse a text string."),
    (lambda _: "I cannot answer with the available tools.", "no_tool_solution", "Fallback answer when stuck."),
]
# Wrap each triple as a llama_index FunctionTool.
TOOLS = []
for _fn, _name, _desc in tool_defs:
    TOOLS.append(FunctionTool.from_defaults(fn=_fn, name=_name, description=_desc))
# FIX: the prompt contains no interpolation fields, so the f-prefix was useless
# (and risky: literal braces added later would raise or interpolate silently).
SYSTEM_PROMPT = """
You are Alfred, a ReAct agent. Your goal is to answer questions accurately. Follow these rules STRICTLY.
**OPERATING PROCEDURE:**
1. **TRIAGE:** First, analyze the question. If it involves a local file (image, audio, Excel) or multimedia, IMMEDIATELY use `no_tool_solution`.
2. **INFORMATION GATHERING:** For all other questions, your FIRST step is ALWAYS `web_search`.
3. **ANALYZE SNIPPET:** After `web_search`, CAREFULLY read the `Content:` snippet of each result. If the answer is clearly present, answer immediately. DO NOT use another tool if you already have the information.
4. **DEEP DIVE:** Only if the snippet is incomplete, use `scrape_url_text` on the most promising URL. If `scrape_url_text` fails (e.g., Cloudflare error), go back to the text from `web_search` or give up.
5. **FINAL ANSWER:** Your final response MUST be ONLY the `Observation:` from your last successful tool call, or the phrase "I cannot answer with the available tools."
"""
# ---------- AGENT CREATION & EXECUTION ----------
def create_fresh_agent():
    """Creates a new, clean agent instance to prevent state contamination."""
    # A brand-new LLM wrapper per query; avoids shared chat state across runs.
    llm = HuggingFaceLLM()
    # NOTE(review): `system_prompt` and `handle_parsing_errors` are forwarded
    # as extra kwargs — confirm the installed ReActAgent.from_tools actually
    # honors them (`handle_parsing_errors` is a LangChain-style argument).
    return ReActAgent.from_tools(
        tools=TOOLS, llm=llm, system_prompt=SYSTEM_PROMPT, verbose=False,
        max_iterations=20, handle_parsing_errors=True
    )
| def _extract_observation(raw: str) -> str: | |
| """Extracts the LAST observation from the ReAct agent's reasoning dump.""" | |
| if "Observation:" in raw: | |
| segment = raw.rsplit("Observation:", 1)[-1] | |
| if "Final Answer:" in segment: | |
| segment = segment.split("Final Answer:", 1)[0] | |
| return segment.strip() | |
| return raw.strip() | |
def basic_agent_response(question: str) -> str:
    """Public entry point: creates a fresh agent and runs one query."""
    try:
        print(f"[DEBUG] ➜ Question: {question}")
        raw_resp = create_fresh_agent().query(question)
        # Some response objects carry the text under `.response`.
        text_response = str(getattr(raw_resp, "response", raw_resp))
        cleaned = _extract_observation(text_response)
        return cleaned or "I cannot answer with the available tools."
    except Exception as exc:
        # Any failure degrades to the canonical fallback answer.
        print(f"[ERROR] Agent execution failed: {exc}")
        return "I cannot answer with the available tools."