Spaces:
Sleeping
Sleeping
feat: Unified pipeline lifecycle - single S3 file per pipeline - Pipeline now stored in S3 with pipeline_id at creation - Proposals stored in S3, only reference in MongoDB - Results appended to same S3 file on completion - Workflow save uses pipeline_id, works at any stage - pipelines_history now in session API response
aaaba76
| # # app.py - MasterLLM v2.0 with Bedrock Fallback System | |
| # """ | |
| # MasterLLM Pipeline Orchestrator v2.0 | |
| # - Bedrock (priority) + Gemini (fallback) for pipeline generation | |
| # - Bedrock LangChain (priority) + CrewAI (fallback) for execution | |
| # - MongoDB session management | |
| # - Complete REST API | |
| # - Gradio UI with fancy displays | |
| # """ | |
| # import os | |
| # import json | |
| # import uuid | |
| # from datetime import datetime | |
| # from typing import List, Optional | |
| # import gradio as gr | |
| # from fastapi import FastAPI | |
| # from fastapi.middleware.cors import CORSMiddleware | |
| # from contextlib import asynccontextmanager | |
| # import asyncio | |
| # # Import our new services | |
| # from services.pipeline_generator import generate_pipeline, format_pipeline_for_display | |
| # from services.pipeline_executor import execute_pipeline_streaming | |
| # from services.session_manager import session_manager | |
| # from api_routes import router as api_router | |
| # # ======================== | |
| # # BACKGROUND CLEANUP TASK | |
| # # ======================== | |
| # async def periodic_cleanup(): | |
| # """Cleanup old sessions every hour""" | |
| # while True: | |
| # await asyncio.sleep(3600) # Run every hour | |
| # try: | |
| # removed = session_manager.cleanup_old_sessions(max_age_hours=24) | |
| # if removed > 0: | |
| # print(f"π§Ή Cleaned up {removed} inactive sessions") | |
| # except Exception as e: | |
| # print(f"β οΈ Cleanup error: {e}") | |
| # @asynccontextmanager | |
| # async def lifespan(app: FastAPI): | |
| # """Manage application lifecycle""" | |
| # # Startup | |
| # print("π Starting MasterLLM v2.0...") | |
| # task = asyncio.create_task(periodic_cleanup()) | |
| # yield | |
| # # Shutdown | |
| # task.cancel() | |
| # session_manager.close() | |
| # print("π MasterLLM shut down gracefully") | |
| # # ======================== | |
| # # FASTAPI APP | |
| # # ======================== | |
| # app = FastAPI( | |
| # title="MasterLLM v2.0 - AI Pipeline Orchestrator", | |
| # description="Bedrock + Gemini fallback system with MongoDB sessions", | |
| # version="2.0.0", | |
| # lifespan=lifespan | |
| # ) | |
| # # CORS Configuration | |
| # app.add_middleware( | |
| # CORSMiddleware, | |
| # allow_origins=[os.getenv("FRONTEND_ORIGIN", "http://localhost:3000")], | |
| # allow_credentials=True, | |
| # allow_methods=["*"], | |
| # allow_headers=["*"], | |
| # ) | |
| # # Mount API routes | |
| # app.include_router(api_router) | |
| # # ======================== | |
| # # CONVERSATION STATE | |
| # # ======================== | |
| # class ConversationState: | |
| # INITIAL = "initial" | |
| # PIPELINE_PROPOSED = "pipeline_proposed" | |
| # PIPELINE_APPROVED = "pipeline_approved" | |
| # EXECUTING = "executing" | |
| # COMPLETED = "completed" | |
| # ERROR = "error" | |
| # # ======================== | |
| # # GRADIO UI HANDLERS | |
| # # ======================== | |
| # def create_new_session(): | |
| # """Create a new session""" | |
| # return session_manager.create_session() | |
| # def handle_file_upload(file_path, session_id): | |
| # """Handle file upload""" | |
| # if not file_path: | |
| # return None, json.dumps({ | |
| # "status": "error", | |
| # "message": "No file uploaded" | |
| # }, indent=2), session_id | |
| # if not session_id: | |
| # session_id = create_new_session() | |
| # file_name = os.path.basename(file_path) | |
| # # Update session | |
| # session_manager.update_session(session_id, { | |
| # "current_file": file_path, | |
| # "state": ConversationState.INITIAL | |
| # }) | |
| # # Add system message | |
| # session_manager.add_message( | |
| # session_id, | |
| # "system", | |
| # f"File uploaded: {file_name}" | |
| # ) | |
| # status = { | |
| # "status": "success", | |
| # "message": f"File '{file_name}' uploaded successfully", | |
| # "file_info": { | |
| # "name": file_name, | |
| # "path": file_path, | |
| # "size_bytes": os.path.getsize(file_path) if os.path.exists(file_path) else 0 | |
| # }, | |
| # "next_action": "π¬ Now tell me what you'd like to do with this document" | |
| # } | |
| # return file_path, json.dumps(status, indent=2), session_id | |
| # def format_chat_history(history, new_user_msg, new_assistant_msg): | |
| # """ | |
| # Convert chat history to new Gradio format (list of dicts with role/content) | |
| # Handles both old format (tuples) and new format (dicts) | |
| # """ | |
| # messages = [] | |
| # # Handle existing history - could be in old or new format | |
| # if history: | |
| # # Check if already in new format (list of dicts with 'role' and 'content') | |
| # if isinstance(history[0], dict) and 'role' in history[0]: | |
| # # Already in new format, just copy it | |
| # messages = list(history) | |
| # else: | |
| # # Old format (list of tuples), convert it | |
| # for item in history: | |
| # if isinstance(item, (list, tuple)) and len(item) == 2: | |
| # user_msg, bot_msg = item | |
| # messages.append({"role": "user", "content": user_msg}) | |
| # messages.append({"role": "assistant", "content": bot_msg}) | |
| # # Add new messages | |
| # messages.append({"role": "user", "content": new_user_msg}) | |
| # messages.append({"role": "assistant", "content": new_assistant_msg}) | |
| # return messages | |
| # def chatbot_response_streaming(message: str, history: List, session_id: str, file_path: str = None): | |
| # """ | |
| # Handle chat messages with streaming updates | |
| # Uses Bedrock (priority) β Gemini (fallback) for both generation and execution | |
| # """ | |
| # # Get or create session | |
| # session = session_manager.get_session(session_id) | |
| # if not session: | |
| # session_id = create_new_session() | |
| # session = session_manager.get_session(session_id) | |
| # # Update file path if provided | |
| # if file_path: | |
| # session_manager.update_session(session_id, {"current_file": file_path}) | |
| # session = session_manager.get_session(session_id) | |
| # # Add user message to session | |
| # session_manager.add_message(session_id, "user", message) | |
| # current_state = session.get("state", ConversationState.INITIAL) | |
| # # ======================== | |
| # # STATE: INITIAL - Generate Pipeline | |
| # # ======================== | |
| # if current_state == ConversationState.INITIAL: | |
| # # Check if file is uploaded | |
| # if not session.get("current_file"): | |
| # response = { | |
| # "status": "error", | |
| # "message": "Please upload a document first", | |
| # "action": "π Click 'Upload Document' to begin" | |
| # } | |
| # response_text = f"```json\n{json.dumps(response, indent=2)}\n```" | |
| # session_manager.add_message(session_id, "assistant", response_text) | |
| # yield format_chat_history(history, message, response_text) | |
| # return | |
| # try: | |
| # # Generate pipeline using Bedrock β Gemini fallback | |
| # yield format_chat_history(history, message, "π€ Generating pipeline with AI...\nβ³ Trying Bedrock first...") | |
| # pipeline = generate_pipeline( | |
| # user_input=message, | |
| # file_path=session.get("current_file"), | |
| # prefer_bedrock=True | |
| # ) | |
| # # Save proposed pipeline to session | |
| # session_manager.update_session(session_id, { | |
| # "proposed_pipeline": pipeline, | |
| # "state": ConversationState.PIPELINE_PROPOSED | |
| # }) | |
| # # Format for display | |
| # formatted_display = format_pipeline_for_display(pipeline) | |
| # # Create response with both fancy display and JSON | |
| # response_text = formatted_display + f"\n\n```json\n{json.dumps(pipeline, indent=2)}\n```" | |
| # session_manager.add_message(session_id, "assistant", response_text) | |
| # yield format_chat_history(history, message, response_text) | |
| # return | |
| # except Exception as e: | |
| # error_response = { | |
| # "status": "error", | |
| # "message": "Failed to generate pipeline", | |
| # "error": str(e), | |
| # "action": "Please try rephrasing your request" | |
| # } | |
| # response_text = f"```json\n{json.dumps(error_response, indent=2)}\n```" | |
| # session_manager.add_message(session_id, "assistant", response_text) | |
| # yield format_chat_history(history, message, response_text) | |
| # return | |
| # # ======================== | |
| # # STATE: PIPELINE_PROPOSED - Handle Approval/Rejection | |
| # # ======================== | |
| # elif current_state == ConversationState.PIPELINE_PROPOSED: | |
| # user_input = message.lower().strip() | |
| # # APPROVE - Execute the pipeline | |
| # if "approve" in user_input or "yes" in user_input: | |
| # session_manager.update_session(session_id, {"state": ConversationState.EXECUTING}) | |
| # plan = session.get("proposed_pipeline", {}) | |
| # # Initial status | |
| # initial_status = { | |
| # "status": "executing", | |
| # "message": "π Starting pipeline execution...", | |
| # "pipeline": plan.get("pipeline_name", "unknown"), | |
| # "executor": "Attempting Bedrock LangChain first", | |
| # "steps": [] | |
| # } | |
| # accumulated_response = f"```json\n{json.dumps(initial_status, indent=2)}\n```" | |
| # yield format_chat_history(history, message, accumulated_response) | |
| # steps_completed = [] | |
| # final_payload = None | |
| # executor_used = "unknown" | |
| # try: | |
| # # Execute pipeline with Bedrock β CrewAI fallback | |
| # for event in execute_pipeline_streaming( | |
| # pipeline=plan, | |
| # file_path=session.get("current_file"), | |
| # session_id=session_id, | |
| # prefer_bedrock=True | |
| # ): | |
| # event_type = event.get("type") | |
| # # Info events (fallback notifications, etc.) | |
| # if event_type == "info": | |
| # info_status = { | |
| # "status": "info", | |
| # "message": event.get("message"), | |
| # "executor": event.get("executor", "unknown") | |
| # } | |
| # accumulated_response = f"```json\n{json.dumps(info_status, indent=2)}\n```" | |
| # yield format_chat_history(history, message, accumulated_response) | |
| # # Step updates | |
| # elif event_type == "step": | |
| # step_info = { | |
| # "step": event.get("step", 0), | |
| # "tool": event.get("tool", "processing"), | |
| # "status": event.get("status", "running"), | |
| # "executor": event.get("executor", "unknown") | |
| # } | |
| # # Add observation if available (tool output) | |
| # if "observation" in event: | |
| # step_info["observation"] = event.get("observation") | |
| # # Add tool input if available | |
| # if "input" in event: | |
| # step_info["input"] = event.get("input") | |
| # steps_completed.append(step_info) | |
| # executor_used = event.get("executor", executor_used) | |
| # # Create more informative status message | |
| # status_message = f"π Step {event.get('step', 0)}: {event.get('tool', 'processing')}" | |
| # if event.get('status') == 'completed' and 'observation' in event: | |
| # obs_preview = str(event.get('observation'))[:100] | |
| # status_message += f" β \n Output: {obs_preview}..." | |
| # elif event.get('status') == 'executing': | |
| # status_message += " β³" | |
| # progress_status = { | |
| # "status": "executing", | |
| # "message": status_message, | |
| # "pipeline": plan.get("pipeline_name", ""), | |
| # "executor": executor_used, | |
| # "current_step": step_info, | |
| # "steps_completed": steps_completed | |
| # } | |
| # accumulated_response = f"```json\n{json.dumps(progress_status, indent=2)}\n```" | |
| # yield format_chat_history(history, message, accumulated_response) | |
| # # Final result | |
| # elif event_type == "final": | |
| # final_payload = event.get("data") | |
| # executor_used = event.get("executor", executor_used) | |
| # # Error | |
| # elif event_type == "error": | |
| # error_result = { | |
| # "status": "failed", | |
| # "error": event.get("error"), | |
| # "steps_completed": steps_completed, | |
| # "executor": event.get("executor", "unknown") | |
| # } | |
| # final_response = f"```json\n{json.dumps(error_result, indent=2)}\n```" | |
| # session_manager.update_session(session_id, {"state": ConversationState.INITIAL}) | |
| # session_manager.add_message(session_id, "assistant", final_response) | |
| # yield format_chat_history(history, message, final_response) | |
| # return | |
| # # Process final result | |
| # if final_payload: | |
| # session_manager.update_session(session_id, { | |
| # "pipeline_result": final_payload, | |
| # "state": ConversationState.INITIAL | |
| # }) | |
| # # Save execution to MongoDB | |
| # session_manager.save_pipeline_execution( | |
| # session_id=session_id, | |
| # pipeline=plan, | |
| # result=final_payload, | |
| # file_path=session.get("current_file"), | |
| # executor=executor_used | |
| # ) | |
| # # Format final response | |
| # final_display = { | |
| # "status": "completed", | |
| # "executor": executor_used, | |
| # "pipeline": plan.get("pipeline_name"), | |
| # "result": final_payload, | |
| # "summary": { | |
| # "total_steps": len(steps_completed), | |
| # "completed_successfully": len([s for s in steps_completed if s.get("status") == "completed"]) | |
| # } | |
| # } | |
| # final_response = f"```json\n{json.dumps(final_display, indent=2)}\n```" | |
| # else: | |
| # final_response = f"```json\n{json.dumps({'status': 'completed', 'steps': steps_completed, 'executor': executor_used}, indent=2)}\n```" | |
| # session_manager.update_session(session_id, {"state": ConversationState.INITIAL}) | |
| # session_manager.add_message(session_id, "assistant", final_response) | |
| # yield format_chat_history(history, message, final_response) | |
| # return | |
| # except Exception as e: | |
| # error_result = { | |
| # "error": str(e), | |
| # "status": "failed", | |
| # "message": "Pipeline execution failed", | |
| # "steps_completed": steps_completed | |
| # } | |
| # final_response = f"```json\n{json.dumps(error_result, indent=2)}\n```" | |
| # session_manager.update_session(session_id, {"state": ConversationState.INITIAL}) | |
| # session_manager.add_message(session_id, "assistant", final_response) | |
| # yield format_chat_history(history, message, final_response) | |
| # return | |
| # # REJECT - Cancel the pipeline | |
| # elif "reject" in user_input or "no" in user_input: | |
| # session_manager.update_session(session_id, { | |
| # "state": ConversationState.INITIAL, | |
| # "proposed_pipeline": None | |
| # }) | |
| # response_data = { | |
| # "status": "rejected", | |
| # "message": "Pipeline rejected by user", | |
| # "action": "π¬ Please provide a new instruction" | |
| # } | |
| # response = f"```json\n{json.dumps(response_data, indent=2)}\n```" | |
| # session_manager.add_message(session_id, "assistant", response) | |
| # yield format_chat_history(history, message, response) | |
| # return | |
| # # EDIT - Request modifications | |
| # elif "edit" in user_input or "modify" in user_input: | |
| # current_pipeline = session.get("proposed_pipeline", {}) | |
| # edit_help = { | |
| # "status": "edit_mode", | |
| # "message": "To modify the plan, describe your changes", | |
| # "current_plan": current_pipeline, | |
| # "examples": [ | |
| # "Add summarization at the end", | |
| # "Remove table extraction", | |
| # "Only process pages 1-3", | |
| # "Translate to French instead of Spanish" | |
| # ], | |
| # "action": "Describe your changes, or say 'approve' to run as-is" | |
| # } | |
| # response = f"```json\n{json.dumps(edit_help, indent=2)}\n```" | |
| # session_manager.add_message(session_id, "assistant", response) | |
| # yield format_chat_history(history, message, response) | |
| # return | |
| # # Try to modify pipeline based on user input | |
| # else: | |
| # if len(message.strip()) > 5: | |
| # try: | |
| # original_plan = session.get("proposed_pipeline", {}) | |
| # edit_context = f"Original: {original_plan.get('pipeline_name')}. User wants: {message}" | |
| # # Generate new pipeline with modification | |
| # new_pipeline = generate_pipeline( | |
| # user_input=edit_context, | |
| # file_path=session.get("current_file"), | |
| # prefer_bedrock=True | |
| # ) | |
| # session_manager.update_session(session_id, { | |
| # "proposed_pipeline": new_pipeline, | |
| # "state": ConversationState.PIPELINE_PROPOSED | |
| # }) | |
| # formatted = format_pipeline_for_display(new_pipeline) | |
| # response = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```" | |
| # session_manager.add_message(session_id, "assistant", response) | |
| # yield format_chat_history(history, message, response) | |
| # return | |
| # except Exception as e: | |
| # error_response = { | |
| # "status": "edit_failed", | |
| # "error": str(e), | |
| # "message": "Could not modify the plan", | |
| # "action": "Try 'approve' to run as-is, or 'reject' to start over" | |
| # } | |
| # response = f"```json\n{json.dumps(error_response, indent=2)}\n```" | |
| # session_manager.add_message(session_id, "assistant", response) | |
| # yield format_chat_history(history, message, response) | |
| # return | |
| # # Default waiting message | |
| # response_data = { | |
| # "status": "waiting_for_confirmation", | |
| # "message": "Please type 'approve', 'reject', or describe changes", | |
| # "hint": "You can also say 'edit' for modification hints" | |
| # } | |
| # response = f"```json\n{json.dumps(response_data, indent=2)}\n```" | |
| # session_manager.add_message(session_id, "assistant", response) | |
| # yield format_chat_history(history, message, response) | |
| # return | |
| # # Default fallback | |
| # response = json.dumps({"status": "ready", "message": "Ready for your next instruction"}, indent=2) | |
| # session_manager.add_message(session_id, "assistant", response) | |
| # yield format_chat_history(history, message, response) | |
| # # ======================== | |
| # # GRADIO UI | |
| # # ======================== | |
| # # Simple Blocks initialization for HF Spaces compatibility (older Gradio version) | |
| # with gr.Blocks(title="MasterLLM v2.0 - AI Pipeline Orchestrator") as demo: | |
| # gr.Markdown(""" | |
| # # π€ MasterLLM v2.0 - AI Pipeline Orchestrator | |
| # **π Bedrock Priority** with Gemini Fallback | **πΎ MongoDB Sessions** | **π‘ Complete REST API** | |
| # Upload a document, describe what you want, and watch AI orchestrate the perfect pipeline! | |
| # """) | |
| # # State management | |
| # session_id_state = gr.State(value=create_new_session()) | |
| # file_state = gr.State(value=None) | |
| # with gr.Row(): | |
| # with gr.Column(scale=3): | |
| # # Chat interface - Gradio auto-detects format from data structure | |
| # chatbot = gr.Chatbot(label="Chat") | |
| # # Text input | |
| # msg = gr.Textbox( | |
| # placeholder="π¬ Type your instruction... (e.g., 'extract text from pages 1-5 and summarize')", | |
| # label="Your Message", | |
| # lines=2, | |
| # max_lines=4, | |
| # ) | |
| # with gr.Row(): | |
| # submit_btn = gr.Button("π Send", variant="primary", scale=2) | |
| # clear_btn = gr.Button("ποΈ Clear Chat", scale=1) | |
| # with gr.Column(scale=1): | |
| # # File upload section | |
| # gr.Markdown("### π Upload Document") | |
| # file_upload = gr.File( | |
| # label="PDF or Image", | |
| # file_types=[".pdf", ".png", ".jpg", ".jpeg", ".gif", ".bmp"], | |
| # type="filepath", | |
| # ) | |
| # upload_status = gr.Textbox( | |
| # label="π Upload Status", | |
| # interactive=False, | |
| # lines=10, | |
| # max_lines=15, | |
| # ) | |
| # # Session info | |
| # gr.Markdown("### π Session Info") | |
| # session_display = gr.Textbox( | |
| # label="Session ID", | |
| # interactive=False, | |
| # value=lambda: session_id_state.value[:8] + "...", | |
| # ) | |
| # # Examples | |
| # gr.Markdown("### π‘ Example Pipelines") | |
| # gr.Examples( | |
| # examples=[ | |
| # "extract text from pages 1-5", | |
| # "extract text and summarize", | |
| # "extract text, tables, and translate to Spanish", | |
| # "get tables from pages 2-4 and summarize", | |
| # "text-classify-ner from entire document", | |
| # "describe images and summarize findings", | |
| # "extract text, detect signatures and stamps", | |
| # ], | |
| # inputs=msg, | |
| # ) | |
| # # System info | |
| # gr.Markdown(""" | |
| # ### βΉοΈ System Features | |
| # - β **Bedrock** (Claude 3.5 Sonnet) priority | |
| # - β **Gemini** (gemini-2.0-flash) fallback | |
| # - β **MongoDB** session persistence | |
| # - β **Streaming** real-time updates | |
| # - β **Component-level** JSON output | |
| # - β **REST API** for integration | |
| # ### π Pipeline Flow: | |
| # 1. **Upload** your document | |
| # 2. **Describe** what you want | |
| # 3. **Review** AI-generated pipeline | |
| # 4. **Approve** to execute | |
| # 5. **Watch** streaming updates | |
| # 6. **Get** complete JSON results | |
| # """) | |
| # # Event handlers | |
| # file_upload.upload( | |
| # fn=handle_file_upload, | |
| # inputs=[file_upload, session_id_state], | |
| # outputs=[file_state, upload_status, session_id_state], | |
| # ) | |
| # msg.submit( | |
| # fn=chatbot_response_streaming, | |
| # inputs=[msg, chatbot, session_id_state, file_state], | |
| # outputs=[chatbot], | |
| # ).then( | |
| # lambda: "", | |
| # outputs=msg, | |
| # ) | |
| # submit_btn.click( | |
| # fn=chatbot_response_streaming, | |
| # inputs=[msg, chatbot, session_id_state, file_state], | |
| # outputs=[chatbot], | |
| # ).then( | |
| # lambda: "", | |
| # outputs=msg, | |
| # ) | |
| # clear_btn.click( | |
| # fn=lambda: ([], create_new_session(), None, None, "", ""), | |
| # outputs=[chatbot, session_id_state, file_state, file_upload, msg, upload_status], | |
| # ) | |
| # # Mount Gradio on FastAPI | |
| # app = gr.mount_gradio_app(app, demo, path="/") | |
| # # ======================== | |
| # # LAUNCH | |
| # # ======================== | |
| # if __name__ == "__main__": | |
| # import uvicorn | |
| # port = int(os.getenv("PORT", 7860)) | |
| # print(f""" | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # β β | |
| # β π MasterLLM v2.0 Starting... β | |
| # β β | |
| # β π Gradio UI: http://localhost:{port} β | |
| # β π‘ REST API: http://localhost:{port}/api/v1 β | |
| # β π API Docs: http://localhost:{port}/docs β | |
| # β β | |
| # β π Bedrock: Priority (Claude 3.5 Sonnet) β | |
| # β π Gemini: Fallback (gemini-2.0-flash) β | |
| # β πΎ MongoDB: Session management β | |
| # β β | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # """) | |
| # uvicorn.run(app, host="0.0.0.0", port=port) | |
| # app.py - MasterLLM v2.0 with Bedrock Fallback System | |
| """ | |
| MasterLLM Pipeline Orchestrator v2.0 | |
| - Bedrock (priority) + Gemini (fallback) for pipeline generation | |
| - Bedrock LangChain (priority) + CrewAI (fallback) for execution | |
| - MongoDB session management | |
| - Complete REST API | |
| - Gradio UI with fancy displays | |
| """ | |
| import os | |
| import json | |
| import uuid | |
| from datetime import datetime | |
| from typing import List, Optional | |
| import gradio as gr | |
| from fastapi import FastAPI | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from contextlib import asynccontextmanager | |
| import asyncio | |
| # Import our new services | |
| from services.pipeline_generator import generate_pipeline, format_pipeline_for_display | |
| from services.pipeline_executor import execute_pipeline_streaming | |
| from services.session_manager import session_manager | |
| from services.intent_classifier import intent_classifier | |
| from api_routes import router as api_router | |
| from api_routes_v2 import router as api_router_v2 | |
| # V3 Architecture managers | |
| from services.pipeline_manager import get_pipeline_manager | |
| from services.workflow_manager import get_workflow_manager | |
| # ======================== | |
| # BACKGROUND CLEANUP TASK | |
| # ======================== | |
async def periodic_cleanup():
    """Background loop that sweeps old sessions once per hour.

    Each iteration sleeps first, then calls
    ``session_manager.cleanup_old_sessions(max_age_hours=24)`` and prints how
    many sessions were removed when that count is positive. The loop has no
    exit condition of its own; it runs until the surrounding task is
    cancelled.
    """
    sweep_interval_seconds = 3600  # Run every hour
    while True:
        await asyncio.sleep(sweep_interval_seconds)
        try:
            purged = session_manager.cleanup_old_sessions(max_age_hours=24)
            if purged > 0:
                print(f"π§Ή Cleaned up {purged} inactive sessions")
        except Exception as exc:
            # A failed sweep is reported but must not end the background loop.
            print(f"β οΈ Cleanup error: {exc}")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application lifecycle.

    Startup: schedules ``periodic_cleanup`` as a background task.
    Shutdown: cancels that task, waits for the cancellation to complete,
    closes ``session_manager`` and prints a shutdown message.

    The function is wrapped with ``asynccontextmanager`` because the
    ``lifespan=`` argument of the application expects an async context
    manager factory; handing it a bare async generator function is a
    deprecated form.

    Args:
        app: The FastAPI application being started (not used in the body).
    """
    # Startup
    print("π Starting MasterLLM v2.0...")
    task = asyncio.create_task(periodic_cleanup())
    try:
        yield
    finally:
        # Shutdown - runs even if the application exits with an error, so the
        # background task and the session manager are always released.
        task.cancel()
        try:
            # Let the cancellation actually finish before closing the session
            # manager the task may still be referencing.
            await task
        except asyncio.CancelledError:
            pass
        session_manager.close()
        print("π MasterLLM shut down gracefully")
| # ======================== | |
| # FASTAPI APP | |
| # ======================== | |
# The ASGI application; startup/shutdown work is delegated to `lifespan`.
app = FastAPI(
    title="MasterLLM v2.0 - AI Pipeline Orchestrator",
    description="Bedrock + Gemini fallback system with MongoDB sessions",
    version="2.0.0",
    lifespan=lifespan,
)

# CORS Configuration
# FRONTEND_ORIGIN may name a single origin (as before) or several origins
# separated by commas; surrounding whitespace and empty entries are ignored.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        origin.strip()
        for origin in os.getenv("FRONTEND_ORIGIN", "http://localhost:3000").split(",")
        if origin.strip()
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount API routes
app.include_router(api_router)  # V1 API (legacy)
app.include_router(api_router_v2)  # V2 API (enhanced with intent classification)
| # ======================== | |
| # CONVERSATION STATE | |
| # ======================== | |
class ConversationState:
    """Names of the stages a chat session can be in.

    These are plain string constants (not an Enum). The chosen value is
    written to and read back from the ``state`` key of the session record,
    with ``INITIAL`` used as the default when no state is stored.
    """

    # Starting point; also the state a session is put in after a file upload.
    INITIAL = "initial"

    # Remaining stages of the propose / approve / run flow.
    PIPELINE_PROPOSED = "pipeline_proposed"
    PIPELINE_APPROVED = "pipeline_approved"
    EXECUTING = "executing"
    COMPLETED = "completed"
    ERROR = "error"
| # ======================== | |
| # GRADIO UI HANDLERS | |
| # ======================== | |
def create_new_session():
    """Create a new session through the session manager.

    Returns:
        Whatever ``session_manager.create_session()`` returns; callers in this
        file use the value as the session id.
    """
    new_session = session_manager.create_session()
    return new_session
# Extension -> MIME type for the document/image formats handled on upload.
_UPLOAD_MIME_TYPES = {
    ".pdf": "application/pdf",
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".gif": "image/gif",
    ".bmp": "image/bmp",
}


def _guess_upload_mime_type(file_path):
    """Return the MIME type implied by *file_path*'s extension (PDF when unknown)."""
    extension = os.path.splitext(file_path)[1].lower()
    return _UPLOAD_MIME_TYPES.get(extension, "application/pdf")


def handle_file_upload(file_path, session_id):
    """
    Handle file upload with V3 architecture integration
    - Uploads file to S3
    - Creates record in files collection
    - Generates presigned URL
    - Stores file metadata in MongoDB

    If any step of the S3/MongoDB path raises, the handler falls back to
    recording only the local file path in the session and reports
    ``partial_success`` together with the error text.

    Args:
        file_path: Local path of the uploaded file; a falsy value means
            nothing was uploaded.
        session_id: Current session id; a new session is created when falsy.

    Returns:
        A 3-tuple ``(file_path, status_json, session_id)`` where
        ``status_json`` is a JSON string describing the outcome. When no file
        was supplied the first element is ``None``.
    """
    if not file_path:
        return None, json.dumps({
            "status": "error",
            "message": "No file uploaded"
        }, indent=2), session_id

    if not session_id:
        session_id = create_new_session()

    file_name = os.path.basename(file_path)

    try:
        # V3: Upload to S3 and create file record
        from services.s3_manager import get_s3_manager
        from services.schemas import FileSchema, schema_to_dict
        from datetime import datetime

        s3 = get_s3_manager()
        file_id = str(uuid.uuid4())
        file_size = os.path.getsize(file_path) if os.path.exists(file_path) else 0

        # Determine MIME type from the extension (PNG is no longer reported
        # as JPEG, and BMP is no longer reported as PDF).
        mime_type = _guess_upload_mime_type(file_path)

        # Upload to S3
        s3_key = f"sessions/{session_id}/files/{file_id}_{file_name}"
        with open(file_path, 'rb') as f:
            s3.upload_file(
                key=s3_key,
                file_obj=f,
                content_type=mime_type,
                add_prefix=False
            )

        # Generate presigned URL (7-day max)
        presigned = s3.generate_presigned_url(
            s3_key,
            expires_in=604800,  # 7 days
            add_prefix=False
        )

        # Create file record in MongoDB
        mongodb_uri = os.getenv("MONGODB_URI")
        mongodb_db = os.getenv("MONGODB_DB", "masterllm")
        from pymongo import MongoClient
        mongo_client = MongoClient(mongodb_uri)
        try:
            files_collection = mongo_client[mongodb_db]["files"]
            file_record = FileSchema(
                file_id=file_id,
                session_id=session_id,
                uploaded_at=datetime.utcnow(),
                file_name=file_name,
                file_size=file_size,
                mime_type=mime_type,
                s3_bucket=s3.bucket_name,
                s3_key=s3_key,
                presigned_url=presigned["presigned_url"],
                presigned_expires_at=presigned["presigned_expires_at"]
            )
            files_collection.insert_one(schema_to_dict(file_record))
        finally:
            # This client is created per upload, so release its connections
            # instead of leaking them on every call.
            mongo_client.close()

        print(f"β File uploaded to S3 and recorded: {file_id}")

        # Update session with file reference
        session_manager.update_session(session_id, {
            "current_file": file_path,  # Keep local path for backward compatibility
            "current_file_id": file_id,  # V3: file_id reference
            "state": ConversationState.INITIAL
        })

        # Add system message with file info
        session_manager.add_message(
            session_id,
            "system",
            f"File uploaded: {file_name}",
            file_data={"file_id": file_id, "file_name": file_name}
        )

        status = {
            "status": "success",
            "message": f"File '{file_name}' uploaded successfully",
            "file_info": {
                "file_id": file_id,
                "name": file_name,
                "size_bytes": file_size,
                "s3_key": s3_key,
                "presigned_url": presigned["presigned_url"],
                "expires_at": presigned["presigned_expires_at"]
            },
            "next_action": "π¬ Now tell me what you'd like to do with this document"
        }
        return file_path, json.dumps(status, indent=2), session_id

    except Exception as e:
        # Fallback to old behavior if V3 integration fails. This is a
        # deliberate best-effort path: the upload stays usable locally.
        print(f"β οΈ File upload V3 integration failed: {e}")
        print(" Falling back to local file path")

        # Update session with local path only
        session_manager.update_session(session_id, {
            "current_file": file_path,
            "state": ConversationState.INITIAL
        })

        # Add system message
        session_manager.add_message(
            session_id,
            "system",
            f"File uploaded: {file_name}"
        )

        status = {
            "status": "partial_success",
            "message": f"File '{file_name}' uploaded locally (S3 upload failed)",
            "file_info": {
                "name": file_name,
                "path": file_path,
                "size_bytes": os.path.getsize(file_path) if os.path.exists(file_path) else 0
            },
            "warning": str(e),
            "next_action": "π¬ Now tell me what you'd like to do with this document"
        }
        return file_path, json.dumps(status, indent=2), session_id
def format_chat_history(history, new_user_msg, new_assistant_msg):
    """
    Build the chat transcript in role/content dict form and append the newest
    user/assistant exchange.

    ``history`` may be empty/None, already a list of ``{"role", "content"}``
    dicts (detected from its first element and shallow-copied), or the older
    list of ``(user, assistant)`` pairs. In the pair form, entries that are
    not 2-item lists/tuples are dropped.

    Returns a new list; the ``history`` argument itself is not modified.
    """
    def _turn(role, content):
        return {"role": role, "content": content}

    if not history:
        transcript = []
    else:
        first_entry = history[0]
        if isinstance(first_entry, dict) and 'role' in first_entry:
            # Already role/content dicts - reuse the entries as they are.
            transcript = list(history)
        else:
            # Legacy pair format - expand each pair into two role entries.
            transcript = []
            for pair in history:
                if not isinstance(pair, (list, tuple)) or len(pair) != 2:
                    continue
                asked, answered = pair
                transcript.append(_turn("user", asked))
                transcript.append(_turn("assistant", answered))

    # The exchange currently being rendered always goes last.
    transcript.extend([
        _turn("user", new_user_msg),
        _turn("assistant", new_assistant_msg),
    ])
    return transcript
def _matches_keyword(text, substrings=(), words=()):
    """Return True if *text* contains any of *substrings*, or any of *words* as a whole word.

    Short confirmations such as "no", "yes" or "ok" are matched as whole words
    so that messages like "add another step" or "look at page 2" are not
    mistaken for a rejection or a confirmation.
    """
    import re  # function-scope import; the file's top-level import block is outside this block

    lowered = text.lower()
    if any(fragment in lowered for fragment in substrings):
        return True
    tokens = set(re.findall(r"[a-z']+", lowered))
    return any(word in tokens for word in words)


def _reply(session_id, history, message, text):
    """Record *text* as the assistant's reply and return the updated chat transcript."""
    session_manager.add_message(session_id, "assistant", text)
    return format_chat_history(history, message, text)


def _load_proposed_pipeline(session):
    """Return ``(pipeline_doc, plan)`` for the pipeline currently proposed in *session*.

    ``pipeline_doc`` is the full S3 document, or None when the session has no
    S3 reference or the download failed. ``plan`` is the pipeline definition
    and is always a dict (empty when nothing could be loaded).
    """
    pipeline_s3_key = session.get("current_pipeline_s3_key")
    if not pipeline_s3_key:
        # Fallback for old sessions without S3 storage
        return None, session.get("proposed_pipeline") or {}
    try:
        from services.s3_manager import get_s3_manager

        pipeline_doc = get_s3_manager().download_json(pipeline_s3_key, add_prefix=False)
        return pipeline_doc, pipeline_doc["definition"]
    except Exception as e:
        print(f"β οΈ Failed to download pipeline from S3: {e}")
        return None, {}


def _store_proposed_pipeline(session_id, message, pipeline):
    """Persist *pipeline* as a new "proposed" pipeline and point the session at it.

    Uploads the pipeline document to S3, creates its metadata record through
    the pipeline manager, and stores only the references (id + S3 key) plus
    the PIPELINE_PROPOSED state in the session.

    Returns:
        The newly generated pipeline_id.
    """
    from services.s3_manager import get_s3_manager
    from services.pipeline_manager import get_pipeline_manager

    pipeline_id = str(uuid.uuid4())
    pipeline_s3_key = f"sessions/{session_id}/pipelines/{pipeline_id}.json"
    pipeline_doc = {
        "pipeline_id": pipeline_id,
        "session_id": session_id,
        "pipeline_name": pipeline.get("pipeline_name"),
        "status": "proposed",
        "created_at": datetime.utcnow().isoformat() + "Z",
        "created_by_message": message,
        "definition": pipeline,
        "execution": None,
        "results": None
    }
    get_s3_manager().upload_json(pipeline_s3_key, pipeline_doc, add_prefix=False)
    get_pipeline_manager().create_pipeline_metadata(
        pipeline_id=pipeline_id,
        session_id=session_id,
        pipeline_name=pipeline.get("pipeline_name"),
        s3_key=pipeline_s3_key,
        status="proposed",
        created_by_message=message
    )
    # Update session with reference only (not full pipeline)
    session_manager.update_session(session_id, {
        "current_pipeline_id": pipeline_id,
        "current_pipeline_s3_key": pipeline_s3_key,
        "state": ConversationState.PIPELINE_PROPOSED
    })
    return pipeline_id


def _save_pending_workflow(session_id, history, message, pending_workflow):
    """Save the pipeline referenced by *pending_workflow* as a reusable workflow.

    The pending request is cleared whatever the outcome. Returns the updated
    chat transcript containing the success or failure message.
    """
    try:
        # Imported here on purpose: the name must be bound in THIS scope before
        # use. The original referenced it before a later function-scope import,
        # which raised UnboundLocalError on every save attempt.
        from services.pipeline_manager import get_pipeline_manager

        pipeline_mgr = get_pipeline_manager()
        workflow_mgr = get_workflow_manager()
        # V3: Get full pipeline document from S3 using pipeline_id
        pipeline_id = pending_workflow.get("pipeline_id")
        pipeline_doc = pipeline_mgr.get_full_pipeline_document(pipeline_id)
        if pipeline_doc and pipeline_doc.get("definition"):
            # Save as workflow with source tracking
            workflow_id = workflow_mgr.save_workflow(
                session_id=session_id,
                pipeline_definition=pipeline_doc["definition"],
                user_message=message,
                source_pipeline_id=pipeline_id,
                pipeline_status=pipeline_doc.get("status", "unknown")
            )
            session_manager.update_session(session_id, {"pending_workflow_save": None})
            response = f"β **Workflow Saved!**\n\nWorkflow ID: `{workflow_id}`\nName: {pending_workflow['pipeline_name']}\nSource Pipeline: `{pipeline_id[:8]}...`\n\nYou can now reuse this workflow anytime!\n\nWhat else can I help you with?"
        else:
            # Pipeline document not found
            session_manager.update_session(session_id, {"pending_workflow_save": None})
            response = "β οΈ Sorry, I couldn't find the pipeline to save. The workflow save request has expired.\n\nWhat else can I help you with?"
    except Exception as e:
        session_manager.update_session(session_id, {"pending_workflow_save": None})
        response = f"β Failed to save workflow: {str(e)}\n\nWhat would you like to do next?"
    return _reply(session_id, history, message, response)


def _handle_initial_state(message, history, session_id, session, intent_data):
    """INITIAL state: generate and propose a pipeline when the intent asks for one."""
    # Only generate pipeline if user explicitly requested it
    if not intent_data.get("requires_pipeline", False):
        friendly_response = "I'm here to help process documents! Please tell me what you'd like to do with your document.\n\nFor example:\n- 'extract text and summarize'\n- 'get tables from pages 2-5'\n- 'translate to Spanish'\n\nType 'help' to see all capabilities!"
        yield _reply(session_id, history, message, friendly_response)
        return

    # Check if file is uploaded
    if not session.get("current_file"):
        response_text = "π Please upload a document first before I can process it!\n\nClick the 'Upload Document' button to get started."
        yield _reply(session_id, history, message, response_text)
        return

    try:
        # Interim status so the UI shows activity while the pipeline is generated
        yield format_chat_history(history, message, "π€ Analyzing your request and creating a pipeline...\nβ³ This will take just a moment...")
        pipeline = generate_pipeline(
            user_input=message,
            file_path=session.get("current_file"),
            prefer_bedrock=True
        )
        # V3: Create pipeline_id, upload to S3, keep only references in the session
        _store_proposed_pipeline(session_id, message, pipeline)

        # Create user-friendly display
        pipeline_name = pipeline.get("pipeline_name", "Document Processing")
        steps_list = pipeline.get("pipeline_steps", [])
        steps_summary = "\n".join(
            f" {i+1}. **{step.get('tool', 'Unknown')}**" for i, step in enumerate(steps_list)
        )
        friendly_display = f"""π― **Pipeline Created: {pipeline_name}**
Here's what I'll do:
{steps_summary}
**Ready to proceed?**
- Type **'approve'** or **'yes'** to execute
- Type **'reject'** or **'no'** to cancel
- Describe changes to modify the plan"""
        # Add technical details in collapsible format
        response_text = friendly_display + f"\n\n<details>\n<summary>π Technical Details (for developers)</summary>\n\n```json\n{json.dumps(pipeline, indent=2)}\n```\n</details>"
        yield _reply(session_id, history, message, response_text)
    except Exception as e:
        error_response = f"β **Oops!** I encountered an error while creating the pipeline:\n\n{str(e)}\n\nPlease try rephrasing your request or type 'help' for examples."
        yield _reply(session_id, history, message, error_response)


def _run_approved_pipeline(message, history, session_id, session):
    """Execute the approved pipeline, streaming progress transcripts as it runs.

    Marks the pipeline as executing, relays executor events to the chat, and on
    completion writes the results back to the pipeline's S3 document and
    offers to save the run as a workflow.
    """
    session_manager.update_session(session_id, {"state": ConversationState.EXECUTING})

    from services.s3_manager import get_s3_manager
    from services.pipeline_manager import get_pipeline_manager

    s3 = get_s3_manager()
    pipeline_mgr = get_pipeline_manager()

    # V3: Get pipeline references from session and download the document
    pipeline_s3_key = session.get("current_pipeline_s3_key")
    pipeline_doc, plan = _load_proposed_pipeline(session)
    if pipeline_doc:
        pipeline_id = session.get("current_pipeline_id")
        # Update pipeline status to executing
        pipeline_doc["status"] = "executing"
        pipeline_doc["execution"] = {
            "started_at": datetime.utcnow().isoformat() + "Z",
            "executor": "unknown",
            "components_status": []
        }
        s3.upload_json(pipeline_s3_key, pipeline_doc, add_prefix=False)
        pipeline_mgr.update_pipeline_status(pipeline_id, "executing")
    else:
        # Legacy session or failed download: there is no S3 document to keep
        # in sync, so fall back to the session id as the pipeline identifier.
        pipeline_s3_key = None
        pipeline_id = session_id

    # Initial status - User-friendly
    initial_message = f"β **Approved!** Starting execution of: **{plan.get('pipeline_name', 'pipeline')}**\n\nπ Processing... please wait...\n_(Using {plan.get('_generator', 'AI')} - {plan.get('_model', 'model')})_"
    yield format_chat_history(history, message, initial_message)

    steps_completed = []
    final_payload = None
    executor_used = "unknown"
    progress_messages = []
    try:
        for event in execute_pipeline_streaming(
            pipeline=plan,
            file_path=session.get("current_file"),
            session_id=session_id,
            prefer_bedrock=True
        ):
            event_type = event.get("type")

            # Info events (fallback notifications, etc.)
            if event_type == "info":
                info_message = f"βΉοΈ {event.get('message')}\n_(Executor: {event.get('executor', 'unknown')})_"
                progress_messages.append(info_message)
                accumulated_response = initial_message + "\n\n" + "\n".join(progress_messages)
                yield format_chat_history(history, message, accumulated_response)

            # Component status updates (V3)
            elif event_type == "component_status":
                step_info = {
                    "step": event.get("step", 0),
                    "component": event.get("component", "processing"),
                    "status": event.get("status", "running"),
                    "message": event.get("message", ""),
                    "executor": event.get("executor", "unknown")
                }
                if "observation" in event:
                    step_info["observation"] = event.get("observation")
                steps_completed.append(step_info)
                executor_used = event.get("executor", executor_used)
                # User-friendly status
                status_emoji = "β " if event.get('status') == 'completed' else "β³"
                component_msg = event.get('message', '')
                status_line = f"{status_emoji} Step {event.get('step', 0)}: {event.get('component', 'processing')}"
                if component_msg:
                    status_line += f" - {component_msg}"
                progress_messages.append(status_line)
                current_progress = "\n".join(progress_messages[-5:])  # Last 5 steps
                accumulated = initial_message + "\n\n" + current_progress
                yield format_chat_history(history, message, accumulated)

            # Legacy step updates (backwards compatibility)
            elif event_type == "step":
                step_info = {
                    "step": event.get("step", 0),
                    "tool": event.get("tool", "processing"),
                    "status": event.get("status", "running"),
                    "executor": event.get("executor", "unknown")
                }
                # Add observation if available (tool output)
                if "observation" in event:
                    step_info["observation"] = event.get("observation")
                # Add tool input if available
                if "input" in event:
                    step_info["input"] = event.get("input")
                steps_completed.append(step_info)
                executor_used = event.get("executor", executor_used)
                # Create user-friendly progress message
                step_num = event.get('step', 0)
                tool_name = event.get('tool', 'processing')
                if event.get('status') == 'completed' and 'observation' in event:
                    obs_preview = str(event.get('observation'))[:80]
                    step_message = f"β **Step {step_num}:** {tool_name} - Completed!\n _Preview: {obs_preview}..._"
                elif event.get('status') == 'executing':
                    step_message = f"β³ **Step {step_num}:** {tool_name} - Processing..."
                else:
                    step_message = f"π **Step {step_num}:** {tool_name}"
                progress_messages.append(step_message)
                accumulated_response = initial_message + "\n\n" + "\n\n".join(progress_messages)
                yield format_chat_history(history, message, accumulated_response)

            # Final result
            elif event_type == "final":
                final_payload = event.get("data")
                executor_used = event.get("executor", executor_used)

            # Error
            elif event_type == "error":
                error_msg = event.get("error", "Unknown error")
                friendly_error = f"β **Pipeline Failed**\n\nError: {error_msg}\n\nCompleted {len(steps_completed)} step(s) before failure.\n\nWhat would you like to do next?"
                session_manager.update_session(session_id, {"state": ConversationState.INITIAL})
                yield _reply(session_id, history, message, friendly_error)
                return

        # Process final result. The detailed summary needs both the S3 document
        # and a final payload; a run that ended without a "final" event falls
        # through to the short completion message instead of crashing on None.
        if pipeline_s3_key and final_payload:
            # V3: Update pipeline document with results in S3 (best effort)
            try:
                pipeline_doc = s3.download_json(pipeline_s3_key, add_prefix=False)
                pipeline_doc["status"] = "completed"
                if pipeline_doc.get("execution"):
                    pipeline_doc["execution"]["completed_at"] = datetime.utcnow().isoformat() + "Z"
                    pipeline_doc["execution"]["executor"] = executor_used
                pipeline_doc["results"] = {
                    "final_output_url": final_payload.get("final_output_url"),
                    "final_output_expires_at": final_payload.get("final_output_expires_at"),
                    "components_executed": final_payload.get("components_executed"),
                    "last_node_output": final_payload.get("last_node_output"),
                    "workflow_status": "completed"
                }
                s3.upload_json(pipeline_s3_key, pipeline_doc, add_prefix=False)
                # Update MongoDB metadata
                pipeline_mgr.update_pipeline_status(
                    pipeline_id,
                    "completed",
                    final_output_url=final_payload.get("final_output_url"),
                    final_output_expires_at=final_payload.get("final_output_expires_at")
                )
            except Exception as e:
                print(f"β οΈ Failed to update pipeline document: {e}")

            # Reset the conversation and remember which pipeline can be saved
            # as a workflow (V3: referenced by pipeline_id).
            session_manager.update_session(session_id, {
                "state": ConversationState.INITIAL,
                "pending_workflow_save": {
                    "pipeline_id": pipeline_id,
                    "pipeline_name": plan.get("pipeline_name", "Untitled")
                }
            })

            success_count = len([s for s in steps_completed if s.get("status") == "completed"])
            # V3: Check if we have S3 final output URL
            final_output_url = final_payload.get("final_output_url", "")
            final_output_expires = final_payload.get("final_output_expires_at", "")
            url_section = ""
            if final_output_url:
                url_section = f"\nπ **Results Available:**\n- [Download All Outputs]({final_output_url})\n- Expires: {final_output_expires}\n"
            detailed_results = json.dumps({"status": "completed", "executor": executor_used, "pipeline": plan.get("pipeline_name"), "result": final_payload, "steps": steps_completed}, indent=2)
            final_response = f"""π **Pipeline Completed Successfully!**
**Summary:**
- Pipeline: {plan.get('pipeline_name', 'Document Processing')}
- Total Steps: {len(steps_completed)}
- Successful: {success_count}
- Executor: {executor_used}
{url_section}
πΎ **Would you like to save this workflow for future use?**
Type **'save workflow'** or **'yes'** to save it.
β All done! What else can I help you with?
<details>
<summary>π Detailed Results (for developers)</summary>
```json
{detailed_results}
```
</details>"""
        else:
            final_response = f"β **Pipeline Completed!**\n\nExecuted {len(steps_completed)} steps using {executor_used}.\n\nReady for your next task!"
            session_manager.update_session(session_id, {"state": ConversationState.INITIAL})

        yield _reply(session_id, history, message, final_response)
    except Exception as e:
        friendly_error = f"β **Pipeline Execution Failed**\n\nError: {str(e)}\n\nCompleted {len(steps_completed)} step(s) before failure.\n\n<details>\n<summary>π Error Details</summary>\n\n```\n{str(e)}\n```\n</details>\n\nWould you like to try again with a different approach?"
        session_manager.update_session(session_id, {"state": ConversationState.INITIAL})
        yield _reply(session_id, history, message, friendly_error)


def _handle_proposed_state(message, history, session_id, session):
    """PIPELINE_PROPOSED state: approve, reject, explain editing, or modify the plan."""
    # APPROVE - Execute the pipeline
    if _matches_keyword(message, substrings=("approve",), words=("yes",)):
        yield from _run_approved_pipeline(message, history, session_id, session)
        return

    # REJECT - Cancel the pipeline
    if _matches_keyword(message, substrings=("reject",), words=("no",)):
        session_manager.update_session(session_id, {
            "state": ConversationState.INITIAL,
            "proposed_pipeline": None
        })
        friendly_rejection = "π No problem! Pipeline cancelled.\n\nWhat else would you like me to help you with?"
        yield _reply(session_id, history, message, friendly_rejection)
        return

    # EDIT - Request modifications
    if _matches_keyword(message, substrings=("edit", "modify")):
        # The proposal lives in S3 (V3); the session only holds a reference.
        _, current_pipeline = _load_proposed_pipeline(session)
        friendly_edit_help = f"""π **Edit Mode**
Current pipeline: **{current_pipeline.get('pipeline_name', 'Unknown')}**
Describe what you'd like to change. For example:
- "Add summarization at the end"
- "Remove table extraction"
- "Only process pages 1-3"
- "Translate to French instead of Spanish"
Or type 'approve' to run the current plan as-is."""
        yield _reply(session_id, history, message, friendly_edit_help)
        return

    # Try to modify pipeline based on user input
    if len(message.strip()) > 5:
        try:
            _, original_plan = _load_proposed_pipeline(session)
            edit_context = f"Original: {original_plan.get('pipeline_name')}. User wants: {message}"
            # Generate new pipeline with modification
            new_pipeline = generate_pipeline(
                user_input=edit_context,
                file_path=session.get("current_file"),
                prefer_bedrock=True
            )
            # Persist the modified plan exactly like a fresh proposal so that a
            # later 'approve' executes THIS plan rather than the superseded one.
            _store_proposed_pipeline(session_id, message, new_pipeline)
            formatted = format_pipeline_for_display(new_pipeline)
            response = formatted + f"\n\n```json\n{json.dumps(new_pipeline, indent=2)}\n```"
            yield _reply(session_id, history, message, response)
            return
        except Exception as e:
            error_response = {
                "status": "edit_failed",
                "error": str(e),
                "message": "Could not modify the plan",
                "action": "Try 'approve' to run as-is, or 'reject' to start over"
            }
            response = f"```json\n{json.dumps(error_response, indent=2)}\n```"
            yield _reply(session_id, history, message, response)
            return

    # Default waiting message
    response_data = {
        "status": "waiting_for_confirmation",
        "message": "Please type 'approve', 'reject', or describe changes",
        "hint": "You can also say 'edit' for modification hints"
    }
    response = f"```json\n{json.dumps(response_data, indent=2)}\n```"
    yield _reply(session_id, history, message, response)


def chatbot_response_streaming(message: str, history: List, session_id: str, file_path: str = None):
    """
    Handle one chat message and stream updated chat transcripts.

    Classifies the user's intent, then dispatches on the session's
    conversation state: answering small talk, saving a finished pipeline as a
    workflow, proposing a new pipeline, or approving / rejecting / modifying a
    proposed one. Pipeline generation and execution are requested with
    prefer_bedrock=True.

    Args:
        message: The user's chat message.
        history: Current chat transcript (tuple or messages format).
        session_id: Session identifier; a new session is created when it does
            not resolve to an existing one.
        file_path: Path of the uploaded document, if any; stored on the session.

    Yields:
        The chat transcript (list of role/content dicts) after each update.
    """
    # Get or create session
    session = session_manager.get_session(session_id)
    if not session:
        session_id = create_new_session()
        session = session_manager.get_session(session_id)

    # Update file path if provided
    if file_path:
        session_manager.update_session(session_id, {"current_file": file_path})
        session = session_manager.get_session(session_id)

    # Add user message to session
    session_manager.add_message(session_id, "user", message)
    current_state = session.get("state", ConversationState.INITIAL)
    is_initial = current_state == ConversationState.INITIAL

    # Classify user intent
    intent_data = intent_classifier.classify_intent(message)
    intent = intent_data["intent"]

    # Casual chat, questions and unclear requests get a friendly reply while idle.
    # NOTE(review): this runs before the workflow-save check below, so a bare
    # "yes" after a completed run is only treated as a save request if the
    # classifier does not label it casual_chat/question/unclear - confirm.
    if is_initial and intent in ("casual_chat", "question", "unclear"):
        friendly_response = intent_classifier.get_friendly_response(intent, message)
        yield _reply(session_id, history, message, friendly_response)
        return

    # Handle workflow save request (V3)
    pending_workflow = session.get("pending_workflow_save")
    if pending_workflow and _matches_keyword(
        message, substrings=("save",), words=("yes", "sure", "ok", "okay")
    ):
        yield _save_pending_workflow(session_id, history, message, pending_workflow)
        return

    if is_initial:
        yield from _handle_initial_state(message, history, session_id, session, intent_data)
        return

    if current_state == ConversationState.PIPELINE_PROPOSED:
        yield from _handle_proposed_state(message, history, session_id, session)
        return

    # Default fallback (any other state, e.g. while a pipeline is executing)
    response = json.dumps({"status": "ready", "message": "Ready for your next instruction"}, indent=2)
    yield _reply(session_id, history, message, response)
# ========================
# GRADIO UI
# ========================
def _start_ui_session():
    """Create a fresh session and return ``(session_id, short display label)``."""
    new_session_id = create_new_session()
    return new_session_id, new_session_id[:8] + "..."


def _reset_ui():
    """Clear the chat widgets and switch the browser tab to a brand-new session."""
    new_session_id, session_label = _start_ui_session()
    # Order matches the outputs wired to the clear button below.
    return [], new_session_id, None, None, "", "", session_label


# Simple Blocks initialization for HF Spaces compatibility (older Gradio version)
with gr.Blocks(title="MasterLLM v2.0 - AI Pipeline Orchestrator") as demo:
    gr.Markdown("""
    # π€ MasterLLM v2.0 - AI Pipeline Orchestrator
    **π Bedrock Priority** with Gemini Fallback | **πΎ MongoDB Sessions** | **π‘ Complete REST API**
    Upload a document, describe what you want, and watch AI orchestrate the perfect pipeline!
    """)

    # State management.
    # The session is created per page load (see demo.load below). Passing
    # create_new_session() here would evaluate once at import time and hand
    # the same session id to every visitor.
    session_id_state = gr.State(value=None)
    file_state = gr.State(value=None)

    with gr.Row():
        with gr.Column(scale=3):
            # Chat interface - Gradio auto-detects format from data structure
            chatbot = gr.Chatbot(label="Chat")
            # Text input
            msg = gr.Textbox(
                placeholder="π¬ Type your instruction... (e.g., 'extract text from pages 1-5 and summarize')",
                label="Your Message",
                lines=2,
                max_lines=4,
            )
            with gr.Row():
                submit_btn = gr.Button("π Send", variant="primary", scale=2)
                clear_btn = gr.Button("ποΈ Clear Chat", scale=1)

        with gr.Column(scale=1):
            # File upload section
            gr.Markdown("### π Upload Document")
            file_upload = gr.File(
                label="PDF or Image",
                file_types=[".pdf", ".png", ".jpg", ".jpeg", ".gif", ".bmp"],
                type="filepath",
            )
            upload_status = gr.Textbox(
                label="π Upload Status",
                interactive=False,
                lines=10,
                max_lines=15,
            )
            # Session info (filled in on page load and refreshed on clear)
            gr.Markdown("### π Session Info")
            session_display = gr.Textbox(
                label="Session ID",
                interactive=False,
            )

    # Examples
    gr.Markdown("### π‘ Example Pipelines")
    gr.Examples(
        examples=[
            "extract text from pages 1-5",
            "extract text and summarize",
            "extract text, tables, and translate to Spanish",
            "get tables from pages 2-4 and summarize",
            "text-classify-ner from entire document",
            "describe images and summarize findings",
            "extract text, detect signatures and stamps",
        ],
        inputs=msg,
    )

    # System info
    gr.Markdown("""
    ### βΉοΈ System Features
    - β **Bedrock** (Claude 3.5 Sonnet) priority
    - β **Gemini** (gemini-2.0-flash) fallback
    - β **MongoDB** session persistence
    - β **Streaming** real-time updates
    - β **Component-level** JSON output
    - β **REST API** for integration
    ### π Pipeline Flow:
    1. **Upload** your document
    2. **Describe** what you want
    3. **Review** AI-generated pipeline
    4. **Approve** to execute
    5. **Watch** streaming updates
    6. **Get** complete JSON results
    """)

    # Event handlers
    # Give every browser load its own session and show its short id.
    demo.load(
        fn=_start_ui_session,
        outputs=[session_id_state, session_display],
    )
    file_upload.upload(
        fn=handle_file_upload,
        inputs=[file_upload, session_id_state],
        outputs=[file_state, upload_status, session_id_state],
    )
    # Enter key and Send button share the same handler; the textbox is
    # emptied once the streamed response has finished.
    for send_trigger in (msg.submit, submit_btn.click):
        send_trigger(
            fn=chatbot_response_streaming,
            inputs=[msg, chatbot, session_id_state, file_state],
            outputs=[chatbot],
        ).then(
            lambda: "",
            outputs=msg,
        )
    clear_btn.click(
        fn=_reset_ui,
        outputs=[chatbot, session_id_state, file_state, file_upload, msg, upload_status, session_display],
    )

# Mount Gradio on FastAPI
app = gr.mount_gradio_app(app, demo, path="/")
# ========================
# LAUNCH
# ========================
if __name__ == "__main__":
    # Script entry point: serve the combined FastAPI + Gradio app with uvicorn.
    import uvicorn

    # The listening port comes from the PORT environment variable (default 7860).
    port = int(os.getenv("PORT", 7860))
    startup_banner = f"""
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β π MasterLLM v2.0 Starting... β
β β
β π Gradio UI: http://localhost:{port} β
β π‘ REST API: http://localhost:{port}/api/v1 β
β π API Docs: http://localhost:{port}/docs β
β β
β π Bedrock: Priority (Claude 3.5 Sonnet) β
β π Gemini: Fallback (gemini-2.0-flash) β
β πΎ MongoDB: Session management β
β β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
"""
    print(startup_banner)
    uvicorn.run(app, host="0.0.0.0", port=port)