TaskFlow / src /services /agent_service.py
BilalCode's picture
taskflow todo app
310260a
"""
Agent Service Module
This module provides the AgentService class that orchestrates AI agent interactions.
The service wraps MCP tools with user_id pre-bound for security and data isolation.
"""
from typing import List, Dict, Any, Optional
import json
import time
from openai import OpenAI
from ..config import settings
from ..tools.mcp_server import mcp_server, MCPContext
from ..tools import (
get_list_tasks_definition,
get_create_task_definition,
get_mark_complete_definition,
get_update_task_definition,
get_delete_task_definition,
get_get_task_definition
)
from ..database import engine
class AgentService:
"""
Agent Service for processing conversational task management requests.
This service:
1. Initializes OpenAI client with configured API key
2. Wraps MCP tools with user_id pre-bound for security
3. Processes user messages with conversation history
4. Returns agent responses with tool call metadata
"""
def __init__(self, user_id: int):
"""
Initialize AgentService for a specific user.
Args:
user_id: Authenticated user ID for data scoping
"""
self.user_id = user_id
# Configure client for OpenRouter if base URL is provided
client_kwargs = {"api_key": settings.OPENAI_API_KEY}
if hasattr(settings, 'OPENAI_BASE_URL') and settings.OPENAI_BASE_URL:
client_kwargs["base_url"] = settings.OPENAI_BASE_URL
self.client = OpenAI(**client_kwargs)
self.model = settings.OPENAI_MODEL
self.mcp_context = mcp_server.create_context(user_id=user_id)
def create_user_scoped_tools(self) -> List[Dict[str, Any]]:
"""
Create user-scoped tool definitions for OpenAI function calling.
This method wraps internal MCP tools with user_id pre-bound,
ensuring the agent can only access the authenticated user's data.
Returns:
List of tool definitions in OpenAI function calling format
"""
tools = [
get_list_tasks_definition(),
get_create_task_definition(),
get_mark_complete_definition(),
get_update_task_definition(),
get_delete_task_definition(),
get_get_task_definition()
]
return tools
async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""
Execute an MCP tool with the given arguments.
Args:
tool_name: Name of the tool to execute
arguments: Tool arguments (user_id is pre-bound from context)
Returns:
Tool execution result
Raises:
ValueError: If tool not found
"""
# Get tool function from MCP server
tool_func = mcp_server.get_tool(tool_name)
if not tool_func:
raise ValueError(f"Tool '{tool_name}' not found")
# Execute tool with user-scoped context
try:
result = await tool_func(self.mcp_context, **arguments)
return result
except Exception as e:
print(f"Error executing tool '{tool_name}': {str(e)}")
return {
"status": "error",
"error": str(e)
}
async def process_message(
self,
message: str,
conversation_history: List[Dict[str, str]]
) -> Dict[str, Any]:
"""
Process user message with conversation history and return agent response.
This method implements the full OpenAI function calling workflow:
1. Send message with tool definitions to OpenAI
2. If model calls tools, execute them
3. Send tool results back to model
4. Return final natural language response
Args:
message: User's natural language input
conversation_history: List of previous messages in conversation
Returns:
Dict containing:
- content: Agent's natural language response
- tool_calls: List of tool invocations with results (if any)
- model: Model used for generation
Raises:
Exception: If OpenAI API call fails
"""
# Build messages array for OpenAI API
messages = []
# Add system message for agent behavior
system_prompt = (
"You are a helpful task management assistant. "
"You help users manage their tasks through natural conversation.\n\n"
"**Task Creation Intent Recognition:**\n"
"When users express intent to create a task, use the create_task tool. Examples:\n"
"- 'remind me to X' → create_task(title='X')\n"
"- 'add task X' → create_task(title='X')\n"
"- 'I need to X' → create_task(title='X')\n"
"- 'create a task for X' → create_task(title='X')\n"
"- 'don't let me forget to X' → create_task(title='X')\n\n"
"**Task Listing Intent Recognition:**\n"
"When users want to see their tasks, use the list_tasks tool. Examples:\n"
"- 'show my tasks' → list_tasks()\n"
"- 'what do I need to do' → list_tasks()\n"
"- 'list my todos' → list_tasks()\n"
"- 'what are my tasks' → list_tasks()\n"
"- 'show me my task list' → list_tasks()\n\n"
"**Task Completion Intent Recognition:**\n"
"When users indicate they finished a task, use mark_complete tool. Examples:\n"
"- 'I finished X' → list_tasks() to find task, then mark_complete(task_id)\n"
"- 'mark X as done' → list_tasks() to find task, then mark_complete(task_id)\n"
"- 'I completed the groceries task' → list_tasks() to find task, then mark_complete(task_id)\n"
"- 'done with X' → list_tasks() to find task, then mark_complete(task_id)\n\n"
"**Task Update Intent Recognition:**\n"
"When users want to modify a task, use update_task tool. Examples:\n"
"- 'change X to Y' → list_tasks() to find task, then update_task(task_id, title='Y')\n"
"- 'update task X' → list_tasks() to find task, then update_task(task_id, ...)\n"
"- 'rename X to Y' → list_tasks() to find task, then update_task(task_id, title='Y')\n"
"- 'add details to X' → list_tasks() to find task, then update_task(task_id, description='...')\n\n"
"**Task Deletion Intent Recognition:**\n"
"When users want to remove a task, use delete_task tool. Examples:\n"
"- 'delete X' → list_tasks() to find task, then delete_task(task_id)\n"
"- 'remove the task' → list_tasks() to find task, then delete_task(task_id)\n"
"- 'cancel X' → list_tasks() to find task, then delete_task(task_id)\n"
"- 'get rid of X' → list_tasks() to find task, then delete_task(task_id)\n\n"
"**Multi-Step Operations:**\n"
"For operations that reference tasks by title or description (not ID):\n"
"1. First call list_tasks() to get all tasks\n"
"2. Identify the matching task from the list\n"
"3. Use the task's ID for the operation (mark_complete, update_task, delete_task)\n"
"4. If multiple tasks match, ask the user to clarify which one\n"
"5. If no tasks match, inform the user the task wasn't found\n\n"
"**Context Awareness:**\n"
"- Remember previous messages in the conversation\n"
"- When users say 'the first task', 'the second one', 'that task', refer to recently listed tasks\n"
"- Maintain context across multiple turns\n"
"- If context is unclear, ask clarifying questions\n\n"
"**Response Guidelines:**\n"
"- Always confirm actions taken (e.g., 'I've added X to your tasks')\n"
"- Format task lists in a readable way (use bullet points or numbered lists)\n"
"- Show completed vs incomplete tasks clearly\n"
"- Be concise, friendly, and conversational\n"
"- If no tasks exist, provide an encouraging message\n"
"- If a request is ambiguous, ask clarifying questions\n"
"- When multiple tasks match a reference, list them and ask which one\n"
"- Provide helpful error messages when operations fail"
)
messages.append({
"role": "system",
"content": system_prompt
})
# Add conversation history
for msg in conversation_history:
messages.append({
"role": msg.get("role", "user"),
"content": msg.get("content", "")
})
# Add current user message
messages.append({
"role": "user",
"content": message
})
# Get user-scoped tools
tools = self.create_user_scoped_tools()
# Track tool calls for response metadata
executed_tool_calls = []
# Call OpenAI API with retry logic
max_retries = 3
retry_delay = 1 # seconds
try:
if tools:
# First API call with tools (with retry)
for attempt in range(max_retries):
try:
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=tools,
temperature=0.3,
max_tokens=500
)
break # Success, exit retry loop
except Exception as api_error:
if attempt < max_retries - 1:
# Retry on transient errors
error_str = str(api_error).lower()
if any(keyword in error_str for keyword in ['timeout', 'connection', 'rate_limit', '429', '503', '502']):
print(f"OpenAI API error (attempt {attempt + 1}/{max_retries}): {api_error}. Retrying in {retry_delay}s...")
time.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
continue
# Non-retryable error or max retries reached
raise
assistant_message = response.choices[0].message
# Check if model wants to call tools
if assistant_message.tool_calls:
# Execute each tool call
for tool_call in assistant_message.tool_calls:
tool_name = tool_call.function.name
# Handle None or empty arguments from OpenRouter
tool_args_str = tool_call.function.arguments
if tool_args_str is None or tool_args_str == "":
tool_args = {}
else:
tool_args = json.loads(tool_args_str)
# Execute tool and track timing
start_time = time.time()
tool_result = await self.execute_tool(tool_name, tool_args)
duration_ms = int((time.time() - start_time) * 1000)
# Store tool call metadata
executed_tool_calls.append({
"tool": tool_name,
"parameters": tool_args,
"result": tool_result,
"duration_ms": duration_ms
})
# Add tool call and result to messages for next API call
messages.append({
"role": "assistant",
"content": None,
"tool_calls": [{
"id": tool_call.id,
"type": "function",
"function": {
"name": tool_name,
"arguments": tool_call.function.arguments
}
}]
})
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": json.dumps(tool_result)
})
# Second API call to get natural language response
final_response = self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=0.3,
max_tokens=500
)
final_content = final_response.choices[0].message.content or ""
return {
"content": final_content,
"tool_calls": executed_tool_calls if executed_tool_calls else None,
"model": self.model,
"finish_reason": final_response.choices[0].finish_reason
}
else:
# No tool calls, return direct response
content = assistant_message.content or ""
return {
"content": content,
"tool_calls": None,
"model": self.model,
"finish_reason": response.choices[0].finish_reason
}
else:
# No tools available, call without tools
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=0.3,
max_tokens=500
)
content = response.choices[0].message.content or ""
return {
"content": content,
"tool_calls": None,
"model": self.model,
"finish_reason": response.choices[0].finish_reason
}
except Exception as e:
# Log error and re-raise
print(f"Error processing message with OpenAI: {str(e)}")
raise
def format_conversation_history(
self,
messages: List[Any]
) -> List[Dict[str, str]]:
"""
Format database messages into OpenAI conversation history format.
Args:
messages: List of Message model instances from database
Returns:
List of message dicts in OpenAI format
"""
history = []
for msg in messages:
history.append({
"role": msg.role.value if hasattr(msg.role, 'value') else msg.role,
"content": msg.content
})
return history