File size: 6,701 Bytes
75cfc9a
08eb725
 
75cfc9a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
08eb725
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import re
import json
from typing import List, Union, Optional


def extract_final_answer(output: str) -> str:
    """
    Return the text following the last 'FINAL ANSWER:' marker in *output*.

    The marker is matched case-insensitively. The extracted answer is
    stripped of surrounding whitespace, and comma-separated values are
    normalized to exactly one space after each comma. If no marker is
    present, the full output is returned unchanged.
    """
    text = str(output)
    needle = "FINAL ANSWER:"
    haystack = text.lower()

    if needle.lower() not in haystack:
        return text

    # Take the text after the LAST occurrence of the marker.
    start = haystack.rfind(needle.lower()) + len(needle)
    answer = text[start:].strip()
    # Normalize list separators: exactly one space after every comma.
    return re.sub(r",\s*", ", ", answer)


def replace_tool_mentions(prompt: str) -> str:
    """
    Rewrite legacy tool names in *prompt* to their current equivalents.

    Two forms are handled:
    - backticked mentions: `search` -> `web_search`, `wiki` -> `wikipedia_search`
    - call sites: search(...) -> web_search(...), wiki(...) -> wikipedia_search(...)

    The lookbehind/lookahead guards keep longer identifiers such as
    arxiv_search(...) or an already-rewritten web_search(...) untouched.
    """
    rewrites = (
        (r"(?<!\w)`search`(?!\w)", "`web_search`"),
        (r"(?<!\w)`wiki`(?!\w)", "`wikipedia_search`"),
        (r"(?<!\w)(?<!_)search\(", "web_search("),
        (r"(?<!\w)(?<!_)wiki\(", "wikipedia_search("),
    )
    for pattern, replacement in rewrites:
        prompt = re.sub(pattern, replacement, prompt)
    return prompt

def _question_matches(question: str, filters: Union[str, List[str]]) -> bool:
    """Helper: check if question matches any string in filters."""
    if isinstance(filters, str):
        filters = [filters]
    return any(f.lower() in question.lower() for f in filters)

def load_online_qas(
    qa_type: Union[str, List[str]] = "all",
    has_file: Optional[bool] = None,
    file_path: str = "Final_Assignment_Template/allqas.jsonl",
) -> List[dict]:
    """
    Load QA entries from a JSONL file and optionally filter them.

    Parameters:
    - qa_type: str or List[str]; keep only QAs whose 'Question' contains one
      of these substrings (case-insensitive). Use "all" to disable this filter.
    - has_file: filters QAs on the presence of a non-empty 'file_name' field:
        - True: only QAs with an attached file
        - False: only QAs without an attached file
        - None: no file_name filtering
    - file_path: path to the JSONL file, one JSON object per line.

    Returns the (possibly filtered) list of QA dicts.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        data = [json.loads(line) for line in f]

    # Filter on presence/absence of an attached file.
    if has_file is True:
        data = [qa for qa in data if qa.get("file_name", "").strip()]
    elif has_file is False:
        data = [qa for qa in data if not qa.get("file_name", "").strip()]

    # Filter on question content.
    if qa_type == "all":
        return data

    return [qa for qa in data if _question_matches(qa.get("Question", ""), qa_type)]


def load_test_qas(qa_type: Union[str, List[str]] = "all") -> List[dict]:
    """
    Load test QAs with no attached files from gaia_val.jsonl.

    Parameters:
    - qa_type: "all" for every QA, or substring filter(s) matched
      case-insensitively against each Question.

    Returns a list of dicts with keys: 'Question', 'Final answer',
    'task_id', 'tools', 'file_name'.
    """

    def _project(entry: dict) -> dict:
        # Keep only the fields downstream consumers rely on.
        return {
            "Question": entry["Question"],
            "Final answer": entry.get("Final answer"),
            "task_id": entry["task_id"],
            "tools": entry.get("Annotator Metadata", {}).get("Tools"),
            "file_name": entry.get("file_name", ""),
        }

    with open("Final_Assignment_Template/gaia_val.jsonl", "r", encoding="utf-8") as f:
        # Only keep entries without an attached file.
        test_docs = [
            entry
            for entry in map(json.loads, f)
            if entry.get("file_name", "").strip() == ""
        ]

    if qa_type == "all":
        return [_project(e) for e in test_docs]

    return [_project(e) for e in test_docs if _question_matches(e["Question"], qa_type)]


def load_val_qas(qa_type: Union[str, List[str]] = "all") -> List[dict]:
    """
    Load validation QAs with no attached files from gaia_val.jsonl.

    Parameters:
    - qa_type: "all" for every QA, or substring filter(s) matched
      case-insensitively against each Question.

    Returns a list of dicts with keys: 'Question', 'Final answer',
    'task_id', 'tools', 'file_name'.
    """

    def _project(entry: dict) -> dict:
        # Keep only the fields downstream consumers rely on.
        return {
            "Question": entry["Question"],
            "Final answer": entry.get("Final answer"),
            "task_id": entry["task_id"],
            "tools": entry.get("Annotator Metadata", {}).get("Tools"),
            "file_name": entry.get("file_name", ""),
        }

    with open("Final_Assignment_Template/gaia_val.jsonl", "r", encoding="utf-8") as f:
        # Only keep entries without an attached file.
        val_docs = [
            entry
            for entry in map(json.loads, f)
            if entry.get("file_name", "").strip() == ""
        ]

    if qa_type == "all":
        return [_project(e) for e in val_docs]

    return [_project(e) for e in val_docs if _question_matches(e["Question"], qa_type)]
# import requests
# import json

# def fetch_and_save_questions(api_base_url: str, output_path: str):
#     """
#     Fetch all questions from the Agent Evaluation API and save them as JSONL.
    
#     :param api_base_url: Base URL of the scoring API, e.g. "https://agents-course-unit4-scoring.hf.space"
#     :param output_path:  Path to the output .jsonl file
#     """
#     endpoint = f"{api_base_url}/questions"
#     try:
#         resp = requests.get(endpoint, timeout=30)
#         resp.raise_for_status()
#         questions = resp.json()
#     except Exception as e:
#         print(f"❌ Failed to fetch questions: {e}")
#         return

#     try:
#         with open(output_path, "w", encoding="utf-8") as fout:
#             for q in questions:
#                 fout.write(json.dumps(q, ensure_ascii=False) + "\n")
#         print(f"✅ Saved {len(questions)} questions to {output_path}")
#     except Exception as e:
#         print(f"❌ Failed to write JSONL file: {e}")

# API_BASE = "https://agents-course-unit4-scoring.hf.space"
# OUTPUT_FILE = "questions.jsonl"
# fetch_and_save_questions(API_BASE, OUTPUT_FILE)


# dlf = DownloadFileFromTaskTool()
# for res in results:
#     res = dlf.forward(task_id = res["task_id"])
#     print(res)
# task_id = "cca530fc-4052-43b2-b130-b30968d8aa44"
# file_url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
# response = requests.get(file_url, timeout=15)

# print(response.content)
# print(response.headers.get("content-type", "").lower())
#print(response.headers)