| """ |
| TodoAgent - OpenAI Agent orchestration with MCP tools. |
| |
| Per @specs/001-chatbot-mcp/plan.md Section VIII - AI Chatbot Architecture |
| OpenAI Agents SDK for orchestration with MCP tool calling. |
| """ |
| from openai import OpenAI |
| from typing import List, Dict, Any, Optional, AsyncGenerator |
| from uuid import UUID |
| import logging |
| import json |
|
|
| from config import settings |
| from mcp.server import get_mcp_server |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class TodoAgent: |
| """ |
| OpenAI Agent for Todo task management with MCP tool integration. |
| |
| This agent: |
| - Uses OpenAI Chat Completions API with tool calling |
| - Integrates with MCP tools for task operations |
| - Maintains conversation context for multi-turn dialogs |
| - Enforces user isolation by passing user_id to all tool calls |
| """ |
|
|
| |
| SYSTEM_PROMPT = """You are a helpful task management assistant for TaskFlow. Your role is to help users manage their todo tasks through natural language. |
| |
| Key behaviors: |
| - Be friendly and concise in your responses |
| - Focus on helping users create, view, complete, and delete tasks |
| - When users ask to add tasks, extract the task title and any description |
| - When users ask to see tasks, list their tasks clearly with numbers for easy reference |
| - When users ask to complete/delete/update tasks, ALWAYS call list_tasks FIRST to identify the task |
| - Users may refer to tasks by number (e.g., "first task", "task 2") or by title/content |
| - Use the available tools to perform all task operations |
| - Never make up task information - always use the tools |
| - The user_id parameter is automatically injected - do NOT ask the user for it |
| |
| Task identification workflow: |
| 1. When user wants to update/delete/complete a task, FIRST call list_tasks |
| 2. Show the user the tasks with numbers: "1. Task title", "2. Task title", etc. |
| 3. If user specified a number or title, match it to get the task_id |
| 4. Then call the appropriate tool (update_task, delete_task, complete_task) with the task_id |
| |
| Available tools: |
| - add_task: Create a new task with title and optional description |
| - list_tasks: Show all tasks with their IDs (call this FIRST before update/delete/complete) |
| - complete_task: Mark a task as completed (requires task_id from list_tasks) |
| - delete_task: Remove a task permanently (requires task_id from list_tasks) |
| - update_task: Change a task's title or description (requires task_id from list_tasks) |
| |
| Note: user_id is automatically provided for all tool calls. Never ask the user for their user ID.""" |
|
|
| def __init__(self, user_id: UUID): |
| """ |
| Initialize the TodoAgent for a specific user. |
| |
| Args: |
| user_id: The user's UUID for data isolation |
| """ |
| self.user_id = user_id |
|
|
| |
| key_preview = settings.openai_api_key[:10] if settings.openai_api_key else "None" |
| key_length = len(settings.openai_api_key) if settings.openai_api_key else 0 |
| logger.info(f"TodoAgent initializing with OpenAI API key: {key_preview}... (length: {key_length})") |
|
|
| self.client = OpenAI(api_key=settings.openai_api_key) |
| self.model = settings.openai_model |
|
|
| |
| self.mcp_server = get_mcp_server() |
| self.tools = self._get_openai_tools() |
|
|
| logger.info(f"TodoAgent initialized for user {user_id}") |
|
|
| def _get_openai_tools(self) -> List[Dict[str, Any]]: |
| """ |
| Convert MCP tools to OpenAI function format. |
| |
| Returns: |
| List of tool definitions in OpenAI format |
| |
| Per @specs/001-chatbot-mcp/contracts/mcp-tools.json |
| """ |
| tools = [] |
|
|
| |
| mcp_tools = self.mcp_server.list_tools() |
|
|
| for tool in mcp_tools: |
| |
| function_def = { |
| "type": "function", |
| "function": { |
| "name": tool.name, |
| "description": tool.description, |
| "parameters": tool.parameters.copy() |
| } |
| } |
|
|
| |
| |
| if "properties" not in function_def["function"]["parameters"]: |
| function_def["function"]["parameters"]["properties"] = {} |
|
|
| function_def["function"]["parameters"]["properties"]["user_id"] = { |
| "type": "string", |
| "description": "User ID for data isolation (auto-filled, do not ask user)", |
| } |
|
|
| |
| if "required" not in function_def["function"]["parameters"]: |
| function_def["function"]["parameters"]["required"] = [] |
|
|
| |
| |
|
|
| tools.append(function_def) |
|
|
| logger.info(f"Registered {len(tools)} tools with OpenAI agent") |
| return tools |
|
|
| async def process_message( |
| self, |
| user_message: str, |
| conversation_history: Optional[List[Dict[str, str]]] = None |
| ) -> AsyncGenerator[str, None]: |
| """ |
| Process a user message through the agent with tool execution. |
| |
| Args: |
| user_message: The user's message text |
| conversation_history: Previous messages in the conversation |
| |
| Yields: |
| Response text chunks as they're generated |
| |
| Per @specs/001-chatbot-mcp/plan.md - MCP First with OpenAI Agents |
| """ |
| |
| messages = [ |
| {"role": "system", "content": self.SYSTEM_PROMPT} |
| ] |
|
|
| |
| if conversation_history: |
| messages.extend(conversation_history) |
|
|
| |
| messages.append({"role": "user", "content": user_message}) |
|
|
| logger.info(f"Processing message for user {self.user_id}: {user_message[:50]}...") |
|
|
| |
| |
| update_delete_keywords = ["update", "delete", "remove", "change", "modify", "complete", "finish", "mark"] |
| needs_task_list = any(keyword in user_message.lower() for keyword in update_delete_keywords) |
|
|
| if needs_task_list: |
| |
| list_tool = self.mcp_server.get_tool("list_tasks") |
| if list_tool: |
| tasks_result = await list_tool.handler(user_id=str(self.user_id), include_completed=True) |
| if tasks_result.get("success") and tasks_result.get("tasks"): |
| |
| task_list = "\n".join([ |
| f"Task {i+1}: ID={t['id']}, Title='{t['title']}', Completed={t['completed']}" |
| for i, t in enumerate(tasks_result["tasks"]) |
| ]) |
| enhanced_prompt = self.SYSTEM_PROMPT + f"\n\nCURRENT USER TASKS:\n{task_list}\n\nWhen user refers to a task by number or title, use the corresponding ID from this list." |
| messages[0] = {"role": "system", "content": enhanced_prompt} |
| logger.info(f"Pre-loaded {len(tasks_result['tasks'])} tasks for context") |
|
|
| try: |
| |
| response = self.client.chat.completions.create( |
| model=self.model, |
| messages=messages, |
| tools=self.tools, |
| tool_choice="auto", |
| temperature=0.7, |
| max_tokens=1000, |
| ) |
|
|
| |
| choice = response.choices[0] |
| message = choice.message |
|
|
| |
| if message.tool_calls: |
| |
| tool_messages = [] |
| for tool_call in message.tool_calls: |
| result = await self._execute_tool_call(tool_call) |
| |
| tool_messages.append({ |
| "role": "tool", |
| "tool_call_id": tool_call.id, |
| "content": result |
| }) |
|
|
| |
| messages.append({ |
| "role": "assistant", |
| "content": message.content or "", |
| "tool_calls": message.tool_calls |
| }) |
|
|
| |
| messages.extend(tool_messages) |
|
|
| |
| follow_up = self.client.chat.completions.create( |
| model=self.model, |
| messages=messages, |
| temperature=0.7, |
| max_tokens=1000, |
| ) |
|
|
| if follow_up.choices[0].message.content: |
| yield follow_up.choices[0].message.content |
|
|
| |
| elif message.content: |
| yield message.content |
|
|
| else: |
| yield "I understand. How can I help you with your tasks?" |
|
|
| except Exception as e: |
| error_type = type(e).__name__ |
| error_msg = str(e) |
| logger.error(f"Error processing message: {error_type}: {error_msg}", exc_info=True) |
|
|
| |
| if "Connection" in error_msg or "connect" in error_msg.lower(): |
| yield "I'm having trouble connecting to my AI service. Please check if the OpenAI API key is configured correctly in Railway environment variables." |
| elif "401" in error_msg or "Unauthorized" in error_msg or "authentication" in error_msg.lower(): |
| yield "My AI service credentials are invalid. Please check the OpenAI API key in Railway environment variables." |
| elif "rate" in error_msg.lower() or "limit" in error_msg.lower(): |
| yield "I've reached my rate limit. Please try again in a moment." |
| else: |
| yield f"I encountered an error ({error_type}): {error_msg}" |
|
|
| async def _execute_tool_call(self, tool_call) -> str: |
| """ |
| Execute a single tool call from OpenAI. |
| |
| Args: |
| tool_call: The OpenAI tool call object |
| |
| Returns: |
| Result message to display to user |
| |
| Per @specs/001-chatbot-mcp/plan.md - MCP First architecture |
| """ |
| function_name = tool_call.function.name |
| function_args = json.loads(tool_call.function.arguments) |
|
|
| |
| function_args["user_id"] = str(self.user_id) |
|
|
| logger.info(f"Executing tool: {function_name} with args: {function_args}") |
|
|
| try: |
| |
| tool = self.mcp_server.get_tool(function_name) |
|
|
| if not tool: |
| return f"Error: Tool '{function_name}' not found" |
|
|
| |
| if function_name in ["update_task", "delete_task", "complete_task"]: |
| task_id = function_args.get("task_id") |
| if task_id: |
| |
| list_tool = self.mcp_server.get_tool("list_tasks") |
| tasks_result = await list_tool.handler(user_id=str(self.user_id), include_completed=True) |
| valid_task_ids = [t["id"] for t in tasks_result.get("tasks", [])] |
|
|
| if task_id not in valid_task_ids: |
| |
| |
| if tasks_result.get("tasks"): |
| task_list = "\n".join([ |
| f"Task {i+1}: {t['title']}" |
| for i, t in enumerate(tasks_result["tasks"]) |
| ]) |
| return f"Error: Task not found. Here are your current tasks:\n{task_list}\n\nPlease specify which task you'd like to {function_name.replace('_', ' ')}." |
| else: |
| return "Error: You don't have any tasks yet. Create some tasks first!" |
|
|
| |
| result = await tool.handler(**function_args) |
|
|
| |
| if isinstance(result, dict): |
| if result.get("success"): |
| |
| if function_name == "add_task": |
| return f"β Task '{result.get('title')}' created successfully!" |
| elif function_name == "complete_task": |
| return f"β Task '{result.get('title')}' marked as complete!" |
| elif function_name == "delete_task": |
| return f"β Task '{result.get('title')}' deleted." |
| elif function_name == "update_task": |
| return f"β Task updated successfully!" |
| elif function_name == "list_tasks": |
| tasks = result.get("tasks", []) |
| count = result.get("count", 0) |
| if count == 0: |
| return "You don't have any tasks yet." |
| |
| task_list = "\n".join([ |
| f"{i+1}. {t['title']}" + (" β" if t['completed'] else "") |
| for i, t in enumerate(tasks) |
| ]) |
| return f"You have {count} task(s):\n{task_list}\n\nYou can refer to tasks by number (e.g., \"complete task 1\")" |
| else: |
| return "Operation completed successfully!" |
| else: |
| return f"Error: {result.get('error', 'Unknown error')}" |
|
|
| return str(result) |
|
|
| except Exception as e: |
| logger.error(f"Error executing tool {function_name}: {e}") |
| return f"Error executing {function_name}: {str(e)}" |
|
|
|
|
| def create_todo_agent(user_id: UUID) -> TodoAgent: |
| """ |
| Factory function to create a TodoAgent instance. |
| |
| Args: |
| user_id: The user's UUID |
| |
| Returns: |
| Initialized TodoAgent instance |
| """ |
| return TodoAgent(user_id) |
|
|