""" SIMPLIFIED VERSION - No monkey patching, just clean type hints """ import os import re import shutil import traceback import gradio as gr from pathlib import Path from typing import List, Dict, Any, Tuple, Optional from histopath.agent import A1 from dotenv import load_dotenv # Load environment if os.path.exists(".env"): load_dotenv() PASSCODE = os.getenv("GRADIO_PASSWORD", "TESTING") agent = None def check_for_output_files(): """Check output directory for files.""" output_dir = Path("./output") if not output_dir.exists(): return [], [] images = [str(f) for f in output_dir.glob("*") if f.suffix.lower() in {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff"}] data = [str(f) for f in output_dir.glob("*") if f.suffix.lower() in {".csv", ".txt", ".json", ".npy"}] return images, data def preview_file(file): """Preview uploaded file.""" if file is None: return None, None, "No file" path = Path(file.name) if path.suffix.lower() in {".png", ".jpg", ".jpeg", ".svg", ".tif", ".tiff", ".svs"}: return file.name, None, f"Preview: {path.name}" else: size = path.stat().st_size / 1024 return None, file.name, f"File: {path.name} ({size:.1f} KB)" def parse_output(text): """Parse agent output.""" text = re.sub(r'={30,}.*?={30,}', '', text, flags=re.DOTALL).strip() result = {"type": "text", "content": text, "code": None, "obs": None, "think": None} code_match = re.search(r'(.*?)', text, re.DOTALL) if code_match: result["type"] = "code" result["code"] = code_match.group(1).strip() before = text[:code_match.start()].strip() before = re.sub(r'(.*?)', r'\1', before, flags=re.DOTALL) result["think"] = before or None return result obs_match = re.search(r'(.*?)', text, re.DOTALL) if obs_match: result["type"] = "obs" result["obs"] = obs_match.group(1).strip() before = text[:obs_match.start()].strip() before = re.sub(r'(.*?)', r'\1', before, flags=re.DOTALL) result["think"] = before or None return result sol_match = re.search(r'(.*?)', text, re.DOTALL) if sol_match: result["type"] = "solution" result["content"] = sol_match.group(1).strip() before = text[:sol_match.start()].strip() before = re.sub(r'(.*?)', r'\1', before, flags=re.DOTALL) result["think"] = before or None return result result["content"] = re.sub(r'(.*?)', r'\1', text, flags=re.DOTALL) return result def format_display(parsed): """Format for display.""" parts = [] if parsed.get("think"): parts.append(parsed["think"]) if parsed["type"] == "code": if parsed.get("think"): parts.append("\n---\n") parts.append("### 💻 Code\n") parts.append(f"```python\n{parsed['code']}\n```") elif parsed["type"] == "obs": if parsed.get("think"): parts.append("\n---\n") parts.append("### 📊 Output\n") parts.append(f"```\n{parsed['obs']}\n```") elif parsed["type"] == "solution": if parsed.get("think"): parts.append("\n---\n") parts.append("### ✅ Solution\n") parts.append(parsed['content']) else: if not parsed.get("think"): parts.append(parsed["content"]) return "\n\n".join(parts) # CRITICAL: Simple types only def process_query(prompt: str, file: Any, history: List[Dict[str, str]]): """Process user query - SIMPLE TYPES ONLY.""" global agent # Initialize history if None if history is None: history = [] # Check agent if agent is None: history.append({"role": "assistant", "content": "⚠️ Enter passcode first"}) yield history, None, None, None, None, "Not initialized" return # Check input if not prompt.strip() and file is None: history.append({"role": "assistant", "content": "⚠️ Provide prompt or file"}) yield history, None, None, None, None, "No input" return # Handle file if file is not None: try: Path("./data").mkdir(exist_ok=True) fname = Path(file.name).name fpath = Path("./data") / fname shutil.copy(file.name, fpath) prompt = f"{prompt}\n\nFile: {fpath}" if prompt.strip() else f"File at: {fpath}" except Exception as e: history.append({"role": "assistant", "content": f"❌ File error: {e}"}) yield history, None, None, None, None, str(e) return # Add user message history.append({"role": "user", "content": prompt}) yield history, None, None, None, None, "Processing..." # Run agent try: outputs = [] for step in agent.go_stream(prompt): outputs.append(step.get("output", "")) # Format outputs for out in outputs: if not out.strip(): continue parsed = parse_output(out) msg = format_display(parsed) if msg.strip(): history.append({"role": "assistant", "content": msg}) # Get results imgs, data = check_for_output_files() yield history, imgs, data, None, None, f"✅ Done ({len(outputs)} steps)" except Exception as e: err = f"❌ Error:\n```\n{traceback.format_exc()}\n```" history.append({"role": "assistant", "content": err}) yield history, None, None, None, None, str(e) def validate_pass(pwd: str): """Validate passcode.""" global agent if pwd == PASSCODE: try: agent = A1( path="./data", llm="claude-sonnet-4-20250514", source="Anthropic", use_tool_retriever=True, timeout_seconds=600 ) return gr.update(visible=False), gr.update(visible=True), "✅ Authenticated" except Exception as e: return gr.update(visible=True), gr.update(visible=False), f"❌ Init error: {e}" else: return gr.update(visible=True), gr.update(visible=False), "❌ Invalid passcode" def clear_all(): """Clear everything.""" out_dir = Path("./output") if out_dir.exists(): for f in out_dir.iterdir(): if f.is_file(): f.unlink() return [], None, None, None, None, "Cleared" # UI with gr.Blocks(title="HistoPath") as demo: gr.HTML("

🔬 HistoPath Agent

") with gr.Group(visible=True) as pass_section: gr.Markdown("### 🔐 Enter Passcode") with gr.Row(): pass_input = gr.Textbox(label="Passcode", type="password", scale=3) pass_btn = gr.Button("Unlock", variant="primary", scale=1) pass_status = gr.Textbox(label="Status", interactive=False) with gr.Group(visible=False) as main_section: with gr.Row(): with gr.Column(scale=3): chat = gr.Chatbot(type="messages", height=500) with gr.Row(): msg = gr.Textbox(label="Query", placeholder="Enter query...", scale=4) upload = gr.File(label="Upload", scale=1) with gr.Row(): send = gr.Button("Send", variant="primary", scale=2) clear = gr.Button("Clear", scale=1) status = gr.Textbox(value="Ready", interactive=False, show_label=False) with gr.Column(scale=2): with gr.Tabs(): with gr.Tab("Input"): in_img = gr.Image(height=300) in_file = gr.File(interactive=False) in_stat = gr.Textbox(value="No file", interactive=False, show_label=False) with gr.Tab("Images"): out_imgs = gr.Gallery(height=500) with gr.Tab("Data"): out_data = gr.File(file_count="multiple", interactive=False) # Events pass_btn.click(validate_pass, [pass_input], [pass_section, main_section, pass_status]) upload.change(preview_file, [upload], [in_img, in_file, in_stat]) send.click(process_query, [msg, upload, chat], [chat, out_imgs, out_data, in_img, in_file, status]) clear.click(clear_all, None, [chat, out_imgs, out_data, in_img, in_file, status]) msg.submit(process_query, [msg, upload, chat], [chat, out_imgs, out_data, in_img, in_file, status]) if __name__ == "__main__": Path("./data").mkdir(exist_ok=True) Path("./output").mkdir(exist_ok=True) print("=" * 50) print("🔬 HistoPath Agent - Simplified") print("=" * 50) demo.launch()