Spaces:
Sleeping
Sleeping
| """ | |
| Utility functions for the GAIA Agent | |
| """ | |
| import os | |
| import re | |
| import shutil | |
| import urllib.parse | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from config import DEFAULT_API_URL, QUESTION_TYPES | |
def clean_ansi_codes(text):
    """Strip ANSI terminal escape sequences (colors, cursor moves) from text."""
    # Matches ESC followed by either a single control char or a CSI sequence.
    return re.sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', text)
def clean_answer(answer):
    """Normalize an agent response: drop boilerplate prefixes and markdown markers."""
    text = str(answer).strip()
    # Every pattern is simply deleted; prefixes are anchored, markdown is not.
    removals = (
        r'^Final Answer:\s*',
        r'^Answer:\s*',
        r'^The answer is\s*',
        r'^Based on[^,]*,\s*',
        r'```',
        r'\*\*',
        r'^##\s*',
    )
    for pattern in removals:
        text = re.sub(pattern, '', text, flags=re.IGNORECASE)
    return text.strip()
def detect_question_type(question, file_name):
    """
    Detect the question type to apply a specific strategy.

    Args:
        question: The question text
        file_name: Name of the attached file (if any)

    Returns:
        str: Question type (see QUESTION_TYPES in config.py)
    """
    q_lower = question.lower()
    # Lowercase the extension so "IMAGE.PNG" / "data.XLSX" classify correctly
    # (the previous endswith() checks were case-sensitive).
    ext = os.path.splitext(file_name)[1].lower() if file_name else ""

    if "youtube.com" in question or "youtu.be" in question:
        return QUESTION_TYPES['YOUTUBE_VIDEO']
    elif ext == ".png":
        return QUESTION_TYPES['IMAGE_FILE']
    elif ext == ".mp3":
        return QUESTION_TYPES['AUDIO_FILE']
    elif ext in (".xlsx", ".csv"):
        return QUESTION_TYPES['DATA_FILE']
    elif ext == ".py":
        return QUESTION_TYPES['CODE_FILE']
    elif "wikipedia" in q_lower:
        return QUESTION_TYPES['WIKIPEDIA']
    elif any(word in q_lower for word in ["how many", "count", "number of"]):
        return QUESTION_TYPES['COUNTING']
    # ".rewsna" is "answer." reversed — a telltale of reversed-text puzzles.
    elif "reverse" in q_lower or "backwards" in q_lower or ".rewsna" in question:
        return QUESTION_TYPES['TEXT_MANIPULATION']
    else:
        return QUESTION_TYPES['GENERAL']
def download_file_for_task(task_id):
    """
    Download the attached file for a task if it exists.

    Args:
        task_id: The task ID

    Returns:
        str: Path to downloaded file or None if no file exists
    """
    file_url = f"{DEFAULT_API_URL}/files/{task_id}"
    try:
        response = requests.get(file_url, stream=True, timeout=30)
        if response.status_code == 200:
            filename = f"file_{task_id}"
            # Prefer the real filename from the Content-Disposition header.
            cd = response.headers.get("content-disposition", "")
            if "filename=" in cd:
                filename = cd.split("filename=")[1].strip('"')
            # Ensure a usable extension, inferred from the content type.
            if "." not in filename:
                content_type = response.headers.get("content-type", "")
                if "excel" in content_type or "spreadsheet" in content_type:
                    filename += ".xlsx"
                elif "audio" in content_type or "mpeg" in content_type:
                    filename += ".mp3"
                elif "image" in content_type or "png" in content_type:
                    filename += ".png"
                elif "python" in content_type:
                    filename += ".py"
            # iter_content (not response.raw) so Content-Encoding such as gzip
            # is decoded — copying the raw socket stream can corrupt the file.
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            print(f" ✓ File downloaded: {filename} ({os.path.getsize(filename)} bytes)")
            return filename
    except Exception as e:
        print(f" ✗ Error downloading file: {e}")
    # No attachment (non-200) or download failed.
    return None
def fetch_and_download_links(url, dest_dir, max_files=20):
    """
    Download linked resources from a URL.

    Args:
        url: URL of the page to scan
        dest_dir: Destination directory for files
        max_files: Maximum number of files to download

    Returns:
        list: List of downloaded file paths
    """
    downloaded = []
    try:
        os.makedirs(dest_dir, exist_ok=True)
        resp = requests.get(url, timeout=20)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "lxml")

        # Collect candidate URLs from both href- and src-bearing tags.
        candidates = []
        for tag in soup.find_all(['a', 'link']):
            href = tag.get('href')
            if href:
                candidates.append(href)
        for tag in soup.find_all(['img', 'script', 'source']):
            src = tag.get('src')
            if src:
                candidates.append(src)

        seen = set()
        allowed_exts = {'.png', '.jpg', '.jpeg', '.gif', '.svg', '.pdf', '.zip',
                        '.mp3', '.mp4', '.py', '.txt', '.csv', '.xlsx', '.xls'}
        for c in candidates:
            if len(downloaded) >= max_files:
                break
            full = urllib.parse.urljoin(url, c)
            if full in seen:
                continue
            seen.add(full)
            path = urllib.parse.urlparse(full).path
            ext = os.path.splitext(path)[1].lower()
            if ext not in allowed_exts:
                continue
            try:
                r = requests.get(full, stream=True, timeout=20)
                r.raise_for_status()
                cd = r.headers.get('content-disposition')
                # basename() strips any directory components a malicious server
                # could smuggle in (e.g. "../../x") so writes stay in dest_dir.
                if cd and 'filename=' in cd:
                    fname = os.path.basename(cd.split('filename=')[1].strip('"'))
                else:
                    fname = os.path.basename(path)
                if not fname:
                    fname = f"resource_{len(downloaded)}{ext}"
                out_path = os.path.join(dest_dir, fname)
                # iter_content so Content-Encoding (gzip/deflate) is decoded;
                # copying r.raw would write the undecoded wire bytes.
                with open(out_path, 'wb') as of:
                    for chunk in r.iter_content(chunk_size=8192):
                        of.write(chunk)
                downloaded.append(out_path)
            except Exception:
                # Best effort: a failed resource never aborts the whole scan.
                continue
    except Exception:
        # Page fetch failed entirely — return whatever was collected (nothing).
        pass
    return downloaded