import base64
import os
import random
import subprocess
import sys
import tempfile
from typing import TypedDict, Annotated, Union, Dict, Any

import numpy as np
import openai
import pandas as pd
import requests
from huggingface_hub import list_models
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from smolagents import DuckDuckGoSearchTool


def download_file(task_id: str, file_name: str) -> str:
    """Download the file attached to a task and return its local path.

    The file is fetched from the scoring service's ``/files/{task_id}``
    endpoint and written into a ``downloads`` directory (created on demand)
    relative to the current working directory.

    Args:
        task_id: Identifier used to build the download URL.
        file_name: Name to store the file under. Only the final path
            component is used; if that is empty, ``task_id`` is used instead.

    Returns:
        The path of the saved file, or a string starting with
        ``"Error downloading file: "`` if anything goes wrong. Errors are
        reported as text rather than raised.
    """
    try:
        os.makedirs("downloads", exist_ok=True)

        file_url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
        # Without a timeout a stalled server would block the caller forever.
        response = requests.get(file_url, timeout=30)
        response.raise_for_status()

        # Keep only the last path component so a name such as "../../x"
        # cannot place the file outside the downloads directory.
        safe_name = os.path.basename(file_name) or os.path.basename(task_id)
        local_path = os.path.join("downloads", safe_name)
        with open(local_path, "wb") as f:
            f.write(response.content)

        return local_path
    except Exception as e:
        # Deliberately broad: callers receive failures as a message string.
        return f"Error downloading file: {str(e)}"


def get_hub_stats(author: str) -> str:
    """Report the most downloaded Hugging Face Hub model for an author.

    Args:
        author: Hub user or organisation name to query.

    Returns:
        A sentence naming the top model and its download count, a
        "No models found" sentence when the query returns nothing, or a
        string starting with ``"Error fetching models for"`` if the lookup
        fails. Errors are reported as text rather than raised.
    """
    try:
        # Sorted by downloads, descending, and capped at one result, so the
        # first entry (if any) is the most downloaded model.
        models = list(list_models(author=author, sort="downloads", direction=-1, limit=1))

        if not models:
            return f"No models found for author {author}."

        model = models[0]
        return f"The most downloaded model by {author} is {model.id} with {model.downloads:,} downloads."
    except Exception as e:
        return f"Error fetching models for {author}: {str(e)}"


def get_image_mime_type(image_path: str) -> str:
    """Return the MIME type implied by an image path's file extension.

    The check is based on the extension only (case-insensitive); the file
    is never opened.

    Args:
        image_path: Path or file name of the image.

    Returns:
        The matching ``image/*`` MIME type, or ``"image/jpeg"`` when the
        extension is missing or not one of the known types.
    """
    _, ext = os.path.splitext(image_path.lower())

    mime_types = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.bmp': 'image/bmp',
        '.webp': 'image/webp',
    }

    # Unknown or absent extensions fall back to JPEG.
    return mime_types.get(ext, 'image/jpeg')


def encode_image_to_base64(image_path: str) -> tuple[str, str]:
    """Read an image file and return it base64-encoded with its MIME type.

    Args:
        image_path: Path to the image file on disk.

    Returns:
        A ``(base64_data, mime_type)`` pair, where ``base64_data`` is the
        file's bytes base64-encoded as a ``str`` and ``mime_type`` comes
        from ``get_image_mime_type``.

    Raises:
        Exception: If the file cannot be read or encoded. The message is
            prefixed with ``"Error encoding image: "`` and the underlying
            error is attached as ``__cause__``.
    """
    try:
        with open(image_path, "rb") as image_file:
            base64_data = base64.b64encode(image_file.read()).decode('utf-8')
        mime_type = get_image_mime_type(image_path)
        return base64_data, mime_type
    except Exception as e:
        # Chain the original error so its traceback is not lost.
        raise Exception(f"Error encoding image: {e}") from e


def analyze_image(image_path: str, question: str = "What do you see in this image?") -> str:
    """Ask a vision-capable chat model a question about an image file.

    The image is embedded in the request as a base64 ``data:`` URL next to
    the question text and sent as a single ``HumanMessage``.

    Args:
        image_path: Path to the image file on disk.
        question: Prompt sent along with the image.

    Returns:
        The ``content`` of the model's response, or a string starting with
        ``"Error analyzing image: "`` if reading the image or calling the
        model fails. Errors are reported as text rather than raised.
    """
    try:
        # Read the file first: an unreadable image then fails before any
        # model client is constructed.
        base64_image, mime_type = encode_image_to_base64(image_path)

        vision_llm = ChatOpenAI(model="gpt-4.1")

        message = HumanMessage(
            content=[
                {"type": "text", "text": question},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:{mime_type};base64,{base64_image}",
                        "detail": "high",
                    },
                },
            ]
        )

        response = vision_llm.invoke([message])
        return response.content
    except Exception as e:
        return f"Error analyzing image: {e}"


def read_excel_file(file_path: str) -> Dict[str, Any]:
    """Read every sheet of an Excel workbook into a descriptive dictionary.

    Args:
        file_path: Path to the Excel file.

    Returns:
        On success, a dictionary with the keys:

        * ``"file_path"``: the path that was passed in.
        * ``"sheet_names"``: list of sheet names in the workbook.
        * ``"sheets_data"``: mapping of sheet name to a dict holding the
          sheet's ``shape``, ``columns``, ``dtypes``, the full DataFrame
          under ``data``, the first rows under ``sample_data``, and the
          names of its ``numeric_columns`` and ``text_columns``.
        * ``"summary"``: an empty dict; this function does not populate it.

        On failure, a dictionary with the single key ``"error"``. Errors
        are reported this way rather than raised.
    """
    try:
        # The context manager closes the workbook handle even when parsing
        # one of the sheets fails.
        with pd.ExcelFile(file_path) as excel_file:
            sheet_names = excel_file.sheet_names

            result = {
                "file_path": file_path,
                "sheet_names": sheet_names,
                "sheets_data": {},
                "summary": {},
            }

            for sheet_name in sheet_names:
                # Parse from the already-open workbook instead of
                # re-opening the file from disk for each sheet.
                df = excel_file.parse(sheet_name)

                result["sheets_data"][sheet_name] = {
                    "shape": df.shape,
                    "columns": df.columns.tolist(),
                    "dtypes": df.dtypes.to_dict(),
                    "data": df,
                    "sample_data": df.head().to_dict(),
                    "numeric_columns": df.select_dtypes(include=[np.number]).columns.tolist(),
                    "text_columns": df.select_dtypes(include=['object']).columns.tolist(),
                }

        return result
    except Exception as e:
        return {"error": f"Failed to read Excel file: {str(e)}"}


def execute_python_code(file_path: str, timeout: int = 60) -> str:
    """Run a Python script in a child process and return its output.

    The script runs with its own directory as the working directory.
    Isolation is limited to a separate process with a timeout; the script
    is not sandboxed.

    Args:
        file_path: Path to the script. If it does not exist, a file of the
            same base name inside ``downloads`` is tried instead.
        timeout: Maximum run time in seconds before the child is stopped.

    Returns:
        The script's stripped standard output when it exits with status 0.
        Otherwise a string starting with ``"Error"``: the script's stderr
        on a non-zero exit, a "File not found" message, a timeout message,
        or the text of any other exception. Errors are never raised.
    """
    try:
        if not os.path.exists(file_path):
            alt_path = os.path.join("downloads", os.path.basename(file_path))
            if os.path.exists(alt_path):
                file_path = alt_path
            else:
                return f"Error: File not found at {file_path} or {alt_path}"

        # Absolute path so the script is still found after the child
        # switches to the script's directory.
        file_path = os.path.abspath(file_path)

        # Use the interpreter running this process; a bare "python" may be
        # absent from PATH or point at a different installation.
        interpreter = sys.executable or 'python'

        result = subprocess.run(
            [interpreter, file_path],
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=os.path.dirname(file_path),
        )

        if result.returncode == 0:
            return result.stdout.strip()
        return f"Error: {result.stderr}"
    except subprocess.TimeoutExpired:
        return "Error: Code execution timed out"
    except Exception as e:
        return f"Error executing code: {str(e)}"


def transcribe_audio(file_path: str) -> str:
    """Transcribe an audio file with OpenAI's ``whisper-1`` model.

    Args:
        file_path: Path to the audio file on disk.

    Returns:
        The transcript text, or a string starting with
        ``"Error transcribing audio: "`` if the file cannot be opened or
        the request fails. Errors are reported as text rather than raised.
    """
    try:
        # The file must stay open for the duration of the upload.
        with open(file_path, "rb") as audio_file:
            transcript = openai.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
            )
        return transcript.text
    except Exception as e:
        return f"Error transcribing audio: {str(e)}"