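"""Failure analysis for AI agent runs logged to Weave.

Pulls completed calls from a Weave project, has an LLM summarize each failed
task, clusters the failures into recurring categories, classifies every task,
and writes the overall analysis to JSON.
"""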
import asyncio
import json
from abc import ABC, abstractmethod
from collections import defaultdict
from datetime import datetime
from typing import Dict, List

import backoff
import weave
from openai import APIError, APITimeoutError, AsyncOpenAI, RateLimitError
from pydantic import BaseModel

class FailureCategory(BaseModel):
    category_id: int
    category_name: str
    description: str


class FailureCategories(BaseModel):
    failure_categories: list[FailureCategory]


class TaskSummary(BaseModel):
    task_id: str
    summary: str


class TaskClassification(BaseModel):
    task_id: str
    category_id: str
    category_name: str
    explanation: str


class OverallAnalysis(BaseModel):
    failure_categories: List[Dict]
    task_classifications: Dict[str, Dict]
    summary: str

class AsyncLLMClient(ABC):
    @abstractmethod
    async def generate_text(self, prompt, system_message=None, response_format=None):
        pass

class AsyncOpenAIClient(AsyncLLMClient):
    def __init__(self, model="gpt-4o-mini", max_tries=5, max_time=300):
        self.model = model
        self.client = AsyncOpenAI()
        self.max_tries = max_tries
        self.max_time = max_time
        # Wrap the request helper with exponential backoff so transient API
        # errors are retried; without this, max_tries/max_time had no effect.
        self._make_request = backoff.on_exception(
            backoff.expo,
            (APITimeoutError, APIError, RateLimitError),
            max_tries=max_tries,
            max_time=max_time,
        )(self._make_request)

    async def _make_request(self, messages, response_format=None):
        # Structured outputs go through the parse endpoint; plain text through create.
        if response_format:
            return await self.client.beta.chat.completions.parse(
                model=self.model,
                messages=messages,
                response_format=response_format,
            )
        return await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
        )

    async def generate_text(self, prompt, system_message=None, response_format=None):
        messages = [
            {"role": "system", "content": system_message or "You are a helpful AI assistant."},
            {"role": "user", "content": prompt},
        ]
        try:
            response = await self._make_request(messages, response_format)
            return response.choices[0].message.content
        except Exception as e:
            raise Exception(f"Failed after {self.max_tries} attempts or {self.max_time} seconds: {e}") from e

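# Example usage (sketch; assumes OPENAI_API_KEY is set in the environment):
#   client = AsyncOpenAIClient(model="gpt-4o-mini")
#   text = await client.generate_text("Say hello")
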
def get_weave_calls(client):
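    """Fetch all calls from the Weave client and flatten each one into a plain
    dict, resolving the call's output ref into its ChatCompletion choices."""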
    calls = client.calls()
    processed_calls = []
    for call in calls:
        chat_completion = weave.ref(call.output).get()
        choices = [choice.message.content for choice in chat_completion.choices]
        output = {
            'weave_task_id': call.attributes['weave_task_id'],
            'trace_id': call.trace_id,
            'project_id': call.project_id,
            'created_timestamp': chat_completion.created,
            'inputs': dict(call.inputs),
            'id': call.id,
            'outputs': {'choices': choices},
            'exception': call.exception,
            'summary': call.summary,
            'display_name': call.display_name,
            'attributes': dict(call.attributes),
            '_children': call._children,
            '_feedback': call._feedback,
        }
        processed_calls.append(output)
    return processed_calls

async def analyze_agent_performance(processed_calls, failed_tasks: list, llm_client):
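    """Run the failure-analysis pipeline over the calls belonging to failed tasks:
    summarize each task, derive failure categories, classify every task, and
    return the combined result as an OverallAnalysis model."""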
    task_calls = defaultdict(list)
    for call in processed_calls:
        if call['weave_task_id'] in failed_tasks:
            task_calls[call['weave_task_id']].append(call)
    for task_id in task_calls:
        task_calls[task_id].sort(key=lambda x: x['created_timestamp'])

    task_summaries = await asyncio.gather(*[summarize_task(task_id, calls, llm_client) for task_id, calls in task_calls.items()])
    failure_categories = await identify_failure_categories(task_summaries, llm_client)
    task_classifications = await classify_tasks(task_summaries, failure_categories, llm_client)
    overall_summary = await generate_overall_summary(failure_categories, task_classifications, llm_client)

    task_classifications = {tc["task_id"]: tc for tc in task_classifications}
    # Return the model itself (not dict(...)) so callers can use model_dump().
    return OverallAnalysis(
        failure_categories=failure_categories,
        task_classifications=task_classifications,
        summary=overall_summary,
    )

async def summarize_task(task_id, calls, llm_client):
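    """Have the LLM produce a structured summary of one failed task from its
    chronologically ordered calls; returns the parsed TaskSummary as a dict."""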
    calls_summary = ""
    for i, call in enumerate(calls, 1):
        calls_summary += f"""
        Step {i}:
        Input: {call['inputs']}
        Output: {call['outputs']}
        Timestamp: {datetime.fromtimestamp(call['created_timestamp'])}
        """
    prompt = f"""
    Summarize the AI agent's performance on the following task:
    Task ID: {task_id}
    Number of steps: {len(calls)}
    Detailed steps:
    {calls_summary}
    Provide a brief summary of:
    1. The main goal of the task (inferred from the inputs and outputs)
    2. The agent's approach, including key steps and decisions made
    3. Any significant challenges or errors encountered during the task
    4. The final outcome and why the task failed. Be detailed about the reason for failure.
    Keep the summary concise (around 200 words) but include specific details about the agent's performance and any notable aspects of its problem-solving process.
    """
    system_message = "You are an AI performance analyst tasked with summarizing an AI agent's performance on individual tasks. Focus on the most important aspects of the agent's approach and performance."
    summary = await llm_client.generate_text(prompt, system_message, response_format=TaskSummary)
    return json.loads(summary)

async def identify_failure_categories(task_summaries, llm_client):
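    """Have the LLM induce recurring failure categories from the per-task
    summaries; returns a list of category dicts."""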
    summaries_text = "\n\n".join([f"Task {s['task_id']}:\n{s['summary']}" for s in task_summaries])
    prompt = f"""
    Analyze the following summaries of an AI agent's performance across multiple tasks:
    {summaries_text}
    Identify recurring categories of failures that the agent faces across these tasks. For each category:
    1. Provide a short, descriptive name (max 5 words)
    2. Write a brief description explaining the nature of this failure or challenge category
    Focus on patterns that appear across multiple tasks and represent specific errors that impacted the agent's performance. Make sure that your categories are distinct and cover a range of recurring issues. The categories should not be too general.
    Examples of categories include:
    Incorrect Implementation - The agent made a change to a reasonable area but its solution didn't correctly address the issue.
    Gave Up Prematurely - The agent decided to stop solving the task after encountering some difficulty.
    Failed Edit Recovery - The agent went into a loop, making recurrent failing edits without recovering.
    """
    system_message = "You are an expert in AI agent analysis, tasked with identifying recurring patterns in agent performance across multiple tasks."
    categories = await llm_client.generate_text(prompt, system_message, response_format=FailureCategories)
    return json.loads(categories)['failure_categories']

async def classify_tasks(task_summaries, failure_categories, llm_client):
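    """Classify each summarized task into one of the identified failure
    categories (or "Other"), one LLM call per task."""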
    categories_text = "\n".join([f"{cat['category_id']}. {cat['category_name']}: {cat['description']}" for cat in failure_categories])
    classifications = []
    for task in task_summaries:
        prompt = f"""
        Failure Categories:
        {categories_text}
        Task Summary:
        {task['summary']}
        Classify this task into one of the failure categories listed above. Provide:
        1. The number of the chosen category
        2. A brief explanation of why this category best fits the task's outcome
        If the task doesn't clearly fit any category, you may classify it as "0. Other" and explain why.
        """
        system_message = "You are an AI performance analyst tasked with classifying task outcomes into predefined categories."
        classification = await llm_client.generate_text(prompt, system_message, response_format=TaskClassification)
        classification = json.loads(classification)
        category_number = classification['category_id']
        if str(category_number) == "0":
            category_name = "Other"
        else:
            # Fall back to "Other" if the model returned an unknown category id.
            for cat in failure_categories:
                if str(cat['category_id']) == str(category_number):
                    category_name = cat['category_name']
                    break
            else:
                category_name = "Other"
        classifications.append(TaskClassification(
            task_id=task['task_id'],
            category_id=str(category_number),
            category_name=category_name,
            explanation=classification['explanation'],
        ).model_dump())
    return classifications

async def generate_overall_summary(failure_categories, task_classifications, llm_client):
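    """Have the LLM write a high-level performance summary from the failure
    categories and per-task classifications; returns plain text."""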
    categories_text = "\n".join([f"{cat['category_name']}: {cat['description']}" for cat in failure_categories])
    classifications_text = "\n".join([f"Task {tc['task_id']}: {tc['category_name']}" for tc in task_classifications])
    prompt = f"""
    Failure Categories:
    {categories_text}
    Task Classifications:
    {classifications_text}
    Based on the failure categories identified and the classification of tasks, provide an overall summary of the AI agent's performance across all tasks. Include:
    1. The most common types of failures or challenges
    2. Any patterns in the agent's performance across different tasks
    3. Suggestions for areas of improvement in the agent's design or training
    Keep the summary concise but insightful, focusing on the most significant findings and their implications for AI agent development. Return only the summary itself, without any preceding context.
    """
    system_message = "You are a senior AI researcher tasked with providing a high-level analysis of an AI agent's performance across multiple tasks."
    return await llm_client.generate_text(prompt, system_message)

async def main():
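    """Pull traces from Weave, analyze the failed tasks, and write the result
    to agent_performance_analysis.json."""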
    client = weave.init("citp_agent_eval/usaco_1723148990")
    processed_calls = get_weave_calls(client)
    weave.finish()

    # NOTE: assumed placeholder. analyze_agent_performance requires the IDs of
    # the failed tasks; we default to every task ID seen in the trace here.
    # Replace this with the actual list of failed tasks.
    failed_tasks = list({call['weave_task_id'] for call in processed_calls})

    openai_client = AsyncOpenAIClient(model="gpt-4o-mini")
    overall_analysis = await analyze_agent_performance(processed_calls, failed_tasks, openai_client)
    with open("agent_performance_analysis.json", "w") as f:
        json.dump(overall_analysis.model_dump(), f, indent=4)


if __name__ == "__main__":
    asyncio.run(main())