File size: 6,120 Bytes
1aa16a4 9e58d03 1aa16a4 9e58d03 1aa16a4 9e58d03 1aa16a4 9e58d03 1aa16a4 9e58d03 1aa16a4 9e58d03 1aa16a4 9e58d03 1aa16a4 9e58d03 1aa16a4 9e58d03 1aa16a4 9e58d03 1aa16a4 9e58d03 1aa16a4 9e58d03 1aa16a4 9e58d03 1aa16a4 9e58d03 1aa16a4 9e58d03 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
import requests
from huggingface_hub import login
import agent
import json
import os
# Base URL of the scoring service; every endpoint below is derived from it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# Link to this Space's source tree. It is passed as the `agent_code` argument
# in the (currently commented-out) submission call at the bottom of the script.
SPACE_ID = "https://huggingface.co/spaces/IngoTB303/Final_Assignment_Template/tree/main"
api_url = DEFAULT_API_URL
# Endpoint that returns the list of questions as JSON.
questions_url = f"{api_url}/questions"
# Prefix for attachment downloads; the task id is appended to it.
attachments_url = f"{api_url}/files/"
# Endpoint that receives the answers payload via POST.
submit_url = f"{api_url}/submit"
# NOTE: this rebinds the name `agent` from the imported module to a BasicAgent
# instance, so the module itself is no longer reachable under that name.
# The functions below use this instance through the global `agent`.
agent = agent.BasicAgent()
def fetch_questions():
    """Fetch questions from the API endpoint and download their attachments.

    The question list is requested from ``questions_url``. For every question
    that carries both a ``file_name`` and a ``task_id`` the attachment is
    downloaded from ``attachments_url`` and written into the local ``cache``
    directory. Each question dict receives an ``attachment_file`` entry:

    * the local file path when the download succeeded,
    * ``None`` when downloading or saving the attachment failed,
    * ``""`` when the question has no attachment.

    Whenever a non-empty question list was decoded it is also written to
    ``questions.json`` in the working directory, even if a later step failed.

    Returns:
        The list of question dicts (with ``attachment_file`` added), or
        ``None`` when the request failed or the server returned no questions.
    """
    print(f"Fetching questions from: {questions_url}")
    questions_data = None
    try:
        response = requests.get(questions_url, timeout=30)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            return None

        # Ensure cache directory exists
        os.makedirs("cache", exist_ok=True)

        # Fetch attachments for questions with file_name
        for question in questions_data:
            file_name = question.get("file_name", "")
            task_id = question.get("task_id")
            if not (file_name and task_id):
                question["attachment_file"] = ""
                continue
            try:
                att_response = requests.get(f"{attachments_url}{task_id}", timeout=15)
                att_response.raise_for_status()
                # Save binary content to file in cache folder
                file_path = _safe_cache_path(file_name)
                with open(file_path, "wb") as f:
                    f.write(att_response.content)
                # Store the local file path in the question dict
                question["attachment_file"] = file_path
            except Exception as e:
                # Best effort: a failed attachment must not abort the whole fetch.
                print(f"Error fetching attachment for task {task_id}: {e}")
                question["attachment_file"] = None
        return questions_data
    except Exception as e:
        print(f"Error fetching questions: {e}")
        return None
    finally:
        # Runs on every exit path so that whatever was decoded is kept on disk
        # for later use with load_questions().
        if questions_data:
            with open("questions.json", "w", encoding="utf-8") as f:
                json.dump(questions_data, f, ensure_ascii=False, indent=2)


def _safe_cache_path(file_name):
    """Return a path inside ``cache`` for a server-supplied *file_name*.

    The name comes from the remote API, so only its final path component is
    used; directory parts (with either slash style) cannot redirect the write
    outside the cache folder. Plain file names map to the same path as before.

    Raises:
        ValueError: if no usable file name remains.
    """
    base_name = os.path.basename(str(file_name).replace("\\", "/"))
    if base_name in ("", ".", ".."):
        raise ValueError(f"unusable attachment file name: {file_name!r}")
    return os.path.join("cache", base_name)
def run_agent(questions_data):
    """Run the module-level ``agent`` on each question and collect the answers.

    Args:
        questions_data: List of question dicts. Each dict is read for
            ``task_id``, ``question`` and the optional ``attachment_file``.
            ``None`` (for example the result of a failed fetch) is treated
            as "no questions".

    Returns:
        A list of ``{"task_id": ..., "submitted_answer": ...}`` dicts, one for
        every question the agent answered without raising. Items without a
        ``task_id`` or without a ``question`` are skipped, and questions on
        which the agent raised are reported and left out.
    """
    answers_payload = []
    if questions_data is None:
        print("No questions available; nothing to run.")
        return answers_payload

    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        # No default here: a missing question must stay None so that the
        # check below can actually detect and skip it.
        question_text = item.get("question")
        attachment_file = item.get("attachment_file")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            # Both None (download failed) and "" (no attachment) mean that
            # there is no file to hand to the agent.
            if attachment_file not in (None, ""):
                submitted_answer = agent.forward(question=question_text, attachment=attachment_file)
            else:
                submitted_answer = agent.forward(question=question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
        except Exception as e:
            # Keep going: one failing question should not lose the other answers.
            print(f"Error running agent on task {task_id}: {e}")
    return answers_payload
def load_questions(filename):
    """Read a questions JSON file from disk.

    Returns the decoded JSON content, or ``None`` (after printing the error)
    when the file cannot be opened or parsed.
    """
    try:
        with open(filename, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except Exception as err:
        print(f"Error loading questions from {filename}: {err}")
    return None
def load_answers(filename):
    """Read previously saved answers from a JSON file.

    Returns the decoded JSON content, or ``None`` (after printing the error)
    when the file cannot be opened or parsed.
    """
    loaded = None
    try:
        with open(filename, "r", encoding="utf-8") as source:
            loaded = json.load(source)
    except Exception as exc:
        print(f"Error loading answers from {filename}: {exc}")
        loaded = None
    return loaded
def submit_answers_to_hf(username, agent_code, answers_payload):
    """Post the answers to ``submit_url`` and return a status text.

    Args:
        username: Account name to submit under; surrounding whitespace is
            stripped before sending.
        agent_code: Value sent as ``agent_code`` in the request body.
        answers_payload: List of answer dicts sent as ``answers``.

    Returns:
        A multi-line summary built from the server's JSON reply on success,
        or an error message when the request or its processing failed.
    """
    # Assemble the request body.
    payload = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload,
    }
    print(f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'...")

    # Send it and turn the reply into a readable summary.
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        reply = requests.post(submit_url, json=payload, timeout=60)
        reply.raise_for_status()
        result_data = reply.json()
        summary_lines = [
            "Submission Successful!",
            f"User: {result_data.get('username')}",
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)",
            f"Message: {result_data.get('message', 'No message received.')}",
        ]
        outcome = "\n".join(summary_lines)
        print("Submission successful.")
    except Exception as e:
        outcome = f"An unexpected error occurred during submission: {e}"
        print(outcome)
    return outcome
# --- fetch questions from server ---
questions = fetch_questions()

# --- load cached questions ---
# questions = load_questions("questions.json")

# test print the questions to verify, if attachments were loaded
# for question in questions:
#     print(question["question"])
#     print(question["attachment_file"])

# --- generate answers ---
# fetch_questions() returns None when the request fails or the server sends
# no questions; only start the agent when there is something to answer.
if questions:
    answers = run_agent(questions)
else:
    print("No questions available; skipping agent run.")
    answers = []

# save answers to publish them later
if answers:
    with open("answers.json", "w", encoding="utf-8") as f:
        json.dump(answers, f, ensure_ascii=False, indent=2)

# --- submit results to Huggingface ---
# answers = load_answers("answers.json")
# assignment_results = submit_answers_to_hf("IngoTB303", SPACE_ID, answers)
# print(assignment_results)

# # Find the question with the specified task_id
# question_text = next((q["question"] for q in questions if q.get("task_id") == "7bd855d8-463d-4ed5-93ca-5fe35145f733"), None)
# attachment_file = next((q["attachment_file"] for q in questions if q.get("task_id") == "7bd855d8-463d-4ed5-93ca-5fe35145f733"), None)
# print(question_text)
# print(attachment_file)
# print(agent.forward(question=question_text, attachment=attachment_file))
|