import json
import os
import sys
import warnings
from typing import Any, Callable, List, Optional, Tuple

# Make the parent directory importable so the `taskweaver` package can be found
# when this script is run directly from its own folder.
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
warnings.filterwarnings("ignore")

import pandas as pd
import yaml
from evaluator import Evaluator, ScoringPoint

from taskweaver.app.app import TaskWeaverApp


def format_output(response_obj: Any) -> str:
    assert hasattr(response_obj, "to_dict"), "to_dict method is not found"
    return json.dumps(response_obj.to_dict())


def auto_evaluate_for_taskweaver(
    eval_case_file_path: str,
    interrupt_threshold: Optional[float] = None,
    event_handler: Optional[Callable] = None,
) -> List[Tuple[int, float, float]]:
    """Evaluate a single multi-round case file and return (round index, score, normalized score) tuples."""
    with open(eval_case_file_path, "r") as f:
        eval_meta_data = yaml.safe_load(f)

    app_dir = eval_meta_data["app_dir"]
    config_var = eval_meta_data.get("config_var", None)
    app = TaskWeaverApp(app_dir=app_dir, config=config_var)
    session = app.get_session()

    taskweaver_evaluator = Evaluator()

    score_list = []
    for idx, eval_query in enumerate(eval_meta_data["eval_query"]):
        user_query = eval_query["user_query"]
        print(f"Round-{idx} user query:\n", user_query)
        response_round = session.send_message(
            user_query,
            event_handler=event_handler if event_handler is not None else lambda x, y: print(f"{x}:\n{y}"),
        )
        post_index = eval_query.get("post_index", None)

        scoring_point_data = eval_query.get("scoring_points", None)
        if scoring_point_data is None:
            print("No scoring points provided; skipping evaluation for this round.")
            continue
        scoring_points = [ScoringPoint(**scoring_point) for scoring_point in scoring_point_data]

        if isinstance(post_index, int):
            response = format_output(response_round.post_list[post_index])
        elif post_index is None:
            response = format_output(response_round)
        else:
            raise ValueError(f"Invalid post_index: {post_index}")
        print("TaskWeaver response:\n", response)

        score, normalized_score = taskweaver_evaluator.evaluate(user_query, response, scoring_points)
        score_list.append((idx, score, normalized_score))

        if interrupt_threshold is not None and interrupt_threshold > 0:
            if normalized_score < interrupt_threshold:
                print(
                    f"Interrupting conversation testing "
                    f"because the normalized score is below the threshold {interrupt_threshold}.",
                )
                break

    return score_list


def batch_auto_evaluate_for_taskweaver(
    result_file_path: str,
    eval_case_dir: str,
    flush_result_file: bool = False,
    interrupt_threshold: Optional[float] = None,
):
    """Evaluate every case file in `eval_case_dir` and append the scores to a CSV result file."""
    if not os.path.exists(result_file_path):
        df = pd.DataFrame(columns=["case_file", "round", "score", "normalized_score"])
        df.to_csv(result_file_path, index=False)

    results = pd.read_csv(result_file_path)
    evaluated_case_files = results["case_file"].tolist()
    if flush_result_file:
        evaluated_case_files = []
    print(f"Evaluated case files: {evaluated_case_files}")
    eval_config_files = os.listdir(eval_case_dir)
    print(f"Eval config files in case dir: {eval_config_files}")

    for eval_config_file in eval_config_files:
        if eval_config_file in evaluated_case_files:
            print(f"Skipping {eval_config_file} because it has already been evaluated.")
            continue

        print("------------Start evaluating------------", eval_config_file)
        eval_case_file_path = os.path.join(eval_case_dir, eval_config_file)
        score_list = auto_evaluate_for_taskweaver(
            eval_case_file_path,
            interrupt_threshold=interrupt_threshold,
        )
        for idx, score, normalized_score in score_list:
            print(f"Round-{idx} score: {score}, normalized score: {normalized_score}")
            new_res_row = pd.DataFrame(
                {
                    "case_file": eval_config_file,
                    "round": idx,
                    "score": score,
                    "normalized_score": normalized_score,
                },
                index=[0],
            )
            results = pd.concat([results, new_res_row], ignore_index=True)

        print("------------Finished evaluating------------", eval_config_file)
        # Persist results after each case so progress is kept if the run is interrupted.
        results.to_csv(result_file_path, index=False)


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="TaskWeaver auto evaluation script")
    parser.add_argument(
        "-m",
        "--mode",
        choices=["single", "batch"],
        required=True,
        help="Evaluation mode: single for evaluating a single case, "
        "batch for evaluating a batch of cases",
    )
    parser.add_argument(
        "-f",
        "--file",
        type=str,
        required=True,
        help="Path to the evaluation case file or directory containing evaluation case files",
    )
    parser.add_argument(
        "-r",
        "--result",
        type=str,
        default="sample_case_results.csv",
        help="Path to the result file for batch evaluation mode",
    )
    parser.add_argument(
        "-t",
        "--threshold",
        type=float,
        default=None,
        help="Interrupt threshold for multi-round chat",
    )
    parser.add_argument(
        "-flush",
        "--flush",
        action="store_true",
        help="Flush the result file",
    )

    args = parser.parse_args()

    if args.mode == "single":
        score_list = auto_evaluate_for_taskweaver(args.file, interrupt_threshold=args.threshold)
        for idx, score, normalized_score in score_list:
            print(f"Round-{idx} score: {score}, normalized score: {normalized_score}")
    elif args.mode == "batch":
        batch_auto_evaluate_for_taskweaver(
            args.result,
            args.file,
            flush_result_file=args.flush,
            interrupt_threshold=args.threshold,
        )
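# ---------------------------------------------------------------------------
# Illustrative notes (a sketch, not part of the evaluation logic).
#
# A minimal example of the eval case YAML this script reads, inferred from the
# keys accessed above (app_dir, config_var, eval_query, user_query, post_index,
# scoring_points). The per-scoring-point fields must match the ScoringPoint
# constructor in evaluator.py, so the field names shown below are hypothetical:
#
#   app_dir: ./path/to/taskweaver/project
#   config_var: null
#   eval_query:
#     - user_query: "load the data file and report the number of rows"
#       post_index: null
#       scoring_points:
#         - score_point: "the response reports the number of rows"  # hypothetical field name
#           weight: 1                                                # hypothetical field name
#
# Example invocations, assuming this file is saved as auto_eval.py:
#
#   python auto_eval.py -m single -f ./cases/sample_case.yaml -t 0.7
#   python auto_eval.py -m batch -f ./cases -r sample_case_results.csv --flush
# ---------------------------------------------------------------------------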