# Source: Hugging Face file page residue — author: mdicio, repo: openpyxl, commit: 08eb725
import re
import json
from typing import List, Union, Optional
def extract_final_answer(output: str) -> str:
    """
    Return the text following the last 'FINAL ANSWER:' marker in *output*.

    The marker is matched case-insensitively. The extracted answer is
    stripped of surrounding whitespace, and comma-separated lists are
    normalized so each comma is followed by exactly one space. When the
    marker is absent, the (stringified) output is returned unchanged.
    """
    text = str(output)
    marker = "FINAL ANSWER:"
    haystack = text.lower()
    needle = marker.lower()
    if needle not in haystack:
        return text
    # Find the last occurrence case-insensitively, then slice the original
    # text so the answer keeps its original casing.
    start = haystack.rfind(needle) + len(marker)
    answer = text[start:].strip()
    # Normalize list separators: any whitespace after a comma -> one space.
    return re.sub(r",\s*", ", ", answer)
def replace_tool_mentions(prompt: str) -> str:
    """
    Rewrite legacy tool names in *prompt* to their current equivalents.

    Backtick mentions (`search`, `wiki`) become `web_search` and
    `wikipedia_search`; bare calls search(...) / wiki(...) become
    web_search(...) / wikipedia_search(...). The lookbehinds ensure that
    names such as arxiv_search(...) are left untouched.
    """
    substitutions = (
        (r"(?<!\w)`search`(?!\w)", "`web_search`"),
        (r"(?<!\w)`wiki`(?!\w)", "`wikipedia_search`"),
        (r"(?<!\w)(?<!_)search\(", "web_search("),
        (r"(?<!\w)(?<!_)wiki\(", "wikipedia_search("),
    )
    for pattern, replacement in substitutions:
        prompt = re.sub(pattern, replacement, prompt)
    return prompt
def _question_matches(question: str, filters: Union[str, List[str]]) -> bool:
"""Helper: check if question matches any string in filters."""
if isinstance(filters, str):
filters = [filters]
return any(f.lower() in question.lower() for f in filters)
def load_online_qas(
    qa_type: Union[str, List[str]] = "all",
    has_file: Optional[bool] = None,
    file_path: str = "Final_Assignment_Template/allqas.jsonl",
) -> List[dict]:
    """
    Load QA entries from a JSONL file, optionally filtered.

    Parameters:
    - qa_type: substring(s) matched against each entry's "Question";
      use "all" for no question filtering.
    - has_file: filters entries by presence of a non-empty "file_name":
        - True: only entries with an attached file
        - False: only entries without an attached file
        - None: no file filtering
    - file_path: path to the JSONL source (one JSON object per line).

    Returns the list of matching QA dicts.
    """
    data = []
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            # Tolerate blank lines, which json.loads would reject.
            if line.strip():
                data.append(json.loads(line))
    # Apply file-presence filter.
    if has_file is True:
        data = [qa for qa in data if qa.get("file_name", "").strip()]
    elif has_file is False:
        data = [qa for qa in data if not qa.get("file_name", "").strip()]
    # Apply question-content filter.
    if qa_type == "all":
        return data
    return [qa for qa in data if _question_matches(qa.get("Question", ""), qa_type)]
def load_test_qas(
    qa_type: Union[str, List[str]] = "all",
    file_path: str = "Final_Assignment_Template/gaia_val.jsonl",
) -> List[dict]:
    """
    Load test QAs that have no attached file, optionally filtered by topic.

    Parameters:
    - qa_type: substring(s) matched against each entry's "Question";
      use "all" for no filtering.
    - file_path: path to the JSONL source. NOTE(review): the default is
      the same gaia_val.jsonl that load_val_qas reads — confirm this is
      intentional and not a copy-paste of the validation path.

    Returns a list of dicts with keys "Question", "Final answer",
    "task_id", "tools", and "file_name".
    """

    def _project(entry: dict) -> dict:
        # Reduce a raw entry to exactly the fields downstream code consumes.
        return {
            "Question": entry["Question"],
            "Final answer": entry.get("Final answer"),
            "task_id": entry["task_id"],
            "tools": entry.get("Annotator Metadata", {}).get("Tools"),
            "file_name": entry.get("file_name", ""),
        }

    test_docs = []
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines in the JSONL
            entry = json.loads(line)
            # Keep only entries without an attached file.
            if entry.get("file_name", "").strip() == "":
                test_docs.append(entry)
    if qa_type == "all":
        return [_project(e) for e in test_docs]
    return [_project(e) for e in test_docs if _question_matches(e["Question"], qa_type)]
def load_val_qas(
    qa_type: Union[str, List[str]] = "all",
    file_path: str = "Final_Assignment_Template/gaia_val.jsonl",
) -> List[dict]:
    """
    Load validation QAs that have no attached file, optionally filtered by topic.

    Parameters:
    - qa_type: substring(s) matched against each entry's "Question";
      use "all" for no filtering.
    - file_path: path to the JSONL source (defaults to the validation split).

    Returns a list of dicts with keys "Question", "Final answer",
    "task_id", "tools", and "file_name".
    """

    def _project(entry: dict) -> dict:
        # Reduce a raw entry to exactly the fields downstream code consumes.
        return {
            "Question": entry["Question"],
            "Final answer": entry.get("Final answer"),
            "task_id": entry["task_id"],
            "tools": entry.get("Annotator Metadata", {}).get("Tools"),
            "file_name": entry.get("file_name", ""),
        }

    val_docs = []
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines in the JSONL
            entry = json.loads(line)
            # Keep only entries without an attached file.
            if entry.get("file_name", "").strip() == "":
                val_docs.append(entry)
    if qa_type == "all":
        return [_project(e) for e in val_docs]
    return [_project(e) for e in val_docs if _question_matches(e["Question"], qa_type)]
# import requests
# import json
# def fetch_and_save_questions(api_base_url: str, output_path: str):
# """
# Fetch all questions from the Agent Evaluation API and save them as JSONL.
# :param api_base_url: Base URL of the scoring API, e.g. "https://agents-course-unit4-scoring.hf.space"
# :param output_path: Path to the output .jsonl file
# """
# endpoint = f"{api_base_url}/questions"
# try:
# resp = requests.get(endpoint, timeout=30)
# resp.raise_for_status()
# questions = resp.json()
# except Exception as e:
# print(f"❌ Failed to fetch questions: {e}")
# return
# try:
# with open(output_path, "w", encoding="utf-8") as fout:
# for q in questions:
# fout.write(json.dumps(q, ensure_ascii=False) + "\n")
# print(f"✅ Saved {len(questions)} questions to {output_path}")
# except Exception as e:
# print(f"❌ Failed to write JSONL file: {e}")
# API_BASE = "https://agents-course-unit4-scoring.hf.space"
# OUTPUT_FILE = "questions.jsonl"
# fetch_and_save_questions(API_BASE, OUTPUT_FILE)
# dlf = DownloadFileFromTaskTool()
# for res in results:
# res = dlf.forward(task_id = res["task_id"])
# print(res)
# task_id = "cca530fc-4052-43b2-b130-b30968d8aa44"
# file_url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
# response = requests.get(file_url, timeout=15)
# print(response.content)
# print(response.headers.get("content-type", "").lower())
#print(response.headers)