Spaces:
Sleeping
Sleeping
| import os | |
| import requests | |
| import pandas as pd | |
| from tools._session import _session | |
# Base URL of the scoring service; task attachments are requested from it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# Downloaded attachments are cached in a "data" directory that sits beside
# the directory containing this module (i.e. one level above this file's
# own directory). It is created at import time if it does not exist yet.
_PARENT_DIR = os.path.dirname(os.path.dirname(__file__))
DATA_DIR = os.path.join(_PARENT_DIR, "data")
os.makedirs(DATA_DIR, exist_ok=True)
def _parse_file(file_path: str, content_bytes: bytes, ext: str) -> str:
    """Render an attachment as text, choosing a parser by file extension.

    Args:
        file_path: Location of the file on disk. pandas reads from this path
            for CSV and Excel inputs.
        content_bytes: Raw file contents. Decoded for text-like inputs.
        ext: File extension including the leading dot (compared against
            lower-case values), or "" when the extension is unknown.

    Returns:
        A human-readable rendering of the file. Tabular files are rendered in
        full; text files are truncated to 5000 characters. Any parsing error
        is reported in the returned string instead of being raised.
    """
    text_extensions = (".py", ".txt", ".md", ".json", ".xml", ".html", "")
    try:
        if ext == ".csv":
            df = pd.read_csv(file_path)
            return f"CSV file ({len(df)} rows, {len(df.columns)} columns):\n{df.to_string(index=False)}"

        if ext in (".xlsx", ".xls"):
            parts = []
            # The context manager closes the workbook handle once every
            # sheet has been rendered.
            with pd.ExcelFile(file_path) as xl:
                for sheet in xl.sheet_names:
                    df = xl.parse(sheet)
                    parts.append(f"Sheet '{sheet}' ({len(df)} rows, {len(df.columns)} columns):\n{df.to_string(index=False)}")
            return "\n\n".join(parts)

        if ext in text_extensions:
            # Known text formats: be lenient about stray bad bytes.
            return f"File contents:\n{content_bytes.decode('utf-8', errors='replace')[:5000]}"

        # Unknown extension: the payload may be binary. A lenient decode can
        # never fail, so use a NUL-byte check plus a strict decode to decide
        # whether the content is displayable text.
        looks_binary = b"\x00" in content_bytes
        text = ""
        if not looks_binary:
            try:
                text = content_bytes.decode("utf-8")
            except UnicodeDecodeError:
                looks_binary = True
        if looks_binary:
            return f"Binary file, cannot display as text. Size: {len(content_bytes)} bytes."
        return f"File contents:\n{text[:5000]}"
    except Exception as e:
        # Best-effort by design: callers receive a description of the
        # failure rather than an exception.
        return f"Failed to parse file: {e}"
| def prefetch_file(task_id: str) -> str | None: | |
| """ | |
| Try to download the file for a task_id. | |
| Returns parsed file content string if found, None if no attachment exists. | |
| Caches file to data/ directory. | |
| """ | |
| # Check cache first | |
| cached = [f for f in os.listdir(DATA_DIR) if f.startswith(task_id)] | |
| if cached: | |
| file_path = os.path.join(DATA_DIR, cached[0]) | |
| ext = os.path.splitext(cached[0])[-1].lower() | |
| with open(file_path, "rb") as f: | |
| content_bytes = f.read() | |
| return _parse_file(file_path, content_bytes, ext) | |
| file_url = f"{DEFAULT_API_URL}/files/{task_id}" | |
| try: | |
| response = _session.get(file_url, timeout=30) | |
| if response.status_code == 404: | |
| return None | |
| response.raise_for_status() | |
| except Exception: | |
| return None | |
| # Determine extension | |
| ext = "" | |
| cd = response.headers.get("content-disposition", "") | |
| if "filename=" in cd: | |
| fname = cd.split("filename=")[-1].strip().strip('"') | |
| ext = os.path.splitext(fname)[-1].lower() | |
| content_type = response.headers.get("content-type", "") | |
| if not ext: | |
| if "csv" in content_type: | |
| ext = ".csv" | |
| elif "excel" in content_type or "spreadsheet" in content_type or "openxmlformats" in content_type: | |
| ext = ".xlsx" | |
| elif "text" in content_type: | |
| ext = ".txt" | |
| # Save to data/ | |
| file_path = os.path.join(DATA_DIR, f"{task_id}{ext}") | |
| with open(file_path, "wb") as f: | |
| f.write(response.content) | |
| return _parse_file(file_path, response.content, ext) | |
def download_and_read_file(task_id: str) -> str:
    """Return the text rendering of the attachment belonging to ``task_id``.

    Delegates to ``prefetch_file``. When that yields ``None`` a fixed
    explanatory message is returned instead, so the result is always a string.
    Supports CSV, Excel (.xlsx/.xls), and plain text files.
    """
    content = prefetch_file(task_id)
    return "No file attachment found for this task." if content is None else content