File size: 7,062 Bytes
310260a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 | """
TaskAgent class for handling AI-powered task management conversations.
This module implements the core agent that processes user messages and
executes task operations through MCP tools.
"""
from openai import AsyncOpenAI
from typing import List, Dict, Any, Optional
from ..config import settings
from .agent_config import TASK_AGENT_SYSTEM_PROMPT, AGENT_CONFIG
class TaskAgent:
"""
AI agent for task management conversations.
This agent uses OpenAI's API (or OpenRouter) with function calling to process natural
language requests and execute task operations through MCP tools.
"""
def __init__(self):
"""
Initialize the TaskAgent with OpenAI client.
The agent is configured with:
- OpenAI API key from settings (can be OpenRouter key)
- System prompt defining agent behavior
- Model configuration (temperature, max_tokens, etc.)
- Empty tools list (tools registered later via register_tools)
"""
# Configure client for OpenRouter if base URL is provided
client_kwargs = {"api_key": settings.OPENAI_API_KEY}
if hasattr(settings, 'OPENAI_BASE_URL') and settings.OPENAI_BASE_URL:
client_kwargs["base_url"] = settings.OPENAI_BASE_URL
self.client = AsyncOpenAI(**client_kwargs)
self.model = settings.OPENAI_MODEL
self.system_prompt = TASK_AGENT_SYSTEM_PROMPT
self.temperature = AGENT_CONFIG["temperature"]
self.max_tokens = AGENT_CONFIG["max_tokens"]
self.tools: List[Dict[str, Any]] = []
def register_tools(self, tools: List[Dict[str, Any]]) -> None:
"""
Register MCP tools with the agent.
Tools should be in OpenAI function calling format:
{
"type": "function",
"function": {
"name": "tool_name",
"description": "Tool description",
"parameters": {...}
}
}
Args:
tools: List of tool definitions in OpenAI format
"""
self.tools = tools
async def process_message(
self,
message: str,
conversation_history: List[Dict[str, str]],
tool_executor: Optional[Any] = None
) -> Dict[str, Any]:
"""
Process a user message and generate a response.
This method:
1. Constructs the full message history with system prompt
2. Calls OpenAI API with tool definitions
3. Handles tool calls if the agent decides to use them
4. Returns the final response with any tool call metadata
Args:
message: The user's message to process
conversation_history: Previous messages in the conversation
Format: [{"role": "user"|"assistant", "content": "..."}]
tool_executor: Optional callable to execute tool calls
Should accept (tool_name, arguments) and return result
Returns:
Dict containing:
- content: The assistant's response text
- tool_calls: List of tool calls made (if any)
- finish_reason: Why the model stopped generating
Raises:
Exception: If OpenAI API call fails
"""
# Build messages array with system prompt
messages = [
{"role": "system", "content": self.system_prompt}
]
# Add conversation history
messages.extend(conversation_history)
# Add current user message
messages.append({"role": "user", "content": message})
# Call OpenAI API
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
tools=self.tools if self.tools else None,
temperature=self.temperature,
max_tokens=self.max_tokens
)
# Extract response
assistant_message = response.choices[0].message
# Handle tool calls if present
tool_calls_data = []
if assistant_message.tool_calls and tool_executor:
for tool_call in assistant_message.tool_calls:
tool_name = tool_call.function.name
tool_args = tool_call.function.arguments
# Execute tool
try:
import json
args_dict = json.loads(tool_args)
result = await tool_executor(tool_name, args_dict)
tool_calls_data.append({
"id": tool_call.id,
"name": tool_name,
"arguments": args_dict,
"result": result
})
except Exception as e:
tool_calls_data.append({
"id": tool_call.id,
"name": tool_name,
"arguments": tool_args,
"error": str(e)
})
# If tools were called, make another API call with tool results
# to get the final response
messages.append({
"role": "assistant",
"content": assistant_message.content,
"tool_calls": [
{
"id": tc["id"],
"type": "function",
"function": {
"name": tc["name"],
"arguments": json.dumps(tc["arguments"])
}
}
for tc in tool_calls_data
]
})
# Add tool results
for tc in tool_calls_data:
messages.append({
"role": "tool",
"tool_call_id": tc["id"],
"content": json.dumps(tc.get("result", {"error": tc.get("error")}))
})
# Get final response
final_response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=self.temperature,
max_tokens=self.max_tokens
)
final_message = final_response.choices[0].message
return {
"content": final_message.content,
"tool_calls": tool_calls_data,
"finish_reason": final_response.choices[0].finish_reason
}
# No tool calls - return direct response
return {
"content": assistant_message.content,
"tool_calls": [],
"finish_reason": response.choices[0].finish_reason
}
async def health_check(self) -> bool:
"""
Verify the agent can communicate with OpenAI API.
Returns:
True if API is accessible, False otherwise
"""
try:
await self.client.models.retrieve(self.model)
return True
except Exception:
return False
|