import argparse
import datetime
import glob
import os
import re

import openai
import pandas as pd
from tqdm import tqdm


def summarize_results(results_dirs, output_csv, model, no_llm=False):
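    """Walk one or more results directories, parse each run's logs into a
    summary row, optionally ask an LLM to summarize the errors encountered,
    and write all rows to output_csv."""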
    client = openai.OpenAI(
        api_key=os.environ.get('CBORG_API_KEY'),
        base_url='https://api.cborg.lbl.gov'
    )

    error_description_prompt = (
        "You are an expert assistant. Below is a comprehensive log of a multi-step workflow "
        "from a high energy physics analysis framework.\n\n"
        "The workflow includes:\n"
        "- A user provides an analysis task prompt.\n"
        "- A supervisor agent breaks down the task and instructs a coder agent.\n"
        "- The coder agent generates code, which is executed.\n"
        "- The supervisor reviews results and may iterate with the coder to fix issues until the task is complete.\n"
        "The log contains the user prompt, supervisor/coder dialogue, code, and execution outputs for all iterations.\n\n"
        "Your task: Summarize all errors encountered during the entire workflow in clear, concise language. "
        "Do NOT repeat or quote the log, prompt, or instructions. "
        "Do NOT include code, explanations, or any text except your error summary.\n\n"
        "For each error, use the following structure:\n"
        "- Error Type: [brief description of the nature of the error]\n"
        "- Cause: [if identifiable]\n"
        "- Responsible Party: [user, supervisor, coder, or external]\n"
        "- Consequence: [result or impact]\n"
        "- Context: [any important context]\n"
        "- Workflow Response: [Did the supervisor diagnose and address it? "
        "Did the coder attempt a fix? Was the fix successful, unsuccessful, or misdiagnosed? "
        "Was the error ignored or did it persist? Summarize the recovery process and its outcome for each error.]\n"
        "List each error as a separate bullet point using this template.\n"
        "If there is a validation error, look in the validation log and use the same structure to identify its causes.\n"
        "Limit your entire summary to 3000 characters. "
        "If no errors occurred, respond: 'No errors found.'\n\n"
    )
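    # Each run is expected to live in its own subdirectory named
    # "<config>_step<N>...", containing a "logs/" directory with a
    # "*comprehensive_log.txt" and, optionally, a "*validation.log".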
    results = []
    for results_dir in results_dirs:
        for name in tqdm(os.listdir(results_dir), desc=f"generating error descriptions for {results_dir}"):
            output_dir = os.path.join(results_dir, name)
            if not os.path.isdir(output_dir):
                continue

            # Parse the configuration name and step number out of the directory name.
            config_match = re.match(r'^(.*?)_step\d+', name)
            config = config_match.group(1) if config_match else None  # currently unused downstream

            step_match = re.search(r'_step(\d+)', name)
            step = int(step_match.group(1)) if step_match else None

            result = {
                "supervisor": None,
                "coder": None,
                "step": step,
                "success": None,
                "iterations": None,
                "duration": None,
                "API_calls": None,
                "input_tokens": None,
                "output_tokens": None,
                "user_prompt_tokens": None,
                "supervisor_to_coder_tokens": None,
                "coder_output_tokens": None,
                "feedback_to_supervisor_tokens": None,
                "error": "Uncategorized",
                "error_description": None,
                "output_dir": output_dir,
            }
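            # Locate the run's log directory; a missing directory is treated
            # as a job that never ran (e.g. a submission failure).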
            log_dir = os.path.join(output_dir, "logs")
            if os.path.isdir(log_dir):
                comp_log_files = glob.glob(os.path.join(log_dir, "*comprehensive_log.txt"))
                comp_log_str = None
                if comp_log_files:
                    with open(comp_log_files[0], "r", encoding="utf-8") as f:
                        comp_log_str = f.read()
                else:
                    result["success"] = False
                    result["error_description"] = "comprehensive log file not found"
                    results.append(result)
                    continue

                # Identify the supervisor and coder models used for this run.
                supervisor_match = re.search(r"Supervisor:\s*(\S+)", comp_log_str)
                coder_match = re.search(r"Coder:\s*(\S+)", comp_log_str)
                if supervisor_match:
                    result["supervisor"] = supervisor_match.group(1)
                if coder_match:
                    result["coder"] = coder_match.group(1)

                iterations_match = re.search(r"Total Iterations:\s*(\d+)", comp_log_str)
                if iterations_match:
                    result["iterations"] = int(iterations_match.group(1))

                # Duration is logged as H:MM:SS with optional fractional seconds.
                duration_match = re.search(r"Duration:\s*([0-9:.\s]+)", comp_log_str)
                if duration_match:
                    duration_str = duration_match.group(1).strip()
                    try:
                        t = datetime.datetime.strptime(duration_str, "%H:%M:%S.%f")
                    except ValueError:
                        t = datetime.datetime.strptime(duration_str, "%H:%M:%S")
                    result["duration"] = t.hour * 3600 + t.minute * 60 + t.second + t.microsecond / 1e6

                api_calls_match = re.search(r"Total API Calls:\s*(\d+)", comp_log_str)
                if api_calls_match:
                    result["API_calls"] = int(api_calls_match.group(1))
                input_tokens_match = re.search(r"Total Input Tokens:\s*(\d+)", comp_log_str)
                if input_tokens_match:
                    result["input_tokens"] = int(input_tokens_match.group(1))
                output_tokens_match = re.search(r"Total Output Tokens:\s*(\d+)", comp_log_str)
                if output_tokens_match:
                    result["output_tokens"] = int(output_tokens_match.group(1))

                # Per-channel token counts.
                match = re.search(r"User Prompt Tokens:\s*(\d+)", comp_log_str)
                if match:
                    result["user_prompt_tokens"] = int(match.group(1))
                match = re.search(r"Supervisor to Coder Tokens:\s*(\d+)", comp_log_str)
                if match:
                    result["supervisor_to_coder_tokens"] = int(match.group(1))
                match = re.search(r"Coder Output Tokens:\s*(\d+)", comp_log_str)
                if match:
                    result["coder_output_tokens"] = int(match.group(1))
                match = re.search(r"Feedback to Supervisor Tokens:\s*(\d+)", comp_log_str)
                if match:
                    result["feedback_to_supervisor_tokens"] = int(match.group(1))

                # Success is determined by the last validation verdict in the
                # validation log; a missing log counts as failure.
                val_log_files = glob.glob(os.path.join(log_dir, "*validation.log"))
                val_log_str = None
                if val_log_files:
                    with open(val_log_files[0], "r", encoding="utf-8") as f:
                        val_log_str = f.read()
                    matches = re.findall(r'(✅ Validation successful|❌ Validation failed)', val_log_str)
                    if not matches:
                        result["success"] = False
                    else:
                        result["success"] = matches[-1] == "✅ Validation successful"
                    if no_llm:
                        result["error"] = None if result["success"] else "Validation Error"
                    val_log_str = val_log_str.replace('\n', '').replace('\r', '')
                else:
                    result["success"] = False
                    val_log_str = ""

                if not no_llm:
                    try:
                        response = client.chat.completions.create(
                            model=model,
                            messages=[
                                {
                                    'role': 'user',
                                    'content': error_description_prompt +
                                               "\nComprehensive Log:\n" + comp_log_str +
                                               "\nValidation Log:\n" + val_log_str
                                }
                            ],
                            temperature=0.0
                        )
                        error_description = response.choices[-1].message.content
                        # Collapse whitespace and enforce the 3000-character cap.
                        error_description = " ".join(error_description.split())[:3000]
                        result["error_description"] = error_description
                    except Exception as e:
                        print(f"OpenAI API error: {e}")
                elif "API call failed" in comp_log_str:
                    result["error"] = "API Call Error"
            else:
                result["success"] = False
                result["error_description"] = "job submission failure"

            results.append(result)

    df = pd.DataFrame(results)
    df = df.sort_values(by=["supervisor", "coder", "step", "output_dir"])
    df.to_csv(output_csv, index=False)
    print(f"Results written to {output_csv}")


def categorize_errors(output_csv, model):
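    """Read the summary CSV produced by summarize_results, ask an LLM to
    derive 5-10 root-cause categories from the error descriptions, assign
    each row its categories, and rewrite the CSV with the category list
    prepended as comment lines."""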
    client = openai.OpenAI(
        api_key=os.environ.get('CBORG_API_KEY'),
        base_url='https://api.cborg.lbl.gov'
    )

    # Skip the "#" comment header written on any previous categorization pass.
    df = pd.read_csv(output_csv, comment='#')
    error_descriptions = df['error_description'].fillna("").tolist()

    create_categories_prompt = (
        "You are an expert at analyzing and organizing error messages from machine learning workflows in high energy physics.\n\n"
        "Workflow summary:\n"
        "- A user provides an analysis task prompt.\n"
        "- A supervisor agent breaks down the task and instructs a coder agent.\n"
        "- The coder agent generates code, which is executed.\n"
        "- The supervisor reviews results and may iterate with the coder to fix issues until the task is complete.\n"
        "Error descriptions below are collected from all steps and iterations of this workflow.\n\n"
        "Your task: Identify 5 to 10 distinct, meaningful categories that best capture the underlying nature or root cause of the errors in the list. "
        "Focus on grouping errors by what fundamentally caused them (such as logic mistakes, miscommunication, missing dependencies, data mismatches, etc.), "
        "rather than by their symptoms, error messages, or observable effects. "
        "Do NOT create categories based on how the error was observed or reported, but on the underlying issue that led to it.\n\n"
        "Each category should have a short, clear name and a one-sentence description that explains what kinds of errors belong in that category.\n\n"
        "Output only the categories in this format:\n"
        "1. [Category Name]: [One-sentence description]\n"
        "2. [Category Name]: [One-sentence description]\n"
        "...\n"
        "N. [Category Name]: [One-sentence description]\n\n"
        "Here are some example error categories:\n"
        "- Coding API Error: the coder incorrectly utilized common python packages (e.g. numpy, awkward, uproot, pandas)\n"
        "- User Prompt Misunderstanding: the supervisor did not properly interpret the user prompt\n\n"
        "Here are some error descriptions after running the workflow:\n"
        "```\n"
    )
    create_categories_prompt += "\n".join(error_descriptions) + "\n```"

    try:
        response = client.chat.completions.create(
            model=model,
            messages=[{'role': 'user', 'content': create_categories_prompt}],
            temperature=0.0
        )
        error_categories = response.choices[-1].message.content.strip()
        print("Categories found by LLM:\n", error_categories)
    except Exception as e:
        print(f"LLM API error (category generation): {e}")
        return

    df['error'] = df['error'].astype(str)
    # Pre-create the column with object dtype so a list can be stored per cell.
    df['error_categories'] = None

    def parse_categories(llm_output):
        """Extract the ***Category*** names from the model's reply."""
        return [cat.strip() for cat in re.findall(r"\*\*\*(.*?)\*\*\*", llm_output)]

    for idx, error_description in tqdm(enumerate(error_descriptions), total=len(error_descriptions), desc="categorizing errors"):
        if not error_description.strip():
            continue

        categorize_errors_prompt = (
            "You are an expert at classifying error messages from machine learning workflows in high energy physics.\n\n"
            "Workflow summary:\n"
            "- A user provides an analysis task prompt.\n"
            "- A supervisor agent breaks down the task and instructs a coder agent.\n"
            "- The coder agent generates code, which is executed.\n"
            "- The supervisor reviews results and may iterate with the coder to fix issues until the task is complete.\n"
            "The error descriptions below are collected from all steps and iterations of this workflow.\n\n"
            "Below is a list of error categories, each with a short description:\n"
            f"{error_categories}\n\n"
            "Your task: For the given error description, select the most appropriate error category or categories from the list above. "
            "Base your choice on the underlying nature or root cause of the error, not on the symptoms, error messages, or observable effects. "
            "Focus on what fundamentally caused the error, such as logic mistakes, missing dependencies, data mismatches, or miscommunication, rather than how the error was reported or observed.\n"
            "Return ALL applicable category names, each wrapped with three asterisks on each side, separated by commas, like this: ***Category One***, ***Category Two***\n"
            "Do not include any other text, explanation, or formatting.\n\n"
            "Error description:\n"
            "```\n"
            f"{error_description}\n"
            "```"
        )

        try:
            response = client.chat.completions.create(
                model=model,
                messages=[{'role': 'user', 'content': categorize_errors_prompt}],
                temperature=0.0
            )
            assignments_text = response.choices[-1].message.content.strip()
            categories = parse_categories(assignments_text)
            df.at[idx, 'error_categories'] = categories if categories else ["Uncategorized"]
        except Exception as e:
            print(f"LLM API error (assignment) at row {idx}: {e}")
            df.at[idx, 'error'] = "LLM API error"

    # Write the categories as a commented header, then the data itself.
    with open(output_csv, 'w', encoding='utf-8') as f:
        f.write("# LLM Generated Error Categories:\n")
        for line in error_categories.splitlines():
            f.write(f"# {line}\n")
        f.write("\n")
        df.to_csv(f, index=False)
    print(f"Saved categorized errors to {output_csv}")


def main():
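    """Command-line entry point: build the summary CSV, then optionally
    categorize the summarized errors with an LLM."""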
    parser = argparse.ArgumentParser(description="Summarize experiment logs and errors")
    parser.add_argument("--results_dir", type=str, nargs='+', required=True, help="One or more directories containing experiment results")
    parser.add_argument("--output_csv", type=str, default="results_summary.csv", help="Path to output CSV file")
    parser.add_argument("--model", type=str, default="gpt-oss-120b", help="LLM model to use for error summarization")
    parser.add_argument("--no_llm", action="store_true", help="If set, only generate the CSV without LLM error description or categorization")
    args = parser.parse_args()

    summarize_results(
        results_dirs=args.results_dir,
        output_csv=args.output_csv,
        model=args.model,
        no_llm=args.no_llm
    )

    if not args.no_llm:
        categorize_errors(
            output_csv=args.output_csv,
            model=args.model
        )
    else:
        print("LLM error description and categorization skipped (--no_llm set)")


if __name__ == "__main__":
    main()
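
# Example invocation (the script filename and directory names are illustrative):
#   python summarize_results.py --results_dir runs/expA runs/expB \
#       --output_csv results_summary.csv --model gpt-oss-120b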