File size: 7,410 Bytes
a94fa9b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
import os
import requests
import shutil
import logging
# Module-level logger used by every helper in this file. It is looked up by the
# fixed name "eval_logger" rather than __name__.
# NOTE(review): handlers/levels are presumably configured by the application
# entry point (app.py, per the note in the __main__ block) — confirm.
logger = logging.getLogger("eval_logger")
def fetch_all_questions(api_url: str) -> list[dict] | None:
    """
    Fetches all questions from the API.

    Issues a GET request to ``{api_url}/questions`` (15 second timeout) and
    returns the decoded JSON body. Every failure is logged and reported to the
    caller as ``None``; no exception escapes this function.

    Args:
        api_url: The base URL of the scoring API.

    Returns:
        A list of question dictionaries, or None if the request fails, the
        body is not valid JSON, or the decoded payload is empty.
    """
    questions_url = f"{api_url}/questions"
    logger.info(f"Fetching all questions from: {questions_url}")
    # Bound up front so the JSON handler can safely inspect it.
    response = None
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            logger.warning("Fetched questions list is empty.")
            return None
        logger.info(f"Fetched {len(questions_data)} questions successfully.")
        return questions_data
    # JSONDecodeError is a subclass of RequestException, so it must be handled
    # first; otherwise the generic handler below would always win and the
    # response-text diagnostic would never be logged.
    except requests.exceptions.JSONDecodeError as e:
        logger.error(f"Error decoding JSON response from questions endpoint: {e}", exc_info=True)
        # Compare against None explicitly: a Response object's truthiness
        # reflects its HTTP status, not whether a response was received.
        logger.error(f"Response text: {response.text[:500] if response is not None else 'No response'}")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching all questions: {e}", exc_info=True)
        return None
    except Exception as e:
        # Last-resort boundary: e.g. a payload whose type does not support len().
        logger.error(f"An unexpected error occurred fetching all questions: {e}", exc_info=True)
        return None
def fetch_random_question(api_url: str) -> dict | None:
    """
    Fetches a single random question from the API.

    Issues a GET request to ``{api_url}/random-question`` (15 second timeout)
    and returns the decoded JSON body. Every failure is logged and reported to
    the caller as ``None``; no exception escapes this function.

    Args:
        api_url: The base URL of the scoring API.

    Returns:
        A dictionary representing a single question, or None if the request
        fails, the body is not valid JSON, or the decoded payload is empty.
    """
    random_question_url = f"{api_url}/random-question"
    logger.info(f"Fetching random question from: {random_question_url}")
    # Bound up front so the JSON handler can safely inspect it.
    response = None
    try:
        response = requests.get(random_question_url, timeout=15)
        response.raise_for_status()
        question_data = response.json()
        if not question_data:
            logger.warning("Fetched random question is empty.")
            return None
        logger.info(f"Fetched random question successfully: {question_data.get('task_id')}")
        return question_data
    # JSONDecodeError is a subclass of RequestException, so it must be handled
    # first; otherwise the generic handler below would always win and the
    # response-text diagnostic would never be logged.
    except requests.exceptions.JSONDecodeError as e:
        logger.error(f"Error decoding JSON response from random question endpoint: {e}", exc_info=True)
        # Compare against None explicitly: a Response object's truthiness
        # reflects its HTTP status, not whether a response was received.
        logger.error(f"Response text: {response.text[:500] if response is not None else 'No response'}")
        return None
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching random question: {e}", exc_info=True)
        return None
    except Exception as e:
        # Last-resort boundary: e.g. a payload that is not a mapping and so
        # has no .get() method.
        logger.error(f"An unexpected error occurred fetching random question: {e}", exc_info=True)
        return None
def download_file(api_url: str, task_id: str, file_name: str, download_dir: str = "downloads") -> str | None:
    """
    Downloads a specific file associated with a given task ID.

    The file is requested from ``{api_url}/files/{task_id}`` and stored as
    ``download_dir/file_name``. If that path already exists the download is
    skipped and the existing path is returned. Data is first streamed into a
    sibling ``<file_name>.part`` file and only renamed to the final name once
    the transfer has completed, so an interrupted download can never be
    mistaken for a finished one on a later call.

    Args:
        api_url: The base URL of the scoring API.
        task_id: The ID of the task for which to download the file.
        file_name: The name of the file to be saved. A falsy value (empty
            string or None) means "no file" and skips the download.
        download_dir: The directory where the file should be saved. Defaults to "downloads".
            It is created if it does not exist.

    Returns:
        The local path to the downloaded (or already present) file, or None if
        no file_name was given or an error occurs.

    Raises:
        OSError: If ``download_dir`` cannot be created; directory creation
            happens before the guarded download section.
    """
    if not file_name:
        logger.info(f"No file_name provided for task_id {task_id}. Skipping download.")
        return None

    file_url = f"{api_url}/files/{task_id}"
    os.makedirs(download_dir, exist_ok=True)
    # NOTE(review): file_name is joined as-is; if it can come from an untrusted
    # source it should be validated against path traversal — confirm with callers.
    local_file_path = os.path.join(download_dir, file_name)

    if os.path.exists(local_file_path):
        logger.info(f"File already exists at {local_file_path}. Skipping download.")
        return local_file_path

    logger.info(f"Downloading file for task_id {task_id} from: {file_url} to {local_file_path}")
    # Stage the transfer next to the target so the final rename stays on the
    # same filesystem and the "already exists" check above only ever sees
    # complete files.
    partial_path = f"{local_file_path}.part"
    try:
        with requests.get(file_url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with open(partial_path, 'wb') as f:
                # iter_content() undoes gzip/deflate transfer encodings;
                # copying from r.raw would store the still-compressed bytes.
                for chunk in r.iter_content(chunk_size=64 * 1024):
                    f.write(chunk)
        os.replace(partial_path, local_file_path)
        logger.info(f"File downloaded successfully: {local_file_path}")
        return local_file_path
    except requests.exceptions.RequestException as e:
        logger.error(f"Error downloading file for task_id {task_id}: {e}", exc_info=True)
    except Exception as e:
        logger.error(f"An unexpected error occurred downloading file for task_id {task_id}: {e}", exc_info=True)

    # Shared failure path: discard whatever was written before the error.
    if os.path.exists(partial_path):
        os.remove(partial_path)
    return None
if __name__ == '__main__':
    # Manual smoke test: exercises each helper against the live scoring API
    # and reports the outcome with print(), since this entry point does not
    # configure any logging handlers itself.
    print("--- Testing dataset_helper.py directly ---")
    print("NOTE: For full logging, run through app.py. This direct test uses print statements.")
    test_api_url = "https://agents-course-unit4-scoring.hf.space"

    print("\n--- Testing fetch_all_questions ---")
    questions = fetch_all_questions(test_api_url)
    if questions:
        print(f"Successfully fetched {len(questions)} questions. First question task_id: {questions[0].get('task_id')}")
    else:
        print("Failed to fetch all questions.")

    print("\n--- Testing fetch_random_question ---")
    random_q = fetch_random_question(test_api_url)
    if random_q:
        # Tolerate a missing or null 'question' field instead of crashing on
        # None[:50].
        question_text = random_q.get('question') or ''
        print(f"Successfully fetched random question: {question_text[:50]}...")
    else:
        print("Failed to fetch random question.")

    print("\n--- Testing download_file (example with a known task_id and file_name if available) ---")
    if questions:
        # First question that advertises an attached file; an empty dict
        # stands in when there is none so both lookups below yield None.
        question_with_file = next((q for q in questions if q.get("file_name")), {})
        test_task_with_file = question_with_file.get("task_id")
        test_file_name = question_with_file.get("file_name")

        if test_task_with_file and test_file_name:
            print(f"Attempting to download file for task_id: {test_task_with_file}, file_name: {test_file_name}")
            downloaded_path = download_file(test_api_url, test_task_with_file, test_file_name)
            if downloaded_path:
                print(f"File downloaded to: {downloaded_path}")
            else:
                print(f"Failed to download file for task_id: {test_task_with_file}")
        else:
            print("No question with an associated file found in the first batch of questions to test download.")
    else:
        print("Skipping download_file test as fetching questions failed.")

    print("\n--- Testing download_file (with a task_id that might not have a file or invalid file_name) ---")
    # Only the first question is inspected here; the test runs when that
    # question carries an explicitly empty file_name.
    if questions and questions[0].get("file_name") == "":
        task_id_no_file = questions[0].get("task_id")
        file_name_empty = questions[0].get("file_name")
        print(f"Attempting to download file for task_id: {task_id_no_file} (expected to skip due to empty file_name)")
        path_no_file = download_file(test_api_url, task_id_no_file, file_name_empty)
        if path_no_file is None:
            print("Correctly skipped download or failed as expected for task with no file_name.")
        else:
            print(f"Unexpectedly downloaded something to {path_no_file} for a task with no file_name.")
    else:
        print("Skipping test for task with no file_name (either no questions or first question has a file).")