import re import json from typing import List, Union, Optional def extract_final_answer(output: str) -> str: """ Extracts the text after 'FINAL ANSWER:' in the model's output. Strips whitespace and ensures clean formatting. If the answer is a comma-separated list, ensures a space after each comma. """ output = str(output) marker = "FINAL ANSWER:" lower_output = output.lower() if marker.lower() in lower_output: # Find actual case version in original output (for safety) idx = lower_output.rfind(marker.lower()) raw_answer = output[idx + len(marker) :].strip() # Normalize comma-separated lists: ensure single space after commas cleaned_answer = re.sub(r",\s*", ", ", raw_answer) return cleaned_answer return output def replace_tool_mentions(prompt: str) -> str: # Replace tool mentions in backticks: `search` -> `web_search`, `wiki` -> `wikipedia_search` prompt = re.sub(r"(? web_search(...), wiki(...) -> wikipedia_search(...) # This ensures we only catch function calls (not words like arxiv_search) prompt = re.sub(r"(? bool: """Helper: check if question matches any string in filters.""" if isinstance(filters, str): filters = [filters] return any(f.lower() in question.lower() for f in filters) def load_online_qas( qa_type: Union[str, List[str]] = "all", has_file: Optional[bool] = None, file_path = "Final_Assignment_Template/allqas.jsonl" ) -> List[dict]: """ Load online QAs from example_gaiaqa.json. Parameters: - qa_type: str or List[str], used to match substrings in the Question. Use "all" for no filtering. - has_file: bool or None, filters QAs by presence of 'file_name': - True: only include QAs with file_name - False: only include QAs without file_name - None: no file_name filtering - file_path: a path """ data = [] with open(file_path ,"r") as f: for line in f: entry = json.loads(line) data.append(entry) # Apply file presence filter if has_file is True: data = [qa for qa in data if qa.get("file_name", "").strip()] elif has_file is False: data = [qa for qa in data if not qa.get("file_name", "").strip()] # Apply question content filter if qa_type == "all": return data return [qa for qa in data if _question_matches(qa.get("Question", ""), qa_type)] def load_test_qas(qa_type: Union[str, List[str]] = "all") -> List[dict]: """Loads test QAs with no attached files. Optionally filters by topic keywords in questions.""" test_docs = [] with open("Final_Assignment_Template/gaia_val.jsonl", "r") as f: for line in f: entry = json.loads(line) if entry.get("file_name", "").strip() == "": test_docs.append(entry) if qa_type == "all": return [ { "Question": e["Question"], "Final answer": e.get("Final answer"), "task_id": e["task_id"], "tools": e.get("Annotator Metadata", {}).get("Tools"), "file_name": e.get("file_name", "") } for e in test_docs ] return [ { "Question": e["Question"], "Final answer": e.get("Final answer"), "task_id": e["task_id"], "tools": e.get("Annotator Metadata", {}).get("Tools"), "file_name": e.get("file_name", "") } for e in test_docs if _question_matches(e["Question"], qa_type) ] def load_val_qas(qa_type: Union[str, List[str]] = "all") -> List[dict]: """Loads validation QAs with no attached files. Optionally filters by topic keywords in questions.""" val_docs = [] with open("Final_Assignment_Template/gaia_val.jsonl", "r") as f: for line in f: entry = json.loads(line) if entry.get("file_name", "").strip() == "": val_docs.append(entry) if qa_type == "all": return [ { "Question": e["Question"], "Final answer": e.get("Final answer"), "task_id": e["task_id"], "tools": e.get("Annotator Metadata", {}).get("Tools"), "file_name": e.get("file_name", "") } for e in val_docs ] return [ { "Question": e["Question"], "Final answer": e.get("Final answer"), "task_id": e["task_id"], "tools": e.get("Annotator Metadata", {}).get("Tools"), "file_name": e.get("file_name", "") } for e in val_docs if _question_matches(e["Question"], qa_type) ] # import requests # import json # def fetch_and_save_questions(api_base_url: str, output_path: str): # """ # Fetch all questions from the Agent Evaluation API and save them as JSONL. # :param api_base_url: Base URL of the scoring API, e.g. "https://agents-course-unit4-scoring.hf.space" # :param output_path: Path to the output .jsonl file # """ # endpoint = f"{api_base_url}/questions" # try: # resp = requests.get(endpoint, timeout=30) # resp.raise_for_status() # questions = resp.json() # except Exception as e: # print(f"❌ Failed to fetch questions: {e}") # return # try: # with open(output_path, "w", encoding="utf-8") as fout: # for q in questions: # fout.write(json.dumps(q, ensure_ascii=False) + "\n") # print(f"✅ Saved {len(questions)} questions to {output_path}") # except Exception as e: # print(f"❌ Failed to write JSONL file: {e}") # API_BASE = "https://agents-course-unit4-scoring.hf.space" # OUTPUT_FILE = "questions.jsonl" # fetch_and_save_questions(API_BASE, OUTPUT_FILE) # dlf = DownloadFileFromTaskTool() # for res in results: # res = dlf.forward(task_id = res["task_id"]) # print(res) # task_id = "cca530fc-4052-43b2-b130-b30968d8aa44" # file_url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}" # response = requests.get(file_url, timeout=15) # print(response.content) # print(response.headers.get("content-type", "").lower()) #print(response.headers)