File size: 6,837 Bytes
7acd6e0
85ffe35
 
39d66fb
 
a4095dd
dc2af28
 
 
94d5f0f
 
8abdc5b
 
89fade4
94d5f0f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
85ffe35
6c9b377
 
 
 
 
85ffe35
6c9b377
 
 
 
 
 
1fc43d8
 
 
3a5f02a
 
 
 
1fc43d8
3a5f02a
 
 
 
 
 
 
 
1fc43d8
3a5f02a
 
 
 
 
 
 
 
 
 
 
 
2cd5139
3a5f02a
1fc43d8
 
64e38db
09fb856
 
1fc43d8
3a5f02a
 
1fc43d8
3a5f02a
1fc43d8
 
 
 
 
 
3a5f02a
1fc43d8
 
 
 
 
 
 
 
 
 
 
39d66fb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8abdc5b
 
4e87a06
8abdc5b
 
6795c0e
 
 
 
 
 
 
 
 
 
 
 
8abdc5b
 
 
 
 
 
6795c0e
8abdc5b
 
 
 
 
 
 
 
 
 
89fade4
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
from smolagents import DuckDuckGoSearchTool
import random
from huggingface_hub import list_models
import pandas as pd
import numpy as np
from typing import TypedDict, Annotated, Union, Dict, Any
import base64
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
import requests
import os
import subprocess
import tempfile
import openai

def download_file(task_id: str, file_name: str) -> str:
    """Download the file attached to a task and return its local path.

    The file is fetched from the scoring service's ``/files/{task_id}``
    endpoint and written to ``downloads/<file_name>`` relative to the current
    working directory (the directory is created if needed).

    Args:
        task_id: Identifier of the task whose attachment should be fetched.
        file_name: Name to store the file under. Only the final path
            component is used, so the file always lands inside ``downloads``.

    Returns:
        The local path of the saved file, or a string starting with
        ``"Error downloading file:"`` when the name is invalid, the request
        fails, or the file cannot be written.
    """
    try:
        # Keep only the last path component so a name such as "../x" cannot
        # place the file outside the downloads directory.
        safe_name = os.path.basename(file_name)
        if safe_name in ("", ".", ".."):
            raise ValueError(f"invalid file name: {file_name!r}")

        # Create downloads directory if it doesn't exist
        os.makedirs("downloads", exist_ok=True)

        # Download the file; the timeout stops a stalled server from
        # blocking the caller indefinitely.
        file_url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
        response = requests.get(file_url, timeout=30)
        response.raise_for_status()

        # Save the file locally
        local_path = os.path.join("downloads", safe_name)
        with open(local_path, "wb") as f:
            f.write(response.content)

        return local_path

    except Exception as e:
        return f"Error downloading file: {str(e)}"

def get_hub_stats(author: str) -> str:
    """Report the most downloaded Hugging Face Hub model for an author.

    Args:
        author: Hub user or organisation name to look up.

    Returns:
        A sentence naming the top model and its download count, a
        "No models found" sentence when the lookup yields nothing, or an
        "Error fetching models" sentence if the lookup raises.
    """
    try:
        # Ask for a single result, ordered by download count (descending)
        top_models = list(
            list_models(author=author, sort="downloads", direction=-1, limit=1)
        )

        if not top_models:
            return f"No models found for author {author}."

        top_model = top_models[0]
        return f"The most downloaded model by {author} is {top_model.id} with {top_model.downloads:,} downloads."
    except Exception as err:
        return f"Error fetching models for {author}: {str(err)}"


def get_image_mime_type(image_path: str) -> str:
    """Return the MIME type implied by an image path's file extension.

    The extension is compared case-insensitively. Unrecognised or missing
    extensions yield ``image/jpeg``.
    """
    extension = os.path.splitext(image_path.lower())[1]

    if extension in ('.jpg', '.jpeg'):
        return 'image/jpeg'
    if extension in ('.png', '.gif', '.bmp', '.webp'):
        # For these formats the subtype is the extension without its dot
        return 'image/' + extension[1:]
    return 'image/jpeg'  # Default to jpeg if unknown

def encode_image_to_base64(image_path: str) -> tuple[str, str]:
    """Read an image file and return ``(base64_text, mime_type)``.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The file contents as a base64 string, paired with the MIME type
        derived from the path by ``get_image_mime_type``.

    Raises:
        Exception: If reading or encoding fails; the message starts with
            "Error encoding image:".
    """
    try:
        with open(image_path, "rb") as handle:
            raw_bytes = handle.read()
            encoded = base64.b64encode(raw_bytes)
            return encoded.decode('utf-8'), get_image_mime_type(image_path)
    except Exception as err:
        raise Exception(f"Error encoding image: {err}")

def analyze_image(image_path: str, question: str = "What do you see in this image?") -> str:
    """Ask a vision-capable chat model a question about a local image.

    Args:
        image_path: Path of the image file to send.
        question: Text prompt sent alongside the image.

    Returns:
        The content of the model's response, or a string starting with
        "Error analyzing image:" if any step raises.
    """
    try:
        # Model used for the vision request
        vision_llm = ChatOpenAI(model="gpt-4.1")

        # Base64 payload plus the MIME type matching the file extension
        image_b64, image_mime = encode_image_to_base64(image_path)

        text_part = {"type": "text", "text": question}
        # The image travels inline as a data URL
        image_part = {
            "type": "image_url",
            "image_url": {
                "url": f"data:{image_mime};base64,{image_b64}",
                "detail": "high",
            },
        }
        prompt = HumanMessage(content=[text_part, image_part])

        reply = vision_llm.invoke([prompt])
        return reply.content

    except Exception as err:
        return f"Error analyzing image: {err}"

def read_excel_file(file_path: str) -> Dict[str, Any]:
    """
    Reads an Excel file and returns structured information about its contents

    The workbook is opened once and closed before returning; every sheet is
    loaded into a DataFrame from that single open handle.

    Args:
        file_path: Path to the Excel file

    Returns:
        On success, a dictionary with the keys:
            "file_path": the path that was read
            "sheet_names": list of sheet names in the workbook
            "sheets_data": per-sheet dictionary holding "shape", "columns",
                "dtypes", "data" (the full DataFrame), "sample_data"
                (first rows as a dict), "numeric_columns" and "text_columns"
            "summary": an empty dictionary (not populated by this function)
        On failure, a dictionary with the single key "error".
    """
    try:
        sheets_data = {}

        # The context manager closes the workbook handle even if a sheet
        # fails to parse; passing the handle to read_excel avoids reopening
        # the file from disk for every sheet.
        with pd.ExcelFile(file_path) as excel_file:
            sheet_names = excel_file.sheet_names

            for sheet_name in sheet_names:
                df = pd.read_excel(excel_file, sheet_name=sheet_name)

                # Basic info about the sheet
                sheets_data[sheet_name] = {
                    "shape": df.shape,
                    "columns": df.columns.tolist(),
                    "dtypes": df.dtypes.to_dict(),
                    "data": df,
                    "sample_data": df.head().to_dict(),
                    "numeric_columns": df.select_dtypes(include=[np.number]).columns.tolist(),
                    "text_columns": df.select_dtypes(include=['object']).columns.tolist(),
                }

        return {
            "file_path": file_path,
            "sheet_names": sheet_names,
            "sheets_data": sheets_data,
            "summary": {},
        }

    except Exception as e:
        return {"error": f"Failed to read Excel file: {str(e)}"}

def execute_python_code(file_path: str, timeout: int = 60) -> str:
    """Run a Python script in a child process and return its output.

    The script runs with the same interpreter as the current process, with
    its own directory as the working directory. NOTE: the child process is
    not sandboxed; the only limit applied is the timeout, so the script has
    the same privileges as the caller.

    Args:
        file_path: Path of the script. If it does not exist, a file with the
            same base name inside ``downloads`` is tried instead.
        timeout: Maximum run time in seconds.

    Returns:
        The script's stripped stdout when it exits with status 0; otherwise
        a string starting with "Error" (script not found, non-zero exit with
        its stderr, timeout, or any other failure).
    """
    import sys  # local import: only needed to locate the running interpreter

    try:
        # Check if file exists, if not try the downloads directory
        if not os.path.exists(file_path):
            alt_path = os.path.join("downloads", os.path.basename(file_path))
            if not os.path.exists(alt_path):
                return f"Error: File not found at {file_path} or {alt_path}"
            file_path = alt_path

        # Ensure we have absolute path (cwd changes for the child below)
        file_path = os.path.abspath(file_path)

        # A bare "python" may be missing from PATH or point at a different
        # installation; prefer the interpreter running this process so the
        # script sees the same environment and packages.
        interpreter = sys.executable or "python"

        # Run in a separate process with timeout
        result = subprocess.run(
            [interpreter, file_path],
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=os.path.dirname(file_path),  # Run in the file's directory
        )

        if result.returncode == 0:
            return result.stdout.strip()
        return f"Error: {result.stderr}"

    except subprocess.TimeoutExpired:
        return "Error: Code execution timed out"
    except Exception as e:
        return f"Error executing code: {str(e)}"

def transcribe_audio(file_path: str) -> str:
    """Transcribe an audio file with the OpenAI "whisper-1" model.

    Args:
        file_path: Path of the audio file to upload.

    Returns:
        The transcript text, or a string starting with
        "Error transcribing audio:" if opening the file or the request fails.
    """
    try:
        with open(file_path, "rb") as audio_stream:
            result = openai.audio.transcriptions.create(
                file=audio_stream,
                model="whisper-1",
            )
            text = result.text
        return text
    except Exception as err:
        return f"Error transcribing audio: {str(err)}"