Spaces:
Running
Running
| """Document Explorer — API backend for exploring uploaded documents with an LLM. | |
| The frontend lives on the AppSimple website. This Space provides | |
| streaming question/answer, file upload, and trace endpoints. | |
| """ | |
| from __future__ import annotations | |
| import hmac | |
| import json | |
| import os | |
| import secrets | |
| import tempfile | |
| import time | |
| from collections.abc import Generator | |
| from dataclasses import asdict | |
| from datetime import date, datetime, timezone | |
| from pathlib import Path | |
| import gradio as gr | |
| import litellm | |
| from dotenv import load_dotenv | |
| from huggingface_hub import HfApi | |
| from llm_harness.agent import run_agent_loop | |
| from llm_harness.citations import process_citations, superscript | |
| from llm_harness.prompt import build_system_prompt | |
| from llm_harness.tools import TOOL_DEFINITIONS | |
| from llm_harness.trace_viewer import render_trace | |
| from llm_harness.types import Message, TextDeltaEvent, ToolCallEvent, ToolResultEvent | |
| from sandbox_e2b import run_python as e2b_run_python | |
# python-dotenv: pull in variables from a .env file (when one is present)
# before any configuration value below is read.
load_dotenv()
litellm.suppress_debug_info = True

# --- Configuration, read once from the environment at import time ----------
MODEL = os.getenv("LH_MODEL", "")
ACCESS_TOKEN = os.getenv("LH_ACCESS_TOKEN", "")
ADMIN_TOKEN = os.getenv("LH_ADMIN_TOKEN", "")
MAX_SESSION_COST = float(os.getenv("LH_MAX_SESSION_COST", "0.50"))
DAILY_FREE_LIMIT = int(os.getenv("LH_DAILY_FREE_LIMIT", "5"))
NOTIFY_EMAIL = os.getenv("NOTIFY_EMAIL", "")
SMTP_APP_PASSWORD = os.getenv("SMTP_APP_PASSWORD", "")
HF_TRACES_REPO = os.getenv("HF_TRACES_REPO", "")
HF_TOKEN = os.getenv("HF_TOKEN", "")

# Tag recorded with every trace: "prod" when SPACE_ID is set, otherwise "dev".
SOURCE = "dev" if not os.getenv("SPACE_ID") else "prod"

# Base instructions handed to build_system_prompt() for every question.
BASE_PROMPT = (
    "Your response should stand on its own.\n\n"
    "Do not speculate, manufacture connections to make a question fit, "
    "or answer off-topic questions."
)

# Hub client used for trace storage; None when no HF_TOKEN is configured.
hf_api = None if not HF_TOKEN else HfApi(token=HF_TOKEN)

# Global daily counter for free (unauthenticated) usage
_free_count = 0
_free_date = date.today()
def _notify_limit_reached() -> None:
    """Email the operator that the daily free-question limit has been reached.

    Does nothing unless both ``NOTIFY_EMAIL`` and ``SMTP_APP_PASSWORD`` are
    set. The message is sent from and to ``NOTIFY_EMAIL`` via Gmail's
    SMTP-over-SSL endpoint. Delivery is best effort: any failure is printed
    as a warning and never propagated to the caller.
    """
    if not NOTIFY_EMAIL or not SMTP_APP_PASSWORD:
        return
    try:
        import smtplib
        from email.message import EmailMessage

        msg = EmailMessage()
        msg["Subject"] = "Document Explorer: daily free limit reached"
        msg["From"] = NOTIFY_EMAIL
        msg["To"] = NOTIFY_EMAIL
        msg.set_content(
            f"The document explorer free question limit ({DAILY_FREE_LIMIT}) "
            f"was reached on {date.today()}. People are using it!"
        )
        # This runs inline while a question is being admitted, so bound the
        # network operations: an unreachable mail server must not be able to
        # stall the request indefinitely.
        with smtplib.SMTP_SSL("smtp.gmail.com", 465, timeout=10) as smtp:
            smtp.login(NOTIFY_EMAIL, SMTP_APP_PASSWORD)
            smtp.send_message(msg)
        print(f"Notification sent to {NOTIFY_EMAIL}")
    except Exception as exc:
        # Deliberately broad: a failed notification must never break the app.
        print(f"WARNING: notification failed: {exc}")
def _is_free_question_allowed() -> bool:
    """Consume one slot of today's free-question allowance, if any is left.

    The module-level counter is reset the first time this is called on a new
    calendar day. Returns ``True`` (and counts the question) while fewer than
    ``DAILY_FREE_LIMIT`` questions have been granted today, ``False``
    otherwise. The call that uses up the last slot also triggers the
    limit-reached notification.
    """
    global _free_count, _free_date

    current_day = date.today()
    if _free_date != current_day:
        # First request of a new day: start counting from zero again.
        _free_date = current_day
        _free_count = 0

    if _free_count < DAILY_FREE_LIMIT:
        _free_count += 1
        if _free_count == DAILY_FREE_LIMIT:
            _notify_limit_reached()
        return True
    return False
| # --------------------------------------------------------------------------- | |
| # Server-side session store | |
| # --------------------------------------------------------------------------- | |
# In-memory session registry: session id -> {"workspace", "scratch", "cost"}.
_sessions: dict[str, dict] = {}
# Path prefix every session workspace must start with. It is derived from the
# platform's actual temp directory instead of a hard-coded "/tmp" so that it
# always agrees with where tempfile.mkdtemp() creates the "lh-workspace-*"
# directories (e.g. when TMPDIR is set, or on macOS/Windows). With the default
# Linux temp directory this still evaluates to "/tmp/lh-".
_TEMP_PREFIX = os.path.join(tempfile.gettempdir(), "lh-")
def _create_session(workspace_path: str) -> str:
    """Register a new session for *workspace_path* and return its id.

    A random URL-safe id is generated, a fresh scratch directory is created
    for the session, and the accumulated cost starts at zero.
    """
    new_id = secrets.token_urlsafe(16)
    record = {
        "workspace": workspace_path,
        "scratch": tempfile.mkdtemp(prefix="lh-scratch-"),
        "cost": 0.0,
    }
    _sessions[new_id] = record
    return new_id
def _get_session(session_id: str) -> dict | None:
    """Look up a session record by id.

    Returns the stored record, or ``None`` when the id is unknown or the
    record's workspace path does not start with the expected temp prefix.
    """
    record = _sessions.get(session_id)
    # Only hand back sessions whose workspace lives where we expect it to.
    if record and record["workspace"].startswith(_TEMP_PREFIX):
        return record
    return None
def _has_valid_token(token: str) -> bool:
    """Return whether *token* grants authenticated access.

    When no ``ACCESS_TOKEN`` is configured the app is unprotected and every
    caller counts as authenticated. Otherwise an empty or non-string token is
    rejected and anything else is compared to ``ACCESS_TOKEN`` in constant
    time.
    """
    if not ACCESS_TOKEN:
        return True
    if not isinstance(token, str) or not token:
        return False
    # hmac.compare_digest() raises TypeError for str arguments that contain
    # non-ASCII characters, which would turn a garbage token into an unhandled
    # error. Comparing encoded bytes accepts any input; "surrogatepass" keeps
    # the encoding lossless so distinct strings never compare equal.
    supplied = token.encode("utf-8", "surrogatepass")
    expected = ACCESS_TOKEN.encode("utf-8", "surrogatepass")
    return hmac.compare_digest(supplied, expected)
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
def _slugify(text: str, max_len: int = 50) -> str:
    """Turn *text* into a lowercase, hyphen-separated slug.

    Spaces become hyphens, every other character that is neither
    alphanumeric nor a hyphen is dropped, the result is cut to *max_len*
    characters and trailing hyphens are removed.
    """
    kept = []
    for ch in text.lower():
        if ch == " ":
            kept.append("-")
        elif ch == "-" or ch.isalnum():
            kept.append(ch)
    truncated = "".join(kept)[:max_len]
    return truncated.rstrip("-")
def _redact_trace(result: dict) -> dict:
    """Return a deep copy of *result* with document content stripped out.

    The input is never modified. In the copy:

    * each tool call's ``result`` is parsed as JSON; for JSON objects a
      ``stdout`` entry is replaced by a length-only placeholder and the other
      keys are kept, anything else is replaced by ``"[redacted]"``;
    * ``trace["scratch_files"]`` is removed;
    * the content of ``system`` and ``tool`` messages is replaced by
      ``"[redacted]"``.
    """
    import copy

    sanitized = copy.deepcopy(result)
    trace = sanitized.get("trace", {})

    for tc in trace.get("tool_calls", []):
        try:
            tool_result = json.loads(tc.get("result", "{}"))
            if isinstance(tool_result, dict):
                if "stdout" in tool_result:
                    tool_result["stdout"] = f"[redacted — {len(tool_result['stdout'])} chars]"
                tc["result"] = json.dumps(tool_result)
            else:
                # Valid JSON that is not an object (e.g. a bare string) has no
                # field structure to redact selectively and may itself be
                # document text, so drop it entirely.
                tc["result"] = "[redacted]"
        except (json.JSONDecodeError, TypeError):
            # Unparseable or non-string results are redacted wholesale.
            tc["result"] = "[redacted]"

    trace.pop("scratch_files", None)

    for msg in trace.get("messages", []):
        if msg.get("role") in ("system", "tool"):
            msg["content"] = "[redacted]"
    return sanitized
def upload_trace(result: dict) -> None:
    """Upload a redacted copy of *result* to the traces dataset repository.

    Does nothing when no Hub client or no traces repository is configured.
    The file is named after the current UTC timestamp, followed by a slug of
    the question when that slug is non-empty. Upload failures are printed as
    a warning and never raised.
    """
    if not (hf_api and HF_TRACES_REPO):
        return

    redacted = _redact_trace(result)
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S-%f")
    slug = _slugify(result.get("question", ""))
    if slug:
        filename = f"{stamp}_{slug}.json"
    else:
        filename = f"{stamp}.json"
    payload = json.dumps(redacted, indent=2, default=str).encode()

    try:
        hf_api.upload_file(
            path_or_fileobj=payload,
            path_in_repo=filename,
            repo_id=HF_TRACES_REPO,
            repo_type="dataset",
        )
    except Exception as exc:
        print(f"WARNING: trace upload failed: {exc}")
# Upload limits enforced by validate_and_save_files().
ALLOWED_EXTENSIONS = {".txt", ".csv", ".md", ".json", ".pdf"}
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10 MB per file
MAX_TOTAL_SIZE = 50 * 1024 * 1024  # 50 MB total
MAX_FILE_COUNT = 50


def validate_and_save_files(file_paths: list[str]) -> tuple[Path | None, list[str]]:
    """Validate uploaded files and copy the accepted ones into a new workspace.

    Args:
        file_paths: Paths of the uploaded files on the local filesystem.

    Returns:
        ``(workspace, errors)``. ``workspace`` is a freshly created temp
        directory holding a copy of every accepted file (stored under its
        base name), or ``None`` when nothing was accepted or a hard limit
        (file count, total size) was exceeded. ``errors`` lists human-readable
        problems; it may be non-empty even when a workspace is returned, in
        which case the entries describe the files that were skipped.

    NOTE(review): the paths are supplied by the API caller (see api_upload)
    and are read as given, so any readable server file with an allowed
    extension can be copied into a workspace. Consider restricting them to
    the upload directory.
    """
    errors: list[str] = []
    if len(file_paths) > MAX_FILE_COUNT:
        errors.append(f"Too many files ({len(file_paths)}). Maximum is {MAX_FILE_COUNT}.")
        return None, errors

    valid_files: list[Path] = []
    seen_names: set[str] = set()
    total_size = 0
    for file_path in file_paths:
        # Entries come from decoded JSON; anything that is not path-like is
        # skipped the same way a non-existent file is.
        if not isinstance(file_path, (str, os.PathLike)):
            continue
        src = Path(file_path)
        if not src.is_file():
            continue
        if src.suffix.lower() not in ALLOWED_EXTENSIONS:
            allowed = ", ".join(sorted(ALLOWED_EXTENSIONS))
            errors.append(f"'{src.name}' has an unsupported file type. Allowed: {allowed}")
            continue
        # Files are stored by base name, so a second file with the same name
        # would silently overwrite the first one in the workspace.
        if src.name in seen_names:
            errors.append(f"'{src.name}' was uploaded more than once. Only the first copy was kept.")
            continue
        size = src.stat().st_size
        if size > MAX_FILE_SIZE:
            limit_mb = MAX_FILE_SIZE // (1024 * 1024)
            errors.append(f"'{src.name}' is too large ({size // (1024 * 1024)}MB). Maximum is {limit_mb}MB per file.")
            continue
        total_size += size
        if total_size > MAX_TOTAL_SIZE:
            limit_mb = MAX_TOTAL_SIZE // (1024 * 1024)
            errors.append(f"Total upload size exceeds {limit_mb}MB. Remove some files and try again.")
            return None, errors
        seen_names.add(src.name)
        valid_files.append(src)

    if not valid_files:
        if not errors:
            errors.append("No valid files to upload.")
        return None, errors

    workspace = Path(tempfile.mkdtemp(prefix="lh-workspace-"))
    for src in valid_files:
        (workspace / src.name).write_bytes(src.read_bytes())
    return workspace, errors
def format_stats(trace) -> str:
    """Format trace statistics as a single display line.

    Accepts either a trace object exposing the attributes ``cached_tokens``,
    ``model``, ``prompt_tokens``, ``completion_tokens``, ``tool_calls``,
    ``wall_time_s`` and ``cost``, or a dict with the same keys. In a dict,
    missing keys and ``None`` values (e.g. JSON ``null`` in a stored trace)
    are treated as zero/empty.

    The line lists the model name (text after the last ``/``), total tokens
    with an optional cached count, the number of tool calls, the wall time
    and, when non-zero, the cost, joined by `` · ``.
    """
    fields = (
        "cached_tokens",
        "model",
        "prompt_tokens",
        "completion_tokens",
        "tool_calls",
        "wall_time_s",
        "cost",
    )
    if isinstance(trace, dict):
        values = {name: trace.get(name) for name in fields}
    else:
        values = {name: getattr(trace, name) for name in fields}

    # "or" fallbacks turn None into a value the formatting below can handle.
    cached = values["cached_tokens"] or 0
    model = values["model"] or ""
    prompt = values["prompt_tokens"] or 0
    completion = values["completion_tokens"] or 0
    tool_calls = values["tool_calls"] or []
    wall = values["wall_time_s"] or 0
    cost = values["cost"]

    cache_str = f" ({cached} cached)" if cached else ""
    model_name = model.split("/")[-1] if model else ""
    parts = [
        model_name,
        f"{prompt + completion:,} tokens{cache_str}",
        f"{len(tool_calls)} tool calls",
        f"{wall:.1f}s",
    ]
    if cost:
        parts.append(f"${cost:.4f}")
    return " · ".join(parts)
| # --------------------------------------------------------------------------- | |
| # Streaming question handler | |
| # --------------------------------------------------------------------------- | |
def stream_question(
    question: str,
    session_id: str,
    token: str,
) -> Generator[str, None, None]:
    """Answer *question* against a session's documents, streaming JSON events.

    Every yielded item is a JSON-encoded object with a ``type`` field:

    * ``error``: the request was rejected or processing failed; nothing
      further is yielded.
    * ``delta``: a chunk of answer text (``content``).
    * ``tool_call``: the agent invoked a tool (running ``count`` and ``name``).
    * ``done``: final ``answer``, ``sources``, ``stats``, ``trace_html`` and
      the accumulated ``session_cost``.

    Requests are validated in this order: a supplied-but-wrong token, missing
    model configuration, unknown session, exhausted session budget, and only
    then the daily free allowance. The free allowance is checked last so that
    a request rejected for any other reason does not use up one of the
    day's free questions.

    Args:
        question: The user's question.
        session_id: Id returned by the upload endpoint.
        token: Access token; empty for free (unauthenticated) use.
    """
    authenticated = _has_valid_token(token)
    if token and not authenticated:
        yield json.dumps({"type": "error", "error": "Invalid access token."})
        return
    if not MODEL:
        yield json.dumps({"type": "error", "error": "LH_MODEL not set."})
        return
    session = _get_session(session_id)
    if not session:
        yield json.dumps({"type": "error", "error": "Invalid session. Please re-upload your documents."})
        return
    if session["cost"] >= MAX_SESSION_COST:
        yield json.dumps({
            "type": "error",
            "error": f"Session cost limit reached (${session['cost']:.2f} / ${MAX_SESSION_COST:.2f}).",
        })
        return
    # Consumes a free slot as a side effect, so it must be the last gate.
    if not authenticated and not _is_free_question_allowed():
        yield json.dumps({"type": "error", "error": "Daily free limit reached. Enter an access token for unlimited use."})
        return

    workspace = Path(session["workspace"])
    scratch_dir = Path(session["scratch"])
    system_prompt = build_system_prompt(base_prompt=BASE_PROMPT, workspace=workspace)
    messages: list[Message] = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": question},
    ]
    start = time.monotonic()
    agent_run = run_agent_loop(
        model=MODEL,
        messages=messages,
        tools=TOOL_DEFINITIONS,
        completion=litellm.completion,
        workspace=workspace,
        scratch_dir=scratch_dir,
        sandbox_fn=e2b_run_python,
        stream=True,
    )
    tool_call_count = 0
    try:
        for event in agent_run:
            if isinstance(event, TextDeltaEvent):
                yield json.dumps({"type": "delta", "content": event.content})
            elif isinstance(event, ToolCallEvent):
                tool_call_count += 1
                yield json.dumps({"type": "tool_call", "count": tool_call_count, "name": event.name})
    except Exception as exc:
        # The client gets a generic message; details go to the server log only.
        yield json.dumps({"type": "error", "error": "An error occurred during processing."})
        print(f"ERROR in stream_question: {exc}")
        return

    trace = agent_run.trace
    trace.wall_time_s = round(time.monotonic() - start, 2)
    # Update server-side session cost
    session["cost"] += trace.cost or 0
    clean_answer, sources = process_citations(trace.answer or "", workspace)
    result = {
        "question": question,
        "source": SOURCE,
        "passed": True,
        "assertions": {},
        "trace": asdict(trace),
        "citations": sources,
    }
    upload_trace(result)
    trace_html = render_trace(result, max_chars=2000)
    yield json.dumps({
        "type": "done",
        "answer": clean_answer,
        "sources": sources,
        "stats": format_stats(trace),
        "trace_html": trace_html,
        "session_cost": session["cost"],
    })
| # --------------------------------------------------------------------------- | |
| # Gradio app (API endpoints only) | |
| # --------------------------------------------------------------------------- | |
def build_app() -> gr.Blocks:
    """Build the Gradio Blocks app that exposes the backend's API endpoints.

    All components are hidden; the app exists to register these named
    endpoints, each of which exchanges JSON strings:

    * ``ask``: streams the events produced by ``stream_question``.
    * ``upload``: validates uploaded files and creates a session.
    * ``doc``: returns the text of one document in a session workspace.
    * ``traces``: lists stored trace files, optionally filtered.
    * ``replay``: loads one stored trace for display.

    Returns:
        The configured ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks(title="Document Explorer") as demo:
        gr.Markdown(
            "# Document Explorer API\n\nThis Space provides the API backend. "
            "Visit [appsimple.io/explore](https://appsimple.io/explore) for the full interface."
        )

        # Streaming ask endpoint — takes question, session_id, token
        ask_inputs = [
            gr.Textbox(visible=False),  # question
            gr.Textbox(visible=False),  # session_id
            gr.Textbox(visible=False),  # token
        ]
        ask_output = gr.Textbox(visible=False)

        def api_ask_stream(question, session_id, token):
            for event_json in stream_question(question, session_id, token):
                yield event_json

        ask_btn = gr.Button(visible=False)
        ask_btn.click(api_ask_stream, inputs=ask_inputs, outputs=ask_output, api_name="ask")

        # Upload endpoint — accepts files, creates workspace and session
        upload_input = gr.Textbox(visible=False)
        upload_output = gr.Textbox(visible=False)

        def api_upload(payload):
            try:
                data = json.loads(payload)
                token = data.get("token", "")
                file_paths = data.get("paths", [])
            except (json.JSONDecodeError, AttributeError, TypeError):
                # TypeError: payload was not a string at all (e.g. None).
                return json.dumps({"error": "Invalid upload request."})
            if token and not _has_valid_token(token):
                return json.dumps({"error": "Invalid access token."})
            if not file_paths:
                return json.dumps({"error": "No files selected. Please upload at least one document."})
            if not isinstance(file_paths, list):
                # A bare string would otherwise be iterated character by character.
                return json.dumps({"error": "Invalid upload request."})
            workspace, errors = validate_and_save_files(file_paths)
            if not workspace:
                return json.dumps({"error": " ".join(errors)})
            session_id = _create_session(str(workspace))
            saved = sum(1 for f in workspace.iterdir() if f.is_file())
            result = {"session_id": session_id, "file_count": saved}
            if errors:
                result["warnings"] = errors
            return json.dumps(result)

        upload_btn = gr.Button(visible=False)
        upload_btn.click(api_upload, inputs=upload_input, outputs=upload_output, api_name="upload")

        # Document viewer endpoint — uses session_id for path lookup
        doc_input = gr.Textbox(visible=False)
        doc_session_input = gr.Textbox(visible=False)
        doc_output = gr.Textbox(visible=False)

        def api_get_doc(filename, session_id):
            session = _get_session(session_id)
            if not session or not filename:
                return json.dumps({"error": "not found"})
            # Keep only the final path component so the lookup cannot leave
            # the session workspace.
            safe_name = Path(filename).name
            workspace = Path(session["workspace"])
            filepath = workspace / safe_name
            if not filepath.is_file():
                return json.dumps({"error": "not found"})
            # Workspaces may hold binary files (.pdf is an accepted upload
            # type); decode leniently so such a file cannot crash the endpoint.
            content = filepath.read_text(encoding="utf-8", errors="replace")
            return json.dumps({"filename": safe_name, "content": content})

        doc_btn = gr.Button(visible=False)
        doc_btn.click(api_get_doc, inputs=[doc_input, doc_session_input], outputs=doc_output, api_name="doc")

        # NOTE(review): the two trace endpoints below take no token, so anyone
        # who can reach the API can list and replay stored traces. ADMIN_TOKEN
        # is read from the environment but not checked here; confirm whether
        # that is intended.

        # Trace list endpoint
        traces_input = gr.Textbox(visible=False)
        traces_output = gr.Textbox(visible=False)

        def api_list_traces(query):
            if not hf_api or not HF_TRACES_REPO:
                return json.dumps({"error": "traces not configured"})
            try:
                files = hf_api.list_repo_files(
                    repo_id=HF_TRACES_REPO, repo_type="dataset"
                )
                # File names start with a timestamp, so reverse order is newest first.
                traces = sorted(
                    [f for f in files if f.endswith(".json")], reverse=True
                )
                if query:
                    traces = [f for f in traces if query.lower() in f.lower()]
                return json.dumps({"traces": traces[:100]})
            except Exception as exc:
                return json.dumps({"error": str(exc)})

        traces_btn = gr.Button(visible=False)
        traces_btn.click(api_list_traces, inputs=traces_input, outputs=traces_output, api_name="traces")

        # Trace replay endpoint
        replay_input = gr.Textbox(visible=False)
        replay_output = gr.Textbox(visible=False)

        def api_get_trace(filename):
            if not hf_api or not HF_TRACES_REPO or not filename:
                return json.dumps({"error": "not found"})
            safe_name = Path(filename).name
            try:
                path = hf_api.hf_hub_download(
                    HF_TRACES_REPO, safe_name, repo_type="dataset"
                )
                data = json.loads(Path(path).read_text())
                trace = data.get("trace", {})
                raw_answer = trace.get("answer", "")
                clean_answer, sources = process_citations(raw_answer, None)
                trace_html = render_trace(data, max_chars=2000)
                return json.dumps({
                    "question": data.get("question", ""),
                    "answer": clean_answer,
                    "sources": sources,
                    "stats": format_stats(trace),
                    "source_tag": data.get("source", ""),
                    "trace_html": trace_html,
                    "filename": safe_name,
                })
            except Exception as exc:
                return json.dumps({"error": str(exc)})

        replay_btn = gr.Button(visible=False)
        replay_btn.click(api_get_trace, inputs=replay_input, outputs=replay_output, api_name="replay")

    return demo
if __name__ == "__main__":
    # Without an access token every caller is treated as authenticated, so
    # make that visible at startup.
    if not ACCESS_TOKEN:
        print("WARNING: LH_ACCESS_TOKEN not set — app is unprotected")
    app = build_app()
    # Listen on all interfaces, port 7860.
    app.launch(server_name="0.0.0.0", server_port=7860)