import requests from huggingface_hub import login import agent import json import os DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" SPACE_ID = "https://huggingface.co/spaces/IngoTB303/Final_Assignment_Template/tree/main" api_url = DEFAULT_API_URL questions_url = f"{api_url}/questions" attachments_url = f"{api_url}/files/" submit_url = f"{api_url}/submit" agent = agent.BasicAgent() def fetch_questions(): """Fetch questions from the API endpoint.""" print(f"Fetching questions from: {questions_url}") questions_data = None try: response = requests.get(questions_url, timeout=30) response.raise_for_status() questions_data = response.json() if not questions_data: return None # Ensure cache directory exists os.makedirs("cache", exist_ok=True) # Fetch attachments for questions with file_name for question in questions_data: file_name = question.get("file_name", "") task_id = question.get("task_id") if file_name and task_id: try: att_response = requests.get(f"{attachments_url}{task_id}", timeout=15) att_response.raise_for_status() # Save binary content to file in cache folder file_path = os.path.join("cache", file_name) with open(file_path, "wb") as f: f.write(att_response.content) # Store the local file path in the question dict with double backslashes question["attachment_file"] = file_path except Exception as e: print(f"Error fetching attachment for task {task_id}: {e}") question["attachment_file"] = None else: question["attachment_file"] = "" return questions_data except Exception as e: print(f"Error fetching questions: {e}") finally: if questions_data: with open("questions.json", "w", encoding="utf-8") as f: json.dump(questions_data, f, ensure_ascii=False, indent=2) def run_agent(questions_data): answers_payload = [] print(f"Running agent on {len(questions_data)} questions...") for item in questions_data: task_id = item.get("task_id") question_text = item.get("question", "") attachment_file = item.get("attachment_file", None) if not task_id or question_text is None: print(f"Skipping item with missing task_id or question: {item}") continue try: if attachment_file not in (None, ""): submitted_answer = agent.forward(question=question_text, attachment=attachment_file) else: submitted_answer = agent.forward(question=question_text) answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) except Exception as e: print(f"Error running agent on task {task_id}: {e}") return answers_payload def load_questions(filename): """Load questions from a local JSON file and return as questions_data.""" try: with open(filename, "r", encoding="utf-8") as f: questions_data = json.load(f) return questions_data except Exception as e: print(f"Error loading questions from {filename}: {e}") return None def load_answers(filename): """Load answers from a local JSON file.""" try: with open(filename, "r", encoding="utf-8") as f: answers = json.load(f) return answers except Exception as e: print(f"Error loading answers from {filename}: {e}") return None def submit_answers_to_hf(username, agent_code, answers_payload): # Prepare Submission submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload} status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..." print(status_update) # Submit print(f"Submitting {len(answers_payload)} answers to: {submit_url}") try: response = requests.post(submit_url, json=submission_data, timeout=60) response.raise_for_status() result_data = response.json() final_status = ( f"Submission Successful!\n" f"User: {result_data.get('username')}\n" f"Overall Score: {result_data.get('score', 'N/A')}% " f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n" f"Message: {result_data.get('message', 'No message received.')}" ) print("Submission successful.") return final_status except Exception as e: status_message = f"An unexpected error occurred during submission: {e}" print(status_message) return status_message # --- fetch questions from server --- questions = fetch_questions() # --- load cached questions --- # questions = load_questions("questions.json") # test print the questions to verify, if attachments were loaded # for question in questions: # print(question["question"]) # print(question["attachment_file"]) # --- generate answers --- answers = run_agent(questions) # save answers to publish them later if answers: with open("answers.json", "w", encoding="utf-8") as f: json.dump(answers, f, ensure_ascii=False, indent=2) # --- submit results to Huggingface --- # answers = load_answers("answers.json")) # assignment_results = submit_answers_to_hf("IngoTB303", SPACE_ID, answers) # print(assignment_results) # # Find the question with the specified task_id # question_text = next((q["question"] for q in questions if q.get("task_id") == "7bd855d8-463d-4ed5-93ca-5fe35145f733"), None) # attachment_file = next((q["attachment_file"] for q in questions if q.get("task_id") == "7bd855d8-463d-4ed5-93ca-5fe35145f733"), None) # print(question_text) # print(attachment_file) # print(agent.forward(question=question_text, attachment=attachment_file))