"""Helpers for locating a task's file locally or downloading it from the scoring API."""
import os
import re
from urllib.parse import unquote

import requests
# Subdirectory in which task-related files are looked up and stored.
DEFAULT_FILES_DIR = "files"
# Base URL of the file endpoint; a task_id is appended to form the download URL.
FILE_API_BASE_URL = "https://agents-course-unit4-scoring.hf.space/files/"
def _extract_filename_from_cd(cd_header: str | None) -> str | None: | |
"""Extracts filename from Content-Disposition header.""" | |
if not cd_header: | |
return None | |
# Check for filename*=UTF-8''<encoded_filename> | |
fname_star_match = re.search( | |
r"filename\*=UTF-8''([^';\s]+)", cd_header, re.IGNORECASE) | |
if fname_star_match: | |
return requests.utils.unquote(fname_star_match.group(1)) | |
# Check for filename="<filename>" | |
fname_match = re.search(r'filename="([^"]+)"', cd_header, re.IGNORECASE) | |
if fname_match: | |
return fname_match.group(1) | |
# Check for plain filename=<filename> | |
fname_plain_match = re.search( | |
r'filename=([^;"]+)', cd_header, re.IGNORECASE) | |
if fname_plain_match: | |
return fname_plain_match.group(1).strip('"') | |
return None | |
def _get_extension_from_content_type(content_type: str | None) -> str | None: | |
"""Suggests a file extension based on MIME type.""" | |
if not content_type: | |
return None | |
# Simple mapping, can be expanded | |
mime_to_ext = { | |
'text/plain': '.txt', | |
'application/json': '.json', | |
'text/csv': '.csv', | |
'application/pdf': '.pdf', | |
'image/jpeg': '.jpg', | |
'image/png': '.png', | |
'text/x-python': '.py', | |
# Often used as a generic, extension might be in filename | |
'application/octet-stream': '' | |
} | |
# Get the main type/subtype part | |
main_type = content_type.split(';')[0].strip().lower() | |
return mime_to_ext.get(main_type) | |
def _find_local_task_file(task_id: str, local_files_dir: str) -> str | None:
    """Return the absolute path of a regular file in local_files_dir whose name starts with task_id."""
    try:
        # Sorted so the choice is deterministic when several names match.
        for filename in sorted(os.listdir(local_files_dir)):
            candidate = os.path.join(local_files_dir, filename)
            # A sub-directory that happens to share the prefix is not a task file.
            if filename.startswith(task_id) and os.path.isfile(candidate):
                return os.path.abspath(candidate)
    except OSError as e:
        print(
            f"FileHandler: Notice - Error listing files in {local_files_dir} (will attempt download): {e}")
    return None


def _write_file_atomically(file_path: str, content: bytes) -> None:
    """Write content to file_path through a temporary sibling so a failed write leaves no partial file."""
    directory, name = os.path.split(file_path)
    # The temporary name is prefixed so that it does not start with the task
    # id and is never mistaken for a finished download by the local lookup.
    tmp_path = os.path.join(directory, f".part-{name}")
    try:
        with open(tmp_path, 'wb') as f:
            f.write(content)
        os.replace(tmp_path, file_path)
    except OSError:
        try:
            os.remove(tmp_path)
        except OSError:
            # Best-effort cleanup; the original error raised below is what matters.
            pass
        raise


def get_task_file_path(task_id: str, local_files_dir: str = DEFAULT_FILES_DIR) -> str | None:
    """Return the local path of the file belonging to a task, downloading it if needed.

    The directory is first searched for a regular file whose name starts with
    ``task_id``. If none exists, the file is requested from
    ``FILE_API_BASE_URL + task_id`` and stored in the directory under a name
    that starts with ``task_id`` so that later calls find it locally.
    Progress and errors are printed to stdout.

    Args:
        task_id: Identifier of the task; must be non-empty.
        local_files_dir: Directory used to look up and store files. It is
            created if it does not exist.

    Returns:
        The absolute path of the existing or newly downloaded file, or None
        when ``task_id`` is empty, the directory cannot be created, the server
        has no file (HTTP 404), answers with another non-200 status or an
        empty body, or a network / file-system error occurs.
    """
    if not task_id:
        # An empty prefix would match every cached file and request the bare endpoint.
        print("FileHandler: Empty task_id given; cannot look up a file.")
        return None

    try:
        os.makedirs(local_files_dir, exist_ok=True)
    except OSError as e:
        print(
            f"FileHandler: Could not create directory '{local_files_dir}': {e}")
        return None

    # 1. Check for an existing local file whose name starts with the task_id.
    local_path = _find_local_task_file(task_id, local_files_dir)
    if local_path:
        print(
            f"FileHandler: Found existing local file for task {task_id}: {local_path}")
        return local_path

    # 2. If not found locally, attempt to download.
    file_api_url = f"{FILE_API_BASE_URL}{task_id}"
    print(
        f"FileHandler: Local file for task {task_id} not found. Attempting download from: {file_api_url}")
    try:
        with requests.Session() as session:
            response = session.get(
                file_api_url, timeout=15, allow_redirects=True)

            if response.status_code == 404:
                print(
                    f"FileHandler: No file found for task_id {task_id} at API (HTTP 404 Not Found).")
                return None
            if response.status_code != 200:
                print(
                    f"FileHandler: Failed to download file for task {task_id}. Server responded with HTTP status {response.status_code}.")
                return None
            if not response.content:
                print(
                    f"FileHandler: File indicated for task {task_id} but server sent no content (empty file). Not saving.")
                return None

            original_filename = _extract_filename_from_cd(
                response.headers.get('Content-Disposition'))
            # Keep only the final path component. Backslashes are normalised
            # first so Windows-style paths are stripped on POSIX as well.
            sane_filename_base = (
                os.path.basename(original_filename.replace("\\", "/"))
                if original_filename else "")
            if not sane_filename_base:
                # Fallback if Content-Disposition gave no usable filename.
                extension = _get_extension_from_content_type(
                    response.headers.get('Content-Type')) or ''
                sane_filename_base = f"{task_id}_downloaded{extension}"
                print(
                    f"FileHandler: No filename in Content-Disposition for {task_id}. Using fallback: {sane_filename_base}")

            # Ensure the filename starts with task_id for consistent local finding later.
            if sane_filename_base.startswith(task_id):
                sane_filename = sane_filename_base
            else:
                sane_filename = f"{task_id}_{sane_filename_base}"

            file_path = os.path.join(local_files_dir, sane_filename)
            _write_file_atomically(file_path, response.content)
            abs_path = os.path.abspath(file_path)
            print(
                f"FileHandler: File '{sane_filename}' for task {task_id} downloaded to '{abs_path}'. Size: {len(response.content)} bytes.")
            return abs_path
    except requests.exceptions.Timeout:
        print(
            f"FileHandler: Request timed out while trying to download file for task ID '{task_id}'.")
        return None
    except requests.exceptions.RequestException as e:
        print(
            f"FileHandler: An error occurred during file download for task ID '{task_id}': {type(e).__name__} - {e}.")
        return None
    except OSError as e:  # Errors while writing the file to disk.
        print(
            f"FileHandler: An IO error occurred while saving the file for task ID '{task_id}': {e}")
        return None