File size: 6,701 Bytes
75cfc9a 08eb725 75cfc9a 08eb725 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
import re
import json
from typing import List, Union, Optional
def extract_final_answer(output: str) -> str:
    """
    Return the text following the last 'FINAL ANSWER:' marker in *output*.

    The marker match is case-insensitive. Comma-separated answers are
    normalized so each comma is followed by exactly one space. If the
    marker is absent, the whole (stringified) output is returned unchanged.
    """
    text = str(output)
    marker = "FINAL ANSWER:"
    # Case-insensitive search; keep the position of the LAST occurrence.
    pos = text.lower().rfind(marker.lower())
    if pos == -1:
        return text
    answer = text[pos + len(marker):].strip()
    # Normalize list formatting: exactly one space after every comma.
    return re.sub(r",\s*", ", ", answer)
def replace_tool_mentions(prompt: str) -> str:
    """Rewrite legacy tool names in *prompt* to their current equivalents.

    Backtick mentions: `search` -> `web_search`, `wiki` -> `wikipedia_search`.
    Call sites:        search(...) -> web_search(...), wiki(...) -> wikipedia_search(...)
    The lookbehinds keep longer identifiers such as ``arxiv_search(`` untouched.
    """
    # (pattern, replacement) pairs, applied in order: backtick mentions
    # first, then bare call sites.
    rewrites = (
        (r"(?<!\w)`search`(?!\w)", "`web_search`"),
        (r"(?<!\w)`wiki`(?!\w)", "`wikipedia_search`"),
        (r"(?<!\w)(?<!_)search\(", "web_search("),
        (r"(?<!\w)(?<!_)wiki\(", "wikipedia_search("),
    )
    for pattern, replacement in rewrites:
        prompt = re.sub(pattern, replacement, prompt)
    return prompt
def _question_matches(question: str, filters: Union[str, List[str]]) -> bool:
"""Helper: check if question matches any string in filters."""
if isinstance(filters, str):
filters = [filters]
return any(f.lower() in question.lower() for f in filters)
def load_online_qas(
    qa_type: Union[str, List[str]] = "all",
    has_file: Optional[bool] = None,
    file_path: str = "Final_Assignment_Template/allqas.jsonl",
) -> List[dict]:
    """
    Load QA entries from a JSONL file and optionally filter them.

    Parameters:
    - qa_type: str or List[str]; substring(s) matched case-insensitively
      against each entry's 'Question'. Use "all" to skip this filter.
    - has_file: filters entries by presence of a non-blank 'file_name':
        - True:  only entries WITH an attached file
        - False: only entries WITHOUT an attached file
        - None:  no file_name filtering
    - file_path: path to the JSONL file (one JSON object per line).

    Returns the (possibly filtered) list of entry dicts.
    """
    # NOTE: original docstring claimed "example_gaiaqa.json"; the code has
    # always read from file_path — documentation corrected accordingly.
    with open(file_path, "r", encoding="utf-8") as f:
        data = [json.loads(line) for line in f]
    # Filter on presence/absence of an attached file.
    if has_file is True:
        data = [qa for qa in data if qa.get("file_name", "").strip()]
    elif has_file is False:
        data = [qa for qa in data if not qa.get("file_name", "").strip()]
    # Filter on question content.
    if qa_type == "all":
        return data
    return [qa for qa in data if _question_matches(qa.get("Question", ""), qa_type)]
def load_test_qas(
    qa_type: Union[str, List[str]] = "all",
    file_path: str = "Final_Assignment_Template/gaia_val.jsonl",
) -> List[dict]:
    """Load test QAs that have no attached file.

    Parameters:
    - qa_type: substring(s) matched case-insensitively against each
      question; "all" disables the filter.
    - file_path: JSONL file to read (one JSON object per line).
      Defaults to the original hardcoded path, so existing callers
      are unaffected.

    Returns a list of dicts with keys 'Question', 'Final answer',
    'task_id', 'tools' and 'file_name'.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        test_docs = [
            entry
            for entry in map(json.loads, f)
            if entry.get("file_name", "").strip() == ""
        ]

    def _project(e: dict) -> dict:
        # Single projection shared by both branches (the original
        # duplicated this dict literal verbatim).
        return {
            "Question": e["Question"],
            "Final answer": e.get("Final answer"),
            "task_id": e["task_id"],
            "tools": e.get("Annotator Metadata", {}).get("Tools"),
            "file_name": e.get("file_name", ""),
        }

    if qa_type == "all":
        return [_project(e) for e in test_docs]
    return [_project(e) for e in test_docs if _question_matches(e["Question"], qa_type)]
def load_val_qas(
    qa_type: Union[str, List[str]] = "all",
    file_path: str = "Final_Assignment_Template/gaia_val.jsonl",
) -> List[dict]:
    """Load validation QAs that have no attached file.

    Parameters:
    - qa_type: substring(s) matched case-insensitively against each
      question; "all" disables the filter.
    - file_path: JSONL file to read (one JSON object per line).
      Defaults to the original hardcoded path, so existing callers
      are unaffected.

    Returns a list of dicts with keys 'Question', 'Final answer',
    'task_id', 'tools' and 'file_name'.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        val_docs = [
            entry
            for entry in map(json.loads, f)
            if entry.get("file_name", "").strip() == ""
        ]

    def _project(e: dict) -> dict:
        # Single projection shared by both branches (the original
        # duplicated this dict literal verbatim).
        return {
            "Question": e["Question"],
            "Final answer": e.get("Final answer"),
            "task_id": e["task_id"],
            "tools": e.get("Annotator Metadata", {}).get("Tools"),
            "file_name": e.get("file_name", ""),
        }

    if qa_type == "all":
        return [_project(e) for e in val_docs]
    return [_project(e) for e in val_docs if _question_matches(e["Question"], qa_type)]
# import requests
# import json
# def fetch_and_save_questions(api_base_url: str, output_path: str):
# """
# Fetch all questions from the Agent Evaluation API and save them as JSONL.
# :param api_base_url: Base URL of the scoring API, e.g. "https://agents-course-unit4-scoring.hf.space"
# :param output_path: Path to the output .jsonl file
# """
# endpoint = f"{api_base_url}/questions"
# try:
# resp = requests.get(endpoint, timeout=30)
# resp.raise_for_status()
# questions = resp.json()
# except Exception as e:
# print(f"❌ Failed to fetch questions: {e}")
# return
# try:
# with open(output_path, "w", encoding="utf-8") as fout:
# for q in questions:
# fout.write(json.dumps(q, ensure_ascii=False) + "\n")
# print(f"✅ Saved {len(questions)} questions to {output_path}")
# except Exception as e:
# print(f"❌ Failed to write JSONL file: {e}")
# API_BASE = "https://agents-course-unit4-scoring.hf.space"
# OUTPUT_FILE = "questions.jsonl"
# fetch_and_save_questions(API_BASE, OUTPUT_FILE)
# dlf = DownloadFileFromTaskTool()
# for res in results:
# res = dlf.forward(task_id = res["task_id"])
# print(res)
# task_id = "cca530fc-4052-43b2-b130-b30968d8aa44"
# file_url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
# response = requests.get(file_url, timeout=15)
# print(response.content)
# print(response.headers.get("content-type", "").lower())
#print(response.headers) |